前面分析知道session超时由leader负责,假设某个session长时间没心跳超时,SessionTrackImpl入口
-
if (set != null) {
-
for (SessionImpl s : set.sessions) {
-
setSessionClosing(s.sessionId);
-
expirer.expire(s);
-
}
-
}
session失效是一个proposal过程
-
private void close(long sessionId) {
-
submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
-
}
leader段执行处理链,PrepRequestProcessor执行,closeSession的request没有txn内容
-
case OpCode.closeSession:
-
-
-
-
-
-
HashSet<String> es = zks.getZKDatabase()
-
.getEphemerals(request.sessionId);
-
synchronized (zks.outstandingChanges) {
-
for (ChangeRecord c : zks.outstandingChanges) {
-
if (c.stat == null) {
-
-
es.remove(c.path);
-
} else if (c.stat.getEphemeralOwner() == request.sessionId) {
-
es.add(c.path);
-
}
-
}
-
for (String path2Delete : es) {
-
addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
-
path2Delete, null, 0, null));
-
}
-
-
zks.sessionTracker.setSessionClosing(request.sessionId);
-
}
-
-
LOG.info("Processed session termination for sessionid: 0x"
-
+ Long.toHexString(request.sessionId));
-
break;
之后leader通过ProposalRequestProcessor发起投票,并写入log,follower处理投票,写入log并ack。然后leader commit请求,leader和follower都进入FinalRequestProcessor处理。先是datatree处理
-
case OpCode.closeSession:
-
killSession(header.getClientId(), header.getZxid());
-
break;
-
void killSession(long session, long zxid) {
-
-
-
-
-
-
-
-
HashSet<String> list = ephemerals.remove(session);
-
if (list != null) {
-
for (String path : list) {
-
try {
-
deleteNode(path, zxid);
-
if (LOG.isDebugEnabled()) {
-
LOG
-
.debug("Deleting ephemeral node " + path
-
+ " for session 0x"
-
+ Long.toHexString(session));
-
}
-
} catch (NoNodeException e) {
-
LOG.warn("Ignoring NoNodeException for path " + path
-
+ " while removing ephemeral for dead session 0x"
-
+ Long.toHexString(session));
-
}
-
}
-
}
-
}
然后FinalRequestProcessor删除session
-
else if (opCode == OpCode.closeSession) {
-
sessionTracker.removeSession(sessionId);
-
}
然后FinalRequestProcessor关闭连接
-
if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
-
ServerCnxnFactory scxn = zks.getServerCnxnFactory();
-
-
-
if (scxn != null && request.cnxn == null) {
-
-
-
-
-
scxn.closeSession(request.sessionId);
-
return;
-
}
-
}
closeSession就是把session从内存数据结构中删除。
遍历连接集合,关闭sessionid对应的那个连接
-
private void closeSessionWithoutWakeup(long sessionId) {
-
HashSet<NIOServerCnxn> cnxns;
-
synchronized (this.cnxns) {
-
cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
-
}
-
-
for (NIOServerCnxn cnxn : cnxns) {
-
if (cnxn.getSessionId() == sessionId) {
-
try {
-
cnxn.close();
-
} catch (Exception e) {
-
LOG.warn("exception during session close", e);
-
}
-
break;
-
}
-
}
-
}
以上就完成了server端session超时的处理
1.leader发起closeSession的Proposal
2.该session对应的server的FinalRequestProcessor中关闭连接
3.server端发起的session超时处理不需要返回client信息,如果是client主动关闭session,则需要返回closeConn的信息,然后再关闭连接
阅读(1104) | 评论(0) | 转发(0) |