Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1871322
  • 博文数量: 376
  • 博客积分: 2147
  • 博客等级: 大尉
  • 技术积分: 3642
  • 用 户 组: 普通用户
  • 注册时间: 2012-02-06 10:47
文章分类

全部博文(376)

文章存档

2019年(3)

2017年(28)

2016年(15)

2015年(17)

2014年(182)

2013年(16)

2012年(115)

我的朋友

分类: 嵌入式

2015-10-27 14:05:31

说起load balance,一般比较容易想到的是大型服务在多个replica之间的load balance、和kernal的load balance。前者一般只是在流量入口做一下流量分配,逻辑相对简单;而后者则比较复杂,需要不断发现正在运行的各个进程之间的imbalance,然后通过将进程在CPU之间进行迁移,使得各个CPU都被充分利用起来。

而本文想要讨论的load balance有别于以上两种,它是多线程(多进程)server程序内部,各个worker线程(进程)之间的load balance。
考虑一种常用的server模型:一个receiver线程负责接收请求,后面有一个线程池装了一堆worker线程,收到的请求被分派给这些worker进行处理。receiver与worker之间通过pthread_cond+request_queue来进行通信。一般的做法是:receiver将收到的请求放入queue,然后signal一下cond,就OK了。具体哪个worker会被唤醒,那是kernel的事情(实际上kernel会遵循先来后到原则,唤醒先进入等待的进程,参阅《》)。通常情况下这样做就足够了,receiver唤醒worker不需要涉及load balance的逻辑。但是有时候我们还是可以做一些load balance的工作,来提高server的性能。

kernel load balance概述

由于这里的load balance跟kernel的load balance息息相关,所以我们有必要先看看kernel的load balance都做了些什么。详细的内容请参阅《》,这里只做一些简要的概括。

说白了,kernel的load balance就做一件事情: 让系统中RUNNING状态的进程尽可能的被分摊,在每一个调度域上看都是balance的 。怎么理解呢?现在CPU的结构一般有:物理CPU、core、超线程、这么几个层次。”在每一个调度域上看都balance”可以理解为在每一个层次上都balance:每个物理CPU上的总load相当、每个core上的总load相当、每个超线程上的load也相当。

我们在系统中看到的”CPU”都是最底层的超线程这个层次,我们可能会直观的认为把RUNNING状态的进程分摊到每一个”CPU”上就行了,但是实际上kernel的load balance还有更高的要求。假设我们的机器有2个物理CPU、每个物理CPU有2个core、每个core有2个超线程,共8个”CPU”。如果现在有8个RUNNING状态的进程(假设优先级都相同),每个”CPU”各分摊一个进程,那么自然就是balance的。但是如果现在只有4个RUNNING状态的进程(假设优先级都相同),真正的balance并不仅仅是每个进程各自落到一个”CPU”上就行了,而是进一步要求每个物理CPU上跑两个进程、每个core上跑一个进程。
为什么要有这样的强约束呢?因为尽管各个”CPU”逻辑上是独立的(不存在主从关系之类),但它们并非孤立存在。相同物理CPU下的”CPU”会共享cache、相同core下的”CPU”会共享计算资源(所谓的超线程也就是一套流水线跑两个线程)。而共享也就意味着争抢。所以,在RUNNING状态的进程并非正好均摊给每一个”CPU”的情况下,需要考虑更高层次的CPU是否被均摊,以避免cache和CPU流水线的争抢(当然,除了性能,这也体现了kernel的公平性)。

最后再多提一点,kernel的load balance是异步的。为避免占用过多资源,kernel肯定不可能实时监控各个”CPU”的情况,然后面对变化实时的做出反应(当然,实时进程除外,但这不在我们讨论范围内)。

server的load balance考虑

有了kernel的load balance作为铺垫,看看我们server上的receiver线程能做些什么吧。

首先是worker线程的数量问题。如果worker数量过多会发生什么情况?还是假设我们的机器有上述的8个”CPU”,假设我们开了80个worker,再假设这80个线程被平均分派到每一个”CPU”上,等待处理任务。当一堆请求陆续到来的时候,由于我们的receiver没有任何load balance的策略,被唤醒的worker出现在哪个”CPU”上可以说是随机的。你想想,”同时”到来的8个请求正好落到8个不同”CPU”上的概率是多少?是:(70*60*50*40*30*20*10)/(79*78*77*76*75*74*73)=0.34%。也就是说几乎肯定会出现某些”CPU”要处理多个请求、某些”CPU”却闲着没事干的情况,系统的性能可想而知。而等到后知后觉的kernel load balance将这些请求balance到每一个”CPU”上时,可能请求已经处理得差不多了,等到下一批请求到来时,load又还是凌乱的。因为刚刚已经balance好的那些worker线程又被放回到了cond等待队列的尾部,而优先响应新请求的则是那些位于队列头部的未曾被balance过的worker。
那么会不会经历几轮请求之后就能达到balance了呢?如果请求真的是一轮一轮的过来,并且每个请求的处理时间完全相同,那么有可能会达到balance,但是实际情况肯定相差甚远。
解决办法是什么呢?将cond先进先出的队列式等待逻辑改为后进先出的栈式逻辑,或许可以解决问题,但是更好的办法应该是限制worker的数目等于或者略小于”CPU”数目,这样很自然的就balance了。

第二个问题,既然我们承认kernel在各个调度域上的load balance的有意义的,我们server中的receiver线程是不是也可以通过类似的办法来获得收益呢?现在我们吸取了之前的教训,只开了8个worker线程。依靠kernel load balance的作用,这8个线程基本会固定在每一个”CPU”上。假设现在一下子来了4个请求,它们会落到4个不同的”CPU”上,如果运气好,这4个”CPU”分别属于不同的core,那么处理请求的过程就不会涉及CPU资源的争抢;反之,可能形成2个core非常忙、2个core闲着的局面。
要解决这个问题需要做到两点,继续以我们之前的server程序为例。首先,receiver线程要知道各个worker线程都落在哪一个”CPU”上;然后在分派任务时还需要有balance的眼光。要做到第一点,最好是借助sched_affinity功能将线程固定在某个”CPU”上,避免kernel load balance把问题搞复杂了。既然前面我们已经得出了工作线程数等于或略小于CPU数的结论,现在每个线程固定在一个CPU上就是可行的。第二点,我们需要在现有pthread_cond的基础上做一些改进,给进入等待状态的worker线程赋一个优先级,比如每个core的第一个超线程作为第一优先级,第二个超线程为第二优先级。那么在cond唤醒工作线程的时候,我们就可以尽量让worker线程不落到同一个core上。实现上可以利用futex的bitset系列功能,通过bitset来标识优先级,以便在唤醒指定的worker线程。(参阅《》。)

例子

好了,纸上谈兵讲了这么多,得来点实际的例子验证一下。为了简单,就不写什么server程序了,只需要一个生产者线程和若干消费者线程。生产者线程生成一些任务,通过cond+queue将其传递给消费者线程。为了观察在不同任务负载下的程序表现,我们需要控制任务负载。消费者线程在完成任务后通过另一组cond+queue把任务应答给生产者线程,于是生产者就知道当前有多少个任务正在处理中,以便控制生产新任务的节奏。最后,我们通过观察在不同条件下完成一批任务的时间来体会程序的性能。

这里面比较关键的是任务本身的处理逻辑,既然我们讨论的是CPU的负载,任务肯定应该是CPU密集型的任务。然后,单个任务的处理时间不宜太短,否则可能调度过程会成为程序的瓶颈,体现不出CPU的负载问题;另一方面,单个任务的处理时间也不宜太长,否则后知后觉的kernel load balance也能解决问题,体现不出我们主动做load balance的好处(比如任务处理时间是10秒,kernel load balance花费几十毫秒来解决balance问题其实也无伤大雅)。

代码贴在文章最后,编译出来的bin文件是这样的:

$g++ cond.cpp -pthread -O2
$./a.out
usage: ./a.out -j job_kind=shm|calc [-t thread_count=1] 
[-o job_load=1] [-c job_count=10] [-a affinity=0] [-l] 
[-f filename="./TEST" -n filelength=128M] 
  • 代码里面准备了两种任务逻辑,”-j shm”是mmap一个文件,然后读取上面的数据做一些运算(文件及其长度由-f和-n参数来限定);”-j calc”是做一些算术运算;
  • “-t”参数指定工作线程的线程数;
  • “-o”指定任务负载;
  • “-c”指定单个线程处理任务的个数;
  • “-a”指定是否设置sched_affinity,并且指明跳几个”CPU”放一个worker线程。比如”-a 1″表示把worker线程顺序固定在1、2、3、……号”CPU”上,而”-a 2″表示固定在2、4、6、……号”CPU”上,以此类推。需要注意的是,邻近的”CPU”号并不表示”CPU”在物理上是邻近的,比如在我测试用的机器上,共24个”CPU”,0~11号是每个core的第一个超线程、12~23是第二个超线程。这个细节需要读/proc/cpuinfo来确定。
  • “-l”参数指定启用我们增强版的分级cond,启用的话会将0~11号worker作为第一优先级,12~23作为第二优先级(当然,需要配合”-a”参数才有实际意义,否则也不确定这些worker都落在哪些”CPU”上);

首先来看worker线程过多所带来的问题(以下case各运行5次取时间最小值)。

case-1,启240个worker线程,24个任务负载:
$./a.out -j calc -t 240 -o 24
total cost: 23790
$./a.out -j shm -t 240 -o 24
total cost: 16827  case-2,启24个worker线程,24个任务负载:
$./a.out -j calc -t 24 -o 24
total cost: 23210
$./a.out -j shm -t 24 -o 24
total cost: 16121 

case-2效果明显要好略一些。并且在运行过程中如果用top观察的话,你会发现case-1只能压到2200%左右的CPU,而case-2几乎能达到2400%。

在case-1的基础上,如果禁止kernel load balance会怎样?加affinity试试看:

case-3,启240个worker线程,24个任务负载,加affinity:
$./a.out -j calc -t 240 -o 24 -a 1
total cost: 27170
$./a.out -j shm -t 240 -o 24 -a 1
total cost: 15351 

calc任务比较符合预期,没有kernel load balance的情况下,性能继续下降。
而shm任务则让人大跌眼镜,性能居然提升了!其实这个任务除了CPU之外还很依赖于内存,因为所有任务都工作在同一个文件的mmap上,”CPU”挨得近反而更能发挥内存cache。(可见在这种情况下,kernel load balance其实是帮了倒忙。)
那么,我们将工作线程再调回24,是不是应该更理想?

case-3'
$./a.out -j shm -t 24 -o 24 -a 1
total cost: 15133 

再来看第二个问题,worker线程站位不均所带来的影响。

case-4,启24个worker线程,12个任务负载:
$./a.out -j calc -t 24 -o 12
total cost: 14686
$./a.out -j shm -t 24 -o 12
total cost: 13265  case-5,启24个worker线程,12个任务负载,加affinity,启用分级cond:
$./a.out -j calc -t 24 -o 12 -a 1 -l
total cost: 12206
$./a.out -j shm -t 24 -o 12 -a 1 -l
total cost: 12376 

效果还是不错的。改一下”-a”参数,让同一个core的两个超线程都分在同一优先级呢?

case-5'
$./a.out -j calc -t 24 -o 12 -a 2 -l
total cost: 23510
$./a.out -j shm -t 24 -o 12 -a 2 -l
total cost: 15063 

由于争抢CPU资源,calc任务性能变得很差,几乎减半。而shm任务由于cache复用所带来的好处,情况还好(比case-3还略好一些)。

这里的任务只是举了calc和shm两个例子,实际情况可能是很复杂的。尽管load balance的问题肯定存在,但是任务会因共享cache而得利、还是因争抢cache而失利?争抢CPU流水线又会造成多大的损失?这些都只能具体问题具体分析。kernel的load balance将负载尽量均摊到离得远的”CPU”上,大多数情况下没有问题。不过我们也看到shm任务中cache共享的收益还是很大的,如果例子更极端一点,肯定会出现承受负载的CPU离得越近,反而效果越好的情况。
另一方面,争抢CPU流水线会有多大损失,也可以简单的分析一下。超线程相当于两个线程共用一套CPU流水线,如果单个线程的代码上下文依赖很严重,指令基本上只能串行工作,无法充分利用流水线,那么流水线的空余能力就可以留给第二个线程使用。反之如果一个线程就能把流水线填满,硬塞两个线程进来肯定就只能有50%的性能(上述calc的例子就差不多是这样)。
为了说明这个问题,我们给calc任务加了一个SERIAL_CALC的宏开关,让它的运算逻辑变成上下文强依赖。然后重跑case-5中的两个命令,我们会看到其实在这种情况下承受负载的CPU离得近一些似乎也问题不大:

case-6,采用SERIAL_CALC运算逻辑,重跑case-5中的calc任务
$g++ cond.cpp -pthread -O2 -DSERIAL_CALC
$./a.out -j calc -t 24 -o 12 -a 1 -l
total cost: 51269
$./a.out -j calc -t 24 -o 12 -a 2 -l
total cost: 56753 

最后是代码,有兴趣你还可以尝试更多的case,have fun!

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#define CPUS    24
#define FUTEX_WAIT_BITSET   9
#define FUTEX_WAKE_BITSET   10

struct Job
{
    long _input;
    long _output;
};

class JobRunner
{
public:
    virtual void run(Job* job) = 0;
};

class ShmJobRunner : public JobRunner
{
public:
    ShmJobRunner(const char* filepath, size_t length)
            : _length(length) {
        int fd = open(filepath, O_RDONLY);
        _base = (long*)mmap(NULL, _length*sizeof(long),
                PROT_READ, MAP_SHARED|MAP_POPULATE, fd, 0);
        if (_base == MAP_FAILED) {
            printf("FATAL: mmap %s(%lu) failed!\n",
                    filepath, _length*sizeof(long));
            abort();
        }
        close(fd);
    }
    virtual void run(Job* job) {
        long i = job->_input % _length;
        long j = i + _length - 1;
        const int step = 4;
        while (i + step < j) {
            if (_base[i%_length] * _base[j%_length] > 0) {
                j -= step;
            }
            else {
                i += step;
            }
        }
        job->_output = _base[i%_length];
    }
private:
    const long* _base;
    size_t _length;
};

class CalcJobRunner : public JobRunner
{
public:
    virtual void run(Job* job) {
        long v1 = 1;
        long v2 = 1;
        long v3 = 1;
        for (int i = 0; i < job->_input; i++) {
#ifndef SERIAL_CALC
            v1 += v2 + v3;
            v2 *= 3;
            v3 *= 5;
#else
            v1 += v2 + v3;
            v2 = v1 * 5 + v2 * v3;
            v3 = v1 * 3 + v1 * v2;
#endif
        }
        job->_output = v1;
    }
};

class JobRunnerCreator
{
public:
    static JobRunner* create(const char* name,
            const char* filepath, size_t filelength) {
        if (strcmp(name, "shm") == 0) {
            printf("share memory job\n");
            return new ShmJobRunner(filepath, filelength);
        }
        else if (strcmp(name, "calc") == 0) {
            printf("caculation job\n");
            return new CalcJobRunner();
        }
        printf("unknown job '%s'\n", name);
        return NULL;
    }
};

class Cond
{
public:
    virtual void lock() = 0;
    virtual void unlock() = 0;
    virtual void wait(size_t) = 0;
    virtual void wake() = 0;
};

class NormalCond : public Cond
{
public:
    NormalCond() {
        pthread_mutex_init(&_mutex, NULL);
        pthread_cond_init(&_cond, NULL);
    }
    ~NormalCond() {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
    void lock() { pthread_mutex_lock(&_mutex); }
    void unlock() { pthread_mutex_unlock(&_mutex); }
    void wait(size_t) { pthread_cond_wait(&_cond, &_mutex); }
    void wake() { pthread_cond_signal(&_cond); }
private:
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

class LayeredCond : public Cond
{
public:
    LayeredCond(size_t layers = 1) : _value(0), _layers(layers) {
        pthread_mutex_init(&_mutex, NULL);
        if (_layers > sizeof(int)*8) {
            printf("FATAL: cannot support such layer %u (max %u)\n",
                    _layers, sizeof(int)*8);
            abort();
        }
        _waiters = new size_t[_layers];
        memset(_waiters, 0, sizeof(size_t)*_layers);
    }
    ~LayeredCond() {
        pthread_mutex_destroy(&_mutex);
        delete _waiters;
        _waiters = NULL;
    }
    void lock() {
        pthread_mutex_lock(&_mutex);
    }
    void unlock() {
        pthread_mutex_unlock(&_mutex);
    }
    void wait(size_t layer) {
        if (layer >= _layers) {
            printf("FATAL: layer overflow (%u/%u)\n", layer, _layers);
            abort();
        }
        _waiters[layer]++;
        while (_value == 0) {
            int value = _value;
            unlock();
            syscall(__NR_futex, &_value, FUTEX_WAIT_BITSET, value,
                    NULL, NULL, layer2mask(layer));
            lock();
        }
        _waiters[layer]--;
        _value--;
    }
    void wake() {
        int mask = ~0;
        lock();
        for (size_t i = 0; i < _layers; i++) {
            if (_waiters[i] > 0) {
                mask = layer2mask(i);
                break;
            }
        }
        _value++;
        unlock();
        syscall(__NR_futex, &_value, FUTEX_WAKE_BITSET, 1,
                NULL, NULL, mask);
    }
private:
    int layer2mask(size_t layer) {
        return 1 << layer;
    }
private:
    pthread_mutex_t _mutex;
    int _value;
    size_t* _waiters;
    size_t _layers;
};

template
class Stack
{
public:
    Stack(size_t size, size_t cond_layers = 0) : _size(size), _sp(0) {
        _buf = new T*[_size];
        _cond = (cond_layers > 0) ?
            (Cond*)new LayeredCond(cond_layers) : (Cond*)new NormalCond();
    }
    ~Stack() {
        delete []_buf;
        delete _cond;
    }
    T* pop(size_t layer = 0) {
        T* ret = NULL;
        _cond->lock();
        do {
            if (_sp > 0) {
                ret = _buf[--_sp];
            }
            else {
                _cond->wait(layer);
            }
        } while (ret == NULL);
        _cond->unlock();
        return ret;
    }
    void push(T* obj) {
        _cond->lock();
        if (_sp >= _size) {
            printf("FATAL: stack overflow\n");
            abort();
        }
        _buf[_sp++] = obj;
        _cond->unlock();
        _cond->wake();
    }
private:
    const size_t _size;
    size_t _sp;
    T** _buf;
    Cond* _cond;
};

inline struct timeval cost_begin()
{
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv;
}

inline long cost_end(struct timeval &tv)
{
    struct timeval tv2;
    gettimeofday(&tv2, NULL);
    tv2.tv_sec -= tv.tv_sec;
    tv2.tv_usec -= tv.tv_usec;
    return tv2.tv_sec*1000+tv2.tv_usec/1000;
}

struct ThreadParam
{
    size_t layer;
    Stack* inputQ;
    Stack* outputQ;
    JobRunner* runner;
};

void* thread_func(void *data)
{
    size_t layer = ((ThreadParam*)data)->layer;
    Stack* inputQ = ((ThreadParam*)data)->inputQ;
    Stack* outputQ = ((ThreadParam*)data)->outputQ;
    JobRunner* runner = ((ThreadParam*)data)->runner;

    while (1) {
        Job* job = inputQ->pop(layer);
        runner->run(job);
        outputQ->push(job);
    }
    return NULL;
}

void force_cpu(pthread_t t, int n)
{
    cpu_set_t cpus;
    CPU_ZERO(&cpus);
    CPU_SET(n, &cpus);
    if (pthread_setaffinity_np(t, sizeof(cpus), &cpus) != 0) {
        printf("FATAL: force cpu %d failed: %s\n", n, strerror(errno));
        abort();
    }
}

void usage(const char* bin)
{
    printf("usage: %s -j job_kind=shm|calc "
        "[-t thread_count=1] [-o job_load=1] [-c job_count=10] "
        "[-a affinity=0] [-l] "
        "[-f filename=\"./TEST\" -n filelength=128M]\n", bin);
    abort();
}

int main(int argc, char* const* argv)
{
    int THREAD_COUNT = 1;
    int JOB_LOAD = 1;
    int JOB_COUNT = 10;
    int AFFINITY = 0;
    int LAYER = 0;
    char JOB_KIND[16] = "";
    char FILEPATH[1024] = "./TEST";
    size_t LENGTH = 128*1024*1024;
    for (int i = EOF;
        (i = getopt(argc, argv, "t:o:c:a:j:lf:n:")) != EOF;) {
        switch (i) {
        case 't': THREAD_COUNT = atoi(optarg); break;
        case 'o': JOB_LOAD = atoi(optarg); break;
        case 'c': JOB_COUNT = atoi(optarg); break;
        case 'a': AFFINITY = atoi(optarg); break;
        case 'l': LAYER = 2; break;
        case 'j': strncpy(JOB_KIND, optarg, sizeof(JOB_KIND)-1); break;
        case 'f': strncpy(FILEPATH, optarg, sizeof(FILEPATH)-1); break;
        case 'n': LENGTH = atoi(optarg); break;
        default: usage(argv[0]); break;
        }
    }
    JobRunner* runner = JobRunnerCreator::create(
            JOB_KIND, FILEPATH, LENGTH);
    if (!runner) {
        usage(argv[0]);
    }

    srand(0);
    Job jobs[JOB_LOAD];

#ifdef TEST_LOAD
    for (int i = 0; i < JOB_LOAD; i++) {
        jobs[i]._input = rand();
        struct timeval tv = cost_begin();
        runner->run(&jobs[i]);
        long cost = cost_end(tv);
        printf("job[%d](%ld)=(%ld) costs: %ld\n",
                i, jobs[i]._input, jobs[i]._output, cost);
    }
    delete runner;
    return 0;
#endif

    printf("use layer %d\n", LAYER);
    Stack inputQ(JOB_LOAD, LAYER);
    Stack outputQ(JOB_LOAD, LAYER);

    pthread_t t;
    ThreadParam param[THREAD_COUNT];

    printf("thread init: ");
    for (int i = 0; i < THREAD_COUNT; i++) {
        int cpu = AFFINITY ? (i/AFFINITY+i%AFFINITY*CPUS/2)%CPUS : -1;
        size_t layer = !!(LAYER && i % CPUS >= CPUS/2);
        param[i].inputQ = &inputQ;
        param[i].outputQ = &outputQ;
        param[i].runner = runner;
        param[i].layer = layer;
        pthread_create(&t, NULL, thread_func, (void*)¶m[i]);
        if (cpu >= 0) {
            printf("%d(%d|%d),", i, cpu, layer);
            force_cpu(t, cpu);
        }
        else {
            printf("%d(*|%d),", i, layer);
        }
        usleep(1000);
    }
    printf("\n");

    struct timeval tv = cost_begin();
    for (int i = 0; i < JOB_LOAD; i++) {
        jobs[i]._input = rand();
        inputQ.push(&jobs[i]);
    }
    for (int i = 0; i < JOB_LOAD*JOB_COUNT; i++) {
        Job* job = outputQ.pop();
        job->_input = rand();
        inputQ.push(job);
    }
    for (int i = 0; i < JOB_LOAD; i++) {
        outputQ.pop();
    }
    long cost = cost_end(tv);
    printf("total cost: %ld\n", cost);

    delete runner;
    return 0;
} 



ONE RESPONSE


  1.  on 10 十二 2014

    程序退出时可考虑对线程池进行回收处理,仅仅作为一个建议,敬请指正。修改后代码如下:
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include

    #define CPUS 24
    #define FUTEX_WAIT_BITSET 9
    #define FUTEX_WAKE_BITSET 10
    bool stop = false;
    struct Job
    {
    long _input;
    long _output;
    };

    class JobRunner
    {
    public:
    virtual void run(Job* job) = 0;
    };

    class ShmJobRunner : public JobRunner
    {
    public:
    ShmJobRunner(const char* filepath, size_t length)
    : _length(length) {
    int fd = open(filepath, O_RDONLY);
    _base = (long*)mmap(NULL, _length*sizeof(long),
    PROT_READ, MAP_SHARED|MAP_POPULATE, fd, 0);
    if (_base == MAP_FAILED) {
    printf(“FATAL: mmap %s(%lu) failed!\n”,
    filepath, _length*sizeof(long));
    abort();
    }
    close(fd);
    }
    virtual void run(Job* job) {
    long i = job->_input % _length;
    long j = i + _length – 1;
    const int step = 4;
    while (i + step 0) {
    j -= step;
    }
    else {
    i += step;
    }
    }
    job->_output = _base[i%_length];
    }
    private:
    const long* _base;
    size_t _length;
    };

    class CalcJobRunner : public JobRunner
    {
    public:
    virtual void run(Job* job) {
    long v1 = 1;
    long v2 = 1;
    long v3 = 1;
    for (int i = 0; i _input; i++) {
    #ifndef SERIAL_CALC
    v1 += v2 + v3;
    v2 *= 3;
    v3 *= 5;
    #else
    v1 += v2 + v3;
    v2 = v1 * 5 + v2 * v3;
    v3 = v1 * 3 + v1 * v2;
    #endif
    }
    job->_output = v1;
    }
    };

    class JobRunnerCreator
    {
    public:
    static JobRunner* create(const char* name,
    const char* filepath, size_t filelength) {
    if (strcmp(name, “shm”) == 0) {
    printf(“share memory job\n”);
    return new ShmJobRunner(filepath, filelength);
    }
    else if (strcmp(name, “calc”) == 0) {
    printf(“caculation job\n”);
    return new CalcJobRunner();
    }
    printf(“unknown job ‘%s’\n”, name);
    return NULL;
    }
    };

    class Cond
    {
    public:
    virtual void lock() = 0;
    virtual void unlock() = 0;
    virtual void wait(size_t) = 0;
    virtual void wake() = 0;
    };

    class NormalCond : public Cond
    {
    public:
    NormalCond() {
    pthread_mutex_init(&_mutex, NULL);
    pthread_cond_init(&_cond, NULL);
    }
    ~NormalCond() {
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_cond);
    }
    void lock() { pthread_mutex_lock(&_mutex); }
    void unlock() { pthread_mutex_unlock(&_mutex); }
    void wait(size_t) { pthread_cond_wait(&_cond, &_mutex); }
    void wake() { pthread_cond_signal(&_cond); }
    private:
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
    };

    class LayeredCond : public Cond
    {
    public:
    LayeredCond(size_t layers = 1) : _value(0), _layers(layers) {
    pthread_mutex_init(&_mutex, NULL);
    if (_layers > sizeof(int)*8) {
    printf(“FATAL: cannot support such layer %u (max %u)\n”,
    _layers, sizeof(int)*8);
    abort();
    }
    _waiters = new size_t[_layers];
    memset(_waiters, 0, sizeof(size_t)*_layers);
    }
    ~LayeredCond() {
    pthread_mutex_destroy(&_mutex);
    delete _waiters;
    _waiters = NULL;
    }
    void lock() {
    pthread_mutex_lock(&_mutex);
    }
    void unlock() {
    pthread_mutex_unlock(&_mutex);
    }
    void wait(size_t layer) {
    if (layer >= _layers) {
    printf(“FATAL: layer overflow (%u/%u)\n”, layer, _layers);
    abort();
    }
    _waiters[layer]++;
    while (_value == 0) {
    int value = _value;
    unlock();
    syscall(__NR_futex, &_value, FUTEX_WAIT_BITSET, value,
    NULL, NULL, layer2mask(layer));
    lock();
    }
    _waiters[layer]–;
    _value–;
    }
    void wake() {
    int mask = ~0;
    lock();
    for (size_t i = 0; i 0) {
    mask = layer2mask(i);
    break;
    }
    }
    _value++;
    unlock();
    syscall(__NR_futex, &_value, FUTEX_WAKE_BITSET, 1,
    NULL, NULL, mask);
    }
    private:
    int layer2mask(size_t layer) {
    return 1 << layer;
    }
    private:
    pthread_mutex_t _mutex;
    int _value;
    size_t* _waiters;
    size_t _layers;
    };

    template
    class Stack
    {
    public:
    Stack(size_t size, size_t cond_layers = 0) : _size(size), _sp(0) {
    _buf = new T*[_size];
    _cond = (cond_layers > 0) ?
    (Cond*)new LayeredCond(cond_layers) : (Cond*)new NormalCond();
    }
    ~Stack() {
    delete []_buf;
    delete _cond;
    }
    T* pop(size_t layer = 0) {
    T* ret = NULL;
    _cond->lock();
    do {
    if (_sp > 0) {
    ret = _buf[--_sp];
    }
    else {
    _cond->wait(layer);
    if (stop)
    goto out;
    }
    } while (ret == NULL);
    out: _cond->unlock();
    return ret;
    }
    void push(T* obj) {
    _cond->lock();
    if (_sp >= _size) {
    printf(“FATAL: stack overflow\n”);
    abort();
    }
    _buf[_sp++] = obj;
    _cond->unlock();
    _cond->wake();
    }
    void Exit(int thread_count) {
    for (int i = 0; i lock();
    _cond->wake();
    _cond->unlock();
    }
    }
    private:
    const size_t _size;
    size_t _sp;
    T** _buf;
    Cond* _cond;
    };

    inline struct timeval cost_begin()
    {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv;
    }

    inline long cost_end(struct timeval &tv)
    {
    struct timeval tv2;
    gettimeofday(&tv2, NULL);
    tv2.tv_sec -= tv.tv_sec;
    tv2.tv_usec -= tv.tv_usec;
    return tv2.tv_sec*1000+tv2.tv_usec/1000;
    }

    struct ThreadParam
    {
    size_t layer;
    Stack* inputQ;
    Stack* outputQ;
    JobRunner* runner;
    };

    void* thread_func(void *data)
    {
    size_t layer = ((ThreadParam*)data)->layer;
    Stack* inputQ = ((ThreadParam*)data)->inputQ;
    Stack* outputQ = ((ThreadParam*)data)->outputQ;
    JobRunner* runner = ((ThreadParam*)data)->runner;

    while (1) {
    Job* job = inputQ->pop(layer);
    if (stop)
    break;
    runner->run(job);
    outputQ->push(job);
    }
    return NULL;
    }

    void force_cpu(pthread_t t, int n)
    {
    cpu_set_t cpus;
    CPU_ZERO(&cpus);
    CPU_SET(n, &cpus);
    if (pthread_setaffinity_np(t, sizeof(cpus), &cpus) != 0) {
    printf(“FATAL: force cpu %d failed: %s\n”, n, strerror(errno));
    abort();
    }
    }

    void usage(const char* bin)
    {
    printf(“usage: %s -j job_kind=shm|calc ”
    “[-t thread_count=1] [-o job_load=1] [-c job_count=10] ”
    “[-a affinity=0] [-l] ”
    “[-f filename=\"./TEST\" -n filelength=128M]\n”, bin);
    abort();
    }

    int main(int argc, char* const* argv)
    {
    int THREAD_COUNT = 1;
    int JOB_LOAD = 1;
    int JOB_COUNT = 10;
    int AFFINITY = 0;
    int LAYER = 0;
    char JOB_KIND[16] = “”;
    char FILEPATH[1024] = “./TEST”;
    size_t LENGTH = 128*1024*1024;
    for (int i = EOF;
    (i = getopt(argc, argv, “t:o:c:a:j:lf:n:”)) != EOF;) {
    switch (i) {
    case ‘t’: THREAD_COUNT = atoi(optarg); break;
    case ‘o’: JOB_LOAD = atoi(optarg); break;
    case ‘c’: JOB_COUNT = atoi(optarg); break;
    case ‘a’: AFFINITY = atoi(optarg); break;
    case ‘l’: LAYER = 2; break;
    case ‘j’: strncpy(JOB_KIND, optarg, sizeof(JOB_KIND)-1); break;
    case ‘f’: strncpy(FILEPATH, optarg, sizeof(FILEPATH)-1); break;
    case ‘n’: LENGTH = atoi(optarg); break;
    default: usage(argv[0]); break;
    }
    }
    JobRunner* runner = JobRunnerCreator::create(
    JOB_KIND, FILEPATH, LENGTH);
    if (!runner) {
    usage(argv[0]);
    }

    srand(0);
    Job jobs[JOB_LOAD];

    #ifdef TEST_LOAD
    for (int i = 0; i run(&jobs[i]);
    long cost = cost_end(tv);
    printf(“job[%d](%ld)=(%ld) costs: %ld\n”,
    i, jobs[i]._input, jobs[i]._output, cost);
    }
    delete runner;
    return 0;
    #endif

    printf(“use layer %d\n”, LAYER);
    Stack inputQ(JOB_LOAD, LAYER);
    Stack outputQ(JOB_LOAD, LAYER);

    pthread_t t[THREAD_COUNT];
    ThreadParam param[THREAD_COUNT];

    printf(“thread init: “);
    for (int i = 0; i = CPUS/2);
    param[i].inputQ = &inputQ;
    param[i].outputQ = &outputQ;
    param[i].runner = runner;
    param[i].layer = layer;
    pthread_create(&t[i], NULL, thread_func, (void*)¶m[i]);
    if (cpu >= 0) {
    printf(“%d(%d|%d),”, i, cpu, layer);
    force_cpu(t[i], cpu);
    }
    else {
    printf(“%d(*|%d),”, i, layer);
    }
    usleep(1000);
    }
    printf(“\n”);

    struct timeval tv = cost_begin();
    for (int i = 0; i < JOB_LOAD; i++) {
    jobs[i]._input = rand();
    inputQ.push(&jobs[i]);
    }
    for (int i = 0; i _input = rand();
    inputQ.push(job);
    }
    for (int i = 0; i < JOB_LOAD; i++) {
    outputQ.pop();
    }
    long cost = cost_end(tv);
    printf("total cost: %ld\n", cost);
    stop = true;
    inputQ.Exit(THREAD_COUNT);
    for (int i = 0; i < THREAD_COUNT; i++)
    pthread_join(t[i], NULL);
    delete runner;
    return 0;
    }


阅读(1598) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~