国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
ActiveMQ第五彈:增加ReDelivery功能

ActiveMQ第五彈:增加ReDelivery功能

在使用Message Queue的過程中,總會由于種種原因而導(dǎo)致消息失敗。一個(gè)經(jīng)典的場景是一個(gè)生成者向Queue中發(fā)消息,里面包含了一組郵件地址和郵件內(nèi)容。而消費(fèi)者從Queue中將消息一條條讀出來,向指定郵件地址發(fā)送郵件。消費(fèi)者在發(fā)送消息的過程中由于種種原因會導(dǎo)致失敗,比如網(wǎng)絡(luò)超時(shí)、當(dāng)前郵件服務(wù)器不可用等。這樣我們就希望建立一種機(jī)制,對于未發(fā)送成功的郵件再重新發(fā)送,也就是重新處理。重新處理超過一定次數(shù)還不成功,就放棄對該消息的處理,記錄下來,繼續(xù)對剩余消息進(jìn)行處理。

ActiveMQ為我們實(shí)現(xiàn)了這一功能,叫做ReDelivery(重新投遞)。當(dāng)消費(fèi)者在處理消息時(shí)有異常發(fā)生,會將消息重新放回Queue里,進(jìn)行下一次處理。當(dāng)超過重試次數(shù)時(shí),消息會被放置到一個(gè)特殊的Queue中,即Dead Letter Queue,簡稱DLQ,用于進(jìn)行后續(xù)分析。

廢話不多說,一起來實(shí)現(xiàn)吧。(該示例中的全部代碼已放置到GitHub上,請自行下載。)

還是接著本系列中的示例代碼來進(jìn)行。要實(shí)現(xiàn)ReDelivery功能,要給LinsterContainer加上事務(wù)處理。設(shè)置SimpleMessageListenerContainer的sessionTransacted屬性為true。

activeMQConnection.xml
123456789
    <!-- Message Receiver Definition -->    <bean id="messageReceiver" class="huangbowen.net.jms.retry.MessageReceiver">    </bean>    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">        <property name="connectionFactory" ref="connectionFactory"/>        <property name="destinationName" value="${jms.queue.name}"/>        <property name="messageListener" ref="messageReceiver"/>        <property name="sessionTransacted" value="true" />    </bean>

然后創(chuàng)建一個(gè)ReDeliveryPolicy,來定義ReDelivery的機(jī)制。

activeMQConnection.xml
1
    <amq:redeliveryPolicy id="activeMQRedeliveryPolicy" destination="#defaultDestination" redeliveryDelay="100" maximumRedeliveries="4" />

這里設(shè)置ReDelivery的時(shí)間間隔是100毫秒,最大重發(fā)次數(shù)是4次。

在ActiveMQ的Connection Factory中應(yīng)用這個(gè)Policy。就是給Connection Factory設(shè)置屬性redeliveryPolicy為我們剛剛創(chuàng)建的Policy。

activeMQConnection.xml
123456
    <!-- Activemq connection factory -->    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">        <property name="brokerURL" value="${jms.broker.url}?"/>        <property name="useAsyncSend" value="true"/>        <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />    </bean>

這樣ReDelivery機(jī)制就設(shè)置好了。那么怎么能證明我不是在忽悠你們那?當(dāng)然最好的辦法是寫自動化測試來測試這個(gè)功能了。

首先修改下broker的配置,將其對消息的持久化設(shè)置為false,這樣每次運(yùn)行測試時(shí)Queue中消息都為0,用于還原現(xiàn)場。然后設(shè)置一個(gè)Destination Policy,當(dāng)消息超過重試次數(shù)仍未被正確處理時(shí),就把它放入到以DLQ.為前綴的Queue中。由于ActiveMQ默認(rèn)對非持久化的Message不放入DLQ中的,所以手動設(shè)置processNonPersistent為true。

activeMQConnection.xml
123456789101112131415161718
    <amq:broker id="activeMQBroker" persistent="false">        <amq:transportConnectors>            <amq:transportConnector uri="${jms.broker.url}"/>        </amq:transportConnectors>        <amq:destinationPolicy>            <amq:policyMap>                <amq:policyEntries>                    <amq:policyEntry queue=">">                        <amq:deadLetterStrategy>                            <amq:individualDeadLetterStrategy                                    queuePrefix="DLQ." useQueueForQueueMessages="true" processExpired="true"                                    processNonPersistent="true" />                        </amq:deadLetterStrategy>                    </amq:policyEntry>                </amq:policyEntries>            </amq:policyMap>        </amq:destinationPolicy>    </amq:broker>

然后新建一個(gè)MessageListener,當(dāng)接收到消息就拋出一個(gè)異常,這樣用以啟動ReDelivery機(jī)制。

retry/MessageReceiver
12345678910111213141516171819202122232425
package huangbowen.net.jms.retry;import org.springframework.jms.support.JmsUtils;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;public class MessageReceiver implements MessageListener {    public void onMessage(Message message) {        if(message instanceof TextMessage) {            TextMessage textMessage = (TextMessage) message;            try {                String text = textMessage.getText();                System.out.println(String.format("Received: %s",text));                throw new JMSException("process failed");            } catch (JMSException e) {                System.out.println("there is JMS exception: " + e.getMessage() );                throw JmsUtils.convertJmsAccessException(e);            }        }    }}

最后新建一個(gè)集成測試類。

ReDeliveryFunctionIntegrationTest.java
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
package huangbowen.net;import huangbowen.net.jms.MessageSender;import org.junit.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.core.BrowserCallback;import org.springframework.jms.core.JmsTemplate;import org.springframework.test.annotation.DirtiesContext;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;import javax.jms.JMSException;import javax.jms.QueueBrowser;import javax.jms.Session;import java.util.Enumeration;import static org.hamcrest.core.Is.is;import static org.junit.Assert.assertThat;@ContextConfiguration(locations = {"/retry/activeMQConnection.xml"})@DirtiesContextpublic class ReDeliveryFunctionIntegrationTest extends AbstractJUnit4SpringContextTests {    private final static String DLQ = "DLQ.bar";    @Autowired    public JmsTemplate jmsTemplate;    @Autowired    public MessageSender messageSender;    private int getMessagesInDLQ() {        return jmsTemplate.browse(DLQ, new BrowserCallback<Integer>() {            @Override            public Integer doInJms(Session session, QueueBrowser browser) throws JMSException {                Enumeration messages = browser.getEnumeration();                int total = 0;                while(messages.hasMoreElements()) {                    messages.nextElement();                    total++;                }                return  total;            }        });    }    @Test    public void shouldRetryIfExceptionHappened() throws Exception {        assertThat(getMessagesInDLQ(), is(0));        messageSender.send("this is a message");        Thread.sleep(5000);        assertThat(getMessagesInDLQ(), is(1));    }}

我們通過Spring的Autowired功能拿到配置中的JmsTemplate和MessageSender。使用JmsTemplate的brower方法來讀取當(dāng)前DLQ.bar Queue中有多少剩余的消息。用MessageSender來發(fā)送一條消息,這樣即使我們有Listener來處理這條消息,但是由于每次都會拋出異常,超過限定次數(shù)后,被放置到了DLQ.bar中。我們檢測DLQ.bar中的消息數(shù)量就可以知道ReDelivery功能是否正確。

運(yùn)行測試,成功通過。這是日志信息:

12345678910111213
send: this is a messageReceived: this is a messagethere is JMS exception: process failedReceived: this is a messagethere is JMS exception: process failedReceived: this is a messagethere is JMS exception: process failedReceived: this is a messagethere is JMS exception: process failedReceived: this is a messagethere is JMS exception: process failedProcess finished with exit code 0

本系列的全部示例代碼請?jiān)?a >https://github.com/huangbowen521/SpringJMSSample下載。

本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊舉報(bào)
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
深入掌握J(rèn)MS(二):一個(gè)JMS例子
深入掌握J(rèn)MS
ActiveMQ 即時(shí)通訊服務(wù) 淺析(二)
博客開張:用Spring ActiveMQ Jencks開發(fā)消息驅(qū)動POJO-Jamsa ...
【Active入門
springboot 集成ActiveMQ
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服