分类:
2009-07-19 21:02:28
面向 Java 开发人员的 Scala 指南:
深入了解 Scala 并发性
对于许多(如果不是大多数)Java™ 程序员来说,Scala 的吸引力在于处理并发性以及编写线程安全的代码时非常轻松。在本期文章中,Ted Neward 将开始深入研究 Scala 语言及环境所提供的各种并发特性和库。
2003 年,Herb Sutter 在他的文章 “The Free Lunch Is Over” 中揭露了行业中最不可告人的一个小秘密,他明确论证了处理器在速度上的发展已经走到了尽头,并且将由全新的单芯片上的并行 “内核”(虚拟 CPU)所取代。这一发现对编程社区造成了不小的冲击,因为正确创建线程安全的代码,在理论而非实践中,始终会提高高性能开发人员的身价,而让各公司难以聘用他们。看上去,仅有少数人充分理解了 Java 的线程模型、并发 API 以及 “同步” 的含义,以便能够编写同时提供安全性和吞吐量的代码 —— 并且大多数人已经明白了它的困难所在。
据推测,行业的其余部分将自力更生,这显然不是一个理想的结局,至少不是 IT 部门努力开发软件所应得的回报。
|
与 Scala 在 .NET 领域中的姐妹语言 F# 相似,Scala 是针对 “并发性问题” 的解决方案之一。在本期文章中,我讨论了 Scala 的一些属性,这些属性使它更加胜任于编写线程安全的代码,比如默认不可修改的对象,并讨论了一种返回对象副本而不是修改它们内容的首选设计方案。Scala 对并发性的支持远比此深远;现在,我们有必要来了解一下 Scala 的各种库。
在深入研究 Scala 的并发性支持之前,有必要确保您具备了对 Java 基本并发性模型的良好理解,因为 Scala 的并发性支持,从某种程度上说,建立在 JVM 和支持库所提供的特性和功能的基础之上。为此,清单 1 中的代码包含了一个已知的 Producer/Consumer 并发性问题(详见 Sun Java Tutorial 的 “Guarded Blocks” 小节)。注意,Java Tutorial 版本并未在其解决方案中使用 java.util.concurrent
类,而是择优使用了 java.lang.Object
中的较旧的 wait()
/notifyAll()
方法:
package com.tedneward.scalaexamples.notj5; class Producer implements Runnable { private Drop drop; private String importantInfo[] = { "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" }; public Producer(Drop drop) { this.drop = drop; } public void run() { for (int i = 0; i < importantInfo.length; i++) { drop.put(importantInfo[i]); } drop.put("DONE"); } } class Consumer implements Runnable { private Drop drop; public Consumer(Drop drop) { this.drop = drop; } public void run() { for (String message = drop.take(); !message.equals("DONE"); message = drop.take()) { System.out.format("MESSAGE RECEIVED: %s%n", message); } } } class Drop { //Message sent from producer to consumer. private String message; //True if consumer should wait for producer to send message, //false if producer should wait for consumer to retrieve message. private boolean empty = true; //Object to use to synchronize against so as to not "leak" the //"this" monitor private Object lock = new Object(); public String take() { synchronized(lock) { //Wait until message is available. while (empty) { try { lock.wait(); } catch (InterruptedException e) {} } //Toggle status. empty = true; //Notify producer that status has changed. lock.notifyAll(); return message; } } public void put(String message) { synchronized(lock) { //Wait until message has been retrieved. while (!empty) { try { lock.wait(); } catch (InterruptedException e) {} } //Toggle status. empty = false; //Store message. this.message = message; //Notify consumer that status has changed. lock.notifyAll(); } } } public class ProdConSample { public static void main(String[] args) { Drop drop = new Drop(); (new Thread(new Producer(drop))).start(); (new Thread(new Consumer(drop))).start(); } } |
|
注意: 我在此处展示的代码对 Sun 教程解决方案做了少许修改;它们提供的代码存在一个很小的设计缺陷(参见 Java 教程 “缺陷”)。
Producer/Consumer 问题的核心非常容易理解:一个(或多个)生产者实体希望将数据提供给一个(或多个)使用者实体供它们使用和操作(在本例中,它包括将数据打印到控制台)。Producer
和 Consumer
类是相应直观的 Runnable
-实现类:Producer
从数组中获取 String
,并通过 put
将它们放置到 Consumer
的缓冲区中,并根据需要执行 take
。
问题的难点在于,如果 Producer
运行过快,则数据在覆盖时可能会丢失;如果 Consumer
运行过快,则当 Consumer
读取相同的数据两次时,数据可能会得到重复处理。缓冲区(在 Java Tutorial 代码中称作 Drop
)将确保不会出现这两种情况。数据破坏的可能性就更不用提了(在 String 引用的例子中很困难,但仍然值得注意),因为数据会由 put
放入缓冲区,并由 take
取出。
关于此主题的全面讨论请阅读 Brian Goetz 的 Java Concurrency in Practice 或 Doug Lea 的 Concurrent Programming in Java(参见 参考资料),但是,在应用 Scala 之前有必要快速了解一下此代码的运行原理。
当 Java 编译器看到 synchronized
关键字时,它会在同步块的位置生成一个 try
/finally
块,其顶部包括一个 monitorenter
操作码,并且 finally
块中包括一个 monitorexit
操作码,以确保监控程序(Java 的原子性基础)已经发布,而与代码退出的方式无关。因此,Drop
中的 put
代码将被重写,如清单 2 所示:
// This is pseudocode public void put(String message) { try { monitorenter(lock) //Wait until message has been retrieved. while (!empty) { try { lock.wait(); } catch (InterruptedException e) {} } //Toggle status. empty = false; //Store message. this.message = message; //Notify consumer that status has changed. lock.notifyAll(); } finally { monitorexit(lock) } } |
wait()
方法将通知当前线程进入非活动状态,并等待另一个线对该对象调用 notifyAll()
。然后,通知的线程必须在能够继续执行的时候尝试再次获取监控程序。从本质上说,wait()
和 notify()
/notifyAll()
允许一种简单的信令机制,它允许 Drop
在 Producer
和 Consumer
线程之间进行协调,每个 put
都有相应的 take
。
本文的 代码下载 部分使用 Java5 并发性增强(Lock
和 Condition
接口以及 ReentrantLock
锁定实现)提供 清单 2 的基于超时的版本,但基本代码模式仍然相同。这就是问题所在:编写清单 2 这样的代码的开发人员需要过度专注于线程和锁定的细节以及低级实现代码,以便让它们能够正确运行。此外,开发人员需要对每一行代码刨根知底,以确定是否需要保护它们,因为过度同步与过少同步同样有害。
现在,我们来看到 Scala 替代方案。
开始应用 Scala 并发性的一种方法是将 Java 代码直接转换为 Scala,以便利用 Scala 的语法优势来简化代码(至少能简化一点):
object ProdConSample { class Producer(drop : Drop) extends Runnable { val importantInfo : Array[String] = Array( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" ); override def run() : Unit = { importantInfo.foreach((msg) => drop.put(msg)) drop.put("DONE") } } class Consumer(drop : Drop) extends Runnable { override def run() : Unit = { var message = drop.take() while (message != "DONE") { System.out.format("MESSAGE RECEIVED: %s%n", message) message = drop.take() } } } class Drop { var message : String = "" var empty : Boolean = true var lock : AnyRef = new Object() def put(x: String) : Unit = lock.synchronized { // Wait until message has been retrieved await (empty == true) // Toggle status empty = false // Store message message = x // Notify consumer that status has changed lock.notifyAll() } def take() : String = lock.synchronized { // Wait until message is available. await (empty == false) // Toggle status empty=true // Notify producer that staus has changed lock.notifyAll() // Return the message message } private def await(cond: => Boolean) = while (!cond) { lock.wait() } } def main(args : Array[String]) : Unit = { // Create Drop val drop = new Drop(); // Spawn Producer new Thread(new Producer(drop)).start(); // Spawn Consumer new Thread(new Consumer(drop)).start(); } } |
Producer
和 Consumer
类几乎与它们的 Java 同类相同,再一次扩展(实现)了 Runnable
接口并覆盖了 run()
方法,并且 — 对于 Producer
的情况 — 分别使用了内置迭代方法来遍历 importantInfo
数组的内容。(实际上,为了让它更像 Scala,importantInfo
可能应该是一个 List
而不是 Array
,但在第一次尝试时,我希望尽可能保证它们与原始 Java 代码一致。)
Drop
类同样类似于它的 Java 版本。但 Scala 中有一些例外,“synchronized” 并不是关键字,它是针对 AnyRef
类定义的一个方法,即 Scala “所有引用类型的根”。这意味着,要同步某个特定的对象,您只需要对该对象调用同步方法;在本例中,对 Drop
上的 lock 字段中所保存的对象调用同步方法。
注意,我们在 await()
方法定义的 Drop
类中还利用了一种 Scala 机制:cond
参数是等待计算的代码块,而不是在传递给该方法之前进行计算。在 Scala 中,这被称作 “call-by-name”;此处,它是一种实用的方法,可以捕获需要在 Java 版本中表示两次的条件等待逻辑(分别用于 put
和 take
)。
最后,在 main()
中,创建 Drop
实例,实例化两个线程,使用 start()
启动它们,然后在 main()
的结束部分退出,相信 JVM 会在 main()
结束之前启动这两个线程。(在生产代码中,可能无法保证这种情况,但对于这样的简单的例子,99.99% 没有问题。)
但是,已经说过,仍然存在相同的基本问题:程序员仍然需要过分担心两个线程之间的通信和协调问题。虽然一些 Scala 机制可以简化语法,但这目前为止并没有相当大的吸引力。
Scala Library Reference 中有一个有趣的包:scala.concurrency
。这个包包含许多不同的并发性结构,包括我们即将利用的 MailBox
类。
顾名思义,MailBox
从本质上说就是 Drop
,用于在检测之前保存数据块的单槽缓冲区。但是,MailBox
最大的优势在于它将发送和接收数据的细节完全封装到模式匹配和 case 类中,这使它比简单的 Drop
(或 Drop
的多槽数据保存类 java.util.concurrent.BoundedBuffer
)更加灵活。
package com.tedneward.scalaexamples.scala.V2 { import concurrent.{MailBox, ops} object ProdConSample { class Producer(drop : Drop) extends Runnable { val importantInfo : Array[String] = Array( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" ); override def run() : Unit = { importantInfo.foreach((msg) => drop.put(msg)) drop.put("DONE") } } class Consumer(drop : Drop) extends Runnable { override def run() : Unit = { var message = drop.take() while (message != "DONE") { System.out.format("MESSAGE RECEIVED: %s%n", message) message = drop.take() } } } class Drop { private val m = new MailBox() private case class Empty() private case class Full(x : String) m send Empty() // initialization def put(msg : String) : Unit = { m receive { case Empty() => m send Full(msg) } } def take() : String = { m receive { case Full(msg) => m send Empty(); msg } } } def main(args : Array[String]) : Unit = { // Create Drop val drop = new Drop() // Spawn Producer new Thread(new Producer(drop)).start(); // Spawn Consumer new Thread(new Consumer(drop)).start(); } } } |
此处,v2 和 v1 之间的惟一区别在于 Drop
的实现,它现在利用 MailBox
类处理传入以及从 Drop
中删除的消息的阻塞和信号事务。(我们可以重写 Producer
和 Consumer
,让它们直接使用 MailBox
,但考虑到简单性,我们假定希望保持所有示例中的 Drop
API 相一致。)使用 MailBox
与使用典型的 BoundedBuffer
(Drop
)稍有不同,因此我们来仔细看看其代码。
MailBox
有两个基本操作:send
和 receive
。receiveWithin
方法仅仅是基于超时的 receive
。MailBox
接收任何类型的消息。send()
方法将消息放置到邮箱中,并立即通知任何关心该类型消息的等待接收者,并将它附加到一个消息链表中以便稍后检索。receive()
方法将阻塞,直到接收到对于功能块合适的消息。
因此,在这种情况下,我们将创建两个 case 类,一个不包含任何内容(Empty
),这表示 MailBox
为空,另一个包含消息数据(Full
。
put
方法,由于它会将数据放置在 Drop
中,对 MailBox
调用 receive()
以查找 Empty
实例,因此会阻塞直到发送 Empty
。此时,它发送一个 Full
实例给包含新数据的 MailBox
。
take
方法,由于它会从 Drop
中删除数据,对 MailBox
调用 receive()
以查找 Full
实例,提取消息(再次得益于模式匹配从 case 类内部提取值并将它们绑到本地变量的能力)并发送一个 Empty
实例给 MailBox
。 不需要明确的锁定,并且不需要考虑监控程序。
事实上,我们可以显著缩短代码,只要 Producer
和 Consumer
不需要功能全面的类(此处便是如此) — 两者从本质上说都是 Runnable.run()
方法的瘦包装器,Scala 可以使用 scala.concurrent.ops
对象的 spawn
方法来实现,如清单 5 所示:
package com.tedneward.scalaexamples.scala.V3 { import concurrent.MailBox import concurrent.ops._ object ProdConSample { class Drop { private val m = new MailBox() private case class Empty() private case class Full(x : String) m send Empty() // initialization def put(msg : String) : Unit = { m receive { case Empty() => m send Full(msg) } } def take() : String = { m receive { case Full(msg) => m send Empty(); msg } } } def main(args : Array[String]) : Unit = { // Create Drop val drop = new Drop() // Spawn Producer spawn { val importantInfo : Array[String] = Array( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" ); importantInfo.foreach((msg) => drop.put(msg)) drop.put("DONE") } // Spawn Consumer spawn { var message = drop.take() while (message != "DONE") { System.out.format("MESSAGE RECEIVED: %s%n", message) message = drop.take() } } } } } |
spawn
方法(通过包块顶部的 ops
对象导入)接收一个代码块(另一个 by-name 参数示例)并将它包装在匿名构造的线程对象的 run()
方法内部。事实上,并不难理解 spawn
的定义在 ops
类的内部是什么样的:
def spawn(p: => Unit) = { val t = new Thread() { override def run() = p } t.start() } |
……这再一次强调了 by-name 参数的强大之处。
ops.spawn
方法的一个缺点在于,它是在 2003 年 Java 5 concurrency 类还不可用的时候编写的。特别是,java.util.concurrent.Executor
及其同类的作用是让开发人员更加轻松地生成线程,而不需要实际处理直接创建线程对象的细节。幸运的是,在您自己的自定义库中重新创建 spawn
的定义是相当简单的,这需要利用 Executor
(或 ExecutorService
或 ScheduledExecutorService
)来执行线程的实际启动任务。
事实上,Scala 的并发性支持超越了 MailBox
和 ops
类;Scala 还支持一个类似的 “Actors” 概念,它使用了与 MailBox
所采用的方法相类似的消息传递方法,但应用更加全面并且灵活性也更好。但是,这部分内容将在下期讨论。
Scala 为并发性提供了两种级别的支持,这与其他与 Java 相关的主题极为类似:
wait()
/notifyAll()
)。
MailBox
类以及将在本系列下一篇文章中讨论的 Actors 库。 两个例子中的目标是相同的:让开发人员能够更加轻松地专注于问题的实质,而不用考虑并发编程的低级细节(显然,第二种方法更好地实现了这一目标,至少对于没有过多考虑低级细节的人来说是这样的。)
但是,当前 Scala 库的一个明显的缺陷就是缺乏 Java 5 支持;scala.concurrent.ops
类应该具有 spawn
这样的利用新的 Executor
接口的方法。它还应该支持利用新的 Lock
接口的各种版本的 synchronized
。幸运的是,这些都是可以在 Scala 生命周期中实现的库增强,而不会破坏已有代码;它们甚至可以由 Scala 开发人员自己完成,而不需要等待 Scala 的核心开发团队提供给他们(只需要花费少量时间)。