Category: LINUX

2010-02-28 14:16:50

/*-
 * Copyright (c) 2006 Verdens Gang AS
 * Copyright (c) 2006-2008 Linpro AS
 * All rights reserved.
 *
 * Author: Poul-Henning Kamp
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $Id: cache_pool.c 3999 2009-03-23 14:51:24Z tfheen $
 *
 * We maintain a number of worker thread pools, to spread lock contention.
 *
 * Pools can be added on the fly, as a means to mitigate lock contention,
 * but can only be removed again by a restart. (XXX: we could fix that)
 *
 * Two threads herd the pools, one eliminates idle threads and aggregates
 * statistics for all the pools, the other thread creates new threads
 * on demand, subject to various numerical constraints.
 *
 * The algorithm for when to create threads needs to be reactive enough
 * to handle startup spikes, but sufficiently attenuated to not cause
 * thread pileups. This remains a subject for improvement.
 */


#include "config.h"

#include <sys/types.h>
#include <sys/uio.h>

#ifdef SENDFILE_WORKS
#if defined(__FreeBSD__) || defined(__DragonFly__)
#include <sys/socket.h>
#elif defined(__linux__)
#include <sys/sendfile.h>
#elif defined(__sun)
#include <sys/sendfile.h>
#else
#error Unknown sendfile() implementation
#endif
#endif /* SENDFILE_WORKS */

#include <errno.h>
#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "shmlog.h"
#include "vcl.h"
#include "cli_priv.h"
#include "cache.h"
#include "stevedore.h"
#include "hash_slinger.h"
#include "vsha256.h"

VTAILQ_HEAD(workerhead, worker);

/* Number of work requests queued in excess of worker threads available */

struct wq {
    unsigned        magic;
#define WQ_MAGIC        0x606658fa
    struct lock        mtx;
    struct workerhead    idle;
    VTAILQ_HEAD(, workreq)    overflow;
    unsigned        nthr;
    unsigned        nqueue;
    unsigned        lqueue;
    uintmax_t        ndrop;
    uintmax_t        noverflow;
};
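
/*
 * Field guide (inferred from the code below): nthr is the number of
 * live threads in the pool, nqueue the current length of the overflow
 * queue, lqueue the queue length seen at the previous herder pass (see
 * wrk_breed_flock()), ndrop the number of requests refused outright,
 * and noverflow the running total of requests that had to be queued.
 */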

static struct wq        **wq;
static unsigned            nwq;
static unsigned            ovfl_max;
static unsigned            nthr_max;

static pthread_cond_t        herder_cond;
static struct lock        herder_mtx;

/*--------------------------------------------------------------------
 * Write data to fd
 * We try to use writev() if possible, to minimize the number of
 * syscalls made and packets sent. It also just might allow the worker
 * thread to complete the request without holding stuff locked.
 */
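
/*
 * A minimal standalone demo (a separate program, not part of this
 * file) of the gathering idea described above: three small buffers go
 * out in one writev() call instead of three write() calls.  The
 * HTTP-ish strings are made up for illustration.
 */
#include <sys/uio.h>
#include <string.h>
#include <unistd.h>

int
main(void)
{
    char status[] = "HTTP/1.1 200 OK\r\n";
    char hdr[] = "Content-Length: 5\r\n\r\n";
    char body[] = "hello";
    struct iovec iov[3];

    iov[0].iov_base = status;
    iov[0].iov_len = strlen(status);
    iov[1].iov_base = hdr;
    iov[1].iov_len = strlen(hdr);
    iov[2].iov_base = body;
    iov[2].iov_len = strlen(body);

    /* One syscall (and, on a socket, likely one packet) for all three */
    return (writev(STDOUT_FILENO, iov, 3) < 0);
}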


void
WRW_Reserve(struct worker *w, int *fd)
{

    CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
    AZ(w->wfd);
    w->werr = 0;
    w->liov = 0;
    w->niov = 0;
    w->wfd = fd;
}

void
WRW_Release(struct worker *w)
{

    CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
    w->werr = 0;
    w->liov = 0;
    w->niov = 0;
    w->wfd = NULL;
}

unsigned
WRW_Flush(struct worker *w)
{
    ssize_t i;

    CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
    AN(w->wfd);
    if (*w->wfd >= 0 && w->niov > 0 && w->werr == 0) {
        i = writev(*w->wfd, w->iov, w->niov);
        if (i != w->liov) {
            w->werr++;
            WSL(w, SLT_Debug, *w->wfd,
                "Write error, len = %ld/%ld, errno = %s",
                (long)i, (long)w->liov, strerror(errno));
        }
    }
    w->liov = 0;
    w->niov = 0;
    return (w->werr);
}

unsigned
WRW_FlushRelease(struct worker *w)
{
    unsigned u;

    CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
    AN(w->wfd);
    u = WRW_Flush(w);
    WRW_Release(w);
    return (u);
}

unsigned
WRW_WriteH(struct worker *w, const txt *hh, const char *suf)
{
    unsigned u;

    CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
    AN(w->wfd);
    AN(w);
    AN(hh);
    AN(hh->b);
    AN(hh->e);
    u = WRW_Write(w, hh->b, hh->e - hh->b);
    if (suf != NULL)
        u += WRW_Write(w, suf, -1);
    return (u);
}

unsigned
WRW_Write(struct worker *w, const void *ptr, int len)
{

    CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
    AN(w->wfd);
    if (len == 0 || *w->wfd < 0)
        return (0);
    if (len == -1)
        len = strlen(ptr);
    if (w->niov == MAX_IOVS)
        (void)WRW_Flush(w);
    w->iov[w->niov].iov_base = TRUST_ME(ptr);
    w->iov[w->niov].iov_len = len;
    w->liov += len;
    w->niov++;
    return (len);
}
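
/*
 * Hedged usage sketch (not in the original file) of the WRW_* API
 * above.  The function name and the "resp"/"fd" parameters are
 * hypothetical; error handling is elided.
 */
static void
wrw_example(struct worker *w, int *fd, const txt *resp)
{
    WRW_Reserve(w, fd);                 /* claim the fd for this worker */
    (void)WRW_WriteH(w, resp, "\r\n");  /* header text plus CRLF suffix */
    (void)WRW_Write(w, "hello", -1);    /* -1 makes WRW_Write strlen() it */
    if (WRW_FlushRelease(w) != 0) {
        /* werr was bumped; a real caller would tear down the fd */
    }
}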

#ifdef SENDFILE_WORKS
void
WRW_Sendfile(struct worker *w, int fd, off_t off, unsigned len)
{

    CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
    AN(w->wfd);
    assert(fd >= 0);
    assert(len > 0);

#if defined(__FreeBSD__) || defined(__DragonFly__)
    do {
        struct sf_hdtr sfh;
        memset(&sfh, 0, sizeof sfh);
        if (w->niov > 0) {
            sfh.headers = w->iov;
            sfh.hdr_cnt = w->niov;
        }
        if (sendfile(fd, *w->wfd, off, len, &sfh, NULL, 0) != 0)
            w->werr++;
        w->liov = 0;
        w->niov = 0;
    } while (0);
#elif defined(__linux__)
    do {
        if (WRW_Flush(w) == 0 &&
            sendfile(*w->wfd, fd, &off, len) != len)
            w->werr++;
    } while (0);
#elif defined(__sun) && defined(HAVE_SENDFILEV)
    do {
        sendfilevec_t svvec[HTTP_HDR_MAX * 2 + 1];
        size_t xferred = 0, expected = 0;
        int i;
        for (i = 0; i < w->niov; i++) {
            svvec[i].sfv_fd = SFV_FD_SELF;
            svvec[i].sfv_flag = 0;
            svvec[i].sfv_off = (off_t) w->iov[i].iov_base;
            svvec[i].sfv_len = w->iov[i].iov_len;
            expected += svvec[i].sfv_len;
        }
        svvec[i].sfv_fd = fd;
        svvec[i].sfv_flag = 0;
        svvec[i].sfv_off = off;
        svvec[i].sfv_len = len;
        expected += svvec[i].sfv_len;
        if (sendfilev(*w->wfd, svvec, i + 1, &xferred) == -1 ||
            xferred != expected)
            w->werr++;
        w->liov = 0;
        w->niov = 0;
    } while (0);
#elif defined(__sun) && defined(HAVE_SENDFILE)
    do {
        if (WRW_Flush(w) == 0 &&
            sendfile(*w->wfd, fd, &off, len) != len)
            w->werr++;
    } while (0);
#else
#error Unknown sendfile() implementation
#endif
}
#endif /* SENDFILE_WORKS */
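
/*
 * Minimal standalone demo (a separate program, not part of this file)
 * of the Linux sendfile() variant used above: the kernel copies the
 * file to the destination fd without the data passing through
 * userspace.  The file name is arbitrary; on older kernels the
 * destination had to be a socket, newer ones accept any fd.
 */
#include <sys/sendfile.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

int
main(void)
{
    struct stat st;
    off_t off = 0;
    int fd = open("/etc/hostname", O_RDONLY);

    if (fd < 0 || fstat(fd, &st) < 0)
        return (1);
    while (off < st.st_size)    /* sendfile() advances "off" for us */
        if (sendfile(STDOUT_FILENO, fd, &off, st.st_size - off) < 0)
            return (1);
    return (0);
}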

/*--------------------------------------------------------------------*/

static void *
wrk_thread(void *priv)
{
    struct worker *w, ww;
    struct wq *qp;
    unsigned char wlog[params->shm_workspace];
    struct SHA256Context sha256;

    THR_SetName("cache-worker");
    w = &ww;
    CAST_OBJ_NOTNULL(qp, priv, WQ_MAGIC);
    memset(w, 0, sizeof *w);
    w->magic = WORKER_MAGIC;
    w->lastused = NAN;
    w->wlb = w->wlp = wlog;
    w->wle = wlog + sizeof wlog;
    w->sha256ctx = &sha256;
    AZ(pthread_cond_init(&w->cond, NULL));

    VSL(SLT_WorkThread, 0, "%p start", w);

    Lck_Lock(&qp->mtx);
    qp->nthr++;
    while (1) {
        CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);

        /* Process overflow requests, if any */
        w->wrq = VTAILQ_FIRST(&qp->overflow);
        if (w->wrq != NULL) {
            VTAILQ_REMOVE(&qp->overflow, w->wrq, list);
            qp->nqueue--;
        } else {
            if (isnan(w->lastused))
                w->lastused = TIM_real();
            VTAILQ_INSERT_HEAD(&qp->idle, w, list);
            Lck_CondWait(&w->cond, &qp->mtx);
        }
        if (w->wrq == NULL)
            break;
        Lck_Unlock(&qp->mtx);
        AN(w->wrq);
        AN(w->wrq->func);
        w->lastused = NAN;
        w->wrq->func(w, w->wrq->priv);
        AZ(w->wfd);
        assert(w->wlp == w->wlb);
        w->wrq = NULL;
        Lck_Lock(&qp->mtx);
    }
    qp->nthr--;
    Lck_Unlock(&qp->mtx);

    VSL(SLT_WorkThread, 0, "%p end", w);
    if (w->vcl != NULL)
        VCL_Rel(&w->vcl);
    AZ(pthread_cond_destroy(&w->cond));
    if (w->srcaddr != NULL)
        free(w->srcaddr);
    if (w->nobjhead != NULL) {
        Lck_Delete(&w->nobjhead->mtx);
        FREE_OBJ(w->nobjhead);
    }
    if (w->nobj != NULL)
        STV_free(w->nobj->objstore);
    return (NULL);
}

/*--------------------------------------------------------------------
 * Queue a workrequest if possible.
 *
 * Return zero if the request was queued, negative if it wasn't.
 */


int
WRK_Queue(struct workreq *wrq)
{
    struct worker *w;
    struct wq *qp;
    static unsigned nq = 0;
    unsigned onq;

    /*
     * Select which pool we issue to
     * XXX: better alg ?
     * XXX: per CPU ?
     */

    onq = nq + 1;
    if (onq >= nwq)
        onq = 0;
    qp = wq[onq];
    nq = onq;

    Lck_Lock(&qp->mtx);

    /* If there are idle threads, we tickle the first one into action */
    w = VTAILQ_FIRST(&qp->idle);
    if (w != NULL) {
        VTAILQ_REMOVE(&qp->idle, w, list);
        Lck_Unlock(&qp->mtx);
        w->wrq = wrq;
        AZ(pthread_cond_signal(&w->cond));
        return (0);
    }

    /* If we have too much in the overflow already, refuse. */
    if (qp->nqueue > ovfl_max) {
        qp->ndrop++;
        Lck_Unlock(&qp->mtx);
        return (-1);
    }

    VTAILQ_INSERT_TAIL(&qp->overflow, wrq, list);
    qp->noverflow++;
    qp->nqueue++;
    Lck_Unlock(&qp->mtx);
    AZ(pthread_cond_signal(&herder_cond));
    return (0);
}

/*--------------------------------------------------------------------*/

static void
wrk_do_cnt_sess(struct worker *w, void *priv)
{
    struct sess *sess;

    CAST_OBJ_NOTNULL(sess, priv, SESS_MAGIC);
    THR_SetSession(sess);
    sess->wrk = w;
    CHECK_OBJ_ORNULL(w->nobj, OBJECT_MAGIC);
    CHECK_OBJ_ORNULL(w->nobjhead, OBJHEAD_MAGIC);
    CNT_Session(sess);
    CHECK_OBJ_ORNULL(w->nobj, OBJECT_MAGIC);
    CHECK_OBJ_ORNULL(w->nobjhead, OBJHEAD_MAGIC);
    THR_SetSession(NULL);
}

/*--------------------------------------------------------------------*/

void
WRK_QueueSession(struct sess *sp)
{
    sp->workreq.func = wrk_do_cnt_sess;
    sp->workreq.priv = sp;
    if (WRK_Queue(&sp->workreq) == 0)
        return;

    /*
     * Couldn't queue it -- kill it.
     *
     * XXX: a notice might be polite, but would potentially
     * XXX: sleep whichever thread got us here
     */

    sp->t_end = TIM_real();
    vca_close_session(sp, "dropped");
    if (sp->vcl != NULL) {
        /*
         * A session parked on a busy object can come here
         * after it wakes up. Lose the VCL reference.
         */

        VCL_Rel(&sp->vcl);
    }
    SES_Delete(sp);
}

/*--------------------------------------------------------------------
 * Add (more) thread pools
 */


static void
wrk_addpools(const unsigned pools)
{
    struct wq **pwq, **owq;
    unsigned u;

    pwq = calloc(pools, sizeof *pwq);
    if (pwq == NULL)
        return;
    if (wq != NULL)
        memcpy(pwq, wq, sizeof *pwq * nwq);
    owq = wq;
    wq = pwq;
    for (u = nwq; u < pools; u++) {
        wq[u] = calloc(1, sizeof *wq[u]);
        XXXAN(wq[u]);
        wq[u]->magic = WQ_MAGIC;
        Lck_New(&wq[u]->mtx);
        VTAILQ_INIT(&wq[u]->overflow);
        VTAILQ_INIT(&wq[u]->idle);
    }
    (void)owq;    /* XXX: avoid race, leak it. */
    nwq = pools;
}
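
/*
 * Note (inferred): the old array "owq" is leaked on purpose; another
 * thread may still be indexing into it through the global "wq" pointer
 * it read earlier, so freeing it here could race.  Pools are added
 * rarely, so the leak amounts to a few pointers at most.
 */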

/*--------------------------------------------------------------------
 * If a thread is idle or excess, pick it out of the pool.
 */


static void
wrk_decimate_flock(struct wq *qp, double t_idle, struct varnish_stats *vs)
{
    struct worker *w = NULL;

    Lck_Lock(&qp->mtx);
    vs->n_wrk += qp->nthr;
    vs->n_wrk_queue += qp->nqueue;
    vs->n_wrk_drop += qp->ndrop;
    vs->n_wrk_overflow += qp->noverflow;

    if (qp->nthr > params->wthread_min) {
        w = VTAILQ_LAST(&qp->idle, workerhead);
        if (w != NULL && (w->lastused < t_idle || qp->nthr > nthr_max))
            VTAILQ_REMOVE(&qp->idle, w, list);
        else
            w = NULL;
    }
    Lck_Unlock(&qp->mtx);

    /* And give it a kiss on the cheek... */
    if (w != NULL) {
        AZ(w->wrq);
        AZ(pthread_cond_signal(&w->cond));
        TIM_sleep(params->wthread_purge_delay * 1e-3);
    }
}
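
/*
 * Note (inferred): the "kiss on the cheek" works because the victim
 * was removed from the idle list with w->wrq still NULL; when the
 * signal wakes it, wrk_thread() sees the NULL wrq, breaks out of its
 * loop and cleans itself up.
 */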

/*--------------------------------------------------------------------
 * Periodic pool herding thread
 *
 * Do things which we can do at our leisure:
 * Add pools
 * Scale constants
 * Get rid of excess threads
 * Aggregate stats across pools
 */


static void *
wrk_herdtimer_thread(void *priv)
{
    volatile unsigned u;
    double t_idle;
    struct varnish_stats vsm, *vs;

    THR_SetName("wrk_herdtimer");

    memset(&vsm, 0, sizeof vsm);
    vs = &vsm;

    (void)priv;
    while (1) {
        /* Add Pools */
        u = params->wthread_pools;
        if (u > nwq)
            wrk_addpools(u);

        /* Scale parameters */

        u = params->wthread_max / nwq;
        if (u < params->wthread_min)
            u = params->wthread_min;
        nthr_max = u;

        ovfl_max = (nthr_max * params->overflow_max) / 100;

        vs->n_wrk = 0;
        vs->n_wrk_queue = 0;
        vs->n_wrk_drop = 0;
        vs->n_wrk_overflow = 0;

        t_idle = TIM_real() - params->wthread_timeout;
        for (u = 0; u < nwq; u++)
            wrk_decimate_flock(wq[u], t_idle, vs);

        VSL_stats->n_wrk = vs->n_wrk;
        VSL_stats->n_wrk_queue = vs->n_wrk_queue;
        VSL_stats->n_wrk_drop = vs->n_wrk_drop;
        VSL_stats->n_wrk_overflow = vs->n_wrk_overflow;

        TIM_sleep(params->wthread_purge_delay * 1e-3);
    }
}
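
/*
 * Worked example of the scaling above (made-up numbers): with
 * wthread_max = 1000, wthread_pools = 2 and overflow_max = 100, each
 * pool may grow to nthr_max = 1000 / 2 = 500 threads and queue up to
 * ovfl_max = 500 * 100 / 100 = 500 requests before WRK_Queue() starts
 * dropping.
 */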

/*--------------------------------------------------------------------
 * Create another thread, if necessary & possible
 */


static void
wrk_breed_flock(struct wq *qp)
{
    pthread_t tp;

    /*
     * If we need more threads, and have space, create
     * one more thread.
     */

    if (qp->nthr < params->wthread_min ||    /* Not enough threads yet */
     (qp->nqueue > params->wthread_add_threshold && /* more needed */
     qp->nqueue > qp->lqueue)) {    /* not getting better since last */
        if (qp->nthr >= nthr_max) {
            VSL_stats->n_wrk_max++;
        } else if (pthread_create(&tp, NULL, wrk_thread, qp)) {
            VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
             errno, strerror(errno));
            VSL_stats->n_wrk_failed++;
            TIM_sleep(params->wthread_fail_delay * 1e-3);
        } else {
            AZ(pthread_detach(tp));
            VSL_stats->n_wrk_create++;
            TIM_sleep(params->wthread_add_delay * 1e-3);
        }
    }
    qp->lqueue = qp->nqueue;
}
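
/*
 * Worked example of the test above (made-up numbers): with
 * wthread_add_threshold = 2, a pool whose queue grew from lqueue = 3
 * to nqueue = 5 gets one new thread; if the queue shrank instead, no
 * thread is created.  That is the attenuation the header comment asks
 * for.
 */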

/*--------------------------------------------------------------------
 * This thread wakes up whenever a pool overflows.
 *
 * The trick here is to not be too aggressive about creating threads.
 * We do this by only examining one pool at a time, and by sleeping
 * a short while whenever we create a thread and a little while longer
 * whenever we fail to, hopefully missing a lot of cond_signals in
 * the meantime.
 *
 * XXX: probably need a lot more work.
 *
 */


static void *
wrk_herder_thread(void *priv)
{
    unsigned u, w;

    THR_SetName("wrk_herder");
    (void)priv;
    while (1) {
        for (u = 0 ; u < nwq; u++) {
            /*
             * Make sure all pools have their minimum complement
             */

            for (w = 0 ; w < nwq; w++)
                while (wq[w]->nthr < params->wthread_min)
                    wrk_breed_flock(wq[w]);
            /*
             * We cannot avoid getting a mutex, so we have a
             * bogo mutex just for POSIX_STUPIDITY
             */

            Lck_Lock(&herder_mtx);
            Lck_CondWait(&herder_cond, &herder_mtx);
            Lck_Unlock(&herder_mtx);
            wrk_breed_flock(wq[u]);
        }
    }
}

/*--------------------------------------------------------------------*/

void
WRK_Init(void)
{
    pthread_t tp;

    AZ(pthread_cond_init(&herder_cond, NULL));
    Lck_New(&herder_mtx);

    wrk_addpools(params->wthread_pools);
    AZ(pthread_create(&tp, NULL, wrk_herdtimer_thread, NULL));
    AZ(pthread_detach(tp));
    AZ(pthread_create(&tp, NULL, wrk_herder_thread, NULL));
    AZ(pthread_detach(tp));
}

/*--------------------------------------------------------------------*/

