> 技术文档 > Flink Stream API 源码走读 - socketTextStream_flink stream api源码走读-sockettextstream

Flink Stream API 源码走读 - socketTextStream_flink stream api源码走读-sockettextstream


概述

本文深入分析了 Flink 中 socketTextStream() 方法的源码实现,从用户API调用到最终返回 DataStream 的完整流程。

核心知识点

1. socketTextStream 方法重载链

// 用户调用入口env.socketTextStream(\"hostname\", 9999) ↓ 补充分隔符参数env.socketTextStream(\"hostname\", 9999, \"\\n\") ↓ 补充重试次数参数env.socketTextStream(\"hostname\", 9999, \"\\n\", 0) ↓ 创建 SocketTextStreamFunctionaddSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), \"Socket Stream\")

重载过程分析:

  • 第一层:补充分隔符参数(默认 “\\n”)
  • 第二层:补充重试次数参数(默认 0)
  • 最终:创建 SocketTextStreamFunction 并调用 addSource

2. SourceFunction 的重要说明

@Deprecatedpublic class SocketTextStreamFunction implements SourceFunction<String>

⚠️ 重要提醒:

  • SourceFunction 已被标记为 @Deprecated(过时)
  • 官方建议使用新的 Source API
  • 基于 SourceFunction 的架构是老架构
  • 新架构基于 org.apache.flink.api.connector.source.Source

3. addSource 方法的重载链

#mermaid-svg-TWddEtUwT6svfMOH {font-family:\"trebuchet ms\",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-TWddEtUwT6svfMOH .error-icon{fill:#552222;}#mermaid-svg-TWddEtUwT6svfMOH .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-TWddEtUwT6svfMOH .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-TWddEtUwT6svfMOH .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-TWddEtUwT6svfMOH .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-TWddEtUwT6svfMOH .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-TWddEtUwT6svfMOH .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-TWddEtUwT6svfMOH .marker{fill:#333333;stroke:#333333;}#mermaid-svg-TWddEtUwT6svfMOH .marker.cross{stroke:#333333;}#mermaid-svg-TWddEtUwT6svfMOH svg{font-family:\"trebuchet ms\",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-TWddEtUwT6svfMOH .label{font-family:\"trebuchet ms\",verdana,arial,sans-serif;color:#333;}#mermaid-svg-TWddEtUwT6svfMOH .cluster-label text{fill:#333;}#mermaid-svg-TWddEtUwT6svfMOH .cluster-label span{color:#333;}#mermaid-svg-TWddEtUwT6svfMOH .label text,#mermaid-svg-TWddEtUwT6svfMOH span{fill:#333;color:#333;}#mermaid-svg-TWddEtUwT6svfMOH .node rect,#mermaid-svg-TWddEtUwT6svfMOH .node circle,#mermaid-svg-TWddEtUwT6svfMOH .node ellipse,#mermaid-svg-TWddEtUwT6svfMOH .node polygon,#mermaid-svg-TWddEtUwT6svfMOH .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-TWddEtUwT6svfMOH .node .label{text-align:center;}#mermaid-svg-TWddEtUwT6svfMOH .node.clickable{cursor:pointer;}#mermaid-svg-TWddEtUwT6svfMOH .arrowheadPath{fill:#333333;}#mermaid-svg-TWddEtUwT6svfMOH .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-TWddEtUwT6svfMOH .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-TWddEtUwT6svfMOH .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-TWddEtUwT6svfMOH .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-TWddEtUwT6svfMOH .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-TWddEtUwT6svfMOH .cluster text{fill:#333;}#mermaid-svg-TWddEtUwT6svfMOH .cluster span{color:#333;}#mermaid-svg-TWddEtUwT6svfMOH div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:\"trebuchet ms\",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-TWddEtUwT6svfMOH :root{--mermaid-font-family:\"trebuchet ms\",verdana,arial,sans-serif;} addSource(function, sourceName) addSource(function, sourceName, null) addSource(function, sourceName, typeInfo, CONTINUOUS_UNBOUNDED) 核心处理逻辑

参数补充过程:

  1. addSource(function, \"Socket Stream\")
  2. addSource(function, \"Socket Stream\", null) - 补充 TypeInformation 为 null
  3. addSource(function, \"Socket Stream\", null, CONTINUOUS_UNBOUNDED) - 补充有界性

4. 核心处理逻辑分析

private <OUT> DataStreamSource<OUT> addSource( final SourceFunction<OUT> function, final String sourceName, @Nullable final TypeInformation<OUT> typeInfo, final Boundedness boundedness) { // 1. 非空检查 checkNotNull(function); checkNotNull(sourceName); checkNotNull(boundedness); // 2. 抽取类型信息 TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo); // 3. 判断是否并行 boolean isParallel = function instanceof ParallelSourceFunction; // 4. 序列化检查 clean(function); // 5. Function → Operator final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function); // 6. 返回 DataStreamSource return new DataStreamSource<>( this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);}

5. 四个核心概念的转换

#mermaid-svg-clCqxN5EwxttlTRo {font-family:\"trebuchet ms\",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-clCqxN5EwxttlTRo .error-icon{fill:#552222;}#mermaid-svg-clCqxN5EwxttlTRo .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-clCqxN5EwxttlTRo .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-clCqxN5EwxttlTRo .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-clCqxN5EwxttlTRo .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-clCqxN5EwxttlTRo .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-clCqxN5EwxttlTRo .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-clCqxN5EwxttlTRo .marker{fill:#333333;stroke:#333333;}#mermaid-svg-clCqxN5EwxttlTRo .marker.cross{stroke:#333333;}#mermaid-svg-clCqxN5EwxttlTRo svg{font-family:\"trebuchet ms\",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-clCqxN5EwxttlTRo .label{font-family:\"trebuchet ms\",verdana,arial,sans-serif;color:#333;}#mermaid-svg-clCqxN5EwxttlTRo .cluster-label text{fill:#333;}#mermaid-svg-clCqxN5EwxttlTRo .cluster-label span{color:#333;}#mermaid-svg-clCqxN5EwxttlTRo .label text,#mermaid-svg-clCqxN5EwxttlTRo span{fill:#333;color:#333;}#mermaid-svg-clCqxN5EwxttlTRo .node rect,#mermaid-svg-clCqxN5EwxttlTRo .node circle,#mermaid-svg-clCqxN5EwxttlTRo .node ellipse,#mermaid-svg-clCqxN5EwxttlTRo .node polygon,#mermaid-svg-clCqxN5EwxttlTRo .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-clCqxN5EwxttlTRo .node .label{text-align:center;}#mermaid-svg-clCqxN5EwxttlTRo .node.clickable{cursor:pointer;}#mermaid-svg-clCqxN5EwxttlTRo .arrowheadPath{fill:#333333;}#mermaid-svg-clCqxN5EwxttlTRo .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-clCqxN5EwxttlTRo .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-clCqxN5EwxttlTRo .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-clCqxN5EwxttlTRo .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-clCqxN5EwxttlTRo .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-clCqxN5EwxttlTRo .cluster text{fill:#333;}#mermaid-svg-clCqxN5EwxttlTRo .cluster span{color:#333;}#mermaid-svg-clCqxN5EwxttlTRo div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:\"trebuchet ms\",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-clCqxN5EwxttlTRo :root{--mermaid-font-family:\"trebuchet ms\",verdana,arial,sans-serif;} Function
用户逻辑 Operator
算子封装 Transformation
转换操作 DataStream
用户API

概念解释:

  1. Function: 用户的业务逻辑封装

    • SocketTextStreamFunction - Socket连接和数据读取逻辑
    • 继承自 SourceFunction
  2. Operator: 算子的抽象

    • StreamSource - 将Function包装成算子
    • 继承自 AbstractUdfStreamOperator
  3. Transformation: 转换操作的封装

    • LegacySourceTransformation - 包装Operator和相关元信息
    • 包含类型信息、并行度、有界性等
  4. DataStream: 面向用户的流式API

    • DataStreamSource - 继承自 DataStream
    • 支持链式调用(map、filter、keyBy等)

6. 重要参数说明

TypeInformation(类型信息)
// 为什么需要 TypeInformation?// Java 泛型在编译后会被类型擦除,Flink需要显式的类型信息来:// 1. 创建序列化器/反序列化器// 2. 根据不同类型产生不同的序列化机制TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);

想更好理解,可以去看 我这篇文章:Java 泛型类型擦除

Boundedness(有界性)
// CONTINUOUS_UNBOUNDED 表示无界流// 在翻译成物理执行计划时会用到这个信息// 有界流和无界流会生成不同的执行计划Boundedness.CONTINUOUS_UNBOUNDED
并行性检查
// 检查是否为并行源函数boolean isParallel = function instanceof ParallelSourceFunction;// SocketTextStreamFunction 不是 ParallelSourceFunction,所以 isParallel = false

7. DataStreamSource 的构造

public DataStreamSource( StreamExecutionEnvironment environment, TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator, boolean isParallel, String sourceName, Boundedness boundedness) { // 调用父类构造,创建 LegacySourceTransformation super(environment, new LegacySourceTransformation<>( sourceName, operator, outTypeInfo, environment.getParallelism(), boundedness)); // 如果不是并行的,设置并行度为1 if (!isParallel) { setParallelism(1); }}

8. 继承关系分析

#mermaid-svg-XSpLJd4qsmDz5KLX {font-family:\"trebuchet ms\",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-XSpLJd4qsmDz5KLX .error-icon{fill:#552222;}#mermaid-svg-XSpLJd4qsmDz5KLX .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-XSpLJd4qsmDz5KLX .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-XSpLJd4qsmDz5KLX .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-XSpLJd4qsmDz5KLX .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-XSpLJd4qsmDz5KLX .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-XSpLJd4qsmDz5KLX .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-XSpLJd4qsmDz5KLX .marker{fill:#333333;stroke:#333333;}#mermaid-svg-XSpLJd4qsmDz5KLX .marker.cross{stroke:#333333;}#mermaid-svg-XSpLJd4qsmDz5KLX svg{font-family:\"trebuchet ms\",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-XSpLJd4qsmDz5KLX .label{font-family:\"trebuchet ms\",verdana,arial,sans-serif;color:#333;}#mermaid-svg-XSpLJd4qsmDz5KLX .cluster-label text{fill:#333;}#mermaid-svg-XSpLJd4qsmDz5KLX .cluster-label span{color:#333;}#mermaid-svg-XSpLJd4qsmDz5KLX .label text,#mermaid-svg-XSpLJd4qsmDz5KLX span{fill:#333;color:#333;}#mermaid-svg-XSpLJd4qsmDz5KLX .node rect,#mermaid-svg-XSpLJd4qsmDz5KLX .node circle,#mermaid-svg-XSpLJd4qsmDz5KLX .node ellipse,#mermaid-svg-XSpLJd4qsmDz5KLX .node polygon,#mermaid-svg-XSpLJd4qsmDz5KLX .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-XSpLJd4qsmDz5KLX .node .label{text-align:center;}#mermaid-svg-XSpLJd4qsmDz5KLX .node.clickable{cursor:pointer;}#mermaid-svg-XSpLJd4qsmDz5KLX .arrowheadPath{fill:#333333;}#mermaid-svg-XSpLJd4qsmDz5KLX .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-XSpLJd4qsmDz5KLX .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-XSpLJd4qsmDz5KLX .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-XSpLJd4qsmDz5KLX .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-XSpLJd4qsmDz5KLX .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-XSpLJd4qsmDz5KLX .cluster text{fill:#333;}#mermaid-svg-XSpLJd4qsmDz5KLX .cluster span{color:#333;}#mermaid-svg-XSpLJd4qsmDz5KLX div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:\"trebuchet ms\",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-XSpLJd4qsmDz5KLX :root{--mermaid-font-family:\"trebuchet ms\",verdana,arial,sans-serif;} DataStream SingleOutputStreamOperator DataStreamSource 包含所有流式API
map, filter, keyBy, window等

重要理解:

  • DataStreamSource 本质上就是一个 DataStream
  • 所有的链式调用API都定义在 DataStream
  • SingleOutputStreamOperator 这个命名容易误导,它实际上是个 DataStream

9. DataStream 的内部结构

public class DataStream<T> { // 两个最重要的成员 protected final StreamExecutionEnvironment environment; // 执行环境 protected final Transformation<T> transformation; // 转换操作}

关系链:

  • DataStream 包含 Transformation
  • Transformation 包含 Operator
  • Operator 包含 Function

10. 链式调用的实现

DataStream<String> stream = env.socketTextStream(\"localhost\", 9999) .map(...)  // 返回 SingleOutputStreamOperator (实际是DataStream) .filter(...) // 返回 SingleOutputStreamOperator  .keyBy(...) // 返回 KeyedStream .window(...) // 返回 WindowedStream .sum(...)  // 返回 SingleOutputStreamOperator .print(); // 返回 DataStreamSink

流程:
DataStreamSource → 各种变换 → DataStreamSink

总结

核心流程回顾

  1. 用户调用 env.socketTextStream(hostname, port)
  2. 参数补全 通过重载方法逐步补充参数
  3. Function创建 创建 SocketTextStreamFunction
  4. addSource调用 进入核心处理逻辑
  5. 类型推断 抽取输出数据的类型信息
  6. 并行性检查 判断是否为并行源函数
  7. Function→Operator 封装成 StreamSource
  8. Operator→Transformation 创建 LegacySourceTransformation
  9. 返回DataStream 创建 DataStreamSource

设计模式体现

  • 装饰器模式: Function → Operator → Transformation → DataStream
  • 建造者模式: 通过重载方法逐步构建完整对象
  • 模板方法模式: addSource的处理流程

关键技术点

  • 类型擦除处理: 通过 TypeInformation 解决Java泛型擦除问题
  • 序列化机制: 根据类型信息创建对应的序列化器
  • 并行度控制: 非并行源强制设置并行度为1
  • 有界性标识: 为后续执行计划生成提供信息

下节预告

Flink Stream API 源码走读 map和 flatmap
返回目录:Flink 源码系列 - 前言


注意: 基于 Flink 1.18 版本,SourceFunction 已被标记为过时,实际项目中建议使用新的 Source API。