C10K问题的解决

C10K是每个写服务器端的人都会遇到的难题,就是如何使单机支持10K并发连接,并使得每个连接的反应时间都是在合理的范围内。有一份测评报告,可以看出Redis可以支持到64k的并发(需要修改源文件重新编译)。

x轴是并发连接数,y轴是每秒能处理的请求数。

我们都知道在linux下面解决这个问题,不外乎使用select/poll或者是epoll。Redis只使用了一个进程来处理所有的连接,所以肯定不能使用Blocking IO,所有的IO读写操作都必须是非阻塞的。Redis没有使用第三方的库libevent或者libev,我想主要原因还是作者的洁癖,不想有dependency,作者对此有具体的阐述。事实证明,在linux下支持高并发连接相当容易。不过由于Redis只有一个主进程/线程,只能使用CPU的一个核,不能充分利用多核的性能。

Redis自己实现了一个轻量级的event library,主要在ae.h/ae.c中,对select/epoll/kqueue的封装分别在ae_select.c、ae_epoll.c和ae_kqueue.c中,至于到底用哪一个由编译时的宏决定。

ae.c

#ifdef HAVE_EPOLL

#include “ae_epoll.c”

#else

#ifdef HAVE_KQUEUE

#include “ae_kqueue.c”

#else

#include “ae_select.c”

#endif

#endif

可以看出在Linux下(定义了HAVE_EPOLL时)会优先使用epoll作为模型,所以我们这里也只研究对epoll的支持。

先看到初始化部分。

首先先看几个重要的结构

aeEventLoop:

/* Redis supports at most 10K connections by default.
 * The events/fired arrays below are statically sized by this constant,
 * so raising the limit requires recompiling. */

#define AE_SETSIZE (1024*10) /* Max number of fd supported */

/* State of an event based program */

typedef struct aeEventLoop {

int maxfd; /* highest fd registered so far; -1 while empty */

long long timeEventNextId; /* id assigned to the next time event */

aeFileEvent events[AE_SETSIZE]; /* Registered events, indexed by fd */

aeFiredEvent fired[AE_SETSIZE]; /* Fired events, filled by aeApiPoll */

aeTimeEvent *timeEventHead; /* head of the singly-linked time-event list */

int stop; /* non-zero asks aeMain to exit its loop */

void *apidata; /* This is used for polling API specific data */

aeBeforeSleepProc *beforesleep; /* hook run before each poll iteration */

} aeEventLoop;

aeFileEvent:

/* File event structure: per-fd registration of read/write handlers.
 * Stored in aeEventLoop.events, indexed by the file descriptor. */

typedef struct aeFileEvent {

int mask; /* one of AE_(READABLE|WRITABLE); AE_NONE when slot is free */

aeFileProc *rfileProc; /* handler invoked when the fd is readable */

aeFileProc *wfileProc; /* handler invoked when the fd is writable */

void *clientData; /* opaque pointer passed back to the handlers */

} aeFileEvent;

aeFiredEvent:

/* A fired event: one entry per fd the kernel reported ready, written by
 * aeApiPoll and consumed by aeProcessEvents. */

typedef struct aeFiredEvent {

int fd; /* descriptor that became ready */

int mask; /* AE_READABLE and/or AE_WRITABLE */

} aeFiredEvent;

aeTimeEvent:

/* Time event structure: node of the singly-linked list headed by
 * aeEventLoop.timeEventHead. The deadline is an absolute wall-clock
 * time split into seconds + milliseconds. */

typedef struct aeTimeEvent {

long long id; /* time event identifier. */

long when_sec; /* seconds */

long when_ms; /* milliseconds */

aeTimeProc *timeProc; /* callback run when the deadline expires */

aeEventFinalizerProc *finalizerProc; /* optional cleanup when event is deleted */

void *clientData; /* opaque pointer passed to the callbacks */

struct aeTimeEvent *next; /* next node in the list */

} aeTimeEvent;

1.初始化event loop

server.el = aeCreateEventLoop();

/* Allocate and initialize an event loop.
 * Returns the new loop, or NULL if allocation or the multiplexing-API
 * setup (aeApiCreate) fails. */
aeEventLoop *aeCreateEventLoop(void) {
    aeEventLoop *el = zmalloc(sizeof(*el));
    int j;

    if (el == NULL) return NULL;
    el->timeEventHead = NULL;
    el->timeEventNextId = 0;
    el->stop = 0;
    el->maxfd = -1;          /* no fd registered yet */
    el->beforesleep = NULL;
    if (aeApiCreate(el) == -1) {
        zfree(el);
        return NULL;
    }
    /* Events with mask == AE_NONE are not set, so mark every slot free. */
    for (j = 0; j < AE_SETSIZE; j++)
        el->events[j].mask = AE_NONE;
    return el;
}

/* Create the epoll-specific state and attach it to the event loop.
 * Returns 0 on success, -1 on error (allocation or epoll_create failure).
 * Fix: the original leaked `state` when epoll_create() failed. */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state); /* don't leak the state on failure */
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

2.创建socket,并listen在6379 port by default ,注意这里设置了SO_REUSEADDR标志.

server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr);

/* Create a listening TCP socket bound to `bindaddr:port`
 * (INADDR_ANY when bindaddr is NULL). On success returns the listening
 * fd; on failure writes a message into `err` and returns ANET_ERR.
 * Fix: the original excerpt used typographic quotes in the error
 * string, which do not compile. */
int anetTcpServer(char *err, int port, char *bindaddr)
{
    int s;
    struct sockaddr_in sa;

    if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR)
        return ANET_ERR;

    memset(&sa,0,sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    sa.sin_addr.s_addr = htonl(INADDR_ANY); /* default: all interfaces */
    if (bindaddr && inet_aton(bindaddr, &sa.sin_addr) == 0) {
        anetSetError(err, "Invalid bind address\n");
        close(s);
        return ANET_ERR;
    }
    /* anetListen() closes the socket itself on failure. */
    if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR)
        return ANET_ERR;
    return s;
}

/* Create a SOCK_STREAM socket with SO_REUSEADDR set.
 * Returns the fd on success; on failure writes a message into `err`
 * and returns ANET_ERR.
 * Fixes: typographic quotes in string literals (did not compile), and
 * the fd was leaked when setsockopt() failed. */
static int anetCreateSocket(char *err, int domain) {
    int s, on = 1;

    if ((s = socket(domain, SOCK_STREAM, 0)) == -1) {
        anetSetError(err, "creating socket: %s\n", strerror(errno));
        return ANET_ERR;
    }

    /* Make sure connection-intensive things like the redis benchmark
     * will be able to close/open sockets a zillion of times. */
    if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
        anetSetError(err, "setsockopt SO_REUSEADDR: %s\n", strerror(errno));
        close(s); /* fix: don't leak the descriptor on failure */
        return ANET_ERR;
    }
    return s;
}

/* bind() + listen() on an existing socket. On any failure the socket is
 * closed, a message is written into `err`, and ANET_ERR is returned;
 * otherwise ANET_OK.
 * Fix: the original excerpt used typographic quotes in the error
 * strings, which do not compile. */
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len) {
    if (bind(s,sa,len) == -1) {
        anetSetError(err, "bind: %s\n", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    if (listen(s, 511) == -1) { /* the magic 511 constant is from nginx */
        anetSetError(err, "listen: %s\n", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return ANET_OK;
}

3.创建TimeEvent

TimeEvent是一个链表结构。

ServerCron是一个定时任务,用于处理一堆杂事。

aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

/* Register a time event that fires `milliseconds` from now.
 * The new node is pushed onto the front of the (unsorted) singly-linked
 * time-event list. Returns the event id, or AE_ERR if the node could
 * not be allocated. */
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    /* The id counter advances even if the allocation below fails,
     * matching the original behavior. */
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *node = zmalloc(sizeof(*node));

    if (node == NULL) return AE_ERR;
    node->id = id;
    aeAddMillisecondsToNow(milliseconds,&node->when_sec,&node->when_ms);
    node->timeProc = proc;
    node->finalizerProc = finalizerProc;
    node->clientData = clientData;
    /* Prepend to the list. */
    node->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = node;
    return id;
}

4. 创建FileEvent,把之前创建的socket绑到epoll上,并设置 acceptTcpHandler为处理函数。

if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,

acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");

/* Install `proc` as the handler for `mask` (AE_READABLE/AE_WRITABLE)
 * events on `fd`. Returns AE_OK, or AE_ERR when fd exceeds AE_SETSIZE
 * or the kernel registration fails. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    aeFileEvent *ev;

    if (fd >= AE_SETSIZE) return AE_ERR;
    ev = &eventLoop->events[fd];

    /* Register with the multiplexing API before touching ev->mask: it
     * inspects the *old* mask to choose between ADD and MOD. */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR;

    ev->mask |= mask;
    if (mask & AE_READABLE) ev->rfileProc = proc;
    if (mask & AE_WRITABLE) ev->wfileProc = proc;
    ev->clientData = clientData;
    if (eventLoop->maxfd < fd) eventLoop->maxfd = fd;
    return AE_OK;
}

/* Add `mask` to the events monitored for `fd` via epoll_ctl.
 * Returns 0 on success, -1 on epoll_ctl failure. */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    /* If the fd was already monitored for some event we need a MOD
     * operation, otherwise an ADD. */
    int op = (eventLoop->events[fd].mask == AE_NONE) ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    /* Merge the new mask with whatever was already registered. */
    int merged = mask | eventLoop->events[fd].mask;

    ee.events = 0;
    if (merged & AE_READABLE) ee.events |= EPOLLIN;
    if (merged & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    return (epoll_ctl(state->epfd,op,fd,&ee) == -1) ? -1 : 0;
}

接下来我们看看事件处理的流程。

aeMain(server.el);

/* Run the event loop until something sets eventLoop->stop.
 * Each iteration optionally runs the beforesleep hook, then processes
 * all pending file and time events. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

事件处理,可以看到调用epoll_wait得到有事件的句柄之后,然后使用callback函数调用相应的读写函数。

/* Process pending file and time events, returning how many were handled.
 * NOTE(review): this is an abridged excerpt — the declarations of
 * `shortest`, `tvp` and `j`, plus the code that converts the nearest
 * timer into a poll timeout, were elided by the article, and the braces
 * as shown do not balance; see the full ae.c for the complete body. */

int aeProcessEvents(aeEventLoop *eventLoop, int flags)

{

int processed = 0, numevents;

/* Find the nearest timer so the poll timeout can be capped to it
 * (the timeout computation itself is elided in this excerpt). */

shortest = aeSearchNearestTimer(eventLoop);

/* Block in epoll_wait (via aeApiPoll); ready fds land in eventLoop->fired. */

numevents = aeApiPoll(eventLoop, tvp);

for (j = 0; j < numevents; j++) {

aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

int mask = eventLoop->fired[j].mask;

int fd = eventLoop->fired[j].fd;

int rfired = 0;

/* note the fe->mask & mask & ... code: maybe an already processed

* event removed an element that fired and we still didn't

* process it, so we check if the event is still valid. */

if (fe->mask & mask & AE_READABLE) {

rfired = 1;

fe->rfileProc(eventLoop,fd,fe->clientData,mask);

}

if (fe->mask & mask & AE_WRITABLE) {

/* Skip the write handler when it is the very function that already
 * ran for the read event on this fd. */

if (!rfired || fe->wfileProc != fe->rfileProc)

fe->wfileProc(eventLoop,fd,fe->clientData,mask);

}

processed++;

}

}

/* Check time events */

if (flags & AE_TIME_EVENTS)

processed += processTimeEvents(eventLoop);

return processed; /* return the number of processed file/time events */

}

/* Wait for events with epoll_wait and translate them into the
 * backend-independent fired[] array. `tvp` bounds the wait; a NULL tvp
 * blocks indefinitely. Returns the number of fired events. */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int numevents = 0;
    int retval = epoll_wait(state->epfd,state->events,AE_SETSIZE,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

    if (retval > 0) {
        numevents = retval;
        for (int i = 0; i < numevents; i++) {
            struct epoll_event *e = &state->events[i];
            int mask = 0;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            eventLoop->fired[i].fd = e->data.fd;
            eventLoop->fired[i].mask = mask;
        }
    }
    return numevents;
}

来具体看看callback函数在做什么。

acceptTcpHandler是用于处理新的客户连接请求的。

/* AE_READABLE handler on the listening socket: accept one pending
 * connection and hand the new fd to acceptCommonHandler.
 * Fix: the original excerpt used typographic quotes in the log strings,
 * which do not compile. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    char cip[128];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
    if (cfd == AE_ERR) {
        redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
        return;
    }
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
    acceptCommonHandler(cfd);
}

static void acceptCommonHandler(int fd) {

redisClient *c;

if ((c = createClient(fd)) == NULL) {

redisLog(REDIS_WARNING,”Error allocating resoures for the client”);

close(fd); /* May be already closed, just ingore errors */

return;

}

/* If maxclient directive is set and this is one client more… close the

* connection. Note that we create the client instead to check before

* for this condition, since now the socket is already set in nonblocking

* mode and we can send an error for free using the Kernel I/O */

if (server.maxclients && listLength(server.clients) > server.maxclients) {

char *err = “-ERR max number of clients reached\r\n”;

/* That’s a best effort error message, don’t check write errors */

if (write(c->fd,err,strlen(err)) == -1) {

/* Nothing to do, Just to avoid the warning… */

}

freeClient(c);

return;

}

server.stat_numconnections++;

}

创建一个新的client,注意这里把socket设为non block和no delay(减少latency)

/* Allocate a redisClient for the accepted descriptor `fd`, switch the
 * socket to non-blocking + TCP_NODELAY, and register the read handler.
 * Returns the client, or NULL on failure (fd is closed in that case
 * only when event registration fails, matching the caller's contract).
 * Fixes vs. the original excerpt:
 *  - the NULL check ran *after* c->bufpos was written (NULL deref);
 *  - the success path fell off the end of a non-void function without
 *    `return c` (undefined behavior).
 * NOTE(review): the full networking.c initializes many more client
 * fields here; this excerpt is abridged. */
redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(redisClient));

    if (!c) return NULL; /* check before touching any field */
    c->bufpos = 0;
    /* Non-blocking is mandatory for the single-threaded event loop;
     * TCP_NODELAY reduces reply latency. */
    anetNonBlock(NULL,fd);
    anetTcpNoDelay(NULL,fd);
    if (aeCreateFileEvent(server.el,fd,AE_READABLE,
        readQueryFromClient, c) == AE_ERR)
    {
        close(fd);
        zfree(c);
        return NULL;
    }
    return c;
}

readQueryFromClient用于处理从客户端发送的数据。

/* AE_READABLE handler for client sockets: read up to REDIS_IOBUF_LEN
 * bytes, append them to the client's query buffer, and run the command
 * parser. Frees the client on a real read error or on EOF.
 * Fix: the original excerpt used typographic quotes in the log strings,
 * which do not compile. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    char buf[REDIS_IOBUF_LEN];
    int nread;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    nread = read(fd, buf, REDIS_IOBUF_LEN);
    if (nread == -1) {
        if (errno == EAGAIN) {
            /* Spurious readiness on the non-blocking socket: no data yet. */
            nread = 0;
        } else {
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        /* EOF: peer closed the connection. */
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }
    if (nread) {
        c->querybuf = sdscatlen(c->querybuf,buf,nread);
        c->lastinteraction = time(NULL);
    } else {
        return; /* nothing read this round */
    }
    processInputBuffer(c);
}