I am learning how to use Twisted AMP. I am developing a program that sends data from a client to a server and inserts the data in a SQLite3 DB. The server then sends back a result to the client which indicates success or error (try and except might not be the best way to do this but it is only a temporary solution while I work out the main problem). In order to do this I modified an example I found that originally did a sum and returned the result, so I realize that this might not be the most efficient way to do what I am trying to do. In particular I am trying to do some timings on multiple insertions (i.e. send the data to the server multiple times for multiple insertions) and I have included the code I have written. It works but clearly it is not a good way to send multiple data for insertion since I am performing multiple connections before running the reactor.
I have tried several ways to get around this including passing the ClientCreator to reactor.callWhenRunning() but you cannot do this with a deferred.
Any suggestions, advice or help with how to do this would be much appreciated. Here is the code.
- from twisted.protocols import amp
- from twisted.internet import reactor
- from twisted.internet.protocol import Factory
- import sqlite3, time
- class Insert(amp.Command):
- arguments = [('data', amp.Integer())]
- response = [('insert_result', amp.Integer())]
- class Protocol(amp.AMP):
- def __init__(self):
- self.conn = sqlite3.connect('biomed1.db')
- self.c =self.conn.cursor()
- self.res=None
- @Insert.responder
- def dbInsert(self, data):
- self.InsertDB(data) #call the DB inserter
- result=self.res # send back the result of the insertion
- return {'insert_result': result}
- def InsertDB(self,data):
- tm=time.time()
- print "insert time:",tm
- chx=data
- PID=2
- device_ID=5
- try:
- self.c.execute("INSERT INTO btdata4(co2_data, patient_Id, sensor_Id) VALUES ('%s','%s','%s')" % (chx, PID, device_ID))
- except Exception, err:
- print err
- self.res=0
- else:
- self.res=1
- self.conn.commit()
- pf = Factory()
- pf.protocol = Protocol
- reactor.listenTCP(1234, pf)
- reactor.run()
- from twisted.internet import reactor
- from twisted.internet.protocol import ClientCreator
- from twisted.protocols import amp
- import time
- class Insert(amp.Command):
- arguments = [('data', amp.Integer())]
- response = [('insert_result', amp.Integer())]
- def connected(protocol):
- return protocol.callRemote(Insert, data=5555).addCallback(gotResult)
- def gotResult(result):
- print 'insert_result:', result['insert_result']
- tm=time.time()
- print "stop", tm
- def error(reason):
- print "error", reason
- tm=time.time()
- print "start",tm
- for i in range (10): #send data over ten times
- ClientCreator(reactor, amp.AMP).connectTCP(
- '', 1234).addCallback(connected).addErrback(error)
- reactor.run()
End of Code.
Thank you.
1 Answer
Few things which will improve your Server code.
First and foremost: The use of direct database access functions is discouraged in twisted, as they normally causes block. Twisted has nice abstraction for database access which provides twisted approach to db connection - twisted.adbapi
Now on to reuse of db connection: If you want to reuse certain assets (like database connection) across a number of Protocol instances, you should initialize those in constructor of Factory or if you dont fancy initiating such things at a launch time, create an resource access method, which will initiate resource upon first method call then assign it to class variable and return that on subsequent calls.
When Factory creates a specific Protocol intance, it will add a reference to itself inside the protocol, see
Then within your Protocol instance, you can access shared database connection instance like:
- self.factory.whatever_name_for_db_connection.doSomething()
Reworked Server code (I dont have python, twisted or even decent IDE available, so this is pretty much untested, some errors are to be expected)
- from twisted.protocols import amp
- from twisted.internet import reactor
- from twisted.internet.protocol import Factory
- import time
- class AMPDBAccessProtocolFactory(Factory):
- def getDBConnection(self):
- if 'dbConnection' in dir(self):
- return self.dbConnection
- else:
- self.dbConnection = SQLLiteTestConnection(self.dbURL)
- return self.dbConnection
- class SQLLiteTestConnection(object):
- """
- Provides abstraction for database access and some business functions.
- """
- def __init__(self,dbURL):
- self.dbPool = adbapi.ConnectionPool("sqlite3" , dbURL, check_same_thread=False)
- def insertBTData4(self,data):
- query = "INSERT INTO btdata4(co2_data, patient_Id, sensor_Id) VALUES (%s,%s,%s)"
- tm=time.time()
- print "insert time:",tm
- chx=data
- PID=2
- device_ID=5
- dF = self.dbPool.runQuery(query,(chx, PID, device_ID))
- dF.addCallback(self.onQuerySuccess,insert_data=data)
- return dF
- def onQuerySuccess(self,insert_data,*r):
- """
- Here you can inspect query results or add any other valuable information to be parsed at client.
- For the test sake we will just return True to a customer if query was a success.
- original data available at kw argument insert_data
- """
- return True
- class Insert(amp.Command):
- arguments = [('data', amp.Integer())]
- response = [('insert_result', amp.Integer())]
- class MyAMPProtocol(amp.AMP):
- @Insert.responder
- def dbInsert(self, data):
- db = self.factory.getDBConnection()
- dF = db.insertBTData4(data)
- dF.addErrback(self.onInsertError,data)
- return dF
- def onInsertError(self, error, data):
- """
- Here you could do some additional error checking or inspect data
- which was handed for insert here. For now we will just throw the same exception again
- so that the client gets notified
- """
- raise error
- if __name__=='__main__':
- pf = AMPDBAccessProtocolFactory()
- pf.protocol = MyAMPProtocol
- pf.dbURL='biomed1.db'
- reactor.listenTCP(1234, pf)
- reactor.run()
Now on to the client. IF AMP follows the overall RPC logic (cant test it currently) it should be able to peruse the same connection across a number of calls. So I have created a ServerProxy class which will hold that perusable protocol instance and provide abstraction for calls:
- from twisted.internet import reactor
- from twisted.internet.protocol import ClientCreator
- from twisted.protocols import amp
- import time
- class Insert(amp.Command):
- arguments = [('data', amp.Integer())]
- response = [('insert_result', amp.Integer())]
- class ServerProxy(object):
- def connected(self,protocol):
- self.serverProxy = protocol # assign protocol as instance variable
- reactor.callLater(5,self.startMultipleInsert) #after five seconds start multiple insert procedure
- def remote_insert(self,data):
- return self.serverProxy.callRemote(Insert, data)
- def startMultipleInsert(self):
- for i in range (10): #send data over ten times
- dF = self.remote_insert(i)
- dF.addCallback(self.gotInsertResult)
- dF.addErrback(error)
- def gotInsertResult(self,result):
- print 'insert_result:', str(result)
- tm=time.time()
- print "stop", tm
- def error(reason):
- print "error", reason
- def main():
- tm=time.time()
- print "start",tm
- serverProxy = ServerProxy()
- ClientCreator(reactor, amp.AMP).connectTCP('', 1234).addCallback(serverProxy.connected).addErrback(error)
- reactor.run()
- if __name__=='__main__':
- main()
阅读(1161) | 评论(0) | 转发(0) |