Chinaunix首页 | 论坛 | 博客
  • 博客访问: 79323
  • 博文数量: 83
  • 博客积分: 10
  • 博客等级: 民兵
  • 技术积分: 20
  • 用 户 组: 普通用户
  • 注册时间: 2012-03-30 00:36
文章分类

全部博文(83)

文章存档

2014年(83)

我的朋友

分类: C/C++

2014-07-17 14:38:06

在前面的示例libaio_test.c和native_aio_test.c中,可以看到对磁盘aio请求(本文的aio都指此类)的使用有阻塞等待,这明显之处为对io_getevents()函数(当然,其它函数,比如io_submit()也有一定程度的阻塞)的调用,它会等待并获取已完成的io请求,如果当前没有或少于指定数目的io请求完成,那么就会等待直到timeout。

io_getevents()函数的等待会导致整个进程的阻塞使得程序无法继续向下执行,如果程序还有其它阻塞点,那么有必要想办法把这多处等待合而为一同时进行,从而提高并行性,也就是通常所说的select/等这类多路复用技术。

本文就以epoll为例,介绍一下在linux下,如何把aio结合并应用到epoll机制里。我们知道,epoll机制的最大好处就是它能够在同一时刻对多个文件描述符(通常是由众多套接字形成的描述符集合)进行监听,并将其上发生的读/写(或错误等)事件通知给应用程序,也就是做到时间上的复用。如果能够把aio也放到epoll机制里,即把aio当作epoll机制里的“一路io”,那么就能使得aio与其它可能的等待操作(比如:读/写套接字)共同工作,从而达到时间复用的目的。

作为epoll机制里的“一路io”,需要一个文件描述符来反馈对应的发生事件,而对于纯aio而言,是没有文件描述符作为代表的,因此linux系统上多出了一个()的系统调用:

1
2
3
#include
 
int eventfd(unsigned int initval, int flags);

当然,这个系统调用是否就是因此原因才出现,我不得而知(也没去细查),但要把aio应用到epoll机制里,的确少不了它。从man手册可以看到,eventfd()函数的作用是提供一种让内核通知应用程序有事件发生的机制。根据给定参数的不同,对eventfd进行read()的语义也有所不同,看本文aio应用的场景情况:

1
int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

对该描述符efd进行read(),如果读取成功,那么将返回8-byte的整型数据,而该数据也就是表示已经完成的aio请求个数。

充当中间桥梁的eventfd有了,并且eventfd()函数返回的描述符可以添加到epoll机制内,因此剩下需要做的就是把eventfd与aio联系起来,而目前aio当然已经有了这个支持,不过,由于native aio的相关结构体有两套封装,即一种是libaio的封装,一种是内核的直接封装(便于直接使用aio),比如iocb:
libaio的封装(来之:/usr/include/libaio.h):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
struct io_iocb_common {
    PADDEDptr(void  *buf, __pad1);
    PADDEDul(nbytes, __pad2);
    long long   offset;
    long long   __pad3;
    unsigned    flags;
    unsigned    resfd;
};  /* result code is the amount read or -'ve errno */
 
struct io_iocb_vector {
    const struct iovec  *vec;
    int         nr;
    long long       offset;
};  /* result code is the amount read or -'ve errno */
 
struct iocb {
    PADDEDptr(void *data, __pad1);  /* Return in the io completion event */
    PADDED(unsigned key, __pad2);   /* For use in identifying io requests */
 
    short       aio_lio_opcode;
    short       aio_reqprio;
    int     aio_fildes;
 
    union {
        struct io_iocb_common       c;
        struct io_iocb_vector       v;
        struct io_iocb_poll     poll;
        struct io_iocb_sockaddr saddr;
    } u;
};

内核的封装(来之:/usr/include/linux/aio_abi.h或):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/*
 /* we always use a 64bit off_t when communicating
 * with userland.  its up to libraries to do the
 * proper padding and aio_error abstraction
 */
struct iocb {
    /* these are internal to the kernel/libc. */
    __u64   aio_data;   /* data to be returned in event's data */
    __u32   PADDED(aio_key, aio_reserved1);
    /* the kernel sets aio_key to the req # */
   /* common fields */
    __u16   aio_lio_opcode; /* see IOCB_CMD_ above */
    __s16   aio_reqprio;
    __u32   aio_fildes;
 
    __u64   aio_buf;
    __u64   aio_nbytes;
    __s64   aio_offset;
 
    /* extra parameters */
    __u64   aio_reserved2;  /* TODO: use this for a (struct sigevent *) */
    /* flags for the "struct iocb" */
    __u32   aio_flags;

    /*
   * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
    /* eventfd to signal http://lenky.info/tag/aio/" title="查看 AIO 中的全部文章">AIO readiness to
     */
    __u32   aio_resfd;
}; /* 64 bytes */

两个结构体是等价的,只是字段名称有所不同而已,此处仅看内核封装的情况(后续将提到nginx对aio的使用实现,而nginx是采用的就是syscall手动封装),有一段很明显的英文注释出卖了aio对eventfd的使用支持,即两个字段:aio_flags与aio_resfd,详细来说就是将aio_flags打上IOCB_FLAG_RESFD标记并且将eventfd()函数返回的描述符设置到aio_resfd即可。

废话少说,看两个示例,第一个来之:http://blog.sina.com.cn/s/blog_6b19f21d0100znza.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#define _GNU_SOURCE
#define __STDC_FORMAT_MACROS
 
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
 
#define TEST_FILE   "aio_test_file"
#define TEST_FILE_SIZE  (127 * 1024)
#define NUM_EVENTS  128
#define ALIGN_SIZE  512
#define RD_WR_SIZE  1024
 
struct custom_iocb
{
    struct iocb iocb;
    int nth_request;
};
 
void aio_callback(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
    struct custom_iocb *iocbp = (struct custom_iocb *)iocb;
    printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ld\n",
            iocbp->nth_request, (iocb->aio_lio_opcode == IO_CMD_PREAD) ? "READ" : "WRITE",
            iocb->u.c.offset, iocb->u.c.nbytes, res, res2);
}
 
int main(int argc, char *argv[])
{
    int efd, fd, epfd;
    io_context_t ctx;
    struct timespec tms;
    struct io_event events[NUM_EVENTS];
    struct custom_iocb iocbs[NUM_EVENTS];
    struct iocb *iocbps[NUM_EVENTS];
    struct custom_iocb *iocbp;
    int i, j, r;
    void *buf;
    struct epoll_event epevent;
 
    efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (efd == -1) {
        perror("eventfd");
        return 2;
    }
 
    fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644);
    if (fd == -1) {
        perror("open");
        return 3;
    }
    ftruncate(fd, TEST_FILE_SIZE);
 
    ctx = 0;
    if (io_setup(8192, &ctx)) {
        perror("io_setup");
        return 4;
    }
 
    if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE)) {
        perror("posix_memalign");
        return 5;
    }
    printf("buf: %p\n", buf);
 
    for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) {
        iocbps[i] = &iocbp->iocb;
        io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE);
        io_set_eventfd(&iocbp->iocb, efd);
        io_set_callback(&iocbp->iocb, aio_callback);
        iocbp->nth_request = i + 1;
    }
 
    if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) {
        perror("io_submit");
        return 6;
    }
 
    epfd = epoll_create(1);
    if (epfd == -1) {
        perror("epoll_create");
        return 7;
    }
 
    epevent.events = EPOLLIN | EPOLLET;
    epevent.data.ptr = NULL;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) {
        perror("epoll_ctl");
        return 8;
    }
 
    i = 0;
    while (i < NUM_EVENTS) {
        uint64_t finished_aio;
 
        if (epoll_wait(epfd, &epevent, 1, -1) != 1) {
            perror("epoll_wait");
            return 9;
        }
 
        if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) {
            perror("read");
            return 10;
        }
 
        printf("finished io number: %"PRIu64"\n", finished_aio);
 
        while (finished_aio > 0) {
            tms.tv_sec = 0;
            tms.tv_nsec = 0;
            r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
            if (r > 0) {
                for (j = 0; j < r; ++j) {
                    ((io_callback_t)(events[j].data))(ctx, events[j].obj, events[j].res, events[j].res2);
                }
                i += r;
                finished_aio -= r;
            }
        }
    }
 
    close(epfd);
    free(buf);
    io_destroy(ctx);
    close(fd);
    close(efd);
    remove(TEST_FILE);
 
    return 0;
}

编译执行,OK无误(特别注意:上面示例代码仅只是演示aio+eventfd+epoll的使用,而细节部分是有严重bug的,比如所有请求共用一个缓存区buf):

1
2
[root@www 1]# gcc t.c -laio
[root@www 1]# ./a.out

上面示例采用了libaio库,试试syscall简单封装(由上面示例修改而来):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
/**
 /* gcc aio_eventfd_epoll.c -o aio_eventfd_epoll
 * modified by: http://lenky.info/
 */
#define _GNU_SOURCE
#define __STDC_FORMAT_MACROS
 
#include
#include             /* for perror() */
#include            /* for syscall() */
#include       /* for __NR_* definitions */
#include     /* for AIO types and constants */
#include             /* O_RDWR */
#include            /* memset() */
#include          /* uint64_t */
#include
 
#define TEST_FILE   "aio_test_file"
#define TEST_FILE_SIZE  (128 * 1024)
#define NUM_EVENTS  128
#define ALIGN_SIZE  512
#define RD_WR_SIZE  1024
 
inline int io_setup(unsigned nr, aio_context_t *ctxp)
{
    return syscall(__NR_io_setup, nr, ctxp);
}
 
inline int io_submit(aio_context_t ctx, long nr,  struct iocb **iocbpp)
{
    return syscall(__NR_io_submit, ctx, nr, iocbpp);
}
 
inline int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
        struct io_event *events, struct timespec *timeout)
{
    return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
}
 
inline int io_destroy(aio_context_t ctx)
{
    return syscall(__NR_io_destroy, ctx);
}
 
inline int eventfd2(unsigned int initval, int flags)
{
    return syscall(__NR_eventfd2, initval, flags);
}
 
struct custom_iocb
{
    struct iocb iocb;
    int nth_request;
};
 
typedef void io_callback_t(aio_context_t ctx, struct iocb *iocb, long res, long res2);
 
void aio_callback(aio_context_t ctx, struct iocb *iocb, long res, long res2)
{
    struct custom_iocb *iocbp = (struct custom_iocb *)iocb;
    printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ld\n",
            iocbp->nth_request, (iocb->aio_lio_opcode == IOCB_CMD_PREAD) ? "READ" : "WRITE",
            iocb->aio_offset, iocb->aio_nbytes, res, res2);
}
 
int main(int argc, char *argv[])
{
    int efd, fd, epfd;
    aio_context_t ctx;
    struct timespec tms;
    struct io_event events[NUM_EVENTS];
    struct custom_iocb iocbs[NUM_EVENTS];
    struct iocb *iocbps[NUM_EVENTS];
    struct custom_iocb *iocbp;
    int i, j, r;
    void *buf;
    void *aio_buf;
    struct epoll_event epevent;
 
    efd = eventfd2(0, O_NONBLOCK | O_CLOEXEC);
    if (efd == -1) {
        perror("eventfd2");
        return 2;
    }
 
    fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644);
    if (fd == -1) {
        perror("open");
        return 3;
    }
    ftruncate(fd, TEST_FILE_SIZE);
 
    ctx = 0;
    if (io_setup(NUM_EVENTS, &ctx)) {
        perror("io_setup");
        return 4;
    }
 
    if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE * NUM_EVENTS)) {
        perror("posix_memalign");
        return 5;
    }
    printf("buf: %p\n", buf);
 
    for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) {
        aio_buf = (void *)((char *)buf + (i*RD_WR_SIZE));
        memset(aio_buf, 0, RD_WR_SIZE);
 
        //io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE);
        iocbp->iocb.aio_fildes = fd;
        iocbp->iocb.aio_lio_opcode = IOCB_CMD_PREAD;
        iocbp->iocb.aio_buf = (uint64_t)aio_buf;
        iocbp->iocb.aio_offset = i * RD_WR_SIZE;
        iocbp->iocb.aio_nbytes = RD_WR_SIZE;
 
        //io_set_eventfd(&iocbp->iocb, efd);
        iocbp->iocb.aio_flags = IOCB_FLAG_RESFD;
        iocbp->iocb.aio_resfd = efd;
 
        //io_set_callback(&iocbp->iocb, aio_callback);
        iocbp->iocb.aio_data = (__u64)aio_callback;
 
        iocbp->nth_request = i + 1;
        iocbps[i] = &iocbp->iocb;
    }
 
    if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) {
        perror("io_submit");
        return 6;
    }
 
    epfd = epoll_create(1);
    if (epfd == -1) {
        perror("epoll_create");
        return 7;
    }
 
    epevent.events = EPOLLIN | EPOLLET;
    epevent.data.ptr = NULL;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) {
        perror("epoll_ctl");
        return 8;
    }
 
    i = 0;
    while (i < NUM_EVENTS) {
        uint64_t finished_aio;
 
        if (epoll_wait(epfd, &epevent, 1, -1) != 1) {
            perror("epoll_wait");
            return 9;
        }
 
        if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) {
            perror("read");
            return 10;
        }
 
        printf("finished io number: %"PRIu64"\n", finished_aio);
 
        while (finished_aio > 0) {
            tms.tv_sec = 0;
            tms.tv_nsec = 0;
            r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
            if (r > 0) {
                for (j = 0; j < r; ++j) {
                    ((io_callback_t *)(events[j].data))(ctx, (struct iocb *)events[j].obj, events[j].res, events[j].res2);
                }
                i += r;
                finished_aio -= r;
            }
        }
    }
 
    close(epfd);
    free(buf);
    io_destroy(ctx);
    close(fd);
    close(efd);
    remove(TEST_FILE);
 
    return 0;
}


阅读(645) | 评论(0) | 转发(0) |
0

上一篇:redis相关知识

下一篇:nginx, php, mysql 2222

给主人留下些什么吧!~~