国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
ActiveMQ系列—ActiveMQ性能優(yōu)化(中2)(處理規(guī)則和優(yōu)化)

4、消費(fèi)者策略:Dispatch Async

討論完了消息生產(chǎn)者的關(guān)鍵性能點(diǎn),我們再將目光轉(zhuǎn)向消息消費(fèi)者(接收者端);就像本小節(jié)開始時(shí)描述的那樣,比起消息生產(chǎn)者來說消息消費(fèi)者的性能更能影響ActiveMQ系統(tǒng)的整體性能,因?yàn)橐晒ν瓿梢粭l消息的處理,它的工作要遠(yuǎn)遠(yuǎn)多于消息生產(chǎn)者。

首先,在默認(rèn)情況下ActiveMQ服務(wù)端采用異步方式向客戶端推送消息。也就是說ActiveMQ服務(wù)端在向某個(gè)消費(fèi)者會(huì)話推送消息后,不會(huì)等待消費(fèi)者的響應(yīng)信息,直到消費(fèi)者處理完消息后,主動(dòng)向服務(wù)端返回處理結(jié)果。如果您對自己的消費(fèi)者性能足夠滿意,也可以將這個(gè)過程設(shè)置為“同步”:

......// 設(shè)置為同步connectionFactory.setDispatchAsync(false);......
  • 1
  • 2
  • 3
  • 4

5、消費(fèi)者策略:Prefetch

消費(fèi)者關(guān)鍵策略中,需要重點(diǎn)討論的是消費(fèi)者“預(yù)取數(shù)量”——prefetchSize。可以想象,如果消費(fèi)者端的工作策略是按照某個(gè)周期(例如1秒),主動(dòng)到服務(wù)器端一條一條請求新的消息,那么消費(fèi)者的工作效率一定是極低的;所以ActiveMQ系統(tǒng)中,默認(rèn)的策略是ActiveMQ服務(wù)端一旦有消息,就主動(dòng)按照設(shè)置的規(guī)則推送給當(dāng)前活動(dòng)的消費(fèi)者。其中每次推送都有一定的數(shù)量限制,這個(gè)限制值就是prefetchSize。

針對Queue工作模型的隊(duì)列和Topic工作模型的隊(duì)列,ActiveMQ有不同的默認(rèn)“預(yù)取數(shù)量”;針對NON_PERSISTENT Message和PERSISTENT Message,ActiveMQ也有不同的默認(rèn)“預(yù)取數(shù)量”:

  • PERSISTENT Message—Queue:prefetchSize=1000
  • NON_PERSISTENT Message—Queue:prefetchSize=1000
  • PERSISTENT Message—Topic:prefetchSize=100
  • NON_PERSISTENT Message—Topic:prefetchSize=32766

ActiveMQ中設(shè)置的各種默認(rèn)預(yù)取數(shù)量一般情況下不需要進(jìn)行改變。如果您使用默認(rèn)的異步方式從服務(wù)器端推送消息到消費(fèi)者端,且您對消費(fèi)者端的性能有足夠的信心,可以加大預(yù)取數(shù)量的限制。但是非必要情況下,請不要設(shè)置prefetchSize=1,因?yàn)檫@樣就是一條一條的取數(shù)據(jù);也不要設(shè)置為prefetchSize=0,因?yàn)檫@將導(dǎo)致關(guān)閉服務(wù)器端的推送機(jī)制,改為客戶端主動(dòng)請求。

  • 可以通過ActiveMQPrefetchPolicy策略對象更改預(yù)取數(shù)量
......// 預(yù)取策略對象ActiveMQPrefetchPolicy prefetchPolicy = connectionFactory.getPrefetchPolicy();// 設(shè)置Queue的預(yù)取數(shù)量為50prefetchPolicy.setQueuePrefetch(50);connectionFactory.setPrefetchPolicy(prefetchPolicy);//進(jìn)行連接connection = connectionFactory.createQueueConnection();connection.start();......
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 也可以通過Properties屬性更改(當(dāng)然還可以加入其他屬性)預(yù)取數(shù)量:
......Properties props = new Properties();props.setProperty("prefetchPolicy.queuePrefetch", "1000");props.setProperty("prefetchPolicy.topicPrefetch", "1000");//設(shè)置屬性connectionFactory.setProperties(props);//進(jìn)行連接connection = connectionFactory.createQueueConnection();connection.start();......
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

6、消費(fèi)者策略:事務(wù)和死信

6.1、消費(fèi)者端事務(wù)

JMS規(guī)范除了為消息生產(chǎn)者端提供事務(wù)支持以外,還為消費(fèi)服務(wù)端準(zhǔn)備了事務(wù)的支持。您可以通過在消費(fèi)者端操作事務(wù)的commit和rollback方法,向服務(wù)器告知一組消息是否處理完成。采用事務(wù)的意義在于,一組消息要么被全部處理并確認(rèn)成功,要么全部被回滾并重新處理。

......//建立會(huì)話(采用commit方式確認(rèn)一批消息處理完畢)session = connection.createSession(true, Session.SESSION_TRANSACTED);//建立Queue(當(dāng)然如果有了就不會(huì)重復(fù)建立)sendQueue = session.createQueue("/test");//建立消息發(fā)送者對象MessageConsumer consumer = session.createConsumer(sendQueue);consumer.setMessageListener(new MyMessageListener(session));......class MyMessageListener implements MessageListener {    private int number = 0;    /**     * 會(huì)話     */    private Session session;    public MyMessageListener(Session session) {        this.session = session;    }    @Override    public void onMessage(Message message) {        // 打印這條消息        System.out.println("Message = " + message);        // 如果條件成立,就向服務(wù)器確認(rèn)這批消息處理成功        // 服務(wù)器將從隊(duì)列中刪除這些消息        if(number++ % 3 == 0) {            try {                this.session.commit();            } catch (JMSException e) {                e.printStackTrace(System.out);            }        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

以上代碼演示的是消費(fèi)者通過事務(wù)commit的方式,向服務(wù)器確認(rèn)一批消息正常處理完成的方式。請注意代碼示例中的“session = connection.createSession(true, Session.SESSION_TRANSACTED);”語句。第一個(gè)參數(shù)表示連接會(huì)話啟用事務(wù)支持;第二個(gè)參數(shù)表示使用commit或者rollback的方式進(jìn)行向服務(wù)器應(yīng)答。

這是調(diào)用commit的情況,那么如果調(diào)用rollback方法又會(huì)發(fā)生什么情況呢?調(diào)用rollback方法時(shí),在rollback之前已處理過的消息(注意,并不是所有預(yù)取的消息)將重新發(fā)送一次到消費(fèi)者端(發(fā)送給同一個(gè)連接會(huì)話)。并且消息中redeliveryCounter(重發(fā)計(jì)數(shù)器)屬性將會(huì)加1。請看如下所示的代碼片段和運(yùn)行結(jié)果:

@Overridepublic void onMessage(Message message) {    // 打印這條消息    System.out.println("Message = " + message);    // rollback這條消息    this.session.rollback();}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

以上代碼片段中,我們不停的回滾正在處理的這條消息,通過打印出來的信息可以看到,這條消息被不停的重發(fā):

Message = ActiveMQTextMessage {...... redeliveryCounter = 0, text = 這是發(fā)送的消息內(nèi)容-------------------20}Message = ActiveMQTextMessage {...... redeliveryCounter = 1, text = 這是發(fā)送的消息內(nèi)容-------------------20}Message = ActiveMQTextMessage {...... redeliveryCounter = 2, text = 這是發(fā)送的消息內(nèi)容-------------------20}Message = ActiveMQTextMessage {...... redeliveryCounter = 3, text = 這是發(fā)送的消息內(nèi)容-------------------20}Message = ActiveMQTextMessage {...... redeliveryCounter = 4, text = 這是發(fā)送的消息內(nèi)容-------------------20}
  • 1
  • 2
  • 3
  • 4
  • 5

可以看到同一條記錄被重復(fù)的處理,并且其中的redeliveryCounter屬性不斷累加。

6.2、重發(fā)和死信隊(duì)列

但是消息處理失敗后,不斷的重發(fā)消息肯定不是一個(gè)最好的處理辦法:如果一條消息被不斷的處理失敗,那么最可能的情況就是這條消息承載的業(yè)務(wù)內(nèi)容本身就有問題。那么無論重發(fā)多少次,這條消息還是會(huì)處理失敗。

為了解決這個(gè)問題,ActiveMQ中引入了“死信隊(duì)列”(Dead Letter Queue)的概念。即一條消息再被重發(fā)了多次后(默認(rèn)為重發(fā)6次redeliveryCounter==6),將會(huì)被ActiveMQ移入“死信隊(duì)列”。開發(fā)人員可以在這個(gè)Queue中查看處理出錯(cuò)的消息,進(jìn)行人工干預(yù)。

默認(rèn)情況下“死信隊(duì)列”只接受PERSISTENT Message,如果NON_PERSISTENT Message超過了重發(fā)上限,將直接被刪除。以下配置信息可以讓NON_PERSISTENT Message在超過重發(fā)上限后,也移入“死信隊(duì)列”:

<policyEntry queue=">">      <deadLetterStrategy>          <sharedDeadLetterStrategy processNonPersistent="true" />      </deadLetterStrategy>  </policyEntry>
  • 1
  • 2
  • 3
  • 4
  • 5

另外,上文提到的默認(rèn)重發(fā)次數(shù)redeliveryCounter的上限也是可以進(jìn)行設(shè)置的,為了保證消息異常情況下盡可能小的影響消費(fèi)者端的處理效率,實(shí)際工作中建議將這個(gè)上限值設(shè)置為3。原因上文已經(jīng)說過,如果消息本身的業(yè)務(wù)內(nèi)容就存在問題,那么重發(fā)多少次也沒有用。

RedeliveryPolicy redeliveryPolicy = connectionFactory.getRedeliveryPolicy();// 設(shè)置最大重發(fā)次數(shù)redeliveryPolicy.setMaximumRedeliveries(3);
  • 1
  • 2
  • 3

實(shí)際上ActiveMQ的重發(fā)機(jī)制還有包括以上提到的rollback方式在內(nèi)的多種方式:

  • 在支持事務(wù)的消費(fèi)者連接會(huì)話中調(diào)用rollback方法;
  • 在支持事務(wù)的消費(fèi)者連接會(huì)話中,使用commit方法明確告知服務(wù)器端消息已處理成功前,會(huì)話連接就終止了(最可能是異常終止);
  • 在需要使用ACK模式的會(huì)話中,使用消息的acknowledge方式明確告知服務(wù)器端消息已處理成功前,會(huì)話連接就終止了(最可能是異常終止)。

但是以上幾種重發(fā)機(jī)制有一些小小的差異,主要體現(xiàn)在redeliveryCounter屬性的作用區(qū)域。簡而言之,第一種方法redeliveryCounter屬性的作用區(qū)域是本次連接會(huì)話,而后兩種redeliveryCounter屬性的作用區(qū)域是在整個(gè)ActiveMQ系統(tǒng)范圍。

7、消費(fèi)者策略:ACK

消費(fèi)者端,除了可以使用事務(wù)方式來告知ActiveMQ服務(wù)端一批消息已經(jīng)成功處理外,還可以通過JMS規(guī)范中定義的acknowledge模式來實(shí)現(xiàn)同樣功能。事實(shí)上acknowledge模式更為常用。

7.1、基本使用

如果選擇使用acknowledge模式,那么你至少有4種方式使用它,且這四種方式的性能區(qū)別很大:

  • AUTO_ACKNOWLEDGE方式:這種方式下,當(dāng)消費(fèi)者端通過receive方法或者M(jìn)essageListener監(jiān)聽方式從服務(wù)端得到消息后(無論是pul方式還是push方式),消費(fèi)者連接會(huì)話會(huì)自動(dòng)認(rèn)為消費(fèi)者端對消息的處理是成功的。但請注意,這種方式下消費(fèi)者端不一定是向服務(wù)端一條一條ACK消息;

  • CLIENT_ACKNOWLEDGE方式:這種方式下,當(dāng)消費(fèi)者端通過receive方法或者M(jìn)essageListener監(jiān)聽方式從服務(wù)端得到消息后(無論是pul方式還是push方式),必須顯示調(diào)用消息中的acknowledge方法。如果不這樣做,ActiveMQ服務(wù)器端將不會(huì)認(rèn)為這條消息處理成功:

public void onMessage(Message message) {    //====================    //這里進(jìn)行您的業(yè)務(wù)處理    //====================    try {        // 顯示調(diào)用ack方法        message.acknowledge();    } catch (JMSException e) {        e.printStackTrace();    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • DUPS_OK_ACKNOWLEDGE方式:批量確認(rèn)方式。消費(fèi)者端會(huì)按照一定的策略向服務(wù)器端間隔發(fā)送一個(gè)ack標(biāo)示,表示某一批消息已經(jīng)處理完成。DUPS_OK_ACKNOWLEDGE方式和 AUTO_ACKNOWLEDGE方式在某些情況下是一致的,這個(gè)在后文會(huì)講到;

  • INDIVIDUAL_ACKNOWLEDGE方式:單條確認(rèn)方式。這種方式是ActiveMQ單獨(dú)提供的一種方式,其常量定義的位置都不在javax.jms.Session規(guī)范接口中,而是在org.apache.activemq.ActiveMQSession這個(gè)類中。這種方式消費(fèi)者端將會(huì)逐條向ActiveMQ服務(wù)端發(fā)送ACK信息。所以這種ACK方式的性能很差,除非您有特別的業(yè)務(wù)要求,否則不建議使用。

7.2、工作方式和性能

筆者建議首先考慮使用AUTO_ACKNOWLEDGE方式確認(rèn)消息,如果您這樣做,那么一定請使用optimizeACK優(yōu)化選項(xiàng),并且重新設(shè)置prefetchSize數(shù)量為一個(gè)較小值(因?yàn)?000條的默認(rèn)值在這樣的情況下就顯得比較大了):

......//ack優(yōu)化選項(xiàng)(實(shí)際上默認(rèn)情況下是開啟的)connectionFactory.setOptimizeAcknowledge(true);//ack信息最大發(fā)送周期(毫秒)connectionFactory.setOptimizeAcknowledgeTimeOut(5000);connection = connectionFactory.createQueueConnection();connection.start();......
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

AUTO_ACKNOWLEDGE方式的根本意義是“延遲確認(rèn)”,消費(fèi)者端在處理消息后暫時(shí)不會(huì)發(fā)送ACK標(biāo)示,而是把它緩存在連接會(huì)話的一個(gè)pending 區(qū)域,等到這些消息的條數(shù)達(dá)到一定的值(或者等待時(shí)間超過設(shè)置的值),再通過一個(gè)ACK指令告知服務(wù)端這一批消息已經(jīng)處理完成;而optimizeACK選項(xiàng)(指明AUTO_ACKNOWLEDGE采用“延遲確認(rèn)”方式)只有當(dāng)消費(fèi)者端使用AUTO_ACKNOWLEDGE方式時(shí)才會(huì)起效:

“延遲確認(rèn)”的數(shù)量閥值:prefetch * 0.65
“延遲確認(rèn)”的時(shí)間閥值:> optimizeAcknowledgeTimeOut

DUPS_OK_ACKNOWLEDGE方式也是一種“延遲確認(rèn)”策略,如果目標(biāo)隊(duì)列是Queue模式,那么它的工作策略與AUTO_ACKNOWLEDGE方式是一樣的。也就是說,如果這時(shí)prefetchSize =1 或者沒有開啟optimizeACK,也會(huì)逐條消息發(fā)送ACK標(biāo)示;如果目標(biāo)隊(duì)列是Topic模式,那么無論optimizeACK是否開啟,都會(huì)在消費(fèi)的消息個(gè)數(shù)>=prefetch * 0.5時(shí),批量確認(rèn)這些消息。

本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊舉報(bào)。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
Activemq消息確認(rèn)機(jī)制
Java消息服務(wù)-JMS 確認(rèn)和事務(wù)【面試+工作】
ActiveMQ分享(一)JMS簡介
優(yōu)化ActiveMQ性能
ActiveMQ入門篇
用ActiveMQ遇到的消息確認(rèn)問題
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服