参考文档:
官网(apache.org/)
开源中国社区()
专门介绍原理(http://blog.csdn.net/kongjing0815/article/details/8177459)
基础知识
1. 基本信息:
1. 下载activeMQ,(apache.org/activemq-570-release.html)
2. 这里以apache-activemq-5.7.0-bin.tar.gz为例。
3. 下载后,将其拷贝到/home/ll/commlib/下, 执行tar -zxvf apache-activemq-5.7.0-bin.tar.gz
4. cd apache-activemq-5.7.0,可以看见有个README.txt文件。读读有好处。
5. cd ./bin,可以看见一些可执行的脚本。运行./activemq start,启动activemq.
6. activemq默认端口为61616,用netstat -ano | grep 61616,看看该端口是否处于监听状态。
7. ./activemq stop,关闭.
8. 在apache-activemq-5.7.0/conf/目录下,有对MQ进行配置的文件。
9. 监控: in/topics.jsp
2. 性能
官方提供数据:2.1W-2.2W/second,64bit双核CPU,SUSE系统。
3. 基础知识
(1)点对点消息(queue)
每个消息只能有一个消费者(无论消费者在消息产生的时候是否处于运行状态,都可以接收消息)。
(2)sub/pub消息(topic)
每个消息,可以有多个消费者。消费者只能得到订阅后,生产者发布的消息。
JMS 规范允许客户创建持久订阅,持久订阅允许消费者消费它在未处于激活状态时发送的消息。
(3)消息的生产者(producer)
由会话创建的一个对象,将消息发送到目的地。
消息的消费者(consumer)
(1)同步消费(receive阻塞式的接收);(2)异步消费(设置回调函数)
(4)传输方式
VM transport: 在VM内部通信,避免了网络传输的开销。不是采用socket通信,而是采用直接调用的方法。
TCP transport: 客户端通过TCP的方式连接到broker。
Failover transport: 是一种重新建立连接的机制,可靠的传输。
failover transport,允许重连的机制,建立可靠的传输;
master-slave:消息被复制到slave-broker。当master-broker遇到故障时,slave-broker不丢失任何消息。
设置randomize="false",可以让客户总是尝试连接master broker(slave broker不会接收任何连接,直到它成为master broker).
Discovery transport: 可靠的transport.
(5)持久化存储(persistence)
apache.org/persistence.html
AMQ Message Store: messages都是被保存在data logs中的,同时被reference store进行索引以提高存取速度。当某个data logs中的消息都被消费了,该data log文件就会被标记,以便在下一轮清理中被归档或删除。
Kaha Persistence: 在 Kaha 中,数据被追加到 data logs 中。当不再需要log文件中的数据的时候,log 文件会被丢弃。
JDBC Persistence: 将消息保存在数据库中。
(6)集群模式(clustering)
master-slave,主从模式。消息从Master Broker复制到Slave Broker。master-broker不需要特殊配置,但slave-broker需要特殊配置
4. 一些特性
(1)当多个consumer同时从Queue中取消息时,要注意消息的顺序。Message Groups特性保证了具有相同GroupID的消息被发送给同一个consumer.
(2)JMS Selectors,可以按照关键字对消息进行过滤,某个consumer只接收关心的消息。
(3)Pending Message Limit Strategy。每个consumer都会设置一定的缓存来接收消息,当consumer的消息满时,broker不会再向consumer分发消息。但这时broker的消息就会累积,会将消息存磁盘(会带来性能问题)。consumer的一种策略是新的消息来时,将旧的消息丢掉,这样就broker就不会积累消息了。
(4)AMQ支持同步发送(sync)和异步发送(async)。async的性能更高,但可能会少量丢失消息。
(5)Strict Order Dispatch Policy,可以保证多个consumer按照同样的顺序来接收多个消息。
(6)AMQ可以采用批量确认(Optimized Acknowledgement)的方式提高效率。
(7)使用Producer Flow Control来控制producer消息的发送速度。可以限制producer在收到broker确认前,能够发送的最大字节数。
CMS (stands for C++ Messaging Service)
-
activeMQ作为broker,可以支持多种语言的连接(如java, c++, c等)。其中,C++的地址为(apache.org/cms/);开发接口API文档地址(apache.org/cms/api_docs/activemqcpp-3.6.0/html/index.html)。假设使用的版本为activemq-cpp-library-3.5.0.
-
activemq-cpp-library-3.5.0的编译是比较麻烦的,依赖于APR, APR-Util, CPPUnit, OpenSSL等,可以参考README.txt文件。
-
开发模型图
4. 包装后的接口代码
点击(此处)折叠或打开 mq_api.h
-
#pragma once
-
-
#include <decaf/lang/Thread.h>
-
#include <decaf/lang/Runnable.h>
-
#include <decaf/util/concurrent/CountDownLatch.h>
-
#include <activemq/core/ActiveMQConnectionFactory.h>
-
#include <activemq/core/ActiveMQConnection.h>
-
#include <activemq/transport/DefaultTransportListener.h>
-
#include <activemq/library/ActiveMQCPP.h>
-
#include <decaf/lang/Integer.h>
-
#include <activemq/util/Config.h>
-
#include <decaf/util/Date.h>
-
#include <cms/Connection.h>
-
#include <cms/Session.h>
-
#include <cms/TextMessage.h>
-
#include <cms/BytesMessage.h>
-
#include <cms/MapMessage.h>
-
#include <cms/ExceptionListener.h>
-
#include <cms/MessageListener.h>
-
-
#include <string>
-
#include <vector>
-
#include <stdlib.h>
-
#include <stdio.h>
-
#include <iostream>
-
-
using namespace activemq;
-
using namespace activemq::core;
-
using namespace activemq::transport;
-
using namespace decaf::lang;
-
using namespace decaf::util;
-
using namespace decaf::util::concurrent;
-
using namespace cms;
-
using namespace std;
-
-
typedef void (*MQCbFunc)(string dest_name, unsigned char *buffer,
-
size_t buflen);
-
-
class ActiveMQAPI: public ExceptionListener, public MessageListener, public DefaultTransportListener
-
{
-
public:
-
ActiveMQAPI(const string& brokerURI, MQCbFunc pCallBack, const string& clientID = "") :
-
connection(NULL), session(NULL), pCallBack(pCallBack), brokerURI(brokerURI), clientID(clientID)
-
{
-
-
}
-
virtual ~ActiveMQAPI() throw ();
-
void close();
-
int Init();
-
-
/**
-
*@param dest: topic/queue name
-
*@param name: if PersistentMode=true, name is the durable sub name
-
*/
-
int Subscribe(const string& dest, bool useTopic = true, const string& name =
-
"", bool PersistentMode = false);
-
/**
-
@priority: 0-9,
-
@timeToLive:The time to live value for this message in milliseconds. 0, never expire;
-
*/
-
int PubMsg(const string& dest, const string& msg, bool useTopic = true,
-
bool PersistentMode = false, int priority = 4,
-
long long timeToLive = 0);
-
-
int UnSubscribe(const string& dest, const string& name = "");
-
-
virtual void onMessage(const cms::Message* message) throw ();
-
virtual void onException(const CMSException& ex AMQCPP_UNUSED);
-
virtual void transportInterrupted();
-
virtual void transportResumed();
-
-
int GetDestCount()
-
{
-
return (int) dest_names.size();
-
}
-
private:
-
void cleanup();
-
-
private:
-
Connection* connection;
-
Session* session;
-
vector<string> dest_names;
-
map<string, MessageConsumer*> consumermap; // 消费者容器
-
map<string, MessageProducer*> producermap; // 生产者容器
-
map<Destination*, string> destmap;
-
-
MQCbFunc pCallBack;
-
-
const string brokerURI;
-
const string clientID;
-
};
点击(此处)折叠或打开 mq_api.cpp
-
#include "mq_api.h"
-
-
void ActiveMQAPI::close() {
-
cleanup();
-
}
-
-
ActiveMQAPI::~ActiveMQAPI() throw () {
-
cleanup();
-
}
-
-
// 初始化
-
int ActiveMQAPI::Init()
-
{
-
try
-
{
-
ActiveMQConnectionFactory* connectionFactory =
-
new ActiveMQConnectionFactory(brokerURI);
-
//create connection
-
connection = connectionFactory->createConnection();
-
if (clientID.size() > 0)
-
connection->setClientID(clientID);
-
delete connectionFactory;
-
connectionFactory = NULL;
-
-
ActiveMQConnection* amqConnection =
-
dynamic_cast<ActiveMQConnection*> (connection);
-
if (amqConnection != NULL)
-
{
-
amqConnection->addTransportListener(this);
-
}
-
-
connection->start();
-
connection->setExceptionListener(this);
-
-
//create session
-
session = connection->createSession(Session::AUTO_ACKNOWLEDGE); // 自动应答
-
} catch (CMSException& e)
-
{
-
e.printStackTrace();
-
return -1;
-
}
-
-
return 0;
-
}
-
-
// 去broker订阅.
-
// @dest为 topic或queue的名字
-
// @useTopic = true,订阅的为topic;否则为queue.
-
int ActiveMQAPI::Subscribe(const string& dest, bool useTopic,
-
const string& name, bool PersistentMode)
-
{
-
if (session == NULL)
-
return -1;
-
-
Destination* destination;
-
MessageConsumer *consumer = NULL;
-
-
map<string, MessageConsumer*>::iterator it;
-
it = consumermap.find(dest);
-
if (it != consumermap.end())
-
return 0;
-
else
-
{
-
if (useTopic)
-
destination = session->createTopic(dest);
-
else
-
destination = session->createQueue(dest);
-
-
dest_names.push_back(dest);
-
destmap.insert(pair<Destination*, string> (destination, dest));
-
if (useTopic && PersistentMode)
-
{
-
if (name.size() == 0)
-
return -1;
-
// 持久订阅
-
consumer = session->createDurableConsumer(
-
(cms::Topic*) destination, name, "");
-
}
-
else
-
{
-
consumer = session->createConsumer(destination);
-
}
-
consumermap.insert(pair<string, MessageConsumer*> (dest, consumer));
-
consumer->setMessageListener(this);
-
}
-
-
return 0;
-
}
-
-
// 发布
-
int ActiveMQAPI::PubMsg(const string& dest, const string& msg, bool useTopic,
-
bool PersistentMode, int priority, long long timeToLive)
-
{
-
if (msg.size() == 0 || dest.size() == 0)
-
return -1;
-
if (session == NULL)
-
return -1;
-
-
Destination* destination = NULL;
-
MessageProducer* producer = NULL;
-
-
map<string, MessageProducer*>::iterator it;
-
it = producermap.find(dest);
-
if (it != producermap.end())
-
{
-
producer = it->second;
-
}
-
else
-
{
-
if (useTopic)
-
destination = session->createTopic(dest);
-
else
-
destination = session->createQueue(dest);
-
producer = session->createProducer(destination);
-
-
if (PersistentMode)
-
producer->setDeliveryMode(DeliveryMode::PERSISTENT);
-
else
-
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
-
-
producermap.insert(pair<string, MessageProducer*> (dest, producer));
-
}
-
-
BytesMessage* message = session->createBytesMessage((unsigned char*) msg.c_str(), msg.size());
-
// producer->send(message);
-
producer->send(message, (PersistentMode == true) ? DeliveryMode::PERSISTENT
-
: DeliveryMode::NON_PERSISTENT, priority, timeToLive);
-
-
delete message;
-
message = NULL;
-
return 0;
-
}
-
-
// 取消订阅
-
int ActiveMQAPI::UnSubscribe(const string& dest, const string& name) {
-
map<string, MessageConsumer*>::iterator it_c = consumermap.find(dest);
-
MessageConsumer* consumer = NULL;
-
if (it_c != consumermap.end())
-
{
-
consumer = it_c->second;
-
consumermap.erase(it_c);
-
delete consumer;
-
consumer = NULL;
-
}
-
-
map<Destination*, string>::iterator it;
-
Destination* destination = NULL;
-
for (it = destmap.begin(); it != destmap.end();)
-
{
-
Destination* k = it->first;
-
const string &v = it->second;
-
it++;
-
if (dest.compare(v) == 0)
-
{
-
destination = k;
-
destmap.erase(k);
-
-
try
-
{
-
if (NULL == destination)
-
{
-
printf(
-
"ActiveMQAPI(UnSubscribe)[%s]: destination[%p] is NULL",
-
dest.c_str(), destination);
-
return -1;
-
}
-
ActiveMQConnection* amqConnection =
-
dynamic_cast<ActiveMQConnection*> (connection);
-
amqConnection->destroyDestination(destination);
-
} catch (std::exception& e)
-
{
-
printf("ActiveMQAPI(UnSubscribe)[%s]: exception[%s]",
-
dest.c_str(), e.what());
-
return -1;
-
}
-
-
delete destination;
-
destination = NULL;
-
-
break;
-
}
-
}
-
-
if (name.size() > 0)
-
{
-
if (session == NULL)
-
return -1;
-
try
-
{
-
session->unsubscribe(name);
-
} catch (CMSException& e)
-
{
-
e.printStackTrace();
-
return -1;
-
}
-
}
-
return 0;
-
-
}
-
-
// 消息来时的回调函数
-
void ActiveMQAPI::onMessage(const Message* message) throw ()
-
{
-
try
-
{
-
const BytesMessage* msg = dynamic_cast<const BytesMessage*> (message);
-
unsigned char *buffer = NULL;
-
size_t buflen = 0;
-
string destname;
-
-
if (msg != NULL)
-
{
-
buffer = msg->getBodyBytes();
-
buflen = msg->getBodyLength();
-
map<Destination*, string>::iterator it;
-
for (it = destmap.begin(); it != destmap.end(); it++)
-
{
-
if (msg->getCMSDestination()->equals(*(it->first)))
-
{
-
destname = it->second;
-
}
-
}
-
}
-
else
-
{
-
printf("the msg is NULL");
-
return;
-
}
-
-
pCallBack(destname, buffer, buflen);
-
if (NULL != buffer)
-
{
-
delete[] buffer;
-
buffer = NULL;
-
}
-
-
}
-
catch (CMSException& e)
-
{
-
e.printStackTrace();
-
}
-
}
-
-
void ActiveMQAPI::onException(const CMSException& ex AMQCPP_UNUSED)
-
{
-
printf("ActiveMQAPI: CMS Exception occurred[%s]. Shutting down client.",
-
ex.what());
-
-
// exit(1);
-
}
-
-
void ActiveMQAPI::transportInterrupted()
-
{
-
printf("ActiveMQAPI: The Connection's Transport has been Interrupted.");
-
}
-
-
void ActiveMQAPI::transportResumed()
-
{
-
printf("ActiveMQAPI: The Connection's Transport has been Restored.");
-
}
-
-
void ActiveMQAPI::cleanup()
-
{
-
map<Destination*, string>::iterator it;
-
for (it = destmap.begin(); it != destmap.end();)
-
{
-
Destination* k = it->first;
-
it++;
-
destmap.erase(k);
-
delete k;
-
k = NULL;
-
}
-
-
map<string, MessageConsumer*>::iterator it_consumer;
-
for (it_consumer = consumermap.begin(); it_consumer != consumermap.end();)
-
{
-
const string &k = it_consumer->first;
-
const MessageConsumer* v = it_consumer->second;
-
it_consumer++;
-
if (v != NULL)
-
{
-
delete v;
-
v = NULL;
-
consumermap.erase(k);
-
}
-
}
-
-
if (session != NULL)
-
{
-
session->close();
-
delete session;
-
session = NULL;
-
}
-
if (connection != NULL)
-
{
-
connection->close();
-
delete connection;
-
connection = NULL;
-
}
-
}
5. 测试代码
点击(此处)折叠或打开 订阅topic
-
void MQCallBack(string dest_name, unsigned char *buffer,
-
size_t buflen)
-
{
-
printf("topic[%s] buffer[%s] \n", dest_name.c_str(), string((char*)buffer, buflen).c_str());
-
}
-
-
int main()
-
{
-
activemq::library::ActiveMQCPP::initializeLibrary();
-
//
-
ActiveMQAPI* mq = new ActiveMQAPI("failover:(tcp://XX:61616,tcp://XX:61616)?randomize=false", MQCallBack);
-
if(NULL == mq)
-
{
-
printf("new activeMQ error.");
-
return 0;
-
}
-
-
if(0 != mq->Init())
-
{
-
printf("init activeMQ failed.");
-
delete mq;
-
mq = NULL;
-
return 0;
-
}
-
-
mq->Subscribe("bond.mq", true);
-
-
while(true)
-
{
-
sleep(5);
-
}
-
-
mq->UnSubscribe("bond.mq");
-
delete mq;
-
mq = NULL;
-
activemq::library::ActiveMQCPP::shutdownLibrary();
-
return 0;
-
}
订阅后,可以通过 MQ提供的监控平台来查看是否订阅成功。在()主页的 “Connections”下面,通过IP找到运行订阅程序的主机,然后再点击进去,看是否有这个topic。
-
int main()
-
{
-
activemq::library::ActiveMQCPP::initializeLibrary();
-
//
-
ActiveMQAPI* mq = new ActiveMQAPI("failover:(tcp://XX:61616,tcp://XX:61616)?randomize=false", MQCallBack);
-
if(NULL == mq)
-
{
-
printf("new activeMQ error.");
-
return 0;
-
}
-
-
if(0 != mq->Init())
-
{
-
printf("init activeMQ failed.");
-
delete mq;
-
mq = NULL;
-
return 0;
-
}
-
-
int iRet = mq->PubMsg("bond.mq", "the message to send to the consumers.", true, true);
-
delete mq;
-
mq = NULL;
-
-
activemq::library::ActiveMQCPP::shutdownLibrary();
-
return 0;
-
}
阅读(5273) | 评论(0) | 转发(0) |