「Flink」使用Java lambda表达式实现Flink WordCount

「Flink」使用Java lambda表达式实现Flink WordCount本篇我们将使用Java语言来实现Flink的单词统计。代码开发环境准备导入Flink 1.9 pom依赖<dependencies> <dependency> <grou

「Flink」使用Java lambda表达式实现Flink WordCount

本篇我们将使用Java语言来实现Flink的单词统计。

代码开发

环境准备

导入Flink 1.9 pom依赖

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.7</version>
        </dependency>
    </dependencies>

代码100分

构建Flink流处理环境

代码100分StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

自定义source

每秒生成一行文本

DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            private boolean isCanal = false;
            private String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // 每秒发送一行文本
                while (!isCanal) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanal = true;
            }
        });

单词计算

代码100分// 3. 单词统计
        // 3.1 将文本行切分成一个个的单词
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分单词
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

        //3.2 将单词转换为一个个的元组
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 按照单词进行分组
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 对每组单词数量进行累加
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

参考代码

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. 构建Flink流式初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 自定义source - 每秒发送一行文本
        DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            private boolean isCanal = false;
            private String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // 每秒发送一行文本
                while (!isCanal) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanal = true;
            }
        });

        // 3. 单词统计
        // 3.1 将文本行切分成一个个的单词
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分单词
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

        //3.2 将单词转换为一个个的元组
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 按照单词进行分组
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 对每组单词数量进行累加
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

        env.execute("app");
    }
}

Flink对Java Lambda表达式支持情况

Flink支持Java API所有操作符使用Lambda表达式。但是,但Lambda表达式使用Java泛型时,就需要声明类型信息。

我们来看下上述的这段代码:

SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分单词
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

之所以这里将所有的类型信息,因为Flink无法正确自动推断出来Collector中带的泛型。我们来看一下FlatMapFuntion的源代码

@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

/**
* The core method of the FlatMapFunction. Takes an element from the input data set and transforms
* it into zero, one, or more elements.
*
* @param value The input value.
* @param out The collector for returning result values.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
void flatMap(T value, Collector<O> out) throws Exception;
}

我们发现 flatMap的第二个参数是Collector<O>,是一个带参数的泛型。Java编译器编译该代码时会进行参数类型擦除,所以Java编译器会变成成:

void flatMap(T value, Collector out)

这种情况,Flink将无法自动推断类型信息。如果我们没有显示地提供类型信息,将会出现以下错误:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of "Collector" are missing.
    In many cases lambda methods don"t provide enough information for automatic type extraction when Java generics are involved.
    An easy workaround is to use an (anonymous) class instead that implements the "org.apache.flink.api.common.functions.FlatMapFunction" interface.
    Otherwise the type has to be specified explicitly using type information.

这种情况下,必须要显示指定类型信息,否则输出将返回值视为Object类型,这将导致Flink无法正确序列化。

所以,我们需要显示地指定Lambda表达式的参数类型信息,并通过returns方法显示指定输出的类型信息

我们再看一段代码:

SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

为什么map后面也需要指定类型呢?

因为此处map返回的是Tuple2类型,Tuple2是带有泛型参数,在编译的时候同样会被查出泛型参数信息,导致Flink无法正确推断。

更多关于对Java Lambda表达式的支持请参考官网:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/10112.html

(0)
上一篇 2023-01-25
下一篇 2023-01-25

相关推荐

  • 解析动态IP地址的方法

    解析动态IP地址的方法IP地址是网络通信中的重要概念,是唯一标识互联网上设备的地址。IP地址分为静态IP地址和动态IP地址两种,而动态IP地址是不固定的,会随着时间和网络环境的变化而变化。动态IP地址的变化对网络通信产生了一定的影响,因此需要解析动态IP地址的方法。

    2024-05-13
    71
  • Python函数:优化代码结构与逻辑复用

    Python函数:优化代码结构与逻辑复用
    在Python编程中,函数是一组可以重复使用的代码块。在代码设计和编程中,函数是一个重要的概念。良好的函数设计可以带来许多好处,包括代码复用、可测试性和代码维护性。

    2024-01-19
    112
  • 分布式 | ddl 失败在 dble 中排查过程[通俗易懂]

    分布式 | ddl 失败在 dble 中排查过程[通俗易懂]作者:冒飞 爱可生 dble 项目测试组成员,负责 dble 相关测试工作,拥有多年数据库中间件测试经验,擅长故障排查及性能调优。 本文来源:原创投稿 *爱可生开源社区出品,原创内容未经授权不得随意…

    2023-03-05
    161
  • python的包都安装在哪里(python包放在哪里)

    python的包都安装在哪里(python包放在哪里)python使用pip包管理器来安装、删除、管理软件包,使用pip安装软件包会自动安装包所依赖的其它包,而无需手动安装。最新版本的python中已经默认安装了pip包管理器,老版本的需要自己手动安装。pip包管理器的使用方法如下:依次点击开始运行,输入cmd打开命令提示符 pip install 包名称 通过以上两步即可实现包的安装,同样在命令提示符下输入:pip list 可以查看所有已经安装的包。

    2023-10-26
    148
  • Python Obj Age: 优化程序性能的关键

    Python Obj Age: 优化程序性能的关键Python是一种非常流行的编程语言,因为它易学易用,是一种高级语言,同时也是一种解释型语言。Python Object的Age是优化Python代码的关键,它是程序的重要组成部分,影响程序代码的性能。

    2024-03-08
    81
  • centos7 源码安装mysql5.6「建议收藏」

    centos7 源码安装mysql5.6「建议收藏」这篇博客的主要内容是在CentOS7服务器上搭建一个MySQL5.6版本的数据库服务。 1、我的当前环境: [root@local-test ~]# cat /proc/cpuinfo |grep …

    2023-04-03
    157
  • 【赵强老师】Oracle RAC集群的概念「终于解决」

    【赵强老师】Oracle RAC集群的概念「终于解决」一、什么是Oracle RAC(Real Application Cluster)? Oracle RAC 是一个具有共享缓存架构的集群数据库,它克服了传统的无共享方法和共享磁盘方法的限制,为您的所有

    2023-02-14
    159
  • 使用Python对亚马逊商品进行ASIN编码

    使用Python对亚马逊商品进行ASIN编码Amazon Standard Identification Number,简称ASIN,是亚马逊独有的商品识别编码格式。该编码由10位英文字母和数字组成,是每个亚马逊商品都有的唯一标识。

    2023-12-21
    114

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注