Chinaunix首页 | 论坛 | 博客
  • 博客访问: 910049
  • 博文数量: 91
  • 博客积分: 803
  • 博客等级: 准尉
  • 技术积分: 1051
  • 用 户 组: 普通用户
  • 注册时间: 2012-05-24 13:42
文章分类

全部博文(91)

文章存档

2021年(1)

2020年(4)

2019年(4)

2018年(9)

2017年(11)

2016年(11)

2015年(6)

2014年(3)

2013年(28)

2012年(14)

分类: 大数据

2018-05-10 18:16:22

0.基类
连接服务器 ,设置default watch

点击(此处)折叠或打开

  1. package com.test.zookeeper.queue;

  2. import java.io.IOException;

  3. import org.apache.zookeeper.WatchedEvent;
  4. import org.apache.zookeeper.Watcher;
  5. import org.apache.zookeeper.ZooKeeper;

  6. public class SyncPrimitive implements Watcher {
  7.     static ZooKeeper zk = null;
  8.     static Integer mutex;
  9.     static int i=0;
  10.     String root;

  11.     SyncPrimitive(String address) {
  12.         if(zk == null){
  13.             try {
  14.                 System.out.println("Starting ZK:");
  15.                 zk = new ZooKeeper(address, 3000, this);
  16.                 mutex = new Integer(-1);
  17.                 System.out.println("Finished starting ZK: " + zk);
  18.             } catch (IOException e) {
  19.                 System.out.println(e.toString());
  20.                 zk = null;
  21.             }
  22.         }
  23.     }

  24.     synchronized public void process(WatchedEvent event) {
  25.         synchronized (mutex) {
  26.             i++;
  27.             System.out.println(i);
  28.             mutex.notify();
  29.         }
  30.     }
  31. }
1.queue实现类


点击(此处)折叠或打开

  1. package com.test.zookeeper.queue;

  2. import java.nio.ByteBuffer;
  3. import java.util.List;

  4. import org.apache.zookeeper.CreateMode;
  5. import org.apache.zookeeper.KeeperException;
  6. import org.apache.zookeeper.ZooDefs.Ids;
  7. import org.apache.zookeeper.data.Stat;

  8. import com.test.zookeeper.queue.SyncPrimitive;

  9. public class Queue extends SyncPrimitive {

  10.     private int formatLength=10;


  11.     /**
  12.      * Constructor of producer-consumer queue
  13.      *
  14.      * @param address
  15.      * @param name
  16.      */
  17.     Queue(String address, String name) {
  18.         super(address);
  19.         this.root = name;
  20.         // Create ZK node name
  21.         if (zk != null) {
  22.             try {
  23.                 Stat s = zk.exists(root, false);
  24.                 if (s == null) {
  25.                     zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  26.                 }
  27.             } catch (KeeperException e) {
  28.                 System.out.println("Keeper exception when instantiating queue: "+ e.toString());
  29.             } catch (InterruptedException e) {
  30.                 System.out.println("Interrupted exception");
  31.             }
  32.         }
  33.     }
  34.     
  35.     
  36.     /**
  37.      * Add element to the queue.
  38.      *
  39.      * @param i
  40.      * @return
  41.      */
  42.     boolean produce(int i) throws KeeperException, InterruptedException{
  43.         ByteBuffer b = ByteBuffer.allocate(4);
  44.         byte[] value;

  45.         // Add child with value i
  46.         b.putInt(i);
  47.         value = b.array();
  48.         //创建顺序用永久节点
  49.         zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);

  50.         return true;
  51.     }
  52.     
  53.     
  54.     /**
  55.      * Remove first element from the queue.
  56.      *
  57.      * @return
  58.      * @throws KeeperException
  59.      * @throws InterruptedException
  60.      */
  61.     int consume() throws KeeperException, InterruptedException{
  62.         int retvalue = -1;
  63.         Stat stat = null;

  64.         // Get the first element available
  65.         while (true) {
  66.             synchronized (mutex) {
  67.                 //调用父类 默认的watch
  68.                 List<String> list = zk.getChildren(root, true);
  69.                 if (list.size() == 0) {
  70.                     System.out.println("Going to wait");
  71.                     mutex.wait();
  72.                 } else {
  73.                     //取最小顺序节点
  74.                     Integer min = new Integer(list.get(0).substring(7));
  75.                     for(String s : list){
  76.                         Integer tempValue = new Integer(s.substring(7));
  77.                         if(tempValue < min) min = tempValue;
  78.                     }
  79.                     String newString = String.format("%0"+formatLength+"d", min);
  80.                     System.out.println("Temporary value: " + root + "/element" + newString);
  81.                     byte[] b = zk.getData(root + "/element" + newString, false, stat);
  82.                     //删除最小顺序节点
  83.                     zk.delete(root + "/element" + newString, 0);
  84.                     ByteBuffer buffer = ByteBuffer.wrap(b);
  85.                     retvalue = buffer.getInt();

  86.                     return retvalue;
  87.                 }
  88.             }
  89.         }
  90.         
  91.     }

  92. }

2.生产者调用


点击(此处)折叠或打开

  1. package com.test.zookeeper.queue;

  2. import org.apache.zookeeper.KeeperException;

  3. public class ProduceDemo {
  4.     public static void main(String[] args) {
  5.         Queue queue = new Queue("192.168.140.128:2181", "/Queue");
  6.         for(int i=0; i<=100;i++) {
  7.             try {
  8.                 queue.produce(i);
  9.             } catch (KeeperException e) {
  10.                 System.out.println(e.toString());
  11.             } catch (InterruptedException e) {
  12.                 System.out.println(e.toString());
  13.             }
  14.         }
  15.     }
  16. }

3.消费者调用


点击(此处)折叠或打开

  1. package com.test.zookeeper.queue;

  2. import org.apache.zookeeper.KeeperException;

  3. public class ConsumerDemo {
  4.     public static void main(String[] args) {
  5.         Queue queue = new Queue("192.168.140.128:2181", "/Queue");
  6.         try {
  7.             while(true){
  8.                 int seq = queue.consume();
  9.                 System.out.println(seq);    
  10.             }
  11.         } catch (KeeperException e) {
  12.             System.out.println(e.toString());
  13.         } catch (InterruptedException e) {
  14.             System.out.println(e.toString());
  15.         }
  16.         System.out.println("!!!");
  17.     }
  18. }



阅读(1391) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~