1、將連接Mysql數(shù)據(jù)庫的jar文件,放到ActiveMQ的lib目錄下
2、修改ActiveMQ的conf目錄下的active.xml文件,修改數(shù)據(jù)持久化的方式
2.1 修改原來的kshadb的持久化數(shù)據(jù)的方式
- <persistenceAdapter>
- <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
- <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
- </persistenceAdapter>
2.2 連接Mysql的配置(注意配置文件放置的位置)
- <!-- 用于持久化數(shù)據(jù)到Mysql數(shù)據(jù)庫 -->
- <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
- <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
- <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
- <property name="username" value="root"/>
- <property name="password" value="1234"/>
- <property name="maxActive" value="200"/>
- <property name="poolPreparedStatements" value="true"/>
-
- </bean>
3、將數(shù)據(jù)持久化Mysql的運行截圖
3.1 重新啟動ActiveMQ,并運行程序,放入持久化數(shù)據(jù),查看Mysql的active數(shù)據(jù)庫
4、放入持久化數(shù)據(jù)的代碼
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.DeliveryMode;
- import javax.jms.Destination;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.TextMessage;
-
- import org.apache.activemq.ActiveMQConnectionFactory;
-
- public class Sender {
-
- public static void main(String[] args) throws Exception {
-
- // 1、建立ConnectionFactory工廠對象,需要填入用戶名,密碼,以及連接的地址
- // 僅使用默認。端口號為"tcp://localhost:61616"
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
- "zhangsan",// ActiveMQConnectionFactory.DEFAULT_USER,
- "123",// ActiveMQConnectionFactory.DEFAULT_PASSWORD,
- "tcp://localhost:61616");
- // 2、通過ConnectionFactory工廠對象創(chuàng)建一個Connection連接
- // 并且調(diào)用Connection的start方法開啟連接,Connection默認是不開啟的
- Connection connection = connectionFactory.createConnection();
- connection.start();
-
- // 3、通過Connection對象創(chuàng)建Session會話(上下文環(huán)境對象),
- // 參數(shù)一,表示是否開啟事務
- // 參數(shù)二,表示的是簽收模式,一般使用的有自動簽收和客戶端自己確認簽收
-
- // 第一個參數(shù)設置為true,表示開啟事務
- // 開啟事務后,記得要手動提交事務
-
- Session session = connection.createSession(Boolean.TRUE,
- Session.CLIENT_ACKNOWLEDGE);
-
- // 4、通過Session創(chuàng)建Destination對象,指的是一個客戶端用來指定生產(chǎn)消息目標和消費消息來源的對象。
- // 在PTP模式中,Destination指的是Queue
- // 在發(fā)布訂閱模式中,Destination指的是Topic
- Destination destination = session.createQueue("queue1");
-
- // 5、使用Session來創(chuàng)建消息對象的生產(chǎn)者或者消費者
- MessageProducer messageProducer = session.createProducer(destination);
-
- // 6、如果是,生產(chǎn)者,使用MessageProducer的setDeliverMode方法設置,消息的持久化和非持久化
- messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
- // 7、最后使用JMS規(guī)范的TextMessage形式創(chuàng)建數(shù)據(jù)(通過Session對象)
- // 并利用MessageProducer的send方法發(fā)送數(shù)據(jù)
- for (int i = 0; i < 5; i++) {
- TextMessage textMessage = session.createTextMessage();
- textMessage.setText("我是消息" + i);
- messageProducer.send(textMessage);
- }
-
- // 手動提交開啟的事務
- session.commit();
-
- // 釋放連接
- if (connection != null) {
- connection.close();
- }
- }
- }
本站僅提供存儲服務,所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請
點擊舉報。