Chinaunix首页 | 论坛 | 博客
  • 博客访问: 837592
  • 博文数量: 756
  • 博客积分: 40000
  • 博客等级: 大将
  • 技术积分: 4980
  • 用 户 组: 普通用户
  • 注册时间: 2008-10-13 14:40
文章分类

全部博文(756)

文章存档

2011年(1)

2008年(755)

我的朋友

分类:

2008-10-13 16:12:04

下面的代码基本上应该都能看懂。其实 epoll 和 kqueue 用起来都比 select 要简单;假如想编写高效的程序,就抛开 select 吧。
//comm_epoll.cc

#include "squid.h"
#include "Store.h"
#include "fde.h"

#ifdef USE_EPOLL

/* Debug switch for this file: the debug() calls below pass
 * "DEBUG_EPOLL ? 0 : 8" as their level, so a non-zero value selects
 * level 0 instead of level 8. */
#define DEBUG_EPOLL 0

#include <sys/epoll.h>

/* epoll descriptor; assigned from epoll_create() in comm_select_init(). */
static int kdpfd;
/* Cap (in ms) applied to the wait time in comm_select(); lowered to 100 by
 * comm_quick_poll_required(). */
static int max_poll_time = 1000;

/* Event buffer of SQUID_MAXFD entries: allocated in comm_select_init() and
 * passed to epoll_wait() in comm_select(). */
static struct epoll_event *pevents;

 

/* XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX */
/* Public functions */


/*
 * comm_select_init
 *
 * This is a needed exported function which will be called to initialise
 * the network loop code.
 */
void
comm_select_init(void)
{

    pevents = (struct epoll_event *) xmalloc(SQUID_MAXFD * sizeof(struct epoll_event));

    if (!pevents)
    {
        fatalf("comm_select_init: xmalloc() failed: %s\n",xstrerror());
    }

    kdpfd = epoll_create(SQUID_MAXFD);

    if (kdpfd < 0)
    {
        fatalf("comm_select_init: epoll_create(): %s\n",xstrerror());
    }
}

/*
 * epolltype_atoi
 *
 * Returns the symbolic name of an epoll_ctl() operation code for use in
 * log messages. Any value other than EPOLL_CTL_ADD, EPOLL_CTL_DEL or
 * EPOLL_CTL_MOD yields "UNKNOWN_EPOLLCTL_OP".
 */
static const char* epolltype_atoi(int x)
{
    if (x == EPOLL_CTL_ADD)
        return "EPOLL_CTL_ADD";

    if (x == EPOLL_CTL_DEL)
        return "EPOLL_CTL_DEL";

    if (x == EPOLL_CTL_MOD)
        return "EPOLL_CTL_MOD";

    return "UNKNOWN_EPOLLCTL_OP";
}

/*
 * comm_setselect
 *
 * This is a needed exported function which will be called to register
 * and deregister interest in a pending IO state for a given FD.
 *
 */
void
commSetSelect(int fd, unsigned int type, PF * handler,
              void *client_data, time_t timeout)
{
    fde *F = &fd_table[fd];
    int epoll_ctl_type = 0;

    struct epoll_event ev;
    assert(fd >= 0);
    debug(5, DEBUG_EPOLL ? 0 : 8) ("commSetSelect(fd=%d,type=%u,handler=%p,client_data=%p,timeout=%ld)\n",
                                   fd,type,handler,client_data,timeout);

    ev.events = 0;
    ev.data.fd = fd;

    if (!F->flags.open)
    {
        epoll_ctl(kdpfd, EPOLL_CTL_DEL, fd, &ev);
        return;
    }

    // If read is an interest

    if (type & COMM_SELECT_READ)
    {
        if (handler)
            ev.events |= EPOLLIN;

        F->read_handler = handler;

        F->read_data = client_data;

        // Otherwise, use previously stored value
    }
    else if (F->epoll_state & EPOLLIN)
    {
        ev.events |= EPOLLIN;
    }

    // If write is an interest
    if (type & COMM_SELECT_WRITE)
    {
        if (handler)
            ev.events |= EPOLLOUT;

        F->write_handler = handler;

        F->write_data = client_data;

        // Otherwise, use previously stored value
    }
    else if (F->epoll_state & EPOLLOUT)
    {
        ev.events |= EPOLLOUT;
    }

    if (ev.events)
        ev.events |= EPOLLHUP | EPOLLERR;

    if (ev.events != F->epoll_state)
    {
        if (F->epoll_state) // already monitoring something.
            epoll_ctl_type = ev.events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
        else
            epoll_ctl_type = EPOLL_CTL_ADD;

        F->epoll_state = ev.events;

        if (epoll_ctl(kdpfd, epoll_ctl_type, fd, &ev) < 0)
        {
            debug(5, DEBUG_EPOLL ? 0 : 8) ("commSetSelect: epoll_ctl(,%s,,): failed on fd=%d: %s\n",
                                           epolltype_atoi(epoll_ctl_type), fd, xstrerror());
        }
    }

    if (timeout)
        F->timeout = squid_curtime + timeout;
}

/*
 * Check all connections for new connections and input data that is to be
 * processed. Also check for connections with data queued and whether we can
 * write it out.
 */

/*
 * comm_select
 *
 * Called to do the new-style IO, courtesy of of squid (like most of this
 * new IO code). This routine handles the stuff we've hidden in
 * comm_setselect and fd_table[] and calls callbacks for IO ready
 * events.
 */

comm_err_t
comm_select(int msec)
{
    int num, i,fd;
    fde *F;
    PF *hdl;

    struct epoll_event *cevents;
    static time_t last_timeout = 0;

    if (squid_curtime > last_timeout)
    {
        last_timeout = squid_curtime;
        checkTimeouts();
    }

    PROF_start(comm_check_incoming);

    if (msec > max_poll_time)
        msec = max_poll_time;

    for (;;)
    {
        num = epoll_wait(kdpfd, pevents, SQUID_MAXFD, msec);
        statCounter.select_loops++;

        if (num >= 0)
            break;

        if (ignoreErrno(errno))
            break;

        getCurrentTime();

        PROF_stop(comm_check_incoming);

        return COMM_ERROR;
    }

    PROF_stop(comm_check_incoming);
    getCurrentTime();

    statHistCount(&statCounter.select_fds_hist, num);

    if (num == 0)
        return COMM_TIMEOUT;  /* No error.. */

    PROF_start(comm_handle_ready_fd);

    for (i = 0, cevents = pevents; i < num; i++, cevents++)
    {
        fd = cevents->data.fd;
        F = &fd_table[fd];
        debug(5, DEBUG_EPOLL ? 0 : 8) ("comm_select(): got fd=%d events=%x monitoring=%x F->read_handler=%p F->write_handler=%p\n",
                                       fd,cevents->events,F->epoll_state,F->read_handler,F->write_handler);

        // TODO: add EPOLLPRI??

        if (cevents->events & (EPOLLIN|EPOLLHUP|EPOLLERR))
        {
            if ((hdl = F->read_handler) != NULL)
            {
                debug(5, DEBUG_EPOLL ? 0 : 8) ("comm_select(): Calling read handler on fd=%d\n",fd);
                PROF_start(comm_write_handler);
                F->read_handler = NULL;
                hdl(fd, F->read_data);
                PROF_stop(comm_write_handler);
                statCounter.select_fds++;
            }
            else
            {
                debug(5, DEBUG_EPOLL ? 0 : 8) ("comm_select(): no read handler for fd=%d\n",fd);
                fd_table[fd].flags.read_pending = 1;
                // remove interest since no handler exist for this event.
                commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
            }
        }

        if (cevents->events & (EPOLLOUT|EPOLLHUP|EPOLLERR))
        {
            if ((hdl = F->write_handler) != NULL)
            {
                debug(5, DEBUG_EPOLL ? 0 : 8) ("comm_select(): Calling write handler on fd=%d\n",fd);
                PROF_start(comm_read_handler);
                F->write_handler = NULL;
                hdl(fd, F->write_data);
                PROF_stop(comm_read_handler);
                statCounter.select_fds++;
            }
            else
            {
                fd_table[fd].flags.write_pending = 1;
                debug(5, DEBUG_EPOLL ? 0 : 8) ("comm_select(): no write handler for fd=%d\n",fd);
                // remove interest since no handler exist for this event.
                commSetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
            }
        }
    }

    PROF_stop(comm_handle_ready_fd);

    return COMM_OK;
}

void
comm_quick_poll_required(void)
{
    max_poll_time = 100;
}


 

//comm_kqueue.cc

#include "squid.h"
#include "Store.h"
#include "fde.h"

#ifdef USE_KQUEUE

#include <sys/event.h>


/* Number of entries in the result array that comm_select() passes to kevent(). */
#define KE_LENGTH        128

/* jlemon goofed up and didn't add EV_SET until fbsd 4.3 */

/* Fallback definition, used only when the system headers do not provide
 * EV_SET: fills the six fields of the kevent pointed to by kevp. */
#ifndef EV_SET
#define EV_SET(kevp, a, b, c, d, e, f) do {     \
        (kevp)->ident = (a);                    \
        (kevp)->filter = (b);                   \
        (kevp)->flags = (c);                    \
        (kevp)->fflags = (d);                   \
        (kevp)->data = (e);                     \
        (kevp)->udata = (f);                    \
} while(0)
#endif

static void kq_update_events(int, short, PF *);
/* kqueue descriptor; assigned from kqueue() in comm_select_init(). */
static int kq;

/* All-zero timeout passed to kevent() when kq_update_events() flushes changes. */
static struct timespec zero_timespec;

static struct kevent *kqlst;        /* kevent change buffer, kqmax entries */
static int kqmax;                /* max structs to buffer; set from getdtablesize() */
static int kqoff;                /* offset of the next free slot in kqlst */
/* Cap (in ms) applied to the wait time in comm_select(); lowered to 100 by
 * comm_quick_poll_required(). */
static int max_poll_time = 1000;


/* XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX */
/* Private functions */

void
kq_update_events(int fd, short filter, PF * handler)
{
    PF *cur_handler;
    int kep_flags;

#if 0

    int retval;
#endif

    switch (filter)
    {

    case EVFILT_READ:
        cur_handler = fd_table[fd].read_handler;
        break;

    case EVFILT_WRITE:
        cur_handler = fd_table[fd].write_handler;
        break;

    default:
        /* XXX bad! -- adrian */
        return;
        break;
    }

    if ((cur_handler == NULL && handler != NULL)
            || (cur_handler != NULL && handler == NULL))
    {

        struct kevent *kep;

        kep = kqlst + kqoff;

        if (handler != NULL)
        {
            kep_flags = (EV_ADD | EV_ONESHOT);
        }
        else
        {
            kep_flags = EV_DELETE;
        }

        EV_SET(kep, (uintptr_t) fd, filter, kep_flags, 0, 0, 0);

        if (kqoff == kqmax)
        {
            int ret;

            ret = kevent(kq, kqlst, kqoff, NULL, 0, &zero_timespec);
            /* jdc -- someone needs to do error checking... */

            if (ret == -1)
            {
                perror("kq_update_events(): kevent()");
                return;
            }

            kqoff = 0;
        }
        else
        {
            kqoff++;
        }

#if 0
        if (retval < 0)
        {
            /* Error! */

            if (ke.flags & EV_ERROR)
            {
                errno = ke.data;
            }
        }

#endif

    }
}

 

/* XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX */
/* Public functions */


/*
 * comm_select_init
 *
 * This is a needed exported function which will be called to initialise
 * the network loop code.
 */
void
comm_select_init(void)
{
    kq = kqueue();

    if (kq < 0)
    {
        fatal("comm_select_init: Couldn't open kqueue fd!\n");
    }

    kqmax = getdtablesize();

    kqlst = (struct kevent *)xmalloc(sizeof(*kqlst) * kqmax);
    zero_timespec.tv_sec = 0;
    zero_timespec.tv_nsec = 0;
}

/*
 * comm_setselect
 *
 * This is a needed exported function which will be called to register
 * and deregister interest in a pending IO state for a given FD.
 */
void
commSetSelect(int fd, unsigned int type, PF * handler,
              void *client_data, time_t timeout)
{
    fde *F = &fd_table[fd];
    assert(fd >= 0);
    assert(F->flags.open);

    if (type & COMM_SELECT_READ)
    {
        kq_update_events(fd, EVFILT_READ, handler);
        F->read_handler = handler;
        F->read_data = client_data;
    }

    if (type & COMM_SELECT_WRITE)
    {
        kq_update_events(fd, EVFILT_WRITE, handler);
        F->write_handler = handler;
        F->write_data = client_data;
    }

    if (timeout)
        F->timeout = squid_curtime + timeout;

}

/*
 * Check all connections for new connections and input data that is to be
 * processed. Also check for connections with data queued and whether we can
 * write it out.
 */

/*
 * comm_select
 *
 * Called to do the new-style IO, courtesy of of squid (like most of this
 * new IO code). This routine handles the stuff we've hidden in
 * comm_setselect and fd_table[] and calls callbacks for IO ready
 * events.
 */

comm_err_t
comm_select(int msec)
{
    int num, i;

    static struct kevent ke[KE_LENGTH];

    struct timespec poll_time;

    /*
     * remember we are doing NANOseconds here, not micro/milli. God knows
     * why jlemon used a timespec, but hey, he wrote the interface, not I
     *   -- Adrian
     */

    if (msec > max_poll_time)
        msec = max_poll_time;

    poll_time.tv_sec = msec / 1000;

    poll_time.tv_nsec = (msec % 1000) * 1000000;

    for (;;)
    {
        num = kevent(kq, kqlst, kqoff, ke, KE_LENGTH, &poll_time);
        statCounter.select_loops++;
        kqoff = 0;

        if (num >= 0)
            break;

        if (ignoreErrno(errno))
            break;

        getCurrentTime();

        return COMM_ERROR;

        /* NOTREACHED */
    }

    getCurrentTime();

    if (num == 0)
        return COMM_OK;  /* No error.. */

    for (i = 0; i < num; i++)
    {
        int fd = (int) ke[i].ident;
        PF *hdl = NULL;
        fde *F = &fd_table[fd];

        if (ke[i].flags & EV_ERROR)
        {
            errno = ke[i].data;
            /* XXX error == bad! -- adrian */
            continue;        /* XXX! */
        }

        switch (ke[i].filter)
        {

        case EVFILT_READ:

            if ((hdl = F->read_handler) != NULL)
            {
                F->read_handler = NULL;
                hdl(fd, F->read_data);
            }

            break;

        case EVFILT_WRITE:

            if ((hdl = F->write_handler) != NULL)
            {
                F->write_handler = NULL;
                hdl(fd, F->write_data);
            }

            break;

        default:
            /* Bad! -- adrian */
            debug(5, 1) ("comm_select: kevent returned %d!\n", ke[i].filter);
            break;
        }
    }

    return COMM_OK;
}

void
comm_quick_poll_required(void)
{
    max_poll_time = 100;
}


--------------------next---------------------

阅读(535) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~