IoTDB-WAL解析之InputStream.available() 在SingleFileLogReader的应用「建议收藏」

IoTDB-WAL解析之InputStream.available() 在SingleFileLogReader的应用「建议收藏」先来看为什么我要单独查看这个方法,当我阅读IoTDB 的wal 读取方法的时候,发现读取数据的时候根据 available() 方法获取当前可读取的数据量,但是在网络编程中,应用这个方法会有个问题 …

IoTDB-WAL解析之InputStream.available() 在SingleFileLogReader的应用

先来看为什么我要单独查看这个方法,当我阅读IoTDB 的wal 读取方法的时候,发现读取数据的时候根据 available() 方法获取当前可读取的数据量,但是在网络编程中,应用这个方法会有个问题

查看 InputStream 的方法注释,available 是个非阻塞的操作,在网络拥堵情况下不会等待数据流全部返回以后才执行,在业务数量大的时候,服务方数据同步返回时间比较长,还未等到数据流返回,程序已经开始执行 is.available(),从而导致服务方有返回数据,而is.available()=0 

InputStream.available() 的默认实现 永远返回 0 ,注意到注释This method should be overridden by subclasses. 告诉我们子类需要复写该方法,代码如下

IoTDB-WAL解析之InputStream.available() 在SingleFileLogReader的应用「建议收藏」

IoTDB 的 SingleFileLogReader 判断是否还有需要读取的执行计划的判断依据之一是根据返回的可读取数量和 LEAST_LOG_SIZE = 12  进行比较。

if (logStream.available() < LEAST_LOG_SIZE) {
  return false;
}

现在我们来看看IoTDB 中使用这个方法是否会有问题?

IoTDB-WAL解析之InputStream.available() 在SingleFileLogReader的应用「建议收藏」

logStream 的实例化方式见 open 方法

public void open(File logFile) throws FileNotFoundException {
  close();
  logStream = new DataInputStream(new BufferedInputStream(new FileInputStream(logFile)));
  logger.info("open WAL file: {} size is {}", logFile.getName(), logFile.length());
  this.filepath = logFile.getPath();
  idx = 0;
}

可以看到 inputStream 是通过 FileInputStream 进行初始化的,FileInputStream 又是调用了 available0 的native 方法

public int available() throws IOException {
    return available0();
}

private native int available0() throws IOException;

我以 https://github.com/openjdk/jdk  在 window 的实现为例,找到  src/java.base/share/native/libjava/FileInputStream.c 文件 ,代码如下

JNIEXPORT jint JNICALL
Java_java_io_FileInputStream_available0(JNIEnv *env, jobject this) {
    jlong ret;
    FD fd = getFD(env, this, fis_fd);
    if (fd == -1) {
        JNU_ThrowIOException (env, "Stream Closed");
        return 0;
    }
    if (IO_Available(fd, &ret)) {
        if (ret > INT_MAX) {
            ret = (jlong) INT_MAX;
        } else if (ret < 0) {
            ret = 0;
        }
        return jlong_to_jint(ret);
    }
    JNU_ThrowIOExceptionWithLastError(env, NULL);
    return 0;
}

调用了 IO_Available 方法返回了文件的大小,这个方法 在 src/java.base/unix/native/libjava/io_util_md.h 头文件中做了定义

#define IO_Available handleAvailable

也就是调用了 src/java.base/unix/native/libjava/io_util_md.c 文件中的 handleAvailable 方法 代码如下

int
handleAvailable(FD fd, jlong *pbytes) {
    HANDLE h = (HANDLE)fd;
    DWORD type = 0;

    type = GetFileType(h);
    /* Handle is for keyboard or pipe */
    if (type == FILE_TYPE_CHAR || type == FILE_TYPE_PIPE) {
        int ret;
        long lpbytes;
        HANDLE stdInHandle = GetStdHandle(STD_INPUT_HANDLE);
        if (stdInHandle == h) {
            ret = handleStdinAvailable(fd, &lpbytes); /* keyboard */
        } else {
            ret = handleNonSeekAvailable(fd, &lpbytes); /* pipe */
        }
        (*pbytes) = (jlong)(lpbytes);
        return ret;
    }
    /* Handle is for regular file */
    if (type == FILE_TYPE_DISK) {
        jlong current, end;

        LARGE_INTEGER filesize;
        current = handleLseek(fd, 0, SEEK_CUR);
        if (current < 0) {
            return FALSE;
        }
        if (GetFileSizeEx(h, &filesize) == 0) {
            return FALSE;
        }
        end = long_to_jlong(filesize.QuadPart);
        *pbytes = end - current;
        return TRUE;
    }
    return FALSE;
}

查看GetFileTypeGetFileSizeEx的API文档

// FILE_TYPE_CHAR  字符文件,典型的如:打印设备或控制台
// FILE_TYPE_DISK  磁盘文件
// FILE_TYPE_PIPE  管道文件,如Socket,命名管道,匿名管道
// FILE_TYPE_REMOTE 未使用
// FILE_TYPE_UNKNOWN  未知设备,或者函数调用出错
DWORD GetFileType(
  // 文件句柄
  _In_ HANDLE hFile
);
 
BOOL GetFileSizeEx(
  // 文件句柄
  _In_  HANDLE         hFile,
  // 接收文件大小的长整型指针
  _Out_ PLARGE_INTEGER lpFileSize
);

结论就是 FileInputStream 复写了 InputStream 的  available() 方法, 使用 C 通过传入的File 转换为 文件句柄 HANDLE  调用 windows  API  来获取文件大小,所以不存在网络拥堵带来的问题。

 

参考

https://stackoverflow.com/questions/5826198/inputstream-available-is-0-always

https://blog.csdn.net/qq_36918149/article/details/103022221

 

 

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/6349.html

(0)
上一篇 2023-04-12 11:30
下一篇 2023-04-12

相关推荐

  • 技术分享 | Xtrabackup 备份中 Xtrabackup_binlog_info 文件记录的 GTID 信息是否准确?

    技术分享 | Xtrabackup 备份中 Xtrabackup_binlog_info 文件记录的 GTID 信息是否准确?作者:何政 本文来源:原创投稿 *原创内容未经授权不得随意使用,转载请联系小编并注明来源。 Xtrabackup 是由 percona 开源的免费数据库热备份软件,它能对 InnoDB 和 Xtra…

    2023-02-04
    128
  • mysql -h -u_MySQL date

    mysql -h -u_MySQL date在mysql中,hint指的是“查询优化提示”,会提示优化器按照一定的方式来生成执行计划进行优化,让用户的sql语句更具灵活性;Hint可基于表的连接顺序、方法、访问路径、并行度等规则对DML(数据操

    2023-06-16
    155
  • 优化Python程序执行速度的5种线程技巧

    优化Python程序执行速度的5种线程技巧a href=”https://beian.miit.gov.cn/”苏ICP备2023018380号-1/a Copyright www.python100.com .Some Rights Reserved.

    2024-01-14
    101
  • Python安装教程

    Python安装教程Python是一种高级编程语言,具有简洁易读、面向对象、可扩展等特点。它可以用于开发Web应用、人工智能、自然语言处理、数据科学、机器学习等领域。Python的安装对于想要学习或开发Python的人来说,是一个关键的第一步。

    2024-05-25
    85
  • 20200616学习笔记

    20200616学习笔记count(*) 的实现方式 在不同的 MySQL 引擎中,count(*) 有不同的实现方式 MyISAM 引擎把一个表的总行数存在了磁盘上,因此执行 count(*) 的时候会直接返回这个数,效…

    2023-03-12
    188
  • Python新建数组教程

    Python新建数组教程Python 是一种面向对象、解释型计算机程序设计语言。它既适用于初学者又能胜任专业程序员的工作。

    2024-09-03
    22
  • ol7.7安装部署4节点spark3.0.0分布式集群

    ol7.7安装部署4节点spark3.0.0分布式集群为学习spark,虚拟机中开4台虚拟机安装spark3.0.0底层hadoop集群已经安装好,见ol7.7安装部署4节点hadoop 3.2.1分布式集群学习环境首先,去http://spark.ap

    2023-03-19
    145
  • 如何查看pip版本

    如何查看pip版本pip是一个用于Python软件包安装的工具,它使得安装、卸载和管理Python软件包变得容易。在使用pip时,我们可能需要知道自己的pip版本,或者还需要查看一些其他信息,如可用包的版本、支持的文件名和版本等。在这篇文章中,我们将会详细介绍如何查看pip版本及其他相关信息。

    2024-09-07
    23

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注