Category: System Operations and Maintenance

2010-01-29 13:09:56

Introducing JMS
JMS is the Java Message Service. It is a wrapper API that does not provide any
services directly, but instead serves to standardize the messaging functionality
provided by some other service provider, like IBM’s MQSeries (now WebSphere MQ)
or Sonic Software’s SonicMQ. Many application servers also provide their own JMS
server implementations. JMS provides a single API that can be used to access the
messaging facilities provided by any messaging-service provider, much as Java
Database Connectivity (JDBC) is used to access any relational database that provides
a JDBC driver.

JMS versus RMI
When a method call is made via RMI, the caller blocks until the remote procedure
completes. Work is performed sequentially, which ensures that tasks are completed
in the specified order.
Message-oriented systems, in contrast, are asynchronous: the sender continues processing
as soon as the message is dispatched, without waiting for the receiver to handle it.
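
The difference can be sketched in plain Java without any messaging provider. This is only an illustration under stated assumptions: the class name DispatchStyles is hypothetical, and an in-memory queue stands in for the messaging server. None of it is part of the JMS API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration; an in-memory queue stands in for the messaging server.
final class DispatchStyles {

    // RMI-style call: the caller gets control back only when the work is finished.
    static String callAndWait(String request) {
        return "processed " + request;
    }

    // Messaging-style dispatch: the caller only enqueues the request and returns.
    static void dispatch(BlockingQueue<String> queue, String request) {
        queue.add(request);
    }

    // Runs one asynchronous exchange and returns what the receiver thread produced.
    static String demo() {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        final String[] result = new String[1];
        final CountDownLatch done = new CountDownLatch(1);

        Thread receiver = new Thread(() -> {
            try {
                // The receiver processes the request whenever it arrives.
                result[0] = callAndWait(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        });
        receiver.start();

        dispatch(queue, "meal order"); // returns without waiting for the receiver
        try {
            done.await(); // the demo waits only so that it can report the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(callAndWait("meal order"));
        System.out.println(demo());
    }
}
```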

Message structure
The first element of JMS you need to understand is how messages are structured.
A message consists of the three following parts:
 1. Headers
   The message headers provide a fixed set of metadata fields describing the message, with information such as where the message is going and when it was sent.

 2. Properties
   The properties are a set of key-value pairs used for application-specific purposes, usually to help filter messages quickly when they've been received.

 3. Body
   Finally, the body contains whatever data is being sent in the message. The contents of the body vary depending on the type of the message. The javax.jms.Message interface has several subinterfaces for different types of messages:
 

 Message Type               Message Contents
 javax.jms.BytesMessage     A stream of bytes. A number of convenience methods on the BytesMessage interface enable developers to deal with other primitive types or Strings, automatically turning these values into bytes.
 javax.jms.MapMessage       A set of key-value pairs. Unlike a java.util.Map object, MapMessage always uses Strings for the keys and a primitive type, a String, or a byte array for each value.
 javax.jms.ObjectMessage    A serialized object instance. Note that the serialization mechanism will also automatically serialize any objects being referred to indirectly, so the "single object" may in fact be the root of a large graph of objects.
 javax.jms.StreamMessage    A stream of primitives. Very similar in function to BytesMessage.
 javax.jms.TextMessage      A String instance.

Most JMS providers also provide a vendor-specific XMLMessage interface that is
usually derived from TextMessage.

These are all interfaces, not classes. In JMS you don't create messages (or almost
any other type of JMS object) directly; you use a factory object to create the instances
for you. This provides a layer of independence from the particular JMS implementation you're using.

Now that we've had a look at how messages are structured, let's have a look at how they're passed from message senders to message receivers.
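
Before moving on, the three-part message structure described above can be modelled in a few lines of plain Java. This is a hypothetical sketch, not the JMS API: the class SimpleMessage, its field names, and the "mealType" property are assumptions made for illustration, and real JMS messages are created by a provider-supplied factory rather than with new.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the three message parts; not a JMS type.
final class SimpleMessage {
    final Map<String, Object> headers = new HashMap<String, Object>();    // fixed metadata
    final Map<String, Object> properties = new HashMap<String, Object>(); // application-specific
    final Object body;                                                    // the payload

    SimpleMessage(String destination, Object body) {
        headers.put("destination", destination);
        headers.put("timestamp", Long.valueOf(System.currentTimeMillis()));
        this.body = body;
    }

    // Keeps the messages whose property equals the wanted value.
    // Only the properties are inspected; the body is never read.
    static List<SimpleMessage> filter(List<SimpleMessage> in, String key, Object wanted) {
        List<SimpleMessage> out = new ArrayList<SimpleMessage>();
        for (SimpleMessage m : in) {
            if (wanted.equals(m.properties.get(key))) {
                out.add(m);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<SimpleMessage> all = new ArrayList<SimpleMessage>();
        SimpleMessage first = new SimpleMessage("connexia.MealOrderQueue", "order A");
        first.properties.put("mealType", "kosher");
        SimpleMessage second = new SimpleMessage("connexia.MealOrderQueue", "order B");
        second.properties.put("mealType", "regular");
        all.add(first);
        all.add(second);
        System.out.println(filter(all, "mealType", "kosher").size() + " matching message(s)");
    }
}
```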

Examining Messaging Models
For maximum compatibility with existing messaging servers, JMS supports two different messaging models: point-to-point (p2p) and publish-and-subscribe (pub/sub).
Point-to-point messaging
In the point-to-point model, messages are sent from producers to consumers via
queues. A given queue may have multiple receivers, but only one receiver may consume
each message.
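
The "one receiver per message" rule can be demonstrated with a plain in-memory queue shared by several threads. This is a sketch under stated assumptions: PointToPointSketch and its method names are hypothetical, and a java.util.concurrent queue stands in for a JMS queue.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration of competing receivers on a single queue.
final class PointToPointSketch {

    // Puts messageCount messages on one queue, lets receiverCount threads drain it,
    // and returns how many times each message was consumed.
    static Map<String, Integer> run(int messageCount, int receiverCount) {
        final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
        for (int i = 0; i < messageCount; i++) {
            queue.add("msg-" + i);
        }
        final Map<String, Integer> deliveries = new ConcurrentHashMap<String, Integer>();

        Thread[] receivers = new Thread[receiverCount];
        for (int r = 0; r < receiverCount; r++) {
            receivers[r] = new Thread(() -> {
                String msg;
                // poll() removes the head atomically, so no two receivers get the same message.
                while ((msg = queue.poll()) != null) {
                    deliveries.merge(msg, 1, Integer::sum);
                }
            });
            receivers[r].start();
        }
        for (Thread t : receivers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println(run(10, 3));
    }
}
```

Which receiver ends up with which message depends on thread scheduling. Only the count per message is fixed.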
Publish-and-subscribe messaging
In the publish-and-subscribe messaging model, messages are sent (published) to
consumers via topics. Messages published on a specific topic are sent to all message
consumers that have registered (subscribed) to receive messages on that topic.
Pub/sub supports a number of other features to make messaging more reliable.
A durable subscription is a special type of subscription that outlasts a consumer's connection to the messaging server. When a durable subscriber is disconnected from the JMS server, the server will store all messages that would have gone to the subscriber and deliver them when the subscriber reconnects. This is also referred to as store-and-forward messaging. This type of behavior is essential for guaranteed messaging, with which a consumer can be assured of receiving all messages regardless of application or network failures.
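
The following sketch models topic fan-out and the store-and-forward behavior of a durable subscription in plain Java. It is a simplified, single-threaded illustration: TopicSketch, Subscriber, and their methods are hypothetical names, and a real JMS server implements this inside the provider.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a topic with durable and non-durable subscribers.
final class TopicSketch {

    static final class Subscriber {
        final boolean durable;
        boolean connected = true;
        final List<String> received = new ArrayList<String>();
        final List<String> stored = new ArrayList<String>(); // held by the "server" while offline

        Subscriber(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, Subscriber> subscribers = new LinkedHashMap<String, Subscriber>();

    Subscriber subscribe(String name, boolean durable) {
        Subscriber s = new Subscriber(durable);
        subscribers.put(name, s);
        return s;
    }

    // Every connected subscriber receives a copy of the message.
    void publish(String message) {
        for (Subscriber s : subscribers.values()) {
            if (s.connected) {
                s.received.add(message);
            } else if (s.durable) {
                s.stored.add(message); // store now, forward on reconnect
            }
            // a disconnected non-durable subscriber simply misses the message
        }
    }

    void disconnect(String name) {
        subscribers.get(name).connected = false;
    }

    // On reconnect, the stored backlog is delivered in publication order.
    void reconnect(String name) {
        Subscriber s = subscribers.get(name);
        s.connected = true;
        s.received.addAll(s.stored);
        s.stored.clear();
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        Subscriber durable = topic.subscribe("accounting", true);
        Subscriber plain = topic.subscribe("shipping", false);
        topic.publish("order-1");
        topic.disconnect("accounting");
        topic.disconnect("shipping");
        topic.publish("order-2");
        topic.reconnect("accounting");
        topic.reconnect("shipping");
        System.out.println("durable: " + durable.received + ", non-durable: " + plain.received);
    }
}
```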

Understanding the Major JMS Components
Other than messages, there are a number of interfaces that you'll need to use in almost every JMS application. Separate interfaces are available for dealing with publish/subscribe systems and point-to-point systems, but because they have so many similarities, common base interfaces encapsulate the shared functionality. This section discusses the following components:
 1. Destinations
 2. Connections
 3. Connection factories
 4. Sessions
 5. Producers
 6. Consumers
Destinations
A destination is, as its name implies, somewhere you're sending a message. The specific
types of destinations are queues (in point-to-point systems) and topics (in publish/subscribe
systems). Destinations are normally configured in the messaging server
and are not directly instantiated in the application. Instead, you obtain them via a
JNDI lookup.

Connections
A JMS Connection is similar to the Connection interface in JDBC: it represents a connection between the application and the messaging server over which messages can
be sent.

Connection factories
As in JDBC, connections in JMS are not directly instantiated. Instead, a connection
factory creates connections. Where does the connection factory come from? From a JNDI lookup, like a destination. Connection factories and destinations are the only
types of objects in JMS that need to be obtained via JNDI.

Sessions
You don't send and receive messages directly through a connection. Instead, you
need a session. A session serves as a factory for message objects, message producers, and message consumers. Sessions are created using a Connection object.

Producers
Finally, having created a number of administrative objects (a ConnectionFactory,
a Connection, a Session, and a Destination), we can get to the point where
we're able to actually create a message and send it somewhere. The javax.jms.MessageProducer
interface has two sub-interfaces: QueueSender and
TopicPublisher. You can use whichever interface you like, but the QueueSender
and TopicPublisher interfaces don't add any functionality to
MessageProducer. TopicPublisher's publish() methods do exactly the same
thing as the send() methods in MessageProducer. MessageProducer instances
are created using a Session.

Consumers
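If you want to receive messages, use the session to create a MessageConsumer. The
interface javax.jms.MessageConsumer has two sub-interfaces, QueueReceiver
and TopicSubscriber. Messages can be received in two different ways with a
MessageConsumer: synchronously, by calling one of its receive() methods, or asynchronously, by registering a MessageListener whose onMessage() method the provider calls as messages arrive.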

JMS and Threads
Some JMS objects are safe to share between threads, specifically connection factories and connections. Sessions are single-threaded objects and, as such, should not be shared among multiple threads. If you know that a given session object will not be used in more than one thread, it's safe to store it in some accessible location (like an instance variable). If the object that holds the session may itself be shared among multiple threads (the way a servlet is), then create a new session each time the method that needs to dispatch a message is invoked. Because J2EE application servers have multiple threads, all potentially executing the same methods, it's safer not to try to share session objects outside the scope of a single method.
MessageProducers and MessageConsumers, being tied to a specific session, are also not safe to share among threads.
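
The "shared connection, one session per call" pattern can be sketched with stand-in types. FakeConnection and FakeSession below are hypothetical classes that only mimic the threading rules described above, not JMS interfaces. With a real provider the same shape would use Connection.createSession() inside the dispatching method.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins that mimic the JMS threading contract.
final class SessionPerCall {

    // Like a JMS Session: must only be used by the thread that created it.
    static final class FakeSession {
        private final Thread owner = Thread.currentThread();

        void send(String text) {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("session used from a second thread");
            }
        }

        void close() {
        }
    }

    // Like a JMS Connection: safe to share between threads.
    static final class FakeConnection {
        final AtomicInteger sessionsCreated = new AtomicInteger();

        FakeSession createSession() {
            sessionsCreated.incrementAndGet();
            return new FakeSession();
        }
    }

    private final FakeConnection connection; // shared instance state is fine here

    SessionPerCall(FakeConnection connection) {
        this.connection = connection;
    }

    // May be called from many threads at once, the way a servlet method is.
    void dispatch(String text) {
        FakeSession session = connection.createSession(); // local variable, never shared
        try {
            session.send(text);
        } finally {
            session.close();
        }
    }

    // Calls dispatch() once from each of threadCount threads; returns the sessions created.
    static int demo(int threadCount) {
        FakeConnection connection = new FakeConnection();
        final SessionPerCall shared = new SessionPerCall(connection);
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            final String text = "order-" + i;
            threads[i] = new Thread(() -> shared.dispatch(text));
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return connection.sessionsCreated.get();
    }

    public static void main(String[] args) {
        System.out.println("sessions created: " + demo(4));
    }
}
```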

Configuring JMS
Much of the work in configuring a messaging-based application is done in the messaging server itself. Topics and queues are configured through the administrative interface of your message-oriented middleware (MOM) server, so consult your vendor's documentation to learn how to do this.

At minimum you’ll need to configure a ConnectionFactory and a Topic for pub/sub applications or a Queue for p2p applications. These objects will be retrieved by the application via JNDI. The application will need to know the type and location of the naming service. These values can be hardcoded into the application or passed in at runtime, either via the command line or via a configuration file.
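
As one way of passing the naming-service settings in at runtime rather than hardcoding them, the JNDI environment can be built from a configuration file in java.util.Properties format. This is a minimal sketch: the class JndiSettings and the property keys jndi.factory and jndi.url are assumptions made for illustration. The resulting table is what would be passed to new InitialContext(env).

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Hashtable;
import java.util.Properties;
import javax.naming.Context;

// Hypothetical helper; the keys "jndi.factory" and "jndi.url" are invented for this sketch.
final class JndiSettings {

    // Builds the JNDI environment from configuration text in Properties format.
    static Hashtable<String, Object> fromProperties(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalArgumentException("unreadable configuration", e);
        }
        Hashtable<String, Object> env = new Hashtable<String, Object>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, required(props, "jndi.factory"));
        env.put(Context.PROVIDER_URL, required(props, "jndi.url"));
        return env;
    }

    // Fails early with a clear message when a setting is missing.
    private static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("missing setting: " + key);
        }
        return value;
    }

    public static void main(String[] args) {
        // In a real application this text would be read from a file named on the command line.
        String config = "jndi.factory=com.swiftmq.jndi.InitialContextFactoryImpl\n"
                + "jndi.url=smqp://localhost:4001/timeout=10000\n";
        System.out.println(fromProperties(config));
    }
}
```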


Connexia Airlines Point-to-Point Messaging Business Case
In a p2p system all messages are routed via queues. Multiple listeners may be receiving messages from the queue, but each message goes to one and only one listener. The example code here is simplified to use only one listener, but you would add additional listeners just as you added the first one. Note that deciding which listener gets which message is JMS server-specific. Some JMS servers load-balance and try to distribute the messages equally across all consumers, but this
behavior is not defined by the JMS specification.

Setting up all the objects on the message-sending side is straightforward, but you must follow these steps:
  1. Obtain a JNDI InitialContext. You don't need to do this more than once, and for performance reasons it's best to do it only once and save the result for use later.
  2. Obtain the ConnectionFactory via a JNDI lookup. Again, JNDI lookups can be slow, so do this only once and cache the result.
  3. Obtain the destination Queue via a JNDI lookup.
  4. Use the ConnectionFactory to obtain a Connection.

Then follow these instructions in order to send a message:
  1. Use the Connection to obtain a Session.
  2. Use the Session to create a MessageProducer.
  3. Use the Session to create an appropriate message.
  4. Send the Message using the MessageProducer.

For the Connexia Airlines example we'll create a MealService class with a request() method that will request a specified type of meal. We'll also create an overloaded version without the meal-type argument that will request a "regular" meal. This method will use the Connection that was created during initialization to send a MapMessage to the catering company,
detailing the type of meal as well as the date and flight number for the given passenger. To test it we'll use a dummy MessageListener and a simple servlet.

Listing 9-1 contains the code to create the message.
Listing 9-1: MealService.java
========================================================================
import java.util.Date;
import java.util.Hashtable;
import java.text.DateFormat;
// Queue below must resolve to javax.jms.Queue, so java.util.Queue is not imported
import javax.jms.*;
import javax.naming.*;
public class MealService {
 public static final String REGULAR = "regular";
 public static final String LOW_SALT = "low salt";
 public static final String VEGETARIAN = "vegetarian";
 public static final String KOSHER = "kosher";
 public static final String HALAL = "halal";
 public static final String DIABETIC = "diabetic";
 private static MealService singleton = null;
 private Connection connection;
 private Queue mealOrderQ;
 private MealService() {
  try {
   // initialize appropriate JMS objects
   // obtain the JNDI InitialContext
   Hashtable env = new Hashtable();
   env.put(Context.INITIAL_CONTEXT_FACTORY,
     "com.swiftmq.jndi.InitialContextFactoryImpl");
   env
     .put(Context.PROVIDER_URL,
       "smqp://localhost:4001/timeout=10000");
   Context ctx = new InitialContext(env);
   // obtain the ConnectionFactory and
   // create the Connection
   ConnectionFactory cf = (ConnectionFactory) ctx
     .lookup("QueueConnectionFactory");
   connection = cf.createConnection();
   // obtain the destination Queue
   mealOrderQ = (Queue) ctx.lookup("connexia.MealOrderQueue");
  } catch (NamingException ne) {
   ne.printStackTrace();
  } catch (JMSException je) {
   je.printStackTrace();
  }
 }
 // synchronized so that two threads cannot both create an instance
 public static synchronized MealService getMealService() {
  if (singleton == null)
   singleton = new MealService();
  return singleton;
 }
 public void request(int flight_number, Date date) {
  request(REGULAR, flight_number, date);
 }
 public void request(String type, int flight_number, Date date) {
  MessageProducer qSender;
  Session session;
  try {
   // obtain a Session
   session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   // obtain a MessageProducer
   qSender = session.createProducer((Destination) mealOrderQ);
   // build the map message
   MapMessage msg = session.createMapMessage();
   msg.setString("mealType", type);
   msg.setInt("flightNumber", flight_number);
   msg.setString("date", DateFormat.getDateInstance().format(date));
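   // request persistent delivery on the producer; the provider overwrites
   // any JMSDeliveryMode set directly on the message when it is sent
   qSender.setDeliveryMode(DeliveryMode.PERSISTENT);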
   // send the message
   qSender.send(msg);
   qSender.close();
   session.close();
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
 public void close() {
  try {
   connection.close();
   singleton = null;
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
 public static void main(String args[]) {
  MealService service = MealService.getMealService();
  service.request(234, new Date());
  service.close();
  System.exit(0);
 }
}
===============================================================================
First of all, MealService is a singleton. The use of a singleton is unrelated to JMS, but it's important to understand that only one MealService instance will be accessible from any point in the application. We are not required to make MealService a singleton; we simply want to avoid initializing multiple Connection objects, for performance reasons. Creating a separate MealService object for every request is certainly possible and will not affect our ability to send messages successfully.
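
When a singleton like this may be requested from several threads at once, one common way to keep lazy creation safe is the initialization-on-demand holder idiom. The sketch below swaps that idiom in for the null check used in the listing. LazySingleton is a hypothetical class, and the counter exists only to make the single construction observable.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example of the initialization-on-demand holder idiom.
final class LazySingleton {
    private static final AtomicInteger CREATED = new AtomicInteger();

    private LazySingleton() {
        // Expensive one-time setup (such as opening a connection) would go here.
        CREATED.incrementAndGet();
    }

    // The JVM initializes Holder once, on first use, under its class-initialization lock.
    private static final class Holder {
        static final LazySingleton INSTANCE = new LazySingleton();
    }

    static LazySingleton getInstance() {
        return Holder.INSTANCE;
    }

    static int instancesCreated() {
        return CREATED.get();
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance());
        System.out.println("instances created: " + instancesCreated());
    }
}
```

The trade-off is that the instance cannot be discarded and rebuilt, which the close() method in Listing 9-1 allows by resetting its field to null. Where that reset is needed, a synchronized accessor is the simpler choice.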

The environment properties you pass in to the InitialContext will depend on
what JMS provider you're using. The parameters specified here are the ones used
with SwiftMQ 4.5.1 from IIT Software.

Once the MealService object has been initialized, sending a message in the
request() method is relatively simple. It may seem that you need to create a lot of
objects to perform a simple task, but each object plays a specific role in the application.

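The only JMS message header we ask for here is JMSDeliveryMode. Valid values
for JMSDeliveryMode are DeliveryMode.PERSISTENT and DeliveryMode.NON_PERSISTENT.
An application marks a message as persistent if losing the message in transit would cause
problems for the application. If an occasional lost message is tolerable, mark the message as non-persistent. In our case we don't want to lose any meal requests, so we use the persistent delivery mode. Be aware that the provider fills in JMSDeliveryMode when the message is sent, taking the value from the MessageProducer (PERSISTENT by default) or from the arguments to send(); a value assigned with setJMSDeliveryMode() on the message itself is overwritten at that point.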

Once the message has been sent, we close the producer and the session, and when we exit
the application we close the connection as well. Closing a connection also closes
any open sessions, producers, or consumers associated with that connection, and
deletes any temporary destinations. Any uncommitted transactions will be
rolled back when the connection is closed.

The application that receives and processes the messages is even simpler, at least
in terms of the message-handling part. Presumably some challenge remains in cooking
and delivering the meals to Connexia Airlines. Listing 9-2 shows the code for the
Caterer class.

Listing 9-2: Caterer.java
===============================================================================
import javax.naming.*;
// Queue below must resolve to javax.jms.Queue, so java.util.Queue is not imported
import javax.jms.*;
import java.util.Hashtable;
public class Caterer implements MessageListener {
 private Caterer() {
  try {
   Hashtable env = new Hashtable();
   env.put(Context.INITIAL_CONTEXT_FACTORY,
     "com.swiftmq.jndi.InitialContextFactoryImpl");
   env
     .put(Context.PROVIDER_URL,
       "smqp://localhost:4001/timeout=10000");
   Context ctx = new InitialContext(env);
   ConnectionFactory cf = (ConnectionFactory) ctx
     .lookup("QueueConnectionFactory");
   Connection connection = cf.createConnection();
   Queue mealOrderQ = (Queue) ctx.lookup("connexia.MealOrderQueue");
   Session session = connection.createSession(false,
     Session.AUTO_ACKNOWLEDGE);
   MessageConsumer mc = session.createConsumer((Destination) mealOrderQ);
   mc.setMessageListener(this);
   connection.start();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
 public void onMessage(Message msg) {
  try {
   MapMessage mmsg = (MapMessage) msg;
   System.out.println("Meal request for flight "
     + mmsg.getString("flightNumber") + " on date "
     + mmsg.getString("date"));
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
 public static void main(String args[]) {
  Caterer c = new Caterer();
 }
}
===============================================================================
The setup for this class is pretty much the same as it was for the MealService
class, except that a MessageConsumer is created instead of a MessageProducer.
Registering a MessageListener isn't enough; to begin receiving messages you
must also call Connection.start(). You can turn message delivery on and off by
calling start() and stop() on the Connection object. Stopping message delivery
has no effect on a connection’s ability to send messages.

To run the example, run java Caterer on the command line and then, in another
window, run java MealService. Try running multiple Caterer instances in different
windows—notice that any message produced is sent to only one Caterer. Now
let’s look at an example using pub/sub messaging.

Magazine-Publisher Publish-Subscribe Messaging Business Case
The setup of publish-subscribe messaging systems in JMS is almost identical to that
of point-to-point systems. Aside from terminology (topic instead of queue, for example) the major difference is that all messages published to a topic are broadcast to all subscribers. With queues, even if multiple receivers are connected to a single queue, only one of them will receive a message posted to the queue.

Because JMS 1.1 has unified the two messaging models, virtually no difference exists
between code that posts a message to a queue and code that publishes a message
to a topic. In JMS 1.0.2b this was not the case: you were required to use either
QueueConnection or TopicConnection, QueueSession or TopicSession, and so
on. In JMS 1.1 the Connection, Session, MessageConsumer, and MessageProducer
interfaces handle both messaging models.

The application shown in Listing 9-3 sends messages to the publisher.
Listing 9-3: BookOrder.java
===============================================================================
import javax.naming.*;
import javax.jms.*;
import java.util.Hashtable;
public class BookOrder {
 private String name;
 private String isbn;
 public BookOrder() {
  name = "default";
  isbn = "none";
 }
 public void setCustomer(String name) {
  this.name = name;
 }
 public void setBook(String isbn) {
  this.isbn = isbn;
 }
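 public void dispatch() {
  Connection connection = null;
  try {
   // obtain the JNDI InitialContext
   Hashtable<String, String> env = new Hashtable<String, String>();
   env.put(Context.INITIAL_CONTEXT_FACTORY,
     "com.swiftmq.jndi.InitialContextFactoryImpl");
   env
     .put(Context.PROVIDER_URL,
       "smqp://localhost:4001/timeout=10000");
   Context ctx = new InitialContext(env);
   // obtain the ConnectionFactory and create the Connection
   ConnectionFactory cf = (ConnectionFactory) ctx
     .lookup("TopicConnectionFactory");
   connection = cf.createConnection();
   // obtain the destination Topic
   Topic bookOrderTopic = (Topic) ctx
     .lookup("publisher.BookOrdersTopic");
   Session session = connection.createSession(false,
     Session.AUTO_ACKNOWLEDGE);
   MessageProducer publisher = session.createProducer(bookOrderTopic);
   // build the map message
   MapMessage msg = session.createMapMessage();
   msg.setString("customer", name);
   msg.setString("isbn", isbn);
   // persistent delivery, priority 5, time to live of 10 minutes
   publisher.send(msg, DeliveryMode.PERSISTENT, 5, 600000);
  } catch (Exception e) {
   e.printStackTrace();
  } finally {
   // closing the connection also closes its session and producer
   if (connection != null) {
    try {
     connection.close();
    } catch (JMSException e) {
     e.printStackTrace();
    }
   }
  }
 }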
}
===============================================================================
The OrderProcessor class shown in Listing 9-4 receives the messages created by
the BookOrder class and does something with them. In this case it simply prints the
information to the user but in a real application it would perform some useful
processing.

Listing 9-4: OrderProcessor.java
===============================================================================
import javax.naming.*;
import javax.jms.*;
import java.util.Hashtable;
public class OrderProcessor implements MessageListener {
 String name;
 public OrderProcessor(String name) {
  this.name = name;
  try {
   Hashtable env = new Hashtable();
   env.put(Context.INITIAL_CONTEXT_FACTORY,
     "com.swiftmq.jndi.InitialContextFactoryImpl");
   env
     .put(Context.PROVIDER_URL,
       "smqp://localhost:4001/timeout=10000");
   Context ctx = new InitialContext(env);
   ConnectionFactory cf = (ConnectionFactory) ctx
     .lookup("TopicConnectionFactory");
   Connection connection = cf.createConnection();
   Topic bookOrderTopic = (Topic) ctx
     .lookup("publisher.BookOrdersTopic");
   Session session = connection.createSession(false,
     Session.AUTO_ACKNOWLEDGE);
   MessageConsumer subscriber = session.createConsumer(bookOrderTopic);
   subscriber.setMessageListener(this);
   connection.start();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
 public void onMessage(Message msg) {
  try {
   MapMessage mmsg = (MapMessage) msg;
   System.out.println("Processor " + name + " received an order for "
     + mmsg.getString("isbn") + " from "
     + mmsg.getString("customer"));
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
 public static void main(String args[]) {
  OrderProcessor accounting = new OrderProcessor("accounting");
  OrderProcessor shipping = new OrderProcessor("shipping");
  OrderProcessor sales_tracking = new OrderProcessor("sales_tracking");
  try {
   Thread.sleep(1000);
  } catch (InterruptedException e) {
  }
  BookOrder order = new BookOrder();
  order.setBook("0-7645-3966-3");
  order.setCustomer("ehenry");
  order.dispatch();
  order.setCustomer("fgeary");
  order.dispatch();
  try {
   Thread.sleep(1000);
  } catch (InterruptedException e) {
  }
  System.exit(0);
 }
}
===============================================================================
Observe that in the main() method for OrderProcessor, which actually tests message
distribution, multiple OrderProcessor instances are created. When you run
this code you should see each of the two orders received and processed three times, once by each OrderProcessor. Perhaps
this does not seem very impressive in this small sample application, but JMS provides transparent distribution: the publisher needs no knowledge of its subscribers, which makes it much easier to scale this example up into an enterprise-grade order-processing system.

In BookOrder.dispatch() note that the MessageProducer.send() method
specifies three parameters in addition to the message. The first is the delivery mode. The point-to-point example also requested persistent delivery. The value used is the MessageProducer's default (set via MessageProducer.setDeliveryMode()) unless it is overridden in the call to send(); a value assigned directly to the message header is replaced when the message is sent. The second parameter is the message priority (0 to 9, with a default of 4); again, this
value is taken from the MessageProducer (set via MessageProducer.setPriority()) unless it is overridden
in the call to send().
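
The interplay between producer-level defaults and per-send arguments can be modelled with a small stand-in class. This is a sketch under stated assumptions: ProducerDefaultsSketch and Sent are hypothetical types. The constant values mirror javax.jms.DeliveryMode, and the initial settings mirror the documented MessageProducer defaults (persistent delivery, priority 4, no expiration).

```java
// Hypothetical stand-in for a producer; not a JMS type.
final class ProducerDefaultsSketch {
    static final int NON_PERSISTENT = 1; // same value as DeliveryMode.NON_PERSISTENT
    static final int PERSISTENT = 2;     // same value as DeliveryMode.PERSISTENT

    // The header values stamped on one sent message.
    static final class Sent {
        final int deliveryMode;
        final int priority;
        final long timeToLive;

        Sent(int deliveryMode, int priority, long timeToLive) {
            this.deliveryMode = deliveryMode;
            this.priority = priority;
            this.timeToLive = timeToLive;
        }
    }

    private int deliveryMode = PERSISTENT; // default delivery mode
    private int priority = 4;              // default priority
    private long timeToLive = 0L;          // default: the message never expires

    void setDeliveryMode(int deliveryMode) {
        this.deliveryMode = deliveryMode;
    }

    void setPriority(int priority) {
        this.priority = priority;
    }

    // One-argument form: the producer's current defaults are applied.
    Sent send(String body) {
        return send(body, deliveryMode, priority, timeToLive);
    }

    // Four-argument form: the arguments apply to this message only.
    Sent send(String body, int deliveryMode, int priority, long timeToLive) {
        return new Sent(deliveryMode, priority, timeToLive);
    }

    public static void main(String[] args) {
        ProducerDefaultsSketch producer = new ProducerDefaultsSketch();
        Sent usual = producer.send("order");
        Sent urgent = producer.send("order", NON_PERSISTENT, 5, 600000L);
        System.out.println(usual.priority + " " + urgent.priority);
    }
}
```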

Finally, a time-to-live (TTL) value in milliseconds is specified. In this case, a value of 600000 represents a 10-minute time to live. Be aware that message expiration is often calculated by taking the current system time when the message is dispatched and adding the time-to-live value; therefore, if the clocks on the message producer and the messaging server are not synchronized, messages may expire prematurely. You can get around this by ensuring that the systems involved in message processing have synchronized clocks or by increasing the time-to-live value to ensure that messages are not prematurely removed.

Now that we've seen the basics of how to create, send, and receive messages, let's look at extending the basic messaging model to help ensure reliability.
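
To make the time-to-live arithmetic described above concrete, the following sketch works through the clock-skew case with plain numbers. The class ExpirationSketch and its method names are hypothetical, and the comparison is a simplification of what a provider does internally.

```java
// Hypothetical worked example of time-to-live arithmetic; all times are in milliseconds.
final class ExpirationSketch {

    // Expiration as described in the text: the sender's clock at dispatch plus the TTL.
    static long expiresAt(long senderClockMillis, long ttlMillis) {
        return senderClockMillis + ttlMillis;
    }

    // The server judges expiry against its own clock.
    static boolean isExpired(long expiresAtMillis, long serverClockMillis) {
        return serverClockMillis >= expiresAtMillis;
    }

    public static void main(String[] args) {
        long ttl = 600000L;      // 10 minutes, as in BookOrder.dispatch()
        long sentAt = 0L;        // sender's clock at dispatch (arbitrary origin)
        long transit = 60000L;   // the message reaches the server one minute later
        long skew = 600000L;     // assumed: the server clock runs 10 minutes ahead

        long expires = expiresAt(sentAt, ttl);
        System.out.println("clocks in sync: expired = "
                + isExpired(expires, sentAt + transit));
        System.out.println("server 10 minutes ahead: expired = "
                + isExpired(expires, sentAt + transit + skew));
    }
}
```

With synchronized clocks the message still has nine minutes left on arrival. With the assumed skew it is treated as expired although it is only one minute old.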