開始文章之前先澄清幾個概念
什么是消息 消息是一個用于在組件和應(yīng)用程序之間通訊的的方法。消息之間的傳遞是點對點的。任何終端之間都可以相互接受和發(fā)送消息。并且每個終端都必須遵守如下的規(guī)則 -> 創(chuàng)建消息 -> 發(fā)送消息 -> 接收消息 -> 讀取消息
為什么要使用消息 理由很簡單,消息是一個分布式的低耦合通訊方案。A發(fā)送一個消息到一個agent ,B作為接受者去agent上獲取消息。但是A,B不需要同時到agent上去注冊。agent作為一個中轉(zhuǎn)為A,B提供搞效率的通訊服務(wù)。
開發(fā)者的關(guān)注點 走到這里,我也不想去解釋jms spec上那些抽象且復(fù)雜的概念了,說的很白,1年多了我自己也沒弄懂是個什么東西,也沒時間從頭到尾去仔細(xì)的看,同時我認(rèn)為沒必要,我所關(guān)注的是如何讓jms跑起來,并且工作正常,所以spec只是個字典,當(dāng)我需要用的時候才去查。
開發(fā)者的jms環(huán)境 遵守簡單明了的原則,所謂jms環(huán)境只是2個對象 1> ConnectionFactory 2> Destination 通常Provider會提供JNDI的對象獲取,具體方法可以去Privider的網(wǎng)站上搜索jndi support
下面我以jbossMq為介質(zhì)跑一個簡單的jms,為了保證jms的本質(zhì)清晰,我沒有使用jbossMq的Api,而是直接調(diào)用的jms Api. java 代碼 - package com.javaeye.jms.jboss;
-
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageProducer;
- import javax.jms.Queue;
- import javax.jms.QueueSender;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import javax.naming.Context;
- import javax.naming.InitialContext;
- import javax.naming.NamingException;
-
- public class JbossNativeJmsImpl {
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- public void sendingProcessing(String messege) throws NamingException, JMSException{
- Context ctx = new InitialContext();
- ConnectionFactory cf = (ConnectionFactory) ctx.lookup("java:JmsXA");
- Connection conn = cf.createConnection();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination dest = (Queue) ctx.lookup("queue/A");
- MessageProducer msgp = session.createProducer(dest);
- QueueSender sender = (QueueSender) msgp;
- TextMessage msg = session.createTextMessage();
- msg.setText(messege);
- sender.send(msg);
- conn.close();
- }
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- public String retriveingProcessing() throws NamingException, JMSException{
- Context ctx = new InitialContext();
- ConnectionFactory cf = (ConnectionFactory) ctx.lookup("java:JmsXA");
- Connection conn = cf.createConnection();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination dest = (Queue) ctx.lookup("queue/A");
- MessageConsumer msgconsumer = session.createConsumer(dest);
-
-
- conn.start();
- TextMessage msg = (TextMessage) msgconsumer.receive();
- conn.close();
- System.out.println("messege is" + msg.getText());
- return msg.getText();
- }
- }
注意retrive函數(shù)中comment的掉的兩行,消息Listener的作用是實現(xiàn)異步通訊,但是它有一個約定,必須和發(fā)送者 保持物理上的分離,針對于jboss而言,就要求這個Listener必須跑在容器外面。這是一個很搞的問題,每天Jms的郵件列表里面都有無數(shù)的這樣的問題發(fā)過來。但是回復(fù)的人很少。我自己也從來不回復(fù)。 其實我也不清楚寫這篇文章到底是出于什么目的,怕只是讓這么一個簡單的問題有一個回答而已。
把下面這個程序跑起來就可以異步接受消息了。
java 代碼 - package com.javaeye.jms.jboss;
-
- import java.util.Properties;
-
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageListener;
- import javax.jms.Session;
- import javax.naming.Context;
- import javax.naming.InitialContext;
- import javax.naming.NamingException;
-
- import com.javaeye.spring.services.jms.mdp.JmsListenner;
-
- public class JbossJmsAsync {
-
-
-
-
-
-
- public static void main(String[] args) throws NamingException, JMSException {
- Properties pops = new Properties();
- pops.setProperty("jboss.bind.address", "0.0.0.0");
- pops.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
- pops.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
- pops.setProperty("java.naming.provider.url", "localhost");
- Context ctx = new InitialContext(pops);
- ConnectionFactory cf = (ConnectionFactory) ctx.lookup("ConnectionFactory");
- Connection conn = cf.createConnection();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination dest = (Destination) ctx.lookup("queue/A");
- MessageConsumer msgConsumer = session.createConsumer(dest);
- MessageListener ml = new JmsListenner();
- msgConsumer.setMessageListener(ml);
- conn.start();
- }
-
- }
javaeye的主題好像是spring,為了迎合領(lǐng)導(dǎo),下面我把這套東西跑在spring里面。同時我發(fā)現(xiàn)spring對jms的包裝真的簡單,而且還提供了一個模版,雖然這個模版的接口是在是很羅唆。
ps:今天是第1次用spring在reference里找了半天找不到方法注入的辦法,于是google了一個注入辦法,不合理的地方請大家指出。首先我通過方法來注入ConnectionFactory和Destination這兩個對象來支撐jms環(huán)境
java 代碼 - package com.javaeye.spring.services.jms.mdp;
-
- import java.util.Properties;
-
- import javax.jms.ConnectionFactory;
- import javax.jms.Destination;
- import javax.jms.Queue;
- import javax.naming.Context;
- import javax.naming.InitialContext;
- import javax.naming.NamingException;
-
- public class UserJmsTransactionUtil {
-
- private String connectionFactoryJndiLookUp;
-
- private String destinationJndiLookUp;
-
- private String localConnectionFactoryJndiLookUp;
-
- private String containerType;
-
-
- public String getConnectionFactoryJndiLookUp() {
- return connectionFactoryJndiLookUp;
- }
-
-
-
- public void setConnectionFactoryJndiLookUp(String connectionFactoryJndiLookUp) {
- this.connectionFactoryJndiLookUp = connectionFactoryJndiLookUp;
- }
-
-
-
- public String getDestinationJndiLookUp() {
- return destinationJndiLookUp;
- }
-
-
-
- public void setDestinationJndiLookUp(String destinationJndiLookUp) {
- this.destinationJndiLookUp = destinationJndiLookUp;
- }
-
-
-
- public ConnectionFactory getConnectionFactory() throws NamingException{
- Context ctx = new InitialContext();
- ConnectionFactory cf = (ConnectionFactory) ctx.lookup(connectionFactoryJndiLookUp);
- return cf;
- }
-
-
- public Destination getJmsDestination() throws NamingException{
- Context ctx = new InitialContext();
- Destination dest = (Queue) ctx.lookup(destinationJndiLookUp);
- return dest;
- }
-
-
- public ConnectionFactory getQueueConnectionFactory() throws NamingException{
- Properties pops = new Properties();
- pops.setProperty("jboss.bind.address", "0.0.0.0");
- pops.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
- pops.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
- pops.setProperty("java.naming.provider.url", "localhost");
- Context ctx = new InitialContext(pops);
- ConnectionFactory cf = (ConnectionFactory) ctx.lookup(localConnectionFactoryJndiLookUp);
- return cf;
- }
-
-
- public Destination getLocalJmsDestination() throws NamingException{
- Properties pops = new Properties();
- pops.setProperty("jboss.bind.address", "0.0.0.0");
- pops.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
- pops.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
- pops.setProperty("java.naming.provider.url", "localhost");
- Context ctx = new InitialContext(pops);
- Destination dest = (Destination) ctx.lookup(destinationJndiLookUp);
- return dest;
- }
-
-
-
- public String getLocalConnectionFactoryJndiLookUp() {
- return localConnectionFactoryJndiLookUp;
- }
-
-
-
- public void setLocalConnectionFactoryJndiLookUp(
- String localConnectionFactoryJndiLookUp) {
- this.localConnectionFactoryJndiLookUp = localConnectionFactoryJndiLookUp;
- }
- }
發(fā)送端的配置如下
xml 代碼 - <beans>
- <bean id="userJmsUtil" class="com.javaeye.spring.services.jms.mdp.UserJmsTransactionUtil">
- <property name="connectionFactoryJndiLookUp" value="java:JmsXA"><!--</span-->property>
- <property name="destinationJndiLookUp" value="queue/A"><!--</span-->property>
- <property name="localConnectionFactoryJndiLookUp" value="ConnectionFactory"><!--</span-->property>
- <!--</span-->bean>
-
- <bean id="connectionFactory" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
- <property name="targetObject" ref="userJmsUtil"><!--</span-->property>
- <property name="targetMethod" value="getConnectionFactory"><!--</span-->property>
- <!--</span-->bean>
-
- <bean id="queue" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
- <property name="targetObject" ref="userJmsUtil"><!--</span-->property>
- <property name="targetMethod" value="getJmsDestination"><!--</span-->property>
- <!--</span-->bean>
-
- <bean id="jmsQueue" class="org.springframework.jms.core.JmsTemplate">
- <property name="connectionFactory" ref="connectionFactory"><!--</span-->property>
- <property name="defaultDestination" ref="queue"><!--</span-->property>
- <property name="messageConverter">
- <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"><!--</span-->bean>
- <!--</span-->property>
- <!--</span-->bean>
- <!--</span-->beans>
ps:javaeye的模版工具bug還真多,不管了.
如果使用Listenner的化,一樣需要遵守發(fā)送者和接收者物理隔離的原則,我的做法是把發(fā)送者配到一個xml中,在把接受者配到另外一個xml中去,發(fā)送的配置綁定到容器里,接收者的跑在本地.否則spring初始化是過不去的.
下面這個程序是發(fā)送消息的程序.使用了spring的模版,發(fā)條消息比new個對象還簡單.同時spring還提供了適配器的接口,一樣通過聲明式的配置,這樣可以在同一個接口里發(fā)送各種類型的消息了.同時支持事務(wù),我還不知道這個有什么用呵呵,第1次使用嘛!但是就使用上來說,spring是最簡單的.2者都只需要注入一個對象而已.
java 代碼 - @Test public void send(){
- ApplicationContext ac = new FileSystemXmlApplicationContext("jms.xml");
- BeanFactory bf = ac;
- JmsTemplate jt = (JmsTemplate) bf.getBean("jmsQueue");
- jt.convertAndSend("2132134");
- }
接收端的配置如下 xml 代碼 - xml version="1.0" encoding="UTF-8"?>
- >
- <beans>
-
- <bean id="listenner" class="com.javaeye.spring.services.jms.mdp.JmsListenner"><!--</span-->bean>
-
- <bean id="userJmsUtil" class="com.javaeye.spring.services.jms.mdp.UserJmsTransactionUtil">
- <property name="connectionFactoryJndiLookUp" value="java:JmsXA"><!--</span-->property>
- <property name="destinationJndiLookUp" value="queue/A"><!--</span-->property>
- <property name="localConnectionFactoryJndiLookUp" value="ConnectionFactory"><!--</span-->property>
- <!--</span-->bean>
-
- <bean id="localConnectionFactory" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
- <property name="targetObject" ref="userJmsUtil"><!--</span-->property>
- <property name="targetMethod" value="getQueueConnectionFactory"><!--</span-->property>
- <!--</span-->bean>
-
- <bean id="localDestination" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
- <property name="targetObject" ref="userJmsUtil"><!--</span-->property>
- <property name="targetMethod" value="getLocalJmsDestination"><!--</span-->property>
- <!--</span-->bean>
-
- <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="concurrentConsumers" value="5"><!--</span-->property>
- <property name="connectionFactory" ref="localConnectionFactory"><!--</span-->property>
- <property name="destination" ref="localDestination"><!--</span-->property>
- <property name="messageListener" ref="listenner"><!--</span-->property>
- <!--</span-->bean>
- <!--</span-->beans>
接收端由于需要從jbossmq里取ConnectionFactory和Destination,所以,我調(diào)用的是userJmsUtil的localLookup.這個函數(shù)的作用等同于發(fā)送者的那個函數(shù),只不過前者是容器外獲取,而后者是容器內(nèi)的而已.
java 代碼 - package com.javaeye.spring.services.jms.mdp;
-
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageListener;
- import javax.jms.TextMessage;
-
- public class JmsListenner implements MessageListener {
-
- public void onMessage(Message message) {
- try {
- TextMessage msg = (TextMessage) message;
- System.out.println(msg.getText());
- } catch (JMSException e) { e.printStackTrace(); }
- }
-
- }
spring對jms的整合里提到了一個jms provider ActiveMQ,要用一個開源框架要做的第一件事就是先跑一個demo起來,同樣,我們要做的事還是獲取ConnectionFactory和Destination對象,還好,ActiveMQ的JNDI實現(xiàn)比jbossMQ還要簡單,直接通過一個本地的Context就可以查到了,具體的可以參照ActiveMQ官方的支持文檔.
| |