Flink 源码阅读准备之 – Java8 异步编程

阅读 Flink 源码前必会的知识 Java8 异步编程

本文大纲速看

image-20210221133939967

一、异步编程

通常来说,程序都是顺序执行,同一时刻只会发生一件事情。如果一个函数依赖于另一个函数的结果,它只能等待那个函数结束才能继续执行,从用户角度来说,整个程序才算执行完毕。

但现在的计算机普遍拥有多核 CPU,在那里干等着毫无意义,完全可以在另一个处理器内核上干其他工作,耗时长的任务结束之后会主动通知你。这就是异步编程的出发点:充分使用多核 CPU 的优势,最大程度提高程序性能。

一句话来说:所谓异步编程,就是实现一个无需等待被调用函数的返回值而让操作继续运行的方法。

二、抛出一个问题:如何实现烧水泡茶的程序

image-20210221111012603

最后我们会使用传统方式和 Java8 异步编程方式分别实现,来对比一下实现复杂度。

三、Java5 的 Future 实现的异步编程

Future 是 Java 5 添加的类,用来描述一个异步计算的结果。你可以使用 isDone() 方法检查计算是否完成,或者使用 get() 方法阻塞住调用线程,直到计算完成返回结果,也可以使用 cancel() 方法停止任务的执行。

     public static void main(String[] args) throws InterruptedException, ExecutionException {
         ExecutorService es = Executors.newFixedThreadPool(5);
         Future<Integer> f = es.submit(() -> 100);
         System.out.println(f.get());
         es.shutdown();
    }

虽然 Future 提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时的获取结果。

当然,很多其他的语言采用回调的方式来实现异步编程,比如 Node.js;Java 的一些框架,比如 Netty,Google Guava 也扩展了 Future 接口,提供了很多回调的机制,封装了工具类,辅助异步编程开发。

Java 作为老牌编程语言,自然也不会落伍。在 Java 8 中,新增了一个包含 50 多个方法的类:CompletableFuture,提供了非常强大的 Future 扩展功能,可以帮助我们简化异步编程的复杂性,提供函数式编程的能力。

四、CompletableFuture 类功能概览

如下图是 CompletableFuture 实现的接口:

image-20210221110807354

它实现了 Future 接口,拥有 Future 所有的特性,比如可以使用 get() 方法获取返回值等;还实现了 CompletionStage 接口,这个接口有超过 40 个方法,功能太丰富了,它主要是为了编排任务的工作流。

我们可以把工作流和工作流之间的关系分类为三种:串行关系,并行关系,汇聚关系。

  • 串行关系
image-20210221111715862

提供了如下的 api 来实现(先大致浏览一遍):

 CompletionStage<R> thenApply(fn);
 CompletionStage<R> thenApplyAsync(fn);
 CompletionStage<Void> thenAccept(consumer);
 CompletionStage<Void> thenAcceptAsync(consumer);
 CompletionStage<Void> thenRun(action);
 CompletionStage<Void> thenRunAsync(action);
 CompletionStage<R> thenCompose(fn);
 CompletionStage<R> thenComposeAsync(fn);
  • 并行关系
image-20210221111729330

多线程异步执行就是并行关系

  • 汇聚关系
image-20210221111804626

汇聚关系,又分为 AND 汇聚关系和 OR 汇聚关系:

AND 汇聚关系,就是所有依赖的任务都完成之后再执行;OR 汇聚关系,就是依赖的任务中有一个执行完成,就开始执行。

AND 汇聚关系由这些接口表达:

 CompletionStage<R> thenCombine(other, fn);
 CompletionStage<R> thenCombineAsync(other, fn);
 CompletionStage<Void> thenAcceptBoth(other, consumer);
 CompletionStage<Void> thenAcceptBothAsync(other, consumer);
 CompletionStage<Void> runAfterBoth(other, action);
 CompletionStage<Void> runAfterBothAsync(other, action);

OR 汇聚关系由这些接口来表达:

 CompletionStage applyToEither(other, fn);
 CompletionStage applyToEitherAsync(other, fn);
 CompletionStage acceptEither(other, consumer);
 CompletionStage acceptEitherAsync(other, consumer);
 CompletionStage runAfterEither(other, action);
 CompletionStage runAfterEitherAsync(other, action);

五、CompletableFuture 接口精讲

1、提交执行的静态方法

方法名描述
runAsync(Runnable runnable)执行异步代码,使用 ForkJoinPool.commonPool() 作为它的线程池
runAsync(Runnable runnable, Executor executor)执行异步代码,使用指定的线程池
supplyAsync(Supplier<U> supplier)异步执行代码,有返回值,使用 ForkJoinPool.commonPool() 作为它的线程池
supplyAsync(Supplier<U> supplier, Executor executor)异步执行代码,有返回值,使用指定的线程池执行

上述四个方法,都是提交任务的,runAsync 方法需要传入一个实现了 Runnable 接口的方法,supplyAsync 需要传入一个实现了 Supplier 接口的方法,实现 get 方法,返回一个值。

(1)run 和 supply 的区别

run 就是执行一个方法,没有返回值,supply 执行一个方法,有返回值。

(2)一个参数和两个参数的区别

第二个参数是线程池,如果没有传,则使用自带的 ForkJoinPool.commonPool() 作为线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)

2、串行关系 api

这些 api 之间主要是能否获得前一个任务的返回值与自己是否有返回值的区别。

api是否可获得前一个任务的返回值是否有返回值
thenApply
thenAccept
thenRun不能
thenCompose
(1) thenApply 和 thenApplyAsync 使用

thenApply 和 thenApplyAsync 把两个并行的任务串行化,另一个任务在获得上一个任务的返回值之后,做一些加工和转换。它也是有返回值的。

 public class BasicFuture4 {
 ​
     @Data
     @AllArgsConstructor
     @ToString
     static class Student {
         private String name;
    }
     
     public static void main(String[] args) throws ExecutionException, InterruptedException {
         CompletableFuture<Student> future = CompletableFuture.supplyAsync(() -> "Jack")
                .thenApply(s -> s + " Smith")
                .thenApply(String::toUpperCase)
                .thenApplyAsync(Student::new);
         System.out.println(future.get());
    }
 ​
 }

结果可以看到,输入是一个字符串,拼接了一个字符串,转换成大写,new 了一个 Student 对象返回。

 BasicFuture4.Student(name=JACK SMITH)

和 thenApply 一起的还有 thenAccept 和 thenRun,thenAccept 能获得到前一个任务的返回值,但是自身没有返回值;thenRun 不能获得前一个任务的返回值,自身也没有返回值。

(2)thenApply 和 thenApplyAsync 的区别

这两个方法的区别,在于谁去执行任务。如果使用 thenApplyAsync,那么执行的线程是从 ForkJoinPool.commonPool() 或者自己定义的线程池中取线程去执行。如果使用 thenApply,又分两种情况,如果 supplyAsync 方法执行速度特别快,那么 thenApply 任务就使用主线程执行,如果 supplyAsync 执行速度特别慢,就是和 supplyAsync 执行线程一样。

可以使用下面的例子演示一下:

 package com.dsj361.future;
 ​
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 ​
 /**
  * @Author wangkai
  */
 public class BasicFuture8 {
 ​
     public static void main(String[] args) throws ExecutionException, InterruptedException {
         System.out.println("----------supplyAsync 执行很快");
         CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
             System.out.println(Thread.currentThread().getName());
             return "1";
        }).thenApply(s -> {
             System.out.println(Thread.currentThread().getName());
             return "2";
        });
         System.out.println(future1.get());
 ​
         System.out.println("----------supplyAsync 执行很慢");
         CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
             try {
                 Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
             System.out.println(Thread.currentThread().getName());
             return "1";
        }).thenApply(s -> {
             System.out.println(Thread.currentThread().getName());
             return "2";
        });
         System.out.println(future2.get());
    }
 }
 ​

执行结果:

 ----------supplyAsync 执行很快
 ForkJoinPool.commonPool-worker-1
 main
 2
 ----------supplyAsync 执行很慢
 ForkJoinPool.commonPool-worker-1
 ForkJoinPool.commonPool-worker-1
 2
(3)thenCompose 的使用

假设有两个异步任务,第二个任务想要获取第一个任务的返回值,并且做运算,我们可以用 thenCompose。此时使用 thenApply 也可以实现,看一段代码发现他们的区别:

public class BasicFuture9 {

public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = getLastOne().thenCompose(BasicFuture9::getLastTwo);
System.out.println(future.get());

CompletableFuture<CompletableFuture<String>> future2 = getLastOne().thenApply(s -> getLastTwo(s));
System.out.println(future2.get().get());
}

public static CompletableFuture<String> getLastOne(){
return CompletableFuture.supplyAsync(()-> "topOne");
}

public static CompletableFuture<String> getLastTwo(String s){
return CompletableFuture.supplyAsync(()-> s + " topTwo");
}
}

可以看到使用 thenApply 的时候,需要使用两个 get() 方法才能获取到最终的返回值,使用 thenCompose 只要一个即可。

3、And 汇聚关系 Api

(1)thenCombine 的使用

加入我们要计算两个异步方法返回值的和,就必须要等到两个异步任务都计算完才能求和,此时可以用 thenCombine 来完成。

public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192);
CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);
CompletableFuture<Integer> thenComposeCount = thenComposeOne
.thenCombine(thenComposeTwo, (s, y) -> s + y);

thenComposeOne.thenAcceptBoth(thenComposeTwo,(s,y)-> System.out.println("thenAcceptBoth"));
thenComposeOne.runAfterBoth(thenComposeTwo, () -> System.out.println("runAfterBoth"));

System.out.println(thenComposeCount.get());
}

可以看到 thenCombine 第二个参数是一个 Function 函数,前面两个异步任务都完成之后,使用这个函数来完成一些运算。

(2)thenAcceptBoth

接收前面两个异步任务的结果,执行一个回调函数,但是这个回调函数没有返回值。

(3)runAfterBoth

接收前面两个异步任务的结果,但是回调函数,不接收参数,也不返回值。

4、Or 汇聚关系 Api

public class BasicFuture11 {

public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192);
CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);
CompletableFuture<Integer> thenComposeCount = thenComposeOne
.applyToEither(thenComposeTwo, s -> s + 1);

thenComposeOne.acceptEither(thenComposeTwo,s -> {});

thenComposeOne.runAfterEither(thenComposeTwo,()->{});

System.out.println(thenComposeCount.get());
}
}
(1)applyToEither

任何一个执行完就执行回调方法,回调方法接收一个参数,有返回值

(2)acceptEither

任何一个执行完就执行回调方法,回调方法接收一个参数,无返回值

(3)runAfterEither

任何一个执行完就执行回调方法,回调方法不接收参数,也无返回值

5、处理异常

上面我们讲了如何把几个异步任务编排起来,执行一些串行或者汇聚操作。还有一个重要的地方,就是异常的处理。

先看下面的例子:

public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
System.out.println("execute one ");
return 100;
})
.thenApply(s -> 10 / 0)
.thenRun(() -> System.out.println("thenRun"))
.thenAccept(s -> System.out.println("thenAccept"));

CompletableFuture.runAsync(() -> System.out.println("other"));
}

结果:

execute one 
other

可以发现,只要链条上有一个任务发生了异常,这个链条下面的任务都不再执行了。

但是 main 方法上的接下来的代码还是会执行的。

所以这个时候,需要合理的去处理异常来完成一些收尾的工作。

public class BasicFuture12 {

public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
System.out.println("execute one ");
return 100;
})
.thenApply(s -> 10 / 0)
.thenRun(() -> System.out.println("thenRun"))
.thenAccept(s -> System.out.println("thenAccept"))
.exceptionally(s -> {
System.out.println("异常处理");
return null;
});

CompletableFuture.runAsync(() -> System.out.println("other"));
}
}

可以使用 exceptionally 来处理异常。

使用 handle() 方法也可以处理异常。但是 handle() 方法的不同之处在于,即使没有发生异常,也会执行。

六、烧水泡茶程序的实现

1、使用 Thread 多线程和 CountDownLatch 来实现

public class MakeTee {

private static CountDownLatch countDownLatch = new CountDownLatch(2);

static class HeatUpWater implements Runnable {

private CountDownLatch countDownLatch;

public HeatUpWater(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
System.out.println("洗水壶");
Thread.sleep(1000);
System.out.println("烧开水");
Thread.sleep(5000);
countDownLatch.countDown();
} catch (InterruptedException e) {
}

}
}

static class PrepareTee implements Runnable {
private CountDownLatch countDownLatch;

public PrepareTee(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
try {
System.out.println("洗茶壶");
Thread.sleep(1000);
System.out.println("洗茶杯");
Thread.sleep(1000);
System.out.println("拿茶叶");
Thread.sleep(1000);
countDownLatch.countDown();
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(new HeatUpWater(countDownLatch) ).start();
new Thread(new PrepareTee(countDownLatch)).start();
countDownLatch.await();
System.out.println("准备就绪,开始泡茶");
}
}

这里我们使用两个线程,分别执行烧水和泡茶的程序,使用 CountDownLatch 来协调两个线程的进度,等到他们都执行完成之后,再执行泡茶的动作。

可以看到这种方法,多了很多不必要的代码,new Thread,人工维护 CountDownLatch 的进度。

2、使用 CompletableFuture 来实现

public class MakeTeeFuture {

public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
try {
System.out.println("洗水壶");
Thread.sleep(1000);
System.out.println("烧开水");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
try {
System.out.println("洗茶壶");
Thread.sleep(1000);
System.out.println("洗茶杯");
Thread.sleep(1000);
System.out.println("拿茶叶");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
CompletableFuture<Void> finish = future1.runAfterBoth(future2, () -> {
System.out.println("准备完毕,开始泡茶");
});
System.out.println(finish.get());
}
}

这个程序极度简单,无需手工维护线程,给任务分配线程的工作也不需要关注。

同时语义也更加清晰,future1.runAfterBoth(future2,……) 能够清晰的表述“任务 3 要等到任务 1 和任务 2 都完成之后才能继续开始”

然后代码更加简练并且专注于业务逻辑,几乎所有的代码都是业务逻辑相关的。

七、总结

本文介绍了异步编程的概念,以及 Java8 的 CompletableFuture 是如何优雅的处理多个异步任务之间的协调工作的。CompletableFuture 能够极大简化我们对于异步任务编排的工作,Flink 在提交任务时,也是使用这种异步任务的方式,去编排提交时和提交后对于任务状态处理的一些工作的。

相信读了本篇文章,会对于你日后的工作以及阅读 Flink 源码由很大的帮助的!

谢谢!

原创文章,作者:kk,如若转载,请注明出处:http://www.wangkai123.com/74/

(0)
上一篇 2022-06-19 21:57
下一篇 2022-06-19 21:58

相关推荐

  • vim 的使用

    一、命令模式中最常用的一些命令 dd 删除(剪切)光标所在行 5dd 删除(剪切)从光标处开始的5行 yy 复制光标所在的整行 5yy 复制从光标处开始的5行 n 显示搜索命令定位…

    Linux 2022-07-03
  • Flink Sql 核心概念剖析

    本次,我们从 0 开始逐步剖析 Flink SQL 的来龙去脉以及核心概念,并附带完整的示例程序,希望对大家有帮助! 本文大纲 一、快速体验 Flink SQL 为了快速搭建环境体…

    Flink 2022-06-19
  • linux 的输入(出)重定向和管道符

    一、输入输出重定向 标准输入重定向(STDIN,文件描述符为0):默认从键盘输入,也可从其他文件或者命令中输入 标准输出重定向(STDOUT,文件描述符为1):默认输出到屏幕 错误…

    Linux 2022-07-03
  • Flink 源码阅读准备之 – SPI 和 ClassLoader

    一、本文大纲 二、ClassLoader 类加载器 1、Java 中的类加载器以及双亲委派机制 Java 中的类加载器,是 Java 运行时环境的一部分,负责动态加载 Java 类…

    Flink 2022-06-19
  • Flink 异步编程模型介绍

    本次我们来实现一个支持百万并发连接数的采集服务器,并且找到异步+ NIO 的编程方式和实时流模型的那些千丝万缕的联系。 一、需求说明 简单的需求功能如下: 数据采集是一个 web …

    Flink 2022-06-19
  • Redis 原生支持 JSON 了

    号外 喜大普奔,Redis 官方支持 JSON 操作了,当我看到这个消息时,我的脑海中立马闪过这些操作: 把一个对象序列化成 json 字符串; 塞到 redis 的一个键上; 用…

    技术 2022-06-18
  • Flink metrics 介绍和应用

    一、Flink metrics简介 Flink 的 metrics 是 Flink 公开的一个度量系统,metrics 也可以暴露给外部系统,通过在 Flink 配置文件 conf…

    Flink 2022-06-19
  • Flink-Clients 源码阅读(2)

    一、我们本次的目的是什么? 这次我们的目的是,在本地的 IDEA 中去 debug flink-clients 代码,然后远程提交给 flink standalone 集群上去执行…

    Flink 2022-06-19
  • Flink 源码阅读准备之 – Apache Commons Cli

    一、介绍一下 Apache Commons Cli,有一个感官的认识 我们在使用一些开源项目时,会敲一些命令,有简单的,比如 hadoop version;有复杂的,比如 flin…

    Flink 2022-06-19
  • Flink 用户代码如何生成 StreamGraph (1)

    Flink 是如何将你写的代码生成 StreamGraph 的(1) 一、絮叨两句 新的一年又来了,不知道大家有没有立几个每年都完不成的 FLAG ? 反正我立了,我今年给自己立的…

    Flink 2022-06-19

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注