分布式是什么意思_宏可以实现的功能不包括

分布式是什么意思_宏可以实现的功能不包括作者:路路 热爱技术、乐于分享的技术人,目前主要从事数据库相关技术的研究。 本文来源:原创投稿 *爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。 1.概述 本篇文章主要…

分布式 | DBLE LOAD DATA 功能实现解析


1.概述

本篇文章主要介绍 DBLE LOAD DATA 大规模数据导入功能的实现,包括方案设计、源码解读。

下面就让我们一起来探秘 DBLE 是如何实现该功能的吧!

2.方案设计

LOAD DATA 为 MySQL 提供的从文本文件导入数据到表的语法,作为数据库中间件,当然也需要实现对应的功能,来满足用户的导入数据需求。

DBLE 对该功能的实现其实就是直接模拟了 MySQL 对 LOAD DATA 命令相应的处理协议。当然作为数据库中间件,还需要处理相应数据的存储、数据路由情况以及与后端 MySQL 的交互等方面的逻辑。

下图即为 DBLE 对 LOAD DATA 处理的整体流程:

DBLE LOAD DATA整体处理流程

3.源码解读

DBLE 与 LOAD DATA 功能实现相关的类其实主要有两个,一个是 ServerLoadDataInfileHandler 类,一个是 LoadDataUtil 类,ServerLoadDataInfileHandler 类主要处理的是与客户端交互的逻辑,而 LoadDataUtil 类主要处理的是与后端 MySQL 交互的逻辑。

下面我们就从客户端发送命令到 DBLE 处理,最后到 DBLE 与后端 MySQL 交互的过程,来详细看下相应的代码。

当客户端发来 LOAD DATA 导入数据到表命令的时候,DBLE 作为服务端会接收到相应的命令并进行处理,对应的代码在 ServerQueryHandler#query 方法中,这里会判断 SQL 的类型为 LOAD DATA,然后进一步处理:

public void query(String sql) {
        ServerConnection c = this.source;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.valueOf(c) + sql);
        }
         ……
        int rs = ServerParse.parse(sql);
        boolean isWithHint = ServerParse.startWithHint(sql);
        int sqlType = rs & 0xff;
        ……
        switch (sqlType) {
        ……
            case ServerParse.LOAD_DATA_INFILE_SQL:
                    //对LOAD DATA的处理,调用FrontendConnection#loadDataInfileStart方法
                    c.loadDataInfileStart(sql);
                    break;
      ……
      }
  }

代码100分

继续看一下 FrontendConnection#loadDataInfileStart 方法:

代码100分	public void loadDataInfileStart(String sql) {
        if (loadDataInfileHandler != null) {
            try {
                //进一步调用了ServerLoadDataInfileHandler#start方法
                loadDataInfileHandler.start(sql);
            } catch (Exception e) {
                LOGGER.info("load data error", e);
                writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.getMessage());
            }

        } else {
            writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "load data infile sql is not  unsupported!");
        }
    }

下面便进入到了 ServerLoadDataInfileHandler#start 方法,前面讲过该类主要处理的是 DBLE 与客户端的交互逻辑。

该方法比较长,大家可以去细看,主要功能还是解析了客户端发送过来的 SQL 语句,然后针对 LOAD DATA 语法,如果导入文件是本机文件,则直接进行解析,否则的话会向客户端发送获取文件的命令,让客户端传输文件过来:

public void start(String strSql) {
        ……
        parseLoadDataPram();
        //如果文件不在本地,则向客户端发送命令,请求数据文件,这里的local可能会让人疑惑,但MySQL语法确实是这么规定的,load data local用法反而是文件不在本地的用法
        if (statement.isLocal()) {
            isStartLoadData = true;
            //request file from client
            ByteBuffer buffer = serverConnection.allocate();
            RequestFilePacket filePacket = new RequestFilePacket();
            filePacket.setFileName(fileName.getBytes());
            filePacket.setPacketId(1);
            filePacket.write(buffer, serverConnection, true);
        } else {
            //如果文件在本地的话,先判断文件是否存在,不存在则报错,存在的话需要对文件进行读取,计算每一行的路由结果,然后对不同节点的数据分别进行存储
            if (!new File(fileName).exists()) {
                String msg = fileName + " is not found!";
                clear();
                serverConnection.writeErrMessage(ErrorCode.ER_FILE_NOT_FOUND, msg);
            } else {
                if (parseFileByLine(fileName, loadData.getCharset(), loadData.getLineTerminatedBy())) {
                    RouteResultset rrs = buildResultSet(routeResultMap);
                    if (rrs != null) {
                        flushDataToFile();
                        isStartLoadData = false;
                        serverConnection.getSession2().execute(rrs);
                    }
                }
            }
        }
    }

DBLE 发送命令给客户端后,客户端便会源源不断地把数据文件发送过来,对发送过来文件的处理逻辑在 ServerLoadDataInfileHandler#handle 方法中,该方法其实就是对传输过来的文件进行转储,默认数据小于 200Mb 则存在内存中,否则的话存储到本地文件:

代码100分public void handle(byte[] data) {
        try {
            if (sql == null) {
                clear();
                serverConnection.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
                return;
            }
            BinaryPacket packet = new BinaryPacket();
            ByteArrayInputStream inputStream = new ByteArrayInputStream(data, 0, data.length);
            packet.read(inputStream);
            //这里就是对发送过来的文件进行转储
            saveByteOrToFile(packet.getData(), false);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

文件发送完成,客户端还会发送一个空包过来,告诉 DBLE 数据发送完了,然后 DBLE 会进行下一步处理(其实这里就是 MySQL 协议中的规定),下一步处理的逻辑在 ServerLoadDataInfileHandler#end 方法中。

该方法也比较长,主要处理逻辑是将接受过来的文件进一步计算路由,根据计算结果将文件根据不同节点分别存储,最后构建路由结果集,通过 DBLE 下发 LOAD DATA 命令到后端不同的 MySQL 节点:

public void end(byte packId) {
        isStartLoadData = false;
        this.packID = packId;
        //empty packet for end
        saveByteOrToFile(null, true);

        if (isHasStoreToFile) {
            //这里便是计算路由,并根据路由结果存储不同节点的数据文件
            parseFileByLine(tempFile, loadData.getCharset(), loadData.getLineTerminatedBy());
        }
        ……
        //构建路由结果集,下发后端MySQL,执行LOAD DATA命令
        RouteResultset rrs = buildResultSet(routeResultMap);
        if (rrs != null) {
            flushDataToFile();
            serverConnection.getSession2().execute(rrs);
        }
}

DBLE 与后端 MySQL 的交互逻辑跟客户端与 DBLE 的交互逻辑基本一样,因为都是基于 MySQL 协议嘛,DBLE 这边还需要做的就是将不同节点的数据文件发送给后端的 MySQL,具体的逻辑在 LoadDataUtil#requestFileDataResponse 方法中,该方法就是将 DBLE 处理过的数据文件,发送到后端的 MySQL 了,由 MySQL 来进行真正的数据存储:

public static void requestFileDataResponse(byte[] data, BackendConnection conn) {
        byte packId = data[3];
        MySQLConnection c = (MySQLConnection) conn;
        RouteResultsetNode rrn = (RouteResultsetNode) conn.getAttachment();
        LoadData loadData = rrn.getLoadData();
        List<String> loadDataData = loadData.getData();

        BufferedInputStream in = null;
        try {
            //如果数据较小,都在内存中,则直接发送
            if (loadDataData != null && loadDataData.size() > 0) {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                for (String loadDataDataLine : loadDataData) {
                    String s = loadDataDataLine + loadData.getLineTerminatedBy();
                    byte[] bytes = s.getBytes(CharsetUtil.getJavaCharset(loadData.getCharset()));
                    bos.write(bytes);
                }
                packId = writeToBackConnection(packId, new ByteArrayInputStream(bos.toByteArray()), c);
            } else {
                //否则的话,先读取文件,然后再发送数据
                in = new BufferedInputStream(new FileInputStream(loadData.getFileName()));
                packId = writeToBackConnection(packId, in, c);
            }
        } 
     ……
    }

到这里,整个 DBLE 对 LOAD DATA 的处理流程就讲完啦。

4.总结

本篇文章主要分析讲解了 DBLE 对 LOAD DATA 功能的实现,包括方案设计以及源码解读,希望大家看完后能对整个 LOAD DATA 功能有更进一步的了解。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/9562.html

(0)
上一篇 2023-02-05
下一篇 2023-02-05

相关推荐

  • 面试官问你:MySQL事务和隔离级别,你该如何回答[亲测有效]

    面试官问你:MySQL事务和隔离级别,你该如何回答[亲测有效]一、事务 事务是由一组SQL语句组成的逻辑处理单元,是满足 ACID 特性的一组操作,可以通过 Commit 提交一个事务,也可以使用 Rollback 进行回滚。事务具有以下4个属性,通常简称为事…

    2023-04-08
    150
  • 2、操作数据库「终于解决」

    2、操作数据库「终于解决」
    操作数据库 操作数据库 > 操作数据库中的表 > 操作数据库中表的数据 MySQL关键字不区分大小写 2.1、操作数据库(了解) 1.创建数据库 CREAT…

    2023-04-05
    159
  • MariaDB重置密码[通俗易懂]

    MariaDB重置密码[通俗易懂]按照以下步骤重置MariaDB root密码 1.停掉mariaDB systemctl stop mariadb.service 2.关闭系统里的MySQL进程 ps -ef | grep mar…

    2023-04-06
    155
  • 上海哪里有开住宿费发票

    上海哪里有开住宿费发票电13564998196 陈生详情-项.目.齐.全 可先开验。-本报讯(劳动报记者陆燕婷)聚焦餐饮行业,58同城招聘研究院昨 发布数据显示,今年上半年,全国餐饮行业招聘需求增长46.18%,平均月薪…

    2023-02-15
    160
  • Python文件操作:如何正确关闭文件

    Python文件操作:如何正确关闭文件在Python文件操作中,使用open()函数打开文件后需要及时关闭文件。如果程序在将文件对象用完后未关闭它,就可能会导致数据丢失、系统资源占用过多,严重时可能会导致系统崩溃。

    2024-01-13
    107
  • MySQL学习笔记(10):视图

    MySQL学习笔记(10):视图本文更新于2019-06-22,使用MySQL 5.7,操作系统为Deepin 15.4。 为了便于描述,此处将创建视图的DDL复述一次,其已于“SQL”章节描述。 CREATE [OR REPLAC

    2023-03-17
    188
  • Python如何关闭线程

    Python如何关闭线程线程是计算机处理任务的一种方式,相对于进程而已,线程更轻量级并且占用的资源更少。Python在多线程编程方面提供了很方便的API,但是,一旦线程开始执行,我们要怎么关闭它呢?本文将探讨如何关闭Python线程。

    2024-07-31
    30
  • 故障分析 | MySQL 派生表优化

    故障分析 | MySQL 派生表优化作者:xuty 一、问题 SQL 原 SQL 如下: select name,count(name) from bm_id a left JOIN (select TaskName from up_…

    2023-02-02
    166

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注