1.前言简介
1.1 专栏传送门
1.1.1 上文小总结
前面00-05中 分别提到了rabbitmq的基础部署 配置 使用 以及死信队列的使用
本篇呢 介绍新的队列 延时队列
1.1.2 上文传送门
微服务: 04-springboot中rabbitmq的yml或properties配置,消息回收,序列化方式
2. rabbitmq延时队列
2.1.0 查看docker启动的服务(检查rabbitmq)
docker ps
docker ps -a
没有rabbitmq的可以看专栏前面rabbitmq的00-05即可
2.1.1 延时队列的作用
延时队列,从名字就可以看出,队列里面的消息会延时一会,也就是等一会才会被消费。
最经典的就是电商中下订单后不支付。
通常,我们会设定一个时间,比如 30 分钟内如果不支付,订单就自动取消。这个功能就可以通过延时队列来实现,下订单后,马上向延时队列发送一条消息,并且设置延迟时间为 30 分钟。然后等 30 分钟之后,消费者开始消费这条消息,可以简单的判断一下比如订单还是未支付状态,就把订单状态改为关闭的。
2.1.2 业务中需要使用的场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
2.2 延时队列的准备工作
2.2.0 查看docker
2.2.1 升级rabbitmq, 装入延时插件
rabbitmq_delayed_message_exchange-3.12.0.ez
插件: => github插件地址:
插件位置: => csdn0积分下载(永久免费)
2.2.2 将插件放在宿主机的一个位置中
我这里创建的路径是:
/usr/local/src/docker/rabbitmq/plugins
2.2.3 复制宿主机插件到容器
先检查rabbitmq容器是否启动后
docker ps -a
然后复制宿主机路径文件到容器插件内
# 将插件拷贝到容器内部的插件目录
docker cp /usr/local/src/docker/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.12.0.ez rabbit:/plugins
2.2.4 进入rabbitmq的容器内部
docker exec -it rabbitmq bash
启动rabbitmq的插件
然后重启
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit
docker restart rabbitmq
2.2.5 检查一下rabbitmq控制台的交换机
找到x-delayed-message类型 则为成功
2.3 springboot中使用延时队列
2.3.1 创建配置类
配置死信队列和延时队列 创建延时交换机和延时队列
@Configuration
public class RabbitConfig {
@Autowired
private SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory;
@Autowired
private RabbitCallbackConfig rabbitCallbackConfig;
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//消息到不到队列 自动重新返回到生产者
rabbitTemplate.setMandatory(true);//其实前面配置加了
rabbitTemplate.setConfirmCallback(rabbitCallbackConfig);
rabbitTemplate.setReturnCallback(rabbitCallbackConfig);
// 使用 JSON 序列化与反序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
public DirectExchange deadDirectExchange() {
return ExchangeBuilder.directExchange(RabbitmqConstant.DEAD_LETTER_EXCHANGE).build();
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(RabbitmqConstant.*_DIVIDE_DEAD_QUEUE).build();
}
@Bean
public Binding deadBinding(Queue deadLetterQueue, DirectExchange deadDirectExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadDirectExchange).with("*DeadKey");
}
//------------------------------>
/**
* 延时队列交换机
*/
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("*_exchange", "x-delayed-message", true, false, args);
}
/**
* 延时队列
*/
@Bean
public Queue delayQueue() {
// return new Queue(RabbitConstant.DELAY_QUEUE, true);
Map<String, Object> arguments = new HashMap<>(3 );
arguments.put("x-dead-letter-exchange",RabbitmqConstant.DEAD_LETTER_EXCHANGE);
arguments.put("x-dead-letter-routing-key","*DeadKey");
arguments.put("x-max-length",2000);
// arguments.put("x-message-ttl", 100000 );
return QueueBuilder.durable(RabbitmqConstant.*_DIVIDE_QUEUE).withArguments(arguments).build( );
}
/**
* 给延时队列绑定交换机
*/
@Bean
public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with("*Key").noargs();
}
@Bean
public ApplicationRunner runner(ConnectionFactory connectionFactory) {
return args -> connectionFactory.createConnection().close();
}
2.3.2 发送json信息
.setHeader("x-delay", 10000)
//毫秒 代表10秒后发送消息
Message msg= new Message(1,"天气好");//随便的一个实体类
rabbitTemplate.convertAndSend(
"*_*_exchange",
"*Key",
dto,
message -> {
message.getMessageProperties().setHeader("x-delay", 10000);
return message;
}
);
2.3.3 查看交换机是否生成(运行后)
测试服务器中的mq交换机 找到x-delayed-message死信交换机 (名得相同)则成功
报错(如重名等, 手动删除后自动创建即可), 可以看前面00-05文章
2.4 消费者如何消费
2.4.1 正常消费 @RabbitListener(queues = *)
@RabbitListener(queues = RabbitmqConstant.*_DIVIDE_QUEUE)
public void collect(*DivideDto *DivideDto, Channel channel, Message message) throws IOException {
// goods coupons persons
log.info("获取到mq消息,消息内容为{}", *DivideDto);
log.info("获取到mq---mq消息,消息内容为{}", message);
}
2.4.2 什么情况会进入死信
随机刨除异常 throw new RuntimeException(“测试死信回滚”);
前面文章有介绍 这里不过多介绍 假设设置参数5次报错 进入死信
如测试服务器的mq
所示
3. 文章的总结与预告
3.1 本文总结
- 延时队列的使用
- 为什么要使用延时队列
- 升级rabbitmq,加入延时队列插件
- springboot中使用延时队列, 查看控制台信息
- 接收延时队列, 接收成功为成功
- 测试异常情况, 多次报错进入死信队列 单独处理
- 注意多次重复消费问题, 消费者进行处理, 特殊业务 请使用单一消费者解决即可
3.2 下文预告
rocketmq的相关代码操作
@author: pingzhuyan
@description: ok
@year: 2024