Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1959618
  • 博文数量: 1000
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 7921
  • 用 户 组: 普通用户
  • 注册时间: 2013-08-20 09:23
个人简介

storage R&D guy.

文章分类

全部博文(1000)

文章存档

2019年(5)

2017年(47)

2016年(38)

2015年(539)

2014年(193)

2013年(178)

分类: 服务器与存储

2015-07-21 10:41:30

前面一篇文章提到zookeeper server端主动发现session超时并清理session信息,关闭连接,接下来看看client端如何试图恢复session的。关于client端代码分析见前文http://iwinit.iteye.com/blog/1754611 。
由于session被清理,此时server端已经没有session信息了。而由于连接被关闭,client会抛出异常

Java代码  收藏代码
  1. if (sockKey.isReadable()) {  
  2.         //返回-1  
  3.             int rc = sock.read(incomingBuffer);  
  4.             if (rc < 0) {  
  5.                 throw new EndOfStreamException(  
  6.                         "Unable to read additional data from server sessionid 0x"  
  7.                                 + Long.toHexString(sessionId)  
  8.                                 + ", likely server has closed socket");  
  9.             }  

 SendThread处理异常

Java代码  收藏代码
  1. else {  
  2.                        // this is ugly, you have a better way speak up  
  3.                        if (e instanceof SessionExpiredException) {  
  4.                            LOG.info(e.getMessage() + ", closing socket connection");  
  5.                        } else if (e instanceof SessionTimeoutException) {  
  6.                            LOG.info(e.getMessage() + RETRY_CONN_MSG);  
  7.                        }   
  8.         //连接被关闭了              
  9.         else if (e instanceof EndOfStreamException) {  
  10.                            LOG.info(e.getMessage() + RETRY_CONN_MSG);  
  11.                        } else if (e instanceof RWServerFoundException) {  
  12.                            LOG.info(e.getMessage());  
  13.                        } else {  
  14.                            LOG.warn(  
  15.                                    "Session 0x"  
  16.                                            + Long.toHexString(getSessionId())  
  17.                                            + " for server "  
  18.                                            + clientCnxnSocket.getRemoteSocketAddress()  
  19.                                            + ", unexpected error"  
  20.                                            + RETRY_CONN_MSG, e);  
  21.                        }  
  22.         //清理连接,失败当前请求  
  23.                        cleanup();  
  24.         //此时state还是CONNECTED,发送DISCONNECTED状态通知  
  25.                        if (state.isAlive()) {  
  26.                            eventThread.queueEvent(new WatchedEvent(  
  27.                                    Event.EventType.None,  
  28.                                    Event.KeeperState.Disconnected,  
  29.                                    null));  
  30.                        }  
  31.                        clientCnxnSocket.updateNow();  
  32.                        clientCnxnSocket.updateLastSendAndHeard();  
  33.                    }  

 接下来client重新寻找下一个server进行session恢复,此时client的sessionId和password仍然是上一次创建的session信息。

Java代码  收藏代码
  1. //此处返回true,因为连接已经被置为null了  
  2.             if (!clientCnxnSocket.isConnected()) {  
  3.             //重连时,先sleep一下  
  4.                         if(!isFirstConnect){  
  5.                             try {  
  6.                                 Thread.sleep(r.nextInt(1000));  
  7.                             } catch (InterruptedException e) {  
  8.                                 LOG.warn("Unexpected exception", e);  
  9.                             }  
  10.                         }  
  11.                         // don't re-establish connection if we are closing  
  12.                         if (closing || !state.isAlive()) {  
  13.                             break;  
  14.                         }  
  15.             //重新开始连接  
  16.                         startConnect();  
  17.                         clientCnxnSocket.updateLastSendAndHeard();  
  18.                     }  

 取下一个server

Java代码  收藏代码
  1. addr = hostProvider.next(1000);  

 之后就和新建session时类似,区别是发送ConnectRequest时sessionid和password都是老的

Java代码  收藏代码
  1. //新建session成功时,seenRwServerBefore已经被置为true  
  2. long sessId = (seenRwServerBefore) ? sessionId : 0;  
  3.             ConnectRequest conReq = new ConnectRequest(0, lastZxid,  
  4.                     sessionTimeout, sessId, sessionPasswd);  

 接下来server端处理

Java代码  收藏代码
  1. //此时sessionId不为0  
  2.  long sessionId = connReq.getSessionId();  
  3.         if (sessionId != 0) {  
  4.             long clientSessionId = connReq.getSessionId();  
  5.             LOG.info("Client attempting to renew session 0x"  
  6.                     + Long.toHexString(clientSessionId)  
  7.                     + " at " + cnxn.getRemoteSocketAddress());  
  8.         //先关闭老的连接,如果有的话,删除watch  
  9.             serverCnxnFactory.closeSession(sessionId);  
  10.             cnxn.setSessionId(sessionId);  
  11.             reopenSession(cnxn, sessionId, passwd, sessionTimeout);  
  12.         } else {  
  13.             LOG.info("Client attempting to establish new session at "  
  14.                     + cnxn.getRemoteSocketAddress());  
  15.             createSession(cnxn, passwd, sessionTimeout);  
  16.         }  

 重启session

Java代码  收藏代码
  1.    public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,  
  2.            int sessionTimeout) throws IOException {  
  3. //检查密码,如果不一样,则结束session,返回client一个为0的sessionid。如果sessionid为0,则为false  
  4.        if (!checkPasswd(sessionId, passwd)) {  
  5.            finishSessionInit(cnxn, false);  
  6.        }   
  7. //密码正确,再校验下session是否还有效,这里不同的server处理不一样  
  8. else {  
  9.            revalidateSession(cnxn, sessionId, sessionTimeout);  
  10.        }  
  11.    }  

 重试的server如果是leader

Java代码  收藏代码
  1. @Override  
  2. protected void revalidateSession(ServerCnxn cnxn, long sessionId,  
  3.     int sessionTimeout) throws IOException {  
  4. 类中通过sessionTrack检查  
  5.     super.revalidateSession(cnxn, sessionId, sessionTimeout);  
  6.     try {  
  7.         // setowner as the leader itself, unless updated  
  8.         // via the follower handlers  
  9.         setOwner(sessionId, ServerCnxn.me);  
  10.     } catch (SessionExpiredException e) {  
  11.         // this is ok, it just means that the session revalidation failed.  
  12.     }  
  13. }  

 父类revalidateSession方法

Java代码  收藏代码
  1. protected void revalidateSession(ServerCnxn cnxn, long sessionId,  
  2.         int sessionTimeout) throws IOException {  
  3.     boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);  
  4.     if (LOG.isTraceEnabled()) {  
  5.         ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,  
  6.                                  "Session 0x" + Long.toHexString(sessionId) +  
  7.                 " is valid: " + rc);  
  8.     }  
  9.     finishSessionInit(cnxn, rc);  
  10. }  

 leader的SessionTracker为SessionTrackerImpl,touchSession方法如下

Java代码  收藏代码
  1. synchronized public boolean touchSession(long sessionId, int timeout) {  
  2.         if (LOG.isTraceEnabled()) {  
  3.             ZooTrace.logTraceMessage(LOG,  
  4.                                      ZooTrace.CLIENT_PING_TRACE_MASK,  
  5.                                      "SessionTrackerImpl --- Touch session: 0x"  
  6.                     + Long.toHexString(sessionId) + " with timeout " + timeout);  
  7.         }  
  8.     //因为session超时,session已经被删掉了,此处返回null,所以检查结果是false  
  9.         SessionImpl s = sessionsById.get(sessionId);  
  10.         // Return false, if the session doesn't exists or marked as closing  
  11.         if (s == null || s.isClosing()) {  
  12.             return false;  
  13.         }  
  14. .....  

 对于leader来说session检查的结果是false。

如果是follower,其校验方法

Java代码  收藏代码
  1. protected void revalidateSession(ServerCnxn cnxn, long sessionId,  
  2.             int sessionTimeout) throws IOException {  
  3.     //需要询问leader,session是否还有效  
  4.         getLearner().validateSession(cnxn, sessionId, sessionTimeout);  
  5.     }  

 follower询问session是否有效

Java代码  收藏代码
  1.  */  
  2. void validateSession(ServerCnxn cnxn, long clientId, int timeout)  
  3.         throws IOException {  
  4.     LOG.info("Revalidating client: 0x" + Long.toHexString(clientId));  
  5.     ByteArrayOutputStream baos = new ByteArrayOutputStream();  
  6.     DataOutputStream dos = new DataOutputStream(baos);  
  7.     dos.writeLong(clientId);  
  8.     dos.writeInt(timeout);  
  9.     dos.close();  
  10. EVALIDATE包用来检查session是否还有效  
  11.     QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos  
  12.             .toByteArray(), null);  
  13.     pendingRevalidations.put(clientId, cnxn);  
  14.     if (LOG.isTraceEnabled()) {  
  15.         ZooTrace.logTraceMessage(LOG,  
  16.                                  ZooTrace.SESSION_TRACE_MASK,  
  17.                                  "To validate session 0x"  
  18.                                  + Long.toHexString(clientId));  
  19.     }  
  20.     writePacket(qp, true);  
  21. }   

 leader端处理REVALIDATE包

Java代码  收藏代码
  1. case Leader.REVALIDATE:  
  2.                     bis = new ByteArrayInputStream(qp.getData());  
  3.                     dis = new DataInputStream(bis);  
  4.                     long id = dis.readLong();  
  5.                     int to = dis.readInt();  
  6.                     ByteArrayOutputStream bos = new ByteArrayOutputStream();  
  7.                     DataOutputStream dos = new DataOutputStream(bos);  
  8.                     dos.writeLong(id);  
  9.             //这里由于session已经被删掉,返回false  
  10.                     boolean valid = leader.zk.touch(id, to);  
  11.                     if (valid) {  
  12.                         try {  
  13.                             //set the session owner  
  14.                             // as the follower that  
  15.                             // owns the session  
  16.                             leader.zk.setOwner(id, this);  
  17.                         } catch (SessionExpiredException e) {  
  18.                             LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);  
  19.                         }  
  20.                     }  
  21.                     if (LOG.isTraceEnabled()) {  
  22.                         ZooTrace.logTraceMessage(LOG,  
  23.                                                  ZooTrace.SESSION_TRACE_MASK,  
  24.                                                  "Session 0x" + Long.toHexString(id)  
  25.                                                  + " is valid: "+ valid);  
  26.                     }  
  27.             //结果是false  
  28.                     dos.writeBoolean(valid);  
  29.                     qp.setData(bos.toByteArray());  
  30.                     queuedPackets.add(qp);  
  31.                     break;  

 follower处理返回结果

Java代码  收藏代码
  1. case Leader.REVALIDATE:  
  2.             revalidate(qp);  
  3.             break;  
  4. protected void revalidate(QuorumPacket qp) throws IOException {  
  5.         ByteArrayInputStream bis = new ByteArrayInputStream(qp  
  6.                 .getData());  
  7.         DataInputStream dis = new DataInputStream(bis);  
  8.         long sessionId = dis.readLong();  
  9.         boolean valid = dis.readBoolean();  
  10.         ServerCnxn cnxn = pendingRevalidations  
  11.         .remove(sessionId);  
  12.         if (cnxn == null) {  
  13.             LOG.warn("Missing session 0x"  
  14.                     + Long.toHexString(sessionId)  
  15.                     + " for validation");  
  16.         } else {  
  17.             zk.finishSessionInit(cnxn, valid);  
  18.         }  
  19.         if (LOG.isTraceEnabled()) {  
  20.             ZooTrace.logTraceMessage(LOG,  
  21.                     ZooTrace.SESSION_TRACE_MASK,  
  22.                     "Session 0x" + Long.toHexString(sessionId)  
  23.                     + " is valid: " + valid);  
  24.         }  
  25.     }  

 可以看到无论是leader还是follower最后都会调用zk.finishSessionInit(cnxn, valid)处理,而由于session已经失效,所以valid为false

Java代码  收藏代码
  1. public void finishSessionInit(ServerCnxn cnxn, boolean valid) {  
  2.         ......  
  3.   
  4.         try {  
  5.         //由于valid是false,所以返回给client的sessionid为0,password为空  
  6.             ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()  
  7.                     : 0, valid ? cnxn.getSessionId() : 0// send 0 if session is no  
  8.                             // longer valid  
  9.                             valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);  
  10.             ByteArrayOutputStream baos = new ByteArrayOutputStream();  
  11.             BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);  
  12.             bos.writeInt(-1"len");  
  13.             rsp.serialize(bos, "connect");  
  14.             if (!cnxn.isOldClient) {  
  15.                 bos.writeBool(  
  16.                         this instanceof ReadOnlyZooKeeperServer, "readOnly");  
  17.             }  
  18.             baos.close();  
  19.             ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());  
  20.             bb.putInt(bb.remaining() - 4).rewind();  
  21.             cnxn.sendBuffer(bb);      
  22.   
  23.             ......  
  24.     }  

 client端处理ConnectResponse

Java代码  收藏代码
  1. void readConnectResult() throws IOException {  
  2.         ......  
  3.     //被server重置为0了  
  4.         this.sessionId = conRsp.getSessionId();  
  5.         sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,  
  6.                 conRsp.getPasswd(), isRO);  
  7.     }  

 

Java代码  收藏代码
  1. void onConnected(int _negotiatedSessionTimeout, long _sessionId,  
  2.                 byte[] _sessionPasswd, boolean isRO) throws IOException {  
  3.             negotiatedSessionTimeout = _negotiatedSessionTimeout;  
  4.         //sessionid为0,抛出SessionExpiredException异常  
  5.             if (negotiatedSessionTimeout <= 0) {  
  6.         //state设为CLOSED,这个行为将关闭SendThread  
  7.                 state = States.CLOSED;  
  8.         //Expired状态通知  
  9.                 eventThread.queueEvent(new WatchedEvent(  
  10.                         Watcher.Event.EventType.None,  
  11.                         Watcher.Event.KeeperState.Expired, null));  
  12.         //这个行为将关闭EventThread  
  13.                 eventThread.queueEventOfDeath();  
  14.         //抛出异常  
  15.                 throw new SessionExpiredException(  
  16.                         "Unable to reconnect to ZooKeeper service, session 0x"  
  17.                                 + Long.toHexString(sessionId) + " has expired");  
  18.             }  
  19.            ......  
  20.         }  

 SendThread处理SessionExpiredException,关闭SendThread

Java代码  收藏代码
  1. while (state.isAlive()) {  
  2.         ......  
  3.         // this is ugly, you have a better way speak up  
  4.                        if (e instanceof SessionExpiredException) {  
  5.                            LOG.info(e.getMessage() + ", closing socket connection");  
  6.                        }   
  7.         ......  
  8.         //关闭连接,失败所有请求  
  9.                        cleanup();  
  10.         //此时state已经被置为CLOSED,SendThread将退出  
  11.                        if (state.isAlive()) {  
  12.                            eventThread.queueEvent(new WatchedEvent(  
  13.                                    Event.EventType.None,  
  14.                                    Event.KeeperState.Disconnected,  
  15.                                    null));  
  16.                        }  
  17.                        clientCnxnSocket.updateNow();  
  18.                        clientCnxnSocket.updateLastSendAndHeard();  
  19.     }  
  20.     //清理  
  21.     cleanup();  
  22.     //关闭selector  
  23.             clientCnxnSocket.close();  
  24.         if (state.isAlive()) {  
  25.             eventThread.queueEvent(new WatchedEvent(Event.EventType.None,  
  26.                     Event.KeeperState.Disconnected, null));  
  27.         }  
  28.         ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),  
  29.                                  "SendThread exitedloop.");  

 清理动作

Java代码  收藏代码
  1. private void cleanup() {  
  2.         //关闭socket  
  3.             clientCnxnSocket.cleanup();  
  4.         //发送完等待响应的请求失败,此时由于state是CLOSED,所以异常信息是SESSIONEXPIRED  
  5.             synchronized (pendingQueue) {  
  6.                 for (Packet p : pendingQueue) {  
  7.                     conLossPacket(p);  
  8.                 }  
  9.                 pendingQueue.clear();  
  10.             }  
  11.         //等待发送的请求失败,此时由于state是CLOSED,所以异常信息是SESSIONEXPIRED  
  12.             synchronized (outgoingQueue) {  
  13.                 for (Packet p : outgoingQueue) {  
  14.                     conLossPacket(p);  
  15.                 }  
  16.                 outgoingQueue.clear();  
  17.             }  
  18.         }  

 

Java代码  收藏代码
  1. private void conLossPacket(Packet p) {  
  2.         if (p.replyHeader == null) {  
  3.             return;  
  4.         }  
  5.         switch (state) {  
  6.         case AUTH_FAILED:  
  7.             p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());  
  8.             break;  
  9.     //关闭的时候,是SESSIONEXPIRED异常码  
  10.         case CLOSED:  
  11.             p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());  
  12.             break;  
  13.     //其他是CONNECTIONLOSS异常码  
  14.         default:  
  15.             p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());  
  16.         }  
  17.         finishPacket(p);  
  18.     }  

 EventThread关闭

Java代码  收藏代码
  1. public void run() {  
  2.            try {  
  3.               isRunning = true;  
  4.               while (true) {  
  5.                  Object event = waitingEvents.take();  
  6.         //kill signal  
  7.                  if (event == eventOfDeath) {  
  8.                     wasKilled = true;  
  9.                  } else {  
  10.                     processEvent(event);  
  11.                  }  
  12.                  if (wasKilled)  
  13.             //等所有通知都发完再退出  
  14.                     synchronized (waitingEvents) {  
  15.                        if (waitingEvents.isEmpty()) {  
  16.                           isRunning = false;  
  17.                           break;  
  18.                        }  
  19.                     }  
  20.               }  
  21.            } catch (InterruptedException e) {  
  22.               LOG.error("Event thread exiting due to interruption", e);  
  23.            }  
  24.   
  25.             LOG.info("EventThread shut down");  
  26.         }  

 之后client将不可用,所有请求都将发送的时候都将收到SESSIONEXPIRED异常码,因为queuePacket的时候一个判断

Java代码  收藏代码
  1. Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,  
  2.             Record response, AsyncCallback cb, String clientPath,  
  3.             String serverPath, Object ctx, WatchRegistration watchRegistration)  
  4.     {  
  5.         Packet packet = null;  
  6.   
  7.         // Note that we do not generate the Xid for the packet yet. It is  
  8.         // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),  
  9.         // where the packet is actually sent.  
  10.         synchronized (outgoingQueue) {  
  11.             packet = new Packet(h, r, request, response, watchRegistration);  
  12.             packet.cb = cb;  
  13.             packet.ctx = ctx;  
  14.             packet.clientPath = clientPath;  
  15.             packet.serverPath = serverPath;  
  16.         //此时状态为CLOSED  
  17.             if (!state.isAlive() || closing) {  
  18.                 conLossPacket(packet);  
  19.             } else {  
  20.                 // If the client is asking to close the session then  
  21.                 // mark as closing  
  22.                 if (h.getType() == OpCode.closeSession) {  
  23.                     closing = true;  
  24.                 }  
  25.                 outgoingQueue.add(packet);  
  26.             }  
  27.         }  
  28.         sendThread.getClientCnxnSocket().wakeupCnxn();  
  29.         return packet;  
  30.     }  

 从以上分析可知,SESSIONEXPIRED异常码是比较严重的事件,之后这个zookeeper实例不可用了,如果需要恢复,则需要重新创建zookeeper实例。而CONNECTIONLOSS异常码是比较常见的,比如网络暂时中断的时候,这个状态码下zookeeper会自动重连恢复,因为server端还保留着session信息。

阅读(3942) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~