之前开发的一个小项目(实时数据采集)中,采用了自定义报文的Socket通信方式(Netty),对通信异常时的数据处理方式不是很完善,容易造成数据丢失,
自己动手解决数据传递的可靠性与稳定性,编程还是比较麻烦,于是想到了在应用中嵌入小型JMS服务器的方式,以简化编程。
经过对几个开源项目的比较,最终选定了合适的项目FFMQ,其大小仅约600KB,并支持JMS 1.1规范。
以下代码仅供测试参考,不具备生产环境所需的严谨性,具体的FFMQ配置请参阅说明文档(下载包中附带)。
-
import java.io.FileInputStream;
-
import java.util.Properties;
-
-
import net.timewalker.ffmq3.listeners.ClientListener;
-
import net.timewalker.ffmq3.listeners.tcp.io.TcpListener;
-
import net.timewalker.ffmq3.local.FFMQEngine;
-
import net.timewalker.ffmq3.management.destination.definition.QueueDefinition;
-
import net.timewalker.ffmq3.management.destination.definition.TopicDefinition;
-
import net.timewalker.ffmq3.utils.Settings;
-
-
/**
-
* Embedded FFMQ sample
-
*/
-
public class EmbeddedFFMQSample implements Runnable
-
{
-
private FFMQEngine engine;
-
-
public void run()
-
{
-
try
-
{
-
// Create engine settings
-
Settings settings = createEngineSettings();
-
-
// Create the engine itself
-
engine = new FFMQEngine("myLocalEngineName", settings);
-
// -> myLocalEngineName will be the engine name.
-
// - It should be unique in a given JVM
-
// - This is the name to be used by local clients to establish
-
// an internal JVM connection (high performance)
-
// Use the following URL for clients : vm://myLocalEngineName
-
//
-
-
// Deploy the engine
-
System.out.println("Deploying engine : "+engine.getName());
-
engine.deploy();
-
// - The FFMQ engine is not functional until deployed.
-
// - The deploy operation re-activates all persistent queues
-
// and recovers them if the engine was not properly closed.
-
// (May take some time for large queues)
-
-
// Adding a TCP based client listener
-
System.out.println("Starting listener ...");
-
ClientListener tcpListener = new TcpListener(engine,"0.0.0.0",10002,settings,null);
-
tcpListener.start();
-
-
// This is how you can programmatically define a new queue
-
if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo1"))
-
{
-
QueueDefinition queueDef = new QueueDefinition(settings);
-
queueDef.setName("foo2");
-
queueDef.setMaxNonPersistentMessages(0);
-
queueDef.setOverflowToPersistent(false);
-
queueDef.setPreAllocateFiles(true);
-
queueDef.setTemporary(false);
-
queueDef.setUseJournal(true);
-
queueDef.setAutoExtendAmount(128);
-
queueDef.setInitialBlockCount(32);
-
queueDef.setMaxBlockCount(1024);
-
queueDef.check();
-
engine.createQueue(queueDef);
-
}
-
-
// You could also define a queue using some java Properties
-
if (!engine.getDestinationDefinitionProvider().hasQueueDefinition("foo2"))
-
{
-
Properties queueProps = new Properties();
-
queueProps.put("name", "foo2");
-
queueProps.put("persistentStore.useJournal", "false");
-
queueProps.put("memoryStore.maxMessages", "1000");
-
QueueDefinition queueDef2 = new QueueDefinition(new Settings(queueProps));
-
engine.createQueue(queueDef2);
-
}
-
-
if(!engine.getDestinationDefinitionProvider().hasTopicDefinition("foox")) {
-
TopicDefinition topicDef = new TopicDefinition(settings);
-
topicDef.setName("foox");
-
topicDef.setMaxNonPersistentMessages(0);
-
topicDef.setOverflowToPersistent(false);
-
topicDef.setPreAllocateFiles(true);
-
topicDef.setTemporary(false);
-
topicDef.setUseJournal(true);
-
topicDef.check();
-
engine.createTopic(topicDef);
-
}
-
-
// Run for some time
-
System.out.println("Running ...");
-
Thread.sleep(60*1000);
-
-
// Stopping the listener
-
System.out.println("Stopping listener ...");
-
tcpListener.stop();
-
-
// Undeploy the engine
-
System.out.println("Undeploying engine ...");
-
engine.undeploy();
-
// - It is important to properly shutdown the engine
-
// before stopping the JVM to make sure current transactions
-
// are nicely completed and storages properly closed.
-
-
System.out.println("Done.");
-
}
-
catch (Exception e)
-
{
-
// Oops
-
e.printStackTrace();
-
}
-
}
-
-
private Settings createEngineSettings()
-
{
-
// Various ways of creating engine settings
-
-
// 1 - From a properties file
-
Properties externalProperties = new Properties();
-
try
-
{
-
FileInputStream in = new FileInputStream("D:\\ffmq3-distribution-3.0.5-dist\\conf\\ffmq-server.properties");
-
externalProperties.load(in);
-
in.close();
-
}
-
catch (Exception e)
-
{
-
throw new RuntimeException("Cannot load external properties",e);
-
}
-
Settings settings = new Settings(externalProperties);
-
-
// 2 - Explicit Java code
-
// Settings settings = new Settings();
-
//
-
// settings.setStringProperty(FFMQCoreSettings.DESTINATION_DEFINITIONS_DIR, ".");
-
// settings.setStringProperty(FFMQCoreSettings.BRIDGE_DEFINITIONS_DIR, ".");
-
// settings.setStringProperty(FFMQCoreSettings.TEMPLATES_DIR, ".");
-
// settings.setStringProperty(FFMQCoreSettings.DEFAULT_DATA_DIR, ".");
-
// ...
-
-
return settings;
-
}
-
-
public static void main(String[] args)
-
{
-
System.setProperty("FFMQ_BASE", "D:\\ffmq3-distribution-3.0.5-dist");
-
-
new EmbeddedFFMQSample().run();
-
}
-
}
模拟发送:
-
import java.util.Hashtable;
-
import java.util.Random;
-
-
import javax.jms.Connection;
-
import javax.jms.ConnectionFactory;
-
import javax.jms.Destination;
-
import javax.jms.JMSException;
-
import javax.jms.MessageProducer;
-
import javax.jms.Queue;
-
import javax.jms.Session;
-
import javax.jms.TextMessage;
-
import javax.naming.Context;
-
import javax.naming.InitialContext;
-
import javax.naming.NamingException;
-
-
import net.timewalker.ffmq3.FFMQConstants;
-
-
public class Sender implements Runnable {
-
-
public static void main(String[] args) throws Exception {
-
new Thread(new Sender("queue/foo1", "1")).start();
-
new Thread(new Sender("queue/foo2", "2")).start();
-
Thread.sleep(10000);
-
run = false;
-
Thread.sleep(1000);
-
}
-
-
private static volatile boolean run = true;
-
private String queueName;
-
private String qtmId;
-
-
private Sender(String queueName, String qtmId) {
-
super();
-
this.queueName = queueName;
-
this.qtmId = qtmId;
-
}
-
-
@Override
-
public void run() {
-
try {
-
// Create and initialize a JNDI context
-
Hashtable<String,String> env = new Hashtable<>();
-
env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
-
env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
-
Context context = new InitialContext(env);
-
-
// Lookup a connection factory in the context
-
ConnectionFactory connFactory = (ConnectionFactory)context.lookup(FFMQConstants.JNDI_CONNECTION_FACTORY_NAME);
-
-
// Obtain a JMS connection from the factory
-
Connection conn = connFactory.createConnection("test","test");
-
conn.start();
-
-
Destination dest1=(Queue) context.lookup(queueName);
-
-
Session session=conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
-
-
Random rnd = new Random(System.currentTimeMillis());
-
long ms = (long)rnd.nextFloat() * 10 * 1000;
-
if(ms > 8000) {
-
ms /= 2;
-
} else if(ms < 1000) {
-
ms = 1500;
-
}
-
-
int i = 1;
-
-
MessageProducer p = session.createProducer(dest1);
-
while (run) {
-
TextMessage msg = session.createTextMessage();
-
String t = "[" + qtmId + "] Hello " + queueName + " " + i++;
-
System.out.println("sended..." + t);
-
msg.setStringProperty("QTMID", qtmId);
-
msg.setText(t);
-
p.send(msg);
-
Thread.sleep(ms);
-
}
-
p.close();
-
session.close();
-
-
conn.close();
-
context.close();
-
} catch (NamingException e) {
-
e.printStackTrace();
-
} catch (JMSException e) {
-
e.printStackTrace();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
-
}
-
-
}
模拟接收:
-
import java.util.Hashtable;
-
-
import javax.jms.Connection;
-
import javax.jms.ConnectionFactory;
-
import javax.jms.Destination;
-
import javax.jms.JMSException;
-
import javax.jms.Message;
-
import javax.jms.MessageConsumer;
-
import javax.jms.Queue;
-
import javax.jms.Session;
-
import javax.jms.TextMessage;
-
import javax.naming.Context;
-
import javax.naming.InitialContext;
-
-
import net.timewalker.ffmq3.FFMQConstants;
-
-
-
public class Receiver implements Runnable {
-
-
private static volatile boolean run = true;
-
-
public static void main(String[] args) throws Exception {
-
new Thread(new Receiver()).start();
-
Thread.sleep(10000);
-
run = false;
-
Thread.sleep(2000);
-
}
-
-
private Connection conn;
-
private Session session;
-
private MessageConsumer consumer;
-
-
private void init() throws Exception {
-
// Create and initialize a JNDI context
-
Hashtable<String,String> env = new Hashtable<>();
-
env.put(Context.INITIAL_CONTEXT_FACTORY, FFMQConstants.JNDI_CONTEXT_FACTORY);
-
env.put(Context.PROVIDER_URL, "tcp://localhost:10002");
-
Context context = new InitialContext(env);
-
-
// Lookup a connection factory in the context
-
ConnectionFactory connFactory = (ConnectionFactory)context.lookup(FFMQConstants.JNDI_CONNECTION_FACTORY_NAME);
-
Destination dest1=(Queue) context.lookup("queue/foo2");
-
context.close();
-
-
-
// Obtain a JMS connection from the factory
-
conn = connFactory.createConnection("test", "test");
-
-
conn.start();
-
-
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
consumer = session.createConsumer(dest1);
-
-
System.err.println("INIT.........");
-
}
-
-
private void destory() {
-
try {
-
consumer.close();
-
session.close();
-
conn.stop();
-
conn.close();
-
System.err.println("EXIT........REC");
-
} catch (JMSException e) {
-
e.printStackTrace();
-
}
-
}
-
-
public void run() {
-
try {
-
init();
-
-
// consumer.setMessageListener(new MessageListener() {
-
// @Override
-
// public void onMessage(Message m) {
-
// try {
-
// System.err.println("receive: " + ((TextMessage) m).getText());
-
// } catch (JMSException e) {
-
// e.printStackTrace();
-
// }
-
// }
-
// });
-
while(run) {
-
// Thread.sleep(500);
-
Message m = consumer.receive(500);
-
if(m != null) {
-
System.err.println("receive: " + ((TextMessage) m).getText());
-
}
-
}
-
-
} catch (Exception e) {
-
e.printStackTrace();
-
} finally {
-
destory();
-
}
-
-
}
-
-
}
主题订阅:
持久订阅:
阅读(1086) | 评论(0) | 转发(0) |