ActiveMQ & Spring 集成

Scroll Down

ActiveMQ & Spring 集成

导入Maven依赖

<!-- JMS dependencies: Spring JMS support, the ActiveMQ connection pool, and Jackson databind -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.9</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.2</version>
</dependency>

<!-- Spring Framework dependencies; all share the version defined by the spring.version property -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-beans</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aop</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jdbc</artifactId>
    <version>${spring.version}</version>
</dependency>

配置xml

<!-- Scan the base package for annotated components (e.g. the @Service classes below). -->
<!-- The "context" prefix must be bound to http://www.springframework.org/schema/context on the root <beans> element. -->
<context:component-scan base-package="com.lucas.spring"/>
<!-- Pooled JMS connection factory that wraps an ActiveMQ connection factory; stop() is invoked on context shutdown -->
<bean id="jmsFactory"
      class="org.apache.activemq.jms.pool.PooledConnectionFactory"
      destroy-method="stop">
    <property name="maxConnections" value="100"/>
    <property name="connectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.189.104:61616"/>
        </bean>
    </property>
</bean>

<!-- Destination queue named "spring-active-queue" -->
<!-- Several destination beans can be declared and swapped in where a destination is referenced -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="spring-active-queue"/>
</bean>

<!-- JmsTemplate, the helper class provided by Spring: uses the pooled connection factory, -->
<!-- targets destinationQueue by default, and converts payloads with SimpleMessageConverter -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsFactory"/>
    <property name="defaultDestination" ref="destinationQueue"/>
    <property name="messageConverter">
        <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
    </property>
</bean>

生产者配置

/**
 * Message producer that publishes a fixed text message through the injected {@link JmsTemplate}.
 */
@Service
public class Producer {

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Sends one text message using the template's default destination.
     */
    public void send() {
        // The callback builds the message on the session supplied by the template.
        jmsTemplate.send(session -> session.createTextMessage("这是一条spring的消息"));
    }
}

消费者配置

/**
 * Message consumer that pulls messages synchronously through the injected {@link JmsTemplate}.
 */
@Service
public class Consumer {

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Receives a single message from the template's default destination and prints its
     * converted payload.
     *
     * <p>Only one receive call is made per invocation: an additional {@code receive()} before
     * {@code receiveAndConvert()} would consume and silently discard a message, and then wait
     * for a second one.
     */
    public void receive() {
        System.out.println(jmsTemplate.receiveAndConvert());
    }
}

配置主题

在applicationContext.xml中配置

<!-- Destination topic named "spring-topic" -->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg index="0" value="spring-topic"/>
</bean>

<!-- JmsTemplate, the helper class provided by Spring, now targeting the topic by default -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsFactory"/>
    <property name="defaultDestination" ref="destinationTopic"/>
    <property name="messageConverter">
        <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
    </property>
</bean>

其余的代码都和上面的一样

使用消息监听

先创建一个MessageListener的实现类

/**
 * Listener that prints the body of every {@link TextMessage} it is handed.
 * Messages of any other type (and {@code null}) are ignored.
 */
public class MyMessageListener implements MessageListener {

    /**
     * Prints the text of the given message when it is a {@link TextMessage}.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message) {
        // instanceof is false for null, so this guard also covers a null message.
        if (!(message instanceof TextMessage)) {
            return;
        }
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println(textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

再在applicationContext.xml中增加配置

<!-- Listener container: hands messages arriving on destinationTopic to a MyMessageListener instance -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsFactory"/>
    <property name="destination" ref="destinationTopic"/>
    <property name="messageListener">
        <bean class="com.lucas.spring.MyMessageListener"/>
    </property>
</bean>

就可以直接实现监听了

ActiveMQ和spring boot整合

导入maven依赖

<!-- Spring Boot starter for ActiveMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>

编辑配置文件

server:
  port: 8080
spring:
  activemq:
    broker-url: tcp://192.168.189.104:61616
    user: admin
    password: admin
  jms:
    pub-sub-domain: false   # false = queue, true = topic

myQueue: boot-queue   # custom property: name of the queue

编写配置类

/**
 * JMS configuration that exposes the application's queue as a bean.
 */
@Configuration
@EnableJms
public class ActiveMQConfig {

    /** Queue name read from the {@code myQueue} property of the configuration file. */
    @Value("${myQueue}")
    private String myQueue;

    /**
     * Builds the queue destination from the configured name.
     *
     * @return an ActiveMQ queue named after the {@code myQueue} property
     */
    @Bean
    public Queue queue() {
        final String queueName = myQueue;
        return new ActiveMQQueue(queueName);
    }
}

注意@EnableJms这个注解

编写生产者

/**
 * Producer that sends a fixed text payload to the injected queue.
 */
@Service
public class ProducerService {

    @Autowired
    private Queue queue;

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * Converts the fixed payload to a message and sends it to the queue.
     */
    public void send() {
        final String payload = "test-springBoot-activeMQ";
        jmsMessagingTemplate.convertAndSend(queue, payload);
    }
}

使用JmsMessagingTemplate发送信息,它的功能比JmsTemplate更加强大,不过JmsTemplate在SpringBoot中同样可用

编写消费者

阻塞式消费

/**
 * Consumer that pulls a message from the injected queue on demand.
 */
@Service
public class ConsumerService {

    @Autowired
    private Queue queue;

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * Receives one message from the queue, converts it to a {@code String} and prints it.
     */
    public void receive() {
        String payload = jmsMessagingTemplate.receiveAndConvert(queue, String.class);
        System.out.println(payload);
    }
}

写法和生产者差不多

监听器

springboot使用更方便的监听器的设置方式

/**
 * Consumer driven by a {@code @JmsListener} method instead of explicit receive calls.
 */
@Service
public class ConsumerService {

    /**
     * Prints the text of each message delivered from the destination named by the
     * {@code myQueue} property.
     *
     * @param textMessage the delivered text message
     * @throws JMSException if the message text cannot be read
     */
    @JmsListener(destination = "${myQueue}")
    public void receive(TextMessage textMessage) throws JMSException {
        final String text = textMessage.getText();
        System.out.println(text);
    }
}

spring-boot主题

配置文件

server:
  port: 8080
spring:
  activemq:
    broker-url: tcp://192.168.189.104:61616
    user: admin
    password: admin
  jms:
    pub-sub-domain: true   # false = queue, true = topic

myTopic: boot-topic   # custom property: name of the topic

配置类

/**
 * JMS configuration that exposes the application's topic as a bean.
 */
@EnableJms
@Configuration
public class ActiveMQConfig {

    /** Topic name read from the {@code myTopic} property of the configuration file. */
    @Value("${myTopic}")
    private String myTopic;

    /**
     * Builds the topic destination from the configured name.
     *
     * <p>The {@code @Bean} annotation is required: without it no {@code Topic} bean is
     * registered and {@code @Autowired Topic} injection points cannot be satisfied.
     *
     * @return an ActiveMQ topic named after the {@code myTopic} property
     */
    @Bean
    public Topic myTopic() {
        return new ActiveMQTopic(myTopic);
    }
}

生产者

/**
 * Producer that publishes a fixed text payload to the injected topic.
 */
@Service
public class ProducerService {

    @Autowired
    private Topic topic;

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * Converts the fixed payload to a message and sends it to the topic.
     */
    public void send() {
        final String payload = "test-springBoot-activeMQ";
        jmsMessagingTemplate.convertAndSend(topic, payload);
    }
}