Flink: implementing a Kafka producer and consumer with SQL

1. Maven dependencies

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.11.2</flink.version>
        <logback.version>1.1.7</logback.version>
        <slf4j.version>1.7.25</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <!-- Used by maven-dependency-plugin -->
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-wikiedits_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
        </dependency>
    </dependencies>
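
Both programs below import com.g2.flink.models.CustomerStatusChangedEvent, but the original post never shows that class. A minimal sketch consistent with the constructor calls used later (customerId, newStatus, eventTime) might look like the following; the field names and types are assumptions, and Lombok (already declared in the POM above) generates the getters, setters, and toString:

package com.g2.flink.models;

import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Hypothetical model class (not shown in the original post). The field names
 * mirror the DDL used below; the three-argument constructor matches the
 * env.fromElements(...) calls.
 */
@Data
@NoArgsConstructor
public class CustomerStatusChangedEvent {
    private Long customerId;
    private Integer oldStatus;
    private Integer newStatus;
    private Long eventTime;

    public CustomerStatusChangedEvent(Long customerId, Integer newStatus, Long eventTime) {
        this.customerId = customerId;
        this.newStatus = newStatus;
        this.eventTime = eventTime;
    }
}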

2. Producer

import com.g2.flink.models.CustomerStatusChangedEvent;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.TimeUnit;

/**
 * Kafka producer: registers a Kafka-backed table with SQL DDL and
 * inserts a row every few seconds.
 */
//@Slf4j
public class KafkaTableStreamApiProducerTest {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                //.useOldPlanner() // legacy Flink planner
                .useBlinkPlanner() // Blink planner
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        Long baseTimestamp = 1600855709000L;
        // Sample events (not used by the SQL below).
        DataStream<CustomerStatusChangedEvent> eventDataSet = env.fromElements(
                new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),
                new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),
                new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),
                new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)
        );

        String ddl = "CREATE TABLE CustomerStatusChangedEvent(\n" +
                "customerId int,\n" +
                "oldStatus int,\n" +
                "newStatus int,\n" +
                "eventTime bigint\n" +
                ") WITH(\n" +
                "'connector.type'='kafka',\n" +
                "'connector.version'='universal',\n" +
                "'connector.properties.bootstrap.servers'='192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092',\n" +
                "'connector.topic'='customer_statusChangedEvent',\n" +
                "'format.type'='json'\n" +
                ")\n";
        tableEnvironment.executeSql(ddl);

        // Insert one row every 3 seconds, with a pseudo-random status derived
        // from the current time.
        while (true) {
            try {
                TimeUnit.SECONDS.sleep(3);
                int status = (int) (System.currentTimeMillis() % 3);
                String insert = "insert into CustomerStatusChangedEvent(customerId,oldStatus,newStatus,eventTime)" +
                        " values(1001,1," + status + "," + System.currentTimeMillis() + ")";
                tableEnvironment.executeSql(insert);
            } catch (Exception ex) {
                // Swallowed in the original post; log this in real code.
            }
        }
    }
}
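
The WITH clause above uses the legacy connector.* property keys. Flink 1.11 also understands the newer, shorter option style for the Kafka SQL connector; an equivalent table definition would look roughly like this (a sketch, reusing the topic and brokers from this article):

String ddlNewStyle = "CREATE TABLE CustomerStatusChangedEvent(\n" +
        "customerId int,\n" +
        "oldStatus int,\n" +
        "newStatus int,\n" +
        "eventTime bigint\n" +
        ") WITH(\n" +
        "'connector'='kafka',\n" +
        "'topic'='customer_statusChangedEvent',\n" +
        "'properties.bootstrap.servers'='192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092',\n" +
        "'format'='json'\n" +
        ")";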

 

3. Consumer

import com.g2.flink.models.CustomerStatusChangedEvent;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Kafka consumer: reads the Kafka-backed table with a SQL query and
 * prints the result stream.
 */
//@Slf4j
public class KafkaTableStreamApiConsumerTest {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                //.useOldPlanner() // legacy Flink planner
                .useBlinkPlanner() // Blink planner
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        Long baseTimestamp = 1600855709000L;
        // Sample events (not used by the SQL below).
        DataStream<CustomerStatusChangedEvent> eventDataSet = env.fromElements(
                new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),
                new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),
                new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),
                new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)
        );

        String ddl = "CREATE TABLE CustomerStatusChangedEvent(\n" +
                "customerId int,\n" +
                "oldStatus int,\n" +
                "newStatus int,\n" +
                "eventTime bigint\n" +
                ") WITH(\n" +
                "'connector.type'='kafka',\n" +
                "'connector.version'='universal',\n" +
                "'connector.properties.group.id'='g2_group',\n" +
                "'connector.properties.bootstrap.servers'='192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092',\n" +
                "'connector.topic'='customer_statusChangedEvent',\n" +
                "'connector.startup-mode' = 'latest-offset',\n" +
                "'format.type'='json'\n" +
                ")\n";
        tableEnvironment.executeSql(ddl);

        Table resultTb = tableEnvironment.sqlQuery("select customerId,newStatus as status " +
                " from CustomerStatusChangedEvent" +
                " where newStatus in(1,2)"
        );

        /*
        // Retract-stream variant, needed if the query produced updates:
        DataStream<Tuple2<Boolean, Tuple2<Integer, Integer>>> result = tableEnvironment.toRetractStream(resultTb,
                Types.TUPLE(Types.INT, Types.INT));
        */
        // The query is a plain filter, so an append-only stream suffices.
        DataStream<CustomerStatusLog> result = tableEnvironment.toAppendStream(resultTb, CustomerStatusLog.class);
        result.print();

        try {
            env.execute();
        } catch (Exception ex) {
            // Swallowed in the original post; log this in real code.
        }
    }

    public static class CustomerStatusLog {
        private Long customerId;

        private Integer status;

        public CustomerStatusLog() {
        }

        public Long getCustomerId() {
            return customerId;
        }

        public void setCustomerId(Long customerId) {
            this.customerId = customerId;
        }

        public Integer getStatus() {
            return status;
        }

        public void setStatus(Integer newStatus) {
            this.status = newStatus;
        }

        @Override
        public String toString() {
            return "CustomerStatusLog{" +
                    "customerId=" + customerId +
                    ", status=" + status +
                    '}';
        }
    }
}
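
toAppendStream works here because the filter query only ever emits insertions. If the query produced updates, for example a GROUP BY count, the commented-out toRetractStream conversion would be required instead. A sketch of that case (the aggregate query is hypothetical, not part of the original post) inside main:

Table counts = tableEnvironment.sqlQuery(
        "select newStatus, count(*) as cnt from CustomerStatusChangedEvent group by newStatus");
// Each element carries a Boolean flag: true = insert/add, false = retract.
DataStream<Tuple2<Boolean, Tuple2<Integer, Long>>> retract =
        tableEnvironment.toRetractStream(counts, Types.TUPLE(Types.INT, Types.LONG));
retract.print();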

 

4. Consumer output

4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=1}
4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=2}
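
To double-check what actually lands on the topic, the stock Kafka console consumer can read it directly (a sketch; the broker address is taken from the DDL above, and the script path depends on your Kafka installation):

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.85:9092 --topic customer_statusChangedEvent --from-beginning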


Original source: https://www.cnblogs.com/zhshlimi/p/13725081.html
