kafka
kafka 生成者和消费者
topic_name 主题名需要一致。
from confluent_kafka import Producer, KafkaException # 导入 Kafka 的生产者和异常类class KafkaOperator(): def __init__(self, topic_name): \"\"\" KafkaOperator 构造函数 :param topic_name: 要发送消息的 Kafka 主题名 \"\"\" self.topic_name = topic_name # 保存 Kafka 主题名 # Kafka 配置参数,包括多个服务器地址和客户端 ID self.kafka_params = { \'bootstrap.servers\': \'xxx,\' \'xxx,\' \'xxx\', \'client.id\': \'video_keywords\' } # 创建 Kafka 生产者实例 self.kafka_producer = Producer(self.kafka_params) def produce(self, message): \"\"\" 向 Kafka 主题发送一条消息 :param message: 字符串格式的消息 \"\"\" self.kafka_producer.produce(self.topic_name, message) # 发送消息 self.kafka_producer.flush() # 强制将缓存中的消息立即推送出去,确保消息发送完成if __name__ == \"__main__\": # 当此脚本作为主程序运行时,执行以下代码 # 实例化 KafkaOperator,指定 Kafka 的 topic k_operator = KafkaOperator(topic_name=\"xxx\") # 构造要发送的消息,包含关键词和图像质量 k_msg = { \"keywords\": \"tv/movies-test,tv/movies-here\", # 视频关键词 \"image_quality\": \"high\",# 图像质量设置为高 } import json # 引入 json 模块以将 Python 字典转换为字符串 # 将消息字典转换为 JSON 字符串并发送到 Kafka k_operator.produce(json.dumps(k_msg)) print(\"done\") # 打印完成信息