Chinaunix首页 | 论坛 | 博客
  • 博客访问: 6796685
  • 博文数量: 578
  • 博客积分: 13065
  • 博客等级: 上将
  • 技术积分: 10103
  • 用 户 组: 普通用户
  • 注册时间: 2008-03-26 16:44
个人简介

推荐: blog.csdn.net/aquester https://github.com/eyjian https://www.cnblogs.com/aquester http://blog.chinaunix.net/uid/20682147.html

文章分类

全部博文(578)

分类: C/C++

2014-05-02 17:03:22

CThriftServerHelper用于服务端,CThriftClientHelper用于客户端。

源代码链接:https://github.com/eyjian/mooon/blob/master/mooon/include/mooon/net/thrift_helper.h

IDL定义:
service PackageManagerService
{
}

服务端使用示例:
CThriftServerHelper _thrift_server_helper;
return _thrift_server_helper.serve(FLAGS_package_port, rpc_threads);

客户端使用示例:
CThriftClientHelper thrift_client_helper(FLAGS_package_ip, FLAGS_package_port);
thrift_client_helper.connect(); // 注意需要处理异常TTransportException/TApplicationException/TException

  1. #ifndef MOOON_NET_THRIFT_HELPER_H
  2. #define MOOON_NET_THRIFT_HELPER_H
  3. #include <mooon/net/config.h>
  4. #include <mooon/sys/log.h>
  5. #include <mooon/utils/string_utils.h>
  6. #include <mooon/utils/scoped_ptr.h>
  7. #include <arpa/inet.h>
  8. #include <boost/scoped_ptr.hpp>
  9. #include <thrift/concurrency/PosixThreadFactory.h>
  10. #include <thrift/concurrency/ThreadManager.h>
  11. #include <thrift/protocol/TBinaryProtocol.h>
  12. #include <thrift/server/TNonblockingServer.h>
  13. #include <thrift/transport/TSocketPool.h>
  14. #include <thrift/transport/TTransportException.h>
  15. #include <vector>
  16. NET_NAMESPACE_BEGIN

  17. // 用来判断thrift是否已经连接,包括两种情况:
  18. // 1.从未连接过,也就是还未打开过连接
  19. // 2.连接被对端关闭了
  20. inline bool thrift_not_connected(
  21.         apache::thrift::transport::TTransportException::TTransportExceptionType type)
  22. {
  23.     return (apache::thrift::transport::TTransportException::NOT_OPEN == type)
  24.         || (apache::thrift::transport::TTransportException::END_OF_FILE == type);
  25. }

  26. inline bool thrift_not_connected(
  27.         apache::thrift::transport::TTransportException& ex)
  28. {
  29.     apache::thrift::transport::TTransportException::TTransportExceptionType type = ex.getType();
  30.     return thrift_not_connected(type);
  31. }

  32. // thrift客户端辅助类
  33. //
  34. // 使用示例:
  35. // mooon::net::CThriftClientHelper<ExampleServiceClient> client(rpc_server_ip, rpc_server_port);
  36. // try
  37. // {
  38. // client.connect();
  39. // client->foo();
  40. // }
  41. // catch (apache::thrift::transport::TTransportException& ex)
  42. // {
  43. // MYLOG_ERROR("thrift exception: (%d)%s\n", ex.getType(), ex.what());
  44. // }
  45. // catch (apache::thrift::transport::TApplicationException& ex)
  46. // {
  47. // MYLOG_ERROR("thrift exception: %s\n", ex.what());
  48. // }
  49. // catch (apache::thrift::TException& ex)
  50. // {
  51. // MYLOG_ERROR("thrift exception: %s\n", ex.what());
  52. // }
  53. // Transport除默认的TFramedTransport (TBufferTransports.h),还可选择:
  54. // TBufferedTransport (TBufferTransports.h)
  55. // THttpTransport
  56. // TZlibTransport
  57. // TFDTransport (TSimpleFileTransport)
  58. //
  59. // Protocol除默认的apache::thrift::protocol::TBinaryProtocol,还可选择:
  60. // TCompactProtocol
  61. // TJSONProtocol
  62. // TDebugProtocol
  63. template <class ThriftClient,
  64.           class Protocol=apache::thrift::protocol::TBinaryProtocol,
  65.           class Transport=apache::thrift::transport::TFramedTransport>
  66. class CThriftClientHelper
  67. {
  68. public:
  69.     // host thrift服务端的IP地址
  70.     // port thrift服务端的端口号
  71.     // connect_timeout_milliseconds 连接thrift服务端的超时毫秒数
  72.     // receive_timeout_milliseconds 接收thrift服务端发过来的数据的超时毫秒数
  73.     // send_timeout_milliseconds 向thrift服务端发送数据时的超时毫秒数
  74.     CThriftClientHelper(const std::string &host, uint16_t port,
  75.                         int connect_timeout_milliseconds=2000,
  76.                         int receive_timeout_milliseconds=2000,
  77.                         int send_timeout_milliseconds=2000);

  78.     // 支持指定多个servers,运行时随机选择一个,当一个异常时自动选择其它
  79.     // num_retries 重试次数
  80.     // retry_interval 重试间隔,单位为秒
  81.     // max_consecutive_failures 单个Server最大连续失败次数
  82.     // randomize_ 是否随机选择一个Server
  83.     // always_try_last 是否总是重试最后一个Server
  84.     CThriftClientHelper(const std::vector<std::pair<std::string, int> >& servers,
  85.                         int connect_timeout_milliseconds=2000,
  86.                         int receive_timeout_milliseconds=2000,
  87.                         int send_timeout_milliseconds=2000,
  88.                         int num_retries=1, int retry_interval=60,
  89.                         int max_consecutive_failures=1,
  90.                         bool randomize=true, bool always_try_last=true
  91.                         );
  92.     ~CThriftClientHelper();

  93.     // 连接thrift服务端
  94.     //
  95.     // 出错时,可抛出以下几个thrift异常:
  96.     // apache::thrift::transport::TTransportException
  97.     // apache::thrift::TApplicationException
  98.     // apache::thrift::TException
  99.     void connect();
  100.     bool is_connected() const;

  101.     // 断开与thrift服务端的连接
  102.     //
  103.     // 出错时,可抛出以下几个thrift异常:
  104.     // apache::thrift::transport::TTransportException
  105.     // apache::thrift::TApplicationException
  106.     // apache::thrift::TException
  107.     void close();

  108.     apache::thrift::transport::TSocket* get_socket() { return _socket.get(); }
  109.     const apache::thrift::transport::TSocket get_socket() const { return _socket.get(); }
  110.     ThriftClient* get() { return _client.get(); }
  111.     ThriftClient* get() const { return _client.get(); }
  112.     ThriftClient* operator ->() { return get(); }
  113.     ThriftClient* operator ->() const { return get(); }

  114.     // 取thrift服务端的IP地址
  115.     const std::string& get_host() const;
  116.     // 取thrift服务端的端口号
  117.     uint16_t get_port() const;

  118.     // 返回可读的标识,常用于记录日志
  119.     std::string str() const
  120.     {
  121.         return utils::CStringUtils::format_string("thrift://%s:%u", get_host().c_str(), get_port());
  122.     }

  123. private:
  124.     void init();

  125. private:
  126.     int _connect_timeout_milliseconds;
  127.     int _receive_timeout_milliseconds;
  128.     int _send_timeout_milliseconds;

  129. private:
  130.     // TSocket只支持一个server,而TSocketPool是TSocket的子类支持指定多个server,运行时随机选择一个
  131.     boost::shared_ptr<apache::thrift::transport::TSocket> _socket;
  132.     boost::shared_ptr<apache::thrift::transport::TTransport> _transport;
  133.     boost::shared_ptr<apache::thrift::protocol::TProtocol> _protocol;
  134.     boost::shared_ptr<ThriftClient> _client;
  135. };

  136. ////////////////////////////////////////////////////////////////////////////////
  137. // thrift服务端辅助类
  138. //
  139. // 使用示例:
  140. // mooon::net::CThriftServerHelper<CExampleHandler, ExampleServiceProcessor> _thrift_server;
  141. // try
  142. // {
  143. // _thrift_server.serve(listen_port);
  144. // }
  145. // catch (apache::thrift::TException& ex)
  146. // {
  147. // MYLOG_ERROR("thrift exception: %s\n", ex.what());
  148. // }
  149. // ProtocolFactory除了默认的TBinaryProtocolFactory,还可选择:
  150. // TCompactProtocolFactory
  151. // TJSONProtocolFactory
  152. // TDebugProtocolFactory
  153. //
  154. // 只支持TNonblockingServer一种Server
  155. template <class ThriftHandler,
  156.           class ServiceProcessor,
  157.           class ProtocolFactory=apache::thrift::protocol::TBinaryProtocolFactory>
  158. class CThriftServerHelper
  159. {
  160. public:
  161.     // 启动rpc服务,请注意该调用是同步阻塞的,所以需放最后调用
  162.     // port thrift服务端的监听端口号
  163.     // num_threads thrift服务端开启的线程数
  164.     //
  165.     // 出错时,可抛出以下几个thrift异常:
  166.     // apache::thrift::transport::TTransportException
  167.     // apache::thrift::TApplicationException
  168.     // apache::thrift::TException
  169.     // 参数num_io_threads,只有当Server为TNonblockingServer才有效
  170.     void serve(uint16_t port, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);
  171.     void serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads=1);
  172.     void serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads, void* attached);

  173.     // 要求ThriftHandler类有方法attach(void*)
  174.     void serve(uint16_t port, void* attached, uint8_t num_worker_threads=1, uint8_t num_io_threads=1);
  175.     void stop();

  176.     ThriftHandler* get()
  177.     {
  178.         return _handler.get();
  179.     }
  180.     ThriftHandler* get() const
  181.     {
  182.         return _handler.get();
  183.     }

  184. private:
  185.     void init(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads);

  186. private:
  187.     boost::shared_ptr<ThriftHandler> _handler;
  188.     boost::shared_ptr<apache::thrift::TProcessor> _processor;
  189.     boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> _protocol_factory;
  190.     boost::shared_ptr<apache::thrift::server::ThreadManager> _thread_manager;
  191.     boost::shared_ptr<apache::thrift::concurrency::PosixThreadFactory> _thread_factory;
  192.     boost::shared_ptr<apache::thrift::server::TServer> _server;
  193. };

  194. ////////////////////////////////////////////////////////////////////////////////
  195. // 被thrift回调的写日志函数,由set_thrift_debug_log_function()调用它
  196. inline void write_thrift_debug_log(const char* log)
  197. {
  198.     MYLOG_DEBUG("%s", log);
  199. }

  200. inline void write_thrift_info_log(const char* log)
  201. {
  202.     MYLOG_INFO("%s", log);
  203. }

  204. inline void write_thrift_error_log(const char* log)
  205. {
  206.     MYLOG_ERROR("%s", log);
  207. }

  208. // 将thrift输出写入到日志文件中
  209. inline void set_thrift_debug_log_function()
  210. {
  211.     if (::mooon::sys::g_logger != NULL)
  212.     {
  213.         apache::thrift::GlobalOutput.setOutputFunction(write_thrift_debug_log);
  214.     }
  215. }

  216. inline void set_thrift_info_log_function()
  217. {
  218.     if (::mooon::sys::g_logger != NULL)
  219.     {
  220.         apache::thrift::GlobalOutput.setOutputFunction(write_thrift_info_log);
  221.     }
  222. }

  223. inline void set_thrift_error_log_function()
  224. {
  225.     if (::mooon::sys::g_logger != NULL)
  226.     {
  227.         apache::thrift::GlobalOutput.setOutputFunction(write_thrift_error_log);
  228.     }
  229. }

  230. ////////////////////////////////////////////////////////////////////////////////
  231. template <class ThriftClient, class Protocol, class Transport>
  232. CThriftClientHelper<ThriftClient, Protocol, Transport>::CThriftClientHelper(
  233.         const std::string &host, uint16_t port,
  234.         int connect_timeout_milliseconds, int receive_timeout_milliseconds, int send_timeout_milliseconds)
  235.         : _connect_timeout_milliseconds(connect_timeout_milliseconds),
  236.           _receive_timeout_milliseconds(receive_timeout_milliseconds),
  237.           _send_timeout_milliseconds(send_timeout_milliseconds)
  238. {
  239.     set_thrift_debug_log_function();
  240.     _socket.reset(new apache::thrift::transport::TSocket(host, port));
  241.     init();
  242. }

  243. template <class ThriftClient, class Protocol, class Transport>
  244. CThriftClientHelper<ThriftClient, Protocol, Transport>::CThriftClientHelper(
  245.         const std::vector<std::pair<std::string, int> >& servers,
  246.         int connect_timeout_milliseconds,
  247.         int receive_timeout_milliseconds,
  248.         int send_timeout_milliseconds,
  249.         int num_retries, int retry_interval,
  250.         int max_consecutive_failures,
  251.         bool randomize, bool always_try_last)
  252.         : _connect_timeout_milliseconds(connect_timeout_milliseconds),
  253.           _receive_timeout_milliseconds(receive_timeout_milliseconds),
  254.           _send_timeout_milliseconds(send_timeout_milliseconds)
  255. {
  256.     set_thrift_debug_log_function();

  257.     apache::thrift::transport::TSocketPool* socket_pool = new apache::thrift::transport::TSocketPool(servers);
  258.     socket_pool->setNumRetries(num_retries);
  259.     socket_pool->setRetryInterval(retry_interval);
  260.     socket_pool->setMaxConsecutiveFailures(max_consecutive_failures);
  261.     socket_pool->setRandomize(randomize);
  262.     socket_pool->setAlwaysTryLast(always_try_last);
  263.     _socket.reset(socket_pool);
  264.     init();
  265. }

  266. template <class ThriftClient, class Protocol, class Transport>
  267. void CThriftClientHelper<ThriftClient, Protocol, Transport>::init()
  268. {
  269.     _socket->setConnTimeout(_connect_timeout_milliseconds);
  270.     _socket->setRecvTimeout(_receive_timeout_milliseconds);
  271.     _socket->setSendTimeout(_send_timeout_milliseconds);

  272.     // Transport默认为apache::thrift::transport::TFramedTransport
  273.     _transport.reset(new Transport(_socket));
  274.     // Protocol默认为apache::thrift::protocol::TBinaryProtocol
  275.     _protocol.reset(new Protocol(_transport));
  276.     // 服务端的Client
  277.     _client.reset(new ThriftClient(_protocol));
  278. }

  279. template <class ThriftClient, class Protocol, class Transport>
  280. CThriftClientHelper<ThriftClient, Protocol, Transport>::~CThriftClientHelper()
  281. {
  282.     close();
  283. }

  284. template <class ThriftClient, class Protocol, class Transport>
  285. void CThriftClientHelper<ThriftClient, Protocol, Transport>::connect()
  286. {
  287.     if (!_transport->isOpen())
  288.     {
  289.         // 如果Transport为TFramedTransport,则实际调用:TFramedTransport::open -> TSocketPool::open
  290.         _transport->open();
  291.         //"TSocketPool::open: all connections failed"时,
  292.         // TSocketPool::open就抛出异常TTransportException,异常类型为TTransportException::NOT_OPEN
  293.     }
  294. }

  295. template <class ThriftClient, class Protocol, class Transport>
  296. bool CThriftClientHelper<ThriftClient, Protocol, Transport>::is_connected() const
  297. {
  298.     return _transport->isOpen();
  299. }

  300. template <class ThriftClient, class Protocol, class Transport>
  301. void CThriftClientHelper<ThriftClient, Protocol, Transport>::close()
  302. {
  303.     if (_transport->isOpen())
  304.     {
  305.         _transport->close();
  306.     }
  307. }

  308. template <class ThriftClient, class Protocol, class Transport>
  309. const std::string& CThriftClientHelper<ThriftClient, Protocol, Transport>::get_host() const
  310. {
  311.     return _socket->getHost();
  312. }

  313. template <class ThriftClient, class Protocol, class Transport>
  314. uint16_t CThriftClientHelper<ThriftClient, Protocol, Transport>::get_port() const
  315. {
  316.     return static_cast<uint16_t>(_socket->getPort());
  317. }

  318. ////////////////////////////////////////////////////////////////////////////////
  319. template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
  320. void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::serve(uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
  321. {
  322.     serve("0.0.0.0", port, num_worker_threads, num_io_threads);
  323. }

  324. template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
  325. void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
  326. {
  327.     init("0.0.0.0", port, num_worker_threads, num_io_threads);

  328.     // 这里也可直接调用serve(),但推荐run()
  329.     // !!!注意调用run()的进程或线程会被阻塞
  330.     _server->run();
  331.     _thread_manager->join();
  332. }

  333. template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
  334. void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::serve(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads, void* attached)
  335. {
  336.     init(ip, port, num_worker_threads, num_io_threads);

  337.     // 关联
  338.     if (attached != NULL)
  339.         _handler->attach(attached);

  340.     // 这里也可直接调用serve(),但推荐run()
  341.     // !!!注意调用run()的进程或线程会被阻塞
  342.     _server->run();
  343.     _thread_manager->join();
  344. }

  345. template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
  346. void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::serve(uint16_t port, void* attached, uint8_t num_worker_threads, uint8_t num_io_threads)
  347. {
  348.     init("0.0.0.0", port, num_worker_threads, num_io_threads);

  349.     // 关联
  350.     if (attached != NULL)
  351.         _handler->attach(attached);

  352.     // 这里也可直接调用serve(),但推荐run()
  353.     // !!!注意调用run()的进程或线程会被阻塞
  354.     _server->run();
  355.     _thread_manager->join();
  356. }

  357. template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
  358. void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::stop()
  359. {
  360.     _server->stop();
  361.     _thread_manager->stop();
  362. }

  363. template <class ThriftHandler, class ServiceProcessor, class ProtocolFactory>
  364. void CThriftServerHelper<ThriftHandler, ServiceProcessor, ProtocolFactory>::init(const std::string &ip, uint16_t port, uint8_t num_worker_threads, uint8_t num_io_threads)
  365. {
  366.     set_thrift_debug_log_function();

  367.     _handler.reset(new ThriftHandler);
  368.     _processor.reset(new ServiceProcessor(_handler));

  369.     // ProtocolFactory默认为apache::thrift::protocol::TBinaryProtocolFactory
  370.     _protocol_factory.reset(new ProtocolFactory());
  371.     _thread_manager = apache::thrift::server::ThreadManager::newSimpleThreadManager(num_worker_threads);
  372.     _thread_factory.reset(new apache::thrift::concurrency::PosixThreadFactory());

  373.     _thread_manager->threadFactory(_thread_factory);
  374.     _thread_manager->start();

  375.     apache::thrift::server::TNonblockingServer* server = new apache::thrift::server::TNonblockingServer(_processor, _protocol_factory, port, _thread_manager);
  376.     server->setNumIOThreads(num_io_threads);
  377.     _server.reset(server);

  378.     // 不要调用_server->run(),交给serve()来调用,
  379.     // 因为一旦调用了run()后,调用线程或进程就被阻塞了。
  380. }

  381. NET_NAMESPACE_END
  382. #endif // MOOON_NET_THRIFT_HELPER_H



阅读(4827) | 评论(2) | 转发(0) |
给主人留下些什么吧!~~

Aquester2017-08-17 16:03:12

thrift-0.9.3编译和0.9.0稍不同,标准方式指定openssl:
./configure --prefix=/usr/local/thrift-0.9.3 CPPFLAGS=\"-fPIC\" LDFLAGS=\"-fPIC\" --with-libevent=/usr/local/libevent --with-boost=/usr/local/boost --with-openssl=/usr/local/openssl --enable-tutorial=no --enable-tests=no --enable-coverage=no --with-cpp=yes --with-java=yes --with-php=yes --with-ruby=no --with-python=no --with-erlang=no --with-csharp=no --with-qt4=no --with-qt5=no --with-c_glib=no --with-perl=no --with-haskell=no

Aquester2017-08-17 14:58:46

thrift-0.9.3编译和0.9.0稍不同,标准方式指定openssl:
./configure --prefix=/usr/local/thrift-0.9.3 --with-libevent=/usr/local/libevent --with-boost=/usr/local/boost --with-openssl=/usr/local/openssl --enable-tutorial=no --enable-tests=no --enable-coverage=no --with-cpp=yes --with-java=yes --with-php=yes --with-ruby=no --with-python=no --with-erlang=no --with-csharp=no --with-qt4=no --with-qt5=no --with-c_glib=no --with-perl=no --with-haskell=no --with-d=no --with-go=no --with-nodejs=no --wi

评论热议
请登录后评论。

登录 注册