問題:我的ActiveMQ接收消息用的是topic模式,持久化訂閱,問題是我用了JMS接收消息的代碼每次重新啟動(dòng)總是會(huì)收到最后一次的消息,但這些消息是已經(jīng)接收過了的,而且啟動(dòng)一次就收到一次,難道ActiveMQ不會(huì)清除緩存的嗎?
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
- connection = factory.createConnection();
- connection.setClientID(Constant.JMS_CLIENT_ID);
- session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
- Topic jmsSendTopic = session.createTopic(sendTopic);
- sendTopicProducer = session.createProducer(jmsSendTopic);
- sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
- sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
-
- Topic jmsReceiveTopic = session.createTopic(receiveTopic);
- receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME);
- receiveTopicConsumer.setMessageListener(this);
- connection.start();
//創(chuàng)建JMS連接和會(huì)話ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);connection = factory.createConnection();connection.setClientID(Constant.JMS_CLIENT_ID);session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);// 創(chuàng)建消息發(fā)送主題和發(fā)送者Topic jmsSendTopic = session.createTopic(sendTopic);sendTopicProducer = session.createProducer(jmsSendTopic);sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);// 創(chuàng)建消息接收主題和接收者Topic jmsReceiveTopic = session.createTopic(receiveTopic);receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME);receiveTopicConsumer.setMessageListener(this);connection.start();
解答:問題原因在于這段代碼在接收到JMS消息時(shí)不會(huì)向ActiveMQ服務(wù)器確認(rèn)消息的接收,故而ActiveMQ服務(wù)器一直認(rèn)為該消息沒有成功發(fā)送給接收者,因而每次接收者重啟之后就會(huì)收到ActiveMQ服務(wù)器發(fā)送過來的消息。在這里要解釋一下session的創(chuàng)建。
- session = connection.createSession(true,Session.Auto_ACKNOWLEDGE);
session = connection.createSession(true,Session.Auto_ACKNOWLEDGE);
當(dāng)createSession第一個(gè)參數(shù)為true時(shí),表示創(chuàng)建的session被標(biāo)記為transactional的,確認(rèn)消息就通過確認(rèn)和校正來自動(dòng)地處理,第二個(gè)參數(shù)應(yīng)該是沒用的。
- session = connection.createSession(false,Session.Auto_ACKNOWLEDGE);
session = connection.createSession(false,Session.Auto_ACKNOWLEDGE);
當(dāng)createSession的第一個(gè)參數(shù)為false時(shí),表示創(chuàng)建的session沒有標(biāo)記為transactional,此時(shí)有三種用于消息確認(rèn)的選項(xiàng):
**AUTO_ACKNOWLEDGE session將自動(dòng)地確認(rèn)收到的一則消息;
**CLIENT_ACKNOWLEDGE 客戶端程序?qū)⒋_認(rèn)收到的一則消息,調(diào)用這則消息的確認(rèn)方法;
**DUPS_OK_ACKNOWLEDGE 這個(gè)選項(xiàng)命令session“懶散的”確認(rèn)消息傳遞,可以想到,這將導(dǎo)致消息提供者傳遞的一些復(fù)制消息可能出錯(cuò)。
JMS有兩種消息傳遞方式。標(biāo)記為NON_PERSISTENT的消息最多傳遞一次,而標(biāo)記為PERSISTENT的消息將使用暫存后再轉(zhuǎn)發(fā)的機(jī)理投遞。如果一個(gè)JMS服務(wù)離線,那么持久性消息不會(huì)丟失,但是得等到這個(gè)服務(wù)恢復(fù)聯(lián)機(jī)的時(shí)候才會(huì)被傳遞。所以默認(rèn)的消息傳遞方式是非持久性的,雖然使用非持久性消息可能降低內(nèi)存和需要的存儲(chǔ)器,但這種傳遞方式只有當(dāng)你不需要接收所有消息時(shí)才使用。
因此正確的代碼只需改動(dòng)一處就行了,即將true改為false
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
- connection = factory.createConnection();
- connection.setClientID(Constant.JMS_CLIENT_ID);
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Topic jmsSendTopic = session.createTopic(sendTopic);
- sendTopicProducer = session.createProducer(jmsSendTopic);
- sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
- sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
-
- Topic jmsReceiveTopic = session.createTopic(receiveTopic);
- receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME);
- receiveTopicConsumer.setMessageListener(this);
- connection.start();
//創(chuàng)建JMS連接和會(huì)話ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);connection = factory.createConnection();connection.setClientID(Constant.JMS_CLIENT_ID);session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 創(chuàng)建消息發(fā)送主題和發(fā)送者Topic jmsSendTopic = session.createTopic(sendTopic);sendTopicProducer = session.createProducer(jmsSendTopic);sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);// 創(chuàng)建消息接收主題和接收者Topic jmsReceiveTopic = session.createTopic(receiveTopic);receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME);receiveTopicConsumer.setMessageListener(this);connection.start();