前面一篇文章提到zookeeper server端会主动发现session超时并清理session信息、关闭连接,接下来看看client端是如何试图恢复session的。关于client端代码分析见前文http://iwinit.iteye.com/blog/1754611 。
由于session被清理,此时server端已经没有session信息了。而由于连接被关闭,client会抛出异常
-
if (sockKey.isReadable()) {
-
-
int rc = sock.read(incomingBuffer);
-
if (rc < 0) {
-
throw new EndOfStreamException(
-
"Unable to read additional data from server sessionid 0x"
-
+ Long.toHexString(sessionId)
-
+ ", likely server has closed socket");
-
}
SendThread处理异常
-
else {
-
-
if (e instanceof SessionExpiredException) {
-
LOG.info(e.getMessage() + ", closing socket connection");
-
} else if (e instanceof SessionTimeoutException) {
-
LOG.info(e.getMessage() + RETRY_CONN_MSG);
-
}
-
-
else if (e instanceof EndOfStreamException) {
-
LOG.info(e.getMessage() + RETRY_CONN_MSG);
-
} else if (e instanceof RWServerFoundException) {
-
LOG.info(e.getMessage());
-
} else {
-
LOG.warn(
-
"Session 0x"
-
+ Long.toHexString(getSessionId())
-
+ " for server "
-
+ clientCnxnSocket.getRemoteSocketAddress()
-
+ ", unexpected error"
-
+ RETRY_CONN_MSG, e);
-
}
-
-
cleanup();
-
-
if (state.isAlive()) {
-
eventThread.queueEvent(new WatchedEvent(
-
Event.EventType.None,
-
Event.KeeperState.Disconnected,
-
null));
-
}
-
clientCnxnSocket.updateNow();
-
clientCnxnSocket.updateLastSendAndHeard();
-
}
接下来client重新寻找下一个server进行session恢复,此时client的sessionId和password仍然是上一次创建的session信息。
-
-
if (!clientCnxnSocket.isConnected()) {
-
-
if(!isFirstConnect){
-
try {
-
Thread.sleep(r.nextInt(1000));
-
} catch (InterruptedException e) {
-
LOG.warn("Unexpected exception", e);
-
}
-
}
-
-
if (closing || !state.isAlive()) {
-
break;
-
}
-
-
startConnect();
-
clientCnxnSocket.updateLastSendAndHeard();
-
}
取下一个server
-
addr = hostProvider.next(1000);
之后就和新建session时类似,区别是发送ConnectRequest时sessionid和password都是老的
-
-
long sessId = (seenRwServerBefore) ? sessionId : 0;
-
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
-
sessionTimeout, sessId, sessionPasswd);
接下来server端处理
-
-
long sessionId = connReq.getSessionId();
-
if (sessionId != 0) {
-
long clientSessionId = connReq.getSessionId();
-
LOG.info("Client attempting to renew session 0x"
-
+ Long.toHexString(clientSessionId)
-
+ " at " + cnxn.getRemoteSocketAddress());
-
-
serverCnxnFactory.closeSession(sessionId);
-
cnxn.setSessionId(sessionId);
-
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
-
} else {
-
LOG.info("Client attempting to establish new session at "
-
+ cnxn.getRemoteSocketAddress());
-
createSession(cnxn, passwd, sessionTimeout);
-
}
重启session
-
/**
 * Re-attaches a connection to a session id supplied by the client.
 * If the password check fails, session initialisation is finished with
 * valid == false; otherwise the session is handed to revalidateSession.
 *
 * @param cnxn the connection being initialised
 * @param sessionId the session id supplied by the client
 * @param passwd the password checked against that session id
 * @param sessionTimeout the timeout forwarded to revalidateSession
 * @throws IOException declared by the signature; NOTE(review): presumably
 *         raised by the callees, which are not visible in this excerpt
 */
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
-
int sessionTimeout) throws IOException {
-
-
// Password mismatch: answer the client right away with valid == false.
if (!checkPasswd(sessionId, passwd)) {
-
finishSessionInit(cnxn, false);
-
}
-
-
// Password accepted: whether the session still exists is decided by revalidateSession.
else {
-
revalidateSession(cnxn, sessionId, sessionTimeout);
-
}
-
}
重试的server如果是leader
-
@Override
-
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
-
int sessionTimeout) throws IOException {
-
// 父类中通过sessionTracker检查
-
super.revalidateSession(cnxn, sessionId, sessionTimeout);
-
try {
-
-
-
setOwner(sessionId, ServerCnxn.me);
-
} catch (SessionExpiredException e) {
-
-
}
-
}
父类revalidateSession方法
-
/**
 * Base revalidation: touches the session through the session tracker and
 * finishes session initialisation with the outcome of that touch.
 *
 * @param cnxn the connection waiting for the session-init answer
 * @param sessionId the session id to revalidate
 * @param sessionTimeout the timeout passed to the session tracker
 * @throws IOException declared by the signature; NOTE(review): source of the
 *         exception is not visible in this excerpt
 */
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
-
int sessionTimeout) throws IOException {
-
// rc is the tracker's verdict and is passed on unchanged as the "valid" flag below.
boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
-
if (LOG.isTraceEnabled()) {
-
ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
-
"Session 0x" + Long.toHexString(sessionId) +
-
" is valid: " + rc);
-
}
-
finishSessionInit(cnxn, rc);
-
}
leader的SessionTracker为SessionTrackerImpl,touchSession方法如下
-
synchronized public boolean touchSession(long sessionId, int timeout) {
-
if (LOG.isTraceEnabled()) {
-
ZooTrace.logTraceMessage(LOG,
-
ZooTrace.CLIENT_PING_TRACE_MASK,
-
"SessionTrackerImpl --- Touch session: 0x"
-
+ Long.toHexString(sessionId) + " with timeout " + timeout);
-
}
-
-
SessionImpl s = sessionsById.get(sessionId);
-
-
if (s == null || s.isClosing()) {
-
return false;
-
}
-
.....
由于session已经被清理,sessionsById中查不到该session(s == null),所以对于leader来说session检查的结果是false。
如果是follower,其校验方法
-
/**
 * Revalidation variant that performs no local check: it forwards the
 * request to getLearner().validateSession(...). The surrounding article
 * presents this as the follower-side implementation.
 *
 * @param cnxn the connection waiting for the session-init answer
 * @param sessionId the session id to revalidate
 * @param sessionTimeout the timeout forwarded with the request
 * @throws IOException declared by the signature; NOTE(review): presumably
 *         propagated from validateSession
 */
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
-
int sessionTimeout) throws IOException {
-
-
getLearner().validateSession(cnxn, sessionId, sessionTimeout);
-
}
follower询问session是否有效
-
/** 向leader校验client的session是否仍然有效 */
-
void validateSession(ServerCnxn cnxn, long clientId, int timeout)
-
throws IOException {
-
LOG.info("Revalidating client: 0x" + Long.toHexString(clientId));
-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
DataOutputStream dos = new DataOutputStream(baos);
-
dos.writeLong(clientId);
-
dos.writeInt(timeout);
-
dos.close();
-
// REVALIDATE包用来检查session是否还有效
-
QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos
-
.toByteArray(), null);
-
pendingRevalidations.put(clientId, cnxn);
-
if (LOG.isTraceEnabled()) {
-
ZooTrace.logTraceMessage(LOG,
-
ZooTrace.SESSION_TRACE_MASK,
-
"To validate session 0x"
-
+ Long.toHexString(clientId));
-
}
-
writePacket(qp, true);
-
}
leader端处理REVALIDATE包
-
case Leader.REVALIDATE:
-
bis = new ByteArrayInputStream(qp.getData());
-
dis = new DataInputStream(bis);
-
long id = dis.readLong();
-
int to = dis.readInt();
-
ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
DataOutputStream dos = new DataOutputStream(bos);
-
dos.writeLong(id);
-
-
boolean valid = leader.zk.touch(id, to);
-
if (valid) {
-
try {
-
-
-
-
leader.zk.setOwner(id, this);
-
} catch (SessionExpiredException e) {
-
LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
-
}
-
}
-
if (LOG.isTraceEnabled()) {
-
ZooTrace.logTraceMessage(LOG,
-
ZooTrace.SESSION_TRACE_MASK,
-
"Session 0x" + Long.toHexString(id)
-
+ " is valid: "+ valid);
-
}
-
-
dos.writeBoolean(valid);
-
qp.setData(bos.toByteArray());
-
queuedPackets.add(qp);
-
break;
follower处理返回结果
-
case Leader.REVALIDATE:
-
revalidate(qp);
-
break;
-
/**
 * Handles a revalidation answer packet: decodes the session id and the
 * valid flag from the packet data, looks up the connection that was parked
 * in pendingRevalidations, and finishes its session initialisation.
 *
 * @param qp the packet whose data holds a long session id followed by a boolean
 * @throws IOException if reading from the packet data fails
 */
protected void revalidate(QuorumPacket qp) throws IOException {
-
ByteArrayInputStream bis = new ByteArrayInputStream(qp
-
.getData());
-
DataInputStream dis = new DataInputStream(bis);
-
// Payload is read in this order: session id (long), then validity (boolean).
long sessionId = dis.readLong();
-
boolean valid = dis.readBoolean();
-
// remove() both fetches the waiting connection and drops the pending entry.
ServerCnxn cnxn = pendingRevalidations
-
.remove(sessionId);
-
if (cnxn == null) {
-
// No connection is waiting for this session id; only a warning is logged.
LOG.warn("Missing session 0x"
-
+ Long.toHexString(sessionId)
-
+ " for validation");
-
} else {
-
zk.finishSessionInit(cnxn, valid);
-
}
-
if (LOG.isTraceEnabled()) {
-
ZooTrace.logTraceMessage(LOG,
-
ZooTrace.SESSION_TRACE_MASK,
-
"Session 0x" + Long.toHexString(sessionId)
-
+ " is valid: " + valid);
-
}
-
}
可以看到无论是leader还是follower最后都会调用zk.finishSessionInit(cnxn, valid)处理,而由于session已经失效,所以valid为false
-
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
-
......
-
-
try {
-
-
ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
-
: 0, valid ? cnxn.getSessionId() : 0,
-
-
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
-
bos.writeInt(-1, "len");
-
rsp.serialize(bos, "connect");
-
if (!cnxn.isOldClient) {
-
bos.writeBool(
-
this instanceof ReadOnlyZooKeeperServer, "readOnly");
-
}
-
baos.close();
-
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
-
bb.putInt(bb.remaining() - 4).rewind();
-
cnxn.sendBuffer(bb);
-
-
......
-
}
client端处理ConnectResponse
-
void readConnectResult() throws IOException {
-
......
-
-
this.sessionId = conRsp.getSessionId();
-
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
-
conRsp.getPasswd(), isRO);
-
}
-
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
-
byte[] _sessionPasswd, boolean isRO) throws IOException {
-
negotiatedSessionTimeout = _negotiatedSessionTimeout;
-
-
if (negotiatedSessionTimeout <= 0) {
-
-
state = States.CLOSED;
-
-
eventThread.queueEvent(new WatchedEvent(
-
Watcher.Event.EventType.None,
-
Watcher.Event.KeeperState.Expired, null));
-
-
eventThread.queueEventOfDeath();
-
-
throw new SessionExpiredException(
-
"Unable to reconnect to ZooKeeper service, session 0x"
-
+ Long.toHexString(sessionId) + " has expired");
-
}
-
......
-
}
SendThread处理SessionExpiredException,关闭SendThread
-
while (state.isAlive()) {
-
......
-
-
if (e instanceof SessionExpiredException) {
-
LOG.info(e.getMessage() + ", closing socket connection");
-
}
-
......
-
-
cleanup();
-
-
if (state.isAlive()) {
-
eventThread.queueEvent(new WatchedEvent(
-
Event.EventType.None,
-
Event.KeeperState.Disconnected,
-
null));
-
}
-
clientCnxnSocket.updateNow();
-
clientCnxnSocket.updateLastSendAndHeard();
-
}
-
-
cleanup();
-
-
clientCnxnSocket.close();
-
if (state.isAlive()) {
-
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
-
Event.KeeperState.Disconnected, null));
-
}
-
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
-
"SendThread exitedloop.");
清理动作
-
/**
 * Cleans up the client socket and fails every packet still held in
 * pendingQueue and outgoingQueue via conLossPacket, then empties both queues.
 */
private void cleanup() {
-
-
clientCnxnSocket.cleanup();
-
-
// Each queue is drained while holding its own monitor.
synchronized (pendingQueue) {
-
for (Packet p : pendingQueue) {
-
conLossPacket(p);
-
}
-
pendingQueue.clear();
-
}
-
-
// Same treatment for the packets still waiting in outgoingQueue.
synchronized (outgoingQueue) {
-
for (Packet p : outgoingQueue) {
-
conLossPacket(p);
-
}
-
outgoingQueue.clear();
-
}
-
}
-
/**
 * Marks a packet as failed with an error code chosen from the current
 * client state, then completes it through finishPacket. Packets without a
 * reply header are left untouched.
 *
 * @param p the packet to fail
 */
private void conLossPacket(Packet p) {
-
// Nothing to record an error on, so the packet is skipped (finishPacket is not called).
if (p.replyHeader == null) {
-
return;
-
}
-
// State-to-error mapping: AUTH_FAILED -> AUTHFAILED, CLOSED -> SESSIONEXPIRED,
// any other state -> CONNECTIONLOSS.
switch (state) {
-
case AUTH_FAILED:
-
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
-
break;
-
-
case CLOSED:
-
p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
-
break;
-
-
default:
-
p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
-
}
-
finishPacket(p);
-
}
EventThread关闭
-
/**
 * Event loop: takes events from waitingEvents and passes them to
 * processEvent until the eventOfDeath marker has been seen and the queue
 * is empty. An InterruptedException also ends the loop.
 */
public void run() {
-
try {
-
isRunning = true;
-
while (true) {
-
// take() blocks until an event is available.
Object event = waitingEvents.take();
-
-
// The death marker only sets a flag; it is never passed to processEvent.
if (event == eventOfDeath) {
-
wasKilled = true;
-
} else {
-
processEvent(event);
-
}
-
// After the marker, keep processing until nothing is left in the queue,
// then stop.
if (wasKilled)
-
-
synchronized (waitingEvents) {
-
if (waitingEvents.isEmpty()) {
-
isRunning = false;
-
break;
-
}
-
}
-
}
-
} catch (InterruptedException e) {
-
LOG.error("Event thread exiting due to interruption", e);
-
}
-
-
LOG.info("EventThread shut down");
-
}
之后client将不可用,所有请求在发送的时候都将收到SESSIONEXPIRED异常码,因为queuePacket的时候有一个判断:
-
/**
 * Builds a Packet for a request and either queues it on outgoingQueue or,
 * when the client state is no longer alive or the client is closing, fails
 * it immediately through conLossPacket.
 *
 * @param h request header; its type is checked for OpCode.closeSession
 * @param r reply header handed to the Packet
 * @param request request record handed to the Packet
 * @param response response record handed to the Packet
 * @param cb callback stored on the packet
 * @param clientPath client path stored on the packet
 * @param serverPath server path stored on the packet
 * @param ctx context object stored on the packet
 * @param watchRegistration watch registration handed to the Packet
 * @return the packet that was created, whether it was queued or failed
 */
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
-
Record response, AsyncCallback cb, String clientPath,
-
String serverPath, Object ctx, WatchRegistration watchRegistration)
-
{
-
Packet packet = null;
-
-
-
-
-
// Packet creation and the alive/closing decision both happen under the outgoingQueue monitor.
synchronized (outgoingQueue) {
-
packet = new Packet(h, r, request, response, watchRegistration);
-
packet.cb = cb;
-
packet.ctx = ctx;
-
packet.clientPath = clientPath;
-
packet.serverPath = serverPath;
-
-
// Dead or closing client: the packet is never queued; conLossPacket picks
// the error code from the current state.
if (!state.isAlive() || closing) {
-
conLossPacket(packet);
-
} else {
-
-
-
// A closeSession request flips the closing flag, so later calls take
// the failing branch above.
if (h.getType() == OpCode.closeSession) {
-
closing = true;
-
}
-
outgoingQueue.add(packet);
-
}
-
}
-
sendThread.getClientCnxnSocket().wakeupCnxn();
-
return packet;
-
}
从以上分析可知,SESSIONEXPIRED异常码是比较严重的事件,之后这个zookeeper实例不可用了,如果需要恢复,则需要重新创建zookeeper实例。而CONNECTIONLOSS异常码是比较常见的,比如网络暂时中断的时候,这个状态码下zookeeper会自动重连恢复,因为server端还保留着session信息。
阅读(3823) | 评论(0) | 转发(0) |