Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1852764
  • 博文数量: 317
  • 博客积分: 1557
  • 博客等级: 上尉
  • 技术积分: 1208
  • 用 户 组: 普通用户
  • 注册时间: 2008-02-26 23:38
个人简介

如果想出发,就不要等到明天!

文章分类

全部博文(317)

文章存档

2016年(1)

2015年(41)

2014年(152)

2013年(114)

2012年(4)

2011年(1)

2009年(4)

分类: LINUX

2014-08-27 10:41:36

1. 同select比较
    对于select,linux/posix_types.h头文件有这样的宏:#define __FD_SETSIZE  1024;
    select能监听的最大fd数量,在内核编译的时候就确定了的。若想加大fd的数量,需要调整编译参数,重新编译内核。
    epoll_wait直接返回被触发的fd或对应的一块buffer,不需要遍历所有的fd。
    epoll的实现中,用户空间跟内核空间共享内存(mmap),避免拷贝。

2. 基本的数据结构
    typedef union epoll_data { //注意是union
       
void *ptr;
       
int fd;
       
uint32_t u32;
       
uint64_t u64;
   
} epoll_data_t;

    
struct epoll_event {
       
uint32_t events; /* Epoll events */
       
epoll_data_t data; /* User data variable */
    
};

3. 基本操作
3.1 加入到epoll监控
    struct epoll_event ee;
    ee.events = EPOLLIN|EPOLLOUT|EPOLLET; //不设置EPOLLET的话,为LT模式
    ee.data.ptr = (void *) buffer;//传入buffer地址
    epoll_ctl(epfd, EPOLL_CTL_ADD, socketfd, &ee);

3.2 从epoll监控中删除
   struct epoll_event ee;
   
ee.events = 0;
   
ee.data.ptr = NULL;
   
epoll_ctl(epfd, EPOLL_CTL_DEL, socketfd, &ee);

3.3 等待事件的触发
    for (;;){
        nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
        if (nfds == -1){    
            if (errno == EINTR)
                continue; // 中断
            perror("epoll_pwait");
            exit (EXIT_FAILURE);
        }

        for (n = 0; n < nfds; ++n){
            // events[n].events; //是什么事件 EPOLLIN/EPOLLOUT/EPOLLERR/EPOLLPRI
            // events[n].data.fd;//被触发的fd(在epoll_ctrl时传入的)
            // events[n].data.ptr;//跟fd相关联的buffer(在epoll_ctrl时传入的)
            if( events[n].events & EPOLLIN ) { }
            else if( events[n].events & EPOLLOUT ) { }
            else if( events[n].events & EPOLLERR ) { }
            else {}
        }
}

4. 读和写
    epoll有2种模式, 水平触发(LT)&边缘触发(ET)。
4.1 LT模式下的读和写
    读:若接收缓冲区中有数据,就会一直触发EPOLLIN。所以不用担心是否读干净缓冲区中的数据,若没有读完的话,EPOLL会一直被触发。
    写:若发送缓冲区中还有空间的话,会一直触发EPOLLOUT(只有在发送缓冲区满的时候,才不会被触发)。所以这个很烦,本来没有数据需要发送,但EPOLLOUT仍然被触发。解决方法:
    (1)epoll_ctl(, EPOLL_CTL_ADD, fd),将该fd加入到epoll中;
    (2)该fd的EPOLLOUT被触发,发送数据(一般在回调函数中完成);
    (3)epoll_ctl(epfd, EPOLL_CTL_DEL, fd),将该fd移出epoll。加入又移出的,很麻烦,又有开销。若数据比较少的时候,直接调用send发送好了。
4.2 ET模式下的读和写
    ET模式下,是不能用阻塞式的函数的。
    读:fd由不可读->可读(接收缓冲区没有数据->有数据)时,才会触发EPOLLIN。若没有读干净,read返回,缓冲区仍然有数据时, 不会再次触发。
    n=read(fd, buf, buf_size);
    if(n<0){
        if (errno == EAGAIN)
            continue;
        else
            return -1; //errno, should close the fd
    }
    else if(n < buf_size){
        // 读完了缓冲区中的数据,可以返回
        return n;
    }
    else if(n == buf_size){
        // 缓冲区中可能还有数据,继续读
        continue;
    }
    写:fd由不可写->可写(发送缓冲区满->有空间)时,才会触发EPOLLOUT。

5. 一个异步模型
    (1)将要监控的fd加入到epoll的监控中,每一个fd对应一个事件处理类(EventHanlder),当fd被触发时,处理类中的相关函数被调用。
    (2)用epoll可以实现定时器。依赖于epoll_wait的超时机制。该定时器存在误差

6. 代码

reactor.h

  1. #pragma once

  2. #include <time.h>
  3. #include <map>
  4. #include <sys/time.h>
  5. #include <fcntl.h>
  6. #include "min_heap.h"

  7. class FDEventHanderBase;
  8. class FDEventHanderSet;
  9. class TimerEventHanderBase;

  10. class NonCopyable
  11. {
  12. protected:
  13.     NonCopyable() {
  14.     }
  15.     virtual ~NonCopyable() {
  16.     }
  17. private:
  18.     NonCopyable(const NonCopyable &); //禁止拷贝
  19.     const NonCopyable & operator=(const NonCopyable &); //禁止赋值
  20. };

  21. class Reactor: public NonCopyable
  22. {
  23. public:
  24.     Reactor() {
  25.     }

  26.     virtual ~Reactor() {
  27.     }

  28.     virtual bool Init() = 0;
  29.     virtual bool Run() = 0;
  30.     virtual bool Stop() = 0;
  31.     virtual int RegisterReadEvent(FDEventHanderBase* pBase) = 0;
  32.     virtual int RegisterWriteEvent(FDEventHanderBase* pBase) = 0;
  33.     virtual int UnRegisterReadEvent(FDEventHanderBase* pBase) = 0;
  34.     virtual int UnRegisterWriteEvent(FDEventHanderBase* pBase) = 0;
  35.     virtual int RegisterTimer(TimerEventHanderBase* pBase) = 0;
  36.     virtual int UnRegisterTimer(TimerEventHanderBase* pBase) = 0;
  37. };
  38. /* **********************************
  39.  *             处理FD类的基类
  40.  * 当该fd被触发时,调用处理函数来进行处理
  41.  * ***********************************/
  42. class FDEventHanderBase: public NonCopyable
  43. {
  44. public:
  45.     FDEventHanderBase(int fd, Reactor* pReactor) :
  46.         m_fd(fd), m_pReactor(pReactor)
  47.     {
  48.     }
  49.     virtual ~FDEventHanderBase() {
  50.     }
  51.     // 读被触发时,调用该函数
  52.     virtual void OnFDRead() = 0;
  53.     // 写被触发时,调用该函数
  54.     virtual void OnFDWrite() = 0;
  55.     // 关闭fd
  56.     virtual void Close() = 0;
  57.     // 注册读事件
  58.     int RegisterReadEvent();
  59.     // 注册写事件
  60.     int RegisterWriteEvent();
  61.     // 注销读事件
  62.     int UnRegisterReadEvent();
  63.     // 注销写事件
  64.     int UnRegisterWriteEvent();
  65.     int GetFD() const;
  66.     void SetFD(int fd);
  67.     // 设置非阻塞,返回0,success.
  68.     int SetNonBlock(int fd, bool bNonBlock = true);
  69. protected:
  70.     int m_fd;//该handler所对应的fd.
  71.     Reactor* m_pReactor;
  72. };
  73. /* **********************************
  74.  *        处理timer类的基类
  75.  *    当timer被触发时,调用该类的函数
  76.  * **********************************/
  77. class TimerEventHanderBase: public NonCopyable
  78. {
  79. public:
  80.     TimerEventHanderBase(Reactor* reactor) :
  81.         m_pReactor(reactor), m_bRestart(false)
  82.     {
  83.         timerclear(&m_interval);
  84.         timerclear(&m_endtime);
  85.     }
  86.     virtual ~TimerEventHanderBase() {
  87.     }
  88.     virtual void OnTimeOut() = 0;
  89.     // 注册定时器,@msec,多长时间响一次,单位:毫秒
  90.     // 由于epoll精度的限制,这个地方,精度只能达到 毫秒,但不是准确
  91.     int RegisterTimer(unsigned int msec /*ms*/, bool restart = false);
  92.     int UnRegisterTimer();
  93.     int RegisterTimerAgain();
  94.     // 获取结束时间
  95.     const timeval& GetEndTime() const {
  96.         return m_endtime;
  97.     }
  98.     // 该定时器是否需要重启
  99.     bool IsRestart() const {
  100.         return m_bRestart;
  101.     }
  102. protected:
  103.     Reactor* m_pReactor;
  104.     timeval m_interval;// 时间间隔
  105.     timeval m_endtime; // 定时器到期时间
  106.     bool m_bRestart;// 是否自动重启
  107. };

  108. /* **********************************
  109.  *             处理类的集合
  110.  *     有个对应关系 -- fd -- 处理函数 -- read/write.
  111.  * *********************************/
  112. class EventHanderSet
  113. {
  114. public:
  115.     // 增加fd事件, @type : EPOLLIN, EPOLLOUT,
  116.     void AddFDEventHandler(FDEventHanderBase* pHandler, int type);
  117.     // @type : EPOLLIN, EPOLLOUT
  118.     void DelFDEventHandler(int fd, int type);
  119.     // 根据fd,找到 FDEventHanderBase,及type.
  120.     FDEventHanderBase* GetFDEventHandler(int fd, int& type);
  121. public:
  122.     void AddTimerEventHandler(TimerEventHanderBase* pHandler);
  123.     void DelTimerEventHandler(TimerEventHanderBase* pHandler);
  124.     void ScanTimer();
  125. private:
  126.     struct FDHandler
  127.     {
  128.         int type; //EPOLLIN, EPOLLOUT
  129.         FDEventHanderBase* pHandler;
  130.     };
  131.     std::map<int, FDHandler> m_fdmap; // fd -- type -- handler

  132.     // 用于保存timer,在同一时刻,可能有多个handler.
  133.     MinHeap<std::pair<long int, long int> > m_timerMinHeap;
  134.     std::multimap<std::pair<long int, long int>, TimerEventHanderBase*>
  135.      m_timerMultiMap;
  136. };

  137. //////////// LT epoll /////////////////
  138. class LTReactor: public Reactor
  139. {
  140. public:
  141.     LTReactor();
  142.     virtual ~LTReactor();

  143.     bool Init();
  144.     bool Run();
  145.     bool Stop();
  146.     //注册读事件
  147.     int RegisterReadEvent(FDEventHanderBase* pBase);
  148.     //注册写事件
  149.     int RegisterWriteEvent(FDEventHanderBase* pBase);

  150.     int UnRegisterReadEvent(FDEventHanderBase* pBase);
  151.     int UnRegisterWriteEvent(FDEventHanderBase* pBase);
  152.     int UnRegisterAllEvent(FDEventHanderBase* pBase);
  153.     // 注册定时器
  154.     int RegisterTimer(TimerEventHanderBase* pBase);
  155.     int UnRegisterTimer(TimerEventHanderBase* pBase);
  156. private:
  157.     void ScanTimer();
  158. private:
  159.     int m_epfd; //epoll's fd
  160.     bool m_bRunning;
  161.     timeval m_stoptime; //停止时间
  162.     EventHanderSet m_handlerSet;
  163. };

reactor.cpp

  1. #include "reactor.h"
  2. #include <sys/epoll.h>
  3. #include <errno.h>
  4. #include <iostream>
  5. #include <string.h>
  6. #include <stdio.h>

  7. /////////////////////////////////////////////////////////
  8. #define MAX_EPOLL_SIZE 1024
  9. #define MAX_EPOLL_EVENTS_SIZE 128

  10. LTReactor::LTReactor() :
  11.     m_epfd(-1), m_bRunning(false)
  12. {
  13. }

  14. LTReactor::~LTReactor() {
  15. }

  16. bool LTReactor::Init() {
  17.     m_epfd = epoll_create(MAX_EPOLL_SIZE);
  18.     if (m_epfd == -1)
  19.     {
  20.         std::cout << "epoll_create error: " << strerror(errno) << std::endl;
  21.         return false;
  22.     }

  23.     m_bRunning = true;
  24.     return true;
  25. }

  26. bool LTReactor::Run() {
  27.     struct epoll_event events[MAX_EPOLL_EVENTS_SIZE];
  28.     int nfds = 0; //返回被触发的事件的个数
  29.     while (true)
  30.     {
  31.         // 超时时间,若m_bRunning,立即返回;否则100ms,即0.1s. 实际上,epoll_wait,只能够精确到毫秒(1/1000)
  32.         nfds = epoll_wait(m_epfd, events, sizeof(events) / sizeof(events[0]),
  33.          m_bRunning ? 100 : 1);
  34.         // 要停止epoll.
  35.         if (m_bRunning == false)
  36.         {
  37.             timeval now;
  38.             gettimeofday(&now, NULL); //获取当前时间
  39.             if (timercmp(&now, &m_stoptime, >))
  40.             {
  41.                 break;
  42.             }
  43.         }
  44.         // epoll 出错
  45.         if (nfds == -1)
  46.         {
  47.             std::cout << "epoll_wait error: " << strerror(errno) << std::endl;
  48.             continue;
  49.         }
  50.         // 处理被触发的事件
  51.         for (int i = 0; i < nfds; ++i)
  52.         {
  53.             int type = 0;
  54.             int fd = events[i].data.fd;
  55.             if (fd < 0) //fd出错
  56.             {
  57.                 std::cout << "the FD is " << fd << std::endl;
  58.                 continue;
  59.             }
  60.             FDEventHanderBase* pBase = m_handlerSet.GetFDEventHandler(fd, type);
  61.             if (NULL == pBase)
  62.             {
  63.                 std::cout << "pBase is NULL, fd is" << fd << std::endl;
  64.                 continue;
  65.             }

  66.             if (events[i].events & EPOLLIN)//read events
  67.             {
  68.                 pBase->OnFDRead();
  69.             }
  70.             if (events[i].events & EPOLLOUT) //write events
  71.             {
  72.                 pBase->OnFDWrite();
  73.             }
  74.             if (events[i].events & EPOLLERR) // error events
  75.             {
  76.                 pBase->Close();
  77.             }
  78.         }
  79.         // 处理定时器事件
  80.         ScanTimer();
  81.         // 处理idle事件
  82.     } // end of while

  83.     std::cout << "stop epoll" << std::endl;
  84.     close(m_epfd); //close epoll fd
  85.     m_epfd = -1;

  86.     return true;
  87. }

  88. bool LTReactor::Stop() {
  89.     m_bRunning = false;

  90.     timeval val, now;
  91.     val.tv_usec = 50 * 1000; //50ms后停止
  92.     gettimeofday(&now, NULL);
  93.     timeradd(&val, &now, &m_stoptime);
  94.     return true;
  95. }
  96. // 注册读事件,
  97. // @return: 0, success. else failed.
  98. int LTReactor::RegisterReadEvent(FDEventHanderBase* pBase) {
  99.     int type = 0;
  100.     FDEventHanderBase* pHander = m_handlerSet.GetFDEventHandler(pBase->GetFD(),
  101.      type);

  102.     epoll_event event;
  103.     event.data.fd = pBase->GetFD();
  104.     event.events = type | EPOLLIN; // 注册读事件

  105.     int iRet = epoll_ctl(m_epfd, NULL == pHander ? EPOLL_CTL_ADD
  106.      : EPOLL_CTL_MOD, pBase->GetFD(), &event);
  107.     if (iRet == -1)
  108.     {
  109.         std::cout << "epoll_ctl error: " << strerror(errno) << std::endl;
  110.         return -1;
  111.     }

  112.     m_handlerSet.AddFDEventHandler(pBase, EPOLLIN);
  113.     return 0;
  114. }
  115. // 注册写事件
  116. // @return: 0, success. else failed.
  117. int LTReactor::RegisterWriteEvent(FDEventHanderBase* pBase) {
  118.     int type = 0;
  119.     FDEventHanderBase* pHandler = m_handlerSet.GetFDEventHandler(
  120.      pBase->GetFD(), type);

  121.     epoll_event event;
  122.     event.data.fd = pBase->GetFD();
  123.     event.events = type | EPOLLOUT; //注册写事件

  124.     int iRet = epoll_ctl(m_epfd, NULL == pHandler ? EPOLL_CTL_ADD
  125.      : EPOLL_CTL_MOD, pBase->GetFD(), &event);
  126.     if (iRet == -1)
  127.     {
  128.         std::cout << "epoll_ctl error: " << strerror(errno) << std::endl;
  129.         return -1;
  130.     }

  131.     m_handlerSet.AddFDEventHandler(pBase, EPOLLOUT);
  132.     return 0;
  133. }
  134. // 注销读事件
  135. // @return: 0, success. else failed.
  136. int LTReactor::UnRegisterReadEvent(FDEventHanderBase* pBase) {
  137.     int type = 0;
  138.     FDEventHanderBase* pHandler = m_handlerSet.GetFDEventHandler(
  139.      pBase->GetFD(), type);
  140.     epoll_event event;
  141.     event.data.fd = pBase->GetFD();
  142.     event.events = (type & ~EPOLLIN); // 取消读事件

  143.     int iRet = epoll_ctl(m_epfd, NULL == pHandler ? EPOLL_CTL_DEL
  144.      : EPOLL_CTL_MOD, pBase->GetFD(), &event);
  145.     if (iRet == -1)
  146.     {
  147.         std::cout << "epoll_ctl error: " << strerror(errno) << std::endl;
  148.         return -1;
  149.     }

  150.     m_handlerSet.DelFDEventHandler(pBase->GetFD(), EPOLLIN);
  151.     return 0;
  152. }

  153. // 注销写事件
  154. // @return: 0, success. else failed.
  155. int LTReactor::UnRegisterWriteEvent(FDEventHanderBase* pBase) {
  156.     int type = 0;
  157.     FDEventHanderBase* pHandler = m_handlerSet.GetFDEventHandler(
  158.      pBase->GetFD(), type);
  159.     epoll_event event;
  160.     event.data.fd = pBase->GetFD();
  161.     event.events = (type & ~EPOLLOUT); // 取消写事件

  162.     int iRet = epoll_ctl(m_epfd, NULL == pHandler ? EPOLL_CTL_DEL
  163.      : EPOLL_CTL_MOD, pBase->GetFD(), &event);
  164.     if (iRet == -1)
  165.     {
  166.         std::cout << "epoll_ctl error: " << strerror(errno) << std::endl;
  167.         return -1;
  168.     }

  169.     m_handlerSet.DelFDEventHandler(pBase->GetFD(), EPOLLOUT);
  170.     return 0;
  171. }
  172. // 注销读写事件
  173. int LTReactor::UnRegisterAllEvent(FDEventHanderBase* pBase) {
  174.     epoll_event event;
  175.     event.data.fd = pBase->GetFD();
  176.     event.events = EPOLLOUT | EPOLLIN; // 取消读写事件
  177.     int iRet = epoll_ctl(m_epfd, EPOLL_CTL_DEL, pBase->GetFD(), &event);
  178.     if (iRet == -1)
  179.     {
  180.         std::cout << "epoll_ctl error: " << strerror(errno) << std::endl;
  181.         return -1;
  182.     }

  183.     m_handlerSet.DelFDEventHandler(pBase->GetFD(), EPOLLOUT | EPOLLIN);
  184.     return 0;
  185. }
  186. // 注册定时器
  187. int LTReactor::RegisterTimer(TimerEventHanderBase* pBase) {
  188.     m_handlerSet.AddTimerEventHandler(pBase);
  189.     return 0;
  190. }
  191. // 注销定时器
  192. int LTReactor::UnRegisterTimer(TimerEventHanderBase* pBase) {
  193.     m_handlerSet.DelTimerEventHandler(pBase);
  194.     return 0;
  195. }
  196. // 扫描,看是否有定时器的时间到了
  197. void LTReactor::ScanTimer() {
  198.     m_handlerSet.ScanTimer();
  199. }

  200. /* **********************************
  201.  *             处理FD类的基类
  202.  * 当该fd被触发时,调用处理函数来进行处理
  203.  * ***********************************/
  204. // 注册读事件
  205. int FDEventHanderBase::RegisterReadEvent() {
  206.     return m_pReactor->RegisterReadEvent(this);
  207. }
  208. // 注册写事件
  209. int FDEventHanderBase::RegisterWriteEvent() {
  210.     return m_pReactor->RegisterWriteEvent(this);
  211. }
  212. // 注销读事件
  213. int FDEventHanderBase::UnRegisterReadEvent() {
  214.     return m_pReactor->UnRegisterReadEvent(this);
  215. }
  216. // 注销写事件
  217. int FDEventHanderBase::UnRegisterWriteEvent() {
  218.     return m_pReactor->UnRegisterWriteEvent(this);
  219. }

  220. int FDEventHanderBase::GetFD() const {
  221.     return m_fd;
  222. }
  223. void FDEventHanderBase::SetFD(int fd) {
  224.     m_fd = fd;
  225. }
  226. // 设置非阻塞,返回0,success.
  227. int FDEventHanderBase::SetNonBlock(int fd, bool bNonBlock/* = true*/) {
  228.     int flags = fcntl(fd, F_GETFL, 0);
  229.     if (flags == -1)
  230.     {
  231.         return -1;
  232.     }
  233.     if (bNonBlock)
  234.         flags |= O_NONBLOCK;
  235.     else
  236.         flags &= ~O_NONBLOCK;

  237.     if (fcntl(fd, F_SETFL, flags) == -1)
  238.     {
  239.         return -1;
  240.     }

  241.     return 0;
  242. }
  243. /* **********************************
  244.  *        处理timer类的基类
  245.  *    当timer被触发时,调用该类的函数
  246.  * **********************************/
  247. // 注册定时器,@msec,多长时间响一次,单位:毫秒
  248. // 由于epoll精度的限制,这个地方,精度只能达到 毫秒,但不是准确
  249. int TimerEventHanderBase::RegisterTimer(unsigned int msec /*ms*/, bool restart)
  250. {
  251.     m_bRestart = restart;

  252.     m_interval.tv_sec = msec / 1000;
  253.     m_interval.tv_usec = msec % 1000;
  254.     timeval now;
  255.     gettimeofday(&now, NULL);
  256.     timeradd(&now, &m_interval, &m_endtime);
  257.     return m_pReactor->RegisterTimer(this);
  258. }
  259. int TimerEventHanderBase::UnRegisterTimer() {
  260.     return m_pReactor->UnRegisterTimer(this);
  261. }
  262. int TimerEventHanderBase::RegisterTimerAgain() {
  263.     if (timerisset(&m_endtime) && timerisset(&m_interval))
  264.     {
  265.         timeval now;
  266.         gettimeofday(&now, NULL);
  267.         timeradd(&now, &m_interval, &m_endtime);
  268.         return m_pReactor->RegisterTimer(this);
  269.     }
  270.     return -1;
  271. }
  272. /* **********************************
  273.  *             处理类的集合
  274.  *         有个对应关系 -- fd -- 处理函数 -- read/write.
  275.  * *********************************/
  276. // 增加fd事件, @type : EPOLLIN, EPOLLOUT,
  277. void EventHanderSet::AddFDEventHandler(FDEventHanderBase* pHandler, int type) {
  278.     const int fd = pHandler->GetFD();
  279.     std::map<int, FDHandler>::iterator it = m_fdmap.find(fd);
  280.     if (it != m_fdmap.end())
  281.     {
  282.         it->second.type |= type;
  283.         it->second.pHandler = pHandler;
  284.     }
  285.     else
  286.     {
  287.         FDHandler th;
  288.         th.type = type;
  289.         th.pHandler = pHandler;
  290.         m_fdmap.insert(std::make_pair(fd, th));
  291.     }
  292. }
  293. // @type : EPOLLIN, EPOLLOUT
  294. void EventHanderSet::DelFDEventHandler(int fd, int type) {
  295.     std::map<int, FDHandler>::iterator it = m_fdmap.find(fd);
  296.     if (it != m_fdmap.end())
  297.     {
  298.         it->second.type &= ~type;//去掉该属性
  299.         if (0 == it->second.type) //若所有的属性都去掉了,从map中删除该handler.
  300.         {
  301.             m_fdmap.erase(it);
  302.         }
  303.     }
  304. }
  305. // 根据fd,找到 FDEventHanderBase,及type.
  306. FDEventHanderBase* EventHanderSet::GetFDEventHandler(int fd, int& type) {
  307.     std::map<int, FDHandler>::iterator it = m_fdmap.find(fd);
  308.     if (it != m_fdmap.end())
  309.     {
  310.         type = it->second.type;
  311.         return it->second.pHandler;
  312.     }

  313.     return NULL;
  314. }

  315. void EventHanderSet::AddTimerEventHandler(TimerEventHanderBase* pHandler) {
  316.     timeval endtime = pHandler->GetEndTime();
  317.     std::pair<long int, long int> t(endtime.tv_sec, endtime.tv_usec);
  318.     printf("addTimer: [%u:%u] \n", (unsigned int) t.first,
  319.      (unsigned int) t.second);
  320.     m_timerMinHeap.Insert(t);
  321.     m_timerMultiMap.insert(std::make_pair(t, pHandler));
  322. }
  323. void EventHanderSet::DelTimerEventHandler(TimerEventHanderBase* pHandler) {
  324.     timeval endtime = pHandler->GetEndTime();
  325.     std::pair<long int, long int> t(endtime.tv_sec, endtime.tv_usec);
  326.     std::multimap<std::pair<long int, long int>, TimerEventHanderBase*>::iterator
  327.      beg = m_timerMultiMap.lower_bound(t);
  328.     std::multimap<std::pair<long int, long int>, TimerEventHanderBase*>::iterator
  329.      end = m_timerMultiMap.upper_bound(t);
  330.     for (; beg != end; ++beg)
  331.     {
  332.         if (beg->second == pHandler)
  333.         {
  334.             m_timerMultiMap.erase(beg);
  335.             break;
  336.         }
  337.     }
  338. }
  339. void EventHanderSet::ScanTimer() {

  340.     while (true)
  341.     {
  342.         //        printf("m_timerMinHeap: m_max_size[%d] m_cur_size[%d] \n",
  343.         //         m_timerMinHeap.GetMaxSize(), m_timerMinHeap.GetCurSize());
  344.         if (!m_timerMinHeap.IsEmpty())
  345.         {
  346.             std::pair<long int, long int> minTime = m_timerMinHeap.GetMin(); //获取最小时间
  347.             timeval minTimeVal;
  348.             minTimeVal.tv_sec = minTime.first;
  349.             minTimeVal.tv_usec = minTime.second;

  350.             timeval now =
  351.             { 0, 0 };
  352.             gettimeofday(&now, NULL);
  353.             if (timercmp(&now, &minTimeVal, >=)) //当前时间>=触发时间
  354.             {
  355.                 m_timerMinHeap.RemoveMin(); //去掉最小堆里面的时间
  356.                 std::multimap<std::pair<long int, long int>,
  357.                  TimerEventHanderBase*>::iterator beg =
  358.                  m_timerMultiMap.lower_bound(minTime);

  359.                 for (; beg != m_timerMultiMap.upper_bound(minTime);)
  360.                 {
  361.                     TimerEventHanderBase* pHandler = beg->second;
  362.                     pHandler->OnTimeOut();
  363.                     if (pHandler->IsRestart()) //需要重启
  364.                     {
  365.                         AddTimerEventHandler(pHandler);
  366.                     }
  367.                     m_timerMultiMap.erase(beg++);//去掉multimap中的句柄
  368.                 }
  369.             }
  370.             else
  371.             {
  372.                 break;
  373.             }
  374.         }
  375.         else
  376.         {
  377.             break;
  378.         }
  379.     }
  380. }

7. 测试代码

测试代码

  1. #include "reactor.h"
  2. #include <stdio.h>
  3. #include <string.h>
  4. #include <unistd.h>
  5. #include <sys/types.h>
  6. #include <sys/stat.h>
  7. #include <fcntl.h>

  8. class TimerEventHandler_test: public TimerEventHanderBase
  9. {
  10. public:
  11.     TimerEventHandler_test(Reactor * pReactor) :
  12.         TimerEventHanderBase(pReactor)
  13.     {

  14.     }
  15.     void OnTimeOut() {
  16.         timeval t;
  17.         memset(&t, 0, sizeof(t));
  18.         gettimeofday(&t, NULL);
  19.         printf("timeOut: [%u:%u] \n", (unsigned int)t.tv_sec, (unsigned int)t.tv_usec);
  20.     }
  21. };

  22. void reactor_test_1() {
  23.     Reactor * pReactor = new LTReactor;
  24.     pReactor->Init();
  25.     TimerEventHanderBase* pHandler1 = new TimerEventHandler_test(pReactor);
  26.     pHandler1->RegisterTimer(2000, false);
  27.     TimerEventHanderBase* pHandler2 = new TimerEventHandler_test(pReactor);
  28.     pHandler2->RegisterTimer(4000, false);
  29.     TimerEventHanderBase* pHandler3 = new TimerEventHandler_test(pReactor);
  30.     pHandler3->RegisterTimer(8000, false);
  31.     TimerEventHanderBase* pHandler4 = new TimerEventHandler_test(pReactor);
  32.     pHandler4->RegisterTimer(10000, false);
  33.     pReactor->Run();
  34. }
  35. ////////////////////////////////////////////////////////////////////////////
  36. class FileFDEventHander: public FDEventHanderBase
  37. {
  38. public:
  39.     FileFDEventHander(int fd, Reactor* pReactor) :
  40.         FDEventHanderBase(fd, pReactor)
  41.     {

  42.     }

  43.     // 读被触发时,调用该函数
  44.     virtual void OnFDRead() {
  45.         char buf[1024] = {0};
  46.         read(m_fd, buf, sizeof(buf));
  47.         printf("FDReadResult: [%s] \n", buf);
  48.         UnRegisterReadEvent();
  49.     }
  50.     // 写被触发时,调用该函数
  51.     virtual void OnFDWrite() {
  52.         char buf[] = "this is a test file !";
  53.         write(m_fd, buf, strlen(buf));

  54.         UnRegisterWriteEvent();
  55.     }
  56.     // 关闭fd
  57.     virtual void Close() {
  58.         close(m_fd);
  59.     }
  60. };

  61. void reactor_test_2()
  62. {
  63.     int fd = -1;
  64.     const char* path = "/home/ll/work/min_heap/test.txt";
  65.     if(0 != mkfifo(path, 0777)) // 创建一个pipe.
  66.     {
  67.         fd = open(path, O_RDWR);
  68.     }

  69.     Reactor * pReactor = new LTReactor;
  70.     pReactor->Init();

  71.     FDEventHanderBase* pHandler = new FileFDEventHander(fd, pReactor);
  72.     // 注册写事件,在命令行 cat pipe_file_name
  73. //    pHandler->RegisterWriteEvent();
  74. //    sleep(5);
  75. //    printf("sleep 5s over \n");
  76.     // 注册读事件,在命令行 echo "dfsdf" > pipe_file_name
  77.     pHandler->RegisterReadEvent();
  78.     sleep(5);
  79.     printf("sleep 5s over \n");
  80.     pReactor->Run();
  81. }
8. 需要注意的信号
        SIGPIPE:Broken pipe: write to pipe with no readers,对socket的两端A和B,若B已经关闭。当A第一次写时,内核会返回RST给应用程序;当A第二次写,内核会返回SIGPIPE信号给应用程序。所以A端需要处理SIGPIPE信号。SIGPIPE默认动作是进程退出,可以忽略该信号,signal(SIGPIPE, SIG_IGN)
        SIGINT:当用户按ctl+c时,会产生SIGINT信号。可以用来终止进程。signal(SIGINT, proc_exit);
        SIGKILL / SIGSTOP:不能被捕捉或忽略的信号。提供了一种可以杀死进程的方法。signal(SIGKILL/SIGSTOP, proc_exit);
        SIGTERM:Termination signal,由kill命名发出的系统默认终止信号。signal(SIGTERM, proc_exit);
阅读(3583) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~