Chinaunix首页 | 论坛 | 博客
  • 博客访问: 157531
  • 博文数量: 22
  • 博客积分: 425
  • 博客等级: 下士
  • 技术积分: 350
  • 用 户 组: 普通用户
  • 注册时间: 2012-05-15 09:43
个人简介

专注服务器设计、开发; https://github.com/justscu;

文章分类

全部博文(22)

文章存档

2014年(10)

2013年(2)

2012年(10)

分类: 服务器与存储

2014-01-22 14:59:35

参考文档:
官网(apache.org/
开源中国社区(
专门介绍原理(http://blog.csdn.net/kongjing0815/article/details/8177459)

基础知识

1. 基本信息:
1. 下载activeMQ,(apache.org/activemq-570-release.html
2. 这里以apache-activemq-5.7.0-bin.tar.gz为例。
3. 下载后,将其拷贝到/home/ll/commlib/下, 执行tar -zxvf apache-activemq-5.7.0-bin.tar.gz
4. cd apache-activemq-5.7.0,可以看见有个README.txt文件。读读有好处。
5. cd ./bin,可以看见一些可执行的脚本。运行./activemq start,启动activemq.
6. activemq默认端口为61616,用netstat -ano | grep 61616,看看该端口是否处于监听状态。
7. ./activemq stop,关闭.
8. 在apache-activemq-5.7.0/conf/目录下,有对MQ进行配置的文件。
9. 监控: in/topics.jsp

2. 性能

官方提供数据:2.1W-2.2W/second,64bit双核CPU,SUSE系统。

3. 基础知识
(1)点对点消息(queue)
        每个消息只能有一个消费者(无论消费者在消息产生的时候是否处于运行状态,都可以接收消息)。
(2)sub/pub消息(topic)
        每个消息,可以有多个消费者。消费者只能得到订阅后,生产者发布的消息。
JMS 规范允许客户创建持久订阅,持久订阅允许消费者消费它在未处于激活状态时发送的消息。
(3)消息的生产者(producer)
            由会话创建的一个对象,将消息发送到目的地。
        消息的消费者(consumer)
            (1)同步消费(receive阻塞式的接收);(2)异步消费(设置回调函数)
(4)传输方式
        VM transport: 在VM内部通信,避免了网络传输的开销。不是采用socket通信,而是采用直接调用的方法。
        TCP transport: 客户端通过TCP的方式连接到broker。
        Failover transport: 是一种重新建立连接的机制,可靠的传输。
failover transport,允许重连的机制,建立可靠的传输;
master-slave:消息被复制到slave-broker。当master-broker遇到故障时,slave-broker不丢失任何消息。
设置randomize="false",可以让客户总是尝试连接master broker(slave broker不会接收任何连接,直到它成为master broker).
        Discovery transport:  可靠的transport.
(5)持久化存储(persistence)
        apache.org/persistence.html
        AMQ Message Store: messages都是被保存在data logs中的,同时被reference store进行索引以提高存取速度。当某个data logs中的消息都被消费了,该data log文件就会被标记,以便在下一轮清理中被归档或删除。
        Kaha Persistence: 在 Kaha 中,数据被追加到 data logs 中。当不再需要log文件中的数据的时候,log 文件会被丢弃。
        JDBC Persistence: 将消息保存在数据库中。
(6)集群模式(clustering)
        master-slave,主从模式。消息从Master Broker复制到Slave Broker。master-broker不需要特殊配置,但slave-broker需要特殊配置

4. 一些特性
(1)当多个consumer同时从Queue中取消息时,要注意消息的顺序。Message Groups特性保证了具有相同GroupID的消息被发送给同一个consumer.
(2)JMS Selectors,可以按照关键字对消息进行过滤,某个consumer只接收关心的消息。
(3)Pending Message Limit Strategy。每个consumer都会设置一定的缓存来接收消息,当consumer的消息满时,broker不会再向consumer分发消息。但这时broker的消息就会累积,会将消息存磁盘(会带来性能问题)。consumer的一种策略是新的消息来时,将旧的消息丢掉,这样就broker就不会积累消息了。
(4)AMQ支持同步发送(sync)和异步发送(async)。async的性能更高,但可能会少量丢失消息。
(5)Strict Order Dispatch Policy,可以保证多个consumer按照同样的顺序来接收多个消息。
(6)AMQ可以采用批量确认(Optimized Acknowledgement)的方式提高效率。
(7)使用Producer Flow Control来控制producer消息的发送速度。可以限制producer在收到broker确认前,能够发送的最大字节数。

CMS (stands for C++ Messaging Service)
  1. activeMQ作为broker,可以支持多种语言的连接(如java, c++, c等)。其中,C++的地址为(apache.org/cms/);开发接口API文档地址(apache.org/cms/api_docs/activemqcpp-3.6.0/html/index.html)。假设使用的版本为activemq-cpp-library-3.5.0.
  2. activemq-cpp-library-3.5.0的编译是比较麻烦的,依赖于APR, APR-Util, CPPUnit, OpenSSL等,可以参考README.txt文件。
  3. 开发模型图
4. 包装后的接口代码
点击(此处)折叠或打开   mq_api.h
  1. #pragma once

  2. #include <decaf/lang/Thread.h>
  3. #include <decaf/lang/Runnable.h>
  4. #include <decaf/util/concurrent/CountDownLatch.h>
  5. #include <activemq/core/ActiveMQConnectionFactory.h>
  6. #include <activemq/core/ActiveMQConnection.h>
  7. #include <activemq/transport/DefaultTransportListener.h>
  8. #include <activemq/library/ActiveMQCPP.h>
  9. #include <decaf/lang/Integer.h>
  10. #include <activemq/util/Config.h>
  11. #include <decaf/util/Date.h>
  12. #include <cms/Connection.h>
  13. #include <cms/Session.h>
  14. #include <cms/TextMessage.h>
  15. #include <cms/BytesMessage.h>
  16. #include <cms/MapMessage.h>
  17. #include <cms/ExceptionListener.h>
  18. #include <cms/MessageListener.h>

  19. #include <string>
  20. #include <vector>
  21. #include <stdlib.h>
  22. #include <stdio.h>
  23. #include <iostream>

  24. using namespace activemq;
  25. using namespace activemq::core;
  26. using namespace activemq::transport;
  27. using namespace decaf::lang;
  28. using namespace decaf::util;
  29. using namespace decaf::util::concurrent;
  30. using namespace cms;
  31. using namespace std;

  32. typedef void (*MQCbFunc)(string dest_name, unsigned char *buffer,
  33.         size_t buflen);

  34. class ActiveMQAPI: public ExceptionListener, public MessageListener, public DefaultTransportListener
  35. {
  36. public:
  37.     ActiveMQAPI(const string& brokerURI, MQCbFunc pCallBack, const string& clientID = "") :
  38.         connection(NULL), session(NULL), pCallBack(pCallBack), brokerURI(brokerURI), clientID(clientID)
  39.     {

  40.     }
  41.     virtual ~ActiveMQAPI() throw ();
  42.     void close();
  43.     int Init();

  44.     /**
  45.      *@param dest: topic/queue name
  46.      *@param name: if PersistentMode=true, name is the durable sub name
  47.      */
  48.     int Subscribe(const string& dest, bool useTopic = true, const string& name =
  49.      "", bool PersistentMode = false);
  50.     /**
  51.      @priority: 0-9,
  52.      @timeToLive:The time to live value for this message in milliseconds. 0, never expire;
  53.      */
  54.     int PubMsg(const string& dest, const string& msg, bool useTopic = true,
  55.      bool PersistentMode = false, int priority = 4,
  56.      long long timeToLive = 0);

  57.     int UnSubscribe(const string& dest, const string& name = "");

  58.     virtual void onMessage(const cms::Message* message) throw ();
  59.     virtual void onException(const CMSException& ex AMQCPP_UNUSED);
  60.     virtual void transportInterrupted();
  61.     virtual void transportResumed();

  62.     int GetDestCount()
  63.     {
  64.         return (int) dest_names.size();
  65.     }
  66. private:
  67.     void cleanup();

  68. private:
  69.     Connection* connection;
  70.     Session* session;
  71.     vector<string> dest_names;
  72.     map<string, MessageConsumer*> consumermap; // 消费者容器
  73.     map<string, MessageProducer*> producermap; // 生产者容器
  74.     map<Destination*, string> destmap;

  75.     MQCbFunc pCallBack;

  76.     const string brokerURI;
  77.     const string clientID;
  78. };
点击(此处)折叠或打开   mq_api.cpp
  1. #include "mq_api.h"

  2. void ActiveMQAPI::close() {
  3.     cleanup();
  4. }

  5. ActiveMQAPI::~ActiveMQAPI() throw () {
  6.     cleanup();
  7. }

  8. // 初始化
  9. int ActiveMQAPI::Init()
  10. {
  11.     try
  12.     {
  13.         ActiveMQConnectionFactory* connectionFactory =
  14.          new ActiveMQConnectionFactory(brokerURI);
  15.         //create connection
  16.         connection = connectionFactory->createConnection();
  17.         if (clientID.size() > 0)
  18.             connection->setClientID(clientID);
  19.         delete connectionFactory;
  20.         connectionFactory = NULL;

  21.         ActiveMQConnection* amqConnection =
  22.          dynamic_cast<ActiveMQConnection*> (connection);
  23.         if (amqConnection != NULL)
  24.         {
  25.             amqConnection->addTransportListener(this);
  26.         }

  27.         connection->start();
  28.         connection->setExceptionListener(this);

  29.         //create session
  30.         session = connection->createSession(Session::AUTO_ACKNOWLEDGE); // 自动应答
  31.     } catch (CMSException& e)
  32.     {
  33.         e.printStackTrace();
  34.         return -1;
  35.     }

  36.     return 0;
  37. }

  38. // 去broker订阅.
  39. // @dest为 topic或queue的名字
  40. // @useTopic = true,订阅的为topic;否则为queue.
  41. int ActiveMQAPI::Subscribe(const string& dest, bool useTopic,
  42.         const string& name, bool PersistentMode)
  43. {
  44.     if (session == NULL)
  45.         return -1;

  46.     Destination* destination;
  47.     MessageConsumer *consumer = NULL;

  48.     map<string, MessageConsumer*>::iterator it;
  49.     it = consumermap.find(dest);
  50.     if (it != consumermap.end())
  51.         return 0;
  52.     else
  53.     {
  54.         if (useTopic)
  55.             destination = session->createTopic(dest);
  56.         else
  57.             destination = session->createQueue(dest);

  58.         dest_names.push_back(dest);
  59.         destmap.insert(pair<Destination*, string> (destination, dest));
  60.         if (useTopic && PersistentMode)
  61.         {
  62.             if (name.size() == 0)
  63.                 return -1;
  64.             // 持久订阅
  65.             consumer = session->createDurableConsumer(
  66.              (cms::Topic*) destination, name, "");
  67.         }
  68.         else
  69.         {
  70.             consumer = session->createConsumer(destination);
  71.         }
  72.         consumermap.insert(pair<string, MessageConsumer*> (dest, consumer));
  73.         consumer->setMessageListener(this);
  74.     }

  75.     return 0;
  76. }

  77. // 发布
  78. int ActiveMQAPI::PubMsg(const string& dest, const string& msg, bool useTopic,
  79.         bool PersistentMode, int priority, long long timeToLive)
  80. {
  81.     if (msg.size() == 0 || dest.size() == 0)
  82.         return -1;
  83.     if (session == NULL)
  84.         return -1;

  85.     Destination* destination = NULL;
  86.     MessageProducer* producer = NULL;

  87.     map<string, MessageProducer*>::iterator it;
  88.     it = producermap.find(dest);
  89.     if (it != producermap.end())
  90.     {
  91.         producer = it->second;
  92.     }
  93.     else
  94.     {
  95.         if (useTopic)
  96.             destination = session->createTopic(dest);
  97.         else
  98.             destination = session->createQueue(dest);
  99.         producer = session->createProducer(destination);

  100.         if (PersistentMode)
  101.             producer->setDeliveryMode(DeliveryMode::PERSISTENT);
  102.         else
  103.             producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

  104.         producermap.insert(pair<string, MessageProducer*> (dest, producer));
  105.     }

  106.     BytesMessage* message = session->createBytesMessage((unsigned char*) msg.c_str(), msg.size());
  107.     // producer->send(message);
  108.     producer->send(message, (PersistentMode == true) ? DeliveryMode::PERSISTENT
  109.      : DeliveryMode::NON_PERSISTENT, priority, timeToLive);

  110.     delete message;
  111.     message = NULL;
  112.     return 0;
  113. }

  114. // 取消订阅
  115. int ActiveMQAPI::UnSubscribe(const string& dest, const string& name) {
  116.     map<string, MessageConsumer*>::iterator it_c = consumermap.find(dest);
  117.     MessageConsumer* consumer = NULL;
  118.     if (it_c != consumermap.end())
  119.     {
  120.         consumer = it_c->second;
  121.         consumermap.erase(it_c);
  122.         delete consumer;
  123.         consumer = NULL;
  124.     }

  125.     map<Destination*, string>::iterator it;
  126.     Destination* destination = NULL;
  127.     for (it = destmap.begin(); it != destmap.end();)
  128.     {
  129.         Destination* k = it->first;
  130.         const string &v = it->second;
  131.         it++;
  132.         if (dest.compare(v) == 0)
  133.         {
  134.             destination = k;
  135.             destmap.erase(k);

  136.             try
  137.             {
  138.                 if (NULL == destination)
  139.                 {
  140.                     printf(
  141.                      "ActiveMQAPI(UnSubscribe)[%s]: destination[%p] is NULL",
  142.                      dest.c_str(), destination);
  143.                     return -1;
  144.                 }
  145.                 ActiveMQConnection* amqConnection =
  146.                  dynamic_cast<ActiveMQConnection*> (connection);
  147.                 amqConnection->destroyDestination(destination);
  148.             } catch (std::exception& e)
  149.             {
  150.                 printf("ActiveMQAPI(UnSubscribe)[%s]: exception[%s]",
  151.                  dest.c_str(), e.what());
  152.                 return -1;
  153.             }

  154.             delete destination;
  155.             destination = NULL;

  156.             break;
  157.         }
  158.     }

  159.     if (name.size() > 0)
  160.     {
  161.         if (session == NULL)
  162.             return -1;
  163.         try
  164.         {
  165.             session->unsubscribe(name);
  166.         } catch (CMSException& e)
  167.         {
  168.             e.printStackTrace();
  169.             return -1;
  170.         }
  171.     }
  172.     return 0;

  173. }

  174. // 消息来时的回调函数
  175. void ActiveMQAPI::onMessage(const Message* message) throw ()
  176. {
  177.     try
  178.     {
  179.         const BytesMessage* msg = dynamic_cast<const BytesMessage*> (message);
  180.         unsigned char *buffer = NULL;
  181.         size_t buflen = 0;
  182.         string destname;

  183.         if (msg != NULL)
  184.         {
  185.             buffer = msg->getBodyBytes();
  186.             buflen = msg->getBodyLength();
  187.             map<Destination*, string>::iterator it;
  188.             for (it = destmap.begin(); it != destmap.end(); it++)
  189.             {
  190.                 if (msg->getCMSDestination()->equals(*(it->first)))
  191.                 {
  192.                     destname = it->second;
  193.                 }
  194.             }
  195.         }
  196.         else
  197.         {
  198.             printf("the msg is NULL");
  199.             return;
  200.         }

  201.         pCallBack(destname, buffer, buflen);
  202.         if (NULL != buffer)
  203.         {
  204.             delete[] buffer;
  205.             buffer = NULL;
  206.         }

  207.     }
  208.     catch (CMSException& e)
  209.     {
  210.         e.printStackTrace();
  211.     }
  212. }

  213. void ActiveMQAPI::onException(const CMSException& ex AMQCPP_UNUSED)
  214. {
  215.     printf("ActiveMQAPI: CMS Exception occurred[%s]. Shutting down client.",
  216.      ex.what());

  217.     // exit(1);
  218. }

  219. void ActiveMQAPI::transportInterrupted()
  220. {
  221.     printf("ActiveMQAPI: The Connection's Transport has been Interrupted.");
  222. }

  223. void ActiveMQAPI::transportResumed()
  224. {
  225.     printf("ActiveMQAPI: The Connection's Transport has been Restored.");
  226. }

  227. void ActiveMQAPI::cleanup()
  228. {
  229.     map<Destination*, string>::iterator it;
  230.     for (it = destmap.begin(); it != destmap.end();)
  231.     {
  232.         Destination* k = it->first;
  233.         it++;
  234.         destmap.erase(k);
  235.         delete k;
  236.         k = NULL;
  237.     }

  238.     map<string, MessageConsumer*>::iterator it_consumer;
  239.     for (it_consumer = consumermap.begin(); it_consumer != consumermap.end();)
  240.     {
  241.         const string &k = it_consumer->first;
  242.         const MessageConsumer* v = it_consumer->second;
  243.         it_consumer++;
  244.         if (v != NULL)
  245.         {
  246.             delete v;
  247.             v = NULL;
  248.             consumermap.erase(k);
  249.         }
  250.     }

  251.     if (session != NULL)
  252.     {
  253.         session->close();
  254.         delete session;
  255.         session = NULL;
  256.     }
  257.     if (connection != NULL)
  258.     {
  259.         connection->close();
  260.         delete connection;
  261.         connection = NULL;
  262.     }
  263. }

5. 测试代码
点击(此处)折叠或打开   订阅topic
  1. void MQCallBack(string dest_name, unsigned char *buffer,
  2.         size_t buflen)
  3. {
  4.     printf("topic[%s] buffer[%s] \n", dest_name.c_str(), string((char*)buffer, buflen).c_str());
  5. }

  6. int main()
  7. {
  8.     activemq::library::ActiveMQCPP::initializeLibrary();
  9.     //
  10.     ActiveMQAPI* mq = new ActiveMQAPI("failover:(tcp://XX:61616,tcp://XX:61616)?randomize=false", MQCallBack);
  11.     if(NULL == mq)
  12.     {
  13.         printf("new activeMQ error.");
  14.         return 0;
  15.     }

  16.     if(0 != mq->Init())
  17.     {
  18.         printf("init activeMQ failed.");
  19.         delete mq;
  20.         mq = NULL;
  21.         return 0;
  22.     }

  23.     mq->Subscribe("bond.mq", true);

  24.     while(true)
  25.     {
  26.         sleep(5);
  27.     }

  28.     mq->UnSubscribe("bond.mq");
  29.     delete mq;
  30.     mq = NULL;
  31.     activemq::library::ActiveMQCPP::shutdownLibrary();
  32.     return 0;
  33. }
        订阅后,可以通过 MQ提供的监控平台来查看是否订阅成功。在()主页的 “Connections”下面,通过IP找到运行订阅程序的主机,然后再点击进去,看是否有这个topic。

点击(此处)折叠或打开   发布topic

  1. int main()
  2. {
  3.     activemq::library::ActiveMQCPP::initializeLibrary();
  4.     //
  5.     ActiveMQAPI* mq = new ActiveMQAPI("failover:(tcp://XX:61616,tcp://XX:61616)?randomize=false", MQCallBack);
  6.     if(NULL == mq)
  7.     {
  8.         printf("new activeMQ error.");
  9.         return 0;
  10.     }

  11.     if(0 != mq->Init())
  12.     {
  13.         printf("init activeMQ failed.");
  14.         delete mq;
  15.         mq = NULL;
  16.         return 0;
  17.     }

  18.     int iRet = mq->PubMsg("bond.mq", "the message to send to the consumers.", true, true);
  19.     delete mq;
  20.     mq = NULL;

  21.     activemq::library::ActiveMQCPP::shutdownLibrary();
  22.     return 0;
  23. }



阅读(5281) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~