客户端接口
-
public String create(final String path, byte data[], List<ACL> acl,
-
CreateMode createMode)
-
throws KeeperException, InterruptedException
-
{
-
final String clientPath = path;
-
PathUtils.validatePath(clientPath, createMode.isSequential());
-
-
final String serverPath = prependChroot(clientPath);
-
-
RequestHeader h = new RequestHeader();
-
h.setType(ZooDefs.OpCode.create);
-
-
CreateRequest request = new CreateRequest();
-
-
CreateResponse response = new CreateResponse();
-
request.setData(data);
-
-
request.setFlags(createMode.toFlag());
-
request.setPath(serverPath);
-
if (acl != null && acl.size() == 0) {
-
throw new KeeperException.InvalidACLException();
-
}
-
request.setAcl(acl);
-
-
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
-
-
if (r.getErr() != 0) {
-
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
-
clientPath);
-
}
-
-
if (cnxn.chrootPath == null) {
-
return response.getPath();
-
} else {
-
return response.getPath().substring(cnxn.chrootPath.length());
-
}
-
}
请求提交过程和之前的exists一样,都是通过SendThread写出去,都会进入pending队列等待server端返回。
server端处理也是一样,变化的只是RequestProcessor的业务逻辑。也就是说transport层是通用的,变化的是上层的业务层。
server端执行处理链,PrepRequestProcessor
-
switch (request.type) {
-
case OpCode.create:
-
-
CreateRequest createRequest = new CreateRequest();
-
-
pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
-
break;
-
......
-
request.zxid = zks.getZxid();
-
nextProcessor.processRequest(request);
具体处理
-
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
-
throws KeeperException, IOException, RequestProcessorException
-
{
-
-
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
-
zks.getTime(), type);
-
-
switch (type) {
-
case OpCode.create:
-
-
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
-
CreateRequest createRequest = (CreateRequest)record;
-
-
if(deserialize)
-
ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
-
String path = createRequest.getPath();
-
-
int lastSlash = path.lastIndexOf('/');
-
if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
-
LOG.info("Invalid path " + path + " with session 0x" +
-
Long.toHexString(request.sessionId));
-
throw new KeeperException.BadArgumentsException(path);
-
}
-
-
List<ACL> listACL = removeDuplicates(createRequest.getAcl());
-
if (!fixupACL(request.authInfo, listACL)) {
-
throw new KeeperException.InvalidACLException(path);
-
}
-
String parentPath = path.substring(0, lastSlash);
-
-
ChangeRecord parentRecord = getRecordForPath(parentPath);
-
-
checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,
-
request.authInfo);
-
int parentCVersion = parentRecord.stat.getCversion();
-
CreateMode createMode =
-
CreateMode.fromFlag(createRequest.getFlags());
-
-
if (createMode.isSequential()) {
-
path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
-
}
-
try {
-
PathUtils.validatePath(path);
-
} catch(IllegalArgumentException ie) {
-
LOG.info("Invalid path " + path + " with session 0x" +
-
Long.toHexString(request.sessionId));
-
throw new KeeperException.BadArgumentsException(path);
-
}
-
-
try {
-
if (getRecordForPath(path) != null) {
-
throw new KeeperException.NodeExistsException(path);
-
}
-
} catch (KeeperException.NoNodeException e) {
-
-
}
-
-
boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
-
if (ephemeralParent) {
-
throw new KeeperException.NoChildrenForEphemeralsException(path);
-
}
-
-
int newCversion = parentRecord.stat.getCversion()+1;
-
-
request.txn = new CreateTxn(path, createRequest.getData(),
-
listACL,
-
createMode.isEphemeral(), newCversion);
-
StatPersisted s = new StatPersisted();
-
-
if (createMode.isEphemeral()) {
-
s.setEphemeralOwner(request.sessionId);
-
}
-
parentRecord = parentRecord.duplicate(request.hdr.getZxid());
-
-
parentRecord.childCount++;
-
-
parentRecord.stat.setCversion(newCversion);
-
-
addChangeRecord(parentRecord);
-
addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
-
0, listACL));
-
break;
-
......
SyncRequestProcessor处理,主要是log写入,和之前的分析类似,不赘述
FinalRequestProcessor处理,修改内存中的datatree结构
-
switch (header.getType()) {
-
case OpCode.create:
-
CreateTxn createTxn = (CreateTxn) txn;
-
rc.path = createTxn.getPath();
-
createNode(
-
createTxn.getPath(),
-
createTxn.getData(),
-
createTxn.getAcl(),
-
createTxn.getEphemeral() ? header.getClientId() : 0,
-
createTxn.getParentCVersion(),
-
header.getZxid(), header.getTime());
-
break;
/**
 * Adds a new node to the in-memory tree under its parent and fires the
 * corresponding watches.
 *
 * @param path            full path of the node to create; split at its last '/'
 *                        into parent path and child name
 * @param data            node payload handed to the new DataNode
 * @param acl             ACL list, converted via convertAcls before being stored
 * @param ephemeralOwner  value stored as the node's ephemeral owner; when
 *                        non-zero the path is also recorded in the ephemerals map
 * @param parentCVersion  cversion to set on the parent; -1 means "use the
 *                        parent's current cversion plus one"
 * @param zxid            stored as czxid, mzxid and pzxid of the new node and
 *                        as pzxid of the parent
 * @param time            stored as ctime and mtime of the new node
 * @return the path that was passed in
 * @throws KeeperException.NoNodeException     if the parent is not present in nodes
 * @throws KeeperException.NodeExistsException if the parent already has a child
 *                                             with this name
 */
public String createNode(String path, byte data[], List<ACL> acl,
-
long ephemeralOwner, int parentCVersion, long zxid, long time)
-
throws KeeperException.NoNodeException,
-
KeeperException.NodeExistsException {
-
-
// Split "<parent>/<child>" at the last slash.
// NOTE(review): assumes path contains a '/'; presumably validated by the caller - confirm.
int lastSlash = path.lastIndexOf('/');
-
String parentName = path.substring(0, lastSlash);
-
String childName = path.substring(lastSlash + 1);
-
// Initial stat of the new node: creation and modification fields share the
// same time/zxid, data and ACL versions start at 0.
StatPersisted stat = new StatPersisted();
-
stat.setCtime(time);
-
stat.setMtime(time);
-
stat.setCzxid(zxid);
-
stat.setMzxid(zxid);
-
stat.setPzxid(zxid);
-
stat.setVersion(0);
-
stat.setAversion(0);
-
stat.setEphemeralOwner(ephemeralOwner);
-
DataNode parent = nodes.get(parentName);
-
if (parent == null) {
-
throw new KeeperException.NoNodeException();
-
}
-
-
// All mutations of the parent and the node map below happen while holding
// the parent's monitor.
synchronized (parent) {
-
Set<String> children = parent.getChildren();
-
if (children != null) {
-
if (children.contains(childName)) {
-
throw new KeeperException.NodeExistsException();
-
}
-
}
-
-
// -1 is a sentinel: derive the new cversion from the parent's current one.
if (parentCVersion == -1) {
-
parentCVersion = parent.stat.getCversion();
-
parentCVersion++;
-
}
-
parent.stat.setCversion(parentCVersion);
-
parent.stat.setPzxid(zxid);
-
Long longval = convertAcls(acl);
-
-
// Link the child into the tree: register its name on the parent and its
// full path in the node map.
DataNode child = new DataNode(parent, data, longval, stat);
-
parent.addChild(childName);
-
nodes.put(path, child);
-
-
// A non-zero owner marks the node as ephemeral: remember the path in the
// owner's set, creating that set on first use.
if (ephemeralOwner != 0) {
-
HashSet<String> list = ephemerals.get(ephemeralOwner);
-
if (list == null) {
-
list = new HashSet<String>();
-
ephemerals.put(ephemeralOwner, list);
-
}
-
synchronized (list) {
-
list.add(path);
-
}
-
}
-
}
-
// (part of the original method is elided in this excerpt)
......
-
-
// Notify data watches on the new path and child watches on the parent;
// an empty parent name is reported as "/".
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
-
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
-
Event.EventType.NodeChildrenChanged);
-
return path;
-
}
事件触发过程
-
/**
 * Fires the watches registered on a path and removes them from the tables.
 *
 * @param path    path whose watchers are removed from watchTable and notified
 * @param type    event type placed in the WatchedEvent delivered to watchers
 * @param supress watchers that must not be notified; may be null
 * @return the watchers that were registered on the path, including any
 *         suppressed ones
 */
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
-
WatchedEvent e = new WatchedEvent(type,
-
KeeperState.SyncConnected, path);
-
HashSet<Watcher> watchers;
-
// Table bookkeeping is done under this object's monitor.
synchronized (this) {
-
-
// Triggering removes the path's entry from the table.
watchers = watchTable.remove(path);
-
// (elided in this excerpt; NOTE(review): presumably handles the case where
// no watchers are registered for the path - confirm against the full source)
.......
-
// Drop the reverse mapping (watcher -> paths) for this path as well.
for (Watcher w : watchers) {
-
HashSet<String> paths = watch2Paths.get(w);
-
if (paths != null) {
-
paths.remove(path);
-
}
-
}
-
}
-
-
// Callbacks run after the synchronized block has been left.
for (Watcher w : watchers) {
-
if (supress != null && supress.contains(w)) {
-
continue;
-
}
-
w.process(e);
-
}
-
return watchers;
-
}
具体处理,NIOServerCnxn
-
/**
 * Sends a watch event to the client as a "notification" response.
 *
 * @param event the watched event; its wrapper form is what gets serialized
 */
synchronized public void process(WatchedEvent event) {
-
-
// Reply header built with the fixed values -1, -1L and 0.
// NOTE(review): presumably xid = -1 marks the reply as a notification - confirm.
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
-
// (part of the original method is elided in this excerpt)
......
-
-
-
WatcherEvent e = event.getWrapper();
-
-
sendResponse(h, e, "notification");
-
}
具体发送:
-
synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
-
try {
-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
-
try {
-
-
baos.write(fourBytes);
-
-
bos.writeRecord(h, "header");
-
if (r != null) {
-
bos.writeRecord(r, tag);
-
}
-
baos.close();
-
} catch (IOException e) {
-
LOG.error("Error serializing response");
-
}
-
-
byte b[] = baos.toByteArray();
-
ByteBuffer bb = ByteBuffer.wrap(b);
-
-
bb.putInt(b.length - 4).rewind();
-
-
sendBuffer(bb);
-
if (h.getXid() > 0) {
-
synchronized(this){
-
outstandingRequests--;
-
}
-
-
synchronized (this.factory) {
-
if (zkServer.getInProcess() < outstandingLimit
-
|| outstandingRequests < 1) {
-
sk.selector().wakeup();
-
enableRecv();
-
}
-
}
-
}
-
} catch(Exception e) {
-
LOG.warn("Unexpected exception. Destruction averted.", e);
-
}
-
}
server端就处理完了,接下来client端SendThread收到响应
if (replyHdr.getXid() == -1) {
-
-
if (LOG.isDebugEnabled()) {
-
LOG.debug("Got notification sessionid:0x"
-
+ Long.toHexString(sessionId));
-
}
-
-
WatcherEvent event = new WatcherEvent();
-
event.deserialize(bbia, "response");
-
-
-
-
if (chrootPath != null) {
-
String serverPath = event.getPath();
-
if(serverPath.compareTo(chrootPath)==0)
-
event.setPath("/");
-
else if (serverPath.length() > chrootPath.length())
-
event.setPath(serverPath.substring(chrootPath.length()));
-
else {
-
LOG.warn("Got server path " + event.getPath()
-
+ " which is too short for chroot path "
-
+ chrootPath);
-
}
-
}
-
-
WatchedEvent we = new WatchedEvent(event);
-
if (LOG.isDebugEnabled()) {
-
LOG.debug("Got " + we + " for sessionid 0x"
-
+ Long.toHexString(sessionId));
-
}
-
-
eventThread.queueEvent( we );
-
return;
-
}
eventThread端处理和之前类似
-
case NodeCreated:
-
-
synchronized (dataWatches) {
-
addTo(dataWatches.remove(clientPath), result);
-
}
-
-
synchronized (existWatches) {
-
addTo(existWatches.remove(clientPath), result);
-
}
-
break;
-
if (event instanceof WatcherSetEventPair) {
-
-
WatcherSetEventPair pair = (WatcherSetEventPair) event;
-
-
for (Watcher watcher : pair.watchers) {
-
try {
-
watcher.process(pair.event);
-
} catch (Throwable t) {
-
LOG.error("Error while calling watcher ", t);
-
}
-
}
-
}
以上就完成了create操作的处理,之后的delete,set,get操作都是类似的,大家有兴趣的直接阅读相关代码即可:)
阅读(840) | 评论(0) | 转发(0) |