对于 RabbitMQ 开发,Spring 也提供了⼀些便利. Spring 和 RabbitMQ 的官⽅⽂档对此均有介绍
Spring官⽅:Spring AMQP
RabbitMQ 官⽅:RabbitMQ tutorial - "Hello World!" | RabbitMQ
下⾯来看如何基于 SpringBoot 进⾏ RabbitMQ 的开发
项目准备
创建spring项目,引入依赖
在创建 spring 项目时添加 Web 开发的依赖(方便测试),以及 RabbitMQ 的依赖。
加入的依赖如下:
<dependencies>
<!--Spring MVC相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--RabbitMQ相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
添加配置
#配置RabbitMQ的基本信息
spring:
rabbitmq:
host: 192.168.66.129
port: 5672
virtual-host: wuyulin
username: wuyulin
password: wuyulin
工作队列模式
编写⽣产者代码,为⽅便测试,我们通过接⼝来发送消息
声明队列
import com.yulin.rabbitmqspringdemo.constant.RabbitMQConstants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//声明队列
//1. ⼯作模式队列
@Bean("workQueue")
public Queue workQueue() {
return QueueBuilder.durable(RabbitMQConstants.WORK_QUEUE).build();
}
}
生产者代码(接口)
import com.yulin.rabbitmqspringdemo.constant.RabbitMQConstants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/work")
public String work(){
for(int i=0;i<10;i++){
String message="hello spring rabbitMQ"+i;
rabbitTemplate.convertAndSend("", RabbitMQConstants.WORK_QUEUE,message);
}
return "向工作队列发送消息成功";
}
}
消费者代码(监听类)
import com.yulin.rabbitmqspringdemo.constant.RabbitMQConstants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Created with IntelliJ IDEA.
* Description:
* User: wuyulin
* Date: 2024-12-24
* Time: 11:53
*/
@Component
public class WorkListener {
@RabbitListener(queues = RabbitMQConstants.WORK_QUEUE)
public void listenerQueue1(Message message){
System.out.println("listener 1["+RabbitMQConstants.WORK_QUEUE+"]收到消息:" + message);
}
@RabbitListener(queues = RabbitMQConstants.WORK_QUEUE)
public void listenerQueue2(Message message){
System.out.println("listener 2["+RabbitMQConstants.WORK_QUEUE+"]收到消息:" + message);
}
}
@RabbitListener 是 Spring 框架中⽤于监听 RabbitMQ 队列的注解,通过使⽤这个注解,可以定义⼀个⽅法,以便从 RabbitMQ 队列中接收消息.该注解⽀持多种参数类型,这些参数类型代表了从 RabbitMQ 接收到的消息和相关信息.以下是⼀些常⽤的参数类型:
1. String :返回消息的内容
2. Message ( org.springframework.amqp.core.Message ):Spring AMQP 的 Message 类,返回原始的消息体以及消息的属性,如消息ID,内容,队列信息等.
3. Channel ( com.rabbitmq.client.Channel ):RabbitMQ的通道对象,可以⽤于进⾏更⾼级的操作,如⼿动确认消息
Publish/Subscribe(发布订阅模式)
在发布/订阅模型中,多了⼀个 Exchange ⻆⾊. Exchange 常⻅有三种类型,分别代表不同的路由规则
a) Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)
b) Direct:定向,把消息交给符合指定 routing key 的队列(Routing模式)
c) Topic:通配符,把消息交给符合 routing pattern(路由模式)的队列(Topics模式)
广播模式(Fanout)
声明队列和交换机并建立绑定关系
import com.yulin.rabbitmqspringdemo.constant.RabbitMQConstants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//发布订阅模式
//声明2个队列, 观察是否两个队列都收到了消息
@Bean("fanoutQueue1")
public Queue fanoutQueue1() {
return QueueBuilder.durable(RabbitMQConstants.FANOUT_QUEUE1).build();
}
@Bean("fanoutQueue2")
public Queue fanoutQueue2() {
return QueueBuilder.durable(RabbitMQConstants.FANOUT_QUEUE2).build();
}
//声明交换机
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange(RabbitMQConstants.FANOUT_EXCHANGE_NAME).durable(true).build();
}
//队列和交换机绑定
@Bean
public Binding fanoutBinding(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding fanoutBinding2(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(exchange);
}
}
生产者代码(接口)
@RequestMapping("/fanout")
public String fanoutProduct(){
//routingKey为空, 表⽰所有队列都可以收到消息
rabbitTemplate.convertAndSend(RabbitMQConstants.FANOUT_EXCHANGE_NAME, "","hello spring boot: fanout");
return "发送成功";
}
消费者代码(监听类)
import com.yulin.rabbitmqspringdemo.constant.RabbitMQConstants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Created with IntelliJ IDEA.
* Description:
* User: wuyulin
* Date: 2024-12-24
* Time: 15:23
*/
@Component
public class FanoutListener {
//指定监听队列的名称
@RabbitListener(queues = RabbitMQConstants.FANOUT_QUEUE1)
public void ListenerFanoutQueue1(String message){
System.out.println("["+RabbitMQConstants.FANOUT_QUEUE1+ "]接收到消息:"+ message);
}
@RabbitListener(queues = RabbitMQConstants.FANOUT_QUEUE2)
public void ListenerFanoutQueue2(String message){
System.out.println("["+RabbitMQConstants.FANOUT_QUEUE2+ "]接收到消息:"+ message);
}
}
路由模式(Routing)
交换机类型为 Direct 时,会把消息交给符合指定 routing key 的队列. 队列和交换机的绑定,不是任意的绑定了,⽽是要指定⼀个 RoutingKey (路由key) 消息的发送⽅在向 Exchange 发送消息时,也需要指定消息的 RoutingKey ,Exchange 也不再把消息交给每⼀个绑定的 key,⽽是根据消息的RoutingKey 进⾏判断,只有队列的 RoutingKey 和消息的 RoutingKey 完全⼀致,才会接收到消
声明队列和交换机并建立 routing key 绑定关系
import com.yulin.rabbitmqspringdemo.constant.RabbitMQConstants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//路由模式,声明两个数组和一个交换机
@Bean("directQueue1")
public Queue routingQueue1() {
return QueueBuilder.durable(RabbitMQConstants.DIRECT_QUEUE1).build();
}
@Bean("directQueue2")
public Queue routingQueue2() {
return QueueBuilder.durable(RabbitMQConstants.DIRECT_QUEUE2).build();
}
//声明交换机
@Bean("directExchange")
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstants.DIRECT_EXCHANGE_NAME).durable(true).build();
}
//队列和交换机绑定
//队列1绑定orange
@Bean
public Binding directBinding(@Qualifier("directExchange") DirectExchange exchange,
@Qualifier("directQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("orange");
}
//队列2绑定black, orange
@Bean
public Binding directBinding2(@Qualifier("directExchange") DirectExchange exchange,
@Qualifier("directQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("black");
}
@Bean
public Binding directBinding3(@Qualifier("directExchange") DirectExchange exchange,
@Qualifier("directQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("orange");
}
}
生产者代码(接口)
@RequestMapping("/direct")
public String directProduct(String routingKey){
//routingKey作为参数传递
rabbitTemplate.convertAndSend(RabbitMQConstants.DIRECT_EXCHANGE_NAME,
routingKey,"hello spring boot: direct "+routingKey);
return "发送成功";
}
消费者代码(监听类)
import com.yulin.rabbitmqspringdemo.constant.RabbitMQConstants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Created with IntelliJ IDEA.
* Description:
* User: wuyulin
* Date: 2024-12-24
* Time: 16:07
*/
@Component
public class DirectListener {
//指定监听队列的名称
@RabbitListener(queues = RabbitMQConstants.DIRECT_QUEUE1)
public void ListenerQueue1(String message){
System.out.println("["+RabbitMQConstants.DIRECT_QUEUE1+ "]接收到消息:"+ message);
}
@RabbitListener(queues = RabbitMQConstants.DIRECT_QUEUE2)
public void ListenerQueue2(String message){
System.out.println("["+RabbitMQConstants.DIRECT_QUEUE2+ "]接收到消息:"+ message);
}
}
通配符模式(Topics)
Topics 和Routing模式的区别是:
1. topics 模式使⽤的交换机类型为 topic( Routing 模式使⽤的交换机类型为direct)
2. topic 类型的交换机在匹配规则上进⾏了扩展, Binding Key ⽀持通配符匹配
声明队列和交换机并建立 routing key 通配符绑定关系
import com.yulin.rabbitmqspringdemo.constant.RabbitMQConstants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//topic模式
@Bean("topicsQueue1")
public Queue topicsQueue1() {
return QueueBuilder.durable(RabbitMQConstants.TOPICS_QUEUE1).build();
}
@Bean("topicsQueue2")
public Queue topicsQueue2() {
return QueueBuilder.durable(RabbitMQConstants.TOPICS_QUEUE2).build();
}
//声明交换机
@Bean("topicExchange")
public TopicExchange topicExchange() {
return ExchangeBuilder.topicExchange(RabbitMQConstants.TOPICS_EXCHANGE_NAME).durable(true).build();
}
//队列和交换机绑定
//队列1绑定error, 仅接收error信息
@Bean
public Binding topicBinding(@Qualifier("topicExchange") TopicExchange exchange,
@Qualifier("topicsQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("*.error");
}
//队列2绑定info, error: error,info信息都接收
@Bean
public Binding topicBinding2(@Qualifier("topicExchange") TopicExchange exchange,
@Qualifier("topicsQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("#.info");
}
@Bean
public Binding topicBinding3(@Qualifier("topicExchange") TopicExchange exchange,
@Qualifier("topicsQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("*.error");
}
}
生产者代码(接口)
@RequestMapping("/topics/{routingKey}")
public String topicProduct(@PathVariable("routingKey") String routingKey){
//routingKey为空, 表⽰所有队列都可以收到消息
rabbitTemplate.convertAndSend(RabbitMQConstants.TOPICS_EXCHANGE_NAME,
routingKey,"hello spring boot: topics "+routingKey);
return "发送成功";
}
消费者代码(监听类)
import com.yulin.rabbitmqspringdemo.constant.RabbitMQConstants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Created with IntelliJ IDEA.
* Description:
* User: wuyulin
* Date: 2024-12-25
* Time: 11:26
*/
@Component
public class TopicListener {
//指定监听队列的名称
@RabbitListener(queues = RabbitMQConstants.TOPICS_QUEUE1)
public void ListenerQueue1(String message){
System.out.println("["+RabbitMQConstants.TOPICS_QUEUE1+ "]接收到消息:"+ message);
}
@RabbitListener(queues = RabbitMQConstants.TOPICS_QUEUE2)
public void ListenerQueue2(String message){
System.out.println("["+RabbitMQConstants.TOPICS_QUEUE2+ "]接收到消息:"+ message);
}
}
应用通信
作为⼀个消息队列,RabbitMQ 也可以⽤作应⽤程序之间的通信.上述代码,⽣产者和消费者代码放在不同的应⽤中即可完成不同应⽤程序的通信.接下来我们来看,基于 SpringBoot+RabbitMQ 完成应⽤间的通信.
需求描述:⽤户下单成功之后,通知物流系统,进⾏发货.(只讲应⽤通信,不做具体功能实现)
订单系统作为⼀个⽣产者,物流系统作为⼀个消费者
创建项目
把两个项⽬放在⼀个项⽬中(也可独⽴创建)
1. 创建⼀个空的项⽬ rabbitmq-communication(其实就是⼀个空的⽂件夹)
2.在这个项⽬⾥,创建Module
3.后续流程和创建 SpringBoot 项⽬⼀样,添加对应依赖 创建两个项⽬
1)logistics-service
2)order-service
4. 最终结构如下
可能出现的问题:
如下图,文件显示为橙色,并且启动类无法启动,这是因为没有添加配置 maven 仓库
右击项目,选择Add Framework Support
勾选 Maven
订单系统(生产者)
发送消息格式为对象
如果通过 RabbitTemplate 发送⼀个对象作为消息,我们需要对该对象进⾏序列化.Spring AMQP推荐使⽤ JSON 序列化,Spring AMQP提供了 Jackson2JsonMessageConverter 和 MappingJackson2MessageConverter 等转换器,我们需要把⼀个 MessageConverter 设置到 RabbitTemplate 中.
@Configuration
public class RabbitMQConfig {
//创建一个 Json 格式的消息转换器给Ioc容器管理
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jackson2JsonMessageConverter); // 设置消息转换器
return template;
}
}
定义⼀个对象
@Data
public class OrderInfo {
private String orderId;
private String name;
private long price;
}
⽣产者代码:
@RequestMapping("/createOrder")
public String createOrder(){
//下单相关操作, ⽐如参数校验, 操作数据库等, 代码省略
//发送消息通知
String orderId = UUID.randomUUID().toString();
OrderInfo orderInfo = new OrderInfo(orderId, "商品", 536);
rabbitTemplate.convertAndSend("", "order.create",orderInfo);
return "下单成功";
}
消费者代码:
@Component
//指定监听队列的名称
//@RabbitListener(queues="order.create")可以加在类上,也可以加在⽅法上,⽤于定于⼀个类或者⽅法作为消息的监听器.
@RabbitListener(queues = "order.create")
public class OrderListener {
//会根据接收到的信息类型调用对应的方法
@RabbitHandler
public void ListenerQueue(String message){
System.out.println("接收到消息:"+ message);
//收到消息后的处理, 代码省略
}
@RabbitHandler
public void ListenerQueue(OrderInfo message){
System.out.println("接收到消息:"+ message);
//收到消息后的处理, 代码省略
}
}
@RabbitListener(queues="order.create")可以加在类上,也可以加在⽅法上,⽤于定于⼀个类或者⽅法作为消息的监听器.
@RabbitHandler 是⼀个⽅法级别的注解,当使⽤ @RabbitHandler 注解时,这个⽅法将被调 ⽤处理特定的消息.
如上所示,当信息是字符串类型,由第一个方法接收处理,当信息是 OrderInfo 类型由第二个方法接收处理