大名鼎鼎的Zookeeper是解决分布式问题的神器。小编最近简单阅读了代码,分享一下。有不对之处,还请大家指出。
整篇文章将分多个系列完成,因为涉及点比较多,很难在一片文章内搞定。关于zookeeper的使用场景,大家参考。api使用参考官网手。这里以最新的zookeeper3.4.5为例。
这个系列的第一篇来说说zookeeper server端的启动,以单机为例,分布式zookeeper将在后续专门分析。
单机版启动类ZooKeeperServerMain
-
protected void initializeAndRun(String[] args)
-
throws ConfigException, IOException
-
{
-
try {
-
ManagedUtil.registerLog4jMBeans();
-
} catch (JMException e) {
-
LOG.warn("Unable to register log4j JMX control", e);
-
}
-
-
ServerConfig config = new ServerConfig();
-
if (args.length == 1) {
-
config.parse(args[0]);
-
} else {
-
config.parse(args);
-
}
-
-
runFromConfig(config);
-
}
具体解析:
-
public void parse(String path) throws ConfigException {
-
QuorumPeerConfig config = new QuorumPeerConfig();
-
config.parse(path);
-
-
-
-
readFrom(config);
-
}
启动
-
public void runFromConfig(ServerConfig config) throws IOException {
-
LOG.info("Starting server");
-
try {
-
-
-
-
-
ZooKeeperServer zkServer = new ZooKeeperServer();
-
-
-
FileTxnSnapLog ftxn = new FileTxnSnapLog(new
-
File(config.dataLogDir), new File(config.dataDir));
-
zkServer.setTxnLogFactory(ftxn);
-
zkServer.setTickTime(config.tickTime);
-
zkServer.setMinSessionTimeout(config.minSessionTimeout);
-
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
-
-
cnxnFactory = ServerCnxnFactory.createFactory();
-
-
cnxnFactory.configure(config.getClientPortAddress(),
-
config.getMaxClientCnxns());
-
-
cnxnFactory.startup(zkServer);
-
-
cnxnFactory.join();
-
if (zkServer.isRunning()) {
-
zkServer.shutdown();
-
}
-
} catch (InterruptedException e) {
-
-
LOG.warn("Server interrupted", e);
-
}
-
}
具体startup流程:
-
public void startup(ZooKeeperServer zks) throws IOException,
-
InterruptedException {
-
-
start();
-
-
zks.startdata();
-
-
zks.startup();
-
setZooKeeperServer(zks);
-
}
具体恢复过程:
-
public void startdata()
-
throws IOException, InterruptedException {
-
-
if (zkDb == null) {
-
-
zkDb = new ZKDatabase(this.txnLogFactory);
-
}
-
if (!zkDb.isInitialized()) {
-
loadData();
-
}
-
}
DataTree用Map实现,key是节点名称,value是DataNode,DataNode从有parent指向父亲节点,有children指向所有孩子节点
-
public DataTree() {
-
-
-
nodes.put("", root);
-
nodes.put(rootZookeeper, root);
-
-
-
root.addChild(procChildZookeeper);
-
nodes.put(procZookeeper, procDataNode);
-
-
procDataNode.addChild(quotaChildZookeeper);
-
nodes.put(quotaZookeeper, quotaDataNode);
-
}
具体恢复数据
-
public void loadData() throws IOException, InterruptedException {
-
-
setZxid(zkDb.loadDataBase());
-
-
-
LinkedList<Long> deadSessions = new LinkedList<Long>();
-
for (Long session : zkDb.getSessions()) {
-
if (zkDb.getSessionWithTimeOuts().get(session) == null) {
-
deadSessions.add(session);
-
}
-
}
-
zkDb.setDataTreeInit(true);
-
for (long session : deadSessions) {
-
-
killSession(session, zkDb.getDataTreeLastProcessedZxid());
-
}
-
-
-
takeSnapshot();
-
}
load过程:
-
public long loadDataBase() throws IOException {
-
oad过程中,发起分布式提议,对于单机版,先不考虑
-
PlayBackListener listener=new PlayBackListener(){
-
public void onTxnLoaded(TxnHeader hdr,Record txn){
-
Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
-
null, null);
-
r.txn = txn;
-
r.hdr = hdr;
-
r.zxid = hdr.getZxid();
-
addCommittedProposal(r);
-
}
-
};
-
-
long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
-
initialized = true;
-
restore过程:
-
public long restore(DataTree dt, Map<Long, Integer> sessions,
-
PlayBackListener listener) throws IOException {
-
-
snapLog.deserialize(dt, sessions);
-
FileTxnLog txnLog = new FileTxnLog(dataDir);
-
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
-
long highestZxid = dt.lastProcessedZxid;
-
TxnHeader hdr;
-
-
while (true) {
-
-
-
hdr = itr.getHeader();
-
if (hdr == null) {
-
-
return dt.lastProcessedZxid;
-
}
-
if (hdr.getZxid() < highestZxid && highestZxid != 0) {
-
LOG.error(highestZxid + "(higestZxid) > "
-
+ hdr.getZxid() + "(next log) for type "
-
+ hdr.getType());
-
} else {
-
highestZxid = hdr.getZxid();
-
}
-
try {
-
processTransaction(hdr,dt,sessions, itr.getTxn());
-
} catch(KeeperException.NoNodeException e) {
-
throw new IOException("Failed to process transaction type: " +
-
hdr.getType() + " error: " + e.getMessage(), e);
-
}
-
listener.onTxnLoaded(hdr, itr.getTxn());
-
if (!itr.next())
-
break;
-
}
-
return highestZxid;
-
}
FileSnap恢复过程:
-
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
-
throws IOException {
-
-
-
-
-
List<File> snapList = findNValidSnapshots(100);
-
if (snapList.size() == 0) {
-
return -1L;
-
}
-
-
File snap = null;
-
boolean foundValid = false;
-
for (int i = 0; i < snapList.size(); i++) {
-
snap = snapList.get(i);
-
InputStream snapIS = null;
-
CheckedInputStream crcIn = null;
-
try {
-
LOG.info("Reading snapshot " + snap);
-
snapIS = new BufferedInputStream(new FileInputStream(snap));
-
crcIn = new CheckedInputStream(snapIS, new Adler32());
-
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
-
deserialize(dt,sessions, ia);
-
long checkSum = crcIn.getChecksum().getValue();
-
long val = ia.readLong("val");
-
if (val != checkSum) {
-
throw new IOException("CRC corruption in snapshot : " + snap);
-
}
-
foundValid = true;
-
break;
-
} catch(IOException e) {
-
LOG.warn("problem reading snap file " + snap, e);
-
} finally {
-
if (snapIS != null)
-
snapIS.close();
-
if (crcIn != null)
-
crcIn.close();
-
}
-
}
-
if (!foundValid) {
-
throw new IOException("Not able to find valid snapshots in " + snapDir);
-
}
-
-
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
-
return dt.lastProcessedZxid;
-
}
单个事务处理:
-
public void processTransaction(TxnHeader hdr,DataTree dt,
-
Map<Long, Integer> sessions, Record txn)
-
throws KeeperException.NoNodeException {
-
ProcessTxnResult rc;
-
switch (hdr.getType()) {
-
创建session
-
case OpCode.createSession:
-
sessions.put(hdr.getClientId(),
-
((CreateSessionTxn) txn).getTimeOut());
-
......
-
-
rc = dt.processTxn(hdr, txn);
-
break;
-
case OpCode.closeSession:
-
sessions.remove(hdr.getClientId());
-
if (LOG.isTraceEnabled()) {
-
ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
-
"playLog --- close session in log: 0x"
-
+ Long.toHexString(hdr.getClientId()));
-
}
-
rc = dt.processTxn(hdr, txn);
-
break;
-
default:
-
rc = dt.processTxn(hdr, txn);
-
}
-
-
......
-
}
DataTree处理单个事务
-
public ProcessTxnResult processTxn(TxnHeader header, Record txn)
-
{
-
ProcessTxnResult rc = new ProcessTxnResult();
-
-
try {
-
rc.clientId = header.getClientId();
-
rc.cxid = header.getCxid();
-
rc.zxid = header.getZxid();
-
rc.type = header.getType();
-
rc.err = 0;
-
rc.multiResult = null;
-
switch (header.getType()) {
-
case OpCode.create:
-
CreateTxn createTxn = (CreateTxn) txn;
-
rc.path = createTxn.getPath();
-
createNode(
-
createTxn.getPath(),
-
createTxn.getData(),
-
createTxn.getAcl(),
-
createTxn.getEphemeral() ? header.getClientId() : 0,
-
createTxn.getParentCVersion(),
-
header.getZxid(), header.getTime());
-
break;
-
case OpCode.delete:
-
DeleteTxn deleteTxn = (DeleteTxn) txn;
-
rc.path = deleteTxn.getPath();
-
deleteNode(deleteTxn.getPath(), header.getZxid());
-
break;
-
case OpCode.setData:
-
SetDataTxn setDataTxn = (SetDataTxn) txn;
-
rc.path = setDataTxn.getPath();
-
rc.stat = setData(setDataTxn.getPath(), setDataTxn
-
.getData(), setDataTxn.getVersion(), header
-
.getZxid(), header.getTime());
-
break;
-
》 ......
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
if (rc.zxid > lastProcessedZxid) {
-
lastProcessedZxid = rc.zxid;
-
}
-
-
......
-
return rc;
-
}
以上就完成了server的数据恢复过程,LSM的精华所在。
接下来server启动sessionTracker线程和请求处理链
-
protected void setupRequestProcessors() {
-
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
-
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
-
finalProcessor);
-
((SyncRequestProcessor)syncProcessor).start();
-
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
-
((PrepRequestProcessor)firstProcessor).start();
-
}
核心IO线程
-
public void run() {
-
while (!ss.socket().isClosed()) {
-
try {
-
-
selector.select(1000);
-
Set<SelectionKey> selected;
-
synchronized (this) {
-
selected = selector.selectedKeys();
-
}
-
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
-
selected);
-
-
Collections.shuffle(selectedList);
-
for (SelectionKey k : selectedList) {
-
-
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
-
SocketChannel sc = ((ServerSocketChannel) k
-
.channel()).accept();
-
InetAddress ia = sc.socket().getInetAddress();
-
int cnxncount = getClientCnxnCount(ia);
-
-
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
-
LOG.warn("Too many connections from " + ia
-
+ " - max is " + maxClientCnxns );
-
sc.close();
-
} else {
-
LOG.info("Accepted socket connection from "
-
+ sc.socket().getRemoteSocketAddress());
-
-
sc.configureBlocking(false);
-
-
SelectionKey sk = sc.register(selector,
-
SelectionKey.OP_READ);
-
-
NIOServerCnxn cnxn = createConnection(sc, sk);
-
sk.attach(cnxn);
-
-
addCnxn(cnxn);
-
}
-
}
-
-
else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
-
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
-
c.doIO(k);
-
} else {
-
if (LOG.isDebugEnabled()) {
-
LOG.debug("Unexpected ops in select "
-
+ k.readyOps());
-
}
-
}
-
}
-
-
selected.clear();
-
} catch (RuntimeException e) {
-
LOG.warn("Ignoring unexpected runtime exception", e);
-
} catch (Exception e) {
-
LOG.warn("Ignoring exception", e);
-
}
-
}
-
closeAll();
-
LOG.info("NIOServerCnxn factory exited run method");
-
}
具体io处理过程,将在后续结合实例来讲解。
至此server启动完成,就等待client去连接了。server启动核心功能就是从snapshot和log文件中恢复datatree,其核心就是zxid,典型的LSM应用。
阅读(1082) | 评论(0) | 转发(0) |