Category: Java

2010-11-10 09:48:19

Keywords: activemq, example

ProducerTool.java sends messages:

package com.google.homework;

 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
   
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
   
 public class ProducerTool {
   
     private String user = ActiveMQConnection.DEFAULT_USER;
   
     private String password = ActiveMQConnection.DEFAULT_PASSWORD;
   
     private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
   
     private String subject = "TOOL.DEFAULT";
   
     private Destination destination = null;
   
     private Connection connection = null;
   
     private Session session = null;
   
     private MessageProducer producer = null;
   
     // Initialize: create the connection, session, queue and producer
     private void initialize() throws JMSException, Exception {
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                 user, password, url);
         connection = connectionFactory.createConnection();
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         destination = session.createQueue(subject);
         producer = session.createProducer(destination);
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
     }
   
     // Send a message
     public void produceMessage(String message) throws JMSException, Exception {
         initialize();
         TextMessage msg = session.createTextMessage(message);
         connection.start();
         System.out.println("Producer:->Sending message: " + message);
         producer.send(msg);
         System.out.println("Producer:->Message sent complete!");
     }
   
     // Close the connection
     public void close() throws JMSException {
         System.out.println("Producer:->Closing connection");
         if (producer != null)
             producer.close();
         if (session != null)
             session.close();
         if (connection != null)
             connection.close();
     }
 }


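ConsumerTool.java receives messages. I use the listener-based mechanism here: the class implements the MessageListener interface, whose onMessage method is called automatically whenever a message arrives and is where the message gets processed.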

 

package com.google.homework;

 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
 import javax.jms.MessageListener;
 import javax.jms.Message;
 import javax.jms.TextMessage;
   
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
   
 public class ConsumerTool implements MessageListener {
   
     private String user = ActiveMQConnection.DEFAULT_USER;
   
     private String password = ActiveMQConnection.DEFAULT_PASSWORD;
   
     private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
   
     private String subject = "TOOL.DEFAULT";
   
     private Destination destination = null;
   
     private Connection connection = null;
   
     private Session session = null;
   
     private MessageConsumer consumer = null;
   
     // Initialize: create the connection, session, queue and consumer
     private void initialize() throws JMSException, Exception {
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                 user, password, url);
         connection = connectionFactory.createConnection();
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         destination = session.createQueue(subject);
         consumer = session.createConsumer(destination);
            
     }
   
     // Consume messages
     public void consumeMessage() throws JMSException, Exception {
         initialize();
         connection.start();
            
         System.out.println("Consumer:->Begin listening...");
         // Start listening: register this object as the message listener
         consumer.setMessageListener(this);
         // Message message = consumer.receive();

     }
   
     // Close the connection
     public void close() throws JMSException {
         System.out.println("Consumer:->Closing connection");
         if (consumer != null)
             consumer.close();
         if (session != null)
             session.close();
         if (connection != null)
             connection.close();
     }
   
     // Message handler, called for every message that arrives
     public void onMessage(Message message) {
         try {
             if (message instanceof TextMessage) {
                 TextMessage txtMsg = (TextMessage) message;
                 String msg = txtMsg.getText();
                 System.out.println("Consumer:->Received: " + msg);
             } else {
                 System.out.println("Consumer:->Received: " + message);
             }
         } catch (JMSException e) {
             // Reading the message text failed; just print the stack trace here
             e.printStackTrace();
         }
     }
 }

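If you would rather fetch messages yourself instead of using a listener, replace consumer.setMessageListener(this) with Message message = consumer.receive() and call the receive method of MessageConsumer directly. Note that receive() blocks until a message arrives.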
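To make the difference between the two styles concrete, here is a small sketch that does not use JMS at all. It is only an analogy built on java.util.concurrent: a BlockingQueue stands in for the broker queue, pull plays the role of a timed receive (MessageConsumer also offers receive(long timeout), which returns null on timeout), and push plays the role of the provider thread that calls onMessage. The class name PushVsPull and both method names are made up for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// JMS-free analogy, not ActiveMQ code: the queue stands in for the broker.
public class PushVsPull {

    // Pull style: the caller blocks until a message arrives or the timeout
    // expires. Returns null on timeout.
    public static String pull(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Push style: a background thread takes messages and hands each one to
    // the callback, the way a listener's onMessage gets invoked.
    public static Thread push(BlockingQueue<String> queue, Consumer<String> listener) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    listener.accept(queue.take());
                }
            } catch (InterruptedException e) {
                // Interrupted: stop delivering messages.
            }
        });
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Pull: we decide when to ask for the next message.
        queue.add("Hello, world!");
        System.out.println("Pulled: " + pull(queue, 500));

        // Push: the callback runs on the worker thread when a message shows up.
        Thread worker = push(queue, msg -> System.out.println("Pushed: " + msg));
        queue.add("Hello again!");
        pull(new LinkedBlockingQueue<>(), 200); // crude wait so the worker can run
        worker.interrupt();
    }
}
```

With the pull style the calling thread controls when the next message is fetched; with the push style the handler runs on another thread, so any state it shares with the rest of the program has to be thread-safe.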

Below is the test class, Test.java:


package com.google.homework;

 import javax.jms.JMSException;
   
 public class Test {
   
     /**
      * @param args
      */

     public static void main(String[] args) throws JMSException, Exception {
         ConsumerTool consumer = new ConsumerTool();
         ProducerTool producer = new ProducerTool();
         // Start listening
         consumer.consumeMessage();
            
         // Wait 500 ms, then send a message
         Thread.sleep(500);
         producer.produceMessage("Hello, world!");
         producer.close();
            
         // Wait another 500 ms, then stop receiving
         Thread.sleep(500);
         consumer.close();
     }
 }
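The test class relies on fixed 500 ms pauses, which only works if the message is delivered within that window. One possible alternative, which is not part of the original code, is to let the message handler signal a java.util.concurrent.CountDownLatch and have the main thread wait on it with a timeout. The sketch below shows the idea using only the standard library; the class LatchWait and its String-based onMessage are stand-ins for ConsumerTool, not ActiveMQ code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Stand-in for ConsumerTool that shows only the waiting logic.
public class LatchWait {

    // Counted down once per handled message.
    private final CountDownLatch received;

    public LatchWait(int expectedMessages) {
        this.received = new CountDownLatch(expectedMessages);
    }

    // Stand-in for onMessage(Message); in ConsumerTool the countDown() call
    // would go at the end of the real handler.
    public void onMessage(String text) {
        System.out.println("Consumer:->Received: " + text);
        received.countDown();
    }

    // Returns true if all expected messages arrived before the timeout.
    public boolean awaitMessages(long timeoutMillis) {
        try {
            return received.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        LatchWait consumer = new LatchWait(1);
        // Simulate the provider thread delivering one message.
        new Thread(() -> consumer.onMessage("Hello, world!")).start();
        boolean ok = consumer.awaitMessages(2000);
        System.out.println("All messages received: " + ok);
    }
}
```

In Test.java the second Thread.sleep(500) would then become a call like consumer.awaitMessages(2000) followed by consumer.close(), so the test finishes as soon as the message has been handled and still gives up after the timeout.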

