redis事务实现了多个命令的批量执行,使得多个命令可以顺序执行而不受到干扰。事务的执行顺序如下:
redis> multi
redis> set k1 v1
redis> set k2 v2
redis> exec
# multi命令的实现
该命令对应的执行函数是multiCommand,实现如下:
void multiCommand(redisClient *c) {
if (c->flags & REDIS_MULTI) { // 若已经设置了REDIS_MULTI标记说明有事务正在改客户端执行,直接返回错误。
addReplyError(c,"MULTI calls can not be nested");
return;
}
c->flags |= REDIS_MULTI; // 若在该客户端没有事务执行,则在client结构的flags字段,设置REDIS_MULTI标记。
addReply(c,shared.ok); // 把成功状态返回客户端
}
小结:MULTI命令十分简单,就是在client结构上设置一个标志。
# 事务中的命令执行过程
当设置了multi后,客户端就打上了事务执行的标记。这样redis在处理后面的命令时,将会进行特殊的处理。实现过程可以查看下面的处理流程。redis所有的命令都必须通过processCommand()函数进行处理,所以此时若设置了REDIS_MULTI 标志位,将不会执行该命令,而是把命令放到执行队列中。
* 命令处理函数processCommand
int processCommand(redisClient *c) {
... ...
// 若client已经设置了REDIS_MULTI 标志,且本次执行的命令不是:EXEC,DISCARD,MULTI,WATCH这四者之一。
// 则执行queueMultiCommand函数,把命令保存下来。
/* Exec the command */
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c); // 不执行该命令,而是把命令保存到命令数组中。
addReply(c,shared.queued);
} else {
call(c,REDIS_CALL_FULL); // 若没有设置REDIS_MULTI标志,则执行命令对应的函数。
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return REDIS_OK;
}
* queueMultiCommand函数
// 该函数的功能是,添加一个新的命令道MULTI命令队列中。
/* Add a new command into the MULTI commands queue */
void queueMultiCommand(redisClient *c) {
multiCmd *mc;
int j;
// 这里其实不是什么队列,而是动态数组。在client结构中,就是一个multiState结构中的c->mstate指针。
// 就是直接使用zrealloc在内存中新添加一块内存,用来存放multiCmd结构。
c->mstate.commands = zrealloc(c->mstate.commands,
sizeof(multiCmd)*(c->mstate.count+1));
mc = c->mstate.commands+c->mstate.count;
mc->cmd = c->cmd; // 命令
mc->argc = c->argc; // 命令个数
mc->argv = zmalloc(sizeof(robj*)*c->argc); //命令参数值
memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
for (j = 0; j < c->argc; j++)
incrRefCount(mc->argv[j]);
c->mstate.count++;
}
# exec命令的实现
void execCommand(redisClient *c) {
int j;
robj **orig_argv;
int orig_argc;
struct redisCommand *orig_cmd;
int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
// 首先检查是否已经设置了REDIS_MULTI标志,若没有设置给客户端返回错误,退出命令。
// 若没有设置REDIS_MULTI这个标记说明,没有执行multi命令,因为在redis中只有在multi命令中才会设置该标记。
if (!(c->flags & REDIS_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}
// 运行到这里,说明已经执行了multi命令。
// 此时若设置了REDIS_DIRTY_CAS或REDIS_DIRTY_EXEC标记,则拒绝执行事务的命令。
// 说明:若watch 了某个key,就会把该key放到watched_keys字典中,在对该key做任何修改操作时,客户端都设置REDIS_DIRTY_CAS标志位。
/* Check if we need to abort the EXEC because:
* 1) Some WATCHed key was touched.
* 2) There was a previous error while queueing commands.
* A failed EXEC in the first case returns a multi bulk nil object
* (technically it is not an error but a special behavior), while
* in the second an EXECABORT error is returned. */
// 检查是否要放弃EXEC命令。若发生以下的情况,将会放弃EXEC命令:
1) 有些key,被WATCH。
2) 在执行事务过程中,前面有命令执行出错。
if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
shared.nullmultibulk);
discardTransaction(c);
goto handle_monitor;
}
/* Exec all the queued commands */
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
orig_argv = c->argv;
orig_argc = c->argc;
orig_cmd = c->cmd;
addReplyMultiBulkLen(c,c->mstate.count);
// 遍历事务命令表,一条一条执行对应函数
for (j = 0; j < c->mstate.count; j++) {
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->cmd = c->mstate.commands[j].cmd;
/* Propagate a MULTI request once we encounter the first write op.
* This way we'll deliver the MULTI/..../EXEC block as a whole and
* both the AOF and the replication link will have the same consistency
* and atomicity guarantees. */
if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {
execCommandPropagateMulti(c);
must_propagate = 1;
}
call(c,REDIS_CALL_FULL); // 执行命令
/* Commands may alter argc/argv, restore mstate. */
c->mstate.commands[j].argc = c->argc;
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].cmd = c->cmd;
}
c->argv = orig_argv;
c->argc = orig_argc;
c->cmd = orig_cmd;
discardTransaction(c);
/* Make sure the EXEC command will be propagated as well if MULTI
* was already propagated. */
if (must_propagate) server.dirty++;
handle_monitor:
/* Send EXEC to clients waiting data from MONITOR. We do it here
* since the natural order of commands execution is actually:
* MUTLI, EXEC, ... commands inside transaction ...
* Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command
* table, and we do it here with correct ordering. */
if (listLength(server.monitors) && !server.loading)
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
# watch命令的实现
* 相关数据结构
每个redisClient 结构中都有一个watchkey的列表,当然也有该client端连接的db的指针,如下:
typedef struct redisClient {
... ...
redisDb *db;
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
... ...
}
// 每个watched_keys的alue值的结构如下,定义了key和对应的数据库的指针。
typedef struct watchedKey {
robj *key;
redisDb *db;
} watchedKey;
* 实现过程
watch命令对应的执行函数是watchCommand,实现如下:
函数调用关系如下:
watchCommand()
--> watchForKey()
* watchCommand函数
void watchCommand(redisClient *c) {
int j;
// 是否设置了REDIS_MULTI标志,若设置了该事务标志位,则不允许watch操作。
if (c->flags & REDIS_MULTI) {
addReplyError(c,"WATCH inside MULTI is not allowed");
return;
}
// 命令行
for (j = 1; j < c->argc; j++)
watchForKey(c,c->argv[j]);
addReply(c,shared.ok);
}
* watchForKey函数
/* Watch for the specified key */
void watchForKey(redisClient *c, robj *key) {
list *clients = NULL;
listIter li;
listNode *ln;
watchedKey *wk;
/* Check if we are already watching for this key */
// 为迭代器赋值,用来遍历client的watched_keys链表。
listRewind(c->watched_keys,&li);
// 通过迭代器遍历该客户端的watched_keys链表。
while((ln = listNext(&li))) {
// 获取watchedKey的指针,其实也就是链表节点中value的指针
wk = listNodeValue(ln);
// 若该key已经存在,且数据库也相同,则什么都不做,直接返回
if (wk->db == c->db && equalStringObjects(key,wk->key))
return; /* Key already watched */
}
/* This key is not already watched in this DB. Let's add it */
// 若运行到这里,说明key还没有被watch,要添加该key到相应的链表:
// 首先把该key添加到redisClient->watched_keys链表中
clients = dictFetchValue(c->db->watched_keys,key);
if (!clients) {
clients = listCreate();
dictAdd(c->db->watched_keys,key,clients);
incrRefCount(key);
}
listAddNodeTail(clients,c);
/* Add the new key to the list of keys watched by this client */
// 创建watchedKey结构,并把该key的值付给该节点
wk = zmalloc(sizeof(*wk));
wk->key = key;
wk->db = c->db;
incrRefCount(key);
// 把该key添加到c->watched_keys链表后
listAddNodeTail(c->watched_keys,wk);
}