Flink 是如何将你写的代码生成 StreamGraph 的(1)
一、絮叨两句
新的一年又来了,不知道大家有没有立几个每年都完不成的 FLAG ?
反正我立了,我今年给自己立的 FLAG 是大致阅读大数据几个框架的源码。
为什么要“大致”阅读,因为这些牛逼的框架都是层层封装,搞懂核心原理已经是很不易,更别谈熟读源码了。
但是目标还是要有的,我也不要当一条咸鱼。
之前几篇源码阅读的文章,不知道大家有没有亲自动手打开 Idea 去试一试,这里我再贴一下文章链接,大家可以再回顾一下。
本次,我们来聊一聊,我们自己写的代码是如何变成 StreamGraph 的。
二、引出问题
开始之前,不妨稍微回顾一下……
一般我们执行一个 Flink 程序,都是使用命令行 flink run(flink 界面上执行的时候,也是在调用 flink run 命令来执行的)来执行,然后shell 会使用 java 命令,执行到 CliFrontend 类的 main 方法。
main 方法里面,首先会解析用户的输入参数,解析 flink-conf.yml 配置文件,解压出用户 jar 包里的依赖,以及其他的信息,都封装到 PackagedProgram 对象中。然后切换当前线程的类加载器为 UserCodeClassLoader,这个类加载器自定义了一些策略(Child-First 或者 Parent-First),使用这个类加载器去反射执行用户代码的 main 方法。
然后今天的故事就从这里开始。
首先我们贴一段 Flink 自带的 Example 里的代码(稍稍简化了代码,去掉了无关的逻辑):
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.fromElements(WordCountData.WORDS);
DataStream<Tuple2<String, Integer>> counts =
text.flatMap(new Tokenizer())
.keyBy(value -> value.f0).sum(1);
counts.print();
它是如何变成这张图的:
这张图是一个有向无环图,组成有向无环图的就是顶点信息,以及边的信息。
这些信息被封装在 StreamGraph 类之中,这个类中有三个非常重要的属性:
private Map<Integer, StreamNode> streamNodes;
private Set<Integer> sources;
private Set<Integer> sinks;
可以看到这几个属性记录了这个 Graph 中有几个节点,几个是 sources,几个是 sinks。
其中 StreamNode 是对节点的封装,节点上有几个重要的属性如下:
private final String operatorName;
private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
operatorName 表示节点的名字,inEdges 表示这个节点上游的边,outEdges 表示这个节点下游的边。
然后,StreamEdge 是对边的封装,只有输入节点 id 和目标节点 id:
private final int sourceId;
private final int targetId;
这三个类的这几个属性就描述了刚刚的那张图。
三、记住一个非常重要的属性
它就是 StreamExecutionEnvironment 类的 transformations 属性:
protected final List<Transformation<?>> transformations = new ArrayList<>();
什么是 Transformation,Transformation 就是 Flink 对我们写的算子的额外信息的封装,比如算子的名字,id,输出类型,输入,并行度等等这些信息。
有些算子最终会调用 this.tranformations.add() 加入到列表里来,而有的不会。
四、从 env.fromElements() 开始
env.fromElements(),这是一个算子,这个算子定义了 source 信息,这个算子对应的 transformation 是 LegacySourceTransformation,里面记录了算子的id,名字,输出类型,并行度,有界还是无界等等信息。
最后这个方法返回的是一个 DataStreamSource 对象,这个对象的基类是 DataStream。DataStream 里有一个 transformation 属性。
也就是说 env.fromElements() 返回了一个 DataStream 对象,并且把它自身的 transformation 信息放到这个 DataStream 实例的属性里面了。
env.fromElements 这个算子是没有加入到 上面的 transformations 列表中去的。
四、FlatMap 算子源码分析
紧接着,上面的 env.fromElements 的返回值: DataStream 实例,调用了它自己的 flatMap 方法,flatMap 最终又调用了 doTransform 方法。
FlatMap 算子也是要构造一个 transformation 的,FlatMap 对应的 transformation 是 OneInputTransformation,这个类里有一个属性是 input,也就是 FlatMap 算子的输入信息。我们看一下它的构造方法
public OneInputTransformation(
Transformation<IN> input,
String name,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<OUT> outputType,
int parallelism) {
super(name, outputType, parallelism);
this.input = input;
this.operatorFactory = operatorFactory;
}
再看一下调用信息
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
也就是说,FlatMap 的 transformation 信息中,有一个 input 属性,其值是 env.fromElements 的 transformation。
通俗点讲就是,FlatMap 的 transformation 中记录了它的输入是 env.fromElements() 。
最后返回了 SingleOutputStreamOperator 对象,这里面封装了 FlatMap 的 transformation 信息。
我们可以 debug 到这里来看看它的返回值:
然后需要关注的事情是,它最终调用了这个方法:
getExecutionEnvironment().addOperator(resultTransform);
public void addOperator(Transformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}
也就是加入到了 transformations 列表中去。
FlatMap 最后返回了一个 SingleOutputStreamOperator 类,这个类也是 DataStream 的子类。
所以,看到这基本能够理解,我们写的代码,其实本质都是 Flink 封装后对外暴露的简单易用的 api,Flink 在背后做了大部分事情。
五、KeyBy 算子源码分析
keyBy 也是 DataStream 的一个方法,它 new 了一个 KeyedStream,并且把 this 传入了构造函数中,this 是什么?this 就是刚刚 FlatMap 的返回值,还记得吗?里面记录了 FlatMap 的 transformation。
keyBy 对应的 transformation 是 PartitionTransformation,里面也有 input 属性,直接把 this.getTransformation() 传给了 input 了。
我们来 debug 看一下返回值:
有点像套娃,一层又一层的。
需要注意的是,KeyBy 只是一个虚拟的节点,它并没有加入到 transformations 列表中来。
六、sum 算子的源码分析
这个我们就不细看了,套路都差不多了,直接 debug 看一下返回值:
sum 算子有调用这个方法:
getExecutionEnvironment().addOperator(reduce);
加入到了 transformations 属性中来。
七、sink 算子的源码分析
和 sum 一样,我们直接 debug 一下最终的结果:
可见 sink 中,也套娃式的记录了所有的 input。
最后,sink 也调用了
getExecutionEnvironment().addOperator(sink.getTransformation());
原创文章,作者:kk,如若转载,请注明出处:http://www.wangkai123.com/64/