Chinaunix首页 | 论坛 | 博客
  • 博客访问: 157280
  • 博文数量: 13
  • 博客积分: 45
  • 博客等级: 民兵
  • 技术积分: 871
  • 用 户 组: 普通用户
  • 注册时间: 2011-09-02 10:59
个人简介

Nobody.

文章分类

全部博文(13)

文章存档

2013年(13)

分类: Python/Ruby

2013-06-24 16:23:44

需求:
1、周期性对一批机器执行某操作;
2、该操作执行所需时间不固定,有波动;
3、每次操作机器数量较多,需用多进程实现,但又不宜针对每个机器单独开启进程,否则进程太多;

由于每个周期的操作时间不固定,难免出现两个周期重叠在一起,所以每个周期的操作尽量保持独立,这样不会彼此影响。
基于以上考虑,程序的大体思路是:主进程周期性fork一个子进程,子进程使用multiprocessing创建多进程执行具体操作。代码如下:

  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. #

  4. import os
  5. import sys
  6. import time
  7. import random
  8. import multiprocessing

  9. # 目标机器集合,少则几百,多则上千
  10. hosts = [ '192.168.1.1',
  11.           '192.168.1.2',
  12.           '192.168.1.3',
  13.           '192.168.1.4',
  14.           '192.168.1.5',
  15.           '192.168.1.6',
  16.           '192.168.1.7',
  17.           '192.168.1.8',
  18.           '192.168.1.9',
  19.           '192.168.1.10',
  20.          ]

  21. # 针对每个机器的具体操作,这里没有进行任何处理
  22. def handle(host):
  23.     print 'Handling %s' % host
  24.     time.sleep(random.randint(10, 15))
  25.     return host

  26. # 循环从task_queue取一个待处理的机器进行处理,并将结果放入done_queue,直到遇到'STOP'
  27. def grandchild(task_queue, done_queue):
  28.     for item in iter(task_queue.get, 'STOP'):
  29.         result = handle(item)
  30.         done_queue.put(result)

  31. # 每个周期的操作在这个单独的子进程中进行
  32. def child():
  33.     print 'Hi! This is child %d, my father is %d' %\
  34.                         (os.getpid(), os.getppid())

  35.     task_queue = multiprocessing.Queue()
  36.     done_queue = multiprocessing.Queue()

  37.     # 将待处理的机器放入task_queue
  38.     for host in hosts:
  39.         print 'put %s in task_queue' % host
  40.         task_queue.put(host)

  41.     # 创建固定数量进程,并发处理
  42.     PN = 6
  43.     processes = []
  44.     for i in range(PN):
  45.         process = multiprocessing.Process(target=grandchild, args=(task_queue, done_queue))
  46.         process.start()
  47.         processes.append(process)

  48.     # 打印done_queue中的结果,这里可以针对结果在进行处理,如存数据库
  49.     print 'Unordered results:'
  50.     for i in range(len(hosts)):
  51.         print 'get %s from done_queue' % done_queue.get()

  52.     # 结束处理进程
  53.     for i in range(PN):
  54.         task_queue.put('STOP')
  55.         print 'Stopping process #%d' % i

  56.     # 避免处理进程成为僵尸
  57.     print 'joining.....'
  58.     for process in processes:
  59.         process.join()
  60.     
  61.     sys.exit()

  62. children = []

  63. # 清理僵尸进程
  64. def clear_defunct():
  65.     for child in children:
  66.         pid, status = os.waitpid(child, os.WNOHANG)
  67.         if pid:
  68.             children.remove(pid)
  69.             print 'clear defunct', pid

  70. if __name__ == '__main__':
  71.     # 周期性创建子进程
  72.     while True:
  73.         pid = os.fork()
  74.         if pid < 0:
  75.             print 'fork error'
  76.         else:
  77.             if pid == 0:
  78.                 child()
  79.             else:
  80.                 children.append(pid)
  81.                 print 'Hi! This is parent %d' % os.getpid()

  82.         clear_defunct()

  83.         time.sleep(30)

欢迎指正!
阅读(3557) | 评论(0) | 转发(1) |
给主人留下些什么吧!~~