Chinaunix首页 | 论坛 | 博客
  • 博客访问: 85432
  • 博文数量: 19
  • 博客积分: 487
  • 博客等级: 下士
  • 技术积分: 205
  • 用 户 组: 普通用户
  • 注册时间: 2011-03-18 15:57
文章分类

全部博文(19)

文章存档

2011年(19)

我的朋友

分类: C/C++

2011-03-18 15:57:14

 
    代码很粗糙,希望朋友们多多指正^&^
  1. #include <boost/asio/deadline_timer.hpp>
  2. #include <boost/asio/io_service.hpp>
  3. #include <boost/asio/ip/tcp.hpp>
  4. #include <boost/asio/read_until.hpp>
  5. #include <boost/asio/streambuf.hpp>
  6. #include <boost/asio/write.hpp>
  7. #include <boost/asio.hpp>
  8. #include <boost/bind.hpp>
  9. #include <boost/shared_ptr.hpp>
  10. #include <boost/enable_shared_from_this.hpp>
  11. #include <iostream>
  12. #include "Message.hpp"
  13. #define MAXLEN 1024
  14. #define SECONDS 5

  15. using namespace std;
  16. using namespace ****;
  17. using namespace ****::Protocol;
  18. using boost::asio::deadline_timer;
  19. using boost::asio::ip::tcp;
  20. using boost::asio::ip::udp;

  21. class Session{
  22.     public:
  23.         Session(boost::asio::io_service& io_service, const char *path, int port)
  24.             :stopped_(false),
  25.             socket_(io_service),
  26.             command_(io_service, udp::endpoint(udp::v4(), port)),
  27.             deadline_(io_service),
  28.             heartbeat_timer_(io_service)
  29.     {
  30.                 callback_ = new Callback(path);
  31.                 input_stream_ = new char[MAXLEN];
  32.             }
  33.         ~Session(){
  34.             delete callback_;
  35.             delete input_stream_;    
  36.         }

  37.         void start(tcp::resolver::iterator endpoint_iter){
  38.             start_connect(endpoint_iter);
  39.             deadline_.async_wait(boost::bind(&Session::check_deadline, this));
  40.         }

  41.         void stop(){
  42.             stopped_ = true;
  43.             socket_.close();
  44.             deadline_.cancel();
  45.             heartbeat_timer_.cancel();
  46.         }


  47.     private:
  48.         void start_connect(tcp::resolver::iterator endpoint_iter){
  49.             if(endpoint_iter != tcp::resolver::iterator()){
  50.                 logs.write_log(NORMAL, "inof:Connect--->");    
  51.                         
  52.                 deadline_.expires_from_now(boost::posix_time::seconds(60));
  53.                 socket_.async_connect(endpoint_iter->endpoint(),
  54.                         boost::bind(&Session::handle_connect,
  55.                             this, _1, endpoint_iter));
  56.             }    
  57.             else{
  58.                 stop();    
  59.             }
  60.         }

  61.         void handle_connect(const boost::system::error_code& ec,
  62.                 tcp::resolver::iterator endpoint_iter){

  63.             if(stopped_)
  64.                 return;    
  65.             if(!socket_.is_open()){
  66.                 logs.write_log(NORMAL, "info:Connect time out");    
  67.                 start_connect(++endpoint_iter);
  68.             }
  69.             else if(ec){
  70.                 logs.write_log(NORMAL, "info:Connect error");    
  71.                 socket_.close();

  72.                 start_connect(++endpoint_iter);
  73.             }
  74.             else{
  75.                 //logs.write_log(NORMAL, "info:Conected to %s ", endpoint_iter->endpoint());    

  76.                 logs.write_log(NORMAL, "info:Conected! ");
  77.                 start_read();
  78.                 start_write_heartbeat();
  79.                 start_delivery();
  80.             }
  81.         }

  82.         void start_read(){
  83.             deadline_.expires_from_now(boost::posix_time::seconds(SECONDS*6));
  84.             socket_.async_receive(
  85.                     boost::asio::buffer(&header_, HEADERLEN),
  86.                     boost::bind(&Session::handle_read_header, this,
  87.                         boost::asio::placeholders::error)
  88.                     );
  89.         }

  90.         void handle_read_header(const boost::system::error_code& ec){
  91.             if(stopped_)
  92.                 return;    
  93.             if(!ec){
  94.                 if(HEARTBEAT == header_.type){
  95.                     logs.write_log(NORMAL, "info:Receive heartbeat message--->");    
  96.                     deadline_.expires_from_now(boost::posix_time::seconds(SECONDS*6));
  97.                     //heartbeat_timer_.expires_from_now(boost::posix_time::seconds(SECONDS));

  98.                     start_read();
  99.                 }    
  100.                 else if(PACKET == header_.type){
  101.                     logs.write_log(NORMAL, "info:Receive packet message--->");    
  102.                     socket_.async_receive(
  103.                             boost::asio::buffer(input_stream_, header_.length),
  104.                             boost::bind(&Session::handle_read_body, this,
  105.                                 boost::asio::placeholders::error)
  106.                             );
  107.                     logs.write_log(NORMAL, "info:Receive %d bytes", header_.length);    
  108.                 }
  109.             }
  110.             else{
  111.                 logs.write_log(NORMAL, "Error on receive: %s", (ec.message()).c_str());    
  112.                 stop();
  113.             }
  114.         }

  115.         void handle_read_body(const boost::system::error_code& ec){
  116.             if(stopped_)
  117.                 return;    
  118.             if(!ec){
  119.                 if(packet_message_.Parse(input_stream_, header_.length)){
  120.                     Packet * packet = packet_message_.pack();
  121.                     logs.write_log(NORMAL, "info:Receive data: %s", (packet->data()).c_str());
  122.                     callback_->Write(packet);            
  123.                 }

  124.                 start_read();
  125.             }
  126.             else{
  127.                 logs.write_log(NORMAL, "Error on receive: %s", (ec.message()).c_str());    
  128.                 stop();
  129.             }
  130.         }

  131.         void start_write_heartbeat(){
  132.             if(stopped_)
  133.                 return ;    
  134.             Header header;
  135.             header.length = 0;
  136.             header.type = HEARTBEAT;
  137.             boost::asio::async_write(socket_, boost::asio::buffer(&header, HEADERLEN),
  138.                     boost::bind(&Session::handle_write_heartbeat, this,
  139.                         boost::asio::placeholders::error    ));
  140.             logs.write_log(NORMAL, "info:Send heartbeat message--->");
  141.         }

  142.         void handle_write_heartbeat(const boost::system::error_code& ec){
  143.             if(stopped_)
  144.                 return ;    

  145.             if(!ec){
  146.                 heartbeat_timer_.expires_from_now(boost::posix_time::seconds(SECONDS));
  147.                 heartbeat_timer_.async_wait(boost::bind(&Session::start_write_heartbeat, this));

  148.             }
  149.             else{
  150.                 logs.write_log(NORMAL, "info:Error on heartbeat: %s ", (ec.message()).c_str());
  151.                 stop();

  152.             }
  153.         }

  154.         void start_delivery(){
  155.             if(stopped_)
  156.                 return ;    
  157.             Configure * conf = conf_message_.conf();
  158.             Header header;
  159.             conf->set_ip("10.14.2.40");
  160.             conf->set_conf("\\d+\\w+*");
  161.             conf->set_file("/tmp/aaa.txt");
  162.             conf->set_chars("hello world");
  163.             conf->set_offset(0);
  164.             conf_message_.Serialize();
  165.             header.length = conf_message_.length();
  166.             header.type = CONFIG;
  167.             socket_.async_send(
  168.                     boost::asio::buffer(&header,    HEADERLEN),
  169.                     boost::bind(&Session::handle_write_config, this,
  170.                         boost::asio::placeholders::error)
  171.                     );
  172.         }

  173.         void handle_write_config(const boost::system::error_code& ec){
  174.             if(stopped_)
  175.                 return ;    

  176.             if(!ec){
  177.                 socket_.async_send(
  178.                         boost::asio::buffer(conf_message_.data(),
  179.                             conf_message_.length()),
  180.                         boost::bind(&Session::handle_wait_commander, this,
  181.                             boost::asio::placeholders::error)
  182.                         );
  183.             }
  184.         }

  185.         void handle_wait_commander(const boost::system::error_code& ec){
  186.             logs.write_log(NORMAL, "info:Waiting for the commander --->..");    
  187.             /*
  188.              *boost::bind(&Session::delivery--->.)
  189.              * */
  190.             command_.async_receive_from(
  191.                     boost::asio::buffer(&commander_header_, HEADERLEN),
  192.                     receive_entpoint_,
  193.                     boost::bind(&Session::handle_check_commander, this,
  194.                         boost::asio::placeholders::error)
  195.                     );
  196.         }

  197.         void handle_check_commander(const boost::system::error_code& ec){
  198.             if(DELIVERY == commander_header_.type || 8888 == commander_header_.length){
  199.                 logs.write_log(NORMAL, "info:Receive a Command, delivery a Configure ");    
  200.                 /*
  201.                 boost::bind(&Session::start_delivery, this,
  202.                         boost::asio::placeholders::error);
  203.                         */
  204.                 start_delivery();
  205.             }    
  206.             else{
  207.                 logs.write_log(NORMAL, "Error:Bad command format!");    
  208.             }
  209.         }

  210.         void check_deadline()
  211.         {
  212.             if (stopped_)
  213.                 return;

  214.             // Check whether the deadline has passed. We compare the deadline against

  215.             // the current time since a new asynchronous operation may have moved the

  216.             // deadline before this actor had a chance to run.

  217.             if (deadline_.expires_at() <= deadline_timer::traits_type::now())
  218.             {
  219.                 // The deadline has passed. The socket is closed so that any outstanding

  220.                 // asynchronous operations are cancelled.

  221.                 socket_.close();

  222.                 // There is no longer an active deadline. The expiry is set to positive

  223.                 // infinity so that the actor takes no action until a new deadline is set.

  224.                 deadline_.expires_at(boost::posix_time::pos_infin);
  225.             }

  226.             // Put the actor back to sleep.

  227.             deadline_.async_wait(boost::bind(&Session::check_deadline, this));
  228.         }

  229.     private:
  230.         bool stopped_;
  231.         tcp::socket socket_;
  232.         udp::socket command_;
  233.         udp::endpoint receive_entpoint_;
  234.         Header header_;
  235.         Header commander_header_;
  236.         ConfigMessage conf_message_;
  237.         DataMessage packet_message_;
  238.         HeartbeatMessage heartbeat_message_;
  239.         deadline_timer deadline_;
  240.         deadline_timer heartbeat_timer_;
  241.         Callback *callback_;
  242.         char *input_stream_;

  243. };

  244. int main(int argc, char* argv[])
  245. {
  246.     try
  247.     {
  248.         if (argc != 4)
  249.         {
  250.             std::cerr << "Usage: Session \n";
  251.             return 1;
  252.         }

  253.         boost::asio::io_service io_service;
  254.         tcp::resolver r(io_service);
  255.         const char * path = "/home/lidawei/Doc/file";
  256.         Session s(io_service, path, std::atoi(argv[3]));
  257.         logs.write_log(NORMAL, "record file: %s", path);

  258.         s.start(r.resolve(tcp::resolver::query(argv[1], argv[2])));
  259.         logs.write_log(NORMAL, "info:Session start--->");

  260.         io_service.run();
  261.     }
  262.     catch (std::exception& e)
  263.     {
  264.         std::cerr << "Exception: " << e.what() << "\n";
  265.     }

  266.     return 0;
  267. }
    编译:
  1. g++ -Wall -g -I /usr/include/boost -L /usr/lib/boost easyzlib.o LogServer.pb.cc Message.cpp daemon.cpp Client.cpp -o Client -lboost_thread -lboost_system `pkg-config --cflags --libs protobuf`
阅读(10384) | 评论(1) | 转发(0) |
0

上一篇:没有了

下一篇:Boost::asio 阻塞模式的UDP客户端【很简单】

给主人留下些什么吧!~~

litanhua_2011-03-18 16:13:08