單點(diǎn)的ActiveMQ作為企業(yè)應(yīng)用無法滿足高可用和集群的需求,所以ActiveMQ提供了master-slave、broker cluster等多種部署方式,但通過分析多種部署方式之后我認(rèn)為需要將兩種部署方式相結(jié)合才能滿足我們公司分布式和高可用的需求,所以后面就重點(diǎn)將解如何將兩種部署方式相結(jié)合。
1、Master-Slave部署方式
1)shared filesystem Master-Slave部署方式
主要是通過共享存儲(chǔ)目錄來實(shí)現(xiàn)master和slave的熱備,所有的ActiveMQ應(yīng)用都在不斷地獲取共享目錄的控制權(quán),哪個(gè)應(yīng)用搶到了控制權(quán),它就成為master。
多個(gè)共享存儲(chǔ)目錄的應(yīng)用,誰先啟動(dòng),誰就可以最早取得共享目錄的控制權(quán)成為master,其他的應(yīng)用就只能作為slave。
2)shared database Master-Slave方式
與shared filesystem方式類似,只是共享的存儲(chǔ)介質(zhì)由文件系統(tǒng)改成了
數(shù)據(jù)庫而已。
3)Replicated LevelDB Store方式
這種主備方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper協(xié)調(diào)選擇一個(gè)node作為master。被選擇的master broker node開啟并接受客戶端連接。
其他node轉(zhuǎn)入slave模式,連接master并同步他們的存儲(chǔ)狀態(tài)。slave不接受客戶端連接。所有的存儲(chǔ)操作都將被復(fù)制到連接至Master的slaves。
如果master死了,得到了最新更新的slave被允許成為master。fialed node能夠重新加入到網(wǎng)絡(luò)中并連接master進(jìn)入slave mode。所有需要同步的disk的消息操作都將等待存儲(chǔ)狀態(tài)被復(fù)制到其他法定節(jié)點(diǎn)的操作完成才能完成。所以,如果你配置了replicas=3,那么法定大小是(3/2)+1=2. Master將會(huì)存儲(chǔ)并更新然后等待 (2-1)=1個(gè)slave存儲(chǔ)和更新完成,才匯報(bào)success。至于為什么是2-1,熟悉Zookeeper的應(yīng)該知道,有一個(gè)node要作為觀擦者存在。
單一個(gè)新的master被選中,你需要至少保障一個(gè)法定node在線以能夠找到擁有最新狀態(tài)的node。這個(gè)node將會(huì)成為新的master。因此,推薦運(yùn)行至少3個(gè)replica nodes,以防止一個(gè)node失敗了,服務(wù)中斷。
2、Broker-Cluster部署方式
前面的Master-Slave的方式雖然能解決多服務(wù)熱備的高可用問題,但無法解決負(fù)載均衡和分布式的問題。Broker-Cluster的部署方式就可以解決負(fù)載均衡的問題。
Broker-Cluster部署方式中,各個(gè)broker通過網(wǎng)絡(luò)互相連接,并共享queue。當(dāng)broker-A上面指定的queue-A中接收到一個(gè)message處于pending狀態(tài),而此時(shí)沒有consumer連接broker-A時(shí)。如果cluster中的broker-B上面由一個(gè)consumer在消費(fèi)queue-A的消息,那么broker-B會(huì)先通過內(nèi)部網(wǎng)絡(luò)獲取到broker-A上面的message,并通知自己的consumer來消費(fèi)。
1)static Broker-Cluster部署
在activemq.xml文件中靜態(tài)指定Broker需要建立橋連接的其他Broker:
1、 首先在Broker-A節(jié)點(diǎn)中添加networkConnector節(jié)點(diǎn):
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61617)"duplex="false"/>
</networkConnectors>
2、 修改Broker-A節(jié)點(diǎn)中的服務(wù)提供端口為61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
3、 在Broker-B節(jié)點(diǎn)中添加networkConnector節(jié)點(diǎn):
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
4、 修改Broker-A節(jié)點(diǎn)中的服務(wù)提供端口為61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
5、分別啟動(dòng)Broker-A和Broker-B。
2)Dynamic Broker-Cluster部署
在activemq.xml文件中不直接指定Broker需要建立橋連接的其他Broker,由activemq在啟動(dòng)后動(dòng)態(tài)查找:
1、 首先在Broker-A節(jié)點(diǎn)中添加networkConnector節(jié)點(diǎn):
<networkConnectors>
<networkConnectoruri="multicast://default"
dynamicOnly="true"
networkTTL="3"
prefetchSize="1"
decreaseNetworkConsumerPriority="true" />
</networkConnectors>
2、修改Broker-A節(jié)點(diǎn)中的服務(wù)提供端口為61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616? " discoveryUri="multicast://default"/>
</transportConnectors>
3、在Broker-B節(jié)點(diǎn)中添加networkConnector節(jié)點(diǎn):
<networkConnectors>
<networkConnectoruri="multicast://default"
dynamicOnly="true"
networkTTL="3"
prefetchSize="1"
decreaseNetworkConsumerPriority="true" />
</networkConnectors>
4、修改Broker-B節(jié)點(diǎn)中的服務(wù)提供端口為61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617" discoveryUri="multicast://default"/>
</transportConnectors>
5、啟動(dòng)Broker-A和Broker-B
2、Master-Slave與Broker-Cluster相結(jié)合的部署方式
可以看到Master-Slave的部署方式雖然解決了高可用的問題,但不支持負(fù)載均衡,Broker-Cluster解決了負(fù)載均衡,但當(dāng)其中一個(gè)Broker突然宕掉的話,那么存在于該Broker上處于Pending狀態(tài)的message將會(huì)丟失,無法達(dá)到高可用的目的。
由于目前ActiveMQ官網(wǎng)上并沒有一個(gè)明確的將兩種部署方式相結(jié)合的部署方案,所以我嘗試者把兩者結(jié)合起來部署:
1、部署的配置修改
這里以Broker-A + Broker-B建立cluster,Broker-C作為Broker-B的slave為例:
1)首先在Broker-A節(jié)點(diǎn)中添加networkConnector節(jié)點(diǎn):
<networkConnectors>
<networkConnector uri="masterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618)" duplex="false"/>
</networkConnectors>
2)修改Broker-A節(jié)點(diǎn)中的服務(wù)提供端口為61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
3)在Broker-B節(jié)點(diǎn)中添加networkConnector節(jié)點(diǎn):
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
4)修改Broker-B節(jié)點(diǎn)中的服務(wù)提供端口為61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
5)修改Broker-B節(jié)點(diǎn)中的持久化方式:
<persistenceAdapter>
<kahaDB directory="/localhost/kahadb"/>
</persistenceAdapter>
6)在Broker-C節(jié)點(diǎn)中添加networkConnector節(jié)點(diǎn):
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
7)修改Broker-C節(jié)點(diǎn)中的服務(wù)提供端口為61618:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
8)修改Broker-B節(jié)點(diǎn)中的持久化方式:
<persistenceAdapter>
<kahaDB directory="/localhost/kahadb"/>
</persistenceAdapter>
9)分別啟動(dòng)broker-A、broker-B、broker-C,因?yàn)槭莃roker-B先啟動(dòng),所以“/localhost/kahadb”目錄被lock住,broker-C將一直處于掛起狀態(tài),當(dāng)人為停掉broker-B之后,broker-C將獲取目錄“/localhost/kahadb”的控制權(quán),重新與broker-A組成cluster提供服務(wù)。
4.ActiveMQ之Master-Slaver+負(fù)載均衡
什么叫中間件?
中間件為軟件應(yīng)用提供了
消息中間件解決了應(yīng)用之間的消息傳遞、解耦、異步的問題。
ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn),盡管JMS規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是JMS在當(dāng)今的J2EE應(yīng)用中間仍然扮演著特殊的地位。
一般中間件都提供了橫向擴(kuò)展和縱向擴(kuò)展,橫向擴(kuò)展就是我們經(jīng)常說的負(fù)載均衡,縱向擴(kuò)展提供了Master-Slaver;
負(fù)載均衡:提供負(fù)載均衡的中間件都對(duì)外提供服務(wù)
Master-Slaver:同時(shí)只有一個(gè)中間件對(duì)外提供服務(wù),當(dāng)Master出現(xiàn)掛機(jī)等問題,Slaver會(huì)自動(dòng)接管
看一個(gè)整合的簡(jiǎn)圖:
Master-Slave和Broker Cluster
準(zhǔn)備:
jdk1.6,apache-activemq-5.10.0,mysql5.1,zookeeper-3.4.3
先來看看Master-Slave模式
Shared File System Master Slave
本次
測(cè)試在同一臺(tái)機(jī)器上:
首先更改配置conf/activemq,做如下修改:
[java]
view plain copy<persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<kahaDB directory="D:/kahaDB"/>
</persistenceAdapter>
將activemq拷貝3份,分別:apache-activemq-5.10.0-M1, apache-activemq-5.10.0-M2,apache-activemq-5.10.0-M3,分別啟動(dòng)activemq命令,啟動(dòng)的日志分別是:
表示當(dāng)前進(jìn)程是Master
表示當(dāng)前進(jìn)程沒有獲取到鎖,作為Slaver
測(cè)試:
下面的例子中分別提供了Producer(Sender類)和Consumer(Receiver類)
我們首先用Producer發(fā)送消息給activemq,然后停止Master,然后再用Consumer接受消息,測(cè)試結(jié)果是可以接受到數(shù)據(jù)的。
2).JDBC Master Slave
JDBC Master Slave模式和Shared File Sysytem Master Slave模式的原理是一樣的,只是把共享文件系統(tǒng)換成了共享數(shù)據(jù)庫。
修改配置文件conf/activemq
[java]
view plain copy<pre code_snippet_id="1660918" snippet_file_name="blog_20160425_10_9093290" name="code" class="html"><persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<!--<kahaDB directory="D:/kahaDB"/>-->
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
</persistenceAdapter></pre><br><br>
添加數(shù)據(jù)源:[java]
view plain copy<pre code_snippet_id="1660918" snippet_file_name="blog_20160425_11_1808506" name="code" class="html"><bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="poolPreparedStatements" value="true"/>
</bean></pre><br><br>
注:這里使用的是mysql,所以需要mysql驅(qū)動(dòng)程序: mysql-connector-java-5.1.18,講jar包放入lib文件夾下面,驅(qū)動(dòng)版本不對(duì),會(huì)出現(xiàn)如下錯(cuò)誤: Database lock driver override not found for : [mysql_connect ...
分別拷貝到其他幾個(gè)文件夾下,分別啟動(dòng),啟動(dòng)成功后我們可以看到數(shù)據(jù)庫中多了幾張表:
測(cè)試方式同上;
官網(wǎng)手冊(cè):
http://activemq.apache.org/jdbc-master-slave.html3).Replicated LevelDB Store
這種方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper協(xié)調(diào)選擇一個(gè)node作為master
修改配置文件conf/activemq:
[java]
view plain copy<pre code_snippet_id="1660918" snippet_file_name="blog_20160425_12_5047109" name="code" class="html"><!--<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
<kahaDB directory="D:/kahaDB"/>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
</persistenceAdapter>-->
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="127.0.0.1:2181"
hostname="127.0.0.1"
sync="local_disk"
zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter></pre><br><br>
首先啟動(dòng)zookeeper,這里沒有做集群處理,默認(rèn)端口是2181,然后分別啟動(dòng)activemq,
啟動(dòng)之后報(bào)錯(cuò):"activemq LevelDB IOException handler"。 原因:版本5.10.0存在的依賴沖突。
解決方案:
(1)移除lib目錄中的pax-url-aether-1.5.2.jar包
(2)注釋掉配置文件中的日志配置;
[java]
view plain copy<!-- Allows accessing the server log
<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
-->
測(cè)試方式同上;
提供
Java版的例子:
[java]
view plain copy<pre code_snippet_id="1660918" snippet_file_name="blog_20160425_13_2397151" name="code" class="java">public class Sender {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageProducer producer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("FirstQueue");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("發(fā)送的消息"
+ i);
System.out.println("發(fā)送消息:" + "ActiveMq 發(fā)送的消息" + i);
producer.send(message);
}
}
}
public class Receiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive(100000);
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}</pre><br><br>
5.Broker-Cluster實(shí)現(xiàn)負(fù)載均衡
Broker-Cluster部署方式中,各個(gè)broker通過網(wǎng)絡(luò)互相連接,并共享queue,提供了2中部署方式:
static Broker-Cluster和Dynamic Broker-Cluster
1).static Broker-Cluster
將ActiveMq拷貝2份,分別命名:apache-activemq-5.10.0_M1,apache-activemq-5.10.0_M2,
下面就是配置activemq,xml:
M1做如下配置:
?1
2
3
<networkConnectors>
<networkConnector uri="static:(tcp://localhost:61617)"/>
</networkConnectors>
?1
2
3
<transportConnectors>
<transportConnector name="openwire"uri="tcp://0.0.0.0:61616?maximumConnectio ns=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
M2做如下配置:
?1
2
3
<networkConnectors>
<networkConnector uri="static:(tcp://localhost:61616)"/>
</networkConnectors>
?1
2
3
<transportConnectors>
<transportConnector name="openwire"uri="tcp://0.0.0.0:61617?maximumConnectio ns=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
通過以上配置使M1和M2這兩個(gè) broker通過網(wǎng)絡(luò)互相連接,并共享queue,
啟動(dòng)M1和M2,可以看到如下啟動(dòng)日志:
可以看到M1和M2,network connection has been established
測(cè)試我們還是用上一篇中的Sender和Receiver類,只需要做一點(diǎn)點(diǎn)修改:
Sender類還是鏈接tcp://localhost:61616,發(fā)送消息到queue,
Receiver做如下修改:
?1
2
3
connectionFactory = newActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61617");
經(jīng)測(cè)試Receiver可以接受到數(shù)據(jù),表示M1和M2已經(jīng)共享了queue
2).Dynamic Broker-Cluster
Dynamic Discovery集群方式在配置ActiveMQ實(shí)例時(shí),不需要知道所有其它實(shí)例的URI地址
對(duì)activemq.xml做如下配置:
M1做如下配置:
?1
2
3
<networkConnectors>
<networkConnector uri="multicast://default"/>
</networkConnectors>
?1
2
3
4
<transportConnectors>
<transportConnector name="openwire"uri="tcp://0.0.0.0:61616? maximumConnections=1000&wireFormat.maxFrameSize=104857600"
discoveryUri="multicast://default"/>
</transportConnectors>
M2做如下配置:
?1
2
3
<networkConnectors>
<networkConnector uri="multicast://default"/>
</networkConnectors>
?1
2
3
4
<transportConnectors>
<transportConnector name="openwire"uri="tcp://0.0.0.0:61617? maximumConnections=1000&wireFormat.maxFrameSize=104857600"
discoveryUri="multicast://default"/>
</transportConnectors>
啟動(dòng)M1和M2,可以看到如下啟動(dòng)日志: network connection has been established測(cè)試同static broker-cluster,可以得到相同的結(jié)果。
官網(wǎng)配置說明:
http://activemq.apache.org/networks-of-brokers.htmlMaster-Slaver保證了數(shù)據(jù)的可靠性,Broker-Cluster提供了負(fù)載均衡,所以一般正式環(huán)境中都會(huì)采用:
Master-Slaver+Broker-Cluster的模式