Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1875980
  • 博文数量: 184
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 2388
  • 用 户 组: 普通用户
  • 注册时间: 2016-12-21 22:26
个人简介

90后空巢老码农

文章分类

全部博文(184)

文章存档

2021年(26)

2020年(56)

2019年(54)

2018年(47)

2017年(1)

我的朋友

分类: NOSQL

2019-04-01 23:41:02

记得之前写过一篇关于quicklist的文章http://blog.chinaunix.net/uid-31422160-id-5817816.html,当时没觉得会有啥用,没想到在这边等我了,在链表模型中,编码基本都是基于quicklist的了,不信?来一段api代码 掌掌眼

点击(此处)折叠或打开

  1. void listTypePush(robj *subject, robj *value, int where) {
  2.     if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
  3.         int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
  4.         value = getDecodedObject(value);// 转换成未编码时的sds字符串
  5.         size_t len = sdslen(value->ptr);
  6.         quicklistPush(subject->ptr, value->ptr, len, pos);
  7.         decrRefCount(value);
  8.     } else {
  9.         serverPanic("Unknown list encoding");
  10.     }
  11. }
再比如pop和length函数,也是使用的OBJ_ENCODING_QUICKLIST编码

点击(此处)折叠或打开

  1. robj *listTypePop(robj *subject, int where) {
  2.     long long vlong;
  3.     robj *value = NULL;

  4.     int ql_where = where == LIST_HEAD ? QUICKLIST_HEAD : QUICKLIST_TAIL;
  5.     if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
  6.         if (quicklistPopCustom(subject->ptr, ql_where, (unsigned char **)&value,
  7.                                NULL, &vlong, listPopSaver)) {
  8.             if (!value)
  9.                 value = createStringObjectFromLongLong(vlong);
  10.         }
  11.     } else {
  12.         serverPanic("Unknown list encoding");
  13.     }
  14.     return value;
  15. }

  16. unsigned long listTypeLength(const robj *subject) {
  17.     if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
  18.         return quicklistCount(subject->ptr);
  19.     } else {
  20.         serverPanic("Unknown list encoding");
  21.     }
  22. }


其余对外的api函数均是这样的,底层实现以quicklist为主
list对于数据库的操作代码,逻辑上跟string类型差不多:
1)先在client结构中找到对应的参数;
2)判断参数的合法性;
3)操作list对象,发送事件
这里先给出push的通常实现

点击(此处)折叠或打开

  1. void pushGenericCommand(client *c, int where) {
  2.     int j, pushed = 0;
  3.     robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

  4.     if (lobj && lobj->type != OBJ_LIST) {
  5.         addReply(c,shared.wrongtypeerr);
  6.         return;
  7.     }

  8.     for (j = 2; j < c->argc; j++) {
  9.         if (!lobj) {
  10.             lobj = createQuicklistObject();
  11.             quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
  12.                                 server.list_compress_depth);
  13.             dbAdd(c->db,c->argv[1],lobj);
  14.         }
  15.         listTypePush(lobj,c->argv[j],where);
  16.         pushed++;
  17.     }
  18.     addReplyLongLong(c, (lobj ? listTypeLength(lobj) : 0));
  19.     if (pushed) {
  20.         char *event = (where == LIST_HEAD) ? "lpush" : "rpush";

  21.         signalModifiedKey(c->db,c->argv[1]);
  22.         notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
  23.     }
  24.     server.dirty += pushed;
  25. }
对于x类的命令,它的逻辑在判断为空或者编码非OBJ_LIST时直接返回,并不往下再走了,代码就不贴出来了~
之后有点意思的代码就是rpoplpush了,它不是直接pop,然后push,而是保存pop的值,增加引用计数,push成功后,减引用计数,这属于历史遗留问题了,之前的版本需要保护客户端的命令参数向量~

点击(此处)折叠或打开

  1. void rpoplpushCommand(client *c) {
  2.     robj *sobj, *value;
  3.     if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
  4.         checkType(c,sobj,OBJ_LIST)) return;

  5.     if (listTypeLength(sobj) == 0) {
  6.         /* This may only happen after loading very old RDB files. Recent
  7.          * versions of Redis delete keys of empty lists. */
  8.         addReply(c,shared.nullbulk);
  9.     } else {
  10.         robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
  11.         robj *touchedkey = c->argv[1];

  12.         if (dobj && checkType(c,dobj,OBJ_LIST)) return;
  13.         value = listTypePop(sobj,LIST_TAIL);
  14.         /* We saved touched key, and protect it, since rpoplpushHandlePush
  15.          * may change the client command argument vector (it does not
  16.          * currently). */
  17.         incrRefCount(touchedkey);
  18.         rpoplpushHandlePush(c,c->argv[2],dobj,value);

  19.         /* listTypePop returns an object with its refcount incremented */
  20.         decrRefCount(value);

  21.         /* Delete the source list when it is empty */
  22.         notifyKeyspaceEvent(NOTIFY_LIST,"rpop",touchedkey,c->db->id);
  23.         if (listTypeLength(sobj) == 0) {
  24.             dbDelete(c->db,touchedkey);
  25.             notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
  26.                                 touchedkey,c->db->id);
  27.         }
  28.         signalModifiedKey(c->db,touchedkey);
  29.         decrRefCount(touchedkey);
  30.         server.dirty++;
  31.     }
  32. }
再之后就是BLPOP等阻塞操作也需要说一下:
1)如果链表存在且不为空,则正常pop
2)如果链表不存在或者为空,需要先将请求的客户端挂起,不进行服务,并将所有阻塞在某个key的客户端放进一个链表里,直到进来新的数据,会依次被serve,代码如下:

点击(此处)折叠或打开

  1. void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
  2.     dictEntry *de;
  3.     list *l;
  4.     int j;

  5.     c->bpop.timeout = timeout;
  6.     c->bpop.target = target;

  7.     if (target != NULL) incrRefCount(target);

  8.     for (j = 0; j < numkeys; j++) {
  9.         /* If the key already exists in the dict ignore it. */
  10.         if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
  11.         incrRefCount(keys[j]);

  12.         /* And in the other "side", to map keys -> clients */
  13.         de = dictFind(c->db->blocking_keys,keys[j]);
  14.         if (de == NULL) {
  15.             int retval;

  16.             /* For every key we take a list of clients blocked for it */
  17.             l = listCreate();
  18.             retval = dictAdd(c->db->blocking_keys,keys[j],l);
  19.             incrRefCount(keys[j]);
  20.             serverAssertWithInfo(c,keys[j],retval == DICT_OK);
  21.         } else {
  22.             l = dictGetVal(de);
  23.         }
  24.         listAddNodeTail(l,c);
  25.     }
  26.     blockClient(c,BLOCKED_LIST);
  27. }
解除阻塞操作如下:

点击(此处)折叠或打开

  1. void unblockClientWaitingData(client *c) {
  2.     dictEntry *de;
  3.     dictIterator *di;
  4.     list *l;

  5.     serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
  6.     di = dictGetIterator(c->bpop.keys);
  7.     /* The client may wait for multiple keys, so unblock it for every key. */
  8.     while((de = dictNext(di)) != NULL) {
  9.         robj *key = dictGetKey(de);

  10.         /* Remove this client from the list of clients waiting for this key. */
  11.         l = dictFetchValue(c->db->blocking_keys,key);
  12.         serverAssertWithInfo(c,key,l != NULL);
  13.         listDelNode(l,listSearchKey(l,c));
  14.         /* If the list is empty we need to remove it to avoid wasting memory */
  15.         if (listLength(l) == 0)
  16.             dictDelete(c->db->blocking_keys,key);
  17.     }
  18.     dictReleaseIterator(di);

  19.     /* Cleanup the client structure */
  20.     dictEmpty(c->bpop.keys,NULL);
  21.     if (c->bpop.target) {
  22.         decrRefCount(c->bpop.target);
  23.         c->bpop.target = NULL;
  24.     }
  25. }
发送ready信息代码如下:

点击(此处)折叠或打开

  1. void signalListAsReady(redisDb *db, robj *key) {
  2.     readyList *rl;

  3.     /* No clients blocking for this key? No need to queue it. */
  4.     if (dictFind(db->blocking_keys,key) == NULL) return;

  5.     /* Key was already signaled? No need to queue it again. */
  6.     if (dictFind(db->ready_keys,key) != NULL) return;

  7.     /* Ok, we need to queue this key into server.ready_keys. */
  8.     rl = zmalloc(sizeof(*rl));
  9.     rl->key = key;
  10.     rl->db = db;
  11.     incrRefCount(key);
  12.     listAddNodeTail(server.ready_keys,rl);

  13.     /* We also add the key in the db->ready_keys dictionary in order
  14.      * to avoid adding it multiple times into a list with a simple O(1)
  15.      * check. */
  16.     incrRefCount(key);
  17.     serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
  18. }
其余的操作套路是一样的,就不在这里复制粘贴代码了~
感谢您的阅读,有不足之处,还望不吝赐教~~~
阅读(2759) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~