Category: Python/Ruby

2013-12-18 19:42:34





Twisted as an Erlang node

Prerequisites

In this document, we'll assume that you have a basic knowledge of Twisted (how to create a client and a server, how the protocol/factory mechanism works, what a Deferred is) and of Erlang (what a node is, what the EPMD is, what the basic types are). Please refer to their respective documentation if that's not the case.

Invoking methods on an Erlang node

To create a connection to an Erlang node, you need the cookie and a node name. The utility functions readCookie and buildNodeName are here to help you out: the first reads the cookie from its usual place ($HOME/.erlang.cookie), the second appends the name of the host to the name you have chosen.
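
To make the description above concrete, here is a minimal standalone sketch of what the two helpers are described as doing. It does not use twotp and is not its implementation: the function names read_cookie_sketch and build_node_name_sketch are hypothetical, and the use of a short host name is an assumption.

```python
import os
import socket


def read_cookie_sketch(path=None):
    """Return the Erlang cookie stored in `path` (default: ~/.erlang.cookie)."""
    if path is None:
        path = os.path.join(os.path.expanduser("~"), ".erlang.cookie")
    with open(path) as cookie_file:
        # The cookie is a single token; drop any trailing newline.
        return cookie_file.read().strip()


def build_node_name_sketch(name, host=None):
    """Return `name@host`; a name that already contains a host is kept as is."""
    if "@" in name:
        return name
    if host is None:
        # Assumption: use the short host name, as `erl -sname` does.
        host = socket.gethostname().split(".")[0]
    return "%s@%s" % (name, host)
```

With an explicit host, build_node_name_sketch("nodename", "node") gives "nodename@node", which is the form used in the Erlang shell examples of this document.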

Then instantiate the Process class with this information, and call callRemote on it. This method will:

  • create a connection with the EPMD
  • ask for the port info of the node
  • connect to the node
  • return the connected node client protocol
  • make a RPC call on the connected node
  • return the result to you


    from twisted.internet import reactor
    from twotp import Process, readCookie, buildNodeName

    def gotResult(resp):
        print "Got response", resp
        reactor.stop()

    def eb(error):
        print "Got error", error
        reactor.stop()

    cookie = readCookie()
    nodeName = buildNodeName('nodename')
    process = Process(nodeName, cookie)
    # Call file:get_cwd() on the Erlang node named "foo".
    process.callRemote("foo", "file", "get_cwd").addCallback(gotResult).addErrback(eb)
    reactor.run()
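
The "ask for the port info of the node" step in the list above can be sketched at the wire level. This is a standalone illustration of the EPMD port lookup as specified in the Erlang distribution protocol documentation, not twotp's own code; the function names are hypothetical.

```python
import struct

EPMD_PORT = 4369         # default TCP port of the EPMD
PORT_PLEASE2_REQ = 122   # request tag: "which port does this node listen on?"
PORT2_RESP = 119         # response tag


def build_port_request(alive_name):
    """Encode a port lookup for a node name given without its @host part."""
    payload = struct.pack("!B", PORT_PLEASE2_REQ) + alive_name.encode("latin-1")
    # Every EPMD request is prefixed with its length (16 bits, big-endian).
    return struct.pack("!H", len(payload)) + payload


def parse_port_response(data):
    """Return the node's distribution port, or None if the EPMD reports an error."""
    tag, result = struct.unpack("!BB", data[:2])
    if tag != PORT2_RESP:
        raise ValueError("unexpected EPMD response tag: %d" % tag)
    if result != 0:
        # A non-zero result means the lookup failed (e.g. unknown node).
        return None
    (port,) = struct.unpack("!H", data[2:4])
    return port
```

For the node "foo" used above, build_port_request("foo") produces the six bytes 00 04 7A 66 6F 6F: a length of 4, the request tag, then the name. The bytes would be sent to EPMD_PORT on the host of the target node.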

Instantiating a node

Creating a node is straightforward: just call the listen method of your Process. Behind the scenes, it first creates a node server factory and makes it listen on an arbitrary port. It then opens a client connection to the EPMD and passes it the server port. The EPMD must be started outside Twisted; it is started automatically when you start a distributed Erlang node (for example with erl -sname).

    import sys
    from twisted.python import log
    from twisted.internet import reactor
    from twotp import Process, readCookie, buildNodeName

    cookie = readCookie()
    nodeName = buildNodeName('nodename')
    log.startLogging(sys.stdout)
    process = Process(nodeName, cookie)
    process.listen()
    reactor.run()
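
The two steps described above (listen on an arbitrary port, then announce that port to the EPMD) can be sketched without Twisted. The registration message follows the ALIVE2_REQ layout from the Erlang distribution protocol documentation. Everything else is an assumption made for illustration: the function names, the hidden node type, and distribution version 5 are not taken from twotp.

```python
import socket
import struct

ALIVE2_REQ = 120        # request tag: register a node with the EPMD
NODE_TYPE_NORMAL = 77   # a regular Erlang node
NODE_TYPE_HIDDEN = 72   # a hidden node, as C nodes register themselves
PROTOCOL_TCP_IPV4 = 0


def pick_free_port():
    """Bind to port 0 so the OS picks a free port, and return its number."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind(("127.0.0.1", 0))
        return sock.getsockname()[1]
    finally:
        # A real node server keeps listening; the sketch only needs the number.
        sock.close()


def build_alive_request(alive_name, port, node_type=NODE_TYPE_HIDDEN, version=5):
    """Encode the registration of `alive_name` listening on `port`."""
    name = alive_name.encode("latin-1")
    payload = struct.pack("!BHBBHHH", ALIVE2_REQ, port, node_type,
                          PROTOCOL_TCP_IPV4, version, version, len(name))
    payload += name + struct.pack("!H", 0)  # zero bytes of extra data
    # Like every EPMD request, the message is prefixed with its length.
    return struct.pack("!H", len(payload)) + payload
```

A node keeps the EPMD connection open for as long as it wants to stay registered, which is why the listing above only needs to call listen once and then run the reactor.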

A node that only listens does nothing useful yet, so let's now see how to get data from an Erlang node.

Receiving method calls from an Erlang node

The key method here is registerModule of the Process class, which takes a name and an object. The name is the module name used when making a call from Erlang. The object is an instance of a class whose methods are prefixed with remote_.

    import sys
    from twisted.python import log
    from twisted.internet import reactor
    from twotp import Process, readCookie, buildNodeName

    class Proxy(object):
        # Exposed to Erlang as the function double/1.
        def remote_double(self, arg):
            return arg * 2

    cookie = readCookie()
    nodeName = buildNodeName('nodename')
    log.startLogging(sys.stdout)
    process = Process(nodeName, cookie)
    # Publish the Proxy instance as the Erlang module "math".
    process.registerModule("math", Proxy())
    process.listen()
    reactor.run()

The script above defines a Proxy class with a remote_double method. Registering it under the name math makes it available to Erlang as a module named math with a function double. We can now call this function from an Erlang shell:

    (erlang@node)1> rpc:call('nodename@node', math, double, [3]).
    {ok,6}
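
The remote_ prefix convention described above can be illustrated with a toy dispatcher. This is only a sketch of the idea, not how twotp is implemented: ModuleRegistry, register_module and dispatch are hypothetical names.

```python
class ModuleRegistry(object):
    """Toy registry mapping Erlang-style module names to Python objects."""

    def __init__(self):
        self._modules = {}

    def register_module(self, name, instance):
        self._modules[name] = instance

    def dispatch(self, module, function, args):
        """Call `remote_<function>` on the object registered as `module`."""
        instance = self._modules.get(module)
        method = getattr(instance, "remote_" + function, None)
        if method is None:
            # Unknown module, or a method without the remote_ prefix.
            raise LookupError("undefined function %s:%s" % (module, function))
        return method(*args)


class Proxy(object):
    def remote_double(self, arg):
        return arg * 2

    def helper(self):
        # No remote_ prefix, so the dispatcher never exposes this method.
        return "not exposed"


registry = ModuleRegistry()
registry.register_module("math", Proxy())
```

Here registry.dispatch("math", "double", [3]) returns 6, mirroring the rpc:call above, while registry.dispatch("math", "helper", []) raises LookupError. The prefix acts as an explicit opt-in, so ordinary methods are not reachable from a remote node by accident.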

Reading data sent by an Erlang node

RPC is the basic communication mechanism, but we can do better with standard Erlang process communication, which provides a nice asynchronous mechanism. The receive method of the Process class returns a Deferred that fires when someone sends a message to your process.

    import sys
    from twisted.python import log
    from twisted.internet import reactor
    from twotp import Process, readCookie, buildNodeName

    def receive(process):
        def cb(resp):
            print "Got response", resp

        def eb(error):
            print "Got error", error

        d = process.receive()
        d.addCallback(cb)
        d.addErrback(eb)
        # Whatever happened, wait for the next message.
        d.addBoth(lambda x: receive(process))
        return d

    cookie = readCookie()
    nodeName = buildNodeName('nodename')
    log.startLogging(sys.stdout)
    process = Process(nodeName, cookie)
    # Register the process under a name so Erlang can address it.
    process.register("main")
    process.listen().addCallback(lambda x: receive(process))
    reactor.run()

This example shows how to read messages forever in a TwOTP program by calling receive again after each message. The method also accepts a timeout argument if you don't want to wait indefinitely for a message. To send data back to an Erlang node, use the send method, which takes a PID as its first argument. For example, to emulate the previous RPC:

    import sys
    from twisted.python import log
    from twisted.internet import reactor
    from twotp import Process, readCookie, buildNodeName

    def receive(process):
        def cb(resp):
            print "Got response", resp
            # The message is expected to be {Pid, Number}.
            pid = resp[0]
            process.send(pid, resp[1] * 2)

        def eb(error):
            print "Got error", error

        d = process.receive()
        d.addCallback(cb)
        d.addErrback(eb)
        # Whatever happened, wait for the next message.
        d.addBoth(lambda x: receive(process))
        return d

    cookie = readCookie()
    nodeName = buildNodeName('nodename')
    log.startLogging(sys.stdout)
    process = Process(nodeName, cookie)
    process.register("main")
    process.listen().addCallback(lambda x: receive(process))
    reactor.run()

The only difference is that we now make an assumption about the message format, namely that the first element will be a PID and the second something to double. On the Erlang side:

    (erlang@node)1> {main, nodename@node} ! {self(), 1}.
    {<0.37.0>,1}
    (erlang@node)2> receive
    (erlang@node)2> X -> io:format("Result: ~p~n", [X])
    (erlang@node)2> end.
    Result: 2

Note that because we called the register method, Erlang can address our process by the name we passed (main).
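
Since the callback relies on the {Pid, Number} shape of the message, it can help to check that shape before replying. The helper below is a hypothetical sketch, not part of twotp; it assumes that an Erlang tuple arrives as a Python tuple (or list), which is what the indexing in the listing above suggests.

```python
def double_request(message):
    """Split a `{Pid, Number}` message into the pid and the doubled number.

    Raises ValueError when the message does not have the expected shape.
    """
    if not isinstance(message, (tuple, list)) or len(message) != 2:
        raise ValueError("expected a 2-element tuple, got %r" % (message,))
    pid, value = message
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise ValueError("expected a number to double, got %r" % (value,))
    return pid, value * 2
```

Inside the cb callback, one would unpack pid, result = double_request(resp) and then call process.send(pid, result). A malformed message raises ValueError, which ends up in the eb errback instead of sending a nonsensical reply.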

Using the twotp twisted plugin

Once twotp is installed, typing twistd --help in a shell should show a new twotp command. It has several options:

    $ twistd twotp --help
    Usage: twistd [options] twotp [options]
    Options:
      -n, --nodename=  Name of the node [default: twisted]
      -c, --cookie=    Erlang cookie value [default: ]
      -m, --methods=   Methods holder [default: ]
          --version
          --help       Display this help and exit.

  • -n or --nodename lets you specify the name of the node. It's the equivalent of the -sname option of erl.
  • -c or --cookie lets you specify the cookie. It's the equivalent of the -setcookie option of erl.
  • -m or --methods lets you specify the methods you want to publish. It takes the fully qualified name of a Python dictionary.
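
To illustrate what "the fully qualified name of a Python dictionary" means, here is a standard-library sketch that turns a dotted name such as rpc.methods into the object it names. The function names are hypothetical and this is not necessarily how the plugin resolves the option; Twisted itself ships a comparable helper, twisted.python.reflect.namedAny.

```python
import importlib


def resolve_qualified_name(qualified_name):
    """Import `package.module.attribute` and return the attribute."""
    module_name, _, attribute = qualified_name.rpartition(".")
    if not module_name:
        raise ValueError("expected 'module.attribute', got %r" % qualified_name)
    module = importlib.import_module(module_name)
    return getattr(module, attribute)


def load_methods(qualified_name):
    """Return the named object after checking that it really is a dictionary."""
    methods = resolve_qualified_name(qualified_name)
    if not isinstance(methods, dict):
        raise TypeError("%s is not a dictionary" % qualified_name)
    return methods
```

The name only resolves if the module can be imported, which is why the module holding the dictionary has to be on the Python path when twistd starts.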

Let's create a module rpc.py containing a few objects with remote_ methods.

    import time
    from twisted.python import filepath

    class File(object):
        def remote_get_cwd(self):
            return filepath.FilePath(".").path

    class Time(object):
        def remote_get_time(self):
            return time.time()

    # Maps Erlang module names to the objects serving them.
    methods = {"file": File(), "time": Time()}

Then launch twistd from the command line, after making sure your module is importable.

    $ twistd -n twotp --methods rpc.methods
    2009-05-09 21:24:31+0200 [-] Log opened.
    2009-05-09 21:24:31+0200 [-] twistd 8.2.0+r26899 (/usr/bin/python 2.5.2) starting up.
    2009-05-09 21:24:31+0200 [-] reactor class: twisted.internet.selectreactor.SelectReactor.
    2009-05-09 21:24:31+0200 [-] starting on 40962
    2009-05-09 21:24:31+0200 [-] Starting factory
    2009-05-09 21:24:31+0200 [-] Starting factory
    2009-05-09 21:24:31+0200 [Uninitialized] Connection made with EPMD

That's it! You've exposed methods callable from Erlang. Let's fire up an Erlang shell to try them.

    (erlang@node)1> rpc:call(twisted@node, file, get_cwd, []).
    {ok,"/home/user"}
    (erlang@node)2> rpc:call(twisted@node, time, get_time, []).
    {ok,1241897194.455415}

Now try and define some interesting methods instead.

Linking and monitoring an Erlang process

To get exit information about an Erlang process, you can create a link to it or a monitor on it. A link is symmetrical, which means that the remote process is also notified of the exit status of the Twisted process. Let's try monitoring first.

Open an Erlang shell, create and register a simple process that can fail.

    (erlang@node)1> F = fun() -> receive X -> list_to_atom(X) end end.
    #Funi
    (erlang@node)2> Pid1 = spawn(F).
    <0.42.0>
    (erlang@node)3> register(pid1, Pid1).
    true

This registers a process under the name pid1 that fails easily: list_to_atom raises badarg for anything that is not a string. Now let's start a Twisted process monitoring that pid:


    import sys
    from twisted.python import log
    from twisted.internet import reactor
    from twotp import Process, readCookie, buildNodeName

    def gotWhereis(pid):
        # Monitor the remote pid; the callback receives the monitor reference.
        return process.monitor(pid).addCallback(gotRef)

    def gotRef(ref):
        process.addMonitorHandler(ref, gotExit)

    def gotExit(reason):
        print "Remote process exited with:", reason

    cookie = readCookie()
    nodeName = buildNodeName('nodename')
    log.startLogging(sys.stdout)
    process = Process(nodeName, cookie)
    # Look up the pid registered as pid1 on the node named "erlang".
    process.whereis('erlang', 'pid1').addCallback(gotWhereis).addErrback(log.err)
    reactor.run()

There are several interesting methods here. First, whereis gives you a Pid object from a name, provided that name is registered on the other node. Then monitor creates a monitor, and passes the monitor reference to its callback. You can then call addMonitorHandler with this reference as many times as you want: the specified callbacks will be called when the exit notification arrives.
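
The relationship between a monitor reference and its handlers can be modelled with a small standalone sketch. It only illustrates the bookkeeping described above and is not twotp's implementation: the class and method names are hypothetical, and dropping the handlers after the first notification is an assumption based on an Erlang monitor firing only once.

```python
class MonitorHandlers(object):
    """Toy bookkeeping: monitor reference -> callbacks to run on exit."""

    def __init__(self):
        self._handlers = {}

    def add_handler(self, ref, callback):
        # Several callbacks may be attached to the same reference.
        self._handlers.setdefault(ref, []).append(callback)

    def process_exited(self, ref, reason):
        """Call every handler registered for `ref`, then forget the reference."""
        for callback in self._handlers.pop(ref, []):
            callback(reason)
```

Handlers run in the order they were added, each receiving the exit reason. A notification for an unknown or already fired reference is simply ignored.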

Return to the Erlang shell, and trigger an error in the process:


    (erlang@node)4> pid1 ! 1.
    =ERROR REPORT==== 9-May-2009::19:18:29 ===
    Error in process <0.42.0> on node 'erlang@node' with exit value: {badarg,[{erlang,list_to_atom,[1]}]}
    1

If everything went fine, you should see the following log in your Twisted application:

2009-05-09 19:18:29+0200 [NodeClientProtocol,client] Remote process exited with: (, [(, , [1])])
    

An example: using a Mnesia database

To try out communication as a client, we'll set up a Mnesia database and use it from a Twisted application. Please refer to the Mnesia documentation for further information. Note that there is nothing new here: it's just a concrete example that puts the pieces together.

First create a file named twisted.hrl with the following record:

-record(user, {id, name}).
    

Then create a file named twisted.erl:


    -module(twisted).
    -export([init/0, insert_user/2, get_user/1]).
    -include("twisted.hrl").

    %% Create the user table from the fields of the user record.
    init() ->
        mnesia:create_table(user,
                            [{attributes, record_info(fields, user)}]).

    %% Store a user inside a transaction.
    insert_user(UserName, UserId) ->
        User = #user{id=UserId, name=UserName},
        Fun = fun() ->
                mnesia:write(User)
            end,
        mnesia:transaction(Fun).

    %% Return the user record for UserId; crashes if there is no such user.
    get_user(UserId) ->
        Fun = fun() ->
                [User] = mnesia:read(user, UserId, read),
                User
            end,
        case mnesia:transaction(Fun) of
            {atomic, User} ->
                User
        end.

Now fire up an Erlang shell. We'll compile the twisted module, start the Mnesia database, create the table, and insert a test user.

    $ erl -sname twisted_mnesia
    (twisted_mnesia@localhost)1> c(twisted).
    {ok,twisted}
    (twisted_mnesia@localhost)2> mnesia:start().
    ok
    (twisted_mnesia@localhost)3> twisted:init().
    {atomic,ok}
    (twisted_mnesia@localhost)4> twisted:insert_user(test1, 1).
    {atomic,ok}
    (twisted_mnesia@localhost)5> twisted:get_user(1).
    {user,1,test1}

The node is now ready to accept our Python calls.


    from twisted.internet import reactor
    from twotp import Process, readCookie, buildNodeName

    def gotResult(resp):
        print "Got response", resp
        reactor.stop()

    def eb(error):
        print "Got error", error
        reactor.stop()

    cookie = readCookie()
    nodeName = buildNodeName('nodename')
    process = Process(nodeName, cookie)
    # Call twisted:get_user(1) on the node started with -sname twisted_mnesia.
    d = process.callRemote("twisted_mnesia", "twisted", "get_user", 1)
    d.addCallback(gotResult).addErrback(eb)
    reactor.run()

If everything is fine, you should get the following response:

Got response (, 1, )
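
The response is the Erlang record {user, 1, test1} seen as a tuple: the record name first, then the fields in the order declared in twisted.hrl. A small hypothetical helper (not part of twotp) can turn such a tuple into a dictionary. In this sketch plain strings stand in for the atom objects a real response would contain.

```python
def record_to_dict(record, fields):
    """Turn a record tuple `(tag, v1, v2, ...)` into `(tag, {field: value})`."""
    tag, values = record[0], record[1:]
    if len(values) != len(fields):
        raise ValueError("record %r has %d fields, expected %d"
                         % (tag, len(values), len(fields)))
    return tag, dict(zip(fields, values))


# Field order taken from -record(user, {id, name}) in twisted.hrl.
USER_FIELDS = ("id", "name")
```

Calling record_to_dict(("user", 1, "test1"), USER_FIELDS) returns the tag "user" together with the dictionary {"id": 1, "name": "test1"}. The field list has to be kept in sync with the record definition by hand, since the tuple itself carries no field names.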
