模型概述
Work Queues
模型也称为任务模型,多个消费者绑定到同一个队列,共同消费队列中的消息。- 特点:
- 每条消息只会被一个消费者处理,消息不会被多个消费者同时消费。
- 消息的处理是分配给绑定到队列的消费者,多个消费者可以加速消息的处理。
需求
模拟 WorkQueue,实现一个队列绑定多个消费者
- 在 RabbitMQ 的控制台创建一个队列,名为 work.queue
- 在 publisher 服务中定义测试方法,发送 50 条消息到 work.queue
- 在 consumer 服务中定义两个消息监听者,都监听 work.queue 队列
实现步骤
创建队列
- 在控制台创建队列,命名为
work_queue
。
定义消费者
- 在
Consumer
服务中定义两个消息监听者(即消费者)。 - 监听的队列改为
work_queue
,分别打印消息处理情况,并加上时间戳。 - 为区分不同消费者,使用不同的打印样式(例如,
Consumer 1
用黑色,Consumer 2
用红色打印)。
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) {
System.out.println("消费者11111111111111111接收到的消息: " +
message +
", " +
LocalTime.now());
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) {
System.err.println("消费者22222222222222222接收到的消息: " +
message +
", " +
LocalTime.now());
}
定义消息发送
- 在发送者服务中,编写发送 50 条消息的代码。
- 使用循环发送消息,并在消息中附加编号,以便消费者知道自己接收的是第几条消息。
@Test
public void testWorkQueue() {
// 队列名称
String queueName = "work.queue";
for (int i = 1; i <= 50; i++) {
// 消息
String message = "Hello, Spring AMQP_" + i;
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
测试执行
- 启动消费者后,开始发送 50 条消息。
- 观察消费者的消息接收情况:
Consumer 1
接收到偶数编号的消息,Consumer 2
接收到奇数编号的消息。- 每个消费者分别接收 25 条消息,分配是均匀的(轮询机制)。
观察结论
- 结论 1: 同一个消息只会被一个消费者处理,不会被多个消费者处理。
- 结论 2: 消息的分配是均匀的,即使有多个消费者,每个消费者会轮流处理消息。例如,消息1分给消费者2,消息2分给消费者1,依此类推。
多消费者的作用
- 增加消费者数目可以提高消息的处理速度。
- 如果只有一个消费者,则所有消息由一个消费者处理。如果有多个消费者,消息会被分配给不同的消费者,提高处理效率。
性能差异
- 情况 1: 一个消费者处理所有消息时,消息处理的速度较慢。
- 情况 2: 多个消费者处理消息时,可以大大提高消息的处理速度。
- 例如,2 个消费者分别处理 25 条消息,处理速度快。
- 3 个消费者则每人处理约 16 条,4 个消费者则每人处理约 12 条消息。
生产环境中的应用
- 在实际生产中,通常不会手动创建多个消费者方法。一般是写一个消费者方法,并通过部署多个实例(多台机器或多个服务实例)来实现多个消费者。
- 这样可以使用不同机器的资源来处理消息,进一步提升消息处理速度。
处理速度差异的情况
- 如果不同消费者的处理速度不同,默认的轮询分配可能不够合理。为了解决这个问题,可以使用
prefetch
配置来优化消息的分配。- 默认情况下,RabbitMQ 使用轮询方式来分配消息。
- 如果某个消费者处理消息较慢,默认的分配方式会导致它处理大量消息,处理速度慢。
- 可以通过配置
spring.rabbitmq.listener.simple.prefetch=1
来优化。这样每个消费者只能提前获取一条消息,直到当前消息处理完毕,才能获取下一条。
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
优化示例
- 在消费者 1 中设置每条消息处理后休眠 25 毫秒,每秒处理 40 条消息。
- 在消费者 2 中设置每条消息处理后休眠 200 毫秒,每秒处理 5 条消息。
- 通过这种方式模拟消费者处理速度不同。
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) throws InterruptedException {
System.out.println("消费者11111111111111111接收到的消息: " + message + ", " + LocalTime.now());
Thread.sleep(25);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) throws InterruptedException {
System.err.println("消费者22222222222222222接收到的消息: " + message + ", " + LocalTime.now());
Thread.sleep(200);
}
总结
Work Queues
模型通过多个消费者共同消费同一队列的消息,提高了处理速度,解决了单个消费者处理过慢的问题。- 生产环境中通过多实例部署,可以有效地扩展消费者数量,提升系统处理能力。