Is it safe to share the same epoll fd (not socket fd) among several
threads?
Yes, it is safe: the epoll(7) interface is thread-safe. You should be
careful when doing so, though. At the very least, use EPOLLET
(edge-triggered mode, as opposed to the default level-triggered mode) to
avoid spurious wake-ups in other threads. In level-triggered mode, a new
event can wake up every thread that is blocked in epoll_wait(2) on that
epoll fd. Since only one thread will end up dealing with the event,
most of those wake-ups are wasted.
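To make the difference between the two modes concrete, here is a minimal single-threaded sketch. It is not part of the server; the helper name second_wait_count() is made up for this illustration. It registers the read end of a pipe, writes one byte, and calls epoll_wait(2) twice without ever reading the byte. In level-triggered mode the second call should report the descriptor again; in edge-triggered mode it should not.

```c
#include <stdint.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical helper, for illustration only.
 * extra_flags is 0 for level-triggered or EPOLLET for edge-triggered.
 * Returns the number of events reported by the SECOND epoll_wait(2) call
 * (the pending byte is never read), or -1 on error. */
int second_wait_count(uint32_t extra_flags)
{
    int pipefd[2];
    int result = -1;
    struct epoll_event ev, out;

    if (pipe(pipefd) < 0)
        return -1;

    int epfd = epoll_create1(0);
    if (epfd >= 0) {
        ev.events = EPOLLIN | extra_flags;
        ev.data.fd = pipefd[0];

        if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ev) == 0 &&
            write(pipefd[1], "x", 1) == 1 &&
            epoll_wait(epfd, &out, 1, 0) == 1) {
            /* The byte is still unread: ask again without blocking. */
            result = epoll_wait(epfd, &out, 1, 0);
        }
        close(epfd);
    }

    close(pipefd[0]);
    close(pipefd[1]);
    return result;
}
```

Calling second_wait_count(0) and second_wait_count(EPOLLET) side by side shows why a level-triggered descriptor keeps waking waiters for as long as its data sits unread.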
If a shared epfd is used, will each thread have to pass its own events
array, or a shared events array, to epoll_wait()?
Yes, you need a separate events array on each thread, or else you'll
have race conditions and nasty things can happen. For example, one
thread might still be iterating through the events returned by
epoll_wait(2) and processing the requests when another thread calls
epoll_wait(2) with the same array. The events then get overwritten while
the first thread is still reading them. Not good! You absolutely need a
separate array for each thread.
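As a rough sketch of that rule (all names below, such as demo_worker_thr() and demo_shared_epfd(), are invented for this illustration and are not taken from the server), several threads can wait on one shared epoll fd as long as each passes its own array. Here the array simply lives on each thread's stack. The descriptors are eventfd(2) objects created with a non-zero counter, so they are readable from the start.

```c
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

#define DEMO_THREADS 4
#define DEMO_FDS     8
#define DEMO_EVENTS  16

struct demo_worker {
    int epfd;     /* shared epoll fd */
    int handled;  /* events seen by this thread only */
};

static void *demo_worker_thr(void *arg)
{
    struct demo_worker *w = (struct demo_worker *)arg;
    struct epoll_event events[DEMO_EVENTS]; /* private to this thread */
    int n;

    /* Stop once nothing has been reported for 100 ms. */
    while ((n = epoll_wait(w->epfd, events, DEMO_EVENTS, 100)) > 0)
        w->handled += n;
    return NULL;
}

/* Returns the total number of events seen by all threads, or -1 on error. */
int demo_shared_epfd(void)
{
    int fds[DEMO_FDS];
    pthread_t tids[DEMO_THREADS];
    struct demo_worker workers[DEMO_THREADS];
    int nfds = 0, nthreads = 0, ok = 1, total = 0, i;

    int epfd = epoll_create1(0);
    if (epfd < 0)
        return -1;

    for (i = 0; i < DEMO_FDS && ok; i++) {
        struct epoll_event ev;
        fds[i] = eventfd(1, 0); /* counter starts at 1: already readable */
        if (fds[i] < 0) {
            ok = 0;
            break;
        }
        nfds++;
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = fds[i];
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, fds[i], &ev) < 0)
            ok = 0;
    }

    for (i = 0; i < DEMO_THREADS && ok; i++) {
        workers[i].epfd = epfd;
        workers[i].handled = 0;
        if (pthread_create(&tids[i], NULL, demo_worker_thr, &workers[i]) != 0)
            ok = 0;
        else
            nthreads++;
    }

    for (i = 0; i < nthreads; i++) {
        pthread_join(tids[i], NULL);
        total += workers[i].handled;
    }
    for (i = 0; i < nfds; i++)
        close(fds[i]);
    close(epfd);
    return ok ? total : -1;
}
```

Because every descriptor is registered with EPOLLET and never read, each one should be reported once in total, to whichever thread happens to pick it up, and no thread ever touches another thread's array.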
Assuming you do have a separate array for each thread, either
possibility (waiting on the same epoll fd, or having a separate epoll fd
for each thread) will work equally well, but note that the semantics
are different. With a globally shared epoll fd, every thread waits for a
request from any client, because all clients are added to the
same epoll fd. With a separate epoll fd for each thread, each
thread is essentially responsible for a subset of clients (those clients
that were accepted by that thread).
This may be irrelevant for your system, or it may make a huge
difference. For example, it may happen that a thread is unfortunate
enough to get a group of power users that make heavy and frequent
requests, leaving that thread overworked, while other threads with less
aggressive clients are almost idle. Wouldn't that be unfair? On the
other hand, maybe you'd like to have only some threads dealing with a
specific class of users, and in that case maybe it makes sense to have
a separate epoll fd for each thread. As usual, you need to consider both
possibilities, evaluate the trade-offs, think about your specific problem,
and make a decision.
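For comparison, here is a small sketch of the per-thread variant. Note that it uses a different assignment policy from the one described above: instead of each thread keeping the clients it accepted itself, a single acceptor hands clients out round-robin. That policy, and the names pool_epfds, pool_init() and pool_assign(), are assumptions made for this illustration.

```c
#include <sys/epoll.h>
#include <unistd.h>

#define POOL_SIZE 4

/* One epoll instance per worker thread; worker i would only ever call
 * epoll_wait(2) on pool_epfds[i]. */
static int pool_epfds[POOL_SIZE];
static unsigned next_worker;

/* Creates the per-worker epoll instances. Returns 0 on success, -1 on error. */
int pool_init(void)
{
    int i;
    for (i = 0; i < POOL_SIZE; i++) {
        pool_epfds[i] = epoll_create1(0);
        if (pool_epfds[i] < 0) {
            while (--i >= 0)
                close(pool_epfds[i]);
            return -1;
        }
    }
    next_worker = 0;
    return 0;
}

/* Registers clientfd with the next worker's epoll instance (round-robin).
 * Returns the index of the chosen worker, or -1 on error.
 * Meant to be called from a single acceptor thread: next_worker is not
 * protected by any lock. */
int pool_assign(int clientfd)
{
    unsigned idx = next_worker % POOL_SIZE;
    struct epoll_event ev;

    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = clientfd;
    if (epoll_ctl(pool_epfds[idx], EPOLL_CTL_ADD, clientfd, &ev) < 0)
        return -1;

    next_worker++;
    return (int)idx;
}
```

Round-robin spreads the number of clients evenly, but not necessarily the load: a worker can still end up with the heavy users, which is exactly the fairness problem described above.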
Below is an example using a globally shared epoll fd. I originally
didn't plan to do all of this, but one thing led to another, and, well,
it was fun and I think it may help you get started. It's an echo server
that listens on port 1111 (SERVERPORT in the code) and has a pool of 20
threads, plus the main thread, using epoll to concurrently accept new
clients and serve requests.

// gcc -pthread server.c -o server_c

#include <stdio.h>
#include <stdlib.h>
#include <inttypes.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

#define SERVERPORT 1111
#define SERVERBACKLOG 10
#define THREADSNO 20
#define EVENTS_BUFF_SZ 256

static int serversock;
static int epoll_fd;
static pthread_t threads[THREADSNO];

/* Accepts one pending connection and registers it with the shared epoll fd. */
int accept_new_client(void)
{
    int clientsock;
    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(addr);

    if ((clientsock = accept(serversock, (struct sockaddr *)&addr, &addrlen)) < 0) {
        return -1;
    }

    char ip_buff[INET_ADDRSTRLEN + 1];
    if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
        close(clientsock);
        return -1;
    }

    printf("*** [%p] Client connected from %s:%d\n", (void *)pthread_self(), ip_buff, ntohs(addr.sin_port));

    struct epoll_event epevent;
    epevent.events = EPOLLIN | EPOLLET;
    epevent.data.fd = clientsock;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientsock, &epevent) < 0) {
        perror("epoll_ctl(2) failed attempting to add new client");
        close(clientsock);
        return -1;
    }

    return 0;
}

/* Reads one request (at most 511 bytes) and echoes it back to the client. */
int handle_request(int clientfd)
{
    char readbuff[512];
    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(addr);
    ssize_t n;

    if ((n = recv(clientfd, readbuff, sizeof(readbuff) - 1, 0)) < 0) {
        return -1;
    }

    if (n == 0) {
        /* The peer closed the connection. Closing the socket also removes
         * it from the epoll set, so the descriptor is not leaked. */
        close(clientfd);
        return 0;
    }

    readbuff[n] = '\0';

    if (getpeername(clientfd, (struct sockaddr *)&addr, &addrlen) < 0) {
        return -1;
    }

    char ip_buff[INET_ADDRSTRLEN + 1];
    if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
        return -1;
    }

    printf("*** [%p] [%s:%d] -> server: %s", (void *)pthread_self(), ip_buff, ntohs(addr.sin_port), readbuff);

    ssize_t sent;
    if ((sent = send(clientfd, readbuff, n, 0)) < 0) {
        return -1;
    }

    /* Log only what was actually sent (sent <= n, so this stays in bounds). */
    readbuff[sent] = '\0';

    printf("*** [%p] server -> [%s:%d]: %s", (void *)pthread_self(), ip_buff, ntohs(addr.sin_port), readbuff);

    return 0;
}

/* Worker loop: every thread waits on the shared epoll fd with its own events array. */
void *worker_thr(void *args)
{
    struct epoll_event *events = (struct epoll_event *)malloc(sizeof(*events) * EVENTS_BUFF_SZ);
    if (events == NULL) {
        perror("malloc(3) failed when attempting to allocate events buffer");
        pthread_exit(NULL);
    }

    int events_cnt;
    while ((events_cnt = epoll_wait(epoll_fd, events, EVENTS_BUFF_SZ, -1)) > 0) {
        int i;
        for (i = 0; i < events_cnt; i++) {
            assert(events[i].events & EPOLLIN);

            if (events[i].data.fd == serversock) {
                if (accept_new_client() == -1) {
                    fprintf(stderr, "Error accepting new client: %s\n", strerror(errno));
                }
            } else {
                if (handle_request(events[i].data.fd) == -1) {
                    fprintf(stderr, "Error handling request: %s\n", strerror(errno));
                }
            }
        }
    }

    if (events_cnt == 0) {
        fprintf(stderr, "epoll_wait(2) returned 0, but timeout was not specified...?\n");
    } else {
        perror("epoll_wait(2) error");
    }

    free(events);

    return NULL;
}

int main(void)
{
    if ((serversock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
        perror("socket(2) failed");
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(serveraddr)); /* don't leave padding uninitialized */
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons(SERVERPORT);
    serveraddr.sin_addr.s_addr = INADDR_ANY;

    if (bind(serversock, (const struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0) {
        perror("bind(2) failed");
        exit(EXIT_FAILURE);
    }

    if (listen(serversock, SERVERBACKLOG) < 0) {
        perror("listen(2) failed");
        exit(EXIT_FAILURE);
    }

    if ((epoll_fd = epoll_create(1)) < 0) {
        perror("epoll_create(2) failed");
        exit(EXIT_FAILURE);
    }

    struct epoll_event epevent;
    epevent.events = EPOLLIN | EPOLLET;
    epevent.data.fd = serversock;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, serversock, &epevent) < 0) {
        perror("epoll_ctl(2) failed on main server socket");
        exit(EXIT_FAILURE);
    }

    int i;
    for (i = 0; i < THREADSNO; i++) {
        /* pthread_create(3) returns an error number and does not set errno. */
        int rc = pthread_create(&threads[i], NULL, worker_thr, NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_create(3) failed: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }
    }

    /* main thread also contributes as worker thread */
    worker_thr(NULL);

    return 0;
}

A couple of notes:
- main() should return int, not void (as it does in your example).
- Always deal with error return codes. It is very common to ignore them, and when things break it's hard to know what happened.
- The code assumes that no request is larger than 511 bytes (as seen by the buffer size in handle_request()). If a request is larger than this, some data may be left in the socket for a very long time, because epoll_wait(2) will not report it until a new event occurs on that file descriptor (because we're using EPOLLET). In the worst case, the client may never actually send any new data, and will wait for a reply forever.
- The code that prints the thread identifier for each request assumes that pthread_t is an opaque pointer type. Indeed, pthread_t is a pointer type in Linux, but it may be an integer type on other platforms, so this is not portable. However, that is probably not much of a problem, since epoll is Linux-specific, so the code is not portable anyway.
- It assumes that no other requests from the same client arrive while a thread is still serving a request from that client. If a new request arrives in the meantime and another thread starts serving it, we have a race condition and the client will not necessarily receive the echo messages in the same order they were sent (however, each reply is written with a single send(2) call, so while the replies may be out of order, they should not be interleaved with one another).
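One common way around the 511-byte limitation with EPOLLET is to put the client socket in non-blocking mode and keep reading until recv(2) fails with EAGAIN, so nothing is left behind in the socket. The following is only a sketch of that idea, not a drop-in patch for the server: set_nonblocking() and drain_and_echo() are names made up here, and the handling of a full send buffer is deliberately simplistic.

```c
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: switches fd to non-blocking mode. Returns 0 or -1. */
int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0 ? -1 : 0;
}

/* Hypothetical helper: reads from a non-blocking socket until it is empty,
 * echoing every chunk back. Returns the number of bytes echoed, or -1 on
 * error. *peer_closed is set to 1 if the peer closed the connection. */
long drain_and_echo(int fd, int *peer_closed)
{
    char buf[512];
    long total = 0;

    *peer_closed = 0;
    for (;;) {
        ssize_t n = recv(fd, buf, sizeof(buf), 0);
        if (n > 0) {
            ssize_t off = 0;
            while (off < n) {
                ssize_t s = send(fd, buf + off, (size_t)(n - off), MSG_NOSIGNAL);
                if (s < 0) {
                    if (errno == EINTR)
                        continue;
                    /* Also hit on EAGAIN (send buffer full). A real server
                     * would queue the rest and wait for EPOLLOUT instead. */
                    return -1;
                }
                off += s;
            }
            total += n;
        } else if (n == 0) {
            *peer_closed = 1; /* orderly shutdown by the peer */
            return total;
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return total;     /* socket drained: safe to wait for the next edge */
        } else if (errno != EINTR) {
            return -1;
        }
    }
}
```

In the server, the client socket would be made non-blocking right after accept(2), and the worker would call something like drain_and_echo() in place of the single recv(2). This does not address the ordering race from the last note; that would need something extra, such as EPOLLONESHOT.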