推荐: blog.csdn.net/aquester https://github.com/eyjian https://www.cnblogs.com/aquester http://blog.chinaunix.net/uid/20682147.html
全部博文(595)
分类: 大数据
2018-09-19 09:23:07
Redis代码优美,注释也很到位,阅读起来会赏心悦目,大大降低了理解门槛。由于redis单线程几乎完成所有工作,整体逻辑是相当复杂的,涉及了太多状态,作者的技术深厚可见一斑。
Redis的单线程设计给出了一种优雅实现高性能服务的思路,在实践中值得借鉴。需要注意Redis并不是严格的单线程,实际上它是多进程+多线程。为解决IO和慢速操作带来的性能毛刺和卡顿,Redis的实现引入了多进程和多线程。但是它的主体仍然是单线程的,单个线程完成网络IO操作和其它操作。
疑问:Redis选择slot数为16384,是一个偶数,没有采用质数,这个可能并不是一个最好的主意。
RESP |
REdis Serialization Protocol |
Redis序列化协议 |
AE |
A simple event-driven programming library |
一个简单的事件驱动编程库 |
ASAP |
As Soon As Possible |
尽快 |
AOF |
Append Only File |
仅仅追加写文件 |
BIO |
Background IO |
后台IO操作,三种涉及BIO:FSYNC、关闭文件和内存free,均为阻塞或慢操作 |
哈希表的实现,哈希函数使用了siphash哈希算法。
一种非加密的64位哈希算法。
Redis中非常核心的算法——哈希表的实现在这个文件中,很多命令都有用到,比如set/hset等,它是除内存分配管理外的最基础实现。核心函数包含但不限于:dictFind、dictNext、dictAdd、dictDelete、dictFetchValue、dictGetHash、dictReplace、dictAddRaw、dictCreate、dictRelease、dictUnlink、dictRehash、dictEmpty、dictResize等。
CLUSTER_SLOTS |
定义Redis集群Slots个数,值为16384 |
DICT_HT_INITIAL_SIZE |
定义哈希表默认大小宏,值为4 |
dictSetVal |
设置Value |
dictSetKey |
设置Key |
dictCompareKeys |
比较两个Key |
dictHashKey |
对Key求哈希 |
dictGetKey |
取Key |
dictGetVal |
取Value |
dictSlots |
得到槽(slot)个数 |
dictSize |
得到已存储的节点(元素)个数 |
dictGetSignedIntegerVal |
取8字节有符号整数值 |
dictGetUnsignedIntegerVal |
取8字节无符号整数值 |
dictGetDoubleVal |
取double类型值 |
定义了哈希节点的数据结构:
/* A single hash-table node. Nodes that fall into the same bucket are chained
 * through `next` (separate chaining). */
typedef struct dictEntry {
    void *key;                  /* the key */
    union {                     /* the value is stored in one of four forms */
        void *val;              /* pointer value: string or binary data */
        uint64_t u64;           /* 8-byte unsigned integer */
        int64_t s64;            /* 8-byte signed integer */
        double d;               /* double-precision floating point */
    } v;
    struct dictEntry *next;     /* next node in the same bucket */
} dictEntry;
各具体的命令并不做持久化(写AOF等)和传播给Slaves,这两项是公共操作,在上层统一执行,具体函数为server.c中的“void call(client *c, int flags)”,具体执行函数为server.c中的“void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags)”。
群里有人问select跟普通命令的效率,猜想select没有效率问题,阅读源代码也证实了这一点。因为它只是改变指针指向,所以不存在效率问题,自然比普通命令效率要高。
Redis支持的所有命令,都存储在名为redisCommandTable的函数表中,它是一个元素类型为redisCommand的数组。
如果以C++来理解redisCommand,可以将redisCommand看作抽象基类,它定义了两个虚函数proc和getkeys_proc,各种command是它的具体实现。
// server.h struct redisCommand { char *name; // 命令名,比如:GET redisCommandProc *proc; // 命令处理过程(函数指针) // 命令参数的个数,用于检查命令请求的格式是否正确 // 如果这个值为负数-N,那么表示参数的数量大于等于N // 注意:命令的名字本身也是一个参数。 int arity; // 字符串形式的标识值, 这个值记录了命令的属性 // 比如是读命令还是写命令,是否允许在载入数据时使用,是否允许在 Lua 脚本中使用等 char *sflags; /* Flags as string representation, one char per flag. */ // sflags对应的二进制值,方便程序“与”、“或”等操作操作 int flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line. * Used for Redis Cluster redirect. */ redisGetKeysProc *getkeys_proc; // 函数指针 /* What keys should be loaded in background when calling this command? */ int firstkey; /* The first argument that's a key (0 = no keys) */ int lastkey; /* The last argument that's a key */ int keystep; /* The step between first and last key */
// 服务器执行这个命令所耗费的总时长 long long microseconds;; // 服务器总共执行了多少次这个命令 long long calls; }; |
redisCommandTable的部分定义:
// server.c struct redisCommand redisCommandTable[] = { {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0}, {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}, {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}, 。。。。。。 {"select",selectCommand,2,"lF",0,NULL,0,0,0,0,0}, 。。。。。。 {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} }; |
// server.c void selectCommand(client *c) { long id;
// 第一个参数值为DB的ID if (getLongFromObjectOrReply(c, c->argv[1], &id, "invalid DB index") != C_OK) return;
// 集群不支持SELECT命令 if (server.cluster_enabled && id != 0) { addReplyError(c,"SELECT is not allowed in cluster mode"); return; } if (selectDb(c,id) == C_ERR) { // 切换DB addReplyError(c,"DB index is out of range"); } else { addReply(c,shared.ok); } }
// db.c int selectDb(client *c, int id) { if (id < 0 || id >= server.dbnum) // dbnum值由redis.conf中的databases决定,默认为16 return C_ERR; c->db = &server.db[id]; return C_OK; } |
SET命令主要是字典操作(dict)。
// t_string.c /* SET key value [NX] [XX] [EX ] [PX ] */ void setCommand(client *c) { int j; robj *expire = NULL; int unit = UNIT_SECONDS; int flags = OBJ_SET_NO_FLAGS;
// 以下一长段均为参数选项解析, // 第一3个开始,因为第一个为命令SET自身,第二个为KEY,第三个为VALUE for (j = 3; j < c->argc; j++) { char *a = c->argv[j]->ptr; robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];
if ((a[0] == 'n' || a[0] == 'N') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_XX)) { flags |= OBJ_SET_NX; } else if ((a[0] == 'x' || a[0] == 'X') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_NX)) { flags |= OBJ_SET_XX; } else if ((a[0] == 'e' || a[0] == 'E') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_PX) && next) { flags |= OBJ_SET_EX; unit = UNIT_SECONDS; expire = next; j++; } else if ((a[0] == 'p' || a[0] == 'P') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_EX) && next) { flags |= OBJ_SET_PX; unit = UNIT_MILLISECONDS; expire = next; j++; } else { addReply(c,shared.syntaxerr); // 语法错误 return; } }
c->argv[2] = tryObjectEncoding(c->argv[2]); setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL); }
// t_string.c void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { 。。。。。。 setKey(c->db,key,val); server.dirty++; if (expire) setExpire(c,c->db,key,mstime()+milliseconds); notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); addReply(c, ok_reply ? ok_reply : shared.ok); }
// db.c void setKey(redisDb *db, robj *key, robj *val) { if (lookupKeyWrite(db,key) == NULL) { // 性能开销主要在这,参见dictFind函数 dbAdd(db,key,val); } else { dbOverwrite(db,key,val); } incrRefCount(val); removeExpire(db,key); signalModifiedKey(db,key); }
// db.c /* Add the key to the DB. It's up to the caller to increment the reference * counter of the value if needed. * * The program is aborted if the key already exists. */ void dbAdd(redisDb *db, robj *key, robj *val) { sds copy = sdsdup(key->ptr); int retval = dictAdd(db->dict, copy, val);
serverAssertWithInfo(NULL,key,retval == DICT_OK); if (val->type == OBJ_LIST || val->type == OBJ_ZSET) signalKeyAsReady(db, key); if (server.cluster_enabled) slotToKeyAdd(key); }
// dict.h
/* Store _val_ into `entry`. When the dict's type provides a valDup callback,
 * the stored value is whatever valDup returns for _val_; otherwise _val_ is
 * stored as is. */
#define dictSetVal(d, entry, _val_) do { \
    if ((d)->type->valDup) \
        (entry)->v.val = (d)->type->valDup((d)->privdata, _val_); \
    else \
        (entry)->v.val = (_val_); \
} while(0)
// db.c // KEY存在时覆盖写 /* Overwrite an existing key with a new value. Incrementing the reference * count of the new value is up to the caller. * This function does not modify the expire time of the existing key. * * The program is aborted if the key was not already present. */ void dbOverwrite(redisDb *db, robj *key, robj *val) { dictEntry *de = dictFind(db->dict,key->ptr);
serverAssertWithInfo(NULL,key,de != NULL); dictEntry auxentry = *de; robj *old = dictGetVal(de);
// 内存策略由redis.conf中的maxmemory-policy决定 // LFU策略是4.0开始引入的(Least Frequently Used,最不经常使用) // #define MAXMEMORY_FLAG_LRU (1<<0) // #define MAXMEMORY_FLAG_LFU (1<<1) // #define MAXMEMORY_FLAG_ALLKEYS (1<<2) // #define MAXMEMORY_FLAG_NO_SHARED_INTEGERS (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) // #define MAXMEMORY_VOLATILE_LRU ((0<<8)|MAXMEMORY_FLAG_LRU) // #define MAXMEMORY_VOLATILE_LFU ((1<<8)|MAXMEMORY_FLAG_LFU) // #define MAXMEMORY_VOLATILE_TTL (2<<8) // #define MAXMEMORY_VOLATILE_RANDOM (3<<8) // #define MAXMEMORY_ALLKEYS_LRU ((4<<8)|MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_ALLKEYS) // #define MAXMEMORY_ALLKEYS_LFU ((5<<8)|MAXMEMORY_FLAG_LFU|MAXMEMORY_FLAG_ALLKEYS) // #define MAXMEMORY_ALLKEYS_RANDOM ((6<<8)|MAXMEMORY_FLAG_ALLKEYS) // #define MAXMEMORY_NO_EVICTION (7<<8) if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { val->lru = old->lru; } dictSetVal(db->dict, de, val);
if (server.lazyfree_lazy_server_del) { freeObjAsync(old); dictSetVal(db->dict, &auxentry, NULL); }
dictFreeVal(db->dict, &auxentry); } |
// dict.c // KEY不存在时新增 /* Add an element to the target hash table */ int dictAdd(dict *d, void *key, void *val) { dictEntry *entry = dictAddRaw(d,key,NULL);
if (!entry) return DICT_ERR; dictSetVal(d, entry, val); return DICT_OK; } |
字典查找实现:
/* Look up `key` in dict `d`.
 * Returns the matching entry, or NULL when the key is not present. */
dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    uint64_t h, idx, table;

    if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
    if (dictIsRehashing(d)) _dictRehashStep(d);

    /* #define dictHashKey(d, key) (d)->type->hashFunction(key) */
    h = dictHashKey(d, key);    /* hash of the key */
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        while(he) {             /* walk the collision chain of this bucket */
            if (key==he->key || dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        /* The second table is searched only while a rehash is in progress. */
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}
/* Per-dict table of callbacks. dictHashKey calls hashFunction and dictSetVal
 * calls valDup when it is set; the remaining callbacks are named after the
 * operation they customize. */
typedef struct dictType {
    uint64_t (*hashFunction)(const void *key);
    void *(*keyDup)(void *privdata, const void *key);
    void *(*valDup)(void *privdata, const void *obj);
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    void (*keyDestructor)(void *privdata, void *key);
    void (*valDestructor)(void *privdata, void *obj);
} dictType;
// t_hash.c void hsetnxCommand(client *c) { robj *o; if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; hashTypeTryConversion(o,c->argv,2,3);
if (hashTypeExists(o, c->argv[2]->ptr)) { addReply(c, shared.czero); } else { hashTypeSet(o,c->argv[2]->ptr,c->argv[3]->ptr,HASH_SET_COPY); addReply(c, shared.cone); signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); server.dirty++; } } |
// t_hash.c int hashTypeSet(robj *o, sds field, sds value, int flags) { if (o->encoding == OBJ_ENCODING_ZIPLIST) { // ziplistInsert // ziplistPush } else if (o->encoding == OBJ_ENCODING_HT) { 。。。。。。 dictAdd(o->ptr,f,v); } else { serverPanic("Unknown hash encoding"); } } |
“AE”为“A simple event-driven programming library”的缩写,翻译成中文,即一个简单的事件驱动编程库。就Linux而言,可简单理解为对epoll的封装。
Redis支持select、epoll、kqueue和evport四种模式,编译Redis时决定采用哪一种,因此运行时只会有一种有效。优先顺序是:evport -> epoll -> kqueue -> select,其中select为兜底用,因为所有系统都会支持select。
对应的源码文件,分别为:
evport |
ae_evport.c |
Solaris |
epoll |
ae_epoll.c |
Linux系统 |
kqueue |
ae_kqueue.c |
BSD类系统 |
select |
ae_select.c |
其它不支持epoll、evport和kqueue系统 |
桥接它们的文件则是ae.c,桥接代码:
/* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ #ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif |
宏HAVE_EVPORT、HAVE_EPOLL、HAVE_KQUEUE则在config.h中决议出:
/* Solaris: event ports are used when DTrace support is detected. */
#ifdef __sun
#include <sys/feature_tests.h>
#ifdef _DTRACE_VERSION
#define HAVE_EVPORT 1
#endif
#endif

/* Linux: epoll. */
#ifdef __linux__
#define HAVE_EPOLL 1
#endif

/* macOS (10.6 or later) and the BSDs: kqueue. */
#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
#define HAVE_KQUEUE 1
#endif
自main函数开始的调用过程(关于读事件和写事件的处理顺序,作者在这里用了个小技巧:通常先处理读事件、再处理写事件,但设置了AE_BARRIER时顺序反转,先写后读,以便优先把查询结果返回给客户端):
int main(int argc, char **argv) { // server.c 。。。。。。 // aeMain(server.el); void aeMain(aeEventLoop *eventLoop) // ae.c { while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); // 这很重要,写AOF文件在这里进行 // aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); int aeProcessEvents(aeEventLoop *eventLoop, int flags) { // ae.c // linux实际调用的是ae_epoll.c中的aeApiPoll函数 numevents = aeApiPoll(eventLoop, tvp); if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) { // 一般优先处理读事件,然后再写事件,但有时为立即响应是查询则写优先 /* Normally we execute the readable event first, and the writable event laster. */ int invert = fe->mask & AE_BARRIER; /* Fire the readable event if the call sequence is not inverted. */ fe->rfileProc(eventLoop,fd,fe->clientData,mask); /* Fire the writable event. */ fe->wfileProc(eventLoop,fd,fe->clientData,mask); /* If we have to invert the call, fire the readable event now after the writable one. */ fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 函数aeCreateFileEvent中设置rfileProc和wfileProc // rfileProc和wfileProc的初始化均在aeCreateFileEvent中完成 } } } } 。。。。。。 }
// 初始化rfileProc和wfileProc int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd]; // events是一个以fd为下标的数组,0查找
if (aeApiAddEvent(eventLoop, fd, mask) == -1) // fd放到epoll或select等中 return AE_ERR; fe->mask |= mask; // 读写事件 if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; // 附加数据 if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; // 这个主要给select时用,epoll等不需要 return AE_OK; } |
main -> initServer -> aeCreateFileEvent(acceptTcpHandler) -> createClient -> aeCreateFileEvent(readQueryFromClient) -> read -> processCommand -> lookupCommand -> call(c,CMD_CALL_FULL) // call在调用proc后,会调用propagate函数将命令传播给AOF和Slaves -> c->cmd->proc(c) // 这个proc就是各种redis命令处理过程,比如:setCommand、hgetCommand等 |
在main函数中,会为accept注册一个事件acceptTcpHandler:
// server.c int main(int argc, char **argv) { void initServer(void) { // 支持多个IP,即redis.conf中可以bind多个IP for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], // 以读事件方式加入到epoll中 AE_READABLE, acceptTcpHandler, NULL) == AE_ERR) { serverPanic("Unrecoverable error creating server.ipfd file event."); } } } }
// networking.c void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata);
// 一次性连接接受max个连接,以提升效率 while(max--) { // anetTcpAccept处理了中断ENTR,方式是重试到成功或出错 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) // EWOULDBLOCK是正常的 serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr); return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); acceptCommonHandler(cfd,0,cip); } }
// networking.c static void acceptCommonHandler(int fd, int flags, char *ip) { client *c; // createClient为aceeptCommonHandler调用的核心函数 // 在createClient中会为fd关联读事件过程,并加入到epoll中 c = createClient(fd);
// 控制连接数不超过redis.conf上配置的最大数 if (listLength(server.clients) > server.maxclients) { char *err = "-ERR max number of clients reached\r\n"; /* That's a best effort error message, don't check write errors */ if (write(c->fd,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... */ }
server.stat_rejected_conn++; // 被拒绝的连接数增一 freeClient(c); return; }
// 保护模式处理 if (server.protected_mode && server.bindaddr_count == 0 && server.requirepass == NULL && !(flags & CLIENT_UNIX_SOCKET) && ip != NULL) { // 保护模式下,只允许127.0.0.1连接 if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) { char *err = "-DENIED Redis is running in protected mode because protected。。。。。。"; if (write(c->fd,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... */ }
server.stat_rejected_conn++; // 被拒绝的连接数增一 freeClient(c); return; } }
server.stat_numconnections++; // 当前连接数增一 c->flags |= flags; } |
// networking.c client *createClient(int fd) { client *c = zmalloc(sizeof(client)); // 如果redis.conf中开启了tcpkeepalive,配置项名tcp-keepalive,单位:秒 if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); // 注册c到epoll,并关联readQueryFromClient,事件为读事件AE_READABLE if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; }
selectDb(c,0); // 默认选择第1个DB,调用者需要执行select命令切换 。。。。。。 } |
// networking.c void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*)privdata; int nread = read(fd, c->querybuf+qblen, readlen);
processInputBufferAndReplicate(c); }
/* This is a wrapper for processInputBuffer that also cares about handling
 * the replication forwarding to the sub-slaves, in case the client 'c'
 * is flagged as master. Usually you want to call this instead of the
 * raw processInputBuffer(). */
void processInputBufferAndReplicate(client *c) {
    if (!(c->flags & CLIENT_MASTER)) {
        /* Ordinary client: just process its commands. */
        processInputBuffer(c);
    } else {
        /* The client is our master: measure how far reploff advanced while
         * processing, and forward that many bytes of the pending query
         * buffer to our own slaves. */
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            /* Drop the forwarded prefix from the pending buffer. */
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}
processInputBuffer调用processCommand,大量逻辑在processCommand中:
// networking.c /* If this function gets called we already read a whole * command, arguments are in the client argv/argc fields. * processCommand() execute the command or prepare the * server for a bulk read from the client. * * If C_OK is returned the client is still alive and valid and * other operations can be performed by the caller. Otherwise * if C_ERR is returned the client was destroyed (i.e. after QUIT). */ void processInputBuffer(client *c) { server.current_client = c;
int processCommand(client *c) { // 单独处理QUIT命令 /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble * when FORCE_REPLICATION is enabled and would be implemented in * a regular command proc. */ if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; }
// 查找命令的处理 /* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. */ c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",。。。); sdsfree(args); return C_OK; } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { // 参数个数不对 addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name); return C_OK; }
/* Check if the user is authenticated */ if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) { // 检查是否有设置密码,和密码是否已经验证通过 addReply(c,shared.noautherr); return C_OK; }
// 检查是否需要重定向 /* If cluster is enabled perform the cluster redirection here. * However we don't perform the redirection if: * 1) The sender of this command is our master. * 2) The command has no key arguments. */ if (server.cluster_enabled && !(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)) { if (n == NULL || n != server.cluster->myself) { if (c->cmd->proc == execCommand) { discardTransaction(c); } else { flagTransaction(c); } clusterRedirectClient(c,n,hashslot,error_code); return C_OK; } }
// 检查最大内存,如果超过配置的maxmemory, // 则返回错误“-OOM command not allowed when used memory > 'maxmemory'.” /* Handle the maxmemory directive. * * First we try to free some memory if possible (if there are volatile * keys in the dataset). If there are not the only thing we can do * is returning an error. * * Note that we do not want to reclaim memory if we are here re-entering * the event loop since there is a busy Lua script running in timeout * condition, to avoid mixing the propagation of scripts with the propagation * of DELs due to eviction. */ if (server.maxmemory && !server.lua_timedout) { /* It was impossible to free enough memory, and the command the client * is trying to execute is denied during OOM conditions? Error. */ if ((c->cmd->flags & CMD_DENYOOM) && out_of_memory) { addReply(c, shared.oomerr); return C_OK; } }
// 磁盘有问题不接受写请求 /* Don't accept write commands if there are problems persisting on disk * and if this is a master instance. */ int deny_write_type = writeCommandsDeniedByDiskError(); if (deny_write_type != DISK_ERROR_TYPE_NONE && server.masterhost == NULL && (c->cmd->flags & CMD_WRITE || c->cmd->proc == pingCommand)) { if (deny_write_type == DISK_ERROR_TYPE_RDB) addReply(c, shared.bgsaveerr); else addReplySds(c, sdscatprintf(sdsempty(), "-MISCONF Errors writing to the AOF file: %s\r\n", strerror(server.aof_last_write_errno))); return C_OK; }
// 不接受写请求,如果没有足够多好的slave /* Don't accept write commands if there are not enough good slaves and * user configured the min-slaves-to-write option. */ if (server.masterhost == NULL && server.repl_min_slaves_to_write && server.repl_min_slaves_max_lag && c->cmd->flags & CMD_WRITE && server.repl_good_slaves_count < server.repl_min_slaves_to_write) { addReply(c, shared.noreplicaserr); return C_OK; }
// 只读的slave不接受写请求 /* Don't accept write commands if this is a read only slave. But * accept write commands if this is our master. */ if (server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER) && c->cmd->flags & CMD_WRITE) { addReply(c, shared.roslaveerr); return C_OK; }
// 只接受SUB、UNSUB、PSUB、PUNSUB和PING命令 /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ if (c->flags & CLIENT_PUBSUB && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand) { addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"); return C_OK; }
/* Only allow commands with flag "t", such as INFO, SLAVEOF and so on, * when slave-serve-stale-data is no and we are a slave with a broken * link with master. */ if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && server.repl_serve_stale_data == 0 && !(c->cmd->flags & CMD_STALE)) { flagTransaction(c); addReply(c, shared.masterdownerr); return C_OK; }
// 正在加载DB,直接返回错误 /* Loading DB? Return an error if the command has not the * CMD_LOADING flag. */ if (server.loading && !(c->cmd->flags & CMD_LOADING)) { addReply(c, shared.loadingerr); return C_OK; }
/* Lua script too slow? Only allow a limited number of commands. */ if (server.lua_timedout && c->cmd->proc != authCommand && c->cmd->proc != replconfCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && !(c->cmd->proc == scriptCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) { flagTransaction(c); addReply(c, shared.slowscripterr); return C_OK; }
/* Exec the command */ if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { /* Add a new command into the MULTI commands queue */ queueMultiCommand(c); // 链上新的事务子命令(队列结构),等到EXEC时一块执行 addReply(c,shared.queued); // 向client返回QUEUED } else { call(c,CMD_CALL_FULL); // 调用命令处理过程 c->woff = server.master_repl_offset; // 更新复制偏移 if (listLength(server.ready_keys)) handleClientsBlockedOnKeys(); } return C_OK; } } |
call在调用proc后,会调用propagate函数将命令传播给AOF和Slaves。
int processCommand(client *c) { 。。。。。。 void call(client *c, int flags) { 。。。。。。 propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); } } |
/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves.
 *
 * flags are an xor between:
 * + PROPAGATE_NONE (no propagation of command at all)
 * + PROPAGATE_AOF (propagate into the AOF file if is enabled)
 * + PROPAGATE_REPL (propagate into the replication link)
 *
 * This should not be used inside commands implementation. Use instead
 * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation(). */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    /* Feed the AOF, only when AOF is not switched off. */
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    /* Feed the slaves. */
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
feedAppendOnlyFile并不实际写文件,而只是将内容组织到AOF buffer中。写AOF文件是在每次epoll之前进行,redis-server启动时注册了函数beforeSleep,它在每次epoll之前被调用。beforeSleep调用flushAppendOnlyFile将AOF buffer写入到AOF文件。
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { 。。。。。。 /* Append to the AOF buffer. This will be flushed on disk just before * of re-entering the event loop, so before the client will get a * positive reply about the operation performed. */ if (server.aof_state == AOF_ON) // aof_state值由redis.conf中的appendonly控制 server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf)); } |
redis-server启动:
int main(int argc, char **argv) { aeSetBeforeSleepProc(server.el, beforeSleep); // 注册回调beforeSleep aeSetAfterSleepProc(server.el, afterSleep); aeMain(server.el); } |
调用beforeSleep:
/* The event loop: runs until `stop` is set on the loop. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        /* The beforesleep hook runs before every poll; in redis-server it is
         * beforeSleep, which writes the AOF buffer to the AOF file. */
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        /* Poll (epoll on Linux) and dispatch the events. */
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
调用flushAppendOnlyFile函数将AOF buffer写入到AOF文件:
/* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep * for ready file descriptors. */ void beforeSleep(struct aeEventLoop *eventLoop) { 。。。。。。 /* Unblock all the clients blocked for synchronous replication * in WAIT. */ if (listLength(server.clients_waiting_acks)) processClientsWaitingReplicas(); 。。。。。。 /* Try to process pending commands for clients that were just unblocked. */ if (listLength(server.unblocked_clients)) processUnblockedClients();
/* Write the AOF buffer on disk */ flushAppendOnlyFile(0); 。。。。。。 /* Handle writes with pending output buffers. */ handleClientsWithPendingWrites(); 。。。。。。 } |
写AOF文件:
/* Write the append only file buffer on disk. * * Since we are required to write the AOF before replying to the client, * and the only way the client socket can get a write is entering when the * the event loop, we accumulate all the AOF writes in a memory * buffer and write it on disk using this function just before entering * the event loop again. * * About the 'force' argument: * * When the fsync policy is set to 'everysec' we may delay the flush if there * is still an fsync() going on in the background thread, since for instance * on Linux write(2) will be blocked by the background fsync anyway. * When this happens we remember that there is some aof buffer to be * flushed ASAP, and will try to do that in the serverCron() function. * * However if force is set to 1 we'll write regardless of the background * fsync. */ #define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */ void flushAppendOnlyFile(int force) { // aofWrite调用write将AOF buffer写入到AOF文件,处理了ENTR,其它没什么 ssize_t nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); 。。。。。。 /* Handle the AOF write error. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { /* We can't recover when the fsync policy is ALWAYS since the * reply for the client is already in the output buffers, and we * have the contract with the user that on acknowledged write data * is synced on disk. */ serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting..."); exit(1); } else { return; /* We'll try again on the next call... */ } else { /* Successful write(2). If AOF was in error state, restore the * OK state and log the event. */ } 。。。。。。 /* Perform the fsync if needed. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { // redis_fsync是一个宏,Linux实际为fdatasync,其它为fsync // 所以最好不要将redis.conf中的appendfsync设置为always,这极影响性能 // 注意,Redis的实现并没有处理fsync和close的返回值,所以还不是最严格的
redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */ } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { // 如果已在sync状态,则不再重复 // BIO线程会间隔设置sync_in_progress // if (server.aof_fsync == AOF_FSYNC_EVERYSEC) // sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; if (!sync_in_progress) // eversec性能并不那么糟糕,因为它: // 后台方式执行fsync // Redis并不是严格意义上的单线程,实际上它创建一组BIO线程,专门处理阻塞和慢操作 // 这些操作就包括FSYNC,另外还有关闭文件和内存的free两个操作。 // 不像always,EVERYSEC模式并不立即调用fsync, // 而是将这个操作丢给了BIO线程异步执行, // BIO线程在进程启动时被创建,两者间通过bio_jobs和bio_pending两个 // 全局对象交互,其中主线程负责写,BIO线程负责消费。 aof_background_fsync(server.aof_fd); server.aof_last_fsync = server.unixtime; } } |
这里也不会真正将数据发给Slaves,而只是将数据放入到replication buffer中。
/* Propagate write commands to slaves, and populate the replication backlog * as well. This function is used if the instance is a master: we use * the commands received by our clients in order to create the replication * stream. Instead if the instance is a slave and has sub-slaves attached, * we use replicationFeedSlavesFromMaster() */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { 。。。。。。 /* Write the command to every slave. */ listRewind(slaves,&li); while((ln = listNext(&li))) { // 遍历所有的slaves client *slave = ln->value; // Slave也被当作client
/* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
/* Feed slaves that are waiting for the initial SYNC (so these commands * are queued in the output buffer until the initial SYNC completes), * or are already in sync with the master. */
/* Add the multi bulk length. */ addReplyMultiBulkLen(slave,argc);
/* Finally any additional argument that was not stored inside the * static buffer if any (from j to argc). */ for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]); } } |
进程启动时事项:
1) 为所有配置设置默认值
2) 加载配置文件redis.conf,覆盖默认值
3) 安装信号处理器
4) 初始化全局状态
5) 创建epoll,启动TCP监听
6) 创建BIO线程
7) 加载module
8) 加载AOF或RDB
9) 设置beforeSleep和afterSleep两个回调
10) 进入epoll循环
int main(int argc, char **argv) { // 初始化所有配置,为它们设置默认值 initServerConfig();
// 如果是sentinel模式,额外做相关初始化 // sentinel_mode的值由命令行参数中是否包含参数“--sentinel”决定 if (server.sentinel_mode) { initSentinelConfig(); initSentinel(); }
// redis-server和redis-check-rdb、redis-check-aof共享了同一个main函数 // 进入redis_check_rdb_main或redis_check_aof_main后不会再返回, // 也就是这两个函数后的代码均不会被执行。 if (strstr(argv[0],"redis-check-rdb") != NULL) redis_check_rdb_main(argc,argv,NULL); else if (strstr(argv[0],"redis-check-aof") != NULL) redis_check_aof_main(argc,argv);
// 加载redis.conf, // 包括初始化module列表loadmodule_queue, // 注意会将redis-server的命令行参数一并传递给module loadServerConfig(configfile,options);
// 置为deamon方式, // 可通过redis.conf中的daemonize控制是否以deamon方式运行 if (background) daemonize();
// 对redis-server核心的初始化,包括但不限于: // 1) 忽略HUP和PIPE信号 // 2) 安装TERM和INT两个信号的处理器 // 3) 初始化全局的redisServer对象server // 4) 注册定时器回调serverCron // 5) 注册accept的回调acceptTcpHandler // 6) 注册UNIX套接字回调acceptUnixHandler // 7) 注册模块回调moduleBlockedClientPipeReadable // 8) 创建或追加模式打开AOF文件 // 9) 集群server.cluster初始化 // 10) 复制脚本初始化 // 11) 初始化lua运行环境 // 12) 慢日志初始化 // 13) 初始化延迟监控(latency monitor) // 14) 创建BIO线程,由bio.h中的BIO_NUM_OPS决定个数,当前为3个 initServer();
// 如果是daemon方式,则创建pid文件 if (background || server.pidfile) createPidFile();
// 修改进程的title,加入状态和端口号,这样对ps等更为友好 redisSetProcTitle(argv[0]);
// 进程启动时显示LOGO, // 可通过redis.conf中的always-show-logo控制是否显示 redisAsciiArt();
// 检查TCP的“/proc/sys/net/core/somaxconn” checkTcpBacklogSettings
if (server.sentinel_mode) { sentinelIsRunning(); } else { // 如果是Linux, // 检查“/proc/sys/vm/overcommit_memory” // 和“/sys/kernel/mm/transparent_hugepage/enabled”的值。 // “/proc/sys/vm/overcommit_memory”值需要为1, // “/sys/kernel/mm/transparent_hugepage/enabled”需要为“never” linuxMemoryWarnings();
// 加载所有的模块,如果有一个加载失败则启动失败 // 具体有哪些模块,由redis.conf中的loadmodule决定,如: // loadmodule /path/to/my_module.so // loadmodule /path/to/other_module.so // 每个module一行 moduleLoadFromQueue();
// 从磁盘加载RDB或AOF到内存 // 如果开启了AOF, // 则调用loadAppendOnlyFile加载AOF文件,这个时候不会加载RDB文件 // 没开启AOF时,调用rdbLoad加载RDB文件 // 不管是AOF还是RDB,加载不成功时,进程均不能启动。 loadDataFromDisk();
if (server.cluster_enabled) { if (verifyClusterConfigWithData() == C_ERR) { serverLog(LL_WARNING, "You can't have keys in a DB different than DB 0 when in " "Cluster mode. Exiting."); exit(1); }
if (server.ipfd_count > 0) serverLog(LL_NOTICE,"Ready to accept connections"); if (server.sofd > 0) serverLog(LL_NOTICE, "The server is now ready to accept connections at %s", server.unixsocket); }
// redis.conf中的maxmemory值过小,进行警告提示 if (server.maxmemory > 0 && server.maxmemory < 1024*1024) { serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); }
// 注册回调beforeSleep, // 对于everysec模式的appendfsync将在这个回调中进行 aeSetBeforeSleepProc(server.el,beforeSleep); aeSetAfterSleepProc(server.el,afterSleep);
// 进入epoll主循环: // BSD为kqueue死循环, // Solaris为evport死循环, // 其它为select死循环 // // 网络数据的收和发,以及所有COMMAND的操作均在这个循环中完成 aeMain(server.el);
// 关闭epoll,释放相关资源 aeDeleteEventLoop(server.el); return 0; } |
Redis虽然采用C语言实现,但内含明显的对象,核心的对象包括:
这是一个超大的对象,实为redis-server的Context对象,承载了redis-server的各种全局状态。下面只列出它的一小部分:
// server.h struct redisServer { /* General */ pid_t pid; /* Main process pid. */ char *configfile; /* Absolute config file path, or NULL */
/* Networking */ int port; /* TCP listening port */
/* AOF persistence */ int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ int aof_fsync; /* Kind of fsync() policy */
/* RDB persistence */ long long dirty; /* Changes to DB from the last save */
/* Logging */ char *logfile; /* Path of log file */
/* Replication (master) */ char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */
/* Replication (slave) */ char *masterhost; /* Hostname of master */
/* Cluster */ int cluster_enabled; /* Is cluster enabled? */
/* Scripting */ lua_State *lua; /* The Lua interpreter. We use just one for all clients */
/* Latency monitor */ dict *latency_events; }; |
一个client对象,可以是一个Client,也可以是一个slave,也可以是一个master。
// server.h /* With multiplexing we need to take per-client state. * Clients are taken in a linked list. */ typedef struct client { uint64_t id; /* Client incremental unique ID. */ int fd; /* Client socket. */ redisDb *db; /* Pointer to currently SELECTed DB. */ robj *name; /* As set by CLIENT SETNAME. */ sds querybuf; /* Buffer we use to accumulate client queries. */ size_t qb_pos; /* The position we have read in querybuf. */ int argc; /* Num of arguments of current command. */ robj **argv; /* Arguments of current command. */ struct redisCommand *cmd, *lastcmd; /* Last command executed. */ list *reply; /* List of reply objects to send to the client. */ int authenticated; /* When requirepass is non-NULL. */ int replstate; /* Replication state if this is a slave. */ char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
/* Response buffer */ int bufpos; char buf[PROTO_REPLY_CHUNK_BYTES]; }; |
Redis命令结构:
struct redisCommand { char *name; // Redis命令名,如:GET等 redisCommandProc *proc; // 处理该命令的过程 int arity; char *sflags; /* Flags as string representation, one char per flag. */ int flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line. * Used for Redis Cluster redirect. */ redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */ int firstkey; /* The first argument that's a key (0 = no keys) */ int lastkey; /* The last argument that's a key */ int keystep; /* The step between first and last key */ long long microseconds, calls; }; |
Redis的DB结构:
/* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; /* Database ID */ long long avg_ttl; /* Average TTL, just for stats */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ } redisDb; |
Redis集群节点结构:
typedef long long mstime_t; /* millisecond time type. */ typedef struct clusterNode { mstime_t ctime; /* Node object creation time. */ char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ int flags; /* CLUSTER_NODE_... */ uint64_t configEpoch; /* Last configEpoch observed for this node */ unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */ int numslots; /* Number of slots handled by this node */ int numslaves; /* Number of slave nodes, if this is a master */ // 所有的slave节点 struct clusterNode **slaves; /* pointers to slave nodes */ // master节点 struct clusterNode *slaveof; /* pointer to the master node. Note that it may be NULL even if the node is a slave if we don't have the master node in our tables. */ // 最近一次发送ping的时间 mstime_t ping_sent; /* Unix time we sent latest ping */ // 最近一次接收pong的时间 mstime_t pong_received; /* Unix time we received the pong */ mstime_t fail_time; /* Unix time when FAIL flag was set */ mstime_t voted_time; /* Last time we voted for a slave of this master */ mstime_t repl_offset_time; /* Unix time we received offset for this node */ mstime_t orphaned_time; /* Starting time of orphaned master condition */ long long repl_offset; /* Last known repl offset for this node. */ char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */ int port; /* Latest known clients port of this node */ int cport; /* Latest known cluster port of this node. */ clusterLink *link; /* TCP/IP link with this node */ list *fail_reports; /* List of nodes signaling this as failing */ } clusterNode; |
Redis集群状态结构:
typedef struct clusterState { clusterNode *myself; /* This node */ uint64_t currentEpoch; int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */ int size; /* Num of master nodes with at least one slot */
dict *nodes; /* Hash table of name -> clusterNode structures */ dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
clusterNode *migrating_slots_to[CLUSTER_SLOTS]; clusterNode *importing_slots_from[CLUSTER_SLOTS]; clusterNode *slots[CLUSTER_SLOTS];
mstime_t failover_auth_time; /* Time of previous or next election. */ int failover_auth_count; /* Number of votes received so far. */ int failover_auth_sent; /* True if we already asked for votes. */ int failover_auth_rank; /* This slave rank for current auth request. */ uint64_t failover_auth_epoch; /* Epoch of the current election. */
/* Manual failover state of master. */ clusterNode *mf_slave; /* Slave performing the manual failover. */
/* The following fields are used by masters to take state on elections. */ uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */ } clusterState; |
待续。
待续。