Flink 用户代码如何生成 StreamGraph (1)

Flink 是如何将你写的代码生成 StreamGraph 的(1)

一、絮叨两句

新的一年又来了,不知道大家有没有立几个每年都完不成的 FLAG ?

反正我立了,我今年给自己立的 FLAG 是大致阅读大数据几个框架的源码。

为什么要“大致”阅读,因为这些牛逼的框架都是层层封装,搞懂核心原理已经是很不易,更别谈熟读源码了。

但是目标还是要有的,我也不要当一条咸鱼。

之前几篇源码阅读的文章,不知道大家有没有亲自动手打开 Idea 去试一试,这里我再贴一下文章链接,大家可以再回顾一下。

本次,我们来聊一聊,我们自己写的代码是如何变成 StreamGraph 的。

二、引出问题

开始之前,不妨稍微回顾一下……

一般我们执行一个 Flink 程序,都是使用命令行 flink run(flink 界面上执行的时候,也是在调用 flink run 命令来执行的)来执行,然后shell 会使用 java 命令,执行到 CliFrontend 类的 main 方法。

main 方法里面,首先会解析用户的输入参数,解析 flink-conf.yml 配置文件,解压出用户 jar 包里的依赖,以及其他的信息,都封装到 PackagedProgram 对象中。然后切换当前线程的类加载器为 UserCodeClassLoader,这个类加载器自定义了一些策略(Child-First 或者 Parent-First),使用这个类加载器去反射执行用户代码的 main 方法。

然后今天的故事就从这里开始。

首先我们贴一段 Flink 自带的 Example 里的代码(稍稍简化了代码,去掉了无关的逻辑):

 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 DataStream<String> text = env.fromElements(WordCountData.WORDS);
 ​
 DataStream<Tuple2<String, Integer>> counts =
     text.flatMap(new Tokenizer())
    .keyBy(value -> value.f0).sum(1);
 ​
 counts.print();

它是如何变成这张图的:

image-20210218202440403

这张图是一个有向无环图,组成有向无环图的就是顶点信息,以及边的信息。

这些信息被封装在 StreamGraph 类之中,这个类中有三个非常重要的属性:

 private Map<Integer, StreamNode> streamNodes;
 private Set<Integer> sources;
 private Set<Integer> sinks;

可以看到这几个属性记录了这个 Graph 中有几个节点,几个是 sources,几个是 sinks。

其中 StreamNode 是对节点的封装,节点上有几个重要的属性如下:

 private final String operatorName;
 private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
 private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();

operatorName 表示节点的名字,inEdges 表示这个节点上游的边,outEdges 表示这个节点下游的边。

然后,StreamEdge 是对边的封装,只有输入节点 id 和目标节点 id:

 private final int sourceId;
 private final int targetId;

这三个类的这几个属性就描述了刚刚的那张图。

三、记住一个非常重要的属性

它就是 StreamExecutionEnvironment 类的 transformations 属性:

 protected final List<Transformation<?>> transformations = new ArrayList<>();

什么是 Transformation,Transformation 就是 Flink 对我们写的算子的额外信息的封装,比如算子的名字,id,输出类型,输入,并行度等等这些信息。

有些算子最终会调用 this.tranformations.add() 加入到列表里来,而有的不会。

四、从 env.fromElements() 开始

env.fromElements(),这是一个算子,这个算子定义了 source 信息,这个算子对应的 transformation 是 LegacySourceTransformation,里面记录了算子的id,名字,输出类型,并行度,有界还是无界等等信息。

最后这个方法返回的是一个 DataStreamSource 对象,这个对象的基类是 DataStream。DataStream 里有一个 transformation 属性。

也就是说 env.fromElements() 返回了一个 DataStream 对象,并且把它自身的 transformation 信息放到这个 DataStream 实例的属性里面了。

env.fromElements 这个算子是没有加入到 上面的 transformations 列表中去的。

四、FlatMap 算子源码分析

紧接着,上面的 env.fromElements 的返回值: DataStream 实例,调用了它自己的 flatMap 方法,flatMap 最终又调用了 doTransform 方法。

FlatMap 算子也是要构造一个 transformation 的,FlatMap 对应的 transformation 是 OneInputTransformation,这个类里有一个属性是 input,也就是 FlatMap 算子的输入信息。我们看一下它的构造方法

 public OneInputTransformation(
       Transformation<IN> input,
       String name,
       StreamOperatorFactory<OUT> operatorFactory,
       TypeInformation<OUT> outputType,
       int parallelism) {
    super(name, outputType, parallelism);
    this.input = input;
    this.operatorFactory = operatorFactory;
 }

再看一下调用信息

 OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
       this.transformation,
       operatorName,
       operatorFactory,
       outTypeInfo,
       environment.getParallelism());

也就是说,FlatMap 的 transformation 信息中,有一个 input 属性,其值是 env.fromElements 的 transformation。

通俗点讲就是,FlatMap 的 transformation 中记录了它的输入是 env.fromElements() 。

最后返回了 SingleOutputStreamOperator 对象,这里面封装了 FlatMap 的 transformation 信息。

我们可以 debug 到这里来看看它的返回值:

image-20210218210341278

然后需要关注的事情是,它最终调用了这个方法:

 getExecutionEnvironment().addOperator(resultTransform);
public void addOperator(Transformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}

也就是加入到了 transformations 列表中去。

FlatMap 最后返回了一个 SingleOutputStreamOperator 类,这个类也是 DataStream 的子类。

所以,看到这基本能够理解,我们写的代码,其实本质都是 Flink 封装后对外暴露的简单易用的 api,Flink 在背后做了大部分事情。

五、KeyBy 算子源码分析

keyBy 也是 DataStream 的一个方法,它 new 了一个 KeyedStream,并且把 this 传入了构造函数中,this 是什么?this 就是刚刚 FlatMap 的返回值,还记得吗?里面记录了 FlatMap 的 transformation。

keyBy 对应的 transformation 是 PartitionTransformation,里面也有 input 属性,直接把 this.getTransformation() 传给了 input 了。

我们来 debug 看一下返回值:

image-20210218211419117

有点像套娃,一层又一层的。

需要注意的是,KeyBy 只是一个虚拟的节点,它并没有加入到 transformations 列表中来。

六、sum 算子的源码分析

这个我们就不细看了,套路都差不多了,直接 debug 看一下返回值:

image-20210218211640400

sum 算子有调用这个方法:

getExecutionEnvironment().addOperator(reduce);

加入到了 transformations 属性中来。

七、sink 算子的源码分析

和 sum 一样,我们直接 debug 一下最终的结果:

image-20210218212528803

可见 sink 中,也套娃式的记录了所有的 input。

最后,sink 也调用了

getExecutionEnvironment().addOperator(sink.getTransformation());

原创文章,作者:kk,如若转载,请注明出处:http://www.wangkai123.com/64/

(0)
上一篇 2022-06-19 21:54
下一篇 2022-06-19 21:55

相关推荐

  • Hive执行流程源码解析

    最近在出差,客户现场的 HiveServer 在很长时间内不可用,查看 CM 的监控发现,HiveServer 的内存在某一时刻暴涨,同时 JVM 开始 GC,每次 GC 长达 1…

    技术 2022-06-18
  • Flink-Clients 源码阅读(2)

    一、我们本次的目的是什么? 这次我们的目的是,在本地的 IDEA 中去 debug flink-clients 代码,然后远程提交给 flink standalone 集群上去执行…

    Flink 2022-06-19
  • Flink 用户代码如何生成 StreamGraph(下)

    九、一小段源码 上次说到了所有的算子都会转化成 transformation ,并放到一个 List 列表中,那么今天我们开始遍历这个列表,来生成 StreamGraph。 打开这…

    Flink 2022-06-19
  • Flink 异步 I/O 解析

    一、简介 我们知道 flink 对于外部数据源的操作可以通过自带的连接器,或者自定义 sink 和 source 实现数据的交互。 那么为啥还需要异步 IO 呢?那时因为对于实时处…

    Flink 2022-06-19
  • Redis 原生支持 JSON 了

    号外 喜大普奔,Redis 官方支持 JSON 操作了,当我看到这个消息时,我的脑海中立马闪过这些操作: 把一个对象序列化成 json 字符串; 塞到 redis 的一个键上; 用…

    技术 2022-06-18
  • Flink 源码阅读准备之 – SPI 和 ClassLoader

    一、本文大纲 二、ClassLoader 类加载器 1、Java 中的类加载器以及双亲委派机制 Java 中的类加载器,是 Java 运行时环境的一部分,负责动态加载 Java 类…

    Flink 2022-06-19
  • Flink-Clients 模块源码阅读

    本文大纲 一、Flink 官方文档这么全面,为什么还要读 Flink 源码 读文档和读源码的目的是不一样的,就拿 Apache Flink 这个项目来说,如果你想知道 Flink …

    Flink 2022-06-19
  • linux 的输入(出)重定向和管道符

    一、输入输出重定向 标准输入重定向(STDIN,文件描述符为0):默认从键盘输入,也可从其他文件或者命令中输入 标准输出重定向(STDOUT,文件描述符为1):默认输出到屏幕 错误…

    Linux 2022-07-03
  • Flink Sql 核心概念剖析

    本次,我们从 0 开始逐步剖析 Flink SQL 的来龙去脉以及核心概念,并附带完整的示例程序,希望对大家有帮助! 本文大纲 一、快速体验 Flink SQL 为了快速搭建环境体…

    Flink 2022-06-19
  • Flink 异步编程模型介绍

    本次我们来实现一个支持百万并发连接数的采集服务器,并且找到异步+ NIO 的编程方式和实时流模型的那些千丝万缕的联系。 一、需求说明 简单的需求功能如下: 数据采集是一个 web …

    Flink 2022-06-19

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注