[离线计算-Spark|Hive] 数据近实时同步数仓方案设计

[离线计算-Spark|Hive] 数据近实时同步数仓方案设计本文主要针对hudi进行调研, 设计MySQL CDC 近实时同步至数仓中方案, 写入主要利用hudi的upsert以及delete能力. 针对hudi 表的查询,引入kyuubi 框架,除 了增强平

[离线计算-Spark|Hive] 数据近实时同步数仓方案设计

背景

最近阅读了大量关于hudi相关文章, 下面结合对Hudi的调研, 设计一套技术方案用于支持 MySQL数据CDC同步至数仓中,避免繁琐的ETL流程,借助Hudi的upsert, delete 能力,来缩短数据的交付时间.

组件版本:

  • Hadoop 2.6.0
  • Hive 1.1.0
  • hudi 0.7.0
  • spark 2.4.6

架构设计

7NLAZ8.png

  1. 使用canal(阿里巴巴MySQL Binlog增量订阅&消费组件)dump mysql binlog 数据
  2. 采集后将binlog 数据采集到kafka中, 按照库名创建topic, 并按照表名将数据写入topic 固定分区
  3. spark 消费数据将数据生成DF
  4. 将DF数据写入hudi表
  5. 同步hudi元数据到hive中

写入主要分成两部分全量数据和增量数据:

  • 历史数据通过bulkinsert 方式 同步写入hudi

  • 增量数据直接消费写入使用hudi的upsert能力,完成数据合并

写入hudi在hdfs的格式如下:

7aCdJ0.png

hudi

hudi 如何处理binlog upsert,delete 事件进行数据的合并?

upsert好理解, 依赖本身的能力.

针对mysql binlog的delete 事件,使用记录级别删除:

  1. 需要在数据中添加 “_HOODIE_IS_DELETED” 且值为true的列

  2. 需要在dataFrame中添加此列,如果此值为false或者不存在则当作常规写入记录

如果此值为true则为删除记录

示例代码如下:

StructField(_HOODIE_IS_DELETED, DataTypes.BooleanType, true, Metadata.empty());

dataFrame.write.format("org.apache.hudi")
               .option("hoodie.table.name", "test123")
               .option("hoodie.datasource.write.operation", "upsert")
               .option("hoodie.datasource.write.recordkey.field", "uuid")
               .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
               .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE")
               .option("hoodie.datasource.write.precombine.field", "ts")
               .mode(Append)
               .save(basePath)

写入hudi及同步数据至hive,需要注意的事情和如何处理?

  1. 声明为hudi表的path路径, 非分区表 使用tablename/, 分区表根据分区路径层次定义/个数

  2. 在创建表时需添加 TBLPROPERTIES “spark.sql.sources.provider”=”hudi” 声明为datasource为hudi类型的表

hudi如何处理新增字段?

当使用Spark查询Hudi数据集时,当数据的schema新增时,会获取单个分区的parquet文件来推导出schema,若变更schema后未更新该分区数据,那么新增的列是不会显示,否则会显示该新增的列;若未更新该分区的记录时,那么新增的列也不会显示,可通过 mergeSchema来控制合并不同分区下parquet文件的schema,从而可达到显示新增列的目的

hudi 写入时指定mergeSchema参数 为true

spark如何实现hudi表数据的写入和读取?

Spark支持用户自定义的format来读取或写入文件,只需要实现对应的(RelationProvider、SchemaRelationProvider)等接口即可。而Hudi也自定义实现了 org.apache.hudi/ hudi来实现Spark对Hudi数据集的读写,Hudi中最重要的一个相关类为 DefaultSource,其实现了 CreatableRelationProvider#createRelation接口,并实现了读写逻辑

kyuubi

如何读取hudi表数据?

使用网易开源的kyuubi

kyuubi架构图:

7atsdH.png

支持HiveServer2 Thrift API协议,可以通过beeline 连接

hive: beeline -u jdbc:hive2://ip:10000 -n userName -p 

kyuubi: beeline -u jdbc:hive2://ip:8333 -n userName -p 

hudi 元数据使用hive metastore

spark来识别加载hudi表

实现hudi表与hive表关联查询

kyuubi 支持SparkContext的动态缓存,让用户不需要每次查询都动态创建SparkContext。作为一个应用在yarn 上一直运行,终止beeline 连接后,应用仍在运行,下次登录,使用SQL可以直接查询

总结

本文主要针对hudi进行调研, 设计MySQL CDC 近实时同步至数仓中方案, 写入主要利用hudi的upsert以及delete能力. 针对hudi 表的查询,引入kyuubi 框架,除 了增强平台 spark sql作为即席查询服务的能力外,同时支持查询hudi表,并可以实现hudi表与hive表的联合查询, 同时对原有hive相关服务没有太大影响.

参考

  1. https://blog.csdn.net/weixin_38166318/article/details/111825032
  2. https://blog.csdn.net/qq_37933018/article/details/120864648
  3. https://cxymm.net/article/qq_37933018/120864648
  4. https://www.jianshu.com/p/a271524adcc3
  5. https://jishuin.proginn.com/p/763bfbd65b70
本文作者: chaplinthink, 关注领域:大数据、基础架构、系统设计, 一个热爱学习、分享的大数据工程师

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/5542.html

(0)
上一篇 2023-05-05
下一篇 2023-05-05

相关推荐

  • 优化字符串处理:Python split使用实例

    优化字符串处理:Python split使用实例在Python中,字符串和列表是两种常用的数据类型,字符串是由字符组成的序列,列表是由元素组成的序列。字符串处理中常用的操作之一是将字符串按照某个分隔符切割成一个列表,Python中提供了一个split()方法来实现这个操作。

    2024-01-30
    116
  • 在 SQL Server 中使用 Try Catch 处理异常「建议收藏」

    在 SQL Server 中使用 Try Catch 处理异常「建议收藏」如何在 SQL Server 中使用 Try Catch 处理错误? 从 SQL Server 2005 开始,我们在TRY 和 CATCH块的帮助下提供了结构错误处理机制。使用TRY-CATCH的语

    2023-05-26
    158
  • 加速网站响应:使用Python 21线程实现高效并发处理

    加速网站响应:使用Python 21线程实现高效并发处理众所周知,Python作为一种高级语言,广受开发者的喜爱。在Web开发中,Python得到了广泛应用,但是在Web应用中,部分页面响应过慢、处理效率低等问题时有发生。那么该怎样解决这些问题呢?这时候Python的多线程和并发处理就可以派上用场了!

    2023-12-12
    116
  • Centos7安装Mongodb4

    Centos7安装Mongodb41、下载源码包 curl -O https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-rhel70-4.2.1.tgz 2、解压 放到 /usr…

    2022-12-19
    178
  • mysql中间件proxysql实现mysql读写分离「建议收藏」

    mysql中间件proxysql实现mysql读写分离「建议收藏」mysql中间件proxysql实现mysql读写分离 1. mysql实现读写分离的方式 mysql 实现读写分离的有以下几种: 程序修改mysql操作,直接和数据库通信,简单快捷的读写分离和随机…

    2023-04-04
    149
  • python之包(Python 常用包)

    python之包(Python 常用包)   Python之所以受欢迎不光是因为它简单易学,更重要的是它有成千上万的宝藏库。这些库相当于是已经集成好的工具,只要安装就能在Python里使用。它们可以处理各式各样的问题,无需你再造轮子,而且随着社区的不断更新维护,有些库越来越强大,几乎能媲美企业级应用。那么这些工具库怎么下载安装呢?它们被放在一个统一的“仓库”里,名叫PyPi(Python Package Index),所有的库安装都是从这里调度。有了仓库之后,还需要有管理员,pip就是这样一个角色。

    2023-10-29
    122
  • 基于.net ,使用几种常见的NoSQL数据库

    基于.net ,使用几种常见的NoSQL数据库[toc] "示例完整源码地址" 2020年1月10日 10:10:10 shanzm 0 .net中的缓存对象 MemoryCache对象 HttpContext.Cache (

    2023-01-22
    155
  • mysql中的事务隔离级别及可重复读读提交详细分析(mvcc多版本控制/undo log)[通俗易懂]

    mysql中的事务隔离级别及可重复读读提交详细分析(mvcc多版本控制/undo log)[通俗易懂]一.事物隔离级别 读未提交(read uncommitted)是指,一个事务还没提交时,它做的变更就能被别的事务看到.通俗理解,别人改数据的事务尚未提交,我在我的事务中也能读到。 读提交(read c

    2023-02-03
    157

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注