
Stream Processing, Real-Time Analytics, and a RAG-Driven Python ETL Framework: Building Intelligent Data Pipelines (Part 2)


Chapter 4: Python Implementation of the Core Modules


4.1 Data Ingestion Module: A Generic Receiver Built on FastAPI + Kafka
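The receiver below is a single-file sketch: FastAPI handles HTTP ingestion and validates requests with a Pydantic EventModel, while a confluent-kafka Producer publishes the validated events to the raw_events topic. Publishing is handed off to BackgroundTasks so the endpoints can acknowledge quickly (202 for single events, 207 multi-status for batches). The broker list and topic name are placeholders; in practice they should come from environment variables or a configuration center.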
# fastapi_kafka_ingestor.py
import asyncio
import json
import logging
from datetime import datetime
from typing import Optional, Dict, Any

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from confluent_kafka import Producer, KafkaException
from pydantic import BaseModel, Field

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Kafka configuration (should be read from environment variables or a config center)
KAFKA_BROKERS = "kafka1:9092,kafka2:9092"
KAFKA_TOPIC = "raw_events"

# Pydantic model for request validation
class EventModel(BaseModel):
    event_id: str = Field(..., description="Unique event identifier")
    event_type: str = Field(..., description="Type of the event")
    source: str = Field(..., description="Source system of the event")
    timestamp: Optional[str] = Field(None, description="Event timestamp (ISO 8601)")
    payload: Dict[str, Any] = Field(..., description="Event data payload")

# Delivery report callback (optional, confirms whether a message was successfully delivered)
def delivery_report(err, msg):
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
    else:
        logger.info(f"Message delivered to {msg.topic()} [{msg.partition()}]")

# Kafka producer configuration
conf = {
    'bootstrap.servers': KAFKA_BROKERS,
    'client.id': 'fastapi_ingestor',
    # Optional: enable compression
    # 'compression.codec': 'snappy',
    # Optional: enable batching
    # 'batch.num.messages': 100,
    # 'linger.ms': 10,
    # Optional: delivery guarantees via acks
    # 'acks': 'all',
    # Optional: retries
    # 'retries': 3,
    # 'retry.backoff.ms': 100,
}

# Create the producer once, with the delivery report callback attached
producer = Producer({**conf, 'on_delivery': delivery_report})

# FastAPI application
app = FastAPI(
    title="Real-Time Event Ingestion API",
    description="API for ingesting events into Kafka",
    version="1.0.0"
)

# Send a message to Kafka asynchronously
async def produce_event(topic: str, key: str, value: dict):
    loop = asyncio.get_event_loop()
    try:
        # Run the synchronous producer.produce in a worker thread
        await loop.run_in_executor(
            None,
            lambda: producer.produce(topic, key=key, value=json.dumps(value).encode('utf-8'))
        )
        producer.poll(0)  # Trigger delivery callbacks
        logger.info(f"Event with key '{key}' sent to topic '{topic}'")
    except BufferError:
        # Note: this coroutine runs as a background task after the response is sent,
        # so exceptions raised here are logged by the server, not returned to the client.
        logger.error(f"Kafka producer buffer full for key '{key}'")
        raise HTTPException(status_code=503, detail="Service temporarily unavailable (Kafka buffer full)")
    except KafkaException as e:
        logger.error(f"Kafka error for key '{key}': {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error (Kafka: {e})")

# API endpoint: ingest a single event
@app.post("/events/", response_model=Dict[str, str], status_code=202)
async def ingest_event(event: EventModel, background_tasks: BackgroundTasks):
    """
    Ingest a single event into the Kafka topic.
    """
    try:
        # Default to the current time if no timestamp was provided
        if not event.timestamp:
            event.timestamp = datetime.utcnow().isoformat() + "Z"
        # Build the Kafka message
        kafka_message = event.dict()
        kafka_key = event.event_id  # Use event_id as the Kafka key to preserve per-key ordering
        # Send asynchronously (BackgroundTasks keeps the response from blocking)
        background_tasks.add_task(produce_event, KAFKA_TOPIC, kafka_key, kafka_message)
        return {"status": "accepted", "event_id": event.event_id}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error ingesting event {event.event_id}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

# API endpoint: ingest a batch of events
@app.post("/events/batch/", response_model=Dict[str, Any], status_code=207)
async def ingest_events_batch(events: list[EventModel], background_tasks: BackgroundTasks):
    """
    Ingest a batch of events into the Kafka topic.
    Returns a multi-status response indicating success/failure per event.
    """
    results = []
    success_count = 0
    failure_count = 0
    for event in events:
        try:
            if not event.timestamp:
                event.timestamp = datetime.utcnow().isoformat() + "Z"
            kafka_message = event.dict()
            kafka_key = event.event_id
            background_tasks.add_task(produce_event, KAFKA_TOPIC, kafka_key, kafka_message)
            results.append({"event_id": event.event_id, "status": "accepted"}