在Spring 2.0之前,Spring的JMS的作用局限于產生消息。這個功能(封裝在 JmsTemplate 類中)當然是很好的,但是,它沒有描述完整的JMS堆棧,比如像消息的異步產生和消耗。JMS堆棧缺少的這一部分已經被添加,Spring 2.0現在提供對消息異步消耗的完整支持。
讓我們從一個例子開始。
首先我們打開ActiveMQ。在ActiveMQ安裝路徑下的bin目錄裡有一個ActiveMQ.bat,雙擊執行即可。不過要注意必須先設置JAVA_HOME環境變量。ActiveMQ默認的服務端口是61616。
然后我們開始配置Spring配置文件。我起名為spring-jms.xml
<!-- JMS ConnectionFactory: ActiveMQ client factory pointed at the broker on localhost:61616 -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL">
        <value>tcp://localhost:61616</value>
    </property>
</bean>
<!-- JMS template used by the "producer" bean; messages go to the default destination "cmpp".
     The JMS 1.1 JmsTemplate is used (instead of JmsTemplate102) so that it matches the
     JMS 1.1 DefaultMessageListenerContainer that consumes from the same destination. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="defaultDestinationName" value="cmpp"/>
    <property name="messageConverter" ref="messageConverter"/>
    <property name="receiveTimeout" value="30000"/>
    <!-- JmsTemplate ignores timeToLive (and deliveryMode/priority) unless explicit QoS
         is switched on, so enable it to make the one-day expiry below effective. -->
    <property name="explicitQosEnabled" value="true"/>
    <property name="timeToLive" value="86400000"/>
</bean>
<!-- Spring's stock SimpleMessageConverter; the "messageConverter" bean delegates to it -->
<bean id="simpleConverter"
      class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
<!-- Application message converter; wraps "simpleConverter" for the payload conversion.
     The plain ref attribute is used instead of <ref local="..."/> because the "local"
     attribute is not accepted by newer spring-beans schemas. -->
<bean id="messageConverter" class="com.liangj.apmgt.jms.ApmgtMessageConverter">
    <property name="converter" ref="simpleConverter"/>
</bean>
<!-- Message producer: sends through the "jmsTemplate" bean -->
<bean id="producer" class="com.liangj.apmgt.jms.DefaultApmgtMessageProducer">
    <property name="jmsTemplate">
        <ref bean="jmsTemplate"/>
    </property>
</bean>
<!-- Message Driven POJO (MDP): the adapter is configured with "messageConverter" and
     with "onMessage" as the listener method of the wrapped DefaultApmgtMessageListener -->
<bean id="messageListener"
      class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="com.liangj.apmgt.jms.DefaultApmgtMessageListener"/>
    </constructor-arg>
    <property name="messageConverter" ref="messageConverter"/>
    <property name="defaultListenerMethod" value="onMessage"/>
</bean>
<!-- Listener container that drives "messageListener".
     Literal values are used here, like the other beans of this fragment: no property
     placeholder configurer is declared in it, so ${...} expressions would not be resolved.
     The destination matches the jmsTemplate default destination; the selector only lets
     through messages whose "receiver" property equals 2. -->
<bean id="listenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" value="cmpp"/>
    <property name="messageSelector" value="receiver=2"/>
    <property name="messageListener" ref="messageListener"/>
</bean>
/**
 * Contract for components that send {@code ApmgtMessageData} messages.
 */
public interface IApmgtMessageProducer {

    /**
     * Sends the given message.
     *
     * @param messageData the message (header plus content) to send
     */
    void sendMessage(ApmgtMessageData messageData);
}
/**
 * Contract for components that are notified of received {@code ApmgtMessageData} messages.
 */
public interface IApmgtMessageListener {

    /**
     * Called with a received message.
     *
     * @param message the received message (header plus content)
     */
    void onMessage(ApmgtMessageData message);
}
/**
 * {@link IApmgtMessageProducer} implementation that hands every message to a
 * {@code JmsTemplate}.
 */
public class DefaultApmgtMessageProducer implements IApmgtMessageProducer {

    /** Template used for sending; assigned through {@link #setJmsTemplate(JmsTemplate)}. */
    private JmsTemplate jmsTemplate;

    /**
     * Sends the message through the template. No destination is named here, so the
     * template decides where the message goes.
     *
     * @param messageData the message to send
     */
    public void sendMessage(ApmgtMessageData messageData) {
        jmsTemplate.convertAndSend(messageData);
    }

    /**
     * Injects the template used by {@link #sendMessage(ApmgtMessageData)}.
     *
     * @param jmsTemplate the template to send with
     */
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
}
/**
 * {@link IApmgtMessageListener} implementation that prints every received message
 * to standard output.
 */
public class DefaultApmgtMessageListener implements IApmgtMessageListener {

    /**
     * Prints the received message.
     *
     * @param message the received message
     */
    public void onMessage(ApmgtMessageData message) {
        // The prefix previously contained a stray pinyin annotation "(jiān)" left behind by
        // a character-set conversion; the literal is restored to readable text.
        System.out.println("監聽到消息:" + message);
    }
}
/**
 * Converts between {@code ApmgtMessageData} objects and JMS messages.
 *
 * <p>The message content travels as the JMS message body (produced by the injected
 * {@code SimpleMessageConverter}); the {@code MessageHeader} fields travel as the JMS
 * message properties {@code id}, {@code receiver}, {@code sender}, {@code type} and
 * {@code sendPerson}.
 */
public class ApmgtMessageConverter implements MessageConverter {

    private static final Log log = LogFactory.getLog(ApmgtMessageConverter.class);

    /** Delegate used to convert the message content to and from a JMS message body. */
    private SimpleMessageConverter converter;

    /**
     * Injects the delegate used for body conversion.
     *
     * @param converter the delegate converter
     */
    public void setConverter(SimpleMessageConverter converter) {
        this.converter = converter;
    }

    /**
     * Rebuilds an {@code ApmgtMessageData} from a JMS message.
     *
     * @param message the incoming JMS message; may be {@code null}
     * @return the rebuilt message data, or {@code null} when {@code message} is {@code null}
     * @throws JMSException if the body or a header property cannot be read
     * @throws MessageConversionException if the body cannot be turned into a serializable
     *         content object
     */
    public Object fromMessage(Message message) throws JMSException, MessageConversionException {
        if (message == null) {
            return null;
        }
        Serializable messageContent = extractContent(message);

        MessageHeader header = new MessageHeader();
        header.setId(message.getLongProperty("id"));
        header.setReceiver(message.getIntProperty("receiver"));
        header.setSender(message.getIntProperty("sender"));
        header.setSendPerson(message.getStringProperty("sendPerson"));
        header.setType(message.getIntProperty("type"));

        ApmgtMessageData<Serializable> messageData = new ApmgtMessageData<Serializable>();
        messageData.setMessageContent(messageContent);
        messageData.setMessageHeader(header);
        return messageData;
    }

    /**
     * Builds a JMS message from an {@code ApmgtMessageData}.
     *
     * @param object the object to convert; must be an {@code ApmgtMessageData} with a header
     * @param session the JMS session used to create the message
     * @return the JMS message carrying the content as body and the header as properties
     * @throws JMSException if the message cannot be created or populated
     * @throws MessageConversionException if {@code object} is not an {@code ApmgtMessageData}
     *         or has no header
     */
    public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
        if (!(object instanceof ApmgtMessageData)) {
            // Returning null here would only make the sender fail later with a less
            // helpful error, so reject unsupported input right away.
            throw new MessageConversionException("Cannot convert object of type ["
                    + (object == null ? null : object.getClass().getName())
                    + "]: ApmgtMessageData expected");
        }
        ApmgtMessageData<?> data = (ApmgtMessageData<?>) object;
        MessageHeader header = data.getMessageHeader();
        if (header == null) {
            throw new MessageConversionException("ApmgtMessageData has no message header");
        }

        Message message = converter.toMessage(data.getMessageContent(), session);
        message.setLongProperty("id", header.getId());
        message.setIntProperty("receiver", header.getReceiver());
        message.setIntProperty("sender", header.getSender());
        message.setIntProperty("type", header.getType());
        message.setStringProperty("sendPerson", header.getSendPerson());
        // The log text previously contained a stray pinyin annotation "(fā)" left behind
        // by a character-set conversion; the literal is restored to readable text.
        log.info("發送消息[MessageSender]:\n" + message);
        return message;
    }

    /**
     * Extracts the content object from a JMS message body.
     *
     * <p>{@code toMessage} delegates body creation to {@code SimpleMessageConverter}, which
     * does not always produce an {@code ObjectMessage} (a {@code String} content becomes a
     * {@code TextMessage}, for example). Non-object messages are therefore converted back
     * through the same delegate so that both directions stay symmetric.
     */
    private Serializable extractContent(Message message) throws JMSException, MessageConversionException {
        if (message instanceof ObjectMessage) {
            return ((ObjectMessage) message).getObject();
        }
        Object body = converter.fromMessage(message);
        // The delegate hands back the message itself when it does not know the type.
        if (body == message || !(body instanceof Serializable)) {
            throw new MessageConversionException("Unsupported JMS message type ["
                    + message.getClass().getName() + "]");
        }
        return (Serializable) body;
    }
}
/**
 * A message made of a {@link MessageHeader} and a serializable content object.
 *
 * @param <T> the type of the message content
 */
public class ApmgtMessageData<T extends Serializable>{

    /** The message content; protected so that subclasses can assign it directly. */
    protected T messageContent;

    /** The message header; protected so that subclasses can assign it directly. */
    protected MessageHeader messageHeader;

    /** @return the message content, or {@code null} if none has been set */
    public T getMessageContent() {
        return this.messageContent;
    }

    /** @return the message header, or {@code null} if none has been set */
    public MessageHeader getMessageHeader() {
        return this.messageHeader;
    }

    /** @param messageContent the message content */
    public void setMessageContent(T messageContent) {
        this.messageContent = messageContent;
    }

    /** @param messageHeader the message header */
    public void setMessageHeader(MessageHeader messageHeader) {
        this.messageHeader = messageHeader;
    }

    /**
     * Returns a readable description so that printing or logging a message shows its
     * header and content instead of only an identity hash. The content is rendered with
     * its own {@code toString()}.
     */
    @Override
    public String toString() {
        return getClass().getName()
                + "[header=" + messageHeader
                + ", content=" + messageContent + "]";
    }
}
/**
 * Header fields that accompany a message: id, type, sending module, receiving module
 * and the sending user.
 */
public class MessageHeader {

    /** Last id handed out by {@link #nextId()}; guarded by the class lock. */
    private static long lastId;

    /**
     * Message ID.
     */
    private long id;
    /**
     * Message type.
     */
    private int type;
    /**
     * Message sender: the module that sends the message.
     */
    private int sender;
    /**
     * Message receiver: the module that receives the message.
     */
    private int receiver;
    /**
     * The person sending the message: the concrete user.
     */
    private String sendPerson;

    /**
     * Creates a header with a freshly generated id; all other fields keep their
     * default values.
     */
    public MessageHeader(){
        this.id = nextId();
    }

    /**
     * Generates an id based on the current time in milliseconds.
     *
     * <p>A plain {@code System.currentTimeMillis()} gives the same id to every header
     * created within one millisecond. The last value is remembered and bumped by one
     * when the clock has not advanced, so ids created by this class loader are strictly
     * increasing and therefore distinct.
     */
    private static synchronized long nextId() {
        long now = System.currentTimeMillis();
        lastId = (now > lastId) ? now : lastId + 1;
        return lastId;
    }

    /** @return the message id */
    public long getId() {
        return id;
    }

    /** @param id the message id, replacing the generated one */
    public void setId(long id) {
        this.id = id;
    }

    /** @return the sending user, or {@code null} if not set */
    public String getSendPerson() {
        return sendPerson;
    }

    /** @param sendPerson the sending user */
    public void setSendPerson(String sendPerson) {
        this.sendPerson = sendPerson;
    }

    /** @return the receiving module */
    public int getReceiver() {
        return receiver;
    }

    /** @param receiver the receiving module */
    public void setReceiver(int receiver) {
        this.receiver = receiver;
    }

    /** @return the sending module */
    public int getSender() {
        return sender;
    }

    /** @param sender the sending module */
    public void setSender(int sender) {
        this.sender = sender;
    }

    /** @return the message type */
    public int getType() {
        return type;
    }

    /** @param type the message type */
    public void setType(int type) {
        this.type = type;
    }

    /** Returns all header fields in a readable form for printing and logging. */
    @Override
    public String toString() {
        return "MessageHeader[id=" + id
                + ", type=" + type
                + ", sender=" + sender
                + ", receiver=" + receiver
                + ", sendPerson=" + sendPerson + "]";
    }
}
/**
 * Serializable content object carrying the old and the new password.
 */
public class ModPasswordRequest implements Serializable{

    private static final long serialVersionUID = 1L;

    /** The old password. */
    private String oldPassword;

    /** The new password. */
    private String newPassword;

    /** @return the old password, or {@code null} if not set */
    public String getOldPassword() {
        return oldPassword;
    }

    /** @param oldPassword the old password */
    public void setOldPassword(String oldPassword) {
        this.oldPassword = oldPassword;
    }

    /** @return the new password, or {@code null} if not set */
    public String getNewPassword() {
        return newPassword;
    }

    /** @param newPassword the new password */
    public void setNewPassword(String newPassword) {
        this.newPassword = newPassword;
    }
}
/**
 * Message whose content is a {@link ModPasswordRequest}.
 */
public class ApmgtModPasswordRequest extends ApmgtMessageData<ModPasswordRequest> {

    /** Value stored as the header type by {@link #init()}. */
    private static final int REQ_MODPASSWORD = 0;

    /** Value stored as the header sender by {@link #init()}. */
    private static final int INTF = 1;

    /** Value stored as the header receiver by {@link #init()}. */
    private static final int APMGT = 2;

    /**
     * Replaces header and content with fresh instances filled with fixed sample values:
     * type, sender and receiver from the constants above and two hard-coded passwords.
     */
    public void init(){
        MessageHeader header = new MessageHeader();
        ModPasswordRequest request = new ModPasswordRequest();

        header.setType(REQ_MODPASSWORD);
        header.setSender(INTF);
        header.setReceiver(APMGT);

        request.setNewPassword("123456");
        request.setOldPassword("654321");

        messageHeader = header;
        messageContent = request;
    }
}
/**
 * Demo entry point: starts the Spring context defined in {@code spring-jms.xml} and
 * sends one modify-password message through the "producer" bean.
 */
public class Main {

    /**
     * Configures log4j, loads the application context and sends a single message.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the context cannot be started or the message cannot be sent
     */
    public static void main(final String[] args) throws Exception {
        PropertyConfigurator.configure("log4j.properties");
        AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(new String[] { "spring-jms.xml" });
        // The context is never closed explicitly, so let the JVM close it on shutdown.
        ctx.registerShutdownHook();

        IApmgtMessageProducer producer = (IApmgtMessageProducer) ctx.getBean("producer");

        // init() creates the header and the content itself, so they are not set beforehand.
        ApmgtModPasswordRequest messageData = new ApmgtModPasswordRequest();
        messageData.init();
        producer.sendMessage(messageData);
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring bean definitions for the JMS demo. All ${jms.*} placeholders are resolved
     from apmgt.properties by the "propertyConfigurer" bean. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Loads apmgt.properties so that ${...} placeholders below can be resolved -->
    <bean id="propertyConfigurer"
          class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>apmgt.properties</value>
            </list>
        </property>
    </bean>

    <!-- ####################################### -->
    <!--           JMS Spring Beans              -->
    <!-- ####################################### -->

    <!-- Jms ConnectionFactory -->
    <bean id="connectionFactory"
          class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${jms.brokerURL}" />
    </bean>

    <!-- Spring JMS SimpleConverter -->
    <bean id="simpleConverter"
          class="org.springframework.jms.support.converter.SimpleMessageConverter" />

    <!-- JMS Queue Template.
         The JMS 1.1 JmsTemplate is used (instead of JmsTemplate102) so that it matches the
         JMS 1.1 DefaultMessageListenerContainer declared below. -->
    <bean id="jmsTemplate"
          class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestinationName" value="${jms.destinationName.cmpp}" />
        <property name="messageConverter" ref="messageConverter" />
        <property name="receiveTimeout" value="${jms.receiveTimeout}" />
        <!-- JmsTemplate ignores timeToLive (and deliveryMode/priority) unless explicit QoS
             is switched on, so enable it to make the configured expiry effective. -->
        <property name="explicitQosEnabled" value="true" />
        <property name="timeToLive" value="${jms.timeToLive}"/>
    </bean>

    <!-- Message Converter.
         The plain ref attribute is used instead of <ref local="..."/> because the "local"
         attribute is not accepted by newer spring-beans schemas, and the schema location
         above is not pinned to a version. -->
    <bean id="messageConverter"
          class="com.liangj.apmgt.jms.ApmgtMessageConverter">
        <property name="converter" ref="simpleConverter" />
    </bean>

    <!-- Message producer -->
    <bean id="producer"
          class="com.liangj.apmgt.jms.DefaultApmgtMessageProducer">
        <property name="jmsTemplate" ref="jmsTemplate" />
    </bean>

    <!-- this is the Message Driven POJO (MDP) -->
    <bean id="messageListener"
          class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <bean class="com.liangj.apmgt.jms.DefaultApmgtMessageListener" />
        </constructor-arg>
        <property name="defaultListenerMethod" value="onMessage" />
        <property name="messageConverter" ref="messageConverter" />
    </bean>

    <!-- and this is the attendant message listener container -->
    <bean id="listenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destinationName" value="${jms.destinationName.cmpp}" />
        <property name="messageSelector" value="${jms.messageSelector}" />
        <property name="messageListener" ref="messageListener" />
    </bean>

</beans>
# JMS settings, referenced as ${...} placeholders by the Spring bean definitions

# Broker connection
jms.brokerURL=tcp://localhost:61616

# Destination name and the message selector used by the listener container
jms.destinationName.cmpp=cmpp
jms.messageSelector=receiver=2

# Receive timeout of the JMS template
jms.receiveTimeout=3000

# Message time-to-live: one day is 86400000 ms; 0 means the message lives forever.
jms.timeToLive=86400000