随着多线程基础总结的增多,却明显的感觉知道的越来越少,好像转了一圈又回到了什么都不懂的起点。不过还是试着介绍一下队列的并发实现,努力尽快的驱散迷
雾。队列这个数据结构已经很熟悉了,利用其先进先出的特性,多数生产消费模型的首选数据结构就是队列。对于有多个生产者和多个消费者线程的模型来说,最重
要是他们共同访问的Queue是线程安全的。JDK中提供的线程安全的Queue的实现还是很丰富
的:ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,DelayQueue,ConcurrentLinkedQueue
等等,多数情况下使用这些数据结构编写并发程序足够了。ArrayBlockingQueue的实现之前的总结中已经有介绍,所以这次是分析一下
LinkedBlockingQueue和ConcurrentLinkedQueue的源码实现。
首先从简单的开始,先看看LinkedBlockingQueue线程安全的实现。之所以介绍它是因为其实现比较典型,对比
ArrayBlokcingQueue使用一个ReentrantLock和两个Condition维护内部的数组来说,它使用了两个
ReentrantLock,并且分别对应一个Condition来实现对内部数据结构Node型变量的维护。
- public class LinkedBlockingQueue extends AbstractQueue
- implements BlockingQueue, java.io.Serializable {
- private static final long serialVersionUID = -6903933977591709194L;
-
-
-
-
- static class Node {
-
- volatile E item;
- Node next;
- Node(E x) { item = x; }
- }
-
-
- private final int capacity;
-
-
- private final AtomicInteger count = new AtomicInteger(0);
-
-
- private transient Node head;
-
-
- private transient Node last;
-
-
- private final ReentrantLock takeLock = new ReentrantLock();
-
-
- private final Condition notEmpty = takeLock.newCondition();
-
-
- private final ReentrantLock putLock = new ReentrantLock();
-
-
- private final Condition notFull = putLock.newCondition();
-
- private void signalNotEmpty() {
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock();
- try {
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
- }
-
- private void signalNotFull() {
- final ReentrantLock putLock = this.putLock;
- putLock.lock();
- try {
- notFull.signal();
- } finally {
- putLock.unlock();
- }
- }
-
- private void insert(E x) {
- last = last.next = new Node(x);
- }
-
- private E extract() {
- Node first = head.next;
- head = first;
- E x = first.item;
- first.item = null;
- return x;
- }
-
- private void fullyLock() {
- putLock.lock();
- takeLock.lock();
- }
-
- private void fullyUnlock() {
- takeLock.unlock();
- putLock.unlock();
- }
-
- public LinkedBlockingQueue(int capacity) {
- if (capacity <= 0) throw new IllegalArgumentException();
- this.capacity = capacity;
- last = head = new Node(null);
- }
- ...
- }
这里仅仅展示部分源码,主要的方法在后面的分析中列出。分析之前明确一个最基本的概念。天天念叨着编写线程安全的类,什么是线程安全的类?那就是
类内共享的全局变量的访问必须保证是不受多线程形式影响的。如果由于多线程的访问(改变,遍历,查看)而使这些变量结构被破坏或者针对这些变量操作的原子
性被破坏,则这个类的编写不是线程安全的。
明确了这个基本的概念就可以很好的理解这个Queue的实现为什么是线程安全的了。在LinkedBlockingQueue的所有共享的全局变量
中,final声明的capacity在构造器生成实例时就成了不变量了。而final声明的count由于是AtomicInteger类型的,所以能
够保证其操作的原子性。剩下的final的变量都是初始化成了不变量,并且不包含可变属性,所以都是访问安全的。那么剩下的就是Node类型的head和
last两个可变量。
所以要保证LinkedBlockingQueue是线程安全的就是要保证对head和last的访问是线程安全的。
首先从上面的源码可以看到insert(E
x),extract()是真正的操作head,last来入队和出对的方法,但是由于是私有的,所以不能被直接访问,不用担心线程的问题。实际入队的公
开的方法是put(E e),offer(E e)和offer(E e, long timeout, TimeUnit
unit)。put(...)方法与offer(...)都是把新元素加入到队尾,所不同的是如果不满足条件put会把当前执行的线程扔到等待集中等待被
唤醒继续执行,而offer则是直接退出,所以如果是需要使用它的阻塞特性的话,不能直接使用poll(...)。
put(...)方法中加入元素的操作使用this.putLock来限制多线程的访问,并且使用了可中断的方式:
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- int c = -1;
- final ReentrantLock putLock = this.putLock;
- final AtomicInteger count = this.count;
- putLock.lockInterruptibly();
- try {
-
- try {
- while (count.get() == capacity)
- notFull.await();
- } catch (InterruptedException ie) {
- notFull.signal();
- throw ie;
- }
-
- insert(e);
-
- c = count.getAndIncrement();
- if (c + 1 < capacity)
- notFull.signal();
-
- } finally {
- putLock.unlock();
- }
- if (c == 0)
- signalNotEmpty();
- }
代码段(1)是阻塞操作,代码段(2)是count递增和唤醒等待的操作。
两者之间的insert(e)才是入队操作,其实际是操作的队尾引用last,并且没有牵涉到head。
所以设计两个锁的原因就在这里!因为出队操作take(),poll()实际是执行extract()仅仅操作队首引用head。增加了
this.takeLock这个锁,就实现了多个不同任务的线程入队的同时可以进行出对的操作,并且由于两个操作所共同使用的count是
AtomicInteger类型的,所以完全不用考虑计数器递增递减的问题。假设count换成int,则相应的putLock内的count++和
takeLock内的count--有可能相互覆盖,最终造成count的值被腐蚀,故这种设计必须使用原子操作类。
我之前说过,保证类的线程安全只要保证head和last的操作的线程安全,也就是保证insert(E
x)和extract()线程安全即可。那么上面的put方法中的代码段(1)放在a,b之间,代码段(2)放在c,d之间不是更好?毕竟锁的粒度越小越
好。单纯的考虑count的话这样的改变是正确的,
但是await()和singal()这两个方法执行时都会检查当前线程是否是独占锁的那个线程,如果不是则抛出java.lang.IllegalMonitorStateException异常。而这两段代码中包含notFull.await()和notFull.signal()这两句使得(1),(2)必须放在lock保护块内。这里说明主要是count本身并不需要putLock或者takeLock的保护,从
- public int size() {
- return count.get();
- }
可以看出count的访问是不需要任何锁的。而在put等方法中,其与锁机制的混用很容易造成迷惑。最后put中的代码d的作用主要是一个低位及
时通知的作用,也就是队列刚有值试图获得takeLock去通知等待集中的出队线程。因为c==0意味着count.getAndIncrement()
原子递增成功,所以count > 0成立。类似作用的代码:
- if (c == capacity)
- signalNotFull();
在take和poll中也有出现,实现了高位及时通知。
分析完了put,对应的offer,take,poll方法都是类似的实现。下面看看遍历队列的操作:
- public Object[] toArray() {
- fullyLock();
- try {
- int size = count.get();
- Object[] a = new Object[size];
- int k = 0;
- for (Node p = head.next; p != null; p = p.next)
- a[k++] = p.item;
- return a;
- } finally {
- fullyUnlock();
- }
- }
这个方法很简单主要是要清楚一点:这个操作执行时不允许其他线程再修改队首和队尾,所以使用了fullyLock去获取putLock和takeLock,只要成功则可以保证不会再有修改队列的操作。然后就是安心的遍历到最后一个元素为止了。
另外在offer(E e, long timeout, TimeUnit unit)这个方法中提供了带有超时的入队操作,如果一直不成功的话,它会尝试在timeout的时间内入队:
- for (;;) {
- ...
- if (nanos <= 0)
- return false;
- try {
- nanos = notFull.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- notFull.signal();
- throw ie;
- }
- }
其内部循环使用notFull.awaitNanos(nanos)方法反复的计算剩余时间的大概值用于实现延时功能。nanos<=0则放弃尝试,直接退出。
整体而言,LinkedBlockingQueue的实现还是很清晰的。相对于后面要介绍的ConcurrentLinkedQueue来说,它属于简单
的实现。这些看似复杂的数据结构的实现实质都是多线程的基础的综合应用。就好像数学中千变万化的难题其实都是基础公式的组合一样,如果有清晰的基础认知,
还是能找到自己分析的思路的。本来是想从mina中找找类似的实现,不过很遗憾的是它好像仅仅实现了一个非线程安全的循环队列,然后在其基础上使用
synchronized进行封装成线程安全的Queue。
阅读(7159) | 评论(0) | 转发(0) |