> 技术文档 > Flink-Hadoop实战项目_flink和hadoop结合

Flink-Hadoop实战项目_flink和hadoop结合


项目说明文档

1. 项目概述

1.1 项目简介

本项目是一个基于Apache Flink的大数据流处理平台,专门用于处理铁路系统的票务和车次信息数据。系统包含两个核心流处理作业文件处理作业和数据合并作业,采用定时调度机制,支持Kerberos安全认证,实现从文件读取到数据仓库存储的完整数据处理链路。

1.2 技术栈

  • 流处理引擎: Apache Flink 1.18.1

  • 存储系统: HDFS (Hadoop分布式文件系统)

  • 数据仓库: Apache Hive

  • 文件格式: Parquet (列式存储格式)

  • 安全认证: Kerberos

  • 调度框架: Quartz Scheduler

  • 开发语言: Java 8

  • 日志框架: Log4j2

  • 序列化: Apache Avro

1.3 系统架构图

┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐│   文件监控层     │   │   流处理层       │   │   存储层         ││                 │   │                 │   │                 ││ ┌───────────┐ │   │ ┌─────────────┐ │   │ ┌─────────────┐ ││ │文件扫描器   │ │───▶│ │文件处理作业 │ │───▶│ │ Hive表     │ ││ └───────────┘ │   │ └─────────────┘ │   │ │info_appendix│ ││                 │   │                 │   │ │sale_record │ ││ ┌───────────┐ │   │ ┌─────────────┐ │   │ └─────────────┘ ││ │定时调度器   │ │───▶│ │数据合并作业 │ │───▶│ ┌─────────────┐ ││ └───────────┘ │   │ └─────────────┘ │   │ │ Hive表     │ │└─────────────────┘   └─────────────────┘   │ │train_info   │ │                                             │ └─────────────┘ │                                             └─────────────────┘

2. 需求分析

2.1 业务背景

铁路系统每天产生大量的票务交易数据和乘客信息数据,这些数据以文本文件的形式存储在HDFS系统中。为了便于数据分析和报表统计,需要将这些原始文件数据转换为结构化的数据仓库表,并且将分散的信息进行关联整合。

2.2 核心需求

2.2.1 需求一:文件处理作业

具体描述

  • 开发一个Flink流处理程序,实现定时扫描HDFS目录下的文件

  • 目标目录:/testInput

  • 需要处理的文件类型:

    • GASMZ_info_appendix*.txt:包含乘客身份信息的附加数据

    • GASMZ_sale*.txt:包含票务销售记录数据

  • 处理流程:扫描→读取→解析→写入Hive表→删除源文件

  • 目标表:

    • info_appendix表:存储乘客身份信息

    • sale_record表:存储销售记录信息

业务价值

  • 将非结构化文本文件转换为结构化数据表

  • 实现数据的实时入库,提高数据时效性

  • 通过Parquet格式存储,提高查询性能

2.2.2 需求二:数据合并作业

具体描述

  • 开发一个Flink流处理程序,定时执行数据关联合并

  • 调度频率:每小时的第1分钟执行一次(Cron表达式:0 1 * * * ?

  • 数据源:info_appendix表和sale_record

  • 关联条件:通过ticket_no等主键字段进行内连接

  • 目标表:train_info表(包含完整的票务和乘客信息)

业务价值

  • 整合分散的数据,形成完整的业务视图

  • 为下游数据分析和报表提供高质量的数据源

  • 支持复杂的业务查询需求

2.3 技术需求

2.3.1 性能需求
  • 处理能力:支持千万级别记录的处理

  • 延迟要求:文件从产生到入库完成不超过5分钟

  • 吞吐量:单作业支持每秒处理10,000条记录

  • 并发度:支持多文件并行处理

2.3.2 可靠性需求
  • 容错机制:支持任务失败自动重试(最多3次)

  • 数据一致性:确保数据完整性和幂等性

  • 故障恢复:支持从Checkpoint恢复

  • 监控告警:提供完整的监控和告警机制

2.3.3 安全需求
  • 认证授权:支持Kerberos认证

  • 数据安全:支持HDFS数据传输加密

  • 访问控制:基于角色的访问控制

2.3.4 运维需求
  • 可观测性:详细的日志记录和指标监控

  • 可配置性:支持动态配置调整

  • 可扩展性:支持水平扩展

3. 系统设计

3.1 整体架构设计

3.1.1 分层架构
┌─────────────────────────────────────────────────────────────┐│                       应用层                               ││ ┌─────────────────┐             ┌─────────────────┐       ││ │ZipFileProcessor │             │TrainInfoMerge   │       ││ │Job             │             │Job             │       ││ └─────────────────┘             └─────────────────┘       │├─────────────────────────────────────────────────────────────┤│                       服务层                               ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐││ │SourceFunction│ │MapFunction │ │SinkFunction │ │Scheduler│││ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘│├─────────────────────────────────────────────────────────────┤│                       工具层                               ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐││ │HdfsUtils   │ │KerberosUtils│ │ParquetWriter│ │HiveTable│││ │             │ │             │ │Utils       │ │Utils   │││ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘│├─────────────────────────────────────────────────────────────┤│                       基础设施层                           ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐           ││ │Flink Runtime│ │HDFS         │ │Hive         │           ││ └─────────────┘ └─────────────┘ └─────────────┘           │└─────────────────────────────────────────────────────────────┘
3.1.2 数据流设计
 文件扫描 → 文件读取 → 数据解析 → 数据转换 → 批量写入 → 文件清理   ↓         ↓           ↓         ↓         ↓         ↓ 定时扫描   内容提取     字段映射   实体转换   Parquet   源文件删除/testInput FSDataInput CSV解析   POJO对象 存储格式

3.2 核心组件设计

3.2.1 ZipFileProcessorJob详细设计

主要职责

  • 定时扫描HDFS目录

  • 读取和解析文本文件

  • 数据转换和清洗

  • 写入Hive表

  • 源文件清理

核心组件

  • TextFileSourceFunction

// ---------------------------------------------------------------------// 自定义 SourceFunction:定时扫描 testInput 目录下的目标文本文件// ---------------------------------------------------------------------public class TextFileSourceFunction extends RichSourceFunction {    private volatile boolean isRunning = true;    private final long scanInterval = 300000; // 5分钟扫描一次        @Override    public void run(SourceContext ctx) {        while (isRunning) {            try {                // 1. 扫描目录获取文件列表                List textFiles = HdfsUtils.listFilesByExtension(inputPath, \".txt\");                                // 2. 过滤目标文件                for (String file : textFiles) {                    if (file.contains(\"GASMZ_info_appendix\") || file.contains(\"GASMZ_sale\")) {                        ctx.collect(file);                   }               }                                // 3. 等待下次扫描                Thread.sleep(scanInterval);           } catch (Exception ex) {                logger.error(\"文件扫描异常\", ex);                // 重试机制                Thread.sleep(10000);           }       }   }}
  • TextFileReadFunction

// 改进的文件读取函数,增加重试机制public class TextFileReadFunction implements MapFunction<String, Tuple2> {    @Override    public Tuple2 map(String filePath) throws Exception {        return KerberosUtils.doAs(() -> {            return readFileWithRetry(filePath);       });   }        private Tuple2 readFileWithRetry(String filePath) throws Exception {        // 实现带重试的文件读取逻辑        // 1. 检查文件存在性        // 2. 读取文件内容        // 3. 处理异常和重试        // 4. 删除源文件   }}
  • InfoAppendixParseFunction

// -------------------------------------------------------------------// InfoAppendix 解析(FlatMap)// -------------------------------------------------------------------public class InfoAppendixParseFunction implements FlatMapFunction<Tuple2, InfoAppendix> {    @Override    public void flatMap(Tuple2 value, Collector out) {        String fileName = value.f0;        String content = value.f1;                // 1. 按行分割内容        String[] rows = content.split(\"\\\\n\");                // 2. 解析每一行数据        for (String row : rows) {            if (row.trim().isEmpty() || row.startsWith(\"statist_date\")) {                continue; // 跳过空行和表头           }                        // 3. CSV解析            String[] fields = HiveTableUtils.parseCsvLine(row, \";\");                        // 4. 字段映射            InfoAppendix info = new InfoAppendix();            info.setStatistDate(fields[0]);            info.setStatisticsDate(fields[1]);            info.setInnerCode(fields[2]);            // ... 其他字段映射                        // 5. 生成主键和分区字段            info.setPk(HiveTableUtils.generatePrimaryKey(                info.getStatisticsDate(), info.getOfficeNo(),                info.getWindowNo(), info.getTicketNo()));            info.setSdate(HiveTableUtils.generatePartitionValue());                        out.collect(info);       }   }}
  • InfoAppendixParquetSink

// -------------------------------------------------------------------// InfoAppendix Parquet Sink - 修复版本// -------------------------------------------------------------------public class InfoAppendixParquetSink extends RichSinkFunction {    private List buffer = new ArrayList();    private final int batchSize = 10;        @Override    public void invoke(InfoAppendix value, Context context) throws Exception {        buffer.add(value);                if (buffer.size() >= batchSize) {            flushBuffer();       }   }        private void flushBuffer() throws Exception {        if (ConfigManager.isKerberosEnabled()) {            KerberosUtils.doAs(() -> {                ParquetWriterUtils.writeInfoAppendixToParquet(buffer, currentPartition);                return null;           });       } else {            ParquetWriterUtils.writeInfoAppendixToParquet(buffer, currentPartition);       }        buffer.clear();   }}
3.2.2 TrainInfoMergeJob详细设计

主要职责

  • 注册Hive Catalog

  • 执行SQL关联查询

  • 数据流转换

  • 写入合并结果

核心实现流程

1.环境初始化

public static void runMergeOnce() throws Exception {    // 1. 系统属性设置    setupCriticalSystemProperties();    setupSystemProperties();        // 2. Kerberos认证初始化    initializeKerberosIfEnabled();        // 3. Flink环境创建    StreamExecutionEnvironment env = createClusterExecutionEnvironment();    configureFlinkJob(env);        // 4. Table环境创建    StreamTableEnvironment tableEnv = createTableEnvironment(env);        // 5. Hive Catalog注册    registerHiveCatalogAndTables(tableEnv);        // 6. 数据处理执行    executeDataMerge(env, tableEnv);        // 7. 作业启动    env.execute(\"TrainInfoMergeJob-Fixed\");}

2.Hive Catalog注册

private static void registerHiveCatalogAndTables(StreamTableEnvironment tableEnv) { String catalogName = \"hive_catalog\"; String defaultDatabase = ConfigManager.getHiveDatabaseName(); String hiveConfDir = ConfigManager.getHiveConfDir(); // 1. 构建Catalog DDL StringBuilder catalogDdl = new StringBuilder(); catalogDdl.append(\"CREATE CATALOG \").append(catalogName).append(\" WITH (\\n\"); catalogDdl.append(\" \'type\' = \'hive\',\\n\"); catalogDdl.append(\" \'default-database\' = \'\").append(defaultDatabase).append(\"\',\\n\"); catalogDdl.append(\" \'hive-conf-dir\' = \'\").append(hiveConfDir).append(\"\',\\n\"); catalogDdl.append(\" \'hadoop-conf-dir\' = \'\").append(hiveConfDir).append(\"\'\\n\"); catalogDdl.append(\")\"); // 2. 执行DDL创建Catalog tableEnv.executeSql(catalogDdl.toString()); // 3. 切换到Hive Catalog tableEnv.useCatalog(catalogName); tableEnv.useDatabase(defaultDatabase);}

3.SQL关联查询

private static String buildJoinSql() { String infoAppendixTable = ConfigManager.getInfoAppendixTableName(); String saleRecordTable = ConfigManager.getSaleRecordTableName(); String partitionValue = HiveTableUtils.generatePartitionValue(); return String.format( \"SELECT \" + \"s.statistdate, s.traindate, s.boardtraincode, s.fromtelecode, s.totelecode, \" + \"s.fromstationname, s.tostationname, s.starttime, s.coachno, s.seatno, \" + \"s.seattypecode, s.tickettype, s.ticketprice, s.innercode, s.saletime, \" + \"s.officeno, s.windowno, s.operaterno, s.ticketno, s.statisticsdate, \" + \"s.sequenceno, s.statisticsflag, s.relaytickettype, s.salemode, \" + \"s.ticketstate, s.areacentercode, s.pk, \" + \"i.idkind, i.idno, i.idname, s.sdate \" + \"FROM %s s \" + \"JOIN %s i \" + \"ON s.ticketno = i.ticketno \" + \"WHERE s.sdate = \'%s\' \" + \"LIMIT 1000\", saleRecordTable, infoAppendixTable, partitionValue);}

4.Row到TrainInfo转换

public static class RowToTrainInfoMapper implements MapFunction { @Override public TrainInfo map(Row row) throws Exception { TrainInfo trainInfo = new TrainInfo(); // 按SELECT字段顺序映射 trainInfo.setStatistDate(getStringFromRow(row, 0)); trainInfo.setTrainDate(getStringFromRow(row, 1)); trainInfo.setBoardTrainCode(getStringFromRow(row, 2)); // ... 其他字段映射 // 设置处理时间 trainInfo.setProcessTime(LocalDateTime.now() .format(DateTimeFormatter.ofPattern(\"yyyy-MM-dd HH:mm:ss\"))); return trainInfo; } private String getStringFromRow(Row row, int index) { try { Object value = row.getField(index); return value == null ? \"\" : value.toString(); } catch (Exception e) { return \"\"; } }}

3.3 数据模型设计

3.3.1 InfoAppendix实体类

InfoAppendix表结构

字段名 类型 描述 statist_date String 统计日期 statistics_date String 统计日期 inner_code String 内部编码 office_no String 车站编号 window_no String 窗口编号 ticket_no String 票号(关联键) id_kind String 证件类型 id_no String 证件号码 id_name String 姓名 area_center_code String 区域中心编码 pk String 主键
public class InfoAppendix { private String statistDate; // 统计日期 private String statisticsDate; // 统计日期2 private String innerCode; // 内部编码 private String officeNo; // 车站编号 private String windowNo; // 窗口编号 private String ticketNo; // 票号(关联键) private String idKind;  // 证件类型 private String idNo; // 证件号码 private String idName;  // 姓名 private String areaCenterCode; // 区域中心编码 private String pk;  // 主键 private String sdate; // 分区字段 // 构造函数、getter、setter方法}
3.3.2 SaleRecord实体类

SaleRecord表结构

字段名 类型 描述 statist_date String 统计日期 train_date String 乘车日期 board_train_code String 车次号 from_tele_code String 起点电报码 to_tele_code String 终点电报码 from_station_name String 起点站名 to_station_name String 终点站名 start_time String 开车时间 coach_no String 车厢号 seat_no String 座位号 seat_type_code String 座位类型 ticket_type String 票种 ticket_price String 票价 inner_code String 内部编码 sale_time String 售票时间 office_no String 车站编号 window_no String 窗口编号 operater_no String 操作员号 ticket_no String 票号(关联键) statistics_date String 统计日期 sequence_no String 序列号 statistics_flag String 统计标志 relay_ticket_type String 中转票类型 sale_mode String 售票方式 ticket_state String 票据状态 area_center_code String 区域中心编码 pk String 主键
public class SaleRecord { private String statistDate; // 统计日期 private String trainDate; // 乘车日期 private String boardTrainCode; // 车次号 private String fromTeleCode; // 起点电报码 private String toTeleCode; // 终点电报码 private String fromStationName; // 起点站名 private String toStationName; // 终点站名 private String startTime; // 开车时间 private String coachNo; // 车厢号 private String seatNo;  // 座位号 private String seatTypeCode; // 座位类型 private String ticketType; // 票种 private String ticketPrice; // 票价 private String innerCode; // 内部编码 private String saleTime; // 售票时间 private String officeNo; // 车站编号 private String windowNo; // 窗口编号 private String operaterNo; // 操作员号 private String ticketNo; // 票号(关联键) private String statisticsDate; // 统计日期 private String sequenceNo; // 序列号 private String statisticsFlag; // 统计标志 private String relayTicketType; // 中转票类型 private String saleMode; // 售票方式 private String ticketState; // 票据状态 private String areaCenterCode; // 区域中心编码 private String pk;  // 主键 private String sdate; // 分区字段 // 构造函数、getter、setter方法}
3.3.3 TrainInfo实体类(合并后的完整信息)
public class TrainInfo { // 继承SaleRecord的所有字段 private String statistDate; private String trainDate; private String boardTrainCode; // ... 其他SaleRecord字段 // 来自InfoAppendix的字段 private String idKind;  // 证件类型 private String idNo; // 证件号码 private String idName;  // 姓名 // 处理信息字段 private String processTime; // 处理时间 // 构造函数、getter、setter方法}

4. 关键技术实现

4.1 Kerberos安全认证机制

4.1.1 Kerberos认证基本原理

Kerberos是一种网络认证协议,采用\"票据\"机制实现安全的身份验证,主要包含以下核心概念:

  1. KDC (Key Distribution Center) - 密钥分发中心,包含:

    • AS (Authentication Server):认证服务器,负责初始认证和TGT发放

    • TGS (Ticket Granting Server):票据授权服务器,负责服务票据发放

  2. 认证流程三阶段

    • 认证阶段:客户端从AS获取TGT (Ticket Granting Ticket)

    • 授权阶段:客户端使用TGT从TGS获取服务票据(ST)

    • 服务请求阶段:客户端使用ST访问具体服务

  3. 核心安全机制

    • 对称加密:所有票据都使用密钥加密

    • 时间戳:防止重放攻击

    • 有限生命周期:票据具有有效期

认证流程:

┌─────────────┐ 1. 获取TGT ┌─────────────┐│ │────────────────────▶│ ││ Client │  │ KDC ││ (Flink Job) │◀────────────────────│ ││ │ 2. 返回TGT │ │└─────────────┘  └─────────────┘ │  │ │ 3. 请求服务票据(ST)  │ │─────────────────────────────────────▶ │  │ │◀───────────────────────────────────── │ 4. 返回服务票据 │ │  │ ▼  ▼┌─────────────┐ 5. 认证请求 ┌─────────────┐│ │─────────────────────▶│ ││ HDFS ││ Kerberized ││ NameNode │◀─────────────────────│ Service ││ │ 6. 认证响应 │ │└─────────────┘ └─────────────┘
4.1.2 在Hadoop生态中的实现

2.1 认证流程

  1. 客户端配置

    • 配置krb5.conf文件指定KDC信息

    • 准备keytab文件或配置用户名/密码

  2. 认证过程

    1. 客户端 → KDC-AS: 请求TGT2. KDC-AS → 客户端: 返回加密的TGT3. 客户端 → KDC-TGS: 使用TGT请求服务票据4. KDC-TGS → 客户端: 返回加密的服务票据5. 客户端 → 服务端: 使用服务票据访问服务
  3. Hadoop组件集成

    • 每个Hadoop服务(如NameNode)都需要在KDC中注册SPN(Service Principal Name)

    • 客户端访问服务时需要对应服务的SPN

2.2 关键配置文件

  1. krb5.conf - 定义Kerberos领域和KDC信息:

    [libdefaults] default_realm = EXAMPLE.COM dns_lookup_realm = false dns_lookup_kdc = false[realms] EXAMPLE.COM = { kdc = kdc.example.com admin_server = kdc.example.com }
  2. core-site.xml - Hadoop核心安全配置:

     hadoop.security.authentication kerberos
  3. hdfs-site.xml - HDFS安全配置:

     dfs.namenode.kerberos.principal hdfs/_HOST@EXAMPLE.COM

多组件认证关系:

┌───────────────────────────────────────────────────────┐│  Flink Cluster││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐││ │ JobManager │ │ TaskManager │ │ TaskManager │││ └─────────────┘ └─────────────┘ └─────────────┘││  │  │  │ ││  ▼  ▼  ▼ ││ ┌───────────────────────────────────────────────────┐││ │  Kerberos Realm  │││ │ │││ │ ┌─────────────┐  ┌───────────────────┐ │││ │ │ Keytab │  │ Ticket Cache │ │││ │ │ Credential │  │ (Optional) │ │││ │ └─────────────┘  └───────────────────┘ │││ │ │││ └───────────────────────────────────────────────────┘││  │└───────────────────────────────────────────────────────┘ │ │ ▼┌───────────────────────────────────────────────────────┐│  Kerberized Services  ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐││ │ HDFS │ │ Hive │ │ YARN │││ └─────────────┘ └─────────────┘ └─────────────┘│└───────────────────────────────────────────────────────┘
4.1.3 核心实现
public class KerberosUtils { private static volatile UserGroupInformation loginUser = null; private static volatile boolean initialized = false; private static final Object LOCK = new Object(); /** * 初始化Kerberos认证 */ public static synchronized void initKerberos() throws IOException { if (!ConfigManager.isKerberosEnabled()) { return; } if (initialized && loginUser != null && loginUser.hasKerberosCredentials() && loginUser.isFromKeytab()) { return; // 已经初始化且有效 } synchronized (LOCK) { // 1. 设置系统属性 setupKerberosSystemProperties(); // 2. 创建Hadoop配置 Configuration conf = createHadoopConfiguration(); UserGroupInformation.setConfiguration(conf); // 3. 验证配置文件 validateKerberosFiles(); // 4. 执行Keytab登录 loginUser = UserGroupInformation.loginUserFromKeytabAndReturnUGI( ConfigManager.getKerberosPrincipal(),  ConfigManager.getKerberosKeytab() ); if (loginUser == null || !loginUser.hasKerberosCredentials()) { throw new IOException(\"Kerberos认证失败\"); } initialized = true; logger.info(\"Kerberos登录成功: {}\", loginUser.getUserName()); } } /** * 安全执行方法 */ public static  T doAs(PrivilegedExceptionAction action) throws Exception { if (!ConfigManager.isKerberosEnabled()) { return action.run(); } // 确保本地已初始化 if (!initialized || loginUser == null || !loginUser.hasKerberosCredentials() || !loginUser.isFromKeytab()) { initKerberos(); } return loginUser.doAs(action); } /** * 检查和重新登录 */ public static void checkAndRelogin() throws IOException { if (!ConfigManager.isKerberosEnabled()) return; if (!initialized || loginUser == null || !loginUser.hasKerberosCredentials() || !loginUser.isFromKeytab()) { initKerberos(); return; } try { loginUser.checkTGTAndReloginFromKeytab(); logger.debug(\"Kerberos票据已刷新\"); } catch (Exception e) { logger.error(\"票据刷新失败,重新登录\", e); synchronized (LOCK) { loginUser = null; initialized = false; initKerberos(); } } }}
4.1.4 系统属性设置
private static void setupKerberosSystemProperties() { try { String krb5Config = ConfigManager.getKrb5ConfPath(); if (krb5Config != null && new File(krb5Config).exists()) { System.setProperty(\"java.security.krb5.conf\", krb5Config); } String jaasConfig = ConfigManager.getJaasConfPath(); if (jaasConfig != null && new File(jaasConfig).exists()) { System.setProperty(\"java.security.auth.login.config\", jaasConfig); } System.setProperty(\"javax.security.auth.useSubjectCredsOnly\", \"false\"); System.setProperty(\"hadoop.security.authentication\", \"kerberos\"); // 调试模式 if (ConfigManager.isKerberosDebugEnabled()) { System.setProperty(\"sun.security.krb5.debug\", \"true\"); System.setProperty(\"sun.security.jgss.debug\", \"true\"); } } catch (Exception e) { throw new RuntimeException(\"设置Kerberos系统属性失败\", e); }}

4.2 HDFS文件操作机制

4.2.1 设计思路
  • 连接管理:单例模式管理FileSystem连接

  • 重试机制:网络异常时自动重试

  • 安全封装:统一的Kerberos安全调用

  • 错误处理:详细的异常分类和处理

4.2.2 核心实现
public class HdfsUtils { private static volatile FileSystem fileSystem; private static volatile boolean initialized = false; /** * 统一的Kerberos doAs封装 */ private static  T doKerberosAction(PrivilegedExceptionAction action) throws IOException { if (ConfigManager.isKerberosEnabled()) { try { return KerberosUtils.doAs(action); } catch (Exception e) { throw new IOException(\"Kerberos doAs操作失败\", e); } } else { try { return action.run(); } catch (Exception e) { throw new IOException(\"操作失败\", e); } } } /** * 带重试机制的文件读取 */ public static FSDataInputStream openFileWithRetry(String path, int maxRetries) throws IOException { IOException lastException = null; for (int attempt = 1; attempt  {  if (ConfigManager.isKerberosEnabled()) { KerberosUtils.checkAndRelogin();  }  Configuration conf = createHadoopConfiguration();  FileSystem fs = FileSystem.get(conf);  return fs.open(new Path(path)); }); } catch (IOException e) { lastException = e; logger.warn(\"打开文件失败 (第{}/{}次): {}\", attempt, maxRetries, e.getMessage()); if (attempt < maxRetries) {  try { Thread.sleep(5000); // 等待5秒 reinitializeFileSystem(); // 重新初始化连接  } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new IOException(\"重试被中断\", ie);  } } } } throw new IOException(\"文件打开失败,已重试\" + maxRetries + \"次: \" + path, lastException); } /** * 创建增强的Hadoop配置 */ private static Configuration createHadoopConfiguration() { Configuration conf = new Configuration(); // HDFS HA配置 conf.set(\"fs.defaultFS\", \"hdfs://nameservice1\"); conf.set(\"dfs.nameservices\", \"nameservice1\"); conf.set(\"dfs.ha.namenodes.nameservice1\", \"nn1,nn2\"); conf.set(\"dfs.namenode.rpc-address.nameservice1.nn1\", \"ddp1:8020\"); conf.set(\"dfs.namenode.rpc-address.nameservice1.nn2\", \"ddp2:8020\"); conf.set(\"dfs.client.failover.proxy.provider.nameservice1\", \"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"); // 网络优化配置 conf.setBoolean(\"dfs.client.use.datanode.hostname\", false); conf.setBoolean(\"dfs.datanode.use.datanode.hostname\", false); conf.setBoolean(\"dfs.client.read.shortcircuit\", false); conf.setBoolean(\"dfs.domain.socket.path.disable\", true); // 超时和重试配置 conf.setInt(\"dfs.client.socket-timeout\", 300000); // 5分钟 conf.setInt(\"ipc.client.connect.timeout\", 120000); // 2分钟 conf.setInt(\"ipc.client.connect.max.retries\", 10); conf.setInt(\"ipc.client.rpc.timeout\", 300000); // 安全配置 if (ConfigManager.isKerberosEnabled()) { conf.set(\"hadoop.security.authentication\", \"kerberos\"); conf.set(\"hadoop.security.authorization\", \"true\"); conf.set(\"dfs.data.transfer.protection\", \"authentication\"); } else { conf.set(\"hadoop.security.authentication\", \"simple\"); conf.set(\"hadoop.security.authorization\", \"false\"); } return conf; }}

4.3 Parquet文件写入机制

4.3.1 设计原理
  • 批量写入:采用缓冲区机制,提高写入效率

  • Schema管理:使用Avro Schema定义数据结构

  • 压缩优化:采用Snappy压缩算法

  • 并发安全:支持多线程并发写入

4.3.2 Avro Schema定义
public class ParquetWriterUtils { // InfoAppendix的Avro Schema private static final String INFO_APPENDIX_SCHEMA = \"{\\n\" + \" \\\"type\\\": \\\"record\\\",\\n\" + \" \\\"name\\\": \\\"InfoAppendix\\\",\\n\" + \" \\\"namespace\\\": \\\"com.train.data.entity\\\",\\n\" + \" \\\"fields\\\": [\\n\" + \" {\\\"name\\\": \\\"statistDate\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"statisticsDate\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"innerCode\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"officeNo\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"windowNo\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"ticketNo\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"idKind\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"idNo\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"idName\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"areaCenterCode\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"pk\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"}\\n\" + \" ]\\n\" + \"}\";}
4.3.3 批量写入实现
private static void doWriteInfoAppendixToParquet(List records, String partitionValue) throws IOException { String tableName = ConfigManager.getInfoAppendixTableName(); String outputPath = buildParquetFilePath(tableName, partitionValue, \"info_appendix\"); logger.info(\"开始写入InfoAppendix数据到Parquet文件: {}, 记录数: {}\", outputPath, records.size()); // 1. 解析Schema Schema schema = new Schema.Parser().parse(INFO_APPENDIX_SCHEMA); // 2. 创建Hadoop配置 Configuration conf = createHadoopConfiguration(); Path parquetPath = new Path(outputPath); // 3. 创建Parquet写入器 ParquetWriter writer = null; try { writer = AvroParquetWriter.builder(parquetPath) .withSchema(schema) .withConf(conf) .withCompressionCodec(CompressionCodecName.SNAPPY) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build(); // 4. 数据转换和写入 int count = 0; for (InfoAppendix record : records) { GenericRecord avroRecord = new GenericData.Record(schema); // 字段映射 avroRecord.put(\"statistDate\", safeString(record.getStatistDate())); avroRecord.put(\"statisticsDate\", safeString(record.getStatisticsDate())); avroRecord.put(\"innerCode\", safeString(record.getInnerCode())); avroRecord.put(\"officeNo\", safeString(record.getOfficeNo())); avroRecord.put(\"windowNo\", safeString(record.getWindowNo())); avroRecord.put(\"ticketNo\", safeString(record.getTicketNo())); avroRecord.put(\"idKind\", safeString(record.getIdKind())); avroRecord.put(\"idNo\", safeString(record.getIdNo())); avroRecord.put(\"idName\", safeString(record.getIdName())); avroRecord.put(\"areaCenterCode\", safeString(record.getAreaCenterCode())); avroRecord.put(\"pk\", safeString(record.getPk())); writer.write(avroRecord); count++; // 进度日志 if (count <= 5 || count % 100 == 0) { logger.info(\"已写入第{}条InfoAppendix记录: ticketNo={}\", count, record.getTicketNo()); } } logger.info(\"InfoAppendix数据写入完成: {}, 记录数: {}\", outputPath, count); } finally { if (writer != null) { try { writer.close(); } catch (Exception e) { logger.error(\"关闭ParquetWriter失败\", e); } } }}/** * 安全的字符串转换,避免null值 */private static String safeString(String value) { return value == null ? \"\" : value;}/** * 构建Parquet文件路径,避免文件名冲突 */private static String buildParquetFilePath(String tableName, String partitionValue, String filePrefix) { String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern(\"yyyyMMdd_HHmmss_SSS\")); String uuid = UUID.randomUUID().toString().substring(0, 8); String threadId = String.valueOf(Thread.currentThread().getId()); String fileName = String.format(\"%s_%s_%s_%s.parquet\", filePrefix, timestamp, threadId, uuid); return String.format(\"%s/%s/sdate=%s/%s\", ConfigManager.getHiveWarehouseDir(), tableName, partitionValue, fileName);}

4.4 定时调度机制

4.4.1 Quartz调度配置
public class QuartzMain { public static void main(String[] args) throws Exception { // 1. 创建作业详情 JobDetail job = JobBuilder.newJob(TrainInfoMergeQuartzJob.class) .withIdentity(\"mergeJob\", \"group1\") .storeDurably(true) // 持久化作业 .build(); // 2. 创建触发器 - 每小时的第1分钟执行 Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(\"mergeTrigger\", \"group1\") .withSchedule(CronScheduleBuilder.cronSchedule(\"0 1 * * * ?\") .withMisfireHandlingInstructionDoNothing()) // 错过执行时间不补执行 .build(); // 3. 创建调度器 SchedulerFactory factory = new StdSchedulerFactory(); Scheduler scheduler = factory.getScheduler(); // 4. 启动调度器 scheduler.start(); logger.info(\"Quartz调度器启动成功\"); // 5. 注册作业和触发器 scheduler.scheduleJob(job, trigger); logger.info(\"数据合并作业调度注册成功,执行计划: 每小时第1分钟\"); // 6. 添加关闭钩子 Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { scheduler.shutdown(true); logger.info(\"Quartz调度器已关闭\"); } catch (SchedulerException e) { logger.error(\"关闭调度器失败\", e); } })); }}/** * Quartz作业实现类 */public class TrainInfoMergeQuartzJob implements Job { private static final Logger logger = LogManager.getLogger(TrainInfoMergeQuartzJob.class); @Override public void execute(JobExecutionContext context) throws JobExecutionException { logger.info(\"开始执行定时数据合并作业...\"); try { long startTime = System.currentTimeMillis(); // 调用合并作业 TrainInfoMergeJob.runMergeOnce(); long duration = System.currentTimeMillis() - startTime; logger.info(\"数据合并作业执行完成,耗时: {}ms\", duration);  } catch (Exception e) { logger.error(\"数据合并作业执行失败\", e); throw new JobExecutionException(\"数据合并作业执行失败: \" + e.getMessage(), e); } }}
4.4.2 文件扫描调度
public class TextFileSourceFunction extends RichSourceFunction { private volatile boolean isRunning = true; private final long scanInterval; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.scanInterval = ConfigManager.getZipProcessorInterval(); // 从配置读取扫描间隔 // TaskManager中的Kerberos认证 if (ConfigManager.isKerberosEnabled()) { KerberosUtils.reloginInTaskManager(); } } @Override public void run(SourceContext ctx) throws Exception { logger.info(\"文件扫描服务启动,扫描间隔: {}ms\", scanInterval); while (isRunning) { try { String inputPath = ConfigManager.getHdfsInputPath(); // 1. 扫描目录获取文件列表 List textFiles = scanTargetFiles(inputPath); // 2. 发送文件路径到下游 for (String file : textFiles) {  logger.info(\"发现目标文件: {}\", file);  ctx.collect(file); } // 3. 等待下次扫描 Thread.sleep(scanInterval); } catch (InterruptedException e) { logger.info(\"文件扫描服务被中断\"); break; } catch (Exception e) { logger.error(\"文件扫描异常\", e); // 异常重试机制 try {  Thread.sleep(Math.min(scanInterval, 60000)); // 最多等待1分钟 } catch (InterruptedException ie) {  break; } } } logger.info(\"文件扫描服务已停止\"); } /** * 扫描目标文件 */ private List scanTargetFiles(String inputPath) throws Exception { return KerberosUtils.doAs(() -> { List targetFiles = new ArrayList(); // 获取所有txt文件 List allFiles = HdfsUtils.listFilesByExtension(inputPath, \".txt\"); // 过滤目标文件 for (String file : allFiles) { String fileName = file.substring(file.lastIndexOf(\'/\') + 1); if (fileName.contains(\"GASMZ_info_appendix\") || fileName.contains(\"GASMZ_sale\")) {  targetFiles.add(file); } } logger.debug(\"扫描目录 {}, 找到目标文件 {} 个\", inputPath, targetFiles.size()); return targetFiles; }); } @Override public void cancel() { isRunning = false; logger.info(\"文件扫描服务收到停止信号\"); }}

5. 配置管理

5.1 ConfigManager设计

public class ConfigManager { private static Properties properties = new Properties(); private static boolean loaded = false; static { loadProperties(); } private static void loadProperties() { try (InputStream input = ConfigManager.class.getClassLoader() .getResourceAsStream(\"application.properties\")) { if (input == null) { throw new RuntimeException(\"找不到配置文件 application.properties\"); } properties.load(input); loaded = true; logger.info(\"配置文件加载成功\"); } catch (IOException e) { throw new RuntimeException(\"加载配置文件失败\", e); } } // HDFS配置 public static String getHdfsNameNodeUrl() { return getProperty(\"hdfs.namenode.url\", \"hdfs://nameservice1\"); } public static String getHdfsInputPath() { return getProperty(\"hdfs.input.path\", \"/testInput\"); } // Hive配置 public static String getHiveDatabaseName() { return getProperty(\"hive.database.name\", \"default\"); } public static String getHiveWarehouseDir() { return getProperty(\"hive.warehouse.dir\", \"/user/hive/warehouse\"); } public static String getInfoAppendixTableName() { return getProperty(\"hive.table.info_appendix\", \"info_appendix\"); } public static String getSaleRecordTableName() { return getProperty(\"hive.table.sale_record\", \"sale_record\"); } public static String getTrainInfoTableName() { return getProperty(\"hive.table.train_info\", \"train_info\"); } // Kerberos配置 public static boolean isKerberosEnabled() { return Boolean.parseBoolean(getProperty(\"security.kerberos.enabled\", \"false\")); } public static String getKerberosKeytab() { return getProperty(\"security.kerberos.login.keytab\"); } public static String getKerberosPrincipal() { return getProperty(\"security.kerberos.login.principal\"); } public static String getKrb5ConfPath() { return getProperty(\"security.kerberos.krb5.conf\"); } public static String getJaasConfPath() { return getProperty(\"security.jaas.conf\"); } // Flink配置 public static int getFlinkJobParallelism() { return Integer.parseInt(getProperty(\"flink.job.parallelism\", \"2\")); } public static long getFlinkCheckpointInterval() { return Long.parseLong(getProperty(\"flink.checkpoint.interval\", \"60000\")); } public static long getFlinkCheckpointTimeout() { return Long.parseLong(getProperty(\"flink.checkpoint.timeout\", \"600000\")); } public static int getFlinkRestartAttempts() { return Integer.parseInt(getProperty(\"flink.restart.attempts\", \"3\")); } public static long getFlinkRestartDelay() { return Long.parseLong(getProperty(\"flink.restart.delay\", \"10000\")); } // 调度配置 public static long getZipProcessorInterval() { return Long.parseLong(getProperty(\"scheduler.zip.processor.interval\", \"300000\")); } public static String getTrainInfoMergeCron() { return getProperty(\"scheduler.train_info.merge.cron\", \"0 1 * * * ?\"); } // 文件处理配置 public static String getFileDelimiter() { return getProperty(\"file.delimiter\", \";\"); } private static String getProperty(String key, String defaultValue) { return properties.getProperty(key, defaultValue); } private static String getProperty(String key) { String value = properties.getProperty(key); if (value == null || value.trim().isEmpty()) { throw new RuntimeException(\"配置项缺失: \" + key); } return value.trim(); }}

5.2 详细配置说明

5.2.1 application.properties完整配置
# ========== HDFS配置 ==========# HDFS NameNode地址,支持HA模式hdfs.namenode.url=hdfs://nameservice1# 输入文件目录hdfs.input.path=/testInput# 临时目录hdfs.temp.path=/tmp/train_data_processor# ========== HDFS HA配置 ==========dfs.nameservices=nameservice1dfs.ha.namenodes.nameservice1=nn1,nn2dfs.namenode.rpc-address.nameservice1.nn1=(服务器ip):(namenode)dfs.namenode.rpc-address.nameservice1.nn2=(服务器ip):(namenode)dfs.client.failover.proxy.provider.nameservice1=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider# ========== 网络优化配置 ==========# 禁用DataNode hostname使用,避免网络问题dfs.client.use.datanode.hostname=falsedfs.datanode.use.datanode.hostname=false# 禁用短路读取dfs.client.read.shortcircuit=falsedfs.domain.socket.path.disable=true# 超时配置hdfs.client.socket.timeout=300000hdfs.client.connect.timeout=120000hdfs.client.retry.max.attempts=5# ========== Hive配置 ==========# Hive Metastore地址hive.metastore.uris=thrift://(服务器ip):9083# 数据库名称hive.database.name=default# 数据仓库目录hive.warehouse.dir=/user/hive/warehouse# Hive配置目录hive.conf.dir=/home/flink/flink-1.18.1/plugins/hadoop-kerberos# ========== 表名配置 ==========hive.table.info_appendix=info_appendixhive.table.sale_record=sale_recordhive.table.train_info=train_info# ========== 文件处理配置 ==========file.encoding=UTF-8file.delimiter=;zip.extract.temp.dir=/tmp/zip_extract# ========== 调度配置 ==========# 文件处理间隔(毫秒)scheduler.zip.processor.interval=300000# 数据合并调度表达式scheduler.train_info.merge.cron=0 1 * * * ?# ========== Flink配置 ==========flink.job.parallelism=2flink.checkpoint.interval=60000flink.checkpoint.timeout=600000flink.restart.strategy=fixed-delayflink.restart.attempts=3flink.restart.delay=10000# ========== Kerberos配置 ==========security.kerberos.enabled=truesecurity.kerberos.login.use-ticket-cache=falsesecurity.kerberos.login.keytab=/home/flink/flink-1.18.1/plugins/hadoop-kerberos/hive.service.keytabsecurity.kerberos.login.principal=hive/ddp1@HADOOP.COMsecurity.kerberos.realm=HADOOP.COMsecurity.kerberos.kdc=(服务器ip)security.kerberos.krb5.conf=/home/flink/flink-1.18.1/plugins/hadoop-kerberos/krb5.confsecurity.jaas.conf=/home/flink/flink-1.18.1/plugins/hadoop-kerberos/flink-jaas.confsecurity.kerberos.debug=false# ========== Hadoop安全配置 ==========hadoop.security.authentication=kerberoshadoop.security.authorization=truehadoop.rpc.protection=authenticationhadoop.security.token.service.use_ip=falsedfs.data.transfer.protection=authentication# ========== IPC配置 ==========ipc.client.connect.timeout=60000ipc.client.connect.max.retries=5ipc.client.rpc.timeout=120000# ========== 调试配置 ==========enable.hive.sink=truedebug.print.parsed.records=truedebug.max.records.to.print=10# ========== 日志配置 ==========log.level=INFOlog.file.path=/var/log/flink-train-processorlog.file.max.size=100MBlog.file.max.history=30
5.2.2 flink-jaas.conf完整配置
Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true useTicketCache=false keyTab=\"/home/flink/flink-1.18.1/plugins/hadoop-kerberos/hive.service.keytab\" principal=\"hive/ddp1@HADOOP.COM\" debug=true;};FlinkClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true useTicketCache=false keyTab=\"/home/flink/flink-1.18.1/plugins/hadoop-kerberos/hive.service.keytab\" principal=\"hive/ddp1@HADOOP.COM\" debug=true;};