


Category: Java

2015-05-25 00:32:59


The data stored at each znode in a namespace is read and written atomically. Reads get all the data bytes associated with a znode and a write replaces all the data. Each node has an Access Control List (ACL) that restricts who can do what.

ZooKeeper was not designed to be a general database or large object store. Instead, it manages coordination data. This data can come in the form of configuration, status information, rendezvous, etc. A common property of the various forms of coordination data is that they are relatively small: measured in kilobytes. The ZooKeeper client and the server implementations have sanity checks to ensure that znodes have less than 1 MB of data, but the data should be much less than that on average. Operating on relatively large data sizes will cause some operations to take much more time than others and will affect the latencies of some operations because of the extra time needed to move more data over the network and onto storage media. If large data storage is needed, the usual pattern of dealing with such data is to store it on a bulk storage system, such as NFS or HDFS, and store pointers to the storage locations in ZooKeeper.
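The pointer pattern can be sketched in plain Java. This is only an illustration, not ZooKeeper client code: the class name `ZnodePayloads`, the 64 KB soft limit, the one-byte tag, and the `hdfs://example/...` location used below are all assumptions made for the example. The caller is assumed to have already written the large payload to bulk storage.

```java
import java.nio.charset.StandardCharsets;

/** Sketch: keep znode data small by storing a pointer for large payloads. */
final class ZnodePayloads {
    /** Assumed soft limit, far below the roughly 1 MB sanity check. */
    static final int SOFT_LIMIT_BYTES = 64 * 1024;
    /** Hypothetical one-byte tags so a reader can tell the two cases apart. */
    static final byte INLINE = 0;
    static final byte POINTER = 1;

    /**
     * Returns the bytes to store in the znode: the payload itself when it is
     * small, otherwise the location of the payload in bulk storage.
     */
    static byte[] toZnodeData(byte[] payload, String bulkLocation) {
        boolean small = payload.length <= SOFT_LIMIT_BYTES;
        byte[] body = small ? payload : bulkLocation.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[body.length + 1];
        out[0] = small ? INLINE : POINTER;
        System.arraycopy(body, 0, out, 1, body.length);
        return out;
    }

    /** True when the znode holds a pointer rather than the data itself. */
    static boolean isPointer(byte[] znodeData) {
        return znodeData.length > 0 && znodeData[0] == POINTER;
    }
}
```

A reader of the znode would check `isPointer` first and, for a pointer, fetch the real data from the bulk store.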


The replicated database is an in-memory database containing the entire data tree. Updates are logged to disk for recoverability, and writes are serialized to disk before they are applied to the in-memory database.
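The "log first, then apply" ordering can be illustrated with a toy class. This is a sketch under simplifying assumptions, not how ZooKeeper lays out its transaction log: `LoggedTree` is a hypothetical name, the log is a text file of tab-separated lines, and paths and data are assumed to contain no tabs or newlines.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.TreeMap;

/** Toy in-memory tree whose updates are logged to disk before being applied. */
final class LoggedTree {
    private final Path log;
    private final TreeMap<String, String> tree = new TreeMap<>();

    /** Rebuilds the in-memory tree by replaying the log, if one exists. */
    LoggedTree(Path log) {
        this.log = log;
        if (!Files.exists(log)) return;
        try {
            for (String line : Files.readAllLines(log, StandardCharsets.UTF_8)) {
                int tab = line.indexOf('\t');
                if (tab > 0) tree.put(line.substring(0, tab), line.substring(tab + 1));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void set(String path, String data) {
        byte[] record = (path + "\t" + data + "\n").getBytes(StandardCharsets.UTF_8);
        try {
            // Step 1: append the update to the on-disk log (SYNC requests a synchronous write).
            Files.write(log, record, StandardOpenOption.CREATE,
                    StandardOpenOption.APPEND, StandardOpenOption.SYNC);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Step 2: only after the log write succeeded, update the in-memory tree.
        tree.put(path, data);
    }

    /** Reads are served from memory only. */
    String get(String path) {
        return tree.get(path);
    }
}
```

Because every applied update is already in the log, a new `LoggedTree` opened on the same file recovers the same state.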

Every ZooKeeper server services clients. Clients connect to exactly one server to submit requests. Read requests are serviced from the local replica of each server database. Requests that change the state of the service, write requests, are processed by an agreement protocol.

As part of the agreement protocol all write requests from clients are forwarded to a single server, called the leader. The rest of the ZooKeeper servers, called followers, receive message proposals from the leader and agree upon message delivery. The messaging layer takes care of replacing leaders on failures and syncing followers with leaders.

ZooKeeper uses a custom atomic messaging protocol. Since the messaging layer is atomic, ZooKeeper can guarantee that the local replicas never diverge. When the leader receives a write request, it calculates what the state of the system is when the write is to be applied and transforms this into a transaction that captures this new state.
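One way to picture "a transaction that captures the new state" is a conditional write that the leader turns into a plain assignment. The sketch below is an assumption-laden toy, not ZooKeeper's internal classes: `TxnSketch`, `Node`, and `SetTxn` are hypothetical names. The only detail borrowed from the real API is that a version of -1 means "any version".

```java
/** Toy model of turning a write request into a state-capturing transaction. */
final class TxnSketch {
    /** State of one znode on a replica. */
    static final class Node {
        String data = "";
        int version = 0;
    }

    /** Transaction: records the resulting state, not the request. */
    static final class SetTxn {
        final String data;
        final int newVersion;

        SetTxn(String data, int newVersion) {
            this.data = data;
            this.newVersion = newVersion;
        }
    }

    /**
     * Leader side: check the request against the state the node will have
     * when the write is applied, then record the outcome.
     */
    static SetTxn toTxn(Node leaderView, String data, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != leaderView.version) {
            throw new IllegalStateException("version mismatch");
        }
        return new SetTxn(data, leaderView.version + 1);
    }

    /** Replica side: applying is a plain assignment with no further checks. */
    static void apply(Node replica, SetTxn txn) {
        replica.data = txn.data;
        replica.version = txn.newVersion;
    }
}
```

Since the transaction carries the final data and version, applying it a second time leaves the replica in the same state, which makes replay during recovery harmless in this toy model.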



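As the figure shows, ZooKeeper encapsulates the data-consistency operations inside its internal service. There are already many analyses online of topics such as leader election and the Fast Paxos algorithm. The underlying principles can be studied later; let's first look at its application scenarios.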




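1) Distributed Barrier

On a single machine, the JDK provides the CyclicBarrier class to implement a barrier, but in a distributed environment the JDK alone cannot help. Implementing a barrier across machines requires strong consistency, which is where ZooKeeper comes in. The approach is to use one node as the barrier itself: each task that must wait at the barrier calls exists() to check whether the node is present. When the barrier is to be opened, the node is deleted, and ZooKeeper's watch mechanism notifies the tasks that they can start running.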
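The exists-then-watch loop can be sketched without a ZooKeeper server. Everything here is a stand-in written for the example: `FakeNode` imitates a single znode with one-shot watches inside one JVM and is not the ZooKeeper client. With the real client the same loop would call `exists(path, watcher)` and the controller would call `delete(path, -1)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** In-process sketch of the barrier: wait until the barrier node is deleted. */
final class BarrierSketch {
    /** Hypothetical stand-in for one znode with one-shot watches. */
    static final class FakeNode {
        private boolean present = true;
        private final List<Runnable> watchers = new ArrayList<>();

        /** Reports whether the node exists; if it does, registers a one-shot watch. */
        synchronized boolean exists(Runnable watcher) {
            if (present) watchers.add(watcher);
            return present;
        }

        /** Deletes the node, then fires and clears every registered watch. */
        void delete() {
            List<Runnable> toFire;
            synchronized (this) {
                present = false;
                toFire = new ArrayList<>(watchers);
                watchers.clear();
            }
            toFire.forEach(Runnable::run);
        }
    }

    /** Blocks the calling task until the barrier node is gone. */
    static void await(FakeNode barrier) {
        while (true) {
            CountDownLatch deleted = new CountDownLatch(1);
            if (!barrier.exists(deleted::countDown)) {
                return; // barrier is open
            }
            try {
                deleted.await(); // woken by the watch, then re-check
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting at barrier", e);
            }
        }
    }
}
```

The watch is registered in the same call that checks for the node, so a delete that happens between the check and the wait is not missed.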

2) Distributed Queue

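Like a barrier, a queue in a distributed environment also needs strong consistency. ZooKeeper provides a simple way to build one: a single node represents the queue, and its children hold the queue's contents. ZooKeeper's create method offers a sequential mode that automatically appends an increasing number to the name, which is used to insert new elements. The children therefore form a queue data structure: offer calls create, and take deletes the first child in order. ZooKeeper guarantees that the data on every server is consistent, so the result is a distributed queue. Example code for take and offer is shown below: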


    /**
     * Removes the head of the queue and returns it, blocks until it succeeds.
     * @return The former head of the queue
     * @throws KeeperException
     * @throws InterruptedException
     */
    public byte[] take() throws KeeperException, InterruptedException {
        TreeMap<Long,String> orderedChildren;
        // Same as for element. Should refactor this.
        while(true){
            LatchChildWatcher childWatcher = new LatchChildWatcher();
            try{
                orderedChildren = orderedChildren(childWatcher);
            }catch(KeeperException.NoNodeException e){
                // The queue node does not exist yet: create it and retry.
                zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
                continue;
            }
            if(orderedChildren.size() == 0){
                // Queue is empty: wait for the child watcher to fire, then retry.
                childWatcher.await();
                continue;
            }
            // Try the children in sequence order until one is removed successfully.
            for(String headNode : orderedChildren.values()){
                String path = dir +"/"+headNode;
                try{
                    byte[] data = zookeeper.getData(path, false, null);
                    // Version -1 matches any version of the node.
                    zookeeper.delete(path, -1);
                    return data;
                }catch(KeeperException.NoNodeException e){
                    // Another client deleted the node first.
                }
            }
        }
    }

    /**
     * Inserts data into queue.
     * @param data
     * @return true if data was successfully added
     */
    public boolean offer(byte[] data) throws KeeperException, InterruptedException{
        for(;;){
            try{
                // Sequential mode appends an increasing number to the prefix.
                zookeeper.create(dir+"/"+prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
                return true;
            }catch(KeeperException.NoNodeException e){
                // The queue node does not exist yet: create it and retry.
                zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
            }
        }
    }
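The take() method relies on an orderedChildren helper that is not shown in the listing. A rough sketch of the sorting part, under the assumption that child names consist of a fixed prefix followed by the zero-padded sequence number that sequential create appends, might look like the following. The class name `QueueOrdering` and the `qn-` prefix in the usage note are placeholders, and the real helper would also fetch the child list from ZooKeeper and register the watcher.

```java
import java.util.List;
import java.util.TreeMap;

/** Sketch: order queue children by the sequence number at the end of their names. */
final class QueueOrdering {
    /**
     * Maps sequence number to child name, in ascending order.
     * Names without the prefix or without a numeric suffix are skipped.
     */
    static TreeMap<Long, String> orderedChildren(List<String> childNames, String prefix) {
        TreeMap<Long, String> ordered = new TreeMap<>();
        for (String name : childNames) {
            if (!name.startsWith(prefix)) {
                continue; // not a queue element
            }
            try {
                long sequence = Long.parseLong(name.substring(prefix.length()));
                ordered.put(sequence, name);
            } catch (NumberFormatException e) {
                // Suffix is not a number: ignore this child.
            }
        }
        return ordered;
    }
}
```

For example, given children `qn-0000000010` and `qn-0000000002` with prefix `qn-`, the first value in the returned map is `qn-0000000002`, which is the head that take() would try to remove.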


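3) Distributed Lock

To implement a distributed lock with ZooKeeper, one node represents the lock. When a client wants the lock, it creates a sequential child under that node and then calls getChildren() to check whether its own child is first in order. If it is, the client holds the lock. Otherwise it calls exists() on the child just ahead of its own and sets a watch on it. When the lock holder finishes, it releases the lock simply by deleting the child it created. The watch mechanism then notifies the waiting clients that were watching that child, and they compete for the lock again by the same rule.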
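The decision each client makes after calling getChildren() can be written as a small pure function. This is a sketch with assumptions: `LockRule` and the `lock-` names are made up for the example, all children are assumed to share one prefix followed by a zero-padded sequence number (so plain string sorting gives sequence order), and the waiting client is assumed to watch the child immediately ahead of its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Sketch of the lock rule: am I first, and if not, which child do I watch? */
final class LockRule {
    /**
     * Returns empty when myChild is first in order (the lock is held),
     * otherwise the name of the child directly ahead of it to watch.
     */
    static Optional<String> childToWatch(List<String> children, String myChild) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted); // zero-padded suffixes sort in sequence order
        int index = sorted.indexOf(myChild);
        if (index < 0) {
            throw new IllegalArgumentException("own child not found: " + myChild);
        }
        return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
    }
}
```

With the real client, a non-empty result would be passed to exists() together with a watcher, and the client would re-run the check when that watch fires.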

