在block操作底层的文件中,第一句话就是
-
/* blocked.c - generic support for blocking operations like BLPOP & WAIT.
-
*/
这边是提供了一些api,供外界调用
1. getTimeoutFromObjectOrReply(),在object当中获取timeout参数,具体如下
-
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
-
long long tval;
-
-
if (getLongLongFromObjectOrReply(c,object,&tval,
-
"timeout is not an integer or out of range") != C_OK)
-
return C_ERR;
-
-
if (tval < 0) {
-
addReplyError(c,"timeout is negative");
-
return C_ERR;
-
}
-
-
if (tval > 0) {
-
if (unit == UNIT_SECONDS) tval *= 1000;
-
tval += mstime();
-
}
-
*timeout = tval;
-
-
return C_OK;
-
}
2. blockClient(), 将一个client的CLIENT_BLOCKED位置位,并设置具体阻塞的类型
-
void blockClient(client *c, int btype) {
-
c->flags |= CLIENT_BLOCKED;
-
c->btype = btype;
-
server.bpop_blocked_clients++;
-
}
3. processUnblockedClients(),当一个客户端被解出阻塞之后,在event loop当中的beforeSleep()函数中会被调用,用于处理这些client的input buffer
-
void processUnblockedClients(void) {
-
listNode *ln;
-
client *c;
-
-
while (listLength(server.unblocked_clients)) {
-
ln = listFirst(server.unblocked_clients);
-
serverAssert(ln != NULL);
-
c = ln->value;
-
listDelNode(server.unblocked_clients,ln);
-
c->flags &= ~CLIENT_UNBLOCKED;
-
-
/* Process remaining data in the input buffer, unless the client
-
* is blocked again. Actually processInputBuffer() checks that the
-
* client is not blocked before to proceed, but things may change and
-
* the code is conceptually more correct this way. */
-
if (!(c->flags & CLIENT_BLOCKED)) {
-
if (c->querybuf && sdslen(c->querybuf) > 0) {
-
processInputBuffer(c);
-
}
-
}
-
}
-
}
4. unblockClient(), 根据当前client被阻塞的类型进行unblock,
-
void unblockClient(client *c) {
-
if (c->btype == BLOCKED_LIST) {
-
unblockClientWaitingData(c);
-
} else if (c->btype == BLOCKED_WAIT) {
-
unblockClientWaitingReplicas(c);
-
} else if (c->btype == BLOCKED_MODULE) {
-
unblockClientFromModule(c);
-
} else {
-
serverPanic("Unknown btype in unblockClient().");
-
}
-
/* Clear the flags, and put the client in the unblocked list so that
-
* we'll process new commands in its query buffer ASAP. */
-
c->flags &= ~CLIENT_BLOCKED;
-
c->btype = BLOCKED_NONE;
-
server.bpop_blocked_clients--;
-
/* The client may already be into the unblocked list because of a previous
-
* blocking operation, don't add back it into the list multiple times. */
-
if (!(c->flags & CLIENT_UNBLOCKED)) {
-
c->flags |= CLIENT_UNBLOCKED;
-
listAddNodeTail(server.unblocked_clients,c);
-
}
-
}
5. replyToBlockedClientTimedOut(),当被阻塞的client到达time out时间后,给其发送响应
-
void replyToBlockedClientTimedOut(client *c) {
-
if (c->btype == BLOCKED_LIST) {
-
addReply(c,shared.nullmultibulk);
-
} else if (c->btype == BLOCKED_WAIT) {
-
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
-
} else if (c->btype == BLOCKED_MODULE) {
-
moduleBlockedClientTimedOut(c);
-
} else {
-
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
-
}
-
}
6. disconnectAllBlockedClients(), 这个函数只有在一个节点由主节点变为从节点,的时候,会给客户端发送一个unblocked error,并且断开与客户端的连接
-
void disconnectAllBlockedClients(void) {
-
listNode *ln;
-
listIter li;
-
-
listRewind(server.clients,&li);
-
while((ln = listNext(&li))) {
-
client *c = listNodeValue(ln);
-
-
if (c->flags & CLIENT_BLOCKED) {
-
addReplySds(c,sdsnew(
-
"-UNBLOCKED force unblock from blocking operation, "
-
"instance state changed (master -> slave?)\r\n"));
-
unblockClient(c);
-
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
-
}
-
}
-
}
阅读(4909) | 评论(0) | 转发(0) |