1、概念
观察者模式:当某个状态更改时,会及时的通知订阅这个状态对象,从而进行业务上的处理。比如我们常见的消息中心(MQ),Redis的发布订阅模式,都可以看作是观察者模式。
2、优缺点
优点:1、对象解耦。2、具有自动促发机制。
缺点:1、通知对象过多,会造成性能问题,一般采用异步方式。2、避免循环依赖(防止出现死循环),3、无法获知变化细节。
3、使用场景
- 具有监听功能的场景。
- 发布订阅的场景
- 具有批量通知的场景
4、实现方式
4.1、需求
实现一个可以支持订阅通知的功能。即:用户可以订阅某个主题,当主题有内容时,各个用户可以实时查看到具体的内容。
分析:从上面需求来看,发现有两个点:1、主题,主要有内容,还可以添加订阅用户,以及通知用户。2、用户类,主要接收定义的主题内容。
4.2、基础功能
基于上诉的分析,我们需要创建一个主题类与订阅者类。
4.2.1、主题类
主题:
import com.test.design.observer.basic.receive.Subscriber;
import java.util.ArrayList;
import java.util.List;
public class Topic {
/**
* 订阅者集合
*/
List<Subscriber> subscriberList = new ArrayList<>();
/**
* 主题名
*/
private String topicName;
/**
* 消息
*/
private String message;
/**
* 绑定主题名
*
* @param topicName 主题名
*/
public Topic(String topicName) {
this.topicName = topicName;
}
/**
* 设置发送的消息
*
* @param message 消息内容
*/
public void setMessage(String message) {
this.message = message;
// 通知订阅者
notifySubscribe();
}
/**
* 添加订阅者
*
* @param subscriber 订阅者
*/
public void addSubscribe(Subscriber subscriber) {
if (subscriber == null) {
return;
}
// 获取订阅者集合
if (subscriberList == null) {
subscriberList = new ArrayList<>();
}
// 添加订阅者
subscriberList.add(subscriber);
}
/**
* 通知订阅者
* 注意:通知采用内部,以设置成私有的
*/
private void notifySubscribe() {
// 获取主题的订阅者们
if (subscriberList == null) {
return;
}
// 遍历订阅者,一个一个的通知
for (Subscriber subscriber : subscriberList) {
subscriber.receiveMessage(message);
}
// 清空消息
message = null;
}
}
4.2.2、订阅者类
/**
* 订阅者接口
*/
public interface Subscriber {
/**
* 接收消息
*
* @param message 消息内容
*/
void receiveMessage(String message);
}
4.3、观察者模式(非官方)
4.3.1、实现订阅者
import com.test.design.observer.basic.receive.Subscriber;
import com.test.design.observer.basic.topic.Topic;
/**
* 订阅者实现类
*/
public class UserSubscriber implements Subscriber {
/**
* 主题
*/
public Topic topic;
/**
* 订阅者名称
*/
public String subscribeName;
/**
* 构造器,指定用户名
* @param subscribeName 订阅者名称
*/
public UserSubscriber(String subscribeName) {
this.subscribeName = subscribeName;
}
/**
* 绑定主题
*
* @param topic 主题
*/
public void subscribeTopic(Topic topic) {
this.topic = topic;
}
@Override
public void receiveMessage(String message) {
System.out.println(subscribeName + "接收到:" + message + " 内容了");
}
}
4.3.2、测试+效果
import com.test.design.observer.basic.receive.Subscriber;
import com.test.design.observer.basic.receive.concrete.UserSubscriber;
import com.test.design.observer.basic.topic.Topic;
public class ObserverTest {
public static void main(String[] args) throws InterruptedException {
// 教育类主题
Topic eduTopic = new Topic("edu_topic");
// 新闻类主题
Topic newsTopic = new Topic("news_topic");
// 创建5个用户
Subscriber userSubscribe1 = new UserSubscriber("用户1");
Subscriber userSubscribe2 = new UserSubscriber("用户2");
Subscriber userSubscribe3 = new UserSubscriber("用户3");
Subscriber userSubscribe4 = new UserSubscriber("用户4");
Subscriber userSubscribe5 = new UserSubscriber("用户5");
// 分别将用户1-3订阅教育类,3-5订阅新闻类,用户3同时订阅了教育与新闻类
eduTopic.addSubscribe(userSubscribe1);
eduTopic.addSubscribe(userSubscribe2);
eduTopic.addSubscribe(userSubscribe3);
newsTopic.addSubscribe(userSubscribe3);
newsTopic.addSubscribe(userSubscribe4);
newsTopic.addSubscribe(userSubscribe5);
System.out.println("-------- 教育类 ----------");
// 发送教育类消息
eduTopic.setMessage("教育部即将下发教育总结");
// 休息三秒
Thread.sleep(3000);
System.out.println("-------- 新闻类 ----------");
// 发送新闻类消息
newsTopic.setMessage("明天天气🌤");
}
}
4.3.3、结论
符合在测试类中的预期,用户1-3接收到了教育类消息,用户3-5接收到了新闻类消息。通过上面的代码,可以很直观的看到主题与订阅者之间的关系,订阅者必须在主题中进行绑定操作,否则无法进行通知。但是上面也有特别大的问题,需要我们仔细的去思考,第一个是并发的发送消息时,会产生并发问题。第二个,通知方法可以通过反射调用,跨越了私有权限。
那下面我们就解决上诉的两个问题。
4.3.4、功能升级
4.3.4.1、问题一:解决并发情况下,添加订阅者与通知订阅者。
既然是并发问题,那么我们只能通过加锁或采用并发包的工具类管理。那下面我们开始改造。
主题类 支持并发:
注释很重要,很重要,很重要。
package com.test.design.observer.basic.topic;
import com.test.design.observer.basic.receive.Subscriber;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
public class Topic {
/**
* 订阅者集合,这里采用并发集合,避免并发添加订阅者造成并发数据丢失问题
* 注意:这里主要避免并发添加订阅者问题
*/
CopyOnWriteArrayList<Subscriber> subscriberList = new CopyOnWriteArrayList<>();
/**
* 添加排队等待机制
* 注意:这里主要解决并发发送消息问题
*/
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue();
/**
* 是否正在执行通知,默认没有通知
* 注意:这里时并发重复通知调用订阅者与queue配合使用,保证并发安全性
*/
AtomicBoolean notify = new AtomicBoolean(Boolean.FALSE);
/**
* 主题名
*/
private String topicName;
/**
* 消息
*/
private String message;
/**
* 绑定主题名
*
* @param topicName 主题名
*/
public Topic(String topicName) {
this.topicName = topicName;
}
/**
* 设置发送的消息
*
* @param message 消息内容
*/
public void setMessage(String message) {
// 将要发送的消息放入队列中
queue.add(message);
// 既然没有人在促发通知了,那么我就去促发一次
// 注意:这里采用CAS保证只有一个线程可以设置成运行通知状态
if (notify.compareAndSet(Boolean.TRUE, Boolean.FALSE)) {
// 通知订阅者
asyncNotifySubscribe();
}
}
/**
* 添加订阅者
*
* @param subscriber 订阅者
*/
public void addSubscribe(Subscriber subscriber) {
if (subscriber == null) {
return;
}
// 添加订阅者
subscriberList.add(subscriber);
}
/**
* 通知订阅者
* 通知采用内部,以设置成私有的
* 注意:本次优化会改成异步调用,避免造成主线程一直阻塞使用,
*/
private void asyncNotifySubscribe() {
// 这里可以使用线程池,我在这就不使用了
new Thread(()->{
// 通知一次
notifyMessage();
// 再次检查一次,避免刚好还未释放标记锁,就已经添加消息了
notifyMessage();
// 释放标记,可以主动通知了
// 注意:这里无法再次采用CAS,本身以获取到了,直接设置即可
notify.set(Boolean.FALSE);
}).start();
}
/**
* 通知消息
*/
private void notifyMessage() {
String messageInfo = null;
// 不为空,说明还有内容在等待通知
while ((messageInfo = queue.poll()) != null) {
// 获取主题的订阅者们
if (subscriberList == null) {
return;
}
// 遍历订阅者,一个一个的通知
for (Subscriber subscriber : subscriberList) {
subscriber.receiveMessage(messageInfo);
}
}
}
}
4.3.4.2、问题二:反射跨越权限调用,造成分发消息。
反射这是Java本身自带的功能,目前无法很直观的屏蔽,但是我们可以设置流水线,必须经过什么才能访问,这样也能避免反射带来的影响。
主题类新增反射标记 同时 setMessage()方法设置false
/**
* 反射状态设置为True,只有通过setMessage方法才可以直接访问通知。
*/
AtomicBoolean reflectStatus = new AtomicBoolean(Boolean.TRUE);
/**
* 设置发送的消息
*
* @param message 消息内容
*/
public void setMessage(String message) {
// 将要发送的消息放入队列中
queue.add(message);
// 将反射标记设置成False
reflectStatus.set(Boolean.FALSE);
// 既然没有人在促发通知了,那么我就去促发一次
// 注意:这里采用CAS保证只有一个线程可以设置成运行通知状态
if (notify.compareAndSet(Boolean.TRUE, Boolean.FALSE)) {
// 通知订阅者
asyncNotifySubscribe();
}
}
完整代码:
package com.test.design.observer.basic.topic;
import com.test.design.observer.basic.receive.Subscriber;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
public class Topic {
/**
* 订阅者集合,这里采用并发集合,避免并发添加订阅者造成并发数据丢失问题
* 注意:这里主要避免并发添加订阅者问题
*/
CopyOnWriteArrayList<Subscriber> subscriberList = new CopyOnWriteArrayList<>();
/**
* 添加排队等待机制
* 注意:这里主要解决并发发送消息问题
*/
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue();
/**
* 是否正在执行通知,默认没有通知
* 注意:这里时并发重复通知调用订阅者与queue配合使用,保证并发安全性
*/
AtomicBoolean notify = new AtomicBoolean(Boolean.FALSE);
/**
* 反射状态设置为True,只有通过setMessage方法才可以直接访问通知。
*/
AtomicBoolean reflectStatus = new AtomicBoolean(Boolean.TRUE);
/**
* 主题名
*/
private String topicName;
/**
* 消息
*/
private String message;
/**
* 绑定主题名
*
* @param topicName 主题名
*/
public Topic(String topicName) {
this.topicName = topicName;
}
/**
* 设置发送的消息
*
* @param message 消息内容
*/
public void setMessage(String message) {
// 将要发送的消息放入队列中
queue.add(message);
// 将反射标记设置成False
reflectStatus.set(Boolean.FALSE);
// 既然没有人在促发通知了,那么我就去促发一次
// 注意:这里采用CAS保证只有一个线程可以设置成运行通知状态
if (notify.compareAndSet(Boolean.TRUE, Boolean.FALSE)) {
// 通知订阅者
asyncNotifySubscribe();
}
}
/**
* 添加订阅者
*
* @param subscriber 订阅者
*/
public void addSubscribe(Subscriber subscriber) {
if (subscriber == null) {
return;
}
// 添加订阅者
subscriberList.add(subscriber);
}
/**
* 通知订阅者
* 通知采用内部,以设置成私有的
* 注意:本次优化会改成异步调用,避免造成主线程一直阻塞使用,
*/
private void asyncNotifySubscribe() {
// 这里可以使用线程池,我在这就不使用了
new Thread(()->{
// 通知一次
notifyMessage();
// 再次检查一次,避免刚好还未释放标记锁,就已经添加消息了
notifyMessage();
// 释放标记,可以主动通知了
// 注意:这里无法再次采用CAS,本身以获取到了,直接设置即可
notify.set(Boolean.FALSE);
}).start();
}
/**
* 通知消息
*/
private void notifyMessage() {
String messageInfo = null;
// 不为空,说明还有内容在等待通知
while ((messageInfo = queue.poll()) != null) {
// 获取主题的订阅者们
if (subscriberList == null) {
return;
}
// 遍历订阅者,一个一个的通知
for (Subscriber subscriber : subscriberList) {
subscriber.receiveMessage(messageInfo);
}
}
}
}
4.3.4.3、测试+效果
import com.test.design.observer.basic.receive.Subscriber;
import com.test.design.observer.basic.receive.concrete.UserSubscriber;
import com.test.design.observer.basic.topic.Topic;
public class ObserverTest {
public static void main(String[] args) throws InterruptedException {
// 并发测试
// 教育类主题
Topic eduTopic = new Topic("edu_topic");
// 创建3个用户
Subscriber userSubscribe1 = new UserSubscriber("用户1");
Subscriber userSubscribe2 = new UserSubscriber("用户2");
Subscriber userSubscribe3 = new UserSubscriber("用户3");
// 都订阅教育类主题
eduTopic.addSubscribe(userSubscribe1);
eduTopic.addSubscribe(userSubscribe2);
eduTopic.addSubscribe(userSubscribe3);
// 开辟100个线程分别发送10条消息
for (int i = 0; i < 100; i++) {
Thread thread = new Thread(()->{
for (int j = 0; j < 10; j++) {
eduTopic.setMessage(Thread.currentThread().getName() + " 发送消息 " + j);
}
});
thread.setName("线程" + i);
thread.start();
}
}
}
可以看到总共发送了3000条消息,满足 100(线程)* 10(消息)* 3(订阅者)= 3000;说明消息都通知到各个订阅者了,没有发生消息丢失。
5、总结
通过上面分析,可以很直观的看到观察者模式中的核心理念,只要保证主题与订阅者之间的关系绑定,以及促发通知的约定,就能保证消息能够通知到各个订阅者。