As discussed previously, SSDB registers task handler functions and wraps each client request as a job that is then dispatched for processing:
1. Registering handler functions
// net/proc.h
#define DEF_PROC(f) int proc_##f(NetworkServer *net, Link *link, const Request &req, Response *resp)

// serv.cpp
#define REG_PROC(c, f) net->proc_map.set_proc(#c, f, proc_##c)

void SSDBServer::reg_procs(NetworkServer *net){
    REG_PROC(get, "rt");
    REG_PROC(set, "wt");
    REG_PROC(del, "wt");
    REG_PROC(setx, "wt");
    REG_PROC(setnx, "wt");
    REG_PROC(getset, "wt");
    REG_PROC(getbit, "rt");
    REG_PROC(setbit, "wt");
    REG_PROC(countbit, "rt");
    REG_PROC(substr, "rt");
    REG_PROC(getrange, "rt");
    REG_PROC(strlen, "rt");
    // ... (remaining commands omitted)
}
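The second argument of REG_PROC is a flag string. As far as I can tell from proc.cpp, each character maps to one Command flag ('r' → FLAG_READ, 'w' → FLAG_WRITE, 't' → FLAG_THREAD, 'b' → FLAG_BACKEND), so "rt" marks a read command that goes to the reader thread pool and "wt" a write command that goes to the writer pool; a sketch of how set_proc could parse this string is shown after the proc.h listing below.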
2. Dispatching jobs:
// server.cpp
if(cmd->flags & Command::FLAG_THREAD){
    if(cmd->flags & Command::FLAG_WRITE){
        job->result = PROC_THREAD;
        // hand off to the write-task thread pool
        writer->push(*job);
    }else{
        job->result = PROC_THREAD;
        // hand off to the read-task thread pool
        reader->push(*job);
    }
    return;
}

proc_t p = cmd->proc;
job->time_wait = 1000 * (millitime() - job->stime);
// otherwise, handle the request directly in the main (network) thread
job->result = (*p)(this, job->link, *req, &resp);
job->time_proc = 1000 * (millitime() - job->stime) - job->time_wait;
The rest of this post looks in detail at SSDB's jobs and how they are executed.
I. ProcJob && Command && ProcMap
The job-related data structures are all defined in proc.h. What is pushed to the worker thread pools is a ProcJob, and each ProcJob carries a pointer to the Command that describes how it should be handled. The two differ as follows:
ProcJob cares about which Link the work belongs to; what the concrete task is and which handler function runs are not its concern.
Command describes one of the many registered commands: which handler function to call and what kind of task it is (its flags). Many requests may arrive on a single Link (connection) over its lifetime; each request becomes a ProcJob that points at whichever registered Command matches it.
ProcMap is the registry of handler functions: NetworkServer looks a command up by name and gets back its Command, i.e. both the handler function and the task type.
#define PROC_OK         0
#define PROC_ERROR      -1
#define PROC_THREAD     1
#define PROC_BACKEND    100

// macro for declaring handler functions
#define DEF_PROC(f) int proc_##f(NetworkServer *net, Link *link, const Request &req, Response *resp)

typedef std::vector<Bytes> Request;
// type of a handler function
typedef int (*proc_t)(NetworkServer *net, Link *link, const Request &req, Response *resp);

// Command
// FLAG_READ    : read task
// FLAG_WRITE   : write task
// FLAG_BACKEND : special task that runs on its own dedicated thread (e.g. replication, log handling)
// FLAG_THREAD  : task that is pushed to a worker thread pool
struct Command{
    static const int FLAG_READ    = (1 << 0);
    static const int FLAG_WRITE   = (1 << 1);
    static const int FLAG_BACKEND = (1 << 2);
    static const int FLAG_THREAD  = (1 << 3);

    std::string name;
    int flags;
    proc_t proc;
    // statistics
    uint64_t calls;
    double time_wait;
    double time_proc;

    Command(){
        flags = 0;
        proc = NULL;
        calls = 0;
        time_wait = 0;
        time_proc = 0;
    }
};

// ProcJob
struct ProcJob{
    int result;
    NetworkServer *serv;
    // the connection this job belongs to
    Link *link;
    Command *cmd;
    double stime;
    double time_wait;
    double time_proc;

    ProcJob(){
        result = 0;
        serv = NULL;
        link = NULL;
        cmd = NULL;
        stime = 0;
        time_wait = 0;
        time_proc = 0;
    }
};

struct BytesEqual{
    bool operator()(const Bytes &s1, const Bytes &s2) const {
        return (bool)(s1.compare(s2) == 0);
    }
};
struct BytesHash{
    size_t operator()(const Bytes &s1) const {
        unsigned long __h = 0;
        const char *p = s1.data();
        for(int i = 0; i < s1.size(); i++)
            __h = 5 * __h + p[i];
        return size_t(__h);
    }
};

#define GCC_VERSION (__GNUC__ * 100 + __GNUC_MINOR__)
#if GCC_VERSION >= 403
    #include <tr1/unordered_map>
    typedef std::tr1::unordered_map<Bytes, Command *, BytesHash, BytesEqual> proc_map_t;
#else
    #ifdef NEW_MAC
        #include <unordered_map>
        typedef std::unordered_map<Bytes, Command *, BytesHash, BytesEqual> proc_map_t;
    #else
        #include <ext/hash_map>
        typedef __gnu_cxx::hash_map<Bytes, Command *, BytesHash, BytesEqual> proc_map_t;
    #endif
#endif

// all handler functions are registered into ProcMap
class ProcMap
{
private:
    proc_map_t proc_map;

public:
    ProcMap();
    ~ProcMap();
    void set_proc(const std::string &cmd, const char *sflags, proc_t proc);
    void set_proc(const std::string &cmd, proc_t proc);
    Command* get_proc(const Bytes &str);

    proc_map_t::iterator begin(){
        return proc_map.begin();
    }
    proc_map_t::iterator end(){
        return proc_map.end();
    }
};
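To tie the registration back to these flags, here is a minimal sketch of what ProcMap::set_proc could look like. The real implementation lives in proc.cpp and differs in details (it also handles re-registration); this is only an approximation based on the header above:

// Sketch only: approximates ProcMap::set_proc (the real code is in proc.cpp).
void ProcMap::set_proc(const std::string &c, const char *sflags, proc_t proc){
    Command *cmd = new Command();
    cmd->name = c;
    cmd->proc = proc;
    // each character of the flag string turns on one Command flag
    for(const char *p = sflags; *p != '\0'; p++){
        switch(*p){
            case 'r': cmd->flags |= Command::FLAG_READ;    break;
            case 'w': cmd->flags |= Command::FLAG_WRITE;   break;
            case 't': cmd->flags |= Command::FLAG_THREAD;  break;
            case 'b': cmd->flags |= Command::FLAG_BACKEND; break;
        }
    }
    // the map key (Bytes) points into cmd->name, which the Command object owns
    proc_map[cmd->name] = cmd;
}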
II. Task execution: WorkerPool && ProcWorker
After NetworkServer parses a client request, it pushes the job to a worker thread pool when the command requires it; once a worker has finished, the main thread retrieves the completed job from the pool's result queue:
if(cmd->flags & Command::FLAG_THREAD){
    if(cmd->flags & Command::FLAG_WRITE){
        job->result = PROC_THREAD;
        writer->push(*job);
    }else{
        job->result = PROC_THREAD;
        reader->push(*job);
    }
    return;
}
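Conceptually, the other half of this hand-off happens in the main event loop: the result queue exposes a pipe file descriptor via WorkerPool::fd(), so the main thread watches that fd alongside the client sockets and pops completed jobs when it becomes readable. The snippet below is a hypothetical, simplified illustration of that pattern using plain select(); SSDB's actual loop in server.cpp uses its own Fdevents wrapper, but the idea is the same.

#include <sys/select.h>
#include <algorithm>

// Hypothetical illustration only: watch both result queues' pipe fds and pop
// finished jobs. Not SSDB's real event loop.
void poll_results(WorkerPool<ProcWorker, ProcJob> *writer,
                  WorkerPool<ProcWorker, ProcJob> *reader){
    fd_set rfds;
    FD_ZERO(&rfds);
    FD_SET(writer->fd(), &rfds);   // becomes readable once a write job has finished
    FD_SET(reader->fd(), &rfds);   // becomes readable once a read job has finished
    int maxfd = std::max(writer->fd(), reader->fd());

    if(::select(maxfd + 1, &rfds, NULL, NULL, NULL) > 0){
        ProcJob job;
        if(FD_ISSET(writer->fd(), &rfds) && writer->pop(&job) == 1){
            // job.link now holds the buffered response; flush it to the client
        }
        if(FD_ISSET(reader->fd(), &rfds) && reader->pop(&job) == 1){
            // likewise for completed read jobs
        }
    }
}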
The worker thread pools (reader and writer) are of type WorkerPool, which is defined in thread.h:
template<class W, class JOB>
class WorkerPool{
public:
    class Worker{
    public:
        Worker(){};
        Worker(const std::string &name);
        virtual ~Worker(){}
        int id;
        virtual void init(){}
        virtual void destroy(){}
        virtual int proc(JOB *job) = 0;
    private:
    protected:
        std::string name;
    };
private:
    std::string name;

    // jobs waiting to be processed by the pool
    Queue<JOB> jobs;
    // completed jobs (results)
    SelectableQueue<JOB> results;

    int num_workers;
    std::vector<pthread_t> tids;
    bool started;

    struct run_arg{
        int id;
        WorkerPool *tp;
    };
    static void* _run_worker(void *arg);
public:
    WorkerPool(const char *name="");
    ~WorkerPool();

    int fd(){
        return results.fd();
    }

    int start(int num_workers);
    int stop();

    int push(JOB job);
    int pop(JOB *job);
};
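In server.cpp the reader and writer pools are instances of this template with W = ProcWorker and JOB = ProcJob. Roughly, their setup looks like the sketch below (simplified; the actual thread counts come from constants/configuration in the real source):

// Simplified sketch of how NetworkServer creates its two pools.
// The thread counts here are illustrative, not the authoritative defaults.
typedef WorkerPool<ProcWorker, ProcJob> ProcWorkerPool;

ProcWorkerPool *writer = new ProcWorkerPool("writer");
writer->start(1);     // few threads: write commands should not race each other

ProcWorkerPool *reader = new ProcWorkerPool("reader");
reader->start(10);    // read commands can safely run concurrently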
Starting the thread pool:
template<class W, class JOB>
void* WorkerPool<W, JOB>::_run_worker(void *arg){
    struct run_arg *p = (struct run_arg*)arg;
    int id = p->id;
    WorkerPool *tp = p->tp;
    delete p;

    W w(tp->name);
    Worker *worker = (Worker *)&w;
    worker->id = id;
    worker->init();
    while(1){
        JOB job;
        // fetch a job (blocks while the queue is empty)
        if(tp->jobs.pop(&job) == -1){
            fprintf(stderr, "jobs.pop error\n");
            ::exit(0);
            break;
        }
        // process the job
        worker->proc(&job);
        // publish the result
        if(tp->results.push(job) == -1){
            fprintf(stderr, "results.push error\n");
            ::exit(0);
            break;
        }
    }
    worker->destroy();
    return (void *)NULL;
}


template<class W, class JOB>
int WorkerPool<W, JOB>::start(int num_workers){
    this->num_workers = num_workers;
    if(started){
        return 0;
    }
    int err;
    pthread_t tid;
    for(int i=0; i<num_workers; i++){
        struct run_arg *arg = new run_arg();
        arg->id = i;
        arg->tp = this;
        // spawn a worker thread
        err = pthread_create(&tid, NULL, &WorkerPool::_run_worker, arg);
        if(err != 0){
            fprintf(stderr, "can't create thread: %s\n", strerror(err));
        }else{
            tids.push_back(tid);
        }
    }
    started = true;
    return 0;
}
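Note that each worker thread constructs its own W object (W w(tp->name)), so per-worker state lives entirely in that object and needs no locking; the only state shared between workers is the jobs and results queues, which handle their own synchronization.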
The Worker class abstracts the actual processing of a job. ProcWorker::proc does two things:
step 1. look up the Command attached to the ProcJob and call its handler function;
step 2. put the result into the Link's output buffer.
int ProcWorker::proc(ProcJob *job){
    const Request *req = job->link->last_recv();
    Response resp;

    proc_t p = job->cmd->proc;
    // time the job spent waiting in the queue
    job->time_wait = 1000 * (millitime() - job->stime);
    // run the registered handler on the request to produce the response
    job->result = (*p)(job->serv, job->link, *req, &resp);
    // time spent actually processing
    job->time_proc = 1000 * (millitime() - job->stime) - job->time_wait;
    // put the response into the Link's output buffer
    if(job->link->send(resp.resp) == -1){
        job->result = PROC_ERROR;
    }else{
        log_debug("w:%.3f,p:%.3f, req: %s, resp: %s",
            job->time_wait, job->time_proc,
            serialize_req(*req).c_str(),
            serialize_req(resp.resp).c_str());
    }
    return 0;
}
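ProcWorker is just one possible W. Anything that derives from WorkerPool<W, JOB>::Worker and implements proc() can be plugged into the pool. A hypothetical example (not part of SSDB, reusing its millitime() and log_debug() helpers) that only measures how long jobs sat in the queue:

// Hypothetical worker type, for illustration only.
struct TimingJob{
    double stime;     // set by the producer when the job is pushed
};

class TimingWorker : public WorkerPool<TimingWorker, TimingJob>::Worker{
public:
    TimingWorker(const std::string &name){
        this->name = name;
    }
    virtual int proc(TimingJob *job){
        // report the queueing delay in milliseconds
        double waited = 1000 * (millitime() - job->stime);
        log_debug("job waited %.3f ms in the queue", waited);
        return 0;
    }
};

// Usage: WorkerPool<TimingWorker, TimingJob> pool("timing"); pool.start(2);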
Stopping the thread pool:
template<class W, class JOB>
int WorkerPool<W, JOB>::stop(){
    // TODO: notify workers to quit and wait
    for(int i=0; i<tids.size(); i++){
#ifdef OS_ANDROID
#else
        pthread_cancel(tids[i]);
#endif
    }
    return 0;
}
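The TODO is telling: pthread_cancel is a blunt way to stop workers, since a thread may be cancelled between popping a job and pushing its result. A gentler scheme, which the comment seems to anticipate, would be to push one sentinel "quit" job per worker, let _run_worker break out of its loop when it sees one, and then pthread_join every tid.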
Note that the job queue (jobs) and the result queue (results) in WorkerPool use different data structures, which follows from how each one is consumed:
Queue: internally it uses only a pthread_mutex_t and a pthread_cond_t to synchronize reads and writes. When the queue is empty, the readers (worker threads) block on the condition variable waiting for new jobs to arrive; blocking is fine here because a worker has nothing else to do.
SelectableQueue: it is written to by worker threads when they finish a job and read from by the main network thread (NetworkServer::serve), which must never block on an empty queue. So when a worker pushes a result it writes one byte to a pipe to notify the main thread; the main thread sees the pipe become readable in its event loop and only then pops the result, with the dequeue itself still protected by a pthread_mutex_t.
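The pipe trick is worth seeing in isolation: writing one byte per queued item makes "there is a result waiting" observable through an ordinary file descriptor, which any select/epoll loop can watch. Below is a minimal, self-contained sketch of the pattern (my own illustration, not SSDB's class; error handling omitted):

#include <pthread.h>
#include <unistd.h>
#include <queue>

// Minimal "selectable queue": a pipe carries one byte per item, so a consumer
// can select()/poll() on notify_fd() instead of blocking on a condition variable.
template <class T>
class TinySelectableQueue{
private:
    int fds[2];                 // fds[0]: read end (watched by the consumer), fds[1]: write end
    pthread_mutex_t mutex;
    std::queue<T> items;
public:
    TinySelectableQueue(){
        ::pipe(fds);
        pthread_mutex_init(&mutex, NULL);
    }
    int notify_fd() const{ return fds[0]; }

    void push(const T &item){
        pthread_mutex_lock(&mutex);
        items.push(item);
        // one byte per item: the read end becomes readable in the consumer's event loop
        ::write(fds[1], "1", 1);
        pthread_mutex_unlock(&mutex);
    }
    int pop(T *out){
        char c;
        if(::read(fds[0], &c, 1) <= 0){   // consume exactly one notification byte
            return -1;
        }
        pthread_mutex_lock(&mutex);
        *out = items.front();             // a byte implies at least one queued item
        items.pop();
        pthread_mutex_unlock(&mutex);
        return 1;
    }
};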
Queue's push and pop operations:
template <class T>
int Queue<T>::push(const T item){
    if(pthread_mutex_lock(&mutex) != 0){
        return -1;
    }
    {
        items.push(item);
    }
    pthread_mutex_unlock(&mutex);
    pthread_cond_signal(&cond);
    return 1;
}


template <class T>
int Queue<T>::pop(T *data){
    if(pthread_mutex_lock(&mutex) != 0){
        return -1;
    }
    {
        // must re-check in a loop: when pthread_cond_wait returns, another
        // thread may have taken the lock first and consumed the item already
        while(items.empty()){
            //fprintf(stderr, "%d wait\n", pthread_self());
            if(pthread_cond_wait(&cond, &mutex) != 0){
                //fprintf(stderr, "%s %d -1!\n", __FILE__, __LINE__);
                return -1;
            }
            //fprintf(stderr, "%d wait 2\n", pthread_self());
        }
        *data = items.front();
        //fprintf(stderr, "%d job: %d\n", pthread_self(), (int)*data);
        items.pop();
    }
    if(pthread_mutex_unlock(&mutex) != 0){
        //fprintf(stderr, "error!\n");
        return -1;
    }
    //fprintf(stderr, "%d wait end 2, job: %d\n", pthread_self(), (int)*data);
    return 1;
}
SelectableQueue's push and pop operations:
template <class T>
int SelectableQueue<T>::push(const T item){
    if(pthread_mutex_lock(&mutex) != 0){
        return -1;
    }
    {
        items.push(item);
    }
    // write one byte to the pipe to notify the main thread that a result is ready
    if(::write(fds[1], "1", 1) == -1){
        fprintf(stderr, "write fds error\n");
        exit(0);
    }
    pthread_mutex_unlock(&mutex);
    return 1;
}


template <class T>
int SelectableQueue<T>::pop(T *data){
    int n, ret = 1;
    char buf[1];

    while(1){
        // read the notification byte from the pipe
        n = ::read(fds[0], buf, 1);
        if(n < 0){
            if(errno == EINTR){
                continue;
            }else{
                return -1;
            }
        }else if(n == 0){
            ret = -1;
        }else{
            // the actual dequeue is still protected by the mutex
            if(pthread_mutex_lock(&mutex) != 0){
                return -1;
            }
            {
                if(items.empty()){
                    fprintf(stderr, "%s %d error!\n", __FILE__, __LINE__);
                    pthread_mutex_unlock(&mutex);
                    return -1;
                }
                *data = items.front();
                items.pop();
            }
            pthread_mutex_unlock(&mutex);
        }
        break;
    }
    return ret;
}
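Note the invariant that makes pop() safe: push() writes exactly one byte per item after enqueuing it, and pop() consumes exactly one byte before dequeuing, so there can never be more readable bytes in the pipe than items in the queue; that is why an empty queue after a successful read() is reported as an error rather than waited on.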