前面几篇文章讲了follower和leader之间如何选举和初始化的,这一篇将以之前描述过的CREATE请求作为例子来描述在集群环境下是如何处理事务的。
关于client和zookeeper server的描述前几篇文章已经涉及了。这里不就不再赘述了。假设client和某一个follower建立了连接,并发送了CREATE请求。在follower端,IO线程拿到请求开始执行处理链,Follower处理链如下
初始化代码:
-
protected void setupRequestProcessors() {
-
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
-
commitProcessor = new CommitProcessor(finalProcessor,
-
Long.toString(getServerId()), true);
-
commitProcessor.start();
-
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
-
((FollowerRequestProcessor) firstProcessor).start();
-
syncProcessor = new SyncRequestProcessor(this,
-
new SendAckRequestProcessor((Learner)getFollower()));
-
syncProcessor.start();
-
}
第一个处理器是FollowerRequestProcessor,处理如下
-
while (!finished) {
-
Request request = queuedRequests.take();
-
if (LOG.isTraceEnabled()) {
-
ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
-
'F', request, "");
-
}
-
if (request == Request.requestOfDeath) {
-
break;
-
}
-
-
-
-
-
nextProcessor.processRequest(request);
-
-
-
-
-
-
-
-
switch (request.type) {
-
case OpCode.sync:
-
zks.pendingSyncs.add(request);
-
zks.getFollower().request(request);
-
break;
-
case OpCode.create:
-
case OpCode.delete:
-
case OpCode.setData:
-
case OpCode.setACL:
-
case OpCode.createSession:
-
case OpCode.closeSession:
-
case OpCode.multi:
-
zks.getFollower().request(request);
-
break;
-
}
转发事务请求给leader
-
void request(Request request) throws IOException {
-
反序列化
-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
DataOutputStream oa = new DataOutputStream(baos);
-
oa.writeLong(request.sessionId);
-
oa.writeInt(request.cxid);
-
oa.writeInt(request.type);
-
if (request.request != null) {
-
request.request.rewind();
-
int len = request.request.remaining();
-
byte b[] = new byte[len];
-
request.request.get(b);
-
request.request.rewind();
-
oa.write(b);
-
}
-
oa.close();
-
QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
-
.toByteArray(), request.authInfo);
-
writePacket(qp, true);
-
}
在CommitProcessor中主要是等待缓存请求,并等待该请求被commit
-
while (!finished) {
-
int len = toProcess.size();
-
-
for (int i = 0; i < len; i++) {
-
nextProcessor.processRequest(toProcess.get(i));
-
}
-
toProcess.clear();
-
synchronized (this) {
-
-
if ((queuedRequests.size() == 0 || nextPending != null)
-
&& committedRequests.size() == 0) {
-
wait();
-
continue;
-
}
-
-
-
-
if ((queuedRequests.size() == 0 || nextPending != null)
-
&& committedRequests.size() > 0) {
-
Request r = committedRequests.remove();
-
-
-
-
-
-
-
-
if (nextPending != null
-
&& nextPending.sessionId == r.sessionId
-
&& nextPending.cxid == r.cxid) {
-
-
-
nextPending.hdr = r.hdr;
-
nextPending.txn = r.txn;
-
nextPending.zxid = r.zxid;
-
toProcess.add(nextPending);
-
nextPending = null;
-
}
-
-
else {
-
-
-
toProcess.add(r);
-
}
-
}
-
}
-
-
-
-
-
if (nextPending != null) {
-
continue;
-
}
-
-
synchronized (this) {
-
-
while (nextPending == null && queuedRequests.size() > 0) {
-
Request request = queuedRequests.remove();
-
switch (request.type) {
-
case OpCode.create:
-
case OpCode.delete:
-
case OpCode.setData:
-
case OpCode.multi:
-
case OpCode.setACL:
-
case OpCode.createSession:
-
case OpCode.closeSession:
-
nextPending = request;
-
break;
-
case OpCode.sync:
-
if (matchSyncs) {
-
nextPending = request;
-
} else {
-
toProcess.add(request);
-
}
-
break;
-
default:
-
toProcess.add(request);
-
}
-
}
-
}
在这个场景中,CREATE请求先到了queuedRequests中,然后nextPending会指向这个请求,但是此时还未commit,所以CommitProcessor会wait,直到该请求投票被通过,然后被commit。
此时leader收到了转发的请求,在LearnerHandler中
-
case Leader.REQUEST:
-
-
bb = ByteBuffer.wrap(qp.getData());
-
sessionId = bb.getLong();
-
cxid = bb.getInt();
-
type = bb.getInt();
-
bb = bb.slice();
-
Request si;
-
if(type == OpCode.sync){
-
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
-
} else {
-
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
-
}
-
si.setOwner(this);
-
-
leader.zk.submitRequest(si);
-
break;
Leader端的执行链如下
PrepRequestProcessor在之前的文章已经分析过了,主要是根据请求类型,拼装不同的Request,这里是CreateRequest
接下来ProposalRequestProcessor执行,ProposalRequestProcessor主要是发起投票
-
public void processRequest(Request request) throws RequestProcessorException {
-
-
......
-
-
-
-
-
-
-
-
-
-
if(request instanceof LearnerSyncRequest){
-
zks.getLeader().processSync((LearnerSyncRequest)request);
-
} else {
-
-
nextProcessor.processRequest(request);
-
if (request.hdr != null) {
-
-
try {
-
-
zks.getLeader().propose(request);
-
} catch (XidRolloverException e) {
-
throw new RequestProcessorException(e.getMessage(), e);
-
}
-
-
syncProcessor.processRequest(request);
-
}
-
}
-
}
leader发起投票
-
public Proposal propose(Request request) throws XidRolloverException {
-
.......
-
-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
-
try {
-
request.hdr.serialize(boa, "hdr");
-
if (request.txn != null) {
-
request.txn.serialize(boa, "txn");
-
}
-
baos.close();
-
} catch (IOException e) {
-
LOG.warn("This really should be impossible", e);
-
}
-
-
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
-
baos.toByteArray(), null);
-
-
Proposal p = new Proposal();
-
p.packet = pp;
-
p.request = request;
-
synchronized (this) {
-
if (LOG.isDebugEnabled()) {
-
LOG.debug("Proposing:: " + request);
-
}
-
-
lastProposed = p.packet.getZxid();
-
-
outstandingProposals.put(lastProposed, p);
-
-
sendPacket(pp);
-
}
-
return p;
-
}
leader发完投票后,通过SyncRequestProcessor将事务写入日志文件,本地写成功后,投票成功。
SyncRequestProcessor之前文章已经分析过了,主要是将事务顺序写入日志文件。主要看之后的AckRequestProcessor
-
public void processRequest(Request request) {
-
QuorumPeer self = leader.self;
-
if(self != null)
-
本地日志写成功后,认为自己成功了
-
leader.processAck(self.getId(), request.zxid, null);
-
else
-
LOG.error("Null QuorumPeer");
-
}
leader的processAck方法比较关键,之前也有分析,这里再强调下
-
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
-
.......
-
-
Proposal p = outstandingProposals.get(zxid);
-
if (p == null) {
-
LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
-
Long.toHexString(zxid), followerAddr);
-
return;
-
}
-
-
p.ackSet.add(sid);
-
if (LOG.isDebugEnabled()) {
-
LOG.debug("Count for zxid: 0x{} is {}",
-
Long.toHexString(zxid), p.ackSet.size());
-
}
-
/如果满足投票结束条件,默认是半数server统一,则提交事务
-
if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
-
if (zxid != lastCommitted+1) {
-
LOG.warn("Commiting zxid 0x{} from {} not first!",
-
Long.toHexString(zxid), followerAddr);
-
LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
-
}
-
outstandingProposals.remove(zxid);
-
if (p.request != null) {
-
-
toBeApplied.add(p);
-
}
-
-
if ((zxid & 0xffffffffL) != 0) {
-
if (p.request == null) {
-
LOG.warn("Going to commmit null request for proposal: {}", p);
-
}
-
-
commit(zxid);
-
-
inform(p);
-
-
zk.commitProcessor.commit(p.request);
-
......
-
}
-
}
通知follower提交事务
-
public void commit(long zxid) {
-
synchronized(this){
-
lastCommitted = zxid;
-
}
-
-
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
-
sendPacket(qp);
-
}
此时Follower收到proposal包,follower中处理投票
-
case Leader.PROPOSAL:
-
TxnHeader hdr = new TxnHeader();
-
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
-
if (hdr.getZxid() != lastQueued + 1) {
-
LOG.warn("Got zxid 0x"
-
+ Long.toHexString(hdr.getZxid())
-
+ " expected 0x"
-
+ Long.toHexString(lastQueued + 1));
-
}
-
lastQueued = hdr.getZxid();
-
-
fzk.logRequest(hdr, txn);
-
break;
-
public void logRequest(TxnHeader hdr, Record txn) {
-
Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
-
hdr.getType(), null, null);
-
request.hdr = hdr;
-
request.txn = txn;
-
request.zxid = hdr.getZxid();
-
if ((request.zxid & 0xffffffffL) != 0) {
-
pendingTxns.add(request);
-
}
-
是通过SyncRequestProcessor将事务写入本地文件,再发送ack包
-
syncProcessor.processRequest(request);
-
}
日志写成功后,SendAckRequestProcessor发送ACK包
-
public void processRequest(Request si) {
-
if(si.type != OpCode.sync){
-
-
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
-
null);
-
try {
-
-
learner.writePacket(qp, false);
-
} catch (IOException e) {
-
LOG.warn("Closing connection to leader, exception during packet send", e);
-
try {
-
if (!learner.sock.isClosed()) {
-
learner.sock.close();
-
}
-
} catch (IOException e1) {
-
-
LOG.debug("Ignoring error closing the connection", e1);
-
}
-
}
-
}
-
}
此时,leader收到ack包,LearnerHandler线程中
-
case Leader.ACK:
-
if (this.learnerType == LearnerType.OBSERVER) {
-
if (LOG.isDebugEnabled()) {
-
LOG.debug("Received ACK from Observer " + this.sid);
-
}
-
}
-
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
-
break;
还是调用了processAck方法,由于之前已经有了leader自己的投票,此时follower再投一票,3台机器的集群即认为投票成功,leader开始发送commit操作,也就是发送commit包给follower。
follower收到commit包
-
case Leader.COMMIT:
-
fzk.commit(qp.getZxid());
-
break;
-
-
public void commit(long zxid) {
-
if (pendingTxns.size() == 0) {
-
LOG.warn("Committing " + Long.toHexString(zxid)
-
+ " without seeing txn");
-
return;
-
}
-
long firstElementZxid = pendingTxns.element().zxid;
-
if (firstElementZxid != zxid) {
-
LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
-
+ " but next pending txn 0x"
-
+ Long.toHexString(firstElementZxid));
-
System.exit(12);
-
}
-
-
Request request = pendingTxns.remove();
-
-
commitProcessor.commit(request);
-
}
Commit之后请求将交给FinalRequestProcessor处理,修改最后的内存db结构,如果是本机请求则写回响应,如果不是则不用写回响应
阅读(1081) | 评论(0) | 转发(0) |