Flink 作业提交流程「建议收藏」

Flink 作业提交流程「建议收藏」大家好,我是小寒~ 今天给大家带来一篇 flink 作业提交相关的文章。 我们都知道,在开发完一个 flink 应用程序后,打包成 jar 包,然后通过 FLink CLI 或者 Web UI 提交作

Flink 作业提交流程

大家好,我是小寒~

今天给大家带来一篇 flink 作业提交相关的文章。

我们都知道,在开发完一个 flink 应用程序后,打包成 jar 包,然后通过 FLink CLI 或者 Web UI 提交作业到 FLink 集群。其实,Flink 的 jar 文件并不是 FLink 集群的可执行文件,需要经过转换之后提交给集群。其转换过程分为两个大的步骤。

  1. 在 FLink Client 中通过反射启动 Jar 中的 main 函数,生成 Flink StreamGraph、JobGraph,将 JobGraph 提交给 Flink 集群。
  2. FLink 集群收到 JobGraph 之后,将 JobGraph 翻译成 ExecutionGraph,然后开始调度执行,启动成功之后开始消费数据。

总的来说,对用户API的调用,可以转换为 StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行拓扑(Task DAG)

提交流程

FLink 作业在开发完毕之后,需要提交到 FLink 集群执行。ClientFrontend 是入口,触发用户开发的 Flink 应用 jar 文件中的 main 方法,然后交给 PipelineExecutor#execue 方法,最终会触发一个具体的 PipelineExecutor 执行,如下图所示。

Flink 作业提交流程「建议收藏」

作业执行可以选择 Session 和 Per-Job 模式两种集群。

  • Session 模式的集群,一个集群中运行多个作业。
  • Per-Job 模式的集群,一个集群中只运行一个作业,作业执行完毕则集群销毁。

Flink 作业提交流程「建议收藏」

流水线执行器 PipelineExecutor

流水线执行器在 FLink 中叫作 PipelineExecutor,是 FLink Client 生成 JobGraph 之后,将作业提交给集群的重要环节。

集群有 Session 和 Per-Job 两种模式。在这两种模式下,集群的启动时机、提交作业的方式不同,所以在生产环境中有两种 PipelineExecutor。Session 模式对应于 AbstractSessionClusterExecutor,Per-Job 模式对应于 AbstractJobClusterExecutor。

  1. Session 模式

该模式下,作业共享集群资源,作业通过 Http 协议进行提交。

在 Flink 1.10 版本中提供了三种会话模式:Yarn 会话模式、K8s 会话模式、Standalone。Standalone 模式比较特别,Flink 安装在物理机上,不能像在资源集群上一样,可以随时启动一个新集群,所有的作业共享 Standalone 集群。

在 Session 模式下, Yarn 作业提交使用 yarn-session.sh 脚本, K8s 作业提交使用 kubernetes-session.sh 脚本。两者的具体实现不同 ,但逻辑是类似的 ,在启动脚本的时候就会检查是否存在已经启动好的 Flink Session 模式集群,如果没有,则启动一个 Flink Session 模式集群,然后在 PipelineExecutor 中,通过 Dispatcher 提供的 Rest 接口提交 JobGraph,Dispatcher 为每一个作业启动一个 JobMaster,然后进入作业执行阶段。

  1. Per-Job 模式

该模式下,一个作业一个集群,作业之间相互隔离。

在 FLink 1.10 版本中,只有 Yarn 上实现了 Per-Job 模式。

Per-Job 模式下,因为不需要共享集群,所以在 PipelineExecutor 中执行作业提交的时候,可以创建集群并将 JobGraph 以及所需要的文件等一同交给 Yarn 集群,Yarn 集群在容器中启动 JobManager 进程,进行一系列的初始化动作,初始化完毕之后,从文件系统中获取 JobGraph ,交给 Dispatcher。 之后的执行流程与 Session 模式下的执行流程相同。

yarn session 的提交流程

从总体上来说,在 Yarn 集群上使用 Session 模式提交 Flink 作业的过程分为 3 个阶段。首先在 Yarn 上启动 Flink Session 模式的集群;其次通过 Flink Client 提交作业 ,最后进行作业的调度执行。

Flink 作业提交流程「建议收藏」

  1. 启动集群

(1) 使用 yarn-session.sh 提交会话模式的作业

如果提交到已经存在的集群, 则获取 Yarn 集群信息、应用 ID,并准备提交作业。

如果是启动新的 Yarn Session 集群,则进入到步骤 (2)。

(2)Yarn 启动新的 Flink 集群

如果没有集群,则创建一个新的 Session 模式的集群。首先,将应用的配置文件(flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink jar、用户 jar 文件、JobGraph 对象等)上传至分布式存储(如 HDFS)的应用暂存目录。

然后通过 Yarn Client 向 Yarn 提交 Flink 创建集群的申请,Yarn 分配资源,在申请的 Yarn Container 中初始化并启动 FLink JobManager 进程,在 JobManager 进程中运行 YarnSessionClusterEntrypoint 作为集群启动的入口(不同的集群部署模式有不同的 ClusterEntrypoint 的实现),初始化 Dispatcher、ResourceManager。启动相关的 RPC 服务,等待 Client 通过 Rest 接口提交作业。

2、作业提交

Yarn 集群准备好后,开始作业提交。

(1)Flink Client 通过 Rest 向 Dispatcher 提交 JobGraph。

(2)Dispatcher 是 Rest 接口,不负责实际的调度、执行方面的工作,当收到 JobGraph 后,为作业创建一个 JobMaster,将工作交给 JobMaster(负责作业调度、管理作业和 Task 的生命周期 ),构建 ExecutionGraph(Job Graph的并行化版本)

  1. 作业调度执行

(1)JobMaster 向 YarnResourceManager 申请资源,开始调度 ExecutionGraph 的执行;初次提交作业,集群尚没有 TaskManager,此时资源不足,开始申请资源。

(2)YarnResourceManager 收到 JobMaster 的资源请求,如果当前有空闲的 Slot,则将 Slot 分配给 JobMaster,否则 YarnResourceManager 将向 Yarn Master(Yarn 集群的 ResourceManager) 请求创建 TaskManager。

(3)YarnResourceManager 将资源请求加入等待请求队列,并通过心跳向 YARN RM 申请新的 Container 资源来启动 TaskManager 进程;Yarn 分配新的 Container 给 TaskManager。

(4)YarnResourceManager 从 HDFS 加载 Jar 文件等所需的相关资源,在容器中启动 TaskManager。

(5)TaskManager 启动之后,向 YarnResourceManager 进行注册,并把自己的 Slot 资源情况汇报给 YarnResourceManager 。

(6)YarnResourceManager 从等待队列中取出 Slot 请求,向 TaskManager 确认资源可用情况,并告知 TaskManager 将 Slot 分配给了哪个 JobMaster。

(7)TaskManager 向 JobMaster 提供 Slot,JobMaster 调度 Task 到 TaskManager 的此 Slot 上执行。

至此,作业进入执行阶段。

Yarn Per-Job 提交流程

Yarn Per-Job 模式提交作业与 Yarn-Session 模式提交作业基本类似。Per-Job 模式下,JobGraph 和集群资源请求一起提交给 Yarn。

Flink 作业提交流程「建议收藏」

  1. 启动集群

    (1)使用 flink run -m yarn-cluster 提交 Per-Job 模式的作业。

    (2)Yarn 启动 Flink 集群。该模式下 Flink 集群的启动入口是 YarnJobClusterEntryPoint,其它与 Yarn-Session 模式启动类似。

  2. 作业提交

    该步骤与 Session 模式下的不同之处在于,Client 并不会通过 Rest 向 Dispacher 提交 JobGraph,由 Dispacher 从本地文件系统获取 JobGraph,其后的步骤与 Session 模式一样。

  3. 作业调度执行

    与 Yarn-Session 模式下一致。

流处理的转换过程

StreamGraph

使用 DataStream API 开发的应用程序,首先被转换为 Transformation,然后被映射为 StreamGraph。

我们以熟知的 WordCount 程序为例,它的 StreamGraph 如下图所示。

Flink 作业提交流程「建议收藏」

从图中我们可以看到,StreamGraph 是由 StreamNode 和 StreamEdge 构成。

  • StreamNode

    StreamNode 是 StreamGraph 中的节点,从 Transformation 转换而来,可以简单理解为一个 StreamNode 表示为一个算子;从逻辑上来说,StreamNode 在 StreamGraph 中存在实体和虚拟的 StreamNode。StreamNode 可以有多个输入,也可以有多个输出。

    实体的 StreamNode 会最终变为物理的算子。虚拟的 StreamNode 会附着在 StreamEdge 上。

  • StreamEdge

    StreamEdge 是 StreamGraph 中的边, 用来连接两个 StreamNode,一个 StreamNode 可以有多个出边、入边。 StreamEdge 中包含了盘路输出、分区器、字段筛选输出等的信息。

作业图

JobGraph 可以由流计算的 StreamGraph 转换而来。
流计算中,在 StreamGraph 的基础上进行了一些优化,如通过 OperatorChain 机制将算子合并起来,在执行时,调度在同一个 Task 线程上,避免数据的跨线程、跨网络的传递。

Flink 作业提交流程「建议收藏」

从 JobGraph 的图里可以看到,数据从上一个算子流到下一个算子的过程中,上游作为生产者提供了中间数据集(IntermediateDateSet),而下游作为消费者需要 JobEdge。JobEdge 是一个通信管道,连接了上游生产的中间数据集和 JobVertex 节点。

JobGraph 的核心对象是 JobVertex、JobEdge 和 IntermediateDateSet。

  • JobVertex

    经过算子融合优化后符合条件的多个 StreamNode 可能会融合在一起生成一个 JobVertex,即一个 JobVertex 包含一个或多个算子,JobVertex 的输入是 JobEdge,输出是 IntermediateDateSet。

  • JobEdge

    JobEdge 是 JobGraph 中连接 IntermediateDateSet 和 JobVertex 的边,表示 JobGraph 中的一个数据流转通道,其上游数据源是 IntermediateDateSet,下游消费者是 JobVertex ,即数据通过 JobEdge 由 IntermediateDateSet 传递给目标 JobVertex 。

    JobEdge 中的数据分发模式会直接影响执行时 Task 之间的数据连接关系,是点对点连接还是全连接。

  • IntermediateDateSet

    中间数据集 IntermediateDataSet 是一种逻辑结构,用来表示 JobVertex 的输出,即该 JobVertex 中包含的算子会产生的数据集。不同的执行模式下,其对应的结果分区类型不同,决定了在执行时刻数据交换的模式。

    IntermediateDataSet 的个数与该 JobVertex 对应的 StreamNode 的出边数量相同,可以是一个或者多个。

执行图

ExecutionGraph 是调度 Flink 作业执行的核心数据结构,包含了作业中所有并行执行的 Task 的信息、Task 之间的关联关系、数据流转关系等。

StreamGraph、JobGraph 在 Flink 客户端中生成,然后提交给 Flink 集群。JobGraph 到 ExecutionGraph 的转换在 JobMaster 中完成。在转化过程中,有如下重要变化。

  • 加入了并行度的概念,成为真正可调度的图结构。
  • 生成了与 JobVertex 对应的 ExecutionJobVertex 和 ExecutionVertex,与IntermediateDataSet 对应的 IntermediateResult 和 IntermediateResultPartition 等。

生成的图如下图所示。

Flink 作业提交流程「建议收藏」

ExecutionGraph 的核心对象有 ExecutionJobVertex 、ExecutionVertex、IntermediateResult 、IntermediateResultPartition、ExecutionEdge 和 Execution。

  • ExecutionJobVertex

    该对象和 JobGraph 中的 JobVertex 一一对应。该对象还包含一组 ExecutionVertex,数量与该 JobVertex 中所包含的 StreamNode 的并行度一致,假设 StreamNode 的并行度为3,那么 ExecutionJobVertex 也会包含 3个 ExecutionVertex。

  • ExecutionVertex

    ExecutionJobVertex 中会对作业进行并行化处理,构造可以并行执行的实例,每一个并行执行的实例就是 ExecutionVertex。

    构造 ExecutionVertex 的同时,也会构建 ExecutionVertex 的输出 IntermediateResult。

  • IntermediateResult

    IntermediateResult 又叫中间结果集,该对象是个逻辑概念,表示 ExecutionJobVertex 的输出,和 JobVertex 中的 IntermediateDataSet 一一对应,同样,一个ExecutionJobVertex 可以有多个中间结果,取决于当前 JobVertex 有几个出边(JobEdge)

    一个中间结果包含多个中间结果分区 IntermediateResultPartition,其个数等于该 JobVertex 的并发度,或者叫作算子的并行度。

  • IntermediateResultPartition

    IntermediateResultPartition 又叫作中间结果分区,表示一个 ExecutionVertex 的输出结果,与 ExecutionEdge 相关联。

  • ExecutionEdge

    表示 ExecutionVertex 的输入,连接到上游产生的 IntermediateResultPartition 。

  • Execution

    ExecutionVertex 相当于每个 Task 的模板,在真正执行的时候,会将 ExecutionVertex 中的信息包装为一个 Execution,执行一个 ExecutionVertex 的一次尝试。JobManager 和 TaskManager 之间关于 Task 的部署和 Task 的执行状态的更新都是通过 ExecutionAttemptID 来标识实例的。在发生故障或者数据需要重算的情况下,ExecutionVertex 可能会有多个ExecutionAttemptID 。一个 Execution 通过 ExecutionAttemptID 来唯一标识。

总结

Flink 作业执行前需要提交 Flink 集群, Flink 集群可以与不同的资源框架(Yarn、K8s、Mesos 等)进行集成,可以按照不同的模式(Session 模式和 Per-Job模式)运行,所以在 Flink 作业提交过程中,可能在资源框架上启动Flink集群。Flink 就绪之后,进入作业提交阶段,在Flink客户端进行StreamGraph、JobGraph的转换,提交 JobGraph 到 Flink 集群,然后 Flink 集群负责将 JobGraph 转换为 ExecutionGraph,之后进入调度执行阶段。

原文地址:https://www.cnblogs.com/cxyxz/archive/2022/07/12/16469037.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/5041.html

(0)
上一篇 2023-05-25
下一篇 2023-05-25

相关推荐

  • 利用Python CFFI进行原子级别的C库调用

    利用Python CFFI进行原子级别的C库调用Python是一种高级语言,常用于快速开发、数据挖掘等领域,但有时候需要借助C库进行密集计算等操作。Python提供了很多种方式进行C库调用,例如ctypes、Swig等,但各种方式都存在一些问题。CFFI是Python官方推荐的C库调用方式,提供了原子级别的C库调用能力,一致性强,灵活性高,效率较高,已被广泛应用于NumPy、PyPy、Pillow等多个Python库。

    2024-06-30
    52
  • vertica 如何实现存储过程?「终于解决」

    vertica 如何实现存储过程?「终于解决」JAVA 等通用语言缺乏结构化计算类库,即使最简单的结构化算法,比如查询、排序、聚合,也要从零开始硬编码。对于很常用的算法,比如分组汇总、关联查询,则要编写大篇幅的代码。对于复杂些的算法,甚至要设计…

    2023-03-03
    158
  • 数据库中事务隔离分为4个级别_数据库隔离级别怎么实现的

    数据库中事务隔离分为4个级别_数据库隔离级别怎么实现的概述 不少人对于事务的使用局限于begin transaction:开始事务、commit transaction:提交事务、rollback transaction:回滚事务的初步运用。 并且知道使

    2023-04-16
    154
  • sql按任意时间段分组统计[通俗易懂]

    sql按任意时间段分组统计[通俗易懂]任意时间序列数据都可以按时间分组。 timestamp 为时间戳。 按每五分钟统计日志的数目 select floor(cast(logs.timestamp as int) / 60 / 5) a…

    2023-02-01
    158
  • Python Master Framers-构建高效Python应用程序的必备技能

    Python Master Framers-构建高效Python应用程序的必备技能Python是一门非常强大的开发语言,它是免费的、易于学习的、跨平台等等应用十分广泛。构建高效的Python应用程序,是每一个Python工程师必须了解的重要技能之一。下面,我们将从以下几个方面阐述重要的技能。

    2024-01-26
    102
  • 怎么做 HDFS 的原地平滑缩容?

    怎么做 HDFS 的原地平滑缩容?背景 当数据规模越来越大,存储成本也水涨船高。随着时间推移,数据热度分布往往呈 2⁄8 原则,即 80% 的访问集中在 20% 的数据上。对于那不经常访问的 80% 数据来说,使用多个 SSD 来存储

    2023-05-09
    157
  • MySQL日志突然暴涨[通俗易懂]

    MySQL日志突然暴涨[通俗易懂]1. 现象 今天协助其他同学排查问题的时候,发现数据库错误日志文件已经有9G以上了,打开内容查看如下: 2020-07-08 13:47:43 0x7fe3723ff700 INNODB MONITO

    2023-03-19
    163
  • Python替换字符串中的字符或子串功能

    Python替换字符串中的字符或子串功能在日常的编程工作中,替换字符串中的字符或子串是一个十分常见的操作。Python作为一门高效而简单的编程语言,拥有多种方法来实现该功能。本篇文章将从多个方面对这一主题进行详细的阐述。

    2024-02-20
    97

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注