Chinaunix首页 | 论坛 | 博客
  • 博客访问: 75955
  • 博文数量: 15
  • 博客积分: 1410
  • 博客等级: 上尉
  • 技术积分: 265
  • 用 户 组: 普通用户
  • 注册时间: 2008-09-21 18:15
文章分类

全部博文(15)

文章存档

2009年(8)

2008年(7)

我的朋友

分类: LINUX

2008-10-02 16:12:39

管道机制是所有unix都愿意提供的一种进程通信机制。管道是进程之间一个单向的数据流,在POSIX标准中只定义了半双工管道,既每个进程在使用一个文件描述符之前必须把另一个描述符关闭。在Linux中实现有所不同:每个管道的文件描述符仍然是单向的,但是在使用一个描述符时不必关闭另一个描述符。
管道操作主要涉及两个数据结构,pipe_inode_info和pipe_buffer.
在2.6中管道的缓冲区已经增加到16个,每个缓冲区对应一个页面(在高端内存中),这个改变大大增强了向管道写大量数据的用户态应用的性能。
1.管道的创建:
 
pipe系统调用有sys_pipe处理:
asmlinkage int sys_pipe(unsigned long __user * fildes)
{
 int fd[2];
 int error;
 error = do_pipe(fd);
 if (!error) {
   //将描述符拷贝到用户空间
  if (copy_to_user(fildes, fd, 2*sizeof(int)))
   error = -EFAULT;
 }
 return error;
}
它会调用do_pipe:
int do_pipe(int *fd)
{
       struct file *fw, *fr;
       int error;
       int fdw, fdr;
 
       fw = create_write_pipe();
       if (IS_ERR(fw))
              return PTR_ERR(fw);
       fr = create_read_pipe(fw);
       error = PTR_ERR(fr);
       if (IS_ERR(fr))
              goto err_write_pipe;
 
       error = get_unused_fd();
       if (error < 0)
              goto err_read_pipe;
       fdr = error;
 
       error = get_unused_fd();
       if (error < 0)
              goto err_fdr;
       fdw = error;
 
       error = audit_fd_pair(fdr, fdw);
       if (error < 0)
              goto err_fdw;
 
       fd_install(fdr, fr);
       fd_install(fdw, fw);
       fd[0] = fdr;
       fd[1] = fdw;
 
       return 0;
 
 err_fdw:
       put_unused_fd(fdw);
 err_fdr:
       put_unused_fd(fdr);
 err_read_pipe:
       dput(fr->f_dentry);
       mntput(fr->f_vfsmnt);
       put_filp(fr);
 err_write_pipe:
       free_write_pipe(fw);
       return error;
}
由于创建的过程比较简单,所以代码就不在这讲解了,主要就是用两个file对象引用同一个inode
 
2.管道的读操作:
先看一下读操作的过程:
 
对应的函数是pipe_read:
 
static ssize_t
pipe_read(struct kiocb *iocb, const struct iovec *_iov,
    unsigned long nr_segs, loff_t pos)
{
 struct file *filp = iocb->ki_filp;
 struct inode *inode = filp->f_path.dentry->d_inode;
 struct pipe_inode_info *pipe;
 int do_wakeup;
 ssize_t ret;
 struct iovec *iov = (struct iovec *)_iov;
 size_t total_len;
 //待读数据长度
 total_len = iov_length(iov, nr_segs);
 /* Null read succeeds. */
 if (unlikely(total_len == 0))
  return 0;
 do_wakeup = 0;
 ret = 0;
 mutex_lock(&inode->i_mutex);
 pipe = inode->i_pipe;
 for (;;) {
  // 获取管道大小,如果待读数据包含的缓冲区个数为0,证明管道中并没有待读数据
  int bufs = pipe->nrbufs;
  if (bufs) {
   //管道大小>0
   //获得第一个数据所在的管道索引
   int curbuf = pipe->curbuf;
   //第一个数据所在的缓冲区,是16个中的一个
   struct pipe_buffer *buf = pipe->bufs + curbuf;
   const struct pipe_buf_operations *ops = buf->ops;
   void *addr;
   //缓冲中有效数据的长度
   size_t chars = buf->len;
   int error, atomic;
   //如果有效数据长度>待读长度
   if (chars > total_len)
    chars = total_len;
   error = ops->pin(pipe, buf);
   if (error) {
    if (!ret)
     error = ret;
    break;
   }
   atomic = !iov_fault_in_pages_write(iov, chars);
redo:
 
   addr = ops->map(pipe, buf, atomic);
   
   //具体拷贝过程
   error = pipe_iov_copy_to_user(iov, addr + buf->offset, chars, atomic);
   
   ops->unmap(pipe, buf, addr);
   
   if (unlikely(error)) {
    /*
     * Just retry with the slow path if we failed.
     */
    if (atomic) {
     atomic = 0;
     goto redo;
    }
    if (!ret)
     ret = error;
    break;
   }
   ret += chars;
   buf->offset += chars;
   buf->len -= chars;
   if (!buf->len) {
    //如果这个缓冲空了,则读下一个缓冲
    buf->ops = NULL;
    //如果这个缓冲空了,释放对应的页
    ops->release(pipe, buf);
    //到下一个缓冲区
    curbuf = (curbuf + 1) & (PIPE_BUFFERS-1);
    pipe->curbuf = curbuf;
    //有效数据缓冲区个数减一
    pipe->nrbufs = --bufs;
    do_wakeup = 1;
   }
   total_len -= chars;
   if (!total_len)
    //如果待读数据都读出
    break; /* common path: read succeeded */
   //如果total_len不为0,证明并没有获得全部数据
  }
  if (bufs) /* More to do? */
   //其他缓冲区还有有效数据,继续循环读下一个缓冲区
   continue;
  if (!pipe->writers)
   break;
  //有没有等待的写者
  if (!pipe->waiting_writers) {
   /* syscall merging: Usually we must not sleep
    * if O_NONBLOCK is set, or if we got some data.
    * But if a writer sleeps in kernel space, then
    * we can wait for that data without violating POSIX.
    */
   
   //没有等待的写者
   
   if (ret)
    //已经读了一部分数据,返回读出的数据长度
    break;
   //没有读过数据
   
   if (filp->f_flags & O_NONBLOCK) {
    //不是阻塞的
    ret = -EAGAIN;
    break;
   }
   //且可以阻塞
  }
  //有写者等待
  
  if (signal_pending(current)) {
   if (!ret)
    ret = -ERESTARTSYS;
   break;
  }
  if (do_wakeup) {
   wake_up_interruptible_sync(&pipe->wait);
    kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
  }
  //读者等待
  pipe_wait(pipe);
 }
 mutex_unlock(&inode->i_mutex);
 /* Signal writers asynchronously that there is more room. */
 if (do_wakeup) {
  wake_up_interruptible(&pipe->wait);
  kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
 }
 if (ret > 0)
  file_accessed(filp);
 return ret;
}
 
结合上图应该不难看明白
 
3.管道的写:
过程如下:
static ssize_t
pipe_write(struct kiocb *iocb, const struct iovec *_iov,
     unsigned long nr_segs, loff_t ppos)
{
 struct file *filp = iocb->ki_filp;
 struct inode *inode = filp->f_path.dentry->d_inode;
 struct pipe_inode_info *pipe;
 ssize_t ret;
 int do_wakeup;
 struct iovec *iov = (struct iovec *)_iov;
 size_t total_len;
 ssize_t chars;
 //要写入的长度
 total_len = iov_length(iov, nr_segs);
 /* Null write succeeds. */
 if (unlikely(total_len == 0))
  return 0;
 do_wakeup = 0;
 ret = 0;
 mutex_lock(&inode->i_mutex);
 pipe = inode->i_pipe;
 if (!pipe->readers) {
  //如果没有读进程,就向当前进程发送一个SIGPIPE信号,这就是"Broken pipe(损坏的管道)"消息
  send_sig(SIGPIPE, current, 0);
  ret = -EPIPE;
  goto out;
 }
 /* We try to merge small writes */
 //写的数据的长度不能超过一个页面
 chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */
 if (pipe->nrbufs && chars != 0) {
  //第一个有效数据的管道号+包含有效数据的管道数目-1=最后一个非空管道缓冲区
  int lastbuf = (pipe->curbuf + pipe->nrbufs - 1) &
       (PIPE_BUFFERS-1);
  struct pipe_buffer *buf = pipe->bufs + lastbuf;
  const struct pipe_buf_operations *ops = buf->ops;
  int offset = buf->offset + buf->len;
  if (ops->can_merge && offset + chars <= PAGE_SIZE) {
   //如果最后一个非空缓冲区中有足够的空间存放要写的数据
   
   int error, atomic = 1;
   void *addr;
   error = ops->pin(pipe, buf);
   if (error)
    goto out;
   iov_fault_in_pages_read(iov, chars);
redo1:
   //将数据拷贝到缓冲区中
   addr = ops->map(pipe, buf, atomic);
   error = pipe_iov_copy_from_user(offset + addr, iov,
       chars, atomic);
   ops->unmap(pipe, buf, addr);
   ret = error;
   do_wakeup = 1;
   if (error) {
    if (atomic) {
     atomic = 0;
     goto redo1;
    }
    goto out;
   }
   buf->len += chars;
   total_len -= chars;
   ret = chars;
   if (!total_len)
    goto out;
   //如果还有数据没有写入
  }
 }

 //向一个空缓冲区写
 for (;;) {
  int bufs;
  if (!pipe->readers) {
   send_sig(SIGPIPE, current, 0);
   if (!ret)
    ret = -EPIPE;
   break;
  }
  bufs = pipe->nrbufs;
  if (bufs < PIPE_BUFFERS) {
   //第一个空缓冲区
   int newbuf = (pipe->curbuf + bufs) & (PIPE_BUFFERS-1);
   struct pipe_buffer *buf = pipe->bufs + newbuf;
   struct page *page = pipe->tmp_page;
   char *src;
   int error, atomic = 1;
   if (!page) {
    page = alloc_page(GFP_HIGHUSER);
    if (unlikely(!page)) {
     ret = ret ? : -ENOMEM;
     break;
    }
    pipe->tmp_page = page;
   }
   /* Always wake up, even if the copy fails. Otherwise
    * we lock up (O_NONBLOCK-)readers that sleep due to
    * syscall merging.
    * FIXME! Is this really true?
    */
   do_wakeup = 1;
   chars = PAGE_SIZE;
   //写的数据最大为一个页
   if (chars > total_len)
    chars = total_len;
   iov_fault_in_pages_read(iov, chars);
redo2:
   //永久映射
   if (atomic)
    src = kmap_atomic(page, KM_USER0);
   else
    
    src = kmap(page);
   //将数据拷贝到缓冲区
   error = pipe_iov_copy_from_user(src, iov, chars,
       atomic);
   //释放映射
   if (atomic)
    kunmap_atomic(src, KM_USER0);
   else
    kunmap(page);
   if (unlikely(error)) {
    if (atomic) {
     atomic = 0;
     goto redo2;
    }
    if (!ret)
     ret = error;
    break;
   }
   ret += chars;
   /* Insert it into the buffer array */
   buf->page = page;
   buf->ops = &anon_pipe_buf_ops;
   buf->offset = 0;
   buf->len = chars;
   pipe->nrbufs = ++bufs;
   pipe->tmp_page = NULL;
   total_len -= chars;
   if (!total_len)
    break;
  }
  if (bufs < PIPE_BUFFERS)
   //如果还用空缓冲区并且还有数据进行传递,进行下一次循环
   continue;
  //运行到这说明没有空缓冲区了
  if (filp->f_flags & O_NONBLOCK) {
   if (!ret)
    ret = -EAGAIN;
   break;
  }
  if (signal_pending(current)) {
   if (!ret)
    ret = -ERESTARTSYS;
   break;
  }
  if (do_wakeup) {
   wake_up_interruptible_sync(&pipe->wait);
   kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
   do_wakeup = 0;
  }
  //写者等待
  pipe->waiting_writers++;
  pipe_wait(pipe);
  pipe->waiting_writers--;
 }
out:
 mutex_unlock(&inode->i_mutex);
 if (do_wakeup) {
  wake_up_interruptible(&pipe->wait);
  kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
 }
 if (ret > 0)
  file_update_time(filp);
 return ret;
}

阅读(1822) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~