Building Real-Time ML Pipelines
Batch ML pipelines process data at scheduled intervals. Real-time systems respond to new data within milliseconds to seconds of its arrival, enabling use cases such as fraud detection, live recommendations, and predictive maintenance.
Batch vs. Streaming
Batch Processing
- Latency: Minutes to hours
- Use case: Daily reports, model retraining
- Tools: Airflow, Spark, dbt
Stream Processing
- Latency: Milliseconds to seconds
- Use case: Fraud detection, live recommendations
- Tools: Kafka, Flink, FastAPI
Architecture Overview
```
Data Sources -> Kafka -> ML Service -> Predictions -> Downstream Systems
                             |
                       Feature Store
```
Implementation with FastAPI + Kafka
1. Kafka Producer
```python
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Stream events
producer.send('ml-features', {'user_id': 123, 'features': [...]})
```
2. ML Inference Service
```python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
model = load_model()  # placeholder: load the trained model once at startup

class Features(BaseModel):
    values: list[float]

@app.post("/predict")
async def predict(features: Features):
    prediction = model.predict(features.values)
    return {"prediction": prediction}
```
3. Stream Consumer
```python
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'ml-features',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

async def consume():
    # Score each incoming feature event and publish the result
    for message in consumer:
        features = Features(values=message.value['features'])
        prediction = await predict(features)
        producer.send('predictions', prediction)
```
Feature Store Integration
A feature store provides:
- Low-latency access to precomputed features
- Feature versioning and lineage tracking
- Online/offline consistency
- Feature sharing across teams
Example: Feast Feature Store
```python
from feast import FeatureStore

store = FeatureStore(repo_path=".")

features = store.get_online_features(
    features=["user:age", "user:country"],
    entity_rows=[{"user_id": 123}]
).to_dict()
```
Challenges & Solutions
1. Model Latency
Problem: Model inference too slow for real-time requirements
Solutions:
- Model quantization (TensorRT, ONNX)
- Model distillation
- Batch inference (micro-batching requests to amortize overhead)
- Model caching (see the Redis sketch below)
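Of these, caching is usually the quickest win. A minimal sketch of a Redis-backed prediction cache (an assumption layered on the Redis instance listed in the stack; the key scheme, TTL, and `model` object are illustrative):

```python
import hashlib
import json

import redis

# Illustrative cache in front of the model; tune the TTL to how quickly features go stale
cache = redis.Redis(host='localhost', port=6379, db=0)
CACHE_TTL_SECONDS = 60

def cached_predict(features: dict) -> float:
    # Deterministic cache key derived from the feature payload
    key = 'pred:' + hashlib.sha256(json.dumps(features, sort_keys=True).encode()).hexdigest()

    cached = cache.get(key)
    if cached is not None:
        return float(cached.decode())

    # Cache miss: run the (slower) model and store the result with a short TTL
    prediction = float(model.predict([features['features']])[0])
    cache.setex(key, CACHE_TTL_SECONDS, prediction)
    return prediction
```

Caching only pays off when identical feature payloads recur within the TTL, so it pairs best with coarse-grained or precomputed features.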
2. Data Quality
Problem: Missing or invalid features in production
Solutions:
- Feature validation with Pydantic (sketch after this list)
- Default values for missing features
- Monitoring and alerting
- Circuit breakers
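A sketch combining the first two points, using Pydantic to validate incoming events and fall back to defaults for missing features (field names and bounds are illustrative, not the production schema):

```python
from pydantic import BaseModel, Field

class UserFeatures(BaseModel):
    user_id: int
    # Defaults act as fallbacks when a feature is missing from the event
    age: int = Field(default=30, ge=0, le=120)
    country: str = 'unknown'
    txn_amount: float = Field(default=0.0, ge=0)

# Missing fields fall back to defaults; out-of-range values raise a ValidationError
event = {'user_id': 123, 'txn_amount': 42.5}
features = UserFeatures(**event)
```

Events that fail validation can then be routed to a dead-letter topic and counted by the monitoring described below.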
3. Scalability
Problem: High traffic volumes
Solutions:
- Horizontal scaling with Kubernetes
- Load balancing
- Asynchronous processing (see the aiokafka sketch below)
- Feature precomputation
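For the asynchronous-processing point, a consumer built on aiokafka (an assumption; the snippets above use the synchronous kafka-python client) keeps the event loop free while waiting on the broker, which fits FastAPI's async workers:

```python
import asyncio
import json

from aiokafka import AIOKafkaConsumer

async def consume_features():
    consumer = AIOKafkaConsumer(
        'ml-features',
        bootstrap_servers='localhost:9092',
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    )
    await consumer.start()
    try:
        # Messages are awaited, so other coroutines keep running between events
        async for message in consumer:
            features = Features(values=message.value['features'])
            prediction = await predict(features)
            # publish the prediction to the 'predictions' topic here
    finally:
        await consumer.stop()

# e.g. started as a background task when the FastAPI app boots:
# asyncio.create_task(consume_features())
```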
Monitoring
Key metrics to track (a Prometheus instrumentation sketch follows this list):
- Inference latency: p50, p95, p99
- Throughput: Requests per second
- Error rate: Failed predictions
- Model drift: Prediction distribution changes
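A minimal instrumentation sketch with the Python prometheus_client library (metric names and the scrape port are illustrative; Grafana charts the resulting series):

```python
import time

from prometheus_client import Counter, Histogram, start_http_server

# The latency histogram yields p50/p95/p99 via histogram_quantile() in PromQL
INFERENCE_LATENCY = Histogram('inference_latency_seconds', 'Model inference latency')
PREDICTIONS_TOTAL = Counter('predictions_total', 'Predictions served')
PREDICTION_ERRORS = Counter('prediction_errors_total', 'Failed predictions')

def instrumented_predict(features):
    start = time.perf_counter()
    try:
        prediction = model.predict(features)
        PREDICTIONS_TOTAL.inc()
        return prediction
    except Exception:
        PREDICTION_ERRORS.inc()
        raise
    finally:
        INFERENCE_LATENCY.observe(time.perf_counter() - start)

# Expose /metrics for Prometheus to scrape
start_http_server(8001)
```

Model drift is typically tracked separately, by logging prediction distributions and comparing them against a reference window.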
Production Lessons
- Start simple: Begin with a synchronous API and add streaming later
- Monitor everything: Latency, accuracy, data quality
- Plan for failures: Implement retries, fallbacks, and circuit breakers (see the sketch below)
- Test thoroughly: Load testing, chaos engineering
- Version models: Track model versions deployed in production
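As an illustration of the "plan for failures" point, a plain-Python retry wrapper with a fallback prediction (the retry count, backoff, and fallback value are assumptions, not the production configuration):

```python
import time

FALLBACK = {'prediction': None, 'fallback': True}

def predict_with_fallback(features: dict, retries: int = 3) -> dict:
    # Retry transient failures with exponential backoff, then degrade gracefully
    # instead of failing the request outright.
    for attempt in range(retries):
        try:
            return {'prediction': model.predict([features['features']])[0]}
        except Exception:
            time.sleep(0.1 * 2 ** attempt)
    return FALLBACK
```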
Performance Results
Our production system handles:
- 10,000 predictions/second
- <50ms p95 latency
- 99.9% uptime
- Real-time feature updates
Technologies: Python, FastAPI, Apache Kafka, Redis, Docker, Kubernetes, Prometheus, Grafana