Chinaunix首页 | 论坛 | 博客
  • 博客访问: 719529
  • 博文数量: 147
  • 博客积分: 6010
  • 博客等级: 准将
  • 技术积分: 1725
  • 用 户 组: 普通用户
  • 注册时间: 2008-08-22 10:36
文章分类

全部博文(147)

文章存档

2011年(1)

2010年(1)

2009年(35)

2008年(110)

我的朋友

分类: Java

2008-09-13 21:11:24

消息驱动bean是一种通过消息方式为外界提供服务的组件。mdb和它的使用者之间是一种松散耦合的关系:
由于mdb组件在使用的过程中,其客户并不拥有mdb组件的远程引用,而是直接将消息发送到特定的队列中,因而用户的调用并不强求mdb组件的运行,在mdb组件没有运行的情况下,客户发送的请求数据会临时保存到消息队列中,当mdb组件加载后,消息队列中的调用消息才能得到处理。java ee规范并没有规定mdb将处理结果返回给消息调用者的方式,但是通常情况下mdb组件是将处理玩的结果以消息方式发送到特定结果队列中,如果客户程序没有运行,结果会临时存放在消息队列中,直到客户程序启动,结果队列中的消息才能得到处理。
        mdb可以和两种消息队列关联,如果和Topic型的队列关联,则客户发送到队列中的消息。会依次广播给所有的mdb组件,所有的mdb组件会得到相同的消息。
       mdb组件和具体的业务逻辑无关,从特定的消息队列中获得消息,消息内容要在mdb的onMessage方法中处理,具体的而业务逻辑通常委托给其他的ejb组件。
       mdb组件属性:
1.队列类型属性:
  @MessageDriver(
    activationConfig={
ActivationConfigProperty(
propertyName="destinationType",
propertyValue="javax.jms.Queue")
})
2.队列名称属性:
ActivationConfigProperty(
propertyName="destination",
propertyValue="queue/bitsqueue")
3.队列消息确认模式属性:
ActivationConfigProperty(
propertyName="acknowledgeMode",
propertyValue="auto-acknowledge")
4.mdb组件订阅持续性属性:
 持久性订阅(Durable)和非持久性订阅(NonDurable)
基于Durable型的组件必须设定mdb组件的topic消息订阅标示mdb-subscription-id否则就会产生异常!
ActivationConfigProperty(
propertyName="subscriptionDurability",
propertyValue="Durable")
5.mdb队列消息过滤属性
ActivationConfigProperty(
propertyName="messageSelector",
propertyValue="myversion=1 and messageSelector='bit service'")
基于标注的mdb组件设计:
  @MessageDriven(mappedName = "jms/mymdb", activationConfig = {        @ActivationConfigProperty(propertyName="destination",propertyValue="queue/birqueue"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue") })
public class mymdb implements MessageListener {
    @Override
    public void onMessage(Message m) {
        // TODO Auto-generated method stub
TextMessage tmsg=(TextMessage)m;
try{
    System.out.println(tmsg.getText());
    System.out.println("成功....");
}catch(Exception e){
    System.out.println(e);
}
    }
}
客户端:
public class send {
    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        try{
            Context ctx=null;
            MessageProducer sender;
            TextMessage msg;
            String url="jnp://localhost:1099";
            String purl="org.jboss.naming:org.jnp.interfaces";
            String fcy="org.jnp.interfaces.NamingContextFactory";
            Properties p=new Properties();
            p.put(Context.INITIAL_CONTEXT_FACTORY,fcy);
            p.put(Context.PROVIDER_URL,url);
            p.put(Context.URL_PKG_PREFIXES,purl);
            ctx=new InitialContext(p);
            ConnectionFactory qConFactory=(ConnectionFactory)ctx.lookup("ConnectionFactory");
            Connection qCon=qConFactory.createConnection();
            Session session=qCon.createSession(false,Session.AUTO_ACKNOWLEDGE);
            Queue messageQueue=(Queue)ctx.lookup("queue/birqueue");
            sender=session.createProducer(messageQueue);
            msg=session.createTextMessage();
            msg.setText("hello wuxiaoxiao");
            sender.send(msg);
            qCon.close();
            //System.out.println("发送成功........");
            }catch(Exception e){
            System.out.println(e);
            }
    }
}
基于配置文件:把ejb组件的标注去掉:
       xmlns=""
       xmlns:xsi=""
       xsi:schemaLocation="
                           /ejb-jar_3_0.xsd"
       version="3.0">
  
       
            myconfgejb
            bits.mymdb
            javax.jms.MessageListener
            Container
           
                javax.jms.Queue
           

           
               
                   destinationType
                  

                   javax.jms.Queue
                  
                  
               

               
                   destination
                  

                   queue/bitsqueue4
                  
                  
               

           

       

  


Topic型mdb组件设计:
public class mymdb implements MessageListener {
    @Override
    public void onMessage(Message m) {
        // TODO Auto-generated method stub
TextMessage tmsg=(TextMessage)m;
try{
    System.out.println(tmsg.getText());
    System.out.println("成功....");
}catch(Exception e){
    System.out.println(e);
}
    }
}
配置文件:
       xmlns=""
       xmlns:xsi=""
       xsi:schemaLocation="
                           /ejb-jar_3_0.xsd"
       version="3.0">
  
       
            myconfgejb
            bits.mymdb
            javax.jms.MessageListener
            Container
           
                javax.jms.Topic
           

           
               
                   destinationType
                  

                   javax.jms.Topic
                  
                  
               

               
                   destination
                  

                   topic/bitssub1
                  
                  
               

             
                   subscriptionDurability
                  

                   Durable
                  
                  
               

           

       

  


一定要有jboss.xml配置文件:


   
      
            myconfgejb
            zhang           
       

   


客户端代码:
public class send
/*06*/{
/*07*/    public static void main(String args[]) throws Exception
/*08*/    {
/*09*/          MessageProducer sender;
/*10*/                TextMessage msg;
/*11*/               Context ctx=null;   
/*12*/            Properties p = new Properties();
/*13*/            FileInputStream f=new FileInputStream("jndi.properties");
/*14*/            p.load(f);
/*15*/            ctx = new InitialContext(p);
/*16*/                ConnectionFactory qConFactory
/*17*/                    = (ConnectionFactory)ctx.lookup("ConnectionFactory");        
/*18*/                Connection qCon = qConFactory.createConnection();
/*19*/                Session session
/*20*/                    = qCon.createSession(false,Session.AUTO_ACKNOWLEDGE);
/*21*/                Topic messageQueue = (Topic)ctx.lookup("topic/bitssub1");   
/*22*/                sender = session.createProducer(messageQueue);
/*23*/                msg = session.createTextMessage();
/*24*/                msg.setText("Hello");
/*25*/                sender.send(msg);
/*26*/                qCon.close();
/*27*/     }
/*28*/}

mdb负载均衡器设计:
原理:将所有的用户请求都发送到一个初始队列中(queue),队列中的请求按照客户请求的顺序一次排列在队列中,通过一个mdb组件称作是负载均衡器,把消息平均分配到俩个不同的队列中(queue1,queue2)
mdb均衡器的代码:
@MessageDriven(
/*08*/   activationConfig={
/*09*/    @ActivationConfigProperty(
/*10*/        propertyName="destinationType",
/*11*/      propertyValue="javax.jms.Queue"),
/*12*/    @ActivationConfigProperty(
/*13*/        propertyName="destination",
/*14*/      propertyValue="queue/bitsinit")
/*15*/   })
/*16*/public class balancer implements MessageListener
/*17*/{
/*18*/    @Resource(mappedName="java:/JmsXA")
/*19*/  private ConnectionFactory connectionFactory; 
/*20*/     public void onMessage(Message msg)
/*21*/  { try
/*22*/      {                   
/*23*/            Context ctx=new InitialContext();
/*24*/              Queue queue1 = (Queue)ctx.lookup ("queue/bitsop1");
/*25*/              Queue queue2 = (Queue)ctx.lookup ("queue/bitsop2");
/*26*/            Connection connect = connectionFactory.createConnection();
/*27*/            Session session = connect.createSession(false,Session.AUTO_ACKNOWLEDGE);
/*28*/            Random rd=new Random();
/*29*/            Queue currentQueue=queue1;
/*30*/            if(rd.nextDouble()>=0.5)
/*31*/            {
/*32*/                 currentQueue=queue1;
/*33*/            }
/*34*/            else
/*35*/            {
/*36*/                 currentQueue=queue2;
/*37*/            }
/*38*/            MessageProducer sender = session.createProducer(currentQueue);
/*39*/            sender.send(msg);
/*40*/            connect.close();
/*41*/   }
/*42*/   catch(Exception ex)
/*43*/   {
/*44*/       System.out.println(ex);
/*45*/          throw new EJBException(ex);
/*46*/   }
/*47*/ }
/*48*/}
mdb组件1:
@MessageDriven(
/*07*/   activationConfig={
/*08*/    @ActivationConfigProperty(
/*09*/        propertyName="destinationType",
/*10*/      propertyValue="javax.jms.Queue"),
/*11*/    @ActivationConfigProperty(
/*12*/        propertyName="destination",
/*13*/      propertyValue="queue/bitsop1")
/*14*/   })
/*15*/public class mdbop1 implements MessageListener
/*16*/{
/*17*/     public void onMessage(Message msg)
/*18*/  {
/*19*/      TextMessage tmsg = (TextMessage) msg;
/*20*/      try
/*21*/      {
/*22*/          System.out.println("op1:"+tmsg.getText()); 
/*23*/    }
/*24*/    catch(Exception ex)
/*25*/    {
/*26*/        ex.printStackTrace();
/*27*/    }
/*28*/  }
/*29*/}
mdb组件2:
@MessageDriven(
/*07*/   activationConfig={
/*08*/    @ActivationConfigProperty(
/*09*/        propertyName="destinationType",
/*10*/      propertyValue="javax.jms.Queue"),
/*11*/    @ActivationConfigProperty(
/*12*/        propertyName="destination",
/*13*/      propertyValue="queue/bitsop2")
/*14*/   })
/*15*/public class mdbop2 implements MessageListener
/*16*/{
/*17*/     public void onMessage(Message msg)
/*18*/  {
/*19*/      TextMessage tmsg = (TextMessage) msg;
/*20*/      try
/*21*/      {
/*22*/          System.out.println("op2:"+tmsg.getText()); 
/*23*/    }
/*24*/    catch(Exception ex)
/*25*/    {
/*26*/        ex.printStackTrace();
/*27*/    }
/*28*/  }
/*29*/}
客户端:
public class send
/*06*/{
/*07*/    public static void main(String args[]) throws Exception
/*08*/    {
/*09*/          MessageProducer sender;
/*10*/                TextMessage msg;
/*11*/               Context ctx=null;   
/*12*/            Properties p = new Properties();
/*13*/            FileInputStream f=new FileInputStream("jndi.properties");
/*14*/            p.load(f);
/*15*/            ctx = new InitialContext(p);
/*16*/                ConnectionFactory qConFactory
/*17*/                    = (ConnectionFactory)ctx.lookup("ConnectionFactory");        
/*18*/                Connection qCon = qConFactory.createConnection();
/*19*/                Session session
/*20*/                    = qCon.createSession(false,Session.AUTO_ACKNOWLEDGE);
/*21*/                Queue messageQueue = (Queue)ctx.lookup("queue/bitsinit");   
/*22*/                sender = session.createProducer(messageQueue);
/*23*/                for(int i=0;i<10;i++)
/*24*/                {
/*25*/                    msg = session.createTextMessage();
/*26*/                    msg.setText("Hello");
/*27*/                    sender.send(msg);
/*28*/              }
/*29*/                qCon.close();
/*30*/     }
/*31*/}
至此消息驱动bean基础只是已经了解!


阅读(3656) | 评论(2) | 转发(0) |
0

上一篇:JMS与消息队列

下一篇:EJB事务

给主人留下些什么吧!~~

chinaunix网友2011-06-08 16:35:03

是从其他的网站上拷贝的 不是抄书 你会抄上行号吗!用来学习一下而已!

chinaunix网友2011-04-12 10:53:53

好像是从一本上抄来的,就连毫无意义的行号注释都一样