Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1102471
  • 博文数量: 170
  • 博客积分: 1603
  • 博客等级: 上尉
  • 技术积分: 1897
  • 用 户 组: 普通用户
  • 注册时间: 2010-07-09 15:54
文章分类

全部博文(170)

文章存档

2016年(27)

2015年(21)

2014年(27)

2013年(21)

2012年(7)

2011年(67)

我的朋友

分类: Python/Ruby

2011-04-21 18:48:28

修改加入群发的服务器端版本,更好的理解多线程内通信
import threading,socket,sys,os,time
import daemon
SERVER = "0.0.0.0"
PORT = 8888

def MyTalkThread(sock,num):
    import struct
    global speakWords
    global sockectPool
    global sockLock
    while True:
        try:
            curSock,useradd = sock.accept()
            sockectPool.append(curSock)
            curSock.settimeout(20)
#这里修改群发内容是做telnet测试用,客户端写好后应该把这个卸载正确包判断内
            sockLock.acquire()
            speakWords = "loli\n\r"
            sockLock.release()
            while True:
                print curSock
                getData = curSock.recv(1024)
                try:
                    packageType,packageLong,packageData = struct.unpack('BB1022S',getData)
                except:
                    print "error package"
                    print len(getData),":",getData
                    break
                if packageType == 1000:
                    packageData = packageData[:pacakgeLong]
                    print num,"Thread get data",packageData
                else:
                    break
            sockectPool.reomve(curSock)
            curSock.close()
        except:
            print "error or time out"
            sockectPool.remove(curSock)
            curSock.close()

class TalkServer(object):
    def __init__(self):
        self.socket = None
    def run(self):
        global sockectPool
        global speakWords
        global sockLock
        speakWords = ''
        sockectPool = []
        sockLock = threading.Lock()
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind((SERVER,PORT))
        self.socket.listen(10)
        thread_pool = []
        for i in range(10):
            TalkThread = threading.Thread(target=MyTalkThread,args=(self.socket,i))
            TalkThread.start()
            thread_pool.append(TalkThread)
        while True:
            if len(sockectPool) > 0 and len(speakWords) > 0:
                for threadSock in sockectPool:
                    threadSock.send(speakWords)
                sockLock.acquire()
                speakWords =''
                sockLock.release()
            time.sleep(1)

if __name__ == '__main__':
    myas = TalkServer()
    myas.run()

终于通过global sockectPool,global speakWords两个全局变量实现了群发!当然要有个全局变量用来加锁互斥。不过1秒中内多有人发消息就会丢群发内容,锁的方式还要修改,应该在线程内修改speakWords变量前面加个循环while speakWords !='':这样就不会丢群发内容了
线程还没增加线程的代码,不过应该很容易在外面加了


晚上写着写着终于稍微搞懂点用锁后才发现之前思路了错误了
之前一直想把接收到的包负值到 一个全局变量,然后锁住线程,再在线程外用函数(就是  if len(sockectPool) > 0 and len(speakWords) > 0那里)群发这个变量后清空变量
但是这样作一直有个问题,线程中的锁了以后线程外的 函数貌似不理锁的?

后来改变思路。。。。。当每个线程建立连接后,把当前sock追加到一个全局变量sockPool中
当任意一个sock收到包后,从sockPool中取出所有已经连接的sock然后群发包内容就是。。。。
这样完全不需要锁(实际最后我还是加了锁,原因见最后)
代码改为
#! /usr/bin/python
import daemon

import threading,socket,sys,os,time
SERVER = "0.0.0.0"
PORT = 8888

def MyTalkThread(sock,num):
    import struct
    global sockectPool
    global sockLock
    while True:
        try:
            curSock,useradd = sock.accept()
            curSock.settimeout(120)
            threadInfo = {'thrID':num,'thrUser':'','thrSock':curSock}
            while True:
                getData = curSock.recv(1024)
                print "Thread",num,"get data"
                sockLock.acquire()
                print "locked"
                if threadInfo not in sockectPool:
                    sockectPool.append(threadInfo)
                try:
                    packageType,packageLong,packageData = struct.unpack('BB1022s',getData)
                    print packageType,packageLong,packageData
                except:
                    print "error package"
                    print len(getData),":",getData
                    sockectPool.remove(threadInfo)
                    break
                if packageType == 100:
                    print "no err"
                    packageData = packageData[:packageLong]
                    for ThreadInfoPool in sockectPool:
                            ThreadInfoPool['thrSock'].send(packageData)
                    print "right free lock"
                    sockLock.release()
                else:
                    sockectPool.remove(threadInfo)
                    break
            print "out free lock"
            sockLock.release()
            curSock.close()
        except:
            if threadInfo in sockectPool:sockectPool.remove(threadInfo)
            if sockLock.locked():sockLock.release()
            print "error or time out"
            curSock.close()

class TalkServer(object):
    def __init__(self):
        self.socket = None
    def run(self):
        global sockectPool
        global sockLock
        speakWords = ''
        sockectPool = []
        sockLock = threading.Lock()
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind((SERVER,PORT))
        self.socket.listen(10)
        thread_pool = []
        for i in range(10):
            TalkThread = threading.Thread(target=MyTalkThread,args=(self.socket,i))
            TalkThread.start()
            thread_pool.append(TalkThread)

if __name__ == '__main__':
    myas = TalkServer()
    myas.run()

不过保留锁似乎更安全,应为sockectPool的append和remove虽然基本不会碰到其他线程的sock,多个线程同时操作一个变量不加锁不知道安全不安全
阅读(1028) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~