Debezium的基本使用(以MySQL为例)「终于解决」

Debezium的基本使用(以MySQL为例)「终于解决」GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。 GreatSQL是MySQL的国产分支版本,使用上与MySQL一致。 一、Debezium介绍 摘自官网: Debeziu

Debezium的基本使用(以MySQL为例)

  • GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。
  • GreatSQL是MySQL的国产分支版本,使用上与MySQL一致。

一、Debezium介绍

摘自官网:

Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

简单理解就是Debezium可以捕获数据库中所有行级的数据变化并包装成事件流顺序输出。

二、基本使用

下面以MySQL为例介绍Debezium的基本使用。

1. MySQL的准备工作

  1. 准备一个MySQL用户并且拥有相应权限,像这样:
CREATE USER "dbz"@"%" IDENTIFIED BY "dbzpwd";

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "dbz" IDENTIFIED BY "dbzpwd";
  1. 检查MySQL是否开启log-bin
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name="log_bin";

-- If the following error occurs: The "INFORMATION_SCHEMA.GLOBAL_VARIABLES" feature is disabled...
-- please execute the given SQL again after execute this SQL: set global show_compatibility_56=on;

如果是OFF则需要修改MySQL配置文件,类似下面这样:

server-id         = 223344		#必须有
log_bin           = mysql-bin	#log_bin的值是binlog文件序列的基本名称
binlog_format     = ROW				#必须是ROW
binlog_row_image  = FULL			#必须是FULL
expire_logs_days  = 10				#依据实际情况而定
  1. 准备数据库&表
create database inventory;
create table inventory.a (id bigint primary key auto_increment, name varchar(32));
insert into inventory.a values (null, "n1"),(null, "n2"),(null, "n3");

2. 编写程序

2.1. 工程依赖(Maven)

pom.xml

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>

目前Debezium最新稳定版本为:1.9.5.Final

2.2. 准备数据库&表

create database inventory;
create table inventory.a (id bigint primary key, name varchar(32));
insert into inventory.a values (1, "n1"),(2, "n2"),(3, "n3");

2.3. 代码编写

package com.greatdb.dbzdemo;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

/**
 * @author wang.jianwen
 * @version 1.0
 * @date 2022/07/29
 */
public class DebeziumTest {

    private static DebeziumEngine<ChangeEvent<String, String>> engine;

    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.setProperty("name", "dbz-engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

        //offset config begin - 使用文件来存储已处理的binlog偏移量
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat");
        props.setProperty("offset.flush.interval.ms", "0");
        //offset config end

        props.setProperty("database.server.name", "mysql-connector");
        props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt");

        props.setProperty("database.server.id", "122112");	//需要与MySQL的server-id不同
        props.setProperty("database.hostname", "tmg");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "mysqluser");
        props.setProperty("database.password", "mysqlpw");
        props.setProperty("database.include.list", "inventory");//要捕获的数据库名
        props.setProperty("table.include.list", "inventory.a");//要捕获的数据表

        props.setProperty("snapshot.mode", "initial");//全量+增量

        // 使用上述配置创建Debezium引擎,输出样式为Json字符串格式
        engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> {
                    System.out.println(record);//输出到控制台
                })
                .using((success, message, error) -> {
                    if (error != null) {
                        // 报错回调
                        System.out.println("------------error, message:" + message + "exception:" + error);
                    }
                    closeEngine(engine);
                })
                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        addShutdownHook(engine);
        awaitTermination(executor);

        System.out.println("------------main finished.");
    }

    private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {
        try {
            engine.close();
        } catch (IOException ignored) {
        }
    }

    private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));
    }

    private static void awaitTermination(ExecutorService executor) {
        if (executor != null) {
            try {
                executor.shutdown();
                while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

3. 测试

程序跑起来后,可以看到控制台输出:

...(省略)
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic="mysql-connector.inventory.a", kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic="mysql-connector.inventory.a", kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic="mysql-connector.inventory.a", kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
...(省略)

可以看到全量的数据已经输出,关键的数据如下:

..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"...
..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"...
..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...
  • 接下来新增一条数据:
insert into inventory.a values (4, "n4");

控制台输出:

..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...
  • 修改一条数据:
update inventory.a set name = "n4-upd" where id = 4;

控制台输出:

..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...
  • 删除一条数据:
delete from inventory.a where id = 1;

控制台输出:

..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...

三、总结

本文以MySQL为例介绍了Debezium在代码中基本使用流程,对MySQL的数据进行常见的增删改操作,Debezium将捕获这些数据行的变化,并记录了数据行变化前后的数据,并对外提供事件流,外部可以获取并对事件进行相应处理。

参考:https://debezium.io/documentation/reference/1.8/index.html

原文地址:https://www.cnblogs.com/greatsql/archive/2022/08/20/16607170.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/4880.html

(0)
上一篇 2023-06-01
下一篇 2023-06-01

相关推荐

  • Python双小于符号的使用方法

    Python双小于符号的使用方法Python中,双小于符号“<<”是位运算符,表示将左操作数向左移动右操作数个二进制位。

    2024-01-12
    115
  • python对csv的操作的简单介绍

    python对csv的操作的简单介绍Python读取CSV文件方法如下:

    2023-11-19
    141
  • 翻译|是否应该在 Kubernetes 上运行数据库?

    翻译|是否应该在 Kubernetes 上运行数据库?数据库如何在 Kubernetes 上运行?如果可以,哪些类型的数据库和数据最适合使用 K8s?让我们一起来看看。 Kubernetes 是用于自动部署、扩展和管理容器化应用程序的一个开源的容器编排解

    2023-06-01
    148
  • t+0的技巧_T教石峰

    t+0的技巧_T教石峰摘要:T+0查询是指实时数据查询,数据查询统计时将涉及到最新产生的数据。 本文分享自华为云社区《大数据解决方案:解决T+0问题》,作者: 小虚竹 。 T+0问题 T+0查询是指实时数据查询,数据查询统

    2023-06-12
    158
  • 以Python判断数字为中心的方法

    以Python判断数字为中心的方法在日常生活中,我们经常需要找出给定数字数组或序列的中心位置,然后在此基础上进行特定的处理。这时候,一个简单而高效的算法可以帮助我们快速地找到数字的中心位置,这就是本文所要介绍的“以Python判断数字为中心的方法”。

    2024-08-07
    30
  • Excel – 字符串处理函数:LEFT, RIGHT, MID, LEN 和 FIND[通俗易懂]

    Excel – 字符串处理函数:LEFT, RIGHT, MID, LEN 和 FIND[通俗易懂]在单元格中输入公式,=MID(字符串内容或所在单元格,第一个字符开始的位置,要获取的字符个数)单元格里输入公式为,=FIND(要寻找的字符或字符串,字符串或其所在单元格,开始查找的位置)注意:这里使用的字符串位置,都是从1开始计数的,而不是和C语言一样,是从0开始计数。在单元格中输入公式,=RIGHT(字符串内容或所在单元格,从右侧开始的字符个数)在单元格中输入公式,=LEFT(字符串内容或所在单元格,从左侧开始的字符个数)根据某个分隔字符或字符串,截取右边的字符串。根据某个分隔字符,截取左边的字符串。

    2023-03-01
    139
  • mysql 主从复制[通俗易懂]

    mysql 主从复制[通俗易懂]1, 准备二台机器或者服务器 ,保持mysq 版本一样或者版本相差不大; 主机:114.215.198.39 从机:116.62.234.228 2 新建一个数据库 我的数据库是hlqzxm; 进入…

    2023-03-27
    165
  • 用Python绘制艺术图形

    用Python绘制艺术图形Python是一种常用的编程语言,广泛用于各种领域。除了常见的应用程序和网站开发之外,Python还可以用于艺术图形的创作。Python提供了丰富的图形库,可以方便地创建各种艺术图形作品。在本文中,我们将介绍如何用Python绘制艺术图形。

    2024-06-27
    44

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注