SpringBoot异步接口实现:提高系统的吞吐量
程序员的成长之路
共 8881字,需浏览 18分钟
·
2024-08-15 07:33
阅读本文大概需要 5 分钟。
来自:juejin.cn/post/7367186272849788962
前言
ResponseBodyEmitter
、SseEmitter
、StreamingResponseBody
,不在本文介绍内,之后新写文章介绍):
-
AsyncContext -
Callable -
WebAsyncTask -
DeferredResult
AsyncContext
是Servlet层级的,比较原生的方式,本文不对此介绍(一般都不使用它,太麻烦了)。本文着重介绍后面三种方式。
特别说明:服务端的异步或同步对于客户端而言是不可见的。不会因为服务端使用了异步,接口的结果就和同步不一样了。另外,对于单个请求而言,使用异步接口会导致响应时间比同步大,但不特别明显。具体后文分析。
基于Callable实现
java.util.concurrent.Callable
包装的任何值,都表示该接口是一个异步接口:
@GetMapping("/testCallAble")
public Callable<String> testCallAble() {
return () -> {
Thread.sleep(40000);
return "hello";
};
}
Callable
处理过程如下:
Callable
。
-
Spring MVC 调用 request.startAsync()
并将 Callable 提交给AsyncTaskExecutor
以在单独的线程中进行处理。 -
同时, DispatcherServlet
和所有过滤器退出 Servlet 容器线程,但response保持打开状态。 -
最终 Callable 产生结果,Spring MVC将请求分派回Servlet容器以完成处理。 -
再次调用 DispatcherServlet
,并使用 Callable 异步生成的返回值继续处理。
Callable
默认使用SimpleAsyncTaskExecutor
类来执行,这个类非常简单而且没有重用线程。在实践中,需要使用AsyncTaskExecutor
类来对线程进行配置。
基于WebAsyncTask实现
WebAsyncTask
是对Callable的包装,提供了更强大的功能,比如:处理超时回调、错误回调、完成回调等。本质上,和Callable区别不大,但是由于它额外封装了一些事件的回调,所有,通常都使用WebAsyncTask
而不是Callable
:
@GetMapping("/webAsyncTask")
public WebAsyncTask<String> webAsyncTask() {
WebAsyncTask<String> result = new WebAsyncTask<>(30003, () -> {
return "success";
});
result.onTimeout(() -> {
log.info("timeout callback");
return "timeout callback";
});
result.onCompletion(() -> log.info("finish callback"));
return result;
}
WebAsyncTask
可以配置一个超时时间,这里配置的超时时间比全局配置的超时时间优先级都高(会覆盖全局配置的超时时间)。
基于DeferredResult实现
DeferredResult
使用方式与Callable
类似,但在返回结果时不一样,它返回的时实际结果可能没有生成,实际的结果可能会在另外的线程里面设置到DeferredResult
中去。
//定义一个全局的变量,用来存储DeferredResult对象
private Map<String, DeferredResult<String>> deferredResultMap = new ConcurrentHashMap<>();
@GetMapping("/testDeferredResult")
public DeferredResult<String> testDeferredResult(){
DeferredResult<String> deferredResult = new DeferredResult<>();
deferredResultMap.put("test", deferredResult);
return deferredResult;
}
DeferredResult
对象存放在了一个Map集合中,实际应用中可以设计一个对象管理器来统一管理这些个对象。
注意:要考虑定时轮询(或其他方式)这些对象,将已经处理过或无效的 DeferredResult
对象清理掉(DeferredResult.isSetOrExpired
方法可以判断是否还有效),避免内存泄露。
@GetMapping("/testSetDeferredResult")
public String testSetDeferredResult() throws InterruptedException {
DeferredResult<String> deferredResult = deferredResultMap.get("test");
boolean flag = deferredResult.setResult("testSetDeferredResult");
if(!flag){
log.info("结果已经被处理,此次操作无效");
}
return "ok";
}
DeferredResult
的值:首先是从之前存放DeferredResult
的map中拿到DeferredResult
的值,然后设置它的返回值。当执行deferredResult.setResult
之后,可以看到之前pending状态的接口完成了响应,得到的结果,就是这里设置的值。
DeferredResult
时也可以设置超时时间,这个时间的优先级也是大于全局设置的。另外,判断DeferredResult
是否有效,只是一个简单的判断,实际中判断有效的并不一定是有效的(比如:客户端取消了请求,服务端是不知道的),但是一般判断为无效的,那肯定是无效了。
DeferredResult
处理过程如下:
-
控制器返回一个 DeferredResult
并将其保存在可以访问的内存队列或列表中。 -
Spring MVC 调用 request.startAsync()
。 -
同时, DispatcherServlet
和所有配置的过滤器退出请求处理线程,但响应保持打开状态。 -
应用程序从某个线程设置 DeferredResult
,Spring MVC 将请求分派回 Servlet 容器。 -
再次调用 DispatcherServlet
,并使用异步生成的返回值继续处理。
提供一个线程池
@Bean("mvcAsyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 线程池维护线程的最少数量
// asyncServiceExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors() + 1);
executor.setCorePoolSize(5);
// 线程池维护线程的最大数量
executor.setMaxPoolSize(10);
// 线程池所使用的缓冲队列
executor.setQueueCapacity(10);
// asyncServiceExecutor.prefersShortLivedTasks();
executor.setThreadNamePrefix("fyk-mvcAsyncTask-Thread-");
asyncServiceExecutor.setBeanName("TaskId" + taskId);
// asyncServiceExecutor.setKeepAliveSeconds(20);
//调用者执行
// asyncServiceExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 线程全部结束才关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 如果超过60s还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
@Configuration
public class FykWebMvcConfigurer implements WebMvcConfigurer {
@Autowired
@Qualifier("mvcAsyncTaskExecutor")
private AsyncTaskExecutor asyncTaskExecutor;
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
//异步操作的超时时间,值为0或者更小,表示永不超时
configurer.setDefaultTimeout(60001);
configurer.setTaskExecutor(asyncTaskExecutor);
}
}
什么时候使用异步请求
推荐阅读:
程序员在线工具站:cxytools.com 推荐一个自己写的工具站:http://cxytools.com,专为程序员设计,包括时间日期、JSON处理、SQL格式化、随机字符串生成、UUID生成、文本Hash...等功能,提升开发效率。
⬇戳阅读原文直达! 朕已阅
评论