Linux 管道的实现分析
1, 数据结构
// Excerpt of the inode structure; only the pipe-related member is listed
// (the "..." lines stand for members omitted from this listing).
struct inode {
...
struct pipe_inode_info *i_pipe; // pipe state; the pipe read/write paths fetch it as inode->i_pipe
...
};
// Number of buffers per pipe. The read/write code wraps ring indices with
// "& (PIPE_BUFFERS-1)", so this value has to remain a power of two.
#define PIPE_BUFFERS (16)
/*
 * Descriptor of a single pipe buffer.
 */
struct pipe_buffer {
	struct page *page;			/* descriptor of the page frame backing this buffer */
	unsigned int offset;			/* position of the valid data inside the page frame */
	unsigned int len;			/* number of valid data bytes starting at offset */
	struct pipe_buf_operations *ops;	/* method table of this buffer (map/unmap/release, ...) */
};
/*
 * Per-pipe state; an inode refers to it through its i_pipe member.
 */
struct pipe_inode_info {
	wait_queue_head_t wait;			/* wait queue of the pipe */
	unsigned int nrbufs;			/* number of buffers that hold data waiting to be read */
	unsigned int curbuf;			/* index of the first buffer that holds data waiting to be read */
	struct pipe_buffer bufs[PIPE_BUFFERS];	/* array of pipe buffer descriptors, used as a ring */
	struct page *tmp_page;			/* cached page frame; pipe_writev allocates it on demand and
						 * clears the pointer once the page is placed into a buffer */
	unsigned int start;			/* read position in the current pipe buffer (per the original
						 * note; not referenced by the code shown in this listing) */
	unsigned int readers;			/* flag for, or number of, reading processes */
	unsigned int writers;			/* flag for, or number of, writing processes */
	unsigned int waiting_writers;		/* number of writers sleeping in the wait queue */
	unsigned int r_counter;			/* like readers, for FIFO open handling -- NOTE(review): the
						 * original note repeated the w_counter text; presumably this
						 * one concerns waiting for a FIFO reader, confirm */
	unsigned int w_counter;			/* like writers, used when waiting for a process that writes
						 * to the FIFO */
	struct fasync_struct *fasync_readers;	/* asynchronous I/O notification via signals (readers) */
	struct fasync_struct *fasync_writers;	/* asynchronous I/O notification via signals (writers) */
};
2, 管道的实现
管道可以看作是打开的文件,但在已安装的文件系统中没有相应的映像。
管道是作为一组VFS对象来实现的,因此没有对应的磁盘映像。
而在2.6中把这些VFS对象组织成pipefs特殊文件系统以加速它们的处理。
但这种文件系统在系统目录树中没有安装点,用户根本看不到它。
2.1 pipefs 文件系统的安装
/* For background on mounting, see the chapter on filesystem implementation. */
/*
 * get_sb method of pipefs. It hands the work to get_sb_pseudo(), passing the
 * "pipe:" name and PIPEFS_MAGIC; flags, dev_name and data are not used.
 */
static struct super_block *pipefs_get_sb(struct file_system_type *fs_type,
	int flags, const char *dev_name, void *data)
{
	struct super_block *sb;

	sb = get_sb_pseudo(fs_type, "pipe:", NULL, PIPEFS_MAGIC);
	return sb;
}
// Filesystem type descriptor for pipefs: named "pipefs", superblock obtained
// through pipefs_get_sb and torn down through kill_anon_super.
static struct file_system_type pipe_fs_type = {
.name = "pipefs",
.get_sb = pipefs_get_sb,
.kill_sb = kill_anon_super,
};
/*
 * Register pipefs and mount it internally, storing the mount in pipe_mnt.
 * Returns 0 on success, otherwise the error from register_filesystem() or
 * the error encoded in the kern_mount() result (in which case the
 * registration is rolled back).
 */
static int __init init_pipe_fs(void)
{
	int rc;

	rc = register_filesystem(&pipe_fs_type);
	if (rc)
		return rc;

	pipe_mnt = kern_mount(&pipe_fs_type);
	if (!IS_ERR(pipe_mnt))
		return 0;

	/* Mount failed: undo the registration and report the error. */
	rc = PTR_ERR(pipe_mnt);
	unregister_filesystem(&pipe_fs_type);
	return rc;
}
// Teardown counterpart of init_pipe_fs: unregisters the pipefs filesystem
// type, then releases the internal mount held in pipe_mnt via mntput().
static void __exit exit_pipe_fs(void)
{
unregister_filesystem(&pipe_fs_type);
mntput(pipe_mnt);
}
2.2 管道创建的实现
对于每个管道来说,内核都创建一个inode结点对象,两个file对象,一个用于读,一个用于写。
// Create a pipe: two file objects (read end and write end) that share one
// pipe inode and one dentry, plus two descriptor numbers.
// On success returns 0 and stores the read descriptor in fd[0] and the write
// descriptor in fd[1]. On failure returns a negative errno and releases
// everything acquired so far through the label ladder at the bottom, in
// reverse order of acquisition.
int do_pipe(int *fd)
{
struct qstr this;
char name[32];
struct dentry *dentry;
struct inode * inode;
struct file *f1, *f2;
int error;
int i,j;
// -ENFILE is what gets returned if either file object or the inode cannot be obtained
error = -ENFILE;
f1 = get_empty_filp(); // file object for the read end
if (!f1)
goto no_files;
f2 = get_empty_filp(); // file object for the write end
if (!f2)
goto close_f1;
inode = get_pipe_inode(); // pipe inode shared by both ends
if (!inode)
goto close_f12;
error = get_unused_fd(); // reserve the first descriptor number (negative value = error)
if (error < 0)
goto close_f12_inode;
i = error;
error = get_unused_fd(); // reserve the second descriptor number
if (error < 0)
goto close_f12_inode_i;
j = error;
error = -ENOMEM;
sprintf(name, "[%lu]", inode->i_ino); // dentry name: the inode number in square brackets
this.name = name;
this.len = strlen(name);
this.hash = inode->i_ino; /* will go */
dentry = d_alloc(pipe_mnt->mnt_sb->s_root, &this); // allocate a dentry under the pipefs root
if (!dentry)
goto close_f12_inode_i_j;
dentry->d_op = &pipefs_dentry_operations;
// attach the inode to the dentry
d_add(dentry, inode);
// mntget() is applied twice -- presumably one mount reference per file object
f1->f_vfsmnt = f2->f_vfsmnt = mntget(mntget(pipe_mnt));
f1->f_dentry = f2->f_dentry = dget(dentry);
f1->f_mapping = f2->f_mapping = inode->i_mapping;
/* read file */ // set up the file object of the read end
f1->f_pos = f2->f_pos = 0; // both file positions start at 0
f1->f_flags = O_RDONLY; // read-only
f1->f_op = &read_pipe_fops; // operations table used for the read end
f1->f_mode = FMODE_READ; // read mode
f1->f_version = 0;
/* write file */
f2->f_flags = O_WRONLY; // write-only
f2->f_op = &write_pipe_fops; // operations table used for the write end
f2->f_mode = FMODE_WRITE; // write mode
f2->f_version = 0;
fd_install(i, f1); // bind descriptor i to the read-end file object
fd_install(j, f2); // bind descriptor j to the write-end file object
fd[0] = i; // hand the read descriptor back through the caller's array
fd[1] = j; // hand the write descriptor back through the caller's array
return 0;
// error unwinding: each label releases one more resource than the label below it
close_f12_inode_i_j:
put_unused_fd(j);
close_f12_inode_i:
put_unused_fd(i);
close_f12_inode:
free_pipe_info(inode);
iput(inode);
close_f12:
put_filp(f2);
close_f1:
put_filp(f1);
no_files:
return error;
}
//获取管道的inode结构
static struct inode * get_pipe_inode(void)
{
struct inode *inode = new_inode(pipe_mnt->mnt_sb);
if (!inode)
goto fail_inode;
if(!pipe_new(inode))
goto fail_iput;
PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 1;
inode->i_fop = &rdwr_pipe_fops;
/*
* Mark the inode dirty from the very beginning,
* that way it will never be moved to the dirty
* list because "mark_inode_dirty()" will think
* that it already _is_ on the dirty list.
*/
inode->i_state = I_DIRTY;
inode->i_mode = S_IFIFO | S_IRUSR | S_IWUSR;
inode->i_uid = current->fsuid;
inode->i_gid = current->fsgid;
inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
inode->i_blksize = PAGE_SIZE;
return inode;
fail_iput:
iput(inode);
fail_inode:
return NULL;
}
*管道的读操作的实现
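管道读操作的规则如下(可由下面的pipe_readv代码归纳得到):
1) 请求读取的长度为0时,直接返回0。
2) 管道缓冲区中有数据时,尽可能多地把数据复制到用户缓冲区,并返回已读取的字节数。
3) 管道为空且没有写者(PIPE_WRITERS为0)时,返回已读取的字节数(没有读到数据则返回0)。
4) 管道为空、有写者但没有写进程在睡眠等待时:若已读到数据则返回该字节数;否则若设置了O_NONBLOCK则返回-EAGAIN。
5) 其余情况下调用pipe_wait睡眠等待;若睡眠前发现有信号待处理,则返回已读取的字节数,没有读到数据时返回-ERESTARTSYS。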
// Vectored read from a pipe.
// Copies data from the pipe's buffers into the caller's iovec array until the
// request is satisfied, the pipe is empty with no writer left, or an error or
// signal stops it. Returns the number of bytes read, 0 for an empty request
// or an empty pipe without writers, or a negative errno (-EFAULT, -EAGAIN,
// -ERESTARTSYS) when nothing was read. ppos is not used.
static ssize_t
pipe_readv(struct file *filp, const struct iovec *_iov,
unsigned long nr_segs, loff_t *ppos)
{
struct inode *inode = filp->f_dentry->d_inode; // inode behind this file object
struct pipe_inode_info *info;
int do_wakeup;
ssize_t ret;
struct iovec *iov = (struct iovec *)_iov; // non-const alias of the destination iovec array
size_t total_len;
total_len = iov_length(iov, nr_segs);
/* Null read succeeds. */
if (unlikely(total_len == 0))
return 0;
do_wakeup = 0;
ret = 0;
down(PIPE_SEM(*inode)); // take the pipe semaphore; released after the loop and temporarily inside pipe_wait()
info = inode->i_pipe; // pipe state attached to the inode
for (;;) {
int bufs = info->nrbufs; // number of buffers currently holding unread data
if (bufs) { // at least one buffer has data
int curbuf = info->curbuf; // index of the first buffer with unread data
struct pipe_buffer *buf = info->bufs + curbuf; // that slot in the PIPE_BUFFERS-entry ring
struct pipe_buf_operations *ops = buf->ops; // method table of this buffer
void *addr;
size_t chars = buf->len;
int error;
// never copy more than the caller still wants
if (chars > total_len)
chars = total_len;
// map the buffer to obtain an address to copy from
addr = ops->map(filp, info, buf);
// copy chars bytes, starting at the buffer's current offset, to the caller
error = pipe_iov_copy_to_user(iov, addr + buf->offset, chars);
// undo the mapping
ops->unmap(info, buf);
if (unlikely(error)) {
if (!ret) ret = -EFAULT; // report -EFAULT only when nothing had been read before the failure
break;
}
// account for the bytes consumed from this buffer
ret += chars;
buf->offset += chars;
buf->len -= chars;
// buffer drained: release it, advance the ring and remember to wake writers
if (!buf->len) {
buf->ops = NULL;
ops->release(info, buf);
curbuf = (curbuf + 1) & (PIPE_BUFFERS-1);
info->curbuf = curbuf;
info->nrbufs = --bufs;
do_wakeup = 1;
}
total_len -= chars; // bytes still wanted
if (!total_len) // request fully satisfied
break; /* common path: read succeeded */
}
if (bufs) /* More to do? */
continue;
// bufs is 0 here: no buffer holds data, so decide whether to stop or to sleep
if (!PIPE_WRITERS(*inode)) // no writer left: stop and return what has been read so far
break;
if (!PIPE_WAITING_WRITERS(*inode)) { // no writer is counted as sleeping in pipe_writev
/* syscall merging: Usually we must not sleep
* if O_NONBLOCK is set, or if we got some data.
* But if a writer sleeps in kernel space, then
* we can wait for that data without violating POSIX.
*/
if (ret)
break;
if (filp->f_flags & O_NONBLOCK) { // nothing read and the caller asked not to block
ret = -EAGAIN;
break;
}
}
if (signal_pending(current)) { // a signal is pending for the current process: do not sleep
if (!ret) ret = -ERESTARTSYS;
break;
}
// before sleeping, let writers know that buffers were freed
if (do_wakeup) {
wake_up_interruptible_sync(PIPE_WAIT(*inode));
kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);
}
pipe_wait(inode);
}
up(PIPE_SEM(*inode));
/* Signal writers asynchronously that there is more room. */
if (do_wakeup) {
wake_up_interruptible(PIPE_WAIT(*inode));
kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);
}
if (ret > 0)
file_accessed(filp); // data was read: record the access on the file
return ret;
}
/*
 * Plain read() entry for a pipe: wraps the user buffer in a one-element
 * iovec and forwards to pipe_readv(), returning its result unchanged.
 */
static ssize_t
pipe_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
{
	struct iovec single_seg = {
		.iov_len = count,
		.iov_base = buf,
	};

	return pipe_readv(filp, &single_seg, 1, ppos);
}
/* Drop the inode semaphore and wait for a pipe event, atomically */
// The caller must hold the pipe semaphore; it is held again when this returns.
void pipe_wait(struct inode * inode)
{
DEFINE_WAIT(wait);
// queue the current task on the pipe's wait queue in interruptible state
// *before* the semaphore is dropped
prepare_to_wait(PIPE_WAIT(*inode), &wait, TASK_INTERRUPTIBLE);
// release the pipe semaphore so the other side can make progress
up(PIPE_SEM(*inode));
schedule();
// woken up: take the task off the wait queue
finish_wait(PIPE_WAIT(*inode), &wait);
// re-acquire the pipe semaphore before returning to the caller
down(PIPE_SEM(*inode));
}
*管道的写操作
在管道创建函数do_pipe中可以看到,管道的写操作结构是write_pipe_fops,
该操作列表中的写操作是调用pipe_write实现的。
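POSIX标准定义了写操作的一些规则,下面的pipe_writev代码体现的行为是:
1) 写入长度为0时,直接返回0。
2) 管道没有读者时,向当前进程发送SIGPIPE信号并返回-EPIPE(若循环中此前已写入数据,则返回已写入的字节数)。
3) 写入长度小于PAGE_SIZE、最后一个含数据的缓冲区允许合并且剩余空间足够时,数据被追加到该缓冲区中。
4) 否则每次最多向一个空闲缓冲区写入PAGE_SIZE字节,直到数据写完。
5) 缓冲区全部占满时:若设置了O_NONBLOCK,则返回已写入的字节数,未写入数据时返回-EAGAIN;若有信号待处理,则返回已写入的字节数,未写入数据时返回-ERESTARTSYS;否则睡眠等待读者腾出空间。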
/*
 * Vectored write to a pipe.
 *
 * Copies the caller's iovec data into the pipe. A small write is first
 * tried as an append to the last buffer that already holds data; otherwise
 * the data is written one page at a time into free buffers, sleeping when
 * all PIPE_BUFFERS buffers are in use (unless O_NONBLOCK is set).
 *
 * Returns the number of bytes written, 0 for an empty request, or a
 * negative errno (-EPIPE, -ENOMEM, -EFAULT, -EAGAIN, -ERESTARTSYS) when
 * nothing was written. SIGPIPE is sent to the current process when the
 * pipe has no reader. ppos is not used.
 */
static ssize_t
pipe_writev(struct file *filp, const struct iovec *_iov,
	    unsigned long nr_segs, loff_t *ppos)
{
	struct inode *inode = filp->f_dentry->d_inode;
	struct pipe_inode_info *info;
	ssize_t ret;
	int do_wakeup;
	struct iovec *iov = (struct iovec *)_iov;
	size_t total_len;

	total_len = iov_length(iov, nr_segs);
	/* Null write succeeds. */
	if (unlikely(total_len == 0))
		return 0;

	do_wakeup = 0;
	ret = 0;
	down(PIPE_SEM(*inode));
	info = inode->i_pipe;

	/* Writing to a pipe without any reader is pointless: raise SIGPIPE. */
	if (!PIPE_READERS(*inode)) {
		send_sig(SIGPIPE, current, 0);
		ret = -EPIPE;
		goto out;
	}

	/* We try to merge small writes */
	/* Only when some buffer already holds data and the write is below a page. */
	if (info->nrbufs && total_len < PAGE_SIZE) {
		/* Last buffer that holds data: first data buffer + count - 1, wrapped. */
		int lastbuf = (info->curbuf + info->nrbufs - 1) & (PIPE_BUFFERS-1);
		struct pipe_buffer *buf = info->bufs + lastbuf;
		struct pipe_buf_operations *ops = buf->ops;
		int offset = buf->offset + buf->len;

		/* Append only if the buffer allows merging and the data fits in its page. */
		if (ops->can_merge && offset + total_len <= PAGE_SIZE) {
			void *addr = ops->map(filp, info, buf);
			/* Copy the data in right behind the existing contents. */
			int error = pipe_iov_copy_from_user(offset + addr, iov, total_len);

			ops->unmap(info, buf);
			ret = error;
			do_wakeup = 1;
			if (error)
				goto out;
			/* Account for the appended bytes. */
			buf->len += total_len;
			ret = total_len;
			goto out;
		}
	}

	/*
	 * General path: the pipe is empty, the write is a page or larger,
	 * or merging was not possible.
	 */
	for (;;) {
		int bufs;

		/* Readers may have gone away while we slept. */
		if (!PIPE_READERS(*inode)) {
			send_sig(SIGPIPE, current, 0);
			if (!ret)
				ret = -EPIPE;
			break;
		}

		/* Number of buffers currently holding data. */
		bufs = info->nrbufs;
		if (bufs < PIPE_BUFFERS) {
			ssize_t chars;
			/* First free slot: first data buffer + count, wrapped. */
			int newbuf = (info->curbuf + bufs) & (PIPE_BUFFERS-1);
			struct pipe_buffer *buf = info->bufs + newbuf;
			struct page *page = info->tmp_page;
			int error;

			/* No cached page available: allocate one. */
			if (!page) {
				page = alloc_page(GFP_HIGHUSER);
				if (unlikely(!page)) {
					if (!ret)
						ret = -ENOMEM;
					break;
				}
				info->tmp_page = page;
			}
			/* Always wakeup, even if the copy fails. Otherwise
			 * we lock up (O_NONBLOCK-)readers that sleep due to
			 * syscall merging.
			 * FIXME! Is this really true?
			 */
			do_wakeup = 1;
			chars = PAGE_SIZE;
			if (chars > total_len)
				chars = total_len;

			/* Copy chars bytes from the caller into the page. */
			error = pipe_iov_copy_from_user(kmap(page), iov, chars);
			kunmap(page);
			if (unlikely(error)) {
				if (!ret)
					ret = -EFAULT;
				break;
			}
			ret += chars;

			/* Insert it into the buffer array */
			/* Fill in the slot, bump nrbufs, and give up the cached page. */
			buf->page = page;
			buf->ops = &anon_pipe_buf_ops;
			buf->offset = 0;
			buf->len = chars;
			info->nrbufs = ++bufs;
			info->tmp_page = NULL;

			/* Stop once everything is written; otherwise keep going. */
			total_len -= chars;
			if (!total_len)
				break;
		}

		/* Free buffers remain: write the next chunk. */
		if (bufs < PIPE_BUFFERS)
			continue;

		/*
		 * All buffers are in use. A non-blocking writer returns what it
		 * has written so far, or -EAGAIN if that is nothing.
		 */
		if (filp->f_flags & O_NONBLOCK) {
			if (!ret)
				ret = -EAGAIN;
			break;
		}
		if (signal_pending(current)) {
			if (!ret)
				ret = -ERESTARTSYS;
			break;
		}
		/* Let readers know about the new data before going to sleep. */
		if (do_wakeup) {
			wake_up_interruptible_sync(PIPE_WAIT(*inode));
			kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
			do_wakeup = 0;
		}
		/* Count ourselves as a sleeping writer for the duration of the wait. */
		PIPE_WAITING_WRITERS(*inode)++;
		pipe_wait(inode);
		PIPE_WAITING_WRITERS(*inode)--;
	}
out:
	up(PIPE_SEM(*inode));
	if (do_wakeup) {
		wake_up_interruptible(PIPE_WAIT(*inode));
		kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
	}
	if (ret > 0)
		inode_update_time(inode, 1);	/* mtime and ctime */
	return ret;
}
|