Chinaunix首页 | 论坛 | 博客
  • 博客访问: 355172
  • 博文数量: 81
  • 博客积分: 95
  • 博客等级: 民兵
  • 技术积分: 450
  • 用 户 组: 普通用户
  • 注册时间: 2007-10-18 20:40
文章分类

全部博文(81)

文章存档

2015年(25)

2014年(32)

2013年(2)

2012年(18)

2011年(4)

分类: Java

2014-08-24 08:15:43

原文地址:可嵌入式JMS消息队列FFMQ 作者:ruknow


之前开发的一个小项目(实时数据采集)中,采用了自定义报文的Socket通信方式(Netty),对通信异常时的数据处理方式不是很完善,容易造成数据丢失,
自己动手解决数据传递的可靠性与稳定性,编程还是比较麻烦,于是想到了在应用中嵌入小型JMS服务器的方式,以简化编程。

经过几个开源项目比较,最终发现合适的项目FFMQ,项目大小才600KB,支持JMS1.1规范。


以下代码仅供测试参考,不具备生产环境所需的严谨性,具体FFMQ配置请看说明文档(下载包中有)。


点击(此处)折叠或打开

  1. import java.io.FileInputStream;
  2. import java.util.Properties;

  3. import net.timewalker.ffmq3.listeners.ClientListener;
  4. import net.timewalker.ffmq3.listeners.tcp.io.TcpListener;
  5. import net.timewalker.ffmq3.local.FFMQEngine;
  6. import net.timewalker.ffmq3.management.destination.definition.QueueDefinition;
  7. import net.timewalker.ffmq3.management.destination.definition.TopicDefinition;
  8. import net.timewalker.ffmq3.utils.Settings;

  9. /**
  10.  * Embedded FFMQ sample: deploys an in-JVM engine, opens a TCP listener,
  11.  * defines two queues and one topic, runs for one minute, then shuts down.
  12.  */
  13. public class EmbeddedFFMQSample implements Runnable
  14. {
  15.     private FFMQEngine engine;

  16.     /**
  17.      * Starts the engine and listener, defines the sample destinations and
  18.      * waits 60 seconds. Shutdown runs in a finally block so that a failure
  19.      * or an interrupt does not leave the listener or the engine running.
  20.      */
  21.     public void run()
  22.     {
  23.         ClientListener tcpListener = null;
  24.         boolean deployed = false;
  25.         try
  26.         {
  27.             // Create engine settings
  28.             Settings settings = createEngineSettings();

  29.             // Create the engine itself
  30.             engine = new FFMQEngine("myLocalEngineName", settings);
  31.             // -> myLocalEngineName will be the engine name.
  32.             // - It should be unique in a given JVM
  33.             // - This is the name to be used by local clients to establish
  34.             // an internal JVM connection (high performance)
  35.             // Use the following URL for clients : vm://myLocalEngineName

  36.             // Deploy the engine
  37.             System.out.println("Deploying engine : "+engine.getName());
  38.             engine.deploy();
  39.             deployed = true;
  40.             // - The FFMQ engine is not functional until deployed.
  41.             // - The deploy operation re-activates all persistent queues
  42.             // and recovers them if the engine was not properly closed.
  43.             // (May take some time for large queues)

  44.             // Adding a TCP based client listener
  45.             System.out.println("Starting listener ...");
  46.             tcpListener = new TcpListener(engine,"0.0.0.0",10002,settings,null);
  47.             tcpListener.start();

  48.             // This is how you can programmatically define a new queue.
  49.             // The existence check and the created name must match ("foo1").
  50.             if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo1"))
  51.             {
  52.                 QueueDefinition queueDef = new QueueDefinition(settings);
  53.                 queueDef.setName("foo1");
  54.                 queueDef.setMaxNonPersistentMessages(0);
  55.                 queueDef.setOverflowToPersistent(false);
  56.                 queueDef.setPreAllocateFiles(true);
  57.                 queueDef.setTemporary(false);
  58.                 queueDef.setUseJournal(true);
  59.                 queueDef.setAutoExtendAmount(128);
  60.                 queueDef.setInitialBlockCount(32);
  61.                 queueDef.setMaxBlockCount(1024);
  62.                 queueDef.check();
  63.                 engine.createQueue(queueDef);
  64.             }

  65.             // You could also define a queue using some java Properties
  66.             if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo2"))
  67.             {
  68.                 Properties queueProps = new Properties();
  69.                 queueProps.put("name", "foo2");
  70.                 queueProps.put("persistentStore.useJournal", "false");
  71.                 queueProps.put("memoryStore.maxMessages", "1000");
  72.                 QueueDefinition queueDef2 = new QueueDefinition(new Settings(queueProps));
  73.                 engine.createQueue(queueDef2);
  74.             }

  75.             // Topic used by the subscriber samples
  76.             if (!engine.getDestinationDefinitionProvider().hasTopicDefinition("foox"))
  77.             {
  78.                 TopicDefinition topicDef = new TopicDefinition(settings);
  79.                 topicDef.setName("foox");
  80.                 topicDef.setMaxNonPersistentMessages(0);
  81.                 topicDef.setOverflowToPersistent(false);
  82.                 topicDef.setPreAllocateFiles(true);
  83.                 topicDef.setTemporary(false);
  84.                 topicDef.setUseJournal(true);
  85.                 topicDef.check();
  86.                 engine.createTopic(topicDef);
  87.             }

  88.             // Run for some time
  89.             System.out.println("Running ...");
  90.             Thread.sleep(60*1000);
  91.         }
  92.         catch (InterruptedException e)
  93.         {
  94.             // Keep the interrupt visible to whoever owns this thread
  95.             Thread.currentThread().interrupt();
  96.             e.printStackTrace();
  97.         }
  98.         catch (Exception e)
  99.         {
  100.             // Oops
  101.             e.printStackTrace();
  102.         }
  103.         finally
  104.         {
  105.             shutdown(tcpListener, deployed);
  106.         }
  107.     }

  108.     /**
  109.      * Stops the listener (when one was created) and undeploys the engine
  110.      * (when it was deployed). A failure in one step is printed and does
  111.      * not prevent the next step from running.
  112.      *
  113.      * @param tcpListener listener to stop, or null if none was created
  114.      * @param deployed whether engine.deploy() completed
  115.      */
  116.     private void shutdown(ClientListener tcpListener, boolean deployed)
  117.     {
  118.         if (tcpListener != null)
  119.         {
  120.             System.out.println("Stopping listener ...");
  121.             try
  122.             {
  123.                 tcpListener.stop();
  124.             }
  125.             catch (Exception e)
  126.             {
  127.                 e.printStackTrace();
  128.             }
  129.         }
  130.         if (deployed)
  131.         {
  132.             // - It is important to properly shutdown the engine
  133.             // before stopping the JVM to make sure current transactions
  134.             // are nicely completed and storages properly closed.
  135.             System.out.println("Undeploying engine ...");
  136.             try
  137.             {
  138.                 engine.undeploy();
  139.             }
  140.             catch (Exception e)
  141.             {
  142.                 e.printStackTrace();
  143.             }
  144.         }
  145.         System.out.println("Done.");
  146.     }

  147.     /**
  148.      * Loads the engine settings from the server properties file.
  149.      *
  150.      * @return settings built from the loaded properties
  151.      * @throws RuntimeException if the file cannot be read
  152.      */
  153.     private Settings createEngineSettings()
  154.     {
  155.         // Various ways of creating engine settings

  156.         // 1 - From a properties file
  157.         Properties externalProperties = new Properties();
  158.         // try-with-resources closes the stream even when load() fails
  159.         try (FileInputStream in = new FileInputStream("D:\\ffmq3-distribution-3.0.5-dist\\conf\\ffmq-server.properties"))
  160.         {
  161.             externalProperties.load(in);
  162.         }
  163.         catch (Exception e)
  164.         {
  165.             throw new RuntimeException("Cannot load external properties",e);
  166.         }
  167.         Settings settings = new Settings(externalProperties);

  168.         // 2 - Explicit Java code
  169. // Settings settings = new Settings();
  170. //
  171. // settings.setStringProperty(FFMQCoreSettings.DESTINATION_DEFINITIONS_DIR, ".");
  172. // settings.setStringProperty(FFMQCoreSettings.BRIDGE_DEFINITIONS_DIR, ".");
  173. // settings.setStringProperty(FFMQCoreSettings.TEMPLATES_DIR, ".");
  174. // settings.setStringProperty(FFMQCoreSettings.DEFAULT_DATA_DIR, ".");
  175. // ...

  176.         return settings;
  177.     }

  178.     public static void main(String[] args)
  179.     {
  180.         System.setProperty("FFMQ_BASE", "D:\\ffmq3-distribution-3.0.5-dist");

  181.         new EmbeddedFFMQSample().run();
  182.     }
  183. }

模拟发送:

点击(此处)折叠或打开

  1. import java.util.Hashtable;
  2. import java.util.Random;

  3. import javax.jms.Connection;
  4. import javax.jms.ConnectionFactory;
  5. import javax.jms.Destination;
  6. import javax.jms.JMSException;
  7. import javax.jms.MessageProducer;
  8. import javax.jms.Queue;
  9. import javax.jms.Session;
  10. import javax.jms.TextMessage;
  11. import javax.naming.Context;
  12. import javax.naming.InitialContext;
  13. import javax.naming.NamingException;

  14. import net.timewalker.ffmq3.FFMQConstants;

  15. public class Sender implements Runnable {

  16.     public static void main(String[] args) throws Exception {
  17.         new Thread(new Sender("queue/foo1", "1")).start();
  18.         new Thread(new Sender("queue/foo2", "2")).start();
  19.         Thread.sleep(10000);
  20.         run = false;
  21.         Thread.sleep(1000);
  22.     }
  23.     
  24.     private static volatile boolean run = true;
  25.     private String queueName;
  26.     private String qtmId;

  27.     private Sender(String queueName, String qtmId) {
  28.         super();
  29.         this.queueName = queueName;
  30.         this.qtmId = qtmId;
  31.     }

  32.     @Override
  33.     public void run() {
  34.         try {
  35.             // Create and initialize a JNDI context
  36.             Hashtable<String,String> env = new Hashtable<>();
  37.             env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
  38.             env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
  39.             Context context = new InitialContext(env);

  40.             // Lookup a connection factory in the context
  41.             ConnectionFactory connFactory = (ConnectionFactory)context.lookup(FFMQConstants.JNDI_CONNECTION_FACTORY_NAME);

  42.             // Obtain a JMS connection from the factory
  43.             Connection conn = connFactory.createConnection("test","test");
  44.             conn.start();
  45.             
  46.             Destination dest1=(Queue) context.lookup(queueName);

  47.             Session session=conn.createSession(false,Session.AUTO_ACKNOWLEDGE);

  48.             
  49.             Random rnd = new Random(System.currentTimeMillis());
  50.             long ms = (long)rnd.nextFloat() * 10 * 1000;
  51.             if(ms > 8000) {
  52.                 ms /= 2;
  53.             } else if(ms < 1000) {
  54.                 ms = 1500;
  55.             }
  56.             
  57.             int i = 1;
  58.             
  59.             MessageProducer p = session.createProducer(dest1);
  60.             while (run) {
  61.                 TextMessage msg = session.createTextMessage();
  62.                 String t = "[" + qtmId + "] Hello " + queueName + " " + i++;
  63.                 System.out.println("sended..." + t);
  64.                 msg.setStringProperty("QTMID", qtmId);
  65.                 msg.setText(t);
  66.                 p.send(msg);
  67.                 Thread.sleep(ms);
  68.             }
  69.             p.close();
  70.             session.close();
  71.             
  72.             conn.close();
  73.             context.close();
  74.         } catch (NamingException e) {
  75.             e.printStackTrace();
  76.         } catch (JMSException e) {
  77.             e.printStackTrace();
  78.         } catch (InterruptedException e) {
  79.             e.printStackTrace();
  80.         }
  81.         
  82.     }

  83. }

模拟接收:

点击(此处)折叠或打开

  1. import java.util.Hashtable;

  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.Message;
  7. import javax.jms.MessageConsumer;
  8. import javax.jms.Queue;
  9. import javax.jms.Session;
  10. import javax.jms.TextMessage;
  11. import javax.naming.Context;
  12. import javax.naming.InitialContext;

  13. import net.timewalker.ffmq3.FFMQConstants;


  14. public class Receiver implements Runnable {

  15.     private static volatile boolean run = true;
  16.     
  17.     public static void main(String[] args) throws Exception {
  18.         new Thread(new Receiver()).start();
  19.         Thread.sleep(10000);
  20.         run = false;
  21.         Thread.sleep(2000);
  22.     }
  23.     
  24.     private Connection conn;
  25.     private Session session;
  26.     private MessageConsumer consumer;
  27.     
  28.     private void init() throws Exception {
  29.             // Create and initialize a JNDI context
  30.             Hashtable<String,String> env = new Hashtable<>();
  31.             env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
  32.             env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
  33.             Context context = new InitialContext(env);
  34.             
  35.             // Lookup a connection factory in the context
  36.             ConnectionFactory connFactory = (ConnectionFactory)context.lookup(FFMQConstants.JNDI_CONNECTION_FACTORY_NAME);
  37.             Destination dest1=(Queue) context.lookup("queue/foo2");
  38.             context.close();
  39.             
  40.             
  41.             // Obtain a JMS connection from the factory
  42.             conn = connFactory.createConnection("test", "test");
  43.             
  44.             conn.start();
  45.             
  46.             session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
  47.             consumer = session.createConsumer(dest1);
  48.             
  49.             System.err.println("INIT.........");
  50.     }
  51.     
  52.     private void destory() {
  53.         try {
  54.             consumer.close();
  55.             session.close();
  56.             conn.stop();
  57.             conn.close();
  58.             System.err.println("EXIT........REC");
  59.         } catch (JMSException e) {
  60.             e.printStackTrace();
  61.         }
  62.     }

  63.     public void run() {
  64.             try {
  65.                 init();
  66.                 
  67. //                consumer.setMessageListener(new MessageListener() {
  68. //                    @Override
  69. //                    public void onMessage(Message m) {
  70. //                        try {
  71. //                            System.err.println("receive: " + ((TextMessage) m).getText());
  72. //                        } catch (JMSException e) {
  73. //                            e.printStackTrace();
  74. //                        }
  75. //                    }
  76. //                });
  77.                 while(run) {
  78. //                    Thread.sleep(500);
  79.                     Message m = consumer.receive(500);
  80.                     if(m != null) {
  81.                         System.err.println("receive: " + ((TextMessage) m).getText());
  82.                     }
  83.                 }

  84.             } catch (Exception e) {
  85.                 e.printStackTrace();
  86.             } finally {
  87.                 destory();
  88.             }
  89.         
  90.     }

  91. }

主题订阅:

点击(此处)折叠或打开

  1. package topic;

  2. import java.util.Hashtable;

  3. import javax.jms.JMSException;
  4. import javax.jms.Message;
  5. import javax.jms.MessageListener;
  6. import javax.jms.Session;
  7. import javax.jms.TextMessage;
  8. import javax.jms.Topic;
  9. import javax.jms.TopicConnection;
  10. import javax.jms.TopicConnectionFactory;
  11. import javax.jms.TopicSession;
  12. import javax.jms.TopicSubscriber;
  13. import javax.naming.Context;
  14. import javax.naming.InitialContext;

  15. import net.timewalker.ffmq3.FFMQConstants;

  16. public class SubClient implements Runnable {

  17.     private String topicName;
  18.     private String qtmId;
  19.     private TopicConnection conn;
  20.     private TopicSession session;    
  21.     private TopicSubscriber subscriber;
  22.     
  23.     public static void main(String[] args) throws Exception {
  24.         for(int i = 1; i < 5; i++) {
  25.             new Thread(new SubClient("topic/foox", String.valueOf(i))).start();
  26.         }
  27.         System.out.println(Thread.currentThread() + " EEXIT");
  28.     }
  29.     
  30.     private SubClient(String topicName, String qtmId) {
  31.         super();
  32.         this.topicName = topicName;
  33.         this.qtmId = qtmId;
  34.     }

  35.     private void init() throws Exception {
  36.         // Create and initialize a JNDI context
  37.         Hashtable<String,String> env = new Hashtable<>();
  38.         env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
  39.         env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
  40.         Context context = new InitialContext(env);
  41.         
  42.         // Lookup a connection factory in the context
  43.         TopicConnectionFactory connFactory = (TopicConnectionFactory)context.lookup(FFMQConstants.JNDI_TOPIC_CONNECTION_FACTORY_NAME);
  44.         Topic topic = (Topic) context.lookup(topicName);
  45.         context.close();
  46.         
  47.         // Obtain a JMS connection from the factory
  48.         conn = connFactory.createTopicConnection("test","test");
  49.         
  50.         session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
  51.         String selector = "(QTMID = '" + qtmId + "')";
  52.         System.out.println("Selector: " + selector);
  53.         subscriber = session.createSubscriber(topic, selector, false);
  54.         
  55.         System.err.println("INIT.........");
  56.     }
  57.     
  58.     private void destory() {
  59.         try {
  60.             subscriber.close();
  61.             session.close();
  62.             conn.stop();
  63.             conn.close();
  64.             System.err.println(Thread.currentThread() + " Client EXIT........REC");
  65.         } catch (JMSException e) {
  66.             e.printStackTrace();
  67.         }
  68.     }
  69.     
  70.     @SuppressWarnings("static-access")
  71.     @Override
  72.     public void run() {
  73.         try {
  74.             init();
  75.             
  76.             subscriber.setMessageListener(new MessageListener() {
  77.                 @Override
  78.                 public void onMessage(Message m) {
  79.                     try {
  80.                         System.err.println(Thread.currentThread() + " Client " + qtmId + " Subscriber receive: " + ((TextMessage) m).getText());
  81.                     } catch (JMSException e) {
  82.                         e.printStackTrace();
  83.                     }
  84.                 }
  85.             });
  86.             
  87.             conn.start();
  88.             
  89.             Thread.currentThread().sleep(10000);
  90.     
  91.         } catch (Exception e) {
  92.             e.printStackTrace();
  93.         } finally {
  94.             destory();
  95.         }
  96.     
  97.     }

  98. }


持久订阅:

点击(此处)折叠或打开

  1. package topic;

  2. import java.util.Hashtable;

  3. import javax.jms.JMSException;
  4. import javax.jms.Message;
  5. import javax.jms.MessageListener;
  6. import javax.jms.Session;
  7. import javax.jms.TextMessage;
  8. import javax.jms.Topic;
  9. import javax.jms.TopicConnection;
  10. import javax.jms.TopicConnectionFactory;
  11. import javax.jms.TopicSession;
  12. import javax.jms.TopicSubscriber;
  13. import javax.naming.Context;
  14. import javax.naming.InitialContext;

  15. import net.timewalker.ffmq3.FFMQConstants;

  16. public class SubServer implements Runnable {

  17.     private String topicName;
  18.     
  19.     /**
  20.      * @param args
  21.      */
  22.     public static void main(String[] args) throws Exception {
  23.         new Thread(new SubServer("topic/foox")).start();
  24.         System.out.println(Thread.currentThread() + " main exit");
  25.     }

  26.     private TopicConnection conn;
  27.     private TopicSession session;    
  28.     private TopicSubscriber subscriber;
  29.     
  30.     
  31.     
  32.     private SubServer(String topicName) {
  33.         super();
  34.         this.topicName = topicName;
  35.     }

  36.     private void init() throws Exception {
  37.             // Create and initialize a JNDI context
  38.             Hashtable<String,String> env = new Hashtable<>();
  39.             env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
  40.             env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
  41.             Context context = new InitialContext(env);
  42.             
  43.             // Lookup a connection factory in the context
  44.             TopicConnectionFactory connFactory = (TopicConnectionFactory)context.lookup(FFMQConstants.JNDI_TOPIC_CONNECTION_FACTORY_NAME);
  45.             Topic topic = (Topic) context.lookup(topicName);
  46.             context.close();
  47.             
  48.             // Obtain a JMS connection from the factory
  49.             conn = connFactory.createTopicConnection("test","test");
  50.             conn.setClientID("SERVER");
  51.             
  52.             session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);            
  53.             subscriber = session.createDurableSubscriber(topic, "DB");            
  54.             System.out.println("INIT........." + subscriber);
  55.     }
  56.     
  57.     private void destory() {
  58.         try {
  59.             subscriber.close();
  60.             session.close();
  61.             conn.stop();
  62.             conn.close();
  63.             System.err.println(Thread.currentThread() + " EXIT........REC");
  64.         } catch (JMSException e) {
  65.             e.printStackTrace();
  66.         }
  67.     }
  68.     
  69.     @SuppressWarnings("static-access")
  70.     @Override
  71.     public void run() {
  72.         try {
  73.             init();
  74.             
  75.             subscriber.setMessageListener(new MessageListener() {
  76.                 @Override
  77.                 public void onMessage(Message m) {
  78.                     try {
  79.                         System.err.println(Thread.currentThread() + " DurableSubscriber receive: " + ((TextMessage) m).getText());
  80.                     } catch (JMSException e) {
  81.                         e.printStackTrace();
  82.                     }
  83.                 }
  84.             });
  85.     
  86.             conn.start();
  87.             
  88.             Thread.currentThread().sleep(10000);
  89.         } catch (Exception e) {
  90.             e.printStackTrace();
  91.         } finally {
  92.             destory();
  93.         }
  94.     
  95.     }
  96. }


阅读(1086) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~