Chinaunix首页 | 论坛 | 博客
  • 博客访问: 5119812
  • 博文数量: 921
  • 博客积分: 16037
  • 博客等级: 上将
  • 技术积分: 8469
  • 用 户 组: 普通用户
  • 注册时间: 2006-04-05 02:08
文章分类

全部博文(921)

文章存档

2020年(1)

2019年(3)

2018年(3)

2017年(6)

2016年(47)

2015年(72)

2014年(25)

2013年(72)

2012年(125)

2011年(182)

2010年(42)

2009年(14)

2008年(85)

2007年(89)

2006年(155)

分类: Python/Ruby

2012-05-01 21:48:13

I am learning how to use Twisted AMP. I am developing a program that sends data from a client to a server and inserts the data in a SQLite3 DB. The server then sends back a result to the client which indicates success or error (try and except might not be the best way to do this but it is only a temporary solution while I work out the main problem). In order to do this I modified an example I found that originally did a sum and returned the result, so I realize that this might not be the most efficient way to do what I am trying to do. In particular I am trying to do some timings on multiple insertions (i.e. send the data to the server multiple times for multiple insertions) and I have included the code I have written. It works but clearly it is not a good way to send multiple data for insertion since I am performing multiple connections before running the reactor.

I have tried several ways to get around this including passing the ClientCreator to reactor.callWhenRunning() but you cannot do this with a deferred.

Any suggestions, advice or help with how to do this would be much appreciated. Here is the code.

Server:


 

  1. from twisted.protocols import amp
  2. from twisted.internet import reactor
  3. from twisted.internet.protocol import Factory
  4. import sqlite3, time

  5. class Insert(amp.Command):
  6.     arguments = [('data', amp.Integer())]
  7.     response = [('insert_result', amp.Integer())]

  8. class Protocol(amp.AMP):
  9.     def __init__(self):
  10.        self.conn = sqlite3.connect('biomed1.db')
  11.        self.c =self.conn.cursor()
  12.        self.res=None

  13.     @Insert.responder
  14.     def dbInsert(self, data):
  15.         self.InsertDB(data) #call the DB inserter
  16.         result=self.res # send back the result of the insertion
  17.         return {'insert_result': result}

  18.     def InsertDB(self,data):
  19.       tm=time.time()
  20.       print "insert time:",tm
  21.       chx=data
  22.       PID=2
  23.       device_ID=5
  24.       try:
  25.         self.c.execute("INSERT INTO btdata4(co2_data, patient_Id, sensor_Id) VALUES ('%s','%s','%s')" % (chx, PID, device_ID))
  26.       except Exception, err:
  27.              print err
  28.              self.res=0
  29.       else:
  30.              self.res=1

  31.       self.conn.commit()


  32. pf = Factory()
  33. pf.protocol = Protocol
  34. reactor.listenTCP(1234, pf)
  35. reactor.run()

Client:

  1. from twisted.internet import reactor
  2. from twisted.internet.protocol import ClientCreator
  3. from twisted.protocols import amp
  4. import time

  5. class Insert(amp.Command):
  6.     arguments = [('data', amp.Integer())]
  7.     response = [('insert_result', amp.Integer())]

  8. def connected(protocol):
  9.     return protocol.callRemote(Insert, data=5555).addCallback(gotResult)

  10. def gotResult(result):
  11.     print 'insert_result:', result['insert_result']
  12.     tm=time.time()
  13.     print "stop", tm

  14. def error(reason):
  15.     print "error", reason

  16. tm=time.time()
  17. print "start",tm
  18. for i in range (10): #send data over ten times
  19.   ClientCreator(reactor, amp.AMP).connectTCP(
  20.      '127.0.0.1', 1234).addCallback(connected).addErrback(error)

  21. reactor.run()

End of Code.

Thank you.

 

1 Answer

Few things which will improve your Server code.

First and foremost: The use of direct database access functions is discouraged in twisted, as they normally causes block. Twisted has nice abstraction for database access which provides twisted approach to db connection - twisted.adbapi

Now on to reuse of db connection: If you want to reuse certain assets (like database connection) across a number of Protocol instances, you should initialize those in constructor of Factory or if you dont fancy initiating such things at a launch time, create an resource access method, which will initiate resource upon first method call then assign it to class variable and return that on subsequent calls.

When Factory creates a specific Protocol intance, it will add a reference to itself inside the protocol, see

Then within your Protocol instance, you can access shared database connection instance like:

  1. self.factory.whatever_name_for_db_connection.doSomething()

Reworked Server code (I dont have python, twisted or even decent IDE available, so this is pretty much untested, some errors are to be expected)


 

  1. from twisted.protocols import amp
  2. from twisted.internet import reactor
  3. from twisted.internet.protocol import Factory
  4. import time

  5. class AMPDBAccessProtocolFactory(Factory):
  6.     def getDBConnection(self):
  7.         if 'dbConnection' in dir(self):
  8.             return self.dbConnection
  9.         else:
  10.             self.dbConnection = SQLLiteTestConnection(self.dbURL)
  11.             return self.dbConnection

  12. class SQLLiteTestConnection(object):
  13.     """
  14.     Provides abstraction for database access and some business functions.
  15.     """
  16.     def __init__(self,dbURL):
  17.         self.dbPool = adbapi.ConnectionPool("sqlite3" , dbURL, check_same_thread=False)

  18.     def insertBTData4(self,data):
  19.         query = "INSERT INTO btdata4(co2_data, patient_Id, sensor_Id) VALUES (%s,%s,%s)"
  20.         tm=time.time()
  21.         print "insert time:",tm
  22.         chx=data
  23.         PID=2
  24.         device_ID=5
  25.         dF = self.dbPool.runQuery(query,(chx, PID, device_ID))
  26.         dF.addCallback(self.onQuerySuccess,insert_data=data)
  27.         return dF
  28.     def onQuerySuccess(self,insert_data,*r):
  29.         """
  30.         Here you can inspect query results or add any other valuable information to be parsed at client.
  31.         For the test sake we will just return True to a customer if query was a success.
  32.         original data available at kw argument insert_data
  33.         """
  34.         return True


  35. class Insert(amp.Command):
  36.     arguments = [('data', amp.Integer())]
  37.     response = [('insert_result', amp.Integer())]

  38. class MyAMPProtocol(amp.AMP):

  39.     @Insert.responder
  40.     def dbInsert(self, data):
  41.         db = self.factory.getDBConnection()
  42.         dF = db.insertBTData4(data)
  43.         dF.addErrback(self.onInsertError,data)
  44.         return dF

  45.     def onInsertError(self, error, data):
  46.         """
  47.         Here you could do some additional error checking or inspect data
  48.         which was handed for insert here. For now we will just throw the same exception again
  49.         so that the client gets notified
  50.         """
  51.         raise error

  52. if __name__=='__main__':
  53.     pf = AMPDBAccessProtocolFactory()
  54.     pf.protocol = MyAMPProtocol
  55.     pf.dbURL='biomed1.db'
  56.     reactor.listenTCP(1234, pf)
  57.     reactor.run()

Now on to the client. IF AMP follows the overall RPC logic (cant test it currently) it should be able to peruse the same connection across a number of calls. So I have created a ServerProxy class which will hold that perusable protocol instance and provide abstraction for calls:


 

  1. from twisted.internet import reactor
  2. from twisted.internet.protocol import ClientCreator
  3. from twisted.protocols import amp
  4. import time

  5. class Insert(amp.Command):
  6.     arguments = [('data', amp.Integer())]
  7.     response = [('insert_result', amp.Integer())]

  8. class ServerProxy(object):
  9.     def connected(self,protocol):
  10.         self.serverProxy = protocol # assign protocol as instance variable
  11.         reactor.callLater(5,self.startMultipleInsert) #after five seconds start multiple insert procedure

  12.     def remote_insert(self,data):
  13.         return self.serverProxy.callRemote(Insert, data)

  14.     def startMultipleInsert(self):
  15.         for i in range (10): #send data over ten times
  16.             dF = self.remote_insert(i)
  17.             dF.addCallback(self.gotInsertResult)
  18.             dF.addErrback(error)

  19.     def gotInsertResult(self,result):
  20.         print 'insert_result:', str(result)
  21.         tm=time.time()
  22.         print "stop", tm

  23. def error(reason):
  24.     print "error", reason


  25. def main():
  26.     tm=time.time()
  27.     print "start",tm
  28.     serverProxy = ServerProxy()
  29.     ClientCreator(reactor, amp.AMP).connectTCP('127.0.0.1', 1234).addCallback(serverProxy.connected).addErrback(error)
  30.     reactor.run()

  31. if __name__=='__main__':
  32.     main()


 

Thanks very much. I am currently working through your solution. I originally used the adbapi but what was happening was that by the time the adbapi returned the deferred, the AMP result had already returned a value. Basically I had two deferreds in series (the adbapi and the remote call) If I am waiting for the insert to complete from the client side then (as far as I can see) the adbapi gives no advantage. However, you might have found a way around this, so I will get your code running and see what happens. Many thanks. – 

 

I suspect that you are thinking about twisted process in standart linear code way. This was the major obstacle I had to overcome, when first starting on twisted projects. In twisted you can not think of your execution flow as being linear. Therefore - you can, for example do a multiple database calls in a rapid sequence without waiting for their reply on the next line. Wherever you receive deferred, you have not executed anything yet (in most cases), you have put a job out for execution. – 

 

I am new to Twisted so you are right. I ran your code and got this error: File "DBAserver4.py", line 48, in dbInsert db = self.protocol.getDBConnection() exceptions.AttributeError: 'MyAMPProtocol' object has no attribute 'protocol' - I tried various things to make it work but to no avail. I did a hack and was able to run but it got hung on the query format, which I will play around with. But I cannot see why it does not see the protocol in MyAMPProtocol. Thanks again. – 

That worked. I should have thought to try that. I'll work on the query format and it should run through. Thanks! – 




from:

阅读(1144) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~