一.几个名词:
消息:消息是对使用它的应用程序有意义的以字节为单位的字符串。消息可以用来实现在相同或不同平台上应用程序间的通信。
WebSphere MQ 消息由两个部分:
应用程序数据。
应用程序数据的内容和结构由使用它的应用程序定义。
消息描述符。
消息描述符标识消息,并包含其它控制信息,如消息类型和消息的优先级,
队列:队列是用于存储消息的数据机构,通俗点,队列是张表,消息是一条记录,从功能上队列划分为:
1.本地队列--存储本地消息和远程队列发送到本地的消息。(假设本地机器为A)
2.远程队列--与A机器相互通信的B机器上的本地队列在A上的一个映射,它的作用的发送消息到远程,下文中重点介绍。
3.传输队列,实质是一本地队列,特殊之处是临时存储将要发往远程队列的消息,类似与JavaBean的作用。一个传输队列是两个队列管理器之间的连接的一端。所有直接目的地是同一队列管理器的消息都可放在同一个传输队列上,这些消息的最终目的可能不一样。把消息从一个队列管理器传送到另一个队列管理器只需要一个传输队列,然而也有可能在两个队列管理器之间存在着多个连接以提供不同的传输服务,每个连接都带有一个不同的传输队列。
4.MCA--MCA实际上是处理传输队列上消息的MQI应用程序。传输队列是由MCA处理的,MCA负责在队列管理器之间可靠地传送消息。
5.动态队列和模板队列--除了有固定定义的队列之外,WebSphere MQ还为程序在它们执行时提供了动态地创建队列的能力。例如,一个应用程序作为某种服务的客户,它可能创建一个动态队列,并通知服务器把对服务要求的响应发送到该动态队列。当然,这种情况也可以使用具有永久定义的队列。为了简化在创建动态队列时所必需设置的许多参数,动态队列总是基于模板队列被创建的,模板队列定义了动态队列的所有属性。当应用程序试图打开一个模板队列时,WebSphere MQ就创建一个动态队列。WebSphere MQ为应用提供了系统模板队列。
6.死信队列--死信(未传递的消息)队列是存储无法发送到其正确目的地的消息的队列。有时候会出现队列管理器不能把消息发送到目的地的情况,此时消息将被发送到某个死信队列中。
7.通道--消息通道是一种提供从一个队列管理器到另一个队列管理器的通信路径。消息通道用在分布式的队列把消息从一个队列管理器发送到另一个队列管理器。它们使应用程序屏蔽了底层的通信协议。队列管理器可能存在同种或异种平台之间。为了实现队列管理器之间的通信,您必需在一个队列管理器中定义一个发送消息的通道对象,在另一个队列管理器中定义一个接收消息的通道对象。消息通道是一个单向链接。它通过消息通道代理(message channel agents)把两个队列管理器连接起来。不要和MQI通道(MQI channel)通道混淆。MQI通道有两种类型,分别是服务器连接(server-connection)和客户器连接(client-connection)。消息通道的定义可以分为以下6种类型
发送通道(Sender),接收通道(Receiver),服务器通道(Server),请求器通道(Requester),群集发送通道(Cluster sender),群集接收通道(Cluster receiver)
二。几个模型
1.发送消息到本地
设置队列管理器 QM_APPLE 和队列 Q1。在设置了这些对象之后,您将使用已提供的程序 amqsput 把测试消息放入队列,并且使用另一个程序 amqsget 来验证是否已接收到该测试消息。
通过输入以下命令来创建名为 QM_APPLE 的缺省队列管理器:
crtmqm -q QM_APPLE
此时会显示消息,告诉您已经创建了队列和缺省 WebSphere MQ 对象。
通过输入以下命令来启动此队列管理器:
strmqm
此时会显示一条消息,告诉您何时启动了该队列管理器。
通过输入以下命令来启用 MQSC 命令:
runmqsc
输入以下命令:
define qlocal (Q1)
此时会显示消息,告诉您已经创建了队列和缺省 WebSphere MQ 对象。
通过输入以下命令来停止 MQSC:
end
您现在已经创建了名为 Q1 的本地队列。下一个任务是将测试消息放入此新创建的本地队列中。
启动 amqsput 样本程序,如下所示:
在 Linux 上,切换到 /opt/mqm/samp/bin 目录,然后输入命令:./amqsput Q1
在 Windows 上,输入命令:amqsput Q1
此时会显示以下消息:
样本 amqsput0 启动
目标队列为 Q1
在一行或多行上输入某些消息文本,然后按 Enter 键两次。此时会显示以下消息:
样本 amqsput0 结束
您现在已经创建了测试消息并将其放入本地队列。下一个任务是验证是否已接收到测试消息。
打开命令提示符,然后按照以下步骤操作:
启动 amqsget 样本程序:
在 Windows 上,输入以下命令:amqsget Q1
在 Linux 上,切换到 /opt/mqm/samp/bin 目录,然后输入以下命令:./amqsget Q1
此时会启动该样本程序,并显示您的消息以及此队列上的任何其它消息。在暂停 15 秒钟后,样本程序结束,并再次显示命令提示符。
2.发送消息到远程
一台计算机上的队列管理器QM_ORANGE 与另一台计算机上的队列管理器 QM_APPLE 之间的消息传递,在发送机器上打开命令提示符,然后按照以下步骤操作:
通过输入以下命令来创建名为 QM_ORANGE 的缺省队列管理器:
crtmqm -q QM_ORANGE
此时会显示消息,告诉您已经创建了队列和缺省 WebSphere MQ 对象。
通过输入以下命令来启动此队列管理器:
strmqm
此时会显示一条消息,告诉您何时启动了该队列管理器。
现在,您必须通过输入以下命令来创建侦听器:
runmqlsr -m QM_APPLE -t TCP -p (port number)
注:如果未使用 -p 参数指定任何端口,则使用缺省端口 1414,但请注意,不能对发送方和接收方使用相同的端口号,因此,如果其中一个为 1414,那么另一个就应该是 1415。
您现在已经创建了发送队列管理器。下一个任务是在此队列管理器上创建队列。
在发送机器上打开命令提示符,然后按照以下步骤操作:
通过输入以下命令来启动 MQSC:
runmqsc
此时会显示一条消息,告诉您 MQSC 会话已启动。
通过输入以下命令来定义名为 QM_APPLE 的本地队列:
define qlocal (QM_APPLE) usage (xmitq)
此时会显示一条消息,告诉您何时创建了该队列。
通过输入以下命令来定义远程队列定义:
define qremote (Q1) rname (Q1) rqmname(QM_APPLE) xmitq (QM_APPLE)
您现在已经在发送队列管理器上创建了队列。下一个任务是创建发送队列管理器和接收队列管理器之间的消息通道。通过输入以下命令来启动 MQSC:
runmqsc
此时会显示一条消息,告诉您 MQSC 会话已启动。
通过输入以下命令来定义接收通道:
define channel (QM_ORANGE.QM_APPLE) chltype (RCVR) trptype (TCP)
此时会显示一条消息,告诉您何时创建了该通道。
通过输入以下命令来停止 MQSC:
end
将显示一些消息,然后会显示命令提示符。
在发送机器上打开命令提示符,然后按照以下步骤操作:
通过输入以下命令来启动 MQSC:
runmqsc
此时会显示一条消息,告诉您 MQSC 会话已启动。
通过输入以下命令来定义发送方通道:
define channel (QM_ORANGE.QM_APPLE) chltype (sdr) conname ('con-name') xmitq (QM_APPLE) trptype (tcp)
值 con-name 是接收方队列管理器的 TCP 地址。
通过输入以下命令来启动通道:
start channel (QM_ORANGE.QM_APPLE)
通过输入以下命令来停止 MQSC:
end
将显示一些消息,然后会显示命令提示符。
您现在已经创建了将消息从发送队列管理器 QM_ORANGE 发送到接收队列管理器 QM_APPLE 上的队列 Q1 所需的全部 WebSphere MQ 对象。下一个任务是发送测试消息。在发送机器(主管队列管理器 QM_ORANGE 的机器)上执行此任务。
使用 amqsput 样本程序来将消息放入您创建的队列。
在 Windows 上,缺省情况下样本程序随 WebSphere MQ 服务器或客户机一起安装。在 Linux 上,需要安装样本程序 RPM。
打开命令提示符,然后按照以下步骤操作:
启动 amqsput 样本程序,如下所示:
在 Linux 上,切换到 /opt/mqm/samp/bin 目录,然后输入命令:./amqsput Q1
在 Windows 上,输入命令:amqsput Q1
此时会显示以下消息:
样本 amqsput0 启动
目标队列为 Q1
在一行或多行上输入某些消息文本,然后按 Enter 键两次。此时会显示以下消息:
样本 amqsput0 结束
您在接收机器(主管队列管理器 QM_APPLE 的机器)上执行此任务。使用 amqsget 样本程序来从队列中取回消息。
打开命令提示符,然后按照以下步骤操作:
启动 amqsget 样本程序,如下所示:
在 Linux 上,切换到 /opt/mqm/samp/bin 目录,然后输入命令:./amqsget Q1
在 Windows 上,输入命令:amqsget Q1
此时会启动该样本程序,并显示您的消息以及此队列上的任何其它消息。在短暂停留后,样本程序结束,并再次显示命令提示符。
3.在客户机 - 服务器配置上发送消息
在接收机器上打开命令提示符,然后按照以下步骤操作:
通过输入以下命令来启动 MQSC:
runmqsc
此时会显示一条消息,告诉您 MQSC 会话已启动。MQSC 没有命令提示符。
在一行中输入以下命令来定义服务器连接通道:
define channel(CLIENT.QM_ORANGE) chltype(SVRCONN) trptype(TCP) mcauser('mqm')
Windows 用户应输入 Windows 登录名(或有效的 mqm 用户名)替换 mqm。
此时会显示一条消息,告诉您何时创建了该通道。
通过输入以下命令来停止 MQSC:
end
将显示一些消息,然后会显示命令提示符。
通过输入以下命令来启动侦听器:
runmqlsr -t tcp
您现在已经完成设置服务器。下一个任务是设置客户机。
在设置客户机以与队列管理器 QM_ORANGE 进行通信之前,必须确保已经在客户机上安装了 WebSphere MQ 客户机。
在此部分的教程中,您将使用 MQSERVER 环境变量来设置客户机组件。您将需要从系统管理员处了解主管队列管理器 QM_ORANGE 的机器的网络名。
如果客户机位于 Windows 上:
打开控制面板:单击开始 > 设置 > 控制面板。
双击系统。
单击高级选项卡。
单击环境变量。
在“用户变量”窗格中,单击新建。
在“变量名”字段中输入 MQSERVER。
在“变量值”字段中输入 CLIENT.QM_ORANGE/TCP/hostname,其中,hostname 是标识主管队列管理器 QM_ORANGE 的机器的计算机名称或 IP 地址。如果不使用缺省端口号 1414,您还必须指定侦听器要侦听的端口号,例如:MQSERVER=CLIENT.QM_ORANGE/TCP/hostname (1415)
单击确定。MQSERVER 环境变量将出现在“用户变量”窗格中。
如果客户机位于 Linux 上:
以将要运行 Express File Transfer 的用户身份登录,该用户必须是 mqm 组的成员。
打开命令提示符。
输入 cd $HOME。
使用文本编辑器来编辑概要文件。此示例假定您正在使用 bash shell,因此您需要编辑文件 $HOME/.bashrc。但是,如果您使用的是其它系统 shell,则请参阅系统文档。将以下文本添加到文件末尾:
MQSERVER=CLIENT.QM_ORANGE/TCP/'hostname' export MQSERVER
使用标识网络上服务器的名称替换 hostname。
关闭命令提示符。
注销并重新登录以使更改生效。
您现在已经设置了所需的客户机和服务器组件。下一个任务是将消息从客户机发送到服务器队列管理器 QM_ORANGE。
在客户机上打开命令提示符,然后按照以下步骤操作:
启动 amqsputc 样本程序,如下所示:
在 Linux 上,切换到 /opt/mqm/samp/bin 目录,然后输入命令:./amqsputc Q1
在 Windows 上,输入命令:amqsputc Q1
此时会显示以下消息:
样本 AMQSPUT0 启动
目标队列为 Q1
在一行或多行上输入某些消息文本,然后按 Enter 键两次。此时会显示以下消息:
样本 AMQSPUT0 结束
您现在已经创建了测试消息,且已将其发送到服务器队列管理器 QM_ORANGE,该队列管理器会将消息传递至队列管理器 QM_APPLE 上的队列 Q1。下一个任务是验证是否已接收到测试消息。
使用 MQSC 验证是否已发送测试消息
使用 amqsget 样本程序来从队列中取回消息。
打开命令提示符,然后启动 amqsget 样本程序,如下所示:
在 Windows 上,输入以下命令:amqsget Q1
在 Linux 上,切换到 /opt/mqm/samp/bin 目录,然后输入以下命令:./amqsget Q1
此时会启动该样本程序,并显示您的消息以及此队列上的任何其它消息。在暂停 15 秒钟后,样本程序结束,并再次显示命令提示符。
三.编程调用(针对本地队列发送消息,远程的也很简单,参照上模型)
1.通过底层API调用
import com.ibm.mq.*;
public class FirstMqTest {
// public static void main(String[] args[]){
// FirstMqTest first = new FirstMqTest();
// first.test();
// }
public static void main(String args[]){
FirstMqTest first = new FirstMqTest();
first.test();
}
public void test(){
String qManager = "QM_ORANGE"; //QueueManager name
String qName = "Q2";//Queue Name
try {
//configure connection parameters
MQEnvironment.hostname="127.0.0.1";//MQ Server name or IP
//MQEnvironment.port=1414;//listenr port
MQEnvironment.channel="BridgeChannel";//Server-Connection Channel
MQEnvironment.CCSID =1381;
// Create a connection to the QueueManager
System.out.println("Connecting to queue manager: "+qManager);
MQQueueManager qMgr = new MQQueueManager(qManager);
// Set up the options on the queue we wish to open
int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
// Now specify the queue that we wish to open and the open options
System.out.println("Accessing queue: "+qName);
MQQueue queue = qMgr.accessQueue(qName, openOptions);
// Define a simple WebSphere MQ Message ...
MQMessage msg = new MQMessage();
// ... and write some text in UTF8 format
msg.writeUTF("Hello, World!");
// Specify the default put message options
MQPutMessageOptions pmo = new MQPutMessageOptions();
// Put the message to the queue
System.out.println("Sending a message...");
/*
* 在此测试一下 mq 的传输次列
*
*/
for(int j=0;j<5;j++){
String str ="test11111111111";
str = str+j;
msg.writeUTF(str);
queue.put(msg, pmo);
}
queue.put(msg, pmo);
// Now get the message back again. First define a WebSphere MQ message
// to receive the data
MQMessage rcvMessage = new MQMessage();
// Specify default get message options
MQGetMessageOptions gmo = new MQGetMessageOptions();
// Get the message off the queue.
System.out.println("...and getting the message back again");
queue.get(rcvMessage, gmo);
// And display the message text...
String msgText = rcvMessage.readUTF();
System.out.println("The message is: " + msgText);
// Close the queue
System.out.println("Closing the queue");
queue.close();
// Disconnect from the QueueManager
System.out.println("Disconnecting from the Queue Manager");
qMgr.disconnect();
System.out.println("Done!");
}
catch (MQException ex) {
System.out.println("A WebSphere MQ Error occured : Completion Code "
+ ex.completionCode + " Reason Code " + ex.reasonCode);
}
catch (java.io.IOException ex) {
System.out.println("An IOException occured whilst writing to the message buffer: "
+ ex);
}
}
}
2.JMS调用
准备工作
JNDI配置,此处采用JNDI查找的方式,当然你也可以不用配置JNDI,在程序中定义,was mq在6.0后可以在MQ资源管理器中配置JMS所需的JNDI,5.3中没有此功能,找到WAS MQ安装目录,%WAS_MQ%\Java\bin目录下JMSAdmin.config,修改INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory
PROVIDER_URL=file:/D:/Program Files/IBM/WebSphere MQ/Java/bin
注:D:/Program Files/IBM/WebSphere MQ/Java/bin是我机器上MQ安装目录
运行JMSAdmin.bat
输入:
def qcf(jms/QueueConnectionFactory) qmgr(QM_APPLE)
def q(jms/Queue) queue(Q1) ccsid(1381) encoding(RRR)
工具类
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnectionFactory;
// JNDI 类
import javax.naming.InitialContext;
import javax.naming.Context;
import javax.naming.NamingException;
// 标准 Java类
import java.util.Hashtable;
public class JNDIUtil
{
private Context context;
public JNDIUtil(String icf, String url) throws JMSException,
NamingException
{
Hashtable environment = new Hashtable();
environment.put(Context.INITIAL_CONTEXT_FACTORY, icf);
environment.put(Context.PROVIDER_URL, url);
context = new InitialContext(environment);
}
private Object getObjectByName(String ObjName) throws NamingException
{
return context.lookup(ObjName);
}
public QueueConnectionFactory getQueueConnectionFactory(String factoryName)
throws NamingException
{
return (QueueConnectionFactory) getObjectByName(factoryName);
}
public Queue getQueue(String queueName) throws NamingException
{
return (Queue) getObjectByName(queueName);
}
}
测试类
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import mht.tools.JNDIUtil;
public class Tester
{
public static String icf = "com.sun.jndi.fscontext.RefFSContextFactory";
public static String url = "file:/D:/Program Files/IBM/WebSphere MQ/Java/bin";
public static void main(String[] vars) throws JMSException, NamingException
{
QueueSession session = null;
QueueConnection connection = null;
QueueConnectionFactory factory = null;
QueueSender queueSender = null;
QueueReceiver queueReceiver = null;
Queue oQueue = null; // 消息发送到的队列
Queue iQueue = null; // 接收消息的队列
try
{
JNDIUtil jndiUtil = new JNDIUtil(icf, url);
factory = jndiUtil.getQueueConnectionFactory("jms/QueueConnectionFactory");
connection = factory.createQueueConnection();
// 启动(或重新启动)入站消息的连接地址,如果没有这个调用消息不会被接收
connection.start();
// 表示一个非相互操作会话
boolean transacted = false;
session = connection.createQueueSession(transacted,
Session.AUTO_ACKNOWLEDGE);
oQueue = jndiUtil.getQueue("jms/Queue");
queueSender = session.createSender(oQueue);
TextMessage oMsg = session.createTextMessage();
oMsg.setText("你好世界!");
// 你还可以设置其他消息属性
queueSender.send(oMsg);
iQueue = jndiUtil.getQueue("jms/Queue");
queueReceiver = session.createReceiver(iQueue);
Message iMsg = queueReceiver.receive(1000);
if (iMsg != null)
System.out.println(((TextMessage) iMsg).getText());
else
System.out.println("No messages in queue ");
}
finally
{
// 总是释放资源
if (queueReceiver != null)
queueReceiver.close();
if (queueSender != null)
queueSender.close();
if (session != null)
session.close();
if (connection != null)
{
connection.close();
}
}
}
}
阅读(1849) | 评论(0) | 转发(0) |