Paimon哈希lookup文件构建解析_paimon 文件索引
HashLookupStoreWriter
这个类的输入也是一个合并的SST文件,键有序且唯一
HashLookupStoreWriter 这个类 是 Paimon 中用于创建本地 Key-Value 查找文件(Lookup File)的两种实现之一(另一种是 SortLookupStoreWriter),它构建的是一个基于哈希的持久化存储结构。
HashLookupStoreWriter 实现了 LookupStoreWriter 接口,其核心职责是接收一系列的 (byte[] key, byte[] value) 对,并将它们写入一个文件中,这个文件最终能被 HashLookupStoreReader 高效地读取和查询。
它的核心思想是构建一个磁盘上的哈希表 (On-Disk Hash Table)。为了实现这个目标,它采用了以下几个关键策略:
- 按 Key 长度分治: 它不是将所有 Key 放入一个巨大的哈希表中,而是根据 Key 的字节长度进行分组。每个长度的 Key 拥有自己独立的索引区和数据区。这极大地简化了设计,因为同一个哈希表中的所有 Key 长度相同,使得每个哈希槽(slot)的大小可以固定。
- 两阶段写入:
- 阶段一(收集): 在
put方法被调用时,它并不直接构建最终的哈希表。而是将数据写入临时的、按 Key 长度分开的顺序文件中。具体来说,会创建temp_index.dat(存储 key 和 value 的偏移量)和data.dat(存储 value 的具体内容)。 - 阶段二(构建与合并): 在
close方法被调用时,它才开始真正的构建工作。它会读取第一阶段生成的临时文件,在内存中构建哈希索引,然后将哈希索引写入新的临时文件index.dat。最后,将所有临时文件(布隆过滤器、所有长度的索引文件、所有长度的数据文件)按顺序合并成一个最终的输出文件。
- 阶段一(收集): 在
- 开放地址法解决哈希冲突: 在构建哈希索引时,如果发生哈希冲突,它采用简单的线性探测(Linear Probing)来寻找下一个可用的哈希槽。
核心成员变量分析
// ... existing code ...public class HashLookupStoreWriter implements LookupStoreWriter { // ... existing code ... // 哈希表的加载因子 private final double loadFactor; // 临时文件夹和最终输出文件 private final File tempFolder; private final File outputFile; // 按 key 长度组织的临时索引文件和流 private File[] indexFiles; private DataOutputStream[] indexStreams; // 按 key 长度组织的临时数据文件和流 private File[] dataFiles; private DataOutputStream[] dataStreams; // ... 其他统计和缓存变量 ... private int keyCount; private int[] keyCounts; // 按 key 长度记录的 key 数量 // ... private int collisions; // 布隆过滤器构建器 @Nullable private final BloomFilter.Builder bloomFilter; // 压缩相关 @Nullable private final BlockCompressionFactory compressionFactory; private final int compressPageSize;// ... existing code ...
loadFactor: 哈希表的加载因子,用于计算哈希表需要多少个槽位(slots = keyCount / loadFactor),是空间利用率和冲突率之间的一个权衡。tempFolder: 一个临时目录,用于存放所有在构建过程中产生的中间文件。-
private final File outputFile; 最终生成的、持久化的、可供 HashLookupStoreReader 读取的目标文件。所有临时文件最终会合并、构建并写入到这个文件中。
-
private final File tempFolder; 一个临时的文件夹,用于存放写入过程中产生的所有中间文件(如 temp_index_xx.dat, data_xx.dat 等)。这个文件夹会在 close() 方法执行完毕后被清理掉。
indexFiles/indexStreams:用于写入临时索引数据的文件和流。indexFiles[i] 对应 key 长度为 i 的临时索引文件(temp_index_i.dat)。put 方法会向其中写入 (Key, Value地址) 对。dataFiles/dataStreams: 用于写入临时值(Value)数据的文件和流。dataFiles[i] 对应 key 长度为 i 的临时数据文件(data_i.dat)。put 方法会向其中写入不重复的 Value。bloomFilter: 可选的布隆过滤器,用于在查询时快速判断一个 Key 是否肯定不存在。-
private final int compressPageSize; 用于最终输出文件 (outputFile) 的压缩配置。compressionFactory 指定压缩算法(如 ZSTD, LZ4),compressPageSize 指定压缩块的大小。如果 compressionFactory 为 null,则不进行压缩。
状态与统计 (State and Statistics)
这些变量用于在写入过程中跟踪状态和收集统计数据。除了 keyCount, valueCount, collisions 外,其他也都是按 key 长度分区的数组。
-
private byte[][] lastValues; -
private int[] lastValuesLength;- 作用:用于值去重优化。缓存每个
key长度分区下,上一次写入的value及其长度。在put新数据时,会先检查value是否和缓存的相同,如果相同则不必重复写入value数据,只需复用上一个value的地址即可。
- 作用:用于值去重优化。缓存每个
-
private long[] dataLengths;- 作用:跟踪每个临时数据文件 (
data_i.dat) 当前已写入的总字节数。这个值直接被用作下一个value的地址(偏移量)。
- 作用:跟踪每个临时数据文件 (
-
private int[] maxOffsetLengths;- 作用:记录每个
key长度分区中,value地址(offset)经过变长编码后占用的最大字节数。这个值在buildIndex时至关重要,因为它决定了最终哈希索引中每个Slot为地址部分需要预留多大的空间。
- 作用:记录每个
-
private int keyCount; -
private int[] keyCounts;- 作用:
keyCount记录写入的key的总数。keyCounts[i]记录长度为i的key的数量。用于计算loadFactor和日志统计。
- 作用:
-
private int valueCount;- 作用:记录写入的唯一
value的数量。
- 作用:记录写入的唯一
-
private int collisions;- 作用:在
buildIndex构建最终哈希索引时,统计发生的哈希冲突次数。这是一个衡量哈希函数和加载因子效果的指标。
- 作用:在
put(byte[] key, byte[] value) 方法
这是数据写入的入口,执行的是上述的阶段一(收集)。
// ... existing code ... @Override public void put(byte[] key, byte[] value) throws IOException { // 1. 按 Key 的长度进行分组 int keyLength = key.length; // 2. 获取该 Key 长度对应的“临时索引文件”的输出流 DataOutputStream indexStream = getIndexStream(keyLength); // 3. 将 Key 的原始字节写入“临时索引文件” indexStream.write(key); // 4. 值去重优化:检查当前 value 是否和上一个相同 byte[] lastValue = lastValues[keyLength]; boolean sameValue = lastValue != null && Arrays.equals(value, lastValue); // 5. 计算 Value 在“临时数据文件”中的地址(偏移量) long dataLength = dataLengths[keyLength]; if (sameValue) { // 如果值相同,就复用上一个值的地址 dataLength -= lastValuesLength[keyLength]; } // 6. 将这个地址(偏移量)写入“临时索引文件” int offsetLength = VarLengthIntUtils.encodeLong(indexStream, dataLength); maxOffsetLengths[keyLength] = Math.max(offsetLength, maxOffsetLengths[keyLength]); // 7. 如果值是新的 (不和上一个重复),则将其写入“临时数据文件” if (!sameValue) { // 获取该 Key 长度对应的“临时数据文件”的输出流 DataOutputStream dataStream = getDataStream(keyLength); // 先写入 value 的长度,再写入 value 的内容 int valueSize = VarLengthIntUtils.encodeInt(dataStream, value.length); dataStream.write(value); // 更新统计信息和缓存 dataLengths[keyLength] += valueSize + value.length; lastValues[keyLength] = value; lastValuesLength[keyLength] = valueSize + value.length; valueCount++; } // 8. 更新 Key 的总数,并将 Key 添加到布隆过滤器 keyCount++; keyCounts[keyLength]++; if (bloomFilter != null) { bloomFilter.addHash(MurmurHashUtils.hashBytes(key)); } }// ... existing code ...
这个过程非常清晰:数据被按 Key 长度分流,Key 和 Value 的偏移量被顺序写入 temp_index 文件,Value 的内容被顺序写入 data 文件。
close() 方法
这是最核心的方法,执行阶段二(构建与合并)。
close() 方法是 HashLookupStoreWriter 生命周期中的终点,也是最关键、最复杂的一个环节。它标志着所有 put 操作的结束,并负责执行一系列的收尾工作,最终将所有临时的、分散的数据和索引文件,整合成一个单一、持久化、可供高效读取的最终查找文件。
// ... existing code ... @Override public Context close() throws IOException { // ... function body ... }// ... existing code ...
第一部分:关闭输入流 (Closing Input Streams)
方法的第一步是确保所有在 put 阶段打开的临时文件写入流都被正确关闭。
// ... existing code ... @Override public Context close() throws IOException { // Close the data and index streams for (DataOutputStream dos : dataStreams) { if (dos != null) { dos.close(); } } for (DataOutputStream dos : indexStreams) { if (dos != null) { dos.close(); } } // Stats LOG.info(\"Number of keys: {}\", keyCount); LOG.info(\"Number of values: {}\", valueCount);// ... existing code ...
逻辑分析:
- 关闭数据流 (
dataStreams): 遍历所有为不同keyLength创建的数据文件输出流,并关闭它们。close()操作会确保所有缓冲在内存中的数据都被刷写(flush)到磁盘上的临时数据文件 (data_xx.dat) 中。 - 关闭索引流 (
indexStreams): 同理,关闭所有临时索引文件的输出流。这会将所有(Key, 局部Value地址)对刷写到磁盘上的临时索引文件 (temp_index_xx.dat) 中。 - 打印统计信息: 此时,所有的
key和value都已处理完毕,可以打印出总的键数和值数,用于监控和调试。
第二部分:元数据计算与上下文准备 (Metadata Calculation & Context Preparation)
这是 close() 方法的核心计算部分。它在物理合并文件之前,预先计算出最终文件的完整布局和元数据
初始化的时候建立了布隆过滤器的信息:
当 HashLookupStoreReader 打开这个文件时,它会首先读取 HashContext(这个 context 对象在内存)。然后:
- 检查 context.bloomFilterEnabled。
- 如果为 true,它就知道文件的开头 0 到 context.bloomFilterBytes - 1 这段区间是布隆过滤器的实体数据。它可以直接读取这部分字节来重建内存中的布隆过滤器对象。
- 接下来,当需要查找某个 key 时,它会使用 context.indexOffsets 和 context.dataOffsets 等元数据。这些偏移量已经考虑了布隆过滤器所占用的空间。回忆一下 close() 方法中的计算:long indexesLength = bloomFilterBytes;,所有的索引偏移量都是从 bloomFilterBytes 之后开始计算的。
// ... existing code ... // Prepare files to merge List filesToMerge = new ArrayList(); int bloomFilterBytes = bloomFilter == null ? 0 : bloomFilter.getBuffer().size(); HashContext context = new HashContext( bloomFilter != null, bloomFilter == null ? 0 : bloomFilter.expectedEntries(), bloomFilterBytes, new int[keyCounts.length], new int[keyCounts.length], new int[keyCounts.length], new int[keyCounts.length], new long[keyCounts.length], 0, null); long indexesLength = bloomFilterBytes; long datasLength = 0; for (int i = 0; i 0) { // Write the key Count context.keyCounts[i] = keyCounts[i]; // Write slot count int slots = (int) Math.round(keyCounts[i] / loadFactor); context.slots[i] = slots; // Write slot size int offsetLength = maxOffsetLengths[i]; context.slotSizes[i] = i + offsetLength; // Write index offset context.indexOffsets[i] = (int) indexesLength; // Increment index length indexesLength += (long) (i + offsetLength) * slots; // Write data length context.dataOffsets[i] = datasLength; // Increment data length datasLength += dataLengths[i]; } } // adjust data offsets for (int i = 0; i < context.dataOffsets.length; i++) { context.dataOffsets[i] = indexesLength + context.dataOffsets[i]; }// ... existing code ...
逻辑分析:
- 创建
HashContext:HashContext是一个数据结构,它将作为最终文件的“目录”或“元数据头”。它包含了所有用于解析文件内容所需的信息。 - 计算布隆过滤器大小: 如果使用了布隆过滤器,计算其占用的字节数。
- 遍历分区计算元数据:
- 循环遍历每个
keyLength分区。 context.keyCounts: 记录该分区有多少个key。context.slots: 根据key数量和加载因子计算哈希表的槽位数。context.slotSizes: 计算每个哈希槽的大小。context.indexOffsets: 记录当前索引块在最终文件中的全局起始偏移量。这是通过累加器indexesLength实现的。context.dataOffsets: 记录当前数据块相对于所有数据块拼接体的局部起始偏移量。这是通过累加器datasLength实现的。
- 循环遍历每个
- 校正数据偏移量 (关键步骤):
- 在遍历完所有分区后,
indexesLength的值等于所有索引部分(包括布隆过滤器)的总长度。 - 再次循环,将这个
indexesLength加到每个分区的局部数据偏移量context.dataOffsets[i]上。 - 经过这一步,
context.dataOffsets[i]从一个局部偏移量被校正为了一个在最终文件中的全局绝对偏移量。
- 在遍历完所有分区后,
第三部分:构建、合并与清理 (Build, Merge & Cleanup)
在所有元数据都计算完毕后,方法开始执行实际的文件操作。
// ... existing code ... PageFileOutput output = PageFileOutput.create(outputFile, compressPageSize, compressionFactory); try { // Write bloom filter file if (bloomFilter != null) { File bloomFilterFile = new File(tempFolder, \"bloomfilter.dat\"); try (FileOutputStream bfOutputStream = new FileOutputStream(bloomFilterFile)) { bfOutputStream.write(bloomFilter.getBuffer().getArray()); LOG.info(\"Bloom filter size: {} bytes\", bloomFilter.getBuffer().size()); } filesToMerge.add(bloomFilterFile); } // Build index file for (int i = 0; i < indexFiles.length; i++) { if (indexFiles[i] != null) { filesToMerge.add(buildIndex(i)); } } // Stats collisions LOG.info(\"Number of collisions: {}\", collisions); // Add data files for (File dataFile : dataFiles) { if (dataFile != null) { filesToMerge.add(dataFile); } } // Merge and write to output checkFreeDiskSpace(filesToMerge); mergeFiles(filesToMerge, output); } finally { cleanup(filesToMerge); output.close(); }// ... existing code ...
逻辑分析:
- 创建输出对象 (
PageFileOutput): 根据是否配置了压缩,创建一个最终文件的输出对象。 try...finally块: 确保无论发生什么异常,output流和临时文件都能被正确处理。- 准备待合并文件列表 (
filesToMerge):- 布隆过滤器: 如果存在,将其写入一个临时文件,并添加到
filesToMerge列表的最前面。 - 索引文件: 调用
buildIndex(i)方法。这个方法会读取temp_index_xx.dat,构建最终的哈希索引文件index_xx.dat,并返回该文件。然后将其添加到列表中。 - 数据文件: 将所有临时数据文件
data_xx.dat添加到列表的最后面。
- 布隆过滤器: 如果存在,将其写入一个临时文件,并添加到
- 执行合并:
checkFreeDiskSpace(): 检查磁盘空间是否足够。mergeFiles(): 调用文件合并方法,将filesToMerge列表中的所有文件,按照添加的顺序,依次拼接到output中。
- 清理:
cleanup(): 删除所有在filesToMerge列表中的临时文件和临时文件夹。output.close(): 关闭最终的输出文件流,确保所有数据持久化。
第四部分:处理压缩并返回上下文 (Handle Compression & Return Context)
最后,处理压缩相关的元数据并返回 HashContext。
// ... existing code ... LOG.info( \"Compressed Total store size: {} Mb\", new DecimalFormat(\"#,##0.0\").format(outputFile.length() / (1024 * 1024))); if (output instanceof CompressedPageFileOutput) { CompressedPageFileOutput compressedOutput = (CompressedPageFileOutput) output; context = context.copy(compressedOutput.uncompressBytes(), compressedOutput.pages()); } return context; }// ... existing code ...
逻辑分析:
- 记录最终大小: 打印最终生成文件的大小。
- 处理压缩信息: 如果使用了压缩 (
CompressedPageFileOutput),从输出对象中获取解压后的大小、页信息等,并更新到HashContext中。 - 返回
Context: 返回填充好所有元数据的HashContext对象。这个对象会被序列化并存储起来,供HashLookupStoreReader使用。
总结
close() 方法是一个精心编排的过程,它像一个工厂的最后组装线,遵循以下逻辑顺序:
- 停止接收新零件 (关闭输入流)。
- 绘制总装蓝图 (计算元数据,校正地址)。
- 按蓝图施工 (构建索引,按顺序合并文件)。
- 清理工位 (删除临时文件)。
- 交付产品和说明书 (返回最终文件和
HashContext)。
通过这种方式,它成功地将一个复杂、分区的写入过程,转换成了一个单一、高效、可随机访问的持久化文件。
buildIndex(int keyLength) 方法
这个函数是 HashLookupStoreWriter 的核心所在。它的主要职责是将之前在 put 方法中收集到的、存储在临时文件里的 (Key, Value地址) 对,构建成一个最终的、可供快速查询的、基于哈SH希表的索引文件。
private File buildIndex(int keyLength) throws IOException { // ... function body ...}
第一部分:初始化与准备 (Initialization & Setup)
在构建索引之前,函数首先会进行一系列的计算和文件准备工作。
// ... existing code ... private File buildIndex(int keyLength) throws IOException { long count = keyCounts[keyLength]; int slots = (int) Math.round(count / loadFactor); int offsetLength = maxOffsetLengths[keyLength]; int slotSize = keyLength + offsetLength; // Init index File indexFile = new File(tempFolder, \"index\" + keyLength + \".dat\"); try (RandomAccessFile indexAccessFile = new RandomAccessFile(indexFile, \"rw\")) { indexAccessFile.setLength((long) slots * slotSize); FileChannel indexChannel = indexAccessFile.getChannel(); MappedByteBuffer byteBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexAccessFile.length()); // Init reading stream File tempIndexFile = indexFiles[keyLength]; DataInputStream tempIndexStream = new DataInputStream( new BufferedInputStream(new FileInputStream(tempIndexFile)));// ... existing code ...
逻辑分析:
-
计算核心参数:
count: 获取当前keyLength分区下key的总数。slots: 根据key的总数和加载因子loadFactor计算出哈希表需要多少个槽位 (Slot)。这是哈希表的容量。offsetLength: 获取当前分区下,Value 地址经过变长编码后的最大字节长度。slotSize: 计算出每个槽位的固定大小,等于key的长度加上offset的最大长度。这确保了哈希表是一个定长的数组结构,便于快速寻址。
-
创建最终索引文件:
indexFile: 在临时文件夹tempFolder中创建一个名为index.dat的文件。这将是最终生成的索引文件。indexAccessFile.setLength(...): 使用RandomAccessFile预先分配整个哈希表所需的磁盘空间 (slots * slotSize)。这一步至关重要,因为它是使用内存映射文件 (MappedByteBuffer) 的前提。
-
使用内存映射 (
MappedByteBuffer):indexChannel.map(...): 将预分配好的indexFile文件直接映射到内存。byteBuffer对象就代表了这块内存,对它的读写操作会由操作系统自动同步到磁盘文件上。这是一种极高性能的 I/O 方式,避免了频繁的系统调用。
-
准备数据源:
tempIndexStream: 打开之前存放(Key, Value地址)对的临时文件temp_index.dat,准备从中读取数据。
第二部分:遍历与哈希计算 (Iteration & Hashing)
准备工作就绪后,函数开始遍历所有 key,计算哈希值并准备将它们放入哈希表。
// ... existing code ... try { byte[] keyBuffer = new byte[keyLength]; byte[] slotBuffer = new byte[slotSize]; byte[] offsetBuffer = new byte[offsetLength]; // Read all keys for (int i = 0; i < count; i++) { // Read key tempIndexStream.readFully(keyBuffer); // Read offset long offset = VarLengthIntUtils.decodeLong(tempIndexStream); // Hash long hash = MurmurHashUtils.hashBytesPositive(keyBuffer);// ... existing code ...
逻辑分析:
- 初始化缓冲区: 创建几个
byte[]数组作为缓冲区,用于重复读取和写入,避免在循环中频繁创建对象。 - 主循环: 循环
count次,处理每一个(Key, Value地址)对。 - 读取数据:
tempIndexStream.readFully(keyBuffer): 从临时文件中读取一个key。VarLengthIntUtils.decodeLong(tempIndexStream): 紧接着读取这个key对应的value地址。
- 计算哈希:
MurmurHashUtils.hashBytesPositive(keyBuffer): 使用 MurmurHash 算法计算key的哈希值。这个哈希值将决定key在哈希表中的“理想”初始位置。
第三部分:寻址、冲突处理与写入 (Probing, Collision Handling & Writing)
这是 buildIndex 函数最核心、最复杂的部分。它实现了开放定址法 (Open Addressing) 中的线性探测 (Linear Probing) 来构建哈希表。
// ... existing code ... boolean collision = false; for (int probe = 0; probe < count; probe++) { int slot = (int) ((hash + probe) % slots); byteBuffer.position(slot * slotSize); byteBuffer.get(slotBuffer); long found = VarLengthIntUtils.decodeLong(slotBuffer, keyLength); if (found == 0) { // The spot is empty use it byteBuffer.position(slot * slotSize); byteBuffer.put(keyBuffer); int pos = VarLengthIntUtils.encodeLong(offsetBuffer, offset); byteBuffer.put(offsetBuffer, 0, pos); break; } else { collision = true; // Check for duplicates if (Arrays.equals(keyBuffer, Arrays.copyOf(slotBuffer, keyLength))) { throw new RuntimeException( String.format( \"A duplicate key has been found for for key bytes %s\", Arrays.toString(keyBuffer))); } } } if (collision) { collisions++; }// ... existing code ...
逻辑分析:
-
探测循环 (内层循环):
int slot = (int) ((hash + probe) % slots);: 这是线性探测的核心。probe从 0 开始。第一次 (probe=0),计算出的slot就是理想位置。如果该位置被占用,probe增加 1,继续探测下一个位置,以此类推。byteBuffer.position(...)和byteBuffer.get(...): 定位到计算出的slot位置,并读取该槽位的全部内容到slotBuffer中。
-
判断槽位状态:
long found = VarLengthIntUtils.decodeLong(slotBuffer, keyLength);: 这是一个非常巧妙的设计。它尝试从槽位数据中key之后的位置解码出offset。因为有效的offset至少为 1(getDataStream中预留了 1 字节),所以如果解码出的值为0,就意味着这个槽位是空的。
-
处理两种情况:
- Case A: 槽位为空 (
found == 0):- 找到了一个可以安放当前
key的位置。 byteBuffer.put(keyBuffer): 将key的内容写入该槽位。VarLengthIntUtils.encodeLong(...)和byteBuffer.put(...): 紧接着将value的地址也写入槽位。break;: 成功放入,跳出内层探测循环,继续处理下一个key。
- 找到了一个可以安放当前
- Case B: 槽位被占用 (
found != 0):collision = true;: 标记发生了哈希冲突。- 关键:重复键检查:
Arrays.equals(...)会比较当前要放入的key和槽位中已存在的key是否完全相同。 - 如果相同,说明用户提供了重复的
key,直接抛出RuntimeException,保证了key的唯一性。 - 如果不相同(只是哈希值一样),则什么也不做,内层循环继续,
probe加 1,去探测下一个槽位。
- Case A: 槽位为空 (
-
统计冲突: 如果内层循环中
collision标记被设为true(无论最终是否找到位置),说明这个key至少经历了一次冲突,全局冲突计数器collisions加 1。
第四部分:收尾与清理 (Finalization & Cleanup)
当所有 key 都被成功放入哈希表后,函数进行最后的清理工作。
// ... existing code ... } finally { // Close input tempIndexStream.close(); // Close index and make sure resources are liberated indexChannel.close(); // Delete temp index file if (tempIndexFile.delete()) { LOG.info(\"Temporary index file {} has been deleted\", tempIndexFile.getName()); } } } return indexFile; }// ... existing code ...
逻辑分析:
finally块: 确保无论是否发生异常,资源都能被正确释放。- 关闭流: 关闭临时的
tempIndexStream和indexChannel。 - 删除临时文件:
tempIndexFile的使命已经完成,它的内容已经被转换并写入了最终的indexFile,所以将其删除。 - 返回结果: 返回构建好的、持久化的
indexFile文件对象。这个文件将在close()方法中被合并到最终的outputFile中。
总结
buildIndex 函数是一个设计精巧且高效的哈希表构建器。它通过以下关键技术实现了其功能:
- 分而治之: 按
key的长度将数据分区处理。 - 两阶段构建:
put阶段只做快速数据收集,buildIndex阶段集中构建索引,实现了写操作的吞吐最大化。 - 内存映射文件 (
MappedByteBuffer): 提供了极高的 I/O 性能,将文件操作变成了内存操作。 - 开放定址法(线性探测): 一种简单有效的哈希冲突解决方案。
- 延迟的唯一性检查: 在构建索引时才检查
key的唯一性,而不是在每次put时,这是一种性能优化策略。
mergeFiles
这个函数在 HashLookupStoreWriter 的 close() 方法中扮演着最后“组装”的角色。在 close() 方法的前半部分,程序已经生成了所有必需的临时文件(布隆过滤器文件、各个 keyLength 对应的索引文件和数据文件)。mergeFiles 的职责就是将这些独立的、临时的文件块,按照预定的顺序,合并成一个最终的、完整的、可供读取的 outputFile。
// ... existing code ... // Merge files to the provided fileChannel private void mergeFiles(List inputFiles, PageFileOutput output) throws IOException { long startTime = System.nanoTime(); // Merge files for (File f : inputFiles) { if (f.exists()) { FileInputStream fileInputStream = new FileInputStream(f); BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream); try { LOG.info(\"Merging {} size={}\", f.getName(), f.length()); byte[] buffer = new byte[8192]; int length; while ((length = bufferedInputStream.read(buffer)) > 0) { output.write(buffer, 0, length); } } finally { bufferedInputStream.close(); fileInputStream.close(); } } else { LOG.info(\"Skip merging file {} because it doesn\'t exist\", f.getName()); } } LOG.info(\"Time to merge {} s\", ((System.nanoTime() - startTime) / 1000000000.0)); }// ... existing code ...
在分析 mergeFiles 之前,理解它在 close() 方法中是如何被调用的至关重要。
// ... existing code ... @Override public Context close() throws IOException {// ... (关闭流) ... // 1. 准备要合并的文件列表 List filesToMerge = new ArrayList();// ... (计算元数据) ... // 2. 创建最终的输出对象 PageFileOutput output = PageFileOutput.create(outputFile, compressPageSize, compressionFactory); try { // 3. 按顺序添加文件到列表 // 3.1 添加布隆过滤器文件 if (bloomFilter != null) { // ... filesToMerge.add(bloomFilterFile); } // 3.2 添加所有索引文件 for (int i = 0; i < indexFiles.length; i++) { if (indexFiles[i] != null) { filesToMerge.add(buildIndex(i)); } } // 3.3 添加所有数据文件 for (File dataFile : dataFiles) { if (dataFile != null) { filesToMerge.add(dataFile); } } // 4. 调用 mergeFiles checkFreeDiskSpace(filesToMerge); mergeFiles(filesToMerge, output); } finally { // 5. 清理临时文件 cleanup(filesToMerge); output.close(); }// ... (返回上下文) ... }// ... existing code ...
从 close() 方法的逻辑可以看出,filesToMerge 这个列表中的文件顺序是固定的:
- 布隆过滤器文件 (如果存在)
- 所有索引文件 (按
keyLength从小到大) - 所有数据文件 (按
keyLength从小到大)
这个顺序定义了最终 outputFile 的物理布局,HashLookupStoreReader 在读取时会严格依赖这个布局。
现在我们来逐行分析 mergeFiles 函数本身。它的实现非常直接,就是一个经典的文件拼接操作。
private void mergeFiles(List inputFiles, PageFileOutput output) throws IOException { long startTime = System.nanoTime(); // Merge files for (File f : inputFiles) { // ... } // ...}
逻辑分析:
long startTime = System.nanoTime();: 记录开始时间,用于后续计算合并过程的总耗时。这是一个很好的实践,用于性能监控。for (File f : inputFiles): 遍历close()方法中精心准备好的文件列表filesToMerge。循环的顺序就是文件被写入最终输出文件的顺序。
文件存在性检查与流的创建
// ...if (f.exists()) { FileInputStream fileInputStream = new FileInputStream(f); BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream); try { // ... } finally { bufferedInputStream.close(); fileInputStream.close(); }} else { LOG.info(\"Skip merging file {} because it doesn\'t exist\", f.getName());}// ...
逻辑分析:
if (f.exists()): 检查文件是否存在。这是一个健壮性措施。例如,如果某个keyLength没有任何数据,对应的临时文件就不会被创建,此时就应该跳过。FileInputStream和BufferedInputStream: 创建一个带缓冲的输入流来读取源文件 (f)。使用BufferedInputStream可以显著提高文件读取性能,因为它会一次性从磁盘读取一个较大的块到内存缓冲区,而不是一个字节一个字节地读。try...finally: 这是一个标准的资源管理模式,确保无论try块内部是否发生异常,输入流 (bufferedInputStream和fileInputStream) 都能被正确关闭,防止资源泄露。
核心读写循环
// ...LOG.info(\"Merging {} size={}\", f.getName(), f.length());byte[] buffer = new byte[8192]; // 8KB bufferint length;while ((length = bufferedInputStream.read(buffer)) > 0) { output.write(buffer, 0, length);}// ...
逻辑分析:
LOG.info(...): 打印日志,记录当前正在合并哪个文件及其大小。这对于调试和监控非常有用。byte[] buffer = new byte[8192];: 创建一个 8KB 大小的字节数组作为缓冲区。这是数据从输入流搬运到输出流的“卡车”。while ((length = bufferedInputStream.read(buffer)) > 0): 这是文件复制的核心。bufferedInputStream.read(buffer): 尝试从输入流中读取数据,填满buffer数组。该方法返回实际读到的字节数。- 如果返回值
length大于 0,说明成功读取到了数据。 - 如果返回 -1,说明已经到达文件末尾,循环终止。
output.write(buffer, 0, length);: 将刚刚从输入文件读到缓冲区buffer里的数据,原封不动地写入到最终的输出对象output中。注意这里写入的长度是length,即实际读到的字节数,而不是整个buffer的大小。
收尾工作
// ...LOG.info(\"Time to merge {} s\", ((System.nanoTime() - startTime) / 1000000000.0));
逻辑分析:
- 在循环结束后,计算总耗时(当前时间减去
startTime),并将其转换为秒后打印日志。
总结
mergeFiles 函数本身的功能非常纯粹和直接:它是一个高效的文件拼接器。
- 职责单一: 它的唯一职责就是将给定的文件列表按顺序、字节对字节地追加到输出流中。
- 高效性: 通过使用
BufferedInputStream和一个固定大小的byte[]缓冲区,它实现了高效的批量读写,最小化了磁盘 I/O 次数。 - 健壮性: 通过
try...finally保证了资源的正确释放,并通过if (f.exists())处理了可选文件不存在的情况。 - 依赖外部逻辑: 函数的正确性严重依赖于调用者(即
close()方法)传入的inputFiles列表的正确顺序。它本身不关心文件的内容,只负责“搬运”。
最终,这个函数将所有分散的临时部件(布隆过滤器、索引、数据)组装成一个结构完整、布局正确的持久化查找文件。
mergeFiles 之所以能直接合并,是因为:
- 写入和读取是解耦的:Writer 的工作是生成一个大的二进制文件和一个描述其内部结构的元数据对象 (HashContext)。
- 元数据先行:在物理合并之前,所有地址的计算和校正工作都已经完成,并保存在 HashContext 中。
- 读取时依赖元数据:当 HashLookupStoreReader 读取这个文件时,它会首先加载 HashContext 这个“地图”。当需要查找一个 key 时,它会:
- 根据 context.indexOffsets 找到对应索引块的全局起始位置。
- 在索引块中找到 key,读取出它对应的局部数据偏移量 local_offset。
- 根据 context.dataOffsets 找到对应数据块的全局起始位置 global_data_start。
- 最终通过 global_data_start + local_offset 计算出 value 的精确全局地址,然后去读取。
最终文件结构
close() 方法执行完毕后,生成的 outputFile 是一个二进制文件,其内部逻辑结构如下:
+--------------------------------+| || Bloom Filter Data | <-- (如果启用) 从文件开头,长度为 bloomFilterBytes| (BitSet a.k.a. 位数组) || |+--------------------------------+| || Index Block for KeyLen 1 | <-- 紧跟在布隆过滤器之后| |+--------------------------------+| Index Block for KeyLen 2 |+--------------------------------+| ... |+--------------------------------+| || Data Block for KeyLen 1 | <-- 紧跟在所有索引块之后| |+--------------------------------+| Data Block for KeyLen 2 |+--------------------------------+| ... |+--------------------------------+
close() 方法返回的 HashContext 对象包含了所有元数据(比如每个部分的起始偏移量、大小等),HashLookupStoreReader 正是依赖这些元数据来解析这个文件并执行查找。
总结
HashLookupStoreWriter 是一个设计精巧的本地 KV 存储写入器。它通过按 Key 长度分治和两阶段写入的策略,成功地在磁盘上构建了一个高效的、支持压缩和布隆过滤器的哈希表。这个组件是 Paimon 实现高性能 Lookup Join 的基石之一。
HashLookupStoreReader
HashLookupStoreReader 是 HashLookupStoreWriter 的配套读取类。Writer 负责构建一个优化的、持久化的哈希查找文件,而 Reader 则负责高效地从这个文件中读取数据。它的核心功能是实现 lookup(byte[] key) 方法,通过给定的 key 快速返回对应的 value。
类的结构与初始化 (Class Structure & Initialization)
HashLookupStoreReader 的构造函数是理解其工作原理的起点。它接收一个文件 (File) 和一个 HashContext 对象,然后初始化所有必要的组件。
// ... existing code ...public class HashLookupStoreReader implements LookupStoreReader, Iterable<Map.Entry> { // ... existing code ... // Key count for each key length private final int[] keyCounts; // Slot size for each key length private final int[] slotSizes; // Number of slots for each key length private final int[] slots; // Offset of the index for different key length private final int[] indexOffsets; // Offset of the data for different key length private final long[] dataOffsets; // File input view private FileBasedRandomInputView inputView; // Buffers private final byte[] slotBuffer; @Nullable private FileBasedBloomFilter bloomFilter;// ... existing code ...
逻辑分析:
- 元数据数组:
keyCounts,slotSizes,slots,indexOffsets,dataOffsets这些数组都直接从HashContext中获取。它们是Reader的“地图”,精确描述了最终文件中各个数据块(按keyLength划分)的位置和结构。 inputView: 一个FileBasedRandomInputView对象,提供了对底层文件的随机读取能力。这是实现高效查找的基础,因为它允许直接跳到文件的任意位置读取数据,而无需顺序扫描。它还集成了缓存管理器 (CacheManager),可以缓存文件页,减少磁盘I/O。slotBuffer: 一个字节数组缓冲区,用于临时存放从索引区读取的单个“哈希槽”的数据。其大小被设置为所有分区中最大的slotSize,以实现复用。bloomFilter: 一个可选的FileBasedBloomFilter对象。如果Writer写入了布隆过滤器,Reader就会初始化它,用于快速排除不存在的key。
构造函数
// ... existing code ... HashLookupStoreReader( File file, HashContext context, CacheManager cacheManager, int cachePageSize, @Nullable BlockCompressionFactory compressionFactory) throws IOException { // File path if (!file.exists()) { throw new FileNotFoundException(\"File \" + file.getAbsolutePath() + \" not found\"); } // 1. 加载元数据 keyCounts = context.keyCounts; slots = context.slots; slotSizes = context.slotSizes; int maxSlotSize = 0; for (int slotSize : slotSizes) { maxSlotSize = Math.max(maxSlotSize, slotSize); } slotBuffer = new byte[maxSlotSize]; indexOffsets = context.indexOffsets; dataOffsets = context.dataOffsets; LOG.info(\"Opening file {}\", file.getName()); // 2. 初始化文件输入视图 PageFileInput fileInput = PageFileInput.create( file, cachePageSize, compressionFactory, context.uncompressBytes, context.compressPages); inputView = new FileBasedRandomInputView(fileInput, cacheManager); // 3. 初始化布隆过滤器 if (context.bloomFilterEnabled) { bloomFilter = new FileBasedBloomFilter( fileInput, cacheManager, context.bloomFilterExpectedEntries, 0, // 从文件开头读取 context.bloomFilterBytes); } }// ... existing code ...
逻辑分析:
- 加载元数据: 将
HashContext中的所有元数据数组复制到类的成员变量中。 - 初始化文件视图: 创建
PageFileInput和FileBasedRandomInputView。PageFileInput负责处理可能的压缩,而FileBasedRandomInputView负责提供带缓存的随机读取接口。 - 初始化布隆过滤器: 如果
context表明布隆过滤器已启用,就创建一个FileBasedBloomFilter实例。注意构造函数中的参数:它从文件的偏移量 0 开始,读取context.bloomFilterBytes个字节来构建过滤器。这与Writer的文件布局约定完全一致。
核心查找逻辑 (lookup 方法)
这是 HashLookupStoreReader 最核心的方法,实现了从 key 到 value 的查找。
// ... existing code ... @Override public byte[] lookup(byte[] key) throws IOException { // 1. 预检查 int keyLength = key.length; if (keyLength >= slots.length || keyCounts[keyLength] == 0) { return null; } // 2. 布隆过滤器检查 int hashcode = MurmurHashUtils.hashBytes(key); if (bloomFilter != null && !bloomFilter.testHash(hashcode)) { return null; } // 3. 定位哈希表 long hashPositive = hashcode & 0x7fffffff; int numSlots = slots[keyLength]; int slotSize = slotSizes[keyLength]; int indexOffset = indexOffsets[keyLength]; long dataOffset = dataOffsets[keyLength]; // 4. 开放地址法探测 for (int probe = 0; probe < numSlots; probe++) { long slot = (hashPositive + probe) % numSlots; // 4.1 定位并读取哈希槽 inputView.setReadPosition(indexOffset + slot * slotSize); inputView.readFully(slotBuffer, 0, slotSize); // 4.2 检查 key 是否匹配 long offset = VarLengthIntUtils.decodeLong(slotBuffer, keyLength); if (offset == 0) { // 空槽,说明 key 不存在 return null; } if (isKey(slotBuffer, key)) { // 4.3 key 匹配成功,获取 value return getValue(dataOffset + offset); } } // 5. 探测完所有槽位仍未找到 return null; }// ... existing code ...
逻辑分析:
- 预检查: 根据
key的长度keyLength,检查是否存在对应的分区。如果该长度的key从未被写入过,直接返回null。 - 布隆过滤器: 如果布隆过滤器存在,先用
key的哈希值进行测试。如果过滤器说“肯定不存在”,则直接返回null,避免了昂贵的磁盘读取。这是第一道防线。 - 定位哈希表: 从元数据数组中获取当前
keyLength对应的哈希表参数:槽位数numSlots、槽大小slotSize、索引区的全局起始偏移量indexOffset和数据区的全局起始偏移量dataOffset。 - 开放地址法探测:
- 使用线性探测法(
probe从 0 开始增加)来解决哈希冲突。 - 4.1 定位和读取: 计算出当前探测的哈希槽在文件中的绝对位置 (
indexOffset + slot * slotSize),然后使用inputView.setReadPosition直接跳转到该位置,并将整个槽的内容读入slotBuffer。 - 4.2 检查:
- 一个哈希槽的结构是
[key, value_offset]。首先从slotBuffer中比较key部分 (isKey方法)。 - 如果
key匹配,就从slotBuffer的后半部分解析出value的局部偏移量offset。 - 如果
key不匹配,则继续下一次探测 (probe++)。 - 如果解析出的
offset为 0,表示这是一个空槽,根据开放地址法的原理,说明要查找的key不存在,可以直接返回null。
- 一个哈希槽的结构是
- 4.3 获取 Value: 如果
key匹配成功,调用getValue方法。注意传入的参数是dataOffset + offset,即将数据区的全局起始偏移量和值的局部偏移量相加,得到value在文件中的绝对地址。
- 使用线性探测法(
- 查找失败: 如果遍历了所有可能的槽位(理论上不会发生,因为有空槽检查)仍然没有找到,返回
null。
辅助方法 (isKey, getValue)
这两个是 lookup 调用的内部辅助方法。
// ... existing code ... private boolean isKey(byte[] slotBuffer, byte[] key) { for (int i = 0; i < key.length; i++) { if (slotBuffer[i] != key[i]) { return false; } } return true; } private byte[] getValue(long offset) throws IOException { inputView.setReadPosition(offset); // Get size of data int size = VarLengthIntUtils.decodeInt(inputView); // Create output bytes byte[] res = new byte[size]; inputView.readFully(res); return res; }// ... existing code ...
逻辑分析:
isKey: 逐字节比较slotBuffer的前key.length个字节与给定的key是否完全相同。getValue:- 使用
inputView.setReadPosition直接跳转到value的绝对地址offset。 value的存储格式是[value_length, value_data]。首先用VarLengthIntUtils.decodeInt读取变长的value长度size。- 创建一个大小为
size的字节数组res。 - 接着从文件中读取
size个字节到res中。 - 返回
res。
- 使用
总结
HashLookupStoreReader 是一个设计精良的只读存储访问器,其高效性体现在:
- 元数据驱动: 依赖
HashContext提供的精确“地图”来直接定位数据,避免了任何形式的扫描。 - 随机访问: 核心依赖
FileBasedRandomInputView提供的随机读取能力,实现了 O(1) 复杂度的查找(考虑到哈希冲突,平均复杂度接近 O(1))。 - 多层过滤:
- 通过
keyLength进行初步分区过滤。 - 通过可选的布隆过滤器进行快速存在性判断,大大减少了对不存在
key的磁盘 I/O。
- 通过
- 缓存优化: 底层的
CacheManager可以缓存访问过的文件页,对于热点数据的访问可以做到内存级别的速度。 - 零拷贝/低拷贝: 通过
FileBasedRandomInputView和缓冲区的使用,尽量减少了数据在内存中的复制。
它与 HashLookupStoreWriter 完美配合,共同构成了一个完整、高效的本地键值存储解决方案。
为什么不使用 BlockCache
不使用 BlockCache 的原因是:HashLookupStore 的文件结构是基于页(Page)的连续布局,而 SortLookupStore 的文件结构是基于块(Block)的离散布局。因此,它们需要不同的缓存读取策略。
FileBasedRandomInputView 是为前者设计的,而 BlockCache 是为后者设计的。
FileBasedRandomInputView:为“页式”访问设计
HashLookupStore 将所有数据(布隆过滤器、索引区、数据区)拼接成一个巨大的、连续的文件。它的访问模式是:
- 通过计算得出某个
key或value在这个大文件中的绝对偏移量 (absolute offset)。 - 直接跳转 (
seek) 到这个偏移量进行读取。
FileBasedRandomInputView 完美匹配这种模式:
- 缓存物理页:它将整个大文件视为由连续的、固定大小的页(比如 32KB)组成。当
setReadPosition(position)被调用时,它会计算出position落在哪个页里。 - 提供内存视图:它从
CacheManager中获取这个页对应的MemorySegment,然后将内部指针指向页内的正确偏移量。上层代码可以像操作内存一样读取数据,而无需关心磁盘 I/O。 - 无需解压:
HashLookupStore的数据在写入时就已经确定了最终形态,FileBasedRandomInputView这一层不负责解压逻辑(如果文件本身是压缩的,由更底层的PageFileInput处理,但对View是透明的)。
// ... existing code ... @Override public void setReadPosition(long position) { // 1. 计算出 position 属于哪个页 (currentSegmentIndex) int offset = (int) (position & this.segmentSizeMask); this.currentSegmentIndex = (int) (position >>> this.segmentSizeBits); // 2. 获取该页的 MemorySegment MemorySegment segment = getCurrentPage(); // 3. 在页内 seek seekInput(segment, offset, getLimitForSegment(segment)); }// ... existing code ...
BlockCache:为“块式”访问设计
我们来看一下 BlockCache 的典型用户 SortLookupStoreReader。这种存储格式类似于 RocksDB 的 SSTable,它的文件结构是:
- 文件由多个可变大小的数据块 (Data Block) 和一个索引块 (Index Block) 组成。
- 每个数据块内部的键值对是排序的,并且整个块可以被独立压缩(例如用 LZ4 或 ZSTD)。
- 查找过程是:先在索引块中找到
key可能在哪个数据块,然后获取这个数据块的句柄(BlockHandle,包含偏移量和大小)。
BlockCache 正是为这种“先定位、再读取整个块、然后解压”的模式设计的:
- 缓存逻辑块:它的缓存键
CacheKey包含position和length,直接对应一个逻辑块。 - 处理解压:它的
getBlock方法接收一个decompressFunc函数作为参数。当缓存未命中时,它会从磁盘读取length长度的原始(压缩)数据,然后调用decompressFunc函数进行解压,最后将解压后的数据放入缓存。
// ... existing code ... public MemorySegment getBlock( long position, int length, Function decompressFunc, boolean isIndex) { CacheKey cacheKey = CacheKey.forPosition(file, position, length, isIndex); // ... if (container == null || container.getAccessCount() == CacheManager.REFRESH_COUNT) { MemorySegment segment = cacheManager.getPage( cacheKey, key -> { // 1. 从磁盘读取原始(压缩)数据 byte[] bytes = readFrom(position, length); // 2. 调用传入的函数进行解压 return decompressFunc.apply(bytes); }, blocks::remove); // ... } return container.access(); }// ... existing code ...
总结
Paimon 在这里展现了优秀的软件设计思想:为不同的问题提供专门的解决方案,而不是用一个通用的方案去勉强适应所有场景。
-
对于
HashLookupStore这种**“大文件 + 绝对偏移量寻址”的场景,使用FileBasedRandomInputView进行物理页缓存**是最自然、最高效的。如果用BlockCache,你将无法确定length,因为访问是随机的,而不是按块进行的。 -
对于
SortLookupStore这种 “分块 + 独立压缩 + 块内搜索”的场景,使用BlockCache进行逻辑块缓存并处理解压 是必需的。如果用FileBasedRandomInputView,你只能拿到原始的压缩数据页,还需要自己拼接和解压,非常麻烦且低效。
因此,HashLookupStoreReader 不使用 BlockCache,是因为它的文件结构和访问模式决定了 FileBasedRandomInputView 才是那个“量身定做”的、正确的工具。


