Flink流数据分流:Filter与侧输出流实战_flink filter
目录
代码总结
1. 环境设置
2. 数据源
3. 使用 filter 进行分流
4. 使用侧输出流进行分流
5. 执行任务
代码拓展
1. 侧输出流的应用场景
2. 性能优化
3. 更复杂的分流逻辑
4. 侧输出流与窗口结合
5. 错误处理与日志
总结
package transformplusimport org.apache.flink.streaming.api.functions.ProcessFunctionimport org.apache.flink.streaming.api.scala._import org.apache.flink.util.Collectorimport source.{ClickSource, Event}/** * * @PROJECT_NAME: flink1.13 * @PACKAGE_NAME: splitstream * @author: 赵嘉盟-HONOR * @data: 2025-05-15 17:51 * @DESCRIPTION * */object Split { def main(args: Array[String]): Unit = { val env=StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val data=env.addSource(new ClickSource).assignAscendingTimestamps(_.timestamp) //TODO 使用filter进行分流 data.filter(_.user == \"Mary\").print(\"mary_stream\") data.filter(_.user == \"Bob\").print(\"bob_stream\") data.filter(data=>data.user != \"Mary\" && data.user != \"Bob\").print(\"else_stream\") //TODO 使用测输出流进行分流 val mary_tag=OutputTag[(String,String,Long)](\"mary_tag\") val bob_tag=OutputTag[(String,String,Long)](\"bob_tag\") val outputStream = data.process(new ProcessFunction[Event, Event] { override def processElement(i: Event, context: ProcessFunction[Event, Event]#Context, collector: Collector[Event]): Unit = { i.user match { case \"Mary\" => context.output(mary_tag,(i.user,i.url,i.timestamp)) case \"Bob\" => context.output(bob_tag,(i.user,i.url,i.timestamp)) case _ => collector.collect(i) } } }) outputStream.print(\"else_tag\") outputStream.getSideOutput(mary_tag).print(\"mary_tag\") outputStream.getSideOutput(bob_tag).print(\"bob_tag\") env.execute(\"splitStream\") }}
这段代码展示了如何使用 Apache Flink 对流数据进行分流操作,分别通过 filter
和 侧输出流(Side Output) 的方式实现。以下是对代码的详细总结和拓展:
代码总结
1. 环境设置
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)
StreamExecutionEnvironment
是 Flink 流处理程序的入口。setParallelism(1)
设置并行度为 1,意味着所有操作都在单个线程中执行,通常用于调试和测试。
2. 数据源
val data = env.addSource(new ClickSource).assignAscendingTimestamps(_.timestamp)
ClickSource
是一个自定义的数据源,生成模拟的点击事件流。assignAscendingTimestamps
为流中的事件分配时间戳,用于后续基于时间的操作(如窗口、定时器等)。
3. 使用 filter
进行分流
data.filter(_.user == \"Mary\").print(\"mary_stream\")data.filter(_.user == \"Bob\").print(\"bob_stream\")data.filter(data => data.user != \"Mary\" && data.user != \"Bob\").print(\"else_stream\")
filter
是 Flink 的过滤操作符,用于筛选满足条件的元素。- 这里将流分为三部分:
mary_stream
:用户为 \"Mary\" 的事件。bob_stream
:用户为 \"Bob\" 的事件。else_stream
:其他用户的事件。
- 缺点:
filter
需要对同一个流进行多次遍历,效率较低。
4. 使用侧输出流进行分流
val mary_tag = OutputTag[(String, String, Long)](\"mary_tag\")val bob_tag = OutputTag[(String, String, Long)](\"bob_tag\")val outputStream = data.process(new ProcessFunction[Event, Event] { override def processElement(i: Event, context: ProcessFunction[Event, Event]#Context, collector: Collector[Event]): Unit = { i.user match { case \"Mary\" => context.output(mary_tag, (i.user, i.url, i.timestamp)) case \"Bob\" => context.output(bob_tag, (i.user, i.url, i.timestamp)) case _ => collector.collect(i) } }})outputStream.print(\"else_tag\")outputStream.getSideOutput(mary_tag).print(\"mary_tag\")outputStream.getSideOutput(bob_tag).print(\"bob_tag\")
- 侧输出流:允许在同一个
ProcessFunction
中将数据分流到多个输出流中。 OutputTag
用于定义侧输出流的标签。ProcessFunction
是一个低级别的流处理函数,可以访问上下文(Context
)和收集器(Collector
)。context.output
将数据发送到侧输出流。collector.collect
将数据发送到主输出流。
getSideOutput
用于从侧输出流中获取数据。- 优点:只需遍历流一次,效率更高。
5. 执行任务
env.execute(\"splitStream\")
execute
方法启动 Flink 作业的执行。
代码拓展
1. 侧输出流的应用场景
侧输出流非常适合以下场景:
- 分流:将流数据按条件分成多个子流。
- 错误处理:将错误数据发送到侧输出流,主流继续处理正常数据。
- 多路输出:将数据按不同维度分发到多个下游任务。
2. 性能优化
- 避免多次遍历:使用侧输出流代替多次
filter
,减少流数据的重复处理。 - 并行度调整:根据集群资源和任务复杂度,调整并行度以提高性能。
3. 更复杂的分流逻辑
如果需要更复杂的分流逻辑(如基于多个条件或动态规则),可以结合 ProcessFunction
和状态管理(如 ValueState
)实现。
4. 侧输出流与窗口结合
侧输出流可以与窗口操作结合,例如:
- 将窗口内超时的事件发送到侧输出流。
- 将窗口内不符合条件的事件发送到侧输出流。
5. 错误处理与日志
在 ProcessFunction
中,可以捕获异常并将错误信息发送到侧输出流,便于后续分析和处理。
总结
这段代码展示了 Flink 中两种常见的分流方式:
filter
:简单易用,但效率较低,适合小规模数据或简单场景。- 侧输出流:高效灵活,适合大规模数据或复杂场景。
在实际应用中,侧输出流是更推荐的方式,因为它只需遍历流一次,且可以处理更复杂的业务逻辑。通过结合状态管理、定时器和窗口操作,侧输出流可以满足更多高级需求。