Chinaunix首页 | 论坛 | 博客
  • 博客访问: 23286
  • 博文数量: 6
  • 博客积分: 1400
  • 博客等级: 上尉
  • 技术积分: 74
  • 用 户 组: 普通用户
  • 注册时间: 2009-12-21 20:29
文章分类
文章存档

2011年(1)

2010年(4)

2009年(1)

我的朋友

分类: Python/Ruby

2010-04-11 16:04:45

Python epoll网络编程二:客户端模型
第一篇简单介绍了一个使用epoll的服务器模型,本节介绍使用epoll的客户端模型,客户端连接多个服务器。
以下是发送hello请求的客户端,可以作为探测程序,探测服务器的工作状态,如果返回greeting,表示服务器正常工作。代码如下:

文件:epoll_python.zip
大小:2KB
下载:下载

def usage():
    print 'python epoll_client.py : : ...'
    print '\texample: python epoll_client.py localhost:9999 localhost:9998' 
    sys.exit(1)
    
    
class HelloClient(EpollConnector):
    ''' extends EpolloServer, overload do_operation''' 
    def __init__(self, logger, srvs):
        EpollConnector.__init__(self, logger, srvs)
    
    def parse_response(self, fileno):
        if self.responses[fileno] != epoll_util.greeting_response:
            try:
                print 'error in response %s'%str(self.connections[fileno].getpeername())
            except:
                print 'disconnect'
    

if __name__ == "__main__":
    srvs = []
    for arg in sys.argv[1:]:
        try:    
            a, p = arg.split(':')
            srvs.append((a, int(p)))
        except : 
            usage()
    if len(srvs) == 0:
        usage()
    srv = HelloClient(None, srvs)
    srv.init_epoll()
    srv.loop_epoll()


EpollConnector.init_epoll():
epoll_connector.py
    21    def init_epoll(self):
    22        # fileno set for connection
    23        self.conns_index = {}
    24        self.connections = {}
    25        self.requests = {}
    26        self.responses = {}
    27        for i in range( len( self.srvs_addr ) ):
    28            try:
    29                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    30                s.connect(self.srvs_addr[i])
    31                s.setblocking(0)
    32                fileno = s.fileno()
    33                self.epoll.register(fileno, select.EPOLLOUT | select.EPOLLET)
    34            except socket.error:
    35                print 'ERROR in connect to %s'%str( self.srvs_addr[i] )
    36                sys.exit(1)
    37            else:
    38                self.conns_index[fileno] = i
    39                self.connections[fileno] = s
    40                self.requests[fileno] = b''
    41                self.responses[fileno] = b''

23-26行:初始化连接信息,key均为fileno; conns_index的value是客户连接编号; connections,requests,responses 的value分别是socket和请求与相应;
29-32行:初始化客户端连接,监听EPOLLOUT;与服务器模型相反,服务器模型初始化监听EPOLLIN;
34-36行:当连接失败时,退出;
38-41行:当连接成功,初始化连接的信息;

EpollConnector.loop_epoll():
epoll_connector.py
    92    def loop_epoll(self):
    93        try:
    94            while self.status != epoll_util.close_status:
    95                events = self.epoll.poll()                
    96                for fileno, event in events:
    97                    self.do_epoll(fileno, event)
    98        finally:
    99            self.close_epoll()

92-99行:与服务器模型相同,处理监听事件;


EpollConnector.do_epoll():
epoll_connector.py
    72    def do_epoll(self, fileno, event):
    73        try:
    74            if event & select.EPOLLOUT:
    75                self.do_request(fileno)
    76            elif event & select.EPOLLIN:                        
    77                self.do_response(fileno)
    78            elif event & select.EPOLLHUP:
    79                self.hup_epoll(fileno)
    80        except:
    81            raise

74-75行:EPOLLOUT 可读时,调用do_request读取相应;
77-78行:EPOLLIN可读时,调用do_response读取相应;
request和response是针对连接请求而言,因此客户端与服务器段的监听事件与监听动作正好相反;

EpollConnector.do_request(self, fileno):
epoll_connector.py
    83    def set_request(self, fileno):
    84        self.requests[fileno] = epoll_util.hello_request
    85        
    86    def do_request(self, fileno):
    87        self.set_request(fileno)
    88        epoll_util.send_epoll(self.connections[fileno], self.requests[fileno])
    89        self.epoll.modify(fileno, select.EPOLLIN | select.EPOLLET)
    90        self.requests[fileno] = b''

87行:调用set_request(fileno)
设置请求,具体的客户端可overload此函数;

EpollConnector.do_response(self, fileno):
epoll_connector.py
    43    #overload
    44    def parse_response(self, response):
    45        if self.responses[fileno] == epoll_util.fail_response:
    46            pass
    47    
    48    def do_response(self, fileno):
    49        self.responses[fileno] = b''
    50        s = self.connections[fileno]
    51        self.responses[fileno] += epoll_util.recv_epoll(s)
    52        self.parse_response(fileno)
    53        self.epoll.modify(fileno, select.EPOLLOUT | select.EPOLLET)

52行:调用parse_response(response)
处理请求相应,具体的客户端可overload此函数;


源代码如下:
     1 import socket
     2 import select
     3 import sys
     4 import epoll_util
     5
     6 class EpollConnector:
     7    '''generic epoll connectors which connect to down stream server'''
     8    def __init__(self, logger, srvs):
     9        self.logger = logger
    10        self.srvs_addr = srvs
    11        #status
    12        self.status = epoll_util.begin_status
    13                
    14        # initial epoll fileno
    15        self.epoll = select.epoll()
    16    
    17    def set_epoll(self, poll):
    18        self.epoll.close()
    19        self.epoll = poll
    20
    21    def init_epoll(self):
    22        # fileno set for connection
    23        self.conns_index = {}
    24        self.connections = {}
    25        self.requests = {}
    26        self.responses = {}
    27        for i in range( len( self.srvs_addr ) ):
    28            try:
    29                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    30                s.connect(self.srvs_addr[i])
    31                s.setblocking(0)
    32                fileno = s.fileno()
    33                self.epoll.register(fileno, select.EPOLLOUT | select.EPOLLET)
    34            except socket.error:
    35                print 'ERROR in connect to %s'%str( self.srvs_addr[i] )
    36                sys.exit(1)
    37            else:
    38                self.conns_index[fileno] = i
    39                self.connections[fileno] = s
    40                self.requests[fileno] = b''
    41                self.responses[fileno] = b''
    42
    43    #overload
    44    def parse_response(self, response):
    45        if self.responses[fileno] == epoll_util.fail_response:
    46            pass
    47    
    48    def do_response(self, fileno):
    49        self.responses[fileno] = b''
    50        s = self.connections[fileno]
    51        self.responses[fileno] += epoll_util.recv_epoll(s)
    52        self.parse_response(fileno)
    53        self.epoll.modify(fileno, select.EPOLLOUT | select.EPOLLET)
    54        
    55    #overload
    56    def recover(self, fileno):
    57        print 'hup_epoll', fileno
    58    
    59    def hup_epoll(self, fileno):
    60        self.recover(fileno)
    61        self.epoll.unregister(fileno)
    62        self.connections[fileno].close()
    63        del self.connections[fileno]
    64        del self.requests[fileno]
    65        del self.responses[fileno]
    66        del self.conns_index[fileno]
    67        
    68        
    69    def close_epoll(self):
    70        self.epoll.close()
    71        
    72    def do_epoll(self, fileno, event):
    73        try:
    74            if event & select.EPOLLOUT:
    75                self.do_request(fileno)
    76            elif event & select.EPOLLIN:                        
    77                self.do_response(fileno)
    78            elif event & select.EPOLLHUP:
    79                self.hup_epoll(fileno)
    80        except:
    81            raise
    82        
    83    def set_request(self, fileno):
    84        self.requests[fileno] = epoll_util.hello_request
    85        
    86    def do_request(self, fileno):
    87        self.set_request(fileno)
    88        epoll_util.send_epoll(self.connections[fileno], self.requests[fileno])
    89        self.epoll.modify(fileno, select.EPOLLIN | select.EPOLLET)
    90        self.requests[fileno] = b''
    91    
    92    def loop_epoll(self):
    93        try:
    94            while self.status != epoll_util.close_status:
    95                events = self.epoll.poll()                
    96                for fileno, event in events:
    97                    self.do_epoll(fileno, event)
    98        finally:
    99            self.close_epoll()
   100
   101
   102 def usage():
   103    print 'python epoll_client.py : : ...'
   104    print '\texample: python epoll_client.py 218.241.108.68:9999 218.241.108.68:9998' 
   105    sys.exit(2)
   106    
   107    
   108 class HelloClient(EpollConnector):
   109    ''' extends EpolloServer, overload do_operation''' 
   110    def __init__(self, logger, srvs):
   111        EpollConnector.__init__(self, logger, srvs)
   112    
   113    def parse_response(self, fileno):
   114        if self.responses[fileno] != epoll_util.greeting_response:
   115            try:
   116                print 'error in response %s'%str(self.connections[fileno].getpeername())
   117            except:
   118                print 'disconnect'
   119    
   120
   121 if __name__ == "__main__":
   122    srvs = []
   123    for arg in sys.argv[1:]:
   124        try:    
   125            a, p = arg.split(':')
   126            srvs.append((a, int(p)))
   127        except : 
   128            usage()
   129    if len(srvs) == 0:
   130        usage()
   131    srv = HelloClient(None, srvs)
   132    srv.init_epoll()
   133    srv.loop_epoll()

阅读(1967) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~