1.环境搭建
1.Docker安装RabbitMQ
1.拉取镜像
docker pull rabbitmq:3.8-management
2.安装命令
docker run -e RABBITMQ_DEFAULT_USER=sun \
-e RABBITMQ_DEFAULT_PASS=mq \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d 699038cb2b96 # 注意这里是镜像id,需要替换
3.开启5672和15672端口
4.登录控制台
15672端口
2.整合Spring AMQP
1.sun-common模块下创建新模块
2.引入amqp依赖和fastjson
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- 继承父模块的版本和通用依赖 -->
<parent>
<groupId>com.sunxiansheng</groupId>
<artifactId>sun-common</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>sun-common-rabbitmq</artifactId>
<!-- 子模块的version,如果不写就默认跟父模块的一样 -->
<version>${children.version}</version>
<!-- 自定义依赖,无需版本号 -->
<dependencies>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 用于传递消息时的序列化操作 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
</project>
3.新建一个mq-demo的模块
1.在sun-frame下创建mq-demo
2.然后在mq-demo下创建生产者和消费者子模块
3.查看是否交给父模块管理了
4.在mq-demo模块引入sun-common-rabbitmq依赖
<dependencies>
<!-- 引入sun-common-rabbitmq -->
<dependency>
<groupId>com.sunxiansheng</groupId>
<artifactId>sun-common-rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
5.publisher引入sun-common-test依赖
<dependencies>
<!-- sun-common-test -->
<dependency>
<groupId>com.sunxiansheng</groupId>
<artifactId>sun-common-test</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
6.将sun-common-rabbitmq clean-install一下
7.给consumer和publisher都创建主类
1.ConsumerApplication.java
package com.sunxiansheng.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan("com.sunxiansheng")
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
2.PublisherApplication.java
package com.sunxiansheng.publisher;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class PublisherApplication {
public static void main(String[] args) {
SpringApplication.run(PublisherApplication.class);
}
}
4.测试MQ
1.application.yml mq的最基本配置
spring:
# RabbitMQ 配置
rabbitmq:
# 服务器地址
host: ip
# 用户名
username: sunxiansheng
# 密码
password: rabbitmq
# 虚拟主机
virtual-host: /
# 端口
port: 5672
2.consumer
1.TestConfig.java MQ配置
package com.sunxiansheng.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Description: 最基本的MQ测试
* @Author sun
* @Create 2024/8/2 14:34
* @Version 1.0
*/
@Configuration
public class TestConfig {
/**
* 创建一个fanout类型的交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange.test");
}
/**
* 创建一个队列
* @return
*/
@Bean
public Queue fanoutQueueTest() {
return new Queue("fanout.queue.test");
}
/**
* 交换机和队列绑定
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(fanoutQueueTest()).to(fanoutExchange());
}
}
2.TestConfigListener.java 监听队列
package com.sunxiansheng.consumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Description: 最基本的MQ测试
* @Author sun
* @Create 2024/8/2 14:34
* @Version 1.0
*/
@Component
public class TestConfigListener {
@RabbitListener(queues = "fanout.queue.test")
public void receive(String message) {
System.out.println("接收到的消息:" + message);
}
}
3.publisher
1.TestConfig.java 测试(注意指定启动类)
package com.sunxiansheng.consumer.config;
import com.sunxiansheng.publisher.PublisherApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
* Description: 最基本的MQ测试
* @Author sun
* @Create 2024/8/2 14:34
* @Version 1.0
*/
@SpringBootTest(classes = PublisherApplication.class) // 指定启动类
public class TestConfig {
@Resource
private AmqpTemplate amqpTemplate;
@Test
public void send() {
// 交换机
String exchange = "fanout.exchange.test";
// 路由键
String routingKey = "";
// 消息
String message = "hello fanout";
// 发送消息
amqpTemplate.convertAndSend(exchange, routingKey, message);
}
}
2.结果
2.基本交换机
1.Fanout
1.FanoutConfig.java 交换机配置
package com.sunxiansheng.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Description: Fanout交换机
* @Author sun
* @Create 2024/7/29 15:06
* @Version 1.0
*/
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange1() {
// 创建一个fanout类型的交换机
return new FanoutExchange("fanout.exchange");
}
@Bean
public Queue fanoutQueue1() {
// 创建一个队列
return new Queue("fanout.queue1");
}
@Bean
public Queue fanoutQueue2() {
// 创建一个队列
return new Queue("fanout.queue2");
}
// 两个队列绑定到交换机上
@Bean
public Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange1) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange1);
}
@Bean
public Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange1) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange1);
}
}
2.FanoutConfigListener.java 监听者
package com.sunxiansheng.consumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Description: Fanout交换机
* @Author sun
* @Create 2024/7/29 15:06
* @Version 1.0
*/
@Component
public class FanoutConfigListener {
@RabbitListener(queues = "fanout.queue1")
public void receive1(String message) {
System.out.println("fanout.queue1接收到的消息:" + message);
}
@RabbitListener(queues = "fanout.queue2")
public void receive2(String message) {
System.out.println("fanout.queue2接收到的消息:" + message);
}
}
3.FanoutConfig.java 生产者
package com.sunxiansheng.consumer.config;
import com.sunxiansheng.publisher.PublisherApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
* Description: Fanout交换机
* @Author sun
* @Create 2024/7/29 15:06
* @Version 1.0
*/
@SpringBootTest(classes = PublisherApplication.class) // 指定启动类
public class FanoutConfig {
@Resource
private AmqpTemplate amqpTemplate;
@Test
public void send() {
// 交换机
String exchange = "fanout.exchange";
// 路由键
String routingKey = "";
// 消息
String message = "hello fanout";
// 发送消息
amqpTemplate.convertAndSend(exchange, routingKey, message);
}
}
2.Direct
1.DirectConfig.java 交换机配置
package com.sunxiansheng.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Description: Direct交换机
* @Author sun
* @Create 2024/7/29 15:06
* @Version 1.0
*/
@Configuration
public class DirectConfig {
@Bean
public DirectExchange directExchange() {
// 创建一个direct类型的交换机
return new DirectExchange("direct.exchange");
}
@Bean
public Queue directQueue1() {
// 创建一个队列
return new Queue("direct.queue1");
}
@Bean
public Queue directQueue2() {
// 创建一个队列
return new Queue("direct.queue2");
}
// 两个队列绑定到交换机上,这里需要指定routingKey
@Bean
public Binding bindingDirectQueue1(Queue directQueue1, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue1).to(directExchange).with("black");
}
@Bean
public Binding bindingDirectQueue2(Queue directQueue2, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue2).to(directExchange).with("green");
}
}
2.DirectConfigListener.java 监听者
package com.sunxiansheng.consumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Description: Direct交换机
* @Author sun
* @Create 2024/7/29 15:06
* @Version 1.0
*/
@Component
public class DirectConfigListener {
@RabbitListener(queues = "direct.queue1")
public void receive1(String message) {
System.out.println("direct.queue1接收到的消息:" + message);
}
@RabbitListener(queues = "direct.queue2")
public void receive2(String message) {
System.out.println("direct.queue2接收到的消息:" + message);
}
}
3.DirectConfig.java 生产者
package com.sunxiansheng.consumer.config;
import com.sunxiansheng.publisher.PublisherApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
* Description: Direct交换机
* @Author sun
* @Create 2024/7/29 15:06
* @Version 1.0
*/
@SpringBootTest(classes = PublisherApplication.class) // 指定启动类
public class DirectConfig {
@Resource
private AmqpTemplate amqpTemplate;
@Test
public void send() {
// 交换机
String exchange = "direct.exchange";
// 路由键
String routingKey = "black";
// 消息
String message = "hello direct";
// 发送消息
amqpTemplate.convertAndSend(exchange, routingKey, message);
}
}