说起load balance,一般比较容易想到的是大型服务在多个replica之间的load balance、和kernal的load balance。前者一般只是在流量入口做一下流量分配,逻辑相对简单;而后者则比较复杂,需要不断发现正在运行的各个进程之间的imbalance,然后通过将进程在CPU之间进行迁移,使得各个CPU都被充分利用起来。
而本文想要讨论的load balance有别于以上两种,它是多线程(多进程)server程序内部,各个worker线程(进程)之间的load balance。
考虑一种常用的server模型:一个receiver线程负责接收请求,后面有一个线程池装了一堆worker线程,收到的请求被分派给这些worker进行处理。receiver与worker之间通过pthread_cond+request_queue来进行通信。一般的做法是:receiver将收到的请求放入queue,然后signal一下cond,就OK了。具体哪个worker会被唤醒,那是kernel的事情(实际上kernel会遵循先来后到原则,唤醒先进入等待的进程,参阅《》)。通常情况下这样做就足够了,receiver唤醒worker不需要涉及load balance的逻辑。但是有时候我们还是可以做一些load balance的工作,来提高server的性能。
kernel load balance概述
由于这里的load balance跟kernel的load balance息息相关,所以我们有必要先看看kernel的load balance都做了些什么。详细的内容请参阅《》,这里只做一些简要的概括。
说白了,kernel的load balance就做一件事情: 让系统中RUNNING状态的进程尽可能的被分摊,在每一个调度域上看都是balance的 。怎么理解呢?现在CPU的结构一般有:物理CPU、core、超线程、这么几个层次。”在每一个调度域上看都balance”可以理解为在每一个层次上都balance:每个物理CPU上的总load相当、每个core上的总load相当、每个超线程上的load也相当。
我们在系统中看到的”CPU”都是最底层的超线程这个层次,我们可能会直观的认为把RUNNING状态的进程分摊到每一个”CPU”上就行了,但是实际上kernel的load balance还有更高的要求。假设我们的机器有2个物理CPU、每个物理CPU有2个core、每个core有2个超线程,共8个”CPU”。如果现在有8个RUNNING状态的进程(假设优先级都相同),每个”CPU”各分摊一个进程,那么自然就是balance的。但是如果现在只有4个RUNNING状态的进程(假设优先级都相同),真正的balance并不仅仅是每个进程各自落到一个”CPU”上就行了,而是进一步要求每个物理CPU上跑两个进程、每个core上跑一个进程。
为什么要有这样的强约束呢?因为尽管各个”CPU”逻辑上是独立的(不存在主从关系之类),但它们并非孤立存在。相同物理CPU下的”CPU”会共享cache、相同core下的”CPU”会共享计算资源(所谓的超线程也就是一套流水线跑两个线程)。而共享也就意味着争抢。所以,在RUNNING状态的进程并非正好均摊给每一个”CPU”的情况下,需要考虑更高层次的CPU是否被均摊,以避免cache和CPU流水线的争抢(当然,除了性能,这也体现了kernel的公平性)。
最后再多提一点,kernel的load balance是异步的。为避免占用过多资源,kernel肯定不可能实时监控各个”CPU”的情况,然后面对变化实时的做出反应(当然,实时进程除外,但这不在我们讨论范围内)。
server的load balance考虑
有了kernel的load balance作为铺垫,看看我们server上的receiver线程能做些什么吧。
首先是worker线程的数量问题。如果worker数量过多会发生什么情况?还是假设我们的机器有上述的8个”CPU”,假设我们开了80个worker,再假设这80个线程被平均分派到每一个”CPU”上,等待处理任务。当一堆请求陆续到来的时候,由于我们的receiver没有任何load balance的策略,被唤醒的worker出现在哪个”CPU”上可以说是随机的。你想想,”同时”到来的8个请求正好落到8个不同”CPU”上的概率是多少?是:(70*60*50*40*30*20*10)/(79*78*77*76*75*74*73)=0.34%。也就是说几乎肯定会出现某些”CPU”要处理多个请求、某些”CPU”却闲着没事干的情况,系统的性能可想而知。而等到后知后觉的kernel load balance将这些请求balance到每一个”CPU”上时,可能请求已经处理得差不多了,等到下一批请求到来时,load又还是凌乱的。因为刚刚已经balance好的那些worker线程又被放回到了cond等待队列的尾部,而优先响应新请求的则是那些位于队列头部的未曾被balance过的worker。
那么会不会经历几轮请求之后就能达到balance了呢?如果请求真的是一轮一轮的过来,并且每个请求的处理时间完全相同,那么有可能会达到balance,但是实际情况肯定相差甚远。
解决办法是什么呢?将cond先进先出的队列式等待逻辑改为后进先出的栈式逻辑,或许可以解决问题,但是更好的办法应该是限制worker的数目等于或者略小于”CPU”数目,这样很自然的就balance了。
第二个问题,既然我们承认kernel在各个调度域上的load balance的有意义的,我们server中的receiver线程是不是也可以通过类似的办法来获得收益呢?现在我们吸取了之前的教训,只开了8个worker线程。依靠kernel load balance的作用,这8个线程基本会固定在每一个”CPU”上。假设现在一下子来了4个请求,它们会落到4个不同的”CPU”上,如果运气好,这4个”CPU”分别属于不同的core,那么处理请求的过程就不会涉及CPU资源的争抢;反之,可能形成2个core非常忙、2个core闲着的局面。
要解决这个问题需要做到两点,继续以我们之前的server程序为例。首先,receiver线程要知道各个worker线程都落在哪一个”CPU”上;然后在分派任务时还需要有balance的眼光。要做到第一点,最好是借助sched_affinity功能将线程固定在某个”CPU”上,避免kernel load balance把问题搞复杂了。既然前面我们已经得出了工作线程数等于或略小于CPU数的结论,现在每个线程固定在一个CPU上就是可行的。第二点,我们需要在现有pthread_cond的基础上做一些改进,给进入等待状态的worker线程赋一个优先级,比如每个core的第一个超线程作为第一优先级,第二个超线程为第二优先级。那么在cond唤醒工作线程的时候,我们就可以尽量让worker线程不落到同一个core上。实现上可以利用futex的bitset系列功能,通过bitset来标识优先级,以便在唤醒指定的worker线程。(参阅《》。)
例子
好了,纸上谈兵讲了这么多,得来点实际的例子验证一下。为了简单,就不写什么server程序了,只需要一个生产者线程和若干消费者线程。生产者线程生成一些任务,通过cond+queue将其传递给消费者线程。为了观察在不同任务负载下的程序表现,我们需要控制任务负载。消费者线程在完成任务后通过另一组cond+queue把任务应答给生产者线程,于是生产者就知道当前有多少个任务正在处理中,以便控制生产新任务的节奏。最后,我们通过观察在不同条件下完成一批任务的时间来体会程序的性能。
这里面比较关键的是任务本身的处理逻辑,既然我们讨论的是CPU的负载,任务肯定应该是CPU密集型的任务。然后,单个任务的处理时间不宜太短,否则可能调度过程会成为程序的瓶颈,体现不出CPU的负载问题;另一方面,单个任务的处理时间也不宜太长,否则后知后觉的kernel load balance也能解决问题,体现不出我们主动做load balance的好处(比如任务处理时间是10秒,kernel load balance花费几十毫秒来解决balance问题其实也无伤大雅)。
代码贴在文章最后,编译出来的bin文件是这样的:
$g++ cond.cpp -pthread -O2 $./a.out usage: ./a.out -j job_kind=shm|calc [-t thread_count=1] [-o job_load=1] [-c job_count=10] [-a affinity=0] [-l] [-f filename="./TEST" -n filelength=128M]
- 代码里面准备了两种任务逻辑,”-j shm”是mmap一个文件,然后读取上面的数据做一些运算(文件及其长度由-f和-n参数来限定);”-j calc”是做一些算术运算;
- “-t”参数指定工作线程的线程数;
- “-o”指定任务负载;
- “-c”指定单个线程处理任务的个数;
- “-a”指定是否设置sched_affinity,并且指明跳几个”CPU”放一个worker线程。比如”-a 1″表示把worker线程顺序固定在1、2、3、……号”CPU”上,而”-a 2″表示固定在2、4、6、……号”CPU”上,以此类推。需要注意的是,邻近的”CPU”号并不表示”CPU”在物理上是邻近的,比如在我测试用的机器上,共24个”CPU”,0~11号是每个core的第一个超线程、12~23是第二个超线程。这个细节需要读/proc/cpuinfo来确定。
- “-l”参数指定启用我们增强版的分级cond,启用的话会将0~11号worker作为第一优先级,12~23作为第二优先级(当然,需要配合”-a”参数才有实际意义,否则也不确定这些worker都落在哪些”CPU”上);
首先来看worker线程过多所带来的问题(以下case各运行5次取时间最小值)。
case-1,启240个worker线程,24个任务负载: $./a.out -j calc -t 240 -o 24 total cost: 23790 $./a.out -j shm -t 240 -o 24 total cost: 16827 case-2,启24个worker线程,24个任务负载: $./a.out -j calc -t 24 -o 24 total cost: 23210 $./a.out -j shm -t 24 -o 24 total cost: 16121
case-2效果明显要好略一些。并且在运行过程中如果用top观察的话,你会发现case-1只能压到2200%左右的CPU,而case-2几乎能达到2400%。
在case-1的基础上,如果禁止kernel load balance会怎样?加affinity试试看:
case-3,启240个worker线程,24个任务负载,加affinity: $./a.out -j calc -t 240 -o 24 -a 1 total cost: 27170 $./a.out -j shm -t 240 -o 24 -a 1 total cost: 15351
calc任务比较符合预期,没有kernel load balance的情况下,性能继续下降。
而shm任务则让人大跌眼镜,性能居然提升了!其实这个任务除了CPU之外还很依赖于内存,因为所有任务都工作在同一个文件的mmap上,”CPU”挨得近反而更能发挥内存cache。(可见在这种情况下,kernel load balance其实是帮了倒忙。)
那么,我们将工作线程再调回24,是不是应该更理想?
case-3' $./a.out -j shm -t 24 -o 24 -a 1 total cost: 15133
再来看第二个问题,worker线程站位不均所带来的影响。
case-4,启24个worker线程,12个任务负载: $./a.out -j calc -t 24 -o 12 total cost: 14686 $./a.out -j shm -t 24 -o 12 total cost: 13265 case-5,启24个worker线程,12个任务负载,加affinity,启用分级cond: $./a.out -j calc -t 24 -o 12 -a 1 -l total cost: 12206 $./a.out -j shm -t 24 -o 12 -a 1 -l total cost: 12376
效果还是不错的。改一下”-a”参数,让同一个core的两个超线程都分在同一优先级呢?
case-5' $./a.out -j calc -t 24 -o 12 -a 2 -l total cost: 23510 $./a.out -j shm -t 24 -o 12 -a 2 -l total cost: 15063
由于争抢CPU资源,calc任务性能变得很差,几乎减半。而shm任务由于cache复用所带来的好处,情况还好(比case-3还略好一些)。
这里的任务只是举了calc和shm两个例子,实际情况可能是很复杂的。尽管load balance的问题肯定存在,但是任务会因共享cache而得利、还是因争抢cache而失利?争抢CPU流水线又会造成多大的损失?这些都只能具体问题具体分析。kernel的load balance将负载尽量均摊到离得远的”CPU”上,大多数情况下没有问题。不过我们也看到shm任务中cache共享的收益还是很大的,如果例子更极端一点,肯定会出现承受负载的CPU离得越近,反而效果越好的情况。
另一方面,争抢CPU流水线会有多大损失,也可以简单的分析一下。超线程相当于两个线程共用一套CPU流水线,如果单个线程的代码上下文依赖很严重,指令基本上只能串行工作,无法充分利用流水线,那么流水线的空余能力就可以留给第二个线程使用。反之如果一个线程就能把流水线填满,硬塞两个线程进来肯定就只能有50%的性能(上述calc的例子就差不多是这样)。
为了说明这个问题,我们给calc任务加了一个SERIAL_CALC的宏开关,让它的运算逻辑变成上下文强依赖。然后重跑case-5中的两个命令,我们会看到其实在这种情况下承受负载的CPU离得近一些似乎也问题不大:
case-6,采用SERIAL_CALC运算逻辑,重跑case-5中的calc任务 $g++ cond.cpp -pthread -O2 -DSERIAL_CALC $./a.out -j calc -t 24 -o 12 -a 1 -l total cost: 51269 $./a.out -j calc -t 24 -o 12 -a 2 -l total cost: 56753
最后是代码,有兴趣你还可以尝试更多的case,have fun!
#include#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define CPUS 24 #define FUTEX_WAIT_BITSET 9 #define FUTEX_WAKE_BITSET 10 struct Job { long _input; long _output; }; class JobRunner { public: virtual void run(Job* job) = 0; }; class ShmJobRunner : public JobRunner { public: ShmJobRunner(const char* filepath, size_t length) : _length(length) { int fd = open(filepath, O_RDONLY); _base = (long*)mmap(NULL, _length*sizeof(long), PROT_READ, MAP_SHARED|MAP_POPULATE, fd, 0); if (_base == MAP_FAILED) { printf("FATAL: mmap %s(%lu) failed!\n", filepath, _length*sizeof(long)); abort(); } close(fd); } virtual void run(Job* job) { long i = job->_input % _length; long j = i + _length - 1; const int step = 4; while (i + step < j) { if (_base[i%_length] * _base[j%_length] > 0) { j -= step; } else { i += step; } } job->_output = _base[i%_length]; } private: const long* _base; size_t _length; }; class CalcJobRunner : public JobRunner { public: virtual void run(Job* job) { long v1 = 1; long v2 = 1; long v3 = 1; for (int i = 0; i < job->_input; i++) { #ifndef SERIAL_CALC v1 += v2 + v3; v2 *= 3; v3 *= 5; #else v1 += v2 + v3; v2 = v1 * 5 + v2 * v3; v3 = v1 * 3 + v1 * v2; #endif } job->_output = v1; } }; class JobRunnerCreator { public: static JobRunner* create(const char* name, const char* filepath, size_t filelength) { if (strcmp(name, "shm") == 0) { printf("share memory job\n"); return new ShmJobRunner(filepath, filelength); } else if (strcmp(name, "calc") == 0) { printf("caculation job\n"); return new CalcJobRunner(); } printf("unknown job '%s'\n", name); return NULL; } }; class Cond { public: virtual void lock() = 0; virtual void unlock() = 0; virtual void wait(size_t) = 0; virtual void wake() = 0; }; class NormalCond : public Cond { public: NormalCond() { pthread_mutex_init(&_mutex, NULL); pthread_cond_init(&_cond, NULL); } ~NormalCond() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } void lock() { pthread_mutex_lock(&_mutex); } void unlock() { pthread_mutex_unlock(&_mutex); } void wait(size_t) { pthread_cond_wait(&_cond, &_mutex); } void wake() { pthread_cond_signal(&_cond); } private: pthread_mutex_t _mutex; pthread_cond_t _cond; }; class LayeredCond : public Cond { public: LayeredCond(size_t layers = 1) : _value(0), _layers(layers) { pthread_mutex_init(&_mutex, NULL); if (_layers > sizeof(int)*8) { printf("FATAL: cannot support such layer %u (max %u)\n", _layers, sizeof(int)*8); abort(); } _waiters = new size_t[_layers]; memset(_waiters, 0, sizeof(size_t)*_layers); } ~LayeredCond() { pthread_mutex_destroy(&_mutex); delete _waiters; _waiters = NULL; } void lock() { pthread_mutex_lock(&_mutex); } void unlock() { pthread_mutex_unlock(&_mutex); } void wait(size_t layer) { if (layer >= _layers) { printf("FATAL: layer overflow (%u/%u)\n", layer, _layers); abort(); } _waiters[layer]++; while (_value == 0) { int value = _value; unlock(); syscall(__NR_futex, &_value, FUTEX_WAIT_BITSET, value, NULL, NULL, layer2mask(layer)); lock(); } _waiters[layer]--; _value--; } void wake() { int mask = ~0; lock(); for (size_t i = 0; i < _layers; i++) { if (_waiters[i] > 0) { mask = layer2mask(i); break; } } _value++; unlock(); syscall(__NR_futex, &_value, FUTEX_WAKE_BITSET, 1, NULL, NULL, mask); } private: int layer2mask(size_t layer) { return 1 << layer; } private: pthread_mutex_t _mutex; int _value; size_t* _waiters; size_t _layers; }; template class Stack { public: Stack(size_t size, size_t cond_layers = 0) : _size(size), _sp(0) { _buf = new T*[_size]; _cond = (cond_layers > 0) ? (Cond*)new LayeredCond(cond_layers) : (Cond*)new NormalCond(); } ~Stack() { delete []_buf; delete _cond; } T* pop(size_t layer = 0) { T* ret = NULL; _cond->lock(); do { if (_sp > 0) { ret = _buf[--_sp]; } else { _cond->wait(layer); } } while (ret == NULL); _cond->unlock(); return ret; } void push(T* obj) { _cond->lock(); if (_sp >= _size) { printf("FATAL: stack overflow\n"); abort(); } _buf[_sp++] = obj; _cond->unlock(); _cond->wake(); } private: const size_t _size; size_t _sp; T** _buf; Cond* _cond; }; inline struct timeval cost_begin() { struct timeval tv; gettimeofday(&tv, NULL); return tv; } inline long cost_end(struct timeval &tv) { struct timeval tv2; gettimeofday(&tv2, NULL); tv2.tv_sec -= tv.tv_sec; tv2.tv_usec -= tv.tv_usec; return tv2.tv_sec*1000+tv2.tv_usec/1000; } struct ThreadParam { size_t layer; Stack * inputQ; Stack * outputQ; JobRunner* runner; }; void* thread_func(void *data) { size_t layer = ((ThreadParam*)data)->layer; Stack * inputQ = ((ThreadParam*)data)->inputQ; Stack * outputQ = ((ThreadParam*)data)->outputQ; JobRunner* runner = ((ThreadParam*)data)->runner; while (1) { Job* job = inputQ->pop(layer); runner->run(job); outputQ->push(job); } return NULL; } void force_cpu(pthread_t t, int n) { cpu_set_t cpus; CPU_ZERO(&cpus); CPU_SET(n, &cpus); if (pthread_setaffinity_np(t, sizeof(cpus), &cpus) != 0) { printf("FATAL: force cpu %d failed: %s\n", n, strerror(errno)); abort(); } } void usage(const char* bin) { printf("usage: %s -j job_kind=shm|calc " "[-t thread_count=1] [-o job_load=1] [-c job_count=10] " "[-a affinity=0] [-l] " "[-f filename=\"./TEST\" -n filelength=128M]\n", bin); abort(); } int main(int argc, char* const* argv) { int THREAD_COUNT = 1; int JOB_LOAD = 1; int JOB_COUNT = 10; int AFFINITY = 0; int LAYER = 0; char JOB_KIND[16] = ""; char FILEPATH[1024] = "./TEST"; size_t LENGTH = 128*1024*1024; for (int i = EOF; (i = getopt(argc, argv, "t:o:c:a:j:lf:n:")) != EOF;) { switch (i) { case 't': THREAD_COUNT = atoi(optarg); break; case 'o': JOB_LOAD = atoi(optarg); break; case 'c': JOB_COUNT = atoi(optarg); break; case 'a': AFFINITY = atoi(optarg); break; case 'l': LAYER = 2; break; case 'j': strncpy(JOB_KIND, optarg, sizeof(JOB_KIND)-1); break; case 'f': strncpy(FILEPATH, optarg, sizeof(FILEPATH)-1); break; case 'n': LENGTH = atoi(optarg); break; default: usage(argv[0]); break; } } JobRunner* runner = JobRunnerCreator::create( JOB_KIND, FILEPATH, LENGTH); if (!runner) { usage(argv[0]); } srand(0); Job jobs[JOB_LOAD]; #ifdef TEST_LOAD for (int i = 0; i < JOB_LOAD; i++) { jobs[i]._input = rand(); struct timeval tv = cost_begin(); runner->run(&jobs[i]); long cost = cost_end(tv); printf("job[%d](%ld)=(%ld) costs: %ld\n", i, jobs[i]._input, jobs[i]._output, cost); } delete runner; return 0; #endif printf("use layer %d\n", LAYER); Stack inputQ(JOB_LOAD, LAYER); Stack outputQ(JOB_LOAD, LAYER); pthread_t t; ThreadParam param[THREAD_COUNT]; printf("thread init: "); for (int i = 0; i < THREAD_COUNT; i++) { int cpu = AFFINITY ? (i/AFFINITY+i%AFFINITY*CPUS/2)%CPUS : -1; size_t layer = !!(LAYER && i % CPUS >= CPUS/2); param[i].inputQ = &inputQ; param[i].outputQ = &outputQ; param[i].runner = runner; param[i].layer = layer; pthread_create(&t, NULL, thread_func, (void*)¶m[i]); if (cpu >= 0) { printf("%d(%d|%d),", i, cpu, layer); force_cpu(t, cpu); } else { printf("%d(*|%d),", i, layer); } usleep(1000); } printf("\n"); struct timeval tv = cost_begin(); for (int i = 0; i < JOB_LOAD; i++) { jobs[i]._input = rand(); inputQ.push(&jobs[i]); } for (int i = 0; i < JOB_LOAD*JOB_COUNT; i++) { Job* job = outputQ.pop(); job->_input = rand(); inputQ.push(job); } for (int i = 0; i < JOB_LOAD; i++) { outputQ.pop(); } long cost = cost_end(tv); printf("total cost: %ld\n", cost); delete runner; return 0; }
程序退出时可考虑对线程池进行回收处理,仅仅作为一个建议,敬请指正。修改后代码如下:
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define CPUS 24
#define FUTEX_WAIT_BITSET 9
#define FUTEX_WAKE_BITSET 10
bool stop = false;
struct Job
{
long _input;
long _output;
};
class JobRunner
{
public:
virtual void run(Job* job) = 0;
};
class ShmJobRunner : public JobRunner
{
public:
ShmJobRunner(const char* filepath, size_t length)
: _length(length) {
int fd = open(filepath, O_RDONLY);
_base = (long*)mmap(NULL, _length*sizeof(long),
PROT_READ, MAP_SHARED|MAP_POPULATE, fd, 0);
if (_base == MAP_FAILED) {
printf(“FATAL: mmap %s(%lu) failed!\n”,
filepath, _length*sizeof(long));
abort();
}
close(fd);
}
virtual void run(Job* job) {
long i = job->_input % _length;
long j = i + _length – 1;
const int step = 4;
while (i + step 0) {
j -= step;
}
else {
i += step;
}
}
job->_output = _base[i%_length];
}
private:
const long* _base;
size_t _length;
};
class CalcJobRunner : public JobRunner
{
public:
virtual void run(Job* job) {
long v1 = 1;
long v2 = 1;
long v3 = 1;
for (int i = 0; i _input; i++) {
#ifndef SERIAL_CALC
v1 += v2 + v3;
v2 *= 3;
v3 *= 5;
#else
v1 += v2 + v3;
v2 = v1 * 5 + v2 * v3;
v3 = v1 * 3 + v1 * v2;
#endif
}
job->_output = v1;
}
};
class JobRunnerCreator
{
public:
static JobRunner* create(const char* name,
const char* filepath, size_t filelength) {
if (strcmp(name, “shm”) == 0) {
printf(“share memory job\n”);
return new ShmJobRunner(filepath, filelength);
}
else if (strcmp(name, “calc”) == 0) {
printf(“caculation job\n”);
return new CalcJobRunner();
}
printf(“unknown job ‘%s’\n”, name);
return NULL;
}
};
class Cond
{
public:
virtual void lock() = 0;
virtual void unlock() = 0;
virtual void wait(size_t) = 0;
virtual void wake() = 0;
};
class NormalCond : public Cond
{
public:
NormalCond() {
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_cond, NULL);
}
~NormalCond() {
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
void lock() { pthread_mutex_lock(&_mutex); }
void unlock() { pthread_mutex_unlock(&_mutex); }
void wait(size_t) { pthread_cond_wait(&_cond, &_mutex); }
void wake() { pthread_cond_signal(&_cond); }
private:
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
class LayeredCond : public Cond
{
public:
LayeredCond(size_t layers = 1) : _value(0), _layers(layers) {
pthread_mutex_init(&_mutex, NULL);
if (_layers > sizeof(int)*8) {
printf(“FATAL: cannot support such layer %u (max %u)\n”,
_layers, sizeof(int)*8);
abort();
}
_waiters = new size_t[_layers];
memset(_waiters, 0, sizeof(size_t)*_layers);
}
~LayeredCond() {
pthread_mutex_destroy(&_mutex);
delete _waiters;
_waiters = NULL;
}
void lock() {
pthread_mutex_lock(&_mutex);
}
void unlock() {
pthread_mutex_unlock(&_mutex);
}
void wait(size_t layer) {
if (layer >= _layers) {
printf(“FATAL: layer overflow (%u/%u)\n”, layer, _layers);
abort();
}
_waiters[layer]++;
while (_value == 0) {
int value = _value;
unlock();
syscall(__NR_futex, &_value, FUTEX_WAIT_BITSET, value,
NULL, NULL, layer2mask(layer));
lock();
}
_waiters[layer]–;
_value–;
}
void wake() {
int mask = ~0;
lock();
for (size_t i = 0; i 0) {
mask = layer2mask(i);
break;
}
}
_value++;
unlock();
syscall(__NR_futex, &_value, FUTEX_WAKE_BITSET, 1,
NULL, NULL, mask);
}
private:
int layer2mask(size_t layer) {
return 1 << layer;
}
private:
pthread_mutex_t _mutex;
int _value;
size_t* _waiters;
size_t _layers;
};
template
class Stack
{
public:
Stack(size_t size, size_t cond_layers = 0) : _size(size), _sp(0) {
_buf = new T*[_size];
_cond = (cond_layers > 0) ?
(Cond*)new LayeredCond(cond_layers) : (Cond*)new NormalCond();
}
~Stack() {
delete []_buf;
delete _cond;
}
T* pop(size_t layer = 0) {
T* ret = NULL;
_cond->lock();
do {
if (_sp > 0) {
ret = _buf[--_sp];
}
else {
_cond->wait(layer);
if (stop)
goto out;
}
} while (ret == NULL);
out: _cond->unlock();
return ret;
}
void push(T* obj) {
_cond->lock();
if (_sp >= _size) {
printf(“FATAL: stack overflow\n”);
abort();
}
_buf[_sp++] = obj;
_cond->unlock();
_cond->wake();
}
void Exit(int thread_count) {
for (int i = 0; i lock();
_cond->wake();
_cond->unlock();
}
}
private:
const size_t _size;
size_t _sp;
T** _buf;
Cond* _cond;
};
inline struct timeval cost_begin()
{
struct timeval tv;
gettimeofday(&tv, NULL);
return tv;
}
inline long cost_end(struct timeval &tv)
{
struct timeval tv2;
gettimeofday(&tv2, NULL);
tv2.tv_sec -= tv.tv_sec;
tv2.tv_usec -= tv.tv_usec;
return tv2.tv_sec*1000+tv2.tv_usec/1000;
}
struct ThreadParam
{
size_t layer;
Stack* inputQ;
Stack* outputQ;
JobRunner* runner;
};
void* thread_func(void *data)
{
size_t layer = ((ThreadParam*)data)->layer;
Stack* inputQ = ((ThreadParam*)data)->inputQ;
Stack* outputQ = ((ThreadParam*)data)->outputQ;
JobRunner* runner = ((ThreadParam*)data)->runner;
while (1) {
Job* job = inputQ->pop(layer);
if (stop)
break;
runner->run(job);
outputQ->push(job);
}
return NULL;
}
void force_cpu(pthread_t t, int n)
{
cpu_set_t cpus;
CPU_ZERO(&cpus);
CPU_SET(n, &cpus);
if (pthread_setaffinity_np(t, sizeof(cpus), &cpus) != 0) {
printf(“FATAL: force cpu %d failed: %s\n”, n, strerror(errno));
abort();
}
}
void usage(const char* bin)
{
printf(“usage: %s -j job_kind=shm|calc ”
“[-t thread_count=1] [-o job_load=1] [-c job_count=10] ”
“[-a affinity=0] [-l] ”
“[-f filename=\"./TEST\" -n filelength=128M]\n”, bin);
abort();
}
int main(int argc, char* const* argv)
{
int THREAD_COUNT = 1;
int JOB_LOAD = 1;
int JOB_COUNT = 10;
int AFFINITY = 0;
int LAYER = 0;
char JOB_KIND[16] = “”;
char FILEPATH[1024] = “./TEST”;
size_t LENGTH = 128*1024*1024;
for (int i = EOF;
(i = getopt(argc, argv, “t:o:c:a:j:lf:n:”)) != EOF;) {
switch (i) {
case ‘t’: THREAD_COUNT = atoi(optarg); break;
case ‘o’: JOB_LOAD = atoi(optarg); break;
case ‘c’: JOB_COUNT = atoi(optarg); break;
case ‘a’: AFFINITY = atoi(optarg); break;
case ‘l’: LAYER = 2; break;
case ‘j’: strncpy(JOB_KIND, optarg, sizeof(JOB_KIND)-1); break;
case ‘f’: strncpy(FILEPATH, optarg, sizeof(FILEPATH)-1); break;
case ‘n’: LENGTH = atoi(optarg); break;
default: usage(argv[0]); break;
}
}
JobRunner* runner = JobRunnerCreator::create(
JOB_KIND, FILEPATH, LENGTH);
if (!runner) {
usage(argv[0]);
}
srand(0);
Job jobs[JOB_LOAD];
#ifdef TEST_LOAD
for (int i = 0; i run(&jobs[i]);
long cost = cost_end(tv);
printf(“job[%d](%ld)=(%ld) costs: %ld\n”,
i, jobs[i]._input, jobs[i]._output, cost);
}
delete runner;
return 0;
#endif
printf(“use layer %d\n”, LAYER);
Stack inputQ(JOB_LOAD, LAYER);
Stack outputQ(JOB_LOAD, LAYER);
pthread_t t[THREAD_COUNT];
ThreadParam param[THREAD_COUNT];
printf(“thread init: “);
for (int i = 0; i = CPUS/2);
param[i].inputQ = &inputQ;
param[i].outputQ = &outputQ;
param[i].runner = runner;
param[i].layer = layer;
pthread_create(&t[i], NULL, thread_func, (void*)¶m[i]);
if (cpu >= 0) {
printf(“%d(%d|%d),”, i, cpu, layer);
force_cpu(t[i], cpu);
}
else {
printf(“%d(*|%d),”, i, layer);
}
usleep(1000);
}
printf(“\n”);
struct timeval tv = cost_begin();
for (int i = 0; i < JOB_LOAD; i++) {
jobs[i]._input = rand();
inputQ.push(&jobs[i]);
}
for (int i = 0; i _input = rand();
inputQ.push(job);
}
for (int i = 0; i < JOB_LOAD; i++) {
outputQ.pop();
}
long cost = cost_end(tv);
printf("total cost: %ld\n", cost);
stop = true;
inputQ.Exit(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++)
pthread_join(t[i], NULL);
delete runner;
return 0;
}