virtual~HelloWorldProducer(){ cleanup(); } virtualvoid run(){ try{ // Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory =new ActiveMQConnectionFactory("tcp://127.0.0.1:61613"); // Create a Connection
connection = connectionFactory->createConnection(); connection->start(); // Create a Session
session = connection->createSession( Session::AUTO_ACKNOWLEDGE ); // Create the destination (Topic or Queue)
if( useTopic ){ destination = session->createTopic("TEST.FOO"); }else{ destination = session->createQueue("TEST.FOO"); } // Create a MessageProducer from the Session to the Topic or Queue
producer = session->createProducer( destination ); producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT ); // Create the Thread Id String
string threadIdStr = Integer::toString( Thread::getId()); // Create a messages
string text =(string)"Hello world! from thread "+ threadIdStr; for(int ix=0; ix<numMessages;++ix ){ TextMessage* message = session->createTextMessage( text ); // Tell the producer to send the message
session = connection->createSession( Session::AUTO_ACKNOWLEDGE ); // Create the destination (Topic or Queue)
if( useTopic ){ destination = session->createTopic("TEST.FOO"); }else{ destination = session->createQueue("TEST.FOO"); } // Create a MessageConsumer from the Session to the Topic or Queue
consumer = session->createConsumer( destination ); consumer->setMessageListener(this); // Sleep while asynchronous messages come in.
Thread::sleep( waitMillis ); }catch(CMSException& e){ e.printStackTrace(); } } // Called from the consumer since this class is a registered MessageListener.
virtualvoid onMessage(const Message* message ){ staticintcount= 0; try { count++; const TextMessage* textMessage = dynamic_cast<const TextMessage*>( message ); string text = textMessage->getText(); printf("Message #%d Received: %s\n",count, text.c_str()); }catch(CMSException& e){ e.printStackTrace(); } } // If something bad happens you see it here as this class is also been
// registered as an ExceptionListener with the connection.
virtualvoid onException(const CMSException& ex ){ printf("JMS Exception occured. Shutting down client.\n"); } private: void cleanup(){ //*************************************************
// Always close destination, consumers and producers before
// Entry point of the example: prints a banner, runs the consumer and the
// producer on their own threads, waits for both, then prints a closing banner.
// The command-line arguments are accepted but not used.
int main( int argc, char* argv[] ){

    std::cout << "=====================================================\n"
              << "Starting the example:" << std::endl
              << "-----------------------------------------------------\n";

    //============================================================
    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in both consumer and producer.
    //
    // NOTE(review): the flag this comment describes, and the declarations
    // of `consumer` and `producer` used below, are not visible in this
    // function - confirm they are defined elsewhere in the file.

    // The consumer thread is started first, then the producer thread.
    Thread consumerThread( &consumer );
    consumerThread.start();

    Thread producerThread( &producer );
    producerThread.start();

    // Block until both threads have finished: producer first, then consumer.
    producerThread.join();
    consumerThread.join();

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example, ignore errors from this" << std::endl;
    std::cout << "point on as the sockets breaks when we shutdown." << std::endl;
    std::cout << "=====================================================\n";
}