消息驱动bean是一种通过消息方式为外界提供服务的组件。mdb和它的使用者之间是一种松散耦合的关系:
由于mdb组件在使用的过程中,其客户并不拥有mdb组件的远程引用,而是直接将消息发送到特定的队列中,因而用户的调用并不强求mdb组件的运行,在mdb组件没有运行的情况下,客户发送的请求数据会临时保存到消息队列中,当mdb组件加载后,消息队列中的调用消息才能得到处理。java ee规范并没有规定mdb将处理结果返回给消息调用者的方式,但是通常情况下mdb组件是将处理玩的结果以消息方式发送到特定结果队列中,如果客户程序没有运行,结果会临时存放在消息队列中,直到客户程序启动,结果队列中的消息才能得到处理。
mdb可以和两种消息队列关联,如果和Topic型的队列关联,则客户发送到队列中的消息。会依次广播给所有的mdb组件,所有的mdb组件会得到相同的消息。
mdb组件和具体的业务逻辑无关,从特定的消息队列中获得消息,消息内容要在mdb的onMessage方法中处理,具体的而业务逻辑通常委托给其他的ejb组件。
mdb组件属性:
1.队列类型属性:
@MessageDriver(
activationConfig={
ActivationConfigProperty(
propertyName="destinationType",
propertyValue="javax.jms.Queue")
})
2.队列名称属性:
ActivationConfigProperty(
propertyName="destination",
propertyValue="queue/bitsqueue")
3.队列消息确认模式属性:
ActivationConfigProperty(
propertyName="acknowledgeMode",
propertyValue="auto-acknowledge")
4.mdb组件订阅持续性属性:
持久性订阅(Durable)和非持久性订阅(NonDurable)
基于Durable型的组件必须设定mdb组件的topic消息订阅标示mdb-subscription-id否则就会产生异常!
ActivationConfigProperty(
propertyName="subscriptionDurability",
propertyValue="Durable")
5.mdb队列消息过滤属性
ActivationConfigProperty(
propertyName="messageSelector",
propertyValue="myversion=1 and messageSelector='bit service'")
基于标注的mdb组件设计:
@MessageDriven(mappedName = "jms/mymdb", activationConfig = { @ActivationConfigProperty(propertyName="destination",propertyValue="queue/birqueue"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue") })
public class mymdb implements MessageListener {
@Override
public void onMessage(Message m) {
// TODO Auto-generated method stub
TextMessage tmsg=(TextMessage)m;
try{
System.out.println(tmsg.getText());
System.out.println("成功....");
}catch(Exception e){
System.out.println(e);
}
}
}
客户端:
public class send {
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
try{
Context ctx=null;
MessageProducer sender;
TextMessage msg;
String url="jnp://localhost:1099";
String purl="org.jboss.naming:org.jnp.interfaces";
String fcy="org.jnp.interfaces.NamingContextFactory";
Properties p=new Properties();
p.put(Context.INITIAL_CONTEXT_FACTORY,fcy);
p.put(Context.PROVIDER_URL,url);
p.put(Context.URL_PKG_PREFIXES,purl);
ctx=new InitialContext(p);
ConnectionFactory qConFactory=(ConnectionFactory)ctx.lookup("ConnectionFactory");
Connection qCon=qConFactory.createConnection();
Session session=qCon.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue messageQueue=(Queue)ctx.lookup("queue/birqueue");
sender=session.createProducer(messageQueue);
msg=session.createTextMessage();
msg.setText("hello wuxiaoxiao");
sender.send(msg);
qCon.close();
//System.out.println("发送成功........");
}catch(Exception e){
System.out.println(e);
}
}
}
基于配置文件:把ejb组件的标注去掉:
xmlns=""
xmlns:xsi=""
xsi:schemaLocation="
/ejb-jar_3_0.xsd"
version="3.0">
myconfgejb
bits.mymdb
javax.jms.MessageListener
Container
javax.jms.Queue
destinationType
javax.jms.Queue
destination
queue/bitsqueue4
Topic型mdb组件设计:
public class mymdb implements MessageListener {
@Override
public void onMessage(Message m) {
// TODO Auto-generated method stub
TextMessage tmsg=(TextMessage)m;
try{
System.out.println(tmsg.getText());
System.out.println("成功....");
}catch(Exception e){
System.out.println(e);
}
}
}
配置文件:
xmlns=""
xmlns:xsi=""
xsi:schemaLocation="
/ejb-jar_3_0.xsd"
version="3.0">
myconfgejb
bits.mymdb
javax.jms.MessageListener
Container
javax.jms.Topic
destinationType
javax.jms.Topic
destination
topic/bitssub1
subscriptionDurability
Durable
一定要有jboss.xml配置文件:
myconfgejb
zhang
客户端代码:
public class send
/*06*/{
/*07*/ public static void main(String args[]) throws Exception
/*08*/ {
/*09*/ MessageProducer sender;
/*10*/ TextMessage msg;
/*11*/ Context ctx=null;
/*12*/ Properties p = new Properties();
/*13*/ FileInputStream f=new FileInputStream("jndi.properties");
/*14*/ p.load(f);
/*15*/ ctx = new InitialContext(p);
/*16*/ ConnectionFactory qConFactory
/*17*/ = (ConnectionFactory)ctx.lookup("ConnectionFactory");
/*18*/ Connection qCon = qConFactory.createConnection();
/*19*/ Session session
/*20*/ = qCon.createSession(false,Session.AUTO_ACKNOWLEDGE);
/*21*/ Topic messageQueue = (Topic)ctx.lookup("topic/bitssub1");
/*22*/ sender = session.createProducer(messageQueue);
/*23*/ msg = session.createTextMessage();
/*24*/ msg.setText("Hello");
/*25*/ sender.send(msg);
/*26*/ qCon.close();
/*27*/ }
/*28*/}
mdb负载均衡器设计:
原理:将所有的用户请求都发送到一个初始队列中(queue),队列中的请求按照客户请求的顺序一次排列在队列中,通过一个mdb组件称作是负载均衡器,把消息平均分配到俩个不同的队列中(queue1,queue2)
mdb均衡器的代码:
@MessageDriven(
/*08*/ activationConfig={
/*09*/ @ActivationConfigProperty(
/*10*/ propertyName="destinationType",
/*11*/ propertyValue="javax.jms.Queue"),
/*12*/ @ActivationConfigProperty(
/*13*/ propertyName="destination",
/*14*/ propertyValue="queue/bitsinit")
/*15*/ })
/*16*/public class balancer implements MessageListener
/*17*/{
/*18*/ @Resource(mappedName="java:/JmsXA")
/*19*/ private ConnectionFactory connectionFactory;
/*20*/ public void onMessage(Message msg)
/*21*/ { try
/*22*/ {
/*23*/ Context ctx=new InitialContext();
/*24*/ Queue queue1 = (Queue)ctx.lookup ("queue/bitsop1");
/*25*/ Queue queue2 = (Queue)ctx.lookup ("queue/bitsop2");
/*26*/ Connection connect = connectionFactory.createConnection();
/*27*/ Session session = connect.createSession(false,Session.AUTO_ACKNOWLEDGE);
/*28*/ Random rd=new Random();
/*29*/ Queue currentQueue=queue1;
/*30*/ if(rd.nextDouble()>=0.5)
/*31*/ {
/*32*/ currentQueue=queue1;
/*33*/ }
/*34*/ else
/*35*/ {
/*36*/ currentQueue=queue2;
/*37*/ }
/*38*/ MessageProducer sender = session.createProducer(currentQueue);
/*39*/ sender.send(msg);
/*40*/ connect.close();
/*41*/ }
/*42*/ catch(Exception ex)
/*43*/ {
/*44*/ System.out.println(ex);
/*45*/ throw new EJBException(ex);
/*46*/ }
/*47*/ }
/*48*/}
mdb组件1:
@MessageDriven(
/*07*/ activationConfig={
/*08*/ @ActivationConfigProperty(
/*09*/ propertyName="destinationType",
/*10*/ propertyValue="javax.jms.Queue"),
/*11*/ @ActivationConfigProperty(
/*12*/ propertyName="destination",
/*13*/ propertyValue="queue/bitsop1")
/*14*/ })
/*15*/public class mdbop1 implements MessageListener
/*16*/{
/*17*/ public void onMessage(Message msg)
/*18*/ {
/*19*/ TextMessage tmsg = (TextMessage) msg;
/*20*/ try
/*21*/ {
/*22*/ System.out.println("op1:"+tmsg.getText());
/*23*/ }
/*24*/ catch(Exception ex)
/*25*/ {
/*26*/ ex.printStackTrace();
/*27*/ }
/*28*/ }
/*29*/}
mdb组件2:
@MessageDriven(
/*07*/ activationConfig={
/*08*/ @ActivationConfigProperty(
/*09*/ propertyName="destinationType",
/*10*/ propertyValue="javax.jms.Queue"),
/*11*/ @ActivationConfigProperty(
/*12*/ propertyName="destination",
/*13*/ propertyValue="queue/bitsop2")
/*14*/ })
/*15*/public class mdbop2 implements MessageListener
/*16*/{
/*17*/ public void onMessage(Message msg)
/*18*/ {
/*19*/ TextMessage tmsg = (TextMessage) msg;
/*20*/ try
/*21*/ {
/*22*/ System.out.println("op2:"+tmsg.getText());
/*23*/ }
/*24*/ catch(Exception ex)
/*25*/ {
/*26*/ ex.printStackTrace();
/*27*/ }
/*28*/ }
/*29*/}
客户端:
public class send
/*06*/{
/*07*/ public static void main(String args[]) throws Exception
/*08*/ {
/*09*/ MessageProducer sender;
/*10*/ TextMessage msg;
/*11*/ Context ctx=null;
/*12*/ Properties p = new Properties();
/*13*/ FileInputStream f=new FileInputStream("jndi.properties");
/*14*/ p.load(f);
/*15*/ ctx = new InitialContext(p);
/*16*/ ConnectionFactory qConFactory
/*17*/ = (ConnectionFactory)ctx.lookup("ConnectionFactory");
/*18*/ Connection qCon = qConFactory.createConnection();
/*19*/ Session session
/*20*/ = qCon.createSession(false,Session.AUTO_ACKNOWLEDGE);
/*21*/ Queue messageQueue = (Queue)ctx.lookup("queue/bitsinit");
/*22*/ sender = session.createProducer(messageQueue);
/*23*/ for(int i=0;i<10;i++)
/*24*/ {
/*25*/ msg = session.createTextMessage();
/*26*/ msg.setText("Hello");
/*27*/ sender.send(msg);
/*28*/ }
/*29*/ qCon.close();
/*30*/ }
/*31*/}
至此消息驱动bean基础只是已经了解!