Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1875784
  • 博文数量: 184
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 2388
  • 用 户 组: 普通用户
  • 注册时间: 2016-12-21 22:26
个人简介

90后空巢老码农

文章分类

全部博文(184)

文章存档

2021年(26)

2020年(56)

2019年(54)

2018年(47)

2017年(1)

我的朋友

分类: NOSQL

2019-05-19 23:03:26

redis当中的rio是对后端读写操作的一层抽象封装,bio可以看作是对它的一个补充。rio使用一个大结构体将各种操作全部封装起来,有点像linux当中驱动的写法,方便统一接口,具体结构体如下:

点击(此处)折叠或打开

  1. struct _rio {
  2.     /* Backend functions.
  3.      * Since this functions do not tolerate short writes or reads the return
  4.      * value is simplified to: zero on error, non zero on complete success. */
  5.     size_t (*read)(struct _rio *, void *buf, size_t len);
  6.     size_t (*write)(struct _rio *, const void *buf, size_t len);
  7.     off_t (*tell)(struct _rio *);
  8.     int (*flush)(struct _rio *);
  9.     /* The update_cksum method if not NULL is used to compute the checksum of
  10.      * all the data that was read or written so far. The method should be
  11.      * designed so that can be called with the current checksum, and the buf
  12.      * and len fields pointing to the new block of data to add to the checksum
  13.      * computation. */
  14.     void (*update_cksum)(struct _rio *, const void *buf, size_t len);

  15.     /* The current checksum */
  16.     uint64_t cksum;

  17.     /* number of bytes read or written */
  18.     size_t processed_bytes;

  19.     /* maximum single read or write chunk size */
  20.     size_t max_processing_chunk;

  21.     /* Backend-specific vars. */
  22.     union {
  23.         /* In-memory buffer target. */
  24.         struct {
  25.             sds ptr;
  26.             off_t pos;
  27.         } buffer;
  28.         /* Stdio file pointer target. */
  29.         struct {
  30.             FILE *fp;
  31.             off_t buffered; /* Bytes written since last fsync. */
  32.             off_t autosync; /* fsync after 'autosync' bytes written. */
  33.         } file;
  34.         /* Multiple FDs target (used to write to N sockets). */
  35.         struct {
  36.             int *fds; /* File descriptors. */
  37.             int *state; /* Error state of each fd. 0 (if ok) or errno. */
  38.             int numfds;
  39.             off_t pos;
  40.             sds buf;
  41.         } fdset;
  42.     } io;
  43. };

  44. typedef struct _rio rio;
由结构可知,rio会处理三种不同的read和write,分别为
1. 内存buffer,内部由sds表示
2. 文件指针,内部由FILE*表示
3. 文件描述符数组,内部由int *表示
rio结构留给外部调用的接口有read,write,tell,flush和update_cksum,而整个rio留给外部调用的接口是将结构体的接口进一步封装了:

点击(此处)折叠或打开

  1. static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
  2.     while (len) {
  3.         size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
  4.         if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
  5.         if (r->write(r,buf,bytes_to_write) == 0) /*0表示失败;非0表示成功*/
  6.             return 0;
  7.         buf = (char*)buf + bytes_to_write;
  8.         len -= bytes_to_write;
  9.         r->processed_bytes += bytes_to_write;
  10.     }
  11.     return 1;
  12. }

  13. static inline size_t rioRead(rio *r, void *buf, size_t len) {
  14.     while (len) {
  15.         size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
  16.         if (r->read(r,buf,bytes_to_read) == 0)/*0表示失败;非0表示成功*/
  17.             return 0;
  18.         if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
  19.         buf = (char*)buf + bytes_to_read;
  20.         len -= bytes_to_read;
  21.         r->processed_bytes += bytes_to_read;
  22.     }
  23.     return 1;
  24. }

  25. static inline off_t rioTell(rio *r) {
  26.     return r->tell(r);
  27. }

  28. static inline int rioFlush(rio *r) {
  29.     return r->flush(r);
  30. }

接下来聊聊内部实现,还是按照之前说过的顺序内存buffer-> 文件指针->文件描述符数组来进行,其总体思路就是自定义对应类别的io函数,然后实例化结构体,将对应io函数填入结构体
1. 内存buffer主要是针对sds的封装

点击(此处)折叠或打开

  1. /* Returns 1 or 0 for success/failure. */
  2. static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
  3.     r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
  4.     r->io.buffer.pos += len;
  5.     return 1;
  6. }

  7. /* Returns 1 or 0 for success/failure. */
  8. static size_t rioBufferRead(rio *r, void *buf, size_t len) {
  9.     if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
  10.         return 0; /* not enough buffer to return len bytes. */
  11.     memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
  12.     r->io.buffer.pos += len;
  13.     return 1;
  14. }

  15. /* Returns read/write position in buffer. */
  16. static off_t rioBufferTell(rio *r) {
  17.     return r->io.buffer.pos;
  18. }

  19. /* Flushes any buffer to target device if applicable. Returns 1 on success
  20.  * and 0 on failures. */
  21. static int rioBufferFlush(rio *r) {
  22.     UNUSED(r);
  23.     return 1; /* Nothing to do, our write just appends to the buffer. */
  24. }

  25. static const rio rioBufferIO = {
  26.     rioBufferRead,
  27.     rioBufferWrite,
  28.     rioBufferTell,
  29.     rioBufferFlush,
  30.     NULL, /* update_checksum */
  31.     0, /* current checksum */
  32.     0, /* bytes read or written */
  33.     0, /* read/write chunk size */
  34.     { { NULL, 0 } } /* union for io-specific vars */
  35. };

  36. void rioInitWithBuffer(rio *r, sds s) {
  37.     *r = rioBufferIO;
  38.     r->io.buffer.ptr = s;
  39.     r->io.buffer.pos = 0;
  40. }


2. 文件指针操作的封装:

点击(此处)折叠或打开

  1. /* Returns 1 or 0 for success/failure. */
  2. static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
  3.     size_t retval;

  4.     retval = fwrite(buf,len,1,r->io.file.fp);
  5.     r->io.file.buffered += len;

  6.     if (r->io.file.autosync &&
  7.         r->io.file.buffered >= r->io.file.autosync)
  8.     {
  9.         fflush(r->io.file.fp);
  10.         aof_fsync(fileno(r->io.file.fp));
  11.         r->io.file.buffered = 0;
  12.     }
  13.     return retval;
  14. }

  15. /* Returns 1 or 0 for success/failure. */
  16. static size_t rioFileRead(rio *r, void *buf, size_t len) {
  17.     return fread(buf,len,1,r->io.file.fp);
  18. }

  19. /* Returns read/write position in file. */
  20. static off_t rioFileTell(rio *r) {
  21.     return ftello(r->io.file.fp);
  22. }

  23. /* Flushes any buffer to target device if applicable. Returns 1 on success
  24.  * and 0 on failures. */
  25. static int rioFileFlush(rio *r) {
  26.     return (fflush(r->io.file.fp) == 0) ? 1 : 0;
  27. }

  28. static const rio rioFileIO = {
  29.     rioFileRead,
  30.     rioFileWrite,
  31.     rioFileTell,
  32.     rioFileFlush,
  33.     NULL, /* update_checksum */
  34.     0, /* current checksum */
  35.     0, /* bytes read or written */
  36.     0, /* read/write chunk size */
  37.     { { NULL, 0 } } /* union for io-specific vars */
  38. };

  39. void rioInitWithFile(rio *r, FILE *fp) {
  40.     *r = rioFileIO;
  41.     r->io.file.fp = fp;
  42.     r->io.file.buffered = 0;
  43.     r->io.file.autosync = 0;
  44. }


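3. 文件描述符数组,这里要注意的是写操作:传入的数据总是先追加到rio自身的buffer(io.fdset.buf)中,只有当buf为NULL且len为0(即flush操作),或者该buffer的长度超过PROTO_IOBUF_LEN时,才会真正写入各个文件描述符;实际写入时每轮最多向每个文件描述符写1024个字节,这样在内核后台发送数据的同时可以继续向其他文件描述符写入,代码如下: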

点击(此处)折叠或打开

  1. /* Returns 1 or 0 for success/failure.
  2.  * The function returns success as long as we are able to correctly write
  3.  * to at least one file descriptor.
  4.  *
  5.  * When buf is NULL and len is 0, the function performs a flush operation
  6.  * if there is some pending buffer, so this function is also used in order
  7.  * to implement rioFdsetFlush(). */
  8. static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
  9.     ssize_t retval;
  10.     int j;
  11.     unsigned char *p = (unsigned char*) buf;
  12.     int doflush = (buf == NULL && len == 0);

  13.     /* To start we always append to our buffer. If it gets larger than
  14.      * a given size, we actually write to the sockets. */
  15.     if (len) {
  16.         r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len);
  17.         len = 0; /* Prevent entering the while below if we don't flush. */
  18.         if (sdslen(r->io.fdset.buf) > PROTO_IOBUF_LEN) doflush = 1;
  19.     }

  20.     if (doflush) {
  21.         p = (unsigned char*) r->io.fdset.buf;
  22.         len = sdslen(r->io.fdset.buf);
  23.     }

  24.     /* Write in little chunchs so that when there are big writes we
  25.      * parallelize while the kernel is sending data in background to
  26.      * the TCP socket. */
  27.     while(len) {
  28.         size_t count = len < 1024 ? len : 1024;
  29.         int broken = 0;
  30.         for (j = 0; j < r->io.fdset.numfds; j++) {
  31.             if (r->io.fdset.state[j] != 0) {
  32.                 /* Skip FDs alraedy in error. */
  33.                 broken++;
  34.                 continue;
  35.             }

  36.             /* Make sure to write 'count' bytes to the socket regardless
  37.              * of short writes. */
  38.             size_t nwritten = 0;
  39.             while(nwritten != count) {
  40.                 retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten);
  41.                 if (retval <= 0) {
  42.                     /* With blocking sockets, which is the sole user of this
  43.                      * rio target, EWOULDBLOCK is returned only because of
  44.                      * the SO_SNDTIMEO socket option, so we translate the error
  45.                      * into one more recognizable by the user. */
  46.                     if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
  47.                     break;
  48.                 }
  49.                 nwritten += retval;
  50.             }

  51.             if (nwritten != count) {
  52.                 /* Mark this FD as broken. */
  53.                 r->io.fdset.state[j] = errno;
  54.                 if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO;
  55.             }
  56.         }
  57.         if (broken == r->io.fdset.numfds) return 0; /* All the FDs in error. */
  58.         p += count;
  59.         
阅读(11678) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~