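The previous few ZooKeeper articles gave a brief analysis of the execution flow. Next I plan to look at some of zk's features from a cross-cutting angle, starting with sessions.
This article, http://iwinit.iteye.com/blog/1754611, analyzed session establishment on a standalone server. Establishing a session in a cluster is different: it goes through a proposal. Assume the cluster consists of leader, followerA and followerB, and our client connects to followerA. After the follower and the leader start up, they initialize different sessionTrackers: SessionTrackerImpl on the leader and LearnerSessionTracker on the follower. The main differences and similarities are: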
1. The follower does not start a timeout-checking thread; it simply records session information. Its main data structures are
    HashMap<Long, Integer> touchTable = new HashMap<Long, Integer>();
    private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
The leader, by contrast, does start a timeout thread, and it keeps more data structures
    HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();
    HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();
    ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
2. On addSession, the leader creates a session entity, while the follower simply records the session information
3. The sessionId initialization algorithm is the same on both
    public static long initializeNextSession(long id) {
        long nextSid = 0;
        nextSid = (System.currentTimeMillis() << 24) >> 8;
        nextSid = nextSid | (id <<56);
        return nextSid;
    }
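To make the bit layout concrete, here is a minimal sketch of the same arithmetic with the clock passed in as a parameter so the result is reproducible. The class and method names are illustrative, not ZooKeeper's. It also shows a caveat of the signed shift `>> 8` quoted above: once bit 39 of the millisecond clock is set, the sign bit is smeared over the top byte and the server id can no longer be read back. The `initUnsigned` variant, which uses `>>>`, is my assumption of how that can be avoided.

```java
final class SessionIdLayout {
    // Same arithmetic as the initializeNextSession listing above (signed shift).
    static long initSigned(long serverId, long nowMillis) {
        long nextSid = (nowMillis << 24) >> 8;
        return nextSid | (serverId << 56);
    }

    // Assumed variant: the unsigned shift keeps the top 8 bits clear for the server id.
    static long initUnsigned(long serverId, long nowMillis) {
        long nextSid = (nowMillis << 24) >>> 8;
        return nextSid | (serverId << 56);
    }

    // The server id occupies the top 8 bits of the 64-bit session id.
    static long serverIdOf(long sessionId) {
        return sessionId >>> 56;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long signed = initSigned(3L, now);
        long unsigned = initUnsigned(3L, now);
        System.out.println("signed   : 0x" + Long.toHexString(signed)
                + " server id = " + serverIdOf(signed));
        System.out.println("unsigned : 0x" + Long.toHexString(unsigned)
                + " server id = " + serverIdOf(unsigned));
    }
}
```

The remaining low bits are derived from the timestamp, so later session ids can be handed out by incrementing this starting value without colliding with ids from another server.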
Next, the client starts creating a session
1. The client sends a ConnectionRequest to followerA
2. followerA handles it
Session timeout negotiation
    int sessionTimeout = connReq.getTimeOut();
    byte passwd[] = connReq.getPasswd();
    int minSessionTimeout = getMinSessionTimeout();
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout();
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout);

    cnxn.disableRecv();

    long sessionId = connReq.getSessionId();

    if (sessionId != 0) {
        long clientSessionId = connReq.getSessionId();
        LOG.info("Client attempting to renew session 0x"
                + Long.toHexString(clientSessionId)
                + " at " + cnxn.getRemoteSocketAddress());
        serverCnxnFactory.closeSession(sessionId);
        cnxn.setSessionId(sessionId);
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
    }
    else {
        LOG.info("Client attempting to establish new session at "
                + cnxn.getRemoteSocketAddress());
        createSession(cnxn, passwd, sessionTimeout);
    }
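The negotiation above is just a clamp of the client's requested timeout into the server's [min, max] range. A minimal sketch follows; the class name and the sample numbers are illustrative, and the bounds of 2x and 20x tickTime are the commonly documented defaults, used here as an assumption rather than read from this code.

```java
final class SessionTimeoutNegotiation {
    // Mirrors the order of the two if-statements above: raise to min first, then cap at max.
    static int negotiate(int requestedMillis, int minMillis, int maxMillis) {
        return Math.min(Math.max(requestedMillis, minMillis), maxMillis);
    }

    public static void main(String[] args) {
        int tickTime = 2000;       // hypothetical tickTime in milliseconds
        int min = 2 * tickTime;    // assumed default lower bound
        int max = 20 * tickTime;   // assumed default upper bound
        for (int requested : new int[] {1000, 10000, 60000}) {
            System.out.println(requested + " -> " + negotiate(requested, min, max));
        }
    }
}
```

The client therefore cannot rely on getting the timeout it asked for; the negotiated value is whatever the server sets on the connection.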
Initializing the session id and password
    long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
        long sessionId = sessionTracker.createSession(timeout);

        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);

        ByteBuffer to = ByteBuffer.allocate(4);
        to.putInt(timeout);
        cnxn.setSessionId(sessionId);

        submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
        return sessionId;
    }
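Because the password is produced by a `Random` seeded with `sessionId ^ superSecret`, it is a pure function of the session id, so a server can presumably recompute it when a client reconnects instead of storing it. The sketch below illustrates that idea only. `SUPER_SECRET`, the 16-byte length, and both method names are assumptions made for the example, not values taken from ZooKeeper.

```java
import java.util.Arrays;
import java.util.Random;

final class SessionPasswordSketch {
    // Hypothetical secret; the real superSecret value is not shown in this article.
    static final long SUPER_SECRET = 0x5EC2E7L;

    // Same seed, same byte sequence: java.util.Random is deterministic for a given seed.
    static byte[] generatePassword(long sessionId) {
        byte[] passwd = new byte[16]; // assumed password length
        new Random(sessionId ^ SUPER_SECRET).nextBytes(passwd);
        return passwd;
    }

    // Re-derive the password from the id and compare it with what the client presented.
    static boolean checkPassword(long sessionId, byte[] presented) {
        return sessionId != 0 && Arrays.equals(presented, generatePassword(sessionId));
    }

    public static void main(String[] args) {
        byte[] passwd = generatePassword(42L);
        System.out.println("matches own session id:   " + checkPassword(42L, passwd));
        System.out.println("matches other session id: " + checkPassword(43L, passwd));
    }
}
```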
Submitting the request
    private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
            int xid, ByteBuffer bb, List<Id> authInfo) {
        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
        submitRequest(si);
    }
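According to the Processor chain diagram analyzed earlier in http://iwinit.iteye.com/blog/1777109, FollowerRequestProcessor runs first. Like a create request, createSession is a transactional request that must be voted on, so followerA forwards it to the leader in a packet.
3. The leader handles it: LearnerHandler receives the request and submits it to the leader's processor chain
Handled by PrepRequestProcessor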
    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
            zks.getTime(), type);
    ....
    case OpCode.createSession:
        request.request.rewind();
        int to = request.request.getInt();

        request.txn = new CreateSessionTxn(to);
        request.request.rewind();

        zks.sessionTracker.addSession(request.sessionId, to);

        zks.setOwner(request.sessionId, request.getOwner());
        break;
After that, ProposalRequestProcessor issues the proposal and writes it to the log
4. followerA and followerB handle the proposal
    public void logRequest(TxnHeader hdr, Record txn) {
        Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
                hdr.getType(), null, null);
        request.hdr = hdr;
        request.txn = txn;
        request.zxid = hdr.getZxid();

        if ((request.zxid & 0xffffffffL) != 0) {
            pendingTxns.add(request);
        }
        // Write to the log and send an ACK packet to the leader
        syncProcessor.processRequest(request);
    }
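The test `(request.zxid & 0xffffffffL) != 0` in logRequest looks only at the low 32 bits of the zxid. A zxid is conventionally split into a 32-bit epoch in the high half and a 32-bit counter in the low half, so the check presumably keeps the first zxid of a new epoch (counter 0) out of pendingTxns. The helper names below are illustrative and only demonstrate the bit layout.

```java
final class ZxidLayout {
    // High 32 bits: the leader epoch.
    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    // Low 32 bits: the per-epoch transaction counter (the part logRequest tests).
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    public static void main(String[] args) {
        long first = makeZxid(5, 0); // first zxid of epoch 5, counter is 0
        long later = makeZxid(5, 7); // an ordinary transaction in epoch 5
        System.out.println("first: queued = " + (counterOf(first) != 0));
        System.out.println("later: queued = " + (counterOf(later) != 0)
                + ", epoch = " + epochOf(later));
    }
}
```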
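5. The leader receives the votes, sends out the commit, and then commits locally
6. The leader's FinalRequestProcessor handles the request and adds the session; no data needs to be returned to the client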
    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        ProcessTxnResult rc;
        int opCode = hdr.getType();
        long sessionId = hdr.getClientId();

        rc = getZKDatabase().processTxn(hdr, txn);

        if (opCode == OpCode.createSession) {
            if (txn instanceof CreateSessionTxn) {
                CreateSessionTxn cst = (CreateSessionTxn) txn;
                sessionTracker.addSession(sessionId, cst
                        .getTimeOut());
            } else {
                LOG.warn("*****>>>>> Got "
                        + txn.getClass() + " "
                        + txn.toString());
            }
        } else if (opCode == OpCode.closeSession) {
            sessionTracker.removeSession(sessionId);
        }
        return rc;
    }
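7. followerA and followerB handle the commit. The difference lies in CommitRequestProcessor: on followerA the Request carries connection information, while on followerB the request has none. So in FinalRequestProcessor, followerB returns as soon as the session is created, whereas followerA still has to write the response back to the client
Finally, let's look at the timeout mechanism of the leader's sessionTracker. Constructing SessionTrackerImpl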
    public SessionTrackerImpl(SessionExpirer expirer,
            ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
            long sid)
    {
        super("SessionTracker");
        this.expirer = expirer;
        this.expirationInterval = tickTime;
        this.sessionsWithTimeout = sessionsWithTimeout;

        nextExpirationTime = roundToInterval(System.currentTimeMillis());
        this.nextSessionId = initializeNextSession(sid);
        for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
            addSession(e.getKey(), e.getValue());
        }
    }
Main thread loop
    synchronized public void run() {
        try {
            while (running) {
                currentTime = System.currentTimeMillis();
                if (nextExpirationTime > currentTime) {
                    this.wait(nextExpirationTime - currentTime);
                    continue;
                }

                SessionSet set;
                set = sessionSets.remove(nextExpirationTime);

                if (set != null) {
                    for (SessionImpl s : set.sessions) {
                        setSessionClosing(s.sessionId);
                        expirer.expire(s);
                    }
                }

                nextExpirationTime += expirationInterval;
            }
        } catch (InterruptedException e) {
            LOG.error("Unexpected interruption", e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }
When a session is updated
    synchronized public boolean touchSession(long sessionId, int timeout) {
        ......

        SessionImpl s = sessionsById.get(sessionId);

        if (s == null || s.isClosing()) {
            return false;
        }

        long expireTime = roundToInterval(System.currentTimeMillis() + timeout);

        if (s.tickTime >= expireTime) {
            return true;
        }

        SessionSet set = sessionSets.get(s.tickTime);
        if (set != null) {
            set.sessions.remove(s);
        }

        s.tickTime = expireTime;

        set = sessionSets.get(s.tickTime);
        if (set == null) {
            set = new SessionSet();
            sessionSets.put(expireTime, set);
        }
        set.sessions.add(s);
        return true;
    }
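The run loop and touchSession above cooperate through time buckets: every session sits in the set keyed by its rounded expiration time, and the loop only ever removes one whole bucket per interval. Here is a simplified, single-threaded sketch of that idea. The class is illustrative, stores bare session ids instead of SessionImpl objects, takes the clock as a parameter, and assumes roundToInterval rounds up to the next multiple of the interval.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ExpiryBucketsSketch {
    private final long interval;
    private final Map<Long, Set<Long>> buckets = new HashMap<>();      // expireTime -> session ids
    private final Map<Long, Long> bucketOfSession = new HashMap<>();   // session id -> expireTime

    ExpiryBucketsSketch(long interval) {
        this.interval = interval;
    }

    // Assumed rounding: always the next multiple of the interval after `time`.
    long roundToInterval(long time) {
        return (time / interval + 1) * interval;
    }

    // Move the session to the bucket of its new expiration time (it never moves earlier).
    void touch(long sessionId, long now, long timeout) {
        long expireTime = roundToInterval(now + timeout);
        Long current = bucketOfSession.get(sessionId);
        if (current != null) {
            if (current >= expireTime) {
                return;
            }
            Set<Long> old = buckets.get(current);
            if (old != null) {
                old.remove(sessionId);
            }
        }
        bucketOfSession.put(sessionId, expireTime);
        buckets.computeIfAbsent(expireTime, k -> new HashSet<>()).add(sessionId);
    }

    // What the run loop does per tick: drop one bucket and expire everything still in it.
    Set<Long> expire(long bucketTime) {
        Set<Long> expired = buckets.remove(bucketTime);
        if (expired == null) {
            return Collections.emptySet();
        }
        for (Long id : expired) {
            bucketOfSession.remove(id);
        }
        return expired;
    }

    public static void main(String[] args) {
        ExpiryBucketsSketch tracker = new ExpiryBucketsSketch(2000);
        tracker.touch(1L, 0, 5000);     // lands in bucket 6000
        tracker.touch(1L, 3000, 5000);  // heartbeat moves it to bucket 10000
        System.out.println("bucket 6000:  " + tracker.expire(6000));
        System.out.println("bucket 10000: " + tracker.expire(10000));
    }
}
```

Grouping sessions by rounded expiration time means the checker never scans all sessions; each tick costs one map removal plus the sessions that actually expired.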
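Brief summary
1. Creating a session requires a vote
2. As a result, the same session record is created in memory on every server
3. The session id is made unique by combining the server id with the time
4. Session timeout checking is the leader's job, and it runs periodically every tickTime
5. When a session is updated, it is moved to the timeout set it now belongs to; the expiration time is a multiple of tickTime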