Flink 流式聚合性能调优指南[亲测有效]

Flink 流式聚合性能调优指南[亲测有效]原文:Flink 流式聚合性能调优指南 SQL 是数据分析中使用最广泛的语言。Flink Table API 和 SQL 使用户能够以更少的时间和精力定义高效的流分析应用程序。此外,Flink Tab

Flink 流式聚合性能调优指南

原文:Flink 流式聚合性能调优指南

SQL 是数据分析中使用最广泛的语言。Flink Table API 和 SQL 使用户能够以更少的时间和精力定义高效的流分析应用程序。此外,Flink Table API 和 SQL 是高效优化过的,它集成了许多查询优化和算子优化。但并不是所有的优化都是默认开启的,因此对于某些工作负载,可以通过打开某些选项来提高性能。

这里将介绍一些实用的优化选项以及流式聚合的内部原理,它们在某些情况下能带来很大的提升。

注意:(1)目前,这里提到的优化选项仅支持 Blink planner。(2)目前,流聚合优化仅支持无界聚合,窗口聚合优化将在未来支持。

默认情况下,无界聚合算子是逐条处理输入的记录,即:(1)从状态中读取累加器,(2)累加/撤回记录至累加器,(3)将累加器写回状态,(4)下一条记录将再次从(1)开始处理。这种处理模式可能会增加 StateBackend 开销(尤其是对于 RocksDB StateBackend )。此外,生产中非常常见的数据倾斜会使这个问题恶化,并且容易导致 job 发生反压。

MiniBatch 聚合
MiniBatch 聚合的核心思想是将一组输入的数据缓存在聚合算子内部的缓冲区中。当输入的数据被触发处理时,每个 key 只需一个操作即可访问状态。这样可以大大减少状态开销并获得更好的吞吐量。但是,这可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理它们。这是吞吐量和延迟之间的权衡。

下图说明了 mini-batch 聚合如何减少状态操作。

Flink 流式聚合性能调优指南
默认情况下 mini-batch 优化是被禁用的。开启这项优化,需要设置选项
table.exec.mini-batch.enabled、
table.exec.mini-batch.allow-latency 和
table.exec.mini-batch.size。

下面的例子显示如何启用这些选项。

// instantiate table environment
TableEnvironment tEnv = …

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString(“table.exec.mini-batch.enabled”, “true”); // enable mini-batch optimization
configuration.setString(“table.exec.mini-batch.allow-latency”, “5 s”); // use 5 seconds to buffer input records
configuration.setString(“table.exec.mini-batch.size”, “5000”); // the maximum number of records can be buffered by each aggregate operator task
Local-Global 聚合
Local-Global 聚合是为解决数据倾斜问题提出的,通过将一组聚合分为两个阶段,首先在上游进行本地聚合,然后在下游进行全局聚合,类似于 MapReduce 中的 Combine + Reduce 模式。例如,就以下 SQL 而言:

SELECT color, sum(id)
FROM T
GROUP BY color
数据流中的记录可能会倾斜,因此某些聚合算子的实例必须比其他实例处理更多的记录,这会产生热点问题。本地聚合可以将一定数量具有相同 key 的输入数据累加到单个累加器中。全局聚合将仅接收 reduce 后的累加器,而不是大量的原始输入数据。这可以大大减少网络 shuffle 和状态访问的成本。每次本地聚合累积的输入数据量基于 mini-batch 间隔。这意味着 local-global 聚合依赖于启用了 mini-batch 优化。

下图显示了 local-global 聚合如何提高性能。

Flink 流式聚合性能调优指南
下面的例子显示如何启用 local-global 聚合。

// instantiate table environment
TableEnvironment tEnv = …

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString(“table.exec.mini-batch.enabled”, “true”); // local-global aggregation depends on mini-batch is enabled
configuration.setString(“table.exec.mini-batch.allow-latency”, “5 s”);
configuration.setString(“table.exec.mini-batch.size”, “5000”);
configuration.setString(“table.optimizer.agg-phase-strategy”, “TWO_PHASE”); // enable two-phase, i.e. local-global aggregation
拆分 distinct 聚合
Local-Global 优化可有效消除常规聚合的数据倾斜,例如 SUM、COUNT、MAX、MIN、AVG。但是在处理 distinct 聚合时,其性能并不令人满意。

例如,如果我们要分析今天有多少唯一用户登录。我们可能有以下查询:

SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day
如果 distinct key (即 user_id)的值分布稀疏,则 COUNT DISTINCT 不适合减少数据。即使启用了 local-global 优化也没有太大帮助。因为累加器仍然包含几乎所有原始记录,并且全局聚合将成为瓶颈(大多数繁重的累加器由一个任务处理,即同一天)。

这个优化的想法是将不同的聚合(例如 COUNT(DISTINCT col))分为两个级别。第一次聚合由 group key 和额外的 bucket key 进行 shuffle。bucket key 是使用 HASH_CODE(distinct_key) % BUCKET_NUM 计算的。BUCKET_NUM 默认为1024,可以通过
table.optimizer.distinct-agg.split.bucket-num 选项进行配置。第二次聚合是由原始 group key 进行 shuffle,并使用 SUM 聚合来自不同 buckets 的 COUNT DISTINCT 值。由于相同的 distinct key 将仅在同一 bucket 中计算,因此转换是等效的。bucket key 充当附加 group key 的角色,以分担 group key 中热点的负担。bucket key 使 job 具有可伸缩性来解决不同聚合中的数据倾斜/热点。

拆分 distinct 聚合后,以上查询将被自动改写为以下查询:

SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
下图显示了拆分 distinct 聚合如何提高性能(假设颜色表示 days,字母表示 user_id)。

Flink 流式聚合性能调优指南
注意:上面是可以从这个优化中受益的最简单的示例。除此之外,Flink 还支持拆分更复杂的聚合查询,例如,多个具有不同 distinct key (例如 COUNT(DISTINCT a), SUM(DISTINCT b) )的 distinct 聚合,可以与其他非 distinct 聚合(例如 SUM、MAX、MIN、COUNT )一起使用。

注意 当前,拆分优化不支持包含用户定义的 AggregateFunction 聚合。

下面的例子显示了如何启用拆分 distinct 聚合优化。

// instantiate table environment
TableEnvironment tEnv = …

tEnv.getConfig() // access high-level configuration
.getConfiguration() // set low-level key-value options
.setString(“table.optimizer.distinct-agg.split.enabled”, “true”); // enable distinct agg split
在 distinct 聚合上使用 FILTER 修饰符
在某些情况下,用户可能需要从不同维度计算 UV(独立访客)的数量,例如来自 Android 的 UV、iPhone 的 UV、Web 的 UV 和总 UV。很多人会选择 CASE WHEN,例如:

SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT CASE WHEN flag IN (“android”, “iphone”) THEN user_id ELSE NULL END) AS app_uv,
COUNT(DISTINCT CASE WHEN flag IN (“wap”, “other”) THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day
但是,在这种情况下,建议使用 FILTER 语法而不是 CASE WHEN。因为 FILTER 更符合 SQL 标准,并且能获得更多的性能提升。FILTER 是用于聚合函数的修饰符,用于限制聚合中使用的值。将上面的示例替换为 FILTER 修饰符,如下所示:

SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN (“android”, “iphone”)) AS app_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN (“wap”, “other”)) AS web_uv
FROM T
GROUP BY day
Flink SQL 优化器可以识别相同的 distinct key 上的不同过滤器参数。例如,在上面的示例中,三个 COUNT DISTINCT 都在 user_id 一列上。Flink 可以只使用一个共享状态实例,而不是三个状态实例,以减少状态访问和状态大小。在某些工作负载下,可以获得显著的性能提升。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/8212.html

(0)
上一篇 2023-03-05
下一篇 2023-03-05

相关推荐

  • Sqlite—删除语句(Delete)[通俗易懂]

    Sqlite—删除语句(Delete)[通俗易懂]SQLite 的 DELETE 语句用于删除表中已有的记录。可以使用带有 WHERE 子句的 DELETE 查询来删除选定行,否则所有的记录都会被删除。 SQLite 要清空表记录,只能使用Delet

    2022-12-25
    142
  • Mysql8.0修改lower_case_table_names参数导致重启失败[通俗易懂]

    Mysql8.0修改lower_case_table_names参数导致重启失败[通俗易懂]GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。 GreatSQL是MySQL的国产分支版本,使用上与MySQL一致。 事件起因:在测试一个数据迁移工具时,源端oracle

    2023-06-02
    139
  • 使用JavaScript截取字符串最后一个字符

    使用JavaScript截取字符串最后一个字符在JavaScript中,字符串是一种重要的数据类型,而对于字符串的操作是开发中绕不开的一个主题。其中截取字符串的操作其实是最为常见和基础的操作之一。在截取字符串的方式中,截取最后一个字符的方式也是非常常见的一种。本文将结合实际的应用场景和示例详细介绍如何使用JavaScript截取字符串中的最后一个字符。

    2024-06-11
    59
  • MySQL家族”新”成员——MySQL Shell[通俗易懂]

    MySQL家族”新”成员——MySQL Shell[通俗易懂]本文转载自“MySQL解决方案工程师”公众号,由徐轶韬翻译 标题虽然叫做MySQL家族新成员,但如果从发布时间上来看,MySQL Shell已经不能算做新成员了,它的正式版与MySQL8.0同一天诞…

    2023-01-27
    148
  • Mysql基础02-约束「建议收藏」

    Mysql基础02-约束「建议收藏」约束与索引 概念 1、数据完整性(Data Integrity)是指数据的精确性(Accuracy)和可靠性(Reliability)。 实体完整性(Entity Integrity):例如,同一个表

    2022-12-26
    170
  • 双主master-master复制Err 1677故障分析

    双主master-master复制Err 1677故障分析2020-03-29 20:00:27 一、报错信息 近期项目实施同事对系统升级,对test.test_tab_t1的某个字段进行变更,SQL语句如下: ALTER TABLE TEST.TEST_…

    2023-02-20
    147
  • 利用Python Dictionary实现数据快速查询

    利用Python Dictionary实现数据快速查询Python中的字典是一种无序的键值对数据类型,其底层实现采用了哈希表的数据结构,使得字典可以实现快速的数据查找。使用字典可以方便地通过键来获取对应的值,同时也可以添加、修改、删除键值对。以下是一个简单的字典示例:

    2024-01-10
    108
  • mac的数据库工具软件_数据库for语句使用

    mac的数据库工具软件_数据库for语句使用DBeaverEE for Mac是一款运行在MacOS上通用的数据库管理工具。易用性是DBeaverEE的主要目标,支持 MySQL, PostgreSQL, Oracle等常用数据库。操作简单,功

    2023-05-26
    148

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注