分类:
2008-10-13 16:12:04
下面的代码基本上应该能看懂。其实 epoll 和 kqueue 用起来都比 select 简单；如果想编写高效的程序，就抛开 select 吧。
//comm_epoll.cc
#include "squid.h"
#include "Store.h"
#include "fde.h"
#ifdef USE_EPOLL
#define DEBUG_EPOLL 0
#include
/* epoll instance descriptor; assigned from epoll_create() in comm_select_init(). */
static int kdpfd;
/*
 * Upper bound, in milliseconds, applied to the epoll_wait() timeout in
 * comm_select(); comm_quick_poll_required() lowers it to 100.
 */
static int max_poll_time = 1000;
/*
 * Event buffer of SQUID_MAXFD entries, allocated in comm_select_init() and
 * filled by epoll_wait() in comm_select().
 */
static struct epoll_event *pevents;
/* XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX */
/* Public functions */
/*
* comm_select_init
*
* This is a needed exported function which will be called to initialise
* the network loop code.
*/
void
comm_select_init(void)
{
pevents = (struct epoll_event *) xmalloc(SQUID_MAXFD * sizeof(struct epoll_event));
if (!pevents)
{
fatalf("comm_select_init: xmalloc() failed: %s\n",xstrerror());
}
kdpfd = epoll_create(SQUID_MAXFD);
if (kdpfd < 0)
{
fatalf("comm_select_init: epoll_create(): %s\n",xstrerror());
}
}
/*
 * epolltype_atoi
 *
 * Translate an epoll_ctl() operation code into its symbolic name for use
 * in debug output. Any code other than ADD, DEL or MOD yields
 * "UNKNOWN_EPOLLCTL_OP". The returned string is a literal; do not free it.
 */
static const char *
epolltype_atoi(int x)
{
    if (x == EPOLL_CTL_ADD)
        return "EPOLL_CTL_ADD";

    if (x == EPOLL_CTL_DEL)
        return "EPOLL_CTL_DEL";

    if (x == EPOLL_CTL_MOD)
        return "EPOLL_CTL_MOD";

    return "UNKNOWN_EPOLLCTL_OP";
}
/*
 * commSetSelect
 *
 * Exported hook used to register or deregister interest in pending IO on
 * a descriptor.
 *
 * fd          - descriptor to update; must be >= 0 (asserted).
 * type        - bitmask of COMM_SELECT_READ / COMM_SELECT_WRITE naming the
 *               direction(s) being changed.
 * handler     - callback stored for every direction named in `type`;
 *               NULL drops interest in that direction.
 * client_data - pointer stored alongside the handler.
 * timeout     - when non-zero, F->timeout is set to squid_curtime + timeout.
 *
 * Directions not named in `type` keep the interest recorded in
 * F->epoll_state. epoll_ctl() is only issued when the resulting event mask
 * differs from F->epoll_state. If the descriptor is not flagged open, it is
 * removed from the epoll set and nothing else is touched.
 */
void
commSetSelect(int fd, unsigned int type, PF * handler,
              void *client_data, time_t timeout)
{
    fde *F;
    int epoll_ctl_type = 0;
    struct epoll_event ev;

    /* Validate fd before it is used to index fd_table. */
    assert(fd >= 0);
    F = &fd_table[fd];

    /* time_t is not guaranteed to be long; cast to match the %ld conversion. */
    debug(5, DEBUG_EPOLL ? 0 : 8) ("commSetSelect(fd=%d,type=%u,handler=%p,client_data=%p,timeout=%ld)\n",
                                   fd,type,handler,client_data,(long) timeout);

    ev.events = 0;
    ev.data.fd = fd;

    if (!F->flags.open) {
        /* Best effort: the result of the removal is deliberately not checked. */
        epoll_ctl(kdpfd, EPOLL_CTL_DEL, fd, &ev);
        return;
    }

    /* Read interest: take it from the caller, else keep the stored state. */
    if (type & COMM_SELECT_READ) {
        if (handler)
            ev.events |= EPOLLIN;

        F->read_handler = handler;
        F->read_data = client_data;
    } else if (F->epoll_state & EPOLLIN) {
        ev.events |= EPOLLIN;
    }

    /* Write interest: take it from the caller, else keep the stored state. */
    if (type & COMM_SELECT_WRITE) {
        if (handler)
            ev.events |= EPOLLOUT;

        F->write_handler = handler;
        F->write_data = client_data;
    } else if (F->epoll_state & EPOLLOUT) {
        ev.events |= EPOLLOUT;
    }

    /* Whenever anything is monitored, also ask for hangup and error events. */
    if (ev.events)
        ev.events |= EPOLLHUP | EPOLLERR;

    if (ev.events != F->epoll_state) {
        if (F->epoll_state) {
            /* Already monitoring something: modify, or delete if nothing is left. */
            epoll_ctl_type = ev.events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
        } else {
            epoll_ctl_type = EPOLL_CTL_ADD;
        }

        /* The recorded state is updated before the call, even if it then fails. */
        F->epoll_state = ev.events;

        if (epoll_ctl(kdpfd, epoll_ctl_type, fd, &ev) < 0) {
            debug(5, DEBUG_EPOLL ? 0 : 8) ("commSetSelect: epoll_ctl(,%s,,): failed on fd=%d: %s\n",
                                           epolltype_atoi(epoll_ctl_type), fd, xstrerror());
        }
    }

    if (timeout)
        F->timeout = squid_curtime + timeout;
}
/*
 * comm_select
 *
 * Run one pass of the epoll IO loop: wait for events on the descriptors
 * registered through commSetSelect() and invoke the read/write callbacks
 * stored in fd_table[] for every descriptor reported ready.
 *
 * msec - maximum time to wait in milliseconds; capped at max_poll_time.
 *
 * Returns COMM_ERROR when epoll_wait() fails with an errno that
 * ignoreErrno() does not accept, COMM_TIMEOUT when no descriptor was
 * reported, and COMM_OK otherwise.
 *
 * A handler is cleared from fd_table[] before it is called, so it runs at
 * most once per registration. EPOLLHUP and EPOLLERR are delivered to both
 * the read and the write path.
 */
comm_err_t
comm_select(int msec)
{
    int num, i, fd;
    fde *F;
    PF *hdl;
    struct epoll_event *cevents;
    static time_t last_timeout = 0;

    /* Scan for timeouts only when squid_curtime has moved past the last scan. */
    if (squid_curtime > last_timeout) {
        last_timeout = squid_curtime;
        checkTimeouts();
    }

    PROF_start(comm_check_incoming);

    if (msec > max_poll_time)
        msec = max_poll_time;

    /* Every path leaves this loop on its first pass. */
    for (;;) {
        num = epoll_wait(kdpfd, pevents, SQUID_MAXFD, msec);
        statCounter.select_loops++;

        if (num >= 0)
            break;

        if (ignoreErrno(errno))
            break;

        getCurrentTime();
        PROF_stop(comm_check_incoming);
        return COMM_ERROR;
    }

    PROF_stop(comm_check_incoming);
    getCurrentTime();
    statHistCount(&statCounter.select_fds_hist, num);

    if (num == 0)
        return COMM_TIMEOUT;    /* No error.. */

    PROF_start(comm_handle_ready_fd);

    for (i = 0, cevents = pevents; i < num; i++, cevents++) {
        fd = cevents->data.fd;
        F = &fd_table[fd];
        debug(5, DEBUG_EPOLL ? 0 : 8) ("comm_select(): got fd=%d events=%x monitoring=%x F->read_handler=%p F->write_handler=%p\n",
                                       fd,cevents->events,F->epoll_state,F->read_handler,F->write_handler);

        // TODO: add EPOLLPRI??
        if (cevents->events & (EPOLLIN|EPOLLHUP|EPOLLERR)) {
            if ((hdl = F->read_handler) != NULL) {
                debug(5, DEBUG_EPOLL ? 0 : 8) ("comm_select(): Calling read handler on fd=%d\n",fd);
                /* Time the read callback under the read probe (it used the write probe before). */
                PROF_start(comm_read_handler);
                F->read_handler = NULL;
                hdl(fd, F->read_data);
                PROF_stop(comm_read_handler);
                statCounter.select_fds++;
            } else {
                debug(5, DEBUG_EPOLL ? 0 : 8) ("comm_select(): no read handler for fd=%d\n",fd);
                fd_table[fd].flags.read_pending = 1;
                // remove interest since no handler exists for this event.
                commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
            }
        }

        if (cevents->events & (EPOLLOUT|EPOLLHUP|EPOLLERR)) {
            if ((hdl = F->write_handler) != NULL) {
                debug(5, DEBUG_EPOLL ? 0 : 8) ("comm_select(): Calling write handler on fd=%d\n",fd);
                /* Time the write callback under the write probe (it used the read probe before). */
                PROF_start(comm_write_handler);
                F->write_handler = NULL;
                hdl(fd, F->write_data);
                PROF_stop(comm_write_handler);
                statCounter.select_fds++;
            } else {
                fd_table[fd].flags.write_pending = 1;
                debug(5, DEBUG_EPOLL ? 0 : 8) ("comm_select(): no write handler for fd=%d\n",fd);
                // remove interest since no handler exists for this event.
                commSetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
            }
        }
    }

    PROF_stop(comm_handle_ready_fd);
    return COMM_OK;
}
/*
 * comm_quick_poll_required
 *
 * Lower the cap that comm_select() applies to its epoll_wait() timeout
 * to 100 milliseconds.
 */
void
comm_quick_poll_required(void)
{
    enum { QUICK_POLL_MSEC = 100 };

    max_poll_time = QUICK_POLL_MSEC;
}
//comm_kqueue.cc
#include "squid.h"
#include "Store.h"
#include "fde.h"
#ifdef USE_KQUEUE
#include
/* Capacity of the kevent result array used by comm_select(). */
#define KE_LENGTH 128
/* jlemon goofed up and didn't add EV_SET until fbsd 4.3 */
/*
 * Fallback for headers that do not provide EV_SET: assigns the six
 * struct kevent fields (ident, filter, flags, fflags, data, udata) from
 * the arguments, in that order. Wrapped in do/while(0) so it behaves as a
 * single statement.
 */
#ifndef EV_SET
#define EV_SET(kevp, a, b, c, d, e, f) do { \
(kevp)->ident = (a); \
(kevp)->filter = (b); \
(kevp)->flags = (c); \
(kevp)->fflags = (d); \
(kevp)->data = (e); \
(kevp)->udata = (f); \
} while(0)
#endif
/* Forward declaration: queues a filter change for a descriptor (defined below). */
static void kq_update_events(int, short, PF *);
/* kqueue descriptor, created in comm_select_init(). */
static int kq;
/* Zero timeout, set in comm_select_init(), passed to kevent() when flushing queued changes. */
static struct timespec zero_timespec;
static struct kevent *kqlst; /* buffer of queued kevent changes, kqmax entries */
static int kqmax; /* max structs to buffer */
static int kqoff; /* offset into the buffer: number of changes currently queued */
/*
 * Upper bound, in milliseconds, on the kevent() wait in comm_select();
 * comm_quick_poll_required() lowers it to 100.
 */
static int max_poll_time = 1000;
/* XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX */
/* Private functions */
/*
 * kq_update_events
 *
 * Queue a kqueue change for one filter on `fd` when interest in that
 * filter flips between "no handler" and "handler installed".
 *
 * fd      - descriptor whose registration is changing.
 * filter  - EVFILT_READ or EVFILT_WRITE; any other value is ignored.
 * handler - the handler about to be stored by the caller. It is compared
 *           with the one currently in fd_table[fd], so this must be called
 *           BEFORE the caller overwrites the stored handler.
 *
 * Non-NULL replacing NULL queues EV_ADD | EV_ONESHOT; NULL replacing
 * non-NULL queues EV_DELETE; anything else queues nothing.
 *
 * Changes accumulate in kqlst[0 .. kqoff-1] and are submitted either by
 * comm_select() or, once the buffer is full, here. The buffer holds kqmax
 * entries, so the flush must happen when the LAST slot (kqmax - 1) has just
 * been filled. The previous code waited for kqoff == kqmax, which wrote one
 * element past the end of the buffer and then dropped that entry.
 */
static void
kq_update_events(int fd, short filter, PF * handler)
{
    PF *cur_handler;
    struct kevent *kep;
    int kep_flags;

    switch (filter) {

    case EVFILT_READ:
        cur_handler = fd_table[fd].read_handler;
        break;

    case EVFILT_WRITE:
        cur_handler = fd_table[fd].write_handler;
        break;

    default:
        /* XXX bad! -- adrian */
        return;
    }

    /* Nothing to queue unless interest is being switched on or off. */
    if ((cur_handler == NULL) == (handler == NULL))
        return;

    kep_flags = (handler != NULL) ? (EV_ADD | EV_ONESHOT) : EV_DELETE;

    kep = kqlst + kqoff;
    EV_SET(kep, (uintptr_t) fd, filter, kep_flags, 0, 0, 0);

    if (kqoff + 1 >= kqmax) {
        /* That was the last free slot: submit the whole batch now. */
        int ret = kevent(kq, kqlst, kqoff + 1, NULL, 0, &zero_timespec);

        /* jdc -- someone needs to do error checking... */
        if (ret == -1) {
            /*
             * kqoff is left pointing at the last slot, so the batch stays
             * queued for a later submission attempt.
             */
            perror("kq_update_events(): kevent()");
            return;
        }

        kqoff = 0;
    } else {
        kqoff++;
    }
}
/* XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX */
/* Public functions */
/*
 * comm_select_init
 *
 * Exported initialiser for the kqueue network loop. Opens the kqueue
 * descriptor (calling fatal() on failure), sizes the pending-change buffer
 * from getdtablesize(), and zeroes the timespec used for non-blocking
 * kevent() submissions.
 */
void
comm_select_init(void)
{
    if ((kq = kqueue()) < 0)
        fatal("comm_select_init: Couldn't open kqueue fd!\n");

    /* One change slot per possible descriptor. */
    kqmax = getdtablesize();
    kqlst = (struct kevent *) xmalloc(kqmax * sizeof(struct kevent));

    zero_timespec.tv_nsec = 0;
    zero_timespec.tv_sec = 0;
}
/*
 * commSetSelect
 *
 * Exported hook used to register or deregister interest in pending IO on
 * a descriptor.
 *
 * fd          - descriptor to update; must be >= 0 and flagged open
 *               (both asserted).
 * type        - bitmask of COMM_SELECT_READ / COMM_SELECT_WRITE naming the
 *               direction(s) being changed.
 * handler     - callback stored for every direction named in `type`;
 *               NULL removes it.
 * client_data - pointer stored alongside the handler.
 * timeout     - when non-zero, F->timeout is set to squid_curtime + timeout.
 */
void
commSetSelect(int fd, unsigned int type, PF * handler,
              void *client_data, time_t timeout)
{
    fde *F;

    /* Validate fd before it is used to index fd_table. */
    assert(fd >= 0);
    F = &fd_table[fd];
    assert(F->flags.open);

    if (type & COMM_SELECT_READ) {
        /* Must run before the handler is overwritten: it compares old vs new. */
        kq_update_events(fd, EVFILT_READ, handler);
        F->read_handler = handler;
        F->read_data = client_data;
    }

    if (type & COMM_SELECT_WRITE) {
        /* Same ordering requirement as the read side. */
        kq_update_events(fd, EVFILT_WRITE, handler);
        F->write_handler = handler;
        F->write_data = client_data;
    }

    if (timeout)
        F->timeout = squid_curtime + timeout;
}
/*
 * comm_select
 *
 * Run one pass of the kqueue IO loop: submit the changes queued in kqlst,
 * wait for events, and invoke the read/write callbacks stored in
 * fd_table[] for every event returned.
 *
 * msec - maximum time to wait in milliseconds; capped at max_poll_time.
 *
 * Returns COMM_ERROR when kevent() fails with an errno that ignoreErrno()
 * does not accept, and COMM_OK otherwise (including when no event was
 * returned).
 *
 * A handler is cleared from fd_table[] before it is called, so it runs at
 * most once per registration. Events flagged EV_ERROR only set errno and
 * are otherwise skipped.
 */
comm_err_t
comm_select(int msec)
{
    static struct kevent ready[KE_LENGTH];
    struct timespec wait_for;
    int nready;
    int idx;

    if (msec > max_poll_time)
        msec = max_poll_time;

    /* kevent() takes a timespec: split the milliseconds into sec + nanosec. */
    wait_for.tv_sec = msec / 1000;
    wait_for.tv_nsec = (msec % 1000) * 1000000;

    nready = kevent(kq, kqlst, kqoff, ready, KE_LENGTH, &wait_for);
    statCounter.select_loops++;
    /* The queued changes were handed to kevent(); start a fresh batch. */
    kqoff = 0;

    if (nready < 0 && !ignoreErrno(errno)) {
        getCurrentTime();
        return COMM_ERROR;
    }

    getCurrentTime();

    if (nready == 0)
        return COMM_OK;         /* No error.. */

    for (idx = 0; idx < nready; idx++) {
        struct kevent *kev = &ready[idx];
        int fd = (int) kev->ident;
        fde *F = &fd_table[fd];
        PF *callback = NULL;

        if (kev->flags & EV_ERROR) {
            errno = kev->data;
            /* XXX error == bad! -- adrian */
            continue;
        }

        if (kev->filter == EVFILT_READ) {
            callback = F->read_handler;

            if (callback != NULL) {
                F->read_handler = NULL;
                callback(fd, F->read_data);
            }
        } else if (kev->filter == EVFILT_WRITE) {
            callback = F->write_handler;

            if (callback != NULL) {
                F->write_handler = NULL;
                callback(fd, F->write_data);
            }
        } else {
            /* Bad! -- adrian */
            debug(5, 1) ("comm_select: kevent returned %d!\n", kev->filter);
        }
    }

    return COMM_OK;
}
/*
 * comm_quick_poll_required
 *
 * Lower the cap that comm_select() applies to its kevent() wait to
 * 100 milliseconds.
 */
void
comm_quick_poll_required(void)
{
    enum { QUICK_POLL_MSEC = 100 };

    max_poll_time = QUICK_POLL_MSEC;
}