在使用Message Queue的過程中,總會由于種種原因而導(dǎo)致消息失敗。一個(gè)經(jīng)典的場景是一個(gè)生成者向Queue中發(fā)消息,里面包含了一組郵件地址和郵件內(nèi)容。而消費(fèi)者從Queue中將消息一條條讀出來,向指定郵件地址發(fā)送郵件。消費(fèi)者在發(fā)送消息的過程中由于種種原因會導(dǎo)致失敗,比如網(wǎng)絡(luò)超時(shí)、當(dāng)前郵件服務(wù)器不可用等。這樣我們就希望建立一種機(jī)制,對于未發(fā)送成功的郵件再重新發(fā)送,也就是重新處理。重新處理超過一定次數(shù)還不成功,就放棄對該消息的處理,記錄下來,繼續(xù)對剩余消息進(jìn)行處理。
ActiveMQ為我們實(shí)現(xiàn)了這一功能,叫做ReDelivery(重新投遞)。當(dāng)消費(fèi)者在處理消息時(shí)有異常發(fā)生,會將消息重新放回Queue里,進(jìn)行下一次處理。當(dāng)超過重試次數(shù)時(shí),消息會被放置到一個(gè)特殊的Queue中,即Dead Letter Queue,簡稱DLQ,用于進(jìn)行后續(xù)分析。
廢話不多說,一起來實(shí)現(xiàn)吧。(該示例中的全部代碼已放置到GitHub 上,請自行下載。)
還是接著本系列中的示例代碼來進(jìn)行。要實(shí)現(xiàn)ReDelivery功能,要給LinsterContainer加上事務(wù)處理。設(shè)置SimpleMessageListenerContainer的sessionTransacted屬性為true。
activeMQConnection.xml 123456789 <!-- Message Receiver Definition --> <bean id="messageReceiver" class="huangbowen.net.jms.retry.MessageReceiver"> </bean> <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destinationName" value="${jms.queue.name}"/> <property name="messageListener" ref="messageReceiver"/> <property name="sessionTransacted" value="true" /> </bean>
然后創(chuàng)建一個(gè)ReDeliveryPolicy,來定義ReDelivery的機(jī)制。
activeMQConnection.xml 1 <amq:redeliveryPolicy id="activeMQRedeliveryPolicy" destination="#defaultDestination" redeliveryDelay="100" maximumRedeliveries="4" />
這里設(shè)置ReDelivery的時(shí)間間隔是100毫秒,最大重發(fā)次數(shù)是4次。
在ActiveMQ的Connection Factory中應(yīng)用這個(gè)Policy。就是給Connection Factory設(shè)置屬性redeliveryPolicy為我們剛剛創(chuàng)建的Policy。
activeMQConnection.xml 123456 <!-- Activemq connection factory --> <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${jms.broker.url}?"/> <property name="useAsyncSend" value="true"/> <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" /> </bean>
這樣ReDelivery機(jī)制就設(shè)置好了。那么怎么能證明我不是在忽悠你們那?當(dāng)然最好的辦法是寫自動化測試來測試這個(gè)功能了。
首先修改下broker的配置,將其對消息的持久化設(shè)置為false,這樣每次運(yùn)行測試時(shí)Queue中消息都為0,用于還原現(xiàn)場。然后設(shè)置一個(gè)Destination Policy,當(dāng)消息超過重試次數(shù)仍未被正確處理時(shí),就把它放入到以DLQ.
為前綴的Queue中。由于ActiveMQ默認(rèn)對非持久化的Message不放入DLQ中的,所以手動設(shè)置processNonPersistent為true。
activeMQConnection.xml 123456789101112131415161718 <amq:broker id="activeMQBroker" persistent="false"> <amq:transportConnectors> <amq:transportConnector uri="${jms.broker.url}"/> </amq:transportConnectors> <amq:destinationPolicy> <amq:policyMap> <amq:policyEntries> <amq:policyEntry queue=">"> <amq:deadLetterStrategy> <amq:individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" processExpired="true" processNonPersistent="true" /> </amq:deadLetterStrategy> </amq:policyEntry> </amq:policyEntries> </amq:policyMap> </amq:destinationPolicy> </amq:broker>
然后新建一個(gè)MessageListener,當(dāng)接收到消息就拋出一個(gè)異常,這樣用以啟動ReDelivery機(jī)制。
retry/MessageReceiver 12345678910111213141516171819202122232425 package huangbowen.net.jms.retry;import org.springframework.jms.support.JmsUtils;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;public class MessageReceiver implements MessageListener { public void onMessage(Message message) { if(message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { String text = textMessage.getText(); System.out.println(String.format("Received: %s",text)); throw new JMSException("process failed"); } catch (JMSException e) { System.out.println("there is JMS exception: " + e.getMessage() ); throw JmsUtils.convertJmsAccessException(e); } } }}
最后新建一個(gè)集成測試類。
ReDeliveryFunctionIntegrationTest.java 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 package huangbowen.net;import huangbowen.net.jms.MessageSender;import org.junit.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.core.BrowserCallback;import org.springframework.jms.core.JmsTemplate;import org.springframework.test.annotation.DirtiesContext;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;import javax.jms.JMSException;import javax.jms.QueueBrowser;import javax.jms.Session;import java.util.Enumeration;import static org.hamcrest.core.Is.is;import static org.junit.Assert.assertThat;@ContextConfiguration(locations = {"/retry/activeMQConnection.xml"})@DirtiesContextpublic class ReDeliveryFunctionIntegrationTest extends AbstractJUnit4SpringContextTests { private final static String DLQ = "DLQ.bar"; @Autowired public JmsTemplate jmsTemplate; @Autowired public MessageSender messageSender; private int getMessagesInDLQ() { return jmsTemplate.browse(DLQ, new BrowserCallback<Integer>() { @Override public Integer doInJms(Session session, QueueBrowser browser) throws JMSException { Enumeration messages = browser.getEnumeration(); int total = 0; while(messages.hasMoreElements()) { messages.nextElement(); total++; } return total; } }); } @Test public void shouldRetryIfExceptionHappened() throws Exception { assertThat(getMessagesInDLQ(), is(0)); messageSender.send("this is a message"); Thread.sleep(5000); assertThat(getMessagesInDLQ(), is(1)); }}
我們通過Spring的Autowired功能拿到配置中的JmsTemplate和MessageSender。使用JmsTemplate的brower方法來讀取當(dāng)前DLQ.bar Queue中有多少剩余的消息。用MessageSender來發(fā)送一條消息,這樣即使我們有Listener來處理這條消息,但是由于每次都會拋出異常,超過限定次數(shù)后,被放置到了DLQ.bar中。我們檢測DLQ.bar中的消息數(shù)量就可以知道ReDelivery功能是否正確。
運(yùn)行測試,成功通過。這是日志信息:
12345678910111213 send: this is a messageReceived: this is a messagethere is JMS exception: process failedReceived: this is a messagethere is JMS exception: process failedReceived: this is a messagethere is JMS exception: process failedReceived: this is a messagethere is JMS exception: process failedReceived: this is a messagethere is JMS exception: process failedProcess finished with exit code 0
本系列的全部示例代碼請?jiān)?a >https://github.com/huangbowen521/SpringJMSSample下載。