Chapter 4: Implementing the Core Modules in Python

4.1 Data Ingestion Module: A Generic Receiver Built on FastAPI + Kafka
import asyncio
import json
import logging
from datetime import datetime
from typing import Optional, Dict, Any

from confluent_kafka import Producer, KafkaException
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

KAFKA_BROKERS = "kafka1:9092,kafka2:9092"
KAFKA_TOPIC = "raw_events"


class EventModel(BaseModel):
    event_id: str = Field(..., description="Unique event identifier")
    event_type: str = Field(..., description="Type of the event")
    source: str = Field(..., description="Source system of the event")
    timestamp: Optional[str] = Field(None, description="Event timestamp (ISO 8601)")
    payload: Dict[str, Any] = Field(..., description="Event data payload")


def delivery_report(err, msg):
    """Per-message callback invoked by librdkafka with the final delivery result."""
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
    else:
        logger.info(f"Message delivered to {msg.topic()} [{msg.partition()}]")


# Build the producer once, with the delivery callback registered up front.
conf = {
    'bootstrap.servers': KAFKA_BROKERS,
    'client.id': 'fastapi_ingestor',
    'on_delivery': delivery_report,
}
producer = Producer(conf)

app = FastAPI(
    title="Real-Time Event Ingestion API",
    description="API for ingesting events into Kafka",
    version="1.0.0",
)


async def produce_event(topic: str, key: str, value: dict):
    """Enqueue one message, offloading the blocking produce() call to a thread.

    Note: this runs as a background task after the 202 response has been sent,
    so the HTTPExceptions below surface in server logs, not in the client reply.
    """
    loop = asyncio.get_running_loop()
    try:
        await loop.run_in_executor(
            None,
            lambda: producer.produce(topic, key=key, value=json.dumps(value).encode('utf-8')),
        )
        producer.poll(0)  # serve queued delivery callbacks without blocking
        logger.info(f"Event with key '{key}' sent to topic '{topic}'")
    except BufferError:
        logger.error(f"Kafka producer buffer full for key '{key}'")
        raise HTTPException(status_code=503, detail="Service temporarily unavailable (Kafka buffer full)")
    except KafkaException as e:
        logger.error(f"Kafka error for key '{key}': {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error (Kafka: {e})")


@app.post("/events/", response_model=Dict[str, str], status_code=202)
async def ingest_event(event: EventModel, background_tasks: BackgroundTasks):
    """Ingest a single event into the Kafka topic."""
    try:
        if not event.timestamp:
            event.timestamp = datetime.utcnow().isoformat() + "Z"
        kafka_message = event.dict()  # Pydantic v1 style; use model_dump() on v2
        kafka_key = event.event_id
        background_tasks.add_task(produce_event, KAFKA_TOPIC, kafka_key, kafka_message)
        return {"status": "accepted", "event_id": event.event_id}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error ingesting event {event.event_id}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")


@app.post("/events/batch/", response_model=Dict[str, Any], status_code=207)
async def ingest_events_batch(events: list[EventModel], background_tasks: BackgroundTasks):
    """Ingest a batch of events into the Kafka topic.

    Returns a multi-status response indicating success/failure per event.
    """
    results = []
    success_count = 0
    failure_count = 0
    for event in events:
        try:
            if not event.timestamp:
                event.timestamp = datetime.utcnow().isoformat() + "Z"
            kafka_message = event.dict()
            kafka_key = event.event_id
            background_tasks.add_task(produce_event, KAFKA_TOPIC, kafka_key, kafka_message)
            results.append({"event_id": event.event_id, "status": "accepted"})
            success_count += 1
        except Exception as e:
            logger.error(f"Error queuing event {event.event_id}: {e}")
            results.append({"event_id": event.event_id, "status": "failed", "error": str(e)})
            failure_count += 1
    return {
        "total": len(events),
        "accepted": success_count,
        "failed": failure_count,
        "results": results,
    }
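For a quick smoke test of both endpoints, a client sketch along the following lines can be used. It is not part of the module above: it assumes the service is running locally on port 8000 and that the httpx package is installed, and the event field values are placeholders.

# Hypothetical smoke test for the ingestion API above.
# Assumptions: service at http://localhost:8000, `pip install httpx`,
# and sample field values that are not prescribed by this chapter.
import httpx

event = {
    "event_id": "evt-0001",
    "event_type": "page_view",
    "source": "web_frontend",
    "payload": {"url": "/home", "user_id": 42},
}

with httpx.Client(base_url="http://localhost:8000") as client:
    # Single event: expect 202 Accepted with {"status": "accepted", ...}
    resp = client.post("/events/", json=event)
    print(resp.status_code, resp.json())

    # Batch: expect 207 Multi-Status with per-event results
    batch = [dict(event, event_id=f"evt-{i:04d}") for i in range(2, 5)]
    resp = client.post("/events/batch/", json=batch)
    print(resp.status_code, resp.json())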
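One operational detail the listing leaves open is draining the producer on shutdown: produce() only places messages in a local buffer, so events queued by background tasks can be lost if the process exits before librdkafka delivers them. A minimal sketch, assuming the app, producer, and logger objects from the listing above, is to flush the buffer in a shutdown hook:

# Minimal sketch: drain the Kafka producer buffer when the app stops.
# Assumes the `app`, `producer`, and `logger` globals defined above.
@app.on_event("shutdown")
def flush_producer() -> None:
    # flush() blocks until all queued messages are delivered or the
    # timeout (seconds) expires; it returns the number still pending.
    pending = producer.flush(10)
    if pending:
        logger.warning(f"{pending} message(s) not delivered before shutdown")

Newer FastAPI versions prefer a lifespan context manager over on_event hooks, but the effect is the same: give the producer a bounded window to deliver whatever is still buffered.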