Chinaunix首页 | 论坛 | 博客
  • 博客访问: 585555
  • 博文数量: 137
  • 博客积分: 4040
  • 博客等级: 上校
  • 技术积分: 1584
  • 用 户 组: 普通用户
  • 注册时间: 2009-01-08 13:05
文章分类

全部博文(137)

文章存档

2011年(10)

2010年(23)

2009年(104)

分类: LINUX

2009-07-30 14:04:20

 

#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Integer.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <iostream>
 
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::concurrent;
using namespace cms;
using namespace std;

class HelloWorldProducer : public Runnable {
private:
    Connection* connection;
    Session* session;
    Destination* destination;
    MessageProducer* producer;
    int numMessages;
    bool useTopic;
public:
    HelloWorldProducer( int numMessages, bool useTopic = false ){
        connection = NULL;
        session = NULL;
        destination = NULL;
        producer = NULL;
        this->numMessages = numMessages;
        this->useTopic = useTopic;
    }
    
    virtual ~HelloWorldProducer(){
        cleanup();
    }
    virtual void run() {
        try {
            // Create a ConnectionFactory

            ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61613");
            // Create a Connection

            connection = connectionFactory->createConnection();
            connection->start();
            // Create a Session

            session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            // Create the destination (Topic or Queue)

            if( useTopic ) {
                destination = session->createTopic( "TEST.FOO" );
            } else {
                destination = session->createQueue( "TEST.FOO" );
            }
            // Create a MessageProducer from the Session to the Topic or Queue

            producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );
            // Create the Thread Id String

            string threadIdStr = Integer::toString( Thread::getId() );
            // Create a messages

            string text = (string)"Hello world! from thread " + threadIdStr;
            for( int ix=0; ix<numMessages; ++ix ){
                TextMessage* message = session->createTextMessage( text );
                // Tell the producer to send the message

                printf( "Sent message from thread %s\n", threadIdStr.c_str() );
                producer->send( message );
                delete message;
            }
        }catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }
private:
    void cleanup(){
            // Destroy resources.

            try{
                if( destination != NULL ) delete destination;
            }catch ( CMSException& e ) {}
            destination = NULL;
            try{
                if( producer != NULL ) delete producer;
            }catch ( CMSException& e ) {}
            producer = NULL;
            // Close open resources.

            try{
                if( session != NULL ) session->close();
                if( connection != NULL ) connection->close();
            }catch ( CMSException& e ) {}
            try{
                if( session != NULL ) delete session;
            }catch ( CMSException& e ) {}
            session = NULL;
            try{
                if( connection != NULL ) delete connection;
            }catch ( CMSException& e ) {}
            connection = NULL;
    }
};

class HelloWorldConsumer : public ExceptionListener,
                           public MessageListener,
                           public Runnable {
private:
    Connection* connection;
    Session* session;
    Destination* destination;
    MessageConsumer* consumer;
    long waitMillis;
    bool useTopic;
public:
    HelloWorldConsumer( long waitMillis, bool useTopic = false ){
        connection = NULL;
        session = NULL;
        destination = NULL;
        consumer = NULL;
        this->waitMillis = waitMillis;
        this->useTopic = useTopic;
    }
    virtual ~HelloWorldConsumer(){
        cleanup();
    }
    virtual void run() {
        try {
            // Create a ConnectionFactory

            ActiveMQConnectionFactory* connectionFactory =
                new ActiveMQConnectionFactory( "tcp://127.0.0.1:61613" );
            // Create a Connection

            connection = connectionFactory->createConnection();
            delete connectionFactory;
            connection->start();
            connection->setExceptionListener(this);
            // Create a Session

            session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            // Create the destination (Topic or Queue)

            if( useTopic ) {
                destination = session->createTopic( "TEST.FOO" );
            } else {
                destination = session->createQueue( "TEST.FOO" );
            }
            // Create a MessageConsumer from the Session to the Topic or Queue

            consumer = session->createConsumer( destination );
            consumer->setMessageListener( this );
            // Sleep while asynchronous messages come in.

            Thread::sleep( waitMillis );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
    // Called from the consumer since this class is a registered MessageListener.

    virtual void onMessage( const Message* message ){
        static int count = 0;
        try
        {
            count++;
            const TextMessage* textMessage =
                dynamic_cast< const TextMessage* >( message );
            string text = textMessage->getText();
            printf( "Message #%d Received: %s\n", count, text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
    // If something bad happens you see it here as this class is also been

    // registered as an ExceptionListener with the connection.

    virtual void onException( const CMSException& ex ) {
        printf("JMS Exception occured. Shutting down client.\n");
    }
private:
    void cleanup(){
        //*************************************************

        // Always close destination, consumers and producers before

        // you destroy their sessions and connection.

        //*************************************************

        // Destroy resources.

        try{
            if( destination != NULL ) delete destination;
        }catch (CMSException& e) {}
        destination = NULL;
        try{
            if( consumer != NULL ) delete consumer;
        }catch (CMSException& e) {}
        consumer = NULL;
        // Close open resources.

        try{
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        }catch (CMSException& e) {}
        // Now Destroy them

        try{
            if( session != NULL ) delete session;
        }catch (CMSException& e) {}
        session = NULL;
        try{
            if( connection != NULL ) delete connection;
        }catch (CMSException& e) {}
        connection = NULL;
    }
};

int main(int argc, char* argv[]) {
    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";
    //============================================================

    // set to true to use topics instead of queues

    // Note in the code above that this causes createTopic or

    // createQueue to be used in both consumer an producer.

    //============================================================

    bool useTopics = false;
    HelloWorldProducer producer( 1000, useTopics );
    HelloWorldConsumer consumer( 8000, useTopics );
    // Start the consumer thread.

    Thread consumerThread( &consumer );
    consumerThread.start();
    // Start the producer thread.

    Thread producerThread( &producer );
    producerThread.start();
    // Wait for the threads to complete.

    producerThread.join();
    consumerThread.join();
    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example, ignore errors from this"
              << std::endl
              << "point on as the sockets breaks when we shutdown."
              << std::endl;
    std::cout << "=====================================================\n";
}

阅读(4385) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~