Chinaunix首页 | 论坛 | 博客
  • 博客访问: 80448
  • 博文数量: 31
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 340
  • 用 户 组: 普通用户
  • 注册时间: 2013-04-02 20:25
文章分类

全部博文(31)

文章存档

2015年(2)

2014年(29)

我的朋友

分类: Java

2014-09-02 21:49:55

   在两种情况下需要同步,一是访问临届资源,二是线程或进程间合作。这两种对应Java语言的API分别是锁跟wait/notify。同步进入临界区通常需要满足4个条件:
1. 空闲让进,这是显然的
2. 忙则等待,这也是显然的
3. 有限等待,这是防止线程死等,synchronized并不支持,并发包中的Lock支持
4. 让权等待,不能进入
临界区时,立即释放处理机。这条可不一定要遵循,现在都有多个处理机,让线程自旋会,可能有更好的性能。

一. 自旋PV

   P(S):                                                   V(S):
        while(S<=0);                                       S=S+1;
        S=S-1;
 这个显然不支持让权等待,但不见的就是不好的。

二. 阻塞型PV

  P(S):                                              V(S):
     S=S-1;                                               S=S+1;
     if(S<0) block(S, thread);                      if(S<=0) wakeup(S);
 这符合让权等待,类似于Java中的wait/notify。

三. 生产者消费者问题

  这是个经典问题,并发编程很多地方需要用到该模式,能使程序更加清晰。
  这里的临界资源是缓冲区,缓冲区必有个属性,那就是大小(n),还有个属性就是当前容量(in)。其中n是不变量,in是个变量随生成跟消费而变动,所以in肯定需要互斥访问,则需要一个信号量mutex=1来保证in的互斥访问。生产者能生产的条件是缓冲区还有空间,可用一个信号量empty=n来表示是否还有空间。消费者能消费的条件是缓冲区有元素可以消费,可用信号量full=0来表示。
  所以:需要第一3个信号量,mutex=1、empty=n、full=0。
生产者:                                            消费者:
        item = produce();                               P(full)
        P(empty)                                            P(mutex)
        P(mutex)                                            item=buffer(in);  
        buffer(in) = item;                                in=in-1;
        in = (in + 1)%n;                                V(mutex);
        V(mutex)                                           V(empty);
        V(full)

点击(此处)折叠或打开

  1. static class BlockQ<E> {
  2.         private int n;
  3.         private int size = 0;
  4.         private Object[] ar;

  5.         public BlockQ(int n) {
  6.             this.n = n;
  7.             ar = new Object[n];
  8.         }

  9.         public synchronized void put(E e) throws InterruptedException {

  10.             while (size >= n)
  11.                 wait();
  12.             ar[size++] = e;
  13.             notifyAll();

  14.         }

  15.         @SuppressWarnings("unchecked")
  16.         public synchronized E take() throws InterruptedException {
  17.             E e = null;

  18.             while (size <= 0)
  19.                 wait();
  20.             e = (E) ar[--size];
  21.             notifyAll();

  22.             return e;
  23.         }
  24.     }

  25.     static class BlockQ2<E> {
  26.         private Semaphore empty;
  27.         private Semaphore mutex;
  28.         private Semaphore full;
  29.         private int in = 0;
  30.         private Object[] ar;

  31.         public BlockQ2(int n) {
  32.             empty = new Semaphore(n);
  33.             full = new Semaphore(0);
  34.             mutex = new Semaphore(1);
  35.             ar = new Object[n];
  36.         }

  37.         public void put(E e) throws InterruptedException {
  38.             empty.acquire();
  39.             mutex.acquire();
  40.             ar[in++] = e;
  41.             mutex.release();
  42.             full.release();
  43.         }

  44.         @SuppressWarnings("unchecked")
  45.         public E take() throws InterruptedException {
  46.             E e = null;
  47.             full.acquire();
  48.             mutex.acquire();
  49.             e = (E) ar[--in];
  50.             mutex.release();
  51.             empty.release();
  52.             return e;
  53.         }
  54.     }








阅读(869) | 评论(0) | 转发(0) |
0

上一篇:5. ThreadLocal

下一篇:7. 线程池

给主人留下些什么吧!~~