討論完了消息生產(chǎn)者的關(guān)鍵性能點(diǎn),我們再將目光轉(zhuǎn)向消息消費(fèi)者(接收者端);就像本小節(jié)開始時(shí)描述的那樣,比起消息生產(chǎn)者來說消息消費(fèi)者的性能更能影響ActiveMQ系統(tǒng)的整體性能,因?yàn)橐晒ν瓿梢粭l消息的處理,它的工作要遠(yuǎn)遠(yuǎn)多于消息生產(chǎn)者。
首先,在默認(rèn)情況下ActiveMQ服務(wù)端采用異步方式向客戶端推送消息。也就是說ActiveMQ服務(wù)端在向某個(gè)消費(fèi)者會(huì)話推送消息后,不會(huì)等待消費(fèi)者的響應(yīng)信息,直到消費(fèi)者處理完消息后,主動(dòng)向服務(wù)端返回處理結(jié)果。如果您對自己的消費(fèi)者性能足夠滿意,也可以將這個(gè)過程設(shè)置為“同步”:
......// 設(shè)置為同步connectionFactory.setDispatchAsync(false);......
消費(fèi)者關(guān)鍵策略中,需要重點(diǎn)討論的是消費(fèi)者“預(yù)取數(shù)量”——prefetchSize。可以想象,如果消費(fèi)者端的工作策略是按照某個(gè)周期(例如1秒),主動(dòng)到服務(wù)器端一條一條請求新的消息,那么消費(fèi)者的工作效率一定是極低的;所以ActiveMQ系統(tǒng)中,默認(rèn)的策略是ActiveMQ服務(wù)端一旦有消息,就主動(dòng)按照設(shè)置的規(guī)則推送給當(dāng)前活動(dòng)的消費(fèi)者。其中每次推送都有一定的數(shù)量限制,這個(gè)限制值就是prefetchSize。
針對Queue工作模型的隊(duì)列和Topic工作模型的隊(duì)列,ActiveMQ有不同的默認(rèn)“預(yù)取數(shù)量”;針對NON_PERSISTENT Message和PERSISTENT Message,ActiveMQ也有不同的默認(rèn)“預(yù)取數(shù)量”:
ActiveMQ中設(shè)置的各種默認(rèn)預(yù)取數(shù)量一般情況下不需要進(jìn)行改變。如果您使用默認(rèn)的異步方式從服務(wù)器端推送消息到消費(fèi)者端,且您對消費(fèi)者端的性能有足夠的信心,可以加大預(yù)取數(shù)量的限制。但是非必要情況下,請不要設(shè)置prefetchSize=1,因?yàn)檫@樣就是一條一條的取數(shù)據(jù);也不要設(shè)置為prefetchSize=0,因?yàn)檫@將導(dǎo)致關(guān)閉服務(wù)器端的推送機(jī)制,改為客戶端主動(dòng)請求。
......// 預(yù)取策略對象ActiveMQPrefetchPolicy prefetchPolicy = connectionFactory.getPrefetchPolicy();// 設(shè)置Queue的預(yù)取數(shù)量為50prefetchPolicy.setQueuePrefetch(50);connectionFactory.setPrefetchPolicy(prefetchPolicy);//進(jìn)行連接connection = connectionFactory.createQueueConnection();connection.start();......
......Properties props = new Properties();props.setProperty("prefetchPolicy.queuePrefetch", "1000");props.setProperty("prefetchPolicy.topicPrefetch", "1000");//設(shè)置屬性connectionFactory.setProperties(props);//進(jìn)行連接connection = connectionFactory.createQueueConnection();connection.start();......
JMS規(guī)范除了為消息生產(chǎn)者端提供事務(wù)支持以外,還為消費(fèi)服務(wù)端準(zhǔn)備了事務(wù)的支持。您可以通過在消費(fèi)者端操作事務(wù)的commit和rollback方法,向服務(wù)器告知一組消息是否處理完成。采用事務(wù)的意義在于,一組消息要么被全部處理并確認(rèn)成功,要么全部被回滾并重新處理。
......//建立會(huì)話(采用commit方式確認(rèn)一批消息處理完畢)session = connection.createSession(true, Session.SESSION_TRANSACTED);//建立Queue(當(dāng)然如果有了就不會(huì)重復(fù)建立)sendQueue = session.createQueue("/test");//建立消息發(fā)送者對象MessageConsumer consumer = session.createConsumer(sendQueue);consumer.setMessageListener(new MyMessageListener(session));......class MyMessageListener implements MessageListener { private int number = 0; /** * 會(huì)話 */ private Session session; public MyMessageListener(Session session) { this.session = session; } @Override public void onMessage(Message message) { // 打印這條消息 System.out.println("Message = " + message); // 如果條件成立,就向服務(wù)器確認(rèn)這批消息處理成功 // 服務(wù)器將從隊(duì)列中刪除這些消息 if(number++ % 3 == 0) { try { this.session.commit(); } catch (JMSException e) { e.printStackTrace(System.out); } } }}
以上代碼演示的是消費(fèi)者通過事務(wù)commit的方式,向服務(wù)器確認(rèn)一批消息正常處理完成的方式。請注意代碼示例中的“session = connection.createSession(true, Session.SESSION_TRANSACTED);”語句。第一個(gè)參數(shù)表示連接會(huì)話啟用事務(wù)支持;第二個(gè)參數(shù)表示使用commit或者rollback的方式進(jìn)行向服務(wù)器應(yīng)答。
這是調(diào)用commit的情況,那么如果調(diào)用rollback方法又會(huì)發(fā)生什么情況呢?調(diào)用rollback方法時(shí),在rollback之前已處理過的消息(注意,并不是所有預(yù)取的消息)將重新發(fā)送一次到消費(fèi)者端(發(fā)送給同一個(gè)連接會(huì)話)。并且消息中redeliveryCounter(重發(fā)計(jì)數(shù)器)屬性將會(huì)加1。請看如下所示的代碼片段和運(yùn)行結(jié)果:
@Overridepublic void onMessage(Message message) { // 打印這條消息 System.out.println("Message = " + message); // rollback這條消息 this.session.rollback();}
以上代碼片段中,我們不停的回滾正在處理的這條消息,通過打印出來的信息可以看到,這條消息被不停的重發(fā):
Message = ActiveMQTextMessage {...... redeliveryCounter = 0, text = 這是發(fā)送的消息內(nèi)容-------------------20}Message = ActiveMQTextMessage {...... redeliveryCounter = 1, text = 這是發(fā)送的消息內(nèi)容-------------------20}Message = ActiveMQTextMessage {...... redeliveryCounter = 2, text = 這是發(fā)送的消息內(nèi)容-------------------20}Message = ActiveMQTextMessage {...... redeliveryCounter = 3, text = 這是發(fā)送的消息內(nèi)容-------------------20}Message = ActiveMQTextMessage {...... redeliveryCounter = 4, text = 這是發(fā)送的消息內(nèi)容-------------------20}
可以看到同一條記錄被重復(fù)的處理,并且其中的redeliveryCounter屬性不斷累加。
但是消息處理失敗后,不斷的重發(fā)消息肯定不是一個(gè)最好的處理辦法:如果一條消息被不斷的處理失敗,那么最可能的情況就是這條消息承載的業(yè)務(wù)內(nèi)容本身就有問題。那么無論重發(fā)多少次,這條消息還是會(huì)處理失敗。
為了解決這個(gè)問題,ActiveMQ中引入了“死信隊(duì)列”(Dead Letter Queue)的概念。即一條消息再被重發(fā)了多次后(默認(rèn)為重發(fā)6次redeliveryCounter==6),將會(huì)被ActiveMQ移入“死信隊(duì)列”。開發(fā)人員可以在這個(gè)Queue中查看處理出錯(cuò)的消息,進(jìn)行人工干預(yù)。
默認(rèn)情況下“死信隊(duì)列”只接受PERSISTENT Message,如果NON_PERSISTENT Message超過了重發(fā)上限,將直接被刪除。以下配置信息可以讓NON_PERSISTENT Message在超過重發(fā)上限后,也移入“死信隊(duì)列”:
<policyEntry queue=">"> <deadLetterStrategy> <sharedDeadLetterStrategy processNonPersistent="true" /> </deadLetterStrategy> </policyEntry>
另外,上文提到的默認(rèn)重發(fā)次數(shù)redeliveryCounter的上限也是可以進(jìn)行設(shè)置的,為了保證消息異常情況下盡可能小的影響消費(fèi)者端的處理效率,實(shí)際工作中建議將這個(gè)上限值設(shè)置為3。原因上文已經(jīng)說過,如果消息本身的業(yè)務(wù)內(nèi)容就存在問題,那么重發(fā)多少次也沒有用。
RedeliveryPolicy redeliveryPolicy = connectionFactory.getRedeliveryPolicy();// 設(shè)置最大重發(fā)次數(shù)redeliveryPolicy.setMaximumRedeliveries(3);
實(shí)際上ActiveMQ的重發(fā)機(jī)制還有包括以上提到的rollback方式在內(nèi)的多種方式:
但是以上幾種重發(fā)機(jī)制有一些小小的差異,主要體現(xiàn)在redeliveryCounter屬性的作用區(qū)域。簡而言之,第一種方法redeliveryCounter屬性的作用區(qū)域是本次連接會(huì)話,而后兩種redeliveryCounter屬性的作用區(qū)域是在整個(gè)ActiveMQ系統(tǒng)范圍。
消費(fèi)者端,除了可以使用事務(wù)方式來告知ActiveMQ服務(wù)端一批消息已經(jīng)成功處理外,還可以通過JMS規(guī)范中定義的acknowledge模式來實(shí)現(xiàn)同樣功能。事實(shí)上acknowledge模式更為常用。
如果選擇使用acknowledge模式,那么你至少有4種方式使用它,且這四種方式的性能區(qū)別很大:
AUTO_ACKNOWLEDGE方式:這種方式下,當(dāng)消費(fèi)者端通過receive方法或者M(jìn)essageListener監(jiān)聽方式從服務(wù)端得到消息后(無論是pul方式還是push方式),消費(fèi)者連接會(huì)話會(huì)自動(dòng)認(rèn)為消費(fèi)者端對消息的處理是成功的。但請注意,這種方式下消費(fèi)者端不一定是向服務(wù)端一條一條ACK消息;
CLIENT_ACKNOWLEDGE方式:這種方式下,當(dāng)消費(fèi)者端通過receive方法或者M(jìn)essageListener監(jiān)聽方式從服務(wù)端得到消息后(無論是pul方式還是push方式),必須顯示調(diào)用消息中的acknowledge方法。如果不這樣做,ActiveMQ服務(wù)器端將不會(huì)認(rèn)為這條消息處理成功:
public void onMessage(Message message) { //==================== //這里進(jìn)行您的業(yè)務(wù)處理 //==================== try { // 顯示調(diào)用ack方法 message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); }}
DUPS_OK_ACKNOWLEDGE方式:批量確認(rèn)方式。消費(fèi)者端會(huì)按照一定的策略向服務(wù)器端間隔發(fā)送一個(gè)ack標(biāo)示,表示某一批消息已經(jīng)處理完成。DUPS_OK_ACKNOWLEDGE方式和 AUTO_ACKNOWLEDGE方式在某些情況下是一致的,這個(gè)在后文會(huì)講到;
INDIVIDUAL_ACKNOWLEDGE方式:單條確認(rèn)方式。這種方式是ActiveMQ單獨(dú)提供的一種方式,其常量定義的位置都不在javax.jms.Session規(guī)范接口中,而是在org.apache.activemq.ActiveMQSession這個(gè)類中。這種方式消費(fèi)者端將會(huì)逐條向ActiveMQ服務(wù)端發(fā)送ACK信息。所以這種ACK方式的性能很差,除非您有特別的業(yè)務(wù)要求,否則不建議使用。
筆者建議首先考慮使用AUTO_ACKNOWLEDGE方式確認(rèn)消息,如果您這樣做,那么一定請使用optimizeACK優(yōu)化選項(xiàng),并且重新設(shè)置prefetchSize數(shù)量為一個(gè)較小值(因?yàn)?000條的默認(rèn)值在這樣的情況下就顯得比較大了):
......//ack優(yōu)化選項(xiàng)(實(shí)際上默認(rèn)情況下是開啟的)connectionFactory.setOptimizeAcknowledge(true);//ack信息最大發(fā)送周期(毫秒)connectionFactory.setOptimizeAcknowledgeTimeOut(5000);connection = connectionFactory.createQueueConnection();connection.start();......
AUTO_ACKNOWLEDGE方式的根本意義是“延遲確認(rèn)”,消費(fèi)者端在處理消息后暫時(shí)不會(huì)發(fā)送ACK標(biāo)示,而是把它緩存在連接會(huì)話的一個(gè)pending 區(qū)域,等到這些消息的條數(shù)達(dá)到一定的值(或者等待時(shí)間超過設(shè)置的值),再通過一個(gè)ACK指令告知服務(wù)端這一批消息已經(jīng)處理完成;而optimizeACK選項(xiàng)(指明AUTO_ACKNOWLEDGE采用“延遲確認(rèn)”方式)只有當(dāng)消費(fèi)者端使用AUTO_ACKNOWLEDGE方式時(shí)才會(huì)起效:
“延遲確認(rèn)”的數(shù)量閥值:prefetch * 0.65
“延遲確認(rèn)”的時(shí)間閥值:> optimizeAcknowledgeTimeOut
DUPS_OK_ACKNOWLEDGE方式也是一種“延遲確認(rèn)”策略,如果目標(biāo)隊(duì)列是Queue模式,那么它的工作策略與AUTO_ACKNOWLEDGE方式是一樣的。也就是說,如果這時(shí)prefetchSize =1 或者沒有開啟optimizeACK,也會(huì)逐條消息發(fā)送ACK標(biāo)示;如果目標(biāo)隊(duì)列是Topic模式,那么無論optimizeACK是否開啟,都會(huì)在消費(fèi)的消息個(gè)數(shù)>=prefetch * 0.5時(shí),批量確認(rèn)這些消息。