Chinaunix首页 | 论坛 | 博客
  • 博客访问: 836671
  • 博文数量: 91
  • 博客积分: 2544
  • 博客等级: 少校
  • 技术积分: 1885
  • 用 户 组: 普通用户
  • 注册时间: 2006-12-12 09:08
文章存档

2016年(10)

2014年(2)

2013年(4)

2012年(23)

2011年(23)

2010年(13)

2009年(14)

2007年(2)

分类: LINUX

2016-05-12 10:32:05

自己编写的环形缓冲区模拟模型,数据来的时候会把老的数据冲掉。然而python下的多线程不是真正的多线程,可以把其中的threading改成进程(process),把一般的队列改成multiprocess.queue也可以实现同样的功能。
这个模型,对于要把任务分发到多个worker上去执行时,还是比较有用的。

多线程代码如下:

点击(此处)折叠或打开

  1. #!/usr/bin/env python
  2. #encoding: utf8
  3. #
  4. #

  5. import threading
  6. from time import ctime
  7. import time


  8. class MyThread(threading.Thread):
  9.     def __init__(self, func, args, name=''):
  10.         threading.Thread.__init__(self)
  11.         self.name = name
  12.         self.func = func
  13.         self.args = args

  14.     def run(self):
  15.         print 'create %s'%self.name
  16.         self.res = self.func(*self.args)



  17. class CQueue():
  18.     def __init__(self, len):
  19.         self.cque = [] #circle queue
  20.         self.ppos = 0 #put position
  21.         self.gpos = 0 #get position
  22.         self.clen = 0 #current position
  23.         self.tlen = len #total length

  24.         #init circle queue
  25.         for i in range(len):
  26.             self.cque.append(0)

  27.     def get_item(self, n):
  28.         return self.cque[n]

  29.     def set_item(self, n, v):
  30.         self.cque[n] = v

  31.     def is_empty(self):
  32.         return (self.clen == 0)

  33.     def is_full(self):
  34.         return (self.clen == self.tlen)

  35.     def printst(self):
  36.         print 'totalen=%d, clen=%d, ppos=%d, gpos=%d' % \
  37.             (self.tlen, self.clen, self.ppos, self.gpos)
  38.         print str(self.cque)



  39. class Worker():
  40.     def __init__(self, qlen):
  41.         self.cq = CQueue(qlen)
  42.         self.qlock = threading.Lock()
  43.         self.cv = threading.Condition(self.qlock)

  44.     def writer(self):
  45.         ''' writer: who writing item to queue. '''
  46.         while True:
  47.             self.cv.acquire()
  48.             print 'writer locked'
  49.             self.cq.set_item(self.cq.ppos, 1)
  50.             self.cq.ppos += 1
  51.             if self.cq.ppos == self.cq.tlen: #q is full, roll
  52.                 self.cq.ppos = 0
  53.             self.cq.clen += 1

  54.             self.cv.notify()
  55.             self.cq.printst()
  56.             self.cv.release()
  57.             time.sleep(1)


  58.     def reader(self):
  59.         ''' reader who get item from queue. '''
  60.         while True:
  61.             self.cv.acquire()
  62.             print 'read locked'
  63.             while self.cq.is_empty():
  64.                 self.cv.wait()
  65.             #read one
  66.             print 'read one : %d' % self.cq.get_item(self.cq.gpos)
  67.             self.cq.set_item(self.cq.gpos, 0)

  68.             self.cq.gpos += 1
  69.             if self.cq.gpos == self.cq.tlen: #roll
  70.                 self.cq.gpos = 0
  71.             self.cq.clen -= 1

  72.             #self.cq.printst()
  73.             self.cv.release()
  74.             time.sleep(1)


  75. def do_work():
  76.     ''' do my job '''
  77.     w = Worker(10)
  78.     w.cq.printst()
  79.     threads = []

  80.     #create writer
  81.     for i in range(1):
  82.         t = MyThread(w.writer, (), 'writer')
  83.         threads.append(t)

  84.     #create reader
  85.     for i in range(2):
  86.         t = MyThread(w.reader, (), 'reader')
  87.         threads.append(t)

  88.     nth = len(threads)
  89.     for i in range(nth):
  90.         threads.start()

  91.     for i in range(nth):
  92.         threads.join()

  93. #main
  94. if __name__ == '__main__':
  95.     do_work()


阅读(4162) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~