国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
Activemq消息確認機制
AcitveMQ是作為一種消息存儲和分發(fā)組件,涉及到client與broker端數(shù)據(jù)交互的方方面面,它不僅要擔保消息的存儲安全性,還要提供額外的手段來確保消息的分發(fā)是可靠的。
一. ActiveMQ消息傳送機制
Producer客戶端使用來發(fā)送消息的, Consumer客戶端用來消費消息;它們的協(xié)同中心就是ActiveMQ broker,broker也是讓producer和consumer調(diào)用過程解耦的工具,最終實現(xiàn)了異步RPC/數(shù)據(jù)交換的功能。隨著ActiveMQ的不斷發(fā)展,支持了越來越多的特性,也解決開發(fā)者在各種場景下使用ActiveMQ的需求。比如producer支持異步調(diào)用;使用flow control機制讓broker協(xié)同consumer的消費速率;consumer端可以使用prefetchACK來最大化消息消費的速率;提供"重發(fā)策略"等來提高消息的安全性等。在此我們不詳細介紹。
一條消息的生命周期如下:
圖片中簡單的描述了一條消息的生命周期,不過在不同的架構(gòu)環(huán)境中,message的流動行可能更加復(fù)雜.將在稍后有關(guān)broker的架構(gòu)中詳解..一條消息從producer端發(fā)出之后,一旦被broker正確保存,那么它將會被consumer消費,然后ACK,broker端才會刪除;不過當消息過期或者存儲設(shè)備溢出時,也會終結(jié)它。
這是一張很復(fù)雜,而且有些凌亂的圖片;這張圖片中簡單的描述了:1)producer端如何發(fā)送消息 2) consumer端如何消費消息 3) broker端如何調(diào)度。如果用文字來描述圖示中的概念,恐怕一言難盡。圖示中,提及到prefetchAck,以及消息同步、異步發(fā)送的基本邏輯;這對你了解下文中的ACK機制將有很大的幫助。
二. optimizeACK
"可優(yōu)化的ACK",這是ActiveMQ對于consumer在消息消費時,對消息ACK的優(yōu)化選項,也是consumer端最重要的優(yōu)化參數(shù)之一,你可以通過如下方式開啟:
1) 在brokerUrl中增加如下查詢字符串:
[java] view plaincopy
String brokerUrl = "tcp://localhost:61616?" +
"jms.optimizeAcknowledge=true" +
"&jms.optimizeAcknowledgeTimeOut=30000" +
"&jms.redeliveryPolicy.maximumRedeliveries=6";
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
2) 在destinationUri中,增加如下查詢字符串:
[java] view plaincopy
String queueName = "test-queue?customer.prefetchSize";
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue(queueName);
我們需要在brokerUrl指定optimizeACK選項,在destinationUri中指定prefetchSize(預(yù)獲取)選項,其中brokerUrl參數(shù)選項是全局的,即當前factory下所有的connection/session/consumer都會默認使用這些值;而destinationUri中的選項,只會在使用此destination的consumer實例中有效;如果同時指定,brokerUrl中的參數(shù)選項值將會被覆蓋。optimizeAck表示是否開啟“優(yōu)化ACK”,只有在為true的情況下,prefetchSize(下文中將會簡寫成prefetch)以及optimizeAcknowledgeTimeout參數(shù)才會有意義。此處需要注意"optimizeAcknowledgeTimeout"選項只能在brokerUrl中配置。
prefetch值建議在destinationUri中指定,因為在brokerUrl中指定比較繁瑣;在brokerUrl中,queuePrefetchSize和topicPrefetchSize都需要單獨設(shè)定:"&jms.prefetchPolicy.queuePrefetch=12&jms.prefetchPolicy.topicPrefetch=12"等來逐個指定。
如果prefetchACK為true,那么prefetch必須大于0;當prefetchACK為false時,你可以指定prefetch為0以及任意大小的正數(shù)。不過,當prefetch=0是,表示consumer將使用PULL(拉取)的方式從broker端獲取消息,broker端將不會主動push消息給client端,直到client端發(fā)送PullCommand時;當prefetch>0時,就開啟了broker push模式,此后只要當client端消費且ACK了一定的消息之后,會立即push給client端多條消息。
當consumer端使用receive()方法同步獲取消息時,prefetch可以為0和任意正值;當prefetch=0時,那么receive()方法將會首先發(fā)送一個PULL指令并阻塞,直到broker端返回消息為止,這也意味著消息只能逐個獲取(類似于Request<->Response),這也是Activemq中PULL消息模式;當prefetch > 0時,broker端將會批量push給client 一定數(shù)量的消息(<= prefetch),client端會把這些消息(unconsumedMessage)放入到本地的隊列中,只要此隊列有消息,那么receive方法將會立即返回,當一定量的消息ACK之后,broker端會繼續(xù)批量push消息給client端。
當consumer端使用MessageListener異步獲取消息時,這就需要開發(fā)設(shè)定的prefetch值必須 >=1,即至少為1;在異步消費消息模式中,設(shè)定prefetch=0,是相悖的,也將獲得一個Exception。
此外,我們還可以brokerUrl中配置“redelivery”策略,比如當一條消息處理異常時,broker端可以重發(fā)的最大次數(shù);和下文中提到REDELIVERED_ACK_TYPE互相協(xié)同。當消息需要broker端重發(fā)時,consumer會首先在本地的“deliveredMessage隊列”(Consumer已經(jīng)接收但還未確認的消息隊列)刪除它,然后向broker發(fā)送“REDELIVERED_ACK_TYPE”類型的確認指令,broker將會把指令中指定的消息重新添加到pendingQueue(亟待發(fā)送給consumer的消息隊列)中,直到合適的時機,再次push給client。
到目前為止,或許你知道了optimizeACK和prefeth的大概意義,不過我們可能還會有些疑惑?。ptimizeACK和prefetch配合,將會達成一個高效的消息消費模型:批量獲取消息,并“延遲”確認(ACK)。prefetch表達了“批量獲取”消息的語義,broker端主動的批量push多條消息給client端,總比client多次發(fā)送PULL指令然后broker返回一條消息的方式要優(yōu)秀很多,它不僅減少了client端在獲取消息時阻塞的次數(shù)和阻塞的時間,還能夠大大的減少網(wǎng)絡(luò)開支。optimizeACK表達了“延遲確認”的語義(ACK時機),client端在消費消息后暫且不發(fā)送ACK,而是把它緩存下來(pendingACK),等到這些消息的條數(shù)達到一定閥值時,只需要通過一個ACK指令把它們?nèi)看_認;這比對每條消息都逐個確認,在性能上要提高很多。由此可見,prefetch優(yōu)化了消息傳送的性能,optimizeACK優(yōu)化了消息確認的性能。
當consumer端消息消費的速率很高(相對于producer生產(chǎn)消息),而且消息的數(shù)量也很大時(比如消息源源不斷的生產(chǎn)),我們使用optimizeACK + prefetch將會極大的提升consumer的性能。不過反過來:
1) 如果consumer端消費速度很慢(對消息的處理是耗時的),過大的prefetchSize,并不能有效的提升性能,反而不利于consumer端的負載均衡(只針對queue);按照良好的設(shè)計準則,當consumer消費速度很慢時,我們通常會部署多個consumer客戶端,并使用較小的prefetch,同時關(guān)閉optimizeACK,可以讓消息在多個consumer間“負載均衡”(即均勻的發(fā)送給每個consumer);如果較大的prefetchSize,將會導(dǎo)致broker一次性push給client大量的消息,但是這些消息需要很久才能ACK(消息積壓),而且在client故障時,還會導(dǎo)致這些消息的重發(fā)。
2) 如果consumer端消費速度很快,但是producer端生成消息的速率較慢,比如生產(chǎn)者10秒鐘生成10條消息,但是consumer一秒就能消費完畢,而且我們還部署了多個consumer?。∵@種場景下,建議開啟optimizeACK,但是需要設(shè)置較小的prefetchSize;這樣可以保證每個consumer都能有"活干",否則將會出現(xiàn)一個consumer非常忙碌,但是其他consumer幾乎收不到消息。
3) 如果消息很重要,特別是不原因接收到”redelivery“的消息,那么我們需要將optimizeACK=false,prefetchSize=1
既然optimizeACK是”延遲“確認,那么就引入一種潛在的風(fēng)險:在消息被消費之后還沒有來得及確認時,client端發(fā)生故障,那么這些消息就有可能會被重新發(fā)送給其他consumer,那么這種風(fēng)險就需要client端能夠容忍“重復(fù)”消息。
prefetch值默認為1000,當然這個值可能在很多場景下是偏大的;我們暫且不考慮ACK_MODE(參見下文),通常情況下,我們只需要簡單的統(tǒng)計出單個consumer每秒的最大消費消息數(shù)即可,比如一個consumer每秒可以處理100個消息,我們期望consumer端每2秒確認一次,那么我們的prefetchSize可以設(shè)置為100 * 2 /0.65大概為300。無論如何設(shè)定此值,client持有的消息條數(shù)最大為:prefetch + “DELIVERED_ACK_TYPE消息條數(shù)”(DELIVERED_ACK_TYPE參見下文)
即使當optimizeACK為true,也只會當session的ACK_MODE為AUTO_ACKNOWLEDGE時才會生效,即在其他類型的ACK_MODE時consumer端仍然不會“延遲確認”,即:
[java] view plaincopy
consumer.optimizeAck = connection.optimizeACK && session.isAutoAcknowledge()
當consumer.optimizeACK有效時,如果客戶端已經(jīng)消費但尚未確認的消息(deliveredMessage)達到prefetch * 0.65,consumer端將會自動進行ACK;同時如果離上一次ACK的時間間隔,已經(jīng)超過"optimizeAcknowledgeTimout"毫秒,也會導(dǎo)致自動進行ACK。
此外簡單的補充一下,批量確認消息時,只需要在ACK指令中指明“firstMessageId”和“l(fā)astMessageId”即可,即消息區(qū)間,那么broker端就知道此consumer(根據(jù)consumerId識別)需要確認哪些消息。
三. ACK模式與類型介紹
JMS API中約定了Client端可以使用四種ACK_MODE,在javax.jms.Session接口中:
AUTO_ACKNOWLEDGE = 1    自動確認
CLIENT_ACKNOWLEDGE = 2    客戶端手動確認
DUPS_OK_ACKNOWLEDGE = 3    自動批量確認
SESSION_TRANSACTED = 0    事務(wù)提交并確認
此外AcitveMQ補充了一個自定義的ACK_MODE:
INDIVIDUAL_ACKNOWLEDGE = 4    單條消息確認
我們在開發(fā)JMS應(yīng)用程序的時候,會經(jīng)常使用到上述ACK_MODE,其中"INDIVIDUAL_ACKNOWLEDGE "只有ActiveMQ支持,當然開發(fā)者也可以使用它. ACK_MODE描述了Consumer與broker確認消息的方式(時機),比如當消息被Consumer接收之后,Consumer將在何時確認消息。對于broker而言,只有接收到ACK指令,才會認為消息被正確的接收或者處理成功了,通過ACK,可以在consumer與Broker之間建立一種簡單的“擔?!睓C制.
Client端指定了ACK_MODE,但是在Client與broker在交換ACK指令的時候,還需要告知ACK_TYPE,ACK_TYPE表示此確認指令的類型,不同的ACK_TYPE將傳遞著消息的狀態(tài),broker可以根據(jù)不同的ACK_TYPE對消息進行不同的操作。
比如Consumer消費消息時出現(xiàn)異常,就需要向broker發(fā)送ACK指令,ACK_TYPE為"REDELIVERED_ACK_TYPE",那么broker就會重新發(fā)送此消息。在JMS API中并沒有定義ACT_TYPE,因為它通常是一種內(nèi)部機制,并不會面向開發(fā)者。ActiveMQ中定義了如下幾種ACK_TYPE(參看MessageAck類):
DELIVERED_ACK_TYPE = 0    消息"已接收",但尚未處理結(jié)束
STANDARD_ACK_TYPE = 2    "標準"類型,通常表示為消息"處理成功",broker端可以刪除消息了
POSION_ACK_TYPE = 1    消息"錯誤",通常表示"拋棄"此消息,比如消息重發(fā)多次后,都無法正確處理時,消息將會被刪除或者DLQ(死信隊列)
REDELIVERED_ACK_TYPE = 3    消息需"重發(fā)",比如consumer處理消息時拋出了異常,broker稍后會重新發(fā)送此消息
INDIVIDUAL_ACK_TYPE = 4    表示只確認"單條消息",無論在任何ACK_MODE下
UNMATCHED_ACK_TYPE = 5    BROKER間轉(zhuǎn)發(fā)消息時,接收端"拒絕"消息
到目前為止,我們已經(jīng)清楚了大概的原理: Client端在不同的ACK_MODE時,將意味著在不同的時機發(fā)送ACK指令,每個ACK Command中會包含ACK_TYPE,那么broker端就可以根據(jù)ACK_TYPE來決定此消息的后續(xù)操作. 接下來,我們詳細的分析ACK_MODE與ACK_TYPE.
[java] view plaincopy
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
我們需要在創(chuàng)建Session時指定ACK_MODE,由此可見,ACK_MODE將是session共享的,意味著一個session下所有的 consumer都使用同一種ACK_MODE。在創(chuàng)建Session時,開發(fā)者不能指定除ACK_MODE列表之外的其他值.如果此session為事務(wù)類型,用戶指定的ACK_MODE將被忽略,而強制使用"SESSION_TRANSACTED"類型;如果session非事務(wù)類型時,也將不能將 ACK_MODE設(shè)定為"SESSION_TRANSACTED",畢竟這是相悖的.
Consumer消費消息的風(fēng)格有2種: 同步/異步..使用consumer.receive()就是同步,使用messageListener就是異步;在同一個consumer中,我們不能使用使用這2種風(fēng)格,比如在使用listener的情況下,當調(diào)用receive()方法將會獲得一個Exception。兩種風(fēng)格下,消息確認時機有所不同。
"同步"偽代碼:
[java] view plaincopy
//receive偽代碼---過程
Message message = sessionMessageQueue.dequeue();
if(message != null){
ack(message);
}
return message
同步調(diào)用時,在消息從receive方法返回之前,就已經(jīng)調(diào)用了ACK;因此如果Client端沒有處理成功,此消息將丟失(可能重發(fā),與ACK_MODE有關(guān))。
"異步"偽代碼:
[java] view plaincopy
//基于listener
Session session = connection.getSession(consumerId);
sessionQueueBuffer.enqueue(message);
Runnable runnable = new Ruannale(){
run(){
Consumer consumer = session.getConsumer(consumerId);
Message md = sessionQueueBuffer.dequeue();
try{
consumer.messageListener.onMessage(md);
ack(md);//
}catch(Exception e){
redelivery();//sometime,not all the time;
}
}
//session中將采取線程池的方式,分發(fā)異步消息
//因此同一個session中多個consumer可以并行消費
threadPool.execute(runnable);
基于異步調(diào)用時,消息的確認是在onMessage方法返回之后,如果onMessage方法異常,會導(dǎo)致消息重發(fā)。
四. ACK_MODE詳解
AUTO_ACKNOWLEDGE : 自動確認,這就意味著消息的確認時機將有consumer擇機確認."擇機確認"似乎充滿了不確定性,這也意味著,開發(fā)者必須明確知道"擇機確認"的具體時機,否則將有可能導(dǎo)致消息的丟失,或者消息的重復(fù)接受.那么在ActiveMQ中,AUTO_ACKNOWLEDGE是如何運作的呢?
1) 對于consumer而言,optimizeAcknowledge屬性只會在AUTO_ACK模式下有效。
2) 其中DUPS_ACKNOWLEGE也是一種潛在的AUTO_ACK,只是確認消息的條數(shù)和時間上有所不同。
3) 在“同步”(receive)方法返回message之前,會檢測optimizeACK選項是否開啟,如果沒有開啟,此單條消息將立即確認,所以在這種情況下,message返回之后,如果開發(fā)者在處理message過程中出現(xiàn)異常,會導(dǎo)致此消息也不會redelivery,即"潛在的消息丟失";如果開啟了optimizeACK,則會在unAck數(shù)量達到prefetch * 0.65時確認,當然我們可以指定prefetchSize = 1來實現(xiàn)逐條消息確認。
4) 在"異步"(messageListener)方式中,將會首先調(diào)用listener.onMessage(message),此后再ACK,如果onMessage方法異常,將導(dǎo)致client端補充發(fā)送一個ACK_TYPE為REDELIVERED_ACK_TYPE確認指令;如果onMessage方法正常,消息將會正常確認(STANDARD_ACK_TYPE)。此外需要注意,消息的重發(fā)次數(shù)是有限制的,每條消息中都會包含“redeliveryCounter”計數(shù)器,用來表示此消息已經(jīng)被重發(fā)的次數(shù),如果重發(fā)次數(shù)達到閥值,將會導(dǎo)致發(fā)送一個ACK_TYPE為POSION_ACK_TYPE確認指令,這就導(dǎo)致broker端認為此消息無法消費,此消息將會被刪除或者遷移到"dead letter"通道中。
因此當我們使用messageListener方式消費消息時,通常建議在onMessage方法中使用try-catch,這樣可以在處理消息出錯時記錄一些信息,而不是讓consumer不斷去重發(fā)消息;如果你沒有使用try-catch,就有可能會因為異常而導(dǎo)致消息重復(fù)接收的問題,需要注意你的onMessage方法中邏輯是否能夠兼容對重復(fù)消息的判斷。
CLIENT_ACKNOWLEDGE : 客戶端手動確認,這就意味著AcitveMQ將不會“自作主張”的為你ACK任何消息,開發(fā)者需要自己擇機確認。在此模式下,開發(fā)者需要需要關(guān)注幾個方法:1) message.acknowledge(),2) ActiveMQMessageConsumer.acknowledege(),3) ActiveMQSession.acknowledge();其1)和3)是等效的,將當前session中所有consumer中尚未ACK的消息都一起確認,2)只會對當前consumer中那些尚未確認的消息進行確認。開發(fā)者可以在合適的時機必須調(diào)用一次上述方法。
我們通常會在基于Group(消息分組)情況下會使用CLIENT_ACKNOWLEDGE,我們將在一個group的消息序列接受完畢之后確認消息(組);不過當你認為消息很重要,只有當消息被正確處理之后才能確認時,也很可以使用此ACK_MODE。
如果開發(fā)者忘記調(diào)用acknowledge方法,將會導(dǎo)致當consumer重啟后,會接受到重復(fù)消息,因為對于broker而言,那些尚未真正ACK的消息被視為“未消費”。
開發(fā)者可以在當前消息處理成功之后,立即調(diào)用message.acknowledge()方法來"逐個"確認消息,這樣可以盡可能的減少因網(wǎng)絡(luò)故障而導(dǎo)致消息重發(fā)的個數(shù);當然也可以處理多條消息之后,間歇性的調(diào)用acknowledge方法來一次確認多條消息,減少ack的次數(shù)來提升consumer的效率,不過這仍然是一個利弊權(quán)衡的問題。
除了message.acknowledge()方法之外,ActiveMQMessageConumser.acknowledge()和ActiveMQSession.acknowledge()也可以確認消息,只不過前者只會確認當前consumer中的消息。其中sesson.acknowledge()和message.acknowledge()是等效的。
無論是“同步”/“異步”,ActiveMQ都不會發(fā)送STANDARD_ACK_TYPE,直到message.acknowledge()調(diào)用。如果在client端未確認的消息個數(shù)達到prefetchSize * 0.5時,會補充發(fā)送一個ACK_TYPE為DELIVERED_ACK_TYPE的確認指令,這會觸發(fā)broker端可以繼續(xù)push消息到client端。(參看PrefetchSubscription.acknwoledge方法)
在broker端,針對每個Consumer,都會保存一個因為"DELIVERED_ACK_TYPE"而“拖延”的消息個數(shù),這個參數(shù)為prefetchExtension,事實上這個值不會大于prefetchSize * 0.5,因為Consumer端會嚴格控制DELIVERED_ACK_TYPE指令發(fā)送的時機(參見ActiveMQMessageConsumer.ackLater方法),broker端通過“prefetchExtension”與prefetchSize互相配合,來決定即將push給client端的消息個數(shù),count = prefetchExtension + prefetchSize - dispatched.size(),其中dispatched表示已經(jīng)發(fā)送給client端但是還沒有“STANDARD_ACK_TYPE”的消息總量;由此可見,在CLIENT_ACK模式下,足夠快速的調(diào)用acknowledge()方法是決定consumer端消費消息的速率;如果client端因為某種原因?qū)е耡cknowledge方法未被執(zhí)行,將導(dǎo)致大量消息不能被確認,broker端將不會push消息,事實上client端將處于“假死”狀態(tài),而無法繼續(xù)消費消息。我們要求client端在消費1.5*prefetchSize個消息之前,必須acknowledge()一次;通常我們總是每消費一個消息調(diào)用一次,這是一種良好的設(shè)計。
此外需要額外的補充一下:所有ACK指令都是依次發(fā)送給broker端,在CLIET_ACK模式下,消息在交付給listener之前,都會首先創(chuàng)建一個DELIVERED_ACK_TYPE的ACK指令,直到client端未確認的消息達到"prefetchSize * 0.5"時才會發(fā)送此ACK指令,如果在此之前,開發(fā)者調(diào)用了acknowledge()方法,會導(dǎo)致消息直接被確認(STANDARD_ACK_TYPE)。broker端通常會認為“DELIVERED_ACK_TYPE”確認指令是一種“slow consumer”信號,如果consumer不能及時的對消息進行acknowledge而導(dǎo)致broker端阻塞,那么此consumer將會被標記為“slow”,此后queue中的消息將會轉(zhuǎn)發(fā)給其他Consumer。
DUPS_OK_ACKNOWLEDGE : "消息可重復(fù)"確認,意思是此模式下,可能會出現(xiàn)重復(fù)消息,并不是一條消息需要發(fā)送多次ACK才行。它是一種潛在的"AUTO_ACK"確認機制,為批量確認而生,而且具有“延遲”確認的特點。對于開發(fā)者而言,這種模式下的代碼結(jié)構(gòu)和AUTO_ACKNOWLEDGE一樣,不需要像CLIENT_ACKNOWLEDGE那樣調(diào)用acknowledge()方法來確認消息。
1) 在ActiveMQ中,如果在Destination是Queue通道,我們真的可以認為DUPS_OK_ACK就是“AUTO_ACK + optimizeACK + (prefetch > 0)”這種情況,在確認時機上幾乎完全一致;此外在此模式下,如果prefetchSize =1 或者沒有開啟optimizeACK,也會導(dǎo)致消息逐條確認,從而失去批量確認的特性。
2) 如果Destination為Topic,DUPS_OK_ACKNOWLEDGE才會產(chǎn)生JMS規(guī)范中詮釋的意義,即無論optimizeACK是否開啟,都會在消費的消息個數(shù)>=prefetch * 0.5時,批量確認(STANDARD_ACK_TYPE),在此過程中,不會發(fā)送DELIVERED_ACK_TYPE的確認指令,這是1)和AUTO_ACK的最大的區(qū)別。
這也意味著,當consumer故障重啟后,那些尚未ACK的消息會重新發(fā)送過來。
SESSION_TRANSACTED : 當session使用事務(wù)時,就是使用此模式。在事務(wù)開啟之后,和session.commit()之前,所有消費的消息,要么全部正常確認,要么全部redelivery。這種嚴謹性,通常在基于GROUP(消息分組)或者其他場景下特別適合。在SESSION_TRANSACTED模式下,optimizeACK并不能發(fā)揮任何效果,因為在此模式下,optimizeACK會被強制設(shè)定為false,不過prefetch仍然可以決定DELIVERED_ACK_TYPE的發(fā)送時機。
因為Session非線程安全,那么當前session下所有的consumer都會共享同一個transactionContext;同時建議,一個事務(wù)類型的Session中只有一個Consumer,已避免rollback()或者commit()方法被多個consumer調(diào)用而造成的消息混亂。
當consumer接受到消息之后,首先檢測TransactionContext是否已經(jīng)開啟,如果沒有,就會開啟并生成新的transactionId,并把信息發(fā)送給broker;此后將檢測事務(wù)中已經(jīng)消費的消息個數(shù)是否 >= prefetch * 0.5,如果大于則補充發(fā)送一個“DELIVERED_ACK_TYPE”的確認指令;這時就開始調(diào)用onMessage()方法,如果是同步(receive),那么即返回message。上述過程,和其他確認模式?jīng)]有任何特殊的地方。
當開發(fā)者決定事務(wù)可以提交時,必須調(diào)用session.commit()方法,commit方法將會導(dǎo)致當前session的事務(wù)中所有消息立即被確認;事務(wù)的確認過程中,首先把本地的deliveredMessage隊列中尚未確認的消息全部確認(STANDARD_ACK_TYPE);此后向broker發(fā)送transaction提交指令并等待broker反饋,如果broker端事務(wù)操作成功,那么將會把本地deliveredMessage隊列清空,新的事務(wù)開始;如果broker端事務(wù)操作失敗(此時broker已經(jīng)rollback),那么對于session而言,將執(zhí)行inner-rollback,這個rollback所做的事情,就是將當前事務(wù)中的消息清空并要求broker重發(fā)(REDELIVERED_ACK_TYPE),同時commit方法將拋出異常。
當session.commit方法異常時,對于開發(fā)者而言通常是調(diào)用session.rollback()回滾事務(wù)(事實上開發(fā)者不調(diào)用也沒有問題),當然你可以在事務(wù)開始之后的任何時機調(diào)用rollback(),rollback意味著當前事務(wù)的結(jié)束,事務(wù)中所有的消息都將被重發(fā)。需要注意,無論是inner-rollback還是調(diào)用session.rollback()而導(dǎo)致消息重發(fā),都會導(dǎo)致message.redeliveryCounter計數(shù)器增加,最終都會受限于brokerUrl中配置的"jms.redeliveryPolicy.maximumRedeliveries",如果rollback的次數(shù)過多,而達到重發(fā)次數(shù)的上限時,消息將會被DLQ(dead letter)。
INDIVIDUAL_ACKNOWLEDGE : 單條消息確認,這種確認模式,我們很少使用,它的確認時機和CLIENT_ACKNOWLEDGE幾乎一樣,當消息消費成功之后,需要調(diào)用message.acknowledege來確認此消息(單條),而CLIENT_ACKNOWLEDGE模式先message.acknowledge()方法將導(dǎo)致整個session中所有消息被確認(批量確認)。
結(jié)語:到目前為止,我們已經(jīng)已經(jīng)簡單的了解了ActiveMQ中消息傳送機制,還有JMS中ACK策略,重點分析了optimizeACK的策略,希望開發(fā)者能夠在使用activeMQ中避免一些不必要的錯誤。本文如有疏漏和錯誤之處,請各位不吝賜教,特此感謝。
源碼參考類:
1) ActiveMQConnectionFactory,ActiveMQMessageConsumer,ActiveMQSession,MessageAck等
2) Queue,PrefetchSubscription,TransactionContext,TransactionStore
本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊舉報
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
Spring ActiveMQ 整合(三): 確認機制ACK(收到消息后,應(yīng)該有一個回應(yīng)也就是確認答復(fù))
創(chuàng)建JMS Session對象的方法詳解
13-RabbitMQ高級特性-消費端限流
優(yōu)化ActiveMQ性能
ActiveMQ使用筆記
RabbitMq與KafKa比較
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服