Chinaunix首页 | 论坛 | 博客
  • 博客访问: 396449
  • 博文数量: 69
  • 博客积分: 1984
  • 博客等级: 上尉
  • 技术积分: 953
  • 用 户 组: 普通用户
  • 注册时间: 2007-03-28 00:43
个人简介

学无所长,一事无成

文章分类

全部博文(69)

文章存档

2015年(19)

2014年(14)

2013年(9)

2012年(17)

2010年(10)

我的朋友

分类: Python/Ruby

2012-07-02 14:54:47

参考文章:http://rubylearning.com/blog/2010/10/01/an-introduction-to-eventmachine-and-how-to-avoid-callback-spaghetti/

eventmachine 介绍以及如何避免回调混乱。

事件驱动编程最近变得很新潮,这很大程度是源于     项目的优雅。其实在 ruby 世界里面通过 eventmachine,我们 已经使用了好多年的事件编程了(eventmachine 为 ruby 添加了事件 IO)。一般来说编写事件驱动的代码比较复杂且混乱,但实际上我们写出来的代码非常漂亮。你只会用到不多的特殊技巧。

最大的挑战实际上是真正理解如何创建一个干净的模型抽象。由于需求不同,代码结构不同,不认真规划,你会很快发现你掉入到回调迷阵当中(这里有个形象的说法,叫意大利面)。此文中会讲解一些通用的模板,其中混合用到了 Twitter streaming API, Google’s language API, and a WebSocket server。绝对没有意大利面,我保证!

做完 eventmachine 入门讲解后,我们会讨论两种通用的抽象。第一个叫对象延迟处理,类似于异步方法调用。第二个是如何抽象代码,以实现多事件触发。最后,我们会添加 WebSocket Server 以演示并行 IO 处理。

eventmachine 起步:

首先是安装 eventmachine :  gem install eventmachine

你可以运行以下代码以作测试 ruby test1.rb,终止程序需要 kill 掉哦。

  1. # test1.rb

  2. require 'rubygems'
  3. require 'eventmachine'

  4. EventMachine.run {
  5.   EventMachine.add_periodic_timer(1) {
  6.     puts "Hello world"
  7.   }
  8. }

这个程序每隔一秒输出一个信息,运行不了,首先检查人品问题哈。

它是如何运作的呢? require eventmachine 之后,我们调用了 EventMachine.run,它会接收一个代码块最为参数。现在我们可以无视这些,专注于代码块内部,没有这个 eventmachine 可没法工作啊(如果你有强烈的好奇心,可以学习下 .)  EventMachine.run  中我们调用了 add_periodic_timer ,并传入了另外一个代码块。这里告诉 eventmachine  每隔1秒触发一个事件,然后调用代码块。这是我们学到的第一个有用的知识,这个代码块被称之为回调。

你可能会想使用一个循环不是更简单  loop { puts 'hi'; sleep 1 } ,你想的没错。但我保证,往后看,你会知道我们的方式更优。

在网络 IO上使用 eventmachine。

高效的 IO 是 eventmachine 的全部意义所在,一定要理解这一点。当你使用 eventmachine 进行网络 IO 编程时,你要么直接使用 eventmachine,要么是通过 eventmachine 钩子扩展的某种 lib (在 github 上你会找到很多的这种例子,很好识别,因为他们大多以  em- 开头命名)。然后你需要小心挑选使用哪些gems 来进行 database, api 等等的编程。

如果你选择不当就会阻塞 reactor,就是在 IO 操作结束前,eventmachine 将不会触发任何事件。比如说,如果你是用标准库中的 Net::HTTP ,向一个 URL 发出请求,预计10s响应,在此期间上面代码中的定时器都不会触发,直到操作结束。你已经完全丧失了并发性。

我们讨论下 HTTP client。 eventmachine 已经自带有两个不同的 HTTP client,但都有些问题,我们推荐不要用它们,这里有个更为强大的模块:em-http-request

安装:gem install em-http-request让我们通过它发出一个 http 请求,看看它如何工作的。(注:EM 是 EventMachine 的缩写,大家都喜欢少打几个字吧!)
  1. require 'rubygems'
  2. require 'eventmachine'

  3. EM.run {
  4.   require 'em-http'

  5.   EM::HttpRequest.new('').get.callback { |http|
  6.     puts http.response
  7.   }
  8. }

Again we’re attaching a callback which is called once the request has completed. We’re attaching it in a slightly different way to the timer above, which we’ll discuss next.

Abstracting code that has a success or failure case

在设计 API 接口时,需要有办法来区分响应成功或者失败。在 Ruby中,有两种常用的方法。一种是返回 nil ,一种是抛出异常(比如 ActiveRecord::NotFound)。Eventmachine 提供了一种更为优雅的方案:the deferrable。

deferrable 是一个对象,通过它你可以添加上成功或者失败后的回调方法,名字分别为 callback 和 errback。愿意的话,你可以查看源码 , 不算复杂.

前面代码中调用 HttpRequest#get 时,返回的就是一个 deferrable (实际上返回的是一个 EM::HttpClient 实例对象,它实际上也被 mix in 到了EM::Deferrable 模块中)。当然也有更为通用的办法,就是直接使用 EM::DefaultDeferrable。

As an excuse to use a deferrable ourselves I’ve decided that it would be a jolly marvelous idea to look up the language of tweets as they arrive from the twitter streaming API.

Handily, the Google AJAX Language API allows you to get the language of any piece of text. It’s designed to be used from the browser, but we won’t let a small matter like that stop us for now (although you should if it’s a real application).

When I’m trying out a new API I generally start with HTTParty (gem install httparty) since it’s just so quick and easy to use. Warning: you can’t use HTTParty with eventmachine since it uses Net::HTTP under the covers – this is just for testing!


  1. require 'rubygems'
  2. require 'httparty'
  3. require 'json'

  4. response = HTTParty.get("", :query => {
  5.   :v => '1.0',
  6.   :q => "Sgwn i os yw google yn deall Cymraeg?"
  7. })

  8. p JSON.parse(response.body)["responseData"]

  9. # => {"isReliable"=>true, "confidence"=>0.5834181, "language"=>"cy"}

Cool, Google understands Welsh!

在将这段代码修改为使用 use em-http-request 前 (HTTParty 是使用的Net::HTTP ),我们先将它封装成一个类,拿它同我们随后要写的 eventmachine 版本做下比较。无法判定语言则返回 nil 值。

  1. require 'rubygems'
  2. require 'httparty'
  3. require 'json'

  4. class LanguageDetector
  5.   URL = ""

  6.   def initialize(text)
  7.     @text = text
  8.   end

  9.   # Returns the language if it can be detected, otherwise nil
  10.   def language
  11.     response = HTTParty.get(URL, :query => {:v => '1.0', :q => @text})

  12.     return nil unless response.code == 200

  13.     info = JSON.parse(response.body)["responseData"]

  14.     return info['isReliable'] ? info['language'] : nil
  15.   end
  16. end

  17. puts LanguageDetector.new("Sgwn i os yw google yn deall Cymraeg?").language

现在将代码修改为使用 em-http-request:

  1. require 'rubygems'
  2. require 'em-http'
  3. require 'json'

  4. class LanguageDetector
  5.   URL = ""

  6.   include EM::Deferrable

  7.   def initialize(text)
  8.     request = EM::HttpRequest.new(URL).get({
  9.       :query => {:v => "1.0", :q => text}
  10.     })

  11.     # This is called if the request completes successfully (whatever the code)
  12.     request.callback {
  13.       if request.response_header.status == 200
  14.         info = JSON.parse(request.response)["responseData"]
  15.         if info['isReliable']
  16.           self.succeed(info['language'])
  17.         else
  18.           self.fail("Language could not be reliably determined")
  19.         end
  20.       else
  21.         self.fail("Call to fetch language failed")
  22.       end
  23.     }

  24.     # This is called if the request totally failed
  25.     request.errback {
  26.       self.fail("Error making API call")
  27.     }
  28.   end
  29. end

  30. EM.run {
  31.   detector = LanguageDetector.new("Sgwn i os yw google yn deall Cymraeg?")
  32.   detector.callback { |lang| puts "The language was #{lang}" }
  33.   detector.errback { |error| puts "Error: #{error}" }
  34. }
返回结果如下:

  1. The language was cy

这段代码看起来完全不同。最大的区别是,由于引入了EM::Deferrable,无论我们的调用成功或者失败,相应的 callback 或 errback 代码都会被执行。

作为练习,你可以试着修改下代码,让它允许三次错误,期间重复调用相关方法。这同现实情况极其类似,通过这个封装我们很好的隐藏了复杂性,完成后我们也不用再关心内部实现细节了。

抽象代码,允许多次返回多个事件。

现在我们将进入 eventmachine  最擅长的领域,接触它最激动人心的特性。

我们将构建一个 client ,连接到 Twitter’s streaming api ,每当有消息到达时,将触发一系列事件。 

使用 Twitter’s streaming API ,你只需要开启一个活跃的长 HTTP 连接到stream.twitter.com,然后等待信息涌入。我们会再次使用到 em-http-request。连接到 API,监听所有的 tweets ,静待新的 twitter 到达,代码很简单:


  1. http = EventMachine::HttpRequest.new('').post({
  2.   :head => { 'Authorization' => [username, password] },
  3.   :query => { "track" => "newtwitter" }
  4. })
这里返回的是一个 deferrable (用于请求完成后触发相应回调函数), 实际上我们用到了另外一个技巧。 We can also register to be notified every time new data is received:

  1. http.stream do |chunk|
  2.   # This chunk may contain one or more tweets separated by \r\n
  3. end
让我们把代码组合起来,监听 tweets:

  1. require 'rubygems'
  2. require 'em-http'
  3. require 'json'

  4. EM.run {
  5.   username = 'yourtwitterusername'
  6.   password = 'password'
  7.   term = 'newtwitter'
  8.   buffer = ""

  9.   http = EventMachine::HttpRequest.new('').post({
  10.     :head => { 'Authorization' => [ username, password ] },
  11.     :query => { "track" => term }
  12.   })

  13.   http.callback {
  14.     unless http.response_header.status == 200
  15.       puts "Call failed with response code #{http.response_header.status}"
  16.     end
  17.   }

  18.   http.stream do |chunk|
  19.     buffer += chunk
  20.     while line = buffer.slice!(/.+\r\n/)
  21.       tweet = JSON.parse(line)
  22.       puts tweet['text']
  23.     end
  24.   end
  25. }

返回信息类似如下:

  1. Hey @Twitter. When shall I be getting the #NewTwitter?
  2. #NewTwitter #Perfect
  3. WHAHOO WTF? #NewTwitter is
  4. Buenos das a =) Estoy sola en la office, leyendo Le Monde y probando el #NewTwitter desde FireFox, que funciona de
  5. Curiosity and boredom got the better of me...I
It works! Now say we wanted to lookup the language of each tweet using the class we built earlier. We could do this by adding further to our stream method, however this is the road to callback hell (and just imagine what it would be like if we hadn’t abstracted the language detection!).


  1. http.stream do |chunk|
  2.   buffer += chunk
  3.   while line = buffer.slice!(/.+\r\n/)
  4.     tweet = JSON.parse(line)
  5.     text = tweet['text']
  6.     detector = LanguageDetector.new(text)
  7.     detector.callback { |lang|
  8.       puts "Language: #{lang}, tweet: #{text}"
  9.     }
  10.     detector.errback { |error|
  11.       puts "Language could not be determined for tweet: #{text}"
  12.     }
  13.   end
  14. end

我们来重写优化部分代码。

前面我们使用 deferrable 来抽离代码,用于处理异步事件返回的成功或失败。另外一个在 eventmachine 中广泛采用的通用技术是提供一个 onevent 回调。比如类似如下的接口代码:

  1. stream = TweetStream.new(username, password, term)
  2. stream.ontweet { |tweet| puts tweet }
As if by magic here is the code!

  1. require 'rubygems'
  2. require 'em-http'
  3. require 'json'

  4. class TwitterStream
  5.   URL = ''

  6.   def initialize(username, password, term)
  7.     @username, @password = username, password
  8.     @term = term
  9.     @callbacks = []
  10.     @buffer = ""
  11.     listen
  12.   end

  13.   def ontweet(&block)
  14.     @callbacks << block
  15.   end

  16.   private

  17.   def listen
  18.     http = EventMachine::HttpRequest.new(URL).post({
  19.       :head => { 'Authorization' => [ @username, @password ] },
  20.       :query => { "track" => @term }
  21.     })

  22.     http.stream do |chunk|
  23.       @buffer += chunk
  24.       process_buffer
  25.     end
  26.   end

  27.   def process_buffer
  28.     while line = @buffer.slice!(/.+\r\n/)
  29.       tweet = JSON.parse(line)

  30.       @callbacks.each { |c| c.call(tweet['text']) }
  31.     end
  32.   end
  33. end

现在我们可以对代码进一步优化:

  1. EM.run {
  2.   stream = TwitterStream.new('yourtwitterusername', 'pass', 'newtwitter')
  3.   stream.ontweet { |tweet|
  4.     LanguageDetector.new(tweet).callback { |lang|
  5.       puts "New tweet in #{lang}: #{tweet}"
  6.     }
  7.   }
  8. }

同一处理流程中混合不同的 IO 

使用 eventmachine 不会阻塞任何 IO 操作,我们由此得到另外一个优势,同一处理流程中可以混合使用不同类型的 IO 。作为示例,我们使用 WebSocket 实时推送 tweets 到浏览器端。幸运的是已经有一个现成的使用 eventmachine 的 WebSocket server, (同  中使用的非常类似)。

先安装:gem install em-websocket.

启动服务,代码如下:

  1. stream.ontweet { |tweet|
  2.   LanguageDetector.new(tweet).callback { |lang|
  3.     puts "New tweet in #{lang}: #{tweet}"

  4.     websocket_connections.each do |socket|
  5.       socket.send(JSON.generate({
  6.         :lang => lang,
  7.         :tweet => tweet
  8.       }))
  9.     end
  10.   }
  11. }
完成手工,代码很干净!

所有代码在这里可以找到:,包括连接 WebSocket 的最基本的 HTML 页面代码。可以复制它作为基础框架,再添加些时髦的视觉效果,你自己完成吧,我可没时间写。

延伸阅读

想学习更多,可以参考如下内容:

  • The .
  • Aman Gupta’s  (he knows what he’s talking about since he maintains eventmachine).
  • Joining the .
  • Reading the .
  • Asking on the irc channel (#eventmachine).
  • Try node.js as well, it’s pretty cool and the same concepts apply.
玩的开心!
希望这篇文章能给你一些启示,可以一窥 eventmachine。欢迎提问,任何反馈可留言。谢谢!

Technorati Tags: , , , , , 

Posted by Martyn Loughran




阅读(4004) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~