Churn Prediction with RandomForest, MLflow, and DuckDB in a Kubernetes Init Container
Trained a telecom churn prediction model using feature-engineered DuckDB Gold layer data, tracked with MLflow, orchestrated by Prefect, and executed as a Kubernetes init container on every deployment.
I needed to ship a churn prediction model into a
The Architecture
The pipeline follows a medallion architecture:
- Kafka producer generates synthetic BSS (billing), OSS (network), and IoT events
- Kafka consumer writes to DuckDB Bronze layer (raw, unvalidated)
- Medallion transformer creates Silver (cleaned, quality-gated) and Gold (subscriber-level features)
- RandomForest trainer consumes Gold features and logs experiments to MLflow
- FastAPI server loads the trained model and exposes prediction endpoints
The entire flow from step 1 to 4 runs as a Kubernetes initContainer on every deployment. The API container waits for the init to complete, loads the artifact, and starts serving.
Feature Engineering: From Rows to Subscriber Profiles
The Gold layer aggregates raw events into subscriber-level features. From a messy stream of BSS, OSS, and IoT events, I computed:
CREATE OR REPLACE TABLE gold_subscriber_features AS
SELECT
    subscriber_id,
    -- Event volume
    COUNT(*) AS total_events,
    COUNT(*) FILTER (WHERE event_type = 'BSS') AS bss_events,
    COUNT(*) FILTER (WHERE event_type = 'OSS') AS oss_events,
    COUNT(*) FILTER (WHERE event_type = 'IoT') AS iot_events,
    -- Network quality
    ROUND(AVG(oss_signal_dbm), 1) AS avg_signal_dbm,
    ROUND(AVG(oss_latency_ms), 1) AS avg_latency_ms,
    ROUND(AVG(oss_packet_loss_pct), 2) AS avg_packet_loss_pct,
    -- Billing behaviour
    ROUND(AVG(bss_bill_zmw), 2) AS avg_bill_zmw,
    ROUND(AVG(bss_data_used_mb), 0) AS avg_data_used_mb,
    MAX(bss_days_overdue) AS max_days_overdue,
    COUNT(*) FILTER (
        WHERE event_type = 'BSS' AND bss_payment_status = 'overdue'
    ) AS overdue_count,
    -- Device health
    ROUND(AVG(iot_battery_pct), 1) AS avg_battery_pct,
    -- Data quality
    ROUND(AVG(quality_score), 1) AS avg_quality_score,
    -- Derived churn-risk heuristic (used for model label)
    CASE
        WHEN MAX(bss_days_overdue) > 30
          OR AVG(oss_signal_dbm) < -90
          OR AVG(oss_latency_ms) > 150
        THEN 1
        ELSE 0
    END AS churn_label
FROM silver_clean_events
GROUP BY subscriber_id
HAVING total_events >= 2
Billing stress (overdue payments), network degradation (weak signal, high latency), and device health all feed the label. This heuristic is not gospel — it’s a starting signal. In production, I’d layer actual churn labels from CRM data.
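The labelling rule is easier to unit-test when mirrored in plain Python. The following is a minimal sketch, assuming the same three thresholds as the CASE expression above; the function name heuristic_churn_label is hypothetical and not part of the original pipeline.

```python
from typing import Optional


def heuristic_churn_label(
    max_days_overdue: Optional[float],
    avg_signal_dbm: Optional[float],
    avg_latency_ms: Optional[float],
) -> int:
    """Mirror of the SQL CASE: 1 if any risk condition holds, else 0.

    None stands in for SQL NULL. A comparison against NULL is never true
    in SQL, so a missing value simply does not trigger its condition.
    """
    overdue = max_days_overdue is not None and max_days_overdue > 30
    weak_signal = avg_signal_dbm is not None and avg_signal_dbm < -90
    slow_network = avg_latency_ms is not None and avg_latency_ms > 150
    return 1 if (overdue or weak_signal or slow_network) else 0
```

One consequence worth keeping in mind: if those three columns are also in FEATURES, the label is a deterministic function of the model's inputs, so a high AUC mostly shows that the forest has re-learned the rule.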
The feature engineering step filters out noise at the Silver layer: any event with quality_score < 45 gets quarantined. This reduces garbage flowing into the model.
Training: RandomForest with MLflow Instrumentation
The trainer loads the Gold layer, splits the data, and trains a balanced RandomForest:
import duckdb
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# FEATURES (list of column names) and TARGET (label column) are defined elsewhere in the trainer.

def load_data(con: duckdb.DuckDBPyConnection):
    query = f"""
        SELECT {', '.join(FEATURES)}, {TARGET}
        FROM gold_subscriber_features
        WHERE {' AND '.join(f"{f} IS NOT NULL" for f in FEATURES)}
    """
    df = con.execute(query).fetchdf()
    # Fill any residual nulls with column medians
    for col in FEATURES:
        df[col] = df[col].fillna(df[col].median())
    return df

# df is the frame returned by load_data(con)
X = df[FEATURES].values
y = df[TARGET].values
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=42, stratify=y if y.sum() > 1 else None
)
model = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", RandomForestClassifier(
        n_estimators=200,
        max_depth=6,
        min_samples_leaf=2,
        class_weight="balanced",
        random_state=42,
    )),
])
model.fit(X_train, y_train)
The pipeline couples a StandardScaler with the forest. If I load the model later without the scaler, features won’t be normalized and predictions will be garbage. Joblib preserves the entire pipeline as a single object.
MLflow Pinning and the registered_model_name Gotcha
I pinned MLflow to v2.13.0 in the Kubernetes manifest:
image: ghcr.io/mlflow/mlflow:v2.13.0
MLflow’s behavior around model logging and the Model Registry has changed across versions. Calling mlflow.sklearn.log_model() with the registered_model_name parameter auto-registers the model in the Model Registry, and the details of that path, along with other log_model() arguments, have shifted between releases. Pinning to 2.13.0 avoids API churn and ensures the trainer script works without modification.
The trainer logs metrics and artifacts without relying on the registry:
with mlflow.start_run() if mlflow_active else contextlib.nullcontext():
    if mlflow_active:
        mlflow.log_param("n_estimators", 200)
        mlflow.log_param("max_depth", 6)
        mlflow.log_metric("test_auc", auc)
        mlflow.log_metric("cv_auc_mean", cv_scores.mean())
        mlflow.log_metric("cv_auc_std", cv_scores.std())
        mlflow.sklearn.log_model(model, "churn_model")
    # Feature importance visualization
    rf = model.named_steps["clf"]
    imp = sorted(zip(FEATURES, rf.feature_importances_), key=lambda x: -x[1])
All artifacts go to MLflow’s artifact store (an emptyDir volume in this POC — replace with S3/Azure Blob in production). The trainer also saves the model locally with joblib for the API to load at startup.
Joblib Serialization
The trained pipeline is serialized and saved alongside the features list:
joblib.dump({"model": model, "features": FEATURES}, MODEL_PATH)
Joblib handles sklearn pipelines cleanly and is the de facto standard in the ML ecosystem. At API startup:
try:
    artefact = joblib.load(MODEL_PATH)
    ML_MODEL = artefact["model"]
    ML_FEATS = artefact["features"]
    MODEL_OK = True
except FileNotFoundError:
    ML_MODEL = None
    ML_FEATS = FEATURES
    MODEL_OK = False
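The health check endpoint reports MODEL_OK so the readiness probe can act on it. The intent is that a pod whose model failed to load is never marked ready, so the rollout stalls visibly instead of serving without a model. This is intentional: a broken model should not be hidden behind a crash loop. Note that only FileNotFoundError is caught here; a corrupt or incompatible artefact raises a different exception and stops the process at startup.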
Orchestration with Prefect
Prefect 3 tasks wrap each pipeline stage. The flow is:
@flow(name="<client>-dscoe-pipeline", log_prints=True)
def dscoe_pipeline():
    """Full DSCoE pipeline orchestration"""
    logger = get_run_logger()
    logger.info("Starting <client> DSCoE complete pipeline")
    ingest_events()   # Kafka producer
    consume_events()  # Kafka → Bronze
    run_pipeline()    # Bronze → Silver → Gold
    train_model()     # RandomForest on Gold
    logger.info("Pipeline complete — all stages successful")

if __name__ == "__main__":
    dscoe_pipeline()
Each task loads its source module dynamically, because file names such as 01_producer.py start with a digit and so are not valid identifiers for an import statement. The tasks declare no explicit dependencies; called in order inside the flow, they run sequentially. The loader:
def _load_module(name, filepath):
    """Load a Python module from file path (works with numeric-prefix filenames)."""
    spec = importlib.util.spec_from_file_location(name, filepath)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod
Tasks log via Prefect’s run logger, which flows upstream to the deployment logs. Debugging a failed init container: check the pod logs.
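To make the loading trick concrete, here is a small self-contained sketch that writes a throwaway file with a numeric prefix and imports it by path. The file name 01_demo.py, the module name demo_stage, and the added None check are illustrative assumptions, not part of the original flow.

```python
import importlib.util
import tempfile
from pathlib import Path


def load_module(name: str, filepath: str):
    """Load a module from an explicit path, raising if no loader is found."""
    spec = importlib.util.spec_from_file_location(name, filepath)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load {name!r} from {filepath}")
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod


# Demo with a temporary file whose name starts with digits.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "01_demo.py"
    path.write_text("def greet():\n    return 'hello from 01_demo'\n")
    demo = load_module("demo_stage", str(path))
    message = demo.greet()
```

The module is registered under the name passed in, not the file name, which is why the digit prefix stops being a problem.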
Kubernetes Init Container Execution
The init container declaration in the API deployment:
initContainers:
  - name: pipeline-seed
    image: <acr-registry>.azurecr.io/<client>-api:1ab6d45ceab959439d84d94917a8df48e85abfcd
    command: ["python", "flows/dscoe_flow.py"]
    env:
      - name: DB_PATH
        value: "/data/<client>_lakehouse.duckdb"
      - name: MODEL_PATH
        value: "/data/churn_model.joblib"
      - name: MLFLOW_TRACKING_URI
        value: "http://mlflow:5000"
      - name: CHROMA_PATH
        value: "/data/chroma_db"
      - name: KAFKA_BOOTSTRAP
        value: "kafka.<namespace>.svc.cluster.local:9092"
      - name: PREFECT_API_URL
        value: "http://prefect-server:4200/api"
    resources:
      requests:
        memory: "256Mi"
        cpu: "100m"
      limits:
        memory: "2Gi"
        cpu: "1000m"
    volumeMounts:
      - name: app-data
        mountPath: /data
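Init containers share volumes with the main container. Both mount /data, which persists to a PVC. The init container writes the DuckDB file and the trained model there; the API container reads them. If the init container fails, the API container never starts: the pod stays in an Init status while the kubelet retries the init container according to the pod's restartPolicy, so a broken pipeline blocks the rollout instead of serving without a model.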
Resource limits are set deliberately. The producer, consumer, and pipeline stages can be memory-hungry (full subscriber event fan-out), so I allow up to 2Gi. The pipeline typically completes in under 60 seconds for synthetic data; for real data at scale, the memory limit and expected run time would need re-profiling.
Model Loading and Prediction Serving
The API boots and loads the model once:
# Load model once at startup
try:
    artefact = joblib.load(MODEL_PATH)
    ML_MODEL = artefact["model"]
    ML_FEATS = artefact["features"]
    MODEL_OK = True
except FileNotFoundError:
    ML_MODEL = None
    ML_FEATS = FEATURES
    MODEL_OK = False
The health endpoint returns:
@app.get("/api/v1/health", tags=["System"])
def health():
    """System health check: model status and timestamp."""
    return {
        "status": "ok",
        "model_loaded": MODEL_OK,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
The Kubernetes readiness probe calls this:
readinessProbe:
  httpGet:
    path: /api/v1/health
    port: 8000
  initialDelaySeconds: 15
  periodSeconds: 10
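The intent is that a pod without a model receives no traffic until it is healthy. An httpGet probe only inspects the HTTP status code, though, and the handler above returns 200 even when model_loaded is false, so the probe as written still passes. For the probe to fail, the handler must return a non-2xx status such as 503 when MODEL_OK is false.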
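One way to tie the probe result to the model state is to derive the HTTP status from MODEL_OK. The sketch below keeps that decision in a plain function so it can be tested without a web server; the name health_response and the "degraded" status string are hypothetical.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Tuple


def health_response(model_ok: bool) -> Tuple[int, Dict[str, Any]]:
    """Return (HTTP status code, JSON body) for the health endpoint.

    503 makes an httpGet readiness probe fail; 200 lets it pass.
    """
    status_code = 200 if model_ok else 503
    body = {
        "status": "ok" if model_ok else "degraded",
        "model_loaded": model_ok,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    return status_code, body


# In a FastAPI handler this could be wired up roughly as:
#   code, body = health_response(MODEL_OK)
#   return JSONResponse(status_code=code, content=body)
```

Keeping the body shape identical in both cases means dashboards that read model_loaded keep working, while Kubernetes acts only on the status code.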
Prediction serving accepts a subscriber ID and returns a risk score:
@app.get("/api/v1/subscribers/{subscriber_id}/predict", tags=["ML"])
def predict_churn(subscriber_id: str):
    """Return churn probability for a given subscriber from the Gold feature store."""
    if not MODEL_OK:
        raise HTTPException(503, "Model not loaded — run 04_train.py first.")
    con = get_db()
    try:
        row = con.execute(
            f"SELECT {', '.join(ML_FEATS)} FROM gold_subscriber_features "
            "WHERE subscriber_id = ?",
            [subscriber_id.upper()],
        ).fetchone()
    finally:
        con.close()
    if row is None:
        raise HTTPException(404, f"Subscriber {subscriber_id} not found in Gold layer.")
    # Replace None with 0.0. This is not a median fallback: the trainer
    # imputes column medians, so the two paths can disagree.
    X = np.array([[v if v is not None else 0.0 for v in row]])
    proba = float(ML_MODEL.predict_proba(X)[0, 1])
    risk = "high" if proba >= 0.65 else "medium" if proba >= 0.35 else "low"
    actions = {
        "high": "Assign retention manager & offer loyalty bundle immediately.",
        "medium": "Send proactive win-back offer via SMS campaign.",
        "low": "Standard engagement; monitor next billing cycle.",
    }
    return {
        "subscriber_id": subscriber_id.upper(),
        "churn_probability": round(proba, 3),
        "risk_level": risk,
        "recommended_action": actions[risk],
        "model_version": "RandomForest v1.0",
        "scored_at": datetime.now(timezone.utc).isoformat(),
    }
The response bridges data science and operations: churn probability (for dashboard tuning), risk tier (for action routing), and a recommended action (for the retention team).
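The tiering logic is the part most likely to be retuned, so it can help to pull it out of the handler. A minimal sketch using the same 0.65 and 0.35 cut-offs as the endpoint; the function name risk_tier and the range check are assumptions added for illustration.

```python
def risk_tier(proba: float, high: float = 0.65, medium: float = 0.35) -> str:
    """Map a churn probability to a risk tier using inclusive lower bounds."""
    if not 0.0 <= proba <= 1.0:
        raise ValueError(f"probability out of range: {proba}")
    if proba >= high:
        return "high"
    if proba >= medium:
        return "medium"
    return "low"
```

With the thresholds as parameters, the retention team can adjust cut-offs without touching the prediction code path.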
Data Quality Validation
The Gold layer includes avg_quality_score for every subscriber. The API exposes a data quality endpoint:
@app.get("/api/v1/analytics/quality", tags=["Analytics"])
def data_quality_report():
    """Data quality breakdown across pipeline layers."""
    con = get_db()
    try:
        bronze = con.execute("SELECT COUNT(*) FROM bronze_raw_events").fetchone()[0]
        silver = con.execute("SELECT COUNT(*) FROM silver_clean_events").fetchone()[0]
        quarantined = bronze - silver
        avg_quality = con.execute(
            "SELECT ROUND(AVG(quality_score), 1) FROM bronze_raw_events"
        ).fetchone()[0] or 0
        quality_dist = con.execute("""
            SELECT
                COUNT(*) FILTER (WHERE quality_score >= 80) AS excellent,
                COUNT(*) FILTER (WHERE quality_score >= 60 AND quality_score < 80) AS good,
                COUNT(*) FILTER (WHERE quality_score >= 45 AND quality_score < 60) AS fair,
                COUNT(*) FILTER (WHERE quality_score < 45) AS poor
            FROM bronze_raw_events
        """).fetchone()
        return {
            "pipeline": {
                "bronze_total": bronze,
                "silver_total": silver,
                "quarantined": quarantined,
                "pass_rate": round(silver / bronze * 100, 1) if bronze else 0,
            },
            "quality_scores": {
                "avg_bronze": avg_quality,
                "excellent_gte80": quality_dist[0],
                "good_60_79": quality_dist[1],
                "fair_45_59": quality_dist[2],
                "poor_lt45": quality_dist[3],
            },
            "threshold": "Events with quality_score < 45 are quarantined at the Silver layer",
            "generated_at": datetime.now(timezone.utc).isoformat(),
        }
    finally:
        # Always release the DuckDB connection, even if a query fails
        con.close()
This surfaces the true pass rate: how many Bronze records made it to Silver. In production, a sudden drop signals upstream data issues. By the time the model sees Gold features, they’ve already been vetted.
Model Durability Issues
MLflow artifact store
The emptyDir volume for MLflow artifacts survives container restarts but is deleted when the pod is deleted or rescheduled. For a POC this is fine. In production, use an S3 bucket or Azure Blob container as the artifact store.
Model staleness
Every deployment retrains the model on fresh synthetic data. Real pipelines need offline retraining windows and model approval gates before pushing to production.
Feature drift
If Gold schema changes (adding a new billing field), the trainer recomputes and the model retrains. But if you change features without retraining, the predict endpoint will fail on a shape mismatch. Version features; assert they match the stored model.
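A minimal sketch of such an assertion, comparing the feature list stored in the joblib artefact with the list the API expects. The function name check_feature_contract is hypothetical; order matters because the predictor builds a positional array.

```python
from typing import Sequence


def check_feature_contract(stored: Sequence[str], expected: Sequence[str]) -> None:
    """Raise ValueError if feature names or their order differ."""
    if list(stored) == list(expected):
        return
    missing = [f for f in expected if f not in stored]
    unexpected = [f for f in stored if f not in expected]
    raise ValueError(
        f"Feature mismatch: missing={missing}, unexpected={unexpected}, "
        f"stored_order={list(stored)}, expected_order={list(expected)}"
    )
```

Called right after joblib.load(), for example as check_feature_contract(artefact["features"], FEATURES), this turns a confusing shape error at predict time into a clear failure at startup.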
Null handling
The trainer fills residual nulls with column medians. The predictor replaces None with 0. These must be consistent. A better pattern: use a SimpleImputer in the pipeline itself.
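To illustrate what consistent means here, the dependency-free sketch below computes medians once at training time and reuses them at prediction time. The helper names fit_medians and impute_row are hypothetical; in the real pipeline, scikit-learn's SimpleImputer(strategy="median") as a Pipeline step would do this work and be serialized with the model.

```python
import statistics
from typing import Dict, List, Optional, Sequence

Row = Sequence[Optional[float]]


def fit_medians(rows: Sequence[Row], features: Sequence[str]) -> Dict[str, float]:
    """Compute one median per feature, ignoring missing (None) values."""
    medians: Dict[str, float] = {}
    for idx, name in enumerate(features):
        observed = [row[idx] for row in rows if row[idx] is not None]
        # Fall back to 0.0 only when a column has no observed values at all.
        medians[name] = float(statistics.median(observed)) if observed else 0.0
    return medians


def impute_row(row: Row, features: Sequence[str], medians: Dict[str, float]) -> List[float]:
    """Replace None with the training-time median for that feature."""
    return [
        float(value) if value is not None else medians[name]
        for name, value in zip(features, row)
    ]
```

Storing the medians dictionary next to the model in the joblib artefact would keep trainer and predictor on the same fallback values.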
Serialization mismatch
If you upgrade scikit-learn after training and the model can’t load the serialized pipeline, the API won’t start. Pin dependencies in the container image.
Shipping It
The deployment workflow:
- Build and push image to ACR with all source code
- Deploy the Kubernetes manifest (init container runs automatically)
- Init container logs flow to stdout; monitor with kubectl logs -f pod-name -c pipeline-seed
- When init completes, the main API container starts and loads the model
- Readiness probe passes, traffic flows in
Total time from deploy to serving: about 90 seconds for synthetic data. The init container has a 2Gi memory limit and no explicit timeout, so it’s bounded but not aggressive. For real production data, profile and tune.
Production Rule
Fresh models on every deploy. No manual retraining, no model registry overhead, no drift from stale artifacts. The model is rebuilt by the same image that ships the code (the init container writes it to /data) and gets refreshed with each rollout. For a DSCoE proof-of-concept, this is clean and low-ops.
For a production platform, add model validation gates (reject models with falling AUC), separate training and serving timelines, and real experiment tracking. The skeleton scales: Prefect orchestrates complex DAGs, MLflow tracks experiments even without the registry, and DuckDB handles the medallion transformation with SQL — no Spark required.
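A validation gate of that kind can be very small. The sketch below is one possible shape, not the author's implementation: the function name passes_auc_gate, the 0.02 tolerated drop, and the 0.70 floor are all illustrative assumptions.

```python
from typing import Optional


def passes_auc_gate(
    candidate_auc: float,
    baseline_auc: Optional[float],
    max_drop: float = 0.02,  # hypothetical tolerance against the baseline
    floor: float = 0.70,     # hypothetical absolute minimum
) -> bool:
    """Accept a candidate model only if its AUC has not fallen too far."""
    if candidate_auc < floor:
        return False
    if baseline_auc is None:
        # No previous model to compare against: the floor is the only check.
        return True
    return candidate_auc >= baseline_auc - max_drop
```

In the init container, a failed gate could exit non-zero so the rollout stops before the API starts with a worse model.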
The init container pattern works for any batch job: data migration, cache warming, schema bootstrap. It’s a lever that shouldn’t be underused.