前一篇文章分析了server端主动超时session的情况,接下来看一下client和server网络暂时中断的情况。
1.和server主动关闭连接一样,client抛出EndOfStreamException异常,此时客户端状态还是CONNECTED
2.SendThread处理异常,清理连接,将当前所有请求置为失败,错误码是CONNECTIONLOSS
3.发送Disconnected状态通知
4.选下一个server重连
5.连上之后发送ConnectRequest,sessionid和password是当前session的数据
6.server端处理,分leader和follower,由于此时client端重试比较快,session还没超时,所以leader和follower端session校验成功。如果这个时候session正好超时了,则校验失败,client会抛出sessionExpired异常并退出
7.server端返回成功的ConnectResponse
8.client收到响应,发送SyncConnected状态通知给watcher
9.client发送SetWatches包,重建watch
-
-
if (!disableAutoWatchReset) {
-
-
List dataWatches = zooKeeper.getDataWatches();
-
List existWatches = zooKeeper.getExistWatches();
-
List childWatches = zooKeeper.getChildWatches();
-
if (!dataWatches.isEmpty()
-
|| !existWatches.isEmpty() || !childWatches.isEmpty()) {
-
-
SetWatches sw = new SetWatches(lastZxid,
-
prependChroot(dataWatches),
-
prependChroot(existWatches),
-
prependChroot(childWatches));
-
RequestHeader h = new RequestHeader();
-
h.setType(ZooDefs.OpCode.setWatches);
-
h.setXid(-8);
-
Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
-
outgoingQueue.addFirst(packet);
-
}
-
}
10.server端收到setWatches请求,如果是follower,直接进入FinalRequestProcessor处理,无需proposal
-
case OpCode.setWatches: {
-
lastOp = "SETW";
-
SetWatches setWatches = new SetWatches();
-
-
request.request.rewind();
-
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
-
long relativeZxid = setWatches.getRelativeZxid();
-
-
zks.getZKDatabase().setWatches(relativeZxid,
-
setWatches.getDataWatches(),
-
setWatches.getExistWatches(),
-
setWatches.getChildWatches(), cnxn);
-
break;
-
}
-
-
public void setWatches(long relativeZxid, List dataWatches,
-
List existWatches, List childWatches,
-
Watcher watcher) {
-
for (String path : dataWatches) {
-
DataNode node = getNode(path);
-
WatchedEvent e = null;
-
if (node == null) {
-
e = new WatchedEvent(EventType.NodeDeleted,
-
KeeperState.SyncConnected, path);
-
} else if (node.stat.getCzxid() > relativeZxid) {
-
e = new WatchedEvent(EventType.NodeCreated,
-
KeeperState.SyncConnected, path);
-
} else if (node.stat.getMzxid() > relativeZxid) {
-
e = new WatchedEvent(EventType.NodeDataChanged,
-
KeeperState.SyncConnected, path);
-
}
-
if (e != null) {
-
watcher.process(e);
-
} else {
-
this.dataWatches.addWatch(path, watcher);
-
}
-
}
-
for (String path : existWatches) {
-
DataNode node = getNode(path);
-
WatchedEvent e = null;
-
if (node == null) {
-
-
} else if (node.stat.getMzxid() > relativeZxid) {
-
e = new WatchedEvent(EventType.NodeDataChanged,
-
KeeperState.SyncConnected, path);
-
} else {
-
e = new WatchedEvent(EventType.NodeCreated,
-
KeeperState.SyncConnected, path);
-
}
-
if (e != null) {
-
watcher.process(e);
-
} else {
-
this.dataWatches.addWatch(path, watcher);
-
}
-
}
-
for (String path : childWatches) {
-
DataNode node = getNode(path);
-
WatchedEvent e = null;
-
if (node == null) {
-
e = new WatchedEvent(EventType.NodeDeleted,
-
KeeperState.SyncConnected, path);
-
} else if (node.stat.getPzxid() > relativeZxid) {
-
e = new WatchedEvent(EventType.NodeChildrenChanged,
-
KeeperState.SyncConnected, path);
-
}
-
if (e != null) {
-
watcher.process(e);
-
} else {
-
this.childWatches.addWatch(path, watcher);
-
}
-
}
-
}
11.如果是leader,则多了一层PrepRequestProcessor的处理,检查session是否还在
再来看看客户端主动超时Session和心跳的情况,SendThread主线程
-
// SendThread main loop as excerpted here; the dotted lines ("......", ".....", "....")
// mark code that the excerpt leaves out.
public void run() {
-
clientCnxnSocket.introduce(this,sessionId);
-
clientCnxnSocket.updateNow();
-
clientCnxnSocket.updateLastSendAndHeard();
-
-
// "to" is the wait budget handed to doTransport() at the bottom of each iteration.
int to;
-
long lastPingRwServer = System.currentTimeMillis();
-
while (state.isAlive()) {
-
try {
-
......
-
-
// Time left before the idle-receive limit is reached: measured against
// readTimeout while connected, against connectTimeout otherwise.
if (state.isConnected()) {
-
......
-
to = readTimeout - clientCnxnSocket.getIdleRecv();
-
} else {
-
to = connectTimeout - clientCnxnSocket.getIdleRecv();
-
}
-
-
// Limit exhausted: raise SessionTimeoutException, which is caught by the
// catch (Throwable) block of this same iteration.
if (to <= 0) {
-
throw new SessionTimeoutException(
-
"Client session timed out, have not heard from server in "
-
+ clientCnxnSocket.getIdleRecv() + "ms"
-
+ " for sessionid 0x"
-
+ Long.toHexString(sessionId));
-
}
-
-
if (state.isConnected()) {
-
-
// A ping is due once the idle-send time reaches half of readTimeout.
int timeToNextPing = readTimeout / 2
-
- clientCnxnSocket.getIdleSend();
-
-
if (timeToNextPing <= 0) {
-
sendPing();
-
-
clientCnxnSocket.updateLastSend();
-
}
-
-
// Ping not due yet: cap the wait so the loop comes back no later than
// the moment the next ping is due.
else {
-
if (timeToNextPing < to) {
-
to = timeToNextPing;
-
}
-
}
-
}
-
-
.....
-
-
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
-
} catch (Throwable e) {
-
....
-
// NOTE(review): presumably restarts the idle timers after the (elided)
// error handling -- confirm against ClientCnxnSocket.
clientCnxnSocket.updateNow();
-
clientCnxnSocket.updateLastSendAndHeard();
-
}
-
}
-
}
-
.....
-
}
心跳包,xid为-2
-
private void sendPing() {
-
lastPingSentNs = System.nanoTime();
-
RequestHeader h = new RequestHeader(-2, OpCode.ping);
-
queuePacket(h, null, null, null, null, null, null, null, null);
-
}
server端处理ping包,如果是follower直接进入FinalRequestProcessor处理
-
case OpCode.ping: {
-
zks.serverStats().updateLatency(request.createTime);
-
-
lastOp = "PING";
-
cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
-
request.createTime, System.currentTimeMillis());
-
-
cnxn.sendResponse(new ReplyHeader(-2,
-
zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
-
return;
-
}
如果是leader,则多了一层PrepRequestProcessor的处理,检查session是否还在
client收到心跳包响应,啥事不做
-
if (replyHdr.getXid() == -2) {
-
-
if (LOG.isDebugEnabled()) {
-
LOG.debug("Got ping response for sessionid: 0x"
-
+ Long.toHexString(sessionId)
-
+ " after "
-
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
-
+ "ms");
-
}
-
return;
-
}
以上可以看出
1.心跳包只有在写空闲时才会发送:处于连接状态且距上次发送已超过readTimeout/2(即timeToNextPing <= 0)时才调用sendPing
2.每次transport的时候都会更新当前时间now
3.lastHeard和lastSend取决于是否有读写请求
4.客户端session超时和连接关闭CONNECTIONLOSS处理是一样的,都会导致重试
阅读(845) | 评论(0) | 转发(0) |