0. 基类
连接服务器，并设置默认的 Watcher（default watcher）
-
package com.test.zookeeper.queue;
-
-
import java.io.IOException;
-
-
import org.apache.zookeeper.WatchedEvent;
-
import org.apache.zookeeper.Watcher;
-
import org.apache.zookeeper.ZooKeeper;
-
-
public class SyncPrimitive implements Watcher {
-
static ZooKeeper zk = null;
-
static Integer mutex;
-
static int i=0;
-
String root;
-
-
SyncPrimitive(String address) {
-
if(zk == null){
-
try {
-
System.out.println("Starting ZK:");
-
zk = new ZooKeeper(address, 3000, this);
-
mutex = new Integer(-1);
-
System.out.println("Finished starting ZK: " + zk);
-
} catch (IOException e) {
-
System.out.println(e.toString());
-
zk = null;
-
}
-
}
-
}
-
-
synchronized public void process(WatchedEvent event) {
-
synchronized (mutex) {
-
i++;
-
System.out.println(i);
-
mutex.notify();
-
}
-
}
-
}
1. Queue 实现类
-
package com.test.zookeeper.queue;
-
-
import java.nio.ByteBuffer;
-
import java.util.List;
-
-
import org.apache.zookeeper.CreateMode;
-
import org.apache.zookeeper.KeeperException;
-
import org.apache.zookeeper.ZooDefs.Ids;
-
import org.apache.zookeeper.data.Stat;
-
-
import com.test.zookeeper.queue.SyncPrimitive;
-
-
public class Queue extends SyncPrimitive {
-
-
private int formatLength=10;
-
-
-
/**
-
* Constructor of producer-consumer queue
-
*
-
* @param address
-
* @param name
-
*/
-
Queue(String address, String name) {
-
super(address);
-
this.root = name;
-
// Create ZK node name
-
if (zk != null) {
-
try {
-
Stat s = zk.exists(root, false);
-
if (s == null) {
-
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
-
}
-
} catch (KeeperException e) {
-
System.out.println("Keeper exception when instantiating queue: "+ e.toString());
-
} catch (InterruptedException e) {
-
System.out.println("Interrupted exception");
-
}
-
}
-
}
-
-
-
/**
-
* Add element to the queue.
-
*
-
* @param i
-
* @return
-
*/
-
boolean produce(int i) throws KeeperException, InterruptedException{
-
ByteBuffer b = ByteBuffer.allocate(4);
-
byte[] value;
-
-
// Add child with value i
-
b.putInt(i);
-
value = b.array();
-
//创建顺序用永久节点
-
zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
-
-
return true;
-
}
-
-
-
/**
-
* Remove first element from the queue.
-
*
-
* @return
-
* @throws KeeperException
-
* @throws InterruptedException
-
*/
-
int consume() throws KeeperException, InterruptedException{
-
int retvalue = -1;
-
Stat stat = null;
-
-
// Get the first element available
-
while (true) {
-
synchronized (mutex) {
-
//调用父类 默认的watch
-
List<String> list = zk.getChildren(root, true);
-
if (list.size() == 0) {
-
System.out.println("Going to wait");
-
mutex.wait();
-
} else {
-
//取最小顺序节点
-
Integer min = new Integer(list.get(0).substring(7));
-
for(String s : list){
-
Integer tempValue = new Integer(s.substring(7));
-
if(tempValue < min) min = tempValue;
-
}
-
String newString = String.format("%0"+formatLength+"d", min);
-
System.out.println("Temporary value: " + root + "/element" + newString);
-
byte[] b = zk.getData(root + "/element" + newString, false, stat);
-
//删除最小顺序节点
-
zk.delete(root + "/element" + newString, 0);
-
ByteBuffer buffer = ByteBuffer.wrap(b);
-
retvalue = buffer.getInt();
-
-
return retvalue;
-
}
-
}
-
}
-
-
}
-
-
}
2.生产者调用
-
package com.test.zookeeper.queue;
-
-
import org.apache.zookeeper.KeeperException;
-
-
public class ProduceDemo {
-
public static void main(String[] args) {
-
Queue queue = new Queue("192.168.140.128:2181", "/Queue");
-
for(int i=0; i<=100;i++) {
-
try {
-
queue.produce(i);
-
} catch (KeeperException e) {
-
System.out.println(e.toString());
-
} catch (InterruptedException e) {
-
System.out.println(e.toString());
-
}
-
}
-
}
-
}
3.消费者调用
-
package com.test.zookeeper.queue;
-
-
import org.apache.zookeeper.KeeperException;
-
-
public class ConsumerDemo {
-
public static void main(String[] args) {
-
Queue queue = new Queue("192.168.140.128:2181", "/Queue");
-
try {
-
while(true){
-
int seq = queue.consume();
-
System.out.println(seq);
-
}
-
} catch (KeeperException e) {
-
System.out.println(e.toString());
-
} catch (InterruptedException e) {
-
System.out.println(e.toString());
-
}
-
System.out.println("!!!");
-
}
-
}
阅读(1389) | 评论(0) | 转发(0) |