上一篇，小编给大家介绍了ZooKeeper server端的启动。这一篇将介绍client端和server端是如何建立session的，并通过官网的DataMonitor例子来说明。通过session建立这个例子，可以大致了解client端和server端是如何处理请求的，以及两者之间是如何通信的。
官网DataMonitor的代码:
Executor
-
public class Executor implements Watcher, Runnable,
-
DataMonitor.DataMonitorListener {
-
String znode;
-
-
DataMonitor dm;
-
-
ZooKeeper zk;
-
-
String filename;
-
-
String exec[];
-
-
Process child;
-
-
-
/**
 * Creates the executor: remembers the output file name and the command line,
 * opens the ZooKeeper client handle and hands it to a new DataMonitor.
 *
 * @param hostPort connect string passed to the ZooKeeper client
 * @param znode    path handed to the DataMonitor
 * @param filename stored in the {@code filename} field
 * @param exec     stored in the {@code exec} field
 * @throws KeeperException declared by the original signature
 * @throws IOException     if the ZooKeeper client cannot be created
 */
public Executor(String hostPort, String znode, String filename,
        String exec[]) throws KeeperException, IOException {
    this.exec = exec;
    this.filename = filename;

    // This object is registered as the client's watcher; 3000 is passed as
    // the session timeout argument.
    this.zk = new ZooKeeper(hostPort, 3000, this);

    // No chained watcher (null); this object is also the DataMonitor listener.
    this.dm = new DataMonitor(this.zk, znode, null, this);
}
DataMonitor
-
public class DataMonitor implements Watcher, StatCallback {
-
-
.......
-
-
// Constructor excerpt (field initialisation is elided in this listing).
// It ends by issuing the first exists() call on the znode, passing this
// object as the callback argument.
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
-
DataMonitorListener listener) {
-
......
-
-
-
// Initial existence check; the second argument (true) presumably
// registers the default watcher - TODO confirm against the ZooKeeper API.
zk.exists(znode, true, this, null);
-
}
-
-
......
-
-
public void process(WatchedEvent event) {
-
String path = event.getPath();
-
-
} else {
-
if (path != null && path.equals(znode)) {
-
-
zk.exists(znode, true, this, null);
-
}
-
}
-
if (chainedWatcher != null) {
-
chainedWatcher.process(event);
-
}
-
}
-
-
public void processResult(int rc, String path, Object ctx, Stat stat) {
-
boolean exists;
-
switch (rc) {
-
case Code.Ok:
-
exists = true;
-
break;
-
case Code.NoNode:
-
exists = false;
-
break;
-
case Code.SessionExpired:
-
case Code.NoAuth:
-
dead = true;
-
listener.closing(rc);
-
return;
-
default:
-
-
zk.exists(znode, true, this, null);
-
return;
-
}
-
-
byte b[] = null;
-
if (exists) {
-
try {
-
b = zk.getData(znode, false, null);
-
} catch (KeeperException e) {
-
-
-
e.printStackTrace();
-
} catch (InterruptedException e) {
-
return;
-
}
-
}
-
-
if ((b == null && b != prevData)
-
|| (b != null && !Arrays.equals(prevData, b))) {
-
listener.exists(b);
-
prevData = b;
-
}
-
}
-
}
从这个例子出发，我们来分析ZooKeeper的第一步——session是如何建立的，主要就是ZooKeeper类的构造。
ZooKeeper构造
-
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
-
boolean canBeReadOnly)
-
throws IOException
-
{
-
LOG.info("Initiating client connection, connectString=" + connectString
-
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
-
-
watchManager.defaultWatcher = watcher;
-
-
ConnectStringParser connectStringParser = new ConnectStringParser(
-
connectString);
-
-
HostProvider hostProvider = new StaticHostProvider(
-
connectStringParser.getServerAddresses());
-
-
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
-
hostProvider, sessionTimeout, this, watchManager,
-
getClientCnxnSocket(), canBeReadOnly);
-
-
cnxn.start();
-
}
初始化连接
-
/**
 * Stores the connection parameters, derives the connect/read timeouts from
 * the requested session timeout, and creates (but does not start) the send
 * and event threads.
 *
 * @param chrootPath       chroot path stored on the connection
 * @param hostProvider     server list; its size divides the connect timeout
 * @param sessionTimeout   requested session timeout
 * @param zooKeeper        owning client handle
 * @param watcher          watch manager
 * @param clientCnxnSocket socket implementation handed to the send thread
 * @param sessionId        session id stored on the connection
 * @param sessionPasswd    session password stored on the connection
 * @param canBeReadOnly    stored in the {@code readOnly} field
 */
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    // Owner and callbacks.
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;

    // Session identity and requested timeout.
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;

    // Where to connect.
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    // Connect budget is the session timeout split across the servers;
    // read budget is two thirds of the session timeout.
    this.connectTimeout = sessionTimeout / hostProvider.size();
    this.readTimeout = sessionTimeout * 2 / 3;
    this.readOnly = canBeReadOnly;

    // Worker threads are only constructed here.
    this.sendThread = new SendThread(clientCnxnSocket);
    this.eventThread = new EventThread();
}
SendThread核心流程
-
// Send-thread main loop (excerpt; "......" marks code elided in this listing).
// While the state is alive it (re)connects when the socket is not connected,
// computes the select timeout "to", enforces the timeout, sends pings,
// and hands socket I/O to clientCnxnSocket.doTransport().
public void run() {
-
.....
-
while (state.isAlive()) {
-
try {
-
-
// Socket not connected: start a connect attempt and reset the
// last-send / last-heard timestamps.
if (!clientCnxnSocket.isConnected()) {
-
......
-
-
startConnect();
-
clientCnxnSocket.updateLastSendAndHeard();
-
}
-
-
if (state.isConnected()) {
-
......
-
-
// Connected: time left before the read timeout elapses.
to = readTimeout - clientCnxnSocket.getIdleRecv();
-
} else {
-
-
// Not yet connected: time left before the connect timeout elapses.
to = connectTimeout - clientCnxnSocket.getIdleRecv();
-
}
-
-
// Nothing heard within the budget: abort this connection attempt.
if (to <= 0) {
-
throw new SessionTimeoutException(
-
"Client session timed out, have not heard from server in "
-
+ clientCnxnSocket.getIdleRecv() + "ms"
-
+ " for sessionid 0x"
-
+ Long.toHexString(sessionId));
-
}
-
-
// Ping once half the read timeout has passed without sending;
// otherwise shorten the select timeout so we wake up in time to ping.
if (state.isConnected()) {
-
int timeToNextPing = readTimeout / 2
-
- clientCnxnSocket.getIdleSend();
-
if (timeToNextPing <= 0) {
-
sendPing();
-
clientCnxnSocket.updateLastSend();
-
} else {
-
if (timeToNextPing < to) {
-
to = timeToNextPing;
-
}
-
}
-
}
-
-
-
-
// In read-only mode, call pingRwServer() periodically; the interval
// doubles each time, capped at maxPingRwTimeout.
if (state == States.CONNECTEDREADONLY) {
-
long now = System.currentTimeMillis();
-
int idlePingRwServer = (int) (now - lastPingRwServer);
-
if (idlePingRwServer >= pingRwTimeout) {
-
lastPingRwServer = now;
-
idlePingRwServer = 0;
-
pingRwTimeout =
-
Math.min(2*pingRwTimeout, maxPingRwTimeout);
-
-
pingRwServer();
-
}
-
to = Math.min(to, pingRwTimeout - idlePingRwServer);
-
}
-
-
// Wait up to "to" ms for socket events and process them.
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
-
} catch (Throwable e) {
-
// While closing, an exception just ends the loop.
if (closing) {
-
if (LOG.isDebugEnabled()) {
-
-
LOG.debug("An exception was thrown while closing send thread for session 0x"
-
+ Long.toHexString(getSessionId())
-
+ " : " + e.getMessage());
-
}
-
break;
-
} else {
-
-
// Otherwise log according to the exception type, then clean up.
if (e instanceof SessionExpiredException) {
-
LOG.info(e.getMessage() + ", closing socket connection");
-
} else if (e instanceof SessionTimeoutException) {
-
LOG.info(e.getMessage() + RETRY_CONN_MSG);
-
} else if (e instanceof EndOfStreamException) {
-
LOG.info(e.getMessage() + RETRY_CONN_MSG);
-
} else if (e instanceof RWServerFoundException) {
-
LOG.info(e.getMessage());
-
} else {
-
......
-
}
-
-
cleanup();
-
// Still alive: queue a Disconnected state event for the watchers.
if (state.isAlive()) {
-
eventThread.queueEvent(new WatchedEvent(
-
Event.EventType.None,
-
Event.KeeperState.Disconnected,
-
null));
-
}
-
// Reset the clocks so the next iteration starts with a fresh budget.
clientCnxnSocket.updateNow();
-
clientCnxnSocket.updateLastSendAndHeard();
-
}
-
}
-
}
-
......
-
}
具体过程
-
// Starts one connection attempt (excerpt; "......" marks elided code):
// moves the state to CONNECTING, picks the target address, renames the
// thread after that address and asks the socket layer to connect.
private void startConnect() throws IOException {
-
-
state = States.CONNECTING;
-
-
InetSocketAddress addr;
-
// Use rwServerAddress when one is set (and clear it so it is used once);
// otherwise take the next address from the host provider.
if (rwServerAddress != null) {
-
addr = rwServerAddress;
-
rwServerAddress = null;
-
} else {
-
// NOTE(review): the argument 1000 is presumably a delay in ms - confirm
// against HostProvider.next().
addr = hostProvider.next(1000);
-
}
-
-
// Replace the "(...)" part of the thread name with "(host:port)".
setName(getName().replaceAll("\\(.*\\)",
-
"(" + addr.getHostName() + ":" + addr.getPort() + ")"));
-
......
-
-
clientCnxnSocket.connect(addr);
-
}
具体connect
-
void connect(InetSocketAddress addr) throws IOException {
-
-
SocketChannel sock = createSock();
-
try {
-
-
registerAndConnect(sock, addr);
-
} catch (IOException e) {
-
LOG.error("Unable to open socket to " + addr);
-
sock.close();
-
throw e;
-
}
-
-
initialized = false;
-
-
-
-
-
-
lenBuffer.clear();
-
incomingBuffer = lenBuffer;
-
}
registerAndConnect过程:
-
void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
-
throws IOException {
-
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
-
试连接
-
boolean immediateConnect = sock.connect(addr);
-
果网络情况很好,立马可以连上,则发送ConnectRequest请求,请求和server建立session
-
if (immediateConnect) {
-
sendThread.primeConnection();
-
}
-
}
primeConnection代表连上之后的操作,主要是建立session:
-
// Runs once the socket is connected (excerpt; "......" marks elided code):
// builds the ConnectRequest and puts it at the head of the outgoing queue.
void primeConnection() throws IOException {
-
......
-
-
// Reuse the known session id only if a read-write server was seen before;
// otherwise send 0.
long sessId = (seenRwServerBefore) ? sessionId : 0;
-
-
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
-
sessionTimeout, sessId, sessionPasswd);
-
synchronized (outgoingQueue) {
-
......
-
-
// addFirst: the connect packet goes ahead of anything already queued.
// The first two Packet arguments are null for this packet.
outgoingQueue.addFirst(new Packet(null, null, conReq,
-
null, null, readOnly));
-
}
-
-
clientCnxnSocket.enableReadWriteOnly();
-
.....
-
}
此时ConnectRequest请求已经添加到发送队列,SendThread进入doTransport处理流程:
-
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
-
ClientCnxn cnxn)
-
throws IOException, InterruptedException {
-
-
selector.select(waitTimeOut);
-
Set<SelectionKey> selected;
-
synchronized (this) {
-
selected = selector.selectedKeys();
-
}
-
-
-
-
updateNow();
-
for (SelectionKey k : selected) {
-
SocketChannel sc = ((SocketChannel) k.channel());
-
-
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
-
if (sc.finishConnect()) {
-
updateLastSendAndHeard();
-
sendThread.primeConnection();
-
}
-
}
-
-
else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
-
doIO(pendingQueue, outgoingQueue, cnxn);
-
}
-
}
-
if (sendThread.getZkState().isConnected()) {
-
synchronized(outgoingQueue) {
-
if (findSendablePacket(outgoingQueue,
-
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
-
enableWrite();
-
}
-
}
-
}
-
selected.clear();
-
}
假设我们此时连接已经好了,WRITE事件ok,则SendThread开始发送我们的ConnectRequest
-
if (sockKey.isWritable()) {
-
-
synchronized(outgoingQueue) {
-
-
Packet p = findSendablePacket(outgoingQueue,
-
cnxn.sendThread.clientTunneledAuthenticationInProgress());
-
-
if (p != null) {
-
-
updateLastSend();
-
-
-
if (p.bb == null) {
-
-
if ((p.requestHeader != null) &&
-
(p.requestHeader.getType() != OpCode.ping) &&
-
(p.requestHeader.getType() != OpCode.auth)) {
-
p.requestHeader.setXid(cnxn.getXid());
-
}
-
-
p.createBB();
-
}
-
-
sock.write(p.bb);
-
-
if (!p.bb.hasRemaining()) {
-
-
sentCount++;
-
-
outgoingQueue.removeFirstOccurrence(p);
-
-
if (p.requestHeader != null
-
&& p.requestHeader.getType() != OpCode.ping
-
&& p.requestHeader.getType() != OpCode.auth) {
-
synchronized (pendingQueue) {
-
pendingQueue.add(p);
-
}
-
}
-
}
-
}
-
-
if (outgoingQueue.isEmpty()) {
-
-
-
-
-
-
disableWrite();
-
} else {
-
-
enableWrite();
-
}
-
}
-
}
具体序列化方式如下，ConnectRequest的packet没有协议头(requestHeader为null)
-
public void createBB() {
-
try {
-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
-
-
boa.writeInt(-1, "len");
-
-
if (requestHeader != null) {
-
requestHeader.serialize(boa, "header");
-
}
-
-
if (request instanceof ConnectRequest) {
-
request.serialize(boa, "connect");
-
-
boa.writeBool(readOnly, "readOnly");
-
} else if (request != null) {
-
request.serialize(boa, "request");
-
}
-
baos.close();
-
-
this.bb = ByteBuffer.wrap(baos.toByteArray());
-
-
this.bb.putInt(this.bb.capacity() - 4);
-
-
this.bb.rewind();
-
} catch (IOException e) {
-
LOG.warn("Ignoring unexpected exception", e);
-
}
-
}
这里我们的第一个Packet是ConnectRequest，它构造的packet没有header(requestHeader为null)，所以发送完成后不会被放入pendingQueue，而是直接丢弃；但SendThread仍需要监听server端的返回，以确认连接成功，并完成session的初始化。到这里client端开始等待server端的响应，下面我们看看server端是怎么处理ConnectRequest请求的。
假设server的selector线程已经就位,则selector会拿到一个读就位的事件,也就是client的connReq请求
-
else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
-
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
-
c.doIO(k);
if (k.isReadable()) {
-
-
int rc = sock.read(incomingBuffer);
-
if (rc < 0) {
-
throw new EndOfStreamException(
-
"Unable to read additional data from client sessionid 0x"
-
+ Long.toHexString(sessionId)
-
+ ", likely client has closed socket");
-
}
-
-
if (incomingBuffer.remaining() == 0) {
-
boolean isPayload;
-
-
if (incomingBuffer == lenBuffer) {
-
incomingBuffer.flip();
-
-
isPayload = readLength(k);
-
-
incomingBuffer.clear();
-
} else {
-
-
isPayload = true;
-
}
-
-
if (isPayload) {
-
readPayload();
-
}
-
else {
-
-
-
return;
-
}
-
}
-
}
具体的后续数据流程:
-
-
private void readPayload() throws IOException, InterruptedException {
-
if (incomingBuffer.remaining() != 0) {
-
-
int rc = sock.read(incomingBuffer);
-
if (rc < 0) {
-
throw new EndOfStreamException(
-
"Unable to read additional data from client sessionid 0x"
-
+ Long.toHexString(sessionId)
-
+ ", likely client has closed socket");
-
}
-
}
-
-
if (incomingBuffer.remaining() == 0) {
-
-
packetReceived();
-
-
incomingBuffer.flip();
-
-
if (!initialized) {
-
readConnectRequest();
-
}
-
-
else {
-
readRequest();
-
}
-
-
lenBuffer.clear();
-
incomingBuffer = lenBuffer;
-
}
-
}
我们现在发的ConnReq已经被server端接受了,处理如下
-
private void readConnectRequest() throws IOException, InterruptedException {
-
if (zkServer == null) {
-
throw new IOException("ZooKeeperServer not running");
-
}
-
-
zkServer.processConnectRequest(this, incomingBuffer);
-
-
initialized = true;
-
}
具体处理:
-
// Server-side handling of a client's ConnectRequest (excerpt; "..." marks
// elided code): deserializes the request, clamps the requested session
// timeout into [minSessionTimeout, maxSessionTimeout], disables receiving
// on the connection, and then either reopens an existing session
// (session id != 0) or creates a new one.
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
-
-
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
-
ConnectRequest connReq = new ConnectRequest();
-
connReq.deserialize(bia, "connect");
-
...
-
boolean readOnly = false;
-
try {
-
-
// The readOnly flag follows the connect request; if it can be read the
// client is marked as not old.
readOnly = bia.readBool("readOnly");
-
cnxn.isOldClient = false;
-
} catch (IOException e) {
-
....
-
}
-
...
-
-
// Clamp the client's requested timeout to the server's bounds.
int sessionTimeout = connReq.getTimeOut();
-
byte passwd[] = connReq.getPasswd();
-
int minSessionTimeout = getMinSessionTimeout();
-
if (sessionTimeout < minSessionTimeout) {
-
sessionTimeout = minSessionTimeout;
-
}
-
int maxSessionTimeout = getMaxSessionTimeout();
-
if (sessionTimeout > maxSessionTimeout) {
-
sessionTimeout = maxSessionTimeout;
-
}
-
cnxn.setSessionTimeout(sessionTimeout);
-
-
-
-
// Stop receiving on this connection; finishSessionInit() calls
// enableRecv() again - presumably so no further packets are read until
// the session is set up (TODO confirm).
cnxn.disableRecv();
-
-
long sessionId = connReq.getSessionId();
-
-
// Non-zero id: the client wants to renew an existing session.
if (sessionId != 0) {
-
long clientSessionId = connReq.getSessionId();
-
LOG.info("Client attempting to renew session 0x"
-
+ Long.toHexString(clientSessionId)
-
+ " at " + cnxn.getRemoteSocketAddress());
-
serverCnxnFactory.closeSession(sessionId);
-
cnxn.setSessionId(sessionId);
-
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
-
} else {
-
LOG.info("Client attempting to establish new session at "
-
+ cnxn.getRemoteSocketAddress());
-
-
// Zero id: brand-new session.
createSession(cnxn, passwd, sessionTimeout);
-
}
-
}
创建新session如下:
-
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
-
-
long sessionId = sessionTracker.createSession(timeout);
-
-
Random r = new Random(sessionId ^ superSecret);
-
r.nextBytes(passwd);
-
ByteBuffer to = ByteBuffer.allocate(4);
-
to.putInt(timeout);
-
-
cnxn.setSessionId(sessionId);
-
-
submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
-
return sessionId;
-
}
提交过程:
-
private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
-
int xid, ByteBuffer bb, List<Id> authInfo) {
-
Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
-
submitRequest(si);
-
}
Server端开始执行处理链,参数是内部的Request对象,此时type是OpCode.createSession:
-
public void submitRequest(Request si) {
-
......
-
try {
-
touch(si.cnxn);
-
boolean validpacket = Request.isValid(si.type);
-
if (validpacket) {
-
-
firstProcessor.processRequest(si);
-
if (si.cnxn != null) {
-
incInProcess();
-
}
-
......
-
}
第一个processor PrepRequestProcessor执行:
-
public void run() {
-
try {
-
while (true) {
-
Request request = submittedRequests.take();
-
......
-
pRequest(request);
-
}
-
......
-
}
对于CREATE_SESSION具体处理:
-
-
case OpCode.createSession:
-
case OpCode.closeSession:
-
-
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
-
break;
-
......
-
request.zxid = zks.getZxid();
-
-
nextProcessor.processRequest(request);
-
case OpCode.createSession:
-
-
request.request.rewind();
-
int to = request.request.getInt();
-
-
request.txn = new CreateSessionTxn(to);
-
request.request.rewind();
-
zks.sessionTracker.addSession(request.sessionId, to);
-
zks.setOwner(request.sessionId, request.getOwner());
-
break;
从上可见,PrepRequestProcessor主要是负责组装Request的header和txn参数的,相当于是预处理
第二个Processor SyncRequestProcessor处理:
-
int randRoll = r.nextInt(snapCount/2);
-
while (true) {
-
Request si = null;
-
-
if (toFlush.isEmpty()) {
-
si = queuedRequests.take();
-
}
-
-
-
else {
-
si = queuedRequests.poll();
-
-
if (si == null) {
-
flush(toFlush);
-
continue;
-
}
-
}
-
if (si == requestOfDeath) {
-
break;
-
}
-
if (si != null) {
-
-
-
if (zks.getZKDatabase().append(si)) {
-
-
logCount++;
-
-
-
-
if (logCount > (snapCount / 2 + randRoll)) {
-
randRoll = r.nextInt(snapCount/2);
-
-
-
zks.getZKDatabase().rollLog();
-
-
-
if (snapInProcess != null && snapInProcess.isAlive()) {
-
LOG.warn("Too busy to snap, skipping");
-
} else {
-
snapInProcess = new Thread("Snapshot Thread") {
-
public void run() {
-
try {
-
zks.takeSnapshot();
-
} catch(Exception e) {
-
LOG.warn("Unexpected exception", e);
-
}
-
}
-
};
-
snapInProcess.start();
-
}
-
logCount = 0;
-
}
-
}
-
-
else if (toFlush.isEmpty()) {
-
-
-
-
-
nextProcessor.processRequest(si);
-
if (nextProcessor instanceof Flushable) {
-
((Flushable)nextProcessor).flush();
-
}
-
continue;
-
}
-
-
toFlush.add(si);
-
-
if (toFlush.size() > 1000) {
-
flush(toFlush);
-
}
-
}
具体的flush处理:
-
private void flush(LinkedList<Request> toFlush)
-
throws IOException, RequestProcessorException
-
{
-
if (toFlush.isEmpty())
-
return;
-
-
zks.getZKDatabase().commit();
-
-
while (!toFlush.isEmpty()) {
-
Request i = toFlush.remove();
-
-
nextProcessor.processRequest(i);
-
}
-
if (nextProcessor instanceof Flushable) {
-
((Flushable)nextProcessor).flush();
-
}
-
}
我们假设现在系统压力小,我们的ConnectRequest可以被立马处理了,执行FinalRequestProcessor:
-
if (request.hdr != null) {
-
TxnHeader hdr = request.hdr;
-
Record txn = request.txn;
-
-
rc = zks.processTxn(hdr, txn);
-
}
具体处理:
-
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
-
ProcessTxnResult rc;
-
int opCode = hdr.getType();
-
long sessionId = hdr.getClientId();
-
-
rc = getZKDatabase().processTxn(hdr, txn);
-
-
if (opCode == OpCode.createSession) {
-
if (txn instanceof CreateSessionTxn) {
-
CreateSessionTxn cst = (CreateSessionTxn) txn;
-
sessionTracker.addSession(sessionId, cst
-
.getTimeOut());
-
......
-
return rc;
-
}
public ProcessTxnResult processTxn(TxnHeader header, Record txn)
-
{
-
-
ProcessTxnResult rc = new ProcessTxnResult();
-
-
try {
-
rc.clientId = header.getClientId();
-
rc.cxid = header.getCxid();
-
rc.zxid = header.getZxid();
-
rc.type = header.getType();
-
rc.err = 0;
-
rc.multiResult = null;
-
......
在FinalRequestProcessor拿到database的处理结果,继续处理:
-
case OpCode.createSession: {
-
zks.serverStats().updateLatency(request.createTime);
-
-
lastOp = "SESS";
-
cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
-
request.createTime, System.currentTimeMillis());
-
-
zks.finishSessionInit(request.cnxn, true);
-
return;
-
}
// Builds the ConnectResponse for a connection and sends it back to the
// client (excerpt; "......" marks elided code). When valid is false the
// response carries timeout 0, session id 0 and a 16-byte zero password.
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
-
......
-
-
ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
-
: 0, valid ? cnxn.getSessionId() : 0,
-
-
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
-
-
// Length placeholder; patched below once the size is known.
bos.writeInt(-1, "len");
-
-
rsp.serialize(bos, "connect");
-
// Clients not flagged as old also get the readOnly flag, which is true
// when this server is a ReadOnlyZooKeeperServer.
if (!cnxn.isOldClient) {
-
bos.writeBool(
-
this instanceof ReadOnlyZooKeeperServer, "readOnly");
-
}
-
baos.close();
-
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
-
-
// Overwrite the placeholder with the payload length (total minus the
// 4-byte prefix) and reset the position for sending.
bb.putInt(bb.remaining() - 4).rewind();
-
-
cnxn.sendBuffer(bb);
-
-
......
-
-
// Re-enable receiving on the connection.
cnxn.enableRecv();
-
......
-
}
具体写回,通讯层NIOServerCnxn:
-
// Sends a buffer to the client (excerpt; "......." marks elided code).
// If no write is currently pending on the selection key it first tries a
// direct socket write; whatever could not be written is queued in
// outgoingBuffers and write interest is switched on for the selector.
public void sendBuffer(ByteBuffer bb) {
-
try {
-
// closeConn is a marker buffer and skips the direct write.
if (bb != ServerCnxnFactory.closeConn) {
-
-
-
-
-
// OP_WRITE not set in the interest ops: attempt the write right here.
if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
-
try {
-
-
sock.write(bb);
-
} catch (IOException e) {
-
-
// NOTE(review): the exception is ignored here; presumably a best-effort
// write with the remainder handled by the queued path below - confirm.
}
-
}
-
-
-
// Everything went out in one go: count the packet and finish.
if (bb.remaining() == 0) {
-
packetSent();
-
return;
-
}
-
}
-
-
// Queue the buffer and register write interest on the key.
synchronized(this.factory){
-
sk.selector().wakeup();
-
if (LOG.isTraceEnabled()) {
-
LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
-
+ " is valid: " + sk.isValid());
-
}
-
outgoingBuffers.add(bb);
-
if (sk.isValid()) {
-
sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
-
}
-
}
-
-
.......
-
}
到这里server端已经执行完毕了,返回给client一个ConnectResponse对象,client端的SendThread收到server端的Response处理:
-
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
-
throws InterruptedException, IOException {
-
SocketChannel sock = (SocketChannel) sockKey.channel();
-
if (sock == null) {
-
throw new IOException("Socket is null!");
-
}
-
if (sockKey.isReadable()) {
-
-
int rc = sock.read(incomingBuffer);
-
if (rc < 0) {
-
throw new EndOfStreamException(
-
"Unable to read additional data from server sessionid 0x"
-
+ Long.toHexString(sessionId)
-
+ ", likely server has closed socket");
-
}
-
-
if (!incomingBuffer.hasRemaining()) {
-
incomingBuffer.flip();
-
-
if (incomingBuffer == lenBuffer) {
-
recvCount++;
-
-
readLength();
-
}
-
-
else if (!initialized) {
-
-
readConnectResult();
-
-
enableRead();
-
-
if (findSendablePacket(outgoingQueue,
-
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
-
-
-
enableWrite();
-
}
-
-
lenBuffer.clear();
-
incomingBuffer = lenBuffer;
-
updateLastHeard();
-
-
initialized = true;
-
} else {
-
sendThread.readResponse(incomingBuffer);
-
lenBuffer.clear();
-
incomingBuffer = lenBuffer;
-
updateLastHeard();
-
}
-
}
-
}
具体的读取:
-
// Client-side parsing of the server's ConnectResponse (excerpt; "....."
// marks elided code): deserializes the response, reads the optional
// readOnly flag, records the session id and notifies the send thread.
void readConnectResult() throws IOException {
-
.....
-
-
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
-
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
-
ConnectResponse conRsp = new ConnectResponse();
-
conRsp.deserialize(bbia, "connect");
-
-
-
// The readOnly flag follows the response; if reading it fails the server
// is treated as an old one and isRO stays false.
boolean isRO = false;
-
try {
-
isRO = bbia.readBool("readOnly");
-
} catch (IOException e) {
-
-
-
LOG.warn("Connected to an old server; r-o mode will be unavailable");
-
}
-
-
this.sessionId = conRsp.getSessionId();
-
-
// Pass the timeout, session id and password from the response on to
// the send thread.
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
-
conRsp.getPasswd(), isRO);
-
}
后续处理如下:
-
// Called once the ConnectResponse has been parsed (excerpt; "......" marks
// elided code): stores the negotiated session parameters, recomputes the
// timeouts from the negotiated value, switches the state to CONNECTED or
// CONNECTEDREADONLY and queues the matching state event for the watchers.
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
-
byte[] _sessionPasswd, boolean isRO) throws IOException {
-
negotiatedSessionTimeout = _negotiatedSessionTimeout;
-
......
-
-
// Same formulas as in the constructor, now based on the negotiated
// timeout instead of the requested one.
readTimeout = negotiatedSessionTimeout * 2 / 3;
-
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
-
hostProvider.onConnected();
-
sessionId = _sessionId;
-
sessionPasswd = _sessionPasswd;
-
-
state = (isRO) ?
-
States.CONNECTEDREADONLY : States.CONNECTED;
-
// Becomes (and stays) true once a non-read-only server has been seen.
seenRwServerBefore |= !isRO;
-
LOG.info("Session establishment complete on server "
-
+ clientCnxnSocket.getRemoteSocketAddress()
-
+ ", sessionid = 0x" + Long.toHexString(sessionId)
-
+ ", negotiated timeout = " + negotiatedSessionTimeout
-
+ (isRO ? " (READ-ONLY mode)" : ""));
-
-
// Queue a state-change event (type None) with the new keeper state.
KeeperState eventState = (isRO) ?
-
KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
-
eventThread.queueEvent(new WatchedEvent(
-
Watcher.Event.EventType.None,
-
eventState, null));
-
}
EventThread处理:
-
public void queueEvent(WatchedEvent event) {
-
if (event.getType() == EventType.None
-
&& sessionState == event.getState()) {
-
return;
-
}
-
-
sessionState = event.getState();
-
-
-
-
WatcherSetEventPair pair = new WatcherSetEventPair(
-
watcher.materialize(event.getState(), event.getType(),
-
event.getPath()),
-
event);
-
-
-
waitingEvents.add(pair);
-
}
EventThread主线程
-
public void run() {
-
try {
-
isRunning = true;
-
while (true) {
-
-
Object event = waitingEvents.take();
-
if (event == eventOfDeath) {
-
wasKilled = true;
-
} else {
-
-
processEvent(event);
-
}
-
if (wasKilled)
-
synchronized (waitingEvents) {
-
if (waitingEvents.isEmpty()) {
-
isRunning = false;
-
break;
-
}
-
}
-
}
-
} catch (InterruptedException e) {
-
LOG.error("Event thread exiting due to interruption", e);
-
}
-
-
LOG.info("EventThread shut down");
-
}
具体处理:
-
if (event instanceof WatcherSetEventPair) {
-
-
WatcherSetEventPair pair = (WatcherSetEventPair) event;
-
for (Watcher watcher : pair.watchers) {
-
try {
-
watcher.process(pair.event);
-
} catch (Throwable t) {
-
LOG.error("Error while calling watcher ", t);
-
}
-
}
-
}
在我们的例子里,会调用Executor这个watcher的process方法,又代理给了DataMonitor,对于SyncConnected啥事不干
-
case SyncConnected:
-
-
-
-
-
break;
好了,到这里client端和server端的session已经建立,可以进行后续的业务处理了。通过这个例子,我们讲解了client和server是如何交互数据的,后续的请求比如create、get、set、delete都是类似的流程。
Session建立核心流程:
1.创建TCP连接
2.client发送ConnectRequest包
3.server收到ConnectRequest包,创建session,将server端的sessionId返回给client
4.client收到server的响应,触发相应SyncConnected状态的事件
5.client端watcher消费事件
阅读(1110) | 评论(0) | 转发(0) |