> 技术文档 > Python高效操作Kafka实战指南

Python高效操作Kafka实战指南


Python操作Kafka的高效消息发送实例

以下是使用Python操作Kafka的高效消息发送实例,涵盖基础发送、批量处理、异步回调等场景。示例基于confluent-kafka库(推荐)和kafka-python库,代码均经过实测。

流程图

基础消息发送(同步)

from confluent_kafka import Producer

# Connection settings for the local broker.
config = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(config)

# produce() only enqueues the message locally.
producer.produce('test_topic', value='Hello Kafka', key='key1')

# Make sure the message has been sent before continuing.
producer.flush()

基础消息发送(异步)

from confluent_kafka import Producer


def delivery_report(err, msg):
    """Print the outcome of one message delivery.

    Passed to produce() as the delivery callback: ``err`` is truthy when the
    delivery failed, otherwise ``msg`` describes the delivered message.
    """
    if err:
        print(f'Message delivery failed: {err}')
        return
    print(f'Message delivered to {msg.topic()} [{msg.partition()}]')


producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce(
    'test_topic',
    value='Async message',
    callback=delivery_report,
)
producer.poll(0)  # Trigger any pending delivery callbacks.
producer.flush()

批量消息发送

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

for i in range(100):
    payload = f'Message {i}'
    try:
        producer.produce('batch_topic', value=payload)
    except BufferError:
        # The local queue is full: wait for deliveries to free some space,
        # then retry this message once.
        producer.poll(1)
        producer.produce('batch_topic', value=payload)
    # Serve delivery events on every iteration so the local queue keeps
    # draining while the batch is being produced.
    producer.poll(0)

# Block until everything that is still queued has been sent.
producer.flush()

带Key的消息发送

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Each event is keyed by the user it belongs to.
for user_id in ['user1', 'user2', 'user3']:
    event = f'Event for {user_id}'
    producer.produce('user_events', value=event, key=user_id)

# Wait for all queued events to be sent.
producer.flush()

高性能配置

from confluent_kafka import Producer

# Start from the connection settings, then layer the throughput tuning on top.
conf = {'bootstrap.servers': 'localhost:9092'}
conf.update({
    # Upper bound on the number of messages held in the local producer queue.
    'queue.buffering.max.messages': 100000,
    # How long (ms) to let messages accumulate before a batch is sent.
    'queue.buffering.max.ms': 500,
    # Upper bound on the number of messages in a single batch.
    'batch.num.messages': 1000,
})

producer = Producer(conf)

消息头(Headers)支持

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Headers are (name, value) pairs attached to the message.
headers = [
    ('trace-id', '12345'),
    ('source', 'python-app'),
]

producer.produce('with_headers', headers=headers, value='Message')
producer.flush()

消息时间戳

import time

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# The message timestamp is given in milliseconds since the epoch.
now_ms = int(time.time() * 1000)

producer.produce('timed_topic', value='Timestamped', timestamp=now_ms)
producer.flush()

自定义分区路由

import zlib

from confluent_kafka import Producer

TOPIC = 'custom_partition'


def partitioner(key, partitions, opaque):
    """Map a message key to one of the given partitions using a stable hash.

    Args:
        key: Message key as ``str`` or ``bytes``; ``None`` is hashed as empty.
        partitions: Non-empty sequence of candidate partition ids.
        opaque: Unused; kept so the signature stays unchanged.

    Returns:
        The element of ``partitions`` selected for ``key``.
    """
    if key is None:
        key_bytes = b''
    elif isinstance(key, str):
        key_bytes = key.encode('utf-8')
    else:
        key_bytes = key
    # The built-in hash() of str/bytes is salted per process, so it would
    # route the same key to a different partition after every restart.
    # CRC32 gives the same result in every process.
    return partitions[zlib.crc32(key_bytes) % len(partitions)]


producer = Producer({'bootstrap.servers': 'localhost:9092'})

# confluent-kafka does not accept a Python callable for the 'partitioner'
# setting, so the partition is chosen here and passed to produce() explicitly.
metadata = producer.list_topics(TOPIC, timeout=10)
partitions = sorted(metadata.topics[TOPIC].partitions)

key = 'user123'
if partitions:
    producer.produce(
        TOPIC,
        key=key,
        value='Data',
        partition=partitioner(key, partitions, None),
    )
else:
    # No partition metadata available (e.g. the topic does not exist yet):
    # fall back to the client's default partitioning.
    producer.produce(TOPIC, key=key, value='Data')
producer.flush()

压缩消息

from confluent_kafka import Producer

# Connection settings plus gzip compression for outgoing messages.
settings = {
    'bootstrap.servers': 'localhost:9092',
    'compression.type': 'gzip',
}
producer = Producer(settings)

producer.produce('compressed_topic', value='Compressed message')
producer.flush()

同步发送超时控制

from confluent_kafka import Producer, KafkaException

producer = Producer({'bootstrap.servers': 'localhost:9092'})

try:
    producer.produce('timeout_topic', value='Test')
    # flush() does not raise when the timeout expires; it returns the number
    # of messages that are still waiting to be delivered.
    remaining = producer.flush(timeout=5)  # 5-second timeout
except (KafkaException, BufferError) as e:
    # BufferError is raised by produce() when the local queue is full.
    print(f"Send failed: {e}")
else:
    if remaining > 0:
        print(f"Send failed: {remaining} message(s) not delivered within 5 seconds")

使用kafka-python基础发送

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

# The value is passed as raw bytes.
payload = b'Message from kafka-python'
producer.send('python_topic', value=payload)

# Wait until buffered messages have been sent.
producer.flush()

kafka-python带Key发送

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Key and value are both passed as raw bytes.
record_key = b'user1'
record_value = b'User event'
producer.send('keyed_topic', value=record_value, key=record_key)

# Wait until buffered messages have been sent.
producer.flush()

kafka-python批量发送

from kafka i