记得之前写过一篇关于quicklist的文章
http://blog.chinaunix.net/uid-31422160-id-5817816.html,当时没觉得会有啥用,没想到在这边等我了,在链表模型中,编码基本都是基于quicklist的了,不信?来一段api代码 掌掌眼
-
/* Push 'value' onto the list object 'subject'. 'where' selects the end:
 * LIST_HEAD pushes at the head, any other value pushes at the tail.
 * Only quicklist-encoded lists are handled; any other encoding panics. */
void listTypePush(robj *subject, robj *value, int where) {
    if (subject->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    } else {
        int ql_end = QUICKLIST_TAIL;
        if (where == LIST_HEAD) ql_end = QUICKLIST_HEAD;

        /* Use the decoded form of the object so that ->ptr can be measured
         * with sdslen(); the object returned here is released below. */
        robj *decoded = getDecodedObject(value);
        quicklistPush(subject->ptr, decoded->ptr, sdslen(decoded->ptr), ql_end);
        decrRefCount(decoded);
    }
}
再比如pop和length函数,也是使用的OBJ_ENCODING_QUICKLIST编码
-
/* Pop one element from the head (LIST_HEAD) or the tail (any other value
 * of 'where') of the list object 'subject'.
 *
 * Returns the popped element as an object, or NULL when nothing was popped.
 * Only quicklist-encoded lists are handled; any other encoding panics. */
robj *listTypePop(robj *subject, int where) {
    robj *popped = NULL;
    long long ival;
    int ql_end = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;

    if (subject->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    } else if (quicklistPopCustom(subject->ptr, ql_end,
                                  (unsigned char **)&popped, NULL, &ival,
                                  listPopSaver) &&
               popped == NULL) {
        /* The pop succeeded but no object was handed back through the data
         * pointer, so build the result from the integer output instead
         * (presumably the element was stored as an integer). */
        popped = createStringObjectFromLongLong(ival);
    }
    return popped;
}
-
-
unsigned long listTypeLength(const robj *subject) {
-
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
-
return quicklistCount(subject->ptr);
-
} else {
-
serverPanic("Unknown list encoding");
-
}
-
}
其余对外的api函数均是这样的,底层实现以quicklist为主
list对于数据库的操作代码,逻辑上跟string类型差不多:
1)先在client结构中找到对应的参数;
2)判断参数的合法性;
3)操作list对象,发送事件
这里先给出push的通常实现
-
/* Shared implementation of the push commands.
 *
 * argv[1] is the key and argv[2..argc-1] are the values to push; 'where'
 * is LIST_HEAD for a head push, anything else for a tail push. Replies
 * with the resulting list length, or with a wrong-type error when the key
 * holds a non-list object. */
void pushGenericCommand(client *c, int where) {
    robj *target = lookupKeyWrite(c->db, c->argv[1]);
    int count = 0;
    int idx;

    /* An existing key must hold a list. */
    if (target != NULL && target->type != OBJ_LIST) {
        addReply(c, shared.wrongtypeerr);
        return;
    }

    for (idx = 2; idx < c->argc; idx++) {
        if (target == NULL) {
            /* The key is missing: create the list lazily, right before
             * the first value is pushed, and register it in the db. */
            target = createQuicklistObject();
            quicklistSetOptions(target->ptr,
                                server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db, c->argv[1], target);
        }
        listTypePush(target, c->argv[idx], where);
        count++;
    }

    /* Reply with the list length; 0 when no list exists at all. */
    if (target != NULL) {
        addReplyLongLong(c, listTypeLength(target));
    } else {
        addReplyLongLong(c, 0);
    }

    if (count) {
        char *event = "rpush";
        if (where == LIST_HEAD) event = "lpush";

        signalModifiedKey(c->db, c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST, event, c->argv[1], c->db->id);
    }
    server.dirty += count;
}
对于带X的命令(如LPUSHX、RPUSHX),逻辑是:当key不存在(查到的对象为空)或者对象类型不是OBJ_LIST时直接返回,不再往下走,代码就不贴出来了~
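之后有点意思的代码就是rpoplpush了,它不是简单地先pop再push:先把pop出来的值保存下来,并对源key(touchedkey)增加引用计数加以保护;push完成后释放pop出来的值,等事件通知都发完之后,再把源key的引用计数减回去。这属于历史遗留问题了——按照代码注释的说法,rpoplpushHandlePush可能会修改客户端的命令参数向量(目前并不会),所以需要先把key保护起来~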
-
/* RPOPLPUSH: pop the tail element of the source list (argv[1]) and hand it
 * to rpoplpushHandlePush() together with the destination key (argv[2]).
 *
 * Replies with a null bulk when the source key is missing or the source
 * list is empty; returns early when either key fails the list type check. */
void rpoplpushCommand(client *c) {
    robj *src, *dst, *srckey, *popped;

    src = lookupKeyWriteOrReply(c, c->argv[1], shared.nullbulk);
    if (src == NULL) return;
    if (checkType(c, src, OBJ_LIST)) return;

    if (listTypeLength(src) == 0) {
        /* This may only happen after loading very old RDB files. Recent
         * versions of Redis delete keys of empty lists. */
        addReply(c, shared.nullbulk);
        return;
    }

    dst = lookupKeyWrite(c->db, c->argv[2]);
    srckey = c->argv[1];
    if (dst != NULL && checkType(c, dst, OBJ_LIST)) return;

    popped = listTypePop(src, LIST_TAIL);

    /* Keep our own reference to the source key: rpoplpushHandlePush may
     * change the client command argument vector (it does not currently). */
    incrRefCount(srckey);
    rpoplpushHandlePush(c, c->argv[2], dst, popped);

    /* listTypePop returns an object with its refcount incremented. */
    decrRefCount(popped);

    notifyKeyspaceEvent(NOTIFY_LIST, "rpop", srckey, c->db->id);

    /* Delete the source list when it is empty. */
    if (listTypeLength(src) == 0) {
        dbDelete(c->db, srckey);
        notifyKeyspaceEvent(NOTIFY_GENERIC, "del", srckey, c->db->id);
    }

    signalModifiedKey(c->db, srckey);
    decrRefCount(srckey);
    server.dirty++;
}
再之后就是BLPOP等阻塞操作也需要说一下:
1)如果链表存在且不为空,则正常pop
2)如果链表不存在或者为空,需要先将请求的客户端挂起,暂不为其服务,并把所有阻塞在同一个key上的客户端放进一个链表里;直到有新的数据进来,这些客户端才会依次被服务(serve),代码如下:
-
/* Block client 'c' on the 'numkeys' keys in 'keys'.
 *
 * 'timeout' and 'target' are stored in the client's bpop state; a non-NULL
 * 'target' gets an extra reference. Each key not already tracked by the
 * client is recorded in c->bpop.keys, and the client is appended to the
 * per-key waiters list kept in c->db->blocking_keys. Finally the client is
 * marked as blocked with BLOCKED_LIST. */
void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
    int i;

    c->bpop.timeout = timeout;
    c->bpop.target = target;
    if (target) incrRefCount(target);

    for (i = 0; i < numkeys; i++) {
        robj *key = keys[i];
        dictEntry *entry;
        list *waiters;

        /* Skip keys this client is already waiting for. */
        if (dictAdd(c->bpop.keys, key, NULL) != DICT_OK) continue;
        incrRefCount(key);

        /* Now the other direction: the key -> clients mapping. */
        entry = dictFind(c->db->blocking_keys, key);
        if (entry != NULL) {
            waiters = dictGetVal(entry);
        } else {
            int retval;

            /* First client blocking on this key: create its waiters list. */
            waiters = listCreate();
            retval = dictAdd(c->db->blocking_keys, key, waiters);
            incrRefCount(key);
            serverAssertWithInfo(c, key, retval == DICT_OK);
        }
        listAddNodeTail(waiters, c);
    }

    blockClient(c, BLOCKED_LIST);
}
解除阻塞操作如下:
-
/* Undo the bookkeeping done when client 'c' blocked on one or more keys:
 * remove it from every per-key waiters list in c->db->blocking_keys, drop
 * lists that become empty, then clear the client's own bpop state. */
void unblockClientWaitingData(client *c) {
    dictIterator *iter;
    dictEntry *entry;

    serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);

    /* The client may be waiting on several keys; handle each of them. */
    iter = dictGetIterator(c->bpop.keys);
    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        robj *key = dictGetKey(entry);
        list *l = dictFetchValue(c->db->blocking_keys, key);

        /* Take this client out of the key's waiters list. */
        serverAssertWithInfo(c,key,l != NULL);
        listDelNode(l, listSearchKey(l, c));

        /* An empty waiters list only wastes memory: remove its entry. */
        if (listLength(l) == 0) dictDelete(c->db->blocking_keys, key);
    }
    dictReleaseIterator(iter);

    /* Reset the client side of the blocking state. */
    dictEmpty(c->bpop.keys, NULL);
    if (c->bpop.target != NULL) {
        decrRefCount(c->bpop.target);
        c->bpop.target = NULL;
    }
}
发送ready信息代码如下:
-
/* Queue 'key' of database 'db' as ready, so that clients blocked on it can
 * be served later.
 *
 * Nothing is done when no client is blocked on the key, or when the key was
 * already signaled. Otherwise a readyList entry is appended to
 * server.ready_keys and the key is also recorded in db->ready_keys; each of
 * the two containers gets its own reference to the key. */
void signalListAsReady(redisDb *db, robj *key) {
    readyList *rl;
    int retval;

    /* No clients blocking for this key? No need to queue it. */
    if (dictFind(db->blocking_keys,key) == NULL) return;

    /* Key was already signaled? No need to queue it again. */
    if (dictFind(db->ready_keys,key) != NULL) return;

    /* Ok, we need to queue this key into server.ready_keys. */
    rl = zmalloc(sizeof(*rl));
    rl->key = key;
    rl->db = db;
    incrRefCount(key);
    listAddNodeTail(server.ready_keys,rl);

    /* We also add the key in the db->ready_keys dictionary in order
     * to avoid adding it multiple times into a list with a simple O(1)
     * check.
     *
     * The insertion is performed outside of the assertion, so it does not
     * depend on how the assertion macro evaluates its argument; this is
     * the same pattern blockForKeys() uses. */
    incrRefCount(key);
    retval = dictAdd(db->ready_keys,key,NULL);
    serverAssert(retval == DICT_OK);
}
其余的操作套路是一样的,就不在这里复制粘贴代码了~
感谢您的阅读,有不足之处,还望不吝赐教~~~
阅读(2759) | 评论(0) | 转发(0) |