Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1102512
  • 博文数量: 170
  • 博客积分: 1603
  • 博客等级: 上尉
  • 技术积分: 1897
  • 用 户 组: 普通用户
  • 注册时间: 2010-07-09 15:54
文章分类

全部博文(170)

文章存档

2016年(27)

2015年(21)

2014年(27)

2013年(21)

2012年(7)

2011年(67)

我的朋友

分类: Python/Ruby

2011-04-23 12:58:13

今天和程序员聊天说到这个socket编程,发现之前的思路是错误的
之前的想法是socket.listen 10,线程先开5,如果超过5个用户再新开线程(这里完全是受了apache配置文件中初始进程数,最低进程数量那部分的影响)...
聊天后发现这样写是不正确的开销太大,实际使用时还是使用select、epoll等模型(之前查非阻塞的时候也知道实际要用select来写,但当时考虑到多线程不用考虑非阻塞所以暂时没打算上select,现在看来不上不行,否则整体思路都不正确)
看来今天还要大修服务器端呐,写完放上代码
注:python2.6以上才有epoll,redhat5系默认是python2.4

网上查些资料,找到下面的连接
里面说得比较清楚了,网上千篇一律的说select实现非阻塞socket,但是对于刚写socket的人来说,非阻塞有什么意义?比如说我之前的思路就是开多个线程去监听,同步阻塞的写法不也是正确的么?
上面的文章就给了我答案
“不过子进程阿,线程之类的还是落伍了,有一些好的IO复用方式可以实现非阻塞的网络编程”
看过这句话就明白了,io复用——当然是大大降低资源消耗啊

写了一下午select终于彻底搞明白了。
关于同步,异步,阻塞非阻塞,之前怎么翻资料都觉得好像是看懂了,但是用起来的时候总觉得还是没理解。无论别人比喻用的多么精彩形象,用的时候总感觉不对,写玩select后终于理解了。先放上一段理解select的代码——如果你不是程序员的话,光看网上(比如上面的连接)的select代码,根本没办法理解select。
select简单来说就是异步处理socket缓冲区的内容,举个例子
比如说在没有用select的情况下,socket.accept()后会一直等待连接,这就是阻塞。但是用了select的情况呢,比较下面两段代码。
==================================================================
代码1:
socket.bind((SERVER,PORT))
socket.listen(10)
socket.accpet()
代码2:
socket.bind((SERVER,PORT))
socket.listen(10)
a,b,c = select.select([socket,],[],[],0)
    if len(a) !=0:
socket.accpet()

实际上select并没有把recv的阻塞变成非阻塞(time参数是设置自己select本身是否是阻塞的),其实是select对缓冲区扫描的返回结果让程序决定是否去执行socket.accpet()。之后你在一个死循环中不停的通过select扫描socket缓冲区,一但有对应内容就select就返回给a,a不等于0就开始接受数据---即socket.accept(),缓冲区的数据再发过来,这样就形成了异步(同步就是数据发过来立刻accpet()而不是等待select返回缓冲区结果再去接受数据),看懂这里了select,同步异步阻塞非阻塞也就明白了。

Format

C Type

Python type

Standard size

Notes

x

pad byte

no value

 

 

c

char

string of length 1

1

 

b

signed char

integer

1

(3)

B

unsigned char

integer

1

(3)

?

_Bool

bool

1

(1)

h

short

integer

2

(3)

H

unsigned short

integer

2

(3)

i

int

integer

4

(3)

I

unsigned int

integer

4

(3)

l

long

integer

4

(3)

L

unsigned long

integer

4

(3)

q

long long

integer

8

(2), (3)

Q

unsigned long long

integer

8

(2), (3)

f

float

float

4

(4)

d

double

float

8

(4)

s

char[]

string

 

 

p

char[]

string

 

 

P

void *

integer

 

(5), (3)


下面是可以用telnet的测试代码,代码也可以改为2个线程
一个专门用来监听新socket连接并把新连接写入socket_pool,一个专门处理与客户端之间的socket连接。
  1. import daemon
  2. import threading,socket,select,sys,os,time
  3. SERVER = "0.0.0.0"
  4. PORT = 8888
  5. #下面是通用的python守护进程类,网上下的,放开后运行就直接是守护进程了
  6. #p = daemon.DaemonContext()
  7. #p.open()

  8. class TalkServer(object):

  9.     def __init__(self):
  10. #监听socket,就是bind  ip 和port
  11.         self.socket = None
  12. #连接池
  13.         self.socket_pool = []
  14. #错误的连接(比如空包或解包错误的连接)
  15.         self.errPackageCount_pool = {}
  16.         self.Lock = None

  17.     def GetThread(self):
  18. #永真循环
  19.         while True:
  20. #select 绑定的socket
  21.             GetList,SendList,ErrList = select.select([self.socket,],[],[],0)
  22. #如果获取到连接,就把连接压到连接池中(实际写项目代码貌似不用list而是用一个队列)
  23.             if len(GetList) > 0:
  24.                 try:
  25.                     curSock,userAddr = self.socket.accept()
  26. # curSock.settimeout(15)
  27.                     self.socket_pool.append(curSock)
  28.                     print "get new socket"
  29.                 except:
  30.                     print "error or time out"

  31.             get_sock_pool,send_sock_pool,err_sock_pool = select.select(self.socket_pool,[],[],0)

  32.             if len(get_sock_pool) == 0 and len(self.socket_pool) == 0:print "no connection and errcountpool num",len(self.errPackageCount_pool);time.sleep(3)
  33. # else: print self.socket_pool;time.sleep(1)
  34. #如果连接池中有连接
  35.             if len(get_sock_pool) > 0:
  36.                 import struct
  37.                 for curSock in get_sock_pool:
  38. #接受连接数据
  39.                     getData = curSock.recv(1024)
  40. #如果当前连接数据大小为0吧连接丢入错误池并累加数据,超过5次关闭连接
  41.                     if len(getData) == 0:
  42.                         print curSock,"will disconnect!!"
  43.                         if curSock in self.errPackageCount_pool.keys():
  44.                             if self.errPackageCount_pool[curSock] >= 5:
  45.                                 self.errPackageCount_pool.pop(curSock)
  46.                                 self.socket_pool.remove(curSock)
  47.                                 curSock.close()
  48.                             else:
  49.                                 self.errPackageCount_pool[curSock] +=1
  50.                         else:
  51.                             self.errPackageCount_pool[curSock] = 1
  52.                         continue
  53.                     try:
  54.                         packageType,packageLong,packageData = struct.unpack('BB1022s',getData)
  55. #解包错误也压入错误池
  56.                     except:
  57.                         print "error package",len(getData),"::::::::::::::",getData
  58.                         if curSock in self.errPackageCount_pool.keys():
  59.                             print self.errPackageCount_pool[curSock]
  60.                             if self.errPackageCount_pool[curSock] >= 5:
  61.                                 self.errPackageCount_pool.pop(curSock)
  62.                                 self.socket_pool.remove(curSock)
  63.                                 curSock.close()
  64.                             else:
  65.                                 self.errPackageCount_pool[curSock] +=1
  66.                         else:self.errPackageCount_pool[curSock] = 1
  67.                         continue
  68.                     if packageType in (100,101,102,103):
  69.                         print "no err"
  70.                         if curSock in self.errPackageCount_pool.keys():self.errPackageCount_pool.pop(curSock)
  71.                         if packageType == 100:
  72.                             for sock_in_pool in self.socket_pool:
  73.                                 if curSock is not sock_in_pool:
  74.                                     errSendCount = 0
  75.                                     sock_in_pool.setblocking(1)
  76.                                     while errSendCount <=5:
  77.                                         try:
  78.                                             sock_in_pool.send(getData)
  79.                                             break
  80.                                         except:
  81.                                             errSendCount += 1
  82.                                     sock_in_pool.setblocking(0)
  83.                                     if errSendCount >= 5:
  84.                                         sock_in_pool.close()
  85.                                         self.socket_pool.remove(sock_in_pool)
  86.                                         
  87.                         elif packageType == 101:
  88.                             print "get quit word"
  89.                             curSock.send(struct.pack('BB1022s',101,8,'self_out'))
  90.                             self.socket_pool.remove(curSock)
  91.                             curSock.close()
  92.                     else:
  93.                         print "error package type!"
  94.                         if curSock in self.errPackageCount_pool.keys():
  95.                             if self.errPackageCount_pool[curSock] >= 5:
  96.                                 self.errPackageCount_pool.pop(curSock)
  97.                                 self.socket_pool.remove(curSock)
  98.                                 curSock.close()
  99.                             else:
  100.                                 self.errPackageCount_pool[curSock] +=1
  101.                         else:self.errPackageCount_pool[curSock] = 1
  102.                         continue
  103.             
  104.             if len(err_sock_pool) > 0:
  105.                 print "second part"
  106.                 print err_sock_pool
  107.                 for sock in err_sock_pool:
  108.                     if sock in self.errPackageCount_pool.keys():self.errPackageCount_pool.pop(sock)
  109.                     self.socket_pool.remove(sock)
  110.                     sock.close()

  111.     def run(self):
  112.         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  113.         for i in range(10):
  114.             try:
  115.                 self.socket.bind((SERVER,PORT))
  116.                 break
  117.             except:
  118.                 print "fail,sleep 10"
  119.             time.sleep(10)
  120.         print "strt listening"
  121. # self.socket.setblocking(1)
  122.         self.socket.listen(10)
  123.         self.sockLock = threading.Lock()
  124.         GetThread = threading.Thread(target=self.GetThread)
  125.         GetThread.start()

  126. if __name__ == '__main__':
  127.     mytalk_server = TalkServer()
  128.     mytalk_server.run()
今天又修改了下代码加了个错误次数统计池self.errsockCount_pool

======================================================================================
下面是客户端代码,增加了个连接状态self.serverStatus。
  1. import threading,socket,select,sys,os,time
  2. import struct
  3. SERVER = "127.0.0.1"
  4. PORT = 8888

  5. class TalkClient(object):

  6.     def __init__(self):
  7.         self.socket = None
  8.         self.serverStatus = 0

  9.     def getChar(self):
  10.         errCount = 0
  11.         while True:
  12.             if self.serverStatus == 0:
  13.                 break
  14.             Data = self.socket.recv(1024)
  15.             if len(Data) == 0:
  16.                 if errCount >=5:
  17.                     self.serverStatus = 0
  18.                 errCount +=1
  19.                 print "server mabe close try more",6-errCount,"times"
  20.                 continue
  21.             packageType,packageLong,backChar = struct.unpack('BB1022s',Data)
  22.             errCount = 0
  23.             if packageType == 101:
  24.                 words = backChar[0:packageLong]
  25.                 print words
  26.                 serverStatus = 0
  27.                 break
  28.             print backChar[0:packageLong]

  29.     def sendChar(self):
  30.         while True:
  31.             errCount = 0
  32.             if self.serverStatus == 0:break
  33.             loli = raw_input()
  34.             packageType = 100
  35.             if loli == 'quit':
  36.                 packageType = 101
  37.             packageLong = len(loli)
  38.             package = struct.pack('BB1022s',packageType,packageLong,loli)
  39.             while errCount <=5:
  40.                 try:
  41.                     self.socket.send(package)
  42.                     break
  43.                 except:
  44.                     print errCount
  45.                     errCount += 1
  46.             if errCount >=5:
  47.                 self.serverStatus = 0
  48.                 break
  49.             if packageType == 101:break

  50.     def run(self):
  51.         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  52.         try:
  53.             self.socket.connect((SERVER,PORT))
  54.             self.serverStatus = 1
  55.         except:
  56.             print "connect refused exit"
  57.             sys.exit(0)
  58.         getThread = threading.Thread(target=self.getChar)
  59.         sendThread = threading.Thread(target=self.sendChar)
  60.         getThread.start()
  61.         sendThread.start()
  62.         threading.Thread.join(sendThread)
  63.         threading.Thread.join(getThread)
  64. # time.sleep(3)
  65.         self.socket.close()

  66. if __name__ == '__main__':
  67.     myas = TalkClient()
  68.     myas.run()
  69.     print "exit"
  70.     sys.exit(0)
阅读(2774) | 评论(0) | 转发(1) |
0

上一篇:Python写聊天室.Part 4

下一篇:python进度条

给主人留下些什么吧!~~