> 文档中心 > 【Flink1.14实战】Docker环境 DataStream kafka Source

【Flink1.14实战】Docker环境 DataStream kafka Source


DataStream 连接器

要在应用程序中使用这些连接器之一,通常需要额外的第三方组件,例如用于数据存储或消息队列的服务器。另请注意,虽然本节中列出的流连接器是 Flink 项目的一部分并且包含在源代码版本中,但它们不包含在二进制发行版中。

kafka 连接器

该文档描述的是基于新数据源 API的 Kafka Source。

依赖

Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client 的最新版本保持同步。该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。 当前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。

<dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-connector-kafka_2.11</artifactId>    <version>1.14.4</version></dependency>

Flink 目前的流连接器还不是二进制发行版的一部分。

Kafka Source

使用方法

Kafka Source 提供了构建类来创建 KafkaSource 的实例。以下代码片段展示了如何构建 KafkaSource 来消费 “input-topic” 最早位点的数据, 使用消费组 “my-group”,并且将 Kafka 消息体反序列化为字符串:

KafkaSource<String> source = KafkaSource.<String>builder()    .setBootstrapServers(brokers)    .setTopics("input-topic")    .setGroupId("my-group")    .setStartingOffsets(OffsetsInitializer.earliest())    .setValueOnlyDeserializer(new SimpleStringSchema())    .build();env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

以下属性在构建 KafkaSource 时是必须指定的:

  • Bootstrap server,通过 setBootstrapServers(String) 方法配置
  • 消费者组 ID,通过 setGroupId(String) 配置
  • 要订阅的 Topic / Partition
  • 用于解析 Kafka 消息的反序列化器(Deserializer)

实战

1、编辑 docker-compose.yml

version: "2.1"services:  zookeeper:    image: wurstmeister/zookeeper:3.4.6  kafka:    image: wurstmeister/kafka:2.12-2.2.1    environment:      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://:9094      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE      KAFKA_CREATE_TOPICS: "input:2:1, output:2:1"      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181    ports:      - 9092:9092      - 9094:9094jobmanager:    image: flink:1.14.4-scala_2.11    ports:      - "8081:8081"    command: jobmanager    environment:      - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager   taskmanager:    image: flink:1.14.4-scala_2.11    depends_on:      - jobmanager    command: taskmanager    scale: 1    environment:      - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 4   

2、启动服务

$ docker-compose up -d

3、编写kafka数据源程序

package quick;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.connector.kafka.source.KafkaSource;import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;public class KafkaExample {    public static void main(String[] args) throws Exception{ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final ParameterTool params = ParameterTool.fromArgs(args); String brokers = params.get("bootstrap.servers", "kafka:9092"); KafkaSource source = KafkaSource.builder()  .setBootstrapServers(brokers)  .setTopics("input")  .setGroupId("my-group")  .setStartingOffsets(OffsetsInitializer.earliest())  .setValueOnlyDeserializer(new SimpleStringSchema())  .build(); DataStream<Tuple2> dataStream = env  .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")  .flatMap(new Splitter())  .keyBy(value -> value.f0)  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))  .sum(1); dataStream.print(); env.execute("KafkaExample job");    }    public static class Splitter implements FlatMapFunction<String, Tuple2> { @Override public void flatMap(String sentence, Collector<Tuple2> out) throws Exception {     for (String word: sentence.split(" ")) {  out.collect(new Tuple2(word, 1));     } }    }}

4、kafka生产者不断提交数据

$ docker exec -it kafka_kafka_1 /bin/bash $ cd /opt/kafka# 查看已创建的topic信息$ bin/kafka-topics.sh --describe --topic input --bootstrap-server localhost:9092$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic inputThis is a messageThis is another message

5、flink日志查看

然后,将打包应用程序提交,Flink 的Web UI来提交作业监控集群的状态和正在运行的作业。

参数是 --参数名 值 (中间空格隔开)

程序参数,不用输入--bootstrap.servers kafka:9092

只需输入一些单词,然后按回车键即可传入新单词。这些将作为单词统计程序的输入。如果想查看大于 1 的计数,在 5 秒内重复输入相同的单词即可(如果无法快速输入,则可以将窗口大小从 5 秒增加 ☺)。

$  docker-compose logs -f taskmanager

查看输出

taskmanager_1  | (This,1)taskmanager_1  | (message,1)taskmanager_1  | (a,1)taskmanager_1  | (is,1)