需求:
1、周期性对一批机器执行某操作;
2、该操作执行所需时间不固定,有波动;
3、每次操作机器数量较多,需用多进程实现,但又不宜针对每个机器单独开启进程,否则进程太多;
由于每个周期的操作时间不固定,难免出现两个周期重叠在一起,所以每个周期的操作尽量保持独立,这样不会彼此影响。
基于以上考虑,程序的大体思路是:主进程周期性fork一个子进程,子进程使用multiprocessing创建多进程执行具体操作。代码如下:
-
#! /usr/bin/env python
-
# -*- coding: utf-8 -*-
-
#
-
-
import os
-
import sys
-
import time
-
import random
-
import multiprocessing
-
-
# 目标机器集合,少则几百,多则上千
-
hosts = [ '192.168.1.1',
-
'192.168.1.2',
-
'192.168.1.3',
-
'192.168.1.4',
-
'192.168.1.5',
-
'192.168.1.6',
-
'192.168.1.7',
-
'192.168.1.8',
-
'192.168.1.9',
-
'192.168.1.10',
-
]
-
-
# 针对每个机器的具体操作,这里没有进行任何处理
-
def handle(host):
-
print 'Handling %s' % host
-
time.sleep(random.randint(10, 15))
-
return host
-
-
# 循环从task_queue取一个待处理的机器进行处理,并将结果放入done_queue,直到遇到'STOP'
-
def grandchild(task_queue, done_queue):
-
for item in iter(task_queue.get, 'STOP'):
-
result = handle(item)
-
done_queue.put(result)
-
-
# 每个周期的操作在这个单独的子进程中进行
-
def child():
-
print 'Hi! This is child %d, my father is %d' %\
-
(os.getpid(), os.getppid())
-
-
task_queue = multiprocessing.Queue()
-
done_queue = multiprocessing.Queue()
-
-
# 将待处理的机器放入task_queue
-
for host in hosts:
-
print 'put %s in task_queue' % host
-
task_queue.put(host)
-
-
# 创建固定数量进程,并发处理
-
PN = 6
-
processes = []
-
for i in range(PN):
-
process = multiprocessing.Process(target=grandchild, args=(task_queue, done_queue))
-
process.start()
-
processes.append(process)
-
-
# 打印done_queue中的结果,这里可以针对结果在进行处理,如存数据库
-
print 'Unordered results:'
-
for i in range(len(hosts)):
-
print 'get %s from done_queue' % done_queue.get()
-
-
# 结束处理进程
-
for i in range(PN):
-
task_queue.put('STOP')
-
print 'Stopping process #%d' % i
-
-
# 避免处理进程成为僵尸
-
print 'joining.....'
-
for process in processes:
-
process.join()
-
-
sys.exit()
-
-
children = []
-
-
# 清理僵尸进程
-
def clear_defunct():
-
for child in children:
-
pid, status = os.waitpid(child, os.WNOHANG)
-
if pid:
-
children.remove(pid)
-
print 'clear defunct', pid
-
-
if __name__ == '__main__':
-
# 周期性创建子进程
-
while True:
-
pid = os.fork()
-
if pid < 0:
-
print 'fork error'
-
else:
-
if pid == 0:
-
child()
-
else:
-
children.append(pid)
-
print 'Hi! This is parent %d' % os.getpid()
-
-
clear_defunct()
-
-
time.sleep(30)
欢迎指正!
阅读(3507) | 评论(0) | 转发(1) |