请按到场顺序发言之CompletionService详解!

业余草

共 10095字,需浏览 21分钟

 ·

2022-06-15 14:33

你知道的越多,不知道的就越多,业余的像一棵小草!

你来,我们一起精进!你不来,我和你的竞争对手一起精进!

编辑:业余草

static.kancloud.cn/alex_wsc/java_concurrent_programming/1874829

推荐:https://www.xttblog.com/?p=5347

请按到场顺序发言之CompletionService详解!

时间像海绵里的水,只要你愿意挤,总还是有的。

——鲁迅

CompletionService

讲解 CompletionService 之前,我们先回忆一下 ExcutorSevice。ExcutorService 实现了通过线程池来并发执行任务。其中有一种方式是通过线程池执行 Callable 任务,然后通过 Future 获取异步执行的结果,如下面的代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(5);

    Callable callable1 = () -> {
        Thread.sleep(10000);
        return "任务1完成";
    };

    Callable callable2 = () -> {
        Thread.sleep(5000);
        return "任务2完成";
    };

    Future future1 = executor.submit(callable1);
    Future future2 = executor.submit(callable2);

    System.out.println(future1.get());
    System.out.println(future2.get());
}

任务一执行需要 10 秒,任务二执行只需要 5 秒。但是当执行到 future1.get () 时,主线程会被阻塞。等待 10 秒后第一个任务执行完才会去获取第二个任务。然后执行和第二个任务相关的打印操作。大家有没有看出问题?任务 2 明明在 5 秒前就已经执行完成,却不能立刻打印。主线程阻塞在任务一结果的获取上。这样程序执行的效率并不高。如果任务完成后能够立刻被取得执行结果,然后执行后面的逻辑,效率就会有显著的提升。今天我们要讲解的 CompletionService 就是用来做这个事情的。CompletionService 可以按照执行完成结果的到场顺序,被主线程获取到,从而继续执行后面逻辑。

了解 CompletionService

了解一个类最好、最快的方法就是阅读源代码的注解。而大多数人通常的做法却是去百度或者 google。这样有两个弊端,一是效率并不一定高,可能搜出来很多无用的内容。二是看到的文章并不权威,甚至可能是错的。有的同学可能觉得英文阅读费劲,其实作为开发人员,英语阅读已经是必备技能。这就如同你要熟知 IDE 的快捷键一样,所以如果觉得英文阅读困难,可以刻意练习。其实多读一些技术文档,会发现用词基本都是类似的。

扯的有点远,我们收回来,先看看源代码中对 CompletionService 的解释:

对异步任务执行和执行结果消费解藕。生产者提交任务执行。消费者则获取完成的任务,然后按照完成任务的顺序对任务结果进行处理。

官方的解释是不是十分简洁明了?

使用 CompletionService

下面我们使用 CompletionService 实现一个吃苹果的程序。首先我声明一个流,里面是一些水果,每个水果会对应一个洗干净的任务。然后主线程拿到洗干净的水果再一个个吃掉。代码如下:

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    CompletionService<String> service = new ExecutorCompletionService<String>(pool);
    
  Stream.of("苹果""梨""葡萄""桃")
            .forEach(fruit -> service.submit(() -> {
                        if(fruit.equals("苹果")){
                            TimeUnit.SECONDS.sleep(6);
                        }else if(fruit.equals("梨")){
                            TimeUnit.SECONDS.sleep(1);
                        }else if(fruit.equals("葡萄")){
                            TimeUnit.SECONDS.sleep(10);
                        }else if(fruit.equals("桃")){
                            TimeUnit.SECONDS.sleep(3);
                        }
                        return "洗干净的"+fruit;
                    })
            );

    String result;
    while((result=service.take().get())!=null){
        System.out.println("吃掉"+result);
    }
}

可以看到有四种水果。会为每个水果启一个洗水果的任务。每种水果洗的时间不同,其中葡萄最不好洗要 10 秒,而梨最好洗,只需要 1 秒。等待水果洗好后,主线程通过 service.take () 取得执行完成的 Future,然后从里面 get 出返回值,把洗干净的水果吃掉。

我们可以看到输出如下:

吃掉洗干净的梨
吃掉洗干净的桃
吃掉洗干净的苹果
吃掉洗干净的葡萄

可以看到哪个水果先洗干净就会先被吃掉。这也证明了 service.take () 的顺序是任务的完成顺序,而不是任务提交的顺序。

通过 CompletionService 我们就可以一端生产,另一端按照完成的顺序进行消费。这避免提交大量任务时,不知道哪个任务先完成,从而在调用 Future 的 get 方法时产生阻塞。使用 CompletionService,永远都是完成一个返回一个,然后消费一个。这样你的程序才更为高效。

主线程收到返回后,可以再继续使用 CompletionService 来异步执行下一步的逻辑,这和非阻塞的编程方式异曲同工。

CompletionService 源码分析

CompletionService 构造方法

我们先看如何初始化 CompletionService:

ExecutorService pool = Executors.newFixedThreadPool(5);
CompletionService<String> service = new ExecutorCompletionService<String>(pool);

首先初始化 ExecutorService,在构造 ExecutorCompletionService 时作为参数传入。其实 CompletionService 对任务的执行其实就是借助于 ExecutorService 来完成的。接下来我们进入它的构造函数:

public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

构造函数中构造了三个属性:

private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;

executor 就是你传入的 ExecutorService,用来执行任务。

aes 的作用是创建新的 task。它的初始化过程比较有意思,判断了是否为 AbstractExecutorService 的实例。至于为什么这么做,我们后面再详细讲解。

completionQueue 是一个存放 Future 的阻塞队列,并且是无界的。这意味着如果源头不断的产生 Future,但是没有去消费,就会造成内存泄漏。

executor 执行完成的 Future 会被放入 completionQueue 中,take 方法将会从

completionQueue 中取得最新的 future 对象(最近执行完的 task 的结果)。

CompletionService 的 submit 方法

public Future<V> submit(Callable<V> task) {
    if (task == nullthrow new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}

首先将 Callable 类型的 task 转为 RunnableFuture 类型。RunnableFuture 是个接口,FutureTask 是其一种实现。

然后通过 new QueueingFuture (f),再将 RunnableFuture 包装为 QueueingFuture 类型的对象。QueueingFuture 的作用就是在 Future 完成时,加入到 completionQueue 中。

我们先看 newTaskFor 的源码:

private RunnableFuture<V> newTaskFor(Callable<V> task) {
    if (aes == null)
        return new FutureTask<V>(task);
    else
        return aes.newTaskFor(task);
}

如果 aes 为空,那么直接 new FutureTask。如果不为空则调用 aes 的 newTaskFor 方法。什么情况 aes 会为空呢?我们再看下 aes 初始化的代码:

this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;

当传入的 executor 为 AbstractExecutorService 类型时,那么 aes 不为空。否则 aes 为空。这两处逻辑处理是相关联的,这么做的原因如下:

  1. 如果 executor 是 AbstractExecutorService 的子类,有可能会重写 newTaskFor 方法,所以这里优先使用 executo r 的方法来创建 Task,这样后面通过 executor 执行 task 才能正确。比如 ForkJoinPool 就对 newTaskFor 方法进行了重写;
  2. 如果 executor 不是继承自 AbstractExecutorService。那么它可能并没有 newTaskFor 方法。所以需要 CompletionService 自己来创建 FutureTask。

这样看来 aes 的存在,只是为了尽量使用 executor 提供的 newTaskFor 方法来创建 task,以使后面 excute 方法能够正常运行。

接下来我们分析 QueueingFuture 方法:

private class QueueingFuture extends FutureTask<Void{
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}

QueueingFuture 是内部静态类,并且是 FutureTask 的子类。他只是重写了 done 方法。大家回忆上一节对 Future 的分析,应该还记得 done 方法在任务执行结果返回后被调用,但是留给子类来实。这里就用上了这个特性。done 方法里面做的就是把 task 加入阻塞队列中。这意味着,先完成的 task 会先把自己的 Future 放入队列中。那么当然也会被 take 方法先取到。而由于是阻塞队列,所以 take 方法取不到 task 时,就会阻塞。但由于能被 take 到的 task 肯定已经有了返回值,所以调用 task 的 get 方法时就不会再次阻塞了。也就是说 client 代码中的下面一行只会在 take 时发生阻塞:

while((result=service.take().get())!=null){
        System.out.println("吃掉"+result);
}

executor 执行任务的代码就不用再次分析了,这在之前学习 Executor 的时候已经详细分析过了。submit 方法分析完后我们再来看看 take 方法。

CompletionService 的 take 方法

相比较 submit 方法,take 方法就更为简单了,如下:

public Future<V> take() throws InterruptedException {
    return completionQueue.take();
}

只有一行代码,就是从 completionQueue 中取得 Futrue 对象。由于 completionQueue 是阻塞队列,当没有 Future 时,就会阻塞在此。而 completionQueue 中保存 Future 的顺序是完成顺序。

总结

CompletionService 给我们提供了一种非阻塞的异步执行方式。让程序更为高效。他的实现非常的简单和巧妙,值得我们借鉴。其实我们学习到这里,不知道你是否有这种体会,这些工具实际上就是我们之前学习内容的组合运用,如果前面你掌握的很牢固,学习起来一点也不费劲。如果前面就似懂非懂,那么就会越看越糊涂。其实我们在学习上至少有一半的时间都是在打基础,但这个过程必不可少,并且受益更为深远。

浏览 25
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报