問題:我的ActiveMQ接收消息用的是topic模式,持久化訂閱,問題是我用了JMS接收消息的代碼每次重新啟動總是會收到最后一次的消息,但這些消息是已經(jīng)接收過了的,而且啟動一次就收到一次,難道ActiveMQ不會清除緩存的嗎?
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
- connection = factory.createConnection();
- connection.setClientID(Constant.JMS_CLIENT_ID);
- session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
- Topic jmsSendTopic = session.createTopic(sendTopic);
- sendTopicProducer = session.createProducer(jmsSendTopic);
- sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
- sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
-
- Topic jmsReceiveTopic = session.createTopic(receiveTopic);
- receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME);
- receiveTopicConsumer.setMessageListener(this);
- connection.start();
//創(chuàng)建JMS連接和會話ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);connection = factory.createConnection();connection.setClientID(Constant.JMS_CLIENT_ID);session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);// 創(chuàng)建消息發(fā)送主題和發(fā)送者Topic jmsSendTopic = session.createTopic(sendTopic);sendTopicProducer = session.createProducer(jmsSendTopic);sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);// 創(chuàng)建消息接收主題和接收者Topic jmsReceiveTopic = session.createTopic(receiveTopic);receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME);receiveTopicConsumer.setMessageListener(this);connection.start();
解答:問題原因在于這段代碼在接收到JMS消息時不會向ActiveMQ服務器確認消息的接收,故而ActiveMQ服務器一直認為該消息沒有成功發(fā)送給接收者,因而每次接收者重啟之后就會收到ActiveMQ服務器發(fā)送過來的消息。在這里要解釋一下session的創(chuàng)建。
- session = connection.createSession(true,Session.Auto_ACKNOWLEDGE);
session = connection.createSession(true,Session.Auto_ACKNOWLEDGE);
當createSession第一個參數(shù)為true時,表示創(chuàng)建的session被標記為transactional的,確認消息就通過確認和校正來自動地處理,第二個參數(shù)應該是沒用的。
- session = connection.createSession(false,Session.Auto_ACKNOWLEDGE);
session = connection.createSession(false,Session.Auto_ACKNOWLEDGE);
當createSession的第一個參數(shù)為false時,表示創(chuàng)建的session沒有標記為transactional,此時有三種用于消息確認的選項:
**AUTO_ACKNOWLEDGE session將自動地確認收到的一則消息;
**CLIENT_ACKNOWLEDGE 客戶端程序將確認收到的一則消息,調用這則消息的確認方法;
**DUPS_OK_ACKNOWLEDGE 這個選項命令session“懶散的”確認消息傳遞,可以想到,這將導致消息提供者傳遞的一些復制消息可能出錯。
JMS有兩種消息傳遞方式。標記為NON_PERSISTENT的消息最多傳遞一次,而標記為PERSISTENT的消息將使用暫存后再轉發(fā)的機理投遞。如果一個JMS服務離線,那么持久性消息不會丟失,但是得等到這個服務恢復聯(lián)機的時候才會被傳遞。所以默認的消息傳遞方式是非持久性的,雖然使用非持久性消息可能降低內存和需要的存儲器,但這種傳遞方式只有當你不需要接收所有消息時才使用。
因此正確的代碼只需改動一處就行了,即將true改為false
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
- connection = factory.createConnection();
- connection.setClientID(Constant.JMS_CLIENT_ID);
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Topic jmsSendTopic = session.createTopic(sendTopic);
- sendTopicProducer = session.createProducer(jmsSendTopic);
- sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
- sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
-
- Topic jmsReceiveTopic = session.createTopic(receiveTopic);
- receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME);
- receiveTopicConsumer.setMessageListener(this);
- connection.start();
//創(chuàng)建JMS連接和會話ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);connection = factory.createConnection();connection.setClientID(Constant.JMS_CLIENT_ID);session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 創(chuàng)建消息發(fā)送主題和發(fā)送者Topic jmsSendTopic = session.createTopic(sendTopic);sendTopicProducer = session.createProducer(jmsSendTopic);sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);// 創(chuàng)建消息接收主題和接收者Topic jmsReceiveTopic = session.createTopic(receiveTopic);receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME);receiveTopicConsumer.setMessageListener(this);connection.start();