返回顶部
首页 > 资讯 > 数据库 >IoTDB-WAL解析之InputStream.available() 在SingleFileLogReader的应用
  • 691
分享到

IoTDB-WAL解析之InputStream.available() 在SingleFileLogReader的应用

摘要

IoTDB-WAL解析之InputStream.available() 在SingleFileLogReader的应用

先来看为什么我要单独查看这个方法,当我阅读ioTDB 的wal 读取方法的时候,发现读取数据的时候根据 available() 方法获取当前可读取的数据量,但是在网络编程中,应用这个方法会有个问题

查看 InputStream 的方法注释,available 是个非阻塞的操作,在网络拥堵情况下不会等待数据流全部返回以后才执行,在业务数量大的时候,服务方数据同步返回时间比较长,还未等到数据流返回,程序已经开始执行 is.available(),从而导致服务方有返回数据,而is.available()=0 

InputStream.available() 的默认实现 永远返回 0 ,注意到注释This method should be overridden by subclasses. 告诉我们子类需要复写该方法,代码如下

IoTDB 的 SingleFileLogReader 判断是否还有需要读取的执行计划的判断依据之一是根据返回的可读取数量和 LEAST_LOG_SIZE = 12  进行比较。


if (logStream.available() < LEAST_LOG_SIZE) {
  return false;
}

现在我们来看看IoTDB 中使用这个方法是否会有问题?

logStream 的实例化方式见 open 方法


public void open(File logFile) throws FileNotFoundException {
  close();
  logStream = new DataInputStream(new BufferedInputStream(new FileInputStream(logFile)));
  logger.info("open WAL file: {} size is {}", logFile.getName(), logFile.length());
  this.filepath = logFile.getPath();
  idx = 0;
}

可以看到 inputStream 是通过 FileInputStream 进行初始化的,FileInputStream 又是调用了 available0 的native 方法


public int available() throws IOException {
    return available0();
}

private native int available0() throws IOException;

我以 https://GitHub.com/openjdk/jdk  在 window 的实现为例,找到  src/java.base/share/native/libjava/FileInputStream.c 文件 ,代码如下


JNIEXPORT jint JNICALL
Java_java_io_FileInputStream_available0(JNIEnv *env, jobject this) {
    jlong ret;
    FD fd = getFD(env, this, fis_fd);
    if (fd == -1) {
        JNU_ThrowIOException (env, "Stream Closed");
        return 0;
    }
    if (IO_Available(fd, &ret)) {
        if (ret > INT_MAX) {
            ret = (jlong) INT_MAX;
        } else if (ret < 0) {
            ret = 0;
        }
        return jlong_to_jint(ret);
    }
    JNU_ThrowIOExceptionWithLastError(env, NULL);
    return 0;
}

调用了 IO_Available 方法返回了文件的大小,这个方法 在 src/java.base/unix/native/libjava/io_util_md.h 头文件中做了定义


#define IO_Available handleAvailable

也就是调用了 src/java.base/unix/native/libjava/io_util_md.c 文件中的 handleAvailable 方法 代码如下


int
handleAvailable(FD fd, jlong *pbytes) {
    HANDLE h = (HANDLE)fd;
    DWord type = 0;

    type = GetFileType(h);
    
    if (type == FILE_TYPE_CHAR || type == FILE_TYPE_PIPE) {
        int ret;
        long lpbytes;
        HANDLE stdInHandle = GetStdHandle(STD_INPUT_HANDLE);
        if (stdInHandle == h) {
            ret = handleStdinAvailable(fd, &lpbytes); 
        } else {
            ret = handleNonSeekAvailable(fd, &lpbytes); 
        }
        (*pbytes) = (jlong)(lpbytes);
        return ret;
    }
    
    if (type == FILE_TYPE_DISK) {
        jlong current, end;

        LARGE_INTEGER filesize;
        current = handleLseek(fd, 0, SEEK_CUR);
        if (current < 0) {
            return FALSE;
        }
        if (GetFileSizeEx(h, &filesize) == 0) {
            return FALSE;
        }
        end = long_to_jlong(filesize.QuadPart);
        *pbytes = end - current;
        return TRUE;
    }
    return FALSE;
}

查看GetFileTypeGetFileSizeExapi文档


// FILE_TYPE_CHAR  字符文件,典型的如:打印设备或控制台
// FILE_TYPE_DISK  磁盘文件
// FILE_TYPE_PIPE  管道文件,如Socket,命名管道,匿名管道
// FILE_TYPE_REMOTE 未使用
// FILE_TYPE_UNKNOWN  未知设备,或者函数调用出错
DWORD GetFileType(
  // 文件句柄
  _In_ HANDLE hFile
);
 
BOOL GetFileSizeEx(
  // 文件句柄
  _In_  HANDLE         hFile,
  // 接收文件大小的长整型指针
  _Out_ PLARGE_INTEGER lpFileSize
);

结论就是 FileInputStream 复写了 InputStream 的  available() 方法, 使用 C 通过传入的File 转换为 文件句柄 HANDLE  调用 windows  API  来获取文件大小,所以不存在网络拥堵带来的问题。

 

参考

Https://stackoverflow.com/questions/5826198/inputstream-available-is-0-always

https://blog.csdn.net/qq_36918149/article/details/103022221

 

 

 

 

您可能感兴趣的文档:

--结束END--

本文标题: IoTDB-WAL解析之InputStream.available() 在SingleFileLogReader的应用

本文链接: https://lsjlt.com/news/8258.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作