Flink 异步编程模型介绍

本次我们来实现一个支持百万并发连接数的采集服务器,并且找到异步+ NIO 的编程方式和实时流模型的那些千丝万缕的联系。

一、需求说明

简单的需求功能如下:

  • 数据采集是一个 web 服务器,可以接收 http 请求传上来的事件,事件是 json 格式的;
  • 收到之后,进行解码,校验字符串是否可以解码成 json 对象;
  • 对消息进行抽取、清洗、转化;
  • 最后发送到 kafka 中。

性能要求:

  • 支持百万并发连接数;
  • 对 CPU 资源和 IO 资源充分利用;image-20210303230936853

二、实现方案剖析

网页和手机端会产生一些埋点文件,通过 http 方式发送给采集服务器。涉及到网络连接,第一个想到的就是 Socket。

1、初始版本:使用 BIO 实现的客户端和服务端通信

我们很容易就可以用多线程快速实现一个 web 端服务器,模型图如下(为了节省篇幅,代码我就不写了,很简单,但不实用,百度有很多)。

简单描述一下:每次来一个请求,都创建一个线程来执行。如下图:

image-20210303210844982

但是弊端也显而易见,随随便便就能列出三点:

1.1 创建和销毁线程动作代价昂贵

Java 中的线程模型是基于操作系统原生线程模型实现的,也就是说 Java 中的线程其实是基于内核线程实现的,线程的创建,析构与同步都需要进行系统调用,而系统调用需要在用户态与内核中来回切换,代价相对较高;

1.2 线程本身占用大量的内存

Java 中,默认一个线程,线程栈大小是 1 M。一旦线程数过千,恐怕整个 Jvm 内存都会被吃掉一半;

1.3 线程切换成本是很高

操作系统在切换线程的时候,需要保留线程的上下文,然后再执行系统调用。如果线程数过多,可能执行线程的时间都会大于线程执行的时间,使系统陷于几乎不可用的状态;

2、改造版本:异步 + NIO 实现的高性能网络通信

这是 Java 中的 NIO 模型,如下图:

image-20210303230740838

可能你一下无法接受同时出现这么多陌生的概念(Acceptor,Selector,Channel),没关系,NIO 再慢慢学,这里我们只要抓住它的核心:

  • 使用了队列将请求接收器和工作线程隔开,让请求接收器和工作线程各自尽其所能的工作,更加充分的利用 IO 和 CPU 资源;
  • 现在,NIO 连接器能够保持的并发连接数,不再受限于工作线程数量,无需分配大量线程,就能支持大量并发连接了。

3、进阶版本,如何充分利用 CPU 和 网络 IO 资源

在第二步,我们解决了高并发连接数的问题,但是还远远不够。

在一个采集系统中,我们需要做这三件事情,解码,清洗转化,发送。

其中,解码和清洗转化过程纯粹是 CPU 计算,占用 CPU 资源,而发送会大量占用 IO 资源。

如果让一个线程串行的执行这三件事,前面两件事,CPU 会很快做完,势必最后会等在 IO 操作上,这个线程就被操作系统挂起了,在那里空等,直到 IO 操作完成。

如何解决,只能增加工作线程数量,但是增加工作线程数量,会导致过多的线程调度和上下文切换,是另一种形式的 CPU 浪费。

如何解决,我们可以使用异步的方式。何谓异步,比如做饭过程就是异步,先把饭放电饭煲煮着,趁着这个时间去做菜,这就是“异步”。如果一直等到米饭煮熟,再去烧菜,这就是“同步”。

可能你就已经知道了,上次我们讲过 CompletableFuture,这是一个异步编程框架,可以将不同的线程编排起来。

并且可以指定线程池,让不同的事情,在不同的线程池里面完成。

看下面的代码:

 // 解码线程池
 Executor decoderExecutor = ExecutorHelper.createExecutor(2, "decoder");
 // 转换线程池
 Executor ectExecutor = ExecutorHelper.createExecutor(8, "ect");
 // 发送线程池
 Executor senderExecutor = ExecutorHelper.createExecutor(2, "sender");
 ​
 @Override
 protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req) throws Exception {
     CompletableFuture
  // 解码过程
             .supplyAsync(() -> this.decode(ctx, req), this.decoderExecutor)
  // 转换过程
             .thenApplyAsync(e -> this.doExtractCleanTransform(ctx, req, e), this.ectExecutor)
  // IO 过程
             .thenApplyAsync(e -> this.send(ctx, req, e), this.senderExecutor);
 }
 ​

其中 channelRead0 是 Netty 框架的请求方法,每次请求都会到这个方法中。每次请求进来时,三个动作分别异步执行,但是 CompletableFuture 框架会保证每个请求三个动作执行的先后顺序。

这样,CPU 资源和 IO 资源,就可以得到充分的利用了。

三、进阶版本中存在的问题

1、问题描述

上述的异步模型可以用下面的图来表示

image-20210303235215544

在上面的异步编程代码中,我们把不同类型的任务提交到不同的线程池中,而线程池是需要队列的,图上的队列就是线程池的队列。

其中,解码过程和转换过程,都是比较快速的过程,而发送的 I/O 过程则比较慢。

那么前面的消息会一直积压在发送过程的线程池队列中,等待执行。如果队列选择的是无界队列,那么越来越多的任务会积压,最终会用光所有的虚拟机所有的内存,导致 OOM。

2、如何控制事件的速度

我们可以直接想到,严格控制上游的发送速度,比如控制上游每秒钟只能发送 1000 条消息。这种方法虽然可行,但是非常不优雅。

如果下游遇到特殊原因,每秒只能处理 500 条,那仍然还是会 OOM,我们没法去估出一个合适的值的。

3、反向压力

有一种更加优雅的方案,叫做反向压力

上面我们的方案出问题,主要原因还是在于队列是无界的,消息一直在积压,并且是非阻塞的。

要实现反向压力,只需要从两个方面来控制:

  • 执行器的任务队列,它的容量必须是有限的;
  • 当执行器的任务队列满了的时候,就阻止上游继续提交任务,直到任务队列中有新的空间为止。
image-20210304000311010

如上图,可以看到,如果发送的线程池队列满了之后,就阻止上游的转换任务继续提交任务。过了一会,转换过程的队列也会满,同样的它也会阻止解码过程提交任务。

对于我们这种数据处理场景的话,可以通过横向增加服务器来解决 TPS 低的问题;如果是流式处理场景,那么最上游应该是主动从 Kafka 拉取消息,这个时候,它就放缓自己拉取消息的速度,从而做到流量控制。

当一段时间后,发送线程池队列有空闲了,又会继续处理消息。

4、实现反压

如何来实现反压?其实很简单,当队列满了之后,会进入线程池的拒绝策略中,在拒绝策略中,我们使用 while 循环来重复提交任务,直到任务提交成功,看下面的代码:

 private final List<ExecutorService> executors;
 private final Partitioner partitioner;
 private Long rejectSleepMills = 1L;
 ​
 public BackPressureExecutor(String name, int executorNumber, int coreSize, int maxSize, int capacity, long rejectSleepMills) {
     this.rejectSleepMills = rejectSleepMills;
     this.executors = new ArrayList<>(executorNumber);
     for (int i = 0; i < executorNumber; i++) {
         ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(capacity);
         this.executors.add(new ThreadPoolExecutor(
                 coreSize, maxSize, 0L, TimeUnit.MILLISECONDS,
                 queue,
                 new ThreadFactoryBuilder().setNameFormat(name + "-" + i + "-%d").build(),
                 new ThreadPoolExecutor.AbortPolicy()));
     }
     this.partitioner = new RoundRobinPartitionSelector(executorNumber);
 }
 ​
 @Override
 public void execute(Runnable command) {
     boolean rejected;
     do {
         try {
            rejected = false;
            executors.get(partitioner.getPartition()).execute(command);
         } catch (RejectedExecutionException e) {
             rejected = true;
             try {
                 TimeUnit.MILLISECONDS.sleep(rejectSleepMills);
             } catch (InterruptedException e1) {
                 logger.warn("Reject sleep has been interrupted.", e1);
             }
         }
     } while (rejected);
 }
 ​

可以看到上面的代码,关键点有两个:

第一个是,在创建 ThreadPoolExecutor 对象时,采用 ArrayBlockingQueue。这是一个容量有限的阻塞队列。因此,当任务队列已经满了时,就会停止继续往队列里添加新的任务,从而避免内存无限大,造成 OOM 问题。

第二个是,将 ThreadPoolExecutor 拒绝任务时,采用的策略设置为 AbortPolicy。这就意味着,在任务队列已经满了的时候,如果再向任务队列提交任务,就会抛出 RejectedExecutionException 异常。之后,我们再通过一个 while 循环,在循环体内,捕获 RejectedExecutionException 异常,并不断尝试,重新提交任务,直到成功为止。

这样,经过上面的改造,当下游的步骤执行较慢时,它的任务队列就会占满。这个时候,如果上游继续往下游提交任务,它就会不停重试。这样,自然而然地降低了上游步骤的处理速度,从而起到了流量控制的作用。

四、这不就是一个实时流模型!

上面的那个图,是不是似曾相识?没错,它就是实时流模型啊。

并且反压,已经成为流计算领域的共识,并且已经形成了反向压力相关的标准。

Flink 中是通过 Netty 的网络模型的阻塞来把压力一层层往上游传递的,和我们实现的这个有异曲同工之妙。

image-20210304001528949

在如今的异步编程模型中,无处不存在着队列的影子,甚至在操作系统底层,也会使用队列来对性能做极致的优化,比如大名鼎鼎的 epoll。

而“队列”正是流计算系统最重要的组成结构。有“队列”的系统,它注定会是一个异步执行的过程,这也意味着“流”这种模式注定了是“异步”的。

五、总结

如今的分布式系统,都是几百甚至上千机器在一起协同工作,那不同的机器的不同进程一定会通信。

像 Spark,使用了 Netty 作为通信框架,Flink 也有在使用 Netty (还有Akka)作为通信框架。而要去了解一个分布式框架,第一步就是要了解它的通信框架,不然进程和进程的通信部分就没法看懂,整个框架核心逻辑也就无法透彻看懂。

而且在分布式系统中,异步编程的代码也是非常多,像 Flink 提交任务的过程,就使用了 CompletableFuture 异步编程框架来提交任务。

所以理解 NIO 和异步编程是通往大神的必要条件,深刻的理解他们一定会对你以后的工作,有所帮助!

原创文章,作者:kk,如若转载,请注明出处:http://www.wangkai123.com/78/

(0)
上一篇 2022-06-19 21:58
下一篇 2022-06-20 21:51

相关推荐

  • Flink 源码阅读准备之 – Java8 异步编程

    阅读 Flink 源码前必会的知识 Java8 异步编程 本文大纲速看 一、异步编程 通常来说,程序都是顺序执行,同一时刻只会发生一件事情。如果一个函数依赖于另一个函数的结果,它只…

    Flink 2022-06-19
  • Flink 源码阅读准备之 – SPI 和 ClassLoader

    一、本文大纲 二、ClassLoader 类加载器 1、Java 中的类加载器以及双亲委派机制 Java 中的类加载器,是 Java 运行时环境的一部分,负责动态加载 Java 类…

    Flink 2022-06-19
  • Redis 原生支持 JSON 了

    号外 喜大普奔,Redis 官方支持 JSON 操作了,当我看到这个消息时,我的脑海中立马闪过这些操作: 把一个对象序列化成 json 字符串; 塞到 redis 的一个键上; 用…

    技术 2022-06-18
  • Flink-Clients 模块源码阅读

    本文大纲 一、Flink 官方文档这么全面,为什么还要读 Flink 源码 读文档和读源码的目的是不一样的,就拿 Apache Flink 这个项目来说,如果你想知道 Flink …

    Flink 2022-06-19
  • Flink metrics 介绍和应用

    一、Flink metrics简介 Flink 的 metrics 是 Flink 公开的一个度量系统,metrics 也可以暴露给外部系统,通过在 Flink 配置文件 conf…

    Flink 2022-06-19
  • linux 的输入(出)重定向和管道符

    一、输入输出重定向 标准输入重定向(STDIN,文件描述符为0):默认从键盘输入,也可从其他文件或者命令中输入 标准输出重定向(STDOUT,文件描述符为1):默认输出到屏幕 错误…

    Linux 2022-07-03
  • vim 的使用

    一、命令模式中最常用的一些命令 dd 删除(剪切)光标所在行 5dd 删除(剪切)从光标处开始的5行 yy 复制光标所在的整行 5yy 复制从光标处开始的5行 n 显示搜索命令定位…

    Linux 2022-07-03
  • Flink 异步 I/O 解析

    一、简介 我们知道 flink 对于外部数据源的操作可以通过自带的连接器,或者自定义 sink 和 source 实现数据的交互。 那么为啥还需要异步 IO 呢?那时因为对于实时处…

    Flink 2022-06-19
  • Hive执行流程源码解析

    最近在出差,客户现场的 HiveServer 在很长时间内不可用,查看 CM 的监控发现,HiveServer 的内存在某一时刻暴涨,同时 JVM 开始 GC,每次 GC 长达 1…

    技术 2022-06-18
  • Flink 用户代码如何生成 StreamGraph(下)

    九、一小段源码 上次说到了所有的算子都会转化成 transformation ,并放到一个 List 列表中,那么今天我们开始遍历这个列表,来生成 StreamGraph。 打开这…

    Flink 2022-06-19

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注