redis虽然整体使用单线程实现,但是为了确保client响应时间,还是将一些比较耗时的操作移到了单独的后台线程里运行(每种操作对应一个线程),这就是我们今天要讲的bio。
首先,在4.0.11当中,bio留给外部调用的接口如下:
/* Exported API */

/* Initialize the per-type state and spawn one background thread per type. */
void bioInit(void);

/* Queue a job of the given type; arg1..arg3 are job-specific pointers. */
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3);

/* Return the number of pending jobs of the specified type. */
unsigned long long bioPendingJobsOfType(int type);

/* If jobs of this type are pending, wait for one processing step and
 * return the pending count observed afterwards; otherwise return 0. */
unsigned long long bioWaitStepOfType(int type);

/* Declared as part of the API; its definition is not shown in this listing. */
time_t bioOlderJobOfType(int type);

/* Cancel and join every background thread. */
void bioKillThreads(void);
具体要用到bio的,共有三种情况:
/* Background job opcodes. Each opcode is also the index of the thread,
 * mutex, condition variables, queue and counter that serve that job type. */
#define BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE     2 /* Deferred objects freeing. */
#define BIO_NUM_OPS       3 /* Number of opcodes; sizes the per-type arrays. */
为了分开执行以上三种情况,redis为每一种情况都构建了如下变量
-
static pthread_t bio_threads[BIO_NUM_OPS];
-
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
-
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];
-
static pthread_cond_t bio_step_cond[BIO_NUM_OPS];
-
static list *bio_jobs[BIO_NUM_OPS];
-
-
static unsigned long long bio_pending[BIO_NUM_OPS];
最底下的两个分别是每种任务的队列,以及每种任务pending的数量;上面的变量则是操作这两者所需的线程、互斥量和条件变量。
后台任务是用如下结构来表示的
/* This structure represents a background Job. It is only used locally to this
 * file as the API does not expose the internals at all. */
struct bio_job {
    time_t time; /* Time at which the job was created. */
    /* Job specific arguments pointers. If we need to pass more than three
     * arguments we can just pass a pointer to a structure or alike. */
    void *arg1, *arg2, *arg3;
};
其初始化工作就是初始化每一种任务的相关变量,创建线程并等待:
-
/* Initialize the background system, spawning the thread. */
-
void bioInit(void) {
-
pthread_attr_t attr;
-
pthread_t thread;
-
size_t stacksize;
-
int j;
-
-
/* Initialization of state vars and objects */
-
for (j = 0; j < BIO_NUM_OPS; j++) {
-
pthread_mutex_init(&bio_mutex[j],NULL);
-
pthread_cond_init(&bio_newjob_cond[j],NULL);
-
pthread_cond_init(&bio_step_cond[j],NULL);
-
bio_jobs[j] = listCreate();
-
bio_pending[j] = 0;
-
}
-
-
/* Set the stack size as by default it may be small in some system */
-
pthread_attr_init(&attr);
-
pthread_attr_getstacksize(&attr,&stacksize);
-
if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
-
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
-
pthread_attr_setstacksize(&attr, stacksize);
-
-
/* Ready to spawn our threads. We use the single argument the thread
-
* function accepts in order to pass the job ID the thread is
-
* responsible of. */
-
for (j = 0; j < BIO_NUM_OPS; j++) {
-
void *arg = (void*)(unsigned long) j;
-
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
-
serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
-
exit(1);
-
}
-
bio_threads[j] = thread;
-
}
-
}
每个线程的具体实现如下
-
void *bioProcessBackgroundJobs(void *arg) {
-
struct bio_job *job;
-
unsigned long type = (unsigned long) arg;
-
sigset_t sigset;
-
-
/* Check that the type is within the right interval. */
-
if (type >= BIO_NUM_OPS) {
-
serverLog(LL_WARNING,
-
"Warning: bio thread started with wrong type %lu",type);
-
return NULL;
-
}
-
-
/* Make the thread killable at any time, so that bioKillThreads()
-
* can work reliably. */
-
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
-
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
-
-
pthread_mutex_lock(&bio_mutex[type]);
-
/* Block SIGALRM so we are sure that only the main thread will
-
* receive the watchdog signal. */
-
sigemptyset(&sigset);
-
sigaddset(&sigset, SIGALRM);
-
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
-
serverLog(LL_WARNING,
-
"Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
-
-
while(1) {
-
listNode *ln;
-
-
/* The loop always starts with the lock hold. */
-
if (listLength(bio_jobs[type]) == 0) {
-
pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
-
continue;
-
}
-
/* Pop the job from the queue. */
-
ln = listFirst(bio_jobs[type]);
-
job = ln->value;
-
/* It is now possible to unlock the background system as we know have
-
* a stand alone job structure to process.*/
-
pthread_mutex_unlock(&bio_mutex[type]);
-
-
/* Process the job accordingly to its type. */
-
if (type == BIO_CLOSE_FILE) {
-
close((long)job->arg1);
-
} else if (type == BIO_AOF_FSYNC) {
-
aof_fsync((long)job->arg1);
-
} else if (type == BIO_LAZY_FREE) {
-
/* What we free changes depending on what arguments are set:
-
* arg1 -> free the object at pointer.
-
* arg2 & arg3 -> free two dictionaries (a Redis DB).
-
* only arg3 -> free the skiplist. */
-
if (job->arg1)
-
lazyfreeFreeObjectFromBioThread(job->arg1);
-
else if (job->arg2 && job->arg3)
-
lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
-
else if (job->arg3)
-
lazyfreeFreeSlotsMapFromBioThread(job->arg3);
-
} else {
-
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
-
}
-
zfree(job);
-
-
/* Unblock threads blocked on bioWaitStepOfType() if any. */
-
pthread_cond_broadcast(&bio_step_cond[type]);
-
-
/* Lock again before reiterating the loop, if there are no longer
-
* jobs to process we'll block again in pthread_cond_wait(). */
-
pthread_mutex_lock(&bio_mutex[type]);
-
listDelNode(bio_jobs[type],ln);
-
bio_pending[type]--;
-
}
-
}
创建具体任务的api就是传入类型和参数,将任务放入队列,然后signal到等待线程
-
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
-
struct bio_job *job = zmalloc(sizeof(*job));
-
-
job->time = time(NULL);
-
job->arg1 = arg1;
-
job->arg2 = arg2;
-
job->arg3 = arg3;
-
pthread_mutex_lock(&bio_mutex[type]);
-
listAddNodeTail(bio_jobs[type],job);
-
bio_pending[type]++;
-
pthread_cond_signal(&bio_newjob_cond[type]);
-
pthread_mutex_unlock(&bio_mutex[type]);
-
}
获取指定类型的pending job 的数量
-
/* Return the number of pending jobs of the specified type. */
-
unsigned long long bioPendingJobsOfType(int type) {
-
unsigned long long val;
-
pthread_mutex_lock(&bio_mutex[type]);
-
val = bio_pending[type];
-
pthread_mutex_unlock(&bio_mutex[type]);
-
return val;
-
}
bioWaitStepOfType(int type)和上面的非常类似,只不过是在pending为0的时候直接返回;若非0,则等待后台线程处理完当前类型的一个任务(即bio_step_cond被唤醒)后,再返回此时的pending数量
-
unsigned long long bioWaitStepOfType(int type) {
-
unsigned long long val;
-
pthread_mutex_lock(&bio_mutex[type]);
-
val = bio_pending[type];
-
if (val != 0) {
-
pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]);
-
val = bio_pending[type];
-
}
-
pthread_mutex_unlock(&bio_mutex[type]);
-
return val;
-
}
杀死所有线程就比较简单了
-
void bioKillThreads(void) {
-
int err, j;
-
-
for (j = 0; j < BIO_NUM_OPS; j++) {
-
if (pthread_cancel(bio_threads[j]) == 0) {
-
if ((err = pthread_join(bio_threads[j],NULL)) != 0) {
-
serverLog(LL_WARNING,
-
"Bio thread for job type #%d can be joined: %s",
-
j, strerror(err));
-
} else {
-
serverLog(LL_WARNING,
-
"Bio thread for job type #%d terminated",j);
-
}
-
}
-
}
-
}
阅读(3031) | 评论(0) | 转发(0) |