Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1945806
  • 博文数量: 1000
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 7921
  • 用 户 组: 普通用户
  • 注册时间: 2013-08-20 09:23
个人简介

storage R&D guy.

文章分类

全部博文(1000)

文章存档

2019年(5)

2017年(47)

2016年(38)

2015年(539)

2014年(193)

2013年(178)

分类: 服务器与存储

2015-10-21 16:07:28

前一篇文章分析了server端主动超时session的情况,接下来看一下client和server网络暂时中断的情况。

1.和server主动关闭连接一样,client抛出EndOfStreamException异常,此时客户端状态还是CONNECTED

2.SendThread处理异常,清理连接,将当前所有请求置为失败,错误码是CONNECTIONLOSS

3.发送Disconnected状态通知

4.选下一个server重连

5.连上之后发送ConnectRequest,sessionid和password是当前session的数据

6.server端处理,分leader和follower,由于此时client端重试比较快,session还没超时,所以leader和follower端session校验成功。如果这个时候session正好超时了,则校验失败,client会抛出sessionExpired异常并退出

7.server端返回成功的ConnectResponse

8.client收到相应,发送SyncConnected状态通知给watcher

9.client发送SetWatches包,重建watch

Java代码  收藏代码
  1. //可以通过配置禁止重建watch  
  2. if (!disableAutoWatchReset) {  
  3.     //当前的所有watch  
  4.                   List dataWatches = zooKeeper.getDataWatches();  
  5.                   List existWatches = zooKeeper.getExistWatches();  
  6.                   List childWatches = zooKeeper.getChildWatches();  
  7.                   if (!dataWatches.isEmpty()  
  8.                               || !existWatches.isEmpty() || !childWatches.isEmpty()) {  
  9.         //发送重建请求  
  10.                       SetWatches sw = new SetWatches(lastZxid,  
  11.                               prependChroot(dataWatches),  
  12.                               prependChroot(existWatches),  
  13.                               prependChroot(childWatches));  
  14.                       RequestHeader h = new RequestHeader();  
  15.                       h.setType(ZooDefs.OpCode.setWatches);  
  16.                       h.setXid(-8);  
  17.                       Packet packet = new Packet(h, new ReplyHeader(), sw, nullnull);  
  18.                       outgoingQueue.addFirst(packet);  
  19.                   }  
  20.               }  

 10.server端收到setWatches请求,如果是follower,直接进入FinalRequestProcessor处理,无需proposal

Java代码  收藏代码
  1. case OpCode.setWatches: {  
  2.                 lastOp = "SETW";  
  3.                 SetWatches setWatches = new SetWatches();  
  4.                 // XXX We really should NOT need this!!!!  
  5.                 request.request.rewind();  
  6.                 ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);  
  7.                 long relativeZxid = setWatches.getRelativeZxid();  
  8.         //添加watch  
  9.                 zks.getZKDatabase().setWatches(relativeZxid,   
  10.                         setWatches.getDataWatches(),   
  11.                         setWatches.getExistWatches(),  
  12.                         setWatches.getChildWatches(), cnxn);  
  13.                 break;  
  14.             }  

 

Java代码  收藏代码
  1. //添加watch的时候判断watch是否需要触发  
  2. public void setWatches(long relativeZxid, List dataWatches,  
  3.             List existWatches, List childWatches,  
  4.             Watcher watcher) {  
  5.         for (String path : dataWatches) {  
  6.             DataNode node = getNode(path);  
  7.             WatchedEvent e = null;  
  8.             if (node == null) {  
  9.                 e = new WatchedEvent(EventType.NodeDeleted,  
  10.                         KeeperState.SyncConnected, path);  
  11.             } else if (node.stat.getCzxid() > relativeZxid) {  
  12.                 e = new WatchedEvent(EventType.NodeCreated,  
  13.                         KeeperState.SyncConnected, path);  
  14.             } else if (node.stat.getMzxid() > relativeZxid) {  
  15.                 e = new WatchedEvent(EventType.NodeDataChanged,  
  16.                         KeeperState.SyncConnected, path);  
  17.             }  
  18.             if (e != null) {  
  19.                 watcher.process(e);  
  20.             } else {  
  21.                 this.dataWatches.addWatch(path, watcher);  
  22.             }  
  23.         }  
  24.         for (String path : existWatches) {  
  25.             DataNode node = getNode(path);  
  26.             WatchedEvent e = null;  
  27.             if (node == null) {  
  28.                 // This is the case when the watch was registered  
  29.             } else if (node.stat.getMzxid() > relativeZxid) {  
  30.                 e = new WatchedEvent(EventType.NodeDataChanged,  
  31.                         KeeperState.SyncConnected, path);  
  32.             } else {  
  33.                 e = new WatchedEvent(EventType.NodeCreated,  
  34.                         KeeperState.SyncConnected, path);  
  35.             }  
  36.             if (e != null) {  
  37.                 watcher.process(e);  
  38.             } else {  
  39.                 this.dataWatches.addWatch(path, watcher);  
  40.             }  
  41.         }  
  42.         for (String path : childWatches) {  
  43.             DataNode node = getNode(path);  
  44.             WatchedEvent e = null;  
  45.             if (node == null) {  
  46.                 e = new WatchedEvent(EventType.NodeDeleted,  
  47.                         KeeperState.SyncConnected, path);  
  48.             } else if (node.stat.getPzxid() > relativeZxid) {  
  49.                 e = new WatchedEvent(EventType.NodeChildrenChanged,  
  50.                         KeeperState.SyncConnected, path);  
  51.             }  
  52.             if (e != null) {  
  53.                 watcher.process(e);  
  54.             } else {  
  55.                 this.childWatches.addWatch(path, watcher);  
  56.             }  
  57.         }  
  58.     }  

 11.如果是leader,则多了一层PrepRequestProcessor的处理,检查session是否还在

 

再来看看客户端主动超时Session和心跳的情况,SendThread主线程

Java代码  收藏代码
  1. public void run() {  
  2.             clientCnxnSocket.introduce(this,sessionId);  
  3.             clientCnxnSocket.updateNow();  
  4.             clientCnxnSocket.updateLastSendAndHeard();  
  5.         //selector的select超时时间,每次循环都会重新计算  
  6.             int to;  
  7.             long lastPingRwServer = System.currentTimeMillis();  
  8.             while (state.isAlive()) {  
  9.                 try {  
  10.                     ......  
  11.             //session建立之后,to为读超时减去读空闲时间  
  12.                     if (state.isConnected()) {  
  13.                         ......  
  14.                         to = readTimeout - clientCnxnSocket.getIdleRecv();  
  15.                     } else {  
  16.                         to = connectTimeout - clientCnxnSocket.getIdleRecv();  
  17.                     }  
  18.                     //如果client长时间没收到server的packet,会导致读空闲时间很长,超过读超时,直接抛出异常  
  19.                     if (to <= 0) {  
  20.                         throw new SessionTimeoutException(  
  21.                                 "Client session timed out, have not heard from server in "  
  22.                                         + clientCnxnSocket.getIdleRecv() + "ms"  
  23.                                         + " for sessionid 0x"  
  24.                                         + Long.toHexString(sessionId));  
  25.                     }  
  26.             //session建立之后,发送心跳  
  27.                     if (state.isConnected()) {  
  28.             //如果写频繁,则写空闲时间很少,不用发送心跳  
  29.                         int timeToNextPing = readTimeout / 2  
  30.                                 - clientCnxnSocket.getIdleSend();  
  31.             //写少,发心跳  
  32.                         if (timeToNextPing <= 0) {  
  33.                             sendPing();  
  34.                 //上次发送时间  
  35.                             clientCnxnSocket.updateLastSend();  
  36.                         }   
  37.             //写繁忙,不用发送心跳  
  38.             else {  
  39.                             if (timeToNextPing < to) {  
  40.                                 to = timeToNextPing;  
  41.                             }  
  42.                         }  
  43.                     }  
  44.   
  45.             .....  
  46.             //每次doTransport都会更新now,lastHeard和lastSend则取决于是否有读写请求  
  47.                     clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);  
  48.                 } catch (Throwable e) {  
  49.                    ....  
  50.                         clientCnxnSocket.updateNow();  
  51.                         clientCnxnSocket.updateLastSendAndHeard();  
  52.                     }  
  53.                 }  
  54.             }  
  55.            .....  
  56.         }  

 心跳包,xid为-2

Java代码  收藏代码
  1. private void sendPing() {  
  2.     lastPingSentNs = System.nanoTime();  
  3.     RequestHeader h = new RequestHeader(-2, OpCode.ping);  
  4.     queuePacket(h, nullnullnullnullnullnullnullnull);  
  5. }  

server端处理ping包,如果是follower直接进入FinalRequestProcessor处理

Java代码  收藏代码
  1. case OpCode.ping: {  
  2.                 zks.serverStats().updateLatency(request.createTime);  
  3.   
  4.                 lastOp = "PING";  
  5.                 cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,  
  6.                         request.createTime, System.currentTimeMillis());  
  7.         //心跳包的响应xid也是-2  
  8.                 cnxn.sendResponse(new ReplyHeader(-2,  
  9.                         zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null"response");  
  10.                 return;  
  11.             }  

 如果是leader,则多了一层PrepRequestProcessor的处理,检查session是否还在

client收到心跳包响应,啥事不做

Java代码  收藏代码
  1. if (replyHdr.getXid() == -2) {  
  2.                 // -2 is the xid for pings  
  3.                 if (LOG.isDebugEnabled()) {  
  4.                     LOG.debug("Got ping response for sessionid: 0x"  
  5.                             + Long.toHexString(sessionId)  
  6.                             + " after "  
  7.                             + ((System.nanoTime() - lastPingSentNs) / 1000000)  
  8.                             + "ms");  
  9.                 }  
  10.                 return;  
  11.             }  

 

 以上可以看出

1.心跳包只有写空闲时才会发送

2.每次transport的时候都会更新当前时间now

3.lastHeard和lastSend取决于是否有读写请求

4.客户端session超时和连接关闭CONNECTIONLOSS处理是一样的,都会导致重试

阅读(845) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~