今天和程序员聊天说到这个socket编程,发现之前的思路是错误的
之前的想法是socket.listen 10,线程先开5,如果超过5个用户再新开线程(这里完全是受了apache配置文件中初始进程数,最低进程数量那部分的影响)...
聊天后发现这样写是不正确的开销太大,实际使用时还是使用select、epoll等模型(之前查非阻塞的时候也知道实际要用select来写,但当时考虑到多线程不用考虑非阻塞所以暂时没打算上select,现在看来不上不行,否则整体思路都不正确)
看来今天还要大修服务器端呐,写完放上代码
注:python2.6以上才有epoll,redhat5系默认是python2.4
网上查些资料,找到下面的连接
里面说得比较清楚了,网上千篇一律的说select实现非阻塞socket,但是对于刚写socket的人来说,非阻塞有什么意义?比如说我之前的思路就是开多个线程去监听,同步阻塞的写法不也是正确的么?
上面的文章就给了我答案
“不过子进程阿,线程之类的还是落伍了,有一些好的IO复用方式可以实现非阻塞的网络编程”
看过这句话就明白了,io复用——当然是大大降低资源消耗啊
写了一下午select终于彻底搞明白了。
关于同步,异步,阻塞非阻塞,之前怎么翻资料都觉得好像是看懂了,但是用起来的时候总觉得还是没理解。无论别人比喻用的多么精彩形象,用的时候总感觉不对,写玩select后终于理解了。先放上一段理解select的代码——如果你不是程序员的话,光看网上(比如上面的连接)的select代码,根本没办法理解select。
select简单来说就是异步处理socket缓冲区的内容,举个例子
比如说在没有用select的情况下,socket.accept()后会一直等待连接,这就是阻塞。但是用了select的情况呢,比较下面两段代码。
==================================================================
代码1:
socket.bind((SERVER,PORT))
socket.listen(10)
socket.accpet()
代码2:
socket.bind((SERVER,PORT))
socket.listen(10)
a,b,c = select.select([socket,],[],[],0)
if len(a) !=0:
socket.accpet()
实际上select并没有把recv的阻塞变成非阻塞(time参数是设置自己select本身是否是阻塞的),其实是select对缓冲区扫描的返回结果让程序决定是否去执行socket.accpet()。之后你在一个死循环中不停的通过select扫描socket缓冲区,一但有对应内容就select就返回给a,a不等于0就开始接受数据---即socket.accept(),缓冲区的数据再发过来,这样就形成了异步(同步就是数据发过来立刻accpet()而不是等待select返回缓冲区结果再去接受数据),看懂这里了select,同步异步阻塞非阻塞也就明白了。
Format
|
C Type
|
Python type
|
Standard size
|
Notes
|
x
|
pad byte
|
no value
|
|
|
c
|
char
|
string of length 1
|
1
|
|
b
|
signed char
|
integer
|
1
|
(3)
|
B
|
unsigned char
|
integer
|
1
|
(3)
|
?
|
_Bool
|
bool
|
1
|
(1)
|
h
|
short
|
integer
|
2
|
(3)
|
H
|
unsigned short
|
integer
|
2
|
(3)
|
i
|
int
|
integer
|
4
|
(3)
|
I
|
unsigned int
|
integer
|
4
|
(3)
|
l
|
long
|
integer
|
4
|
(3)
|
L
|
unsigned long
|
integer
|
4
|
(3)
|
q
|
long long
|
integer
|
8
|
(2), (3)
|
Q
|
unsigned long long
|
integer
|
8
|
(2), (3)
|
f
|
float
|
float
|
4
|
(4)
|
d
|
double
|
float
|
8
|
(4)
|
s
|
char[]
|
string
|
|
|
p
|
char[]
|
string
|
|
|
P
|
void *
|
integer
|
|
(5), (3)
|
下面是可以用telnet的测试代码,代码也可以改为2个线程
一个专门用来监听新socket连接并把新连接写入socket_pool,一个专门处理与客户端之间的socket连接。
- import daemon
-
import threading,socket,select,sys,os,time
-
SERVER = "0.0.0.0"
-
PORT = 8888
- #下面是通用的python守护进程类,网上下的,放开后运行就直接是守护进程了
-
#p = daemon.DaemonContext()
-
#p.open()
-
-
class TalkServer(object):
-
-
def __init__(self):
- #监听socket,就是bind ip 和port
-
self.socket = None
- #连接池
-
self.socket_pool = []
- #错误的连接(比如空包或解包错误的连接)
-
self.errPackageCount_pool = {}
-
self.Lock = None
-
-
def GetThread(self):
- #永真循环
-
while True:
- #select 绑定的socket
-
GetList,SendList,ErrList = select.select([self.socket,],[],[],0)
- #如果获取到连接,就把连接压到连接池中(实际写项目代码貌似不用list而是用一个队列)
-
if len(GetList) > 0:
-
try:
-
curSock,userAddr = self.socket.accept()
-
# curSock.settimeout(15)
-
self.socket_pool.append(curSock)
-
print "get new socket"
-
except:
-
print "error or time out"
-
-
get_sock_pool,send_sock_pool,err_sock_pool = select.select(self.socket_pool,[],[],0)
-
-
if len(get_sock_pool) == 0 and len(self.socket_pool) == 0:print "no connection and errcountpool num",len(self.errPackageCount_pool);time.sleep(3)
-
# else: print self.socket_pool;time.sleep(1)
-
#如果连接池中有连接
-
if len(get_sock_pool) > 0:
-
import struct
-
for curSock in get_sock_pool:
- #接受连接数据
-
getData = curSock.recv(1024)
- #如果当前连接数据大小为0吧连接丢入错误池并累加数据,超过5次关闭连接
-
if len(getData) == 0:
-
print curSock,"will disconnect!!"
-
if curSock in self.errPackageCount_pool.keys():
-
if self.errPackageCount_pool[curSock] >= 5:
-
self.errPackageCount_pool.pop(curSock)
-
self.socket_pool.remove(curSock)
-
curSock.close()
-
else:
-
self.errPackageCount_pool[curSock] +=1
-
else:
-
self.errPackageCount_pool[curSock] = 1
-
continue
-
try:
-
packageType,packageLong,packageData = struct.unpack('BB1022s',getData)
- #解包错误也压入错误池
-
except:
-
print "error package",len(getData),"::::::::::::::",getData
-
if curSock in self.errPackageCount_pool.keys():
-
print self.errPackageCount_pool[curSock]
-
if self.errPackageCount_pool[curSock] >= 5:
-
self.errPackageCount_pool.pop(curSock)
-
self.socket_pool.remove(curSock)
-
curSock.close()
-
else:
-
self.errPackageCount_pool[curSock] +=1
-
else:self.errPackageCount_pool[curSock] = 1
-
continue
-
if packageType in (100,101,102,103):
-
print "no err"
-
if curSock in self.errPackageCount_pool.keys():self.errPackageCount_pool.pop(curSock)
-
if packageType == 100:
-
for sock_in_pool in self.socket_pool:
-
if curSock is not sock_in_pool:
-
errSendCount = 0
-
sock_in_pool.setblocking(1)
-
while errSendCount <=5:
-
try:
-
sock_in_pool.send(getData)
-
break
-
except:
-
errSendCount += 1
-
sock_in_pool.setblocking(0)
-
if errSendCount >= 5:
-
sock_in_pool.close()
-
self.socket_pool.remove(sock_in_pool)
-
-
elif packageType == 101:
-
print "get quit word"
-
curSock.send(struct.pack('BB1022s',101,8,'self_out'))
-
self.socket_pool.remove(curSock)
-
curSock.close()
-
else:
-
print "error package type!"
-
if curSock in self.errPackageCount_pool.keys():
-
if self.errPackageCount_pool[curSock] >= 5:
-
self.errPackageCount_pool.pop(curSock)
-
self.socket_pool.remove(curSock)
-
curSock.close()
-
else:
-
self.errPackageCount_pool[curSock] +=1
-
else:self.errPackageCount_pool[curSock] = 1
-
continue
-
-
if len(err_sock_pool) > 0:
-
print "second part"
-
print err_sock_pool
-
for sock in err_sock_pool:
-
if sock in self.errPackageCount_pool.keys():self.errPackageCount_pool.pop(sock)
-
self.socket_pool.remove(sock)
-
sock.close()
-
-
def run(self):
-
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
for i in range(10):
-
try:
-
self.socket.bind((SERVER,PORT))
-
break
-
except:
-
print "fail,sleep 10"
-
time.sleep(10)
-
print "strt listening"
-
# self.socket.setblocking(1)
-
self.socket.listen(10)
-
self.sockLock = threading.Lock()
-
GetThread = threading.Thread(target=self.GetThread)
-
GetThread.start()
-
-
if __name__ == '__main__':
-
mytalk_server = TalkServer()
-
mytalk_server.run()
今天又修改了下代码加了个错误次数统计池self.errsockCount_pool
======================================================================================
下面是客户端代码,增加了个连接状态self.serverStatus。
- import threading,socket,select,sys,os,time
-
import struct
-
SERVER = "127.0.0.1"
-
PORT = 8888
-
-
class TalkClient(object):
-
-
def __init__(self):
-
self.socket = None
-
self.serverStatus = 0
-
-
def getChar(self):
-
errCount = 0
-
while True:
-
if self.serverStatus == 0:
-
break
-
Data = self.socket.recv(1024)
-
if len(Data) == 0:
-
if errCount >=5:
-
self.serverStatus = 0
-
errCount +=1
-
print "server mabe close try more",6-errCount,"times"
-
continue
-
packageType,packageLong,backChar = struct.unpack('BB1022s',Data)
-
errCount = 0
-
if packageType == 101:
-
words = backChar[0:packageLong]
-
print words
-
serverStatus = 0
-
break
-
print backChar[0:packageLong]
-
-
def sendChar(self):
-
while True:
-
errCount = 0
-
if self.serverStatus == 0:break
-
loli = raw_input()
-
packageType = 100
-
if loli == 'quit':
-
packageType = 101
-
packageLong = len(loli)
-
package = struct.pack('BB1022s',packageType,packageLong,loli)
-
while errCount <=5:
-
try:
-
self.socket.send(package)
-
break
-
except:
-
print errCount
-
errCount += 1
-
if errCount >=5:
-
self.serverStatus = 0
-
break
-
if packageType == 101:break
-
-
def run(self):
-
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
try:
-
self.socket.connect((SERVER,PORT))
-
self.serverStatus = 1
-
except:
-
print "connect refused exit"
-
sys.exit(0)
-
getThread = threading.Thread(target=self.getChar)
-
sendThread = threading.Thread(target=self.sendChar)
-
getThread.start()
-
sendThread.start()
-
threading.Thread.join(sendThread)
-
threading.Thread.join(getThread)
-
# time.sleep(3)
-
self.socket.close()
-
-
if __name__ == '__main__':
-
myas = TalkClient()
-
myas.run()
-
print "exit"
-
sys.exit(0)
阅读(1704) | 评论(0) | 转发(0) |