Flink 用户代码如何生成 StreamGraph(下)

九、一小段源码

上次说到了所有的算子都会转化成 transformation ,并放到一个 List 列表中,那么今天我们开始遍历这个列表,来生成 StreamGraph。

打开这个类 StreamGraphGenerator,generate() 方法(252行),StreamGraph 生成的逻辑就是从这里开始的。

里面有一个 for 循环,遍历的就是上次说到的那个非常重要的 transformations 列表:

 for (Transformation<?> transformation: transformations) {
    transform(transformation);
 }

然后看 transform 方法(无关的逻辑被我精简掉了),这个方法的作用是,使用不同的转换器,把算子生成的 transformations,转换成 StreamGraph 。

 private Collection<Integer> transform(Transformation<?> transform) {
    if (alreadyTransformed.containsKey(transform)) {
       return alreadyTransformed.get(transform);
    }
   
    @SuppressWarnings("unchecked")
    final TransformationTranslator<?, Transformation<?>> translator =
          (TransformationTranslator<?, Transformation<?>>) translatorMap.get(transform.getClass());
 ​
    Collection<Integer> transformedIds;
    if (translator != null) {
       transformedIds = translate(translator, transform);
    } else {
       transformedIds = legacyTransform(transform);
    }
 ​
    if (!alreadyTransformed.containsKey(transform)) {
       alreadyTransformed.put(transform, transformedIds);
    }
    return transformedIds;
 }

首先,看下这个方法的返回值,是一个 Collection<Integer> 类型,也就是说,转换完之后,会返回本次转换的 id。

首先,要获得一个 translator 转换器,可以看到在 static 静态块里,为每一种不同的 transformation 设置了不同的 translator。

image-20210219211155784

获取到转换器之后,进入 translate 方法中,translate 方法有这样一个方法,getParentInputIds(),这是一个很神奇的方法,他是在递归。

 final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());

赶紧点进去看看:

 private List<Collection<Integer>> getParentInputIds(
      @Nullable final Collection<Transformation<?>> parentTransformations) {
    final List<Collection<Integer>> allInputIds = new ArrayList<>();
    if (parentTransformations == null) {
      return allInputIds;
    }
 ​
    for (Transformation<?> transformation : parentTransformations) {
      allInputIds.add(transform(transformation));
    }
    return allInputIds;
 }

可以看到,它的输入参数是这个 transformation 的 input,然后 for 循环遍历这个 input,for 循环里面又在调用 transform 方法。这就是在递归调用了。

既然是递归调用,那么递归的终止条件是什么呢?

我一开始也很懵啊,debug 的时候,一直在循环往复,头有点大。静下来仔细 debug 了一下,发现终止条件就是:如果没有 input,那就不走到 for 循环里面来,也就直接返回了,这就是终止条件了。

那么,为什么要搞这样的递归调用?

目的就是,在转换一个算子的 transformation 的时候,要把它的上游先转换掉,也就是要从最开始的那个输入开始转换,这样才能顺利的构造出 DAG。

可能看到这,还是很迷茫,没关系,我们拿出具体的数据说话。

十、我们来 debug 一下

下面这个图是当前 transformations 的三个元素

image-20210219212638696

下面的这个是每一个 transformation 的父子关系,括号里是算子的 id,右边是它的 input。

  • Flat Map(2) – Collection Source(1)
  • Keyed Aggregation(4) – Partition(3) – Flat Map(2) – Collection Source(1)
  • Print to Std. out(5) – Keyed Aggregation(4) – Partition(3) – Flat Map(2) – Collection Source(1)

我们从这个 for 循环开始:

image-20210219213224144

当前 transform 方法中,Flat Map 算子作为入参。

它的调用链是:transform -> translate -> getParentInputs -> 遍历 Flat Map 的 inputs ,然后调用 transform 方法

image-20210219213511687

可以看到当前又是在 transform 方法中,但是输入参数是 Collection Source,也就是 Flat Map 的 input。

然后又是依次进入:transform -> translate -> getParentInputs -> 遍历 Collection Source 的 inputs

这个时候,Collection Source 是没有 input 的,所以本次递归就返回了,开始转换 Collection Source。

是用的 LegacySourceTransformationTranslator 这个转换器来转换的,最终就是做了这么一件事,new 了一个 StreamNode,放入了 StreamNodes 的 Map 中。

 StreamNode vertex = new StreamNode(
      vertexID,
      slotSharingGroup,
      coLocationGroup,
      operatorFactory,
      operatorName,
      vertexClass);
 ​
 streamNodes.put(vertexID, vertex);

那么 Collection Source 就处理完了,由于是递归遍历到根节点,那么肯定是会有重复的,所以,已经转换过的,要缓存起来,放到一个 Map 中,下次遇到同样的,就直接跳过了。

Collection Source 处理完了之后,也就是 Flat Map 的 input 处理完了

image-20210219214229833

下面要回来进入 FlatMap 的 translate 方法了(这就是在递归,处理 Flat Map 时,要先处理 Collection Source,等到把 Collection Source 处理完了,再继续回来处理 Flat Map)

FlatMap 是用的 OneInputTransformationTranslator 转换器来转换的。

可以看到它也是一样,new 了一个 StreamNode ,加入到了 streamNodes 列表中。

但是!它还做了另外一件事,那就是:

image-20210219214701839

它还要处理自己的 ParentTransformation,也就是 Collection Source,来构造一个边 Edge。

image-20210219214927858

可以看到这个边,是有方向的,从 Collection Source 到 FlatMap。

然后把这个边放到 Collection Source 的 outEdge 中;再放到 FlatMap 的 inEdge 中。

这样就记录了算子的流向。

这样,FlatMap 就算转换完成了。放入缓存中。

这样对于 transformations 的 一次 for 循环就结束了。

然后开始处理 Keyed Aggregation ,也是一样的流程,先处理它的 input,从最上面一层层往下处理,这里我们就不细说了。

image-20210219215152371

十一、最终结果

image-20210219215412696

最终生成的 StreamGraph中,重要的就是这个 StreamNodes,一共有四个:

image-20210219215450574

每一个 Node 里面有 InEdge,表示这个节点的上游节点是哪个;有 outEdge,表示这个节点的下游节点是哪个。

还有 sources 表示是源,sinks 表示是目标。

十二、小结

好了,本次的 StreamGraph 的 debug 就到这了。

阅读这部分的代码,给我感触最深的就是,要关注主要矛盾,忽略次要分支,才能把脉络梳理清楚,否则就会深陷泥潭,不仅自己没有成就感,而且还耽误了时间。

当然,生成 StreamGraph 的过程中,还有诸多细节,这里我不打算再深究了,如果日后有什么需要,再来看这块其他的代码。

下一次,就是具体的提交任务的过程了,这个过程需要涉及到 Java 的异步编程,所以再安排一次阅读源码必备知识之 Java 异步编程,拭目以待!

原创文章,作者:kk,如若转载,请注明出处:http://www.wangkai123.com/66/

(0)
上一篇 2022-06-19 21:54
下一篇 2022-06-19 21:55

相关推荐

  • Flink 异步 I/O 解析

    一、简介 我们知道 flink 对于外部数据源的操作可以通过自带的连接器,或者自定义 sink 和 source 实现数据的交互。 那么为啥还需要异步 IO 呢?那时因为对于实时处…

    Flink 2022-06-19
  • Flink 源码阅读准备之 – Java8 异步编程

    阅读 Flink 源码前必会的知识 Java8 异步编程 本文大纲速看 一、异步编程 通常来说,程序都是顺序执行,同一时刻只会发生一件事情。如果一个函数依赖于另一个函数的结果,它只…

    Flink 2022-06-19
  • Hive执行流程源码解析

    最近在出差,客户现场的 HiveServer 在很长时间内不可用,查看 CM 的监控发现,HiveServer 的内存在某一时刻暴涨,同时 JVM 开始 GC,每次 GC 长达 1…

    技术 2022-06-18
  • Flink 用户代码如何生成 StreamGraph (1)

    Flink 是如何将你写的代码生成 StreamGraph 的(1) 一、絮叨两句 新的一年又来了,不知道大家有没有立几个每年都完不成的 FLAG ? 反正我立了,我今年给自己立的…

    Flink 2022-06-19
  • Flink 异步编程模型介绍

    本次我们来实现一个支持百万并发连接数的采集服务器,并且找到异步+ NIO 的编程方式和实时流模型的那些千丝万缕的联系。 一、需求说明 简单的需求功能如下: 数据采集是一个 web …

    Flink 2022-06-19
  • Flink metrics 介绍和应用

    一、Flink metrics简介 Flink 的 metrics 是 Flink 公开的一个度量系统,metrics 也可以暴露给外部系统,通过在 Flink 配置文件 conf…

    Flink 2022-06-19
  • Flink-Clients 模块源码阅读

    本文大纲 一、Flink 官方文档这么全面,为什么还要读 Flink 源码 读文档和读源码的目的是不一样的,就拿 Apache Flink 这个项目来说,如果你想知道 Flink …

    Flink 2022-06-19
  • linux 的输入(出)重定向和管道符

    一、输入输出重定向 标准输入重定向(STDIN,文件描述符为0):默认从键盘输入,也可从其他文件或者命令中输入 标准输出重定向(STDOUT,文件描述符为1):默认输出到屏幕 错误…

    Linux 2022-07-03
  • vim 的使用

    一、命令模式中最常用的一些命令 dd 删除(剪切)光标所在行 5dd 删除(剪切)从光标处开始的5行 yy 复制光标所在的整行 5yy 复制从光标处开始的5行 n 显示搜索命令定位…

    Linux 2022-07-03
  • Flink-Clients 源码阅读(2)

    一、我们本次的目的是什么? 这次我们的目的是,在本地的 IDEA 中去 debug flink-clients 代码,然后远程提交给 flink standalone 集群上去执行…

    Flink 2022-06-19

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注