Spring Boot 中应用ActiveMQ
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题,实现高性 能、高可用、可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件。
目前在生产环境中使用较多的消息队列有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。
特性
- 异步性:将耗时的同步操作通过以发送消息的方式进行了异步化处理,减少了同步等待的时间。
- 松耦合:消息队列减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心彼此的实现细节,只要定义好消息的格式就行。
- 分布式:通过对消费者的横向扩展,降低了消息队列阻塞的风险,以及单个消费者产生单点故障的可能性(当然消息队列本身也可以做成分布式集群)。
- 可靠性:消息队列一般会把接收到的消息存储到本地硬盘上(当消息被处理完之后,存储信息根据不同的消息队列实现,有可能将其删除),这样即使应用挂掉或者消息队列本身挂掉,消息也能够重新加载。
JMS规范
JMS 即 Java 消息服务(Java Message Service)应用程序接口,是一个 Java 平台中关于面向消息中间件 (MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。
JMS 的消息机制有 2 种模型,一种是 Point to Point,表现为队列的形式,发送的消息,只能被一个接收者取走;另一种是 Topic,可以被多个订阅者订阅,类似于群发。
ActiveMQ 是 JMS 的一个实现。
ActiveMQ介绍
ActiveMQ 是 Apache 软件基金下的一个开源软件,它遵循 JMS1.1 规范(Java Message Service),是消息 驱动中间件软件(MOM)。它为企业消息传递提供高可用、出色性能、可扩展、稳定和安全保障。
ActiveMQ 使用 Apache 许可协议,因此,任何人都可以使用和修改它而不必反馈任何改变。
ActiveMQ 的目标是在尽可能多的平台和语言上提供一个标准的,消息驱动的应用集成。ActiveMQ 实现 JMS 规范并在此之上提供大量额外的特性。ActiveMQ 支持队列和订阅两种模式的消息发送。
Spring Boot 提供了了 ActiveMQ 组件 spring-boot-starter-activemq,⽤来支持 ActiveMQ 在 Spring Boot 体系内使用,下⾯我们来详细了解如何使用。
首先Spring Boot 集成了ActiveMQ,但是这个项目默认只支持队列(Queue)或者广播(Topic)中的一种
而且是通过配置项 spring.jms.pub-sub-domain的值来控制,true为topic模式,false为queue模式,模型情况下支持队列模式。
ActiveMQ的使用-Queue模式
-
添加依赖
1 2 3 4 5
<-- 主要添加的依赖:spring-boot-starter-activemq </--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
-
YAML配置文件
1 2 3 4 5 6 7 8 9 10 11 12
spring: activemq: # 基于内存ActiveMQ in-memory: true # 不适应连接池 pool: enabled: false # 独立的 ActiveMQ # broker-url: tcp://192.168.1.1 # user: admin # password: admin
在使⽤ActiveMQ 时有两种使用方式,一种是使用独立安装的 ActiveMQ,在生产环境推荐使用这种;另一种是使用基于内存 ActiveMQ ,在调试阶段建议使用这种方式。
队列发送的消息, 只能被一个消费者接收。
-
创建队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
import org.apache.activemq.command.ActiveMQQueue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.jms.Queue; @Configuration public class activeMQConfig { @Bean public Queue queue() { // xmj.queue是队列的名字 return new ActiveMQQueue("xmj.queue"); } }
使用 @Configuration 注解在项目启动时,定义了一个队列 queue 命名为:xmj.queue
-
消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Component; import javax.jms.Queue; @Component public class Producer { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; public void sendQueue(String msg) { System.out.println("send queue msg: " + msg); this.jmsMessagingTemplate.convertAndSend(this.queue, msg); } }
JmsMessagingTemplate 是 Spring 提供发送消息的工具类,使⽤ JmsMessagingTemplate 和创建好的 queue 对消息进行发送。
-
消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13
// 消费者1 import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class Consumer { @JmsListener(destination = "xmj.queue") public void receiveQueue(String msg) { System.out.println("Consumer queue msg: "+ msg); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// 消费者2 import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class Consumer2 { @JmsListener(destination = "xmj.queue") public void receiveQueue(String msg) { System.out.println("Consumer2 queue msg: "+ msg); } }
使用注解 @JmsListener(destination = “xmj.queue”),表示此方法监控了了名为 xmj.queue 的队列。当队列xmj.queue 中有消息发送时会触发此方法的执行,text 为消息内容。
根据控制台输出的消息可以看出,当有多个消费者监听一个队列时,消费者会自动均衡负载的接收消息,并且每个消息只能有一个消费者所接收。
-
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import top.withlevi.producer.Producer; @SpringBootTest @RunWith(SpringRunner.class) public class QueueTests { @Autowired private Producer producer; @Test public void sendSimpleQueueMessage() throws InterruptedException { for (int i = 0; i < 1000; i++) { this.producer.sendQueue("Test queue message. " + i); } Thread.sleep(1000L); } }
创建测试方式,使⽤用 producer 发送消息,为了保证容器器可以接收到消息,让测试方法等待 1 秒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# Output result 2023-07-26 10:58:33.217 INFO 22320 --- [ main] top.withlevi.queue.QueueTests : Started QueueTests in 2.003 seconds (JVM running for 3.096) send queue msg: Test queue message. 0 send queue msg: Test queue message. 1 send queue msg: Test queue message. 2 send queue msg: Test queue message. 3 Consumer queue msg: Test queue message. 0 Consumer2 queue msg: Test queue message. 1 send queue msg: Test queue message. 4 Consumer queue msg: Test queue message. 2 Consumer2 queue msg: Test queue message. 3 send queue msg: Test queue message. 5 Consumer queue msg: Test queue message. 4 send queue msg: Test queue message. 6 ----ignored
注意:控制台输出 javax.jms.JMSException: peer (vm://localhost#1) stopped. 报错信息可以忽略,这是 Info 级别的错误,是 ActiveMQ 的⼀一个 bug。
ActiveMQ的使用-Topic模式
-
配置见Queue上面的信息
-
YAML配置文件
1 2 3 4 5 6 7 8
spring: activemq: in-memory: true pool: enabled: false # 开启topic模式 jms: pub-sub-domain: true
-
创建Topic
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
import org.apache.activemq.command.ActiveMQTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.jms.Topic; @Configuration public class activeMQConfig { @Bean public Topic topic() { // xmj.topic是消息广播的名字 return new ActiveMQTopic("xmj.topic"); } }
使用 @Configuration 注解在项目启动时,定义了一个广播 Topic 命名为:xmj.topic。
-
消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Component; import javax.jms.Topic; @Component public class Producer { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Topic topic; public void sendTopic(String msg) { System.out.println("Send topic msg: " + msg); this.jmsMessagingTemplate.convertAndSend(this.topic, msg); } }
和上面的生产者对比只是 convertAndSend() 方法传入的第一个参数变成了 Topic。
-
消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13
// Consumer1 import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class Consumer { @JmsListener(destination = "xmj.topic") public void receiveTopic(String msg) { System.out.println("Consumer topic msg: " + msg); } }
1 2 3 4 5 6 7 8 9 10 11 12 13
// Consumer2 import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class Consumer2 { @JmsListener(destination = "xmj.topic") public void receiveTopic(String msg) { System.out.println("Consumer2 topic msg: " + msg); } }
消费者也没有变化,只是监听的名改为上面的 xmj.topic,因为模拟多个消费者,复制一份 Consumer 命名为Consumer2,代码相同在输出中标明来自 Consumer2。
-
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import top.withlevi.producer.Producer; @SpringBootTest @RunWith(SpringRunner.class) public class TopicTest { @Autowired private Producer producer; @Test public void sendSimpleTopicMessage() throws InterruptedException { this.producer.sendTopic("Test topic message"); Thread.sleep(1000L); } }
测试方法执行成功后,会看到控制台输出信息,如下:
1 2 3
Send topic msg: Test topic message Consumer2 topic msg: Test topic message Consumer topic msg: Test topic message
可以看出两个消费者都收到了发送的消息,从而验证广播(Topic)是一个发送者多个消费者的模式。
同时支持队列(Queue)和广播(Topic)
如果需要在同一项目中既支持队列模式也支持广播模式,可以通过 DefaultJmsListenerContainerFactory 创建自定义的 JmsListenerContainerFactory 实例,之后在 @JmsListener 注解中通过 containerFactory 属性引用它。
分别创建两个自定义的 JmsListenerContainerFactory 实例,通过 pubSubDomain 来控制是支持队列模式还是广播模式。
-
添加依赖
和上面两个一样
-
编写YAML文件
1 2 3 4 5 6 7
spring: activemq: in-memory: true # Specifies if ActiveMQ is using in-memory storage pool: enabled: false # Specifies if pooling is enabled for ActiveMQ connections non-blocking-redelivery: false # Specifies if non-blocking redelivery is enabled send-timeout: 0 # Specifies the timeout (in milliseconds) for sending messages
-
创建ActiveMQ配置文件同时支持Queue和Topic模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import javax.jms.ConnectionFactory; @Configuration @EnableJms public class ActiveMQConfig { @Bean("queueListenerFactory") public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(false); return factory; } @Bean("topicListenerFactory") public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); return factory; } }
-
消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.jms.Queue; import javax.jms.Topic; @Component public class Producer { @Resource private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; @Autowired private Topic topic; public void sendQueue(String msg) { System.out.println("send queue msg: " + msg); this.jmsMessagingTemplate.convertAndSend(this.queue, msg); } public void sendTopic(String msg) { System.out.println("send topic msg: " + msg); this.jmsMessagingTemplate.convertAndSend(this.topic, msg); } }
-
消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// Consumer1 import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class Consumer { // 在消费者接收的方法中,指明使用 containerFactory 接收消息。 @JmsListener(destination = "Levi.queue",containerFactory = "queueListenerFactory") public void receiveQueue(String text) { System.out.println("Consumer queue msg: " + text); } @JmsListener(destination = "Levi.topic",containerFactory = "topicListenerFactory") public void receiveTopic(String text) { System.out.println("Consumer topic msg: " + text); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
// Consumer2 import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class Consumer2 { // 在消费者接收的方法中,指明使用 containerFactory 接收消息。 @JmsListener(destination = "Levi.queue",containerFactory = "queueListenerFactory") public void receiveQueue(String text) { System.out.println("Consumer2 queue msg: "+text); } @JmsListener(destination = "Levi.topic",containerFactory = "topicListenerFactory") public void receiveTopic(String text) { System.out.println("Consumer2 topic msg: " + text); } }
改造完成之后,再次执行队列和广播的测试方法,就会发现项目同时支持了两种类型的消息收发。
-
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
package top.withlevi; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import top.withlevi.producer.Producer; @SpringBootTest @RunWith(SpringRunner.class) public class SampleActiveMqTests { @Autowired private Producer producer; @Test public void sendSimpleTopicMessage() throws InterruptedException { this.producer.sendTopic("Test Topic message " ); Thread.sleep(1000L); } }
改造完成之后,再次执行队列和广播的测试方法,就会发现项目同时支持了两种类型的消息收发。
总结
消息中间件广泛应用在大型互联网架构中,利用消息中间件队列和广播各自的特性可以支持很多业务,比如 群发送短信、给单个用户发送邮件等。ActiveMQ 是一款非常流行的消息中间件,它的特点是部署简单、使 用方便,比较适合中小型团队。Spring Boot 提供了集成 ActiveMQ 对应的组件,在 Spring Boot 中使用 ActiveMQ 只需要添加相关注解即可。