Chinaunix首页 | 论坛 | 博客
  • 博客访问: 87904
  • 博文数量: 19
  • 博客积分: 487
  • 博客等级: 下士
  • 技术积分: 205
  • 用 户 组: 普通用户
  • 注册时间: 2011-03-18 15:57
文章分类

全部博文(19)

文章存档

2011年(19)

我的朋友

分类: C/C++

2011-04-20 11:07:33

Producer.hpp
  1. #ifndef _PRODUCER_H
  2. #define _PRODUCER_H

  3. #include <string>
  4. #include <memory>

  5. #include <cms/Session.h>
  6. #include <cms/MessageProducer.h>
  7. #include <cms/MessageConsumer.h>
  8. #include <cms/MessageListener.h>
  9. #include <cms/ConnectionFactory.h>

  10. #include <cms/Connection.h>
  11. #include <decaf/lang/Runnable.h>
  12. #include <decaf/util/concurrent/CountDownLatch.h>

  13. #include <activemq/commands/ActiveMQMessage.h>
  14. #include <activemq/commands/ProducerInfo.h>
  15. #include <activemq/library/ActiveMQCPP.h>

  16. #include "Message.hpp"
  17. using namespace PWRD;
  18. using namespace std;
  19. using namespace activemq;
  20. using namespace cms;
  21. using namespace decaf;
  22. using namespace decaf::lang;
  23. using namespace decaf::lang::exceptions;

  24. namespace PWRD{
  25.     namespace ActiveMQ{

  26.         /**
  27.          * A sample Prodcer that will only send Messages on its Topic when it has
  28.          * received an advisory indicating that there is an active MessageConsumer
  29.          * on the Topic. Once another message comes in indicating that there is no
  30.          * longer a consumer then this Producer stops producing again.
  31.          *
  32.          * @since 3.0
  33.          */
  34. #define DEFAULTBROKERURI "failover:(tcp://127.0.0.1:61616)"
  35.         class Producer : public decaf::lang::Runnable, public cms::MessageListener {
  36.         
  37.             private:

  38.                 volatile bool consumerOnline;
  39.                 volatile bool shutdown;
  40.                 decaf::util::concurrent::CountDownLatch shutdownLatch;

  41.                 cms::Session* session;
  42.                 std::auto_ptr<cms::MessageConsumer> consumer;
  43.                 std::auto_ptr<cms::MessageProducer> producer;
  44.                 std::string brokerURI;
  45.                 auto_ptr<cms::Connection> connection;

  46.             public:

  47.                 Producer(string _brokerURI = DEFAULTBROKERURI );
  48.                 virtual ~Producer();

  49.                 /**
  50.                  * Shut down the processing that occurs in the Run method.
  51.                  */
  52.                 void stop();

  53.                 /**
  54.                  * Run the producer code.
  55.                  */
  56.                 virtual void run();

  57.                 void transfer(Packet *pack);

  58.                 /**
  59.                  * Async Message callback.
  60.                  */
  61.                 virtual void onMessage( const cms::Message* message );

  62.         };

  63.     }
  64. }

  65. #endif /* _ACTIVEMQCPP_EXAMPLES_ADVISORIES_ADVISORYPRODUCER_H_ */
Producer.cpp
  1. #include "Producer.hpp"

  2. #include <cms/Topic.h>
  3. #include <cms/Message.h>
  4. #include <cms/TextMessage.h>
  5. #include <decaf/lang/exceptions/NullPointerException.h>
  6. #include <decaf/lang/Integer.h>


  7. namespace PWRD{
  8.     namespace ActiveMQ{
  9.         ////////////////////////////////////////////////////////////////////////////////
  10.         Producer::Producer(string _brokerURI) : shutdownLatch(1) {

  11.             brokerURI = _brokerURI;
  12.             if(!brokerURI.empty()){
  13.                 auto_ptr<cms::ConnectionFactory> connectionFactory(
  14.                         cms::ConnectionFactory::createCMSConnectionFactory( brokerURI ) );

  15.                 // Create a Connection
  16.                 try{
  17.                     connection.reset( connectionFactory->createConnection() );
  18.                 } catch( CM***ception& e ) {
  19.                     e.printStackTrace();
  20.                 }

  21.                 // Create the Session
  22.                 session = connection->createSession();
  23.                 if( session == NULL ) {
  24.                     throw NullPointerException(
  25.                             __FILE__, __LINE__, "Session Object passed was Null." );
  26.                 }

  27.                 std::auto_ptr<cms::Topic> destination( session->createTopic(
  28.                             "HEART-BEAT-CHANNEL" ) );
  29.                 std::auto_ptr<cms::Topic> advisories( session->createTopic(
  30.                             "ActiveMQ.Advisory.Consumer.Topic.HEART-BEAT-CHANNEL" ) );
  31.                 this->producer.reset( session->createProducer( destination.get() ) );
  32.                 this->consumer.reset( session->createConsumer( advisories.get() ) );
  33.                 this->consumer->setMessageListener( this );

  34.                 connection->start();
  35.             }

  36.             this->shutdown = false;
  37.             this->consumerOnline = false;
  38.         }

  39.         ////////////////////////////////////////////////////////////////////////////////
  40.         Producer::~Producer() {
  41.             connection->stop();
  42.         }

  43.         ////////////////////////////////////////////////////////////////////////////////
  44.         void Producer::stop() {
  45.             this->shutdown = true;
  46.             this->shutdownLatch.await( 3000 );
  47.         }

  48.         ////////////////////////////////////////////////////////////////////////////////
  49.         void Producer::run() {

  50.             /*
  51.             while( !this->shutdown ) {

  52.                 if( this->consumerOnline ) {

  53.                     std::auto_ptr<cms::TextMessage> message(
  54.                             this->session->createTextMessage( "Alive" ) );

  55.                     //this->producer->send( message.get() );
  56.                     Packet *packet = new Packet();
  57.                     packet->set_ip("10.14.2.40");
  58.                     packet->set_file("test");
  59.                     packet->set_offset(0);
  60.                     packet->set_data("Hello world");
  61.                     packet->set_type(ENORMAL);
  62.                     this->transfer(packet);

  63.                     Thread::sleep( 1000 );
  64.                     delete packet;
  65.                 }
  66.             }

  67.             this->shutdownLatch.countDown();
  68.             */
  69.         }

  70.         void Producer::transfer(Packet *pack){
  71.             if( !this->shutdown){
  72.                 if(this->consumerOnline){

  73.                     std::auto_ptr<cms::TextMessage> message(
  74.                             this->session->createTextMessage(pack->data()));

  75.                     this->producer->send( message.get() );
  76.                     logs.write_log(NORMAL, "Send message to default destination");

  77.                 }    
  78.                 else{
  79.                     logs.write_log(NORMAL, "There are no consumer");    
  80.                 }
  81.             }

  82.             this->shutdownLatch.countDown();
  83.         }

  84.         ////////////////////////////////////////////////////////////////////////////////
  85.         void Producer::onMessage( const cms::Message* message ) {

  86.             if( message->getCMSType() == "Advisory" ) {

  87.                 std::cout << "Received an Advisory Message!" << std::endl;

  88.                 if( message->propertyExists( "consumerCount" ) ) {

  89.                     std::string consumerCount = message->getStringProperty( "consumerCount" );
  90.                     std::cout << "Number of Consumers = " << consumerCount << std::endl;
  91.                     this->consumerOnline = Integer::parseInt( consumerCount ) > 0 ? true : false;
  92.                 }

  93.             } else {
  94.                 std::cout << "Received a Non-Advisory Message!" << std::endl;
  95.             }

  96.         }
  97.     }
  98. }
ProducerMain.cpp
  1. #include "Producer.hpp"

  2. #include <decaf/lang/Thread.h>
  3. #include <decaf/lang/Runnable.h>
  4. #include <activemq/library/ActiveMQCPP.h>
  5. #include <cms/ConnectionFactory.h>
  6. #include <cms/Connection.h>
  7. #include <cms/Session.h>
  8. #include <stdlib.h>
  9. #include <iostream>
  10. #include <memory>

  11. using namespace cms;
  12. using namespace std;
  13. using namespace PWRD;
  14. using namespace PWRD::ActiveMQ;

  15. using namespace decaf;
  16. using namespace decaf::lang;
  17. using namespace decaf::lang::exceptions;
  18. ////////////////////////////////////////////////////////////////////////////////
  19. int main( int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED ) {

  20.     // We must always init the library first before using any methods in it.
  21.     activemq::library::ActiveMQCPP::initializeLibrary();

  22.     std::cout << "=====================================================\n";
  23.     std::cout << "Starting the example:" << std::endl;
  24.     std::cout << "-----------------------------------------------------\n";

  25.     // Set the URI to point to the IPAddress of your broker.
  26.     // add any optional params to the url to enable things like
  27.     // tightMarshalling or tcp logging etc. See the CMS web site for
  28.     // a full list of configuration options.
  29.     //
  30.     // http://activemq.apache.org/cms/
  31.     //
  32.     std::string brokerURI = "failover:(tcp://10.14.2.40:61616)";

  33.     {
  34.         // Create the Connection
  35.         Producer producer( brokerURI );
  36.         Thread runner( &producer );
  37.         runner.start();

  38.         // Start the Connection now.

  39.         // Wait until we are told to quit.
  40.         while(1){
  41.             Packet *packet = new Packet();
  42.             packet->set_ip("10.14.2.40");
  43.             packet->set_file("test");
  44.             packet->set_offset(0);
  45.             packet->set_data("Hello world");
  46.             packet->set_type(ENORMAL);
  47.             producer.transfer(packet);

  48.             Thread::sleep( 1000 );
  49.             delete packet;

  50.         }
  51.         std::cout << "Press 'q' to quit" << std::endl;
  52.         while( std::cin.get() != 'q') {}

  53.         // Shutdown now
  54.         producer.stop();
  55.     }

  56.     std::cout << "-----------------------------------------------------\n";
  57.     std::cout << "Finished with the example." << std::endl;
  58.     std::cout << "=====================================================\n";

  59.     // We must also always remember to shut down the Library when done.
  60.     activemq::library::ActiveMQCPP::shutdownLibrary();

  61.     return 0;
  62. }
 
/*******************************************************************************************/
 
Consumer.hpp
  1. #ifndef _CONSUMER_H_
  2. #define _CONSUMER_H_

  3. #include <string>
  4. #include <memory>

  5. #include <cms/Closeable.h>
  6. #include <cms/Session.h>
  7. #include <cms/MessageConsumer.h>
  8. #include <cms/MessageListener.h>
  9. #include <cms/MessageListener.h>
  10. #include <cms/ConnectionFactory.h>
  11. #include "Message.hpp"
  12. #include <activemq/commands/ActiveMQMessage.h>
  13. #include <activemq/commands/ProducerInfo.h>

  14. using namespace std;
  15. using namespace PWRD;
  16. using namespace activemq;
  17. using namespace cms;
  18. using namespace activemq::commands;

  19. namespace PWRD{
  20.     namespace ActiveMQ{

  21.         /**
  22.          * A simple Consumer that compliements the AdvisoryProducer example. This
  23.          * consumer listens on the Topic that the Producer is waiting to publish on
  24.          * and will display the count of Producers that are active on the Topic
  25.          * any time that it sees an advisory message indicating a consumer has
  26.          * stopped or started.
  27.          *
  28.          * @since 3.0
  29.          */
  30. #define FILEPATH "LocalFile"
  31. #define DEFAULTBROKERURL "failover:(tcp://127.0.0.1:61616)"
  32.         class Consumer : public cms::Closeable, public cms::MessageListener {

  33.             private:

  34.                 cms::Session* session;
  35.                 std::auto_ptr<cms::MessageConsumer> consumer;
  36.                 std::auto_ptr<cms::MessageConsumer> advisoryConsumer;
  37.                 auto_ptr<cms::Connection> connection;
  38.                 FILE *fp;
  39.                 const char *path_;
  40.                 std::string brokerURI;
  41.             protected:
  42.                 int fini();
  43.                 int write();

  44.             public:

  45.                 Consumer(string _brokerURI, const char *path = FILEPATH);
  46.                 int init(const char *path = FILEPATH);
  47.                 virtual ~Consumer();

  48.                 /**
  49.                  * Close down Consumer resources.
  50.                  */
  51.                 virtual void close() throw( cms::CM***ception );

  52.                 /**
  53.                  * Async Message callback.
  54.                  */
  55.                 virtual void onMessage( const cms::Message* message );

  56.         };
  57.     }
  58. }

  59. #endif /* _ACTIVEMQCPP_EXAMPLES_ADVISORIES_ADVISORYCONSUMER_H_ */
Consumer.cpp
  1. #include "Consumer.hpp"

  2. #include <cms/Topic.h>
  3. #include <cms/Message.h>
  4. #include <cms/TextMessage.h>
  5. #include <decaf/lang/exceptions/NullPointerException.h>

  6. #include <activemq/commands/ActiveMQMessage.h>
  7. #include <activemq/commands/ProducerInfo.h>

  8. using namespace std;
  9. using namespace decaf;
  10. using namespace decaf::lang;
  11. using namespace cms;
  12. using namespace decaf::lang::exceptions;

  13. namespace PWRD{
  14.     namespace ActiveMQ{
  15.         ////////////////////////////////////////////////////////////////////////////////
  16.         Consumer::Consumer(string _brokerURI, const char *path) {

  17.             brokerURI = _brokerURI;
  18.             if(!brokerURI.empty()){
  19.                 auto_ptr<cms::ConnectionFactory> connectionFactory(
  20.                         cms::ConnectionFactory::createCMSConnectionFactory( brokerURI ) );

  21.                 // Create a Connection
  22.                 try{
  23.                     connection.reset( connectionFactory->createConnection() );
  24.                 } catch( CM***ception& e ) {
  25.                     e.printStackTrace();
  26.                 }

  27.                 // Create the Session
  28.                 session = connection->createSession();
  29.                 if( session == NULL ) {
  30.                     throw NullPointerException(
  31.                             __FILE__, __LINE__, "Session Object passed was Null." );
  32.                 }

  33.                 std::auto_ptr<cms::Topic> destination( session->createTopic(
  34.                             "HEART-BEAT-CHANNEL" ) );
  35.                 std::auto_ptr<cms::Topic> advisories( session->createTopic(
  36.                             "ActiveMQ.Advisory.Consumer.Topic.HEART-BEAT-CHANNEL" ) );
  37.                 this->consumer.reset( session->createConsumer( advisories.get() ) );
  38.                 this->consumer->setMessageListener( this );
  39.                 connection->start();
  40.             }

  41.             init(path);
  42.         }

  43.         ////////////////////////////////////////////////////////////////////////////////
  44.         Consumer::~Consumer() {
  45.             this->fini();
  46.             connection->stop();
  47.         }

  48.         ////////////////////////////////////////////////////////////////////////////////
  49.         void Consumer::close() throw( cms::CM***ception ) {
  50.             this->consumer.reset( NULL );
  51.         }

  52.         int Consumer::init(const char *path){
  53.             path_ = path;
  54.             fp = fopen(path, "a+");
  55.             if(NULL == fp){
  56.                 logs.write_log(NORMAL, "Can't open this file: %s", path_);    
  57.                 return -1;
  58.             }
  59.             return 1;
  60.         }

  61.         int Consumer::fini(){
  62.             return fclose(fp);
  63.         }
  64.         ////////////////////////////////////////////////////////////////////////////////
  65.         void Consumer::onMessage( const cms::Message* message ) {

  66.             if( message->getCMSType() == "Advisory" ) {

  67.                 const ActiveMQMessage* amqMessage =
  68.                     dynamic_cast<const ActiveMQMessage*>( message );

  69.                 // If you want you can get the ProducerInfo for instance, you could get
  70.                 // the ConsumerInfo and ConnectionInfo.
  71.                 if( amqMessage != NULL && amqMessage->getDataStructure() != NULL ) {
  72.                     const ProducerInfo* info = dynamic_cast<const ProducerInfo*>(
  73.                             amqMessage->getDataStructure().get() );

  74.                     std::cout << "Got ProducerInfo for producer: " << info->getProducerId()->toString() << std::endl;
  75.                 }

  76.                 if( message->propertyExists( "producerCount" ) ) {
  77.                     std::string producerCount = message->getStringProperty( "producerCount" );
  78.                     std::cout << "Number of Producers = " << producerCount << std::endl;
  79.                 }

  80.             } else {

  81.                 const cms::TextMessage* txtMessage =
  82.                     dynamic_cast<const cms::TextMessage*>( message );

  83.                 if( txtMessage != NULL ) {
  84.                     string text = txtMessage->getText();
  85.                     cout << text << endl;
  86.                     int len = fwrite(text.c_str(), 1, text.length(), fp);
  87.                     fflush(fp);
  88.                     logs.write_log(NORMAL, "Consumer write %d bytes to file %s", len, path_);
  89.                 }
  90.             }
  91.         }
  92.     }
  93. }
ConsumerMain.cpp
  1. #include "Consumer.hpp"

  2. #include <activemq/library/ActiveMQCPP.h>
  3. #include <cms/ConnectionFactory.h>
  4. #include <cms/Connection.h>
  5. #include <cms/Session.h>
  6. #include <stdlib.h>
  7. #include <iostream>
  8. #include <memory>

  9. using namespace cms;
  10. using namespace std;
  11. using namespace PWRD;
  12. using namespace PWRD::ActiveMQ;
  13. using namespace decaf;
  14. using namespace decaf::lang;
  15. using namespace decaf::lang::exceptions;

  16. ////////////////////////////////////////////////////////////////////////////////
  17. int main( int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED ) {

  18.     // We must always init the library first before using any methods in it.
  19.     activemq::library::ActiveMQCPP::initializeLibrary();

  20.     std::cout << "=====================================================\n";
  21.     std::cout << "Starting the example:" << std::endl;
  22.     std::cout << "-----------------------------------------------------\n";

  23.     // Set the URI to point to the IPAddress of your broker.
  24.     // add any optional params to the url to enable things like
  25.     // tightMarshalling or tcp logging etc. See the CMS web site for
  26.     // a full list of configuration options.
  27.     //
  28.     // http://activemq.apache.org/cms/
  29.     //
  30.     std::string brokerURI = "failover:(tcp://10.14.2.45:61616)";

  31.     /*
  32.     // Create the Connection
  33.     auto_ptr<cms::ConnectionFactory> connectionFactory(
  34.             cms::ConnectionFactory::createCMSConnectionFactory( brokerURI ) );

  35.     auto_ptr<cms::Connection> connection;

  36.     // Create a Connection
  37.     try{
  38.         connection.reset( connectionFactory->createConnection() );
  39.     } catch( CM***ception& e ) {
  40.         e.printStackTrace();
  41.         return 1;
  42.     }

  43.     // Create the Session
  44.     std::auto_ptr<cms::Session> session( connection->createSession() );
  45.     */

  46.     // Create the producer and run it.
  47.     Consumer consumer( brokerURI, "Localfile" );

  48.     // Start the Connection now.

  49.     // Wait until we are told to quit.
  50.     std::cout << "Press 'q' to quit" << std::endl;
  51.     while( std::cin.get() != 'q') {}

  52.     // Shutdown now
  53.     consumer.close();

  54.     std::cout << "-----------------------------------------------------\n";
  55.     std::cout << "Finished with the example." << std::endl;
  56.     std::cout << "=====================================================\n";

  57.     // We must also always remember to shut down the Library when done.
  58.     activemq::library::ActiveMQCPP::shutdownLibrary();

  59.     return 0;
  60. }
阅读(1543) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~