[Flink 1.14 Hands-on] Kafka SQL Connector in a Docker Environment
Apache Kafka SQL Connector
The Kafka connector lets you consume data from and write data to Kafka topics.
Dependency
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.14.4/flink-sql-connector-kafka_2.11-1.14.4.jar
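The URL above follows Maven Central's standard repository layout. You can locate the JAR for another Flink version the same way. The `_2.11` suffix is the Scala version, and it should match the Scala build of the Flink image used later (`flink:1.14.4-scala_2.11`). The Python sketch below builds such a URL from Maven coordinates. The helper name `connector_jar_url` is hypothetical and only for illustration.

```python
# Sketch: derive a Maven Central download URL from Maven coordinates.
# Layout: <groupId with dots -> slashes>/<artifactId>/<version>/<artifactId>-<version>.jar

MAVEN_CENTRAL = "https://repo.maven.apache.org/maven2"


def connector_jar_url(group_id: str, artifact_id: str, version: str) -> str:
    """Build the URL of the main JAR for the given Maven coordinates (hypothetical helper)."""
    group_path = group_id.replace(".", "/")
    return f"{MAVEN_CENTRAL}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"


if __name__ == "__main__":
    # Kafka SQL connector for Flink 1.14.4, built against Scala 2.11
    print(connector_jar_url("org.apache.flink", "flink-sql-connector-kafka_2.11", "1.14.4"))
```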
How to Create a Kafka Table
The following example shows how to create a Kafka table:
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)
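Hands-on
- Kafka --> Flink --> ES --> Kibana
Collected data is stored in Kafka. Flink consumes it from Kafka and computes results in real time. The results are stored in Elasticsearch (ES) and finally visualized in Kibana.
Everything runs on docker-compose.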
1. Edit docker-compose.yml
version: "3"
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_CREATE_TOPICS: "input:2:1, output:2:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    ports:
      - 9092:9092
      - 9094:9094
    depends_on:
      - zookeeper
  jobmanager:
    image: flink:1.14.4-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.14.4-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4
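In the compose file, `KAFKA_CREATE_TOPICS` asks the wurstmeister/kafka image to create topics at startup. Each comma-separated entry reads `name:partitions:replicas`, so `input` and `output` each get 2 partitions with replication factor 1. A single broker cannot hold more than one replica. The Python sketch below models that format. `TopicSpec` and `parse_create_topics` are hypothetical names, and this is not the image's actual startup script.

```python
from typing import List, NamedTuple


class TopicSpec(NamedTuple):
    name: str
    partitions: int
    replication_factor: int


def parse_create_topics(value: str) -> List[TopicSpec]:
    """Split a KAFKA_CREATE_TOPICS-style string into topic specs (hypothetical helper).

    Each comma-separated entry is <name>:<partitions>:<replicas>; an optional
    fourth field (cleanup policy) is ignored by this sketch.
    """
    specs = []
    for entry in value.split(","):
        entry = entry.strip()  # this sketch ignores stray whitespace around entries
        if not entry:
            continue
        name, partitions, replicas = entry.split(":")[:3]
        specs.append(TopicSpec(name, int(partitions), int(replicas)))
    return specs


if __name__ == "__main__":
    for spec in parse_create_topics("input:2:1, output:2:1"):
        print(spec)
```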
2. Start the services
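$ docker-compose up -d
# Download the connector JAR
$ wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.14.4/flink-sql-connector-kafka_2.11-1.14.4.jar
# Copy it into the lib directory of both the TaskManager and the JobManager
$ docker cp flink-sql-connector-kafka_2.11-1.14.4.jar kafka_taskmanager_1:/opt/flink/lib
$ docker cp flink-sql-connector-kafka_2.11-1.14.4.jar kafka_jobmanager_1:/opt/flink/lib
# Check /opt/flink/lib inside the container
$ docker exec -it kafka_taskmanager_1 /bin/bash
# Restart so that Flink loads the new JAR
$ docker-compose restart
The kafka_ prefix in the container names is the docker-compose project name. It defaults to the name of the directory that contains docker-compose.yml, so adjust it to match your setup.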
3. Kafka data
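The e-commerce user behavior analysis involves three tables: a product information table, a product category table, and a user behavior table. The user behavior table has five columns: user ID, item ID, category ID, behavior type, and timestamp.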
$ docker exec -it kafka_kafka_1 /bin/bash
$ cd /opt/kafka
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic input
# Then type one CSV record per line:
1,2268318,2520377,pv,1671544070
1,2333346,2520771,pv,1671561733
The data is in CSV format, with fields separated by commas.
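Each record maps positionally onto the five user behavior columns. The last field holds Unix epoch seconds, which the table definition in the next step converts into an event-time timestamp. The Python sketch below models that mapping. The `UserBehavior` and `parse_record` names are hypothetical, and it assumes the SQL session time zone is UTC for the epoch-to-timestamp conversion.

```python
from datetime import datetime, timezone
from typing import NamedTuple


class UserBehavior(NamedTuple):
    user_id: int
    item_id: int
    category_id: int
    behavior: str
    app_time: int   # Unix epoch seconds, as written to Kafka
    ts: datetime    # derived event time (naive, interpreted as UTC)


def parse_record(line: str) -> UserBehavior:
    """Parse one comma-separated record (hypothetical helper)."""
    user_id, item_id, category_id, behavior, app_time = line.strip().split(",")
    epoch = int(app_time)
    # Assumption: UTC session time zone; Flink's FROM_UNIXTIME uses the session time zone.
    ts = datetime.fromtimestamp(epoch, tz=timezone.utc).replace(tzinfo=None)
    return UserBehavior(int(user_id), int(item_id), int(category_id), behavior, epoch, ts)


if __name__ == "__main__":
    print(parse_record("1,2268318,2520377,pv,1671544070"))
```

For the first sample record, this yields a `ts` of 2022-12-20 13:47:50 (UTC).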
4. Create a Flink SQL table that reads the Kafka data
$ docker-compose exec jobmanager ./bin/sql-client.sh

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    app_time BIGINT,
    ts AS TO_TIMESTAMP(FROM_UNIXTIME(app_time, 'yyyy-MM-dd HH:mm:ss')),
    proctime AS PROCTIME(),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',  -- use the Kafka connector
    'topic' = 'input',  -- Kafka topic
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'test-group',
    'format' = 'csv',
    'csv.field-delimiter' = ','
);
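- WATERMARK declares ts as the event-time attribute and bounds out-of-orderness. The watermark trails the largest ts seen so far by 5 seconds, so events that arrive up to 5 seconds out of order are still handled in event-time order. It does not, by itself, fire a window every 5 seconds.
- PROCTIME() is a built-in function that adds a virtual processing-time column. It is only occasionally needed.
- The WITH clause defines the Kafka connection information and properties.
- Because the event time arrives as a BIGINT (Unix epoch seconds), the SQL converts it to a TIMESTAMP with FROM_UNIXTIME and TO_TIMESTAMP.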
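The WATERMARK clause only states how far event time may lag behind the newest event. The Python model below illustrates that bounded out-of-orderness rule. It simplifies by updating the watermark after every event, whereas Flink emits watermarks periodically (by default every 200 ms, via `pipeline.auto-watermark-interval`). The function name `watermark_progression` is hypothetical.

```python
from datetime import datetime, timedelta
from typing import Iterable, List, Optional, Tuple

# Mirrors: WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
MAX_OUT_OF_ORDERNESS = timedelta(seconds=5)


def watermark_progression(event_times: Iterable[datetime]) -> List[Tuple[datetime, datetime]]:
    """Return (event_time, watermark) pairs, updating the watermark after every event.

    The watermark is the largest event time seen so far minus 5 seconds,
    so it never moves backwards when an out-of-order event arrives.
    """
    max_seen: Optional[datetime] = None
    progression = []
    for ts in event_times:
        if max_seen is None or ts > max_seen:
            max_seen = ts
        progression.append((ts, max_seen - MAX_OUT_OF_ORDERNESS))
    return progression


if __name__ == "__main__":
    t0 = datetime(2022, 12, 20, 13, 47, 50)
    events = [t0, t0 + timedelta(seconds=10), t0 + timedelta(seconds=8)]  # last one is out of order
    for ts, wm in watermark_progression(events):
        print(ts.time(), "->", wm.time())
```

In the example, the third event is 2 seconds older than the newest one. The watermark stays at 13:47:55 instead of moving back. The event is still ahead of the watermark, which means it falls within the 5-second tolerance.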
5. Analysis scenarios
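Scenario 1: cumulative number of online users, updated in 10-minute steps
The final results are written to the print connector, which writes each row to the TaskManager's standard output.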
CREATE TABLE users_v (
    date_str STRING,
    time_str STRING,
    uv BIGINT,
    PRIMARY KEY (date_str, time_str) NOT ENFORCED
) WITH (
    'connector' = 'print'
);
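To count online users every 10 minutes, you only need the date (date_str), the time (time_str), and the count (uv). The user_behavior table defined above already consumes the Kafka data. Now query it and write the results into the print table users_v: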
INSERT INTO users_v
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) AS uv
FROM (
    SELECT
        DATE_FORMAT(ts, 'yyyy-MM-dd') AS date_str,
        SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) || '0' AS time_str,
        user_id
    FROM user_behavior
)
GROUP BY date_str;
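Because the analysis step is 10 minutes, the inner query uses SUBSTR to keep the first four characters of the event's HH:mm string and then appends '0'. For example, 12:17 becomes 12:10, so times fall into buckets such as 12:10 and 12:20. After the statement is submitted, Flink runs it on the cluster as a continuous streaming job. The outer query is a plain GROUP BY date_str rather than a window aggregation. As a result, uv is the running count of distinct users for the day, and time_str is the latest 10-minute bucket seen so far.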
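The bucketing expression and the outer aggregation can be modeled in a few lines of Python. The names `time_bucket` and `cumulative_uv` are hypothetical and are not Flink APIs. The sketch assumes events arrive as (timestamp, user_id) pairs and outputs a row only when the per-day result changes.

```python
from datetime import datetime
from typing import Dict, Iterable, List, Set, Tuple


def time_bucket(ts: datetime) -> str:
    """Mirror SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) || '0'."""
    hh_mm = ts.strftime("%H:%M")  # e.g. "12:17"
    return hh_mm[:4] + "0"        # SQL SUBSTR is 1-based: chars 1..4 -> "12:1", then "12:10"


def cumulative_uv(events: Iterable[Tuple[datetime, int]]) -> List[Tuple[str, str, int]]:
    """Model of the INSERT INTO users_v query: GROUP BY date_str with
    MAX(time_str) and COUNT(DISTINCT user_id), output whenever the result changes."""
    users: Dict[str, Set[int]] = {}
    latest_bucket: Dict[str, str] = {}
    last_row: Dict[str, Tuple[str, str, int]] = {}
    emitted = []
    for ts, user_id in events:
        date_str = ts.strftime("%Y-%m-%d")
        bucket = time_bucket(ts)
        users.setdefault(date_str, set()).add(user_id)
        # "HH:M0" strings are zero-padded, so the string MAX matches time order
        latest_bucket[date_str] = max(latest_bucket.get(date_str, bucket), bucket)
        row = (date_str, latest_bucket[date_str], len(users[date_str]))
        if last_row.get(date_str) != row:
            last_row[date_str] = row
            emitted.append(row)
    return emitted
```

Because there is no window, the count never resets within a day. Counting distinct users per 10-minute interval, rather than cumulatively, would instead need a windowed aggregation on ts, such as a 10-minute tumbling window.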
$ docker-compose logs -f taskmanager
taskmanager_1  | +I[2017-11-24, 17:20, 1]
The +I prefix printed by the print connector marks an INSERT row in the changelog.