Chinaunix首页 | 论坛 | 博客
  • 博客访问: 6796658
  • 博文数量: 578
  • 博客积分: 13065
  • 博客等级: 上将
  • 技术积分: 10103
  • 用 户 组: 普通用户
  • 注册时间: 2008-03-26 16:44
个人简介

推荐: blog.csdn.net/aquester https://github.com/eyjian https://www.cnblogs.com/aquester http://blog.chinaunix.net/uid/20682147.html

文章分类

全部博文(578)

分类: HADOOP

2018-11-12 17:33:16

CZookeeperHelper:
https://github.com/eyjian/libmooon/blob/master/include/mooon/net/zookeeper_helper.h

CMainHelper:
https://github.com/eyjian/libmooon/blob/master/include/mooon/sys/main_template.h

  1. // 演示一个多线程程序如何借助zookeeper,实现配置的动态更新
  2. //
  3. // 实现理念(有些场景不适合):
  4. // 1) 工作线程不涉及配置的动态更新,从而无需在线程内实现配置动态更新的逻辑
  5. // 2) 通过创建新线程的方式达到配置动态更新的目的,老的线程直接退出
  6. // 3) 先创建新线程,再退出老线程,保持服务不中断
  7. //
  8. // 实际上,也可以通过父子进程方式来达到配置动态更新,
  9. // 父进程检测到配置更新后,父进程读取配置,并检查配置的合法性。
  10. // 如果合法则创建新的子进程,完成后再kill原有的子进程,
  11. // 这样子进程就不涉及配置更新逻辑。
  12. //
  13. // 这两种方法,均可较简单地应对复杂的配置动态更新,
  14. // 但如果新旧配置无法同时兼容,则需要先停掉老的线程或进程,
  15. // 然后再启动新的线程或进程,这种情况下无法做到无缝地动态更新。
  16. //
  17. // 编译环境要求:C++11或更高
  18. // 编译语句大致如下:
  19. // g++ -g -o b zk_conf_example.cpp -I/usr/local/mooon/include -I/usr/local/zookeeper/include /usr/local/mooon/lib/libmooon.a /usr/local/zookeeper/lib/libzookeeper_mt.a -pthread -std=c++11 -DMOOON_HAVE_ZOOKEEPER=1 -lz
#include <mooon/net/zookeeper_helper.h>
#include <mooon/sys/datetime_utils.h> // 格式化时间也可以考虑C++标准库提供的std::put_time
#include <mooon/sys/main_template.h>
#include <mooon/utils/args_parser.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <cstring>
#include <functional>
#include <memory>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>
#include <vector>

  29. // Command-line option --zookeeper (default ""): comma-separated list of the ZooKeeper servers that store the configuration.
  30. STRING_ARG_DEFINE(zookeeper, "", "Comma separated list of servers in the ZooKeeper Quorum, example: --zookeeper=127.0.0.1:2181");

  31. class CMyApplication;

  32. // 负责具体业务的工作者(线程)
  33. class CWorker
  34. {
  35. public:
  36.     CWorker(CMyApplication* app, int index);
  37.     void run(); // 线程入口函数
  38.     void stop() { _stop = true; }

  39. private:
  40.     CMyApplication* _app;
  41.     int _index;
  42.     volatile bool _stop;
  43. };

  44. // 应用程序主类(或叫上下文类,也可叫入口类)
  45. // 通过继承CZookeeperHelper,获得zookeeper操作能力,
  46. // 包括读写zookeeper数据能力、发现配置更新能力和主备切换能力。
  47. //
  48. // 可继承mooon::sys::CMainHelper,
  49. // 以获得通过信号SIGTERM的优雅退出能力,
  50. // CMainHelper提供了优雅和安全的信号处理,
  51. // 默认的优雅退出信号为SIGTERM,可自定义为其它信号。
  52. class CMyApplication: public mooon::net::CZookeeperHelper, public mooon::sys::CMainHelper
  53. {
  54. public:
  55.     CMyApplication();

  56. private:
  57.     // num_workers 需要启动的CWorker个数
  58.     bool start_workers(
  59.         std::vector<std::thread>* work_threads,
  60.         std::vector<std::shared_ptr<CWorker>>* workers,
  61.         int num_workers);
  62.     void stop_workers(
  63.         std::vector<std::thread>* work_threads,
  64.         std::vector<std::shared_ptr<CWorker>>* workers);
  65.     // 当zookeeper的会话过期后,
  66.     // 需要调用recreate_zookeeper_session重新建立会话
  67.     void recreate_zookeeper_session();

  68. // 实现父类CMainHelper定义的虚拟函数(实为回调函数),
  69. // 以下五个“on_”函数,均运行在独立的信号线程中,而不是主线程中。
  70. private:
  71.     // 主线程的调用顺序:
  72.     // main()
  73.     // -> on_check_parameter() -> on_init()
  74.     // -> on_run() -> on_fini()
  75.     //
  76.     // 注意on_terminated()是由信号触发的,
  77.     // 由独立的信号线程调用,但位于on_init()之后。
  78.     virtual bool on_check_parameter();
  79.     virtual bool on_init(int argc, char* argv[]);
  80.     virtual bool on_run(); // 这里使得配置动态生效
  81.     virtual void on_fini();
  82.     virtual void on_terminated();

  83. // 实现父类CZookeeperHelper定义的虚拟函数(实为回调函数)
  84. // 以下五个“on_”函数,均运行在独立的zookeeper线程中,而不是主线程中。
  85. private:
  86.     virtual void on_zookeeper_session_connected(const char* path);
  87.     virtual void on_zookeeper_session_connecting(const char* path);
  88.     virtual void on_zookeeper_session_expired(const char *path);
  89.     virtual void on_zookeeper_session_event(int state, const char *path);
  90.     virtual void on_zookeeper_event(int type, int state, const char *path);

  91. private:
  92.     volatile bool _stop;
  93.     std::mutex _mutex;
  94.     std::condition_variable _cond;
  95.     std::vector<std::thread> _work_threads;
  96.     std::vector<std::shared_ptr<CWorker>> _workers;

  97. private:
  98.     volatile bool _conf_changed; // 配置发生变化
  99.     volatile bool _zookeeper_session_expired; // zookeeper的会话(session)过期
  100.     std::string _zk_nodes; // 存放配置的zookeeper节点列表
  101.     std::string _conf_zkpath; // 配置的zookeeper节点路径
  102. };

  103. int main(int argc, char* argv[])
  104. {
  105.     CMyApplication app;
  106.     return mooon::sys::main_template(&app, argc, argv);
  107. }

  108. static unsigned long long get_current_thread_id()
  109. {
  110.     std::stringstream ss;
  111.     ss << std::this_thread::get_id();
  112.     return std::stoull(ss.str());
  113. }

  114. CMyApplication::CMyApplication()
  115.     : _stop(false), _conf_changed(false), _zookeeper_session_expired(false)
  116. {
  117.     _conf_zkpath = "/tmp/conf";
  118. }

  119. bool CMyApplication::on_check_parameter()
  120. {
  121.     // 命令行参数“--zookeeper”不能为空
  122.     return !mooon::argument::zookeeper->value().empty();
  123. }

  124. bool CMyApplication::on_init(int argc, char* argv[])
  125. {
  126.     try
  127.     {
  128.         // 以this方式调用的函数,均为CZookeeperHelper提供
  129.         _zk_nodes = mooon::argument::zookeeper->value();
  130.         this->create_session(_zk_nodes);

  131.         // zookeeper的会话(session)是异步创建的,
  132.         // 只有连接成功后,方可读取存放在zookeeper上的配置数据。
  133.         for (int i=0; i<5&&!_stop; ++i)
  134.         {
  135.             if (this->is_connected())
  136.                 break;
  137.             else
  138.                 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  139.         }

  140.         if (!this->is_connected())
  141.         {
  142.             fprintf(stderr, "Can not connect zookeeper://%s\n", _zk_nodes.c_str());
  143.             return false;
  144.         }
  145.         else
  146.         {
  147.             // 取zookeeper节点数据
  148.             std::string zkdata;
  149.             int n = get_zk_data(_conf_zkpath.c_str(), &zkdata, 4);
  150.             if (n > 4 || zkdata.empty())
  151.             {
  152.                 // 配置数据的大小超出预期
  153.                 fprintf(stderr, "conf size error: %d\n", n);
  154.                 return false;
  155.             }
  156.             else
  157.             {
  158.                 // 如果zkdata不是一个有效的数字,
  159.                 // stoi会抛出异常invalid_argument
  160.                 const int num_workers = std::stoi(zkdata);

  161.                 if (num_workers < 1 || num_workers > 10)
  162.                 {
  163.                     fprintf(stderr, "conf error: %d\n", num_workers);
  164.                     return false;
  165.                 }
  166.                 else
  167.                 {
  168.                     return start_workers(&_work_threads, &_workers, num_workers);
  169.                 }
  170.             }
  171.         }
  172.     }
  173.     catch (std::invalid_argument& ex)
  174.     {
  175.         fprintf(stderr, "%s\n", ex.what());
  176.         return false;
  177.     }
  178.     catch (mooon::sys::CSyscallException& ex)
  179.     {
  180.         fprintf(stderr, "%s\n", ex.str().c_str());
  181.         return false;
  182.     }
  183.     catch (mooon::utils::CException& ex)
  184.     {
  185.         fprintf(stderr, "%s\n", ex.str().c_str());
  186.         return false;
  187.     }
  188. }

  189. bool CMyApplication::on_run()
  190. {
  191.     while (!_stop)
  192.     {
  193.         std::unique_lock<std::mutex> lock(_mutex);
  194.         _cond.wait(lock); // 等待配置更新或收到退出指令
  195.         if (_stop)
  196.         {
  197.             break;
  198.         }

  199.         // 以下实现省略了函数调用抛异常处理
  200.         if (_zookeeper_session_expired)
  201.         {
  202.             // 如果会话过期,则需要重新建会话
  203.             recreate_zookeeper_session();
  204.         }
  205.         if (_stop)
  206.         {
  207.             // 在建立会话过程中,可能收到了停止指令
  208.             break;
  209.         }
  210.         if (_conf_changed)
  211.         {
  212.             _conf_changed = false;

  213.             // 读取新的配置
  214.             std::string zkdata;
  215.             int n = get_zk_data(_conf_zkpath.c_str(), &zkdata, 4);
  216.             if (n > 4)
  217.             {
  218.                 // 这种情况下应触发告警
  219.                 // 配置数据的大小超出预期
  220.                 fprintf(stderr, "conf size error: %d\n", n);
  221.             }
  222.             else
  223.             {
  224.                 // 这里可考虑加上优化:
  225.                 // 只有配置确实发生变化时才进行后续操作。
  226.                 const int num_workers = std::stoi(zkdata);

  227.                 if (num_workers < 1 || num_workers > 10)
  228.                 {
  229.                     // 这种情况下应触发告警
  230.                     fprintf(stderr, "conf error: %d\n", num_workers);
  231.                 }
  232.                 else
  233.                 {
  234.                     std::vector<std::thread> work_threads;
  235.                     std::vector<std::shared_ptr<CWorker>> workers;

  236.                     // 新的配置生效,才停掉原来的,
  237.                     // 防止因为误操破坏配置,导致整个系统崩溃
  238.                     if (!start_workers(&work_threads, &workers, num_workers))
  239.                     {
  240.                         // 这种情况下应触发告警
  241.                     }
  242.                     else
  243.                     {
  244.                         stop_workers(&_work_threads, &_workers);
  245.                         _work_threads.swap(work_threads);
  246.                         _workers.swap(workers);
  247.                     }
  248.                 }
  249.             }
  250.         }
  251.     }

  252.     return true;
  253. }

  254. void CMyApplication::on_fini()
  255. {
  256.     // 应用退出时被调用
  257.     fprintf(stdout, "Application is about to quit\n");
  258. }

  259. // 接收到了SIGTERM信号
  260. void CMyApplication::on_terminated()
  261. {
  262.     // 一定要最先调用父类CMainHelper的on_terminated
  263.     mooon::sys::CMainHelper::on_terminated();

  264.     _stop = true;
  265.     stop_workers(&_work_threads, &_workers);

  266.     std::unique_lock<std::mutex> lock(_mutex);
  267.     _cond.notify_one(); // 唤醒等待状态的CMyApplication::run
  268. }

  269. bool CMyApplication::start_workers(
  270.     std::vector<std::thread>* work_threads,
  271.     std::vector<std::shared_ptr<CWorker>>* workers,
  272.     int num_workers)
  273. {
  274.     try
  275.     {
  276.         for (int i=0; i<num_workers; ++i)
  277.         {
  278.             std::shared_ptr<CWorker> worker(new CWorker(this, i));
  279.             workers->push_back(worker);
  280.             work_threads->push_back(std::thread(&CWorker::run, worker));
  281.         }
  282.         return true;
  283.     }
  284.     catch(const std::system_error& ex)
  285.     {
  286.         // 如果有部分启动功能应当回退,这里省略了
  287.         fprintf(stderr, "(%d)%s\n", ex.code().value(), ex.what());
  288.         return false;
  289.     }
  290. }

  291. void CMyApplication::stop_workers(
  292.     std::vector<std::thread>* work_threads,
  293.     std::vector<std::shared_ptr<CWorker>>* workers)
  294. {
  295.     for (std::vector<std::shared_ptr<CWorker>>::size_type i=0; i<workers->size(); ++i)
  296.     {
  297.         (*workers)[i]->stop();
  298.         if ((*work_threads)[i].joinable())
  299.             (*work_threads)[i].join();
  300.     }
  301.     work_threads->clear();
  302.     workers->clear();
  303. }

  304. void CMyApplication::recreate_zookeeper_session()
  305. {
  306.     unsigned int count = 0;

  307.     while (!_stop)
  308.     {
  309.         try
  310.         {
  311.             recreate_session();
  312.             _zookeeper_session_expired = false;
  313.             break;
  314.         }
  315.         catch (mooon::utils::CException& ex)
  316.         {
  317.             std::this_thread::sleep_for(std::chrono::milliseconds(2000));

  318.             if (0 == count++%30)
  319.             {
  320.                 fprintf(stderr, "recreate zookeeper session failed: (count:%d)%s\n", count, ex.str().c_str());
  321.             }
  322.         }
  323.     }
  324. }

  325. void CMyApplication::on_zookeeper_session_connected(const char* path)
  326. {
  327.     fprintf(stdout, "path=%s\n", path);
  328. }

  329. void CMyApplication::on_zookeeper_session_connecting(const char* path)
  330. {
  331.     fprintf(stdout, "path=%s\n", path);
  332. }

  333. void CMyApplication::on_zookeeper_session_expired(const char *path)
  334. {
  335.     fprintf(stdout, "path=%s\n", path);

  336.     std::unique_lock<std::mutex> lock(_mutex);
  337.     _zookeeper_session_expired = true;
  338.     _cond.notify_one(); // 唤醒等待状态的CMyApplication::run
  339. }

  340. void CMyApplication::on_zookeeper_session_event(int state, const char *path)
  341. {
  342.     fprintf(stdout, "state=%d, path=%s\n", state, path);
  343. }

  344. void CMyApplication::on_zookeeper_event(int type, int state, const char *path)
  345. {
  346.     fprintf(stdout, "type=%d, state=%d, path=%s\n", type, state, path);

  347.     if (ZOO_CONNECTED_STATE == state &&
  348.         ZOO_CHANGED_EVENT == type &&
  349.         0 == strcmp(path, _conf_zkpath.c_str()))
  350.     {
  351.         // 配置发生变化
  352.         std::unique_lock<std::mutex> lock(_mutex);
  353.         _conf_changed = true;
  354.         _cond.notify_one(); // 唤醒等待状态的CMyApplication::run
  355.     }
  356. }

  357. CWorker::CWorker(CMyApplication* app, int index)
  358.     : _app(app), _index(index), _stop(false)
  359. {
  360. }

  361. void CWorker::run()
  362. {
  363.     fprintf(stdout, "Worker[%d/%llu] \033[1;33mstarted\033[m\n", _index, get_current_thread_id());

  364.     while (!_stop)
  365.     {
  366.         // 执行具体的业务逻辑操作,这里仅以sleep替代做示范
  367.         std::this_thread::sleep_for(std::chrono::milliseconds(2000));
  368.         fprintf(stdout, "[%s] Worker[\033[1;33m%d\033[m/%llu] is working ...\n",
  369.             mooon::sys::CDatetimeUtils::get_current_time().c_str(),
  370.             _index, get_current_thread_id());
  371.     }

  372.     fprintf(stdout, "Worker[%d/%llu] \033[1;33mstopped\033[m\n", _index, get_current_thread_id());
  373. }

阅读(2916) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~
评论热议
请登录后评论。

登录 注册