用了SpringBoot的发布订阅模式,代码看着高级多了
大家好,我是小富~
在项目里,经常会有一些主线业务之外的其它业务,比如,下单之后,发送通知、监控埋点、记录日志……
这些非核心业务,如果全部一梭子写下去,有两个问题,一个是业务耦合,一个是串行耗时。
下单之后的逻辑所以,一般在开发的时候,都会把这些操作å抽象成观察者模式,也就是发布/订阅模式(这里就不讨论观察者模式和发布/订阅模式的不同),而且一般会采用多线程的方式来异步执行这些观察者方法。
观察者模式一开始,我们都是自己去写观察者模式。
自己实现观察者模式 观察者简图观察者
- 观察者定义接口
/**
* @Author: fighter3
* @Description: 观察者接口
* @Date: 2022/11/7 11:40 下午
*/
public interface OrderObserver {
void afterPlaceOrder(PlaceOrderMessage placeOrderMessage);
}
-
具体观察者
@Slf4j
public class OrderMetricsObserver implements OrderObserver {
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] metrics");
}
}@Slf4j
public class OrderLogObserver implements OrderObserver{
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] log.");
}
}@Slf4j
public class OrderNotifyObserver implements OrderObserver{
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] notify.");
}
}- 业务通知观察者
- 日志记录观察者
- 监控埋点观察者
被观察者
- 消息实体定义
@Data
public class PlaceOrderMessage implements Serializable {
/**
* 订单号
*/
private String orderId;
/**
* 订单状态
*/
private Integer orderStatus;
/**
* 下单用户ID
*/
private String userId;
//……
}
- 被观察者抽象类
public abstract class OrderSubject {
//定义一个观察者列表
private List<OrderObserver> orderObserverList = new ArrayList<>();
//定义一个线程池,这里参数随便写的
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
//增加一个观察者
public void addObserver(OrderObserver o) {
this.orderObserverList.add(o);
}
//删除一个观察者
public void delObserver(OrderObserver o) {
this.orderObserverList.remove(o);
}
//通知所有观察者
public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
for (OrderObserver orderObserver : orderObserverList) {
//利用多线程异步执行
threadPoolExecutor.execute(() -> {
orderObserver.afterPlaceOrder(placeOrderMessage);
});
}
}
}
这里利用了多线程,来异步执行观察者。
- 被观察者实现类
/**
* @Author: fighter3
* @Description: 订单实现类-被观察者实现类
* @Date: 2022/11/7 11:52 下午
*/
@Service
@Slf4j
public class OrderServiceImpl extends OrderSubject implements OrderService {
/**
* 下单
*/
@Override
public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
PlaceOrderResVO resVO = new PlaceOrderResVO();
//添加观察者
this.addObserver(new OrderMetricsObserver());
this.addObserver(new OrderLogObserver());
this.addObserver(new OrderNotifyObserver());
//通知观察者
this.notifyObservers(new PlaceOrderMessage());
log.info("[placeOrder] end.");
return resVO;
}
}
测试
@Test
@DisplayName("下单")
void placeOrder() {
PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
orderService.placeOrder(placeOrderReqVO);
}
- 测试执行结果
2022-11-08 00:11:13.617 INFO 20235 --- [pool-1-thread-1] c.f.obverser.OrderMetricsObserver : [afterPlaceOrder] metrics
2022-11-08 00:11:13.618 INFO 20235 --- [ main] cn.fighter3.obverser.OrderServiceImpl : [placeOrder] end.
2022-11-08 00:11:13.618 INFO 20235 --- [pool-1-thread-3] c.fighter3.obverser.OrderNotifyObserver : [afterPlaceOrder] notify.
2022-11-08 00:11:13.617 INFO 20235 --- [pool-1-thread-2] cn.fighter3.obverser.OrderLogObserver : [afterPlaceOrder] log.
可以看到,观察者是异步执行的。
利用Spring精简可以看到,观察者模式写起来还是比较简单的,但是既然都用到了Spring来管理Bean的生命周期,代码还可以更精简一些。
Spring精简观察者模式观察者实现类:定义成Bean
-
OrderLogObserver
@Slf4j
@Service
public class OrderLogObserver implements OrderObserver {
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] log.");
}
} -
OrderMetricsObserver
@Slf4j
@Service
public class OrderMetricsObserver implements OrderObserver {
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] metrics");
}
}
- OrderNotifyObserver
@Slf4j
@Service
public class OrderNotifyObserver implements OrderObserver {
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] notify.");
}
}
被观察者:自动注入Bean
-
OrderSubject
public abstract class OrderSubject {
/**
* 利用Spring的特性直接注入观察者
*/
@Autowired
protected List<OrderObserver> orderObserverList;
//定义一个线程池,这里参数随便写的
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
//通知所有观察者
public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
for (OrderObserver orderObserver : orderObserverList) {
//利用多线程异步执行
threadPoolExecutor.execute(() -> {
orderObserver.afterPlaceOrder(placeOrderMessage);
});
}
}
} -
OrderServiceImpl
@Service
@Slf4j
public class OrderServiceImpl extends OrderSubject implements OrderService {
/**
* 实现类里也要注入一下
*/
@Autowired
private List<OrderObserver> orderObserverList;
/**
* 下单
*/
@Override
public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
PlaceOrderResVO resVO = new PlaceOrderResVO();
//通知观察者
this.notifyObservers(new PlaceOrderMessage());
log.info("[placeOrder] end.");
return resVO;
}
}
这样一来,发现被观察者又简洁了很多,但是后来我发现,在SpringBoot项目里,利用Spring事件驱动驱动模型(event)模型来实现,更加地简练。
Spring Event实现发布/订阅模式Spring Event对发布/订阅模式进行了封装,使用起来更加简单,还是以我们这个场景为例,看看怎么来实现吧。
自定义事件
- PlaceOrderEvent:继承ApplicationEvent,并重写构造函数。ApplicationEvent是Spring提供的所有应用程序事件扩展类。
public class PlaceOrderEvent extends ApplicationEvent {
public PlaceOrderEvent(PlaceOrderEventMessage source) {
super(source);
}
}
- PlaceOrderEventMessage:事件消息,定义了事件的消息体。
@Data
public class PlaceOrderEventMessage implements Serializable {
/**
* 订单号
*/
private String orderId;
/**
* 订单状态
*/
private Integer orderStatus;
/**
* 下单用户ID
*/
private String userId;
//……
}
事件监听者
事件监听者,有两种实现方式,一种是实现ApplicationListener接口,另一种是使用@EventListener注解。
事件监听者实现实现ApplicationListener接口
实现ApplicationListener接口,重写onApplicationEvent方法,将类定义为Bean,这样,一个监听者就完成了。
- OrderLogListener
@Slf4j
@Service
public class OrderLogListener implements ApplicationListener<PlaceOrderEvent> {
@Override
public void onApplicationEvent(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] log.");
}
}
- OrderMetricsListener
@Slf4j
@Service
public class OrderMetricsListener implements ApplicationListener<PlaceOrderEvent> {
@Override
public void onApplicationEvent(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] metrics");
}
}
- OrderNotifyListener
@Slf4j
@Service
public class OrderNotifyListener implements ApplicationListener<PlaceOrderEvent> {
@Override
public void onApplicationEvent(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] notify.");
}
}
使用@EventListener注解
使用@EventListener注解就更简单了,直接在方法上,加上@EventListener注解就行了。
-
OrderLogListener
@Slf4j
@Service
public class OrderLogListener {
@EventListener
public void orderLog(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] log.");
}
} -
OrderMetricsListener
@Slf4j
@Service
public class OrderMetricsListener {
@EventListener
public void metrics(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] metrics");
}
} -
OrderNotifyListener
@Slf4j
@Service
public class OrderNotifyListener{
@EventListener
public void notify(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] notify.");
}
}
异步和自定义线程池
异步执行
异步执行也非常简单,使用Spring的异步注解@Async就可以了。例如:
- OrderLogListener
@Slf4j
@Service
public class OrderLogListener {
@EventListener
@Async
public void orderLog(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] log.");
}
}
当然,还需要开启异步,SpringBoot项目默认是没有开启异步的,我们需要手动配置开启异步功能,很简单,只需要在配置类上加上@EnableAsync
注解就行了,这个注解用于声明启用Spring的异步方法执行功能,需要和@Configuration
注解一起使用,也可以直接加在启动类上。
@SpringBootApplication
@EnableAsync
public class DailyApplication {
public static void main(String[] args) {
SpringApplication.run(DairlyLearnApplication.class, args);
}
}
自定义线程池
使用@Async的时候,一般都会自定义线程池,因为@Async
的默认线程池为SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。
自定义线程池有三种方式:
@Async自定义线程池- 实现接口AsyncConfigurer
- 继承AsyncConfigurerSupport
- 配置由自定义的TaskExecutor替代内置的任务执行器
我们来看看三种写法:
- 实现接口AsyncConfigurer
@Configuration
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer {
@Bean("fighter3AsyncExecutor")
public ThreadPoolTaskExecutor executor() {
//Spring封装的一个线程池
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//随便写的一些配置
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(30);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("fighter3AsyncExecutor-");
executor.initialize();
return executor;
}
@Override
public Executor getAsyncExecutor() {
return executor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
}
}
- 继承AsyncConfigurerSupport
@Configuration
@Slf4j
public class SpringAsyncConfigurer extends AsyncConfigurerSupport {
@Bean
public ThreadPoolTaskExecutor asyncExecutor() {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
//随便写的一些配置
threadPool.setCorePoolSize(10);
threadPool.setMaxPoolSize(30);
threadPool.setWaitForTasksToCompleteOnShutdown(true);
threadPool.setAwaitTerminationSeconds(60 * 15);
return threadPool;
}
@Override
public Executor getAsyncExecutor() {
return asyncExecutor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
}
}
-
配置自定义的TaskExecutor
@Slf4j
@Service
public class OrderLogListener {
@EventListener
@Async("asyncExecutor")
public void orderLog(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] log.");
}
}-
配置线程池
@Configuration
public class TaskPoolConfig {
@Bean(name = "asyncExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//随便写的一些配置
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("asyncExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
} -
使用@Async注解的时候,指定线程池,推荐使用这种方式,因为在项目里,尽量做到线程池隔离,不同的任务使用不同的线程池
-
异步和自定义线程池
这一部分只是一些扩展,稍微占了一些篇幅,大家可不要觉得Spring Event用起来很繁琐。
发布事件
发布事件也非常简单,只需要使用Spring 提供的ApplicationEventPublisher
来发布自定义事件。
-
OrderServiceImpl
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
/**
* 下单
*/
@Override
public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
log.info("[placeOrder] start.");
PlaceOrderResVO resVO = new PlaceOrderResVO();
//消息
PlaceOrderEventMessage eventMessage = new PlaceOrderEventMessage();
//发布事件
applicationEventPublisher.publishEvent(new PlaceOrderEvent(eventMessage));
log.info("[placeOrder] end.");
return resVO;
}
}
在Idea里查看事件的监听者也比较方便,点击下面图中的图标,就可以查看监听者。
查看监听者监听者测试
最后,我们还是测试一下。
@Test
void placeOrder() {
PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
orderService.placeOrder(placeOrderReqVO);
}
- 执行结果
2022-11-08 10:05:14.415 INFO 22674 --- [ main] c.f.o.event.event.OrderServiceImpl : [placeOrder] start.
2022-11-08 10:05:14.424 INFO 22674 --- [ main] c.f.o.event.event.OrderServiceImpl : [placeOrder] end.
2022-11-08 10:05:14.434 INFO 22674 --- [sync-executor-3] c.f.o.event.event.OrderNotifyListener : [afterPlaceOrder] notify.
2022-11-08 10:05:14.435 INFO 22674 --- [sync-executor-2] c.f.o.event.event.OrderMetricsListener : [afterPlaceOrder] metrics
2022-11-08 10:05:14.436 INFO 22674 --- [sync-executor-1] c.f.o.event.event.OrderLogListener : [afterPlaceOrder] log.
可以看到,异步执行,而且用到了我们自定义的线程池。
小结这篇文章里,从最开始自己实现的观察者模式,再到利用Spring简化的观察者模式,再到使用Spring Event实现发布/订阅模式,可以看到,Spring Event用起来还是比较简单的。除此之外,还有Guava EventBus这样的事件驱动实现,大家更习惯使用哪种呢?
·········· END ··············
在看 、 点赞 、 转发 ,是对我最大的鼓励 。