Chinaunix首页 | 论坛 | 博客
  • 博客访问: 178850
  • 博文数量: 13
  • 博客积分: 10
  • 博客等级: 民兵
  • 技术积分: 832
  • 用 户 组: 普通用户
  • 注册时间: 2010-12-16 22:43
个人简介

最近关注Python,Linux,LLVM

文章分类

全部博文(13)

文章存档

2014年(13)

分类: Python/Ruby

2014-04-02 12:25:39

该框架是我前几天回复写的,其实蛮有代表性,就重新整理下发到博客。

多进程 v.s. 多线程。两者区别和教科书理论基本一致,下面列出两点实践中的重要差别:
1. OS差异。Win没有父子进程,新建进程开销远大于线程,Win线程运行很有效率;Linux/UNIX父子进程可以共享大量资源,新建进程开销略大于线程,运行效率差别不大,“线程共享进程地址空间能简化数据共享”可能是此处用线程的最大理由了。
2. 编程语言差异。OS系统调用都是C接口,系统提供何种进程/线程模型,C就可以使用何种模型;Python的进程模型和C进程模型一致,但Python线程模型和C线程模型有一个重要区别 - GIL(global intepreter lock, 即全局解释锁),GIL的存在使得Python字节码在同一个进程内只能串行执行,所以Python多线程是一个伪并行,只适合用在IO-bound任务中。 

架构图


源代码

点击(此处)折叠或打开

  1. # -*- coding: utf-8 -*-
  2. import multiprocessing as mp
  3. import os
  4. import random
  5. from signal import signal, SIGINT, SIG_IGN, siginterrupt
  6. import time

  7. def data_source():
  8.     """数据源。

  9.     随机选择一个浮点数,作为worker进程的sleep时间,
  10.     具体实践时可以将这部分实现改为读取数据库。
  11.     """
  12.     dataset = [0.1, 0.2, 0.3, 0.4, 0.5]
  13.     while True:
  14.         time.sleep(0.2)
  15.         yield random.choice(dataset)

  16. def proc_proxy(cntl_q, data_q, exit_flag):
  17.     """从数据源读取数据。

  18.     先通过cntl_q通知主进程,
  19.     再将数据通过data_q发给worker。
  20.     """
  21.     for item in data_source():
  22.         cntl_q.put({'event': 'data'})
  23.         data_q.put(item)
  24.         if exit_flag.is_set():
  25.             cntl_q.put({'event': 'exit', 'pid': os.getpid()})
  26.             break


  27. def proc_worker(cntl_q, data_q):
  28.     """处理数据。

  29.     从data_q获取数据,处理完毕后通过cntl_q通知主进程,
  30.     然后退出。
  31.     """
  32.     item = data_q.get()
  33.     time.sleep(item)
  34.     cntl_q.put({'event': 'exit', 'pid': os.getpid()})

  35. def main():
  36.     proc_pool = {} # 记录创建的所有子进程
  37.     cntl_q = mp.Queue() # 控制信息传递队列
  38.     data_q = mp.Queue() # 具体数据传递队列
  39.     exit_flag = mp.Event() # 退出标记,初始值为False

  40.     # 收到SIGINT,通知proxy停止读取数据
  41.     signal(SIGINT, lambda x, y: exit_flag.set())
  42.     siginterrupt(SIGINT, False)

  43.     # 启动proxy进程,后续按需启动woker进程
  44.     print 'main {} started'.format(os.getpid())
  45.     proc = mp.Process(target=proc_proxy, args=(cntl_q, data_q, exit_flag))
  46.     proc.start()
  47.     proc_pool[proc.pid] = proc
  48.     print 'proxy {} started'.format(proc.pid)

  49.     while True:
  50.         item = cntl_q.get()
  51.         if item['event'] == 'data':
  52.             proc = mp.Process(target=proc_worker, args=(cntl_q, data_q))
  53.             proc.start()
  54.             proc_pool[proc.pid] = proc
  55.             print 'worker {} started'.format(proc.pid)
  56.         elif item['event'] == 'exit':
  57.             proc = proc_pool.pop(item['pid'])
  58.             proc.join()
  59.             print 'child {} stopped'.format(item['pid'])
  60.         else:
  61.             print 'It\'s impossible !'

  62.         if not proc_pool: # 所有子进程均已退出
  63.             break

  64.     print 'main {} stopped'.format(os.getpid())

  65. if __name__ == '__main__':
  66.     main()
备注
每个proxy进程请求创建一个woker进程,意味着外部数据源或proxy进程必须控制数据接入速度,否则会创建大量worker进程,对服务器造成巨大压力。 

阅读(13128) | 评论(1) | 转发(1) |
给主人留下些什么吧!~~

reyleon2014-05-13 20:08:40

嘿嘿, 支持一下 ! 那帖子是我发的, 解决了我一个大问题呀.  目前一直在用,挺好.