前一篇介绍了Leader选举,这一篇介绍选举成功之后Leader和Follower之间的初始化。
先看Leader端的操作,选举结束后状态为LEADING的节点进入如下分支:
-
case LEADING:
-
LOG.info("LEADING");
-
try {
-
-
setLeader(makeLeader(logFactory));
-
-
leader.lead();
-
setLeader(null);
-
} catch (Exception e) {
-
LOG.warn("Unexpected exception",e);
-
} finally {
-
if (leader != null) {
-
leader.shutdown("Forcing shutdown");
-
setLeader(null);
-
}
-
setPeerState(ServerState.LOOKING);
-
}
-
break;
-
}
Leader初始化(在构造函数中绑定quorum端口):
-
Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
-
this.self = self;
-
try {
-
// 打开lead端口,这里是2888
-
ss = new ServerSocket();
-
ss.setReuseAddress(true);
-
ss.bind(new InetSocketAddress(self.getQuorumAddress().getPort()));
-
} catch (BindException e) {
-
LOG.error("Couldn't bind to port "
-
+ self.getQuorumAddress().getPort(), e);
-
throw e;
-
}
-
this.zk=zk;
-
}
具体的lead过程(leader.lead()方法节选):
-
self.tick = 0;
-
-
zk.loadData();
-
-
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
-
-
-
-
-
cnxAcceptor = new LearnerCnxAcceptor();
-
cnxAcceptor.start();
-
-
readyToStart = true;
-
-
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
-
.......
等待follower连接(getEpochToPropose:等待过半节点连接进来后确定新的epoch):
-
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
-
synchronized(connectingFollowers) {
-
if (!waitingForNewEpoch) {
-
return epoch;
-
}
-
if (lastAcceptedEpoch >= epoch) {
-
epoch = lastAcceptedEpoch+1;
-
}
-
-
connectingFollowers.add(sid);
-
QuorumVerifier verifier = self.getQuorumVerifier();
-
-
if (connectingFollowers.contains(self.getId()) &&
-
verifier.containsQuorum(connectingFollowers)) {
-
waitingForNewEpoch = false;
-
self.setAcceptedEpoch(epoch);
-
connectingFollowers.notifyAll();
-
}
-
-
else {
-
long start = System.currentTimeMillis();
-
long cur = start;
-
long end = start + self.getInitLimit()*self.getTickTime();
-
while(waitingForNewEpoch && cur < end) {
-
connectingFollowers.wait(end - cur);
-
cur = System.currentTimeMillis();
-
}
-
-
if (waitingForNewEpoch) {
-
throw new InterruptedException("Timeout while waiting for epoch from quorum");
-
}
-
}
-
return epoch;
-
}
-
}
好的,这个时候我们假设其他follower还没连接进来,那Leader就会在getEpochToPropose中等待(最长等待initLimit*tickTime,超时则抛出异常)。再来看Follower的初始化过程:
-
case FOLLOWING:
-
try {
-
LOG.info("FOLLOWING");
-
-
setFollower(makeFollower(logFactory));
-
-
follower.followLeader();
-
} catch (Exception e) {
-
LOG.warn("Unexpected exception",e);
-
} finally {
-
follower.shutdown();
-
setFollower(null);
-
setPeerState(ServerState.LOOKING);
-
}
-
break;
具体follow过程
-
void followLeader() throws InterruptedException {
-
.......
-
try {
-
-
InetSocketAddress addr = findLeader();
-
try {
-
-
connectToLeader(addr);
-
-
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
-
-
-
-
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
-
if (newEpoch < self.getAcceptedEpoch()) {
-
LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
-
+ " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
-
throw new IOException("Error: Epoch of leader is lower");
-
}
-
-
syncWithLeader(newEpochZxid);
-
QuorumPacket qp = new QuorumPacket();
-
-
while (self.isRunning()) {
-
readPacket(qp);
-
processPacket(qp);
-
}
-
......
-
}
连接leader(最多尝试5次,失败后间隔1秒重试):
-
protected void connectToLeader(InetSocketAddress addr)
-
throws IOException, ConnectException, InterruptedException {
-
sock = new Socket();
-
-
sock.setSoTimeout(self.tickTime * self.initLimit);
-
-
for (int tries = 0; tries < 5; tries++) {
-
try {
-
-
sock.connect(addr, self.tickTime * self.syncLimit);
-
sock.setTcpNoDelay(nodelay);
-
break;
-
} catch (IOException e) {
-
if (tries == 4) {
-
LOG.error("Unexpected exception",e);
-
throw e;
-
} else {
-
LOG.warn("Unexpected exception, tries="+tries+
-
", connecting to " + addr,e);
-
sock = new Socket();
-
sock.setSoTimeout(self.tickTime * self.initLimit);
-
}
-
}
-
Thread.sleep(1000);
-
}
-
leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
-
sock.getInputStream()));
-
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
-
leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
-
}
假设这里follower顺利连上了leader,此时leader端会为每个follower启动一个单独的IO线程(LearnerHandler),请看LearnerCnxAcceptor的代码:
-
public void run() {
-
try {
-
while (!stop) {
-
try{
-
-
Socket s = ss.accept();
-
-
-
-
s.setSoTimeout(self.tickTime * self.initLimit);
-
s.setTcpNoDelay(nodelay);
-
-
LearnerHandler fh = new LearnerHandler(s, Leader.this);
-
fh.start();
-
} catch (SocketException e) {
-
.....
-
}
leader端为每个follower建立IO线程后,该线程与follower自身的主线程按照协议进行交互。下面按数据交换的先后顺序来分析这个过程。首先,leader端的IO线程LearnerHandler启动,并阻塞等待follower发来的第一个包:
-
ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
-
.getInputStream()));
-
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
-
oa = BinaryOutputArchive.getArchive(bufferedOutput);
-
-
QuorumPacket qp = new QuorumPacket();
-
ia.readRecord(qp, "packet");
follower端进入registerWithLeader处理:发送FOLLOWERINFO包(携带自己的acceptedEpoch和sid),然后等待leader的回应:
-
long lastLoggedZxid = self.getLastLoggedZxid();
-
QuorumPacket qp = new QuorumPacket();
-
-
qp.setType(pktType);
-
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
-
-
-
-
-
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
-
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
-
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
-
boa.writeRecord(li, "LearnerInfo");
-
qp.setData(bsid.toByteArray());
-
-
writePacket(qp, true);
-
-
readPacket(qp);
leader端收到FOLLOWERINFO包后的处理:解析出follower的sid,调用getEpochToPropose确定新的epoch,再发送LEADERINFO包并等待follower回复ACKEPOCH:
-
byte learnerInfoData[] = qp.getData();
-
if (learnerInfoData != null) {
-
if (learnerInfoData.length == 8) {
-
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
-
this.sid = bbsid.getLong();
-
} else {
-
-
LearnerInfo li = new LearnerInfo();
-
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
-
this.sid = li.getServerid();
-
this.version = li.getProtocolVersion();
-
}
-
} else {
-
this.sid = leader.followerCounter.getAndDecrement();
-
}
-
-
......
-
-
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
-
-
long peerLastZxid;
-
StateSummary ss = null;
-
long zxid = qp.getZxid();
-
-
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
-
-
......
-
-
byte ver[] = new byte[4];
-
ByteBuffer.wrap(ver).putInt(0x10000);
-
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
-
oa.writeRecord(newEpochPacket, "packet");
-
bufferedOutput.flush();
-
QuorumPacket ackEpochPacket = new QuorumPacket();
-
-
ia.readRecord(ackEpochPacket, "packet");
-
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
-
LOG.error(ackEpochPacket.toString()
-
+ " is not ACKEPOCH");
-
return;
-
}
-
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
-
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
-
leader.waitForEpochAck(this.getSid(), ss);
-
}
此时follower收到LEADERINFO包处理:
-
final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
-
if (qp.getType() == Leader.LEADERINFO) {
-
-
leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
-
byte epochBytes[] = new byte[4];
-
final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
-
-
if (newEpoch > self.getAcceptedEpoch()) {
-
wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
-
self.setAcceptedEpoch(newEpoch);
-
}
-
......
-
-
QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
-
writePacket(ackNewEpoch, true);
-
return ZxidUtils.makeZxid(newEpoch, 0);
-
}
leader收到Leader.ACKEPOCH后进入waitForEpochAck处理
-
public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
-
synchronized(electingFollowers) {
-
if (electionFinished) {
-
return;
-
}
-
if (ss.getCurrentEpoch() != -1) {
-
......
-
-
electingFollowers.add(id);
-
}
-
QuorumVerifier verifier = self.getQuorumVerifier();
-
-
if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
-
electionFinished = true;
-
electingFollowers.notifyAll();
-
}
-
-
else {
-
long start = System.currentTimeMillis();
-
long cur = start;
-
long end = start + self.getInitLimit()*self.getTickTime();
-
while(!electionFinished && cur < end) {
-
electingFollowers.wait(end - cur);
-
cur = System.currentTimeMillis();
-
}
-
if (!electionFinished) {
-
throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
-
}
-
}
-
}
-
}
假设IO线程在此等待,此时leader主线程在getEpochToPropose恢复后继续执行
-
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
-
-
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
-
-
synchronized(this){
-
lastProposed = zk.getZxid();
-
}
-
-
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
-
null, null);
-
......
-
-
outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
-
-
newLeaderProposal.ackSet.add(self.getId());
-
-
waitForEpochAck(self.getId(), leaderStateSummary);
由于之前已经有follower连接进来并回复了ACKEPOCH,electingFollowers满足过半条件,于是IO线程和leader主线程都从waitForEpochAck返回并继续往下执行。先看leader主线程,它会等待过半节点对NEWLEADER提案的ACK:
-
-
self.setCurrentEpoch(epoch);
-
-
-
-
-
-
while (!self.getQuorumVerifier().containsQuorum(newLeaderProposal.ackSet)){
-
-
-
if (self.tick > self.initLimit) {
-
-
-
StringBuilder ackToString = new StringBuilder();
-
for(Long id : newLeaderProposal.ackSet)
-
ackToString.append(id + ": ");
-
-
shutdown("Waiting for a quorum of followers, only synced with: " + ackToString);
-
HashSet<Long> followerSet = new HashSet<Long>();
-
-
for(LearnerHandler f : getLearners()) {
-
followerSet.add(f.getSid());
-
}
-
-
if (self.getQuorumVerifier().containsQuorum(followerSet)) {
-
-
LOG.warn("Enough followers present. "+
-
"Perhaps the initTicks need to be increased.");
-
}
-
return;
-
}
-
Thread.sleep(self.tickTime);
-
self.tick++;
-
}
与此同时,IO线程继续执行,根据follower的peerLastZxid决定同步方式(DIFF、TRUNC或SNAP):
-
-
-
int packetToSend = Leader.SNAP;
-
long zxidToSend = 0;
-
long leaderLastZxid = 0;
-
-
long updates = peerLastZxid;
-
-
-
-
-
ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
-
ReadLock rl = lock.readLock();
-
try {
-
rl.lock();
-
final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
-
final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
-
LOG.info("Synchronizing with Follower sid: " + sid
-
+" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
-
+" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
-
+" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
-
-
-
LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
-
-
if (proposals.size() != 0) {
-
-
if ((maxCommittedLog >= peerLastZxid)
-
&& (minCommittedLog <= peerLastZxid)) {
-
.......
-
-
-
-
packetToSend = Leader.DIFF;
-
zxidToSend = maxCommittedLog;
-
-
for (Proposal propose: proposals) {
-
-
-
if (propose.packet.getZxid() <= peerLastZxid) {
-
prevProposalZxid = propose.packet.getZxid();
-
continue;
-
} else {
-
-
-
-
if (firstPacket) {
-
firstPacket = false;
-
-
-
if (prevProposalZxid < peerLastZxid) {
-
-
packetToSend = Leader.TRUNC;
-
zxidToSend = prevProposalZxid;
-
updates = zxidToSend;
-
}
-
}
-
-
queuePacket(propose.packet);
-
-
QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
-
null, null);
-
queuePacket(qcommit);
-
}
-
}
-
}
-
-
else if (peerLastZxid > maxCommittedLog) {
-
LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
-
Long.toHexString(maxCommittedLog),
-
Long.toHexString(updates));
-
-
packetToSend = Leader.TRUNC;
-
zxidToSend = maxCommittedLog;
-
updates = zxidToSend;
-
} else {
-
LOG.warn("Unhandled proposal scenario");
-
}
-
}
-
-
else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
-
.....
-
packetToSend = Leader.DIFF;
-
zxidToSend = peerLastZxid;
-
.......
-
-
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
-
ZxidUtils.makeZxid(newEpoch, 0), null, null);
-
if (getVersion() < 0x10000) {
-
oa.writeRecord(newLeaderQP, "packet");
-
} else {
-
queuedPackets.add(newLeaderQP);
-
}
-
bufferedOutput.flush();
-
-
if (packetToSend == Leader.SNAP) {
-
zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
-
}
-
-
oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
-
bufferedOutput.flush();
-
......
-
-
-
new Thread() {
-
public void run() {
-
Thread.currentThread().setName(
-
"Sender-" + sock.getRemoteSocketAddress());
-
try {
-
sendPackets();
-
} catch (InterruptedException e) {
-
LOG.warn("Unexpected interruption",e);
-
}
-
}
-
}.start();
-
-
-
-
-
-
-
-
qp = new QuorumPacket();
-
ia.readRecord(qp, "packet");
在我们这个集群里,由于是刚启动的,所以leader会直接发送DIFF包,然后再发送一个NEWLEADER包。
接着follower在syncWithLeader中处理收到的包:
-
QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
-
QuorumPacket qp = new QuorumPacket();
-
long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
-
-
readPacket(qp);
-
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
-
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
-
synchronized (zk) {
-
-
if (qp.getType() == Leader.DIFF) {
-
LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
-
}
-
-
else if (qp.getType() == Leader.SNAP) {
-
LOG.info("Getting a snapshot from leader");
-
-
-
zk.getZKDatabase().clear();
-
zk.getZKDatabase().deserializeSnapshot(leaderIs);
-
String signature = leaderIs.readString("signature");
-
if (!signature.equals("BenWasHere")) {
-
LOG.error("Missing signature. Got " + signature);
-
throw new IOException("Missing signature");
-
}
-
}
-
-
else if (qp.getType() == Leader.TRUNC) {
-
-
LOG.warn("Truncating log to get in sync with the leader 0x"
-
+ Long.toHexString(qp.getZxid()));
-
boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
-
......
-
-
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
-
-
zk.createSessionTracker();
-
-
long lastQueued = 0;
-
-
-
-
-
boolean snapshotTaken = false;
-
-
outerLoop:
-
-
while (self.isRunning()) {
-
readPacket(qp);
-
switch(qp.getType()) {
-
-
case Leader.PROPOSAL:
-
PacketInFlight pif = new PacketInFlight();
-
pif.hdr = new TxnHeader();
-
pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
-
if (pif.hdr.getZxid() != lastQueued + 1) {
-
LOG.warn("Got zxid 0x"
-
+ Long.toHexString(pif.hdr.getZxid())
-
+ " expected 0x"
-
+ Long.toHexString(lastQueued + 1));
-
}
-
lastQueued = pif.hdr.getZxid();
-
packetsNotCommitted.add(pif);
-
break;
-
-
case Leader.COMMIT:
-
if (!snapshotTaken) {
-
pif = packetsNotCommitted.peekFirst();
-
if (pif.hdr.getZxid() != qp.getZxid()) {
-
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
-
} else {
-
zk.processTxn(pif.hdr, pif.rec);
-
packetsNotCommitted.remove();
-
}
-
} else {
-
packetsCommitted.add(qp.getZxid());
-
}
-
break;
-
case Leader.INFORM:
-
TxnHeader hdr = new TxnHeader();
-
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
-
zk.processTxn(hdr, txn);
-
break;
-
-
case Leader.UPTODATE:
-
if (!snapshotTaken) {
-
zk.takeSnapshot();
-
self.setCurrentEpoch(newEpoch);
-
}
-
self.cnxnFactory.setZooKeeperServer(zk);
-
break outerLoop;
-
-
case Leader.NEWLEADER:
-
zk.takeSnapshot();
-
self.setCurrentEpoch(newEpoch);
-
snapshotTaken = true;
-
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
-
break;
-
}
-
}
-
}
follower在这里同步leader的数据:拿到NEWLEADER包之后调用takeSnapshot将数据序列化到文件,并回复ACK包。leader端的IO线程收到后处理如下:
-
qp = new QuorumPacket();
-
ia.readRecord(qp, "packet");
-
if(qp.getType() != Leader.ACK){
-
LOG.error("Next packet was supposed to be an ACK");
-
return;
-
}
-
-
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
-
-
-
sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
-
-
-
-
-
-
synchronized(leader.zk){
-
while(!leader.zk.isRunning() && !this.isInterrupted()){
-
leader.zk.wait(20);
-
}
-
}
-
-
-
-
-
-
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
具体的ACK包处理
-
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
-
-
......
-
Proposal p = outstandingProposals.get(zxid);
-
......
-
-
p.ackSet.add(sid);
-
......
-
-
if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
-
.......
-
-
} else {
-
lastCommitted = zxid;
-
LOG.info("Have quorum of supporters; starting up and setting last processed zxid: 0x{}",
-
Long.toHexString(zk.getZxid()));
-
-
zk.startup();
-
zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
-
}
-
}
-
}
由于follower的ACK已经使NEWLEADER提案的ackSet满足过半条件,leader的server随即启动(注意:上面节选中的else对应的应是被省略掉的内层if判断,而不是containsQuorum这个if),startup代码如下:
-
public void startup() {
-
if (sessionTracker == null) {
-
createSessionTracker();
-
}
-
-
startSessionTracker();
-
-
setupRequestProcessors();
-
-
registerJMX();
-
-
synchronized (this) {
-
running = true;
-
notifyAll();
-
}
-
}
-
protected void setupRequestProcessors() {
-
// 最后final处理器
-
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
-
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
-
finalProcessor, getLeader().toBeApplied);
-
// 投票结果确认
-
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
-
Long.toString(getServerId()), false);
-
commitProcessor.start();
-
// 投票发起
-
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
-
commitProcessor);
-
proposalProcessor.initialize();
-
// 事务预处理
-
firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
-
((PrepRequestProcessor)firstProcessor).start();
-
}
leader启动后,IO线程向follower发送一个UPTODATE包,follower收到后跳出同步循环并回复ACK:
-
-
case Leader.UPTODATE:
-
if (!snapshotTaken) {
-
zk.takeSnapshot();
-
self.setCurrentEpoch(newEpoch);
-
}
-
self.cnxnFactory.setZooKeeperServer(zk);
-
break outerLoop;
-
......
-
-
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
-
writePacket(ack, true);
leader的IO线程LearnerHandler进入主循环,收到ACK包处理
-
while (true) {
-
qp = new QuorumPacket();
-
ia.readRecord(qp, "packet");
-
-
......
-
tickOfLastAck = leader.self.tick;
-
-
-
ByteBuffer bb;
-
long sessionId;
-
int cxid;
-
int type;
-
-
switch (qp.getType()) {
-
-
case Leader.ACK:
-
......
-
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
-
break;
-
-
case Leader.PING:
-
-
ByteArrayInputStream bis = new ByteArrayInputStream(qp
-
.getData());
-
DataInputStream dis = new DataInputStream(bis);
-
while (dis.available() > 0) {
-
long sess = dis.readLong();
-
int to = dis.readInt();
-
leader.zk.touch(sess, to);
-
}
-
break;
-
-
case Leader.REVALIDATE:
-
bis = new ByteArrayInputStream(qp.getData());
-
dis = new DataInputStream(bis);
-
long id = dis.readLong();
-
int to = dis.readInt();
-
ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
DataOutputStream dos = new DataOutputStream(bos);
-
dos.writeLong(id);
-
boolean valid = leader.zk.touch(id, to);
-
if (valid) {
-
try {
-
-
-
-
leader.zk.setOwner(id, this);
-
} catch (SessionExpiredException e) {
-
LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
-
}
-
}
-
if (LOG.isTraceEnabled()) {
-
ZooTrace.logTraceMessage(LOG,
-
ZooTrace.SESSION_TRACE_MASK,
-
"Session 0x" + Long.toHexString(id)
-
+ " is valid: "+ valid);
-
}
-
dos.writeBoolean(valid);
-
qp.setData(bos.toByteArray());
-
queuedPackets.add(qp);
-
break;
-
-
case Leader.REQUEST:
-
bb = ByteBuffer.wrap(qp.getData());
-
sessionId = bb.getLong();
-
cxid = bb.getInt();
-
type = bb.getInt();
-
bb = bb.slice();
-
Request si;
-
if(type == OpCode.sync){
-
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
-
} else {
-
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
-
}
-
si.setOwner(this);
-
leader.zk.submitRequest(si);
-
break;
-
default:
-
}
-
}
这个时候LearnerHandler线程已经启动完成。follower发完ACK包后,调整socket超时时间并启动自己的zookeeper server:
-
writePacket(ack, true);
-
-
sock.setSoTimeout(self.tickTime * self.syncLimit);
-
-
zk.startup();
-
-
if (zk instanceof FollowerZooKeeperServer) {
-
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer)zk;
-
for(PacketInFlight p: packetsNotCommitted) {
-
fzk.logRequest(p.hdr, p.rec);
-
}
-
for(Long zxid: packetsCommitted) {
-
fzk.commit(zxid);
-
}
-
}
Follower的zookeeper server启动
-
@Override
-
protected void setupRequestProcessors() {
-
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
-
commitProcessor = new CommitProcessor(finalProcessor,
-
Long.toString(getServerId()), true);
-
commitProcessor.start();
-
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
-
((FollowerRequestProcessor) firstProcessor).start();
-
syncProcessor = new SyncRequestProcessor(this,
-
new SendAckRequestProcessor((Learner)getFollower()));
-
syncProcessor.start();
-
}
Follower进入主处理
-
QuorumPacket qp = new QuorumPacket();
-
while (self.isRunning()) {
-
readPacket(qp);
-
processPacket(qp);
-
}
-
protected void processPacket(QuorumPacket qp) throws IOException{
-
switch (qp.getType()) {
-
-
case Leader.PING:
-
ping(qp);
-
break;
-
-
case Leader.PROPOSAL:
-
TxnHeader hdr = new TxnHeader();
-
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
-
if (hdr.getZxid() != lastQueued + 1) {
-
LOG.warn("Got zxid 0x"
-
+ Long.toHexString(hdr.getZxid())
-
+ " expected 0x"
-
+ Long.toHexString(lastQueued + 1));
-
}
-
lastQueued = hdr.getZxid();
-
fzk.logRequest(hdr, txn);
-
break;
-
-
case Leader.COMMIT:
-
fzk.commit(qp.getZxid());
-
break;
-
case Leader.UPTODATE:
-
LOG.error("Received an UPTODATE message after Follower started");
-
break;
-
case Leader.REVALIDATE:
-
revalidate(qp);
-
break;
-
case Leader.SYNC:
-
fzk.sync();
-
break;
-
}
-
}
这个时候Follower也初始化完成。再看leader主线程:它之前一直在等待过半follower同步结束,结束之后进入主循环,每半个tick向所有follower发送一次ping,并每隔一个tick检查已同步的follower是否仍满足过半,不满足则shutdown:
-
while (true) {
-
Thread.sleep(self.tickTime / 2);
-
if (!tickSkip) {
-
self.tick++;
-
}
-
HashSet<Long> syncedSet = new HashSet<Long>();
-
-
-
syncedSet.add(self.getId());
-
-
for (LearnerHandler f : getLearners()) {
-
-
-
if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
-
syncedSet.add(f.getSid());
-
}
-
f.ping();
-
}
-
-
if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
-
-
-
-
shutdown("Only " + syncedSet.size() + " followers, need "
-
+ (self.getVotingView().size() / 2));
-
-
-
return;
-
}
-
tickSkip = !tickSkip;
-
}
阅读(1431) | 评论(0) | 转发(0) |