Chinaunix首页 | 论坛 | 博客
  • 博客访问: 8169187
  • 博文数量: 595
  • 博客积分: 13065
  • 博客等级: 上将
  • 技术积分: 10334
  • 用 户 组: 普通用户
  • 注册时间: 2008-03-26 16:44
个人简介

推荐: blog.csdn.net/aquester https://github.com/eyjian https://www.cnblogs.com/aquester http://blog.chinaunix.net/uid/20682147.html

文章分类

全部博文(595)

分类: 大数据

2018-09-19 09:23:07

目录


1. 前言

Redis代码优美,注释也很到位,阅读起来会赏心悦目,大大降低了理解门槛。由于redis单线程几乎完成所有工作,整体逻辑是相当复杂的,涉及了太多状态,作者的技术深厚可见一斑。

Redis的单线程设计给出了一种优雅实现高性能服务思路,在实践中值得借鉴。需要注意Redis并不是严格的单线程,实际上它是多进程+多线程。为解决IO和慢速操作带的性能毛刺和卡顿,Redis实现引入的多进程和多线程。但是它的主体仍然是单线程的,单个线程完成网络IO操作和其它操作。

疑问:Redis选择slot数为16384,是一个偶数,没有采用质数,这个可能并不是一个最好的主意。

2. 名词

RESP

REdis Serialization Protocol

Redis序列化协议

AE

A simple Event drived programming library

一个简单的事务驱动编程库

ASAP

 

 

AOF

Append Only File

仅仅追加写文件

BIO

Backgroup IO

后台IO操作,三种涉及BIO:FSYNC、关闭文件和内存free,均为阻塞或慢操作

3. dict.c

哈希表的实现,哈希函数使用了siphash哈希算法。

3.1. siphash算法

一种非加密的64位哈希算法

3.2. 核心函数

Redis中非常核心的算法——哈希表的实现在这个文件中,很多命令都有用到,比如set/hset等,它是除内存分配管理外的最基础实现。核心函数包含但不限于:dictFinddictNextdictAdddictDeletedictFetchValuedictGetHashdictReplacedictAddRawdictCreatedictDeletedictReleasedictUnlinkdictRehashdictEmptydictResize等。

3.3. 核心宏

CLUSTER_SLOTS

定义Redis集群Slots个数,值为16384

DICT_HT_INITIAL_SIZE

定义哈希表默认大小宏,值为4

dictSetVal

设置Value

dictSetKey

设置Key

dictCompareKeys

比较两个Key

dictHashKey

Key求哈希

dictGetKey

Key

dictGetVal

Value

dictSlots

得到槽(slot)个数

dictSize

得到已用槽(slot)个数

dictGetSignedIntegerVal

8字节有符号整数值

dictGetUnsignedIntegerVal

8字节无符号整数值

dictGetDoubleVal

double类型值

3.4. 核心结构体

3.4.1. dictEntry

定义了哈希节点的数据结构:

typedef struct dictEntry {

    void *key// Key

    union { // 支持4种类型的值

        void *val; // 字符串值,或二进制值

        uint64_t u64; // 8字节无符号整数值

        int64_t s64; // 8字节有符号整数值

        double d; // 双精度浮点值

    } v;

    struct dictEntry *next; // 下一节点,链地址冲突解决法

} dictEntry;

4. Redis命令

各具体的命令并不做持久化(写AOF等)和传播给Slaves,这两项是公共操作,在上层统一执行,具体函数为server.c中的“void call(client *c, int flags)”,具体执行函数为server.c中的“void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags)”。

4.1. SELECT命令

群里有人问select跟普通命令的效率,猜想select没有效率问题,阅读源代码也证实了这一点。因为它只是改变指针指向,所以不存在效率问题,自然比普通命令效率要高。

Redis支持的所有命令,都存储在类型为redisCommand函数表中,函数表名为redisCommandTable的数组中。

4.1.1. redisCommand结构体

如果以C++来理解redisCommand,可以将redisCommand看抽象基类,它定义了两个虚函数procgetkeys_proc,各种command是它的具体实现。

// server.h

struct redisCommand {

    char *name// 命令名,比如:GET

    redisCommandProc *proc; // 命令处理过程(函数指针)

    // 命令参数的个数,用于检查命令请求的格式是否正确

    // 如果这个值为负数-N,那么表示参数的数量大于等于N

    // 注意:命令的名字本身也是一个参数。

    int arity;

    // 字符串形式的标识值, 这个值记录了命令的属性

    // 比如是读命令还是写命令,是否允许在载入数据时使用,是否允许在 Lua 脚本中使用等

    char *sflags; /* Flags as string representation, one char per flag. */

    // sflags对应的二进制值,方便程序“与”、“或”等操作操作

    int flags;    /* The actual flags, obtained from the 'sflags' field. */

 

    /* Use a function to determine keys arguments in a command line.

     * Used for Redis Cluster redirect. */

    redisGetKeysProc *getkeys_proc// 函数指针

    /* What keys should be loaded in background when calling this command? */

    int firstkey; /* The first argument that's a key (0 = no keys) */

    int lastkey;  /* The last argument that's a key */

    int keystep;  /* The step between first and last key */

 

    // 服务器执行这个命令所耗费的总时长

    long long microseconds;;

    // 服务器总共执行了多少次这个命令

    long long calls;

};

4.1.2. redisCommandTable变量

redisCommandTable的部分定义:

// server.c

struct redisCommand redisCommandTable[] = {

    {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},

    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},

    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},

    。。。。。。

    {"select",selectCommand,2,"lF",0,NULL,0,0,0,0,0},

    。。。。。。

    {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},

    {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},

    {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}

};

4.1.3. selectCommand函数

// server.c

void selectCommand(client *c) {

    long id;

 

    // 第一个参数值为DB的ID

    if (getLongFromObjectOrReply(c, c->argv[1], &id,

        "invalid DB index") != C_OK)

        return;

 

    // 集群不支持SELECT命令

    if (server.cluster_enabled && id != 0) {

        addReplyError(c,"SELECT is not allowed in cluster mode");

        return;

    }

    if (selectDb(c,id) == C_ERR) { // 切换DB

        addReplyError(c,"DB index is out of range");

    } else {

        addReply(c,shared.ok);

    }

}

 

// db.c

int selectDb(client *c, int id) {

    if (id < 0 || id >= server.dbnum// dbnum值由redis.conf中的databases决定,默认为16

        return C_ERR;

    c->db = &server.db[id];

    return C_OK;

}

4.2. SET命令

SET命令主要是字典操作(dict)。

4.2.1. setCommand函数

// t_string.c

/* SET key value [NX] [XX] [EX ] [PX ] */

void setCommand(client *c) {

    int j;

    robj *expire = NULL;

    int unit = UNIT_SECONDS;

    int flags = OBJ_SET_NO_FLAGS;

 

    // 以下一长段均为参数选项解析,

    // 第一3个开始,因为第一个为命令SET自身,第二个为KEY,第三个为VALUE

    for (j = 3; j < c->argc; j++) {

        char *a = c->argv[j]->ptr;

        robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];

 

        if ((a[0] == 'n' || a[0] == 'N') &&

            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&

            !(flags & OBJ_SET_XX))

        {

            flags |= OBJ_SET_NX;

        } else if ((a[0] == 'x' || a[0] == 'X') &&

                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&

                   !(flags & OBJ_SET_NX))

        {

            flags |= OBJ_SET_XX;

        } else if ((a[0] == 'e' || a[0] == 'E') &&

                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&

                   !(flags & OBJ_SET_PX) && next)

        {

            flags |= OBJ_SET_EX;

            unit = UNIT_SECONDS;

            expire = next;

            j++;

        } else if ((a[0] == 'p' || a[0] == 'P') &&

                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&

                   !(flags & OBJ_SET_EX) && next)

        {

            flags |= OBJ_SET_PX;

            unit = UNIT_MILLISECONDS;

            expire = next;

            j++;

        } else {

            addReply(c,shared.syntaxerr); // 语法错误

            return;

        }

    }

 

    c->argv[2] = tryObjectEncoding(c->argv[2]);

    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);

}

 

// t_string.c

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {

    。。。。。。

    setKey(c->db,key,val);

    server.dirty++;

    if (expire) setExpire(c,c->db,key,mstime()+milliseconds);

    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);

    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);

    addReply(c, ok_reply ? ok_reply : shared.ok);

}

 

// db.c

void setKey(redisDb *db, robj *key, robj *val) {

    if (lookupKeyWrite(db,key) == NULL) { // 性能开销主要在这,参见dictFind函数

        dbAdd(db,key,val);

    } else {

        dbOverwrite(db,key,val);

    }

    incrRefCount(val);

    removeExpire(db,key);

    signalModifiedKey(db,key);

}

 

// db.c

/* Add the key to the DB. It's up to the caller to increment the reference

 * counter of the value if needed.

 *

 * The program is aborted if the key already exists. */

void dbAdd(redisDb *db, robj *key, robj *val) {

    sds copy = sdsdup(key->ptr);

    int retval = dictAdd(db->dict, copy, val);

 

    serverAssertWithInfo(NULL,key,retval == DICT_OK);

    if (val->type == OBJ_LIST ||

        val->type == OBJ_ZSET)

        signalKeyAsReady(db, key);

    if (server.cluster_enabled) slotToKeyAdd(key);

}

 

// dict.h

#define dictSetVal(d, entry, _val_) do { \

    if ((d)->type->valDup) \

        (entry)->v.val = (d)->type->valDup((d)->privdata, _val_); \

    else \

        (entry)->v.val = (_val_); \

} while(0)

 

// db.c

// KEY存在时覆盖写

/* Overwrite an existing key with a new value. Incrementing the reference

 * count of the new value is up to the caller.

 * This function does not modify the expire time of the existing key.

 *

 * The program is aborted if the key was not already present. */

void dbOverwrite(redisDb *db, robj *key, robj *val) {

    dictEntry *de = dictFind(db->dict,key->ptr);

 

    serverAssertWithInfo(NULL,key,de != NULL);

    dictEntry auxentry = *de;

    robj *old = dictGetVal(de);

 

    // 内存策略由redis.conf中的maxmemory-policy决定

    // LFU策略是4.0开始引入的(Least Frequently Used,最不经常使用

    // #define MAXMEMORY_FLAG_LRU (1<<0)

    // #define MAXMEMORY_FLAG_LFU (1<<1)

    // #define MAXMEMORY_FLAG_ALLKEYS (1<<2)

    // #define MAXMEMORY_FLAG_NO_SHARED_INTEGERS (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU)

    // #define MAXMEMORY_VOLATILE_LRU ((0<<8)|MAXMEMORY_FLAG_LRU)

    // #define MAXMEMORY_VOLATILE_LFU ((1<<8)|MAXMEMORY_FLAG_LFU)

    // #define MAXMEMORY_VOLATILE_TTL (2<<8)

    // #define MAXMEMORY_VOLATILE_RANDOM (3<<8)

    // #define MAXMEMORY_ALLKEYS_LRU ((4<<8)|MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_ALLKEYS)

    // #define MAXMEMORY_ALLKEYS_LFU ((5<<8)|MAXMEMORY_FLAG_LFU|MAXMEMORY_FLAG_ALLKEYS)

    // #define MAXMEMORY_ALLKEYS_RANDOM ((6<<8)|MAXMEMORY_FLAG_ALLKEYS)

    // #define MAXMEMORY_NO_EVICTION (7<<8)

    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {

        val->lru = old->lru;

    }

    dictSetVal(db->dict, de, val);

 

    if (server.lazyfree_lazy_server_del) {

        freeObjAsync(old);

        dictSetVal(db->dict, &auxentry, NULL);

    }

 

    dictFreeVal(db->dict, &auxentry);

}

4.2.2. dictAdd函数

// dict.c

// KEY不存在时新增

/* Add an element to the target hash table */

int dictAdd(dict *d, void *key, void *val)

{

    dictEntry *entry = dictAddRaw(d,key,NULL);

 

    if (!entry) return DICT_ERR;

    dictSetVal(d, entry, val);

    return DICT_OK;

}

4.2.3. dictFind函数

字典查找实现:

dictEntry *dictFind(dict *d, const void *key)

{

    dictEntry *he;

    uint64_t h, idx, table;

 

    if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */

    if (dictIsRehashing(d)) _dictRehashStep(d);

 

    // #define dictHashKey(d, key) (d)->type->hashFunction(key)

    h = dictHashKey(d, key); // 对key求哈希值

    for (table = 0; table <= 1; table++) {

        idx = h & d->ht[table].sizemask;

        he = d->ht[table].table[idx];

        while(he) { // 链式冲突解决法

            if (key==he->key || dictCompareKeys(d, key, he->key))

                return he;

            he = he->next;

        }

        if (!dictIsRehashing(d)) return NULL; // 重哈希

    }

    return NULL;

}

 

typedef struct dictType {

    uint64_t (*hashFunction)(const void *key);

    void *(*keyDup)(void *privdata, const void *key);

    void *(*valDup)(void *privdata, const void *obj);

    int (*keyCompare)(void *privdata, const void *key1, const void *key2);

    void (*keyDestructor)(void *privdata, void *key);

    void (*valDestructor)(void *privdata, void *obj);

} dictType;

4.3. HSETNX命令

4.3.1. hsetnxCommand函数

// t_hash.c

void hsetnxCommand(client *c) {

    robj *o;

    if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;

    hashTypeTryConversion(o,c->argv,2,3);

 

    if (hashTypeExists(o, c->argv[2]->ptr)) {

        addReply(c, shared.czero);

    } else {

        hashTypeSet(o,c->argv[2]->ptr,c->argv[3]->ptr,HASH_SET_COPY);

        addReply(c, shared.cone);

        signalModifiedKey(c->db,c->argv[1]);

        notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);

        server.dirty++;

    }

}

4.3.2. hashTypeSet函数

// t_hash.c

int hashTypeSet(robj *o, sds field, sds value, int flags) {

    if (o->encoding == OBJ_ENCODING_ZIPLIST) {

        // ziplistInsert

        // ziplistPush

    } else if (o->encoding == OBJ_ENCODING_HT) {

        。。。。。。

        dictAdd(o->ptr,f,v);

    } else {

        serverPanic("Unknown hash encoding");

    }

}

5. Redis多路复用机制

5.1. AE是什么?

AE”为“A simple event-driven programming library”的缩写,翻译成中文,即一个简单的事件驱动编程库。就Linux而言,可简单理解为对epoll的封装。

5.2. 多路复用选择aeApiPoll

Redis支持selectepollkqueueevport四种模式,编译Redis时决定采用哪一种,因此运行时只会有一种有效。优先顺序是:evport -> epoll -> kqueue -> select,其中select为兜底用,因为所有系统都会支持select

对应的源码文件,分别为:

evport

ae_evport.c

Solaris

epoll

ae_epoll.c

Linux系统

kqueue

ae_kqueue.c

BSD类系统

select

ae_select.c

其它不支持epoll、evport和kqueue系统


桥接它们的文件则是ae.c,桥接代码:

/* Include the best multiplexing layer supported by this system.

 * The following should be ordered by performances, descending. */

#ifdef HAVE_EVPORT

#include "ae_evport.c"

#else

    #ifdef HAVE_EPOLL

    #include "ae_epoll.c"

    #else

        #ifdef HAVE_KQUEUE

        #include "ae_kqueue.c"

        #else

        #include "ae_select.c"

        #endif

    #endif

#endif


HAVE_EVPORTHAVE_EPOLLHAVE_KQUEUE则在config.h中决议出:

#ifdef __sun

#include 

#ifdef _DTRACE_VERSION

#define HAVE_EVPORT 1

#endif

#endif

 

#ifdef __linux__

#define HAVE_EPOLL 1

#endif

 

#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)

#define HAVE_KQUEUE 1

#endif

5.3. 多路复用调用aeCreateFileEvent

main函数开始的调用过程(读事件和写事件的顺序,作者在这里用了个小技巧,支持优先响应是查询,即立即返回查询结果):

int main(int argc, char **argv) { // server.c

    。。。。。。

    // aeMain(server.el);

    void aeMain(aeEventLoop *eventLoop) // ae.c

    {

        while (!eventLoop->stop) {

            if (eventLoop->beforesleep != NULL)

                eventLoop->beforesleep(eventLoop); // 这很重要,写AOF文件在这里进行

            // aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);

            int aeProcessEvents(aeEventLoop *eventLoop, int flags) { // ae.c

                // linux实际调用的是ae_epoll.c中的aeApiPoll函数

                numevents = aeApiPoll(eventLoop, tvp);

                if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)

                    eventLoop->aftersleep(eventLoop);

                    

                for (j = 0; j < numevents; j++) {

                    // 一般优先处理读事件,然后再写事件,但有时为立即响应是查询则写优先

                    /* Normally we execute the readable event first, 

                       and the writable event laster. */

                    int invert = fe->mask & AE_BARRIER;

                    /* Fire the readable event if the call sequence is not inverted. */

                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);

                    /* Fire the writable event. */

                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);

                    /* If we have to invert the call, 

                    fire the readable event now after the writable one. */

                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);

                    // 函数aeCreateFileEvent中设置rfileProcwfileProc

                    // rfileProcwfileProc的初始化均在aeCreateFileEvent中完成

                }

            }

        }

    }

    。。。。。。

}

 

// 初始化rfileProc和wfileProc

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,

                      aeFileProc *proc, void *clientData)

{

    if (fd >= eventLoop->setsize) {

        errno = ERANGE;

        return AE_ERR;

    }

    aeFileEvent *fe = &eventLoop->events[fd]; // events是一个以fd为下标的数组,0查找

 

    if (aeApiAddEvent(eventLoop, fd, mask) == -1) // fd放到epoll或select等中

        return AE_ERR;

    fe->mask |= mask; // 读写事件

    if (mask & AE_READABLE) fe->rfileProc = proc;

    if (mask & AE_WRITABLE) fe->wfileProc = proc;

    fe->clientData = clientData; // 附加数据

    if (fd > eventLoop->maxfd)

        eventLoop->maxfd = fd; // 这个主要给select时用,epoll等不需要

    return AE_OK;

}

6. 网络收发过程

6.1. 过程简要

main

-> initServer

-> aeCreateFileEvent(acceptTcpHandler)

-> createClient

-> aeCreateFileEvent(readQueryFromClient)

-> read

-> processCommand

-> lookupCommand

-> call(c,CMD_CALL_FULL) // call在调用proc后,会调用propagate函数将命令传播给AOF和Slaves

-> c->cmd->proc(c) // 这个proc就是各种redis命令处理过程,比如:setCommand、hgetCommand等

6.2. 接受连接请求acceptTcpHandler

main函数中,会为accept注册一个事件acceptTcpHandler:

// server.c

int main(int argc, char **argv) {

    void initServer(void) {

        // 支持多个IP,即redis.conf中可以bind多个IP

        for (j = 0; j < server.ipfd_count; j++) {

            if (aeCreateFileEvent(server.el, server.ipfd[j], // 以事件方式加入到epoll中

                AE_READABLE, acceptTcpHandler, NULL) == AE_ERR)

            {

                serverPanic("Unrecoverable error creating server.ipfd file event.");

            }

        }

    }

}

 

// networking.c

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {

    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;

    char cip[NET_IP_STR_LEN];

    UNUSED(el);

    UNUSED(mask);

    UNUSED(privdata);

 

    // 一次性连接接受max个连接,以提升效率

    while(max--) {

        // anetTcpAccept处理了中断ENTR,方式是重试到成功或出错

        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);

        if (cfd == ANET_ERR) {

            if (errno != EWOULDBLOCK) // EWOULDBLOCK是正常的

                serverLog(LL_WARNING,

                    "Accepting client connection: %s", server.neterr);

            return;

        }

        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);

        acceptCommonHandler(cfd,0,cip);

    }

}

 

// networking.c

static void acceptCommonHandler(int fd, int flags, char *ip) {

    client *c;

    // createClient为aceeptCommonHandler调用的核心函数

    // 在createClient中会为fd关联读事件过程,并加入到epoll中

    c = createClient(fd);

    

    // 控制连接数不超过redis.conf上配置的最大数

    if (listLength(server.clients) > server.maxclients) {

        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */

        if (write(c->fd,err,strlen(err)) == -1) {

            /* Nothing to do, Just to avoid the warning... */

        }

        

        server.stat_rejected_conn++; // 被拒绝的连接数增一

        freeClient(c);

        return;

    }

    

    // 保护模式处理

    if (server.protected_mode &&

        server.bindaddr_count == 0 &&

        server.requirepass == NULL &&

        !(flags & CLIENT_UNIX_SOCKET) &&

        ip != NULL)

    {

        // 保护模式下,只允许127.0.0.1连接

        if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {

            char *err = "-DENIED Redis is running in protected mode because protected。。。。。。";

            if (write(c->fd,err,strlen(err)) == -1) {

                /* Nothing to do, Just to avoid the warning... */

            }

            

            server.stat_rejected_conn++; // 被拒绝的连接数增一

            freeClient(c);

            return;

        }

    }

    

    server.stat_numconnections++; // 当前连接数增一

    c->flags |= flags;

}

6.3. 创建client对象createClient

// networking.c

client *createClient(int fd) {

    client *c = zmalloc(sizeof(client));

    // 如果redis.conf中开启了tcpkeepalive,配置项名tcp-keepalive,单位:秒

    if (server.tcpkeepalive)

        anetKeepAlive(NULL,fd,server.tcpkeepalive);

    // 注册c到epoll,并关联readQueryFromClient,事件为读事件AE_READABLE

    if (aeCreateFileEvent(server.el,fd,AE_READABLEreadQueryFromClient, c) == AE_ERR)

    {

        close(fd);

        zfree(c);

        return NULL;

    }

    

    selectDb(c,0); // 默认选择第1个DB,调用者需要执行select命令切换

    。。。。。。

}

6.4. 读取命令readQueryFromClient

// networking.c

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {

    client *c = (client*)privdata;

    int nread = read(fd, c->querybuf+qblen, readlen);

 

    processInputBufferAndReplicate(c);

}

 

/* This is a wrapper for processInputBuffer that also cares about handling

 * the replication forwarding to the sub-slaves, in case the client 'c'

 * is flagged as master. Usually you want to call this instead of the

 * raw processInputBuffer(). */

void processInputBufferAndReplicate(client *c) {

    if (!(c->flags & CLIENT_MASTER)) {

        processInputBuffer(c); // 处理命令

    } else {

        size_t prev_offset = c->reploff;

        processInputBuffer(c);

        size_t applied = c->reploff - prev_offset;

        if (applied) {

            // 处理主从复制

            replicationFeedSlavesFromMasterStream(server.slaves,

                    c->pending_querybuf, applied);

            sdsrange(c->pending_querybuf,applied,-1);

        }

    }

}

6.5. 命令处理processCommand

processInputBuffer调用processCommand,大量逻辑在processCommand中:

// networking.c

/* If this function gets called we already read a whole

 * command, arguments are in the client argv/argc fields.

 * processCommand() execute the command or prepare the

 * server for a bulk read from the client.

 *

 * If C_OK is returned the client is still alive and valid and

 * other operations can be performed by the caller. Otherwise

 * if C_ERR is returned the client was destroyed (i.e. after QUIT). */

void processInputBuffer(client *c) {

    server.current_client = c;

    

    int processCommand(client *c) {

        // 单独处理QUIT命令

        /* The QUIT command is handled separately. Normal command procs will

         * go through checking for replication and QUIT will cause trouble

         * when FORCE_REPLICATION is enabled and would be implemented in

         * a regular command proc. */

        if (!strcasecmp(c->argv[0]->ptr,"quit")) {

            addReply(c,shared.ok);

            c->flags |= CLIENT_CLOSE_AFTER_REPLY;

            return C_ERR;

        }

        

        // 查找命令的处理

        /* Now lookup the command and check ASAP about trivial error conditions

         * such as wrong arity, bad command name and so forth. */

        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);

        if (!c->cmd) {

            addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",。。。);

            sdsfree(args);

            return C_OK;

        }

        else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||

                 (c->argc < -c->cmd->arity)) {

            // 参数个数不对

            addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name);

            return C_OK;

        }

        

        /* Check if the user is authenticated */

        if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) {

            // 检查是否有设置密码,和密码是否已经验证通过

            addReply(c,shared.noautherr);

            return C_OK;

        }

        

        // 检查是否需要重定向

        /* If cluster is enabled perform the cluster redirection here.

         * However we don't perform the redirection if:

         * 1) The sender of this command is our master.

         * 2) The command has no key arguments. */

        if (server.cluster_enabled &&

          !(c->flags & CLIENT_MASTER) &&

          !(c->flags & CLIENT_LUA &&

           server.lua_caller->flags & CLIENT_MASTER) &&

          !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&

           c->cmd->proc != execCommand)) {

            if (n == NULL || n != server.cluster->myself) {

                if (c->cmd->proc == execCommand) {

                    discardTransaction(c);

                }

                else {

                    flagTransaction(c);

                }

                clusterRedirectClient(c,n,hashslot,error_code);

                return C_OK;                

            }

        }

        

        // 检查最大内存,如果超过配置的maxmemory,

        // 则返回错误“-OOM command not allowed when used memory > 'maxmemory'.”

        /* Handle the maxmemory directive.

         *

         * First we try to free some memory if possible (if there are volatile

         * keys in the dataset). If there are not the only thing we can do

         * is returning an error.

         *

         * Note that we do not want to reclaim memory if we are here re-entering

         * the event loop since there is a busy Lua script running in timeout

         * condition, to avoid mixing the propagation of scripts with the propagation

         * of DELs due to eviction. */

        if (server.maxmemory && !server.lua_timedout) {

            /* It was impossible to free enough memory, and the command the client

             * is trying to execute is denied during OOM conditions? Error. */

            if ((c->cmd->flags & CMD_DENYOOM) && out_of_memory) {

                addReply(c, shared.oomerr);

                return C_OK;

            }

        }

        

        // 磁盘有问题不接受写请求

        /* Don't accept write commands if there are problems persisting on disk

         * and if this is a master instance. */

        int deny_write_type = writeCommandsDeniedByDiskError();

        if (deny_write_type != DISK_ERROR_TYPE_NONE && server.masterhost == NULL &&

            (c->cmd->flags & CMD_WRITE || c->cmd->proc == pingCommand)) {

            if (deny_write_type == DISK_ERROR_TYPE_RDB)

                addReply(c, shared.bgsaveerr);

            else

                addReplySds(c, sdscatprintf(sdsempty(),

                    "-MISCONF Errors writing to the AOF file: %s\r\n",

                    strerror(server.aof_last_write_errno)));

            return C_OK;

        }

        

        // 不接受写请求,如果没有足够多好的slave

        /* Don't accept write commands if there are not enough good slaves and

         * user configured the min-slaves-to-write option. */

        if (server.masterhost == NULL &&

            server.repl_min_slaves_to_write &&

            server.repl_min_slaves_max_lag &&

            c->cmd->flags & CMD_WRITE &&

            server.repl_good_slaves_count < server.repl_min_slaves_to_write) {

            addReply(c, shared.noreplicaserr);

            return C_OK;

        }

        

        // 只读的slave不接受写请求

        /* Don't accept write commands if this is a read only slave. But

         * accept write commands if this is our master. */

        if (server.masterhost && server.repl_slave_ro &&

            !(c->flags & CLIENT_MASTER) &&

            c->cmd->flags & CMD_WRITE)

        {

            addReply(c, shared.roslaveerr);

            return C_OK;

        }

        

        // 只接受SUB、UNSUB、PSUB、PUNSUB和PING命令

        /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */

        if (c->flags & CLIENT_PUBSUB &&

            c->cmd->proc != pingCommand &&

            c->cmd->proc != subscribeCommand &&

            c->cmd->proc != unsubscribeCommand &&

            c->cmd->proc != psubscribeCommand &&

            c->cmd->proc != punsubscribeCommand) {

            addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");

            return C_OK;

        }

 

        /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,

         * when slave-serve-stale-data is no and we are a slave with a broken

         * link with master. */

        if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&

            server.repl_serve_stale_data == 0 &&

            !(c->cmd->flags & CMD_STALE))

        {

            flagTransaction(c);

            addReply(c, shared.masterdownerr);

            return C_OK;

        }

        

        // 正在加载DB,直接返回错误

        /* Loading DB? Return an error if the command has not the

         * CMD_LOADING flag. */

        if (server.loading && !(c->cmd->flags & CMD_LOADING)) {

            addReply(c, shared.loadingerr);

            return C_OK;

        }

        

        /* Lua script too slow? Only allow a limited number of commands. */

        if (server.lua_timedout &&

              c->cmd->proc != authCommand &&

              c->cmd->proc != replconfCommand &&

            !(c->cmd->proc == shutdownCommand &&

              c->argc == 2 &&

              tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&

            !(c->cmd->proc == scriptCommand &&

              c->argc == 2 &&

              tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))

        {

            flagTransaction(c);

            addReply(c, shared.slowscripterr);

            return C_OK;

        }

 

        /* Exec the command */

        if (c->flags & CLIENT_MULTI &&

            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&

            c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)

        {

            /* Add a new command into the MULTI commands queue */

            queueMultiCommand(c); // 链上新的事务子命令(队列结构),等到EXEC时一块执行

            addReply(c,shared.queued); // 向client返回QUEUED

        } else {

            call(c,CMD_CALL_FULL); // 调用命令处理过程

            c->woff = server.master_repl_offset; // 更新复制偏移

            if (listLength(server.ready_keys))

                handleClientsBlockedOnKeys();

        }

        return C_OK;

    }

}

7. 持久化和复制

call在调用proc后,会调用propagate函数将命令传播给AOFSlaves

int processCommand(client *c) {

    。。。。。。

    void call(client *c, int flags) {

        。。。。。。

        propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);

    }

}

7.1. propagate函数(aof&replication

/* Propagate the specified command (in the context of the specified database id)

 * to AOF and Slaves.

 *

 * flags are an xor between:

 * + PROPAGATE_NONE (no propagation of command at all)

 * + PROPAGATE_AOF (propagate into the AOF file if is enabled)

 * + PROPAGATE_REPL (propagate into the replication link)

 *

 * This should not be used inside commands implementation. Use instead

 * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().

 */

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags)

{

    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)

        feedAppendOnlyFile(cmd,dbid,argv,argc); // 写AOF

    if (flags & PROPAGATE_REPL)

        replicationFeedSlaves(server.slaves,dbid,argv,argc); // 推送给Slaves

}

7.2. 写AOF文件feedAppendOnlyFile

feedAppendOnlyFile并不实际写文件,而只是将内容组织到AOF buffer中。写AOF文件是在每次epoll之前进行,redis-server启动时注册了函数beforeSleep,它在每次epoll之前被调用。beforeSleep调用flushAppendOnlyFileAOF buffer写入到AOF文件。

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {

    。。。。。。

    /* Append to the AOF buffer. This will be flushed on disk just before

     * of re-entering the event loop, so before the client will get a

     * positive reply about the operation performed. */

    if (server.aof_state == AOF_ON// aof_state值由redis.conf中的appendonly控制

        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

}


redis-server启动:

int main(int argc, char **argv) {

    aeSetBeforeSleepProc(server.el, beforeSleep); // 注册回调beforeSleep

    aeSetAfterSleepProc(server.el, afterSleep);

    aeMain(server.el);

}


调用beforeSleep

void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        if (eventLoop->beforesleep != NULL)

            eventLoop->beforesleep(eventLoop); // 将AOF buffer写入到AOF文件

        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); // epoll

    }

}


调用flushAppendOnlyFile函数将AOF buffer写入到AOF文件:

/* This function gets called every time Redis is entering the

 * main loop of the event driven library, that is, before to sleep

 * for ready file descriptors. */

void beforeSleep(struct aeEventLoop *eventLoop) {

    。。。。。。

    /* Unblock all the clients blocked for synchronous replication

     * in WAIT. */

    if (listLength(server.clients_waiting_acks))

        processClientsWaitingReplicas();

    。。。。。。        

    /* Try to process pending commands for clients that were just unblocked. */

    if (listLength(server.unblocked_clients))

        processUnblockedClients();

        

    /* Write the AOF buffer on disk */

    flushAppendOnlyFile(0);

    。。。。。。

    /* Handle writes with pending output buffers. */

    handleClientsWithPendingWrites();

    。。。。。。

}


AOF文件:

/* Write the append only file buffer on disk.

 *

 * Since we are required to write the AOF before replying to the client,

 * and the only way the client socket can get a write is entering when the

 * the event loop, we accumulate all the AOF writes in a memory

 * buffer and write it on disk using this function just before entering

 * the event loop again.

 *

 * About the 'force' argument:

 *

 * When the fsync policy is set to 'everysec' we may delay the flush if there

 * is still an fsync() going on in the background thread, since for instance

 * on Linux write(2) will be blocked by the background fsync anyway.

 * When this happens we remember that there is some aof buffer to be

 * flushed ASAP, and will try to do that in the serverCron() function.

 *

 * However if force is set to 1 we'll write regardless of the background

 * fsync. */

#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */

void flushAppendOnlyFile(int force) {

    // aofWrite调用write将AOF buffer写入到AOF文件,处理了ENTR,其它没什么

    ssize_t nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));

    。。。。。。

    /* Handle the AOF write error. */

    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {

        /* We can't recover when the fsync policy is ALWAYS since the

         * reply for the client is already in the output buffers, and we

         * have the contract with the user that on acknowledged write data

         * is synced on disk. */

        serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");

        exit(1);

    } else {

        return; /* We'll try again on the next call... */

    } else {

        /* Successful write(2). If AOF was in error state, restore the

         * OK state and log the event. */

    }

    。。。。。。

    /* Perform the fsync if needed. */

    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {

        // redis_fsync是一个宏,Linux实际为fdatasync,其它为fsync

        // 所以最好不要将redis.conf中的appendfsync设置为always,这极影响性能         

          // 注意,Redis的实现并没有处理fsync和close的返回值,所以还不是最严格的


        redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */

    }

    else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) {

        // 如果已在sync状态,则不再重复

        // BIO线程会间隔设置sync_in_progress

        // if (server.aof_fsync == AOF_FSYNC_EVERYSEC)

        //     sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;

        if (!sync_in_progress)

            // eversec性能并不那么糟糕,因为它:

            // 后台方式执行fsync

            // Redis并不是严格意义上的单线程,实际上它创建一组BIO线程,专门处理阻塞和慢操作

            // 这些操作就包括FSYNC,另外还有关闭文件和内存的free两个操作。

            // 不像always,EVERYSEC模式并不立即调用fsync,

            // 而是将这个操作丢给了BIO线程异步执行,

            // BIO线程在进程启动时被创建,两者间通过bio_jobs和bio_pending两个

            // 全局对象交互,其中主线程负责写,BIO线程负责消费。

            aof_background_fsync(server.aof_fd);

        server.aof_last_fsync = server.unixtime;

    }

}

7.3. 数据复制replicationFeedSlaves

这里也不会真正将数据发给Slaves,而只是将数据放入到replication buffer中。

/* Propagate write commands to slaves, and populate the replication backlog

 * as well. This function is used if the instance is a master: we use

 * the commands received by our clients in order to create the replication

 * stream. Instead if the instance is a slave and has sub-slaves attached,

 * we use replicationFeedSlavesFromMaster() */

void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {

    。。。。。。

    /* Write the command to every slave. */

    listRewind(slaves,&li);

    while((ln = listNext(&li))) { // 遍历所有的slaves

        client *slave = ln->value; // Slave也被当作client

 

        /* Don't feed slaves that are still waiting for BGSAVE to start */

        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

 

        /* Feed slaves that are waiting for the initial SYNC (so these commands

         * are queued in the output buffer until the initial SYNC completes),

         * or are already in sync with the master. */

 

        /* Add the multi bulk length. */

        addReplyMultiBulkLen(slave,argc);

 

        /* Finally any additional argument that was not stored inside the

         * static buffer if any (from j to argc). */

        for (j = 0; j < argc; j++)

            addReplyBulk(slave,argv[j]);

    }

}

8. 进程启动时做了些什么?

进程启动时事项:

1) 为所有配置设置默认值

2) 加载配置文件redis.conf,覆盖默认值

3) 安装信号处理器

4) 初始化全局状态

5) 创建epoll,启动TCP监控

6) 创建BIO线程

7) 加载module

8) 加载AOFRDB

9) 设置beforeSleep和afterSleep两个回调

10) 进入epoll循环

int main(int argc, char **argv) {

    // 初始化所有配置,为它们设置默认值

    initServerConfig();

    

    // 如果是sentinel模式,额外做相关初始化

    // sentinel_mode的值由命令行参数中是否包含参数“--sentinel”决定

    if (server.sentinel_mode) {

        initSentinelConfig();

        initSentinel();

    }

 

    // redis-server和redis-check-rdb、redis-check-aof共享了同一个main函数

    // 进入redis_check_rdb_main或redis_check_aof_main后不会再返回,

    // 也就是这两个函数后的代码均不会被执行。

    if (strstr(argv[0],"redis-check-rdb") != NULL)

        redis_check_rdb_main(argc,argv,NULL);

    else if (strstr(argv[0],"redis-check-aof") != NULL)

        redis_check_aof_main(argc,argv);

 

    // 加载redis.conf,

    // 包括初始化module列表loadmodule_queue,

    // 注意会将redis-server的命令行参数一并传递给module

    loadServerConfig(configfile,options);

    

    // 置为deamon方式,

    // 可通过redis.conf中的daemonize控制是否以deamon方式运行

    if (background) daemonize();

    

    // 对redis-server核心的初始化,包括但不限于:

    // 1) 忽略HUP和PIPE信号

    // 2) 安装TERM和INT两个信号的处理器

    // 3) 初始化全局的redisServer对象server

    // 4) 注册定时器回调serverCron

    // 5) 注册accept的回调acceptTcpHandler

    // 6) 注册UNIX套接字回调acceptUnixHandler

    // 7) 注册模块回调moduleBlockedClientPipeReadable

    // 8) 创建或追加模式打开AOF文件

    // 9) 集群server.cluster初始化

    // 10) 复制脚本初始化

    // 11) 初始化lua运行环境

    // 12) 慢日志初始化

    // 13) 初始化延迟监控(latency monitor)

    // 14) 创建BIO线程,由bio.h中的BIO_NUM_OPS决定个数,当前为3个

    initServer();

    

    // 如果是daemon方式,则创建pid文件

    if (background || server.pidfile) createPidFile();

    

    // 修改进程的title,加入状态和端口号,这样对ps等更为友好

    redisSetProcTitle(argv[0]);

    

    // 进程启动时显示LOGO,

    // 可通过redis.conf中的always-show-logo控制是否显示

    redisAsciiArt();

    

    // 检查TCP的“/proc/sys/net/core/somaxconn”

    checkTcpBacklogSettings

    

    if (server.sentinel_mode) {

        sentinelIsRunning();

    } else {

        // 如果是Linux,

        // 检查“/proc/sys/vm/overcommit_memory”

        // 和“/sys/kernel/mm/transparent_hugepage/enabled”的值。

        // “/proc/sys/vm/overcommit_memory”值需要为1,

        // “/sys/kernel/mm/transparent_hugepage/enabled”需要为“never”

        linuxMemoryWarnings();

        

        // 加载所有的模块,如果有一个加载失败则启动失败

        // 具体有哪些模块,由redis.conf中的loadmodule决定,如:

        // loadmodule /path/to/my_module.so

        // loadmodule /path/to/other_module.so

        // 每个module一行

        moduleLoadFromQueue();

        

        // 从磁盘加载RDB或AOF到内存

        // 如果开启了AOF,

        // 则调用loadAppendOnlyFile加载AOF文件,这个时候不会加载RDB文件

        // 没开启AOF时,调用rdbLoad加载RDB文件

        // 不管是AOF还是RDB,加载不成功时,进程均不能启动。

        loadDataFromDisk();

        

        if (server.cluster_enabled) {

            if (verifyClusterConfigWithData() == C_ERR) {

                serverLog(LL_WARNING,

                    "You can't have keys in a DB different than DB 0 when in "

                    "Cluster mode. Exiting.");

                exit(1);

        }

        

        if (server.ipfd_count > 0)

            serverLog(LL_NOTICE,"Ready to accept connections");

        if (server.sofd > 0)

            serverLog(LL_NOTICE,

                "The server is now ready to accept connections at %s",

                server.unixsocket);

    }

    

    // redis.conf中的maxmemory值过小,进行警告提示

    if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {

        serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);

    }

    

    // 注册回调beforeSleep,

    // 对于eversec模式的appendfsync将在这个回调中进行

    aeSetBeforeSleepProc(server.el,beforeSleep);

    aeSetAfterSleepProc(server.el,afterSleep);

    

    // 进入epoll主循环:

    // BSD为kqueue死循环,

    // Solaris为evport死循环,

    // 其它为select死循环

    //

    // 网络数据的收和发,以及所有COMMAND的操作均在这个循环中完成

    aeMain(server.el);

 

    // 关闭epoll,释放相关资源

    aeDeleteEventLoop(server.el);

    return 0;

}

9. 核心对象

Redis虽然采用C语言实现,但内含明显的对象,核心的对象包括:

9.1. redisServer

这是一个超大的对象,实为redis-serverContext对象,承载了redis-server的各种全局状态。下面只列出它的一小部分:

// server.h

struct redisServer {

    /* General */

    pid_t pid; /* Main process pid. */

    char *configfile; /* Absolute config file path, or NULL */

    

    /* Networking */

    int port; /* TCP listening port */

    

    /* AOF persistence */

    int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */

    int aof_fsync; /* Kind of fsync() policy */

    

    /* RDB persistence */

    long long dirty; /* Changes to DB from the last save */

    

    /* Logging */

    char *logfile; /* Path of log file */

    

    /* Replication (master) */

    char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */

    

    /* Replication (slave) */

    char *masterhost; /* Hostname of master */

    

    /* Cluster */

    int cluster_enabled; /* Is cluster enabled? */

    

    /* Scripting */

    lua_State *lua; /* The Lua interpreter. We use just one for all clients */

    

    /* Latency monitor */

    dict *latency_events;

};

9.2. client

一个client对象,可以是一个Client,也可以是一个slave,也可以是一个master

// server.h

/* With multiplexing we need to take per-client state.

 * Clients are taken in a linked list. */

typedef struct client {

    uint64_t id; /* Client incremental unique ID. */

    int fd; /* Client socket. */

    redisDb *db; /* Pointer to currently SELECTed DB. */

    robj *name; /* As set by CLIENT SETNAME. */

    sds querybuf; /* Buffer we use to accumulate client queries. */

    size_t qb_pos; /* The position we have read in querybuf. */

    int argc; /* Num of arguments of current command. */

    robj **argv; /* Arguments of current command. */

    struct redisCommand *cmd, *lastcmd; /* Last command executed. */

    list *reply; /* List of reply objects to send to the client. */

    int authenticated; /* When requirepass is non-NULL. */

    int replstate; /* Replication state if this is a slave. */

    char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */

    

    /* Response buffer */

    int bufpos;

    char buf[PROTO_REPLY_CHUNK_BYTES];

};

9.3. redisCommand

Redis命令结构:

struct redisCommand {

    char *name; // Redis命令名,如:GET等

    redisCommandProc *proc; // 处理该命令的过程

    int arity;

    char *sflags; /* Flags as string representation, one char per flag. */

    int flags;    /* The actual flags, obtained from the 'sflags' field. */

    

    /* Use a function to determine keys arguments in a command line.

     * Used for Redis Cluster redirect. */

    redisGetKeysProc *getkeys_proc;

 

    /* What keys should be loaded in background when calling this command? */

    int firstkey; /* The first argument that's a key (0 = no keys) */

    int lastkey;  /* The last argument that's a key */

    int keystep;  /* The step between first and last key */

    long long microseconds, calls;

};

9.4. redisDb

RedisDB结构:

/* Redis database representation. There are multiple databases identified

 * by integers from 0 (the default database) up to the max configured

 * database. The database number is the 'id' field in the structure. */

typedef struct redisDb {

    dict *dict;             /* The keyspace for this DB */

    dict *expires;          /* Timeout of keys with a timeout set */

    dict *blocking_keys;    /* Keys with clients waiting for data (BLPOP)*/

    dict *ready_keys;       /* Blocked keys that received a PUSH */

    dict *watched_keys;     /* WATCHED keys for MULTI/EXEC CAS */

    int id;                 /* Database ID */

    long long avg_ttl;      /* Average TTL, just for stats */

    list *defrag_later;     /* List of key names to attempt to defrag one by one, gradually. */

} redisDb;

9.5. clusterNode

Redis集群节点结构:

typedef long long mstime_t/* millisecond time type. */

typedef struct clusterNode {

    mstime_t ctime; /* Node object creation time. */

    char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */

    int flags;      /* CLUSTER_NODE_... */

    uint64_t configEpoch; /* Last configEpoch observed for this node */

    unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */

    int numslots;   /* Number of slots handled by this node */

    int numslaves;  /* Number of slave nodes, if this is a master */

    // 所有的slave节点

    struct clusterNode **slaves; /* pointers to slave nodes */

    // master节点

    struct clusterNode *slaveof; /* pointer to the master node. Note that it

                                    may be NULL even if the node is a slave

                                    if we don't have the master node in our

                                    tables. */

    // 最近一次发送ping的时间

    mstime_t ping_sent;      /* Unix time we sent latest ping */

    // 最近一次接收pong的时间

    mstime_t pong_received;  /* Unix time we received the pong */

    mstime_t fail_time;      /* Unix time when FAIL flag was set */

    mstime_t voted_time;     /* Last time we voted for a slave of this master */

    mstime_t repl_offset_time;  /* Unix time we received offset for this node */

    mstime_t orphaned_time;     /* Starting time of orphaned master condition */

    long long repl_offset;      /* Last known repl offset for this node. */

    char ip[NET_IP_STR_LEN];  /* Latest known IP address of this node */

    int port;                   /* Latest known clients port of this node */

    int cport;                  /* Latest known cluster port of this node. */

    clusterLink *link;          /* TCP/IP link with this node */

    list *fail_reports;         /* List of nodes signaling this as failing */

} clusterNode;

9.6. clusterState

Redis集群状态结构:

typedef struct clusterState {

    clusterNode *myself;  /* This node */

    uint64_t currentEpoch;

    int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */

    int size; /* Num of master nodes with at least one slot */

    

    dict *nodes; /* Hash table of name -> clusterNode structures */

    dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */

    

    clusterNode *migrating_slots_to[CLUSTER_SLOTS];

    clusterNode *importing_slots_from[CLUSTER_SLOTS];

    clusterNode *slots[CLUSTER_SLOTS];

    

    mstime_t failover_auth_time; /* Time of previous or next election. */

    int failover_auth_count;    /* Number of votes received so far. */

    int failover_auth_sent;     /* True if we already asked for votes. */

    int failover_auth_rank;     /* This slave rank for current auth request. */

    uint64_t failover_auth_epoch; /* Epoch of the current election. */

    

    /* Manual failover state of master. */

    clusterNode *mf_slave; /* Slave performing the manual failover. */

    

    /* The followign fields are used by masters to take state on elections. */

    uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */

} clusterState;

10. 主备同步

待续。

11. 主备切换

待续。

阅读(3722) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~