[离线计算-Spark|Hive] HDFS小文件处理[亲测有效]

[离线计算-Spark|Hive] HDFS小文件处理[亲测有效]本文主要介绍小文件的处理方法思路,以及通过阅读源码和相关资料学习hudi 如何在写入时智能的处理小文件问题新思路.Hudi利用spark 自定义分区的机制优化记录分配到不同文件的能力,达到小文件的合并

[离线计算-Spark|Hive]  HDFS小文件处理

背景

HDFS 小文件过多会对hadoop 扩展性以及稳定性造成影响, 因为要在namenode 上存储维护大量元信息.

大量的小文件也会导致很差的查询分析性能,因为查询引擎执行查询时需要进行太多次文件的打开/读取/关闭.

小文件解决思路

通常能想到的方案就是通过Spark API 对文件目录下的小文件进行读取,然后通过Spark的算子repartition操作进行合并小文件,repartition 分区数通过输入文件的总大小和期望输出文件的大小通过预计算而得。

总体流程如下:

TLYig1.png

该方案适合针对已发现有小文件问题,然后对其进行处理. 下面介绍下hudi是如何实现在写入时实现对小文件的智能处理.

Hudi小文件处理

Hudi会自管理文件大小,避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用

在进行insert/upsert操作时,Hudi可以将文件大小维护在一个指定文件大小

hudi 小文件处理流程:

7rIbi8.png

每次写入都会遵循此过程,以确保Hudi表中没有小文件。

核心代码:

写入文件分配:

org.apache.hudi.table.action.commit.UpsertPartitioner#assignInserts

 //获取分区路径
 Set<String> partitionPaths = profile.getPartitionPaths();

 //根据先前提交期间写入的记录获取平均记录大小。用于估计有多少记录打包到一个文件中。
 long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),config);
 
 LOG.info("AvgRecordSize => " + averageRecordSize);

 //获取每个分区文件路径下小文件
 Map<String, List<SmallFile>> partitionSmallFilesMap =
        getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);


for (String partitionPath : partitionPaths) {
     ...
    
     List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
    //未分配的写入记录
    long totalUnassignedInserts = pStat.getNumInserts();  

    ...

    for (SmallFile smallFile : smallFiles) {
      //hoodie.parquet.max.file.size 数据文件最大大小,Hudi将试着维护文件大小到该指定值
      //算出数据文件大小 - 小文件 就是剩余可以写入文件大小, 除以平均记录大小就是插入的记录行数      
      long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts);

        //分配记录到小文件中
        if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
            // create a new bucket or re-use an existing bucket
            int bucket;
            if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
              bucket = updateLocationToBucket.get(smallFile.location.getFileId());
              LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
            } else {
              bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
              LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
            }
            bucketNumbers.add(bucket);
            recordsPerBucket.add(recordsToAppend);
            //减去已经分配的记录数
            totalUnassignedInserts -= recordsToAppend;
          }  


        //如果记录没有分配完
        if (totalUnassignedInserts > 0) {
            //hoodie.copyonwrite.insert.split.size 每个分区条数
            long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
            //是否自动计算每个分区条数
            if (config.shouldAutoTuneInsertSplits()) {
                insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
            }

           //计算要创建的bucket
           int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket); 
           
          ...
          
          for (int b = 0; b < insertBuckets; b++) {
            bucketNumbers.add(totalBuckets);
            if (b == insertBuckets - 1) {
              //针对最后一个buket处理,就是写完剩下的记录
              recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket);
            } else {
              recordsPerBucket.add(insertRecordsPerBucket);
            }
            BucketInfo bucketInfo = new BucketInfo();
            bucketInfo.bucketType = BucketType.INSERT;
            bucketInfo.partitionPath = partitionPath;
            bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
            bucketInfoMap.put(totalBuckets, bucketInfo);
            totalBuckets++;
          }

        }

    }

}


获取每个分区路径下小文件:
org.apache.hudi.table.action.commit.UpsertPartitioner#getSmallFiles

 if (!commitTimeline.empty()) { // if we have some commits
      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
      List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
          .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());

      for (HoodieBaseFile file : allFiles) {

        //获取小于 hoodie.parquet.small.file.limit 参数值就为小文件
        if (file.getFileSize() < config.getParquetSmallFileLimit()) {
          String filename = file.getFileName();
          SmallFile sf = new SmallFile();
          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
          sf.sizeBytes = file.getFileSize();
          smallFileLocations.add(sf);
        }
      }
    }

UpsertPartitioner继承spark的Partitioner, hudi在写入的时候会利用spark 自定分区的机制优化记录分配到不同文件的能力, 从而达到在写入时不断优化解决小文件问题.

涉及到的关键配置:

  • hoodie.parquet.max.file.size:数据文件最大大小,Hudi将试着维护文件大小到该指定值;

  • hoodie.parquet.small.file.limit:小于该大小的文件均被视为小文件;

  • hoodie.copyonwrite.insert.split.size:单文件中插入记录条数,此值应与单个文件中的记录数匹配(可以根据最大文件大小和每个记录大小来确定)

在hudi写入时候如何使用、配置参数?

在写入hudi的代码中 .option中配置上述参数大小,如下:

.option(HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES, 120 * 1024 * 1024)

总结

本文主要介绍小文件的处理方法思路,以及通过阅读源码和相关资料学习hudi 如何在写入时智能的处理小文件问题新思路.Hudi利用spark 自定义分区的机制优化记录分配到不同文件的能力,达到小文件的合并处理.

参考

  1. https://www.cnblogs.com/leesf456/p/14642991.html
本文作者: chaplinthink, 关注领域:大数据、基础架构、系统设计, 一个热爱学习、分享的大数据工程师

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/5539.html

(0)
上一篇 2023-05-05
下一篇 2023-05-05

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注