参考文章:http://rubylearning.com/blog/2010/10/01/an-introduction-to-eventmachine-and-how-to-avoid-callback-spaghetti/
eventmachine 介绍以及如何避免回调混乱。事件驱动编程最近变得很新潮,这很大程度是源于
项目的优雅。其实在 ruby 世界里面通过 eventmachine,我们 已经使用了好多年的事件编程了(eventmachine 为 ruby 添加了事件 IO)。一般来说编写事件驱动的代码比较复杂且混乱,但实际上我们写出来的代码非常漂亮。你只会用到不多的特殊技巧。
最大的挑战实际上是真正理解如何创建一个干净的模型抽象。由于需求不同,代码结构不同,不认真规划,你会很快发现你掉入到回调迷阵当中(这里有个形象的说法,叫意大利面)。此文中会讲解一些通用的模板,其中混合用到了 Twitter streaming API, Google’s language API, and a WebSocket server。绝对没有意大利面,我保证!
做完 eventmachine 入门讲解后,我们会讨论两种通用的抽象。第一个叫对象延迟处理,类似于异步方法调用。第二个是如何抽象代码,以实现多事件触发。最后,我们会添加 WebSocket Server 以演示并行 IO 处理。
eventmachine 起步:
首先是安装 eventmachine : gem install eventmachine
你可以运行以下代码以作测试 ruby test1.rb,终止程序需要 kill 掉哦。
- # test1.rb
- require 'rubygems'
- require 'eventmachine'
- EventMachine.run {
- EventMachine.add_periodic_timer(1) {
- puts "Hello world"
- }
- }
这个程序每隔一秒输出一个信息,运行不了,首先检查人品问题哈。
它是如何运作的呢? require eventmachine
之后,我们调用了 EventMachine.run,它会接收一个代码块最为参数。现在我们可以无视这些,专注于代码块内部,没有这个 eventmachine
可没法工作啊(如果你有强烈的好奇心,可以学习下 .)
EventMachine.run 中我们调用了 add_periodic_timer ,并传入了另外一个代码块。这里告诉 eventmachine
每隔1秒触发一个事件,然后调用代码块。这是我们学到的第一个有用的知识,这个代码块被称之为回调。
你可能会想使用一个循环不是更简单
loop { puts 'hi'; sleep 1 } ,你想的没错。但我保证,往后看,你会知道我们的方式更优。
在网络 IO上使用 eventmachine。高效的 IO 是 eventmachine 的全部意义所在,一定要理解这一点。当你使用 eventmachine 进行网络 IO 编程时,你要么直接使用 eventmachine,要么是通过 eventmachine 钩子扩展的某种 lib (在 github 上你会找到很多的这种例子,很好识别,因为他们大多以 em- 开头命名)。然后你需要小心挑选使用哪些gems 来进行 database, api 等等的编程。
如果你选择不当就会阻塞 reactor,就是在 IO 操作结束前,eventmachine 将不会触发任何事件。比如说,如果你是用标准库中的
Net::HTTP ,向一个 URL
发出请求,预计10s响应,在此期间上面代码中的定时器都不会触发,直到操作结束。你已经完全丧失了并发性。
我们讨论下 HTTP client。 eventmachine 已经自带有两个不同的 HTTP
client,但都有些问题,我们推荐不要用它们,这里有个更为强大的模块:em-http-request
。
安装:gem install em-http-request让我们通过它发出一个 http 请求,看看它如何工作的。(注:EM 是 EventMachine 的缩写,大家都喜欢少打几个字吧!)
- require 'rubygems'
- require 'eventmachine'
- EM.run {
- require 'em-http'
- EM::HttpRequest.new('').get.callback { |http|
- puts http.response
- }
- }
Again we’re attaching a callback which is called once the request has
completed. We’re attaching it in a slightly different way to the timer above,
which we’ll discuss next.
Abstracting code that has a success or failure case
在设计 API 接口时,需要有办法来区分响应成功或者失败。在 Ruby中,有两种常用的方法。一种是返回 nil ,一种是抛出异常(比如
ActiveRecord::NotFound)。Eventmachine 提供了一种更为优雅的方案:the deferrable。
deferrable 是一个对象,通过它你可以添加上成功或者失败后的回调方法,名字分别为 callback 和 errback。愿意的话,你可以查看源码 ,
不算复杂.
前面代码中调用 HttpRequest#get 时,返回的就是一个 deferrable (实际上返回的是一个
EM::HttpClient 实例对象,它实际上也被 mix in 到了EM::Deferrable
模块中)。当然也有更为通用的办法,就是直接使用 EM::DefaultDeferrable。
As an excuse to use a deferrable ourselves I’ve decided that it would be a
jolly marvelous idea to look up the language of tweets as they arrive from the
twitter streaming API.
Handily, the Google AJAX
Language API allows you to get the language of any piece of text. It’s
designed to be used from the browser, but we won’t let a small matter like that
stop us for now (although you should if it’s a real application).
When I’m trying out a new API I generally start with HTTParty (gem
install httparty) since it’s just so quick and easy to use. Warning: you
can’t use HTTParty with eventmachine since it uses Net::HTTP under
the covers – this is just for testing!
- require 'rubygems'
- require 'httparty'
- require 'json'
- response = HTTParty.get("", :query => {
- :v => '1.0',
- :q => "Sgwn i os yw google yn deall Cymraeg?"
- })
- p JSON.parse(response.body)["responseData"]
- # => {"isReliable"=>true, "confidence"=>0.5834181, "language"=>"cy"}
Cool, Google understands Welsh!
在将这段代码修改为使用 use em-http-request 前 (HTTParty 是使用的Net::HTTP
),我们先将它封装成一个类,拿它同我们随后要写的 eventmachine 版本做下比较。无法判定语言则返回 nil 值。
- require 'rubygems'
- require 'httparty'
- require 'json'
- class LanguageDetector
- URL = ""
- def initialize(text)
- @text = text
- end
- # Returns the language if it can be detected, otherwise nil
- def language
- response = HTTParty.get(URL, :query => {:v => '1.0', :q => @text})
- return nil unless response.code == 200
- info = JSON.parse(response.body)["responseData"]
- return info['isReliable'] ? info['language'] : nil
- end
- end
- puts LanguageDetector.new("Sgwn i os yw google yn deall Cymraeg?").language
现在将代码修改为使用 em-http-request:
- require 'rubygems'
- require 'em-http'
- require 'json'
- class LanguageDetector
- URL = ""
- include EM::Deferrable
- def initialize(text)
- request = EM::HttpRequest.new(URL).get({
- :query => {:v => "1.0", :q => text}
- })
- # This is called if the request completes successfully (whatever the code)
- request.callback {
- if request.response_header.status == 200
- info = JSON.parse(request.response)["responseData"]
- if info['isReliable']
- self.succeed(info['language'])
- else
- self.fail("Language could not be reliably determined")
- end
- else
- self.fail("Call to fetch language failed")
- end
- }
- # This is called if the request totally failed
- request.errback {
- self.fail("Error making API call")
- }
- end
- end
- EM.run {
- detector = LanguageDetector.new("Sgwn i os yw google yn deall Cymraeg?")
- detector.callback { |lang| puts "The language was #{lang}" }
- detector.errback { |error| puts "Error: #{error}" }
- }
返回结果如下:
这段代码看起来完全不同。最大的区别是,由于引入了EM::Deferrable,无论我们的调用成功或者失败,相应的
callback 或 errback 代码都会被执行。
作为练习,你可以试着修改下代码,让它允许三次错误,期间重复调用相关方法。这同现实情况极其类似,通过这个封装我们很好的隐藏了复杂性,完成后我们也不用再关心内部实现细节了。
抽象代码,允许多次返回多个事件。
现在我们将进入 eventmachine 最擅长的领域,接触它最激动人心的特性。
我们将构建一个 client ,连接到 Twitter’s streaming api ,每当有消息到达时,将触发一系列事件。
使用 Twitter’s streaming
API ,你只需要开启一个活跃的长 HTTP 连接到stream.twitter.com,然后等待信息涌入。我们会再次使用到
em-http-request。连接到 API,监听所有的 tweets ,静待新的 twitter 到达,代码很简单:
- http = EventMachine::HttpRequest.new('').post({
- :head => { 'Authorization' => [username, password] },
- :query => { "track" => "newtwitter" }
- })
这里返回的是一个 deferrable (用于请求完成后触发相应回调函数), 实际上我们用到了另外一个技巧。 We can also register to
be notified every time new data is received:
- http.stream do |chunk|
- # This chunk may contain one or more tweets separated by \r\n
- end
让我们把代码组合起来,监听 tweets:
- require 'rubygems'
- require 'em-http'
- require 'json'
- EM.run {
- username = 'yourtwitterusername'
- password = 'password'
- term = 'newtwitter'
- buffer = ""
- http = EventMachine::HttpRequest.new('').post({
- :head => { 'Authorization' => [ username, password ] },
- :query => { "track" => term }
- })
- http.callback {
- unless http.response_header.status == 200
- puts "Call failed with response code #{http.response_header.status}"
- end
- }
- http.stream do |chunk|
- buffer += chunk
- while line = buffer.slice!(/.+\r\n/)
- tweet = JSON.parse(line)
- puts tweet['text']
- end
- end
- }
返回信息类似如下:
- Hey @Twitter. When shall I be getting the #NewTwitter?
- #NewTwitter #Perfect
- WHAHOO WTF? #NewTwitter is
- Buenos das a =) Estoy sola en la office, leyendo Le Monde y probando el #NewTwitter desde FireFox, que funciona de
- Curiosity and boredom got the better of me...I
It works! Now say we wanted to lookup the language of each tweet using the class
we built earlier. We could do this by adding further to our stream method,
however this is the road to callback hell (and just imagine what it would be
like if we hadn’t abstracted the language detection!).
- http.stream do |chunk|
- buffer += chunk
- while line = buffer.slice!(/.+\r\n/)
- tweet = JSON.parse(line)
- text = tweet['text']
- detector = LanguageDetector.new(text)
- detector.callback { |lang|
- puts "Language: #{lang}, tweet: #{text}"
- }
- detector.errback { |error|
- puts "Language could not be determined for tweet: #{text}"
- }
- end
- end
我们来重写优化部分代码。
前面我们使用 deferrable 来抽离代码,用于处理异步事件返回的成功或失败。另外一个在 eventmachine 中广泛采用的通用技术是提供一个
onevent 回调。比如类似如下的接口代码:
- stream = TweetStream.new(username, password, term)
- stream.ontweet { |tweet| puts tweet }
As if by magic here is the code!- require 'rubygems'
- require 'em-http'
- require 'json'
- class TwitterStream
- URL = ''
- def initialize(username, password, term)
- @username, @password = username, password
- @term = term
- @callbacks = []
- @buffer = ""
- listen
- end
- def ontweet(&block)
- @callbacks << block
- end
- private
- def listen
- http = EventMachine::HttpRequest.new(URL).post({
- :head => { 'Authorization' => [ @username, @password ] },
- :query => { "track" => @term }
- })
- http.stream do |chunk|
- @buffer += chunk
- process_buffer
- end
- end
- def process_buffer
- while line = @buffer.slice!(/.+\r\n/)
- tweet = JSON.parse(line)
- @callbacks.each { |c| c.call(tweet['text']) }
- end
- end
- end
现在我们可以对代码进一步优化:
- EM.run {
- stream = TwitterStream.new('yourtwitterusername', 'pass', 'newtwitter')
- stream.ontweet { |tweet|
- LanguageDetector.new(tweet).callback { |lang|
- puts "New tweet in #{lang}: #{tweet}"
- }
- }
- }
同一处理流程中混合不同的 IO
使用 eventmachine 不会阻塞任何 IO 操作,我们由此得到另外一个优势,同一处理流程中可以混合使用不同类型的 IO 。作为示例,我们使用 WebSocket 实时推送 tweets 到浏览器端。幸运的是已经有一个现成的使用 eventmachine 的 WebSocket server, (同 中使用的非常类似)。
先安装:gem install em-websocket.
启动服务,代码如下:
- stream.ontweet { |tweet|
- LanguageDetector.new(tweet).callback { |lang|
- puts "New tweet in #{lang}: #{tweet}"
- websocket_connections.each do |socket|
- socket.send(JSON.generate({
- :lang => lang,
- :tweet => tweet
- }))
- end
- }
- }
完成手工,代码很干净!
所有代码在这里可以找到:,包括连接 WebSocket
的最基本的 HTML 页面代码。可以复制它作为基础框架,再添加些时髦的视觉效果,你自己完成吧,我可没时间写。
延伸阅读
想学习更多,可以参考如下内容:
- The .
- Aman Gupta’s (he knows what he’s talking about since he maintains eventmachine).
- Joining the .
- Reading the .
- Asking on the irc channel (#eventmachine).
- Try node.js as well, it’s pretty cool and the same concepts apply.
玩的开心!
希望这篇文章能给你一些启示,可以一窥 eventmachine。欢迎提问,任何反馈可留言。谢谢!
Do read these awesome Guest Posts:
Technorati Tags: , , , , ,
Posted by Martyn Loughran
阅读(4004) | 评论(0) | 转发(0) |