Flink-Hadoop实战项目_flink和hadoop结合
项目说明文档
1. 项目概述
1.1 项目简介
本项目是一个基于Apache Flink的大数据流处理平台,专门用于处理铁路系统的票务和车次信息数据。系统包含两个核心流处理作业:文件处理作业和数据合并作业,采用定时调度机制,支持Kerberos安全认证,实现从文件读取到数据仓库存储的完整数据处理链路。
1.2 技术栈
-
流处理引擎: Apache Flink 1.18.1
-
存储系统: HDFS (Hadoop分布式文件系统)
-
数据仓库: Apache Hive
-
文件格式: Parquet (列式存储格式)
-
安全认证: Kerberos
-
调度框架: Quartz Scheduler
-
开发语言: Java 8
-
日志框架: Log4j2
-
序列化: Apache Avro
1.3 系统架构图
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐│ 文件监控层 │ │ 流处理层 │ │ 存储层 ││ │ │ │ │ ││ ┌───────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ ││ │文件扫描器 │ │───▶│ │文件处理作业 │ │───▶│ │ Hive表 │ ││ └───────────┘ │ │ └─────────────┘ │ │ │info_appendix│ ││ │ │ │ │ │sale_record │ ││ ┌───────────┐ │ │ ┌─────────────┐ │ │ └─────────────┘ ││ │定时调度器 │ │───▶│ │数据合并作业 │ │───▶│ ┌─────────────┐ ││ └───────────┘ │ │ └─────────────┘ │ │ │ Hive表 │ │└─────────────────┘ └─────────────────┘ │ │train_info │ │ │ └─────────────┘ │ └─────────────────┘
2. 需求分析
2.1 业务背景
铁路系统每天产生大量的票务交易数据和乘客信息数据,这些数据以文本文件的形式存储在HDFS系统中。为了便于数据分析和报表统计,需要将这些原始文件数据转换为结构化的数据仓库表,并且将分散的信息进行关联整合。
2.2 核心需求
2.2.1 需求一:文件处理作业
具体描述:
-
开发一个Flink流处理程序,实现定时扫描HDFS目录下的文件
-
目标目录:
/testInput
-
需要处理的文件类型:
-
GASMZ_info_appendix*.txt
:包含乘客身份信息的附加数据 -
GASMZ_sale*.txt
:包含票务销售记录数据
-
-
处理流程:扫描→读取→解析→写入Hive表→删除源文件
-
目标表:
-
info_appendix
表:存储乘客身份信息 -
sale_record
表:存储销售记录信息
-
业务价值:
-
将非结构化文本文件转换为结构化数据表
-
实现数据的实时入库,提高数据时效性
-
通过Parquet格式存储,提高查询性能
2.2.2 需求二:数据合并作业
具体描述:
-
开发一个Flink流处理程序,定时执行数据关联合并
-
调度频率:每小时的第1分钟执行一次(Cron表达式:
0 1 * * * ?
) -
数据源:
info_appendix
表和sale_record
表 -
关联条件:通过
ticket_no
等主键字段进行内连接 -
目标表:
train_info
表(包含完整的票务和乘客信息)
业务价值:
-
整合分散的数据,形成完整的业务视图
-
为下游数据分析和报表提供高质量的数据源
-
支持复杂的业务查询需求
2.3 技术需求
2.3.1 性能需求
-
处理能力:支持千万级别记录的处理
-
延迟要求:文件从产生到入库完成不超过5分钟
-
吞吐量:单作业支持每秒处理10,000条记录
-
并发度:支持多文件并行处理
2.3.2 可靠性需求
-
容错机制:支持任务失败自动重试(最多3次)
-
数据一致性:确保数据完整性和幂等性
-
故障恢复:支持从Checkpoint恢复
-
监控告警:提供完整的监控和告警机制
2.3.3 安全需求
-
认证授权:支持Kerberos认证
-
数据安全:支持HDFS数据传输加密
-
访问控制:基于角色的访问控制
2.3.4 运维需求
-
可观测性:详细的日志记录和指标监控
-
可配置性:支持动态配置调整
-
可扩展性:支持水平扩展
3. 系统设计
3.1 整体架构设计
3.1.1 分层架构
┌─────────────────────────────────────────────────────────────┐│ 应用层 ││ ┌─────────────────┐ ┌─────────────────┐ ││ │ZipFileProcessor │ │TrainInfoMerge │ ││ │Job │ │Job │ ││ └─────────────────┘ └─────────────────┘ │├─────────────────────────────────────────────────────────────┤│ 服务层 ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐││ │SourceFunction│ │MapFunction │ │SinkFunction │ │Scheduler│││ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘│├─────────────────────────────────────────────────────────────┤│ 工具层 ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐││ │HdfsUtils │ │KerberosUtils│ │ParquetWriter│ │HiveTable│││ │ │ │ │ │Utils │ │Utils │││ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘│├─────────────────────────────────────────────────────────────┤│ 基础设施层 ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││ │Flink Runtime│ │HDFS │ │Hive │ ││ └─────────────┘ └─────────────┘ └─────────────┘ │└─────────────────────────────────────────────────────────────┘
3.1.2 数据流设计
文件扫描 → 文件读取 → 数据解析 → 数据转换 → 批量写入 → 文件清理 ↓ ↓ ↓ ↓ ↓ ↓ 定时扫描 内容提取 字段映射 实体转换 Parquet 源文件删除/testInput FSDataInput CSV解析 POJO对象 存储格式
3.2 核心组件设计
3.2.1 ZipFileProcessorJob详细设计
主要职责:
-
定时扫描HDFS目录
-
读取和解析文本文件
-
数据转换和清洗
-
写入Hive表
-
源文件清理
核心组件:
-
TextFileSourceFunction
// ---------------------------------------------------------------------// 自定义 SourceFunction:定时扫描 testInput 目录下的目标文本文件// ---------------------------------------------------------------------public class TextFileSourceFunction extends RichSourceFunction { private volatile boolean isRunning = true; private final long scanInterval = 300000; // 5分钟扫描一次 @Override public void run(SourceContext ctx) { while (isRunning) { try { // 1. 扫描目录获取文件列表 List textFiles = HdfsUtils.listFilesByExtension(inputPath, \".txt\"); // 2. 过滤目标文件 for (String file : textFiles) { if (file.contains(\"GASMZ_info_appendix\") || file.contains(\"GASMZ_sale\")) { ctx.collect(file); } } // 3. 等待下次扫描 Thread.sleep(scanInterval); } catch (Exception ex) { logger.error(\"文件扫描异常\", ex); // 重试机制 Thread.sleep(10000); } } }}
-
TextFileReadFunction
// 改进的文件读取函数,增加重试机制public class TextFileReadFunction implements MapFunction<String, Tuple2> { @Override public Tuple2 map(String filePath) throws Exception { return KerberosUtils.doAs(() -> { return readFileWithRetry(filePath); }); } private Tuple2 readFileWithRetry(String filePath) throws Exception { // 实现带重试的文件读取逻辑 // 1. 检查文件存在性 // 2. 读取文件内容 // 3. 处理异常和重试 // 4. 删除源文件 }}
-
InfoAppendixParseFunction
// -------------------------------------------------------------------// InfoAppendix 解析(FlatMap)// -------------------------------------------------------------------public class InfoAppendixParseFunction implements FlatMapFunction<Tuple2, InfoAppendix> { @Override public void flatMap(Tuple2 value, Collector out) { String fileName = value.f0; String content = value.f1; // 1. 按行分割内容 String[] rows = content.split(\"\\\\n\"); // 2. 解析每一行数据 for (String row : rows) { if (row.trim().isEmpty() || row.startsWith(\"statist_date\")) { continue; // 跳过空行和表头 } // 3. CSV解析 String[] fields = HiveTableUtils.parseCsvLine(row, \";\"); // 4. 字段映射 InfoAppendix info = new InfoAppendix(); info.setStatistDate(fields[0]); info.setStatisticsDate(fields[1]); info.setInnerCode(fields[2]); // ... 其他字段映射 // 5. 生成主键和分区字段 info.setPk(HiveTableUtils.generatePrimaryKey( info.getStatisticsDate(), info.getOfficeNo(), info.getWindowNo(), info.getTicketNo())); info.setSdate(HiveTableUtils.generatePartitionValue()); out.collect(info); } }}
-
InfoAppendixParquetSink
// -------------------------------------------------------------------// InfoAppendix Parquet Sink - 修复版本// -------------------------------------------------------------------public class InfoAppendixParquetSink extends RichSinkFunction { private List buffer = new ArrayList(); private final int batchSize = 10; @Override public void invoke(InfoAppendix value, Context context) throws Exception { buffer.add(value); if (buffer.size() >= batchSize) { flushBuffer(); } } private void flushBuffer() throws Exception { if (ConfigManager.isKerberosEnabled()) { KerberosUtils.doAs(() -> { ParquetWriterUtils.writeInfoAppendixToParquet(buffer, currentPartition); return null; }); } else { ParquetWriterUtils.writeInfoAppendixToParquet(buffer, currentPartition); } buffer.clear(); }}
3.2.2 TrainInfoMergeJob详细设计
主要职责:
-
注册Hive Catalog
-
执行SQL关联查询
-
数据流转换
-
写入合并结果
核心实现流程:
1.环境初始化
public static void runMergeOnce() throws Exception { // 1. 系统属性设置 setupCriticalSystemProperties(); setupSystemProperties(); // 2. Kerberos认证初始化 initializeKerberosIfEnabled(); // 3. Flink环境创建 StreamExecutionEnvironment env = createClusterExecutionEnvironment(); configureFlinkJob(env); // 4. Table环境创建 StreamTableEnvironment tableEnv = createTableEnvironment(env); // 5. Hive Catalog注册 registerHiveCatalogAndTables(tableEnv); // 6. 数据处理执行 executeDataMerge(env, tableEnv); // 7. 作业启动 env.execute(\"TrainInfoMergeJob-Fixed\");}
2.Hive Catalog注册
private static void registerHiveCatalogAndTables(StreamTableEnvironment tableEnv) { String catalogName = \"hive_catalog\"; String defaultDatabase = ConfigManager.getHiveDatabaseName(); String hiveConfDir = ConfigManager.getHiveConfDir(); // 1. 构建Catalog DDL StringBuilder catalogDdl = new StringBuilder(); catalogDdl.append(\"CREATE CATALOG \").append(catalogName).append(\" WITH (\\n\"); catalogDdl.append(\" \'type\' = \'hive\',\\n\"); catalogDdl.append(\" \'default-database\' = \'\").append(defaultDatabase).append(\"\',\\n\"); catalogDdl.append(\" \'hive-conf-dir\' = \'\").append(hiveConfDir).append(\"\',\\n\"); catalogDdl.append(\" \'hadoop-conf-dir\' = \'\").append(hiveConfDir).append(\"\'\\n\"); catalogDdl.append(\")\"); // 2. 执行DDL创建Catalog tableEnv.executeSql(catalogDdl.toString()); // 3. 切换到Hive Catalog tableEnv.useCatalog(catalogName); tableEnv.useDatabase(defaultDatabase);}
3.SQL关联查询
private static String buildJoinSql() { String infoAppendixTable = ConfigManager.getInfoAppendixTableName(); String saleRecordTable = ConfigManager.getSaleRecordTableName(); String partitionValue = HiveTableUtils.generatePartitionValue(); return String.format( \"SELECT \" + \"s.statistdate, s.traindate, s.boardtraincode, s.fromtelecode, s.totelecode, \" + \"s.fromstationname, s.tostationname, s.starttime, s.coachno, s.seatno, \" + \"s.seattypecode, s.tickettype, s.ticketprice, s.innercode, s.saletime, \" + \"s.officeno, s.windowno, s.operaterno, s.ticketno, s.statisticsdate, \" + \"s.sequenceno, s.statisticsflag, s.relaytickettype, s.salemode, \" + \"s.ticketstate, s.areacentercode, s.pk, \" + \"i.idkind, i.idno, i.idname, s.sdate \" + \"FROM %s s \" + \"JOIN %s i \" + \"ON s.ticketno = i.ticketno \" + \"WHERE s.sdate = \'%s\' \" + \"LIMIT 1000\", saleRecordTable, infoAppendixTable, partitionValue);}
4.Row到TrainInfo转换
public static class RowToTrainInfoMapper implements MapFunction { @Override public TrainInfo map(Row row) throws Exception { TrainInfo trainInfo = new TrainInfo(); // 按SELECT字段顺序映射 trainInfo.setStatistDate(getStringFromRow(row, 0)); trainInfo.setTrainDate(getStringFromRow(row, 1)); trainInfo.setBoardTrainCode(getStringFromRow(row, 2)); // ... 其他字段映射 // 设置处理时间 trainInfo.setProcessTime(LocalDateTime.now() .format(DateTimeFormatter.ofPattern(\"yyyy-MM-dd HH:mm:ss\"))); return trainInfo; } private String getStringFromRow(Row row, int index) { try { Object value = row.getField(index); return value == null ? \"\" : value.toString(); } catch (Exception e) { return \"\"; } }}
3.3 数据模型设计
3.3.1 InfoAppendix实体类
InfoAppendix表结构
public class InfoAppendix { private String statistDate; // 统计日期 private String statisticsDate; // 统计日期2 private String innerCode; // 内部编码 private String officeNo; // 车站编号 private String windowNo; // 窗口编号 private String ticketNo; // 票号(关联键) private String idKind; // 证件类型 private String idNo; // 证件号码 private String idName; // 姓名 private String areaCenterCode; // 区域中心编码 private String pk; // 主键 private String sdate; // 分区字段 // 构造函数、getter、setter方法}
3.3.2 SaleRecord实体类
SaleRecord表结构
public class SaleRecord { private String statistDate; // 统计日期 private String trainDate; // 乘车日期 private String boardTrainCode; // 车次号 private String fromTeleCode; // 起点电报码 private String toTeleCode; // 终点电报码 private String fromStationName; // 起点站名 private String toStationName; // 终点站名 private String startTime; // 开车时间 private String coachNo; // 车厢号 private String seatNo; // 座位号 private String seatTypeCode; // 座位类型 private String ticketType; // 票种 private String ticketPrice; // 票价 private String innerCode; // 内部编码 private String saleTime; // 售票时间 private String officeNo; // 车站编号 private String windowNo; // 窗口编号 private String operaterNo; // 操作员号 private String ticketNo; // 票号(关联键) private String statisticsDate; // 统计日期 private String sequenceNo; // 序列号 private String statisticsFlag; // 统计标志 private String relayTicketType; // 中转票类型 private String saleMode; // 售票方式 private String ticketState; // 票据状态 private String areaCenterCode; // 区域中心编码 private String pk; // 主键 private String sdate; // 分区字段 // 构造函数、getter、setter方法}
3.3.3 TrainInfo实体类(合并后的完整信息)
public class TrainInfo { // 继承SaleRecord的所有字段 private String statistDate; private String trainDate; private String boardTrainCode; // ... 其他SaleRecord字段 // 来自InfoAppendix的字段 private String idKind; // 证件类型 private String idNo; // 证件号码 private String idName; // 姓名 // 处理信息字段 private String processTime; // 处理时间 // 构造函数、getter、setter方法}
4. 关键技术实现
4.1 Kerberos安全认证机制
4.1.1 Kerberos认证基本原理
Kerberos是一种网络认证协议,采用\"票据\"机制实现安全的身份验证,主要包含以下核心概念:
-
KDC (Key Distribution Center) - 密钥分发中心,包含:
-
AS (Authentication Server):认证服务器,负责初始认证和TGT发放
-
TGS (Ticket Granting Server):票据授权服务器,负责服务票据发放
-
-
认证流程三阶段:
-
认证阶段:客户端从AS获取TGT (Ticket Granting Ticket)
-
授权阶段:客户端使用TGT从TGS获取服务票据(ST)
-
服务请求阶段:客户端使用ST访问具体服务
-
-
核心安全机制:
-
对称加密:所有票据都使用密钥加密
-
时间戳:防止重放攻击
-
有限生命周期:票据具有有效期
-
认证流程:
┌─────────────┐ 1. 获取TGT ┌─────────────┐│ │────────────────────▶│ ││ Client │ │ KDC ││ (Flink Job) │◀────────────────────│ ││ │ 2. 返回TGT │ │└─────────────┘ └─────────────┘ │ │ │ 3. 请求服务票据(ST) │ │─────────────────────────────────────▶ │ │ │◀───────────────────────────────────── │ 4. 返回服务票据 │ │ │ ▼ ▼┌─────────────┐ 5. 认证请求 ┌─────────────┐│ │─────────────────────▶│ ││ HDFS ││ Kerberized ││ NameNode │◀─────────────────────│ Service ││ │ 6. 认证响应 │ │└─────────────┘ └─────────────┘
4.1.2 在Hadoop生态中的实现
2.1 认证流程
-
客户端配置:
-
配置krb5.conf文件指定KDC信息
-
准备keytab文件或配置用户名/密码
-
-
认证过程:
1. 客户端 → KDC-AS: 请求TGT2. KDC-AS → 客户端: 返回加密的TGT3. 客户端 → KDC-TGS: 使用TGT请求服务票据4. KDC-TGS → 客户端: 返回加密的服务票据5. 客户端 → 服务端: 使用服务票据访问服务
-
Hadoop组件集成:
-
每个Hadoop服务(如NameNode)都需要在KDC中注册SPN(Service Principal Name)
-
客户端访问服务时需要对应服务的SPN
-
2.2 关键配置文件
-
krb5.conf - 定义Kerberos领域和KDC信息:
[libdefaults] default_realm = EXAMPLE.COM dns_lookup_realm = false dns_lookup_kdc = false[realms] EXAMPLE.COM = { kdc = kdc.example.com admin_server = kdc.example.com }
-
core-site.xml - Hadoop核心安全配置:
hadoop.security.authentication kerberos
-
hdfs-site.xml - HDFS安全配置:
dfs.namenode.kerberos.principal hdfs/_HOST@EXAMPLE.COM
多组件认证关系:
┌───────────────────────────────────────────────────────┐│ Flink Cluster││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐││ │ JobManager │ │ TaskManager │ │ TaskManager │││ └─────────────┘ └─────────────┘ └─────────────┘││ │ │ │ ││ ▼ ▼ ▼ ││ ┌───────────────────────────────────────────────────┐││ │ Kerberos Realm │││ │ │││ │ ┌─────────────┐ ┌───────────────────┐ │││ │ │ Keytab │ │ Ticket Cache │ │││ │ │ Credential │ │ (Optional) │ │││ │ └─────────────┘ └───────────────────┘ │││ │ │││ └───────────────────────────────────────────────────┘││ │└───────────────────────────────────────────────────────┘ │ │ ▼┌───────────────────────────────────────────────────────┐│ Kerberized Services ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐││ │ HDFS │ │ Hive │ │ YARN │││ └─────────────┘ └─────────────┘ └─────────────┘│└───────────────────────────────────────────────────────┘
4.1.3 核心实现
public class KerberosUtils { private static volatile UserGroupInformation loginUser = null; private static volatile boolean initialized = false; private static final Object LOCK = new Object(); /** * 初始化Kerberos认证 */ public static synchronized void initKerberos() throws IOException { if (!ConfigManager.isKerberosEnabled()) { return; } if (initialized && loginUser != null && loginUser.hasKerberosCredentials() && loginUser.isFromKeytab()) { return; // 已经初始化且有效 } synchronized (LOCK) { // 1. 设置系统属性 setupKerberosSystemProperties(); // 2. 创建Hadoop配置 Configuration conf = createHadoopConfiguration(); UserGroupInformation.setConfiguration(conf); // 3. 验证配置文件 validateKerberosFiles(); // 4. 执行Keytab登录 loginUser = UserGroupInformation.loginUserFromKeytabAndReturnUGI( ConfigManager.getKerberosPrincipal(), ConfigManager.getKerberosKeytab() ); if (loginUser == null || !loginUser.hasKerberosCredentials()) { throw new IOException(\"Kerberos认证失败\"); } initialized = true; logger.info(\"Kerberos登录成功: {}\", loginUser.getUserName()); } } /** * 安全执行方法 */ public static T doAs(PrivilegedExceptionAction action) throws Exception { if (!ConfigManager.isKerberosEnabled()) { return action.run(); } // 确保本地已初始化 if (!initialized || loginUser == null || !loginUser.hasKerberosCredentials() || !loginUser.isFromKeytab()) { initKerberos(); } return loginUser.doAs(action); } /** * 检查和重新登录 */ public static void checkAndRelogin() throws IOException { if (!ConfigManager.isKerberosEnabled()) return; if (!initialized || loginUser == null || !loginUser.hasKerberosCredentials() || !loginUser.isFromKeytab()) { initKerberos(); return; } try { loginUser.checkTGTAndReloginFromKeytab(); logger.debug(\"Kerberos票据已刷新\"); } catch (Exception e) { logger.error(\"票据刷新失败,重新登录\", e); synchronized (LOCK) { loginUser = null; initialized = false; initKerberos(); } } }}
4.1.4 系统属性设置
private static void setupKerberosSystemProperties() { try { String krb5Config = ConfigManager.getKrb5ConfPath(); if (krb5Config != null && new File(krb5Config).exists()) { System.setProperty(\"java.security.krb5.conf\", krb5Config); } String jaasConfig = ConfigManager.getJaasConfPath(); if (jaasConfig != null && new File(jaasConfig).exists()) { System.setProperty(\"java.security.auth.login.config\", jaasConfig); } System.setProperty(\"javax.security.auth.useSubjectCredsOnly\", \"false\"); System.setProperty(\"hadoop.security.authentication\", \"kerberos\"); // 调试模式 if (ConfigManager.isKerberosDebugEnabled()) { System.setProperty(\"sun.security.krb5.debug\", \"true\"); System.setProperty(\"sun.security.jgss.debug\", \"true\"); } } catch (Exception e) { throw new RuntimeException(\"设置Kerberos系统属性失败\", e); }}
4.2 HDFS文件操作机制
4.2.1 设计思路
-
连接管理:单例模式管理FileSystem连接
-
重试机制:网络异常时自动重试
-
安全封装:统一的Kerberos安全调用
-
错误处理:详细的异常分类和处理
4.2.2 核心实现
public class HdfsUtils { private static volatile FileSystem fileSystem; private static volatile boolean initialized = false; /** * 统一的Kerberos doAs封装 */ private static T doKerberosAction(PrivilegedExceptionAction action) throws IOException { if (ConfigManager.isKerberosEnabled()) { try { return KerberosUtils.doAs(action); } catch (Exception e) { throw new IOException(\"Kerberos doAs操作失败\", e); } } else { try { return action.run(); } catch (Exception e) { throw new IOException(\"操作失败\", e); } } } /** * 带重试机制的文件读取 */ public static FSDataInputStream openFileWithRetry(String path, int maxRetries) throws IOException { IOException lastException = null; for (int attempt = 1; attempt { if (ConfigManager.isKerberosEnabled()) { KerberosUtils.checkAndRelogin(); } Configuration conf = createHadoopConfiguration(); FileSystem fs = FileSystem.get(conf); return fs.open(new Path(path)); }); } catch (IOException e) { lastException = e; logger.warn(\"打开文件失败 (第{}/{}次): {}\", attempt, maxRetries, e.getMessage()); if (attempt < maxRetries) { try { Thread.sleep(5000); // 等待5秒 reinitializeFileSystem(); // 重新初始化连接 } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new IOException(\"重试被中断\", ie); } } } } throw new IOException(\"文件打开失败,已重试\" + maxRetries + \"次: \" + path, lastException); } /** * 创建增强的Hadoop配置 */ private static Configuration createHadoopConfiguration() { Configuration conf = new Configuration(); // HDFS HA配置 conf.set(\"fs.defaultFS\", \"hdfs://nameservice1\"); conf.set(\"dfs.nameservices\", \"nameservice1\"); conf.set(\"dfs.ha.namenodes.nameservice1\", \"nn1,nn2\"); conf.set(\"dfs.namenode.rpc-address.nameservice1.nn1\", \"ddp1:8020\"); conf.set(\"dfs.namenode.rpc-address.nameservice1.nn2\", \"ddp2:8020\"); conf.set(\"dfs.client.failover.proxy.provider.nameservice1\", \"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"); // 网络优化配置 conf.setBoolean(\"dfs.client.use.datanode.hostname\", false); conf.setBoolean(\"dfs.datanode.use.datanode.hostname\", false); conf.setBoolean(\"dfs.client.read.shortcircuit\", false); conf.setBoolean(\"dfs.domain.socket.path.disable\", true); // 超时和重试配置 conf.setInt(\"dfs.client.socket-timeout\", 300000); // 5分钟 conf.setInt(\"ipc.client.connect.timeout\", 120000); // 2分钟 conf.setInt(\"ipc.client.connect.max.retries\", 10); conf.setInt(\"ipc.client.rpc.timeout\", 300000); // 安全配置 if (ConfigManager.isKerberosEnabled()) { conf.set(\"hadoop.security.authentication\", \"kerberos\"); conf.set(\"hadoop.security.authorization\", \"true\"); conf.set(\"dfs.data.transfer.protection\", \"authentication\"); } else { conf.set(\"hadoop.security.authentication\", \"simple\"); conf.set(\"hadoop.security.authorization\", \"false\"); } return conf; }}
4.3 Parquet文件写入机制
4.3.1 设计原理
-
批量写入:采用缓冲区机制,提高写入效率
-
Schema管理:使用Avro Schema定义数据结构
-
压缩优化:采用Snappy压缩算法
-
并发安全:支持多线程并发写入
4.3.2 Avro Schema定义
public class ParquetWriterUtils { // InfoAppendix的Avro Schema private static final String INFO_APPENDIX_SCHEMA = \"{\\n\" + \" \\\"type\\\": \\\"record\\\",\\n\" + \" \\\"name\\\": \\\"InfoAppendix\\\",\\n\" + \" \\\"namespace\\\": \\\"com.train.data.entity\\\",\\n\" + \" \\\"fields\\\": [\\n\" + \" {\\\"name\\\": \\\"statistDate\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"statisticsDate\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"innerCode\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"officeNo\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"windowNo\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"ticketNo\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"idKind\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"idNo\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"idName\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"areaCenterCode\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"},\\n\" + \" {\\\"name\\\": \\\"pk\\\", \\\"type\\\": \\\"string\\\", \\\"default\\\": \\\"\\\"}\\n\" + \" ]\\n\" + \"}\";}
4.3.3 批量写入实现
private static void doWriteInfoAppendixToParquet(List records, String partitionValue) throws IOException { String tableName = ConfigManager.getInfoAppendixTableName(); String outputPath = buildParquetFilePath(tableName, partitionValue, \"info_appendix\"); logger.info(\"开始写入InfoAppendix数据到Parquet文件: {}, 记录数: {}\", outputPath, records.size()); // 1. 解析Schema Schema schema = new Schema.Parser().parse(INFO_APPENDIX_SCHEMA); // 2. 创建Hadoop配置 Configuration conf = createHadoopConfiguration(); Path parquetPath = new Path(outputPath); // 3. 创建Parquet写入器 ParquetWriter writer = null; try { writer = AvroParquetWriter.builder(parquetPath) .withSchema(schema) .withConf(conf) .withCompressionCodec(CompressionCodecName.SNAPPY) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build(); // 4. 数据转换和写入 int count = 0; for (InfoAppendix record : records) { GenericRecord avroRecord = new GenericData.Record(schema); // 字段映射 avroRecord.put(\"statistDate\", safeString(record.getStatistDate())); avroRecord.put(\"statisticsDate\", safeString(record.getStatisticsDate())); avroRecord.put(\"innerCode\", safeString(record.getInnerCode())); avroRecord.put(\"officeNo\", safeString(record.getOfficeNo())); avroRecord.put(\"windowNo\", safeString(record.getWindowNo())); avroRecord.put(\"ticketNo\", safeString(record.getTicketNo())); avroRecord.put(\"idKind\", safeString(record.getIdKind())); avroRecord.put(\"idNo\", safeString(record.getIdNo())); avroRecord.put(\"idName\", safeString(record.getIdName())); avroRecord.put(\"areaCenterCode\", safeString(record.getAreaCenterCode())); avroRecord.put(\"pk\", safeString(record.getPk())); writer.write(avroRecord); count++; // 进度日志 if (count <= 5 || count % 100 == 0) { logger.info(\"已写入第{}条InfoAppendix记录: ticketNo={}\", count, record.getTicketNo()); } } logger.info(\"InfoAppendix数据写入完成: {}, 记录数: {}\", outputPath, count); } finally { if (writer != null) { try { writer.close(); } catch (Exception e) { logger.error(\"关闭ParquetWriter失败\", e); } } }}/** * 安全的字符串转换,避免null值 */private static String safeString(String value) { return value == null ? \"\" : value;}/** * 构建Parquet文件路径,避免文件名冲突 */private static String buildParquetFilePath(String tableName, String partitionValue, String filePrefix) { String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern(\"yyyyMMdd_HHmmss_SSS\")); String uuid = UUID.randomUUID().toString().substring(0, 8); String threadId = String.valueOf(Thread.currentThread().getId()); String fileName = String.format(\"%s_%s_%s_%s.parquet\", filePrefix, timestamp, threadId, uuid); return String.format(\"%s/%s/sdate=%s/%s\", ConfigManager.getHiveWarehouseDir(), tableName, partitionValue, fileName);}
4.4 定时调度机制
4.4.1 Quartz调度配置
public class QuartzMain { public static void main(String[] args) throws Exception { // 1. 创建作业详情 JobDetail job = JobBuilder.newJob(TrainInfoMergeQuartzJob.class) .withIdentity(\"mergeJob\", \"group1\") .storeDurably(true) // 持久化作业 .build(); // 2. 创建触发器 - 每小时的第1分钟执行 Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(\"mergeTrigger\", \"group1\") .withSchedule(CronScheduleBuilder.cronSchedule(\"0 1 * * * ?\") .withMisfireHandlingInstructionDoNothing()) // 错过执行时间不补执行 .build(); // 3. 创建调度器 SchedulerFactory factory = new StdSchedulerFactory(); Scheduler scheduler = factory.getScheduler(); // 4. 启动调度器 scheduler.start(); logger.info(\"Quartz调度器启动成功\"); // 5. 注册作业和触发器 scheduler.scheduleJob(job, trigger); logger.info(\"数据合并作业调度注册成功,执行计划: 每小时第1分钟\"); // 6. 添加关闭钩子 Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { scheduler.shutdown(true); logger.info(\"Quartz调度器已关闭\"); } catch (SchedulerException e) { logger.error(\"关闭调度器失败\", e); } })); }}/** * Quartz作业实现类 */public class TrainInfoMergeQuartzJob implements Job { private static final Logger logger = LogManager.getLogger(TrainInfoMergeQuartzJob.class); @Override public void execute(JobExecutionContext context) throws JobExecutionException { logger.info(\"开始执行定时数据合并作业...\"); try { long startTime = System.currentTimeMillis(); // 调用合并作业 TrainInfoMergeJob.runMergeOnce(); long duration = System.currentTimeMillis() - startTime; logger.info(\"数据合并作业执行完成,耗时: {}ms\", duration); } catch (Exception e) { logger.error(\"数据合并作业执行失败\", e); throw new JobExecutionException(\"数据合并作业执行失败: \" + e.getMessage(), e); } }}
4.4.2 文件扫描调度
public class TextFileSourceFunction extends RichSourceFunction { private volatile boolean isRunning = true; private final long scanInterval; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.scanInterval = ConfigManager.getZipProcessorInterval(); // 从配置读取扫描间隔 // TaskManager中的Kerberos认证 if (ConfigManager.isKerberosEnabled()) { KerberosUtils.reloginInTaskManager(); } } @Override public void run(SourceContext ctx) throws Exception { logger.info(\"文件扫描服务启动,扫描间隔: {}ms\", scanInterval); while (isRunning) { try { String inputPath = ConfigManager.getHdfsInputPath(); // 1. 扫描目录获取文件列表 List textFiles = scanTargetFiles(inputPath); // 2. 发送文件路径到下游 for (String file : textFiles) { logger.info(\"发现目标文件: {}\", file); ctx.collect(file); } // 3. 等待下次扫描 Thread.sleep(scanInterval); } catch (InterruptedException e) { logger.info(\"文件扫描服务被中断\"); break; } catch (Exception e) { logger.error(\"文件扫描异常\", e); // 异常重试机制 try { Thread.sleep(Math.min(scanInterval, 60000)); // 最多等待1分钟 } catch (InterruptedException ie) { break; } } } logger.info(\"文件扫描服务已停止\"); } /** * 扫描目标文件 */ private List scanTargetFiles(String inputPath) throws Exception { return KerberosUtils.doAs(() -> { List targetFiles = new ArrayList(); // 获取所有txt文件 List allFiles = HdfsUtils.listFilesByExtension(inputPath, \".txt\"); // 过滤目标文件 for (String file : allFiles) { String fileName = file.substring(file.lastIndexOf(\'/\') + 1); if (fileName.contains(\"GASMZ_info_appendix\") || fileName.contains(\"GASMZ_sale\")) { targetFiles.add(file); } } logger.debug(\"扫描目录 {}, 找到目标文件 {} 个\", inputPath, targetFiles.size()); return targetFiles; }); } @Override public void cancel() { isRunning = false; logger.info(\"文件扫描服务收到停止信号\"); }}
5. 配置管理
5.1 ConfigManager设计
public class ConfigManager { private static Properties properties = new Properties(); private static boolean loaded = false; static { loadProperties(); } private static void loadProperties() { try (InputStream input = ConfigManager.class.getClassLoader() .getResourceAsStream(\"application.properties\")) { if (input == null) { throw new RuntimeException(\"找不到配置文件 application.properties\"); } properties.load(input); loaded = true; logger.info(\"配置文件加载成功\"); } catch (IOException e) { throw new RuntimeException(\"加载配置文件失败\", e); } } // HDFS配置 public static String getHdfsNameNodeUrl() { return getProperty(\"hdfs.namenode.url\", \"hdfs://nameservice1\"); } public static String getHdfsInputPath() { return getProperty(\"hdfs.input.path\", \"/testInput\"); } // Hive配置 public static String getHiveDatabaseName() { return getProperty(\"hive.database.name\", \"default\"); } public static String getHiveWarehouseDir() { return getProperty(\"hive.warehouse.dir\", \"/user/hive/warehouse\"); } public static String getInfoAppendixTableName() { return getProperty(\"hive.table.info_appendix\", \"info_appendix\"); } public static String getSaleRecordTableName() { return getProperty(\"hive.table.sale_record\", \"sale_record\"); } public static String getTrainInfoTableName() { return getProperty(\"hive.table.train_info\", \"train_info\"); } // Kerberos配置 public static boolean isKerberosEnabled() { return Boolean.parseBoolean(getProperty(\"security.kerberos.enabled\", \"false\")); } public static String getKerberosKeytab() { return getProperty(\"security.kerberos.login.keytab\"); } public static String getKerberosPrincipal() { return getProperty(\"security.kerberos.login.principal\"); } public static String getKrb5ConfPath() { return getProperty(\"security.kerberos.krb5.conf\"); } public static String getJaasConfPath() { return getProperty(\"security.jaas.conf\"); } // Flink配置 public static int getFlinkJobParallelism() { return Integer.parseInt(getProperty(\"flink.job.parallelism\", \"2\")); } public static long getFlinkCheckpointInterval() { return Long.parseLong(getProperty(\"flink.checkpoint.interval\", \"60000\")); } public static long getFlinkCheckpointTimeout() { return Long.parseLong(getProperty(\"flink.checkpoint.timeout\", \"600000\")); } public static int getFlinkRestartAttempts() { return Integer.parseInt(getProperty(\"flink.restart.attempts\", \"3\")); } public static long getFlinkRestartDelay() { return Long.parseLong(getProperty(\"flink.restart.delay\", \"10000\")); } // 调度配置 public static long getZipProcessorInterval() { return Long.parseLong(getProperty(\"scheduler.zip.processor.interval\", \"300000\")); } public static String getTrainInfoMergeCron() { return getProperty(\"scheduler.train_info.merge.cron\", \"0 1 * * * ?\"); } // 文件处理配置 public static String getFileDelimiter() { return getProperty(\"file.delimiter\", \";\"); } private static String getProperty(String key, String defaultValue) { return properties.getProperty(key, defaultValue); } private static String getProperty(String key) { String value = properties.getProperty(key); if (value == null || value.trim().isEmpty()) { throw new RuntimeException(\"配置项缺失: \" + key); } return value.trim(); }}
5.2 详细配置说明
5.2.1 application.properties完整配置
# ========== HDFS配置 ==========# HDFS NameNode地址,支持HA模式hdfs.namenode.url=hdfs://nameservice1# 输入文件目录hdfs.input.path=/testInput# 临时目录hdfs.temp.path=/tmp/train_data_processor# ========== HDFS HA配置 ==========dfs.nameservices=nameservice1dfs.ha.namenodes.nameservice1=nn1,nn2dfs.namenode.rpc-address.nameservice1.nn1=(服务器ip):(namenode)dfs.namenode.rpc-address.nameservice1.nn2=(服务器ip):(namenode)dfs.client.failover.proxy.provider.nameservice1=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider# ========== 网络优化配置 ==========# 禁用DataNode hostname使用,避免网络问题dfs.client.use.datanode.hostname=falsedfs.datanode.use.datanode.hostname=false# 禁用短路读取dfs.client.read.shortcircuit=falsedfs.domain.socket.path.disable=true# 超时配置hdfs.client.socket.timeout=300000hdfs.client.connect.timeout=120000hdfs.client.retry.max.attempts=5# ========== Hive配置 ==========# Hive Metastore地址hive.metastore.uris=thrift://(服务器ip):9083# 数据库名称hive.database.name=default# 数据仓库目录hive.warehouse.dir=/user/hive/warehouse# Hive配置目录hive.conf.dir=/home/flink/flink-1.18.1/plugins/hadoop-kerberos# ========== 表名配置 ==========hive.table.info_appendix=info_appendixhive.table.sale_record=sale_recordhive.table.train_info=train_info# ========== 文件处理配置 ==========file.encoding=UTF-8file.delimiter=;zip.extract.temp.dir=/tmp/zip_extract# ========== 调度配置 ==========# 文件处理间隔(毫秒)scheduler.zip.processor.interval=300000# 数据合并调度表达式scheduler.train_info.merge.cron=0 1 * * * ?# ========== Flink配置 ==========flink.job.parallelism=2flink.checkpoint.interval=60000flink.checkpoint.timeout=600000flink.restart.strategy=fixed-delayflink.restart.attempts=3flink.restart.delay=10000# ========== Kerberos配置 ==========security.kerberos.enabled=truesecurity.kerberos.login.use-ticket-cache=falsesecurity.kerberos.login.keytab=/home/flink/flink-1.18.1/plugins/hadoop-kerberos/hive.service.keytabsecurity.kerberos.login.principal=hive/ddp1@HADOOP.COMsecurity.kerberos.realm=HADOOP.COMsecurity.kerberos.kdc=(服务器ip)security.kerberos.krb5.conf=/home/flink/flink-1.18.1/plugins/hadoop-kerberos/krb5.confsecurity.jaas.conf=/home/flink/flink-1.18.1/plugins/hadoop-kerberos/flink-jaas.confsecurity.kerberos.debug=false# ========== Hadoop安全配置 ==========hadoop.security.authentication=kerberoshadoop.security.authorization=truehadoop.rpc.protection=authenticationhadoop.security.token.service.use_ip=falsedfs.data.transfer.protection=authentication# ========== IPC配置 ==========ipc.client.connect.timeout=60000ipc.client.connect.max.retries=5ipc.client.rpc.timeout=120000# ========== 调试配置 ==========enable.hive.sink=truedebug.print.parsed.records=truedebug.max.records.to.print=10# ========== 日志配置 ==========log.level=INFOlog.file.path=/var/log/flink-train-processorlog.file.max.size=100MBlog.file.max.history=30
5.2.2 flink-jaas.conf完整配置
Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true useTicketCache=false keyTab=\"/home/flink/flink-1.18.1/plugins/hadoop-kerberos/hive.service.keytab\" principal=\"hive/ddp1@HADOOP.COM\" debug=true;};FlinkClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true useTicketCache=false keyTab=\"/home/flink/flink-1.18.1/plugins/hadoop-kerberos/hive.service.keytab\" principal=\"hive/ddp1@HADOOP.COM\" debug=true;};