Producer.hpp
- #ifndef _PRODUCER_H
- #define _PRODUCER_H
- #include <string>
- #include <memory>
- #include <cms/Session.h>
- #include <cms/MessageProducer.h>
- #include <cms/MessageConsumer.h>
- #include <cms/MessageListener.h>
- #include <cms/ConnectionFactory.h>
- #include <cms/Connection.h>
- #include <decaf/lang/Runnable.h>
- #include <decaf/util/concurrent/CountDownLatch.h>
- #include <activemq/commands/ActiveMQMessage.h>
- #include <activemq/commands/ProducerInfo.h>
- #include <activemq/library/ActiveMQCPP.h>
- #include "Message.hpp"
- using namespace PWRD;
- using namespace std;
- using namespace activemq;
- using namespace cms;
- using namespace decaf;
- using namespace decaf::lang;
- using namespace decaf::lang::exceptions;
- namespace PWRD{
- namespace ActiveMQ{
- /**
- * A sample Prodcer that will only send Messages on its Topic when it has
- * received an advisory indicating that there is an active MessageConsumer
- * on the Topic. Once another message comes in indicating that there is no
- * longer a consumer then this Producer stops producing again.
- *
- * @since 3.0
- */
- #define DEFAULTBROKERURI "failover:(tcp://127.0.0.1:61616)"
- class Producer : public decaf::lang::Runnable, public cms::MessageListener {
-
- private:
- volatile bool consumerOnline;
- volatile bool shutdown;
- decaf::util::concurrent::CountDownLatch shutdownLatch;
- cms::Session* session;
- std::auto_ptr<cms::MessageConsumer> consumer;
- std::auto_ptr<cms::MessageProducer> producer;
- std::string brokerURI;
- auto_ptr<cms::Connection> connection;
- public:
- Producer(string _brokerURI = DEFAULTBROKERURI );
- virtual ~Producer();
- /**
- * Shut down the processing that occurs in the Run method.
- */
- void stop();
- /**
- * Run the producer code.
- */
- virtual void run();
- void transfer(Packet *pack);
- /**
- * Async Message callback.
- */
- virtual void onMessage( const cms::Message* message );
- };
- }
- }
- #endif /* _PRODUCER_H */
ProducerMain.cpp
- #include "Producer.hpp"
- #include <decaf/lang/Thread.h>
- #include <decaf/lang/Runnable.h>
- #include <activemq/library/ActiveMQCPP.h>
- #include <cms/ConnectionFactory.h>
- #include <cms/Connection.h>
- #include <cms/Session.h>
- #include <stdlib.h>
- #include <iostream>
- #include <memory>
- using namespace cms;
- using namespace std;
- using namespace PWRD;
- using namespace PWRD::ActiveMQ;
- using namespace decaf;
- using namespace decaf::lang;
- using namespace decaf::lang::exceptions;
- ////////////////////////////////////////////////////////////////////////////////
- int main( int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED ) {
- // We must always init the library first before using any methods in it.
- activemq::library::ActiveMQCPP::initializeLibrary();
- std::cout << "=====================================================\n";
- std::cout << "Starting the example:" << std::endl;
- std::cout << "-----------------------------------------------------\n";
- // Set the URI to point to the IPAddress of your broker.
- // add any optional params to the url to enable things like
- // tightMarshalling or tcp logging etc. See the CMS web site for
- // a full list of configuration options.
- //
- // http://activemq.apache.org/cms/
- //
- std::string brokerURI = "failover:(tcp://10.14.2.40:61616)";
- {
- // Create the Connection
- Producer producer( brokerURI );
- Thread runner( &producer );
- runner.start();
- // Start the Connection now.
- // Wait until we are told to quit.
- while(1){
- Packet *packet = new Packet();
- packet->set_ip("10.14.2.40");
- packet->set_file("test");
- packet->set_offset(0);
- packet->set_data("Hello world");
- packet->set_type(ENORMAL);
- producer.transfer(packet);
- Thread::sleep( 1000 );
- delete packet;
- }
- std::cout << "Press 'q' to quit" << std::endl;
- while( std::cin.get() != 'q') {}
- // Shutdown now
- producer.stop();
- }
- std::cout << "-----------------------------------------------------\n";
- std::cout << "Finished with the example." << std::endl;
- std::cout << "=====================================================\n";
- // We must also always remember to shut down the Library when done.
- activemq::library::ActiveMQCPP::shutdownLibrary();
- return 0;
- }
/*******************************************************************************************/
Consumer.hpp
- #ifndef _CONSUMER_H_
- #define _CONSUMER_H_
- #include <string>
- #include <memory>
- #include <cms/Closeable.h>
- #include <cms/Session.h>
- #include <cms/MessageConsumer.h>
- #include <cms/MessageListener.h>
- #include <cms/MessageListener.h>
- #include <cms/ConnectionFactory.h>
- #include "Message.hpp"
- #include <activemq/commands/ActiveMQMessage.h>
- #include <activemq/commands/ProducerInfo.h>
- using namespace std;
- using namespace PWRD;
- using namespace activemq;
- using namespace cms;
- using namespace activemq::commands;
- namespace PWRD{
- namespace ActiveMQ{
- /**
- * A simple Consumer that compliements the AdvisoryProducer example. This
- * consumer listens on the Topic that the Producer is waiting to publish on
- * and will display the count of Producers that are active on the Topic
- * any time that it sees an advisory message indicating a consumer has
- * stopped or started.
- *
- * @since 3.0
- */
- #define FILEPATH "LocalFile"
- #define DEFAULTBROKERURL "failover:(tcp://127.0.0.1:61616)"
- class Consumer : public cms::Closeable, public cms::MessageListener {
- private:
- cms::Session* session;
- std::auto_ptr<cms::MessageConsumer> consumer;
- std::auto_ptr<cms::MessageConsumer> advisoryConsumer;
- auto_ptr<cms::Connection> connection;
- FILE *fp;
- const char *path_;
- std::string brokerURI;
- protected:
- int fini();
- int write();
- public:
- Consumer(string _brokerURI, const char *path = FILEPATH);
- int init(const char *path = FILEPATH);
- virtual ~Consumer();
- /**
- * Close down Consumer resources.
- */
- virtual void close() throw( cms::CM***ception );
- /**
- * Async Message callback.
- */
- virtual void onMessage( const cms::Message* message );
- };
- }
- }
- #endif /* _CONSUMER_H_ */
Consumer.cpp
- #include "Consumer.hpp"
- #include <cms/Topic.h>
- #include <cms/Message.h>
- #include <cms/TextMessage.h>
- #include <decaf/lang/exceptions/NullPointerException.h>
- #include <activemq/commands/ActiveMQMessage.h>
- #include <activemq/commands/ProducerInfo.h>
- using namespace std;
- using namespace decaf;
- using namespace decaf::lang;
- using namespace cms;
- using namespace decaf::lang::exceptions;
- namespace PWRD{
- namespace ActiveMQ{
- ////////////////////////////////////////////////////////////////////////////////
- Consumer::Consumer(string _brokerURI, const char *path) {
- brokerURI = _brokerURI;
- if(!brokerURI.empty()){
- auto_ptr<cms::ConnectionFactory> connectionFactory(
- cms::ConnectionFactory::createCMSConnectionFactory( brokerURI ) );
- // Create a Connection
- try{
- connection.reset( connectionFactory->createConnection() );
- } catch( CM***ception& e ) {
- e.printStackTrace();
- }
- // Create the Session
- session = connection->createSession();
- if( session == NULL ) {
- throw NullPointerException(
- __FILE__, __LINE__, "Session Object passed was Null." );
- }
- std::auto_ptr<cms::Topic> destination( session->createTopic(
- "HEART-BEAT-CHANNEL" ) );
- std::auto_ptr<cms::Topic> advisories( session->createTopic(
- "ActiveMQ.Advisory.Consumer.Topic.HEART-BEAT-CHANNEL" ) );
- this->consumer.reset( session->createConsumer( advisories.get() ) );
- this->consumer->setMessageListener( this );
- connection->start();
- }
- init(path);
- }
- ////////////////////////////////////////////////////////////////////////////////
- Consumer::~Consumer() {
- this->fini();
- connection->stop();
- }
- ////////////////////////////////////////////////////////////////////////////////
- void Consumer::close() throw( cms::CM***ception ) {
- this->consumer.reset( NULL );
- }
- int Consumer::init(const char *path){
- path_ = path;
- fp = fopen(path, "a+");
- if(NULL == fp){
- logs.write_log(NORMAL, "Can't open this file: %s", path_);
- return -1;
- }
- return 1;
- }
- int Consumer::fini(){
- return fclose(fp);
- }
- ////////////////////////////////////////////////////////////////////////////////
- void Consumer::onMessage( const cms::Message* message ) {
- if( message->getCMSType() == "Advisory" ) {
- const ActiveMQMessage* amqMessage =
- dynamic_cast<const ActiveMQMessage*>( message );
- // If you want you can get the ProducerInfo for instance, you could get
- // the ConsumerInfo and ConnectionInfo.
- if( amqMessage != NULL && amqMessage->getDataStructure() != NULL ) {
- const ProducerInfo* info = dynamic_cast<const ProducerInfo*>(
- amqMessage->getDataStructure().get() );
- std::cout << "Got ProducerInfo for producer: " << info->getProducerId()->toString() << std::endl;
- }
- if( message->propertyExists( "producerCount" ) ) {
- std::string producerCount = message->getStringProperty( "producerCount" );
- std::cout << "Number of Producers = " << producerCount << std::endl;
- }
- } else {
- const cms::TextMessage* txtMessage =
- dynamic_cast<const cms::TextMessage*>( message );
- if( txtMessage != NULL ) {
- string text = txtMessage->getText();
- cout << text << endl;
- int len = fwrite(text.c_str(), 1, text.length(), fp);
- fflush(fp);
- logs.write_log(NORMAL, "Consumer write %d bytes to file %s", len, path_);
- }
- }
- }
- }
- }
ConsumerMain.cpp
- #include "Consumer.hpp"
- #include <activemq/library/ActiveMQCPP.h>
- #include <cms/ConnectionFactory.h>
- #include <cms/Connection.h>
- #include <cms/Session.h>
- #include <stdlib.h>
- #include <iostream>
- #include <memory>
- using namespace cms;
- using namespace std;
- using namespace PWRD;
- using namespace PWRD::ActiveMQ;
- using namespace decaf;
- using namespace decaf::lang;
- using namespace decaf::lang::exceptions;
- ////////////////////////////////////////////////////////////////////////////////
- int main( int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED ) {
- // We must always init the library first before using any methods in it.
- activemq::library::ActiveMQCPP::initializeLibrary();
- std::cout << "=====================================================\n";
- std::cout << "Starting the example:" << std::endl;
- std::cout << "-----------------------------------------------------\n";
- // Set the URI to point to the IPAddress of your broker.
- // add any optional params to the url to enable things like
- // tightMarshalling or tcp logging etc. See the CMS web site for
- // a full list of configuration options.
- //
- // http://activemq.apache.org/cms/
- //
- std::string brokerURI = "failover:(tcp://10.14.2.45:61616)";
- /*
- // Create the Connection
- auto_ptr<cms::ConnectionFactory> connectionFactory(
- cms::ConnectionFactory::createCMSConnectionFactory( brokerURI ) );
- auto_ptr<cms::Connection> connection;
- // Create a Connection
- try{
- connection.reset( connectionFactory->createConnection() );
- } catch( CM***ception& e ) {
- e.printStackTrace();
- return 1;
- }
- // Create the Session
- std::auto_ptr<cms::Session> session( connection->createSession() );
- */
- // Create the producer and run it.
- Consumer consumer( brokerURI, "Localfile" );
- // Start the Connection now.
- // Wait until we are told to quit.
- std::cout << "Press 'q' to quit" << std::endl;
- while( std::cin.get() != 'q') {}
- // Shutdown now
- consumer.close();
- std::cout << "-----------------------------------------------------\n";
- std::cout << "Finished with the example." << std::endl;
- std::cout << "=====================================================\n";
- // We must also always remember to shut down the Library when done.
- activemq::library::ActiveMQCPP::shutdownLibrary();
- return 0;
- }
阅读(1548) | 评论(0) | 转发(0) |