Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1959507
  • 博文数量: 1000
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 7921
  • 用 户 组: 普通用户
  • 注册时间: 2013-08-20 09:23
个人简介

storage R&D guy.

文章分类

全部博文(1000)

文章存档

2019年(5)

2017年(47)

2016年(38)

2015年(539)

2014年(193)

2013年(178)

分类: 服务器与存储

2015-07-21 10:44:00

前面分析知道session超时由leader负责,假设某个session长时间没心跳超时,SessionTrackImpl入口

Java代码  收藏代码
  1. if (set != null) {  
  2.                    for (SessionImpl s : set.sessions) {  
  3.                        setSessionClosing(s.sessionId);  
  4.                        expirer.expire(s);  
  5.                    }  
  6.                }  

 session失效是一个proposal过程

Java代码  收藏代码
  1. private void close(long sessionId) {  
  2.     submitRequest(null, sessionId, OpCode.closeSession, 0nullnull);  
  3. }  

 leader段执行处理链,PrepRequestProcessor执行,closeSession的request没有txn内容

Java代码  收藏代码
  1. case OpCode.closeSession:  
  2.                 // We don't want to do this check since the session expiration thread  
  3.                 // queues up this operation without being the session owner.  
  4.                 // this request is the last of the session so it should be ok  
  5.                 //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());  
  6.         //session下的所有临时节点,需要删除,添加到outstandingChanges队列  
  7.                 HashSet<String> es = zks.getZKDatabase()  
  8.                         .getEphemerals(request.sessionId);  
  9.                 synchronized (zks.outstandingChanges) {  
  10.                     for (ChangeRecord c : zks.outstandingChanges) {  
  11.                         if (c.stat == null) {  
  12.                             // Doing a delete  
  13.                             es.remove(c.path);  
  14.                         } else if (c.stat.getEphemeralOwner() == request.sessionId) {  
  15.                             es.add(c.path);  
  16.                         }  
  17.                     }  
  18.                     for (String path2Delete : es) {  
  19.                         addChangeRecord(new ChangeRecord(request.hdr.getZxid(),  
  20.                                 path2Delete, null0null));  
  21.                     }  
  22.             //设置状态为closing  
  23.                     zks.sessionTracker.setSessionClosing(request.sessionId);  
  24.                 }  
  25.   
  26.                 LOG.info("Processed session termination for sessionid: 0x"  
  27.                         + Long.toHexString(request.sessionId));  
  28.                 break;  

 之后leader通过ProposalRequestProcessor发起投票,并写入log,follower处理投票,写入log并ack。然后leader commit请求,leader和follower都进入FinalRequestProcessor处理。先是datatree处理

Java代码  收藏代码
  1. case OpCode.closeSession:  
  2.                    killSession(header.getClientId(), header.getZxid());  
  3.                    break;  

 

Java代码  收藏代码
  1. void killSession(long session, long zxid) {  
  2.         // the list is already removed from the ephemerals  
  3.         // so we do not have to worry about synchronizing on  
  4.         // the list. This is only called from FinalRequestProcessor  
  5.         // so there is no need for synchronization. The list is not  
  6.         // changed here. Only create and delete change the list which  
  7.         // are again called from FinalRequestProcessor in session下所有临时节点删除.  
  8.     //sequence  
  9.         HashSet<String> list = ephemerals.remove(session);  
  10.         if (list != null) {  
  11.             for (String path : list) {  
  12.                 try {  
  13.                     deleteNode(path, zxid);  
  14.                     if (LOG.isDebugEnabled()) {  
  15.                         LOG  
  16.                                 .debug("Deleting ephemeral node " + path  
  17.                                         + " for session 0x"  
  18.                                         + Long.toHexString(session));  
  19.                     }  
  20.                 } catch (NoNodeException e) {  
  21.                     LOG.warn("Ignoring NoNodeException for path " + path  
  22.                             + " while removing ephemeral for dead session 0x"  
  23.                             + Long.toHexString(session));  
  24.                 }  
  25.             }  
  26.         }  
  27.     }  

 然后FinalRequestProcessor删除session

Java代码  收藏代码
  1. else if (opCode == OpCode.closeSession) {  
  2.             sessionTracker.removeSession(sessionId);  
  3.         }  

 然后FinalRequestProcessor关闭连接

Java代码  收藏代码
  1. if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {  
  2.             ServerCnxnFactory scxn = zks.getServerCnxnFactory();  
  3.             // this might be possible since  
  4.             // we might just be playing diffs from the leader  
  5.             if (scxn != null && request.cnxn == null) {  
  6.                 // calling this if we have the cnxn results in the client's  
  7.                 // close session response being lost - we've already closed  
  8.                 // the session/socket here before we can send the closeSession  
  9.                 // in the switch block below  
  10.                 scxn.closeSession(request.sessionId);  
  11.                 return;  
  12.             }  
  13.         }  

 closeSession就是把session从内存数据结构中删除。

遍历连接集合,关闭sessionid对应的那个连接

Java代码  收藏代码
  1. private void closeSessionWithoutWakeup(long sessionId) {  
  2.      HashSet<NIOServerCnxn> cnxns;  
  3.      synchronized (this.cnxns) {  
  4.          cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();  
  5.      }  
  6.   
  7.      for (NIOServerCnxn cnxn : cnxns) {  
  8.          if (cnxn.getSessionId() == sessionId) {  
  9.              try {  
  10.                  cnxn.close();  
  11.              } catch (Exception e) {  
  12.                  LOG.warn("exception during session close", e);  
  13.              }  
  14.              break;  
  15.          }  
  16.      }  
  17.  }  

 以上就完成了server端session超时的处理
1.leader发起closeSession的Proposal
2.该session对应的server的FinalRequestProcessor中关闭连接
3.server端发起的session超时处理不需要返回client信息,如果是client主动关闭session,则需要返回closeConn的信息,然后再关闭连接

阅读(1104) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~