国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
Spring與ActiveMQ整合(多線程并發(fā)發(fā)送與接收消息)

本文博客介紹生產(chǎn)者和消費者方都使用多線程技術(shù)并發(fā)發(fā)送和接收消息。
1.生產(chǎn)者
創(chuàng)建固定線程數(shù)3的線程池,且發(fā)送方開啟了四個線程任務(wù)。

package com.spring.thread.jms;import java.util.Random;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.springframework.context.support.ClassPathXmlApplicationContext;public class JmsThreadTest {      public static void main(String[] args) {          ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("beans.xml");            final MessageThreadService ms=(MessageThreadService)(ctx.getBean("messageThreadService"));          ExecutorService threadPool=Executors.newFixedThreadPool(3);          for (int i = 1; i <= 4; i++){              threadPool.execute(new Runnable() {                    @Override                    public void run() {                        for (int i = 0; i < 10; i++) {                              try {                                Thread.sleep(new Random().nextInt(3)*500);                            } catch (InterruptedException e) {                                e.printStackTrace();                            }                              System.out.println(Thread.currentThread().getName()+"處理發(fā)送消息"+i);                              ms.sendMessage("你好:"+Thread.currentThread().getName()+"的消息"+i);                          }                         }                });          }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
package com.spring.thread.jms;import javax.annotation.Resource;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.stereotype.Component;@Component("messageThreadService")public class MessageThreadService {    private JmsTemplate jmsTemplate;    private Destination mQueue2;    @Resource    public void setmQueue2(Destination mQueue2) {        this.mQueue2 = mQueue2;    }    @Resource    public void setJmsTemplate(JmsTemplate jmsTemplate) {        this.jmsTemplate = jmsTemplate;    }    public void sendMessage(final String message){        jmsTemplate.send(mQueue2, new MessageCreator() {            @Override            public Message createMessage(Session session) throws JMSException {                 return session.createTextMessage(message);            }        });    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

2.消費者
消費者這邊有兩種方式去并發(fā)接收消息
1)在onMessage方法中,跟生產(chǎn)者那邊一樣創(chuàng)建線程池去完成接收任務(wù),本例中中有6個線程去處理接收任務(wù)
2)可在spring的xml文件中配置并發(fā)的線程數(shù)
方式一:代碼中手動創(chuàng)建線程池

package com.spring.thread.jms;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;import org.springframework.stereotype.Component;@Component("consumerThreadMessageListener")public class ConsumerThreadMessageListener implements MessageListener {    private ExecutorService threadPool=Executors.newFixedThreadPool(6);    @Override    public void onMessage(final Message message) {        threadPool.execute(new Runnable() {            @Override            public void run() {                try {                    System.out.println("接收線程"+Thread.currentThread().getName()+"接收消息:"+((TextMessage)message).getText());                } catch (JMSException e) {                    e.printStackTrace();                }            }        });                         }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

配置文件

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xmlns:context="http://www.springframework.org/schema/context"       xmlns:jms="http://www.springframework.org/schema/jms"       xsi:schemaLocation="http://www.springframework.org/schema/beans        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd        http://www.springframework.org/schema/context        http://www.springframework.org/schema/context/spring-context-2.5.xsd        http://www.springframework.org/schema/jms        http://www.springframework.org/schema/jms/spring-jms-2.5.xsd" >     <context:annotation-config />     <context:component-scan base-package="com.spring"/>    <!-- 真正可以產(chǎn)生Connection的ConnectionFactory,由對應(yīng)的 JMS服務(wù)廠商提供-->    <bean id="connectinFactory" class="org.apache.activemq.ActiveMQConnectionFactory">          <property name="brokerURL" value="tcp://localhost:61616" />    </bean>    <!-- Spring Caching連接工廠 -->    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->    <bean id="cachingConnectionFactory"        class="org.springframework.jms.connection.CachingConnectionFactory">        <!-- 目標ConnectionFactory對應(yīng)真實的可以產(chǎn)生JMS Connection的ConnectionFactory -->        <property name="targetConnectionFactory" ref="connectinFactory"></property>            <!-- Session緩存數(shù)量 -->        <property name="sessionCacheSize" value="10"></property>     </bean>    <!-- 配置消息發(fā)送目的地方式 -->    <!-- Queue隊列:僅有一個訂閱者會收到消息,消息一旦被處理就不會存在隊列中 -->    <bean id="mQueue2" class="org.apache.activemq.command.ActiveMQQueue">         <constructor-arg index="0" value="MessageQueue2" />    </bean>     <!-- Spring JMS Template 配置JMS模版 -->    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">         <property name="connectionFactory" ref="cachingConnectionFactory" />    </bean>     <jms:listener-container container-type="default" connection-factory="cachingConnectionFactory" acknowledge="auto">         <jms:listener destination="MessageQueue2" ref="consumerThreadMessageListener"/>     </jms:listener-container></beans>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42


注:實際情況是消息發(fā)送的時候,監(jiān)聽器就已經(jīng)在接收數(shù)據(jù)了,為了看的清楚,我設(shè)置斷點在接收方阻塞住了,以讓發(fā)送方完全發(fā)送完數(shù)據(jù)的時候才去接收消息,所以得出上面的圖。

方式二:xml配置線程數(shù)
在接收消息方也通過修改xml配置接收消息的并發(fā)線程數(shù),這樣在onMessage()方法中就不需要創(chuàng)建線程池去處理了。只需要添加concurrentConsumers就可以了。
修改如下

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xmlns:context="http://www.springframework.org/schema/context"       xmlns:jms="http://www.springframework.org/schema/jms"       xsi:schemaLocation="http://www.springframework.org/schema/beans        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd        http://www.springframework.org/schema/context        http://www.springframework.org/schema/context/spring-context-2.5.xsd        http://www.springframework.org/schema/jms        http://www.springframework.org/schema/jms/spring-jms-2.5.xsd" >     <context:annotation-config />     <context:component-scan base-package="com.spring"/>    <!-- 真正可以產(chǎn)生Connection的ConnectionFactory,由對應(yīng)的 JMS服務(wù)廠商提供-->    <bean id="connectinFactory" class="org.apache.activemq.ActiveMQConnectionFactory">          <property name="brokerURL" value="tcp://localhost:61616" />    </bean>    <!-- Spring Caching連接工廠 -->    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->    <bean id="cachingConnectionFactory"        class="org.springframework.jms.connection.CachingConnectionFactory">        <!-- 目標ConnectionFactory對應(yīng)真實的可以產(chǎn)生JMS Connection的ConnectionFactory -->        <property name="targetConnectionFactory" ref="connectinFactory"></property>            <!-- Session緩存數(shù)量 -->        <property name="sessionCacheSize" value="10"></property>     </bean>    <!-- 配置消息發(fā)送目的地方式 -->    <!-- Queue隊列:僅有一個訂閱者會收到消息,消息一旦被處理就不會存在隊列中 -->    <bean id="mQueue2" class="org.apache.activemq.command.ActiveMQQueue">         <constructor-arg index="0" value="MessageQueue2" />    </bean>     <!-- Spring JMS Template 配置JMS模版 -->    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">         <property name="connectionFactory" ref="cachingConnectionFactory" />    </bean>    <bean id="ListenerContainer"              class="org.springframework.jms.listener.DefaultMessageListenerContainer">              <property name="connectionFactory" ref="cachingConnectionFactory" />              <property name="receiveTimeout" value="1000" />              <property name="destination" ref="mQueue2" />              <property name="messageListener" ref="consumerThreadMessageListener" />              <property name="taskExecutor" ref="MessageExecutor"></property>              <property name="concurrentConsumers" value="7"></property>      </bean>      <bean id="MessageExecutor"          class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">          <property name="corePoolSize" value="20" />          <property name="maxPoolSize" value="100" />          <property name="daemon" value="true" />          <property name="keepAliveSeconds" value="120" />      </bean>  </beans>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

onMessage方法修改如下:

package com.spring.thread.jms;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;import org.springframework.stereotype.Component;@Component("consumerThreadMessageListener")public class ConsumerThreadMessageListener implements MessageListener {    @Override    public void onMessage(final Message message) {                try {                    System.out.println("接收線程"+Thread.currentThread().getName()+"接收消息:"+((TextMessage)message).getText());                } catch (JMSException e) {                    e.printStackTrace();                }            }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊舉報
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
通過spring開發(fā)ActiveMQ簡單應(yīng)用
JMS在Spring框架下的應(yīng)用
如何使用Spring架構(gòu)中的注釋功能
五、上下文 六、配置文件
使用 Spring 2.5 注釋驅(qū)動的 IoC 功能
Quartz與Spring的整合
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服