endin.pro
- message Endin // Endpoint description the client sends to the proxy right after its length header.
- {
- required string ip = 1; // textual IP address; the proxy hands it to ip::address::from_string
- required int32 port = 2; // TCP port the proxy connects to
- required int32 max_retry_times = 3; // retry budget handed to ActiveSession (stored there, not consulted when connecting)
- }
proxy.hpp
- #ifndef _CONNECTION_H
- #define _CONNECTION_H
- #include <iostream>
- #include <cstdio>
- #include <unistd.h>
- #include <boost/asio.hpp>
- #include <boost/asio/ip/tcp.hpp>
- #include <boost/asio/io_service.hpp>
- //#include <boost/bind.hpp>
- #include <boost/shared_ptr.hpp>
- #include <boost/thread.hpp>
- #include "endin.pro.pb.h"
- #define DELAYTIME 10
- #define RETRYTIME 10
- #define DEFAULT_SERVER_PORT 2048
- #define TRACE() do{cout << __FILE__ << " : " << __FUNCTION__ << " : " << __LINE__ << endl; }while (0)
- using namespace std;
- namespace PWRD{
- namespace Net{
/**
 * Length prefix that precedes the serialized Endin message on the wire.
 *
 * The client declares this prefix as a plain `int`. A `size_t` field is
 * 8 bytes on LP64 platforms, which made the proxy wait for 8 header bytes
 * and swallow the start of the message body. `unsigned int` has the same
 * size as the client's `int`, so both sides agree on HEADERLEN, and it
 * cannot represent a negative length.
 */
struct Header{
    unsigned int length;  // number of payload bytes that follow the header
};
#define HEADERLEN sizeof(Header)
/**
 * Common base of both ends of a proxied connection.
 *
 * Each session receives a unique, stable id when it is constructed.
 * Previously sid() returned ++sid_, so every call produced a different
 * number and an id could never be used to find the same session twice.
 */
class Session{
protected:
    static int sid_;  // last id handed out; shared by all sessions
public:
    Session(): id_(++sid_){
    }
    virtual ~Session(){
    }
    /** @return the id assigned to this session at construction. */
    int sid(){
        return id_;
    }
private:
    int id_;  // this session's own id; never changes
};
int Session::sid_ = 0;
- class PassiveSession: public Session{
- public:
- PassiveSession(boost::asio::io_service &_io_service):
- socket_(_io_service){
- cout << "Create a new PassiveSession" << endl;
- }
- boost::asio::ip::tcp::socket& socket(){
- return socket_;
- }
- virtual ~PassiveSession(){
- }
- private:
- boost::asio::ip::tcp::socket socket_;
- };
- class ActiveSession: public Session{
- public:
- ActiveSession(boost::asio::io_service &_io_service, string _ip, int _port, int _max_retry_times = RETRYTIME):
- socket_(_io_service),
- endpoint_(boost::asio::ip::address::from_string(_ip), _port),
- strand_(_io_service),
- max_retry_times_(_max_retry_times),
- connected_(false)
- {
- cout << "Create a new ActiveSession" << endl;
- }
- boost::asio::ip::tcp::socket& socket(){
- return socket_;
- }
- void handle_connect(const boost::system::error_code &ec){
- if(!ec){
- cout << "Connected!" << endl;
- connected_ = true;
- }
- }
- void start(){
- cout << "ActiveSession begin to establish a connection" << endl;
- run();
- }
- void run(){
- /*
- int retry = 0;
- while(!connected_ && retry < max_retry_times_){
- this->socket().async_connect(endpoint_,
- strand_.wrap(
- bind(&ActiveSession::handle_connect, this,
- boost::asio::placeholders::error
- )));
- retry++;
- sleep(DELAYTIME);
- }
- */
- boost::system::error_code ec;
- this->socket().connect(endpoint_, ec);
- if(!ec){
- cout << "Connected!" << endl;
- connected_ = true;
- }
- else{
- cout << "Error occur: errno=" << ec.value()
- << ", errmsg=" << ec.message()
- << endl;
- TRACE();
- }
- }
- bool connected(){
- return connected_;
- }
- private:
- boost::asio::ip::tcp::socket socket_;
- boost::asio::ip::tcp::endpoint endpoint_;
- boost::asio::io_service::strand strand_;
- int max_retry_times_;
- bool connected_;
- };
- class SessionPool{
- public:
- typedef enum{ FORWARD, BACKWARD} DIRECTION;
- public:
- typedef pair<Session *, Session *> SessionPair;
- void join(PassiveSession* ps, ActiveSession *as){
- SessionPair *session_pair = new SessionPair(ps, as);
- session_map_[ps->sid()] = session_pair;
- cout << "Session: " << ps->sid() << " & " << as->sid() << " join the pool" << endl;
- }
- void leave(PassiveSession* ps){
- SessionPair *session_pair = session_map_[ps->sid()];
- if(NULL != session_pair){
- if(session_pair->first != NULL)
- delete session_pair->first;
- if(session_pair->second != NULL)
- delete session_pair->second;
- delete session_pair;
- session_map_[ps->sid()] = NULL;
- cout << "Session: " << session_pair->first->sid() << " & "
- << session_pair->second->sid() << " leave the pool" << endl;
- }
- }
- Session * session(Session *s, DIRECTION direction=FORWARD){
- if(FORWARD == direction){
- if(NULL != session_map_[s->sid()]){
- return session_map_[s->sid()]->second;
- }
- }
- else{
- SessionMap::iterator it = session_map_.begin();
- for(; it != session_map_.end(); it++){
- if(NULL != it->second){
- if(s == it->second->second){
- return it->second->first;
- }
- }
- }
- }
- }
- private:
- typedef map<int, SessionPair*> SessionMap;
- SessionMap session_map_;
- };
- class Proxy{
- public:
- Proxy(boost::asio::io_service &_io_service, SessionPool &_session_pool, PassiveSession *_ps):
- io_service_(_io_service),
- session_pool_(_session_pool),
- ps_(_ps)
- {
- cout << "Create a new Proxy......" << endl;
- *input_header_ = 0;
- *input_passive_ = 0;
- *input_active_ = 0;
- }
- void start()
- {
- cout << "Proxy start ......" << endl;
- boost::asio::async_read(ps_->socket(),
- boost::asio::buffer(&input_header_, HEADERLEN),
- boost::bind(&Proxy::handle_read, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred
- ));
- }
- void handle_read(const boost::system::error_code &_ec, size_t bytes_transferred){
- if(!_ec){
- cout << "Receive: " << bytes_transferred << " bytes" << endl;
- Header * header = (Header*)input_header_;
- cout << "Receive header length: " << header->length << " bytes" << endl;
- *buff_ = 0;
- boost::asio::async_read(ps_->socket(),
- boost::asio::buffer(buff_, header->length),
- boost::bind(&Proxy::handle_parse, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred
- ));
- }
- else{
- cout << "Error occur: errno=" << _ec.value()
- << ", errmsg=" << _ec.message()
- << endl;
- TRACE();
- stop();
- }
- }
- void handle_parse(const boost::system::error_code &_ec, size_t bytes_transferred){
- if(!_ec){
- //cout << "Receive " << bytes_transferred << " bytes" << endl;
- Endin endin;
- if(!endin.ParseFromArray(buff_, bytes_transferred)){
- cout << "Parse endin faild" << endl;
- return;
- }
- cout << "Endin: ip=" << endin.ip() << ", port=" << endin.port() << ", max_retry_times=" << endin.max_retry_times() << endl;
- string ip = endin.ip();
- int port = endin.port();
- int max_retry_times = endin.max_retry_times();
- as_ = new ActiveSession(io_service_,
- ip, port, max_retry_times);
- as_->start();
- if(!as_->connected()){
- cout << "Can't connect to " << ip << ":" << port << endl;
- return;
- }
- session_pool_.join(ps_, as_);
- handle_read_from_passive();
- }
- else{
- cout << "Error occur: errno=" << _ec.value()
- << ", errmsg=" << _ec.message()
- << endl;
- TRACE();
- stop();
- }
- }
- void handle_write(const boost::system::error_code &_ec, size_t bytes_transferred){
- if(!_ec){
- cout << "Receive " << bytes_transferred << " bytes" << endl;
- handle_read_from_passive();
- }
- else{
- cout << "Error occur: errno=" << _ec.value()
- << ", errmsg=" << _ec.message()
- << endl;
- TRACE();
- stop();
- }
- }
- void handle_read_from_passive(){
- ps_->socket().async_read_some(
- boost::asio::buffer(input_passive_),
- boost::bind(&Proxy::handle_write_to_active,
- this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred
- )
- );
- }
- void handle_read_from_active(const boost::system::error_code &_ec, size_t bytes_transferred){
- if(!_ec){
- cout << "Receive " << bytes_transferred << " bytes" << endl;
- as_->socket().async_read_some(
- boost::asio::buffer(input_active_),
- boost::bind(&Proxy::handle_write_to_passive,
- this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred
- )
- );
- }
- else{
- cout << "Error occur: errno=" << _ec.value()
- << ", errmsg=" << _ec.message()
- << endl;
- TRACE();
- stop();
- }
- }
- void handle_write_to_passive(const boost::system::error_code &_ec, size_t bytes_transferred){
- if(!_ec){
- input_active_[bytes_transferred] = 0;
- cout << "Recv--Contnet: " << input_active_ << endl;
- boost::asio::async_write(
- ps_->socket(),
- boost::asio::buffer(input_active_, bytes_transferred),
- boost::bind(&Proxy::handle_write,
- this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred
- )
- );
- }
- else{
- cout << "Error occur: errno=" << _ec.value()
- << ", errmsg=" << _ec.message()
- << endl;
- }
- }
- void handle_write_to_active(const boost::system::error_code &_ec, size_t bytes_transferred){
- if(!_ec){
- input_passive_[bytes_transferred] = 0;
- cout << "Recv--Contnet: " << input_passive_ << endl;
- boost::asio::async_write(
- as_->socket(),
- boost::asio::buffer(input_passive_, bytes_transferred),
- boost::bind(&Proxy::handle_read_from_active,
- this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred
- )
- );
- }
- else{
- cout << "Error occur: errno=" << _ec.value()
- << ", errmsg=" << _ec.message()
- << endl;
- }
- }
- void stop(){
- /*
- ps_->socket().close();
- as_->socket().close();
- session_pool_.leave(ps_);
- cout << "SessoinPair <" << ps_->sid() << "," << as_->sid() << "> stop" << endl;
- */
- }
- private:
- boost::asio::io_service &io_service_;
- SessionPool &session_pool_;
- PassiveSession* ps_;
- ActiveSession *as_;
- char buff_[100];
- char input_header_[HEADERLEN];
- char input_passive_[BUFSIZ];
- char input_active_[BUFSIZ];
- };
- class Engine{
- public:
- Engine(boost::asio::io_service &_io_service, int port = DEFAULT_SERVER_PORT):
- io_service_(_io_service),
- socket_(_io_service),
- strand_(_io_service),
- acceptor_(_io_service, boost::asio::ip::tcp::endpoint(
- boost::asio::ip::tcp::v4(), port)){
- cout << "Engine start......" << endl;
- }
- void start(){
- cout << "ProxyServer is waiting for a new connector ......" << endl;
- PassiveSession *ps = new PassiveSession(io_service_);
- acceptor_.async_accept( ps->socket(),
- strand_.wrap(
- boost::bind(&Engine::handle_accept,
- this, _1, ps)
- ) );
- //run();
- }
- void handle_accept(const boost::system::error_code &_ec, PassiveSession* _ps){
- if(!_ec){
- cout << "Accept a new socket......." << endl;
- Proxy *proxy = new Proxy(io_service_, session_pool_, _ps);
- proxy->start();
- proxy_list_.push_back(proxy);
- start();
- }
- else{
- cout << "Error occur: errno=" << _ec.value()
- << ", errmsg=" << _ec.message()
- << endl;
- }
- }
- void run(){
- while(true){
- }
- }
- virtual void stop(){
- socket_.close();
- acceptor_.close();
- }
- virtual ~Engine(){
- cout << "Engine stop ......." << endl;
- vector<Proxy*>::iterator pit = proxy_list_.begin();
- for(; pit != proxy_list_.end(); pit++){
- delete *pit;
- }
- }
- private:
- boost::asio::io_service &io_service_;
- boost::asio::ip::tcp::socket socket_;
- boost::asio::io_service::strand strand_;
- boost::asio::ip::tcp::acceptor acceptor_;
- SessionPool session_pool_;
- vector<Proxy*> proxy_list_;
- };
- }
- }
- #endif
proxy: main.cpp
- #include "proxy.hpp"
- using namespace PWRD::Net;
- int main(int argc, char* argv[]){
- if(2 != argc){
- cout << "Usage: " << argv[0] << " " << endl;
- return -1;
- }
- boost::asio::io_service io_service;
- Engine *engine = new Engine(io_service, atoi(argv[1]));
- engine->start();
- /*
- boost::shared_ptr<boost::thread> thread_ptr(
- new boost::thread(
- boost::bind(&Engine::start, engine)
- )
- );
- thread_ptr.get()->join();
- */
- io_service.run();
- return 1;
- }
client.hpp
- #ifndef _CLIENT_HPP
- #define _CLIENT_HPP
- #define DEFUALTRETRYTIMES 10
- #include <iostream>
- #include <boost/asio.hpp>
- #include <boost/bind.hpp>
- #include "endin.pro.pb.h"
- using namespace std;
- namespace PWRD{
- namespace Net{
// Length prefix written to the socket ahead of the serialized Endin
// message; HEADERLEN is the number of bytes that prefix occupies.
struct Header{
    int length;
};
#define HEADERLEN sizeof(Header)
- class Client{
- public:
- Client(boost::asio::io_service& _io_service, string _ip, int _port):
- socket_(_io_service),
- ip_(_ip), port_(_port),
- connected_(false)
- {
- }
- virtual ~Client(){
- }
- void start(){
- cout << "Client start ........." << endl;
- socket_.async_connect(
- boost::asio::ip::tcp::endpoint(
- boost::asio::ip::address::from_string(ip_), port_),
- boost::bind(&Client::handle_connect, this, boost::asio::placeholders::error)
- );
- }
- size_t endin(){
- Endin endin;
- endin.set_ip(ip_);
- endin.set_port(11100);
- endin.set_max_retry_times(DEFUALTRETRYTIMES);
- endin.SerializeToArray(buff, endin.ByteSize());
- return endin.ByteSize();
- }
- void stop(){
- socket_.close();
- }
- protected:
- virtual void handle_recv(){
- socket_.async_read_some(
- boost::asio::buffer(buff),
- boost::bind(&Client::handle_recv_data,
- this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred
- ));
- }
- virtual void handle_send(){
- boost::asio::async_write(socket_,
- boost::asio::buffer("Hello world", 11),
- boost::bind(&Client::handle_write, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred)
- );
- sleep(1);
- }
- virtual void handle_connect(const boost::system::error_code& _ec){
- if(_ec){
- cout << "Can't connect to the sever" << endl;
- }
- else{
- connected_ = true;
- cout << "Connection established....." << endl;
- Header header;
- header.length = endin();
- boost::asio::async_write(socket_,
- boost::asio::buffer(&header, HEADERLEN),
- boost::bind(&Client::handle_delivery, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred)
- );
- cout << "Endin has " << header.length << " bytes" << endl;
- }
- }
- virtual void handle_delivery(const boost::system::error_code& _ec, size_t bytes_transferred ){
- if(!_ec){
- cout << "Send to " << ip_ << ":" << port_ <<" "<< bytes_transferred << " bytes" << endl;
- boost::asio::async_write(socket_,
- boost::asio::buffer(buff, endin()),
- boost::bind(&Client::handle_write_header, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred)
- );
- }
- else{
- cout << "Error: errno=" << _ec.value()
- << ", errmsg=" << _ec.message() << endl;
- stop();
- }
- }
- virtual void handle_write_header(const boost::system::error_code& _ec, size_t bytes_transferred){
- if(!_ec){
- cout << "Send to " << ip_ << ":" << port_ << " " << bytes_transferred << " bytes" << endl;
- handle_send();
- }
- else{
- cout << "Error: errno=" << _ec.value()
- << ", errmsg=" << _ec.message() << endl;
- stop();
- }
- }
- virtual void handle_write(const boost::system::error_code& _ec, size_t bytes_transferred){
- if(!_ec){
- cout << "Send to " << ip_ << ":" << port_ << " " << bytes_transferred << " bytes" << endl;
-
- cout << "In Handle write............." << endl;
- handle_recv();
- }
- else{
- cout << "Error: errno=" << _ec.value()
- << ", errmsg=" << _ec.message() << endl;
- stop();
- }
- }
- virtual void handle_recv_data(const boost::system::error_code& _ec, size_t bytes_transferred){
- if(!_ec){
- cout << "Recv from: " << ip_ << ":" << port_ << " " << bytes_transferred << " bytes" << endl;
- buff[bytes_transferred] = 0;
- cout << "Content: " << buff << endl;
- handle_send();
- }
- else{
- cout << "Error: errno=" << _ec.value()
- << ", errmsg=" << _ec.message() << endl;
- stop();
- }
- }
- private:
- boost::asio::ip::tcp::socket socket_;
- char buff[BUFSIZ];
- string ip_;
- int port_;
- bool connected_;
- };
- }
- }
- #endif
client: main.cpp
- #include "client.hpp"
- #include <boost/thread.hpp>
- #include <boost/shared_ptr.hpp>
- using namespace PWRD::Net;
- int main(int argc, char* argv[]){
- if(3 != argc){
- cout << "Usage: " << argv[0] << " " << endl;
- return -1;
- }
- boost::asio::io_service io_service;
- string ip(argv[1]);
- int port = atoi(argv[2]);
- Client *client = new Client(io_service, ip, port);
- client->start();
- /*
- boost::shared_ptr<boost::thread> thread(new boost::thread(
- boost::bind(&Client::start, client
- )));
-
- thread.get()->join();
- */
- io_service.run();
- return 1;
- }
server.hpp
- #ifndef _SERVER_HPP
- #define _SERVER_HPP
- #define DEFAULT_SERVER_PORT 2048
- #include <iostream>
- #include <cstdio>
- #include <unistd.h>
- #include <boost/asio.hpp>
- #include <boost/asio/ip/tcp.hpp>
- #include <boost/asio/io_service.hpp>
- //#include <boost/bind.hpp>
- #include <boost/shared_ptr.hpp>
- #include <boost/thread.hpp>
- #include "endin.pro.pb.h"
- namespace PWRD{
- namespace Net{
- using namespace std;
/**
 * Length prefix of the framed protocol.
 *
 * The client declares this prefix as a plain `int`; `size_t` is 8 bytes
 * on LP64 platforms and would not match. `unsigned int` has the same
 * size as the client's `int` and cannot represent a negative length.
 */
struct Header{
    unsigned int length;  // number of payload bytes that follow the header
};
#define HEADERLEN sizeof(Header)
/**
 * Base class of server-side sessions.
 *
 * Each session receives a unique, stable id when it is constructed.
 * Previously sid() returned the shared counter, so every existing
 * session reported the id of the most recently created one.
 */
class Session{
protected:
    static int sid_;  // last id handed out; shared by all sessions
public:
    Session(): id_(++sid_){
    }
    virtual ~Session(){
    }
    /** @return the id assigned to this session at construction. */
    int sid(){
        return id_;
    }
private:
    int id_;  // this session's own id; never changes
};
int Session::sid_ = 0;
- class PassiveSession: public Session{
- public:
- PassiveSession(boost::asio::io_service &_io_service):
- socket_(_io_service) {
- cout << "Create a new PassiveSession" << endl;
- *input_stream_ = 0;
- *output_stream_ = 0;
- *header_stream_ = 0;
- }
- boost::asio::ip::tcp::socket& socket(){
- return socket_;
- }
- virtual ~PassiveSession(){
- cout << "~PassiveSession()" << endl;
- }
- virtual void start(){
- cout << "PassiveSession start to read......" << endl;
- *input_stream_ = 0;
- socket_.async_read_some(
- boost::asio::buffer(input_stream_),
- boost::bind(&PassiveSession::handle_read_content, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred
- ));
- }
- virtual void run(){
- cout << "PassiveSession running ......" << endl;
- }
- virtual void stop(){
- socket_.close();
- }
- protected:
- virtual void handle_read_content(const boost::system::error_code&_ec, size_t bytes_transferred){
- if(!_ec){
- input_stream_[bytes_transferred] = 0;
- cout << "Receive: " << bytes_transferred << " bytes" << endl
- << "Content: " << input_stream_ << endl;
- boost::asio::async_write( socket_,
- boost::asio::buffer("HELLO WORLD", 11),
- boost::bind(&PassiveSession::handle_write,
- this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred
- )
- );
- sleep(1);
- }
- else{
- cout << "Error occur: errno=" << _ec.value()
- << ", errmsg=" << _ec.message()
- << endl;
- stop();
- }
- }
- virtual void handle_write(const boost::system::error_code& _ec, size_t bytes_transferred){
- if(!_ec){
- cout << "Write: " << bytes_transferred << " bytes" << endl;
- socket_.async_read_some(
- boost::asio::buffer(input_stream_),
- boost::bind(&PassiveSession::handle_read_content, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred
- ));
- }
- else{
- cout << "Error occur: errno=" << _ec.value()
- << ", errmsg=" << _ec.message()
- << endl;
- stop();
- }
- }
- private:
- boost::asio::ip::tcp::socket socket_;
- char header_stream_[BUFSIZ];
- char input_stream_[BUFSIZ];
- char output_stream_[BUFSIZ];
- };
- class Server{
- public:
- Server(boost::asio::io_service& _io_service, int port = DEFAULT_SERVER_PORT):
- io_service_(_io_service),
- acceptor_(_io_service, boost::asio::ip::tcp::endpoint(
- boost::asio::ip::tcp::v4(), port))
- {
- }
- virtual ~Server(){
- }
- virtual void start(){
- PassiveSession *ps = new PassiveSession(io_service_);
- acceptor_.async_accept( ps->socket(),
- boost::bind(&Server::handle_accept,
- this, _1, ps)
- );
- }
- protected:
- virtual void handle_accept(const boost::system::error_code &_ec, PassiveSession* _ps){
- if(!_ec){
- cout << "Accept a new socket......." << endl;
- _ps->start();
- /*
- boost::shared_ptr<boost::thread> thread_ptr(new boost::thread(
- boost::bind(&PassiveSession::start, _ps)
- ));
- thread_ptr.get()->detach();
- */
- start();
- }
- else{
- cout << "Error occur: errno=" << _ec.value()
- << ", errmsg=" << _ec.message()
- << endl;
- }
- }
- private:
- boost::asio::io_service &io_service_;
- boost::asio::ip::tcp::acceptor acceptor_;
- };
- }
- }
- #endif
- /*
- virtual void handle_read_header(const boost::system::error_code& _ec, size_t bytes_transferred){
-
- if(!_ec){
- cout << "Receive: " << bytes_transferred << " bytes" << endl;
- Header * header = (Header*)header_stream_;
- *input_stream_ = 0;
- boost::asio::async_read(socket_,
- boost::asio::buffer(input_stream_, header->length),
- boost::bind(&PassiveSession::handle_read_body, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred
- ));
- }
- else{
- cout << "Error occur: errno=" << _ec.value()
- << ", errmsg=" << _ec.message()
- << endl;
- stop();
- }
- }
- virtual void handle_read_body(const boost::system::error_code& _ec, size_t bytes_transferred){
- if(!_ec){
- cout << "Receive: " << bytes_transferred << " bytes" << endl;
- Endin endin;
- if(endin.ParseFromArray(input_stream_, bytes_transferred)){
- cout << "Endin: ip=" << endin.ip() << ", port=" << endin.port() << ", max_retry_times=" << endin.max_retry_times() << endl;
- }
- *input_stream_ = 0;
- socket_.async_read_some(
- boost::asio::buffer(input_stream_),
- boost::bind(&PassiveSession::handle_read_content, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred
- ));
- }
- else{
- cout << "Error occur: errno=" << _ec.value()
- << ", errmsg=" << _ec.message()
- << endl;
- stop();
- }
- }
- */
server: main.cpp
- #include "server.hpp"
- #include <boost/thread.hpp>
- #include <boost/shared_ptr.hpp>
- using namespace PWRD::Net;
- int main(int argc, char* argv[]){
- if(2 != argc){ cout << "Usage: " << argv[0] << " " << endl;
- return -1;
- }
- boost::asio::io_service io_service;
- int port = atoi(argv[1]);
- Server *server = new Server(io_service, port);
- server->start();
- /*
- boost::shared_ptr<boost::thread> thread(new boost::thread(
- boost::bind(&Server::start, server
- )));
-
- thread.get()->join();
- */
- io_service.run();
- return 1;
- }
阅读(2282) | 评论(0) | 转发(0) |