Greenplum [Code Share 01]: Batch loading with replace insert / insert on conflict, similar to MySQL's on duplicate key update (merge insert: insert if absent, update if present)

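The Alibaba Cloud Developer Community has an article, "PostgreSQL upsert: usage of insert on conflict do", that explains the PostgreSQL syntax, so it is not repeated here.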

1. Problem Description

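When the project's database was switched from MySQL to Greenplum, most of the problems came from differences in SQL syntax. MySQL supports update-on-conflict through on duplicate key update, but Greenplum has no equivalent.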

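PostgreSQL 9.5 introduced the UPSERT feature. Its insert on conflict do syntax is very powerful. It supports merged writes: when a row would violate a unique constraint, the conflicting row is updated, and otherwise the row is inserted. It can also be used for stream computing.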

The Greenplum version in use is 6.13.0. It is based on PostgreSQL 9.4.24, so the UPSERT syntax is not available:

```
# select version();
PostgreSQL 9.4.24 (Greenplum Database 6.13.0 build commit:4f1adf8e247a9685c19ea02bcaddfdc200937ecd Open Source) on x86_64-unknown-linux-gnu, compiled by gcc (GCC) 6.4.0, 64-bit compiled on Dec 18 2020 22:31:16
```

Unfortunately, the latest release, 6.20.3 (published 2022-04-08), is still based on PostgreSQL 9.4 😢

```
PostgreSQL 9.4.26 (Greenplum Database 6.20.3 build commit:24b949d2585cdbe8a157062fb756013e7c2874ab Open Source) on x86_64-unknown-linux-gnu, compiled by gcc (GCC) 6.4.0, 64-bit compiled on Apr  6 2022 19:59:57
```

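How, then, can a merge insert (insert if absent, update if present) be done in Greenplum? The Alibaba Cloud Developer Community article "Greenplum merge insert: usage and performance (insert on conflict)" covers the idea and is worth reading. The detailed implementation follows.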

2. Implementation

2.1 Approach

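  1. Write the incoming data into a temporary table that has no primary-key constraint.
  2. Merge the rows in that temporary table so that no primary key occurs more than once, and store the result in temporary table 1. The merge rule depends on the actual requirements.
  3. Join temporary table 1 with the main table, and store the rows that conflict with existing main-table rows in temporary table 2.
  4. Delete the conflicting rows from the main table, using the keys in temporary table 2.
  5. Insert the rows of temporary table 2 and the new (non-conflicting) rows of temporary table 1 into the main table.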
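The five steps can be traced without a database. Below is a minimal in-memory sketch in Java that uses maps in place of tables. It assumes a single-column primary key. Like the mapper's distinctTmpTableData when a timestamp column exists, it keeps the latest version of each key. The class MergeInsertSketch, the Row record, and its updatedAt field are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of the five-step merge insert; maps stand in for tables.
final class MergeInsertSketch {

    // Hypothetical row type: key = primary key, updatedAt = timestamp used to pick the latest row.
    record Row(String key, String value, long updatedAt) {}

    static Map<String, Row> mergeInsert(Map<String, Row> mainTable, List<Row> batch) {
        // Step 1: "_tmp" is just the raw batch; duplicate keys are allowed.
        List<Row> tmp = new ArrayList<>(batch);

        // Step 2: collapse duplicate keys into tmp1, keeping the row with the largest updatedAt.
        Map<String, Row> tmp1 = new LinkedHashMap<>();
        for (Row r : tmp) {
            tmp1.merge(r.key(), r, (existing, incoming) ->
                    incoming.updatedAt() > existing.updatedAt() ? incoming : existing);
        }

        // Step 3: tmp2 = rows of tmp1 whose key already exists in the main table.
        Map<String, Row> tmp2 = new LinkedHashMap<>();
        for (Row r : tmp1.values()) {
            if (mainTable.containsKey(r.key())) {
                tmp2.put(r.key(), r);
            }
        }

        // Step 4: delete the conflicting keys from (a copy of) the main table.
        Map<String, Row> result = new LinkedHashMap<>(mainTable);
        result.keySet().removeAll(tmp2.keySet());

        // Step 5: insert the new rows of tmp1 (keys not in tmp2), then all of tmp2.
        for (Row r : tmp1.values()) {
            if (!tmp2.containsKey(r.key())) {
                result.put(r.key(), r);
            }
        }
        result.putAll(tmp2);
        return result;
    }
}
```

_tmp2 holds only rows copied from _tmp1, so step 5 writes every row of _tmp1 into the main table. The same is true of this sketch: "new rows of tmp1 plus all of tmp2" is exactly tmp1.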

2.2 Code

  1. Calling code
```java
// To keep the change non-invasive, dealDuplicateKeyBeforeInsert and dealDuplicateKeyAfterInsert
// are called before and after the original batch insert
baseUtils.dealDuplicateKeyBeforeInsert(params);
baseService.insert("basicQuery.insertBatch", params);
baseUtils.dealDuplicateKeyAfterInsert(params);
```
  2. Source code
```java
/**
 * Handle duplicate primary-key values (runs before the insert)
 * 1. Create the temporary table _tmp without a primary key and redirect the insert to it
 *
 * @param params operation parameters
 */
public void dealDuplicateKeyBeforeInsert(Map<String, Object> params) {
    String tableName = MapUtils.getString(params, "table_name");
    baseService.update("basicQuery.createTmpTable", params);
    params.put("table_name", tableName + "_tmp");
}

/**
 * Handle duplicate primary-key values (runs after the insert)
 * 1. Merge the rows of _tmp so that keys are unique and store them in _tmp1 (distinctTmpTableData)
 * 2. Join _tmp1 with the main table and store the rows with duplicate keys in _tmp2 (queryDuplicateKeyData)
 * 3. Delete the rows with duplicate keys from the main table (deleteDuplicateKeyData)
 * 4. Insert the non-duplicate rows of _tmp1 and all rows of _tmp2 into the main table (insertDistinctData)
 * 5. Drop all temporary tables (dropTmpTable)
 *
 * @param params operation parameters
 */
public void dealDuplicateKeyAfterInsert(Map<String, Object> params) {
    // Table metadata. Strip only the trailing "_tmp": String.replace("_tmp", "")
    // would also remove "_tmp" from the middle of a table name.
    String tmpTableName = MapUtils.getString(params, "table_name");
    String tableName = tmpTableName.endsWith("_tmp")
            ? tmpTableName.substring(0, tmpTableName.length() - "_tmp".length())
            : tmpTableName;
    params.put("table_name", tableName);
    params.put("tableName", tableName);
    List<String> timeFieldNames = baseService.select("basicQuery.queryDateTimeField", tableName);
    if (timeFieldNames.size() > 0) {
        String timeFieldName = timeFieldNames.get(0);
        params.put("timeFieldName", timeFieldName);
    }
    // Primary key of the table
    Map primaryKey = (Map) baseService.selectOne("basicQuery.queryPrimaryKeyByTableName", params);
    String primaryKeyColumnName = MapUtils.getString(primaryKey, "columnName", "");
    params.put("primaryKeyColumnName", primaryKeyColumnName);
    // Columns of the table (the primary key is handled separately)
    List<Map> columnList = baseService.select("basicQuery.queryFieldsByTableName", params);
    List<String> columnNameList = columnList.stream()
            .map(mapObject -> MapUtils.getObject(mapObject, "column_name").toString())
            .collect(Collectors.toList());
    columnNameList.remove(primaryKeyColumnName);
    params.put("columnNameList", columnNameList);
    try {
        // 1. Merge the rows of _tmp so that keys are unique and store them in _tmp1
        baseService.update("basicQuery.distinctTmpTableData", params);
        // 2. Join _tmp1 with the main table and store the rows with duplicate keys in _tmp2
        baseService.update("basicQuery.queryDuplicateKeyData", params);
        // 3. Delete the rows with duplicate keys from the main table
        baseService.delete("basicQuery.deleteDuplicateKeyData", params);
        // 4. Insert into the main table; build the column lists first
        String insertFields = String.join(",", columnNameList);
        String insertTmpFields = "t_tmp." + String.join(",t_tmp.", columnNameList);
        params.put("insertFieldStr", insertFields + "," + primaryKeyColumnName);
        params.put("insertTmpFieldStr", insertTmpFields + ", t_tmp." + primaryKeyColumnName);
        baseService.insert("basicQuery.insertDistinctData", params);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // 5. Drop the temporary tables
        baseService.delete("basicQuery.dropTmpTable", params);
    }
}
```
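The string handling in dealDuplicateKeyAfterInsert can be moved into small pure functions, which can then be tested without a database. This is a minimal sketch, and the names (UpsertSqlParts and its methods) are hypothetical.

- `stripTmpSuffix` removes only a trailing `_tmp`. `String.replace("_tmp", "")` removes every occurrence, so it would turn `order_tmp_log_tmp` into `order_log`.
- The two field-list builders also work for a table whose only column is the primary key. The plain concatenation would produce a leading comma in that case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper holding the pure string logic of dealDuplicateKeyAfterInsert.
final class UpsertSqlParts {
    private static final String TMP_SUFFIX = "_tmp";

    // Remove only a trailing "_tmp"; names without the suffix are returned unchanged.
    static String stripTmpSuffix(String tmpTableName) {
        return tmpTableName.endsWith(TMP_SUFFIX)
                ? tmpTableName.substring(0, tmpTableName.length() - TMP_SUFFIX.length())
                : tmpTableName;
    }

    // Non-key columns followed by the key column.
    private static List<String> withKey(List<String> columns, String pk) {
        List<String> all = new ArrayList<>(columns);
        all.add(pk);
        return all;
    }

    // Value for insertFieldStr, e.g. "a,b,id".
    static String insertFieldStr(List<String> columns, String pk) {
        return String.join(",", withKey(columns, pk));
    }

    // Value for insertTmpFieldStr, e.g. "t_tmp.a,t_tmp.b,t_tmp.id".
    static String insertTmpFieldStr(List<String> columns, String pk) {
        return withKey(columns, pk).stream()
                .map(c -> "t_tmp." + c)
                .collect(Collectors.joining(","));
    }
}
```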
  3. Mapper
```xml
<!-- Timestamp columns of the table, used to keep the latest row when merging -->
<select id="queryDateTimeField" parameterType="java.lang.String" resultType="java.lang.String">
    select lower(column_name) as fieldName
    from information_schema.columns
    where table_name = #{tableName}
      and table_schema = 'public'
      and udt_name = 'timestamp'
</select>

<!-- All columns of the table -->
<select id="queryFieldsByTableName" parameterType="java.util.Map" resultType="java.util.Map">
    select column_name, udt_name
    from information_schema.columns
    where table_name = #{tableName}
      and table_schema = 'public'
</select>

<!-- Primary-key columns of the table (one row per key column) -->
<select id="queryPrimaryKeyByTableName" parameterType="java.util.Map" resultType="java.util.Map">
    SELECT A.attname AS "columnName",
           ( i.keys ).n AS "keySeq",
           ci.relname AS "pkName"
    FROM pg_catalog.pg_class ct
    JOIN pg_catalog.pg_attribute A ON ( ct.oid = A.attrelid )
    JOIN pg_catalog.pg_namespace n ON ( ct.relnamespace = n.oid )
    JOIN ( SELECT i.indexrelid,
                  i.indrelid,
                  i.indisprimary,
                  information_schema._pg_expandarray ( i.indkey ) AS keys
           FROM pg_catalog.pg_index i ) i
      ON ( A.attnum = ( i.keys ).x AND A.attrelid = i.indrelid )
    JOIN pg_catalog.pg_class ci ON ( ci.oid = i.indexrelid )
    WHERE TRUE
      AND n.nspname = 'public'
      AND ct.relname = #{tableName}
      AND i.indisprimary
</select>
```
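queryPrimaryKeyByTableName returns one row per primary-key column (columnName, keySeq). dealDuplicateKeyAfterInsert reads the result with selectOne, which expects a single row, so the code as written supports only single-column keys. The sketch below shows how a composite key could be turned into the SQL fragments the statements would need. It assumes MyBatis returns each row as a `Map<String, Object>` with those two keys. PrimaryKeyColumns and its methods are hypothetical, and the mapper SQL would have to be adapted to use them.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for composite primary keys. Input: the rows returned by
// queryPrimaryKeyByTableName, one per key column, each with "columnName" and "keySeq".
final class PrimaryKeyColumns {

    // Key columns in index order (keySeq starts at 1).
    static List<String> ordered(List<Map<String, Object>> pkRows) {
        List<Map<String, Object>> rows = new ArrayList<>(pkRows);
        rows.sort(Comparator.comparingInt(
                (Map<String, Object> r) -> ((Number) r.get("keySeq")).intValue()));
        return rows.stream()
                .map(r -> r.get("columnName").toString())
                .collect(Collectors.toList());
    }

    // Fragment for "group by ..." and "using ( ... )", e.g. "a, b".
    static String columnList(List<String> keyCols) {
        return String.join(", ", keyCols);
    }

    // Fragment for the delete's where clause, e.g. "t.a = t_tmp.a and t.b = t_tmp.b".
    static String deletePredicate(List<String> keyCols) {
        return keyCols.stream()
                .map(c -> "t." + c + " = t_tmp." + c)
                .collect(Collectors.joining(" and "));
    }
}
```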

SQL for the merge logic:

```xml
<!-- Staging table with the same columns as the main table; LIKE does not copy the primary key -->
<update id="createTmpTable" parameterType="java.util.Map">
    create table ${schemaName}${table_name}_tmp ( like ${schemaName}${table_name} )
</update>

<!-- Step 1: one row per key; with a timestamp column, take each value from the latest row (NULL timestamps last) -->
<update id="distinctTmpTableData" parameterType="java.util.Map">
    create table ${schemaName}${table_name}_tmp1 as
    select ${primaryKeyColumnName},
    <foreach collection="columnNameList" index="index" item="item" open="" separator=" , " close="">
        <choose>
            <when test="timeFieldName != null  and timeFieldName != ''">
                ( ARRAY_AGG ( ${item} order by ${timeFieldName} desc nulls last ) ) [ 1 ] AS ${item}
            </when>
            <otherwise>
                ( ARRAY_AGG ( ${item} ) ) [ 1 ] AS ${item}
            </otherwise>
        </choose>
    </foreach>
    from ${schemaName}${table_name}_tmp
    group by ${primaryKeyColumnName}
</update>

<!-- Step 2: rows of _tmp1 whose key already exists in the main table -->
<update id="queryDuplicateKeyData" parameterType="java.util.Map">
    create table ${schemaName}${table_name}_tmp2 as
    select ${primaryKeyColumnName},
    <foreach collection="columnNameList" index="index" item="item" open="" separator=" , " close="">
        t_tmp.${item}
    </foreach>
    from ${schemaName}${table_name}_tmp1 as t_tmp
    inner join ${schemaName}${table_name} using ( ${primaryKeyColumnName} )
</update>

<!-- Step 3: delete the conflicting rows from the main table -->
<delete id="deleteDuplicateKeyData" parameterType="java.util.Map">
    delete from ${schemaName}${table_name} t
    using ${schemaName}${table_name}_tmp2 t_tmp
    where t.${primaryKeyColumnName} = t_tmp.${primaryKeyColumnName}
</delete>

<!-- Step 4: new rows from _tmp1 plus all rows of _tmp2 -->
<update id="insertDistinctData" parameterType="java.util.Map">
    insert into ${schemaName}${table_name} (${insertFieldStr})
    select ${insertTmpFieldStr}
    from ${schemaName}${table_name}_tmp1 t_tmp
    left join ${schemaName}${table_name}_tmp2 using ( ${primaryKeyColumnName} )
    where ${schemaName}${table_name}_tmp2.* is null
    union all
    select ${insertFieldStr} from ${schemaName}${table_name}_tmp2
</update>

<!-- Step 5: drop the temporary tables; IF EXISTS keeps cleanup working when an earlier step failed -->
<delete id="dropTmpTable" parameterType="java.util.Map">
    drop table if exists ${schemaName}${table_name}_tmp, ${schemaName}${table_name}_tmp1, ${schemaName}${table_name}_tmp2
</delete>
```
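MyBatis `${...}` substitutes raw text into the statement, so table and column names reach Greenplum without any escaping. One alternative is to build the same statement sequence in plain Java and validate every identifier first. The sequence covers the merge, the lookup of duplicates, the delete, the insert, and the cleanup.

This is a minimal sketch under these assumptions:

- Identifiers are lowercase and unquoted.
- Tables live in the default schema.
- The batch has already been loaded into `<table>_tmp`.

The class MergeInsertSql is hypothetical. The generated strings could be executed one by one with `java.sql.Statement.execute`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical builder for the merge-insert statements, with identifier validation.
final class MergeInsertSql {
    // Lowercase, unquoted identifiers only; anything else is rejected.
    private static final Pattern IDENT = Pattern.compile("[a-z_][a-z0-9_]*");

    static String ident(String name) {
        if (name == null || !IDENT.matcher(name).matches()) {
            throw new IllegalArgumentException("unsafe identifier: " + name);
        }
        return name;
    }

    // timeField may be null when the table has no timestamp column.
    static List<String> statements(String table, String pk, List<String> columns, String timeField) {
        String t = ident(table);
        String k = ident(pk);
        // DESC sorts NULLs first by default, so NULLS LAST keeps rows without a timestamp from winning.
        String order = timeField == null ? "" : " order by " + ident(timeField) + " desc nulls last";

        List<String> aggSelect = new ArrayList<>(List.of(k)); // select list for _tmp1
        List<String> tmpSelect = new ArrayList<>(List.of(k)); // select list for _tmp2
        List<String> insertCols = new ArrayList<>();          // target columns, key last
        List<String> tmpInsertCols = new ArrayList<>();       // same columns via alias t_tmp
        for (String col : columns) {
            String c = ident(col);
            aggSelect.add("(array_agg(" + c + order + "))[1] as " + c);
            tmpSelect.add("t_tmp." + c);
            insertCols.add(c);
            tmpInsertCols.add("t_tmp." + c);
        }
        insertCols.add(k);
        tmpInsertCols.add("t_tmp." + k);
        String insertList = String.join(", ", insertCols);

        List<String> sql = new ArrayList<>();
        // Merge: one row per key
        sql.add("create table " + t + "_tmp1 as select " + String.join(", ", aggSelect)
                + " from " + t + "_tmp group by " + k);
        // Rows whose key already exists in the main table
        sql.add("create table " + t + "_tmp2 as select " + String.join(", ", tmpSelect)
                + " from " + t + "_tmp1 as t_tmp inner join " + t + " using (" + k + ")");
        // Delete the conflicting rows
        sql.add("delete from " + t + " t using " + t + "_tmp2 t_tmp where t." + k + " = t_tmp." + k);
        // New rows from _tmp1 plus all rows of _tmp2
        sql.add("insert into " + t + " (" + insertList + ") select " + String.join(", ", tmpInsertCols)
                + " from " + t + "_tmp1 t_tmp left join " + t + "_tmp2 using (" + k + ")"
                + " where " + t + "_tmp2.* is null"
                + " union all select " + insertList + " from " + t + "_tmp2");
        // Cleanup
        sql.add("drop table if exists " + t + "_tmp, " + t + "_tmp1, " + t + "_tmp2");
        return sql;
    }
}
```

The regular expression is deliberately strict. Names that need quoting, or names containing uppercase letters, are rejected rather than escaped.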

3. Summary

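The current code implements merge insert (insert if absent, update if present), but it is not very robust. The temporary tables are ordinary tables whose names are derived from the target table, so two batches written to the same table at the same time would collide. The metadata queries also look only at the public schema.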