iceberg flink-source读取流程「建议收藏」

iceberg flink-source读取流程「建议收藏」flink读取iceberg代码 首先调用创建一个Builder 然后进行一些设置,在调用build()进行构建 buildFormat()构建一些基本信息,如iceberg表,io,schema等信

iceberg

flink读取iceberg代码

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/iceberg3");
        DataStream<RowData> stream = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(true)
                .build();

        // Print all records to stdout.
        stream.print();

        // Submit and execute this streaming read job.
        env.execute("Test Iceberg Batch Read");

首先调用创建一个Builder

  public static Builder forRowData() {
    return new Builder();
  }

然后进行一些设置,在调用build()进行构建

    public DataStream<RowData> build() {
      Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
      FlinkInputFormat format = buildFormat();

      ScanContext context = contextBuilder.build();
      TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));

      if (!context.isStreaming()) {
        int parallelism = inferParallelism(format, context);
        return env.createInput(format, typeInfo).setParallelism(parallelism);
      } else {
        StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context);

        String monitorFunctionName = String.format("Iceberg table (%s) monitor", table);
        String readerOperatorName = String.format("Iceberg table (%s) reader", table);

        return env.addSource(function, monitorFunctionName)
            .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
      }
    }

buildFormat()构建一些基本信息,如iceberg表,io,schema等信息 然后分两种情况,批读取和流读取

看下面流读取分为两个算子Source和StreamingReaderOperator

这里StreamingMonitorFunction的逻辑大概是不停的扫描iceberg表看是否有新的snapshot生成,如果有则生成CombinedScanTask发向下游

StreamingReaderOperator中一旦收到source发来的split,会将其放到一个队列中,然后通过一个MailboxExecutor线程处理,这种结构可以将读取数据和处理checkpoint barriers功能分离,避免潜在的背压。

这里读取算子是通过 transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format)) 创建的,这里transform第三个参数没有直接传一个OneInputStreamOperator对象,而是传了一个OneInputStreamOperatorFactory对象, 这里实现了OneInputStreamOperatorFactory中的createStreamOperator方法去创建OneInputStreamOperator对象,这里同时实现了YieldingOperatorFactory中的setMailboxExecutor方法。

之类先屡一下flink的task的结构,最底层是StreamTask,上一层是StreamOperator,然后还有更上层的User-define-function,这里直接使用过的Operator,flink的taskmanager对算子进行初始化的时候先创建StreamTask,然后调用invoke开始工作,其中需要先进行beforeInvoke(),这里会生成operatorChain,最后在StreamOperatorFactoryUtil中的createOperator创建StreamOperator,这里如果operatorFactory是YieldingOperatorFactory类型,则调用setMailboxExecutor。

我的理解这里是个将mailboxExecutor暴露到外层的一个接口,Yielding表示的正好是线程中让出的意思,即可以实现StreamTask在处理中途让出线程的功能

if (operatorFactory instanceof YieldingOperatorFactory) {
    ((YieldingOperatorFactory<?>) operatorFactory).setMailboxExecutor(mailboxExecutor);
}

在OperatorFactory实现了setMailboxExecutor,然后在创建的时候传给StreamingReaderOperator

然后看下StreamingReaderOperator类成员变量

private final MailboxExecutor executor; private FlinkInputFormat format; private transient SourceFunction.SourceContext sourceContext; private transient ListState inputSplitsState; private transient Queue splits;

private final MailboxExecutor executor;
private FlinkInputFormat format;
private transient SourceFunction.SourceContext<RowData> sourceContext;
private transient ListState<FlinkInputSplit> inputSplitsState;
private transient Queue<FlinkInputSplit> splits;
private transient SplitState currentSplitState;

其中MailboxExecutor executor是暴露出来的一个执行器,这个线程同时处理用户操作和checkpoint动作,我们一次只预定一个InputSplit去读取,因此当新的checkpoint到达是能被触发而不是被InputSplit读取操作阻塞。

ListState<FlinkInputSplit> inputSplitsState为存储FlinkInputSplit的状态变量,即需要被读取的FlinkInputSplit,会在checkpoint持久化

Queue<FlinkInputSplit> splits为当前周期需要读取的FlinkInputSplit,会在initializeState从inputSplitsState读出来

SplitState currentSplitState表示当前的读取状态,

  @Override
  public void processElement(StreamRecord<FlinkInputSplit> element) {
    splits.add(element.getValue());
    enqueueProcessSplits();
  }

这里没收到一个消息,将其加入splits然后调用enqueueProcessSplits方法

  private void enqueueProcessSplits() {
    if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) {
      currentSplitState = SplitState.RUNNING;
      executor.execute(this::processSplits, this.getClass().getSimpleName());
    }
  }

当SplitState状态为IDLE且splits不为空时,executor才开始执行读取数据的方法processSplits,同时将currentSplitState设置为RUNNING,因此不是每次收到消息就会马上开始读取对应的数据

private void processSplits() throws IOException {
    FlinkInputSplit split = splits.poll();
    if (split == null) {
      currentSplitState = SplitState.IDLE;
      return;
    }

    format.open(split);
    try {
      RowData nextElement = null;
      while (!format.reachedEnd()) {
        nextElement = format.nextRecord(nextElement);
        sourceContext.collect(nextElement);
      }
    } finally {
      currentSplitState = SplitState.IDLE;
      format.close();
    }

    // Re-schedule to process the next split.
    enqueueProcessSplits();
  }

每次从splits中弹出一个FlinkInputSplit开始读取,结束之后将currentSplitState重新设为IDLE,然后再次调用enqueueProcessSplits读取下一个split

StreamingReaderOperator中有一个成员变量为FlinkInputFormat format FlinkInputFormat继承自flink中的RichInputFormat,RichInputFormat继承自InputFormat,InputFormat为读取数据时候的一个抽象类,类似spark中的DataSource,一些数据的读取数据的相关类都基于它实现,如读取mysql的数据时使用JDBCInputFormat类,InputFormat读取数据也是使用迭代器的模式其中需要实现 void open(T split) throws IOException; boolean reachedEnd() throws IOException; OT nextRecord(OT reuse) throws IOException; void close() throws IOException; 等方法

其中T继承自InputSplit,在iceberg中实现类为FlinkInputSplit,其中成员变量包括int型的splitNumber和一个CombinedScanTask对象

在FlinkInputFormat中调用open方法传入FlinkInputSplit对象开始读取数据

this.format.open(split);

为FlinkInputFormat对象format中的RowDataIterator iterator变量进行初始化,RowDataIterator对应一个CombinedScanTask的数据读取的迭代器,这里会传入表的schema信息,

  @Override
  public void open(FlinkInputSplit split) {
    this.iterator = new RowDataIterator(
        split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
        context.caseSensitive());
  }

然后调用format中的nextRecord读取数据,这里读取的数据nextElement是后面添加了一行_pos列的数据,

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/13913.html

(0)

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注