修改加入群发的服务器端版本,更好的理解多线程内通信
import threading,socket,sys,os,time
import daemon
SERVER = "0.0.0.0"
PORT = 8888
def MyTalkThread(sock,num):
import struct
global speakWords
global sockectPool
global sockLock
while True:
try:
curSock,useradd = sock.accept()
sockectPool.append(curSock)
curSock.settimeout(20)
#这里修改群发内容是做telnet测试用,客户端写好后应该把这个卸载正确包判断内
sockLock.acquire()
speakWords = "loli\n\r"
sockLock.release()
while True:
print curSock
getData = curSock.recv(1024)
try:
packageType,packageLong,packageData = struct.unpack('BB1022S',getData)
except:
print "error package"
print len(getData),":",getData
break
if packageType == 1000:
packageData = packageData[:pacakgeLong]
print num,"Thread get data",packageData
else:
break
sockectPool.reomve(curSock)
curSock.close()
except:
print "error or time out"
sockectPool.remove(curSock)
curSock.close()
class TalkServer(object):
def __init__(self):
self.socket = None
def run(self):
global sockectPool
global speakWords
global sockLock
speakWords = ''
sockectPool = []
sockLock = threading.Lock()
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind((SERVER,PORT))
self.socket.listen(10)
thread_pool = []
for i in range(10):
TalkThread = threading.Thread(target=MyTalkThread,args=(self.socket,i))
TalkThread.start()
thread_pool.append(TalkThread)
while True:
if len(sockectPool) > 0 and len(speakWords) > 0:
for threadSock in sockectPool:
threadSock.send(speakWords)
sockLock.acquire()
speakWords =''
sockLock.release()
time.sleep(1)
if __name__ == '__main__':
myas = TalkServer()
myas.run()
终于通过global sockectPool,global speakWords两个全局变量实现了群发!当然要有个全局变量用来加锁互斥。不过1秒中内多有人发消息就会丢群发内容,锁的方式还要修改,应该在线程内修改speakWords变量前面加个循环while speakWords !='':这样就不会丢群发内容了
线程还没增加线程的代码,不过应该很容易在外面加了
晚上写着写着终于稍微搞懂点用锁后才发现之前思路了错误了
之前一直想把接收到的包负值到 一个全局变量,然后锁住线程,再在线程外用函数(就是 if len(sockectPool) > 0 and len(speakWords) > 0那里)群发这个变量后清空变量
但是这样作一直有个问题,线程中的锁了以后线程外的 函数貌似不理锁的?
后来改变思路。。。。。当每个线程建立连接后,把当前sock追加到一个全局变量sockPool中
当任意一个sock收到包后,从sockPool中取出所有已经连接的sock然后群发包内容就是。。。。
这样完全不需要锁(实际最后我还是加了锁,原因见最后)
代码改为
#! /usr/bin/python
import daemon
import threading,socket,sys,os,time
SERVER = "0.0.0.0"
PORT = 8888
def MyTalkThread(sock,num):
import struct
global sockectPool
global sockLock
while True:
try:
curSock,useradd = sock.accept()
curSock.settimeout(120)
threadInfo = {'thrID':num,'thrUser':'','thrSock':curSock}
while True:
getData = curSock.recv(1024)
print "Thread",num,"get data"
sockLock.acquire()
print "locked"
if threadInfo not in sockectPool:
sockectPool.append(threadInfo)
try:
packageType,packageLong,packageData = struct.unpack('BB1022s',getData)
print packageType,packageLong,packageData
except:
print "error package"
print len(getData),":",getData
sockectPool.remove(threadInfo)
break
if packageType == 100:
print "no err"
packageData = packageData[:packageLong]
for ThreadInfoPool in sockectPool:
ThreadInfoPool['thrSock'].send(packageData)
print "right free lock"
sockLock.release()
else:
sockectPool.remove(threadInfo)
break
print "out free lock"
sockLock.release()
curSock.close()
except:
if threadInfo in sockectPool:sockectPool.remove(threadInfo)
if sockLock.locked():sockLock.release()
print "error or time out"
curSock.close()
class TalkServer(object):
def __init__(self):
self.socket = None
def run(self):
global sockectPool
global sockLock
speakWords = ''
sockectPool = []
sockLock = threading.Lock()
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind((SERVER,PORT))
self.socket.listen(10)
thread_pool = []
for i in range(10):
TalkThread = threading.Thread(target=MyTalkThread,args=(self.socket,i))
TalkThread.start()
thread_pool.append(TalkThread)
if __name__ == '__main__':
myas = TalkServer()
myas.run()
不过保留锁似乎更安全,应为sockectPool的append和remove虽然基本不会碰到其他线程的sock,多个线程同时操作一个变量不加锁不知道安全不安全