Chinaunix首页 | 论坛 | 博客
  • 博客访问: 5120182
  • 博文数量: 921
  • 博客积分: 16037
  • 博客等级: 上将
  • 技术积分: 8469
  • 用 户 组: 普通用户
  • 注册时间: 2006-04-05 02:08
文章分类

全部博文(921)

文章存档

2020年(1)

2019年(3)

2018年(3)

2017年(6)

2016年(47)

2015年(72)

2014年(25)

2013年(72)

2012年(125)

2011年(182)

2010年(42)

2009年(14)

2008年(85)

2007年(89)

2006年(155)

分类: Python/Ruby

2015-06-24 16:35:12

有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。
下面记录下分别基于Select/Poll/Epoll的echo server实现。
Python Select Server,可监控事件数量有限制:


  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. import select
  4. import socket
  5. import Queue
  6.   
  7. server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  8. server.setblocking(False)
  9. server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR , 1)
  10. server_address= ('192.168.1.5',8080)
  11. server.bind(server_address)
  12. server.listen(10)
  13.   
  14. #select轮询等待读socket集合
  15. inputs = [server]
  16. #select轮询等待写socket集合
  17. outputs = []
  18. message_queues = {}
  19. #select超时时间
  20. timeout = 20
  21.   
  22. while True:
  23.     print "等待活动连接......"
  24.     readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
  25.   
  26.     if not (readable or writable or exceptional) :
  27.         print "select超时无活动连接,重新select...... "
  28.         continue;
  29.     #循环可读事件
  30.     for s in readable :
  31.         #如果是server监听的socket
  32.         if s is server:
  33.             #同意连接
  34.             connection, client_address = s.accept()
  35.             print "新连接: ", client_address
  36.             connection.setblocking(0)
  37.             #将连接加入到select可读事件队列
  38.             inputs.append(connection)
  39.             #新建连接为key的字典,写回读取到的消息
  40.             message_queues[connection] = Queue.Queue()
  41.         else:
  42.             #不是本机监听就是客户端发来的消息
  43.             data = s.recv(1024)
  44.             if data :
  45.                 print "收到数据:" , data , "客户端:",s.getpeername()
  46.                 message_queues[s].put(data)
  47.                 if s not in outputs:
  48.                     #将读取到的socket加入到可写事件队列
  49.                     outputs.append(s)
  50.             else:
  51.                 #空白消息,关闭连接
  52.                 print "关闭连接:", client_address
  53.                 if s in outputs :
  54.                     outputs.remove(s)
  55.                 inputs.remove(s)
  56.                 s.close()
  57.                 del message_queues[s]
  58.     for s in writable:
  59.         try:
  60.             msg = message_queues[s].get_nowait()
  61.         except Queue.Empty:
  62.             print "连接:" , s.getpeername() , '消息队列为空'
  63.             outputs.remove(s)
  64.         else:
  65.             print "发送数据:" , msg , "到", s.getpeername()
  66.             s.send(msg)
  67.       
  68.     for s in exceptional:
  69.         print "异常连接:", s.getpeername()
  70.         inputs.remove(s)
  71.         if s in outputs:
  72.             outputs.remove(s)
  73.         s.close()
  74.         del message_queues[s]

Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:


  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. import socket
  4. import select
  5. import Queue
  6.   
  7. server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  8. server.setblocking(False)
  9. server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  10. server_address = ("192.168.1.5", 8080)
  11. server.bind(server_address)
  12. server.listen(5)
  13. print "服务器启动成功,监听IP:" , server_address
  14. message_queues = {}
  15. #超时,毫秒
  16. timeout = 5000
  17. #监听哪些事件
  18. READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
  19. READ_WRITE = (READ_ONLY|select.POLLOUT)
  20. #新建轮询事件对象
  21. poller = select.poll()
  22. #注册本机监听socket到等待可读事件事件集合
  23. poller.register(server,READ_ONLY)
  24. #文件描述符到socket映射
  25. fd_to_socket = {server.fileno():server,}
  26. while True:
  27.     print "等待活动连接......"
  28.     #轮询注册的事件集合
  29.     events = poller.poll(timeout)
  30.     if not events:
  31.       print "poll超时,无活动连接,重新poll......"
  32.       continue
  33.     print "有" , len(events), "个新事件,开始处理......"
  34.     for fd ,flag in events:
  35.         s = fd_to_socket[fd]
  36.         #可读事件
  37.         if flag & (select.POLLIN | select.POLLPRI) :
  38.             if s is server :
  39.                 #如果socket是监听的server代表有新连接
  40.                 connection , client_address = s.accept()
  41.                 print "新连接:" , client_address
  42.                 connection.setblocking(False)
  43.                   
  44.                 fd_to_socket[connection.fileno()] = connection
  45.                 #加入到等待读事件集合
  46.                 poller.register(connection,READ_ONLY)
  47.                 message_queues[connection] = Queue.Queue()
  48.             else :
  49.                 #接收客户端发送的数据
  50.                 data = s.recv(1024)
  51.                 if data:
  52.                     print "收到数据:" , data , "客户端:" , s.getpeername()
  53.                     message_queues[s].put(data)
  54.                     #修改读取到消息的连接到等待写事件集合
  55.                     poller.modify(s,READ_WRITE)
  56.                 else :
  57.                     # Close the connection
  58.                     print " closing" , s.getpeername()
  59.                     # Stop listening for input on the connection
  60.                     poller.unregister(s)
  61.                     s.close()
  62.                     del message_queues[s]
  63.         #连接关闭事件
  64.         elif flag & select.POLLHUP :
  65.             print " Closing ", s.getpeername() ,"(HUP)"
  66.             poller.unregister(s)
  67.             s.close()
  68.         #可写事件
  69.         elif flag & select.POLLOUT :
  70.             try:
  71.                 msg = message_queues[s].get_nowait()
  72.             except Queue.Empty:
  73.                 print s.getpeername() , " queue empty"
  74.                 poller.modify(s,READ_ONLY)
  75.             else :
  76.                 print "发送数据:" , data , "客户端:" , s.getpeername()
  77.                 s.send(msg)
  78.         #异常事件
  79.         elif flag & select.POLLERR:
  80.             print " exception on" , s.getpeername()
  81.             poller.unregister(s)
  82.             s.close()
  83.             del message_queues[s]

Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:


  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. import socket, select
  4. import Queue
  5.  
  6. serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  7. serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  8. server_address = ("192.168.1.5", 8080)
  9. serversocket.bind(server_address)
  10. serversocket.listen(1)
  11. print "服务器启动成功,监听IP:" , server_address
  12. serversocket.setblocking(0)
  13. timeout = 10
  14. #新建epoll事件对象,后续要监控的事件添加到其中
  15. epoll = select.epoll()
  16. #添加服务器监听fd到等待读事件集合
  17. epoll.register(serversocket.fileno(), select.EPOLLIN)
  18. message_queues = {}
  19.  
  20. fd_to_socket = {serversocket.fileno():serversocket,}
  21. while True:
  22.   print "等待活动连接......"
  23.   #轮询注册的事件集合
  24.   events = epoll.poll(timeout)
  25.   if not events:
  26.      print "epoll超时无活动连接,重新轮询......"
  27.      continue
  28.   print "有" , len(events), "个新事件,开始处理......"
  29.   for fd, event in events:
  30.      socket = fd_to_socket[fd]
  31.      #可读事件
  32.      if event & select.EPOLLIN:
  33.          #如果活动socket为服务器所监听,有新连接
  34.          if socket == serversocket:
  35.             connection, address = serversocket.accept()
  36.             print "新连接:" , address
  37.             connection.setblocking(0)
  38.             #注册新连接fd到待读事件集合
  39.             epoll.register(connection.fileno(), select.EPOLLIN)
  40.             fd_to_socket[connection.fileno()] = connection
  41.             message_queues[connection] = Queue.Queue()
  42.          #否则为客户端发送的数据
  43.          else:
  44.             data = socket.recv(1024)
  45.             if data:
  46.                print "收到数据:" , data , "客户端:" , socket.getpeername()
  47.                message_queues[socket].put(data)
  48.                #修改读取到消息的连接到等待写事件集合
  49.                epoll.modify(fd, select.EPOLLOUT)
  50.      #可写事件
  51.      elif event & select.EPOLLOUT:
  52.         try:
  53.            msg = message_queues[socket].get_nowait()
  54.         except Queue.Empty:
  55.            print socket.getpeername() , " queue empty"
  56.            epoll.modify(fd, select.EPOLLIN)
  57.         else :
  58.            print "发送数据:" , data , "客户端:" , socket.getpeername()
  59.            socket.send(msg)
  60.      #关闭事件
  61.      elif event & select.EPOLLHUP:
  62.         epoll.unregister(fd)
  63.         fd_to_socket[fd].close()
  64.         del fd_to_socket[fd]
  65. epoll.unregister(serversocket.fileno())
  66. epoll.close()
  67. serversocket.close()
原文链接


阅读(2223) | 评论(0) | 转发(0) |
0

上一篇:twisted reactor解剖

下一篇:epoll事件掩码

给主人留下些什么吧!~~