Chinaunix首页 | 论坛 | 博客
  • 博客访问: 836682
  • 博文数量: 91
  • 博客积分: 2544
  • 博客等级: 少校
  • 技术积分: 1885
  • 用 户 组: 普通用户
  • 注册时间: 2006-12-12 09:08
文章存档

2016年(10)

2014年(2)

2013年(4)

2012年(23)

2011年(23)

2010年(13)

2009年(14)

2007年(2)

分类: LINUX

2016-05-10 06:55:30

redis事务实现了多个命令的批量执行,使得多个命令可以顺序执行而不受到干扰。事务的执行顺序如下:
redis> multi
redis> set k1 v1
redis> set k2 v2
redis> exec


# multi命令的实现
该命令对应的执行函数是multiCommand,实现如下:
void multiCommand(redisClient *c) {
    if (c->flags & REDIS_MULTI) {    // 若已经设置了REDIS_MULTI标记说明有事务正在改客户端执行,直接返回错误。
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }   
    c->flags |= REDIS_MULTI;    // 若在该客户端没有事务执行,则在client结构的flags字段,设置REDIS_MULTI标记。
    addReply(c,shared.ok);  // 把成功状态返回客户端
}
小结:MULTI命令十分简单,就是在client结构上设置一个标志。


# 事务中的命令执行过程
当设置了multi后,客户端就打上了事务执行的标记。这样redis在处理后面的命令时,将会进行特殊的处理。实现过程可以查看下面的处理流程。redis所有的命令都必须通过processCommand()函数进行处理,所以此时若设置了REDIS_MULTI 标志位,将不会执行该命令,而是把命令放到执行队列中。

* 命令处理函数processCommand
int processCommand(redisClient *c) {
   ... ... 
    // 若client已经设置了REDIS_MULTI 标志,且本次执行的命令不是:EXEC,DISCARD,MULTI,WATCH这四者之一。
    // 则执行queueMultiCommand函数,把命令保存下来。
   /* Exec the command */
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {    
        queueMultiCommand(c);    // 不执行该命令,而是把命令保存到命令数组中。
        addReply(c,shared.queued);
    } else {
        call(c,REDIS_CALL_FULL);    // 若没有设置REDIS_MULTI标志,则执行命令对应的函数。
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }    
    return REDIS_OK;
}

* queueMultiCommand函数
// 该函数的功能是,添加一个新的命令道MULTI命令队列中。
/* Add a new command into the MULTI commands queue */
void queueMultiCommand(redisClient *c) {
    multiCmd *mc;
    int j;

    // 这里其实不是什么队列,而是动态数组。在client结构中,就是一个multiState结构中的c->mstate指针。
    // 就是直接使用zrealloc在内存中新添加一块内存,用来存放multiCmd结构。
    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;    // 命令
    mc->argc = c->argc;    // 命令个数
    mc->argv = zmalloc(sizeof(robj*)*c->argc); //命令参数值
    memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    c->mstate.count++;
}

# exec命令的实现

void execCommand(redisClient *c) {
    int j;
    robj **orig_argv;
    int orig_argc;
    struct redisCommand *orig_cmd;
    int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */

    // 首先检查是否已经设置了REDIS_MULTI标志,若没有设置给客户端返回错误,退出命令。
    // 若没有设置REDIS_MULTI这个标记说明,没有执行multi命令,因为在redis中只有在multi命令中才会设置该标记。
    if (!(c->flags & REDIS_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }

    // 运行到这里,说明已经执行了multi命令。
    // 此时若设置了REDIS_DIRTY_CAS或REDIS_DIRTY_EXEC标记,则拒绝执行事务的命令。
    // 说明:若watch 了某个key,就会把该key放到watched_keys字典中,在对该key做任何修改操作时,客户端都设置REDIS_DIRTY_CAS标志位。
    /* Check if we need to abort the EXEC because:
     * 1) Some WATCHed key was touched.
     * 2) There was a previous error while queueing commands.
     * A failed EXEC in the first case returns a multi bulk nil object
     * (technically it is not an error but a special behavior), while
     * in the second an EXECABORT error is returned. */

    // 检查是否要放弃EXEC命令。若发生以下的情况,将会放弃EXEC命令:
    1) 有些key,被WATCH。
    2) 在执行事务过程中,前面有命令执行出错。
    if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
        addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
                                                  shared.nullmultibulk);
        discardTransaction(c);
        goto handle_monitor;
    }

    /* Exec all the queued commands */
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyMultiBulkLen(c,c->mstate.count);
    
    // 遍历事务命令表,一条一条执行对应函数
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;

        /* Propagate a MULTI request once we encounter the first write op.
         * This way we'll deliver the MULTI/..../EXEC block as a whole and
         * both the AOF and the replication link will have the same consistency
         * and atomicity guarantees. */
        if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {
            execCommandPropagateMulti(c);
            must_propagate = 1;
        }

        call(c,REDIS_CALL_FULL);  // 执行命令

        /* Commands may alter argc/argv, restore mstate. */
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    discardTransaction(c);
    /* Make sure the EXEC command will be propagated as well if MULTI
     * was already propagated. */
    if (must_propagate) server.dirty++;

handle_monitor:
    /* Send EXEC to clients waiting data from MONITOR. We do it here
     * since the natural order of commands execution is actually:
     * MUTLI, EXEC, ... commands inside transaction ...
     * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command
     * table, and we do it here with correct ordering. */
    if (listLength(server.monitors) && !server.loading)
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}


# watch命令的实现

* 相关数据结构
每个redisClient 结构中都有一个watchkey的列表,当然也有该client端连接的db的指针,如下:
typedef struct redisClient {
    ... ...
    redisDb *db;
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    ... ...
}

// 每个watched_keys的alue值的结构如下,定义了key和对应的数据库的指针。
typedef struct watchedKey {
    robj *key;
    redisDb *db;
} watchedKey;


* 实现过程
watch命令对应的执行函数是watchCommand,实现如下:
函数调用关系如下:
watchCommand()
   --> watchForKey()

* watchCommand函数
void watchCommand(redisClient *c) {
    int j;

    // 是否设置了REDIS_MULTI标志,若设置了该事务标志位,则不允许watch操作。
    if (c->flags & REDIS_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    // 命令行
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    addReply(c,shared.ok);
}

* watchForKey函数

/* Watch for the specified key */
void watchForKey(redisClient *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    /* Check if we are already watching for this key */
    // 为迭代器赋值,用来遍历client的watched_keys链表。
    listRewind(c->watched_keys,&li);
    // 通过迭代器遍历该客户端的watched_keys链表。
    while((ln = listNext(&li))) {
        // 获取watchedKey的指针,其实也就是链表节点中value的指针
        wk = listNodeValue(ln);
        // 若该key已经存在,且数据库也相同,则什么都不做,直接返回
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    
    /* This key is not already watched in this DB. Let's add it */
    // 若运行到这里,说明key还没有被watch,要添加该key到相应的链表:
    // 首先把该key添加到redisClient->watched_keys链表中
    clients = dictFetchValue(c->db->watched_keys,key);
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    listAddNodeTail(clients,c);

    /* Add the new key to the list of keys watched by this client */
    // 创建watchedKey结构,并把该key的值付给该节点
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    // 把该key添加到c->watched_keys链表后
    listAddNodeTail(c->watched_keys,wk);
}
阅读(1896) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~