/**
* The structure representing a connection into memcached. 定义在memcached.h中
*/
typedef struct conn conn;
struct conn {
int sfd;
sasl_conn_t *sasl_conn;
enum conn_states state;
enum bin_substates substate;
struct event event;
short ev_flags;
short which; /** which events were just triggered */
char *rbuf; /** buffer to read commands into */
char *rcurr; /** but if we parsed some already, this is where we stopped */
int rsize; /** total allocated size of rbuf */
int rbytes; /** how much data, starting from rcur, do we have unparsed */
char *wbuf;
char *wcurr;
int wsize;
int wbytes;
/** which state to go into after finishing current write */
enum conn_states write_and_go;
void *write_and_free; /** free this memory after finishing writing */
char *ritem; /** when we read in an item's value, it goes here */
int rlbytes;
/* data for the nread state */
/**
* item is used to hold an item structure created after reading the command
* line of set/add/replace commands, but before we finished reading the actual
* data. The data is read into ITEM_data(item) to avoid extra copying.
*/
void *item; /* for commands set/add/replace */
/* data for the swallow state */
int sbytes; /* how many bytes to swallow */
/* data for the mwrite state */
struct iovec *iov;
int iovsize; /* number of elements allocated in iov[] */
int iovused; /* number of elements used in iov[] */
struct msghdr *msglist;
int msgsize; /* number of elements allocated in msglist[] */
int msgused; /* number of elements used in msglist[] */
int msgcurr; /* element in msglist[] being transmitted now */
int msgbytes; /* number of bytes in current msg */
item **ilist; /* list of items to write out */
int isize;
item **icurr;
int ileft;
char **suffixlist;
int suffixsize;
char **suffixcurr;
int suffixleft;
enum protocol protocol; /* which protocol this connection speaks */
enum network_transport transport; /* what transport is used by this connection */
/* data for UDP clients */
int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
struct sockaddr request_addr; /* Who sent the most recent request */
socklen_t request_addr_size;
unsigned char *hdrbuf; /* udp packet headers */
int hdrsize; /* number of headers' worth of space is allocated */
bool noreply; /* True if the reply should not be sent. */
/* current stats command */
struct {
char *buffer;
size_t size;
size_t offset;
} stats;
/* Binary protocol stuff */
/* This is where the binary header goes */
protocol_binary_request_header binary_header;
uint64_t cas; /* the cas to return */
short cmd; /* current command being processed */
int opaque;
int keylen;
conn *next; /* Used for generating a list of conn structures */
LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};
/*
* Free list management for connections. thread.h中
*/
static conn **freeconns; //conn的freelist
static int freetotal;
static int freecurr;
/* Lock for connection freelist */
static pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;
static void conn_init(void) {
freetotal = 200;
freecurr = 0;
if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
fprintf(stderr, "Failed to allocate connection structures\n");
}
return;
}
/*
* Adds a connection to the freelist. 0 = success.
*/
bool conn_add_to_freelist(conn *c) {
bool ret = true;
pthread_mutex_lock(&conn_lock);
if (freecurr < freetotal) {
freeconns[freecurr++] = c;
ret = false;
} else { //否则需要增长链表长度, memcached中好多个地方都是这样增长list
/* try to enlarge free connections array */
size_t newsize = freetotal * 2;
conn **new_freeconns = realloc(freeconns, sizeof(conn *) * newsize);
if (new_freeconns) {
freetotal = newsize;
freeconns = new_freeconns;
freeconns[freecurr++] = c;
ret = false;
}
}
pthread_mutex_unlock(&conn_lock);
return ret;
}
static const char *prot_text(enum protocol prot) {
char *rv = "unknown";
switch(prot) {
case ascii_prot:
rv = "ascii";
break;
case binary_prot:
rv = "binary";
break;
case negotiating_prot:
rv = "auto-negotiate";
break;
}
return rv;
}
//核心函数之一,每个worker_pthread在get一个CQ_ITEM都都会调用。。。
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base) {
conn *c = conn_from_freelist();
if (NULL == c) {
if (!(c = (conn *)calloc(1, sizeof(conn)))) {
fprintf(stderr, "calloc()\n");
return NULL;
}
MEMCACHED_CONN_CREATE(c);
c->rbuf = c->wbuf = 0;
c->ilist = 0;
c->suffixlist = 0;
c->iov = 0;
c->msglist = 0;
c->hdrbuf = 0;
c->rsize = read_buffer_size;
c->wsize = DATA_BUFFER_SIZE; //2048
c->isize = ITEM_LIST_INITIAL; //200
c->suffixsize = SUFFIX_LIST_INITIAL; //20
c->iovsize = IOV_LIST_INITIAL; //400
c->msgsize = MSG_LIST_INITIAL; //10
c->hdrsize = 0;
c->rbuf = (char *)malloc((size_t)c->rsize);
c->wbuf = (char *)malloc((size_t)c->wsize);
c->ilist = (item **)malloc(sizeof(item *) * c->isize);
c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
c->msglist == 0 || c->suffixlist == 0) {
conn_free(c);
fprintf(stderr, "malloc()\n");
return NULL;
}
STATS_LOCK();
stats.conn_structs++;
STATS_UNLOCK();
}
/* 定义在memcached.h中
enum network_transport {
local_transport, /* Unix sockets*/
tcp_transport,
udp_transport
};*/
c->transport = transport;
c->protocol = settings.binding_protocol;
/* unix socket mode doesn't need this, so zeroed out. but why
* is this done for every command? presumably for UDP
* mode. */
if (!settings.socketpath) {
c->request_addr_size = sizeof(c->request_addr);
} else {
c->request_addr_size = 0;
}
if (settings.verbose > 1) {
if (init_state == conn_listening) {
fprintf(stderr, "<%d server listening (%s)\n", sfd,
prot_text(c->protocol));
} else if (IS_UDP(transport)) {
fprintf(stderr, "<%d server listening (udp)\n", sfd);
} else if (c->protocol == negotiating_prot) {
fprintf(stderr, "<%d new auto-negotiating client connection\n",
sfd);
} else if (c->protocol == ascii_prot) {
fprintf(stderr, "<%d new ascii client connection.\n", sfd);
} else if (c->protocol == binary_prot) {
fprintf(stderr, "<%d new binary client connection.\n", sfd);
} else {
fprintf(stderr, "<%d new unknown (%d) client connection\n",
sfd, c->protocol);
assert(false);
}
}
c->sfd = sfd;
c->state = init_state;
c->rlbytes = 0;
c->cmd = -1;
c->rbytes = c->wbytes = 0;
c->wcurr = c->wbuf;
c->rcurr = c->rbuf;
c->ritem = 0;
c->icurr = c->ilist;
c->suffixcurr = c->suffixlist;
c->ileft = 0;
c->suffixleft = 0;
c->iovused = 0;
c->msgcurr = 0;
c->msgused = 0;
c->write_and_go = init_state;
c->write_and_free = 0;
c->item = 0;
c->noreply = false;
//将本connection的thread注册到worker_thread的event_base中,event_flag来源于CQ_ITEM的event_flag, 每个connection事件发生时会回调event_handler函数
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1) { //never timeout
if (conn_add_to_freelist(c)) {
conn_free(c);
}
perror("event_add");
return NULL;
}
STATS_LOCK();
stats.curr_conns++;
stats.total_conns++;
STATS_UNLOCK();
MEMCACHED_CONN_ALLOCATE(c->sfd);
return c;
}
static void conn_cleanup(conn *c) {
assert(c != NULL);
if (c->item) {
item_remove(c->item);
c->item = 0;
}
if (c->ileft != 0) {
for (; c->ileft > 0; c->ileft--,c->icurr++) {
item_remove(*(c->icurr));
}
}
if (c->suffixleft != 0) {
for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
cache_free(c->thread->suffix_cache, *(c->suffixcurr));
}
}
if (c->write_and_free) {
free(c->write_and_free);
c->write_and_free = 0;
}
if (c->sasl_conn) {
assert(settings.sasl);
sasl_dispose(&c->sasl_conn);
c->sasl_conn = NULL;
}
}
/*
* Frees a connection.
*/
void conn_free(conn *c) {
if (c) {
MEMCACHED_CONN_DESTROY(c);
if (c->hdrbuf)
free(c->hdrbuf);
if (c->msglist)
free(c->msglist);
if (c->rbuf)
free(c->rbuf);
if (c->wbuf)
free(c->wbuf);
if (c->ilist)
free(c->ilist);
if (c->suffixlist)
free(c->suffixlist);
if (c->iov)
free(c->iov);
free(c);
}
}
static void conn_close(conn *c) {
assert(c != NULL);
/* delete the event, the socket and the conn */
event_del(&c->event);
if (settings.verbose > 1)
fprintf(stderr, "<%d connection closed.\n", c->sfd);
MEMCACHED_CONN_RELEASE(c->sfd);
close(c->sfd);
accept_new_conns(true);
conn_cleanup(c);
/* if the connection has big buffers, just free it */
if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
conn_free(c);
}
STATS_LOCK();
stats.curr_conns--;
STATS_UNLOCK();
return;
}
/*
* Shrinks a connection's buffers if they're too big. This prevents
* periodic large "get" requests from permanently chewing lots of server
* memory.
*
* This should only be called in between requests since it can wipe output
* buffers!
*/
static void conn_shrink(conn *c) {
assert(c != NULL);
if (IS_UDP(c->transport))
return;
if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
char *newbuf;
if (c->rcurr != c->rbuf)
memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
if (newbuf) {
c->rbuf = newbuf;
c->rsize = DATA_BUFFER_SIZE;
}
/* TODO check other branch... */
c->rcurr = c->rbuf;
}
if (c->isize > ITEM_LIST_HIGHWAT) {
item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
if (newbuf) {
c->ilist = newbuf;
c->isize = ITEM_LIST_INITIAL;
}
/* TODO check error condition? */
}
if (c->msgsize > MSG_LIST_HIGHWAT) {
struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
if (newbuf) {
c->msglist = newbuf;
c->msgsize = MSG_LIST_INITIAL;
}
/* TODO check error condition? */
}
if (c->iovsize > IOV_LIST_HIGHWAT) {
struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
if (newbuf) {
c->iov = newbuf;
c->iovsize = IOV_LIST_INITIAL;
}
/* TODO check return value */
}
}
|