Chinaunix首页 | 论坛 | 博客
  • 博客访问: 4223881
  • 博文数量: 528
  • 博客积分: 13065
  • 博客等级: 上将
  • 技术积分: 9440
  • 用 户 组: 普通用户
  • 注册时间: 2008-03-26 16:44
个人简介

https://blog.csdn.net/Aquester https://www.cnblogs.com/aquester https://github.com/eyjian http://aquester.blog.chinaunix.net

文章分类

全部博文(528)

分类: 大数据

2018-09-19 09:23:07

目录

目录 1

1. 前言 2

2. 名词 2

3. dict.c 2

3.1. siphash算法 2

3.2. 核心函数 3

3.3. 核心宏 3

3.4. 核心结构体 3

3.4.1. dictEntry 3

4. Redis命令 4

4.1. SELECT命令 4

4.1.1. redisCommand结构体 4

4.1.2. redisCommandTable变量 5

4.1.3. selectCommand函数 5

4.2. SET命令 6

4.2.1. setCommand函数 6

4.2.2. dictAdd函数 9

4.2.3. dictFind函数 9

4.3. HSETNX命令 10

4.3.1. hsetnxCommand函数 10

4.3.2. hashTypeSet函数 11

5. Redis多路复用机制 11

5.1. AE是什么? 11

5.2. 多路复用选择aeApiPoll 11

5.3. 多路复用调用aeCreateFileEvent 12

6. 网络收发过程 14

6.1. 过程简要 14

6.2. 接受连接请求acceptTcpHandler 14

6.3. 创建client对象createClient 16

6.4. 读取命令readQueryFromClient 17

6.5. 命令处理processCommand 17

7. 持久化和复制 22

7.1. propagate函数(aof&replication 22

7.2. 写AOF文件feedAppendOnlyFile 23

7.3. 数据复制replicationFeedSlaves 25

8. 进程启动时做了些什么? 26

9. 核心对象 29

9.1. redisServer 29

9.2. client 30

9.3. redisCommand 31

9.4. redisDb 31

9.5. clusterNode 32

9.6. clusterState 32

10. 主备同步 33

11. 主备切换 33


1. 前言

Redis代码优美,注释也很到位,阅读起来会赏心悦目,大大降低了理解门槛。由于redis单线程几乎完成所有工作,整体逻辑是相当复杂的,涉及了太多状态,作者的技术深厚可见一斑。

Redis的单线程设计给出了一种优雅实现高性能服务思路,在实践中值得借鉴。需要注意Redis并不是严格的单线程,实际上它是多进程+多线程。为解决IO和慢速操作带的性能毛刺和卡顿,Redis实现引入的多进程和多线程。但是它的主体仍然是单线程的,单个线程完成网络IO操作和其它操作。

疑问:Redis选择slot数为16384,是一个偶数,没有采用质数,这个可能并不是一个最好的主意。

2. 名词

RESP

REdis Serialization Protocol

Redis序列化协议

AE

A simple Event drived programming library

一个简单的事务驱动编程库

ASAP

 

 

AOF

Append Only File

仅仅追加写文件

BIO

Backgroup IO

后台IO操作,三种涉及BIO:FSYNC、关闭文件和内存free,均为阻塞或慢操作

3. dict.c

哈希表的实现,哈希函数使用了siphash哈希算法。

3.1. siphash算法

一种非加密的64位哈希算法

3.2. 核心函数

Redis中非常核心的算法——哈希表的实现在这个文件中,很多命令都有用到,比如set/hset等,它是除内存分配管理外的最基础实现。核心函数包含但不限于:dictFinddictNextdictAdddictDeletedictFetchValuedictGetHashdictReplacedictAddRawdictCreatedictDeletedictReleasedictUnlinkdictRehashdictEmptydictResize等。

3.3. 核心宏

CLUSTER_SLOTS

定义Redis集群Slots个数,值为16384

DICT_HT_INITIAL_SIZE

定义哈希表默认大小宏,值为4

dictSetVal

设置Value

dictSetKey

设置Key

dictCompareKeys

比较两个Key

dictHashKey

Key求哈希

dictGetKey

Key

dictGetVal

Value

dictSlots

得到槽(slot)个数

dictSize

得到已用槽(slot)个数

dictGetSignedIntegerVal

8字节有符号整数值

dictGetUnsignedIntegerVal

8字节无符号整数值

dictGetDoubleVal

double类型值

3.4. 核心结构体

3.4.1. dictEntry

定义了哈希节点的数据结构:

typedef struct dictEntry {

    void *key// Key

    union { // 支持4种类型的值

        void *val; // 字符串值,或二进制值

        uint64_t u64; // 8字节无符号整数值

        int64_t s64; // 8字节有符号整数值

        double d; // 双精度浮点值

    } v;

    struct dictEntry *next; // 下一节点,链地址冲突解决法

} dictEntry;

4. Redis命令

各具体的命令并不做持久化(写AOF等)和传播给Slaves,这两项是公共操作,在上层统一执行,具体函数为server.c中的“void call(client *c, int flags)”,具体执行函数为server.c中的“void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags)”。

4.1. SELECT命令

群里有人问select跟普通命令的效率,猜想select没有效率问题,阅读源代码也证实了这一点。因为它只是改变指针指向,所以不存在效率问题,自然比普通命令效率要高。

Redis支持的所有命令,都存储在类型为redisCommand函数表中,函数表名为redisCommandTable的数组中。

4.1.1. redisCommand结构体

如果以C++来理解redisCommand,可以将redisCommand看抽象基类,它定义了两个虚函数procgetkeys_proc,各种command是它的具体实现。

// server.h

struct redisCommand {

    char *name// 命令名,比如:GET

    redisCommandProc *proc; // 命令处理过程(函数指针)

    // 命令参数的个数,用于检查命令请求的格式是否正确

    // 如果这个值为负数-N,那么表示参数的数量大于等于N

    // 注意:命令的名字本身也是一个参数。

    int arity;

    // 字符串形式的标识值, 这个值记录了命令的属性

    // 比如是读命令还是写命令,是否允许在载入数据时使用,是否允许在 Lua 脚本中使用等

    char *sflags; /* Flags as string representation, one char per flag. */

    // sflags对应的二进制值,方便程序“与”、“或”等操作操作

    int flags;    /* The actual flags, obtained from the 'sflags' field. */

 

    /* Use a function to determine keys arguments in a command line.

     * Used for Redis Cluster redirect. */

    redisGetKeysProc *getkeys_proc// 函数指针

    /* What keys should be loaded in background when calling this command? */

    int firstkey; /* The first argument that's a key (0 = no keys) */

    int lastkey;  /* The last argument that's a key */

    int keystep;  /* The step between first and last key */

 

    // 服务器执行这个命令所耗费的总时长

    long long microseconds;;

    // 服务器总共执行了多少次这个命令

    long long calls;

};

4.1.2. redisCommandTable变量

redisCommandTable的部分定义:

// server.c

struct redisCommand redisCommandTable[] = {

    {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},

    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},

    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},

    。。。。。。

    {"select",selectCommand,2,"lF",0,NULL,0,0,0,0,0},

    。。。。。。

    {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},

    {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},

    {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}

};

4.1.3. selectCommand函数

// server.c

void selectCommand(client *c) {

    long id;

 

    // 第一个参数值为DB的ID

    if (getLongFromObjectOrReply(c, c->argv[1], &id,

        "invalid DB index") != C_OK)

        return;

 

    // 集群不支持SELECT命令

    if (server.cluster_enabled && id != 0) {

        addReplyError(c,"SELECT is not allowed in cluster mode");

        return;

    }

    if (selectDb(c,id) == C_ERR) { // 切换DB

        addReplyError(c,"DB index is out of range");

    } else {

        addReply(c,shared.ok);

    }

}

 

// db.c

int selectDb(client *c, int id) {

    if (id < 0 || id >= server.dbnum// dbnum值由redis.conf中的databases决定,默认为16

        return C_ERR;

    c->db = &server.db[id];

    return C_OK;

}

4.2. SET命令

SET命令主要是字典操作(dict)。

4.2.1. setCommand函数

// t_string.c

/* SET key value [NX] [XX] [EX ] [PX ] */

void setCommand(client *c) {

    int j;

    robj *expire = NULL;

    int unit = UNIT_SECONDS;

    int flags = OBJ_SET_NO_FLAGS;

 

    // 以下一长段均为参数选项解析,

    // 第一3个开始,因为第一个为命令SET自身,第二个为KEY,第三个为VALUE

    for (j = 3; j < c->argc; j++) {

        char *a = c->argv[j]->ptr;

        robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];

 

        if ((a[0] == 'n' || a[0] == 'N') &&

            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&

            !(flags & OBJ_SET_XX))

        {

            flags |= OBJ_SET_NX;

        } else if ((a[0] == 'x' || a[0] == 'X') &&

                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&

                   !(flags & OBJ_SET_NX))

        {

            flags |= OBJ_SET_XX;

        } else if ((a[0] == 'e' || a[0] == 'E') &&

                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&

                   !(flags & OBJ_SET_PX) && next)

        {

            flags |= OBJ_SET_EX;

            unit = UNIT_SECONDS;

            expire = next;

            j++;

        } else if ((a[0] == 'p' || a[0] == 'P') &&

                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&

                   !(flags & OBJ_SET_EX) && next)

        {

            flags |= OBJ_SET_PX;

            unit = UNIT_MILLISECONDS;

            expire = next;

            j++;

        } else {

            addReply(c,shared.syntaxerr); // 语法错误

            return;

        }

    }

 

    c->argv[2] = tryObjectEncoding(c->argv[2]);

    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);

}

 

// t_string.c

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {

    。。。。。。

    setKey(c->db,key,val);

    server.dirty++;

    if (expire) setExpire(c,c->db,key,mstime()+milliseconds);

    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);

    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);

    addReply(c, ok_reply ? ok_reply : shared.ok);

}

 

// db.c

void setKey(redisDb *db, robj *key, robj *val) {

    if (lookupKeyWrite(db,key) == NULL) { // 性能开销主要在这,参见dictFind函数

        dbAdd(db,key,val);

    } else {

        dbOverwrite(db,key,val);

    }

    incrRefCount(val);

    removeExpire(db,key);

    signalModifiedKey(db,key);

}

 

// db.c

/* Add the key to the DB. It's up to the caller to increment the reference

 * counter of the value if needed.

 *

 * The program is aborted if the key already exists. */

void dbAdd(redisDb *db, robj *key, robj *val) {

    sds copy = sdsdup(key->ptr);

    int retval = dictAdd(db->dict, copy, val);

 

    serverAssertWithInfo(NULL,key,retval == DICT_OK);

    if (val->type == OBJ_LIST ||

        val->type == OBJ_ZSET)

        signalKeyAsReady(db, key);

    if (server.cluster_enabled) slotToKeyAdd(key);

}

 

// dict.h

#define dictSetVal(d, entry, _val_) do { \

    if ((d)->type->valDup) \

        (entry)->v.val = (d)->type->valDup((d)->privdata, _val_); \

    else \

        (entry)->v.val = (_val_); \

} while(0)

 

// db.c

// KEY存在时覆盖写

/* Overwrite an existing key with a new value. Incrementing the reference

 * count of the new value is up to the caller.

 * This function does not modify the expire time of the existing key.

 *

 * The program is aborted if the key was not already present. */

void dbOverwrite(redisDb *db, robj *key, robj *val) {

    dictEntry *de = dictFind(db->dict,key->ptr);

 

    serverAssertWithInfo(NULL,key,de != NULL);

    dictEntry auxentry = *de;

    robj *old = dictGetVal(de);

 

    // 内存策略由redis.conf中的maxmemory-policy决定

    // LFU策略是4.0开始引入的(Least Frequently Used,最不经常使用

    // #define MAXMEMORY_FLAG_LRU (1<<0)

    // #define MAXMEMORY_FLAG_LFU (1<<1)

    // #define MAXMEMORY_FLAG_ALLKEYS (1<<2)

    // #define MAXMEMORY_FLAG_NO_SHARED_INTEGERS (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU)

    // #define MAXMEMORY_VOLATILE_LRU ((0<<8)|MAXMEMORY_FLAG_LRU)

    // #define MAXMEMORY_VOLATILE_LFU ((1<<8)|MAXMEMORY_FLAG_LFU)

    // #define MAXMEMORY_VOLATILE_TTL (2<<8)

    // #define MAXMEMORY_VOLATILE_RANDOM (3<<8)

    // #define MAXMEMORY_ALLKEYS_LRU ((4<<8)|MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_ALLKEYS)

    // #define MAXMEMORY_ALLKEYS_LFU ((5<<8)|MAXMEMORY_FLAG_LFU|MAXMEMORY_FLAG_ALLKEYS)

    // #define MAXMEMORY_ALLKEYS_RANDOM ((6<<8)|MAXMEMORY_FLAG_ALLKEYS)

    // #define MAXMEMORY_NO_EVICTION (7<<8)

    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {

        val->lru = old->lru;

    }

    dictSetVal(db->dict, de, val);

 

    if (server.lazyfree_lazy_server_del) {

        freeObjAsync(old);

        dictSetVal(db->dict, &auxentry, NULL);

    }

 

    dictFreeVal(db->dict, &auxentry);

}

4.2.2. dictAdd函数

// dict.c

// KEY不存在时新增

/* Add an element to the target hash table */

int dictAdd(dict *d, void *key, void *val)

{

    dictEntry *entry = dictAddRaw(d,key,NULL);

 

    if (!entry) return DICT_ERR;

    dictSetVal(d, entry, val);

    return DICT_OK;

}

4.2.3. dictFind函数

字典查找实现:

dictEntry *dictFind(dict *d, const void *key)

{

    dictEntry *he;

    uint64_t h, idx, table;

 

    if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */

    if (dictIsRehashing(d)) _dictRehashStep(d);

 

    // #define dictHashKey(d, key) (d)->type->hashFunction(key)

    h = dictHashKey(d, key); // 对key求哈希值

    for (table = 0; table <= 1; table++) {

        idx = h & d->ht[table].sizemask;

        he = d->ht[table].table[idx];

        while(he) { // 链式冲突解决法

            if (key==he->key || dictCompareKeys(d, key, he->key))

                return he;

            he = he->next;

        }

        if (!dictIsRehashing(d)) return NULL; // 重哈希

    }

    return NULL;

}

 

typedef struct dictType {

    uint64_t (*hashFunction)(const void *key);

    void *(*keyDup)(void *privdata, const void *key);

    void *(*valDup)(void *privdata, const void *obj);

    int (*keyCompare)(void *privdata, const void *key1, const void *key2);

    void (*keyDestructor)(void *privdata, void *key);

    void (*valDestructor)(void *privdata, void *obj);

} dictType;

4.3. HSETNX命令

4.3.1. hsetnxCommand函数

// t_hash.c

void hsetnxCommand(client *c) {

    robj *o;

    if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;

    hashTypeTryConversion(o,c->argv,2,3);

 

    if (hashTypeExists(o, c->argv[2]->ptr)) {

        addReply(c, shared.czero);

    } else {

        hashTypeSet(o,c->argv[2]->ptr,c->argv[3]->ptr,HASH_SET_COPY);

        addReply(c, shared.cone);

        signalModifiedKey(c->db,c->argv[1]);

        notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);

        server.dirty++;

    }

}

4.3.2. hashTypeSet函数

// t_hash.c

int hashTypeSet(robj *o, sds field, sds value, int flags) {

    if (o->encoding == OBJ_ENCODING_ZIPLIST) {

        // ziplistInsert

        // ziplistPush

    } else if (o->encoding == OBJ_ENCODING_HT) {

        。。。。。。

        dictAdd(o->ptr,f,v);

    } else {

        serverPanic("Unknown hash encoding");

    }

}

5. Redis多路复用机制

5.1. AE是什么?

AE”为“A simple event-driven programming library”的缩写,翻译成中文,即一个简单的事件驱动编程库。就Linux而言,可简单理解为对epoll的封装。

5.2. 多路复用选择aeApiPoll

Redis支持selectepollkqueueevport四种模式,编译Redis时决定采用哪一种,因此运行时只会有一种有效。优先顺序是:evport -> epoll -> kqueue -> select,其中select为兜底用,因为所有系统都会支持select

对应的源码文件,分别为:

evport

ae_evport.c

Solaris

epoll

ae_epoll.c

Linux系统

kqueue

ae_kqueue.c

BSD类系统

select

ae_select.c

其它不支持epoll、evport和kqueue系统


桥接它们的文件则是ae.c,桥接代码:

/* Include the best multiplexing layer supported by this system.

 * The following should be ordered by performances, descending. */

#ifdef HAVE_EVPORT

#include "ae_evport.c"

#else

    #ifdef HAVE_EPOLL

    #include "ae_epoll.c"

    #else

        #ifdef HAVE_KQUEUE

        #include "ae_kqueue.c"

        #else

        #include "ae_select.c"

        #endif

    #endif

#endif


HAVE_EVPORTHAVE_EPOLLHAVE_KQUEUE则在config.h中决议出:

#ifdef __sun

#include 

#ifdef _DTRACE_VERSION

#define HAVE_EVPORT 1

#endif

#endif

 

#ifdef __linux__

#define HAVE_EPOLL 1

#endif

 

#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)

#define HAVE_KQUEUE 1

#endif

5.3. 多路复用调用aeCreateFileEvent

main函数开始的调用过程(读事件和写事件的顺序,作者在这里用了个小技巧,支持优先响应是查询,即立即返回查询结果):

int main(int argc, char **argv) { // server.c

    。。。。。。

    // aeMain(server.el);

    void aeMain(aeEventLoop *eventLoop) // ae.c

    {

        while (!eventLoop->stop) {

            if (eventLoop->beforesleep != NULL)

                eventLoop->beforesleep(eventLoop); // 这很重要,写AOF文件在这里进行

            // aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);

            int aeProcessEvents(aeEventLoop *eventLoop, int flags) { // ae.c

                // linux实际调用的是ae_epoll.c中的aeApiPoll函数

                numevents = aeApiPoll(eventLoop, tvp);

                if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)

                    eventLoop->aftersleep(eventLoop);

                    

                for (j = 0; j < numevents; j++) {

                    // 一般优先处理读事件,然后再写事件,但有时为立即响应是查询则写优先

                    /* Normally we execute the readable event first, 

                       and the writable event laster. */

                    int invert = fe->mask & AE_BARRIER;

                    /* Fire the readable event if the call sequence is not inverted. */

                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);

                    /* Fire the writable event. */

                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);

                    /* If we have to invert the call, 

                    fire the readable event now after the writable one. */

                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);

                    // 函数aeCreateFileEvent中设置rfileProcwfileProc

                    // rfileProcwfileProc的初始化均在aeCreateFileEvent中完成

                }

            }

        }

    }

    。。。。。。

}

 

// 初始化rfileProc和wfileProc

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,

                      aeFileProc *proc, void *clientData)

{

    if (fd >= eventLoop->setsize) {

        errno = ERANGE;

        return AE_ERR;

    }

    aeFileEvent *fe = &eventLoop->events[fd]; // events是一个以fd为下标的数组,0查找

 

    if (aeApiAddEvent(eventLoop, fd, mask) == -1) // fd放到epoll或select等中

        return AE_ERR;

    fe->mask |= mask; // 读写事件

    if (mask & AE_READABLE) fe->rfileProc = proc;

    if (mask & AE_WRITABLE) fe->wfileProc = proc;

    fe->clientData = clientData; // 附加数据

    if (fd > eventLoop->maxfd)

        eventLoop->maxfd = fd; // 这个主要给select时用,epoll等不需要

    return AE_OK;

}

6. 网络收发过程

6.1. 过程简要

main

-> initServer

-> aeCreateFileEvent(acceptTcpHandler)

-> createClient

-> aeCreateFileEvent(readQueryFromClient)

-> read

-> processCommand

-> lookupCommand

-> call(c,CMD_CALL_FULL) // call在调用proc后,会调用propagate函数将命令传播给AOF和Slaves

-> c->cmd->proc(c) // 这个proc就是各种redis命令处理过程,比如:setCommand、hgetCommand等

6.2. 接受连接请求acceptTcpHandler

main函数中,会为accept注册一个事件acceptTcpHandler:

// server.c

int main(int argc, char **argv) {

    void initServer(void) {

        // 支持多个IP,即redis.conf中可以bind多个IP

        for (j = 0; j < server.ipfd_count; j++) {

            if (aeCreateFileEvent(server.el, server.ipfd[j], // 以事件方式加入到epoll中

                AE_READABLE, acceptTcpHandler, NULL) == AE_ERR)

            {

                serverPanic("Unrecoverable error creating server.ipfd file event.");

            }

        }

    }

}

 

// networking.c

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {

    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;

    char cip[NET_IP_STR_LEN];

    UNUSED(el);

    UNUSED(mask);

    UNUSED(privdata);

 

    // 一次性连接接受max个连接,以提升效率

    while(max--) {

        // anetTcpAccept处理了中断ENTR,方式是重试到成功或出错

        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);

        if (cfd == ANET_ERR) {

            if (errno != EWOULDBLOCK) // EWOULDBLOCK是正常的

                serverLog(LL_WARNING,

                    "Accepting client connection: %s", server.neterr);

            return;

        }

        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);

        acceptCommonHandler(cfd,0,cip);

    }

}

 

// networking.c

static void acceptCommonHandler(int fd, int flags, char *ip) {

    client *c;

    // createClient为aceeptCommonHandler调用的核心函数

    // 在createClient中会为fd关联读事件过程,并加入到epoll中

    c = createClient(fd);

    

    // 控制连接数不超过redis.conf上配置的最大数

    if (listLength(server.clients) > server.maxclients) {

        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */

        if (write(c->fd,err,strlen(err)) == -1) {

            /* Nothing to do, Just to avoid the warning... */

        }

        

        server.stat_rejected_conn++; // 被拒绝的连接数增一

        freeClient(c);

        return;

    }

    

    // 保护模式处理

    if (server.protected_mode &&

        server.bindaddr_count == 0 &&

        server.requirepass == NULL &&

        !(flags & CLIENT_UNIX_SOCKET) &&

        ip != NULL)

    {

        // 保护模式下,只允许127.0.0.1连接

        if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {

            char *err = "-DENIED Redis is running in protected mode because protected。。。。。。";

            if (write(c->fd,err,strlen(err)) == -1) {

                /* Nothing to do, Just to avoid the warning... */

            }

            

            server.stat_rejected_conn++; // 被拒绝的连接数增一

            freeClient(c);

            return;

        }

    }

    

    server.stat_numconnections++; // 当前连接数增一

    c->flags |= flags;

}

6.3. 创建client对象createClient

// networking.c

client *createClient(int fd) {

    client *c = zmalloc(sizeof(client));

    // 如果redis.conf中开启了tcpkeepalive,配置项名tcp-keepalive,单位:秒

    if (server.tcpkeepalive)

        anetKeepAlive(NULL,fd,server.tcpkeepalive);

    // 注册c到epoll,并关联readQueryFromClient,事件为读事件AE_READABLE

    if (aeCreateFileEvent(server.el,fd,AE_READABLEreadQueryFromClient, c) == AE_ERR)

    {

        close(fd);

        zfree(c);

        return NULL;

    }

    

    selectDb(c,0); // 默认选择第1个DB,调用者需要执行select命令切换

    。。。。。。

}

6.4. 读取命令readQueryFromClient

// networking.c

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {

    client *c = (client*)privdata;

    int nread = read(fd, c->querybuf+qblen, readlen);

 

    processInputBufferAndReplicate(c);

}

 

/* This is a wrapper for processInputBuffer that also cares about handling

 * the replication forwarding to the sub-slaves, in case the client 'c'

 * is flagged as master. Usually you want to call this instead of the

 * raw processInputBuffer(). */

void processInputBufferAndReplicate(client *c) {

    if (!(c->flags & CLIENT_MASTER)) {

        processInputBuffer(c); // 处理命令

    } else {

        size_t prev_offset = c->reploff;

        processInputBuffer(c);

        size_t applied = c->reploff - prev_offset;

        if (applied) {

            // 处理主从复制

            replicationFeedSlavesFromMasterStream(server.slaves,

                    c->pending_querybuf, applied);

            sdsrange(c->pending_querybuf,applied,-1);

        }

    }

}

6.5. 命令处理processCommand

processInputBuffer调用processCommand,大量逻辑在processCommand中:

// networking.c

/* If this function gets called we already read a whole

 * command, arguments are in the client argv/argc fields.

 * processCommand() execute the command or prepare the

 * server for a bulk read from the client.

 *

 * If C_OK is returned the client is still alive and valid and

 * other operations can be performed by the caller. Otherwise

 * if C_ERR is returned the client was destroyed (i.e. after QUIT). */

void processInputBuffer(client *c) {

    server.current_client = c;

    

    int processCommand(client *c) {

        // 单独处理QUIT命令

        /* The QUIT command is handled separately. Normal command procs will

         * go through checking for replication and QUIT will cause trouble

         * when FORCE_REPLICATION is enabled and would be implemented in

         * a regular command proc. */

        if (!strcasecmp(c->argv[0]->ptr,"quit")) {

            addReply(c,shared.ok);

            c->flags |= CLIENT_CLOSE_AFTER_REPLY;

            return C_ERR;

        }

        

        // 查找命令的处理

        /* Now lookup the command and check ASAP about trivial error conditions

         * such as wrong arity, bad command name and so forth. */

        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);

        if (!c->cmd) {

            addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",。。。);

            sdsfree(args);

            return C_OK;

        }

        else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||

                 (c->argc < -c->cmd->arity)) {

            // 参数个数不对

            addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name);

            return C_OK;

        }

        

        /* Check if the user is authenticated */

        if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) {

            // 检查是否有设置密码,和密码是否已经验证通过

            addReply(c,shared.noautherr);

            return C_OK;

        }

        

        // 检查是否需要重定向

        /* If cluster is enabled perform the cluster redirection here.

         * However we don't perform the redirection if:

         * 1) The sender of this command is our master.

         * 2) The command has no key arguments. */

        if (server.cluster_enabled &&

          !(c->flags & CLIENT_MASTER) &&

          !(c->flags & CLIENT_LUA &&

           server.lua_caller->flags & CLIENT_MASTER) &&

          !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&

           c->cmd->proc != execCommand)) {

            if (n == NULL || n != server.cluster->myself) {

                if (c->cmd->proc == execCommand) {

                    discardTransaction(c);

                }

                else {

                    flagTransaction(c);

                }

                clusterRedirectClient(c,n,hashslot,error_code);

                return C_OK;                

            }

        }

        

        // 检查最大内存,如果超过配置的maxmemory,

        // 则返回错误“-OOM command not allowed when used memory > 'maxmemory'.”

        /* Handle the maxmemory directive.

         *

         * First we try to free some memory if possible (if there are volatile

         * keys in the dataset). If there are not the only thing we can do

         * is returning an error.

         *

         * Note that we do not want to reclaim memory if we are here re-entering

         * the event loop since there is a busy Lua script running in timeout

         * condition, to avoid mixing the propagation of scripts with the propagation

         * of DELs due to eviction. */

        if (server.maxmemory && !server.lua_timedout) {

            /* It was impossible to free enough memory, and the command the client

             * is trying to execute is denied during OOM conditions? Error. */

            if ((c->cmd->flags & CMD_DENYOOM) && out_of_memory) {

                addReply(c, shared.oomerr);

                return C_OK;

            }

        }

        

        // 磁盘有问题不接受写请求

        /* Don't accept write commands if there are problems persisting on disk

         * and if this is a master instance. */

        int deny_write_type = writeCommandsDeniedByDiskError();

        if (deny_write_type != DISK_ERROR_TYPE_NONE && server.masterhost == NULL &&

            (c->cmd->flags & CMD_WRITE || c->cmd->proc == pingCommand)) {

            if (deny_write_type == DISK_ERROR_TYPE_RDB)

                addReply(c, shared.bgsaveerr);

            else

                addReplySds(c, sdscatprintf(sdsempty(),

                    "-MISCONF Errors writing to the AOF file: %s\r\n",

                    strerror(server.aof_last_write_errno)));

            return C_OK;

        }

        

        // 不接受写请求,如果没有足够多好的slave

        /* Don't accept write commands if there are not enough good slaves and

         * user configured the min-slaves-to-write option. */

        if (server.masterhost == NULL &&

            server.repl_min_slaves_to_write &&

            server.repl_min_slaves_max_lag &&

            c->cmd->flags & CMD_WRITE &&

            server.repl_good_slaves_count < server.repl_min_slaves_to_write) {

            addReply(c, shared.noreplicaserr);

            return C_OK;

        }

        

        // 只读的slave不接受写请求

        /* Don't accept write commands if this is a read only slave. But

         * accept write commands if this is our master. */

        if (server.masterhost && server.repl_slave_ro &&

            !(c->flags & CLIENT_MASTER) &&

            c->cmd->flags & CMD_WRITE)

        {

            addReply(c, shared.roslaveerr);

            return C_OK;

        }

        

        // 只接受SUB、UNSUB、PSUB、PUNSUB和PING命令

        /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */

        if (c->flags & CLIENT_PUBSUB &&

            c->cmd->proc != pingCommand &&

            c->cmd->proc != subscribeCommand &&

            c->cmd->proc != unsubscribeCommand &&

            c->cmd->proc != psubscribeCommand &&

            c->cmd->proc != punsubscribeCommand) {

            addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");

            return C_OK;

        }

 

        /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,

         * when slave-serve-stale-data is no and we are a slave with a broken

         * link with master. */

        if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&

            server.repl_serve_stale_data == 0 &&

            !(c->cmd->flags & CMD_STALE))

        {

            flagTransaction(c);

            addReply(c, shared.masterdownerr);

            return C_OK;

        }

        

        // 正在加载DB,直接返回错误

        /* Loading DB? Return an error if the command has not the

         * CMD_LOADING flag. */

        if (server.loading && !(c->cmd->flags & CMD_LOADING)) {

            addReply(c, shared.loadingerr);

            return C_OK;

        }

        

        /* Lua script too slow? Only allow a limited number of commands. */

        if (server.lua_timedout &&

              c->cmd->proc != authCommand &&

              c->cmd->proc != replconfCommand &&

            !(c->cmd->proc == shutdownCommand &&

              c->argc == 2 &&

              tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&

            !(c->cmd->proc == scriptCommand &&

              c->argc == 2 &&

              tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))

        {

            flagTransaction(c);

            addReply(c, shared.slowscripterr);

            return C_OK;

        }

 

        /* Exec the command */

        if (c->flags & CLIENT_MULTI &&

            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&

            c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)

        {

            /* Add a new command into the MULTI commands queue */

            queueMultiCommand(c); // 链上新的事务子命令(队列结构),等到EXEC时一块执行

            addReply(c,shared.queued); // 向client返回QUEUED

        } else {

            call(c,CMD_CALL_FULL); // 调用命令处理过程

            c->woff = server.master_repl_offset; // 更新复制偏移

            if (listLength(server.ready_keys))

                handleClientsBlockedOnKeys();

        }

        return C_OK;

    }

}

7. 持久化和复制

call在调用proc后,会调用propagate函数将命令传播给AOFSlaves

int processCommand(client *c) {

    。。。。。。

    void call(client *c, int flags) {

        。。。。。。

        propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);

    }

}

7.1. propagate函数(aof&replication

/* Propagate the specified command (in the context of the specified database id)

 * to AOF and Slaves.

 *

 * flags are an xor between:

 * + PROPAGATE_NONE (no propagation of command at all)

 * + PROPAGATE_AOF (propagate into the AOF file if is enabled)

 * + PROPAGATE_REPL (propagate into the replication link)

 *

 * This should not be used inside commands implementation. Use instead

 * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().

 */

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags)

{

    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)

        feedAppendOnlyFile(cmd,dbid,argv,argc); // 写AOF

    if (flags & PROPAGATE_REPL)

        replicationFeedSlaves(server.slaves,dbid,argv,argc); // 推送给Slaves

}

7.2. 写AOF文件feedAppendOnlyFile

feedAppendOnlyFile并不实际写文件,而只是将内容组织到AOF buffer中。写AOF文件是在每次epoll之前进行,redis-server启动时注册了函数beforeSleep,它在每次epoll之前被调用。beforeSleep调用flushAppendOnlyFileAOF buffer写入到AOF文件。

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {

    。。。。。。

    /* Append to the AOF buffer. This will be flushed on disk just before

     * of re-entering the event loop, so before the client will get a

     * positive reply about the operation performed. */

    if (server.aof_state == AOF_ON// aof_state值由redis.conf中的appendonly控制

        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

}


redis-server启动:

int main(int argc, char **argv) {

    aeSetBeforeSleepProc(server.el, beforeSleep); // 注册回调beforeSleep

    aeSetAfterSleepProc(server.el, afterSleep);

    aeMain(server.el);

}


调用beforeSleep

void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        if (eventLoop->beforesleep != NULL)

            eventLoop->beforesleep(eventLoop); // 将AOF buffer写入到AOF文件

        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); // epoll

    }

}


调用flushAppendOnlyFile函数将AOF buffer写入到AOF文件:

/* This function gets called every time Redis is entering the

 * main loop of the event driven library, that is, before to sleep

 * for ready file descriptors. */

void beforeSleep(struct aeEventLoop *eventLoop) {

    。。。。。。

    /* Unblock all the clients blocked for synchronous replication

     * in WAIT. */

    if (listLength(server.clients_waiting_acks))

        processClientsWaitingReplicas();

    。。。。。。        

    /* Try to process pending commands for clients that were just unblocked. */

    if (listLength(server.unblocked_clients))

        processUnblockedClients();

        

    /* Write the AOF buffer on disk */

    flushAppendOnlyFile(0);

    。。。。。。

    /* Handle writes with pending output buffers. */

    handleClientsWithPendingWrites();

    。。。。。。

}


AOF文件:

/* Write the append only file buffer on disk.

 *

 * Since we are required to write the AOF before replying to the client,

 * and the only way the client socket can get a write is entering when the

 * the event loop, we accumulate all the AOF writes in a memory

 * buffer and write it on disk using this function just before entering

 * the event loop again.

 *

 * About the 'force' argument:

 *

 * When the fsync policy is set to 'everysec' we may delay the flush if there

 * is still an fsync() going on in the background thread, since for instance

 * on Linux write(2) will be blocked by the background fsync anyway.

 * When this happens we remember that there is some aof buffer to be

 * flushed ASAP, and will try to do that in the serverCron() function.

 *

 * However if force is set to 1 we'll write regardless of the background

 * fsync. */

#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */

void flushAppendOnlyFile(int force) {

    // aofWrite调用write将AOF buffer写入到AOF文件,处理了ENTR,其它没什么

    ssize_t nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));

    。。。。。。

    /* Handle the AOF write error. */

    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {

        /* We can't recover when the fsync policy is ALWAYS since the

         * reply for the client is already in the output buffers, and we

         * have the contract with the user that on acknowledged write data

         * is synced on disk. */

        serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");

        exit(1);

    } else {

        return; /* We'll try again on the next call... */

    } else {

        /* Successful write(2). If AOF was in error state, restore the

         * OK state and log the event. */

    }

    。。。。。。

    /* Perform the fsync if needed. */

    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {

        // redis_fsync是一个宏,Linux实际为fdatasync,其它为fsync

        // 所以最好不要将redis.conf中的appendfsync设置为always,这极影响性能         

          // 注意,Redis的实现并没有处理fsync和close的返回值,所以还不是最严格的


        redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */

    }

    else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) {

        // 如果已在sync状态,则不再重复

        // BIO线程会间隔设置sync_in_progress

        // if (server.aof_fsync == AOF_FSYNC_EVERYSEC)

        //     sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;

        if (!sync_in_progress)

            // eversec性能并不那么糟糕,因为它:

            // 后台方式执行fsync

            // Redis并不是严格意义上的单线程,实际上它创建一组BIO线程,专门处理阻塞和慢操作

            // 这些操作就包括FSYNC,另外还有关闭文件和内存的free两个操作。

            // 不像always,EVERYSEC模式并不立即调用fsync,

            // 而是将这个操作丢给了BIO线程异步执行,

            // BIO线程在进程启动时被创建,两者间通过bio_jobs和bio_pending两个

            // 全局对象交互,其中主线程负责写,BIO线程负责消费。

            aof_background_fsync(server.aof_fd);

        server.aof_last_fsync = server.unixtime;

    }

}

7.3. 数据复制replicationFeedSlaves

这里也不会真正将数据发给Slaves,而只是将数据放入到replication buffer中。

/* Propagate write commands to slaves, and populate the replication backlog

 * as well. This function is used if the instance is a master: we use

 * the commands received by our clients in order to create the replication

 * stream. Instead if the instance is a slave and has sub-slaves attached,

 * we use replicationFeedSlavesFromMaster() */

void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {

    。。。。。。

    /* Write the command to every slave. */

    listRewind(slaves,&li);

    while((ln = listNext(&li))) { // 遍历所有的slaves

        client *slave = ln->value; // Slave也被当作client

 

        /* Don't feed slaves that are still waiting for BGSAVE to start */

        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

 

        /* Feed slaves that are waiting for the initial SYNC (so these commands

         * are queued in the output buffer until the initial SYNC completes),

         * or are already in sync with the master. */

 

        /* Add the multi bulk length. */

        addReplyMultiBulkLen(slave,argc);

 

        /* Finally any additional argument that was not stored inside the

         * static buffer if any (from j to argc). */

        for (j = 0; j < argc; j++)

            addReplyBulk(slave,argv[j]);

    }

}

8. 进程启动时做了些什么?

进程启动时事项:

1) 为所有配置设置默认值

2) 加载配置文件redis.conf,覆盖默认值

3) 安装信号处理器

4) 初始化全局状态

5) 创建epoll,启动TCP监控

6) 创建BIO线程

7) 加载module

8) 加载AOFRDB

9) 设置beforeSleep和afterSleep两个回调

10) 进入epoll循环

int main(int argc, char **argv) {

    // 初始化所有配置,为它们设置默认值

    initServerConfig();

    

    // 如果是sentinel模式,额外做相关初始化

    // sentinel_mode的值由命令行参数中是否包含参数“--sentinel”决定

    if (server.sentinel_mode) {

        initSentinelConfig();

        initSentinel();

    }

 

    // redis-server和redis-check-rdb、redis-check-aof共享了同一个main函数

    // 进入redis_check_rdb_main或redis_check_aof_main后不会再返回,

    // 也就是这两个函数后的代码均不会被执行。

    if (strstr(argv[0],"redis-check-rdb") != NULL)

        redis_check_rdb_main(argc,argv,NULL);

    else if (strstr(argv[0],"redis-check-aof") != NULL)

        redis_check_aof_main(argc,argv);

 

    // 加载redis.conf,

    // 包括初始化module列表loadmodule_queue,

    // 注意会将redis-server的命令行参数一并传递给module

    loadServerConfig(configfile,options);

    

    // 置为deamon方式,

    // 可通过redis.conf中的daemonize控制是否以deamon方式运行

    if (background) daemonize();

    

    // 对redis-server核心的初始化,包括但不限于:

    // 1) 忽略HUP和PIPE信号

    // 2) 安装TERM和INT两个信号的处理器

    // 3) 初始化全局的redisServer对象server

    // 4) 注册定时器回调serverCron

    // 5) 注册accept的回调acceptTcpHandler

    // 6) 注册UNIX套接字回调acceptUnixHandler

    // 7) 注册模块回调moduleBlockedClientPipeReadable

    // 8) 创建或追加模式打开AOF文件

    // 9) 集群server.cluster初始化

    // 10) 复制脚本初始化

    // 11) 初始化lua运行环境

    // 12) 慢日志初始化

    // 13) 初始化延迟监控(latency monitor)

    // 14) 创建BIO线程,由bio.h中的BIO_NUM_OPS决定个数,当前为3个

    initServer();

    

    // 如果是daemon方式,则创建pid文件

    if (background || server.pidfile) createPidFile();

    

    // 修改进程的title,加入状态和端口号,这样对ps等更为友好

    redisSetProcTitle(argv[0]);

    

    // 进程启动时显示LOGO,

    // 可通过redis.conf中的always-show-logo控制是否显示

    redisAsciiArt();

    

    // 检查TCP的“/proc/sys/net/core/somaxconn”

    checkTcpBacklogSettings

    

    if (server.sentinel_mode) {

        sentinelIsRunning();

    } else {

        // 如果是Linux,

        // 检查“/proc/sys/vm/overcommit_memory”

        // 和“/sys/kernel/mm/transparent_hugepage/enabled”的值。

        // “/proc/sys/vm/overcommit_memory”值需要为1,

        // “/sys/kernel/mm/transparent_hugepage/enabled”需要为“never”

        linuxMemoryWarnings();

        

        // 加载所有的模块,如果有一个加载失败则启动失败

        // 具体有哪些模块,由redis.conf中的loadmodule决定,如:

        // loadmodule /path/to/my_module.so

        // loadmodule /path/to/other_module.so

        // 每个module一行

        moduleLoadFromQueue();

        

        // 从磁盘加载RDB或AOF到内存

        // 如果开启了AOF,

        // 则调用loadAppendOnlyFile加载AOF文件,这个时候不会加载RDB文件

        // 没开启AOF时,调用rdbLoad加载RDB文件

        // 不管是AOF还是RDB,加载不成功时,进程均不能启动。

        loadDataFromDisk();

        

        if (server.cluster_enabled) {

            if (verifyClusterConfigWithData() == C_ERR) {

                serverLog(LL_WARNING,

                    "You can't have keys in a DB different than DB 0 when in "

                    "Cluster mode. Exiting.");

                exit(1);

        }

        

        if (server.ipfd_count > 0)

            serverLog(LL_NOTICE,"Ready to accept connections");

        if (server.sofd > 0)

            serverLog(LL_NOTICE,

                "The server is now ready to accept connections at %s",

                server.unixsocket);

    }

    

    // redis.conf中的maxmemory值过小,进行警告提示

    if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {

        serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);

    }

    

    // 注册回调beforeSleep,

    // 对于eversec模式的appendfsync将在这个回调中进行

    aeSetBeforeSleepProc(server.el,beforeSleep);

    aeSetAfterSleepProc(server.el,afterSleep);

    

    // 进入epoll主循环:

    // BSD为kqueue死循环,

    // Solaris为evport死循环,

    // 其它为select死循环

    //

    // 网络数据的收和发,以及所有COMMAND的操作均在这个循环中完成

    aeMain(server.el);

 

    // 关闭epoll,释放相关资源

    aeDeleteEventLoop(server.el);

    return 0;

}

9. 核心对象

Redis虽然采用C语言实现,但内含明显的对象,核心的对象包括:

9.1. redisServer

这是一个超大的对象,实为redis-serverContext对象,承载了redis-server的各种全局状态。下面只列出它的一小部分:

// server.h

struct redisServer {

    /* General */

    pid_t pid; /* Main process pid. */

    char *configfile; /* Absolute config file path, or NULL */

    

    /* Networking */

    int port; /* TCP listening port */

    

    /* AOF persistence */

    int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */

    int aof_fsync; /* Kind of fsync() policy */

    

    /* RDB persistence */

    long long dirty; /* Changes to DB from the last save */

    

    /* Logging */

    char *logfile; /* Path of log file */

    

    /* Replication (master) */

    char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */

    

    /* Replication (slave) */

    char *masterhost; /* Hostname of master */

    

    /* Cluster */

    int cluster_enabled; /* Is cluster enabled? */

    

    /* Scripting */

    lua_State *lua; /* The Lua interpreter. We use just one for all clients */

    

    /* Latency monitor */

    dict *latency_events;

};

9.2. client

一个client对象,可以是一个Client,也可以是一个slave,也可以是一个master

// server.h

/* With multiplexing we need to take per-client state.

 * Clients are taken in a linked list. */

typedef struct client {

    uint64_t id; /* Client incremental unique ID. */

    int fd; /* Client socket. */

    redisDb *db; /* Pointer to currently SELECTed DB. */

    robj *name; /* As set by CLIENT SETNAME. */

    sds querybuf; /* Buffer we use to accumulate client queries. */

    size_t qb_pos; /* The position we have read in querybuf. */

    int argc; /* Num of arguments of current command. */

    robj **argv; /* Arguments of current command. */

    struct redisCommand *cmd, *lastcmd; /* Last command executed. */

    list *reply; /* List of reply objects to send to the client. */

    int authenticated; /* When requirepass is non-NULL. */

    int replstate; /* Replication state if this is a slave. */

    char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */

    

    /* Response buffer */

    int bufpos;

    char buf[PROTO_REPLY_CHUNK_BYTES];

};

9.3. redisCommand

Redis命令结构:

struct redisCommand {

    char *name; // Redis命令名,如:GET等

    redisCommandProc *proc; // 处理该命令的过程

    int arity;

    char *sflags; /* Flags as string representation, one char per flag. */

    int flags;    /* The actual flags, obtained from the 'sflags' field. */

    

    /* Use a function to determine keys arguments in a command line.

     * Used for Redis Cluster redirect. */

    redisGetKeysProc *getkeys_proc;

 

    /* What keys should be loaded in background when calling this command? */

    int firstkey; /* The first argument that's a key (0 = no keys) */

    int lastkey;  /* The last argument that's a key */

    int keystep;  /* The step between first and last key */

    long long microseconds, calls;

};

9.4. redisDb

RedisDB结构:

/* Redis database representation. There are multiple databases identified

 * by integers from 0 (the default database) up to the max configured

 * database. The database number is the 'id' field in the structure. */

typedef struct redisDb {

    dict *dict;             /* The keyspace for this DB */

    dict *expires;          /* Timeout of keys with a timeout set */

    dict *blocking_keys;    /* Keys with clients waiting for data (BLPOP)*/

    dict *ready_keys;       /* Blocked keys that received a PUSH */

    dict *watched_keys;     /* WATCHED keys for MULTI/EXEC CAS */

    int id;                 /* Database ID */

    long long avg_ttl;      /* Average TTL, just for stats */

    list *defrag_later;     /* List of key names to attempt to defrag one by one, gradually. */

} redisDb;

9.5. clusterNode

Redis集群节点结构:

typedef long long mstime_t/* millisecond time type. */

typedef struct clusterNode {

    mstime_t ctime; /* Node object creation time. */

    char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */

    int flags;      /* CLUSTER_NODE_... */

    uint64_t configEpoch; /* Last configEpoch observed for this node */

    unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */

    int numslots;   /* Number of slots handled by this node */

    int numslaves;  /* Number of slave nodes, if this is a master */

    // 所有的slave节点

    struct clusterNode **slaves; /* pointers to slave nodes */

    // master节点

    struct clusterNode *slaveof; /* pointer to the master node. Note that it

                                    may be NULL even if the node is a slave

                                    if we don't have the master node in our

                                    tables. */

    // 最近一次发送ping的时间

    mstime_t ping_sent;      /* Unix time we sent latest ping */

    // 最近一次接收pong的时间

    mstime_t pong_received;  /* Unix time we received the pong */

    mstime_t fail_time;      /* Unix time when FAIL flag was set */

    mstime_t voted_time;     /* Last time we voted for a slave of this master */

    mstime_t repl_offset_time;  /* Unix time we received offset for this node */

    mstime_t orphaned_time;     /* Starting time of orphaned master condition */

    long long repl_offset;      /* Last known repl offset for this node. */

    char ip[NET_IP_STR_LEN];  /* Latest known IP address of this node */

    int port;                   /* Latest known clients port of this node */

    int cport;                  /* Latest known cluster port of this node. */

    clusterLink *link;          /* TCP/IP link with this node */

    list *fail_reports;         /* List of nodes signaling this as failing */

} clusterNode;

9.6. clusterState

Redis集群状态结构:

typedef struct clusterState {

    clusterNode *myself;  /* This node */

    uint64_t currentEpoch;

    int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */

    int size; /* Num of master nodes with at least one slot */

    

    dict *nodes; /* Hash table of name -> clusterNode structures */

    dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */

    

    clusterNode *migrating_slots_to[CLUSTER_SLOTS];

    clusterNode *importing_slots_from[CLUSTER_SLOTS];

    clusterNode *slots[CLUSTER_SLOTS];

    

    mstime_t failover_auth_time; /* Time of previous or next election. */

    int failover_auth_count;    /* Number of votes received so far. */

    int failover_auth_sent;     /* True if we already asked for votes. */

    int failover_auth_rank;     /* This slave rank for current auth request. */

    uint64_t failover_auth_epoch; /* Epoch of the current election. */

    

    /* Manual failover state of master. */

    clusterNode *mf_slave; /* Slave performing the manual failover. */

    

    /* The followign fields are used by masters to take state on elections. */

    uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */

} clusterState;

10. 主备同步

待续。

11. 主备切换

待续。

阅读(567) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~
评论热议
请登录后评论。

登录 注册