C++,python,热爱算法和机器学习
全部博文(1214)
分类: 大数据
2017-10-07 00:31:00
ZooKeeper has an official API binding for Java and C. The ZooKeeper community provides unofficial API for most of the languages (.NET, python, etc.). Using ZooKeeper API, an application can connect, interact, manipulate data, coordinate, and finally disconnect from a ZooKeeper ensemble.
ZooKeeper API has a rich set of features to get all the functionality of the ZooKeeper ensemble in a simple and safe manner. ZooKeeper API provides both synchronous and asynchronous methods.
ZooKeeper ensemble and ZooKeeper API completely complement each other in every aspect and it benefits the developers in a great way. Let us discuss Java binding in this chapter.
Application interacting with ZooKeeper ensemble is referred as ZooKeeper Client or simply Client.
Znode is the core component of ZooKeeper ensemble and ZooKeeper API provides a small set of methods to manipulate all the details of znode with ZooKeeper ensemble.
A client should follow the steps given below to have a clear and clean interaction with ZooKeeper ensemble.
Connect to the ZooKeeper ensemble. ZooKeeper ensemble assign a Session ID for the client.
Send heartbeats to the server periodically. Otherwise, the ZooKeeper ensemble expires the Session ID and the client needs to reconnect.
Get / Set the znodes as long as a session ID is active.
Disconnect from the ZooKeeper ensemble, once all the tasks are completed. If the client is inactive for a prolonged time, then the ZooKeeper ensemble will automatically disconnect the client.
Let us understand the most important set of ZooKeeper API in this chapter. The central part of the ZooKeeper API is ZooKeeper class. It provides options to connect the ZooKeeper ensemble in its constructor and has the following methods ?
connect ? connect to the ZooKeeper ensemble
create ? create a znode
exists ? check whether a znode exists and its information
getData ? get data from a particular znode
setData ? set data in a particular znode
getChildren ? get all sub-nodes available in a particular znode
delete ? get a particular znode and all its children
close ? close a connection
The ZooKeeper class provides connection functionality through its constructor. The signature of the constructor is as follows ?
ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
Where,
connectionString ? ZooKeeper ensemble host.
sessionTimeout ? session timeout in milliseconds.
watcher ? an object implementing “Watcher” interface. The ZooKeeper ensemble returns the connection status through the watcher object.
Let us create a new helper class ZooKeeperConnection and add a method connect. The connect method creates a ZooKeeper object, connects to the ZooKeeper ensemble, and then returns the object.
Here CountDownLatch is used to stop (wait) the main process until the client connects with the ZooKeeper ensemble.
The ZooKeeper ensemble replies the connection status through the Watcher callback. The Watcher callback will be called once the client connects with the ZooKeeper ensemble and the Watcher callback calls the countDown method of the CountDownLatch to release the lock, await in the main process.
Here is the complete code to connect with a ZooKeeper ensemble.
// import java classes import java.io.IOException; import java.util.concurrent.CountDownLatch; // import zookeeper classes import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; public class ZooKeeperConnection { // declare zookeeper instance to access ZooKeeper ensemble private ZooKeeper zoo; final CountDownLatch connectedSignal = new CountDownLatch(1); // Method to connect zookeeper ensemble. public ZooKeeper connect(String host) throws IOException,InterruptedException { zoo = new ZooKeeper(host,5000,new Watcher() { public void process(WatchedEvent we) { if (we.getState() == KeeperState.SyncConnected) { connectedSignal.countDown(); } } }); connectedSignal.await(); return zoo; } // Method to disconnect from zookeeper server public void close() throws InterruptedException { zoo.close(); } }
Save the above code and it will be used in the next section for connecting the ZooKeeper ensemble.
The ZooKeeper class provides create method to create a new znode in the ZooKeeper ensemble. The signature of the create method is as follows ?
create(String path, byte[] data, Listacl, CreateMode createMode)
Where,
path ? Znode path. For example, /myapp1, /myapp2, /myapp1/mydata1, myapp2/mydata1/myanothersubdata
data ? data to store in a specified znode path
acl ? access control list of the node to be created. ZooKeeper API provides a static interface ZooDefs.Ids to get some of basic acl list. For example, ZooDefs.Ids.OPEN_ACL_UNSAFE returns a list of acl for open znodes.
createMode ? the type of node, either ephemeral, sequential, or both. This is an enum.
Let us create a new Java application to check the create functionality of the ZooKeeper API. Create a file ZKCreate.java. In the main method, create an object of type ZooKeeperConnection and call the connect method to connect to the ZooKeeper ensemble.
The connect method will return the ZooKeeper object zk. Now, call the createmethod of zk object with custom path and data.
The complete program code to create a znode is as follows ?
import java.io.IOException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; public class ZKCreate { // create static instance for zookeeper class. private static ZooKeeper zk; // create static instance for ZooKeeperConnection class. private static ZooKeeperConnection conn; // Method to create znode in zookeeper ensemble public static void create(String path, byte[] data) throws KeeperException,InterruptedException { zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } public static void main(String[] args) { // znode path String path = "/MyFirstZnode"; // Assign path to znode // data in byte array byte[] data = "My first zookeeper app”.getBytes(); // Declare data try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); create(path, data); // Create the data to the specified path conn.close(); } catch (Exception e) { System.out.println(e.getMessage()); //Catch error message } } }
Once the application is compiled and executed, a znode with the specified data will be created in the ZooKeeper ensemble. You can check it using the ZooKeeper CLI zkCli.sh.
cd /path/to/zookeeper bin/zkCli.sh >>> get /MyFirstZnode
The ZooKeeper class provides the exists method to check the existence of a znode. It returns the metadata of a znode, if the specified znode exists. The signature of the exists method is as follows ?
exists(String path, boolean watcher)
Where,
path ? Znode path
watcher ? boolean value to specify whether to watch a specified znode or not
Let us create a new Java application to check the “exists” functionality of the ZooKeeper API. Create a file “ZKExists.java”. In the main method, create ZooKeeper object, “zk” using “ZooKeeperConnection” object. Then, call “exists” method of “zk” object with custom “path”. The complete listing is as follow ?
import java.io.IOException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.Stat; public class ZKExists { private static ZooKeeper zk; private static ZooKeeperConnection conn; // Method to check existence of znode and its status, if znode is available. public static Stat znode_exists(String path) throws KeeperException,InterruptedException { return zk.exists(path, true); } public static void main(String[] args) throws InterruptedException,KeeperException { String path = "/MyFirstZnode"; // Assign znode to the specified path try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); Stat stat = znode_exists(path); // Stat checks the path of the znode if(stat != null) { System.out.println("Node exists and the node version is " + stat.getVersion()); } else { System.out.println("Node does not exists"); } } catch(Exception e) { System.out.println(e.getMessage()); // Catches error messages } } }
Once the application is compiled and executed, you will get the below output.
Node exists and the node version is 1.
The ZooKeeper class provides getData method to get the data attached in a specified znode and its status. The signature of the getData method is as follows ?
getData(String path, Watcher watcher, Stat stat)
Where,
path ? Znode path.
watcher ? Callback function of type Watcher. The ZooKeeper ensemble will notify through the Watcher callback when the data of the specified znode changes. This is one-time notification.
stat ? Returns the metadata of a znode.
Let us create a new Java application to understand the getData functionality of the ZooKeeper API. Create a file ZKGetData.java. In the main method, create a ZooKeeper object zk using he ZooKeeperConnection object. Then, call the getData method of zk object with custom path.
Here is the complete program code to get the data from a specified node ?
import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.Stat; public class ZKGetData { private static ZooKeeper zk; private static ZooKeeperConnection conn; public static Stat znode_exists(String path) throws KeeperException,InterruptedException { return zk.exists(path,true); } public static void main(String[] args) throws InterruptedException, KeeperException { String path = "/MyFirstZnode"; final CountDownLatch connectedSignal = new CountDownLatch(1); try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); Stat stat = znode_exists(path); if(stat != null) { byte[] b = zk.getData(path, new Watcher() { public void process(WatchedEvent we) { if (we.getType() == Event.EventType.None) { switch(we.getState()) { case Expired: connectedSignal.countDown(); break; } } else { String path = "/MyFirstZnode"; try { byte[] bn = zk.getData(path, false, null); String data = new String(bn, "UTF-8"); System.out.println(data); connectedSignal.countDown(); } catch(Exception ex) { System.out.println(ex.getMessage()); } } } }, null); String data = new String(b, "UTF-8"); System.out.println(data); connectedSignal.await(); } else { System.out.println("Node does not exists"); } } catch(Exception e) { System.out.println(e.getMessage()); } } }
Once the application is compiled and executed, you will get the following output
My first zookeeper app
And the application will wait for further notification from the ZooKeeper ensemble. Change the data of the specified znode using ZooKeeper CLI zkCli.sh.
cd /path/to/zookeeper bin/zkCli.sh >>> set /MyFirstZnode Hello
Now, the application will print the following output and exit.
Hello
The ZooKeeper class provides setData method to modify the data attached in a specified znode. The signature of the setData method is as follows ?
setData(String path, byte[] data, int version)
Where,
path ? Znode path
data ? data to store in a specified znode path.
version ? Current version of the znode. ZooKeeper updates the version number of the znode whenever the data gets changed.
Let us now create a new Java application to understand the setDatafunctionality of the ZooKeeper API. Create a file ZKSetData.java. In the main method, create a ZooKeeper object zk using the ZooKeeperConnectionobject. Then, call the setData method of zk object with the specified path, new data, and version of the node.
Here is the complete program code to modify the data attached in a specified znode.
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import java.io.IOException; public class ZKSetData { private static ZooKeeper zk; private static ZooKeeperConnection conn; // Method to update the data in a znode. Similar to getData but without watcher. public static void update(String path, byte[] data) throws KeeperException,InterruptedException { zk.setData(path, data, zk.exists(path,true).getVersion()); } public static void main(String[] args) throws InterruptedException,KeeperException { String path= "/MyFirstZnode"; byte[] data = "Success".getBytes(); //Assign data which is to be updated. try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); update(path, data); // Update znode data to the specified path } catch(Exception e) { System.out.println(e.getMessage()); } } }
Once the application is compiled and executed, the data of the specified znode will be changed and it can be checked using the ZooKeeper CLI, zkCli.sh.
cd /path/to/zookeeper bin/zkCli.sh >>> get /MyFirstZnode
The ZooKeeper class provides getChildren method to get all the sub-node of a particular znode. The signature of the getChildren method is as follows ?
getChildren(String path, Watcher watcher)
Where,
path ? Znode path.
watcher ? Callback function of type “Watcher”. The ZooKeeper ensemble will notify when the specified znode gets deleted or a child under the znode gets created / deleted. This is a one-time notification.
import java.io.IOException; import java.util.*; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.Stat; public class ZKGetChildren { private static ZooKeeper zk; private static ZooKeeperConnection conn; // Method to check existence of znode and its status, if znode is available. public static Stat znode_exists(String path) throws KeeperException,InterruptedException { return zk.exists(path,true); } public static void main(String[] args) throws InterruptedException,KeeperException { String path = "/MyFirstZnode"; // Assign path to the znode try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); Stat stat = znode_exists(path); // Stat checks the path if(stat!= null) { //“getChildren” method- get all the children of znode.It has two args, path and watch Listchildren = zk.getChildren(path, false); for(int i = 0; i < children.size(); i++) System.out.println(children.get(i)); //Print children's } else { System.out.println("Node does not exists"); } } catch(Exception e) { System.out.println(e.getMessage()); } } }
Before running the program, let us create two sub-nodes for /MyFirstZnodeusing the ZooKeeper CLI, zkCli.sh.
cd /path/to/zookeeper bin/zkCli.sh >>> create /MyFirstZnode/myfirstsubnode Hi >>> create /MyFirstZnode/mysecondsubmode Hi
Now, compiling and running the program will output the above created znodes.
myfirstsubnode mysecondsubnode
The ZooKeeper class provides delete method to delete a specified znode. The signature of the delete method is as follows ?
delete(String path, int version)
Where,
path ? Znode path.
version ? Current version of the znode.
Let us create a new Java application to understand the delete functionality of the ZooKeeper API. Create a file ZKDelete.java. In the main method, create a ZooKeeper object zk using ZooKeeperConnection object. Then, call the delete method of zk object with the specified path and version of the node.
The complete program code to delete a znode is as follows ?
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; public class ZKDelete { private static ZooKeeper zk; private static ZooKeeperConnection conn; // Method to check existence of znode and its status, if znode is available. public static void delete(String path) throws KeeperException,InterruptedException { zk.delete(path,zk.exists(path,true).getVersion()); } public static void main(String[] args) throws InterruptedException,KeeperException { String path = "/MyFirstZnode"; //Assign path to the znode try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); delete(path); //delete the node with the specified path } catch(Exception e) { System.out.println(e.getMessage()); // catches error messages } } }