Chinaunix首页 | 论坛 | 博客
  • 博客访问: 914866
  • 博文数量: 91
  • 博客积分: 803
  • 博客等级: 准尉
  • 技术积分: 1051
  • 用 户 组: 普通用户
  • 注册时间: 2012-05-24 13:42
文章分类

全部博文(91)

文章存档

2021年(1)

2020年(4)

2019年(4)

2018年(9)

2017年(11)

2016年(11)

2015年(6)

2014年(3)

2013年(28)

2012年(14)

分类: 大数据

2018-05-10 17:18:02

0.基类
连接 ZooKeeper 服务器,并设置默认的 Watcher(default watcher)

点击(此处)折叠或打开

  1. package com.test.zookeeper.barrier;

  2. import java.io.IOException;

  3. import org.apache.zookeeper.WatchedEvent;
  4. import org.apache.zookeeper.Watcher;
  5. import org.apache.zookeeper.ZooKeeper;

  6. public class SyncPrimitive implements Watcher {
  7.     static ZooKeeper zk = null;
  8.     static Integer mutex;
  9.     static int i=0;
  10.     String root;

  11.     SyncPrimitive(String address) {
  12.         if(zk == null){
  13.             try {
  14.                 System.out.println("Starting ZK:");
  15.                 zk = new ZooKeeper(address, 3000, this);
  16.                 mutex = new Integer(-1);
  17.                 System.out.println("Finished starting ZK: " + zk);
  18.             } catch (IOException e) {
  19.                 System.out.println(e.toString());
  20.                 zk = null;
  21.             }
  22.         }
  23.     }

  24.     synchronized public void process(WatchedEvent event) {
  25.         synchronized (mutex) {
  26.             i++;
  27.             System.out.println(i);
  28.             mutex.notify();
  29.         }
  30.     }
  31. }

1.barrier 实现类


点击(此处)折叠或打开

  1. package com.test.zookeeper.barrier;

  2. import java.net.InetAddress;
  3. import java.net.UnknownHostException;
  4. import java.util.List;

  5. import org.apache.zookeeper.CreateMode;
  6. import org.apache.zookeeper.KeeperException;
  7. import org.apache.zookeeper.ZooDefs.Ids;
  8. import org.apache.zookeeper.data.Stat;

  9. public class Barrier extends SyncPrimitive {

  10.     private int size=0;
  11.     private String root=null;
  12.     private String name;

  13.     Barrier(String address, String root, int size) {
  14.         super(address);
  15.         this.root = root;
  16.         this.size = size;
  17.         if (zk != null) {
  18.            try {
  19.                 Stat s = zk.exists(root, false);
  20.                 if (s == null) {
  21.                     zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  22.                 }
  23.             } catch (KeeperException e) {
  24.                 System.out.println("Keeper exception when instantiating queue: " + e.toString());
  25.             } catch (InterruptedException e) {
  26.                 System.out.println("Interrupted exception");
  27.             }
  28.         }
  29.         
  30.         try {
  31.             name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
  32.         } catch (UnknownHostException e) {
  33.             System.out.println(e.toString());
  34.         }

  35.     }
  36.     /**
  37.      * Join barrier
  38.      *
  39.      * @return
  40.      * @throws KeeperException
  41.      * @throws InterruptedException
  42.      */

  43.     boolean enter() throws KeeperException, InterruptedException{
  44.         zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
  45.         while (true) {
  46.             synchronized (mutex) {
  47.                 //此处 第二个参数设置为true 调用父类中默认的watch
  48.                 List<String> list = zk.getChildren(root, true);

  49.                 if (list.size() < size) {
  50.                     //如果进入栅栏的节点数小于设置的size值 则阻塞等待
  51.                     mutex.wait();
  52.                 } else {
  53.                     return true;
  54.                 }
  55.             }
  56.         }
  57.     }
  58.     
  59.     /**
  60.      * Wait until all reach barrier
  61.      *
  62.      * @return
  63.      * @throws KeeperException
  64.      * @throws InterruptedException
  65.      */

  66.     boolean leave() throws KeeperException, InterruptedException{
  67.         zk.delete(root + "/" + name, 0);
  68.         while (true) {
  69.             synchronized (mutex) {
  70.                 List<String> list = zk.getChildren(root, true);
  71.                     if (list.size() > 0) {
  72.                         mutex.wait();
  73.                     } else {
  74.                         return true;
  75.                     }
  76.                 }
  77.             }
  78.     }
  79. }

2.测试调用代码

点击(此处)折叠或打开

  1. package com.test.zookeeper.barrier;

  2. import java.util.Random;

  3. import org.apache.zookeeper.KeeperException;

  4. public class BarrierDemo {
  5.     public static void main(String[] args) {
  6.         //设置栅栏通过数量为3
  7.         barrierTest(new String[]{"","192.168.140.128:2181","3"});
  8.     }
  9.     
  10.     public static void barrierTest(String args[]) {
  11.         Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
  12.         try{
  13.             boolean flag = b.enter();
  14.             System.out.println("Entered barrier: " + args[2]);
  15.             if(!flag) System.out.println("Error when entering the barrier");
  16.         } catch (KeeperException e){

  17.         } catch (InterruptedException e){

  18.         }

  19.         // Generate random integer
  20.         Random rand = new Random();
  21.         int r = rand.nextInt(100);
  22.         // Loop for rand iterations
  23.         for (int i = 0; i < r; i++) {
  24.             try {
  25.                 Thread.sleep(100);
  26.             } catch (InterruptedException e) {

  27.             }
  28.         }
  29.         try{
  30.             b.leave();
  31.         } catch (KeeperException e){

  32.         } catch (InterruptedException e){

  33.         }
  34.         System.out.println("Left barrier");
  35.     }
  36. }


阅读(1277) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~