三、异步client与异步server的通信
1. 服务端代码
pythone socket的server段,开放三个端口:10000,10001,10002.
例子中是每个server绑定一个端口,测试的时候需要分别开3个shell,分别运行.
这太麻烦了,就分别用三个Thread来运行这些services
#!/usr/bin/env python
#
# -*- coding:utf-8 -*-
# File: multithrd_socket_server.py
#
import optparse
import os
import socket
import time
from threading import Thread
from io import StringIO
txt = '''1111
2222
3333
4444
'''
# 服务端程序处理过程
def server(listen_socket):
while True:
buf = StringIO(txt)
sock, addr = listen_socket.accept()
print('Somebody at %s wants poetry!' %(addr,))
while True:
try:
line = buf.readline().strip()
if not line:
sock.close()
break
sock.sendall(line.encode('utf8')) # this is a blocking call
print('send bytes to client: %s' %line)
except socket.error:
sock.close()
break
time.sleep(0.5) # server每发送一个单词后等待一会
sock.close()
print('\n')
# 同时开启三个服务端线程,分别在三个端口监听
# 服务端程序为阻塞方式,只能一次服务于一个客户端
def main():
ports = [10000, 10001, 10002]
for port in ports:
listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
addres = ('127.0.0.1', port)
listen_socket.bind(addres)
listen_socket.listen(5)
print("start listen at: %d" %port)
worker = Thread(target = server, args = [listen_socket])
worker.setDaemon(True)
worker.start()
if __name__ == '__main__':
main()
while True:
time.sleep(0.1) # 如果不sleep的话, CPU会被Python完全占用了
2. 同步客户端
下面是一个client, 用阻塞方式,先后连接这个三个端口的server:
from socket import *
# 建立三个客户端,分别连接三个不同的服务端程序, 接收服务端传过来的数据并打印
# 这三个客户端是阻塞方式通信的
if __name__ == '__main__':
ports = [10000, 10001, 10002]
for port in ports:
address = ('127.0.0.1', port)
sock = socket(AF_INET, SOCK_STREAM)
sock.connect(address)
poem = ''
while True:
data = sock.recv(4)
if not data:
sock.close()
break
poem += data.decode('utf8')
print(poem)
sock.close()
3. 异步客户端:
import datetime, errno, optparse, select, socket
def connect(port):
"""Connect to the given server and return a non-blocking socket."""
address = ('127.0.0.1', port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(address)
sock.setblocking(0) # 设置为非阻塞模式
return sock
def format_address(address):
host, port = address
return '%s:%s' % (host or '127.0.0.1', port)
if __name__ == '__main__':
ports = [10000, 10001, 10002]
start = datetime.datetime.now()
sockets = list(map(connect, ports))
poems = dict.fromkeys(sockets, '') # socket -> accumulated poem
sock2task = dict([(s, i + 1) for i, s in enumerate(sockets)])
while sockets:
#运用select来确保那些可读取的异步socket可以立即开始读取IO
#OS不停的搜索目前可以read的socket,有的话就返回rlist
rlist, _, _ = select.select(sockets, [], [])
for sock in rlist:
data = ''
while True:
try:
new_data = sock.recv(1024)
new_data = new_data.decode('utf8')
except socket.error as e:
if e.args[0] == errno.EWOULDBLOCK:
break
raise
else:
if not new_data:
break
else:
print(new_data)
data += new_data
task_num = sock2task[sock]
if not data:
print(poems[sock]) # 打印sock接收到的数据
sockets.remove(sock)
sock.close()
print('Task %d finished\n' % task_num)
else:
addr_fmt = format_address(sock.getpeername())
msg = 'Task %d: got %d bytes of poetry from %s\n'
print(msg % (task_num, len(data), addr_fmt))
poems[sock] += data # 保存每个sock接收到的数据
elapsed = datetime.datetime.now() - start
print('Got poems in %s' %elapsed)
对客户端的异步改造主要有两点:
同步模式下,客户端分别创建socket;
而在异步模式下,client开始就创建了所有的socket。
通过“sock.setblocking(0)”设置socket为异步模式。
通过Unix系统的select来返回可读取socket信息
最为核心的是8行和26行。尤其是26行的select操作返回待读取socket的列表
四、asyncore实现异步socket通信
该模块提供了异步socket服务客户端和服务器的基础架构。
只有两种方法让程序在单个处理器上的同时做超过一件的事情。
多线程编程是最简单,最普遍的方式,
但还有另一种非常不同的技术,可以让你具有多线程几乎所有的优点,实际上并没有使用多线程。
程序的瓶颈主要在于I/O时是比较可行的。如果你的程序的瓶颈在处理器,确实需要多线程,不过网络服务器的瓶颈大多在I/O。
如果您的操作系统支持I/O库的select()系统调用(一般都支持) ,
那么你可以使用它同时处理多个通信信道,做其他的工作的同时让I/O在后台执行,这比多线程更容易理解。
asyncore和asynchat的基本思路是创建一个或多个网络通道,即asyncore.dispatcher和asynchat.async_chat的实例。
然后添加到全局映射,如果你没有创建自己的映射,可以直接使用loop()函数。
loop()激活所有通道服务,执行直到最后一个通道关闭。
4.1 接口文档
1. asyncore.loop([timeout[, use_poll[, map[, count]]]])
进入轮询循环直到所有打开的通道已被关闭或计数通过。
所有的参数都是可选的。 count参数默认为None,只有当所有通道都被关闭时循环才会终止。
timeout参数设置为select()或poll()调用设置超时,以秒为单位,默认为30秒。
use_poll参数,如果为true ,则表示 poll()优先于select(),默认值为False。
map是包含监控的channel的字典。channel关闭时会从map中删除。不指定map时会使用全局map。
Channel(asyncore.dispatcher , asynchat.async_chat和其子类的实例)可以自由地混合在map上)。
2. class asyncore.dispatcher:
dispatcher底层socket的轻便包装。它有一些事件处理供异步循环调用。
否则,它可以被视为一个正常的非阻塞socket对象。底层事件的触发或者连接状态改变通知异步循环发生了高层事件,
高层事件有:
. handle_connect():Implied by the first read or write event。
. handle_close():Implied by a read event with no data available。
. handle_accept():Implied by a read event on a listening socket。
3. asyncore.dispatcher的新方法:
在异步处理,每个映射通道的readable()和writable()方法用于确定通道的socket
是否应该被添加到select()或poll()通道的频道列表中以读写事件。因此通道事件比基本socket 事件要多。
在子类中重写的方法如下:
. handle_read():当异步循环检测到通道的read()将成功时调用。
. handle_write():当异步循环检测到通道的write()将成功时调用。需要缓冲以提高性能。
def handle_write(self):
sent = self.send(self.buffer)
self.buffer = self.buffer[sent:]
. handle_expt():
当有socket连接有带外数据【 out of band (OOB)】时调用。这几乎不会发生,因为OOB的支持很好且很少使用。
. handle_connect():
当活动socket实际创建连接时调用。可能发送“welcome” banner,或与远程端点启动协议协商。
. handle_close():
当关闭socket时调用。
. handle_error():
当异常引发又没有其他处理时调用。默认版本print压缩的traceback。
. handle_accept():
监听通道(被动开启) ,当本端通过connect()可以和远端建立连接时在监听通道(被动开启)上调用。
4. readable():
每次异步循环时调用,以确定通道的socket是否应该被添加到产生读事件列表。
默认的方法只返回True,表示在默认情况下,所有通道将拥有读取事件。
5. writable():每次异步循环时调用,以确定通道的socket是否应该被添加到产生写事件列表。
默认的方法只返回True,表示在默认情况下,所有通道将拥有写事件。
6. 下面方法和socket相同,有些有所扩充:
. create_socket(family, type):参见socket文档。
. connect(address):参见socket文档。
. send(data):发送数据到远端socket。
. recv(buffer_size):从远端socket最多接收buffer_size字节的数据。空字符串意味着该信道已被远端关闭。
. listen(backlog):监听的连接数,默认和最小值都为1,最大值根据系统确定,一般是5。
. bind(address):绑定socket到address。socket必须未绑定。
地址的格式取决于地址族 ,参见socket文档。
调用set_reuse_addr()方法可以把socket设置为可重用的(参见设置SO_REUSEADDR选项)。
. accept():接受连接。
socket必须绑定到地址和监听连接。
返回值可以是None 或一对(conn, address),其中conn是新的可用来在连接上发送和接收数据socket对象,
address 是绑定到连接上远端套接字的地址。
None意味着连接并没有发生,在这种情况下,服务器应该忽略并继续侦听其他连接。
. close():关闭套接字。
Socket对象上的所有未来的操作将失败。
远端将接收没有更多的数据(排队数据清空之后) 。socket也会被垃圾收集自动关闭。
7. asyncore.dispatcher_with_send:
dispatcher的子类,增加了简单的缓冲输出,对于简单的客户端有用。详细资料参考:asynchat.async_chat。
8. class asyncore.file_dispatcher:
封装了文件描述符或文件对象及映射参数(可选)供poll()和loop()函数使用的文件分发器。
它提供了文件对象或其他具备fileno()方法的对象,调用fileno()并传递到file_wrapper构造函数。可用于UNIX。
9. class asyncore.file_wrapper:
接收整数文件描述符并调用os.dup()复制句柄,这样原句柄可以关闭,而文件包装器不受影响。
该类封装了大量方法以模拟socket给file_dispatcher类使用。可用于UNIX。
4.2. 示例1
1. 服务端程序
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: asyncore_server.py
#
import socket
import asyncore
import threading
MAX_RECV = 4096
#負責接受client socket連線
class AgentServer(asyncore.dispatcher):
def __init__(self, port):
#asyncore.dispatcher的constructor
asyncore.dispatcher.__init__(self)
#client socket
self.clientSocket = None
#server port
self.port = port
#建立等待的socket
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.bind(('', self.port))
self.listen(5)
def handle_accept(self):
#接受client socket的連線
self.clientSocket, address = self.accept()
print 'New client from : ' + address[0]
#將和client連線的socket加上一些功能 (自訂socket)
self.clientSocket = ClientAgent(self.clientSocket)
#自訂的client連線socket
class ClientAgent(asyncore.dispatcher):
def __init__(self, socket):
asyncore.dispatcher.__init__(self, socket)
#要送出的data
self.SendData = ""
#從client收到的data
def handle_read(self):
self.RecvData = self.recv(MAX_RECV)
if len(self.RecvData) > 0:
print "recv : " + self.RecvData
#送出data到client
def handle_write(self):
send_byte = self.send(self.SendData)
#一次可能不會全部送完(一次最多送出512 bytes ?)
if send_byte > 0:
send_out = self.SendData[:send_byte]
self.SendData = self.SendData[send_byte:]
self.handle_write()
else:
print "send all!!"
self.SendData = ""
#不自動執行handle_write
def writable(self):
return False
def handle_close(self):
print "close connection : " + self.getpeername()[0]
self.close()
#產生等待client連線的thread
class listen_client_thread(threading.Thread):
def __init__(self,port):
self.agentServer = AgentServer(port)
threading.Thread.__init__ ( self )
def run(self):
print "Listen Client ..."
asyncore.loop()
#產生處理輸入的thread
class input_thread(threading.Thread):
def __init__(self,listen_thread):
self.listen_thread = listen_thread
threading.Thread.__init__ ( self )
def run(self):
while 1:
send_data = raw_input()
self.listen_thread.agentServer.clientSocket.SendData = send_data
self.listen_thread.agentServer.clientSocket.handle_write()
if __name__ == "__main__":
#產生等待client連線的thread
listen_thread = listen_client_thread(111)
listen_thread.start()
#產生處理輸入的thread
input_thread(listen_thread).start()
2. Client 程式 :
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: asyncore_client.py
#
import asyncore, socket
import threading
#收到data最大長度
MAX_RECV = 4096
#連線server的socket
class client(asyncore.dispatcher):
def __init__(self, host, port):
asyncore.dispatcher.__init__(self)
self.SendData = ""
self.RecvData = ""
#和server建立連線
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect( (host, port) )
def handle_connect(self):
print 'connect!!'
def handle_close(self):
print "disconnection : " + self.getpeername()[0]
self.close()
#收到的data
def handle_read(self):
self.RecvData = self.recv(MAX_RECV)
if len(self.RecvData) > 0:
print "recv : " + self.RecvData
#送出data
def handle_write(self):
send_byte = self.send(self.SendData)
if send_byte > 0:
send_out = self.SendData[:send_byte]
self.SendData = self.SendData[send_byte:]
print "send : " + send_out
self.handle_write()
else:
print "send all!!"
self.SendData = ""
#自動偵測送出永遠失敗
def writable(self):
return False
#等待server傳送訊息的thread
class send_server_thread(threading.Thread):
def __init__(self,host,port):
self.client = client(host, port)
threading.Thread.__init__ ( self )
def run(self):
try:
asyncore.loop()
except:
pass
#等待user input的thread
class input_thread(threading.Thread):
def __init__(self,client_thread):
self.client_thread = client_thread
threading.Thread.__init__ ( self )
def run(self):
while 1:
send_data = raw_input()
self.client_thread.client.SendData = send_data
self.client_thread.client.handle_write()
#主程式
if __name__ == "__main__":
#建立和server連線, 並且等待溝通
client_thread = send_server_thread("localhost",111)
client_thread.start()
#等待user input
input_thread(client_thread).start()
4.3 示例2
The Python asyncore and aynchat modules
The Python standard library provides two modules—asyncore and
asynchat—to help in writing concurrent network servers using
event-based designs. The documentation does not give good examples,
so I am making some notes.
1. Overview
The basic idea behind the asyncore module is that:
. there is a function, asyncore.loop() that does select() on a bunch of ‘channels’.
Channels are thin wrappers around sockets.
. when select returns, it reports which sockets have data waiting to be read,
which ones are now free to send more data, and which ones have errors;
loop() examines the event and the socket’s state to create a higher level event;
. it then calls a method on the channel corresponding to the higher level event.
asyncore provides a low-level, but flexible API to build network
servers. asynchat builds upon asyncore and provides an API that is
more suitable for request/response type of protocols.
2. aysncore
The asyncore module’s API consists of:
. the loop() method, to be called by a driver program written by you;
. the dispatcher class, to be subclassed by you to do useful stuff.
The dispatcher class is what is called ‘channel’ elsewhere.
+-------------+ +--------+
| driver code |---------> | loop() |
+-------------+ +--------+
| |
| | loop-dispatcher API (a)
| |
| +--------------+
| | dispatcher |
+----------------->| subclass |
+--------------+
|
| dispatcher-logic API (b)
|
+--------------+
| server logic |
+--------------+
This is all packaged nicely in an object oriented way. So, we have
the dispatcher class, that extends/wraps around the socket class (from
the socket module in the Python standard library). It provides all
the socket class’ methods, as well as methods to handle the higher
level events. You are supposed to subclass dispatcher and implement
the event handling methods to do something useful.
3. The loop-dispatcher API
The loop function looks like this:
loop( [timeout[, use_poll[, map[,count]]]])
What is the map? It is a dictionary whose keys are the
file-descriptors, or fds, of the socket (i.e., socket.fileno()), and
whose values are the dispatcher objects which you want to handle events on that socket/fd.
When we create a new dispatcher object, it automatically gets added to a
global list of sockets (which is invisible to us, and managed behind the scenes).
The loop() function does a select() on this list.
We can over-ride the list that loop looks at, by providing an explicit map.
But then, we would need to add/remove dispatchers we create to/from this map ourselves.
(Hmm… we might always wantto use explicit maps; then our loop calls will be thread safe and we
will be able to launch multiple threads, each calling loop on
different maps.)
4. Methods a dispatcher subclass should implement
loop() needs the dispatcher to implement some methods:
. readable(): should return True, if you want the fd to be observed for read events;
. writable(): should return True, if you want the fd to be observed for write events;
If either readable or writable returns True, the corresponding fd will be examined
for errors also. Obviously, it makes no sense to have a dispatcher
which returns False for both readable and writable.
Some other methods that loop calls on dispatchers are:
. handle_read: socket is readable; dispatcher.recv() can be used to actually get the data
. handle_write: socket is writable; dispatcher.send(data) can be used to actually send the data
. handle_error: socket encountered an error
. handle_expt: socket received OOB data (not really used in practice)
. handle_close: socket was closed remotely or locally
For server dispatchers, loop calls one more event:
. handle_accept:
a new incoming connection can be accept()ed.
Call the accept() method really accept the connection.
To create a server socket, call the bind() and listen() methods on it first.
Client sockets get this event:
. handle_connect: connection to remote endpoint has been made.
To initiate the connection, first call the connect() method on it.
Client sockets are discussed in the asyncore documentation so I will not discuss them here.
Other socket methods are available in dispatcher: create_socket,
close, set_resue_addr. They are not called by loop but are available
so that your code can call them when it needs to create a new socket, close an existing socket,
and tell the OS to set the SO_REUSEADDR flag on the server socket.
5. How to write a server using asyncore
The standard library documentation gives a client example, but not a
server example. Here are some notes on the latter.
1. Subclass dispatcher to create a listening socket
2. In its handle_accept method, create new dispatchers. They’ll get added to the global socket map.
Note: the handlers must not block or take too much time… or the server won’t be concurrent.
This is because when multiple sockets get an event, loop calls their dispatchers one-by-one, in the same thread.
The socket-like functions that dispatcher extends should not be bypassed in order to access the low level socket functions.
They do funky things to detect higher level events. For e.g., how does asyncore figure out that the socket is closed?
If I remember correctly, there are two ways to detect whether a non-blocking socket is closed:
select() returns a read event, but when you call recv()/read() you get zero bytes;
you call send()/write() and it fails with an error (sending zero bytes is not an error).
(I wish I had a copy of Unix Network Programming by Stevens handy
right now.) dispatcher will detect both events above and if any one of them occurs, will call handle_close.
This frees you from having to look at low-level events, and think in terms of higher level events.
The code for a server based on asyncore is below:
asyncore_echo_server.py
import logging
import asyncore
import socket
logging.basicConfig(level=logging.DEBUG, format="%(created)-15s %(msecs)d %(levelname)8s %(thread)d %(name)s %(message)s")
log = logging.getLogger(__name__)
BACKLOG = 5
SIZE = 1024
class EchoHandler(asyncore.dispatcher):
def __init__(self, conn_sock, client_address, server):
self.server = server
self.client_address = client_address
self.buffer = ""
# We dont have anything to write, to start with
self.is_writable = False
# Create ourselves, but with an already provided socket
asyncore.dispatcher.__init__(self, conn_sock)
log.debug("created handler; waiting for loop")
def readable(self):
return True # We are always happy to read
def writable(self):
return self.is_writable # But we might not have
# anything to send all the time
def handle_read(self):
log.debug("handle_read")
data = self.recv(SIZE)
log.debug("after recv")
if data:
log.debug("got data")
self.buffer += data
self.is_writable = True # sth to send back now
else:
log.debug("got null data")
def handle_write(self):
log.debug("handle_write")
if self.buffer:
sent = self.send(self.buffer)
log.debug("sent data")
self.buffer = self.buffer[sent:]
else:
log.debug("nothing to send")
if len(self.buffer) == 0:
self.is_writable = False
# Will this ever get called? Does loop() call
# handle_close() if we called close, to start with?
def handle_close(self):
log.debug("handle_close")
log.info("conn_closed: client_address=%s:%s" % \
(self.client_address[0],
self.client_address[1]))
self.close()
#pass
class EchoServer(asyncore.dispatcher):
allow_reuse_address = False
request_queue_size = 5
address_family = socket.AF_INET
socket_type = socket.SOCK_STREAM
def __init__(self, address, handlerClass=EchoHandler):
self.address = address
self.handlerClass = handlerClass
asyncore.dispatcher.__init__(self)
self.create_socket(self.address_family,
self.socket_type)
if self.allow_reuse_address:
self.set_reuse_addr()
self.server_bind()
self.server_activate()
def server_bind(self):
self.bind(self.address)
log.debug("bind: address=%s:%s" % (self.address[0], self.address[1]))
def server_activate(self):
self.listen(self.request_queue_size)
log.debug("listen: backlog=%d" % self.request_queue_size)
def fileno(self):
return self.socket.fileno()
def serve_forever(self):
asyncore.loop()
# TODO: try to implement handle_request()
# Internal use
def handle_accept(self):
(conn_sock, client_address) = self.accept()
if self.verify_request(conn_sock, client_address):
self.process_request(conn_sock, client_address)
def verify_request(self, conn_sock, client_address):
return True
def process_request(self, conn_sock, client_address):
log.info("conn_made: client_address=%s:%s" % \
(client_address[0],
client_address[1]))
self.handlerClass(conn_sock, client_address, self)
def handle_close(self):
self.close()
and to use it:
interface = "0.0.0.0"
port = 8080
server = asyncore_echo_server.EchoServer((interface, port))
server.serve_forever()
阅读(1466) | 评论(0) | 转发(0) |