前一篇介绍了zookeeper的client和server之间session是如何建立的。在DataMonitor的例子中,DataMonitor通过exists异步接口和server端交互,本文将介绍exists操作是如何完成。
dataMonitor开始exist操作
-
public void exists(final String path, Watcher watcher,
-
StatCallback cb, Object ctx)
-
{
-
......
-
-
RequestHeader h = new RequestHeader();
-
h.setType(ZooDefs.OpCode.exists);
-
-
ExistsRequest request = new ExistsRequest();
-
request.setPath(serverPath);
-
request.setWatch(watcher != null);
-
SetDataResponse response = new SetDataResponse();
-
-
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
-
clientPath, serverPath, ctx, wcb);
-
}
添加过程
-
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
-
Record response, AsyncCallback cb, String clientPath,
-
String serverPath, Object ctx, WatchRegistration watchRegistration)
-
{
-
Packet packet = null;
-
-
-
-
-
synchronized (outgoingQueue) {
-
-
packet = new Packet(h, r, request, response, watchRegistration);
-
packet.cb = cb;
-
packet.ctx = ctx;
-
packet.clientPath = clientPath;
-
packet.serverPath = serverPath;
-
if (!state.isAlive() || closing) {
-
conLossPacket(packet);
-
} else {
-
-
-
if (h.getType() == OpCode.closeSession) {
-
closing = true;
-
}
-
-
outgoingQueue.add(packet);
-
}
-
}
-
-
sendThread.getClientCnxnSocket().wakeupCnxn();
-
return packet;
-
}
接下来还是回到SendThread的发送过程,之前Session建立时已经分析过,这里有一点要注意下:
-
-
if ((p.requestHeader != null) &&
-
(p.requestHeader.getType() != OpCode.ping) &&
-
(p.requestHeader.getType() != OpCode.auth)) {
-
p.requestHeader.setXid(cnxn.getXid());
-
}
接下来Server端IO线程拿到请求,处理,过程和之前session建立时一样,就不赘述了。变化的是后续的处理链。
PrepRequestProcessor预处理
-
-
/读请求,不需要创建事务,只是检查了下session是否还在,此时事务头和事务体都是null
-
case OpCode.sync:
-
case OpCode.exists:
-
case OpCode.getData:
-
case OpCode.getACL:
-
case OpCode.getChildren:
-
case OpCode.getChildren2:
-
case OpCode.ping:
-
case OpCode.setWatches:
-
zks.sessionTracker.checkSession(request.sessionId,
-
request.getOwner());
-
break;
SyncRequestProcessor处理逻辑之前已经分析过了,这里就挑重点说一下
-
-
zks.getZKDatabase().append(si)
接下来FinalRequestProcessor处理,由于不是事务型请求,省了很多步骤,直接进入switch处理:
-
case OpCode.exists: {
-
lastOp = "EXIS";
-
-
ExistsRequest existsRequest = new ExistsRequest();
-
-
ByteBufferInputStream.byteBuffer2Record(request.request,
-
existsRequest);
-
String path = existsRequest.getPath();
-
if (path.indexOf('\0') != -1) {
-
throw new KeeperException.BadArgumentsException();
-
}
-
-
Stat stat = zks.getZKDatabase().statNode(path, existsRequest
-
.getWatch() ? cnxn : null);
-
-
rsp = new ExistsResponse(stat);
-
break;
-
}
-
......
-
-
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
-
-
ReplyHeader hdr =
-
new ReplyHeader(request.cxid, lastZxid, err.intValue());
-
-
zks.serverStats().updateLatency(request.createTime);
-
cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
-
request.createTime, System.currentTimeMillis());
-
-
try {
-
-
cnxn.sendResponse(hdr, rsp, "response");
statNode过程
-
-
public Stat statNode(String path, Watcher watcher)
-
throws KeeperException.NoNodeException {
-
Stat stat = new Stat();
-
DataNode n = nodes.get(path);
-
if (watcher != null) {
-
-
dataWatches.addWatch(path, watcher);
-
}
-
if (n == null) {
-
throw new KeeperException.NoNodeException();
-
}
-
synchronized (n) {
-
n.copyStat(stat);
-
return stat;
-
}
-
}
好了,以上server端就完成了ExistsRequest的处理了。接下来client端SendThread收到ExistsResponse进行处理
-
if (sockKey.isReadable()) {
-
int rc = sock.read(incomingBuffer);
-
......
-
else if (!initialized) {
-
readConnectResult();
-
enableRead();
-
if (findSendablePacket(outgoingQueue,
-
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
-
-
-
enableWrite();
-
}
-
lenBuffer.clear();
-
incomingBuffer = lenBuffer;
-
updateLastHeard();
-
initialized = true;
-
}
-
-
else {
-
sendThread.readResponse(incomingBuffer);
-
lenBuffer.clear();
-
incomingBuffer = lenBuffer;
-
updateLastHeard();
-
}
-
}
-
}
具体读取:
-
void readResponse(ByteBuffer incomingBuffer) throws IOException {
-
ByteBufferInputStream bbis = new ByteBufferInputStream(
-
incomingBuffer);
-
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
-
ReplyHeader replyHdr = new ReplyHeader();
-
-
replyHdr.deserialize(bbia, "header");
-
......
-
-
Packet packet;
-
synchronized (pendingQueue) {
-
if (pendingQueue.size() == 0) {
-
throw new IOException("Nothing in the queue, but got "
-
+ replyHdr.getXid());
-
}
-
packet = pendingQueue.remove();
-
}
-
-
-
-
-
try {
-
-
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
-
packet.replyHeader.setErr(
-
KeeperException.Code.CONNECTIONLOSS.intValue());
-
throw new IOException("Xid out of order. Got Xid "
-
+ replyHdr.getXid() + " with err " +
-
+ replyHdr.getErr() +
-
" expected Xid "
-
+ packet.requestHeader.getXid()
-
+ " for a packet with details: "
-
+ packet );
-
}
-
-
packet.replyHeader.setXid(replyHdr.getXid());
-
packet.replyHeader.setErr(replyHdr.getErr());
-
packet.replyHeader.setZxid(replyHdr.getZxid());
-
-
if (replyHdr.getZxid() > 0) {
-
lastZxid = replyHdr.getZxid();
-
}
-
-
if (packet.response != null && replyHdr.getErr() == 0) {
-
packet.response.deserialize(bbia, "response");
-
}
-
-
if (LOG.isDebugEnabled()) {
-
LOG.debug("Reading reply sessionid:0x"
-
+ Long.toHexString(sessionId) + ", packet:: " + packet);
-
}
-
} finally {
-
-
finishPacket(packet);
-
}
-
}
finishPacket过程
-
private void finishPacket(Packet p) {
-
-
if (p.watchRegistration != null) {
-
p.watchRegistration.register(p.replyHeader.getErr());
-
}
-
-
if (p.cb == null) {
-
synchronized (p) {
-
p.finished = true;
-
p.notifyAll();
-
}
-
}
-
-
else {
-
p.finished = true;
-
eventThread.queuePacket(p);
-
}
-
}
EventThread端
-
else if (p.response instanceof ExistsResponse
-
|| p.response instanceof SetDataResponse
-
|| p.response instanceof SetACLResponse) {
-
StatCallback cb = (StatCallback) p.cb;
-
-
if (rc == 0) {
-
if (p.response instanceof ExistsResponse) {
-
cb.processResult(rc, clientPath, p.ctx,
-
((ExistsResponse) p.response)
-
.getStat());
-
} else if (p.response instanceof SetDataResponse) {
-
cb.processResult(rc, clientPath, p.ctx,
-
((SetDataResponse) p.response)
-
.getStat());
-
} else if (p.response instanceof SetACLResponse) {
-
cb.processResult(rc, clientPath, p.ctx,
-
((SetACLResponse) p.response)
-
.getStat());
-
}
-
}
-
-
else {
-
cb.processResult(rc, clientPath, p.ctx, null);
-
}
-
}
在DataMonitor例子中,它本身就是一个StatCallback
-
public void processResult(int rc, String path, Object ctx, Stat stat) {
-
boolean exists;
-
switch (rc) {
-
case Code.Ok:
-
exists = true;
-
break;
-
case Code.NoNode:
-
exists = false;
-
break;
-
case Code.SessionExpired:
-
case Code.NoAuth:
-
dead = true;
-
listener.closing(rc);
-
return;
-
default:
-
-
zk.exists(znode, true, this, null);
-
return;
-
}
Exists过程大致就是上面描述的,主要注意点:
1.客户端Request发送完之后会进入Pending队列,等待响应之后拿出来继续处理
2.同步接口是使用Packet.wait()实现的
3.server端exists操作不是事务型的操作,不会写入log
4.server端的watcher就是一个客户端连接ServerCxcn,代表一个客户端,notify的时候直接往连接里写数据即可
阅读(1090) | 评论(0) | 转发(0) |