大家好,我是考100分的小小码 ,祝大家学习进步,加薪顺利呀。今天说一说iceberg flink-source读取流程「建议收藏」,希望您对编程的造诣更进一步.
flink读取iceberg代码
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/iceberg3");
DataStream<RowData> stream = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.streaming(true)
.build();
// Print all records to stdout.
stream.print();
// Submit and execute this streaming read job.
env.execute("Test Iceberg Batch Read");
首先调用创建一个Builder
public static Builder forRowData() {
return new Builder();
}
然后进行一些设置,在调用build()进行构建
public DataStream<RowData> build() {
Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
FlinkInputFormat format = buildFormat();
ScanContext context = contextBuilder.build();
TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));
if (!context.isStreaming()) {
int parallelism = inferParallelism(format, context);
return env.createInput(format, typeInfo).setParallelism(parallelism);
} else {
StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context);
String monitorFunctionName = String.format("Iceberg table (%s) monitor", table);
String readerOperatorName = String.format("Iceberg table (%s) reader", table);
return env.addSource(function, monitorFunctionName)
.transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
}
}
buildFormat()构建一些基本信息,如iceberg表,io,schema等信息 然后分两种情况,批读取和流读取
看下面流读取分为两个算子Source和StreamingReaderOperator
这里StreamingMonitorFunction的逻辑大概是不停的扫描iceberg表看是否有新的snapshot生成,如果有则生成CombinedScanTask发向下游
StreamingReaderOperator中一旦收到source发来的split,会将其放到一个队列中,然后通过一个MailboxExecutor线程处理,这种结构可以将读取数据和处理checkpoint barriers功能分离,避免潜在的背压。
这里读取算子是通过 transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format))
创建的,这里transform第三个参数没有直接传一个OneInputStreamOperator对象,而是传了一个OneInputStreamOperatorFactory对象, 这里实现了OneInputStreamOperatorFactory中的createStreamOperator方法去创建OneInputStreamOperator对象,这里同时实现了YieldingOperatorFactory中的setMailboxExecutor方法。
之类先屡一下flink的task的结构,最底层是StreamTask,上一层是StreamOperator,然后还有更上层的User-define-function,这里直接使用过的Operator,flink的taskmanager对算子进行初始化的时候先创建StreamTask,然后调用invoke开始工作,其中需要先进行beforeInvoke(),这里会生成operatorChain,最后在StreamOperatorFactoryUtil中的createOperator创建StreamOperator,这里如果operatorFactory是YieldingOperatorFactory类型,则调用setMailboxExecutor。
我的理解这里是个将mailboxExecutor暴露到外层的一个接口,Yielding表示的正好是线程中让出的意思,即可以实现StreamTask在处理中途让出线程的功能
if (operatorFactory instanceof YieldingOperatorFactory) {
((YieldingOperatorFactory<?>) operatorFactory).setMailboxExecutor(mailboxExecutor);
}
在OperatorFactory实现了setMailboxExecutor,然后在创建的时候传给StreamingReaderOperator
然后看下StreamingReaderOperator类成员变量
private final MailboxExecutor executor; private FlinkInputFormat format; private transient SourceFunction.SourceContext sourceContext; private transient ListState inputSplitsState; private transient Queue splits;
private final MailboxExecutor executor;
private FlinkInputFormat format;
private transient SourceFunction.SourceContext<RowData> sourceContext;
private transient ListState<FlinkInputSplit> inputSplitsState;
private transient Queue<FlinkInputSplit> splits;
private transient SplitState currentSplitState;
其中MailboxExecutor executor
是暴露出来的一个执行器,这个线程同时处理用户操作和checkpoint动作,我们一次只预定一个InputSplit去读取,因此当新的checkpoint到达是能被触发而不是被InputSplit读取操作阻塞。
ListState<FlinkInputSplit> inputSplitsState
为存储FlinkInputSplit的状态变量,即需要被读取的FlinkInputSplit,会在checkpoint持久化
Queue<FlinkInputSplit> splits
为当前周期需要读取的FlinkInputSplit,会在initializeState从inputSplitsState读出来
SplitState currentSplitState
表示当前的读取状态,
@Override
public void processElement(StreamRecord<FlinkInputSplit> element) {
splits.add(element.getValue());
enqueueProcessSplits();
}
这里没收到一个消息,将其加入splits然后调用enqueueProcessSplits方法
private void enqueueProcessSplits() {
if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) {
currentSplitState = SplitState.RUNNING;
executor.execute(this::processSplits, this.getClass().getSimpleName());
}
}
当SplitState状态为IDLE且splits不为空时,executor才开始执行读取数据的方法processSplits,同时将currentSplitState设置为RUNNING,因此不是每次收到消息就会马上开始读取对应的数据
private void processSplits() throws IOException {
FlinkInputSplit split = splits.poll();
if (split == null) {
currentSplitState = SplitState.IDLE;
return;
}
format.open(split);
try {
RowData nextElement = null;
while (!format.reachedEnd()) {
nextElement = format.nextRecord(nextElement);
sourceContext.collect(nextElement);
}
} finally {
currentSplitState = SplitState.IDLE;
format.close();
}
// Re-schedule to process the next split.
enqueueProcessSplits();
}
每次从splits中弹出一个FlinkInputSplit开始读取,结束之后将currentSplitState重新设为IDLE,然后再次调用enqueueProcessSplits读取下一个split
StreamingReaderOperator中有一个成员变量为FlinkInputFormat format FlinkInputFormat继承自flink中的RichInputFormat,RichInputFormat继承自InputFormat,InputFormat为读取数据时候的一个抽象类,类似spark中的DataSource,一些数据的读取数据的相关类都基于它实现,如读取mysql的数据时使用JDBCInputFormat类,InputFormat读取数据也是使用迭代器的模式其中需要实现 void open(T split) throws IOException; boolean reachedEnd() throws IOException; OT nextRecord(OT reuse) throws IOException; void close() throws IOException; 等方法
其中T继承自InputSplit,在iceberg中实现类为FlinkInputSplit,其中成员变量包括int型的splitNumber和一个CombinedScanTask对象
在FlinkInputFormat中调用open方法传入FlinkInputSplit对象开始读取数据
this.format.open(split);
为FlinkInputFormat对象format中的RowDataIterator iterator变量进行初始化,RowDataIterator对应一个CombinedScanTask的数据读取的迭代器,这里会传入表的schema信息,
@Override
public void open(FlinkInputSplit split) {
this.iterator = new RowDataIterator(
split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
context.caseSensitive());
}
然后调用format中的nextRecord读取数据,这里读取的数据nextElement是后面添加了一行_pos列的数据,
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/13913.html