mapreduce的框架_数据切片原理

mapreduce的框架_数据切片原理Hadoop 2. InputSplit 切片类 1.0 类的作用 InputSplit 他在逻辑上包含了提供给处理这个Inputsplit的Mapper的所有的key-value 1.1 抽象方法

03_MapReduce框架原理_3.4 InputSplit 切片类(源码)

Hadoop

2. InputSplit 切片类 1.0 类的作用 InputSplit 他在逻辑上包含了提供给处理这个Inputsplit的Mapper的所有的key-value 1.1 抽象方法 1. public abstract long getLength() 2. public abstract String[] getLocations() 1. 功能说明 获取 InputSplit对象的大小(Bytes) 支持根据 InputSplit 的size 来排序 1. 功能说明 获取 该切片 存储节点的位置信息 1.2 FileSplit 实现类 1. 成员属性 1. private Path file 2. private long start 3. private long length 4. private String[] hosts 该切片 所属文件的路径 切片起始位置 切片长度 存储切片的hosts 1.3 CombineFileSplit 实现类 为每个MapTask 提供一个InputSplit对象,包含了 这个MapTask要处理的数据

点击查看InputSplit
// 切片类,表示 一份被Mapper处理的数据
public abstract class InputSplit {
 
  // 获取切片对象的 长度(单位Bytes)
  public abstract long getLength() throws IOException, InterruptedException;

  // 获取当前切片对象的 存储信息
  public abstract 
    String[] getLocations() throws IOException, InterruptedException;
  
  // 获取所有切片对象的 存储信息
  public SplitLocationInfo[] getLocationInfo() throws IOException {
    return null;
  }
}

FileSplit对应的是一个输入文件,也就是说,如果用FileSplit对应的FileInputFormat作为输入格式,
那么即使文件特别小,也是作为一个单独的InputSplit来处理,而每一个InputSplit将会由一个独立的Mapper Task来处理。
在输入数据是由大量小文件组成的情形下,就会有同样大量的InputSplit,
从而需要同样大量的Mapper来处理,大量的Mapper Task创建销毁开销将是巨大的,甚至对集群来说,是灾难性的!

点击查看FileSplit
// 切片类,表示 一份被Mapper处理的数据
// 作为 InputFormat的getSplits方法的返回值
// 作为 InputFormat的createRecordReader方法的输入
// 每个切片 包含文件的一部分 或者整个文件(不可切分或者 文件大小小于切片*1.1时)
public class FileSplit extends InputSplit implements Writable {
  private Path file; // 切片 所属的文件名称
  private long start;  // 切片对应 在文件中的 启示位置
  private long length; // 切片长度(字节数)
  private String[] hosts; // 切片 所属 block的存储host信息
  private SplitLocationInfo[] hostInfos;

  // 构造器
  public FileSplit() {}

  // 构造器
  public FileSplit(Path file, long start, long length, String[] hosts) {
    this.file = file;
    this.start = start;
    this.length = length;
    this.hosts = hosts;
  }
  
 // 构造器
 public FileSplit(Path file, long start, long length, String[] hosts,
     String[] inMemoryHosts) {
   this(file, start, length, hosts);
   hostInfos = new SplitLocationInfo[hosts.length];
   for (int i = 0; i < hosts.length; i++) {
     // because N will be tiny, scanning is probably faster than a HashSet
     boolean inMemory = false;
     for (String inMemoryHost : inMemoryHosts) {
       if (inMemoryHost.equals(hosts[i])) {
         inMemory = true;
         break;
       }
     }
     hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);
   }
 }
 
  /** The file containing this split"s data. */
  public Path getPath() { return file; }
  
  /** The position of the first byte in the file to process. */
  public long getStart() { return start; }
  
  /** The number of bytes in the file to process. */
  @Override
  public long getLength() { return length; }

  @Override
  public String toString() { return file + ":" + start + "+" + length; }

  ////////////////////////////////////////////
  // Writable methods  序列化方法
  ////////////////////////////////////////////

  @Override
  public void write(DataOutput out) throws IOException {
    Text.writeString(out, file.toString());
    out.writeLong(start);
    out.writeLong(length);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    file = new Path(Text.readString(in));
    start = in.readLong();
    length = in.readLong();
    hosts = null;
  }

  @Override
  public String[] getLocations() throws IOException {
    if (this.hosts == null) {
      return new String[]{};
    } else {
      return this.hosts;
    }
  }
  
  @Override
  @Evolving
  public SplitLocationInfo[] getLocationInfo() throws IOException {
    return hostInfos;
  }
}

CombineFileSplit是针对小文件的分片,它将一系列小文件封装在一个InputSplit内,这样一个Mapper就可以处理多个小文件。
可以有效的降低进程开销。与FileSplit类似,CombineFileSplit同样包含文件路径,分片起始位置,
分片大小和分片数据所在的host列表四个属性,只不过这些属性不再是一个值,而是一个列表。
需要注意的一点是,CombineFileSplit的getLength()方法,返回的是这一系列数据的数据的总长度。

点击查看CombineFileSplit
// 切片类,表示 一份被Mapper处理的数据
// 一个切片对象,可以包含多个文件
public class CombineFileSplit extends InputSplit implements Writable {

  private Path[] paths;
  private long[] startoffset;
  private long[] lengths;
  private String[] locations;
  private long totLength;

  /**
   * default constructor
   */
  public CombineFileSplit() {}
  public CombineFileSplit(Path[] files, long[] start, 
                          long[] lengths, String[] locations) {
    initSplit(files, start, lengths, locations);
  }

  public CombineFileSplit(Path[] files, long[] lengths) {
    long[] startoffset = new long[files.length];
    for (int i = 0; i < startoffset.length; i++) {
      startoffset[i] = 0;
    }
    String[] locations = new String[files.length];
    for (int i = 0; i < locations.length; i++) {
      locations[i] = "";
    }
    initSplit(files, startoffset, lengths, locations);
  }
  
  private void initSplit(Path[] files, long[] start, 
                         long[] lengths, String[] locations) {
    this.startoffset = start;
    this.lengths = lengths;
    this.paths = files;
    this.totLength = 0;
    this.locations = locations;
    for(long length : lengths) {
      totLength += length;
    }
  }

  /**
   * Copy constructor
   */
  public CombineFileSplit(CombineFileSplit old) throws IOException {
    this(old.getPaths(), old.getStartOffsets(),
         old.getLengths(), old.getLocations());
  }

  public long getLength() {
    return totLength;
  }

  /** Returns an array containing the start offsets of the files in the split*/ 
  public long[] getStartOffsets() {
    return startoffset;
  }
  
  /** Returns an array containing the lengths of the files in the split*/ 
  public long[] getLengths() {
    return lengths;
  }

  /** Returns the start offset of the i<sup>th</sup> Path */
  public long getOffset(int i) {
    return startoffset[i];
  }
  
  /** Returns the length of the i<sup>th</sup> Path */
  public long getLength(int i) {
    return lengths[i];
  }
  
  /** Returns the number of Paths in the split */
  public int getNumPaths() {
    return paths.length;
  }

  /** Returns the i<sup>th</sup> Path */
  public Path getPath(int i) {
    return paths[i];
  }
  
  /** Returns all the Paths in the split */
  public Path[] getPaths() {
    return paths;
  }

  /** Returns all the Paths where this input-split resides */
  public String[] getLocations() throws IOException {
    return locations;
  }

  public void readFields(DataInput in) throws IOException {
    totLength = in.readLong();
    int arrLength = in.readInt();
    lengths = new long[arrLength];
    for(int i=0; i<arrLength;i++) {
      lengths[i] = in.readLong();
    }
    int filesLength = in.readInt();
    paths = new Path[filesLength];
    for(int i=0; i<filesLength;i++) {
      paths[i] = new Path(Text.readString(in));
    }
    arrLength = in.readInt();
    startoffset = new long[arrLength];
    for(int i=0; i<arrLength;i++) {
      startoffset[i] = in.readLong();
    }
  }

  public void write(DataOutput out) throws IOException {
    out.writeLong(totLength);
    out.writeInt(lengths.length);
    for(long length : lengths) {
      out.writeLong(length);
    }
    out.writeInt(paths.length);
    for(Path p : paths) {
      Text.writeString(out, p.toString());
    }
    out.writeInt(startoffset.length);
    for(long length : startoffset) {
      out.writeLong(length);
    }
  }
  
  @Override
 public String toString() {
    StringBuffer sb = new StringBuffer();
    for (int i = 0; i < paths.length; i++) {
      if (i == 0 ) {
        sb.append("Paths:");
      }
      sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
                "+" + lengths[i]);
      if (i < paths.length -1) {
        sb.append(",");
      }
    }
    if (locations != null) {
      String locs = "";
      StringBuffer locsb = new StringBuffer();
      for (int i = 0; i < locations.length; i++) {
        locsb.append(locations[i] + ":");
      }
      locs = locsb.toString();
      sb.append(" Locations:" + locs + "; ");
    }
    return sb.toString();
  }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/5604.html

(0)
上一篇 2023-05-02
下一篇 2023-05-02

相关推荐

  • 线上千万级大表排序优化

    线上千万级大表排序优化前言   大家好我是不一样的科技宅,每天进步一点点,体验不一样的生活,今天我们聊一聊Mysql大表查询优化,前段时间应急群有客服反馈,会员管理功能无法按到店时间、到店次数、消费金额 进行排序。经过排…

    2023-02-03
    150
  • Vue实现导出excel表格

    Vue实现导出excel表格iView可以实现表格的导出,不过只能导出csv格式的,并不适合项目需求。

    2023-03-02
    163
  • Postgresql执行计划概述「建议收藏」

    Postgresql执行计划概述「建议收藏」执行计划个人理解是一个“点”,“线”,“面”的问题,关系数据库中执行计划是一个同质化的对象,串联起来还是比较容易掌握的,对于一条复杂的sql,所谓的点就是其中单个表的访问方式,线是表之间的连接驱动顺

    2023-03-29
    157
  • Python不是内部或外部命令的解决方法

    Python不是内部或外部命令的解决方法在使用Python时,有时我们会遇到“Python不是内部或外部命令”的错误提示,这个错误提示会让我们无从下手,不知道该怎么解决。本文将介绍如何解决这个问题,并提供几种可行的方法。

    2024-05-15
    73
  • 使用Python实现堆

    使用Python实现堆堆(Heap)是一种特殊的树形数据结构,其中每个节点都满足其父节点的值大于或等于(小于或等于)其子节点的值。堆结构最常用于排序算法中,常见的有堆排序,堆还可以在优先队列、图形算法等领域中使用。

    2024-07-25
    34
  • 使用Django搭建Web应用

    使用Django搭建Web应用随着移动设备的普及,Web应用的需求越来越大。Python的Django框架就是一个非常流行的Web应用框架,它帮助开发者高效地设计和开发Web应用。

    2024-06-23
    44
  • 使用Pycharm设置代码编码格式

    使用Pycharm设置代码编码格式 在编写代码过程中,正确地设置编码格式显得十分重要,否则代码可能会因为编码不对而出现各种错误,而这些错误也可能会是隐藏的,难以发现。在本文中,我们将介绍如何使用Pycharm来正确设置代码的编码格式。

    2024-06-17
    45
  • MySQL——创建数据库与表[亲测有效]

    MySQL——创建数据库与表[亲测有效]创建数据库 — Firsr way CREATE DATABASE database_name; — Second way CREATE SCHEMA database_name; 2.创建表 C

    2023-02-26
    146

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注