Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1944036
  • 博文数量: 1000
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 7921
  • 用 户 组: 普通用户
  • 注册时间: 2013-08-20 09:23
个人简介

storage R&D guy.

文章分类

全部博文(1000)

文章存档

2019年(5)

2017年(47)

2016年(38)

2015年(539)

2014年(193)

2013年(178)

分类: 服务器与存储

2015-07-21 10:44:54

前面几篇zookeeper的文章简单分析了执行流程,接下来打算从横向来分析一下zk的一些特性,先从session开始。
这一篇http://iwinit.iteye.com/blog/1754611分析了单机情况下session建立,在集群环境下建立session不太一样,是一个proposal的过程,先假设集群由leader,followerA,followerB组成,我们的client去连followerA。follower和leader初始化之后,初始化的sessionTracker不一样,leader中是SessionTrackerImpl,follower中是LearnerSessionTracker,主要区别和类同点:
1.follower中不会启动超时检查线程,只是简单得记录了session信息,主要数据结构是

 

Java代码  收藏代码
  1. //session更新信息  
  2. HashMap<Long, Integer> touchTable = new HashMap<Long, Integer>();  
  3. //session超时信息  
  4. private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;  

 而leader会启动超时线程,而且数据结构也多一些

Java代码  收藏代码
  1. //session实体  
  2.  HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();  
  3. //同一个超时时间点的session,给超时线程用  
  4. HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();  
  5. //session超时信息  
  6.  ConcurrentHashMap<Long, Integer> sessionsWithTimeout;  

 2.addSession时,leader会创建session实体,follower只是简单的记录了一下session信息

 

3.sessionId初始化算法一样

 

Java代码  收藏代码
  1. //1字节server_id+当前时间的后5个字节+2字节0,保证全局唯一  
  2.  public static long initializeNextSession(long id) {  
  3.         long nextSid = 0;  
  4.         nextSid = (System.currentTimeMillis() << 24) >> 8;  
  5.         nextSid =  nextSid | (id <<56);  
  6.         return nextSid;  
  7.     }  

 连下来client开始创建session

 

1.客户端发送ConnectionRequest给followerA
2.followerA处理

                   session超时时间协商

       

Java代码  收藏代码
  1. //处理session时间,minSessionTimeout为ticktime2倍,maxSessionTimeout为ticktime20倍  
  2.     int sessionTimeout = connReq.getTimeOut();  
  3.         byte passwd[] = connReq.getPasswd();  
  4.         int minSessionTimeout = getMinSessionTimeout();  
  5.         if (sessionTimeout < minSessionTimeout) {  
  6.             sessionTimeout = minSessionTimeout;  
  7.         }  
  8.         int maxSessionTimeout = getMaxSessionTimeout();  
  9.         if (sessionTimeout > maxSessionTimeout) {  
  10.             sessionTimeout = maxSessionTimeout;  
  11.         }  
  12.         cnxn.setSessionTimeout(sessionTimeout);  
  13.         // We don't want to receive any packets until we are sure that the  
  14.         // session is setup  
  15.         cnxn.disableRecv();  
  16.     //客户端发送的sessionid,重试时不为0  
  17.         long sessionId = connReq.getSessionId();  
  18.     //客户端重试,则reopen,后文分析  
  19.         if (sessionId != 0) {  
  20.             long clientSessionId = connReq.getSessionId();  
  21.             LOG.info("Client attempting to renew session 0x"  
  22.                     + Long.toHexString(clientSessionId)  
  23.                     + " at " + cnxn.getRemoteSocketAddress());  
  24.             serverCnxnFactory.closeSession(sessionId);  
  25.             cnxn.setSessionId(sessionId);  
  26.             reopenSession(cnxn, sessionId, passwd, sessionTimeout);  
  27.         }   
  28.     //创建session  
  29.     else {  
  30.             LOG.info("Client attempting to establish new session at "  
  31.                     + cnxn.getRemoteSocketAddress());  
  32.             createSession(cnxn, passwd, sessionTimeout);  
  33.         }  

             sessionid和密码初始化

Java代码  收藏代码
  1. long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {  
  2.         //sessionid递增  
  3.         long sessionId = sessionTracker.createSession(timeout);  
  4.         //随机密码  
  5.         Random r = new Random(sessionId ^ superSecret);  
  6.         r.nextBytes(passwd);  
  7.         //4个字节的超时时间  
  8.         ByteBuffer to = ByteBuffer.allocate(4);  
  9.         to.putInt(timeout);  
  10.         cnxn.setSessionId(sessionId);  
  11.         //异步提交执行链  
  12.         submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);  
  13.         return sessionId;  
  14.         }  

                   提交请求

Java代码  收藏代码
  1.   private void submitRequest(ServerCnxn cnxn, long sessionId, int type,  
  2.            int xid, ByteBuffer bb, List<Id> authInfo) {  
  3. //初始化时,xid为0,bb为4个字节的session超时时间  
  4.        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);  
  5.        submitRequest(si);  
  6.    }  

              根据之前http://iwinit.iteye.com/blog/1777109分析的Processor链图,FollowerRequestProcessor执行。和create请求一样,createSession是事务请求需要投票,FollowerA发送投票packet给leader。

3.leader处理,learnerHandler中收到request请求,提交leader的processor链

                    PrepRequestProcessor处理

Java代码  收藏代码
  1. //头信息  
  2.      request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,  
  3.                                     zks.getTime(), type);  
  4.     ....  
  5.     case OpCode.createSession:  
  6.                 request.request.rewind();  
  7.                 int to = request.request.getInt();  
  8.         //txn信息,就是一个session超时  
  9.                 request.txn = new CreateSessionTxn(to);  
  10.                 request.request.rewind();  
  11.         //leader中创建session  
  12.                 zks.sessionTracker.addSession(request.sessionId, to);  
  13.         //owner属于followerA  
  14.                 zks.setOwner(request.sessionId, request.getOwner());  
  15.                 break;  

             之后ProposalRequestProcessor发起投票,并写入log

4.followerA和followerB处理投票

Java代码  收藏代码
  1. public void logRequest(TxnHeader hdr, Record txn) {  
  2.         Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),  
  3.                 hdr.getType(), nullnull);  
  4.         request.hdr = hdr;  
  5.         request.txn = txn;  
  6.         request.zxid = hdr.getZxid();  
  7.     //添加到pending队列  
  8.         if ((request.zxid & 0xffffffffL) != 0) {  
  9.             pendingTxns.add(request);  
  10.         }  
  11.     /写入log,并发送ack包给leader  
  12.         syncProcessor.processRequest(request);  
  13.     }  

 5.leader收到投票,发起commit,然后自己commit

6.leader的FinalRequestProcessor处理,添加session,不需要返回client数据

Java代码  收藏代码
  1. public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {  
  2.         ProcessTxnResult rc;  
  3.         int opCode = hdr.getType();  
  4.         long sessionId = hdr.getClientId();  
  5.     //createSession不需要修改db状态,啥都不做  
  6.         rc = getZKDatabase().processTxn(hdr, txn);  
  7.     //又添加了一次session  
  8.         if (opCode == OpCode.createSession) {  
  9.             if (txn instanceof CreateSessionTxn) {  
  10.                 CreateSessionTxn cst = (CreateSessionTxn) txn;  
  11.                 sessionTracker.addSession(sessionId, cst  
  12.                         .getTimeOut());  
  13.             } else {  
  14.                 LOG.warn("*****>>>>> Got "  
  15.                         + txn.getClass() + " "  
  16.                         + txn.toString());  
  17.             }  
  18.         } else if (opCode == OpCode.closeSession) {  
  19.             sessionTracker.removeSession(sessionId);  
  20.         }  
  21.         return rc;  
  22.     }  

 7.followerA和followerB处理commit,区别在于CommitRequestProcessor中,followerA中的Request会带上connection信息而followerB中的reqesut没有connection信息。所以在FinalRequestProcessor中,followerB创立完session就返回了,而followerA还需要写回client响应

最后看下leader的sessionTracker超时机制,构造SessionTrackerImpl

Java代码  收藏代码
  1. public SessionTrackerImpl(SessionExpirer expirer,  
  2.            ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,  
  3.            long sid)  
  4.    {  
  5.        super("SessionTracker");  
  6.        this.expirer = expirer;  
  7.        this.expirationInterval = tickTime;  
  8.        this.sessionsWithTimeout = sessionsWithTimeout;  
  9. //下一个检查点,向上取整为expirationInterval倍数,expirationInterval就是配置的ticktime  
  10.        nextExpirationTime = roundToInterval(System.currentTimeMillis());  
  11.        this.nextSessionId = initializeNextSession(sid);  
  12.        for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {  
  13.            addSession(e.getKey(), e.getValue());  
  14.        }  
  15.    }  

 主线程循环

Java代码  收藏代码
  1.   synchronized public void run() {  
  2.       try {  
  3.           while (running) {  
  4. //下一个超时点没到,就等待  
  5.               currentTime = System.currentTimeMillis();  
  6.               if (nextExpirationTime > currentTime) {  
  7.                   this.wait(nextExpirationTime - currentTime);  
  8.                   continue;  
  9.               }  
  10. //同一个时间点超时的session  
  11.               SessionSet set;  
  12.               set = sessionSets.remove(nextExpirationTime);  
  13. //超时session处理  
  14.               if (set != null) {  
  15.                   for (SessionImpl s : set.sessions) {  
  16.                       setSessionClosing(s.sessionId);  
  17.                       expirer.expire(s);  
  18.                   }  
  19.               }  
  20. //下一个check point  
  21.               nextExpirationTime += expirationInterval;  
  22.           }  
  23.       } catch (InterruptedException e) {  
  24.           LOG.error("Unexpected interruption", e);  
  25.       }  
  26.       LOG.info("SessionTrackerImpl exited loop!");  
  27.   }  

 session更新时

Java代码  收藏代码
  1.    synchronized public boolean touchSession(long sessionId, int timeout) {  
  2.        ......  
  3. //session对象  
  4.        SessionImpl s = sessionsById.get(sessionId);  
  5.        // Return false, if the session doesn't exists or marked as closing  
  6.        if (s == null || s.isClosing()) {  
  7.            return false;  
  8.        }  
  9. //这个session的下一个超时点,向上取整为ticktime倍数  
  10.        long expireTime = roundToInterval(System.currentTimeMillis() + timeout);  
  11. //时间点比老的时间还小,不更新  
  12.        if (s.tickTime >= expireTime) {  
  13.            // Nothing needs to be done  
  14.            return true;  
  15.        }  
  16. //先从老的超时set中remove掉,再添加到新的set中,超时线程会定时check  
  17.        SessionSet set = sessionSets.get(s.tickTime);  
  18.        if (set != null) {  
  19.            set.sessions.remove(s);  
  20.        }  
  21. //下个超时点  
  22.        s.tickTime = expireTime;  
  23. //添加到新的超时set中  
  24.        set = sessionSets.get(s.tickTime);  
  25.        if (set == null) {  
  26.            set = new SessionSet();  
  27.            sessionSets.put(expireTime, set);  
  28.        }  
  29.        set.sessions.add(s);  
  30.        return true;  
  31.    }  

 简单小节
1.session创建需要投票处理
2.结果是每台server上的内存中都会建立相同的session记录
3.sessionid通过serverid+时间保证唯一
4.session超时检查由leader负责,以ticktime定时检查
5.session更新时,会修改自己这个session所属的超时set,超时时间是ticktime倍数

阅读(865) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~