Chinaunix首页 | 论坛 | 博客
  • 博客访问: 165653
  • 博文数量: 38
  • 博客积分: 1831
  • 博客等级: 上尉
  • 技术积分: 310
  • 用 户 组: 普通用户
  • 注册时间: 2007-08-07 11:08
文章分类

全部博文(38)

文章存档

2013年(2)

2012年(27)

2011年(1)

2010年(8)

分类:

2012-11-02 14:02:49

原文地址:pyDES不支持多线程吗 作者:hawkli

今天对项目的各个已知BUG进行回溯,一个已经有一个月多的BUG终于有时间开始找原因。
写了二个测试程序,研究pyDES为何在我的项目中有时解密出来的数据是乱码,程序如下:
testServer.py
  1. #!/usr/bin/python
  2. #-*- coding:utf-8-*-

  3. import os,sys
  4. import SocketServer,socket
  5. import time,datetime
  6. import base64
  7. import pyDes
  8. import threading,Queue

  9. class MyServer(SocketServer.BaseRequestHandler):
  10.     def handle(self):
  11.         while True:
  12.             recvData = None
  13.             try:
  14.                 recvData = self.request.recv(sockSize)
  15.                 if not recvData:
  16.                     time.sleep(1)
  17.                     continue
  18.                 recvData1 = _3ds.Decrypt(recvData)
  19.                 userIp = str(self.client_address[0])
  20.                 output = "recvData OK."
  21.                 self.request.sendall(output)
  22.                 tt = str(time.strftime('[%Y-%m-%d_%H:%M:%S]'))
  23.                 writeLog(str(tt+"|decode="+recvData1)+"|"+userIp+"|recvData="+recvData)
  24.             except:
  25.                 writeLog("recvData Error:"+userIp+str(sys.exc_info()[0])+str(sys.exc_info()[1]))
  26.     

  27.             out = str(tt)+"recvData="+str(recvData)+"decode="+str(recvData1)
  28.             print out.decode("utf-8").encode("GBK")

  29. class CLogInfo():
  30.     """日志记录模块
  31.     """
  32.     def __init__(self, logfile):
  33.         try:
  34.             import logging
  35.             self.logger = None
  36.             self.logger = logging.getLogger()
  37.             self.hdlr = logging.FileHandler(logfile)
  38.             formatter = logging.Formatter("[%(asctime)s] %(message)s", "%Y-%m-%d %H:%M:%S")
  39.             self.hdlr.setFormatter(formatter)
  40.             self.logger.addHandler(self.hdlr)
  41.             self.logger.setLevel(logging.DEBUG)
  42.         except:
  43.             ##print "log init error!"
  44.             exit(1)

  45.     def output(self,logInfo,errFlag=0):
  46.         try:
  47.             if (errFlag):
  48.                 ##print "error:"+logInfo
  49.                 self.logger.error("error:"+logInfo)
  50.             else:
  51.                 ##print logInfo
  52.                 self.logger.info(logInfo)
  53.         except:
  54.             ##print "log output error!"
  55.             exit(1)

  56.     def close(self):
  57.         try:
  58.             self.logger.removeHandler(self.hdlr)
  59.         except:
  60.             #print "log closed error!"
  61.             exit(1)

  62. def writeLog(info):
  63.     """记录日志方法
  64.     """
  65.     try:
  66.         log.output(info)
  67.         
  68.     except:
  69.         print "writeLog Error:"+str(sys.exc_info()[0])+str(sys.exc_info()[1])
  70.         pass
  71.     
  72. class _3DES():
  73.     """加密模块
  74.     """
  75.     def __init__(self):
  76.         self._3DES = pyDes.triple_des('0123456789012345', pyDes.CBC, '01234567', pad = None, padmode = pyDes.PAD_PKCS5)
  77.     def Encrypt(self,data):
  78.         e = self._3DES.encrypt(data)
  79.         return base64.b64encode(e)
  80.   
  81.     def Decrypt(self,enData):
  82.         d = base64.b64decode(enData)
  83.         return self._3DES.decrypt(d)


  84.    
  85. if __name__ == "__main__":
  86.     sockSize = 640000
  87.     _3ds = _3DES()
  88.     global LOG,log
  89.     LOG = "c:\\testserver.log"
  90.     log = CLogInfo(LOG)
  91.     serverIP = "192.168.0.1"
  92.     port = "1111"
  93.     print "start..."
  94.     
  95.     srv = SocketServer.ThreadingTCPServer((serverIP, int(port)),MyServer)
  96.     srv.serve_forever()
send.py,更简单,就是一个循环不停的发:
  1. #!/usr/bin/python
  2. #-*- coding:utf-8-*-

  3. import os,sys
  4. import ConfigParser
  5. import SocketServer,socket
  6. import time,datetime
  7. import base64
  8. import pyDes
  9. import threading,Queue

  10. class _3DES():
  11.     """加密模块
  12.     """
  13.     def __init__(self):
  14.         self._3DES = pyDes.triple_des('0123456789012345', pyDes.CBC, '01234567', pad = None, padmode = pyDes.PAD_PKCS5)
  15.     def Encrypt(self,data):
  16.         e = self._3DES.encrypt(data)
  17.         return base64.b64encode(e)
  18.   
  19.     def Decrypt(self,enData):
  20.         d = base64.b64decode(enData)
  21.         return self._3DES.decrypt(d)

  22. def CommandRunThread(id,IP,PORT,sendCMD):
  23.     """运行命令的线程模块
  24.     """
  25.     try:
  26.         sock = getSock(IP, int(PORT))
  27.         sock.sendall(sendCMD)
  28.         TResponse = sock.recv(64000)
  29.         receiveData = str(IP) +": " + TResponse
  30.         sock.close()
  31.     except:
  32.         receiveData = str(IP) + ":Connect failed. 无法连接.\n"
  33.         #print receiveData[IP]
  34.     return receiveData

  35. def getSock(host,port):
  36.     socket.setdefaulttimeout(10)
  37.     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  38.     sock.connect((host, port))
  39.     return sock
  40.     

  41. if __name__ == "__main__":
  42.     while True:
  43.         _3ds = _3DES()
  44.         cmd = '中文cmdcommand\3mv123325i2,中文测试'
  45.         cmd = _3ds.Encrypt(cmd)
  46.         result1 = CommandRunThread(id,'192.168.0.1','1111',cmd)
  47.         print result1.decode('utf-8').encode("gbk")
经过检查,发现当一对一的情况下,程序很正常,但是如果在不同的机器上各启动一个send.py,服务端就立即报错,收到的数据非常的妖异,部分字串解密成功,部分乱掉看都看不懂,我分析认为是pyDES在socketServer中无法下常使用的原因。只要改为base64就什么问题都没了。
目前无解中,如果一直找不到解决方案,只能放弃pyDES了。
不知道还有什么加解密方案可以用的。

2011年2月22日更新:
group.google上有人让我在纯多线程环境下写个测试来证明的确是多线程的问题,前个月杂事很多,一直没官,今天花了十分钟整了个测试程序,明确pyDES肯定在多线程下有问题的,我已经去邮件问pyDES的开发者的,看是否能有解决方案,目前我非常的困扰,我需要一个支持多线程的不受操作系统环境影响的加解密解决方案,速度能接受就行。如果实在不行,只能自己写个算法了,对自己写算法没有信心。

  1. #!/usr/bin/python
  2. #-*- coding:utf-8-*-
  3. import Queue, threading, sys, os
  4. import time,urllib
  5. import base64
  6. import pyDes

  7. # working thread
  8. class Worker(threading.Thread):
  9.     worker_count = 0
  10.     def __init__( self, workQueue, resultQueue, timeout = 0, **kwds):
  11.         threading.Thread.__init__( self, **kwds )
  12.         self.id = Worker.worker_count
  13.         Worker.worker_count += 1
  14.         self.setDaemon( True )
  15.         self.workQueue = workQueue
  16.         self.resultQueue = resultQueue
  17.         self.timeout = timeout
  18.         self.start( )

  19.     def run( self ):
  20.         ''' the get-some-work, do-some-work main loop of worker threads '''
  21.         while True:
  22.             try:
  23.                 callable, args, kwds = self.workQueue.get(timeout=self.timeout)
  24.                 res = callable(*args, **kwds)
  25.                 print "worker[%2d]: %s" % (self.id, str(res) )
  26.                 self.resultQueue.put( res )
  27.             except Queue.Empty:
  28.                 break
  29.             except :
  30.                 print 'worker[%2d]' % self.id, sys.exc_info()[:2]
  31.                
  32. class WorkerManager:
  33.     def __init__( self, num_of_workers=10, timeout = 1):
  34.         self.workQueue = Queue.Queue()
  35.         self.resultQueue = Queue.Queue()
  36.         self.workers = []
  37.         self.timeout = timeout
  38.         self._recruitThreads( num_of_workers )

  39.     def _recruitThreads( self, num_of_workers ):
  40.         for i in range( num_of_workers ):
  41.             worker = Worker( self.workQueue, self.resultQueue, self.timeout )
  42.             self.workers.append(worker)

  43.     def wait_for_complete( self):
  44.         # ...then, wait for each of them to terminate:
  45.         while len(self.workers):
  46.             worker = self.workers.pop()
  47.             worker.join( )
  48.             if worker.isAlive() and not self.workQueue.empty():
  49.                 self.workers.append( worker )
  50.         print "All jobs are are completed."

  51.     def add_job( self, callable, *args, **kwds ):
  52.         self.workQueue.put( (callable, args, kwds) )
  53.     
  54.     def get_result( self, *args, **kwds ):
  55.         return self.resultQueue.get( *args, **kwds )



  56. def test_job(id, sleep = 0.001 ):
  57.     try:
  58.         result = _3ds.Encrypt("hello")
  59.         #result = base64.b64encode("HELLO")
  60.         print id, result
  61.     except:
  62.         print '[%4d]' % id, sys.exc_info()[:2]
  63.     return id

  64. def test():
  65.     import socket
  66.     socket.setdefaulttimeout(10)
  67.     print 'start testing'
  68.     wm = WorkerManager(10)
  69.     for i in range(100):
  70.         wm.add_job( test_job, i, i*0.001 )
  71.     wm.wait_for_complete()
  72.     print 'end testing'

  73. class _3DES():
  74.     """加密模块
  75.     """
  76.     def __init__(self):
  77.         self._3DES = pyDes.triple_des('0123456789012345', pyDes.CBC, '01234567', pad = None, padmode = pyDes.PAD_PKCS5)
  78.     def Encrypt(self,data):
  79.         e = self._3DES.encrypt(data)
  80.         return base64.b64encode(e)
  81.   
  82.     def Decrypt(self,enData):
  83.         d = base64.b64decode(enData)
  84.         return self._3DES.decrypt(d)


  85. if __name__ == '__main__':
  86.     _3ds = _3DES()
  87.     test()

执行以上程序,程序抛出大量乱七八糟的异常,但只要改成base64,什么问题都没了。

2月22日第二次更新:

我得到了作者的回复,pyDES不是线程安全的。。。如果要在线程中使用,需要为每一个线程准备一个实例。。。。
所以,无解了,准备自己的加解密算法吧,先翻书去了。
阅读(965) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~