消息隊(duì)列在目前分布式系統(tǒng)下具備非常重要的地位,如下的場(chǎng)景是比較適合消息隊(duì)列的:
之前有一個(gè)場(chǎng)景是商品數(shù)據(jù)在修改后需要推送到elasticsearch中,由于修改產(chǎn)品的并發(fā)量以及數(shù)據(jù)量均不大,所以對(duì)于消息未做持久化,而且為了快速上線(xiàn)簡(jiǎn)化系統(tǒng),生產(chǎn)者與消費(fèi)者更是部署在一個(gè)應(yīng)用中,自生產(chǎn)自消費(fèi)。這篇將從頭搭建RabbitMQ環(huán)境,并且將之集成在Spring boot中。
由于RabbitMQ是基于erlang開(kāi)發(fā)的,所以要安裝RabbitMQ先必須安裝erlang。
更換軟件源
使用apt-get時(shí)默認(rèn)的軟件源是us.archive.ubuntu.com,這會(huì)經(jīng)常發(fā)生安裝問(wèn)題,比如速度特別慢或者由于下載不了造成不能安裝。
可以更換成國(guó)內(nèi)的數(shù)據(jù)源cn.archive.ubuntu.com,速度那是不用說(shuō)的了(這里感謝我的同事的提醒)。找到下面這個(gè)文件然后進(jìn)行替換。
/etc/apt/sources.list:%s/us.archive/cn.archive/g
在沒(méi)有更新軟件源時(shí),我采取的是源碼編譯安裝方法,參考這篇文章。我安裝的是最新19.2版本,安裝過(guò)程中還遇到各種問(wèn)題就不一一記錄了。
測(cè)試erlang安裝是否正確,輸入erl,如果看到如下圖所示就說(shuō)明安裝成功了。
在未更換軟件源之前我也是選擇了源碼編譯安裝方法,安裝的最新的3.6.6,但手動(dòng)啟動(dòng)時(shí)總是不成功,錯(cuò)誤信息如下:
RabbitMQ 3.6.6+ erlang 19.2 啟動(dòng)失敗的問(wèn)題暫時(shí)未解決,有誰(shuí)知道的可以告訴我。
由于啟動(dòng)不成功,最后在更新成國(guó)內(nèi)軟件源之后,再次通過(guò) apt-get 安裝RabbitMQ,默認(rèn)的版本是3.5.7,好像也可以選版本,以后再?lài)L試??上驳氖峭ㄟ^(guò)apt-get安裝的RabbitMQ成功的啟動(dòng)起來(lái)了??梢酝ㄟ^(guò)如下命令查看RabbitMQ狀態(tài)。
./rabbitmqctl stauts
這是自帶的一個(gè)web插件,可以用來(lái)管理消息隊(duì)列,啟動(dòng)它的方法比較簡(jiǎn)單:
rabbitmq-plugins enable rabbitmq_management
然后重啟RabbitMQ即可生效。默認(rèn)生成了guest用戶(hù),但這個(gè)guest用戶(hù)只能在RabbitMQ所在主機(jī)才能訪問(wèn),所以要想遠(yuǎn)程訪問(wèn)就需要重新分配一個(gè)用戶(hù),有兩種辦法:
創(chuàng)建用戶(hù),指定用戶(hù)名以及密碼
./rabbitmqctl add_user root root //用戶(hù)名密碼都是root
分配角色,administrator是可以操作和guest本地用戶(hù)一樣的功能,當(dāng)?shù)卿浬蟫abbitmq_management之后,里面的所有功能都可以使用。
rabbitmqctl set_user_tags root administrator
授權(quán),隊(duì)列的操作管理權(quán)限。如果不配置,那么客戶(hù)端在連接消息隊(duì)列時(shí)會(huì)出問(wèn)題。
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
上面語(yǔ)句我沒(méi)有執(zhí)行成功,后續(xù)再研究下是不是寫(xiě)法問(wèn)題
我們?cè)趓abbitmq_management上面可以正常訪問(wèn)操作后,就可以放心的寫(xiě)demo了,這里采用spring boot。先看簡(jiǎn)單看下RabbitMQ的簡(jiǎn)易架構(gòu)圖,容易理解下面提到的一些組件。
生產(chǎn)者,消息,消費(fèi)者
消息內(nèi)部:Exchange,Binding,Queues
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
這里沒(méi)有采用自動(dòng)配置
mq.rabbit.host=192.168.21.128mq.rabbit.port=5672mq.rabbit.virtualHost=/mq.rabbit.username=rootmq.rabbit.password=root
@Beanpublic ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.mqRabbitHost,this.mqRabbitPort); connectionFactory.setUsername(this.mqRabbitUserName); connectionFactory.setPassword(this.mqRabbitPassword); connectionFactory.setVirtualHost(this.mqRabbitVirtualHost); connectionFactory.setPublisherConfirms(true); return connectionFactory;}
@Beanpublic RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template;}
@Beanpublic DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE_NAME);}
@Beanpublic Queue queue() { return new Queue(QUEUE_NAME, true);}
@Beanpublic Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(ROUTING_KEY);}
需要將ACK修改為手動(dòng)確認(rèn),避免消息在處理過(guò)程中發(fā)生異常造成被誤認(rèn)為已經(jīng)成功消費(fèi)的假象。
@Beanpublic SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { byte[] body = message.getBody(); logger.info("消費(fèi)端接收到消息 : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }); return container;}
為了讓客戶(hù)端知道消息是否已經(jīng)成功,消息隊(duì)列提供了回調(diào)機(jī)制(需要實(shí)現(xiàn)ConfirmCallback),當(dāng)消息服務(wù)器接收到消息之后會(huì)給客戶(hù)端一個(gè)通知,此時(shí)客戶(hù)端根據(jù)消息應(yīng)答來(lái)決定后續(xù)的流程。
@Servicepublic class ProductServiceImpl extends BaseService implements ProductService, RabbitTemplate.ConfirmCallback { @Autowired private ProductMapper productMapper; private RabbitTemplate rabbitTemplate; public ProductServiceImpl(RabbitTemplate rabbitTemplate){ this.rabbitTemplate=rabbitTemplate; this.rabbitTemplate.setConfirmCallback(this); } public void confirm(CorrelationData correlationData, boolean ack, String cause) { this.logger.info(" 消息id:" + correlationData); if (ack) { this.logger.info("消息發(fā)送確認(rèn)成功"); } else { this.logger.info("消息發(fā)送確認(rèn)失敗:" + cause); } } @Override public void save(Product product) { //執(zhí)行保存 String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, product.getName(),correlationId); }}
可以清晰的看到RabbitMQ發(fā)給生產(chǎn)者的信息收到的確認(rèn)信息,也能看到消息被消費(fèi)端消費(fèi)后的信息。
與常見(jiàn)的數(shù)據(jù)庫(kù)類(lèi)似,都是主從模式來(lái)保證高可用,可以利用HAProxy來(lái)實(shí)現(xiàn)主從備份方案。
主要是為了解決垂直優(yōu)化的瓶頸問(wèn)題,主要有這三種:
這個(gè)不是RabbitMQ的專(zhuān)利,將消息持久化可以確保RabbitMQ重啟或者死機(jī)過(guò)程中不至于丟掉沒(méi)有消費(fèi)的消息。
這點(diǎn)要靠消費(fèi)端來(lái)完成,盡管消費(fèi)端可以通過(guò)ACK來(lái)通知消息隊(duì)列消息已經(jīng)被消費(fèi),但如果消費(fèi)端消費(fèi)了消息,此時(shí)ACK過(guò)程中的通知出現(xiàn)異常,消息隊(duì)列會(huì)認(rèn)為消息未被消費(fèi)會(huì)繼續(xù)發(fā)給消費(fèi)端。
初次安裝可能會(huì)出現(xiàn)一堆問(wèn)題,特別是需要安裝所依賴(lài)的眾多包。RabbitMQ與Erlang可能存在版本依賴(lài)問(wèn)題待后續(xù)確認(rèn)。spring boot下集成RabbitMQ異常簡(jiǎn)單,可以根據(jù)需求部署集群來(lái)實(shí)現(xiàn)可擴(kuò)展高可用的消息系統(tǒng)。
聯(lián)系客服