Category: Python/Ruby

2012-04-06 23:42:16

Twisted is a networking engine written in Python that, among many other things, can be used for parallel processing. It is very big, though, so I had a hard time finding what I needed. I browsed through the Twisted Documentation and the […]. There is also a […]. However, I found Bruce Eckel's article "Concurrency with Python, Twisted, and Flex" to be the most helpful. (See also Bruce Eckel's earlier article on Twisted, "Grokking Twisted".)

Here are my notes on running Bruce Eckel's example. I removed the Flex part because I didn't need it and don't know anything about it. The example runs a Controller that starts a number of separate processes running Solvers (a.k.a. workers) in parallel, and it lets the Controller and the Solvers communicate with each other. Although this example runs on only one machine, the article says that extending it to multiple machines is not difficult. For a good explanation of how it works, please see the original article.
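
For comparison, here is a minimal sketch of the same controller/worker layout using only the Python standard library: the parent starts one child process per (hypothetical) port number and reads each child's answer from a pipe, instead of talking to it over Twisted's Perspective Broker as Bruce Eckel's version does. WORKER_CODE and run_workers are made-up names for illustration, and the pipe is a swapped-in communication channel, not what the article uses.

```python
import subprocess
import sys

# Hypothetical worker: each child process does a small CPU-bound job and
# prints its result on stdout, which the parent reads through a pipe.
WORKER_CODE = """
import sys
port = int(sys.argv[1])
total = sum(i * i for i in range(100000))
print("worker %d result: %d" % (port, total))
"""


def run_workers(ports):
    """Start one child process per port number, then collect every answer."""
    procs = [
        subprocess.Popen(
            [sys.executable, "-c", WORKER_CODE, str(port)],
            stdout=subprocess.PIPE,
            universal_newlines=True,
        )
        for port in ports
    ]
    # All children are already running in parallel at this point;
    # communicate() waits for each one to finish and returns its output.
    return [proc.communicate()[0].strip() for proc in procs]


if __name__ == "__main__":
    for line in run_workers([5566, 5567]):
        print(line)
```

Because the children are separate processes, each can keep its own CPU busy, which is the same reason the Twisted example starts solver.py with Popen rather than using threads.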

Here is solver.py which is copied from the original article. The actual "work" is done in the step method. I only added some debugging print statements for myself.


 

    """
    solver.py
    Original version by Bruce Eckel
    Solves one portion of a problem, in a separate process on a separate CPU
    """
    from __future__ import print_function  # print() works on Python 2 and 3
    import sys, random, math
    from twisted.spread import pb
    from twisted.internet import reactor

    class Solver(pb.Root):

        def __init__(self, id):
            print("solver.py %s: solver init" % id)
            self.id = id

        def __str__(self):  # String representation
            return "Solver %s" % self.id

        def remote_initialize(self, initArg):
            return "%s initialized" % self

        def step(self, arg):
            "Simulate work and return result"
            print("solver.py %s: solver step" % self.id)
            result = 0
            for i in range(random.randint(1000000, 3000000)):
                angle = math.radians(random.randint(0, 45))
                result += math.tanh(angle) / math.cosh(angle)
            return "%s, %s, result: %.2f" % (self, str(arg), result)

        # Alias methods, for demonstration version:
        remote_step1 = step
        remote_step2 = step
        remote_step3 = step

        def remote_status(self):
            print("solver.py %s: remote_status" % self.id)
            return "%s operational" % self

        def remote_terminate(self):
            print("solver.py %s: remote_terminate" % self.id)
            # Stop shortly after returning, so the reply still reaches the controller
            reactor.callLater(0.5, reactor.stop)
            return "%s terminating..." % self

    if __name__ == "__main__":
        port = int(sys.argv[1])
        reactor.listenTCP(port, pb.PBServerFactory(Solver(sys.argv[1])))
        reactor.run()
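
Perspective Broker only lets a client call methods whose names start with remote_: a callRemote("step1", ...) from the controller ends up in remote_step1 on the solver, which is why the aliases remote_step1 = step and so on work, while step itself is not callable remotely. The toy dispatcher below is a purely local, simplified illustration of that naming convention; ToySolver and call_remote are hypothetical names, and the real lookup happens inside Twisted's pb.Referenceable together with serialization and the network round trip.

```python
class ToySolver(object):
    """Hypothetical local stand-in for Solver in solver.py (no networking)."""

    def __init__(self, id):
        self.id = id

    def __str__(self):
        return "Solver %s" % self.id

    def step(self, arg):
        return "%s, %s" % (self, arg)

    # Same aliasing trick as in solver.py: three public names, one method.
    remote_step1 = step
    remote_step2 = step
    remote_step3 = step

    def remote_status(self):
        return "%s operational" % self


def call_remote(obj, message, *args):
    """Map a message name onto obj.remote_<message>, like PB's server side."""
    method = getattr(obj, "remote_" + message, None)
    if method is None:
        raise AttributeError("No such method: remote_" + message)
    return method(*args)


solver = ToySolver(5566)
print(call_remote(solver, "step1", "step 1"))  # Solver 5566, step 1
print(call_remote(solver, "status"))           # Solver 5566 operational
```

Calling call_remote(solver, "step") raises AttributeError, mirroring how only the remote_ names are exposed.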


 

Here is controller.py. It is also copied from the original article, but I removed the Flex interface and added calls to start and terminate in the Controller class. I'm not sure this is the intended design, but at least it allowed me to run the example. I also moved the terminate method from the FlexInterface to the Controller.


 

    """
    Controller.py
    Original version by Bruce Eckel
    Starts and manages solvers in separate processes for parallel processing.
    """
    from __future__ import print_function  # print() works on Python 2 and 3
    import sys
    from subprocess import Popen
    from twisted.spread import pb
    from twisted.internet import reactor, defer

    START_PORT = 5566
    MAX_PROCESSES = 2

    class Controller(object):

        def broadcastCommand(self, remoteMethodName, arguments, nextStep, failureMessage):
            print("controller.py: broadcasting...")
            deferreds = [solver.callRemote(remoteMethodName, arguments)
                         for solver in self.solvers.values()]
            print("controller.py: broadcasted")
            reactor.callLater(3, self.checkStatus)

            # nextStep fires once every solver has answered; errbackArgs must
            # be a tuple, hence the trailing comma
            defer.DeferredList(deferreds, consumeErrors=True).addCallbacks(
                nextStep, self.failed, errbackArgs=(failureMessage,))

        def checkStatus(self):
            print("controller.py: checkStatus")
            for solver in self.solvers.values():
                solver.callRemote("status").addCallbacks(
                    lambda r: sys.stdout.write(r + "\n"), self.failed,
                    errbackArgs=("Status Check Failed",))

        def failed(self, failure, failureMessage="Call Failed"):
            # An errback receives a twisted.python.failure.Failure, not a results list
            print("controller.py: failed: %s: %s"
                  % (failureMessage, failure.getErrorMessage()))

        def __init__(self):
            print("controller.py: init")
            self.solvers = dict.fromkeys(
                [("localhost", i) for i in range(START_PORT, START_PORT + MAX_PROCESSES)])
            # sys.executable starts the solvers with the same Python interpreter
            self.pids = [Popen([sys.executable, "solver.py", str(port)]).pid
                         for ip, port in self.solvers]
            print("PIDS:", self.pids)
            self.connected = False
            reactor.callLater(1, self.connect)

        def connect(self):
            print("controller.py: connect")
            connections = []
            for address, port in self.solvers:
                factory = pb.PBClientFactory()
                reactor.connectTCP(address, port, factory)
                connections.append(factory.getRootObject())
            defer.DeferredList(connections, consumeErrors=True).addCallbacks(
                self.storeConnections, self.failed, errbackArgs=("Failed to Connect",))

            print("controller.py: starting parallel jobs")
            self.start()

        def storeConnections(self, results):
            print("controller.py: storeconnections")
            # results arrive in the same order as the keys of self.solvers
            for (success, solver), (address, port) in zip(results, self.solvers):
                self.solvers[address, port] = solver
            print("controller.py: Connected; self.solvers:", self.solvers)
            self.connected = True

        def start(self):
            "controller.py: Begin the solving process"
            if not self.connected:
                # Poll until storeConnections() has run
                return reactor.callLater(0.5, self.start)
            self.broadcastCommand("step1", "step 1", self.step2, "Failed Step 1")

        def step2(self, results):
            print("controller.py: step 1 results:", results)
            self.broadcastCommand("step2", "step 2", self.step3, "Failed Step 2")

        def step3(self, results):
            print("controller.py: step 2 results:", results)
            self.broadcastCommand("step3", "step 3", self.collectResults, "Failed Step 3")

        def collectResults(self, results):
            print("controller.py: step 3 results:", results)
            self.terminate()

        def terminate(self):
            print("controller.py: terminate")
            for solver in self.solvers.values():
                solver.callRemote("terminate").addErrback(self.failed, "Termination Failed")
            reactor.callLater(1, reactor.stop)
            return "Terminating remote solvers"

    if __name__ == "__main__":
        controller = Controller()
        reactor.run()
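
The core of broadcastCommand is a barrier: the same remote call goes to every solver, and DeferredList fires nextStep only after all of them have answered, passing a list of (success, result) pairs (the shape visible in the screen output further down). As a rough standard-library analogy, not Twisted code, here is the same step-by-step flow written with concurrent.futures; threads stand in for the solver processes, and broadcast and fake_step are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def fake_step(solver_id, arg):
    """Hypothetical stand-in for Solver.step; fails for a negative id."""
    if solver_id < 0:
        raise ValueError("solver %d broke" % solver_id)
    return "Solver %d, %s" % (solver_id, arg)


def broadcast(executor, solver_ids, arg):
    """Send the same call to every solver and wait for all of them.

    Like DeferredList(..., consumeErrors=True), this never raises: each entry
    is (True, result) on success or (False, exception) on failure, in the
    same order as solver_ids.
    """
    futures = [executor.submit(fake_step, sid, arg) for sid in solver_ids]
    wait(futures)  # barrier: nothing below runs until every call is done
    results = []
    for future in futures:
        error = future.exception()
        if error is not None:
            results.append((False, error))
        else:
            results.append((True, future.result()))
    return results


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Each step starts only after the previous broadcast has completed,
        # like start -> step2 -> step3 -> collectResults in controller.py.
        for step in ("step 1", "step 2", "step 3"):
            print(broadcast(pool, [5566, 5567], step))
```

Note that a failed call does not stop the chain here, just as with consumeErrors=True the failure shows up as a (False, ...) entry in the list handed to the next step rather than triggering the errback.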

To run it, put the two files in the same directory and run "python controller.py". If your machine has at least two CPUs, you should see two of them go up to 100% usage while the solvers work. Here is the screen output:


 

    controller.py: init
    PIDS: [12173, 12174]
    solver.py 5567: solver init
    solver.py 5566: solver init
    controller.py: connect
    controller.py: starting parallel jobs
    controller.py: storeconnections
    controller.py: Connected; self.solvers: {('localhost', 5567): , ('localhost', 5566): }
    controller.py: broadcasting...
    controller.py: broadcasted
    solver.py 5566: solver step
    solver.py 5567: solver step
    controller.py: checkStatus
    solver.py 5566: remote_status
    Solver 5566 operational
    solver.py 5567: remote_status
    controller.py: step 1 results: [(True, 'Solver 5567, step 1, result: 683825.75'), (True, 'Solver 5566, step 1, result: 543177.17')]
    controller.py: broadcasting...
    controller.py: broadcasted
    Solver 5567 operational
    solver.py 5566: solver step
    solver.py 5567: solver step
    controller.py: checkStatus
    solver.py 5566: remote_status
    Solver 5566 operational
    solver.py 5567: remote_status
    controller.py: step 2 results: [(True, 'Solver 5567, step 2, result: 636793.90'), (True, 'Solver 5566, step 2, result: 335358.16')]
    controller.py: broadcasting...
    controller.py: broadcasted
    Solver 5567 operational
    solver.py 5566: solver step
    solver.py 5567: solver step
    controller.py: checkStatus
    solver.py 5566: remote_status
    Solver 5566 operational
    solver.py 5567: remote_status
    controller.py: step 3 results: [(True, 'Solver 5567, step 3, result: 847386.43'), (True, 'Solver 5566, step 3, result: 512120.15')]
    controller.py: terminate
    Solver 5567 operational
    solver.py 5566: remote_terminate
    solver.py 5567: remote_terminate
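
As a sanity check on the numbers above: each iteration of Solver.step adds tanh(a)/cosh(a) for a random whole-degree angle a between 0 and 45 degrees, so a step's result should be close to N times the average of that term, where N is the random iteration count between 1,000,000 and 3,000,000. Since tanh(a)/cosh(a) = sinh(a)/cosh(a)^2 has the antiderivative -1/cosh(a), the continuous average over [0, pi/4] is (1 - 1/cosh(pi/4))/(pi/4), about 0.31, and the discrete average over the 46 possible angles is nearly the same. The short script below (term and mean_term are hypothetical helper names) computes the discrete average and checks the six reported results against the resulting range.

```python
import math


def term(degrees):
    """The quantity added on each iteration of Solver.step in solver.py."""
    angle = math.radians(degrees)
    return math.tanh(angle) / math.cosh(angle)


def mean_term():
    """Average term when degrees is uniform over 0..45, as random.randint(0, 45) gives."""
    return sum(term(d) for d in range(46)) / 46.0


# Step results copied from the screen output above.
observed = [683825.75, 543177.17, 636793.90, 335358.16, 847386.43, 512120.15]

m = mean_term()
# random.randint(1000000, 3000000) iterations per step
low, high = 1000000 * m, 3000000 * m
print("mean term: %.4f, expected range: %.0f to %.0f" % (m, low, high))
for value in observed:
    # Dividing by the mean estimates how many iterations that step ran.
    print("%.2f -> about %.2f million iterations, in range: %s"
          % (value, value / m / 1e6, low <= value <= high))
```

The range is approximate, because the sum is random, but with a million or more iterations per step the average term varies very little, so all six results landing well inside it is what the code predicts.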

from: http://www.saltycrane.com/blog/2008/09/notes-parallel-processing-python-and-twisted/
