电脑
IntelliJ IDEA、JDK 1.8、MySQL
在集成消息中间件之前,我们首先需要把 ActiveMQ 的中间件环境搭建起来,然后才能进行集成。ActiveMQ 的环境搭建我之前也写过一篇经验,不清楚的可以直接点击下方的链接。
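下方链接是 Spring Boot 项目(7),属于同一个系列,都是在同一个项目上逐步增加功能。有兴趣可以看我之前的文章。废话不多说,先引入集成 ActiveMQ 的依赖,在 pom.xml 文件中加入:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>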
配置yml文件:
spring:
  activemq:
    # 注意这边的端口是61616,可在activemq.xml文件中自行修改端口
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
    close-timeout: 15s
创建 ActiveMQ 配置类,初始化 Queue(队列)和 Topic(话题)两种模式的监听容器工厂。配置类内容如下: import org.apache.activemq.command.ActiveMQQueue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import javax.jms.ConnectionFactory; import javax.jms.Queue; /** * Created by 30721 on 2019/7/9. */ @Configuration public class ActiveMQConfig { public final static String JMS_LISTENER_CONTAINER_FACTORY_QUEUE = "jmsListenerContainerFactoryQueue"; public final static String JMS_LISTENER_CONTAINER_FACTORY_TOPIC = "jmsListenerContainerFactoryTopic"; public final static String JMS_QUEUE = "jmsQueue"; public final static String JMS_TOPIC = "jmsTopic"; @Bean(JMS_LISTENER_CONTAINER_FACTORY_TOPIC) public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setPubSubDomain(true); factory.setConnectionFactory(activeMQConnectionFactory); return factory; } @Bean(JMS_LISTENER_CONTAINER_FACTORY_QUEUE) public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setPubSubDomain(false); factory.setConnectionFactory(activeMQConnectionFactory); return factory; } }
创建队列消息的生产者类,代码如下: import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import javax.jms.Destination; /** * 生产者 * Created by 30721 on 2019/7/9. */ @Service public class ActiveMQQueueProducer { private final static Logger logger = LoggerFactory.getLogger(ActiveMQQueueProducer.class); @Resource private JmsTemplate jmsTemplate; public void sendQueueMessage(Destination destination, String message) { logger.info("生产者生成一条队列消息:{}。", message); this.jmsTemplate.convertAndSend(destination,message); } }
创建队列消息的消费者类,代码如下: import cn.cy.config.ActiveMQConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Service; /** * 消费者 * Created by 30721 on 2019/7/9. */ @Service public class ActiveMQQueueConsumer { private Logger logger = LoggerFactory.getLogger(ActiveMQQueueConsumer.class); @JmsListener(destination = ActiveMQConfig.JMS_QUEUE,containerFactory = ActiveMQConfig.JMS_LISTENER_CONTAINER_FACTORY_QUEUE) public void onQueueMessage(String msg) { logger.info("接收到一条队列消息:{}",msg); } }
创建主题发布者,代码如下: import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import javax.jms.Destination; /** * 主题消息发布者 * Created by 30721 on 2019/7/9. */ @Service public class ActiveMQTopicPublisher { private final static Logger logger = LoggerFactory.getLogger(ActiveMQTopicPublisher.class); @Resource private JmsTemplate jmsTemplate; public void sendTopicMessage(Destination destination, String message) { logger.info("发布者发布一条话题:{}。", message); this.jmsTemplate.convertAndSend(destination,message); } }
创建主题订阅者,代码如下: import cn.cy.config.ActiveMQConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Service; /** * 主题消息订阅者 * Created by 30721 on 2019/7/9. */ @Service public class ActiveMQTopicSubscriber { private Logger logger = LoggerFactory.getLogger(ActiveMQQueueConsumer.class); @JmsListener(destination = ActiveMQConfig.JMS_TOPIC,containerFactory = ActiveMQConfig.JMS_LISTENER_CONTAINER_FACTORY_TOPIC) public void onTopicMessage(String msg) { logger.info("订阅者收到一条主题:{}",msg); } }
测试代码如下: import cn.cy.config.ActiveMQConfig; import cn.cy.service.jms.ActiveMQQueueProducer; import cn.cy.service.jms.ActiveMQTopicPublisher; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource; /** * Created by 30721 on 2019/7/9. */ @RunWith(SpringRunner.class) @SpringBootTest public class JmsTest { @Resource private ActiveMQQueueProducer activeMQQueueProducer; @Resource private ActiveMQTopicPublisher activeMQTopicPublisher; @Test public void test() { activeMQQueueProducer.sendQueueMessage(new ActiveMQQueue(ActiveMQConfig.JMS_QUEUE), "我是队列"); activeMQTopicPublisher.sendTopicMessage(new ActiveMQTopic(ActiveMQConfig.JMS_TOPIC), "我是话题"); } }
测试的结果如下图:接收消息,消费消息。
知识点小记:消息中间件的主要目的并不是提高系统性能,它更多是用在分布式环境当中,为了让系统解耦。另外,消息中间件通常以分布式集群的形式存在,这是为了防止消息丢失;而重复消费问题则需要在代码中进行去重处理。