双流join 、 Paimon Partial Update 和 动态schema_partial update paimon
背景
Paimon 通过其独特的 partial-update
合并引擎和底层的 LSM 存储结构,巧妙地将传统双流 Join 中对 Flink State 的高频随机读/写,转换为了对 Paimon 表的顺序写和后台的高效合并,从而一站式地解决了 Flink 作业状态过大、依赖外部 KV 系统等一系列痛点。
传统方案中,Flink 作业需要维护一个巨大的 State(可能达到 TB 级)来存储其中一个流的数据。当另一个流的数据到达时,需要去这个巨大的 State 中查找(Join)对应的记录。这个“查找”操作,在数据量巨大、内存无法完全容纳时,就会频繁触发对磁盘的随机读。机械硬盘和固态硬盘的随机读性能远低于顺序读,这成为了整个作业的性能瓶瓶颈,并导致了高昂的资源开销和不稳定性。
使用 Paimon 的 partial-update
模式后,整个数据处理的范式发生了改变:
- 不再需要 Flink State 来做 Join:两个数据流不再需要在 Flink 算子内部进行 Join。它们各自独立地、源源不断地将自己的数据写入(
INSERT INTO
)到同一个 Paimon 表中。 - 写入是高效的顺序操作:Paimon 底层采用 LSM-Tree 结构。新写入的数据会先进入内存缓冲区,然后刷写成新的、有序的小文件。这个过程主要是顺序写,效率非常高。
这样一来,原来 Flink 作业中最消耗性能的“状态查找”(随机读)环节,被彻底消除了。
现在,两个流的数据都以部分列的形式写入了 Paimon 表。那么,数据是在哪里“打宽”合并的呢?答案是在 Paimon 的Compaction过程中。
-
Partial-Update 合并引擎:当将表的合并引擎设置为
partial-update
时,Paimon 就知道了它的合并策略。 正如文档/docs/content/primary-key-table/merge-engine/partial-update.md
中描述的,对于相同主键的多条记录,它会取每个字段最新的非空值,合并成一条完整的记录。假设 Paimon 收到三条记录:
假设第一列是主键,最终合并的结果将是
。
-
LSM-Tree 与顺序读合并:Paimon 的 Compaction 任务会定期将小的、分层的文件合并成更大的文件。这个合并过程是读取多个有序的文件,然后进行多路归并排序,这基本上是顺序读操作,效率远高于随机读。
PartialUpdateMergeFunction
这个类就是实现该合并逻辑的核心。 -
Paimon Compaction策略见:Paimon LSM Tree Compaction 策略
// ... existing code ...public class PartialUpdateMergeFunction implements MergeFunction { // ... existing code ... private InternalRow currentKey; private long latestSequenceNumber; private GenericRow row; private KeyValue reused; private boolean currentDeleteRow; // ... existing code ... @Override public void add(KeyValue kv) { // refresh key object to avoid reference overwritten currentKey = kv.key(); currentDeleteRow = false;// ... existing code ...
-
专用 Compaction 作业:为了不影响数据写入的实时性,最佳实践是启动一个独立的、专用的 Compaction 作业。这样,数据写入和数据合并就可以完全解耦,互不干扰。
如文档
/docs/content/maintenance/dedicated-compaction.md
所述,当有多个流式作业写入一个partial-update
表时,推荐使用专用的 Compaction 作业。/bin/flink run \\ /path/to/paimon-flink-action-{{}}.jar \\ compact \\ --warehouse \\ --database \\ --table \\ ...
总结:Paimon 的核心优势
通过上述分析,我们可以清晰地看到 Paimon 在这个场景下的优势:
- 性能革命:将 Flink State 的随机读瓶颈,转变为 Paimon 的顺序写 + 后台顺序读合并,大幅提升了整体吞吐量和性能。
- 架构简化与成本降低:不再需要维护外部的 HBase/Pegasus 等 KV 系统,所有数据统一存储在 Paimon 中,降低了系统复杂度和运维、存储成本。
- 稳定性提升:Flink 作业本身变成了无状态或轻状态的写入任务,彻底告别了 TB 级的 State,使得作业的稳定性和恢复速度大大增强。
- 开发简化:原来需要手写复杂
DataStream
API 和Timer
才能实现的逻辑,现在只需要两个简单的INSERT INTO
SQL 语句即可完成,开发效率和代码可维护性显著提高。
PartialUpdateMergeFunction
这是在 Paimon 中实现 partial-update
(部分列更新) 合并引擎的核心类。它的主要职责是在 Compaction 过程中,将具有相同主键的多条记录(KeyValue
)合并成最终的一条记录。
PartialUpdateMergeFunction
实现了 MergeFunction
接口。在 Paimon 的 LSM-Tree 存储模型中,当执行 Compaction 操作需要合并多个数据文件时,Paimon 会读取具有相同主键的一组 KeyValue
数据,然后交由一个 MergeFunction
实例来处理,计算出最终的结果。
PartialUpdateMergeFunction
的合并逻辑是:对于相同主键的记录,不断地用新的非空字段值去覆盖旧的字段值,最终得到一个“打宽”后的完整记录。 它还支持更复杂的场景,如基于序列号的更新、字段聚合和多种删除策略。
// ... existing code ...import org.apache.paimon.mergetree.compact.MergeFunction;// ... existing code .../** * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update * non-null fields on merge. */public class PartialUpdateMergeFunction implements MergeFunction {// ... existing code ...
核心成员变量
这些变量定义了 PartialUpdateMergeFunction
的状态和配置,决定了其合并行为。
// ... existing code ...public class PartialUpdateMergeFunction implements MergeFunction { public static final String SEQUENCE_GROUP = \"sequence-group\"; private final InternalRow.FieldGetter[] getters; // 用于从 InternalRow 中获取字段值 private final boolean ignoreDelete; // 是否忽略删除记录 private final Map fieldSeqComparators; // 字段序列号比较器,用于 sequence-group private final boolean fieldSequenceEnabled; // 是否启用了 sequence-group private final Map fieldAggregators; // 字段聚合器 private final boolean removeRecordOnDelete; // 收到 DELETE 记录时是否删除整行 private final Set sequenceGroupPartialDelete; // 收到 DELETE 记录时,根据 sequence-group 删除部分列 private final boolean[] nullables; // 记录每个字段是否可为 null private InternalRow currentKey; // 当前处理的主键 private long latestSequenceNumber; // 见过的最新序列号 private GenericRow row; // 合并过程中的结果行 private KeyValue reused; // 用于复用的 KeyValue 对象,避免重复创建 private boolean currentDeleteRow; // 标记当前行最终是否应被删除 private boolean notNullColumnFilled; /** * If the first value is retract, and no insert record is received, the row kind should be * RowKind.DELETE. (Partial update sequence group may not correctly set currentDeleteRow if no * RowKind.INSERT value is received) */ private boolean meetInsert; // 是否遇到过 INSERT 类型的记录// ... existing code ...
- 配置类变量 (
ignoreDelete
,fieldSeqComparators
,fieldAggregators
等) 通常在Factory
中被初始化,它们在整个合并过程中保持不变。 - 状态类变量 (
currentKey
,row
,latestSequenceNumber
等) 会在每次reset()
时被重置,用于处理新的一组具有相同主键的记录。
add(KeyValue kv)
:合并逻辑的核心
这是最重要的方法,定义了单条 KeyValue
是如何被合并到当前结果 row
中的。
// ... existing code ... @Override public void add(KeyValue kv) { // refresh key object to avoid reference overwritten currentKey = kv.key(); currentDeleteRow = false; if (kv.valueKind().isRetract()) { if (!notNullColumnFilled) { initRow(row, kv.value()); notNullColumnFilled = true; } // ... 删除逻辑处理 ... // ... existing code ... String msg = String.join( \"\\n\", \"By default, Partial update can not accept delete records,\" + \" you can choose one of the following solutions:\", \"1. Configure \'ignore-delete\' to ignore delete records.\", \"2. Configure \'partial-update.remove-record-on-delete\' to remove the whole row when receiving delete records.\", \"3. Configure \'sequence-group\'s to retract partial columns.\"); throw new IllegalArgumentException(msg); } latestSequenceNumber = kv.sequenceNumber(); if (fieldSeqComparators.isEmpty()) { updateNonNullFields(kv); } else { updateWithSequenceGroup(kv); } meetInsert = true; notNullColumnFilled = true; }// ... existing code ...
它的逻辑可以分为两大块:
A. 处理 retract
消息 (RowKind 为 DELETE
或 UPDATE_BEFORE
)
partial-update
默认不接受删除记录。如果收到了,行为由配置决定:
ignoreDelete = true
: 直接忽略这条删除记录,返回。removeRecordOnDelete = true
: 当收到DELETE
类型的记录时,将currentDeleteRow
标记为true
,并清空当前row
。这意味着最终这条主键对应的记录将被删除。fieldSequenceEnabled = true
: 启用了sequence-group
。这是最复杂的逻辑,它会调用retractWithSequenceGroup(kv)
。这个方法会根据序列号比较结果,来决定是否要“撤销”某些字段的更新(通常是将其设置为null
或调用聚合器的retract
方法)。- 默认行为: 如果以上配置都没有,则直接抛出
IllegalArgumentException
异常,提示用户如何正确配置。
B. 处理 add
消息 (RowKind 为 INSERT
或 UPDATE_AFTER
)
这是主要的更新逻辑:
-
简单更新 (
updateNonNullFields
): 如果没有配置sequence-group
(fieldSeqComparators
为空),则执行最简单的部分列更新。遍历新纪录kv
的所有字段,只要字段值不为null
,就用它来更新row
中对应位置的值。// ... existing code ...private void updateNonNullFields(KeyValue kv) { for (int i = 0; i < getters.length; i++) { Object field = getters[i].getFieldOrNull(kv.value()); if (field != null) { row.setField(i, field); } else {// ... existing code ...
-
带序列号的更新 (
updateWithSequenceGroup
): 如果配置了sequence-group
,逻辑会更复杂。对于每个字段:- 如果该字段不属于任何
sequence-group
,则行为和简单更新类似(但会考虑聚合)。 - 如果该字段属于某个
sequence-group
,则会使用FieldsComparator
比较新记录kv
和当前结果row
的序列号字段。只有当新记录的序列号 大于或等于 当前结果的序列号时,才会用新记录的字段值去更新row
中由该sequence-group
控制的所有字段。这保证了数据的更新顺序。
- 如果该字段不属于任何
updateWithSequenceGroup
这个方法是 partial-update
合并引擎处理带有 sequence-group
配置时的核心逻辑。当用户在表属性中定义了 fields..sequence-group = ,
这样的规则时,数据合并就不再是简单的“非空值覆盖”,而是需要根据 seq_field
的值来判断是否应该更新 data_field1
和 data_field2
。这解决了多流更新时可能出现的数据乱序覆盖问题。
updateWithSequenceGroup
方法通过引入FieldsComparator
,将简单的字段更新升级为基于序列号的条件更新。它精确地控制了哪些字段在何时可以被更新,从而保证了在多流并发写入场景下,即使数据存在一定程度的乱序,最终也能合并成正确的结果。这是 Paimonpartial-update
模式能够处理复杂更新场景的关键所在。
// ... existing code ... private void updateWithSequenceGroup(KeyValue kv) {// ... existing code ...
- 输入:
KeyValue kv
,代表一条新到达的、具有相同主键的记录。 - 目标: 遍历这条新记录
kv
的所有字段,并根据sequence-group
的规则,决定是否用kv
中的字段值来更新当前正在合并的结果行this.row
。
该方法的核心是一个 for
循环,它遍历了表中的每一个字段。
// ... existing code ... private void updateWithSequenceGroup(KeyValue kv) { for (int i = 0; i < getters.length; i++) {// ... existing code ...
在循环内部,对每个字段的处理逻辑可以分为两种情况:
- 该字段不属于任何
sequence-group
。 - 该字段属于某个
sequence-group
。
让我们来详细看这两种情况。
1. 字段不属于任何 sequence-group
// ... existing code ... private void updateWithSequenceGroup(KeyValue kv) { for (int i = 0; i < getters.length; i++) { Object field = getters[i].getFieldOrNull(kv.value()); FieldsComparator seqComparator = fieldSeqComparators.get(i); FieldAggregator aggregator = fieldAggregators.get(i); Object accumulator = getters[i].getFieldOrNull(row); if (seqComparator == null) { if (aggregator != null) { row.setField(i, aggregator.agg(accumulator, field)); } else if (field != null) { row.setField(i, field); } } else {// ... existing code ...
- 判断条件:
seqComparator == null
。fieldSeqComparators
是一个Map
,如果在里面找不到当前字段索引i
,就说明这个字段不受任何sequence-group
控制。 - 处理逻辑:
- 带聚合函数: 如果为该字段配置了聚合函数(
aggregator != null
),例如sum
、max
等,则调用aggregator.agg()
方法,将当前累加值accumulator
和新值field
进行聚合,并将结果写回row
。 - 不带聚合函数: 这是最简单的情况。如果新来的字段值
field
不为null
,就直接用它覆盖row
中的旧值。这和updateNonNullFields
的行为是一致的。
- 带聚合函数: 如果为该字段配置了聚合函数(
2. 字段属于某个 sequence-group
这是该方法最核心和复杂的部分。
// ... existing code ... } else { if (isEmptySequenceGroup(kv, seqComparator)) { // skip null sequence group continue; } if (seqComparator.compare(kv.value(), row) >= 0) { int index = i; // Multiple sequence fields should be updated at once. if (Arrays.stream(seqComparator.compareFields()) .anyMatch(seqIndex -> seqIndex == index)) { for (int fieldIndex : seqComparator.compareFields()) { row.setField( fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value())); } continue; } row.setField( i, aggregator == null ? field : aggregator.agg(accumulator, field)); } else if (aggregator != null) { row.setField(i, aggregator.aggReversed(accumulator, field)); } } } }// ... existing code ...
- 判断条件:
seqComparator != null
。 - 处理逻辑:
- 空序列组检查:
isEmptySequenceGroup(kv, seqComparator)
会检查这条新纪录kv
中,其对应的序列号字段是否都为null
。如果是,意味着这条记录无法判断新旧,因此直接跳过,不进行任何更新。 - 序列号比较:
seqComparator.compare(kv.value(), row) >= 0
是关键。它会比较新记录kv
和当前结果row
中,由seqComparator
定义的序列号字段。- 如果新记录的序列号 >= 当前结果的序列号: 这意味着新记录
kv
是“更新”的或者“同样新”的,此时应该用kv
的值去更新row
。- 更新序列号字段本身: 如果当前字段
i
就是序列号字段之一,那么需要把这个sequence-group
定义的所有序列号字段都一次性更新掉,然后用continue
跳出本次循环。这是为了保证序列号字段之间的一致性。 - 更新数据字段: 如果当前字段
i
是被序列号控制的数据字段,则执行更新。如果有聚合器,则调用aggregator.agg()
;如果没有,则直接用新值field
覆盖。
- 更新序列号字段本身: 如果当前字段
- 如果新记录的序列号 < 当前结果的序列号: 这意味着
kv
是一条“旧”数据。在大部分情况下,这条旧数据会被忽略。但有一个例外:如果为该字段配置了支持乱序聚合的聚合器(例如sum
),则会调用aggregator.aggReversed()
。这个方法通常和agg()
的逻辑是一样的,它允许旧数据也能被正确地聚合进来。对于不支持乱序的聚合器(如max
),aggReversed
可能就是一个空操作。
- 如果新记录的序列号 >= 当前结果的序列号: 这意味着新记录
- 空序列组检查:
getResult()
方法:产出最终结果
当处理完具有相同主键的所有 KeyValue
后,调用此方法来获取最终的合并结果。
// ... existing code ... @Override public KeyValue getResult() { if (reused == null) { reused = new KeyValue(); } RowKind rowKind = currentDeleteRow || !meetInsert ? RowKind.DELETE : RowKind.INSERT; return reused.replace(currentKey, latestSequenceNumber, rowKind, row); }// ... existing code ...
它会根据 currentDeleteRow
和 meetInsert
标志位来决定最终的 RowKind
。如果 currentDeleteRow
为 true
,或者整个合并过程从未见过 INSERT
类型的记录,那么最终结果就是一条 DELETE
记录。否则,就是一条 INSERT
记录。然后将主键、最新的序列号、最终的 RowKind
和合并后的 row
数据打包成一个 KeyValue
返回。
Factory
内部类:配置的入口
PartialUpdateMergeFunction.Factory
是一个非常重要的内部类,它负责解析用户在表上设置的 OPTIONS
,并据此创建出一个配置好的 PartialUpdateMergeFunction
实例。
// ... existing code ... public static MergeFunctionFactory factory( Options options, RowType rowType, List primaryKeys) { return new Factory(options, rowType, primaryKeys); } private static class Factory implements MergeFunctionFactory { // ... 成员变量,用于存储从 Options 解析出的配置 ... private Factory(Options options, RowType rowType, List primaryKeys) { this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE); // ... existing code ... this.removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE); // ... 解析 sequence-group 配置 ... for (Map.Entry entry : options.toMap().entrySet()) { String k = entry.getKey(); String v = entry.getValue(); if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) { // ... 解析出序列号字段和被控制的字段,构建 fieldSeqComparators ... } } // ... 解析聚合函数配置,构建 fieldAggregators ... this.fieldAggregators = createFieldAggregators( rowType, primaryKeys, allSequenceFields, new CoreOptions(options)); // ... 配置校验,确保冲突的配置不会同时开启 ... Preconditions.checkState( !(removeRecordOnDelete && ignoreDelete), // ... ); // ... }// ... existing code ...
在构造函数中,它会:
- 读取
ignore-delete
,partial-update.remove-record-on-delete
等简单配置。 - 遍历所有
OPTIONS
,查找以fields.
开头、以.sequence-group
结尾的配置项,例如fields.order_time.sequence-group=order_id,price
。它会解析这些配置,构建出fieldSeqComparators
这个 Map,其中 key 是被控制字段的索引,value 是一个能够比较order_time
字段的比较器。 - 调用
createFieldAggregators
方法,解析fields.*.aggregate-function
等配置,构建出fieldAggregators
这个 Map。 - 执行一系列
Preconditions.checkState
,对用户的配置进行合法性校验,防止出现逻辑冲突。
总结
PartialUpdateMergeFunction
是 Paimon 实现高性能数据打宽(部分列更新)能力的技术基石。它通过一个设计精巧的合并流程,将简单的非空字段覆盖、基于序列号的有序更新、字段聚合以及多种删除策略融为一体。其 Factory
类则充当了连接用户配置和底层实现的桥梁。理解了这个类的工作原理,就能深刻地理解 Paimon partial-update
模式的强大之处。
双流拼接 怎么处理schema
Paimon 允许在写入数据时自动合并和演进表结构。这对于像双流 Join 结果写入等 schema 可能变化的场景至关重要。这个功能主要通过 write.merge-schema
选项来开启。
当将数据写入 Paimon 表时:
- 如果
write.merge-schema
设置为true
,Paimon 会比较写入数据(Source)的 schema 和目标表(Sink)当前的 schema。 - 如果发现写入数据中包含了表中不存在的新列,Paimon 会自动将这些新列添加到表结构中,生成一个新的、版本更高的 schema。
- 对于数据中缺失但在表 schema 中存在的列,Paimon 会自动填充
null
值。
这个过程是原子性的,并记录在表的元数据中。Paimon 会为每一次 schema 变更创建一个新的版本化的 schema 文件。
代码参考:
在 Spark 中,写入逻辑由 WriteIntoPaimonTable.scala
处理。可以看到,当 mergeSchema
为 true
时,它会调用 mergeAndCommitSchema
来合并 schema,并处理列不匹配的情况。
WriteIntoPaimonTable.scala
// ... existing code ... override def run(sparkSession: SparkSession): Seq[Row] = { var data = _data if (mergeSchema) { val dataSchema = SparkSystemColumns.filterSparkSystemColumns(data.schema) val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST) mergeAndCommitSchema(dataSchema, allowExplicitCast) // For case that some columns is absent in data, we still allow to write once write.merge-schema is true. val newTableSchema = SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType()) if (!PaimonUtils.sameType(newTableSchema, dataSchema)) { val resolve = sparkSession.sessionState.conf.resolver val cols = newTableSchema.map { field => dataSchema.find(f => resolve(f.name, field.name)) match { case Some(f) => col(f.name) case _ => lit(null).as(field.name) } } data = data.select(cols: _*) } }// ... existing code ...
一个具体的测试用例也展示了这一点,一个原先只有 a
和 b
列的表,成功写入了包含 c
和 d
列的新数据。
DataFrameWriteTest.scala
// ... existing code ... // Case 1: two additional fields: DoubleType and TimestampType val ts = java.sql.Timestamp.valueOf(\"2023-08-01 10:00:00.0\") val df2 = Seq((1, \"2023-08-01\", 12.3d, ts), (3, \"2023-08-03\", 34.5d, ts)) .toDF(\"a\", \"b\", \"c\", \"d\") df2.write .format(\"paimon\") .mode(\"append\") .option(\"write.merge-schema\", \"true\") .save(location)// ... existing code ...
在 Flink 或 Spark 中进行双流 Join 时,Paimon 通常作为 Sink 端。Join 操作本身由计算引擎完成。应用需要做的就是:
- 执行双流 Join。
- 将 Join 后的
DataStream
或DataFrame
写入 Paimon 表。 - 在写入时,设置
write.merge-schema
为true
。
这样,无论 Join 结果的 schema 如何(比如因为上游流增加了字段导致 Join 结果也增加了字段),Paimon 表都可以自动适应,动态地添加新列。
SchemaMergingUtils
SchemaMergingUtils
是 Paimon schema 演进(Schema Evolution)功能的核心工具类。它的主要职责是比较两个 schema(通常是数据表的现有 schema 和新写入数据的 schema),并根据预设的规则将它们合并成一个新的、统一的 schema。这个过程支持添加新列、安全地转换现有列的数据类型,从而实现动态 schema 的能力。
当配置 Paimon 表允许 schema 合并(例如通过 write.merge-schema=true
)时,写入流程就会调用这个工具类。它会:
- 比较字段:找出新旧 schema 中同名和新增的字段。
- 合并类型:对于同名字段,尝试合并其数据类型(例如,
INT
可以演进为BIGINT
)。 - 添加字段:将新 schema 中独有的字段添加到最终的 schema 中,并为其分配新的唯一 ID。
- 生成新版 Schema:如果发生了任何变更,它会创建一个版本号加一的新的
TableSchema
对象。
下面我们结合代码,从顶层方法到底层实现,一步步进行分析。
mergeSchemas
这是最顶层的入口方法,用于合并一个完整的表 schema 和一个新的行类型(通常来自要写入的数据)。
- 参数:
currentTableSchema
: Paimon 表当前的TableSchema
对象。它包含了字段、分区键、主键、表配置等所有元数据。targetType
: 目标RowType
,即新数据的 schema。allowExplicitCast
: 一个布尔标志,决定是否允许显式(可能存在精度损失)的类型转换,比如STRING
转INT
。
- 逻辑:
- 首先,它会检查
targetType
和currentTableSchema
的RowType
是否完全相同。如果相同,则无需合并,直接返回当前的TableSchema
。 - 如果不同,它会初始化一个
AtomicInteger
类型的highestFieldId
,记录当前 schema 中所有字段(包括嵌套字段)的最大 ID。这个 ID 对于为新字段分配唯一标识至关重要。 - 调用重载的
mergeSchemas
方法(最终调用核心的merge
方法)来递归地合并两个RowType
。 - 如果合并后的
newRowType
与原始的currentType
相同(例如,只是可空性变化,而合并逻辑会保留原始的可空性),则也认为没有发生实质性变化,返回原始的TableSchema
。 - 如果 schema 确实发生了变化,它会创建一个新的
TableSchema
实例。这个新 schema 的 ID 会在旧 ID 的基础上加 1,字段列表和highestFieldId
会更新,而分区键、主键、表配置和注释等信息则会从旧 schema 中继承。
- 首先,它会检查
// ... existing code ... public static TableSchema mergeSchemas( TableSchema currentTableSchema, RowType targetType, boolean allowExplicitCast) { RowType currentType = currentTableSchema.logicalRowType(); if (currentType.equals(targetType)) { return currentTableSchema; } AtomicInteger highestFieldId = new AtomicInteger(currentTableSchema.highestFieldId()); RowType newRowType = mergeSchemas(currentType, targetType, highestFieldId, allowExplicitCast); if (newRowType.equals(currentType)) { // It happens if the `targetType` only changes `nullability` but we always respect the // current\'s. return currentTableSchema; } return new TableSchema( currentTableSchema.id() + 1, newRowType.getFields(), highestFieldId.get(), currentTableSchema.partitionKeys(), currentTableSchema.primaryKeys(), currentTableSchema.options(), currentTableSchema.comment()); }// ... existing code ...
merge
这是所有合并逻辑的核心。它被递归调用以处理各种数据类型。
可空性处理 (Nullability Handling)
在方法的一开始,它将 base0
和 update0
的可空性都设置为 true
来进行比较。最终返回的类型的可空性将以 base0
(原始表 schema 中的类型)为准。这意味着 schema 合并不会改变现有列的可空性。
// ... existing code ... public static DataType merge( DataType base0, DataType update0, AtomicInteger highestFieldId, boolean allowExplicitCast) { // Here we try to merge the base0 and update0 without regard to the nullability, // and set the base0\'s nullability to the return\'s. DataType base = base0.copy(true); DataType update = update0.copy(true); if (base.equals(update)) { return base0; } else if (base instanceof RowType && update instanceof RowType) {// ... existing code ...
递归合并复杂类型
RowType
(行类型): 这是最复杂的部分。- 合并现有字段: 遍历
base
(旧 schema) 的所有字段。对于每个字段,检查update
(新 schema) 中是否存在同名字段。如果存在,就递归调用merge
方法来合并这两个字段的类型。如果不存在,则保留base
中的原始字段。 - 添加新字段: 遍历
update
的所有字段,找出在base
中不存在的字段。这些就是需要新增的列。对于每个新字段,调用assignIdForNewField
为其分配一个新的、唯一的字段 ID,然后将其添加到最终的字段列表中。 - 最后,用更新后的字段列表创建一个新的
RowType
。
- 合并现有字段: 遍历
// ... existing code ... } else if (base instanceof RowType && update instanceof RowType) { List baseFields = ((RowType) base).getFields(); List updateFields = ((RowType) update).getFields(); Map updateFieldMap = updateFields.stream() .collect(Collectors.toMap(DataField::name, Function.identity())); List updatedFields = baseFields.stream() .map( baseField -> { if (updateFieldMap.containsKey(baseField.name())) {DataField updateField = updateFieldMap.get(baseField.name());DataType updatedDataType = merge( baseField.type(), updateField.type(), highestFieldId, allowExplicitCast);return new DataField( baseField.id(), baseField.name(), updatedDataType, baseField.description()); } else {return baseField; } }) .collect(Collectors.toList()); Map baseFieldMap = baseFields.stream() .collect(Collectors.toMap(DataField::name, Function.identity())); List newFields = updateFields.stream() .filter(field -> !baseFieldMap.containsKey(field.name())) .map(field -> assignIdForNewField(field, highestFieldId)) .map(field -> field.copy(true)) .collect(Collectors.toList()); updatedFields.addAll(newFields); return new RowType(base0.isNullable(), updatedFields); } else if (base instanceof MapType && update instanceof MapType) {// ... existing code ...
MapType
,ArrayType
,MultisetType
: 对于这些集合类型,合并逻辑很简单:递归地调用merge
方法来合并它们的内部元素类型(MapType
的键和值类型,ArrayType
和MultisetType
的元素类型)。
合并基础类型
-
DecimalType
: 这是一个特例。只有当两个DecimalType
的scale
(小数位数) 相同时,才能合并。合并后的precision
(总位数) 取两者中的最大值。如果scale
不同,会直接抛出UnsupportedOperationException
。 -
其他可转换类型: 对于其他基础类型,通过
supportsDataTypesCast
方法判断是否可以转换。- 隐式转换 (Implicit Cast): 当
allowExplicitCast
为false
时,只允许安全的类型提升,例如INT
->BIGINT
,FLOAT
->DOUBLE
。 - 显式转换 (Explicit Cast): 当
allowExplicitCast
为true
时,允许更多可能损失精度的转换。 - 对于带有长度(如
VARCHAR
)或精度(如TIMESTAMP
)的类型,通常要求新类型的长度/精度不能小于旧类型,除非开启了显式转换。 - 如果可以转换,则直接采用
update
的类型,但保留base0
的可空性。
- 隐式转换 (Implicit Cast): 当
// ... existing code ... } else if (supportsDataTypesCast(base, update, allowExplicitCast)) { if (DataTypes.getLength(base).isPresent() && DataTypes.getLength(update).isPresent()) { // this will check and merge types which has a `length` attribute, like BinaryType, // CharType, VarBinaryType, VarCharType. if (allowExplicitCast || DataTypes.getLength(base).getAsInt() <= DataTypes.getLength(update).getAsInt()) { return update.copy(base0.isNullable()); } else { throw new UnsupportedOperationException( String.format( \"Failed to merge the target type that has a smaller length: %s and %s\", base, update)); } } else if (DataTypes.getPrecision(base).isPresent() && DataTypes.getPrecision(update).isPresent()) { // this will check and merge types which has a `precision` attribute, like // LocalZonedTimestampType, TimeType, TimestampType. if (allowExplicitCast || DataTypes.getPrecision(base).getAsInt() <= DataTypes.getPrecision(update).getAsInt()) { return update.copy(base0.isNullable()); } else { throw new UnsupportedOperationException( String.format( \"Failed to merge the target type that has a lower precision: %s and %s\", base, update)); } } else { return update.copy(base0.isNullable()); } } else { throw new UnsupportedOperationException( String.format(\"Failed to merge data types %s and %s\", base, update)); } }// ... existing code ...
assignIdForNewField
这个方法非常重要。当向 RowType
中添加一个新字段时,它负责为这个新字段及其所有嵌套字段(如果是复杂类型)分配唯一的 ID。它通过传入的 AtomicInteger highestFieldId
来实现 ID 的原子性递增,确保了在并发场景下 ID 的唯一性,这对于 Paimon 正确地按 ID 映射和读取列数据至关重要。
// ... existing code ... private static DataField assignIdForNewField(DataField field, AtomicInteger highestFieldId) { DataType dataType = ReassignFieldId.reassign(field.type(), highestFieldId); return new DataField( highestFieldId.incrementAndGet(), field.name(), dataType, field.description()); }}
总结
SchemaMergingUtils
通过一套定义明确且可递归的规则,实现了 Paimon 强大而灵活的 Schema 演进能力。它能够智能地处理字段的增加和类型变化,同时通过严格的 ID 分配和管理,保证了数据读写的正确性。这个类是 Paimon 能够适应动态数据源、支持平滑表结构变更的关键所在。
SchemaManager
SchemaManager
是 Paimon 中负责管理表 schema(模式)的核心组件。它处理所有与 schema 相关的持久化操作,包括创建、读取、更新和版本管理。可以把它看作是 Paimon 表 schema 在文件系统中的“数据库管理员”。
SchemaManager
的主要职责可以归纳为以下几点:
- Schema 持久化:将
TableSchema
对象序列化为 JSON 文件,并存储在表的schema
目录下。每个 schema 文件代表一个版本。 - 版本管理:每个 schema 文件名都以
schema-
开头,后跟一个从 0 开始递增的版本号(ID),例如schema-0
,schema-1
等。这使得 Paimon 可以追踪 schema 的所有历史变更。 - Schema 读取:提供方法来读取最新版本的 schema、特定版本的 schema 或所有版本的 schema。
- Schema 创建:在创建新表时,负责初始化并提交第一个 schema 版本(
schema-0
)。 - Schema 变更:通过应用一系列
SchemaChange
(如添加列、删除列、修改表选项等)来原子性地更新 schema,并生成一个新的、版本号加一的 schema 文件。 - 多分支支持:能够为不同的数据分支(branch)管理各自独立的 schema 演进路径。
结构和关键属性
// ... existing code ...@ThreadSafepublic class SchemaManager implements Serializable { private static final String SCHEMA_PREFIX = \"schema-\"; private final FileIO fileIO; private final Path tableRoot; private final String branch; public SchemaManager(FileIO fileIO, Path tableRoot) {// ... existing code ...
@ThreadSafe
: 这个注解表明该类的设计是线程安全的,允许多个线程同时访问一个SchemaManager
实例。SCHEMA_PREFIX
: 常量\"schema-\"
,定义了 schema 文件名的前缀。fileIO
:FileIO
接口的实例,用于与底层文件系统(如 HDFS, S3, 本地文件系统)进行交互。tableRoot
:Path
对象,指向表的根目录。SchemaManager
会在这个目录下的schema
子目录中工作。branch
: 字符串,表示当前SchemaManager
实例操作的数据分支名称。Paimon 支持类似 Git 的分支功能,main
是默认的主分支。不同的分支可以有独立的快照和 schema 演进。
构造函数和分支管理
// ... existing code ... public SchemaManager(FileIO fileIO, Path tableRoot) { this(fileIO, tableRoot, DEFAULT_MAIN_BRANCH); } /** Specify the default branch for data writing. */ public SchemaManager(FileIO fileIO, Path tableRoot, String branch) { this.fileIO = fileIO; this.tableRoot = tableRoot; this.branch = BranchManager.normalizeBranch(branch); } public SchemaManager copyWithBranch(String branchName) { return new SchemaManager(fileIO, tableRoot, branchName); }// ... existing code ...
- 构造函数初始化了
fileIO
、tableRoot
和branch
。默认使用主分支DEFAULT_MAIN_BRANCH
。 copyWithBranch(String branchName)
: 这是一个工厂方法,用于创建一个新的SchemaManager
实例来操作指定的分支。这体现了 Paimon 对多分支的支持。
Schema 读取方法
// ... existing code ... public Optional latest() { try { return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX) .reduce(Math::max) .map(this::schema); } catch (IOException e) { throw new UncheckedIOException(e); } }// ... existing code ... public List listAll() { return listAllIds().stream().map(this::schema).collect(Collectors.toList()); } public List listAllIds() { try { return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX) .collect(Collectors.toList()); } catch (IOException e) { throw new UncheckedIOException(e); } }// ... existing code ...
latest()
: 获取最新版本的TableSchema
。它通过listVersionedFiles
工具方法列出schema
目录下所有符合schema-*
格式的文件,提取出版本号,找到最大的版本号,然后调用schema(long id)
方法读取并反序列化对应的 schema 文件。listAll()
: 获取所有版本的TableSchema
列表。listAllIds()
: 仅获取所有 schema 版本的 ID 列表。schema(long id)
(未在片段中完全展示,但被latest()
调用): 这是一个内部方法,根据给定的 ID 构建 schema 文件路径(如.../schema/schema-5
),然后使用fileIO
读取文件内容,并通过TableSchema.fromJSON(String json)
将其反序列化为TableSchema
对象。
表创建 createTable(...)
// ... existing code ... public TableSchema createTable(Schema schema, boolean externalTable) throws Exception { while (true) { Optional latest = latest(); if (latest.isPresent()) { TableSchema latestSchema = latest.get(); if (externalTable) { checkSchemaForExternalTable(latestSchema.toSchema(), schema); return latestSchema; } else { throw new IllegalStateException( \"Schema in filesystem exists, creation is not allowed.\"); } } TableSchema newSchema = TableSchema.create(0, schema); // validate table from creating table FileStoreTableFactory.create(fileIO, tableRoot, newSchema).store(); boolean success = commit(newSchema); if (success) { return newSchema; } } }// ... existing code ...
- 这是一个原子性操作,通过
while(true)
循环和文件系统的原子性创建来保证。 - 检查存在性: 首先调用
latest()
检查是否已有 schema 文件存在。如果存在且不是创建外部表,则抛出异常,防止覆盖现有表。 - 创建新 Schema: 如果不存在,则使用
TableSchema.create(0, schema)
创建一个 ID 为 0 的新TableSchema
。 - 验证: 调用
FileStoreTableFactory.create(...)
来验证 schema 的有效性(例如,检查主键、分区键等配置是否合法)。 - 提交: 调用
commit(newSchema)
方法,该方法会尝试原子性地创建schema-0
文件。如果创建成功,循环结束并返回新的TableSchema
。如果因为并发冲突导致创建失败,循环会继续,重新尝试整个过程。
Schema 变更 commitChanges(...)
这是执行 ALTER TABLE
操作的核心逻辑。
// ... existing code ... public TableSchema commitChanges(List changes) throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException { SnapshotManager snapshotManager = new SnapshotManager(fileIO, tableRoot, branch, null, null); LazyField hasSnapshots = new LazyField(() -> snapshotManager.latestSnapshot() != null); while (true) { TableSchema oldTableSchema = latest().orElseThrow( () ->new Catalog.TableNotExistException( identifierFromPath( tableRoot.toString(), true, branch))); TableSchema newTableSchema = generateTableSchema(oldTableSchema, changes, hasSnapshots); try { boolean success = commit(newTableSchema); if (success) { return newTableSchema; } } catch (Exception e) { throw new RuntimeException(e); } } } public boolean commit(TableSchema newSchema) throws Exception { SchemaValidation.validateTableSchema(newSchema); SchemaValidation.validateFallbackBranch(this, newSchema); Path schemaPath = toSchemaPath(newSchema.id()); return fileIO.tryToWriteAtomic(schemaPath, newSchema.toString()); }// ... existing code ...
- 同样使用
while(true)
循环来保证原子性。 - 获取旧 Schema: 首先获取当前的最新 schema (
oldTableSchema
)。 - 生成新 Schema: 调用
generateTableSchema
方法,该方法是变更逻辑的核心。它接收旧 schema 和一个SchemaChange
列表,然后逐个应用这些变更(如AddColumn
,DropColumn
,SetOption
等),生成一个新的TableSchema
对象。这个新对象的 ID 是旧 ID 加 1。 - 提交新 Schema: 调用
commit(newTableSchema)
尝试原子性地创建新的 schema 文件(如schema-5
->schema-6
)。如果成功,则返回新 schema。如果失败,则重试。
generateTableSchema(...)
这个方法是应用 SchemaChange
的具体实现。它像一个状态机,基于 oldTableSchema
,根据 changes
列表中的每个变更项,逐步构建出 newTableSchema
的各个部分。
// ... existing code ... public TableSchema generateTableSchema( TableSchema oldTableSchema, List changes, LazyField hasSnapshots) throws Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException { Map oldOptions = new HashMap(oldTableSchema.options()); Map newOptions = new HashMap(oldTableSchema.options()); List newFields = new ArrayList(oldTableSchema.fields()); AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId()); String newComment = oldTableSchema.comment(); for (SchemaChange change : changes) { if (change instanceof SetOption) {// ... existing code ... } else if (change instanceof RemoveOption) {// ... existing code ... } else if (change instanceof AddColumn) {// ... existing code ... } else if (change instanceof RenameColumn) {// ... existing code ... } else if (change instanceof DropColumn) {// ... existing code ... } else if (change instanceof UpdateColumnType) {// ... existing code ... } else if (change instanceof UpdateColumnNullability) {// ... existing code ... } else if (change instanceof UpdateColumnPosition) {// ... existing code ... } else if (change instanceof UpdateColumnComment) {// ... existing code ... } }// ... existing code ...
它通过 instanceof
判断 SchemaChange
的具体类型,并执行相应的逻辑:
SetOption
/RemoveOption
: 修改newOptions
这个 Map。AddColumn
: 向newFields
列表中添加新字段,并使用highestFieldId
分配新 ID。RenameColumn
: 修改newFields
中某个字段的名称。DropColumn
: 从newFields
中移除字段。UpdateColumnType
/UpdateColumnNullability
: 更新字段的类型或可空性。- ...等等。
总结
SchemaManager
是 Paimon 表结构管理的基石。它通过将 schema 版本化并持久化到文件系统中,实现了 schema 的可靠追踪和演进。其原子性的提交操作(无论是创建还是变更)确保了在并发环境下的元数据一致性。它与 SchemaMergingUtils
(负责逻辑合并)和 SchemaChange
(负责定义变更操作)等类紧密协作,共同构成了 Paimon 强大而灵活的 Schema Evolution 机制。