ActiveMQ & Spring 集成
导入Maven依赖
<!-- jms依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.9</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.2</version>
</dependency>
<!--spring 依赖-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
配置xml
<!-- 扫描包 -->
<content:component-scan base-package="com.lucas.spring"/>
<!-- 配置生产者 -->
<bean id="jmsFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.189.104:61616"/>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!-- 配置目的地队列 -->
<!-- 可以配置多个进行更换 -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-active-queue"/>
</bean>
<!-- 配置spring提供的工具类 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="defaultDestination" ref="destinationQueue"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
生产者配置
@Service
public class Producer {
@Autowired
private JmsTemplate jmsTemplate;
public void send()
{
jmsTemplate.send((session) ->{
return session.createTextMessage("这是一条spring的消息");
} );
}
}
消费者配置
@Service
public class Consumer {
@Autowired
private JmsTemplate jmsTemplate;
public void receive(){
Message receive = jmsTemplate.receive();
System.out.println(jmsTemplate.receiveAndConvert());
}
}
配置主题
在applicationContext.xml中配置
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic"/>
</bean>
<!-- 配置spring提供的工具类 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="defaultDestination" ref="destinationTopic"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean>
</property>
</bean>
其余代码的都和上面的一样
使用消息监听
先创建一个MessageListener
的实现类
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if (null!=message && message instanceof TextMessage)
{
try {
System.out.println(((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
再在applicationContext.xml中增加配置
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="destination" ref="destinationTopic"/>
<property name="messageListener">
<bean class="com.lucas.spring.MyMessageListener"/>
</property>
</bean>
就可以直接实现监听了
ActiveMQ和spring boot整合
导入maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
编辑配置文件
server:
port: 8080
spring:
activemq:
broker-url: tcp://192.168.189.104:61616
user: admin
password: admin
jms:
pub-sub-domain: false # 队列为false,主题为true
myQueue: boot-queue # 定义一个队列的名字
编写配置类
@EnableJms
@Configuration
public class ActiveMQConfig {
//获取配置文件中的内容
@Value("${myQueue}")
private String myQueue;
@Bean
public Queue queue()
{
return new ActiveMQQueue(myQueue);
}
}
注意@EnableJms
这个注解
编写生产者
@Service
public class ProducerService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
public void send()
{
jmsMessagingTemplate.convertAndSend(queue,"test-springBoot-activeMQ");
}
}
使用JmsMessagingTemplate
发送信息,这个的功能必JmsTemplate
更加强大,但是JmsTemplate
一样在SpringBoot中可用
编写消费者
阻塞式消费
@Service
public class ConsumerService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
public void receive()
{
System.out.println(jmsMessagingTemplate.receiveAndConvert(queue, String.class));
}
}
写法和生产者差不多
监听器
springboot使用更方便的监听器的设置方式
@Service
public class ConsumerService {
@JmsListener(destination = "${myQueue}")
public void receive(TextMessage textMessage) throws JMSException {
System.out.println(textMessage.getText());
}
}
spring-boot主题
配置文件
server:
port: 8080
spring:
activemq:
broker-url: tcp://192.168.189.104:61616
user: admin
password: admin
jms:
pub-sub-domain: true # 队列为false,主题为true
myTopic: boot-topic # 定义一个队列的名字
配置类
@EnableJms
@Configuration
public class ActiveMQConfig {
@Value("${myTopic}")
private String myTopic;
public Topic myTopic()
{
return new ActiveMQTopic(myTopic);
}
}
生产者
@Service
public class ProducerService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
public void send()
{
jmsMessagingTemplate.convertAndSend(topic,"test-springBoot-activeMQ");
}
}