Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1959613
  • 博文数量: 1000
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 7921
  • 用 户 组: 普通用户
  • 注册时间: 2013-08-20 09:23
个人简介

storage R&D guy.

文章分类

全部博文(1000)

文章存档

2019年(5)

2017年(47)

2016年(38)

2015年(539)

2014年(193)

2013年(178)

分类: 服务器与存储

2015-07-21 10:56:35

前面几篇文章讲了follower和leader之间如何选举和初始化的,这一篇将以之前描述过的CREATE请求作为例子来描述在集群环境下是如何处理事务的。

关于client和zookeeper server的描述前几篇文章已经涉及了。这里不就不再赘述了。假设client和某一个follower建立了连接,并发送了CREATE请求。在follower端,IO线程拿到请求开始执行处理链,Follower处理链如下

初始化代码:

Java代码  收藏代码
  1. protected void setupRequestProcessors() {  
  2.     RequestProcessor finalProcessor = new FinalRequestProcessor(this);  
  3.     commitProcessor = new CommitProcessor(finalProcessor,  
  4.             Long.toString(getServerId()), true);  
  5.     commitProcessor.start();  
  6.     firstProcessor = new FollowerRequestProcessor(this, commitProcessor);  
  7.     ((FollowerRequestProcessor) firstProcessor).start();  
  8.     syncProcessor = new SyncRequestProcessor(this,  
  9.             new SendAckRequestProcessor((Learner)getFollower()));  
  10.     syncProcessor.start();  
  11. }  

 第一个处理器是FollowerRequestProcessor,处理如下

Java代码  收藏代码
  1. while (!finished) {  
  2.                 Request request = queuedRequests.take();  
  3.                 if (LOG.isTraceEnabled()) {  
  4.                     ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,  
  5.                             'F', request, "");  
  6.                 }  
  7.                 if (request == Request.requestOfDeath) {  
  8.                     break;  
  9.                 }  
  10.                 // We want to queue the request to be processed before we submit  
  11.                 // the request to the leader so that we are ready to receive  
  12.                 // the response  
  13.         //先交给CommitProcessor,最终投票通过后,会通过CommitProcessor的commit方法最终提交事务  
  14.                 nextProcessor.processRequest(request);  
  15.                   
  16.                 // We now ship the request to the leader. As with all  
  17.                 // other quorum operations, sync also follows this code  
  18.                 // path, but different from others, we need to keep track  
  19.                 // of the sync operations this follower has pending, so we  
  20.                 // add it to pendingSyncs.  
  21.         //只有事务请求才转发给leader,进行投票  
  22.                 switch (request.type) {  
  23.                 case OpCode.sync:  
  24.                     zks.pendingSyncs.add(request);  
  25.                     zks.getFollower().request(request);  
  26.                     break;  
  27.                 case OpCode.create:  
  28.                 case OpCode.delete:  
  29.                 case OpCode.setData:  
  30.                 case OpCode.setACL:  
  31.                 case OpCode.createSession:  
  32.                 case OpCode.closeSession:  
  33.                 case OpCode.multi:  
  34.                     zks.getFollower().request(request);  
  35.                     break;  
  36.                 }  

 转发事务请求给leader

Java代码  收藏代码
  1. void request(Request request) throws IOException {  
  2. 反序列化  
  3.      ByteArrayOutputStream baos = new ByteArrayOutputStream();  
  4.      DataOutputStream oa = new DataOutputStream(baos);  
  5.      oa.writeLong(request.sessionId);  
  6.      oa.writeInt(request.cxid);  
  7.      oa.writeInt(request.type);  
  8.      if (request.request != null) {  
  9.          request.request.rewind();  
  10.          int len = request.request.remaining();  
  11.          byte b[] = new byte[len];  
  12.          request.request.get(b);  
  13.          request.request.rewind();  
  14.          oa.write(b);  
  15.      }  
  16.      oa.close();  
  17.      QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos  
  18.              .toByteArray(), request.authInfo);  
  19.      writePacket(qp, true);  
  20.  }  

 在CommitProcessor中主要是等待缓存请求,并等待该请求被commit

Java代码  收藏代码
  1. while (!finished) {  
  2.                int len = toProcess.size();  
  3.     //最终的请求处理交给FinalRequestProcessor  
  4.                for (int i = 0; i < len; i++) {  
  5.                    nextProcessor.processRequest(toProcess.get(i));  
  6.                }  
  7.                toProcess.clear();  
  8.                synchronized (this) {  
  9.         //如果没有commit请求,则wait,直到commit请求的时候唤醒  
  10.                    if ((queuedRequests.size() == 0 || nextPending != null)  
  11.                            && committedRequests.size() == 0) {  
  12.                        wait();  
  13.                        continue;  
  14.                    }  
  15.                    // First check and see if the commit came in for the pending  
  16.                    // request  
  17.         //有commit请求,则添加到最终队列,下一轮处理  
  18.                    if ((queuedRequests.size() == 0 || nextPending != null)  
  19.                            && committedRequests.size() > 0) {  
  20.                        Request r = committedRequests.remove();  
  21.                        /* 
  22.                         * We match with nextPending so that we can move to the 
  23.                         * next request when it is committed. We also want to 
  24.                         * use nextPending because it has the cnxn member set 
  25.                         * properly. 
  26.                         */  
  27.          //如果是自己的请求,则使用之前的Request,以为之前的Request带client的连接信息,可以写回响应  
  28.                        if (nextPending != null  
  29.                                && nextPending.sessionId == r.sessionId  
  30.                                && nextPending.cxid == r.cxid) {  
  31.                            // we want to send our version of the request.  
  32.                            // the pointer to the connection in the request  
  33.                            nextPending.hdr = r.hdr;  
  34.                            nextPending.txn = r.txn;  
  35.                            nextPending.zxid = r.zxid;  
  36.                            toProcess.add(nextPending);  
  37.                            nextPending = null;  
  38.                        }  
  39.         //如果是别人的请求,则使用新的Request,不带连接信息,无法发送响应  
  40.         else {  
  41.                            // this request came from someone else so just  
  42.                            // send the commit packet  
  43.                            toProcess.add(r);  
  44.                        }  
  45.                    }  
  46.                }  
  47.   
  48.                // We haven't matched the pending requests, so go back to  
  49.                // waiting  
  50.     //有pending请求,但是该请求还未commit,则继续  
  51.                if (nextPending != null) {  
  52.                    continue;  
  53.                }  
  54.     //从队列中拿待处理请求  
  55.                synchronized (this) {  
  56.                    // Process the next requests in the queuedRequests  
  57.                    while (nextPending == null && queuedRequests.size() > 0) {  
  58.                        Request request = queuedRequests.remove();  
  59.                        switch (request.type) {  
  60.                        case OpCode.create:  
  61.                        case OpCode.delete:  
  62.                        case OpCode.setData:  
  63.                        case OpCode.multi:  
  64.                        case OpCode.setACL:  
  65.                        case OpCode.createSession:  
  66.                        case OpCode.closeSession:  
  67.                            nextPending = request;  
  68.                            break;  
  69.                        case OpCode.sync:  
  70.                            if (matchSyncs) {  
  71.                                nextPending = request;  
  72.                            } else {  
  73.                                toProcess.add(request);  
  74.                            }  
  75.                            break;  
  76.                        default:  
  77.                            toProcess.add(request);  
  78.                        }  
  79.                    }  
  80.                }  

 在这个场景中,CREATE请求先到了queuedRequests中,然后nextPending会指向这个请求,但是此时还未commit,所以CommitProcessor会wait,直到该请求投票被通过,然后被commit。

此时leader收到了转发的请求,在LearnerHandler中

Java代码  收藏代码
  1. case Leader.REQUEST:                      
  2.             //反序列化  
  3.                     bb = ByteBuffer.wrap(qp.getData());  
  4.                     sessionId = bb.getLong();  
  5.                     cxid = bb.getInt();  
  6.                     type = bb.getInt();  
  7.                     bb = bb.slice();  
  8.                     Request si;  
  9.                     if(type == OpCode.sync){  
  10.                         si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());  
  11.                     } else {  
  12.                         si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());  
  13.                     }  
  14.                     si.setOwner(this);  
  15.             //提交给执行链处理  
  16.                     leader.zk.submitRequest(si);  
  17.                     break;  

 Leader端的执行链如下

PrepRequestProcessor在之前的文章已经分析过了,主要是根据请求类型,拼装不同的Request,这里是CreateRequest

接下来ProposalRequestProcessor执行,ProposalRequestProcessor主要是发起投票

Java代码  收藏代码
  1. public void processRequest(Request request) throws RequestProcessorException {  
  2.   
  3.     ......                  
  4.           
  5.         /* In the following IF-THEN-ELSE block, we process syncs on the leader.  
  6.          * If the sync is coming from a follower, then the follower 
  7.          * handler adds it to syncHandler. Otherwise, if it is a client of 
  8.          * the leader that issued the sync command, then syncHandler won't  
  9.          * contain the handler. In this case, we add it to syncHandler, and  
  10.          * call processRequest on the next processor. 
  11.          */  
  12.           
  13.         if(request instanceof LearnerSyncRequest){  
  14.             zks.getLeader().processSync((LearnerSyncRequest)request);  
  15.         } else {  
  16.         //先交给CommitProcessor处理下,此时还未提交  
  17.                 nextProcessor.processRequest(request);  
  18.             if (request.hdr != null) {  
  19.                 // We need to sync and get consensus on any transactions  
  20.                 try {  
  21.             //发起一个投票  
  22.                     zks.getLeader().propose(request);  
  23.                 } catch (XidRolloverException e) {  
  24.                     throw new RequestProcessorException(e.getMessage(), e);  
  25.                 }  
  26.         //先写日志  
  27.                 syncProcessor.processRequest(request);  
  28.             }  
  29.         }  
  30.     }  

 leader发起投票

Java代码  收藏代码
  1. public Proposal propose(Request request) throws XidRolloverException {  
  2.        .......  
  3.   
  4.        ByteArrayOutputStream baos = new ByteArrayOutputStream();  
  5.        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);  
  6.        try {  
  7.            request.hdr.serialize(boa, "hdr");  
  8.            if (request.txn != null) {  
  9.                request.txn.serialize(boa, "txn");  
  10.            }  
  11.            baos.close();  
  12.        } catch (IOException e) {  
  13.            LOG.warn("This really should be impossible", e);  
  14.        }  
  15. //投票包  
  16.        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,   
  17.                baos.toByteArray(), null);  
  18.          
  19.        Proposal p = new Proposal();  
  20.        p.packet = pp;  
  21.        p.request = request;  
  22.        synchronized (this) {  
  23.            if (LOG.isDebugEnabled()) {  
  24.                LOG.debug("Proposing:: " + request);  
  25.            }  
  26.   
  27.            lastProposed = p.packet.getZxid();  
  28.     //添加到投票箱,后续leader收到选票时会检查这个投票箱里的投票是否满足条件  
  29.            outstandingProposals.put(lastProposed, p);  
  30.     //给每个follower发一个投票包,让他们投票  
  31.            sendPacket(pp);  
  32.        }  
  33.        return p;  
  34.    }  

 leader发完投票后,通过SyncRequestProcessor将事务写入日志文件,本地写成功后,投票成功。

SyncRequestProcessor之前文章已经分析过了,主要是将事务顺序写入日志文件。主要看之后的AckRequestProcessor

Java代码  收藏代码
  1. public void processRequest(Request request) {  
  2.     QuorumPeer self = leader.self;  
  3.     if(self != null)  
  4. 本地日志写成功后,认为自己成功了  
  5.         leader.processAck(self.getId(), request.zxid, null);  
  6.     else  
  7.         LOG.error("Null QuorumPeer");  
  8. }  

 leader的processAck方法比较关键,之前也有分析,这里再强调下

Java代码  收藏代码
  1.  synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {  
  2.      .......  
  3.      //当有选票进来时,先看看是哪个投票的  
  4.       Proposal p = outstandingProposals.get(zxid);  
  5.       if (p == null) {  
  6.           LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",  
  7.                   Long.toHexString(zxid), followerAddr);  
  8.           return;  
  9.       }  
  10.       //把票投上  
  11.       p.ackSet.add(sid);  
  12.       if (LOG.isDebugEnabled()) {  
  13.           LOG.debug("Count for zxid: 0x{} is {}",  
  14.                   Long.toHexString(zxid), p.ackSet.size());  
  15.       }  
  16. /如果满足投票结束条件,默认是半数server统一,则提交事务  
  17.       if (self.getQuorumVerifier().containsQuorum(p.ackSet)){               
  18.           if (zxid != lastCommitted+1) {  
  19.               LOG.warn("Commiting zxid 0x{} from {} not first!",  
  20.                       Long.toHexString(zxid), followerAddr);  
  21.               LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));  
  22.           }  
  23.           outstandingProposals.remove(zxid);  
  24.           if (p.request != null) {  
  25. //先添加到带提交队列  
  26.               toBeApplied.add(p);  
  27.           }  
  28.           // We don't commit the new leader proposal  
  29.           if ((zxid & 0xffffffffL) != 0) {  
  30.               if (p.request == null) {  
  31.                   LOG.warn("Going to commmit null request for proposal: {}", p);  
  32.               }  
  33. //事务提交,通知follower提交事务  
  34.               commit(zxid);  
  35. //通知Observer  
  36.               inform(p);  
  37. //leader commit事务  
  38. zk.commitProcessor.commit(p.request);  
  39.           ......  
  40.       }  
  41.   }  

 通知follower提交事务

Java代码  收藏代码
  1.    public void commit(long zxid) {  
  2.        synchronized(this){  
  3.            lastCommitted = zxid;  
  4.        }  
  5. //发送COMMIT包  
  6.        QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, nullnull);  
  7.        sendPacket(qp);  
  8.    }  

 此时Follower收到proposal包,follower中处理投票

Java代码  收藏代码
  1. case Leader.PROPOSAL:              
  2.             TxnHeader hdr = new TxnHeader();  
  3.             Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);  
  4.             if (hdr.getZxid() != lastQueued + 1) {  
  5.                 LOG.warn("Got zxid 0x"  
  6.                         + Long.toHexString(hdr.getZxid())  
  7.                         + " expected 0x"  
  8.                         + Long.toHexString(lastQueued + 1));  
  9.             }  
  10.             lastQueued = hdr.getZxid();  
  11.         //记录事务日志,成功后发送ACK包  
  12.             fzk.logRequest(hdr, txn);  
  13.             break;  

 

Java代码  收藏代码
  1. public void logRequest(TxnHeader hdr, Record txn) {  
  2.     Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),  
  3.             hdr.getType(), nullnull);  
  4.     request.hdr = hdr;  
  5.     request.txn = txn;  
  6.     request.zxid = hdr.getZxid();  
  7.     if ((request.zxid & 0xffffffffL) != 0) {  
  8.         pendingTxns.add(request);  
  9.     }  
  10. 是通过SyncRequestProcessor将事务写入本地文件,再发送ack包  
  11.     syncProcessor.processRequest(request);  
  12. }  

 日志写成功后,SendAckRequestProcessor发送ACK包

Java代码  收藏代码
  1.   public void processRequest(Request si) {  
  2.       if(si.type != OpCode.sync){  
  3. //ACK包  
  4.           QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,  
  5.               null);  
  6.           try {  
  7. //发送  
  8.               learner.writePacket(qp, false);  
  9.           } catch (IOException e) {  
  10.               LOG.warn("Closing connection to leader, exception during packet send", e);  
  11.               try {  
  12.                   if (!learner.sock.isClosed()) {  
  13.                       learner.sock.close();  
  14.                   }  
  15.               } catch (IOException e1) {  
  16.                   // Nothing to do, we are shutting things down, so an exception here is irrelevant  
  17.                   LOG.debug("Ignoring error closing the connection", e1);  
  18.               }  
  19.           }  
  20.       }  
  21.   }  

 此时,leader收到ack包,LearnerHandler线程中

Java代码  收藏代码
  1. case Leader.ACK:  
  2.                  if (this.learnerType == LearnerType.OBSERVER) {  
  3.                      if (LOG.isDebugEnabled()) {  
  4.                          LOG.debug("Received ACK from Observer  " + this.sid);  
  5.                      }  
  6.                  }  
  7.                  leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());  
  8.                  break;  

 还是调用了processAck方法,由于之前已经有了leader自己的投票,此时follower再投一票,3台机器的集群即认为投票成功,leader开始发送commit操作,也就是发送commit包给follower。

follower收到commit包

Java代码  收藏代码
  1. case Leader.COMMIT:  
  2.             fzk.commit(qp.getZxid());  
  3.             break;  
  4.   
  5.     public void commit(long zxid) {  
  6.         if (pendingTxns.size() == 0) {  
  7.             LOG.warn("Committing " + Long.toHexString(zxid)  
  8.                     + " without seeing txn");  
  9.             return;  
  10.         }  
  11.         long firstElementZxid = pendingTxns.element().zxid;  
  12.         if (firstElementZxid != zxid) {  
  13.             LOG.error("Committing zxid 0x" + Long.toHexString(zxid)  
  14.                     + " but next pending txn 0x"  
  15.                     + Long.toHexString(firstElementZxid));  
  16.             System.exit(12);  
  17.         }  
  18.     //从Pending队列中拿到待commit请求  
  19.         Request request = pendingTxns.remove();  
  20.     //commit这个请求,这个请求将交给FinalRequestProcessor处理  
  21.         commitProcessor.commit(request);  
  22.     }  

 Commit之后请求将交给FinalRequestProcessor处理,修改最后的内存db结构,如果是本机请求则写回响应,如果不是则不用写回响应

阅读(1089) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~