国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開(kāi)APP
userphoto
未登錄

開(kāi)通VIP,暢享免費(fèi)電子書(shū)等14項(xiàng)超值服

開(kāi)通VIP
Spring集成RabbitMQ并實(shí)現(xiàn)延遲隊(duì)列

一、說(shuō)明

在實(shí)際業(yè)務(wù)場(chǎng)景中可能會(huì)用到延時(shí)消息發(fā)送,例如異步回調(diào)失敗時(shí)的重發(fā)機(jī)制。 RabbitMQ本身不具有延時(shí)消息隊(duì)列的功能,但是可以通過(guò)rabbitmq-delayed-message-exchange來(lái)實(shí)現(xiàn)(也可以通過(guò)TTL(Time To Live)、DLX(Dead Letter Exchanges)特性實(shí)現(xiàn),我們主要講解通過(guò)延遲插件來(lái)實(shí)現(xiàn)的方法)。利用RabbitMQ的這種特性,應(yīng)該可以實(shí)現(xiàn)很多現(xiàn)實(shí)中的業(yè)務(wù),我們可以發(fā)揮想象。 

二、安裝插件

RabbitMQ的安裝請(qǐng)參考我的文章“RabbitMQ安裝與使用”,這里我們重點(diǎn)講插件的安裝。

首先到http://www.rabbitmq.com/community-plugins.html網(wǎng)頁(yè)下載適合的“rabbitmq_delayed_message_exchange插件”。下載完成后將它放到RabbitMQ插件安裝目錄({rabbitmq-server}/plugins/),然后執(zhí)行命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange啟用插件,執(zhí)行命令rabbitmq-plugins disable rabbitmq_delayed_message_exchange也可以關(guān)閉插件。具體過(guò)程可以查看參考文檔2。

三、Spring集成RabbitMQ

1、maven配置

  1. <dependency>  
  2.     <groupId>org.springframework.amqp</groupId>  
  3.     <artifactId>spring-amqp</artifactId>  
  4.     <version>1.6.6.RELEASE</version>  
  5.     <exclusions>  
  6.         <exclusion>  
  7.             <groupId>org.springframework</groupId>  
  8.             <artifactId>spring-core</artifactId>  
  9.             <version>4.1.6.RELEASE</version>  
  10.         </exclusion>  
  11.     </exclusions>  
  12. </dependency>  
  13. <dependency>  
  14.     <groupId>org.springframework.amqp</groupId>  
  15.     <artifactId>spring-rabbit</artifactId>  
  16.     <version>1.6.6.RELEASE</version>  
  17.     <exclusions>  
  18.         <exclusion>  
  19.             <groupId>org.springframework</groupId>  
  20.             <artifactId>spring-core</artifactId>  
  21.             <version>4.1.6.RELEASE</version>  
  22.         </exclusion>  
  23.         <exclusion>  
  24.             <groupId>org.springframework</groupId>  
  25.             <artifactId>spring-messaging</artifactId>  
  26.             <version>4.1.6.RELEASE</version>  
  27.         </exclusion>  
  28.         <exclusion>  
  29.             <groupId>org.springframework</groupId>  
  30.             <artifactId>spring-tx</artifactId>  
  31.             <version>4.1.6.RELEASE</version>  
  32.         </exclusion>  
  33.         <exclusion>  
  34.             <groupId>org.springframework</groupId>  
  35.             <artifactId>spring-context</artifactId>  
  36.             <version>4.1.6.RELEASE</version>  
  37.         </exclusion>  
  38.     </exclusions>  
  39. </dependency>  
說(shuō)明:實(shí)現(xiàn)延遲隊(duì)列需要Spring在4.1以上,spring-amqp在1.6以上。

2、xml配置

  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"  
  4.     xmlns:util="http://www.springframework.org/schema/util" xmlns:context="http://www.springframework.org/schema/context"  
  5.     xmlns:rabbit="http://www.springframework.org/schema/rabbit"  
  6.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd  
  7.                             http://www.springframework.org/schema/context   
  8.                             http://www.springframework.org/schema/context/spring-context-3.1.xsd  
  9.                             http://www.springframework.org/schema/tx  
  10.                             http://www.springframework.org/schema/tx/spring-tx.xsd   
  11.                             http://www.springframework.org/schema/aop  
  12.                             http://www.springframework.org/schema/aop/spring-aop.xsd  
  13.                             http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd   
  14.                             http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">  
  15.     <context:property-placeholder location="classpath:rmq-config.properties" ignore-unresolvable="true"/>  
  16.   
  17.     <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">  
  18.         <property name="host" value="${rabbitmq.host}" />  
  19.         <property name="port" value="${rabbitmq.port}" />  
  20.         <property name="username" value="${rabbitmq.username}" />  
  21.         <property name="password" value="${rabbitmq.password}" />  
  22.         <property name="channelCacheSize" value="${rabbitmq.channel.cacheSize}" />  
  23.     </bean>  
  24.   
  25.     <bean id="orderConsumer" class="com.xxx.rmq.OrderConsumer"></bean>  
  26.     <bean id="messageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter" />  
  27.     <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />  
  28.   
  29.     <rabbit:admin connection-factory="connectionFactory" />  
  30.       
  31.     <!-- 延遲消息start -->  
  32.     <rabbit:topic-exchange name="delay_exchange" delayed="true">  
  33.         <rabbit:bindings>  
  34.             <rabbit:binding queue="delay_queue" pattern="order.delay.notify" />  
  35.         </rabbit:bindings>  
  36.     </rabbit:topic-exchange>  
  37.       
  38.     <rabbit:queue name="delay_queue" durable="true" auto-declare="true" auto-delete="false" />  
  39.       
  40.     <rabbit:template id="delayMsgTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" exchange="delay_exchange" />  
  41.       
  42.     <rabbit:listener-container connection-factory="connectionFactory" channel-transacted="false" acknowledge="auto" message-converter="jsonMessageConverter">  
  43.         <rabbit:listener queues="delay_queue" ref="orderConsumer" method="delayMsg" />  
  44.     </rabbit:listener-container>  
  45.     <!-- 延遲消息end -->  
  46.   
  47. </beans>  
說(shuō)明:spring-rabbit-1.6.xsd必須是1.6及以上版本,否則會(huì)報(bào)“元素 'rabbit:topic-exchange' 中不允許出現(xiàn)屬性 'delayed'”錯(cuò)誤。具體請(qǐng)查看參考文檔3。

四、延遲隊(duì)列的使用

1、發(fā)送消息Producer

  1. import net.sf.json.JSONObject;  
  2.   
  3. import org.apache.commons.lang.StringUtils;  
  4. import org.springframework.amqp.AmqpException;  
  5. import org.springframework.amqp.core.AmqpTemplate;  
  6. import org.springframework.amqp.core.Message;  
  7. import org.springframework.amqp.core.MessagePostProcessor;  
  8. import org.springframework.beans.factory.annotation.Autowired;  
  9. import org.springframework.stereotype.Service;  
  10. /**  
  11.  *  
  12.  * @author Horace  
  13.  * @version 創(chuàng)建時(shí)間:2016年10月26日 下午6:34:31  
  14.  */  
  15. @Service  
  16. public class MessageProducerServiceImpl implements MessageProducerService{  
  17.     @Autowired  
  18.     private AmqpTemplate delayMsgTemplate;  
  19.     @Override  
  20.     public void delayMsg(JSONObject msg,int delay) {  
  21.         // TODO Auto-generated method stub   
  22.         final int xdelay= delay*1000;   
  23.         delayMsgTemplate.convertAndSend("order.delay.notify", (Object) msg,  
  24.                 new MessagePostProcessor() {  
  25.   
  26.                     @Override  
  27.                     public Message postProcessMessage(Message message)  
  28.                             throws AmqpException {  
  29.                         // TODO Auto-generated method stub  
  30.                         message.getMessageProperties().setDelay(xdelay);  
  31.                         return message;  
  32.                     }  
  33.                 });  
  34.     }  
  35. }  

2、異步接收消息Consumer

  1. import net.sf.json.JSONObject;  
  2. import org.apache.commons.lang.StringUtils;  
  3. import org.slf4j.Logger;  
  4. import org.slf4j.LoggerFactory;  
  5. import org.springframework.beans.factory.annotation.Autowired;  
  6.   
  7. /**  
  8.  * 
  9.  * @author Horace  
  10.  * @version 創(chuàng)建時(shí)間:2016年10月26日 下午2:48:14  
  11.  */  
  12. public class OrderConsumer {  
  13.       
  14.     private static Logger logger = LoggerFactory.getLogger(OrderConsumer.class);  
  15.       
  16.     @Autowired  
  17.     private MessageProducerService messageProducerService;  
  18.       
  19.       
  20.     public void delayMsg(Object obj) {  
  21.         logger.info("[延時(shí)消息]" + obj);  
  22.         if (obj != null) {  
  23.             JSONObject notifyJson = JSONObject.fromObject(obj);  
  24.             String notifyUrl = notifyJson.getString("notifyUrl");  
  25.             String notifyContent = notifyJson.getString("notifyContent");  
  26.             String result = HttpUtil.postMessage(notifyUrl, notifyContent);  
  27.             if (StringUtils.isBlank(result)) { // 通知失敗 進(jìn)入重發(fā)機(jī)制  
  28.                 int newNotifyCount = notifyJson.getInt("notifyCount") + 1; //已經(jīng)通知的次數(shù)  
  29.                 if (newNotifyCount < 5) {  
  30.                     notifyJson.put("notifyCount", newNotifyCount);  
  31.                     int spacingInterval = getSpacingInterval(newNotifyCount);  
  32.                     messageProducerService  
  33.                             .delayMsg(notifyJson, spacingInterval);  
  34.                 } else {  
  35.                     logger.info("通知5次都失敗,等待后臺(tái)手工處理!");  
  36.                 }  
  37.             }  
  38.         }  
  39.     }  
  40.       
  41.     /** 
  42.      * 重復(fù)通知間隔時(shí)間(單位為秒) 
  43.      * @param notifyCount 已經(jīng)通知的次數(shù) 
  44.      * @return 
  45.      */  
  46.     private int getSpacingInterval(int notifyCount) {  
  47.         // TODO Auto-generated method stub  
  48.         int spacingInterval = 0;  
  49.         switch (notifyCount) {  
  50.         case 1:  
  51.             spacingInterval = 10;  
  52.             break;  
  53.         case 2:  
  54.             spacingInterval = 20;  
  55.             break;  
  56.         case 3:  
  57.             spacingInterval = 30;  
  58.             break;  
  59.         case 4:  
  60.             spacingInterval = 60;  
  61.             break;  
  62.         case 5:  
  63.             spacingInterval = 90;  
  64.             break;  
  65.         default:  
  66.             break;  
  67.         }  
  68.         return spacingInterval;  
  69.     }  
  70.       
  71. }  


參考文檔:

1、http://blog.csdn.net/tongdao/article/details/51638066 RabbitMQ安裝與使用

2、http://blog.csdn.net/u014308482/article/details/53036770  rabbitmq 實(shí)現(xiàn)延遲隊(duì)列的兩種方式

3、http://docs.spring.io/spring-amqp/docs/1.6.0.RELEASE/reference/html/_reference.html#delayed-message-exchange

本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)
打開(kāi)APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
springboot整合rabbitMQ
11-RabbitMQ高級(jí)特性-消息可靠性投遞
RabbitMQ與java、Spring結(jié)合實(shí)例詳細(xì)講解
pom.xml
RabbitMQ教程
SpringMVC和rabbitmq集成的使用案例
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長(zhǎng)圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服