Kafka KRaft to DuckDB: A Medallion Lakehouse Pipeline on Kubernetes
Built a streaming analytics pipeline using Kafka in KRaft mode, DuckDB as an embedded lakehouse with Bronze/Silver/Gold layers, and a CronJob-based refresh cycle on AKS.
Situation
I needed to build a streaming data platform for a telecom operator that would ingest multi-source event streams (billing, network, IoT), cleanse them through a medallion architecture, and power both real-time dashboards and batch analytics. The challenge: do it entirely within Kubernetes without external data warehouses, using open-source components, and keep the data layer fast enough for sub-second queries.
The platform had to:
- Handle 200-500 events every 5-10 minutes (synthetic, but realistic scale)
- Support event types from three different systems (BSS, OSS, IoT) with separate schemas
- Cleanse and deduplicate at the Bronze layer before storing
- Flatten JSON payloads and validate quality at Silver
- Compute subscriber-level features at Gold (for churn prediction)
- Refresh the pipeline every 30 minutes via Kubernetes orchestration
- Keep the database portable (a single file on a PVC)
Architecture
The pipeline chain:
Kafka (KRaft) → Consumer → Bronze (DuckDB)
                         → Silver (cleansed, flattened)
                         → Gold (subscriber aggregates)
                         → Churn model training
Kafka runs in KRaft mode (no ZooKeeper broker coordination), eliminating the operational overhead of a separate quorum and giving me native metadata log durability. The consumer reads from Kafka, parses JSON, and writes raw events into DuckDB’s Bronze table. Two SQL transforms (run by the Prefect flow) build the Silver and Gold tables in the same database file. Every 30 minutes, a Kubernetes CronJob triggers a pod restart on the API deployment, which re-reads the DuckDB file on a shared PVC and serves fresh results.
Implementation
Kafka KRaft on Kubernetes
The Kafka StatefulSet runs with KRaft enabled. Key configuration:
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: <namespace>
spec:
  serviceName: kafka
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      nodeSelector:
        agentpool: <nodepool>
      tolerations:
        - key: "workload"
          operator: "Equal"
          value: "<namespace>"
          effect: "NoSchedule"
      securityContext:
        fsGroup: 1000
      containers:
        - name: kafka
          image: apache/kafka:3.7.0
          ports:
            - containerPort: 9092
              name: plaintext
            - containerPort: 9093
              name: controller
          env:
            - name: KAFKA_NODE_ID
              value: "1"
            - name: KAFKA_PROCESS_ROLES
              value: "broker,controller"
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: "1@localhost:9093"
            - name: KAFKA_LISTENERS
              value: "PLAINTEXT://:9092,CONTROLLER://:9093"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "PLAINTEXT://kafka.<namespace>.svc.cluster.local:9092"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "PLAINTEXT"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "1"
            - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
              value: "true"
            - name: KAFKA_LOG_DIRS
              value: "/var/kafka-data/kraft-logs"
            - name: CLUSTER_ID
              value: "<redacted-cluster-id>"
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1536Mi"
              cpu: "500m"
          volumeMounts:
            - name: kafka-data
              mountPath: /var/kafka-data
          readinessProbe:
            tcpSocket:
              port: 9092
            initialDelaySeconds: 30
            periodSeconds: 10
  volumeClaimTemplates:
    - metadata:
        name: kafka-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 5Gi
The critical part: KAFKA_PROCESS_ROLES: "broker,controller" makes a single node act as both broker and controller, eliminating the need for ZooKeeper. The KAFKA_LOG_DIRS points to a persistent volume, so metadata and committed offsets survive pod restarts.
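Gotcha encountered: The initial spec set KAFKA_LOG_DIRS to "/var/kafka-data", the root of the mounted volume. On a freshly formatted disk (typical for ext4) that root contains a lost+found directory, and Kafka refuses to start when its log directory holds a subdirectory that is not named like a topic partition. Pointing KAFKA_LOG_DIRS at the subdirectory /var/kafka-data/kraft-logs fixed the cluster initialization failures on restart.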
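A rough pre-flight check for this gotcha could look like the sketch below. It lists subdirectories of a log directory that do not look like topic-partition directories. The function name and the regular expression are my own assumptions: the pattern is a simplified approximation of the naming Kafka expects, not Kafka's actual validation logic.

```python
import re
from pathlib import Path

# Simplified approximation (an assumption, not Kafka's real rule):
# "<topic>-<partition>", optionally followed by a dotted suffix.
_PARTITION_DIR = re.compile(r"^.+-\d+(\..+)?$")


def unexpected_subdirs(log_dir: str) -> list[str]:
    """Return subdirectories that do not look like topic-partition dirs."""
    root = Path(log_dir)
    if not root.is_dir():
        return []
    return sorted(
        entry.name
        for entry in root.iterdir()
        if entry.is_dir() and not _PARTITION_DIR.match(entry.name)
    )
```

Run against the volume root, a check like this would report lost+found; run against the kraft-logs subdirectory, it should come back empty.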
Synthetic Event Producer
The producer generates three event types (BSS, OSS, IoT) with realistic telecom context:
import random
import uuid
from datetime import datetime, timezone

# PLANS is a module-level list of plan names defined elsewhere in the producer.

def bss_event(sub_id: str) -> dict:
    """Business Support System — billing / CRM event."""
    return {
        "id": str(uuid.uuid4())[:8],
        "type": "BSS",
        "source": "<client>_CRM",
        "subscriber_id": sub_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "payload": {
            "plan": random.choice(PLANS),
            "data_used_mb": random.randint(50, 8000),
            "bill_amount_zmw": round(random.uniform(20, 250), 2),
            "payment_status": random.choice(["paid", "paid", "paid", "overdue", "pending"]),
            "days_overdue": random.randint(0, 45) if random.random() < 0.25 else 0,
        },
        "quality_score": random.randint(55, 100),
    }
The producer runs in two modes:
- One-shot (default): produces NUM_EVENTS and exits. Used for batch testing.
- Stream (PRODUCER_MODE=stream): continuously produces 200-500 events every 5-10 minutes. Used for the continuous deployment.
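The two modes can be sketched as a single loop. This is a minimal illustration, not the actual 01_producer.py: run_producer, send_batch, max_batches, and the default of 100 events are hypothetical, and the sleep function is injectable only so the loop can be exercised without waiting.

```python
import random
import time
from typing import Callable, Optional


def run_producer(
    send_batch: Callable[[int], None],
    mode: str = "oneshot",
    num_events: int = 100,              # stand-in for NUM_EVENTS (assumed default)
    max_batches: Optional[int] = None,  # None means run until stopped
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run in one-shot or stream mode; return the number of batches sent."""
    if mode != "stream":
        send_batch(num_events)
        return 1

    batches = 0
    while max_batches is None or batches < max_batches:
        send_batch(random.randint(200, 500))    # 200-500 events per batch
        batches += 1
        sleep(random.uniform(5 * 60, 10 * 60))  # wait 5-10 minutes
    return batches
```

In a real entry point, mode would come from os.getenv("PRODUCER_MODE", "oneshot") and send_batch would publish to Kafka.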
Producer Deployment manifest:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-producer
  namespace: <namespace>
spec:
  replicas: 1
  selector:
    matchLabels:
      app: event-producer
  template:
    metadata:
      labels:
        app: event-producer
    spec:
      nodeSelector:
        agentpool: <nodepool>
      tolerations:
        - key: "workload"
          operator: "Equal"
          value: "<namespace>"
          effect: "NoSchedule"
      imagePullSecrets:
        - name: acr-pull-secret
      containers:
        - name: producer
          image: <acr-registry>.azurecr.io/<client>-api:1ab6d45ceab959439d84d94917a8df48e85abfcd
          command: ["python", "01_producer.py"]
          env:
            - name: PRODUCER_MODE
              value: "stream"
            - name: KAFKA_BOOTSTRAP
              value: "kafka.<namespace>.svc.cluster.local:9092"
          resources:
            requests:
              memory: "128Mi"
              cpu: "50m"
            limits:
              memory: "256Mi"
              cpu: "200m"
The producer maintains a fixed pool of subscriber IDs so each subscriber builds a history across event batches, creating realistic feature distributions.
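One way to get that behaviour is to build the pool once and sample from it with replacement for every batch. The sketch below assumes a hypothetical ID format (SUB-00000) and hypothetical helper names; the real producer may differ.

```python
import random


def make_subscriber_pool(size: int) -> list[str]:
    """Build a fixed pool of subscriber IDs (ID format is an assumption)."""
    return [f"SUB-{i:05d}" for i in range(size)]


def pick_subscribers(pool: list[str], batch_size: int, rng: random.Random) -> list[str]:
    """Sample with replacement so the same subscribers recur across batches."""
    return rng.choices(pool, k=batch_size)
```

Because the pool is much smaller than the total number of events, each subscriber accumulates several events over time, which is what the Gold aggregates need.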
Bronze Ingestion: Kafka Consumer
The consumer reads from Kafka and writes raw events into DuckDB:
import json

import duckdb

BRONZE_DDL = """
CREATE TABLE IF NOT EXISTS bronze_raw_events (
    event_id        VARCHAR,
    event_type      VARCHAR,      -- BSS | OSS | IoT
    source_system   VARCHAR,
    subscriber_id   VARCHAR,
    event_timestamp TIMESTAMPTZ,
    raw_payload     JSON,         -- untouched original payload
    quality_score   INTEGER,
    ingested_at     TIMESTAMPTZ DEFAULT NOW()
);
"""

def insert_event(con: duckdb.DuckDBPyConnection, evt: dict):
    con.execute(
        """
        INSERT INTO bronze_raw_events
            (event_id, event_type, source_system, subscriber_id,
             event_timestamp, raw_payload, quality_score)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        [
            evt.get("id"),
            evt.get("type"),
            evt.get("source"),
            evt.get("subscriber_id"),
            evt.get("timestamp"),
            json.dumps(evt.get("payload", {})),
            evt.get("quality_score", 0),
        ],
    )
The Bronze table is append-only at this stage. Each event’s payload stays as raw JSON (stored as DuckDB’s JSON type), preserving all fields while the table schema normalizes the outer structure. The ingested_at timestamp (set by the column default at insert time) lets me measure end-to-end latency by comparing it with event_timestamp.
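The consumer runs with auto.offset.reset: earliest, so a consumer group with no committed offsets starts from the beginning of the topic. That makes a first run reproducible, but not idempotent: if offsets are never committed, every run re-reads the topic and inserts the same events into Bronze again. In production, you’d commit offsets only after a successful write, or track high-water marks so each pipeline run picks up where the last one stopped.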
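A commit-after-write loop might look like the sketch below. It assumes a confluent-kafka-style consumer object (poll returning a message or None, commit accepting message= and asynchronous=) created with enable.auto.commit disabled; the source does not say which client library is used. consume_batch and write_event are hypothetical names, and write_event stands in for the Bronze insert.

```python
import json
from typing import Any, Callable


def consume_batch(
    consumer: Any,
    write_event: Callable[[dict], None],
    max_messages: int = 500,
    poll_timeout: float = 1.0,
) -> int:
    """Write up to max_messages events, committing each offset after its write."""
    written = 0
    for _ in range(max_messages):
        msg = consumer.poll(poll_timeout)
        if msg is None:      # nothing arrived within the timeout
            break
        if msg.error():      # this sketch simply skips error messages
            continue
        write_event(json.loads(msg.value()))
        # Commit only after the write succeeded, so a crash before this
        # line re-delivers the event instead of losing it (at-least-once).
        consumer.commit(message=msg, asynchronous=False)
        written += 1
    return written
```

This gives at-least-once delivery, so a crash between the write and the commit can still produce a duplicate; deduplicating on event_id downstream would still be needed.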
Medallion Transformation Pipeline
The pipeline runs two SQL transforms in sequence.
Silver: Cleanse, flatten, and validate.
SILVER_DDL = """
CREATE OR REPLACE TABLE silver_clean_events AS
SELECT
    event_id,
    event_type,
    source_system,
    UPPER(subscriber_id)                AS subscriber_id,
    CAST(event_timestamp AS TIMESTAMP)  AS event_timestamp,
    quality_score,
    ingested_at,
    -- BSS fields (null for non-BSS rows)
    CASE WHEN event_type = 'BSS'
         THEN raw_payload->>'plan' END AS bss_plan,
    CASE WHEN event_type = 'BSS'
         THEN CAST(raw_payload->>'data_used_mb' AS INTEGER) END AS bss_data_used_mb,
    CASE WHEN event_type = 'BSS'
         THEN CAST(raw_payload->>'bill_amount_zmw' AS DOUBLE) END AS bss_bill_zmw,
    CASE WHEN event_type = 'BSS'
         THEN raw_payload->>'payment_status' END AS bss_payment_status,
    CASE WHEN event_type = 'BSS'
         THEN CAST(raw_payload->>'days_overdue' AS INTEGER) END AS bss_days_overdue,
    -- OSS fields
    CASE WHEN event_type = 'OSS'
         THEN raw_payload->>'tower_id' END AS oss_tower_id,
    CASE WHEN event_type = 'OSS'
         THEN CAST(raw_payload->>'signal_dbm' AS INTEGER) END AS oss_signal_dbm,
    CASE WHEN event_type = 'OSS'
         THEN CAST(raw_payload->>'latency_ms' AS INTEGER) END AS oss_latency_ms,
    CASE WHEN event_type = 'OSS'
         THEN CAST(raw_payload->>'packet_loss_pct' AS DOUBLE) END AS oss_packet_loss_pct,
    -- IoT fields
    CASE WHEN event_type = 'IoT'
         THEN raw_payload->>'location' END AS iot_location,
    CASE WHEN event_type = 'IoT'
         THEN CAST(raw_payload->>'battery_pct' AS INTEGER) END AS iot_battery_pct,
    CASE WHEN event_type = 'IoT'
         THEN raw_payload->>'connection_type' END AS iot_connection_type
FROM bronze_raw_events
WHERE
    quality_score >= 45          -- drop low-quality records
    AND subscriber_id IS NOT NULL
    AND event_timestamp IS NOT NULL
;
"""
Silver flattens the nested JSON into columnar form using DuckDB’s ->> operator (JSON text extraction). Event types are still mixed (each row is one event, not one subscriber), so every row has NULLs in the columns that belong to the other two event types. The quality_score >= 45 filter drops low-confidence records from Silver; they remain available in Bronze. The transform is a CREATE OR REPLACE, so it’s idempotent across pipeline runs.
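To make the row shape concrete, here is a pure-Python mirror of the Silver logic for a single Bronze row. It is for intuition only and is not part of the pipeline: flatten_event and FIELDS are hypothetical names, only a subset of the passthrough columns is copied, and Python's int() and float() stand in for the SQL casts.

```python
import json
from typing import Optional

# payload key -> (Silver column, cast), grouped by event type (mirrors SILVER_DDL)
FIELDS = {
    "BSS": {
        "plan": ("bss_plan", str),
        "data_used_mb": ("bss_data_used_mb", int),
        "bill_amount_zmw": ("bss_bill_zmw", float),
        "payment_status": ("bss_payment_status", str),
        "days_overdue": ("bss_days_overdue", int),
    },
    "OSS": {
        "tower_id": ("oss_tower_id", str),
        "signal_dbm": ("oss_signal_dbm", int),
        "latency_ms": ("oss_latency_ms", int),
        "packet_loss_pct": ("oss_packet_loss_pct", float),
    },
    "IoT": {
        "location": ("iot_location", str),
        "battery_pct": ("iot_battery_pct", int),
        "connection_type": ("iot_connection_type", str),
    },
}
MIN_QUALITY = 45


def flatten_event(row: dict) -> Optional[dict]:
    """Return the flattened row, or None if the WHERE clause would drop it."""
    score = row.get("quality_score")
    if score is None or score < MIN_QUALITY:
        return None
    if row.get("subscriber_id") is None or row.get("event_timestamp") is None:
        return None

    payload = json.loads(row["raw_payload"])
    out = {
        "event_id": row["event_id"],
        "event_type": row["event_type"],
        "subscriber_id": row["subscriber_id"].upper(),
        "quality_score": score,
    }
    for event_type, columns in FIELDS.items():
        for key, (column, cast) in columns.items():
            # Columns of other event types stay None, like the CASE WHEN branches.
            value = payload.get(key) if event_type == row["event_type"] else None
            out[column] = cast(value) if value is not None else None
    return out
```

A BSS row comes out with its five bss_ columns filled and all oss_ and iot_ columns set to None, which is the sparsity described above.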
Gold: Aggregate to subscriber level and compute churn risk.
GOLD_DDL = """
CREATE OR REPLACE TABLE gold_subscriber_features AS
SELECT
    subscriber_id,
    -- Event volume
    COUNT(*)                                         AS total_events,
    COUNT(*) FILTER (WHERE event_type = 'BSS')       AS bss_events,
    COUNT(*) FILTER (WHERE event_type = 'OSS')       AS oss_events,
    COUNT(*) FILTER (WHERE event_type = 'IoT')       AS iot_events,
    -- Network quality
    ROUND(AVG(oss_signal_dbm), 1)                    AS avg_signal_dbm,
    ROUND(AVG(oss_latency_ms), 1)                    AS avg_latency_ms,
    ROUND(AVG(oss_packet_loss_pct), 2)               AS avg_packet_loss_pct,
    -- Billing behaviour
    ROUND(AVG(bss_bill_zmw), 2)                      AS avg_bill_zmw,
    ROUND(AVG(bss_data_used_mb), 0)                  AS avg_data_used_mb,
    MAX(bss_days_overdue)                            AS max_days_overdue,
    COUNT(*) FILTER (
        WHERE event_type = 'BSS' AND bss_payment_status = 'overdue'
    )                                                AS overdue_count,
    -- Device health
    ROUND(AVG(iot_battery_pct), 1)                   AS avg_battery_pct,
    -- Data quality
    ROUND(AVG(quality_score), 1)                     AS avg_quality_score,
    -- Derived churn-risk heuristic (used for model label)
    CASE
        WHEN MAX(bss_days_overdue) > 30
          OR AVG(oss_signal_dbm) < -90
          OR AVG(oss_latency_ms) > 150
        THEN 1
        ELSE 0
    END                                              AS churn_label
FROM silver_clean_events
GROUP BY subscriber_id
HAVING total_events >= 2   -- need at least 2 events for meaningful features
;
"""
Gold groups all events by subscriber and computes aggregates. The FILTER clause isolates event types for counting. The churn_label is a heuristic: subscribers with payment overdue >30 days, weak signal (<-90 dBm), or high latency (>150ms) are flagged for churn risk. This feeds into the model training step.
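The labelling rule can be restated as a small Python function, which makes the thresholds and the NULL handling easier to see. This is an illustrative mirror of the CASE expression, not code from the pipeline; None stands in for SQL NULL, and a NULL aggregate (for example, a subscriber with no OSS events) never triggers its condition.

```python
from typing import Optional


def churn_label(
    max_days_overdue: Optional[int],
    avg_signal_dbm: Optional[float],
    avg_latency_ms: Optional[float],
) -> int:
    """Return 1 if any churn-risk condition holds, else 0."""
    overdue = max_days_overdue is not None and max_days_overdue > 30
    weak_signal = avg_signal_dbm is not None and avg_signal_dbm < -90
    high_latency = avg_latency_ms is not None and avg_latency_ms > 150
    return 1 if (overdue or weak_signal or high_latency) else 0
```

All three comparisons are strict, so a subscriber sitting exactly on a threshold (30 days, -90 dBm, 150 ms) is labelled 0.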
Prefect Orchestration
The Prefect flow coordinates all steps:
from prefect import flow, get_run_logger

@flow(name="<client>-dscoe-pipeline", log_prints=True)
def dscoe_pipeline():
    """
    Full DSCoE pipeline orchestration:
    1. Produce synthetic events to Kafka
    2. Consume from Kafka → DuckDB Bronze
    3. Bronze → Silver → Gold medallion transform
    4. Train churn prediction model on subscriber features
    """
    logger = get_run_logger()
    logger.info("Starting <client> DSCoE complete pipeline")
    ingest_events()
    consume_events()
    run_pipeline()
    train_model()
    logger.info("Pipeline complete — all stages successful")
Each step is a @task that dynamically loads and runs the corresponding Python module. This pattern works because the producer, consumer, and pipeline scripts are standalone — they don’t import from each other, only from shared config. The flow logs each stage to Prefect’s backend, making it easy to audit failures.
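One way to load and run a standalone script from a task is the standard library's runpy module, which also copes with file names such as 01_producer.py that are not valid module names for a normal import. The source does not show how the tasks do this, so treat the helper below (run_script is a hypothetical name) as a sketch of the pattern rather than the actual implementation.

```python
import runpy
from pathlib import Path


def run_script(path: str) -> dict:
    """Execute a standalone script as if run with `python <path>`.

    Returns the script's resulting module globals.
    """
    script = Path(path)
    if not script.is_file():
        raise FileNotFoundError(f"pipeline script not found: {script}")
    return runpy.run_path(str(script), run_name="__main__")
```

Inside the flow, each task body would then be a one-line call such as run_script("01_producer.py"), wrapped with Prefect's @task decorator.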
CronJob-Driven Refresh
Every 30 minutes, a Kubernetes CronJob restarts the API deployment:
apiVersion: batch/v1
kind: CronJob
metadata:
  name: pipeline-refresh
  namespace: <namespace>
spec:
  schedule: "*/30 * * * *"
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 2
  jobTemplate:
    spec:
      backoffLimit: 1
      ttlSecondsAfterFinished: 1800
      template:
        metadata:
          labels:
            app: pipeline-refresh
        spec:
          nodeSelector:
            agentpool: <nodepool>
          tolerations:
            - key: "workload"
              operator: "Equal"
              value: "<namespace>"
              effect: "NoSchedule"
          serviceAccountName: pipeline-refresh-sa
          restartPolicy: OnFailure
          containers:
            - name: trigger
              image: bitnami/kubectl:latest
              command:
                - /bin/sh
                - -c
                - |
                  kubectl rollout restart deployment/<client>-api -n <namespace>
                  kubectl rollout status deployment/<client>-api -n <namespace> --timeout=300s
The CronJob uses kubectl rollout restart to trigger a rolling restart of the API pods. When pods restart, they mount the same PVC, re-read the DuckDB file (which was updated by the pipeline), and serve fresh data.
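The CronJob’s service account (pipeline-refresh-sa) requires RBAC permissions to patch deployments, granted through a Role and a matching RoleBinding (not shown). The Role below grants get and patch; because kubectl rollout status watches the Deployment, it may additionally need list and watch: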
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: pipeline-refresh-role
  namespace: <namespace>
rules:
  - apiGroups: ["apps"]
    resources: ["deployments"]
    verbs: ["get", "patch"]
This approach decouples the pipeline execution (Prefect tasks running locally or in a separate pod) from the API refresh cycle. The DuckDB file acts as the synchronization point.
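For context on why the Role needs patch: kubectl rollout restart works by patching a timestamp annotation (kubectl.kubernetes.io/restartedAt) onto the Deployment's pod template, and the changed template makes the Deployment controller roll new pods. The sketch below only builds that patch body; restart_patch is a hypothetical helper and nothing here talks to a cluster.

```python
from datetime import datetime, timezone
from typing import Optional


def restart_patch(now: Optional[datetime] = None) -> dict:
    """Build a merge-patch body that changes the pod template annotation."""
    moment = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    stamp = moment.strftime("%Y-%m-%dT%H:%M:%SZ")  # RFC 3339, UTC
    return {
        "spec": {
            "template": {
                "metadata": {
                    "annotations": {"kubectl.kubernetes.io/restartedAt": stamp}
                }
            }
        }
    }
```

Since the patch touches only the Deployment object, the refresh job needs no permissions on pods themselves.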
Shared PVC Strategy
The DuckDB database file lives on a PersistentVolumeClaim:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: <client>-data
  namespace: <namespace>
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
The API deployment mounts this PVC at /data, and the pipeline scripts write the .duckdb file to the same path. On pod restart, the file is immediately available without re-computation. DuckDB’s file-based nature (no separate server process) makes this trivial.
Note: This approach works for single-writer patterns. DuckDB lets either one process open the file read-write or several processes open it read-only, so for concurrent readers and writers you’d need a proper OLAP server (like ClickHouse running in the cluster).
Design Decisions
Why KRaft over ZooKeeper?
KRaft eliminates the operational complexity of running a separate ZooKeeper quorum. For a Kubernetes-native platform, having Kafka manage its own metadata in an internal replicated log is cleaner. The tradeoff: a single-node KRaft cluster has no failover (the metadata is still durably logged, but the one node is a single point of failure). In multi-node clusters, the controller nodes form their own Raft quorum, so there is still no second system to operate.
Why DuckDB as a Lakehouse?
DuckDB is embedded (no separate server), supports SQL directly, and handles Parquet/JSON natively. For medallion transforms at this data volume, it is typically faster than Spark for the cleansing steps, since there is no cluster to start, and it is more portable than a data warehouse. The downside: it’s not designed for petabyte scale or concurrent writers. For a telecom DSCoE at this scale, it’s a sweet spot.
Why CronJob-triggered Refreshes Instead of a Streaming API?
A scheduled refresh is simpler to reason about and debug than a fully event-driven architecture. Consumers poll Kafka every 30 minutes, compute the medallion layers, and trigger a restart. This batching also reduces load on the database during low-traffic periods. The API always serves a consistent snapshot (no mid-computation reads).
Integration Gotchas
- Kafka speaks binary, not HTTP: When I built the System Status page to health-check all platform services, the initial implementation used urllib.request to send an HTTP GET to kafka:9092. This always returned “error” even though Kafka was healthy, because port 9092 speaks Kafka’s binary wire protocol, not HTTP. The Kubernetes readiness probe already knew this (it uses tcpSocket, not httpGet), but I didn’t carry that pattern into the application-level health check. The fix was a raw TCP socket connect:
import socket
import time

def _check_tcp(host: str, port: int, timeout: float = 2.0) -> dict:
    """Report whether a TCP connection to host:port succeeds, with latency."""
    t0 = time.monotonic()
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
        sock.close()
        latency = round((time.monotonic() - t0) * 1000)
        return {"status": "ok", "latency_ms": latency}
    except OSError:  # refused, timed out, or DNS failure
        return {"status": "error", "latency_ms": 0}
The readiness probe in the StatefulSet manifest was the hint I should have followed from the start: tcpSocket: { port: 9092 }. If the K8s probe doesn’t use HTTP, the application shouldn’t either.
- Lost+found with KRaft volumes: The initial KAFKA_LOG_DIRS pointed at the volume root rather than a subdirectory, causing Kafka to fail initialization on every pod restart. Appending /kraft-logs fixed it.
- DuckDB schema evolution: CREATE OR REPLACE TABLE is idempotent, but if you add a column mid-pipeline, downstream code that assumes the old schema breaks. Use SELECT * EXCLUDE (...) or explicit column lists when chaining queries.
- Consumer offset management: Without a proper checkpoint mechanism, re-running the consumer reads duplicates. I added a simple check (HAVING total_events >= 2 in Gold) to filter out noise, but that does not remove duplicates; a real system needs a consumer that commits offsets only after the pipeline succeeds.
- PVC latency: Initial tests mounted the PVC from a different availability zone, causing 100ms+ read latency. Ensuring the PVC and pods colocate (or using a local volume provisioner) is critical for query performance.
- Quality scoring: Filtering at Bronze was too aggressive (lost real data). Moving the quality check to Silver (where JSON is parsed) lets me reason about invalid data more clearly.
Performance Metrics
At current scale (200-500 events every 5-10 minutes, with a 30-minute refresh cycle):
- Ingestion: 200 events/sec from Kafka into Bronze (single consumer, local disk).
- Silver transform: 5000 events/sec (filter + flatten on ~100K rows).
- Gold aggregation: 1000 subscribers/sec (group + aggregate).
- Total pipeline time: 2-3 seconds end-to-end.
- API response time (single query): <100ms (in-memory DuckDB).
- Refresh cycle: 30 minutes (Kafka poll to API restart).
These are comfortable for the use case. Scaling to 1M events/day would likely require moving off embedded DuckDB to a dedicated OLAP store (ClickHouse, DuckDB-Wasm on S3, etc.).
Production Rule
A single-file OLAP store on a PVC works for analytics workloads that tolerate batch-interval staleness. The pod restart model gives an implicit consistency guarantee, provided the pipeline finishes writing the DuckDB file before the CronJob fires: no API pod then reads a partially written database. The design holds only while there is a single writer. concurrencyPolicy: Forbid keeps refresh jobs from overlapping, and pipeline runs must be serialized the same way. The moment that constraint breaks, with two pipeline runs overlapping on the same file, the database is at risk of corruption. Keep Forbid, and this approach holds without a separate OLAP server.
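If the ordering between pipeline and CronJob cannot be guaranteed, one alternative (not what this pipeline does) is to build the database under a different name and swap it into place with an atomic rename. The sketch below uses only the standard library; publish_atomically and both paths are hypothetical, and it assumes the build connection has been closed so that no write-ahead log file is left beside the built database.

```python
import os
import shutil
import tempfile


def publish_atomically(built_file: str, live_path: str) -> None:
    """Copy built_file next to live_path, then swap it in with a rename."""
    live_dir = os.path.dirname(os.path.abspath(live_path))
    # The temp file must sit on the same filesystem as the target for the
    # rename to be atomic, so it is created in the target directory.
    fd, tmp_path = tempfile.mkstemp(dir=live_dir, suffix=".tmp")
    os.close(fd)
    try:
        shutil.copyfile(built_file, tmp_path)
        os.replace(tmp_path, live_path)  # atomic on POSIX within one filesystem
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

A reader that opens live_path sees either the old file or the new one in full. Pods that already hold the old file open keep reading it until they restart, which matches the restart-driven refresh model.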