Chinaunix首页 | 论坛 | 博客
  • 博客访问: 356897
  • 博文数量: 46
  • 博客积分: 4936
  • 博客等级: 上校
  • 技术积分: 575
  • 用 户 组: 普通用户
  • 注册时间: 2008-12-25 20:14
文章分类

全部博文(46)

文章存档

2012年(4)

2011年(1)

2010年(23)

2009年(18)

分类: LINUX

2010-05-18 21:13:39

文件: cpppipe.rar
大小: 10KB
下载: 下载
该文章可以任意转载,但请保留出处:http://rainfish.cublog.cn/

by rainfish

        

头两天在cu上看到有人在问关于pipe的问题,发现自己也是一样,不是很清楚,于是google一些相关的资料,有一些了理解,并写了一个封装有名管道的类,拿出来和朋友们共同探讨一下,有问题或者您的看法请与我联系, EMAIL,MSN:lizp.net#gmail.com(#@代替)

         Cpppipe分为两个类,对有名管道读操作类CPipeReader,对有名管道写操作类CPipeWriter。其中CPipeReader的读操作使用了epoll 当有数据可读时调用相应的回调函数,性能很高。

class CPipeWriter

{

public:

         CPipeWriter();

         ~CPipeWriter();

 

         // 0 success -1 mkfifo failed -2 open failed

//第一个参数就是有名管道的名字,第二个参数暂不使用

         int open(const char *fifo_name, int fifo_mode=0);

         int close();

//向管道写数据

         int write(const char* data_ptr, int data_len);

protected:

private:

         int _pipe_fd;

};

 

 

 

class CPipeReader

{

public:

         //ptr is null and ptr_len ==0: the writing end closed

         //回调函数

         typedef int (*FunReadPipeCallBack)(const void* owner_ptr, const char* ptr, int ptr_len);

 

         CPipeReader();

         ~CPipeReader();

 

         // 0 success  -2 open failed

         //打开有名管道,

         int open(const char *fifo_name, int fifo_mode=0);

         int close();

         //设置回调函数,使用方法见demo

         void set_read_pipe_fun(void* owner_ptr, FunReadPipeCallBack funReadPipe);

 

protected:

//该线程执行epoll操作

         static void* pipe_poll(void*);

         void pipe_poll_fun();

         pthread_t _pipe_poll_t;

 

         // >0 success 0 the writing end closed or didn't open -1 no data

         //对管道的一次读操作

         int read_one();

private:

         int _pipe_fd;

         void* _owner_ptr;

         FunReadPipeCallBack _funReadPipe;

};

 

 

源代码:

//.h
/***********************************************************************
 ** Copyright (c)2010, lizp.net
 ** All rights reserved.
 **
 ** File name  : cpppipe.h
 ** Author     : lizp ()
 ** Date       : 2010-5-18 下午 05:45:41
 ** Comments   :  使用管道、epoll实现进程间通信,对管道的操作已经封装,只有简单的api操作,
 尤其是对于管道的读操作,只需把函数注册进去即可,有数据可读时会回调该函数
 ***********************************************************************/

#ifndef _CPPPIPE_H_2010_05_18_
#define _CPPPIPE_H_2010_05_18_

#include
#defile INFO_PRT printf

class CPipeWriter
{
public:
 CPipeWriter();
 ~CPipeWriter();

 // 0 success -1 mkfifo failed -2 open failed
 int open(const char *fifo_name, int fifo_mode=0);
 int close();
 int write(const char* data_ptr, int data_len);
protected:
private:
 int _pipe_fd;
};

class CPipeReader
{
public:
 //ptr is null and ptr_len ==0: the writing end closed
 typedef int (*FunReadPipeCallBack)(const void* owner_ptr, const char* ptr, int ptr_len);

 CPipeReader();
 ~CPipeReader();

 // 0 success  -2 open failed
 int open(const char *fifo_name, int fifo_mode=0);
 int close();
 void set_read_pipe_fun(void* owner_ptr, FunReadPipeCallBack funReadPipe);

protected:
 static void* pipe_poll(void*);
 void pipe_poll_fun();
 pthread_t _pipe_poll_t;

 // >0 success 0 the writing end closed or didn't open -1 no data
 int read_one();
private:
 int _pipe_fd;
 void* _owner_ptr;
 FunReadPipeCallBack _funReadPipe;
};


#endif


 

//.cpp
#include
#include
#include
#include
#include

#include "cpppipe.h"
#include "comlog.h"

CPipeWriter::CPipeWriter()
{
 _pipe_fd = -1;
}

CPipeWriter::~CPipeWriter()
{
 close();
}


int CPipeWriter::open(const char *fifo_name, int fifo_mode)
{

 if ( NULL == fifo_name )
  return -1;

 unlink(fifo_name);//O_CREAT|O_EXCL
 if( (mkfifo(fifo_name, 0777)<0) && (errno != EEXIST) )
 {
  INFO_PRT("mkfifo(%s) failed! %s\n", fifo_name, strerror(errno));
  LOGERR(NULL, 0, "mkfifo(%s) failed! %s\n", fifo_name, strerror(errno));
  return -1;
 }

 //cann't open the pipe with _O_WRONLY, I don't know why.
 _pipe_fd = ::open(fifo_name, O_RDWR|O_NONBLOCK, 0);
 if ( -1 == _pipe_fd )
 {
  INFO_PRT("open(%s) failed! %s\n", fifo_name, strerror(errno));
  LOGERR(NULL, 0, "open(%s) failed! %s\n", fifo_name, strerror(errno));
  return -2;
 }

 return 0;
}

int CPipeWriter::close()
{
 int ret = 0;
 if ( -1 != _pipe_fd )
 {
  ret = ::close(_pipe_fd);
  _pipe_fd = -1;
 }
 
 return ret;
}

int CPipeWriter::write(const char* data_ptr, int data_len)
{

 if ( NULL == data_ptr || data_len < 0 )
  return -1;

 int write_num = 0;
 
 while (1)
 {
  write_num = ::write(_pipe_fd, data_ptr, data_len);
  if ( -1 == write_num && EAGAIN == errno )
   continue;
   
  break;
 }

 return write_num;
}

CPipeReader::CPipeReader()
{
 _pipe_fd = -1;
 _owner_ptr = NULL;
 _funReadPipe = NULL;
}

CPipeReader::~CPipeReader()
{
 close();
}

void CPipeReader::set_read_pipe_fun(void* owner_ptr, FunReadPipeCallBack funReadPipe)
{
 _owner_ptr = owner_ptr;
 _funReadPipe = funReadPipe;
}

int CPipeReader::open(const char *fifo_name, int fifo_mode)
{

 if ( NULL == fifo_name )
  return -1;

 _pipe_fd = ::open(fifo_name, O_RDONLY|O_NONBLOCK, 0);
 if ( -1 == _pipe_fd )
 {
  INFO_PRT("open(%s) failed! %s\n", fifo_name, strerror(errno));
  LOGERR(NULL, 0, "open(%s) failed! %s\n", fifo_name, strerror(errno));
  return -2;
 }

 // read once, and check the writing end opened whether or not.
 char szbuf[4096];
 int buf_len = 4096;
 int read_size = 0;
 
 memset(szbuf, 0, buf_len);
 read_size = read(_pipe_fd, szbuf, buf_len);
 if ( 0 == read_size )
 {
  this->close();
  return -2;
 }
 else if ( read_size > 0 )
 {
  if ( NULL != _funReadPipe )
   _funReadPipe(_owner_ptr, szbuf, read_size);
 }

 pthread_create(&_pipe_poll_t, NULL, pipe_poll, this);

 return 0;
}

int CPipeReader::close()
{
 int ret = 0;
 if ( -1 != _pipe_fd )
 {
  ret = ::close(_pipe_fd);
  _pipe_fd = -1;
 }

 return ret;
}

void* CPipeReader::pipe_poll(void* class_ptr)
{
 pthread_detach(pthread_self());
 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,NULL);
 
 ((CPipeReader*)class_ptr)->pipe_poll_fun();

 return NULL;
}

void CPipeReader::pipe_poll_fun()
{

 char szbuf[4096];
 int read_size = 0;
 int ret = 0;

 //first, read the data is already exist from the pipe
 while(1)
 {  
  ret = read_one();
  if ( 0 == ret )
   return;
  if ( -1 == ret )
   break;
 } 

 struct epoll_event ev, events[10];
 ev.data.fd = _pipe_fd;
 ev.events = EPOLLIN|EPOLLET|EPOLLHUP;
 int epfd = epoll_create(10);
 epoll_ctl(epfd, EPOLL_CTL_ADD, _pipe_fd, &ev);
 for (;;) {
  int ret = epoll_wait(epfd, events, 10, 3000);
  if (ret == 0) {

   ret = read_one();
   if ( 0 == ret )
    break;

   continue;
  }
  
  int i = 0;
  for (i = 0; i < ret; i++) {
   if (events[i].events & EPOLLIN)
   {
    while(1)
    {
     ret = read_one();
     if ( ret <= 0 )
      break;
    }

    if ( 0 == ret )
     break;
   }
   else if (events[i].events & EPOLLOUT)
    printf("EPOLLOUT");
   else if (events[i].events & EPOLLHUP)
   {
    INFO_PRT("EPOLLHUP");
    if ( NULL != _funReadPipe )
     _funReadPipe(_owner_ptr, NULL, 0);
    break;
   }
   else
    printf("unkown");
  }
 }

 ::close(epfd);
}

int CPipeReader::read_one()
{
 char szbuf[4096];
 int buf_len = 4096;
 int read_size = 0;

 memset(szbuf, 0, buf_len);
 read_size = read(_pipe_fd, szbuf, buf_len);
 INFO_PRT("read_size:%d <= 0", read_size);
 if ( read_size == 0 )//the remotion closed
 {
  if ( NULL != _funReadPipe )
   _funReadPipe(_owner_ptr, NULL, 0);
  return read_size;
 }

 if ( read_size <= 0 )
  return read_size;

 if ( NULL != _funReadPipe )
  _funReadPipe(_owner_ptr, szbuf, read_size);

 return read_size;
}

 

//demo

//writer



#include

#include "cpppipe.h"

int main()
{
 CPipeWriter pipe_writer;
 //CPipeReader pipe_reader;

 int ret = pipe_writer.open("/tmp/abcdefli");
 printf("pipe_writer.open ret:%d\n", ret);
 if ( ret < 0 )
  return ret;

 char ptr[256] = {0};
 int ptr_len = strlen(ptr);
 int total_len = 0;
 int i = 0;
 while(1)
 {
  sprintf(ptr, "hello, pipe:%d", i++);
  ptr_len = strlen(ptr);
  int ret = pipe_writer.write(ptr, ptr_len);
  if ( ret == -1 )
   break;

  total_len += ptr_len;

  printf("total_len is %d\n", total_len);

  sleep(1);
 }

 printf("total_len is %d\n", total_len);


 while (1)
 {
  sleep(1);
 }
 return 0;
}

 

 

//reader


#include
#include "cpppipe.h"
int read_pipe(const void* owner_ptr, const char* ptr, int ptr_len)
{
 INFO_PRT("ptr is %d:%s\n", ptr_len, ptr);
}
class COpPipe
{
public:
 static int read_pipe(const void* owner_ptr, const char* ptr, int ptr_len)
 {
  if ( NULL != owner_ptr )
   ((COpPipe*)owner_ptr)->print_str(ptr, ptr_len);
 }
 void print_str(const char* ptr, int ptr_len)
 {
  if ( ptr == NULL && ptr_len == 0 )
  {
   INFO_PRT("the writing pipe is closed!\n");
   return;
  }
  
  INFO_PRT("in COpPipe, ptr is %d:%s\n", ptr_len, ptr);
 }
};
int main()
{
 CPipeReader pipe_reader;
 COpPipe op_pipe;
 //设置读操作的回调函数
 pipe_reader.set_read_pipe_fun(&op_pipe, COpPipe::read_pipe);
 //pipe_reader.set_read_pipe_fun(NULL, read_pipe);
 int ret = 0;
 ret = pipe_reader.open("/tmp/abcdefli");
 printf("pipe_reader.open ret:%d\n", ret);
 if ( ret < 0 )
  return ret;

 while (1)
 {
  sleep(1);
 }
 return 0;
}
 
 

 

阅读(3132) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~