Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1130110
  • 博文数量: 170
  • 博客积分: 1603
  • 博客等级: 上尉
  • 技术积分: 1897
  • 用 户 组: 普通用户
  • 注册时间: 2010-07-09 15:54
文章分类

全部博文(170)

文章存档

2016年(27)

2015年(21)

2014年(27)

2013年(21)

2012年(7)

2011年(67)

我的朋友

分类: Python/Ruby

2016-03-22 11:08:18

用epoll写的,换select写法也差不多


点击(此处)折叠或打开

  1. # -*- coding: UTF-8 -*-
  2. import re
  3. import socket
  4. import select
  5. import logging
  6. import struct
  7. import time

  8. # 里面存放自定义常量
  9. import _static

  10. __author__ = 'loliz_000'


  11. def decode_socket_data(data_buffer):
  12.     """
  13.     命令包解析函数,所有包都以WPROJ_PACKAGER_PACK_HEAD + 包命令类型为开头 包内存在string,则第三部分为string的长度
  14.     @param data_buffer:
  15.     @return 返回none,解析失败
  16.     @return 返回结构 (剩余半包, [(包1命令码,包1结果码,包1内容), (包2命令码,包2结果码,包2内容)])
  17.     """
  18.     data_list = []
  19.     buffer_size = len(data_buffer)
  20.     if buffer_size < 12:
  21.         return data_buffer, data_list
  22.     try:
  23.         # 这里是我自定义包结构
  24.         wproj_head_code, package_command_type, code, package_len = struct.unpack('>HHII', data_buffer[0:12])
  25.         # 解包,判断包头码
  26.         if wproj_head_code != _static.WPROJ_PACKAGER_PACK_HEAD:
  27.             return None
  28.     except struct.error:
  29.         return None
  30.     # 包长度过长
  31.     if package_len >= _static.WPROJ_PACKAGER_MAX_PACK_LEN:
  32.         logging.warning('decode_socket_data find data over max size')
  33.         return None
  34.     # 没有包体
  35.     if buffer_size == 12:
  36.         if package_len > 0:
  37.             return None
  38.         data_list.append((package_command_type, code, None))
  39.         return None, data_list
  40.     # 所需包长度 > 实际包长, 这是一个半包
  41.     if package_len > len(data_buffer[12:]):
  42.         return data_buffer, data_list
  43.     # 包长度正好
  44.     elif package_len == len(data_buffer[12:]):
  45.         try:
  46.             data = struct.unpack('>%ds' % package_len, data_buffer[12:])
  47.         except struct.error:
  48.             return None
  49.         data_list.append((package_command_type, code, data))
  50.         return None, data_list
  51.     # 所需包长度 < 实际包长 有粘包,递归处理
  52.     else:
  53.         try:
  54.             data = struct.unpack('>%ds' % package_len, data_buffer[12:12+package_len])
  55.         except struct.error:
  56.             return None
  57.         data_list.append((package_command_type, code, data))
  58.         decode_socket_res = decode_socket_data(data_buffer[12+package_len:])
  59.         if not decode_socket_res:
  60.             return None
  61.         return decode_socket_res[0], data_list.extend(decode_socket_res[1])



  62. class AgentServer:
  63.     """
  64.     打包端守护进程
  65.     """
  66.     LOG_TYPE_RE = re.compile('^((DEBUG)|(INFO)|(WARNING)|(ERROR)|(CRITICAL))+?$', re.IGNORECASE)

  67.     def __init__(self):
  68.         self.bind_port = 2000
  69.         self.bind_sock = None
  70.         self.epoll = None
  71.         self.bind_lock = None
  72.         self.queue = None
  73.         self.socket_pool_dict = {}

  74.     def bind_sock_start(self):
  75.         logging.info('PackageAgentServer bind socket start')
  76.         bind_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  77.         #bind_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  78.         try:
  79.             socket.setdefaulttimeout(10)
  80.             bind_sock.bind(('0.0.0.0', self.bind_port))
  81.             bind_sock.listen(100)
  82.             self.bind_sock = bind_sock
  83.             logging.warning('PackageAgentServer bind socket success')
  84.         except socket.error:
  85.             logging.warning('PackageAgentServer bind socket get socket error')
  86.             return False
  87.         except OSError:
  88.             logging.warning('PackageAgentServer bind socket get os error')
  89.             return False
  90.         except IOError:
  91.             logging.warning('PackageAgentServer bind socket get io error')
  92.             return False
  93.         logging.info('PackageAgentServer bind socket end')
  94.         self.epoll = select.epoll()
  95.         self.epoll.register(self.bind_sock.fileno(), select.EPOLLIN)
  96.         return True


  97.     def add_to_socket_pool(self, sock):
  98.         self.bind_lock.acquire()
  99.         # socket 连接池 添加
  100.         self.epoll.register(sock.fileno(), select.EPOLLIN | select.EPOLLET)
  101.         self.socket_pool_dict[sock.fileno] = (sock, None)
  102.         self.bind_lock.release()

  103.     def del_from_socket_pool(self, sock):
  104.         if isinstance(sock, socket.socket):
  105.             fd = sock.fileno()
  106.         else:
  107.             fd = sock
  108.         self.bind_lock.acquire()
  109.         try:
  110.             # socket 连接池移除
  111.             self.socket_pool_dict.pop(fd)
  112.             self.epoll.unregister(fd, select.EPOLLIN | select.EPOLLET)
  113.         except KeyError:
  114.             logging.warning('del_from_socket_pool pop sock more then once')
  115.         self.bind_lock.release()

  116.     def connection_loop(self):
  117.         """
  118.         获取新tcp-ip连接,另启一个线程运行
  119.         """
  120.         error_count = 0
  121.         logging.info('scoket connection_loop start')
  122.         while 1: # 这个循不在主线程里运行,fork后不需要处理
  123.             # 退出标记为1结束循环
  124.             if self.single_out:
  125.                 break
  126.             if error_count > 500:
  127.                 logging.warning('connection_loop get too much error')
  128.                 self.single_out = 1
  129.                 continue
  130.             for fd, events in self.epoll.poll():
  131.                 if fd == self.bind_sock.fileno():
  132.                     client_sock, client_ipaddr = self.bind_sock.accept()
  133.                     self.add_to_socket_pool(client_sock)
  134.                 elif select.EPOLLIN & events:
  135.                     cur_sock, half_package = self.socket_pool_dict[fd]
  136.                     data = cur_sock.recv(10000)
  137.                     # 有半包数据
  138.                     if half_package:
  139.                         data = half_package + data
  140.                     # 调用解析数据包
  141.                     decode_data = decode_socket_data(data)
  142.                     # 包解析出错,返回None
  143.                     if not decode_data:
  144.                         # 自己写个关闭函数
  145.                         close_ret = close_sock(cur_sock, _static.DECODE_PACKAGE_FAILURE, 'package decode failure')
  146.                         if close_ret['ret'] > 0:
  147.                             logging.error('close socket error at DECODE_PACKAGE, error %s' % close_ret['msg'])
  148.                         self.del_from_socket_pool(fd)
  149.                         continue
  150.                     # 包解析正确, 半包传入socket_pool_dict,供下次数据处理用
  151.                     self.socket_pool_dict[fd][1], data_list = decode_data
  152.                     # 已经处理好的数据列表塞入队列, 这里可以做点优化, 直接在解析包的函数中就塞入队列
  153.                     for _data in data_list:
  154.                             self.queue.put((cur_sock, _data))
  155.                 elif select.EPOLLHUP & events:
  156.                     self.del_from_socket_pool(fd)
  157.                 else:
  158.                     logging.warning('unkonw event')


  159.     def work_loop(self):
  160.         while 1:
  161.             # 退出标记为1,结束循环
  162.             if self.single_out:
  163.                 break
  164.             if self.queue.empty():
  165.                 time.sleep(0.01)
  166.             else:
  167.                 queue_data = self.queue.get()
  168.                 cur_sock = queue_data[0]
  169.                 package_command_type, code, data = queue_data[1]
  170.                 # do what you want do

  171.     def start(self):
  172.         # 绑定端口
  173.         if not self.bind_sock_start():
  174.             raise
  175.         import threading
  176.         # 启动数据接收线程
  177.         data_thread = threading.Thread(target=self.connection_loop)
  178.         data_thread.setDaemon(True)
  179.         data_thread.start()
  180.         # 启动数据处理线程
  181.         data_thread = threading.Thread(target=self.work_loop)
  182.         data_thread.setDaemon(True)
  183.         data_thread.start()


阅读(5148) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~