Chinaunix首页 | 论坛 | 博客
  • 博客访问: 892550
  • 博文数量: 299
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 2493
  • 用 户 组: 普通用户
  • 注册时间: 2014-03-21 10:07
个人简介

Linux后台服务器编程。

文章分类

全部博文(299)

文章存档

2015年(2)

2014年(297)

分类: C/C++

2014-11-03 22:00:56

使用io_service作为处理工作的work pool:通过io_service.post把一个Handler投递到io_service的队列,Handler随后在io_service.run内部得到执行。你可能会发现,io_service.dispatch的接口和io_service.post一样,但不同的是:如果调用dispatch的线程正在执行该io_service的run,Handler会被直接调用,而不是先push到队列再在io_service.run中执行;否则它的行为与post相同。而在这个示例当中,显然我们需要把工作交到另一个线程去完成,这样才不会影响网络接收线程池的工作,以达到高效率地接收数据。这种设计与前面的netsever其实相同,这就是典型的Half Sync/Half Async。二者的区别是netsever自己实现了工作队列,而不是直接使用io_service;直接使用io_service的这种设计,在win下实际上是使用了iocp作为工作队列。

不过我更倾向于前一种设计,因为那样做,代码一切都在自己的掌握中,而io_service则是经过许多封装代码,并且本身设计只是用于处理网络完成事件的。

无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。


点击(此处)折叠或打开

  #include <stdio.h>
  #include <cstdlib>
  #include <algorithm>
  #include <iostream>
  #include <stdexcept>
  #include <string>
  #include <vector>
  #include <boost/aligned_storage.hpp>
  #include <boost/array.hpp>
  #include <boost/asio.hpp>
  #include <boost/bind.hpp>
  #include <boost/enable_shared_from_this.hpp>
  #include <boost/noncopyable.hpp>
  #include <boost/shared_ptr.hpp>
  #include <boost/thread.hpp>

  12. using boost::asio::ip::tcp;

  13. class handler_allocator
  14.     : private boost::noncopyable
  15. {
  16. public:
  17.     handler_allocator()
  18.         : in_use_(false)
  19.     {
  20.     }

  21.     void* allocate(std::size_t size)
  22.     {
  23.         if (!in_use_ && size < storage_.size)
  24.         {
  25.             in_use_ = true;
  26.             return storage_.address();
  27.         }
  28.         else
  29.         {
  30.             return ::operator new(size);
  31.         }
  32.     }

  33.     void deallocate(void* pointer)
  34.     {
  35.         if (pointer == storage_.address())
  36.         {
  37.             in_use_ = false;
  38.         }
  39.         else
  40.         {
  41.             ::operator delete(pointer);
  42.         }
  43.     }

  44. private:
  45.     // Storage space used for handler-based custom memory allocation.
  46.     boost::aligned_storage<1024> storage_;

  47.     // Whether the handler-based custom allocation storage has been used.
  48.     bool in_use_;
  49. };

  50. template <typename Handler>
  51. class custom_alloc_handler
  52. {
  53. public:
  54.     custom_alloc_handler(handler_allocator& a, Handler h)
  55.         : allocator_(a),
  56.         handler_(h)
  57.     {
  58.     }

  59.     template <typename Arg1>
  60.     void operator()(Arg1 arg1)
  61.     {
  62.         handler_(arg1);
  63.     }

  64.     template <typename Arg1, typename Arg2>
  65.     void operator()(Arg1 arg1, Arg2 arg2)
  66.     {
  67.         handler_(arg1, arg2);
  68.     }

  69.     friend void* asio_handler_allocate(std::size_t size,
  70.         custom_alloc_handler<Handler>* this_handler)
  71.     {
  72.         return this_handler->allocator_.allocate(size);
  73.     }

  74.     friend void asio_handler_deallocate(void* pointer, std::size_t /*size*/,
  75.         custom_alloc_handler<Handler>* this_handler)
  76.     {
  77.         this_handler->allocator_.deallocate(pointer);
  78.     }

  79. private:
  80.     handler_allocator& allocator_;
  81.     Handler handler_;
  82. };

  83. // Helper function to wrap a handler object to add custom allocation.
  84. template <typename Handler>
  85. inline custom_alloc_handler<Handler> make_custom_alloc_handler(
  86.     handler_allocator& a, Handler h)
  87. {
  88.     return custom_alloc_handler<Handler>(a, h);
  89. }

  90. /// A pool of io_service objects.
  91. class io_service_pool
  92.     : private boost::noncopyable
  93. {
  94. public:
  95.     /// Construct the io_service pool.
  96.     explicit io_service_pool(std::size_t pool_size) : next_io_service_(0)
  97.     {
  98.         if (pool_size == 0)
  99.             throw std::runtime_error("io_service_pool size is 0");

  100.         // Give all the io_services work to do so that their run() functions will not
  101.         // exit until they are explicitly stopped.
  102.         for (std::size_t i = 0; i < pool_size; ++i)
  103.         {
  104.             io_service_ptr io_service(new boost::asio::io_service);
  105.             work_ptr work(new boost::asio::io_service::work(*io_service));
  106.             io_services_.push_back(io_service);
  107.             work_.push_back(work);
  108.         }
  109.     }

  110.     // Run all io_service objects in the pool.
  111.     void run()
  112.     {
  113.         // Create a pool of threads to run all of the io_services.
  114.         std::vector<boost::shared_ptr<boost::thread> > threads;
  115.         for (std::size_t i = 0; i < io_services_.size(); ++i)
  116.         {
  117.             boost::shared_ptr<boost::thread> thread(new boost::thread(
  118.                 boost::bind(&boost::asio::io_service::run, io_services_[i])));
  119.             threads.push_back(thread);
  120.         }

  121.         // Wait for all threads in the pool to exit.
  122.         for (std::size_t i = 0; i < threads.size(); ++i)
  123.             threads[i]->join();
  124.     }

  125.     // Stop all io_service objects in the pool.
  126.     void stop()
  127.     {
  128.         // Explicitly stop all io_services.
  129.         for (std::size_t i = 0; i < io_services_.size(); ++i)
  130.             io_services_[i]->stop();
  131.     }

  132.     // Get an io_service to use.
  133.     boost::asio::io_service& get_io_service()
  134.     {
  135.         // Use a round-robin scheme to choose the next io_service to use.
  136.         boost::asio::io_service& io_service = *io_services_[next_io_service_];
  137.         ++next_io_service_;
  138.         if (next_io_service_ == io_services_.size())
  139.             next_io_service_ = 0;
  140.         return io_service;
  141.     }

  142. private:
  143.     typedef boost::shared_ptr<boost::asio::io_service> io_service_ptr;
  144.     typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr;

  145.     /// The pool of io_services.
  146.     std::vector<io_service_ptr> io_services_;

  147.     /// The work that keeps the io_services running.
  148.     std::vector<work_ptr> work_;

  149.     /// The next io_service to use for a connection.
  150.     std::size_t next_io_service_;
  151. };

  152. class session
  153.     : public boost::enable_shared_from_this<session>
  154. {
  155. public:
  156.     session(boost::asio::io_service& work_service
  157.         , boost::asio::io_service& io_service)
  158.         : socket_(io_service)
  159.         , io_work_service(work_service)
  160.     {
  161.     }

  162.     tcp::socket& socket()
  163.     {
  164.         return socket_;
  165.     }

  166.     void start()
  167.     {
  168.         socket_.async_read_some(boost::asio::buffer(data_),
  169.             make_custom_alloc_handler(allocator_,
  170.             boost::bind(&session::handle_read,
  171.             shared_from_this(),
  172.             boost::asio::placeholders::error,
  173.             boost::asio::placeholders::bytes_transferred)));
  174.     }

  175.     void handle_read(const boost::system::error_code& error,
  176.         size_t bytes_transferred)
  177.     {
  178.         if (!error)
  179.         {
  180.             boost::shared_ptr<std::vector<char> > buf(new std::vector<char>);

  181.             buf->resize(bytes_transferred);
  182.             std::copy(data_.begin(), data_.begin() + bytes_transferred, buf->begin());
  183.             io_work_service.post(boost::bind(&session::on_receive
  184.                 , shared_from_this(), buf, bytes_transferred));

  185.             socket_.async_read_some(boost::asio::buffer(data_),
  186.                 make_custom_alloc_handler(allocator_,
  187.                 boost::bind(&session::handle_read,
  188.                 shared_from_this(),
  189.                 boost::asio::placeholders::error,
  190.                 boost::asio::placeholders::bytes_transferred)));
  191.         }
  192.     }

  193.     void handle_write(const boost::system::error_code& error)
  194.     {
  195.         if (!error)
  196.         {
  197.         }
  198.     }

  199.     void on_receive(boost::shared_ptr<std::vector<char> > buffers
  200.         , size_t bytes_transferred)
  201.     {
  202.         char* data_stream = &(*buffers->begin());
  203.         // in here finish the work.
  204.         std::cout << "receive :" << bytes_transferred << " bytes." <<
  205.             "message :" << data_stream << std::endl;
  206.     }

  207. private:
  208.     // The io_service used to finish the work.
  209.     boost::asio::io_service& io_work_service;

  210.     // The socket used to communicate with the client.
  211.     tcp::socket socket_;

  212.     // Buffer used to store data received from the client.
  213.     boost::array<char, 1024> data_;

  214.     // The allocator to use for handler-based custom memory allocation.
  215.     handler_allocator allocator_;
  216. };

  217. typedef boost::shared_ptr<session> session_ptr;

  218. class server
  219. {
  220. public:
  221.     server(short port, std::size_t io_service_pool_size)
  222.         : io_service_pool_(io_service_pool_size)
  223.         , io_service_work_pool_(io_service_pool_size)
  224.         , acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port))
  225.     {
  226.         session_ptr new_session(new session(io_service_work_pool_.get_io_service()
  227.             , io_service_pool_.get_io_service()));
  228.         acceptor_.async_accept(new_session->socket(),
  229.             boost::bind(&server::handle_accept, this, new_session,
  230.             boost::asio::placeholders::error));
  231.     }

  232.     void handle_accept(session_ptr new_session,
  233.         const boost::system::error_code& error)
  234.     {
  235.         if (!error)
  236.         {
  237.             new_session->start();
  238.             new_session.reset(new session(io_service_work_pool_.get_io_service()
  239.                 , io_service_pool_.get_io_service()));
  240.             acceptor_.async_accept(new_session->socket(),
  241.                 boost::bind(&server::handle_accept, this, new_session,
  242.                 boost::asio::placeholders::error));
  243.         }
  244.     }

  245.     void run()
  246.     {
  247.         io_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run
  248.             , &io_service_pool_)));
  249.         work_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run
  250.             , &io_service_work_pool_)));
  251.     }

  252.     void stop()
  253.     {
  254.         io_service_pool_.stop();
  255.         io_service_work_pool_.stop();

  256.         io_thread_->join();
  257.         work_thread_->join();
  258.     }

  259. private:
  260.     boost::shared_ptr<boost::thread> io_thread_;
  261.     boost::shared_ptr<boost::thread> work_thread_;
  262.     io_service_pool io_service_pool_;
  263.     io_service_pool io_service_work_pool_;
  264.     tcp::acceptor acceptor_;
  265. };

  266. int main(int argc, char* argv[])
  267. {
  268.     try
  269.     {
  270.         if (argc != 2)
  271.         {
  272.             std::cerr << "Usage: server /n";
  273.             return 1;
  274.         }

  275.         using namespace std; // For atoi.
  276.         server s(atoi(argv[1]), 10);

  277.         s.run();

  278.         getchar();

  279.         s.stop();
  280.     }
  281.     catch (std::exception& e)
  282.     {
  283.         std::cerr << "Exception: " << e.what() << "/n";
  284.     }

  285.     getchar();
  286.     return 0;
  287. }

阅读(2934) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~