分类: LINUX
2010-05-18 21:13:39
该文章可以任意转载,但请保留出处:http://rainfish.cublog.cn/
文件:
cpppipe.rar
大小:
10KB
下载:
下载
by rainfish
头两天在cu上看到有人在问关于pipe的问题,发现自己也是一样,不是很清楚,于是google了一些相关的资料,有了一些理解,并写了一个封装有名管道的类,拿出来和朋友们共同探讨一下,有问题或者您的看法请与我联系, EMAIL,MSN:lizp.net#gmail.com(#用@代替)。
Cpppipe分为两个类,对有名管道读操作类CPipeReader,对有名管道写操作类CPipeWriter。其中CPipeReader的读操作使用了epoll, 当有数据可读时调用相应的回调函数,性能很高。
/**
 * Write end of a named pipe (FIFO).
 *
 * The object owns one file descriptor, released by close() or by the
 * destructor. Copying is disabled: a copy would hold the same descriptor
 * and close it a second time.
 */
class CPipeWriter
{
public:
    CPipeWriter();
    ~CPipeWriter();

    /**
     * Creates the FIFO and opens it.
     * @param fifo_name  path of the named pipe
     * @param fifo_mode  not used at present
     * @return 0 on success, -1 if fifo_name is NULL or mkfifo failed,
     *         -2 if opening the FIFO failed
     */
    int open(const char *fifo_name, int fifo_mode=0);

    /**
     * Closes the pipe descriptor if one is open.
     * @return 0 when nothing was open, otherwise the result of ::close()
     */
    int close();

    /**
     * Writes data to the pipe.
     * @param data_ptr  bytes to send; must not be NULL
     * @param data_len  number of bytes; must not be negative
     * @return number of bytes written, or -1 on error or invalid arguments
     */
    int write(const char* data_ptr, int data_len);

protected:
private:
    // Non-copyable: declared but intentionally never defined (pre-C++11 idiom).
    CPipeWriter(const CPipeWriter&);
    CPipeWriter& operator=(const CPipeWriter&);

    int _pipe_fd;   // descriptor of the opened FIFO, -1 while closed
};
/**
 * Read end of a named pipe (FIFO).
 *
 * open() starts a background thread that waits on the pipe with epoll and
 * passes every chunk it reads to the registered callback. Copying is
 * disabled: a copy would share the descriptor and thread handle and close
 * the descriptor a second time.
 */
class CPipeReader
{
public:
    /**
     * Callback invoked for data read from the pipe.
     * @param owner_ptr  the pointer given to set_read_pipe_fun()
     * @param ptr        the bytes read; NULL together with ptr_len == 0
     *                   means the writing end closed
     * @param ptr_len    number of bytes at ptr
     * The return value is ignored by this class.
     */
    typedef int (*FunReadPipeCallBack)(const void* owner_ptr, const char* ptr, int ptr_len);

    CPipeReader();
    ~CPipeReader();

    /**
     * Opens an existing named pipe for reading and starts the polling thread.
     * Register the callback first: open() may already deliver data.
     * @param fifo_name  path of the named pipe
     * @param fifo_mode  not used at present
     * @return 0 on success, -1 if fifo_name is NULL, -2 if the pipe could
     *         not be opened or its writing end is not open
     */
    int open(const char *fifo_name, int fifo_mode=0);

    /**
     * Closes the pipe descriptor if one is open.
     * @return 0 when nothing was open, otherwise the result of ::close()
     */
    int close();

    /** Registers the callback and the owner pointer handed back to it. */
    void set_read_pipe_fun(void* owner_ptr, FunReadPipeCallBack funReadPipe);

protected:
    /** Thread entry point; receives the CPipeReader and runs pipe_poll_fun(). */
    static void* pipe_poll(void*);

    /** Body of the polling thread: epoll loop over the pipe descriptor. */
    void pipe_poll_fun();

    pthread_t _pipe_poll_t;   // handle of the polling thread created by open()

    /**
     * Performs a single read on the pipe and forwards the result to the callback.
     * @return >0 bytes read, 0 the writing end closed or was never opened,
     *         -1 no data available
     */
    int read_one();

private:
    // Non-copyable: declared but intentionally never defined (pre-C++11 idiom).
    CPipeReader(const CPipeReader&);
    CPipeReader& operator=(const CPipeReader&);

    int _pipe_fd;                       // descriptor of the opened FIFO, -1 while closed
    void* _owner_ptr;                   // opaque pointer passed back to the callback
    FunReadPipeCallBack _funReadPipe;   // callback, NULL when none is registered
};
源代码:
//.h
/***********************************************************************
** Copyright (c)2010, lizp.net
** All rights reserved.
**
** File name : cpppipe.h
** Author : lizp ()
** Date : 2010-5-18 下午 05:45:41
** Comments : 使用管道、epoll实现进程间通信,对管道的操作已经封装,只有简单的api操作,
尤其是对于管道的读操作,只需把函数注册进去即可,有数据可读时会回调该函数
***********************************************************************/
#ifndef _CPPPIPE_H_2010_05_18_
#define _CPPPIPE_H_2010_05_18_
#include <stdio.h>      // printf, used through INFO_PRT
#include <pthread.h>    // pthread_t, used by CPipeReader
// Console trace hook used by the implementation; currently mapped to printf.
#define INFO_PRT printf
/**
 * Write end of a named pipe (FIFO).
 *
 * The object owns one file descriptor, released by close() or by the
 * destructor. Copying is disabled: a copy would hold the same descriptor
 * and close it a second time.
 */
class CPipeWriter
{
public:
    CPipeWriter();
    ~CPipeWriter();

    /**
     * Creates the FIFO and opens it.
     * @param fifo_name  path of the named pipe
     * @param fifo_mode  not used at present
     * @return 0 on success, -1 if fifo_name is NULL or mkfifo failed,
     *         -2 if opening the FIFO failed
     */
    int open(const char *fifo_name, int fifo_mode=0);

    /**
     * Closes the pipe descriptor if one is open.
     * @return 0 when nothing was open, otherwise the result of ::close()
     */
    int close();

    /**
     * Writes data to the pipe.
     * @param data_ptr  bytes to send; must not be NULL
     * @param data_len  number of bytes; must not be negative
     * @return number of bytes written, or -1 on error or invalid arguments
     */
    int write(const char* data_ptr, int data_len);

protected:
private:
    // Non-copyable: declared but intentionally never defined (pre-C++11 idiom).
    CPipeWriter(const CPipeWriter&);
    CPipeWriter& operator=(const CPipeWriter&);

    int _pipe_fd;   // descriptor of the opened FIFO, -1 while closed
};
/**
 * Read end of a named pipe (FIFO).
 *
 * open() starts a background thread that waits on the pipe with epoll and
 * passes every chunk it reads to the registered callback. Copying is
 * disabled: a copy would share the descriptor and thread handle and close
 * the descriptor a second time.
 */
class CPipeReader
{
public:
    /**
     * Callback invoked for data read from the pipe.
     * @param owner_ptr  the pointer given to set_read_pipe_fun()
     * @param ptr        the bytes read; NULL together with ptr_len == 0
     *                   means the writing end closed
     * @param ptr_len    number of bytes at ptr
     * The return value is ignored by this class.
     */
    typedef int (*FunReadPipeCallBack)(const void* owner_ptr, const char* ptr, int ptr_len);

    CPipeReader();
    ~CPipeReader();

    /**
     * Opens an existing named pipe for reading and starts the polling thread.
     * Register the callback first: open() may already deliver data.
     * @param fifo_name  path of the named pipe
     * @param fifo_mode  not used at present
     * @return 0 on success, -1 if fifo_name is NULL, -2 if the pipe could
     *         not be opened or its writing end is not open
     */
    int open(const char *fifo_name, int fifo_mode=0);

    /**
     * Closes the pipe descriptor if one is open.
     * @return 0 when nothing was open, otherwise the result of ::close()
     */
    int close();

    /** Registers the callback and the owner pointer handed back to it. */
    void set_read_pipe_fun(void* owner_ptr, FunReadPipeCallBack funReadPipe);

protected:
    /** Thread entry point; receives the CPipeReader and runs pipe_poll_fun(). */
    static void* pipe_poll(void*);

    /** Body of the polling thread: epoll loop over the pipe descriptor. */
    void pipe_poll_fun();

    pthread_t _pipe_poll_t;   // handle of the polling thread created by open()

    /**
     * Performs a single read on the pipe and forwards the result to the callback.
     * @return >0 bytes read, 0 the writing end closed or was never opened,
     *         -1 no data available
     */
    int read_one();

private:
    // Non-copyable: declared but intentionally never defined (pre-C++11 idiom).
    CPipeReader(const CPipeReader&);
    CPipeReader& operator=(const CPipeReader&);

    int _pipe_fd;                       // descriptor of the opened FIFO, -1 while closed
    void* _owner_ptr;                   // opaque pointer passed back to the callback
    FunReadPipeCallBack _funReadPipe;   // callback, NULL when none is registered
};
#endif
//.cpp
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "cpppipe.h"
#include "comlog.h"
// Starts with no pipe attached; -1 is the "closed" marker that close() tests for.
CPipeWriter::CPipeWriter()
    : _pipe_fd(-1)
{
}
// Releases the pipe descriptor, if one is still open, when the writer is destroyed.
CPipeWriter::~CPipeWriter()
{
    this->close();
}
int CPipeWriter::open(const char *fifo_name, int fifo_mode)
{
if ( NULL == fifo_name )
return -1;
unlink(fifo_name);//O_CREAT|O_EXCL
if( (mkfifo(fifo_name, 0777)<0) && (errno != EEXIST) )
{
INFO_PRT("mkfifo(%s) failed! %s\n", fifo_name, strerror(errno));
LOGERR(NULL, 0, "mkfifo(%s) failed! %s\n", fifo_name, strerror(errno));
return -1;
}
//cann't open the pipe with _O_WRONLY, I don't know why.
_pipe_fd = ::open(fifo_name, O_RDWR|O_NONBLOCK, 0);
if ( -1 == _pipe_fd )
{
INFO_PRT("open(%s) failed! %s\n", fifo_name, strerror(errno));
LOGERR(NULL, 0, "open(%s) failed! %s\n", fifo_name, strerror(errno));
return -2;
}
return 0;
}
/**
 * Closes the pipe descriptor if one is open and marks the writer as closed.
 * @return 0 when nothing was open, otherwise the result of ::close()
 */
int CPipeWriter::close()
{
    if ( _pipe_fd == -1 )
        return 0;

    const int fd = _pipe_fd;
    _pipe_fd = -1;
    return ::close(fd);
}
/**
 * Writes data_len bytes from data_ptr to the pipe.
 *
 * The descriptor is non-blocking, so a full pipe makes ::write fail with
 * EAGAIN. In that case this function waits in poll() until the pipe is
 * writable again instead of spinning, and it keeps going after short writes
 * and EINTR until everything has been sent.
 *
 * @param data_ptr  bytes to send; must not be NULL
 * @param data_len  number of bytes; must not be negative
 * @return data_len when everything was written; the number of bytes written
 *         so far if an error interrupted a partially completed write;
 *         -1 on invalid arguments or if nothing could be written
 */
int CPipeWriter::write(const char* data_ptr, int data_len)
{
    if ( NULL == data_ptr || data_len < 0 )
        return -1;

    int total = 0;
    for (;;)
    {
        const ssize_t n = ::write(_pipe_fd, data_ptr + total, data_len - total);
        if ( n > 0 )
        {
            total += static_cast<int>(n);
            if ( total >= data_len )
                return total;
            continue;               // short write: send the remainder
        }
        if ( 0 == n )
            return total;           // zero-length request or no progress

        if ( EINTR == errno )
            continue;               // interrupted by a signal: just retry
        if ( EAGAIN == errno )
        {
            // Pipe is full: sleep until it becomes writable. The poll result
            // is not inspected; the retried ::write reports any real error.
            struct pollfd pfd;
            pfd.fd = _pipe_fd;
            pfd.events = POLLOUT;
            pfd.revents = 0;
            poll(&pfd, 1, -1);
            continue;
        }
        return (total > 0) ? total : -1;
    }
}
// Starts closed (-1) with no callback registered. The thread handle is left
// unset here; it is only written when open() creates the polling thread.
CPipeReader::CPipeReader()
    : _pipe_fd(-1),
      _owner_ptr(NULL),
      _funReadPipe(NULL)
{
}
// Releases the pipe descriptor, if one is still open, when the reader is destroyed.
CPipeReader::~CPipeReader()
{
    this->close();
}
void CPipeReader::set_read_pipe_fun(void* owner_ptr, FunReadPipeCallBack funReadPipe)
{
_owner_ptr = owner_ptr;
_funReadPipe = funReadPipe;
}
/**
 * Opens an existing FIFO for non-blocking reading and starts the polling
 * thread.
 *
 * A probe read is done right after opening: a result of 0 means no process
 * has the FIFO open for writing, and the open is abandoned. Data returned by
 * the probe is delivered to the callback from the calling thread, so the
 * callback should be registered before open() is called.
 *
 * @param fifo_name  path of the FIFO; must not be NULL
 * @param fifo_mode  not used at present
 * @return 0 on success, -1 if fifo_name is NULL, -2 if the FIFO could not be
 *         opened, its writing end is not open, or the polling thread could
 *         not be started
 *
 * NOTE(review): calling open() again on an already opened reader is not
 * handled here (the earlier descriptor and thread are left as they are) -
 * confirm callers never do that.
 */
int CPipeReader::open(const char *fifo_name, int fifo_mode)
{
    (void)fifo_mode;
    if ( NULL == fifo_name )
        return -1;

    _pipe_fd = ::open(fifo_name, O_RDONLY|O_NONBLOCK, 0);
    if ( -1 == _pipe_fd )
    {
        // Capture errno once: the first log call may overwrite it.
        const int err = errno;
        INFO_PRT("open(%s) failed! %s\n", fifo_name, strerror(err));
        LOGERR(NULL, 0, "open(%s) failed! %s\n", fifo_name, strerror(err));
        return -2;
    }

    // Probe read: checks whether the writing end is open. One spare byte is
    // reserved so the chunk handed to the callback is always NUL-terminated.
    char szbuf[4096 + 1];
    const int buf_len = 4096;
    ssize_t read_size;
    do
    {
        read_size = read(_pipe_fd, szbuf, buf_len);
    } while ( read_size < 0 && EINTR == errno );

    if ( 0 == read_size )
    {
        this->close();
        return -2;
    }
    if ( read_size > 0 && NULL != _funReadPipe )
    {
        szbuf[read_size] = '\0';
        _funReadPipe(_owner_ptr, szbuf, static_cast<int>(read_size));
    }

    // Without the polling thread no data would ever be delivered, so a
    // failure to start it is reported instead of returning success.
    const int rc = pthread_create(&_pipe_poll_t, NULL, pipe_poll, this);
    if ( 0 != rc )
    {
        INFO_PRT("pthread_create failed! %s\n", strerror(rc));
        LOGERR(NULL, 0, "pthread_create failed! %s\n", strerror(rc));
        this->close();
        return -2;
    }
    return 0;
}
/**
 * Closes the pipe descriptor if one is open and marks the reader as closed.
 * Only the descriptor is touched here; the polling thread is neither
 * cancelled nor joined by this function.
 * @return 0 when nothing was open, otherwise the result of ::close()
 */
int CPipeReader::close()
{
    if ( _pipe_fd == -1 )
        return 0;

    const int fd = _pipe_fd;
    _pipe_fd = -1;
    return ::close(fd);
}
void* CPipeReader::pipe_poll(void* class_ptr)
{
pthread_detach(pthread_self());
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,NULL);
((CPipeReader*)class_ptr)->pipe_poll_fun();
return NULL;
}
void CPipeReader::pipe_poll_fun()
{
char szbuf[4096];
int read_size = 0;
int ret = 0;
//first, read the data is already exist from the pipe
while(1)
{
ret = read_one();
if ( 0 == ret )
return;
if ( -1 == ret )
break;
}
struct epoll_event ev, events[10];
ev.data.fd = _pipe_fd;
ev.events = EPOLLIN|EPOLLET|EPOLLHUP;
int epfd = epoll_create(10);
epoll_ctl(epfd, EPOLL_CTL_ADD, _pipe_fd, &ev);
for (;;) {
int ret = epoll_wait(epfd, events, 10, 3000);
if (ret == 0) {
ret = read_one();
if ( 0 == ret )
break;
continue;
}
int i = 0;
for (i = 0; i < ret; i++) {
if (events[i].events & EPOLLIN)
{
while(1)
{
ret = read_one();
if ( ret <= 0 )
break;
}
if ( 0 == ret )
break;
}
else if (events[i].events & EPOLLOUT)
printf("EPOLLOUT");
else if (events[i].events & EPOLLHUP)
{
INFO_PRT("EPOLLHUP");
if ( NULL != _funReadPipe )
_funReadPipe(_owner_ptr, NULL, 0);
break;
}
else
printf("unkown");
}
}
::close(epfd);
}
/**
 * Performs one read of up to 4096 bytes on the pipe and forwards the result
 * to the callback, if one is registered.
 *
 * A read interrupted by a signal is retried rather than reported as
 * "no data". The chunk passed to the callback is always NUL-terminated;
 * the terminator is not counted in the length.
 *
 * @return >0 number of bytes read and delivered;
 *         0 the writing end closed (the callback receives NULL, 0);
 *         -1 no data available or the read failed
 */
int CPipeReader::read_one()
{
    // One spare byte so even a full 4096-byte read can be terminated.
    char szbuf[4096 + 1];
    const int buf_len = 4096;

    ssize_t read_size;
    do
    {
        read_size = read(_pipe_fd, szbuf, buf_len);
    } while ( read_size < 0 && EINTR == errno );

    if ( read_size <= 0 )
    {
        // Trace only the case the message text describes.
        INFO_PRT("read_size:%d <= 0", static_cast<int>(read_size));
        if ( 0 == read_size && NULL != _funReadPipe )
            _funReadPipe(_owner_ptr, NULL, 0);      // the writing end closed
        return static_cast<int>(read_size);
    }

    szbuf[read_size] = '\0';
    if ( NULL != _funReadPipe )
        _funReadPipe(_owner_ptr, szbuf, static_cast<int>(read_size));
    return static_cast<int>(read_size);
}
//demo
//writer
#include
#include "cpppipe.h"
/**
 * Writer demo: creates the FIFO /tmp/abcdefli and sends a numbered greeting
 * once per second until a write fails, printing the running byte total.
 * After a failure the process idles forever rather than exiting.
 *
 * @return the negative code from CPipeWriter::open() if the pipe cannot be
 *         opened; otherwise the function does not return
 */
int main()
{
    CPipeWriter pipe_writer;
    int ret = pipe_writer.open("/tmp/abcdefli");
    printf("pipe_writer.open ret:%d\n", ret);
    if ( ret < 0 )
        return ret;

    char ptr[256] = {0};
    int total_len = 0;
    for ( int i = 0; ; i++ )
    {
        // snprintf bounds the write to the buffer and returns the message
        // length, so no separate strlen is needed.
        const int ptr_len = snprintf(ptr, sizeof(ptr), "hello, pipe:%d", i);
        ret = pipe_writer.write(ptr, ptr_len);
        if ( ret == -1 )
            break;
        // Count what was actually written, not what was requested.
        total_len += ret;
        printf("total_len is %d\n", total_len);
        sleep(1);
    }
    printf("total_len is %d\n", total_len);

    // Idle forever after a write failure; the writer object, and with it the
    // pipe descriptor, stays alive for as long as the process runs.
    while (1)
    {
        sleep(1);
    }
    return 0;
}
//reader