九、一小段源码
上次说到了所有的算子都会转化成 transformation ,并放到一个 List 列表中,那么今天我们开始遍历这个列表,来生成 StreamGraph。
打开这个类 StreamGraphGenerator,generate() 方法(252行),StreamGraph 生成的逻辑就是从这里开始的。
里面有一个 for 循环,遍历的就是上次说到的那个非常重要的 transformations 列表:
for (Transformation<?> transformation: transformations) {
transform(transformation);
}
然后看 transform 方法(无关的逻辑被我精简掉了),这个方法的作用是,使用不同的转换器,把算子生成的 transformations,转换成 StreamGraph 。
private Collection<Integer> transform(Transformation<?> transform) {
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
@SuppressWarnings("unchecked")
final TransformationTranslator<?, Transformation<?>> translator =
(TransformationTranslator<?, Transformation<?>>) translatorMap.get(transform.getClass());
Collection<Integer> transformedIds;
if (translator != null) {
transformedIds = translate(translator, transform);
} else {
transformedIds = legacyTransform(transform);
}
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
return transformedIds;
}
首先,看下这个方法的返回值,是一个 Collection<Integer> 类型,也就是说,转换完之后,会返回本次转换的 id。
首先,要获得一个 translator 转换器,可以看到在 static 静态块里,为每一种不同的 transformation 设置了不同的 translator。
获取到转换器之后,进入 translate 方法中,translate 方法有这样一个方法,getParentInputIds(),这是一个很神奇的方法,他是在递归。
final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());
赶紧点进去看看:
private List<Collection<Integer>> getParentInputIds(
@Nullable final Collection<Transformation<?>> parentTransformations) {
final List<Collection<Integer>> allInputIds = new ArrayList<>();
if (parentTransformations == null) {
return allInputIds;
}
for (Transformation<?> transformation : parentTransformations) {
allInputIds.add(transform(transformation));
}
return allInputIds;
}
可以看到,它的输入参数是这个 transformation 的 input,然后 for 循环遍历这个 input,for 循环里面又在调用 transform 方法。这就是在递归调用了。
既然是递归调用,那么递归的终止条件是什么呢?
我一开始也很懵啊,debug 的时候,一直在循环往复,头有点大。静下来仔细 debug 了一下,发现终止条件就是:如果没有 input,那就不走到 for 循环里面来,也就直接返回了,这就是终止条件了。
那么,为什么要搞这样的递归调用?
目的就是,在转换一个算子的 transformation 的时候,要把它的上游先转换掉,也就是要从最开始的那个输入开始转换,这样才能顺利的构造出 DAG。
可能看到这,还是很迷茫,没关系,我们拿出具体的数据说话。
十、我们来 debug 一下
下面这个图是当前 transformations 的三个元素
下面的这个是每一个 transformation 的父子关系,括号里是算子的 id,右边是它的 input。
- Flat Map(2) – Collection Source(1)
- Keyed Aggregation(4) – Partition(3) – Flat Map(2) – Collection Source(1)
- Print to Std. out(5) – Keyed Aggregation(4) – Partition(3) – Flat Map(2) – Collection Source(1)
我们从这个 for 循环开始:
当前 transform 方法中,Flat Map 算子作为入参。
它的调用链是:transform -> translate -> getParentInputs -> 遍历 Flat Map 的 inputs ,然后调用 transform 方法
可以看到当前又是在 transform 方法中,但是输入参数是 Collection Source,也就是 Flat Map 的 input。
然后又是依次进入:transform -> translate -> getParentInputs -> 遍历 Collection Source 的 inputs
这个时候,Collection Source 是没有 input 的,所以本次递归就返回了,开始转换 Collection Source。
是用的 LegacySourceTransformationTranslator 这个转换器来转换的,最终就是做了这么一件事,new 了一个 StreamNode,放入了 StreamNodes 的 Map 中。
StreamNode vertex = new StreamNode(
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
operatorName,
vertexClass);
streamNodes.put(vertexID, vertex);
那么 Collection Source 就处理完了,由于是递归遍历到根节点,那么肯定是会有重复的,所以,已经转换过的,要缓存起来,放到一个 Map 中,下次遇到同样的,就直接跳过了。
Collection Source 处理完了之后,也就是 Flat Map 的 input 处理完了
下面要回来进入 FlatMap 的 translate 方法了(这就是在递归,处理 Flat Map 时,要先处理 Collection Source,等到把 Collection Source 处理完了,再继续回来处理 Flat Map)
FlatMap 是用的 OneInputTransformationTranslator 转换器来转换的。
可以看到它也是一样,new 了一个 StreamNode ,加入到了 streamNodes 列表中。
但是!它还做了另外一件事,那就是:
它还要处理自己的 ParentTransformation,也就是 Collection Source,来构造一个边 Edge。
可以看到这个边,是有方向的,从 Collection Source 到 FlatMap。
然后把这个边放到 Collection Source 的 outEdge 中;再放到 FlatMap 的 inEdge 中。
这样就记录了算子的流向。
这样,FlatMap 就算转换完成了。放入缓存中。
这样对于 transformations 的 一次 for 循环就结束了。
然后开始处理 Keyed Aggregation ,也是一样的流程,先处理它的 input,从最上面一层层往下处理,这里我们就不细说了。
十一、最终结果
最终生成的 StreamGraph中,重要的就是这个 StreamNodes,一共有四个:
每一个 Node 里面有 InEdge,表示这个节点的上游节点是哪个;有 outEdge,表示这个节点的下游节点是哪个。
还有 sources 表示是源,sinks 表示是目标。
十二、小结
好了,本次的 StreamGraph 的 debug 就到这了。
阅读这部分的代码,给我感触最深的就是,要关注主要矛盾,忽略次要分支,才能把脉络梳理清楚,否则就会深陷泥潭,不仅自己没有成就感,而且还耽误了时间。
当然,生成 StreamGraph 的过程中,还有诸多细节,这里我不打算再深究了,如果日后有什么需要,再来看这块其他的代码。
下一次,就是具体的提交任务的过程了,这个过程需要涉及到 Java 的异步编程,所以再安排一次阅读源码必备知识之 Java 异步编程,拭目以待!
原创文章,作者:kk,如若转载,请注明出处:http://www.wangkai123.com/66/