
[Flink 1.14 in Practice] Kafka SQL Connector in a Docker Environment


Apache Kafka SQL Connector

The Kafka connector provides the ability to read data from and write data to Kafka topics.

Dependency
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.14.4/flink-sql-connector-kafka_2.11-1.14.4.jar
How to Create a Kafka Table

The following example shows how to create a Kafka table:

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)
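Once created, the table can be queried like any regular table. A minimal query sketch (the 'pv' filter value is a hypothetical behavior type, not part of the DDL above) that also reads the Kafka record timestamp exposed through the METADATA column:

-- `ts` comes from the Kafka record metadata declared in the DDL above
SELECT user_id, item_id, behavior, ts
FROM KafkaTable
WHERE behavior = 'pv';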
Hands-On
  • kafka --> flink --> es --> kibana

Data is collected and stored in Kafka; Flink consumes the Kafka data and performs real-time computation; the results are stored in Elasticsearch and finally visualized in Kibana.

The setup is based on docker-compose.

1. Edit docker-compose.yml

version: "3"services:  zookeeper:    image: wurstmeister/zookeeper:3.4.6  kafka:    image: wurstmeister/kafka:2.12-2.2.1    environment:      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://:9094      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE      KAFKA_CREATE_TOPICS: "input:2:1, output:2:1"      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181    ports:      - 9092:9092      - 9094:9094     depends_on:      - zookeeper  jobmanager:    image: flink:1.14.4-scala_2.11    ports:      - "8081:8081"    command: jobmanager    environment:      - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager   taskmanager:    image: flink:1.14.4-scala_2.11    depends_on:      - jobmanager    command: taskmanager    scale: 1    environment:      - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 4   

2. Start the Services

$ docker-compose up -d

# Download the connector dependency
$ wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.14.4/flink-sql-connector-kafka_2.11-1.14.4.jar
$ docker cp flink-sql-connector-kafka_2.11-1.14.4.jar kafka_taskmanager_1:/opt/flink/lib
$ docker cp flink-sql-connector-kafka_2.11-1.14.4.jar kafka_jobmanager_1:/opt/flink/lib

# Inspect the lib directory
$ docker exec -it kafka_taskmanager_1 /bin/bash

# Restart so the new jar is picked up
$ docker-compose restart

3. Kafka Data

The e-commerce user behavior analysis involves three tables: an item information table, an item category information table, and a user behavior information table. The user behavior table has five columns: user ID, item ID, item category ID, behavior type, and timestamp.

$ docker exec -it kafka_kafka_1 /bin/bash
$ cd /opt/kafka
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic input
1,2268318,2520377,pv,1671544070
1,2333346,2520771,pv,1671561733

The data format is CSV, comma-delimited.

4. Create a Flink SQL Table to Read the Kafka Data

$ docker-compose exec jobmanager ./bin/sql-client.sh

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    app_time BIGINT,
    ts AS TO_TIMESTAMP(FROM_UNIXTIME(app_time, 'yyyy-MM-dd HH:mm:ss')),
    proctime AS PROCTIME(),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',                          -- use the kafka connector
    'topic' = 'input',                              -- kafka topic
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'test-group',
    'format' = 'csv',
    'csv.field-delimiter' = ','
);
  • WATERMARK defines the event-time attribute used to handle out-of-order events; here the watermark lags the observed event time by 5 seconds, so a window fires once events arrive more than 5 seconds past its end
  • PROCTIME() is a built-in function that adds a virtual processing-time column, which is occasionally useful
  • The WITH clause defines the Kafka connection information and properties
  • Since the event time arrives as a BIGINT epoch in seconds, the SQL converts it to TIMESTAMP (see the sketch after this list)
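A minimal standalone sketch of that conversion chain, using one of the sample epochs from the data above: FROM_UNIXTIME renders the epoch seconds as a 'yyyy-MM-dd HH:mm:ss' string in the session time zone, and TO_TIMESTAMP parses that string into a TIMESTAMP.

-- Run in the SQL client; 1671544070 is taken from the sample records above
SELECT TO_TIMESTAMP(FROM_UNIXTIME(1671544070, 'yyyy-MM-dd HH:mm:ss'));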

5. Analysis Scenarios

Scenario 1: cumulative number of online users, reported at 10-minute granularity

The final analysis results are written to the print connector:

CREATE TABLE users_v (
    date_str STRING,
    time_str STRING,
    uv BIGINT,
    PRIMARY KEY (date_str, time_str) NOT ENFORCED
) WITH (
    'connector' = 'print'
);
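The print connector simply writes every row to the TaskManager log, which is enough to verify the job. To complete the kafka --> flink --> es --> kibana pipeline, the same table could instead use Flink's elasticsearch-7 connector. The sketch below assumes the connector jar has been copied into /opt/flink/lib and an Elasticsearch service is reachable at http://elasticsearch:9200 (neither is part of the compose file above); the index name is likewise an assumption.

CREATE TABLE users_v_es (
    date_str STRING,
    time_str STRING,
    uv BIGINT,
    PRIMARY KEY (date_str, time_str) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',        -- assumes flink-sql-connector-elasticsearch7 is on the classpath
    'hosts' = 'http://elasticsearch:9200',  -- hypothetical ES address
    'index' = 'users_v'                     -- hypothetical index name
);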

Counting online users per 10 minutes only requires the date (date_str), the time (time_str), and the count (uv). The table user_behavior consuming the Kafka data was defined above; now we query it and write the results into the sink table (print here, Elasticsearch in the full pipeline):

INSERT INTO users_v
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as uv
FROM (
  SELECT
    DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str,
    SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) || '0' as time_str,
    user_id
  FROM user_behavior
)
GROUP BY date_str;

Since the analysis granularity is 10 minutes, the inner query uses SUBSTR to truncate the hour-and-minute string of the event time and pad it into a 10-minute bucket, e.g. 12:10, 12:20. After the SQL is submitted, Flink runs it on the cluster as a streaming job according to the configured WATERMARK and windows.
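A quick worked example of the bucketing, with a hypothetical literal: DATE_FORMAT(ts, 'HH:mm') for 17:28 yields the string '17:28'; SUBSTR takes the first four characters '17:2', and appending '0' gives the 10-minute bucket '17:20'. This can be checked standalone in the SQL client:

SELECT SUBSTR('17:28', 1, 4) || '0';  -- returns '17:20'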

$ docker-compose logs -f taskmanager
taskmanager_1  | +I[2017-11-24, 17:20, 1]

