轉(zhuǎn)載請(qǐng)注明出處
ps: 文章里面延遲隊(duì)列=延時(shí)隊(duì)列
延遲隊(duì)列存儲(chǔ)的對(duì)象肯定是對(duì)應(yīng)的延時(shí)消息,所謂”延時(shí)消息”是指當(dāng)消息被發(fā)送以后,并不想讓消費(fèi)者立即拿到消息,而是等待指定時(shí)間后,消費(fèi)者才拿到這個(gè)消息進(jìn)行消費(fèi)。
場(chǎng)景一:在訂單系統(tǒng)中,一個(gè)用戶下單之后通常有30分鐘的時(shí)間進(jìn)行支付,如果30分鐘之內(nèi)沒有支付成功,那么這個(gè)訂單將進(jìn)行一場(chǎng)處理。這是就可以使用延時(shí)隊(duì)列將訂單信息發(fā)送到延時(shí)隊(duì)列。
場(chǎng)景二:用戶希望通過手機(jī)遠(yuǎn)程遙控家里的智能設(shè)備在指定的時(shí)間進(jìn)行工作。這時(shí)候就可以將用戶指令發(fā)送到延時(shí)隊(duì)列,當(dāng)指令設(shè)定的時(shí)間到了再將指令推送到只能設(shè)備。
AMQP協(xié)議和RabbitMQ隊(duì)列本身沒有直接支持延遲隊(duì)列功能,但是可以通過以下特性模擬出延遲隊(duì)列的功能。
但是我們可以通過RabbitMQ的兩個(gè)特性來曲線實(shí)現(xiàn)延遲隊(duì)列:
RabbitMQ可以針對(duì)Queue設(shè)置x-expires 或者 針對(duì)Message設(shè)置 x-message-ttl,來控制消息的生存時(shí)間,如果超時(shí)(兩者同時(shí)設(shè)置以最先到期的時(shí)間為準(zhǔn)),則消息變?yōu)閐ead letter(死信)
RabbitMQ針對(duì)隊(duì)列中的消息過期時(shí)間有兩種方法可以設(shè)置。
- A: 通過隊(duì)列屬性設(shè)置,隊(duì)列中所有消息都有相同的過期時(shí)間。
- B: 對(duì)消息進(jìn)行單獨(dú)設(shè)置,每條消息TTL可以不同。
如果同時(shí)使用,則消息的過期時(shí)間以兩者之間TTL較小的那個(gè)數(shù)值為準(zhǔn)。消息在隊(duì)列的生存時(shí)間一旦超過設(shè)置的TTL值,就成為dead letter
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個(gè)參數(shù),如果隊(duì)列內(nèi)出現(xiàn)了dead letter,則按照這兩個(gè)參數(shù)重新路由轉(zhuǎn)發(fā)到指定的隊(duì)列。
- x-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange
- x-dead-letter-routing-key:出現(xiàn)dead letter之后將dead letter重新按照指定的routing-key發(fā)送
隊(duì)列出現(xiàn)dead letter的情況有:
消息或者隊(duì)列的TTL過期
隊(duì)列達(dá)到最大長度
消息被消費(fèi)端拒絕(basic.reject or basic.nack)并且requeue=false
綜合上述兩個(gè)特性,設(shè)置了TTL規(guī)則之后當(dāng)消息在一個(gè)隊(duì)列中變成死信時(shí),利用DLX特性它能被重新轉(zhuǎn)發(fā)到另一個(gè)Exchange或者Routing Key,這時(shí)候消息就可以重新被消費(fèi)了。
第一步:設(shè)置TTL產(chǎn)生死信,有兩種方式Per-Message TTL和 Queue TTL,第一種可以針對(duì)每一條消息設(shè)置一個(gè)過期時(shí)間使用于大多數(shù)場(chǎng)景,第二種針對(duì)隊(duì)列設(shè)置過期時(shí)間、適用于一次性延時(shí)任務(wù)的場(chǎng)景
還有其他產(chǎn)生死信的方式比如消費(fèi)者拒絕消費(fèi) basic.reject 或者 basic.nack ( 前提要設(shè)置消費(fèi)者的屬性requeue=false)
- Per-Message TTL (對(duì)每一條消息設(shè)置一個(gè)過期時(shí)間)(官方文檔)
java client發(fā)送一條只能駐留60秒的消息到隊(duì)列:
byte[] messageBodyBytes = "Hello, world!".getBytes();AMQP.BasicProperties properties = new AMQP.BasicProperties();properties.setExpiration("60000");//設(shè)置消息的過期時(shí)間為60秒channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);//這條消息發(fā)送到相應(yīng)的隊(duì)列之后,如果60秒內(nèi)沒有被消費(fèi),則變?yōu)樗佬?/code>- 1
- 2
- 3
- 4
- 5
創(chuàng)建一個(gè)隊(duì)列,隊(duì)列的消息過期時(shí)間為30分鐘(這個(gè)隊(duì)列30分鐘內(nèi)沒有消費(fèi)者消費(fèi)消息則刪除,刪除后隊(duì)列內(nèi)的消息變?yōu)樗佬?
java client方式:Map<String, Object> args = new HashMap<String, Object>();args.put("x-expires", 1800000);channel.queueDeclare("myqueue", false, false, false, args);rabbitmqctl命令方式(.* 為所有隊(duì)列, 可以替換為指定隊(duì)列):rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queuesrabbitmqctl (Windows):rabbitmqctl set_policy expiry ".*" "{""expires"":1800000}" --apply-to queues
第二步:設(shè)置死信的轉(zhuǎn)發(fā)規(guī)則(如果沒有任何規(guī)則,則直接丟棄死信)
- Dead Letter Exchanges設(shè)置方法(官方文檔)
Java Client方式://聲明一個(gè)直連模式的exchangechannel.exchangeDeclare("some.exchange.name", "direct");//聲明一個(gè)隊(duì)列,當(dāng)myqueue隊(duì)列中有死信產(chǎn)生時(shí),會(huì)轉(zhuǎn)發(fā)到交換器some.exchange.nameMap<String, Object> args = new HashMap<String, Object>();args.put("x-dead-letter-exchange", "some.exchange.name");//如果設(shè)置死信會(huì)以路由鍵some-routing-key轉(zhuǎn)發(fā)到some.exchange.name,如果沒設(shè)默認(rèn)為消息發(fā)送到本隊(duì)列時(shí)用的routing key//args.put("x-dead-letter-routing-key", "some-routing-key");channel.queueDeclare("myqueue", false, false, false, args);命令行方式(.* 為所有隊(duì)列, 可以替換為指定隊(duì)列):設(shè)置 "dead-letter-exchange"rabbitmqctl:rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queuesrabbitmqctl (Windows):rabbitmqctl set_policy DLX ".*" "{""dead-letter-exchange"":""my-dlx""}" --apply-to queues設(shè)置 "dead-letter-routing-key"rabbitmqctl:rabbitmqctl set_policy DLX ".*" '{ "dead-letter-routing-key":"my-routing-key"}' --apply-to queuesrabbitmqctl (Windows):rabbitmqctl set_policy DLX ".*" "{""dead-letter-routing-key"":""my-routing-key""}" --apply-to queues
在rabbitmq 3.5.7及以上的版本提供了一個(gè)插件(rabbitmq-delayed-message-exchange)來實(shí)現(xiàn)延遲隊(duì)列功能。同時(shí)插件依賴Erlang/OPT 18.0及以上。
插件源碼地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
插件下載地址:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange
進(jìn)入插件安裝目錄
{rabbitmq-server}/plugins/(可以查看一下當(dāng)前已存在的插件)
下載插件
rabbitmq_delayed_message_exchange
wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
(如果下載的文件名稱不規(guī)則就手動(dòng)重命名一下如:
rabbitmq_delayed_message_exchange-0.0.1.ez)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange(關(guān)閉插件)rabbitmq-plugins disable rabbitmq_delayed_message_exchange
通過聲明一個(gè)x-delayed-message類型的exchange來使用delayed-messaging特性
x-delayed-message是插件提供的類型,并不是rabbitmq本身的
// ... elided code ...Map<String, Object> args = new HashMap<String, Object>();args.put("x-delayed-type", "direct");channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);// ... more code ...
發(fā)送消息的時(shí)候通過在header添加”x-delay”參數(shù)來控制消息的延時(shí)時(shí)間
// ... elided code ...byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");Map<String, Object> headers = new HashMap<String, Object>();headers.put("x-delay", 5000);AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);// ... more code ...
消息發(fā)送端:
import java.text.SimpleDateFormat;import java.util.Date;import java.util.HashMap;import java.util.Map;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Send { // 隊(duì)列名稱 private final static String EXCHANGE_NAME="delay_exchange"; private final static String ROUTING_KEY="key_delay"; @SuppressWarnings("deprecation") public static void main(String[] argv) throws Exception { /** * 創(chuàng)建連接連接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.12.190"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); // 聲明x-delayed-type類型的exchange Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, args); Map<String, Object> headers = new HashMap<String, Object>(); //設(shè)置在2016/11/04,16:45:12向消費(fèi)端推送本條消息 Date now = new Date(); Date timeToPublish = new Date("2016/11/04,16:45:12"); String readyToPushContent = "publish at " + sf.format(now) + " \t deliver at " + sf.format(timeToPublish); headers.put("x-delay", timeToPublish.getTime() - now.getTime()); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder() .headers(headers); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props.build(), readyToPushContent.getBytes()); // 關(guān)閉頻道和連接 channel.close(); connection.close(); }}
消息接收端:
import java.text.SimpleDateFormat;import java.util.Date;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;public class Recv { // 隊(duì)列名稱 private final static String QUEUE_NAME = "delay_queue"; private final static String EXCHANGE_NAME="delay_exchange"; public static void main(String[] argv) throws Exception, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.12.190"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.queueDeclare(QUEUE_NAME, true,false,false,null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicConsume(QUEUE_NAME, true, queueingConsumer); SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); try { System.out.println("****************WAIT***************"); while(true){ QueueingConsumer.Delivery delivery = queueingConsumer .nextDelivery(); // String message = (new String(delivery.getBody())); System.out.println("message:"+message); System.out.println("now:\t"+sf.format(new Date())); } } catch (Exception exception) { exception.printStackTrace(); } }}
啟動(dòng)接收端,啟動(dòng)發(fā)送端
運(yùn)行結(jié)果:
****************WAIT***************message:publish at 2016-11-04 16:44:16.887 deliver at 2016-11-04 16:45:12.000now: 2016-11-04 16:45:12.023
結(jié)果顯示在我們2016-11-04 16:45:12.023接收到了消息,距離我們?cè)O(shè)定的時(shí)間2016-11-04 16:45:12.023有23毫秒的延遲
Note:使用rabbitmq-delayed-message-exchange插件時(shí)發(fā)送到隊(duì)列的消息數(shù)量在web管理界面可能不可見,不影響正常功能使用
Note :使用過程中發(fā)現(xiàn),當(dāng)一臺(tái)啟用了rabbitmq-delayed-message-exchange插件的RAM節(jié)點(diǎn)在重啟的時(shí)候會(huì)無法啟動(dòng),查看日志發(fā)現(xiàn)了一個(gè)Timeout異常,開發(fā)者解釋說這是節(jié)點(diǎn)在啟動(dòng)過程會(huì)同步集群相關(guān)數(shù)據(jù)造成啟動(dòng)超時(shí),并建議不要使用Ram節(jié)點(diǎn)
插件開發(fā)者:
RAM nodes start blank and need a disk node to sync tables from. In this case it times out.
More importantly, you don’t need RAM nodes. If you’re not sure if you do, you certainly don’t, as don’t 99% of users.轉(zhuǎn)載請(qǐng)注明出處
ps: 文章里面延遲隊(duì)列=延時(shí)隊(duì)列
延遲隊(duì)列存儲(chǔ)的對(duì)象肯定是對(duì)應(yīng)的延時(shí)消息,所謂”延時(shí)消息”是指當(dāng)消息被發(fā)送以后,并不想讓消費(fèi)者立即拿到消息,而是等待指定時(shí)間后,消費(fèi)者才拿到這個(gè)消息進(jìn)行消費(fèi)。
場(chǎng)景一:在訂單系統(tǒng)中,一個(gè)用戶下單之后通常有30分鐘的時(shí)間進(jìn)行支付,如果30分鐘之內(nèi)沒有支付成功,那么這個(gè)訂單將進(jìn)行一場(chǎng)處理。這是就可以使用延時(shí)隊(duì)列將訂單信息發(fā)送到延時(shí)隊(duì)列。
場(chǎng)景二:用戶希望通過手機(jī)遠(yuǎn)程遙控家里的智能設(shè)備在指定的時(shí)間進(jìn)行工作。這時(shí)候就可以將用戶指令發(fā)送到延時(shí)隊(duì)列,當(dāng)指令設(shè)定的時(shí)間到了再將指令推送到只能設(shè)備。
AMQP協(xié)議和RabbitMQ隊(duì)列本身沒有直接支持延遲隊(duì)列功能,但是可以通過以下特性模擬出延遲隊(duì)列的功能。
但是我們可以通過RabbitMQ的兩個(gè)特性來曲線實(shí)現(xiàn)延遲隊(duì)列:
RabbitMQ可以針對(duì)Queue設(shè)置x-expires 或者 針對(duì)Message設(shè)置 x-message-ttl,來控制消息的生存時(shí)間,如果超時(shí)(兩者同時(shí)設(shè)置以最先到期的時(shí)間為準(zhǔn)),則消息變?yōu)閐ead letter(死信)
RabbitMQ針對(duì)隊(duì)列中的消息過期時(shí)間有兩種方法可以設(shè)置。
- A: 通過隊(duì)列屬性設(shè)置,隊(duì)列中所有消息都有相同的過期時(shí)間。
- B: 對(duì)消息進(jìn)行單獨(dú)設(shè)置,每條消息TTL可以不同。
如果同時(shí)使用,則消息的過期時(shí)間以兩者之間TTL較小的那個(gè)數(shù)值為準(zhǔn)。消息在隊(duì)列的生存時(shí)間一旦超過設(shè)置的TTL值,就成為dead letter
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個(gè)參數(shù),如果隊(duì)列內(nèi)出現(xiàn)了dead letter,則按照這兩個(gè)參數(shù)重新路由轉(zhuǎn)發(fā)到指定的隊(duì)列。
- x-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange
- x-dead-letter-routing-key:出現(xiàn)dead letter之后將dead letter重新按照指定的routing-key發(fā)送
隊(duì)列出現(xiàn)dead letter的情況有:
消息或者隊(duì)列的TTL過期
隊(duì)列達(dá)到最大長度
消息被消費(fèi)端拒絕(basic.reject or basic.nack)并且requeue=false
綜合上述兩個(gè)特性,設(shè)置了TTL規(guī)則之后當(dāng)消息在一個(gè)隊(duì)列中變成死信時(shí),利用DLX特性它能被重新轉(zhuǎn)發(fā)到另一個(gè)Exchange或者Routing Key,這時(shí)候消息就可以重新被消費(fèi)了。
第一步:設(shè)置TTL產(chǎn)生死信,有兩種方式Per-Message TTL和 Queue TTL,第一種可以針對(duì)每一條消息設(shè)置一個(gè)過期時(shí)間使用于大多數(shù)場(chǎng)景,第二種針對(duì)隊(duì)列設(shè)置過期時(shí)間、適用于一次性延時(shí)任務(wù)的場(chǎng)景
還有其他產(chǎn)生死信的方式比如消費(fèi)者拒絕消費(fèi) basic.reject 或者 basic.nack ( 前提要設(shè)置消費(fèi)者的屬性requeue=false)
- Per-Message TTL (對(duì)每一條消息設(shè)置一個(gè)過期時(shí)間)(官方文檔)
java client發(fā)送一條只能駐留60秒的消息到隊(duì)列:
byte[] messageBodyBytes = "Hello, world!".getBytes();AMQP.BasicProperties properties = new AMQP.BasicProperties();properties.setExpiration("60000");//設(shè)置消息的過期時(shí)間為60秒channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);//這條消息發(fā)送到相應(yīng)的隊(duì)列之后,如果60秒內(nèi)沒有被消費(fèi),則變?yōu)樗佬?/code>- 1
- 2
- 3
- 4
- 5
創(chuàng)建一個(gè)隊(duì)列,隊(duì)列的消息過期時(shí)間為30分鐘(這個(gè)隊(duì)列30分鐘內(nèi)沒有消費(fèi)者消費(fèi)消息則刪除,刪除后隊(duì)列內(nèi)的消息變?yōu)樗佬?
java client方式:Map<String, Object> args = new HashMap<String, Object>();args.put("x-expires", 1800000);channel.queueDeclare("myqueue", false, false, false, args);rabbitmqctl命令方式(.* 為所有隊(duì)列, 可以替換為指定隊(duì)列):rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queuesrabbitmqctl (Windows):rabbitmqctl set_policy expiry ".*" "{""expires"":1800000}" --apply-to queues
第二步:設(shè)置死信的轉(zhuǎn)發(fā)規(guī)則(如果沒有任何規(guī)則,則直接丟棄死信)
- Dead Letter Exchanges設(shè)置方法(官方文檔)
Java Client方式://聲明一個(gè)直連模式的exchangechannel.exchangeDeclare("some.exchange.name", "direct");//聲明一個(gè)隊(duì)列,當(dāng)myqueue隊(duì)列中有死信產(chǎn)生時(shí),會(huì)轉(zhuǎn)發(fā)到交換器some.exchange.nameMap<String, Object> args = new HashMap<String, Object>();args.put("x-dead-letter-exchange", "some.exchange.name");//如果設(shè)置死信會(huì)以路由鍵some-routing-key轉(zhuǎn)發(fā)到some.exchange.name,如果沒設(shè)默認(rèn)為消息發(fā)送到本隊(duì)列時(shí)用的routing key//args.put("x-dead-letter-routing-key", "some-routing-key");channel.queueDeclare("myqueue", false, false, false, args);命令行方式(.* 為所有隊(duì)列, 可以替換為指定隊(duì)列):設(shè)置 "dead-letter-exchange"rabbitmqctl:rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queuesrabbitmqctl (Windows):rabbitmqctl set_policy DLX ".*" "{""dead-letter-exchange"":""my-dlx""}" --apply-to queues設(shè)置 "dead-letter-routing-key"rabbitmqctl:rabbitmqctl set_policy DLX ".*" '{ "dead-letter-routing-key":"my-routing-key"}' --apply-to queuesrabbitmqctl (Windows):rabbitmqctl set_policy DLX ".*" "{""dead-letter-routing-key"":""my-routing-key""}" --apply-to queues
在rabbitmq 3.5.7及以上的版本提供了一個(gè)插件(rabbitmq-delayed-message-exchange)來實(shí)現(xiàn)延遲隊(duì)列功能。同時(shí)插件依賴Erlang/OPT 18.0及以上。
插件源碼地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
插件下載地址:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange
進(jìn)入插件安裝目錄
{rabbitmq-server}/plugins/(可以查看一下當(dāng)前已存在的插件)
下載插件
rabbitmq_delayed_message_exchange
wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
(如果下載的文件名稱不規(guī)則就手動(dòng)重命名一下如:
rabbitmq_delayed_message_exchange-0.0.1.ez)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange(關(guān)閉插件)rabbitmq-plugins disable rabbitmq_delayed_message_exchange
通過聲明一個(gè)x-delayed-message類型的exchange來使用delayed-messaging特性
x-delayed-message是插件提供的類型,并不是rabbitmq本身的
// ... elided code ...Map<String, Object> args = new HashMap<String, Object>();args.put("x-delayed-type", "direct");channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);// ... more code ...
發(fā)送消息的時(shí)候通過在header添加”x-delay”參數(shù)來控制消息的延時(shí)時(shí)間
// ... elided code ...byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");Map<String, Object> headers = new HashMap<String, Object>();headers.put("x-delay", 5000);AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);// ... more code ...
消息發(fā)送端:
import java.text.SimpleDateFormat;import java.util.Date;import java.util.HashMap;import java.util.Map;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Send { // 隊(duì)列名稱 private final static String EXCHANGE_NAME="delay_exchange"; private final static String ROUTING_KEY="key_delay"; @SuppressWarnings("deprecation") public static void main(String[] argv) throws Exception { /** * 創(chuàng)建連接連接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.12.190"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); // 聲明x-delayed-type類型的exchange Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, args); Map<String, Object> headers = new HashMap<String, Object>(); //設(shè)置在2016/11/04,16:45:12向消費(fèi)端推送本條消息 Date now = new Date(); Date timeToPublish = new Date("2016/11/04,16:45:12"); String readyToPushContent = "publish at " + sf.format(now) + " \t deliver at " + sf.format(timeToPublish); headers.put("x-delay", timeToPublish.getTime() - now.getTime()); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder() .headers(headers); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props.build(), readyToPushContent.getBytes()); // 關(guān)閉頻道和連接 channel.close(); connection.close(); }}
消息接收端:
import java.text.SimpleDateFormat;import java.util.Date;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;public class Recv { // 隊(duì)列名稱 private final static String QUEUE_NAME = "delay_queue"; private final static String EXCHANGE_NAME="delay_exchange"; public static void main(String[] argv) throws Exception, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.12.190"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.queueDeclare(QUEUE_NAME, true,false,false,null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicConsume(QUEUE_NAME, true, queueingConsumer); SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); try { System.out.println("****************WAIT***************"); while(true){ QueueingConsumer.Delivery delivery = queueingConsumer .nextDelivery(); // String message = (new String(delivery.getBody())); System.out.println("message:"+message); System.out.println("now:\t"+sf.format(new Date())); } } catch (Exception exception) { exception.printStackTrace(); } }}
啟動(dòng)接收端,啟動(dòng)發(fā)送端
運(yùn)行結(jié)果:
****************WAIT***************message:publish at 2016-11-04 16:44:16.887 deliver at 2016-11-04 16:45:12.000now: 2016-11-04 16:45:12.023
結(jié)果顯示在我們2016-11-04 16:45:12.023接收到了消息,距離我們?cè)O(shè)定的時(shí)間2016-11-04 16:45:12.023有23毫秒的延遲
Note:使用rabbitmq-delayed-message-exchange插件時(shí)發(fā)送到隊(duì)列的消息數(shù)量在web管理界面可能不可見,不影響正常功能使用
Note :使用過程中發(fā)現(xiàn),當(dāng)一臺(tái)啟用了rabbitmq-delayed-message-exchange插件的RAM節(jié)點(diǎn)在重啟的時(shí)候會(huì)無法啟動(dòng),查看日志發(fā)現(xiàn)了一個(gè)Timeout異常,開發(fā)者解釋說這是節(jié)點(diǎn)在啟動(dòng)過程會(huì)同步集群相關(guān)數(shù)據(jù)造成啟動(dòng)超時(shí),并建議不要使用Ram節(jié)點(diǎn)
插件開發(fā)者:
RAM nodes start blank and need a disk node to sync tables from. In this case it times out.
More importantly, you don’t need RAM nodes. If you’re not sure if you do, you certainly don’t, as don’t 99% of users.
聯(lián)系客服