多语言展示
当前在线:1565今日阅读:60今日分享:41

springboot项目(8)集成activeMQ

ActiveMQ是一个纯Java程序编写的消息中间件,是个开源项目。消息中间件也是工作中非常常用到的一部分,所以我继之前的项目做一个消息中间件的集成。
工具/原料
1

电脑

2

intelliJ idea,jdk1.8,mysql

方法/步骤
1

在集成消息中间件的时候我们首先需要将activeMQ的中间件环境搭建起来然后才能做集成。activeMQ的环境搭建我之前也写过一篇经验,不懂的可以直接点击下方的链接。

2

下方链接是springboot项目(7),是一个系列的,都是在同一个项目上进行增加功能。有兴趣可以我之前的文章。废话不多说,先引入集成activeMQ的依赖,pom.xml文件中加入:      org.springframework.boot     spring-boot-starter-activemq 

3

配置yml文件:spring:  activemq:    #主要这边的端口是61616,可在activemq.xml文件中自行修改对口    broker-url: tcp://localhost:61616     user: admin     password: admin     close-timeout: 15s

4

创建activeMQ配置文件类,初始化Queue(队列)和Topic(话题)这方面的监听。配置文件内容如下: import org.apache.activemq.command.ActiveMQQueue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory;  import javax.jms.ConnectionFactory; import javax.jms.Queue;  /**  * Created by 30721 on 2019/7/9.  */ @Configuration public class ActiveMQConfig {      public final static String JMS_LISTENER_CONTAINER_FACTORY_QUEUE = 'jmsListenerContainerFactoryQueue';      public final static String JMS_LISTENER_CONTAINER_FACTORY_TOPIC = 'jmsListenerContainerFactoryTopic';      public final static String JMS_QUEUE = 'jmsQueue';      public final static String JMS_TOPIC = 'jmsTopic';    @Bean(JMS_LISTENER_CONTAINER_FACTORY_TOPIC)     public JmsListenerContainerFactory jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {         DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();         factory.setPubSubDomain(true);         factory.setConnectionFactory(activeMQConnectionFactory);         return factory;     }      @Bean(JMS_LISTENER_CONTAINER_FACTORY_QUEUE)     public JmsListenerContainerFactory jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {         DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();         factory.setPubSubDomain(false);         factory.setConnectionFactory(activeMQConnectionFactory);         return factory;     } }

5

创建生产者队列类,代码如下: import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service;  import javax.annotation.Resource; import javax.jms.Destination;  /**  * 生产者  * Created by 30721 on 2019/7/9.  */ @Service public class ActiveMQQueueProducer {      private final static Logger logger = LoggerFactory.getLogger(ActiveMQQueueProducer.class);      @Resource     private JmsTemplate jmsTemplate;      public void sendQueueMessage(Destination destination, String message) {         logger.info('生产者生成一条队列消息:{}。', message);         this.jmsTemplate.convertAndSend(destination,message);     } }

6

创建消费者队列,代码如下:import cn.cy.config.ActiveMQConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Service;  /**  * 消费者  * Created by 30721 on 2019/7/9.  */ @Service public class ActiveMQQueueConsumer {      private Logger logger = LoggerFactory.getLogger(ActiveMQQueueConsumer.class);      @JmsListener(destination = ActiveMQConfig.JMS_QUEUE,containerFactory = ActiveMQConfig.JMS_LISTENER_CONTAINER_FACTORY_QUEUE)     public void onQueueMessage(String msg) {         logger.info('接收到一条队列消息:{}',msg);     }  }

7

创建主题发布者,代码如下: import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service;  import javax.annotation.Resource; import javax.jms.Destination;  /**  * 主题消息发布者  * Created by 30721 on 2019/7/9.  */ @Service public class ActiveMQTopicPublisher {      private final static Logger logger = LoggerFactory.getLogger(ActiveMQTopicPublisher.class);      @Resource     private JmsTemplate jmsTemplate;      public void sendTopicMessage(Destination destination, String message) {         logger.info('发布者发布一条话题:{}。', message);         this.jmsTemplate.convertAndSend(destination,message);     } }

8

创建主题订阅者,代码如下: import cn.cy.config.ActiveMQConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Service;  /**  * 主题消息订阅者  * Created by 30721 on 2019/7/9.  */ @Service public class ActiveMQTopicSubscriber {      private Logger logger = LoggerFactory.getLogger(ActiveMQQueueConsumer.class);      @JmsListener(destination = ActiveMQConfig.JMS_TOPIC,containerFactory = ActiveMQConfig.JMS_LISTENER_CONTAINER_FACTORY_TOPIC)     public void onTopicMessage(String msg) {         logger.info('订阅者收到一条主题:{}',msg);     } }

测试代码
1

测试代码如下: import cn.cy.config.ActiveMQConfig; import cn.cy.service.jms.ActiveMQQueueProducer; import cn.cy.service.jms.ActiveMQTopicPublisher; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;  import javax.annotation.Resource;  /**  * Created by 30721 on 2019/7/9.  */ @RunWith(SpringRunner.class) @SpringBootTest public class JmsTest {      @Resource     private ActiveMQQueueProducer activeMQQueueProducer;     @Resource     private ActiveMQTopicPublisher activeMQTopicPublisher;      @Test     public void test() {         activeMQQueueProducer.sendQueueMessage(new ActiveMQQueue(ActiveMQConfig.JMS_QUEUE), '我是队列');         activeMQTopicPublisher.sendTopicMessage(new ActiveMQTopic(ActiveMQConfig.JMS_TOPIC), '我是话题');     } }测试的结果如下图:接收消息,消化消息。

2

知识点小记:消息中间件并不是为了提高系统性能,消息中间件更多是用在分布式的环境当中,为了让系统解耦,还有就是消息中间件更多的是以分布式集群存在的,这是为了防止消息丢失,而重复消费问题就需要代码进行去重处理。

推荐信息