Redis 中的 rio 是对后端读写操作的一层抽象,bio 可以看作它的一个补充。rio 用一个大结构体把各种读写操作封装起来,有点像 Linux 驱动中用函数指针结构体统一接口的写法,方便对不同后端进行统一调用。具体结构体如下:
-
struct _rio {
-
/* Backend functions.
-
* Since this functions do not tolerate short writes or reads the return
-
* value is simplified to: zero on error, non zero on complete success. */
-
size_t (*read)(struct _rio *, void *buf, size_t len);
-
size_t (*write)(struct _rio *, const void *buf, size_t len);
-
off_t (*tell)(struct _rio *);
-
int (*flush)(struct _rio *);
-
/* The update_cksum method if not NULL is used to compute the checksum of
-
* all the data that was read or written so far. The method should be
-
* designed so that can be called with the current checksum, and the buf
-
* and len fields pointing to the new block of data to add to the checksum
-
* computation. */
-
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
-
-
/* The current checksum */
-
uint64_t cksum;
-
-
/* number of bytes read or written */
-
size_t processed_bytes;
-
-
/* maximum single read or write chunk size */
-
size_t max_processing_chunk;
-
-
/* Backend-specific vars. */
-
union {
-
/* In-memory buffer target. */
-
struct {
-
sds ptr;
-
off_t pos;
-
} buffer;
-
/* Stdio file pointer target. */
-
struct {
-
FILE *fp;
-
off_t buffered; /* Bytes written since last fsync. */
-
off_t autosync; /* fsync after 'autosync' bytes written. */
-
} file;
-
/* Multiple FDs target (used to write to N sockets). */
-
struct {
-
int *fds; /* File descriptors. */
-
int *state; /* Error state of each fd. 0 (if ok) or errno. */
-
int numfds;
-
off_t pos;
-
sds buf;
-
} fdset;
-
} io;
-
};
-
-
typedef struct _rio rio;
由结构体可知,rio 支持三种不同的读写后端,分别为:
1. 内存 buffer,内部由 sds 表示;
2. 文件指针,内部由 FILE * 表示;
3. 文件描述符数组(fdset),内部由 int *fds 表示,用于同时向多个 socket 写入数据。
rio 结构体内部通过函数指针提供了 read、write、tell、flush 和 update_cksum 这几个后端接口;而 rio 模块对外提供的接口,则是在这些函数指针之上做的进一步封装(读写时按 max_processing_chunk 分块、更新校验和,并统计已处理的字节数):
-
/* Write len bytes from buf through the backend's write callback.
 *
 * The data is split into pieces of at most max_processing_chunk bytes when
 * that limit is non-zero. Each piece is passed to the checksum callback (if
 * any) before it is written, and processed_bytes is advanced once the piece
 * has been written successfully.
 *
 * Returns 1 on success, 0 as soon as any backend write fails. */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
    const char *cursor = (const char *)buf;

    while (len > 0) {
        size_t chunk = len;
        if (r->max_processing_chunk != 0 && chunk > r->max_processing_chunk)
            chunk = r->max_processing_chunk;

        if (r->update_cksum != NULL)
            r->update_cksum(r, cursor, chunk);
        if (r->write(r, cursor, chunk) == 0) /* 0 means failure. */
            return 0;

        r->processed_bytes += chunk;
        cursor += chunk;
        len -= chunk;
    }
    return 1;
}
-
-
/* Read exactly len bytes into buf through the backend's read callback.
 *
 * The data is fetched in pieces of at most max_processing_chunk bytes when
 * that limit is non-zero. Each piece is passed to the checksum callback (if
 * any) after it has been read, and processed_bytes is advanced accordingly.
 *
 * Returns 1 on success, 0 as soon as any backend read fails. */
static inline size_t rioRead(rio *r, void *buf, size_t len) {
    char *dst = (char *)buf;

    while (len > 0) {
        size_t chunk = len;
        if (r->max_processing_chunk != 0 && chunk > r->max_processing_chunk)
            chunk = r->max_processing_chunk;

        if (r->read(r, dst, chunk) == 0) /* 0 means failure. */
            return 0;
        if (r->update_cksum != NULL)
            r->update_cksum(r, dst, chunk);

        r->processed_bytes += chunk;
        dst += chunk;
        len -= chunk;
    }
    return 1;
}
-
-
/* Return the current read/write offset as reported by the backend. */
static inline off_t rioTell(rio *r) {
    off_t (*tell_fn)(struct _rio *) = r->tell;
    return tell_fn(r);
}
-
-
/* Ask the backend to flush any buffered data; returns the backend's result
 * (1 on success, 0 on failure for the backends shown here). */
static inline int rioFlush(rio *r) {
    int (*flush_fn)(struct _rio *) = r->flush;
    return flush_fn(r);
}
接下来聊聊内部实现,仍按照前面的顺序:内存 buffer → 文件指针 → 文件描述符数组。总体思路是:为每类后端实现对应的 I/O 函数,定义一个填好这些函数指针的 rio 模板结构体;初始化时先把模板拷贝到目标结构体中,再设置该后端特有的字段。
1. 内存 buffer:主要是对 sds 的封装
-
/* Returns 1 or 0 for success/failure. */
-
static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
-
r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
-
r->io.buffer.pos += len;
-
return 1;
-
}
-
-
/* Returns 1 or 0 for success/failure. */
-
static size_t rioBufferRead(rio *r, void *buf, size_t len) {
-
if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
-
return 0; /* not enough buffer to return len bytes. */
-
memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
-
r->io.buffer.pos += len;
-
return 1;
-
}
-
-
/* Returns read/write position in buffer. */
-
static off_t rioBufferTell(rio *r) {
-
return r->io.buffer.pos;
-
}
-
-
/* Flushes any buffer to target device if applicable. Returns 1 on success
-
* and 0 on failures. */
-
static int rioBufferFlush(rio *r) {
-
UNUSED(r);
-
return 1; /* Nothing to do, our write just appends to the buffer. */
-
}
-
-
static const rio rioBufferIO = {
-
rioBufferRead,
-
rioBufferWrite,
-
rioBufferTell,
-
rioBufferFlush,
-
NULL, /* update_checksum */
-
0, /* current checksum */
-
0, /* bytes read or written */
-
0, /* read/write chunk size */
-
{ { NULL, 0 } } /* union for io-specific vars */
-
};
-
-
/* Initialize r as an in-memory target reading/writing the sds s,
 * starting at offset 0. */
void rioInitWithBuffer(rio *r, sds s) {
    *r = rioBufferIO;          /* Install the buffer callbacks and defaults. */
    r->io.buffer.pos = 0;
    r->io.buffer.ptr = s;
}
2. 文件指针操作的封装:
-
/* Returns 1 or 0 for success/failure. */
-
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
-
size_t retval;
-
-
retval = fwrite(buf,len,1,r->io.file.fp);
-
r->io.file.buffered += len;
-
-
if (r->io.file.autosync &&
-
r->io.file.buffered >= r->io.file.autosync)
-
{
-
fflush(r->io.file.fp);
-
aof_fsync(fileno(r->io.file.fp));
-
r->io.file.buffered = 0;
-
}
-
return retval;
-
}
-
-
/* Returns 1 or 0 for success/failure. */
-
static size_t rioFileRead(rio *r, void *buf, size_t len) {
-
return fread(buf,len,1,r->io.file.fp);
-
}
-
-
/* Returns read/write position in file. */
-
static off_t rioFileTell(rio *r) {
-
return ftello(r->io.file.fp);
-
}
-
-
/* Flushes any buffer to target device if applicable. Returns 1 on success
-
* and 0 on failures. */
-
static int rioFileFlush(rio *r) {
-
return (fflush(r->io.file.fp) == 0) ? 1 : 0;
-
}
-
-
static const rio rioFileIO = {
-
rioFileRead,
-
rioFileWrite,
-
rioFileTell,
-
rioFileFlush,
-
NULL, /* update_checksum */
-
0, /* current checksum */
-
0, /* bytes read or written */
-
0, /* read/write chunk size */
-
{ { NULL, 0 } } /* union for io-specific vars */
-
};
-
-
/* Initialize r as a stdio target on fp, with autosync disabled and no bytes
 * counted as written since the last sync. */
void rioInitWithFile(rio *r, FILE *fp) {
    *r = rioFileIO;            /* Install the file callbacks and defaults. */
    r->io.file.autosync = 0;
    r->io.file.buffered = 0;
    r->io.file.fp = fp;
}
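3. 文件描述符数组(fdset)。这里需要注意写操作:写入的数据会先追加到 fdset 自身的缓冲区 buf 中,只有在以下两种情况下才会真正写到各个文件描述符:
- 缓冲区长度超过 PROTO_IOBUF_LEN;
- 以 buf 为 NULL、len 为 0 的方式调用(即执行 flush)。

真正写出时,数据按每块最多 1024 字节切分,每一块依次写到所有尚未出错的 fd。这样在数据量较大时,内核可以在后台并行地向各个 TCP socket 发送数据。只要至少有一个 fd 写入成功,函数就返回成功。代码如下: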
-
/* Returns 1 or 0 for success/failure.
-
* The function returns success as long as we are able to correctly write
-
* to at least one file descriptor.
-
*
-
* When buf is NULL and len is 0, the function performs a flush operation
-
* if there is some pending buffer, so this function is also used in order
-
* to implement rioFdsetFlush(). */
-
static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
-
ssize_t retval;
-
int j;
-
unsigned char *p = (unsigned char*) buf;
-
int doflush = (buf == NULL && len == 0);
-
-
/* To start we always append to our buffer. If it gets larger than
-
* a given size, we actually write to the sockets. */
-
if (len) {
-
r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len);
-
len = 0; /* Prevent entering the while below if we don't flush. */
-
if (sdslen(r->io.fdset.buf) > PROTO_IOBUF_LEN) doflush = 1;
-
}
-
-
if (doflush) {
-
p = (unsigned char*) r->io.fdset.buf;
-
len = sdslen(r->io.fdset.buf);
-
}
-
-
/* Write in little chunchs so that when there are big writes we
-
* parallelize while the kernel is sending data in background to
-
* the TCP socket. */
-
while(len) {
-
size_t count = len < 1024 ? len : 1024;
-
int broken = 0;
-
for (j = 0; j < r->io.fdset.numfds; j++) {
-
if (r->io.fdset.state[j] != 0) {
-
/* Skip FDs alraedy in error. */
-
broken++;
-
continue;
-
}
-
-
/* Make sure to write 'count' bytes to the socket regardless
-
* of short writes. */
-
size_t nwritten = 0;
-
while(nwritten != count) {
-
retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten);
-
if (retval <= 0) {
-
/* With blocking sockets, which is the sole user of this
-
* rio target, EWOULDBLOCK is returned only because of
-
* the SO_SNDTIMEO socket option, so we translate the error
-
* into one more recognizable by the user. */
-
if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
-
break;
-
}
-
nwritten += retval;
-
}
-
-
if (nwritten != count) {
-
/* Mark this FD as broken. */
-
r->io.fdset.state[j] = errno;
-
if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO;
-
}
-
}
-
if (broken == r->io.fdset.numfds) return 0; /* All the FDs in error. */
-
p += count;
-
阅读(11678) | 评论(0) | 转发(0) |