全部博文(921)
分类: Python/Ruby
2013-12-18 19:42:34
In this document, we'll assume that you have a basic knowledge of Twisted (how to create a client, how to create a server, what is the protocol/factory mechanism, what is a deferred), and of Erlang (what is a node, what is the EPMD, what are the basic types). Please refer to the respective documentation if it's not the case.
To create connection to an erlang node, you'll need to get the cookie and create a node name. The utility functions readCookie and buildNodeName are here to help you out: the first read the cookie at the usual place ($HOME/.erlang.cookie), the second append the name of the host to the name you have chosen.
Then, instantiate the class Process with the infos, and call callRemote on it. This method will:
- from twisted.internet import reactor
- from twotp import Process, readCookie, buildNodeName
- def gotResult(resp):
- print "Got response", resp
- reactor.stop()
- def eb(error):
- print "Got error", error
- reactor.stop()
- cookie = readCookie()
- nodeName = buildNodeName('nodename')
- process = Process(nodeName, cookie)
- process.callRemote("foo", "file", "get_cwd").addCallback(gotResult).addErrback(eb)
- reactor.run()
The process for creating a node is really straightforward: you just have to call the listen of your Process. What happens behind the scene is that first it creates a node server factory, and makes it listen on an arbitrary port. Then it creates a client connection to the EPMD, giving the server port as argument. The EPMD should be started outside Twisted. It is started automatically when you start an erlang application.
- import sys
- from twisted.python import log
- from twisted.internet import reactor
- from twotp import Process, readCookie, buildNodeName
- cookie = readCookie()
- nodeName = buildNodeName('nodename')
- log.startLogging(sys.stdout)
- process = Process(nodeName, cookie)
- process.listen()
- reactor.run()
Of course, for now it does nothing useful, so we now see how to get data from an erlang nodes.
The key method here is the method registerModule of the Process class: this method takes a name and an object. The name is the name of the module used when making a call from Erlang. The object is an instance of a class with methods prefixed by remote_.
- import sys
- from twisted.python import log
- from twisted.internet import reactor
- from twotp import Process, readCookie, buildNodeName
- class Proxy(object):
- def remote_double(self, arg):
- return arg * 2
- cookie = readCookie()
- nodeName = buildNodeName('nodename')
- log.startLogging(sys.stdout)
- process = Process(nodeName, cookie)
- process.registerModule("math", Proxy())
- process.listen()
- reactor.run()
The script above define a Proxy with a remote_double method. We can publish with the keyword math, which creates a module named math with the method double. We can now call this method in an Erlang console:
- (erlang@node)1> rpc:call('nodename@node', math, double, [3]).
- {ok,6}
This is the next step after using RPC, which is the basic communication mechanism. But we can do better using standard process communication of Erlang, which provides a nice asynchronous mechanism. The receive method of the Process class will return a Deferred which will be fired when someone sends a message to your process.
- import sys
- from twisted.python import log
- from twisted.internet import reactor
- from twotp import Process, readCookie, buildNodeName
- def receive(process):
- def cb(resp):
- print "Got response", resp
- def eb(error):
- print "Got error", error
- return process.receive(
- ).addCallback(cb
- ).addErrback(eb
- ).addBoth(lambda x: receive(process))
- cookie = readCookie()
- nodeName = buildNodeName('nodename')
- log.startLogging(sys.stdout)
- process = Process(nodeName, cookie)
- process.register("main")
- process.listen().addCallback(lambda x: receive(process))
- reactor.run()
This example shows how you can easily read message forever in a TwOTP program, by calling receive indefinitely. This method also supports a timeout argument if you don't want to wait forever for a message. To send data back to an Erlang node, you can easily use the send method which takes a PID attribute. For example, emulating the previous RPC:
- import sys
- from twisted.python import log
- from twisted.internet import reactor
- from twotp import Process, readCookie, buildNodeName
- def receive(process):
- def cb(resp):
- print "Got response", resp
- pid = resp[0]
- process.send(pid, resp[1] * 2)
- def eb(error):
- print "Got error", error
- return process.receive(
- ).addCallback(cb
- ).addErrback(eb
- ).addBoth(lambda x: receive(process))
- cookie = readCookie()
- nodeName = buildNodeName('nodename')
- log.startLogging(sys.stdout)
- process = Process(nodeName, cookie)
- process.register("main")
- process.listen().addCallback(lambda x: receive(process))
- reactor.run()
The only difference here is that we're making some assumption on the format, namely that the first argument with be a PID, and the second something to double. And on the Erlang side:
- (erlang@node)1> {main, nodename@node} ! {self(), 1}.
- {<0.37.0>,1}
- (erlang@node)2> receive
- (erlang@node)2> X -> io:format("Result: ~p~n", [X])
- (erlang@node)2> end.
- Result: 2
Note that by calling the register method, we've been able to use the passed name to make the call from Erlang.
When you install twotp, and type twistd --help in a shell, you should see a new twotp command. It has several options:
$ twistd twotp --help Usage: twistd [options] twotp [options] Options: -n, --nodename= Name of the node [default: twisted] -c, --cookie= Erlang cookie value [default: ] -m, --methods= Methods holder [default: ] --version --help Display this help and exit.
Let's create a module rpc.py and put some objects with methods in them.
- class File(object):
- def remote_get_cwd(self):
- from twisted.python import filepath
- return filepath.FilePath(".").path
- class Time(object):
- def remote_get_time(self):
- import time
- return time.time()
- methods = {"file": File(), "time": Time()}
Then launch on the command line twistd, after making sure your module is in an importable place.
- $ twistd -n twotp --methods rpc.methods
- 2009-05-09 21:24:31+0200 [-] Log opened.
- 2009-05-09 21:24:31+0200 [-] twistd 8.2.0+r26899 (/usr/bin/python 2.5.2) starting up.
- 2009-05-09 21:24:31+0200 [-] reactor class: twisted.internet.selectreactor.SelectReactor.
- 2009-05-09 21:24:31+0200 [-] starting on 40962
- 2009-05-09 21:24:31+0200 [-] Starting factory
- 2009-05-09 21:24:31+0200 [-] Starting factory
- 2009-05-09 21:24:31+0200 [Uninitialized] Connection made with EPMD
This is it! You've exposed methods callable from Erlang. Let's fire an erlang shell to try.
- (erlang@node)1> rpc:call(twisted@node, file, get_cwd, []).
- {ok,"/home/user"}
- (erlang@node)2> rpc:call(twisted@node, time, get_time, []).
- {ok,1241897194.455415}
Now try and define some interesting methods instead.
To get exit information with an Erlang process, you can create a link or a monitor operation with it. A link is symmetrical, which means that the remote process can be notified of the exit status of the Twisted process as well. Let's try monitoring first.
Open an Erlang shell, create and register a simple process that can fail.
- (erlang@node)1> F = fun() -> receive X -> list_to_atom(X) end end.
- #Funi
- (erlang@node)2> Pid1 = spawn(F).
- <0.42.0>
- (erlang@node)3> register(pid1, Pid1).
- true
It registers a process under the name pid1, which can easily fails. Now let's start a twisted process monitoring that pid:
There are several interesting methods here. First, whereis allows you to get a Pid object from a name, if this name is registered on the other node. Then monitor creates a monitor link, and returns the monitor reference in its callback. You can then call addMonitorHandler as many times you want with this reference: the specified callbacks will be called with the exit notification happens.
- import sys
- from twisted.python import log
- from twisted.internet import reactor
- from twotp import Process, readCookie, buildNodeName
- def gotWhereis(pid):
- return process.monitor(pid).addCallback(gotRef)
- def gotRef(ref):
- process.addMonitorHandler(ref, gotExit)
- def gotExit(reason):
- print "Remote process exited with:", reason
- cookie = readCookie()
- nodeName = buildNodeName('nodename')
- log.startLogging(sys.stdout)
- process = Process(nodeName, cookie)
- process.whereis('erlang', 'pid1').addCallback(gotWhereis)
- reactor.run()
Return to the Erlang shell, and create an error on the process:
- (erlang@node)4> pid1 ! 1.
- =ERROR REPORT==== 9-May-2009::19:18:29 ===
- Error in process <0.42.0> on node 'erlang@node' with exit value: {badarg,[{erlang,list_to_atom,[1]}]}
- 1
If everything went fine, you should have the following log in your Twisted application:
2009-05-09 19:18:29+0200 [NodeClientProtocol,client] Remote process exited with: (, [(, , [1])])
To try the communication as client, we'll set up a mnesia database to be used within a Twisted application. Please refer to the mnesia documentation for further information. Note that there is nothing new here: it's just a concrete example to put the things together.
First create a file named twisted.hrl with following record:
-record(user, {id, name}).
Then create a file named twisted.erl
Now fire an erlang interpreter. We'll compile the twisted file, start the mnesia database, create the table, and insert a test user.
- -module(twisted).
- -export([init/0, insert_user/2, get_user/1]).
- -include("twisted.hrl").
- init() ->
- mnesia:create_table(user,
- [{attributes, record_info(fields, user)}]).
- insert_user(UserName, UserId) ->
- User = #user{id=UserId, name=UserName},
- Fun = fun() ->
- mnesia:write(User)
- end,
- mnesia:transaction(Fun).
- get_user(UserId) ->
- Fun = fun() ->
- [User] = mnesia:read(user, UserId, read),
- User
- end,
- case mnesia:transaction(Fun) of
- {atomic, User} ->
- User
- end.
- $ erl -sname twisted_mnesia
- (twisted_mnesia@localhost)1> c(twisted).
- {ok,twisted}
- (twisted_mnesia@localhost)2> mnesia:start().
- ok
- (twisted_mnesia@localhost)3> twisted:init().
- {atomic,ok}
- (twisted_mnesia@localhost)4> twisted:insert_user(test1, 1).
- {atomic,ok}
- (twisted_mnesia@localhost)5> twisted:get_user(1).
- {user,1,test1}
The node is ready to accept our python calls.
- from twisted.internet import reactor
- from twotp import Process, readCookie, buildNodeName
- def gotResult(resp):
- print "Got response", resp
- reactor.stop()
- def eb(error):
- print "Got error", error
- reactor.stop()
- cookie = readCookie()
- nodeName = buildNodeName('nodename')
- process = Process(nodeName, cookie)
- process.callRemote("twisted_mnesia", "twisted", "get_user", 1
- ).addCallback(gotResult).addErrback(eb)
- reactor.run()
If everything is fine, you should get the following response:
Got response (, 1, )
from :