關(guān)鍵字: activemq2.5 Clustering
ActiveMQ從多種不同的方面提供了集群的支持。
2.5.1 Queue consumer clusters
ActiveMQ支持訂閱同一個(gè)queue的consumers上的集群。如果一個(gè)consumer失效,那么所有未被確認(rèn)(unacknowledged)的消息都會(huì)被發(fā)送到這個(gè)queue上其它的consumers。如果某個(gè)consumer的處理速度比其它c(diǎn)onsumers更快,那么這個(gè)consumer就會(huì)消費(fèi)更多的消息。
需要注意的是,筆者發(fā)現(xiàn)AcitveMQ5.0版本的Queue consumer clusters存在一個(gè)bug:采用AMQ Message Store,運(yùn)行一個(gè)producer,兩個(gè)consumer,并采用如下的配置文件:
- <beans>
- <broker xmlns="http://activemq.org/config/1.0" brokerName="BugBroker1" useJmx="true">
-
- <transportConnectors>
- <transportConnector uri="tcp://localhost:61616"/>
- </transportConnectors>
-
- <persistenceAdapter>
- <amqPersistenceAdapter directory="activemq-data/BugBroker1" maxFileLength="32mb"/>
- </persistenceAdapter>
-
- </broker>
- </beans>
<beans><broker xmlns="http://activemq.org/config/1.0" brokerName="BugBroker1" useJmx="true"><transportConnectors><transportConnector uri="tcp://localhost:61616"/></transportConnectors><persistenceAdapter><amqPersistenceAdapter directory="activemq-data/BugBroker1" maxFileLength="32mb"/></persistenceAdapter></broker></beans>
那么經(jīng)過一段時(shí)間后可能會(huì)報(bào)出如下錯(cuò)誤:
ERROR [ActiveMQTransport: tcp:///127.0.0.1:1843 - RecoveryListenerAdapter.java:58 -RecoveryListenerAdapter] Message idID:versus-1837-1203915536609-0:2:1:1:419 could not be recovered fromthe data store!
Apache官方文檔說,此bug已經(jīng)被修正,預(yù)定在5.1.0版本上體現(xiàn)。
2.5.2 Broker clusters
一個(gè)常見的場(chǎng)景是有多個(gè)JMS broker,有一個(gè)客戶連接到其中一個(gè)broker。如果這個(gè)broker失效,那么客戶會(huì)自動(dòng)重新連接到其它的broker。在ActiveMQ中使用failover:// 協(xié)議來實(shí)現(xiàn)這個(gè)功能。ActiveMQ3.x版本的reliable://協(xié)議已經(jīng)變更為failover://。
如果某個(gè)網(wǎng)絡(luò)上有多個(gè)brokers而且客戶使用靜態(tài)發(fā)現(xiàn)(使用Static Transport或FailoverTransport)或動(dòng)態(tài)發(fā)現(xiàn)(使用DiscoveryTransport),那么客戶可以容易地在某個(gè)broker失效的情況下切換到其它的brokers。然而,stand alonebrokers并不了解其它brokers上的consumers,也就是說如果某個(gè)broker上沒有consumers,那么這個(gè)broker上的消息可能會(huì)因得不到處理而積壓起來。目前的解決方案是使用Network of brokers,以便在broker之間存儲(chǔ)轉(zhuǎn)發(fā)消息。ActiveMQ在未來會(huì)有更好的特性,用來在客戶端處理這個(gè)問題。
從ActiveMQ1.1版本起,ActiveMQ支持networks ofbrokers。它支持分布式的queues和topics。一個(gè)broker會(huì)相同對(duì)待所有的訂閱(subscription):不管他們是來自本地的客戶連接,還是來自遠(yuǎn)程broker,它都會(huì)遞送有關(guān)的消息拷貝到每個(gè)訂閱。遠(yuǎn)程broker得到這個(gè)消息拷貝后,會(huì)依次把它遞送到其內(nèi)部的本地連接上。有兩種方式配置Network of brokers,一種是使用static transport,如下:
- <broker brokerName="receiver" persistent="false" useJmx="false">
- <transportConnectors>
- <transportConnector uri="tcp://localhost:62002"/>
- </transportConnectors>
- <networkConnectors>
- <networkConnector uri="static:( tcp://localhost:61616,tcp://remotehost:61616)"/>
- </networkConnectors>
- …
- </broker>
<broker brokerName="receiver" persistent="false" useJmx="false"><transportConnectors><transportConnector uri="tcp://localhost:62002"/></transportConnectors><networkConnectors><networkConnector uri="static:( tcp://localhost:61616,tcp://remotehost:61616)"/></networkConnectors>…</broker>
另外一種是使用multicast discovery,如下:
- <broker name="sender" persistent="false" useJmx="false">
- <transportConnectors>
- <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
- </transportConnectors>
- <networkConnectors>
- <networkConnector uri="multicast://default"/>
- </networkConnectors>
- ...
- </broker>
<broker name="sender" persistent="false" useJmx="false"><transportConnectors><transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/></transportConnectors><networkConnectors><networkConnector uri="multicast://default"/></networkConnectors>...</broker>
Network Connector有以下屬性:
Property | Default Value | Description |
name | bridge | name of the network - for more than one network connector between the same two brokers - use different names |
dynamicOnly | false | if true, only forward messages if a consumer is active on the connected broker |
decreaseNetworkConsumerPriority | false | decrease the priority for dispatching to a Queue consumer the further away it is (in network hops) from the producer |
networkTTL | 1 | the number of brokers in the network that messages and subscriptions can pass through |
conduitSubscriptions | true | multiple consumers subscribing to the same destination are treated as one consumer by the network |
excludedDestinations | empty | destinations matching this list won't be forwarded across the network |
dynamicallyIncludedDestinations | empty | destinations that match this list will be forwarded across the network n.b. an empty list means all destinations not in the excluded list will be forwarded |
staticallyIncludedDestinations | empty | destinations that match will always be passed across the network - even if no consumers have ever registered an interest |
duplex | false | if true, a network connection will be used to both produce AND Consume messages. This is useful for hub and spoke scenarios when the hub is behind a firewall etc.
|
關(guān)于conduitSubscriptions屬性,這里稍稍說明一下。設(shè)想有兩個(gè)brokers,分別是brokerA和brokerB,它們之間用forwardingbridge連接。有一個(gè)consumer連接到brokerA并訂閱queue:Q.TEST。有兩個(gè)consumers連接到brokerB,也是訂閱queue:Q.TEST。這三個(gè)consumers有相同的優(yōu)先級(jí)。然后啟動(dòng)一個(gè)producer,它發(fā)送了30條消息到brokerA。如果conduitSubscriptions=true,那么brokerA上的consumer會(huì)得到15條消息,另外15條消息會(huì)發(fā)送給brokerB。此時(shí)負(fù)載并不均衡,因?yàn)榇藭r(shí)brokerA將brokerB上的兩個(gè)consumers視為一個(gè);如果conduitSubscriptions=false,那么每個(gè)consumer上都會(huì)收到10條消息。以下是關(guān)于NetworkConnector屬性的一個(gè)例子:
- <networkConnectors>
- <networkConnector uri="static://(tcp://localhost:61617)"
- name="bridge" dynamicOnly="false" conduitSubscriptions="true"
- decreaseNetworkConsumerPriority="false">
- <excludedDestinations>
- <queue physicalName="exclude.test.foo"/>
- <topic physicalName="exclude.test.bar"/>
- </excludedDestinations>
- <dynamicallyIncludedDestinations>
- <queue physicalName="include.test.foo"/>
- <topic physicalName="include.test.bar"/>
- </dynamicallyIncludedDestinations>
- <staticallyIncludedDestinations>
- <queue physicalName="always.include.queue"/>
- <topic physicalName="always.include.topic"/>
- </staticallyIncludedDestinations>
- </networkConnector>
- </networkConnectors>
<networkConnectors><networkConnector uri="static://(tcp://localhost:61617)"name="bridge" dynamicOnly="false" conduitSubscriptions="true"decreaseNetworkConsumerPriority="false"><excludedDestinations><queue physicalName="exclude.test.foo"/><topic physicalName="exclude.test.bar"/></excludedDestinations><dynamicallyIncludedDestinations><queue physicalName="include.test.foo"/><topic physicalName="include.test.bar"/></dynamicallyIncludedDestinations><staticallyIncludedDestinations><queue physicalName="always.include.queue"/><topic physicalName="always.include.topic"/></staticallyIncludedDestinations></networkConnector></networkConnectors>
2.5.3 Master Slave
在一個(gè)網(wǎng)絡(luò)內(nèi)運(yùn)行多個(gè)brokers或者stand alonebrokers時(shí)存在一個(gè)問題,這就是消息在物理上只被一個(gè)broker持有,因此當(dāng)某個(gè)broker失效,那么你只能等待直到它重啟后,這個(gè)broker上的消息才能夠被繼續(xù)發(fā)送(如果沒有設(shè)置持久化,那么在這種情況下,消息將會(huì)丟失)。Master Slave背后的想法是,消息被復(fù)制到slave broker,因此即使master broker遇到了像硬件故障之類的錯(cuò)誤,你也可以立即切換到slavebroker而不丟失任何消息。
Master Slave是目前ActiveMQ推薦的高可靠性和容錯(cuò)的解決方案。以下是幾種不同的類型:
Master Slave Type | Requirements | Pros | Cons |
Pure Master Slave | None | No central point of failure | Requires manual restart to bring back a failed master and can only support 1 slave |
Shared File System Master Slave | A Shared File system such as a SAN | Run as many slaves as required. Automatic recovery of old masters | Requires shared file system |
JDBC Master Slave | A Shared database | Run as many slaves as required. Automatic recovery of old masters | Requires a shared database. Also relatively slow as it cannot use the high performance journal |
2.5.3.1 Pure Master Slave
Pure Master Slave的工作方式如下:
- Slave broker消費(fèi)master broker上所有的消息狀態(tài),例如消息、確認(rèn)和事務(wù)狀態(tài)等。只要slave broker連接到了master broker,它不會(huì)(也不被允許)啟動(dòng)任何network connectors或者transport connectors,所以唯一的目的就是復(fù)制master broker的狀態(tài)。
- Master broker只有在消息成功被復(fù)制到slave broker之后才會(huì)響應(yīng)客戶。例如,客戶的commit請(qǐng)求只有在master broker和slave broker都處理完畢commit請(qǐng)求之后才會(huì)結(jié)束。
- 當(dāng)master broker失效的時(shí)候,slave broker有兩種選擇,一種是slave broker啟動(dòng)所有的network connectors和transport connectors,這允許客戶端切換到slave broker;另外一種是slave broker停止。這種情況下,slave broker只是復(fù)制了master broker的狀態(tài)。
- 客戶應(yīng)該使用failover transport并且應(yīng)該首先嘗試連接master broker。例如:
failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false
設(shè)置randomize為false就可以讓客戶總是首先嘗試連接master broker(slave broker并不會(huì)接受任何連接,直到它成為了master broker)。
Pure Master Slave具有以下限制:
- 只能有一個(gè)slave broker連接到master broker。
- 在因master broker失效而導(dǎo)致slave broker成為master之后,之前的master broker只有在當(dāng)前的master broker(原slave broker)停止后才能重新生效。
- Master broker失效后而切換到slave broker后,最安全的恢復(fù)master broker的方式是人工處理。首先要停止slave broker(這意味著所有的客戶也要停止)。然后把slave broker的數(shù)據(jù)目錄中所有的數(shù)據(jù)拷貝到master broker的數(shù)據(jù)目錄中。然后重啟master broker和slave broker。
Master broker不需要特殊的配置。Slave broker需要進(jìn)行以下配置
- <broker masterConnectorURI="tcp://masterhost:62001" shutdownOnMasterFailure="false">
- ...
- <transportConnectors>
- <transportConnector uri="tcp://slavehost:61616"/>
- </transportConnectors>
- </broker>
<broker masterConnectorURI="tcp://masterhost:62001" shutdownOnMasterFailure="false">...<transportConnectors><transportConnector uri="tcp://slavehost:61616"/></transportConnectors></broker>
其中的masterConnectorURI用于指向master broker,shutdownOnMasterFailure用于指定slave broker在master broker失效的時(shí)候是否需要停止。此外,也可以使用如下配置:
- <broker brokerName="slave" useJmx="false" deleteAllMessagesOnStartup="true" xmlns="http://activemq.org/config/1.0">
- ...
- <services>
- <masterConnector remoteURI= "tcp://localhost:62001" userName="user" password="password"/>
- </services>
- </broker>
<broker brokerName="slave" useJmx="false" deleteAllMessagesOnStartup="true" xmlns="http://activemq.org/config/1.0">...<services><masterConnector remoteURI= "tcp://localhost:62001" userName="user" password="password"/></services></broker>
需要注意的是,筆者認(rèn)為ActiveMQ5.0版本的Pure Master Slave仍然不夠穩(wěn)定。
2.5.3.2 Shared File System Master Slave
如果你使用SAN或者共享文件系統(tǒng),那么你可以使用Shared File System MasterSlave?;旧?,你可以運(yùn)行多個(gè)broker,這些broker共享數(shù)據(jù)目錄。當(dāng)?shù)谝粋€(gè)broker得到文件上的排他鎖之后,其它的broker便會(huì)在循環(huán)中等待獲得這把鎖??蛻舳耸褂胒ailover transport來連接到可用的broker。當(dāng)masterbroker失效的時(shí)候會(huì)釋放這把鎖,這時(shí)候其中一個(gè)slave broker會(huì)得到這把鎖從而成為master broker。以下是ActiveMQ配置的一個(gè)例子:
- <broker useJmx="false" xmlns="http://activemq.org/config/1.0">
- <persistenceAdapter>
- <journaledJDBC dataDirectory="/sharedFileSystem/broker"/>
- </persistenceAdapter>
- …
- </broker>
<broker useJmx="false" xmlns="http://activemq.org/config/1.0"><persistenceAdapter><journaledJDBC dataDirectory="/sharedFileSystem/broker"/></persistenceAdapter>…</broker>
2.5.3.3 JDBC Master Slave
JDBC Master Slave的工作原理跟Shared File System Master Slave類似,只是采用了數(shù)據(jù)庫作為持久化存儲(chǔ)。以下是ActiveMQ配置的一個(gè)例子:
- <beans>
- <broker xmlns="http://activemq.org/config/1.0" brokerName="JdbcMasterBroker">
- ...
- <persistenceAdapter>
- <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
- </persistenceAdapter>
-
- </broker>
-
- <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
- <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
- <property name="url" value="jdbc:mysql://localhost:3306/test?relaxAutoCommit=true"/>
- <property name="username" value="username"/>
- <property name="password" value="passward"/>
- <property name="poolPreparedStatements" value="true"/>
- </bean>
- </beans>
<beans><broker xmlns="http://activemq.org/config/1.0" brokerName="JdbcMasterBroker">...<persistenceAdapter><jdbcPersistenceAdapter dataSource="#mysql-ds"/></persistenceAdapter></broker><bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"><property name="driverClassName" value="com.mysql.jdbc.Driver"/><property name="url" value="jdbc:mysql://localhost:3306/test?relaxAutoCommit=true"/><property name="username" value="username"/><property name="password" value="passward"/><property name="poolPreparedStatements" value="true"/></bean></beans>
需要注意的是,如果你使用MySQL數(shù)據(jù)庫,需要首先執(zhí)行以下三條語句:(Apache官方文檔說,此bug已經(jīng)被修正,預(yù)定在5.1.0版本上體現(xiàn))
- ALTER TABLE activemq_acks ENGINE = InnoDB;
- ALTER TABLE activemq_lock ENGINE = InnoDB;
- ALTER TABLE activemq_msgs ENGINE = InnoDB;