Python高效操作Kafka实战指南
Python操作Kafka的高效消息发送实例
以下是使用Python操作Kafka的高效消息发送实例,涵盖基础发送、批量处理、异步回调等场景。示例基于confluent-kafka库(推荐)和kafka-python库,代码均经过实测。
流程图
基础消息发送(同步)
from confluent_kafka import Producer

# Connection settings for the local broker.
settings = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(settings)

# Queue a single keyed message.
producer.produce('test_topic', key='key1', value='Hello Kafka')

# Make sure the message has actually been sent before moving on.
producer.flush()
基础消息发送(异步)
from confluent_kafka import Producer


def delivery_report(err, msg):
    """Print the outcome of one produce request.

    Passed to ``produce`` as ``callback``. Prints a failure line when
    ``err`` is truthy; otherwise prints the topic and partition that
    ``msg`` reports.
    """
    if not err:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
        return
    print(f'Message delivery failed: {err}')


producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce(
    'test_topic',
    value='Async message',
    callback=delivery_report,
)
producer.poll(0)  # Trigger the callback.
producer.flush()
批量消息发送
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Queue 100 numbered messages, then flush once after the loop.
payloads = (f'Message {i}' for i in range(100))
for payload in payloads:
    producer.produce('batch_topic', value=payload)

producer.flush()
带Key的消息发送
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Send one event per user, using the user id as the message key.
for uid in ['user1', 'user2', 'user3']:
    event = f'Event for {uid}'
    producer.produce('user_events', key=uid, value=event)

producer.flush()
高性能配置
from confluent_kafka import Producer

# Producer settings for this example: broker address plus the queueing
# and batching knobs. See the librdkafka configuration reference for the
# exact semantics of each property.
conf = {'bootstrap.servers': 'localhost:9092'}
conf.update({
    'queue.buffering.max.messages': 100000,
    'queue.buffering.max.ms': 500,
    'batch.num.messages': 1000,
})

producer = Producer(conf)
消息头(Headers)支持
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Headers are given as a list of (name, value) pairs.
headers = [
    ('trace-id', '12345'),
    ('source', 'python-app'),
]

producer.produce('with_headers', value='Message', headers=headers)
producer.flush()
消息时间戳
import time

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# The timestamp is the current wall-clock time in whole milliseconds.
now_ms = int(time.time() * 1000)
producer.produce('timed_topic', value='Timestamped', timestamp=now_ms)
producer.flush()
自定义分区路由
import zlib

from confluent_kafka import Producer


def partitioner(key, partitions, opaque):
    """Return the partition number that ``key`` should be routed to.

    Args:
        key: Message key as ``str``, ``bytes`` or ``None``.
        partitions: Sequence of the topic's partitions; only its length
            is used.
        opaque: Unused; kept so the signature stays unchanged.

    Returns:
        An integer in ``range(len(partitions))``.

    CRC32 is used instead of the built-in ``hash()`` because ``str``
    hashes are salted per interpreter process, which would send the same
    key to different partitions on every run.
    """
    if key is None:
        data = b''
    elif isinstance(key, str):
        data = key.encode('utf-8')
    else:
        data = key
    return zlib.crc32(data) % len(partitions)


# The 'partitioner' configuration property only accepts the name of a
# built-in librdkafka partitioner; a Python callable there is rejected.
# Custom routing is therefore done by passing an explicit ``partition``
# to produce().
producer = Producer({'bootstrap.servers': 'localhost:9092'})

topic = 'custom_partition'
key = 'user123'

# Ask the broker which partitions the topic currently has.
metadata = producer.list_topics(topic, timeout=10)
partitions = sorted(metadata.topics[topic].partitions)

producer.produce(
    topic,
    key=key,
    value='Data',
    partition=partitioner(key, partitions, None),
)
producer.flush()
压缩消息
from confluent_kafka import Producer

# Same broker as the other examples, with gzip compression switched on.
settings = {
    'bootstrap.servers': 'localhost:9092',
    'compression.type': 'gzip',
}
producer = Producer(settings)

producer.produce('compressed_topic', value='Compressed message')
producer.flush()
同步发送超时控制
from confluent_kafka import Producer, KafkaException

producer = Producer({'bootstrap.servers': 'localhost:9092'})

try:
    producer.produce('timeout_topic', value='Test')
    # flush() does not raise when the timeout expires; it returns the
    # number of messages still waiting in the queue, so the result must
    # be checked explicitly. 5-second timeout.
    remaining = producer.flush(timeout=5)
except (BufferError, KafkaException) as e:
    # produce() raises BufferError when the local queue is full and
    # KafkaException for other errors.
    print(f"Send failed: {e}")
else:
    if remaining > 0:
        print(f"Send failed: {remaining} message(s) still queued after 5s timeout")
    # NOTE(review): an empty queue only means nothing is pending; to
    # learn about per-message delivery errors, pass a delivery callback
    # to produce().
使用kafka-python基础发送
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

# In this example the payload is passed as raw bytes.
payload = b'Message from kafka-python'
producer.send('python_topic', value=payload)
producer.flush()
kafka-python带Key发送
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Key and value are both passed as raw bytes.
record_key = b'user1'
record_value = b'User event'
producer.send('keyed_topic', key=record_key, value=record_value)
producer.flush()
kafka-python批量发送
from kafka i