该框架是我前几天回复写的,其实蛮有代表性,就重新整理下发到博客。
多进程 v.s. 多线程。两者区别和教科书理论基本一致,下面列出两点实践中的重要差别:
1. OS差异。Win没有父子进程,新建进程开销远大于线程,Win线程运行很有效率;Linux/UNIX父子进程可以共享大量资源,新建进程开销略大于线程,运行效率差别不大,“线程共享进程地址空间能简化数据共享”可能是此处用线程的最大理由了。
2. 编程语言差异。OS系统调用都是C接口,系统提供何种进程/线程模型,C就可以使用何种模型;Python的进程模型和C进程模型一致,但Python线程模型和C线程模型有一个重要区别 - GIL(global intepreter lock, 即全局解释锁),GIL的存在使得Python字节码在同一个进程内只能串行执行,所以Python多线程是一个伪并行,只适合用在IO-bound任务中。
架构图:
源代码:
-
# -*- coding: utf-8 -*-
-
import multiprocessing as mp
-
import os
-
import random
-
from signal import signal, SIGINT, SIG_IGN, siginterrupt
-
import time
-
-
def data_source():
-
"""数据源。
-
-
随机选择一个浮点数,作为worker进程的sleep时间,
-
具体实践时可以将这部分实现改为读取数据库。
-
"""
-
dataset = [0.1, 0.2, 0.3, 0.4, 0.5]
-
while True:
-
time.sleep(0.2)
-
yield random.choice(dataset)
-
-
def proc_proxy(cntl_q, data_q, exit_flag):
-
"""从数据源读取数据。
-
-
先通过cntl_q通知主进程,
-
再将数据通过data_q发给worker。
-
"""
-
for item in data_source():
-
cntl_q.put({'event': 'data'})
-
data_q.put(item)
-
if exit_flag.is_set():
-
cntl_q.put({'event': 'exit', 'pid': os.getpid()})
-
break
-
-
-
def proc_worker(cntl_q, data_q):
-
"""处理数据。
-
-
从data_q获取数据,处理完毕后通过cntl_q通知主进程,
-
然后退出。
-
"""
-
item = data_q.get()
-
time.sleep(item)
-
cntl_q.put({'event': 'exit', 'pid': os.getpid()})
-
-
def main():
-
proc_pool = {} # 记录创建的所有子进程
-
cntl_q = mp.Queue() # 控制信息传递队列
-
data_q = mp.Queue() # 具体数据传递队列
-
exit_flag = mp.Event() # 退出标记,初始值为False
-
-
# 收到SIGINT,通知proxy停止读取数据
-
signal(SIGINT, lambda x, y: exit_flag.set())
-
siginterrupt(SIGINT, False)
-
-
# 启动proxy进程,后续按需启动woker进程
-
print 'main {} started'.format(os.getpid())
-
proc = mp.Process(target=proc_proxy, args=(cntl_q, data_q, exit_flag))
-
proc.start()
-
proc_pool[proc.pid] = proc
-
print 'proxy {} started'.format(proc.pid)
-
-
while True:
-
item = cntl_q.get()
-
if item['event'] == 'data':
-
proc = mp.Process(target=proc_worker, args=(cntl_q, data_q))
-
proc.start()
-
proc_pool[proc.pid] = proc
-
print 'worker {} started'.format(proc.pid)
-
elif item['event'] == 'exit':
-
proc = proc_pool.pop(item['pid'])
-
proc.join()
-
print 'child {} stopped'.format(item['pid'])
-
else:
-
print 'It\'s impossible !'
-
-
if not proc_pool: # 所有子进程均已退出
-
break
-
-
print 'main {} stopped'.format(os.getpid())
-
-
if __name__ == '__main__':
-
main()
备注:每个proxy进程请求创建一个woker进程,意味着外部数据源或proxy进程必须控制数据接入速度,否则会创建大量worker进程,对服务器造成巨大压力。
阅读(13128) | 评论(1) | 转发(1) |