Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1876416
  • 博文数量: 184
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 2388
  • 用 户 组: 普通用户
  • 注册时间: 2016-12-21 22:26
个人简介

90后空巢老码农

文章分类

全部博文(184)

文章存档

2021年(26)

2020年(56)

2019年(54)

2018年(47)

2017年(1)

我的朋友

分类: NOSQL

2019-04-30 19:59:28

redis虽然整体使用单线程实现,但是为了确保client响应时间,还是将一些比较耗时的操作移到了单个线程里面,在后台运行,这就是我们今天要讲的bio。
首先,在4.0.11当中,bio留给外部调用的接口如下

点击(此处)折叠或打开

  1. /* Exported API */
  2. void bioInit(void);
  3. void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3);
  4. unsigned long long bioPendingJobsOfType(int type);
  5. unsigned long long bioWaitStepOfType(int type);
  6. time_t bioOlderJobOfType(int type);
  7. void bioKillThreads(void);
具体要用到bio的,共有三种情况:

点击(此处)折叠或打开

  1. /* Background job opcodes */
  2. #define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
  3. #define BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
  4. #define BIO_LAZY_FREE 2 /* Deferred objects freeing. */
  5. #define BIO_NUM_OPS 3

为了分开执行以上三种情况,redis为每一种情况都构建了如下变量

点击(此处)折叠或打开

  1. static pthread_t bio_threads[BIO_NUM_OPS];
  2. static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
  3. static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];
  4. static pthread_cond_t bio_step_cond[BIO_NUM_OPS];
  5. static list *bio_jobs[BIO_NUM_OPS];

  6. static unsigned long long bio_pending[BIO_NUM_OPS];
最底下的两个分别是每种任务的队列,以及每种任务pending的数量,上面的变量就是为了操作这两个队列的相关线程、互斥量、条件变量了。
后台任务是用如下结构来表示的

点击(此处)折叠或打开

  1. /* This structure represents a background Job. It is only used locally to this
  2.  * file as the API does not expose the internals at all. */
  3. struct bio_job {
  4.     time_t time; /* Time at which the job was created. */
  5.     /* Job specific arguments pointers. If we need to pass more than three
  6.      * arguments we can just pass a pointer to a structure or alike. */
  7.     void *arg1, *arg2, *arg3;
  8. };
其初始化工作就是初始化每一种任务的相关变量,创建线程并等待:

点击(此处)折叠或打开

  1. /* Initialize the background system, spawning the thread. */
  2. void bioInit(void) {
  3.     pthread_attr_t attr;
  4.     pthread_t thread;
  5.     size_t stacksize;
  6.     int j;

  7.     /* Initialization of state vars and objects */
  8.     for (j = 0; j < BIO_NUM_OPS; j++) {
  9.         pthread_mutex_init(&bio_mutex[j],NULL);
  10.         pthread_cond_init(&bio_newjob_cond[j],NULL);
  11.         pthread_cond_init(&bio_step_cond[j],NULL);
  12.         bio_jobs[j] = listCreate();
  13.         bio_pending[j] = 0;
  14.     }

  15.     /* Set the stack size as by default it may be small in some system */
  16.     pthread_attr_init(&attr);
  17.     pthread_attr_getstacksize(&attr,&stacksize);
  18.     if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
  19.     while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
  20.     pthread_attr_setstacksize(&attr, stacksize);

  21.     /* Ready to spawn our threads. We use the single argument the thread
  22.      * function accepts in order to pass the job ID the thread is
  23.      * responsible of. */
  24.     for (j = 0; j < BIO_NUM_OPS; j++) {
  25.         void *arg = (void*)(unsigned long) j;
  26.         if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
  27.             serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
  28.             exit(1);
  29.         }
  30.         bio_threads[j] = thread;
  31.     }
  32. }
每个线程的具体实现如下

点击(此处)折叠或打开

  1. void *bioProcessBackgroundJobs(void *arg) {
  2.     struct bio_job *job;
  3.     unsigned long type = (unsigned long) arg;
  4.     sigset_t sigset;

  5.     /* Check that the type is within the right interval. */
  6.     if (type >= BIO_NUM_OPS) {
  7.         serverLog(LL_WARNING,
  8.             "Warning: bio thread started with wrong type %lu",type);
  9.         return NULL;
  10.     }

  11.     /* Make the thread killable at any time, so that bioKillThreads()
  12.      * can work reliably. */
  13.     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
  14.     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

  15.     pthread_mutex_lock(&bio_mutex[type]);
  16.     /* Block SIGALRM so we are sure that only the main thread will
  17.      * receive the watchdog signal. */
  18.     sigemptyset(&sigset);
  19.     sigaddset(&sigset, SIGALRM);
  20.     if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
  21.         serverLog(LL_WARNING,
  22.             "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

  23.     while(1) {
  24.         listNode *ln;

  25.         /* The loop always starts with the lock hold. */
  26.         if (listLength(bio_jobs[type]) == 0) {
  27.             pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
  28.             continue;
  29.         }
  30.         /* Pop the job from the queue. */
  31.         ln = listFirst(bio_jobs[type]);
  32.         job = ln->value;
  33.         /* It is now possible to unlock the background system as we know have
  34.          * a stand alone job structure to process.*/
  35.         pthread_mutex_unlock(&bio_mutex[type]);

  36.         /* Process the job accordingly to its type. */
  37.         if (type == BIO_CLOSE_FILE) {
  38.             close((long)job->arg1);
  39.         } else if (type == BIO_AOF_FSYNC) {
  40.             aof_fsync((long)job->arg1);
  41.         } else if (type == BIO_LAZY_FREE) {
  42.             /* What we free changes depending on what arguments are set:
  43.              * arg1 -> free the object at pointer.
  44.              * arg2 & arg3 -> free two dictionaries (a Redis DB).
  45.              * only arg3 -> free the skiplist. */
  46.             if (job->arg1)
  47.                 lazyfreeFreeObjectFromBioThread(job->arg1);
  48.             else if (job->arg2 && job->arg3)
  49.                 lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
  50.             else if (job->arg3)
  51.                 lazyfreeFreeSlotsMapFromBioThread(job->arg3);
  52.         } else {
  53.             serverPanic("Wrong job type in bioProcessBackgroundJobs().");
  54.         }
  55.         zfree(job);

  56.         /* Unblock threads blocked on bioWaitStepOfType() if any. */
  57.         pthread_cond_broadcast(&bio_step_cond[type]);

  58.         /* Lock again before reiterating the loop, if there are no longer
  59.          * jobs to process we'll block again in pthread_cond_wait(). */
  60.         pthread_mutex_lock(&bio_mutex[type]);
  61.         listDelNode(bio_jobs[type],ln);
  62.         bio_pending[type]--;
  63.     }
  64. }
创建具体任务的api就是传入类型和参数,将任务放入队列,然后signal到等待线程

点击(此处)折叠或打开

  1. void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
  2.     struct bio_job *job = zmalloc(sizeof(*job));

  3.     job->time = time(NULL);
  4.     job->arg1 = arg1;
  5.     job->arg2 = arg2;
  6.     job->arg3 = arg3;
  7.     pthread_mutex_lock(&bio_mutex[type]);
  8.     listAddNodeTail(bio_jobs[type],job);
  9.     bio_pending[type]++;
  10.     pthread_cond_signal(&bio_newjob_cond[type]);
  11.     pthread_mutex_unlock(&bio_mutex[type]);
  12. }
获取指定类型的pending job 的数量

点击(此处)折叠或打开

  1. /* Return the number of pending jobs of the specified type. */
  2. unsigned long long bioPendingJobsOfType(int type) {
  3.     unsigned long long val;
  4.     pthread_mutex_lock(&bio_mutex[type]);
  5.     val = bio_pending[type];
  6.     pthread_mutex_unlock(&bio_mutex[type]);
  7.     return val;
  8. }
bioWaitStepOfType(int type)和上面的非常类似,只不过是在pending为0的时候直接返回,若非0,则等待当前类型的任务的下一个开始处理再返回

点击(此处)折叠或打开

  1. unsigned long long bioWaitStepOfType(int type) {
  2.     unsigned long long val;
  3.     pthread_mutex_lock(&bio_mutex[type]);
  4.     val = bio_pending[type];
  5.     if (val != 0) {
  6.         pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]);
  7.         val = bio_pending[type];
  8.     }
  9.     pthread_mutex_unlock(&bio_mutex[type]);
  10.     return val;
  11. }
杀死所有线程就比较简单了

点击(此处)折叠或打开

  1. void bioKillThreads(void) {
  2.     int err, j;

  3.     for (j = 0; j < BIO_NUM_OPS; j++) {
  4.         if (pthread_cancel(bio_threads[j]) == 0) {
  5.             if ((err = pthread_join(bio_threads[j],NULL)) != 0) {
  6.                 serverLog(LL_WARNING,
  7.                     "Bio thread for job type #%d can be joined: %s",
  8.                         j, strerror(err));
  9.             } else {
  10.                 serverLog(LL_WARNING,
  11.                     "Bio thread for job type #%d terminated",j);
  12.             }
  13.         }
  14.     }
  15. }









阅读(3031) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~