Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1125490
  • 博文数量: 170
  • 博客积分: 1603
  • 博客等级: 上尉
  • 技术积分: 1897
  • 用 户 组: 普通用户
  • 注册时间: 2010-07-09 15:54
文章分类

全部博文(170)

文章存档

2016年(27)

2015年(21)

2014年(27)

2013年(21)

2012年(7)

2011年(67)

我的朋友

分类: Python/Ruby

2013-06-19 19:36:06

这个应该是转载的

点击(此处)折叠或打开

  1. #! /usr/bin/python
  2. # -*- coding: UTF-8 -*-

  3. import sys,time,select,threading
  4. import struct,Queue,copy

  5. class TalkServer(object):
  6.     MinMassageLen = 400
  7.     MaxMassageLen = 1200
  8.     def __init__(self):
  9.         self.sock = None
  10.         self.bindEopll = None
  11.         self.clientEopll = None
  12.         self.connectionPoll = {}
  13.         self.clientLock = None
  14.         self.cond = None
  15.         self.dataQueue = None

  16.     def __start__(self):
  17.         self.bindEopll = select.epoll()
  18.         self.clientEopll = select.epoll()
  19.         self.bindEopll.register(self.sock.fileno(), select.EPOLLIN)
  20.         self.clientLock = threading.Lock()
  21.         self.cond = threading.Condition()
  22.         self.dataQueue = Queue.Queue()

  23.     def __bindDeamon(self):
  24.         while True:
  25.             events = self.bindEopll.poll(1)
  26.             for fileno, event in events:
  27.                 if fileno == self.sock.fileno():
  28.                     self.clientLock.acquire()
  29.                     newConnection, address = self.sock.accept()
  30.                     self.clientEopll.register(newConnection.fileno(), select.EPOLLIN)
  31. # 保存客户端socket文件符fileno及其对应的连接,同时初始化一个0字节的str,用于存放分包数据
  32.                     self.connectionPoll[newConnection.fileno()] = [newConnection,'']
  33.                     self.clientLock.release()
  34.                 else:
  35.                     self.clientLock.acquire()
  36.                     for clientFileno in self.connectionPoll.keys():
  37.                         self.clientEopll.unregister(clientFileno)
  38.                         self.connectionPoll[clientFileno][0].close()
  39.                     self.connectionPoll.clear()
  40.                     self.clientEopll.close()
  41.                     self.bindEopll.unregister(self.sock.fileno())
  42.                     self.bindEopll.close()
  43.                     self.sock.close()
  44.                     self.clientLock.release()

  45.     def __clientDeamon(self):
  46.         while True:
  47.             events = self.clientEopll.poll(0.01)
  48.             for fileno, event in events:
  49.                 if event & select.EPOLLIN:
  50.                     data = self.connectionPoll[fileno].recv(1024)
  51.                     if len(data) > 0:
  52.                         self.cond.acquire()
  53.                         self.__unpackDataToQueue(data, fileno)
  54.                         self.cond.release()
  55.                 else:
  56.                     self.clientLock.acquire()
  57.                     self.clientEopll.unregister(fileno)
  58.                     self.connectionPoll[fileno][0].close()
  59.                     del self.connectionPoll[fileno]
  60.                     self.clientLock.release()

  61.     def __unpackDataToQueue(self,data,sockFileno):
  62.         """An iteraor to unpack data from socket buffer"""
  63.         if self.connectionPoll[sockFileno][1] != '':
  64. # 上次解包有未处理的分包数据
  65.             if len(self.connectionPoll[sockFileno][1])<8:
  66. # 上次遗留的分包长度小于包头长度
  67.                 if len(self.connectionPoll[sockFileno][1]) + len(data) < 8:
  68. # 合并后长度依旧不小于包头长度,直接合并两次数据包
  69.                     self.connectionPoll[sockFileno][1] += data
  70.                     return
  71.                 try:
  72. # 尝试将上次遗留数据与新数据合并
  73.                     packMark,dataLen = struct.unpack('Ii',self.connectionPoll[sockFileno][1] + data)
  74. # 检查合并数据包头,包头错误直接丢弃上次数据
  75.                     if (packMark != 1346 or ((dataLen > MaxMassageLen - 4) or (dataLen < MinMassageLen + 4))):
  76.                         self.connectionPoll[sockFileno][1] = ''
  77.                         __unpackDataToQueue(data,sockFileno)
  78.                         return
  79.                     else:
  80. # 包头正确合并两数据包并清除保存遗留数据的变量,递归调用解包函数
  81.                         data = self.connectionPoll[sockFileno][1] + data
  82.                         self.connectionPoll[sockFileno][1] = ''
  83.                         __unpackDataToQueue(data,sockFileno)
  84.                         return
  85.                 except:
  86.                     self.connectionPoll[sockFileno][1] = ''
  87.                     __unpackDataToQueue(data,sockFileno)
  88.                     return
  89.             else:
  90.                 data = self.connectionPoll[sockFileno][1] + data
  91.                 self.connectionPoll[sockFileno][1] = ''
  92.                 __unpackDataToQueue(data,sockFileno)
  93.                 return
  94.         if len(data)>8:
  95. # 数据包长度大于包头长度
  96.             try:
  97.                 packMark,dataLen = struct.unpack('Ii',data)
  98.                 if (packMark != 1346 or ((dataLen > MaxMassageLen - 4) or (dataLen < MinMassageLen + 4))):
  99.                     return
  100.             except:
  101.                 return
  102.             if dataLen > len(data) - 8:
  103.                 self.connectionPoll[sockFileno][1] = data
  104.                 return
  105.             elif (dataLen == len(data)):
  106. # self.cond.acquire()
  107.                 self.dataQueue.put((data[8:],sockFileno),False)
  108. # self.cond.release()
  109.                 return
  110.             elif dataLen < len(data) - 8:
  111. # self.cond.acquire()
  112.                 self.dataQueue.put((data[8:dataLen + 8],sockFileno),False)
  113. # self.cond.release()
  114.                 __unpackDataToQueue(data[dataLen + 8:],sockFileno)
  115.                 return
  116.         else:
  117. # 数据包长度小于包头长度,保存当前数据分包
  118.             self.connectionPoll[sockFileno][1] = data
  119.             return


  120.     def __talkDeamon(self):
  121.         time.sleep(5)
  122.         while True:
  123.             self.cond.acquire()
  124.             if(self.dataQueue.empty()):
  125.                 self.cond.wait()
  126.             while(not self.dataQueue.empty()):
  127.                 package = self.dataQueue.get()
  128.             self.cond.release()


1
阅读(1319) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~