Querying the Kubernetes Metrics API from a Pod: RBAC, Python Client, and Grafana Without Prometheus
Wired up a FastAPI application to read node and pod metrics from the Kubernetes metrics-server API using in-cluster config, a scoped ClusterRole, and the Python kubernetes client.
The Problem: Infra Visibility Without Prometheus
The <client> platform runs on AKS with a data pipeline (Prefect, MLflow, Kafka, Great Expectations). I needed real-time visibility into node and pod resource utilization — CPU, memory, restart counts — to correlate with pipeline performance. The obvious choice was Prometheus + Grafana, but I hadn’t configured a metrics scraper, retention policy, or alert rules. That’s operational overhead I didn’t have time for before the demo on Saturday.
Kubernetes already collects metrics via the metrics-server API — it powers kubectl top nodes and kubectl top pods. The catch: accessing it programmatically requires RBAC rules that read from the metrics.k8s.io API group, and Grafana needs a datasource that can fetch and parse the response.
I built a FastAPI service that exposes /api/v1/infra/nodes and /api/v1/infra/pods endpoints, wires them to Grafana via the JSON datasource plugin, and does the metric parsing in Python. No Prometheus, no sidecar collectors — just the metrics-server API and RBAC.
Architecture
The pattern:
- Create a ServiceAccount bound to a scoped ClusterRole that permits read access to metrics.k8s.io/nodes and metrics.k8s.io/pods.
- In the FastAPI pod, load the in-cluster kubeconfig (certificate + token auto-injected by Kubernetes).
- Use the Python kubernetes client to hit the custom object API at metrics.k8s.io/v1beta1.
- Parse nanoCPU and memory strings into comparable units (milliCPU, MiB).
- Calculate utilization percentages against allocatable resources from the core API.
- Return JSON that Grafana’s JSON datasource plugin can consume.
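For orientation, the custom object calls in the third step resolve to plain REST paths on the API server. The sketch below only builds those paths; the helper name `metrics_path` and the `demo` namespace are illustrative assumptions, not part of the service.

```python
from typing import Optional

GROUP = "metrics.k8s.io"
VERSION = "v1beta1"


def metrics_path(plural: str, namespace: Optional[str] = None) -> str:
    """Build the REST path for a metrics.k8s.io collection (hypothetical helper)."""
    base = f"/apis/{GROUP}/{VERSION}"
    if namespace is None:
        # Cluster-wide collection, e.g. node metrics.
        return f"{base}/{plural}"
    # Namespaced collection, e.g. pod metrics in one namespace.
    return f"{base}/namespaces/{namespace}/{plural}"


print(metrics_path("nodes"))
print(metrics_path("pods", namespace="demo"))  # "demo" is a placeholder namespace
```

The same JSON can be inspected from a workstation with kubectl get --raw followed by one of these paths, which is a quick way to confirm that metrics-server is responding before writing any client code.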
The RBAC Setup
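Metrics are not part of the core API (v1); they live in the metrics.k8s.io group. Nodes are cluster-scoped, so reading node metrics requires a ClusterRole and ClusterRoleBinding. Pods are namespaced, so a Role and RoleBinding would be enough for pod metrics in a single namespace, but using one ClusterRole for both keeps the manifest simple.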
Here’s the manifest:
apiVersion: v1
kind: ServiceAccount
metadata:
  name: <client>-api
  namespace: <namespace>
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: <client>-api-metrics-reader
rules:
  - apiGroups: ["metrics.k8s.io"]
    resources: ["nodes", "pods"]
    verbs: ["get", "list"]
  - apiGroups: [""]
    resources: ["nodes", "pods"]
    verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: <client>-api-metrics-reader
subjects:
  - kind: ServiceAccount
    name: <client>-api
    namespace: <namespace>
roleRef:
  kind: ClusterRole
  name: <client>-api-metrics-reader
  apiGroup: rbac.authorization.k8s.io
Two API groups:
- metrics.k8s.io: The metrics API group. Without this, the CustomObjectsApi call fails with a 403.
- "" (empty, which is the core group): Read basic node and pod objects to fetch allocatable resources and labels. Metrics alone don’t tell you what a node is capable of; you need node.status.allocatable.cpu and node.status.allocatable.memory from the core API.
This is tighter than cluster-admin and reflects the principle of least privilege.
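To see why both rules are needed, here is a deliberately simplified model of how a list of RBAC rules is matched against a request. It ignores resourceNames, subresources, and non-resource URLs, and the function names are made up for illustration; the real evaluation happens in the API server.

```python
from typing import Dict, List

Rule = Dict[str, List[str]]

# The two rules from the ClusterRole above, written as plain dicts.
RULES: List[Rule] = [
    {"apiGroups": ["metrics.k8s.io"], "resources": ["nodes", "pods"], "verbs": ["get", "list"]},
    {"apiGroups": [""], "resources": ["nodes", "pods"], "verbs": ["get", "list"]},
]


def _matches(allowed: List[str], wanted: str) -> bool:
    # "*" is the RBAC wildcard; otherwise the value must be listed explicitly.
    return "*" in allowed or wanted in allowed


def is_allowed(rules: List[Rule], group: str, resource: str, verb: str) -> bool:
    """Simplified check: a request passes if any single rule matches all three fields."""
    return any(
        _matches(rule["apiGroups"], group)
        and _matches(rule["resources"], resource)
        and _matches(rule["verbs"], verb)
        for rule in rules
    )


print(is_allowed(RULES, "metrics.k8s.io", "nodes", "list"))      # both rules present
print(is_allowed(RULES[1:], "metrics.k8s.io", "nodes", "list"))  # metrics rule removed
```

Against a live cluster, kubectl auth can-i list nodes.metrics.k8s.io --as=system:serviceaccount:<namespace>:<client>-api should answer the same question authoritatively.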
Python Client: In-Cluster Config and Metric Parsing
The FastAPI service runs as a pod. When Kubernetes creates a pod, it injects a service account token and CA certificate as files into /var/run/secrets/kubernetes.io/serviceaccount/. Calling load_incluster_config() reads these files and configures the Python client to authenticate to the API server.
try:
    from kubernetes import client as k8s_client, config as k8s_config
    try:
        k8s_config.load_incluster_config()
    except k8s_config.ConfigException:
        k8s_config.load_kube_config()
    K8S_AVAILABLE = True
except Exception:
    K8S_AVAILABLE = False
The try-except chain allows local development (where load_kube_config() reads ~/.kube/config) and graceful degradation if Kubernetes isn’t available.
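If it helps to know which branch will be taken before calling into the client, the conditions that in-cluster loading depends on can be checked with the standard library alone. This is a sketch under the assumption that the token and CA sit at the default mount path; `looks_in_cluster` is a hypothetical helper, not part of the kubernetes package.

```python
import os
from pathlib import Path
from typing import Mapping

# Default service account mount path inside a pod.
SA_DIR = Path("/var/run/secrets/kubernetes.io/serviceaccount")


def looks_in_cluster(env: Mapping[str, str] = os.environ, sa_dir: Path = SA_DIR) -> bool:
    """Return True when the API server env vars and the mounted credentials are present."""
    has_env = bool(env.get("KUBERNETES_SERVICE_HOST")) and bool(env.get("KUBERNETES_SERVICE_PORT"))
    has_files = (sa_dir / "token").is_file() and (sa_dir / "ca.crt").is_file()
    return has_env and has_files


print("in-cluster" if looks_in_cluster() else "falling back to local kubeconfig")
```

On a laptop this prints the fallback message; inside a pod with the default service account mount it reports in-cluster.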
Parsing CPU: Nanos to Millicores
Kubernetes represents CPU as a string with a unit suffix:
- 100n = 100 nanocores = 0.0001 millicores
- 500m = 500 millicores = 0.5 cores
- 2 (no suffix) = 2 cores = 2000 millicores
Metrics API returns values in nanocores; allocatable resources are often in millicores. Here’s the parser:
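def parse_cpu(val: str) -> int:
    # Nanocores: 1 millicore = 1,000,000 nanocores (integer division truncates).
    if val.endswith("n"):
        return int(val[:-1]) // 1_000_000
    # Already in millicores.
    if val.endswith("m"):
        return int(val[:-1])
    # No suffix: whole cores.
    return int(val) * 1000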
All outputs are in millicores (milliCPU). A node allocatable cpu: 7750m becomes 7750 (int); a metric cpu: 7532567891n becomes 7532 (int, truncated by the integer division).
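The Kubernetes quantity format also allows a u (microcores) suffix and fractional core counts such as 0.5, neither of which the integer parser handles. A possible variant, offered as a sketch rather than what the service uses, returns a float so the caller decides how to round; the name `parse_cpu_millicores` is an assumption.

```python
def parse_cpu_millicores(val: str) -> float:
    """Convert a Kubernetes CPU quantity string to millicores without truncating."""
    # Number of each sub-unit per millicore.
    per_millicore = {"n": 1_000_000, "u": 1_000, "m": 1}
    suffix = val[-1]
    if suffix in per_millicore:
        return int(val[:-1]) / per_millicore[suffix]
    # No suffix: whole or fractional cores.
    return float(val) * 1000.0


for sample in ("100n", "250u", "500m", "2", "7532567891n"):
    print(sample, "->", parse_cpu_millicores(sample))
```

Rounding the last sample with round() gives 7533, whereas the integer parser's floor division gives 7532. For a dashboard the difference is cosmetic, but it matters when many small per-container values are summed.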
Parsing Memory: Ki, Mi, Gi to MiB
Memory units are trickier. Kubernetes uses binary prefixes:
- Ki = kibibyte = 1024 bytes
- Mi = mebibyte = 1024^2 bytes
- Gi = gibibyte = 1024^3 bytes
The metrics API reports memory either as a plain byte count or with a binary suffix (metrics-server commonly uses Ki); allocatable resources use Ki or Mi. The parser normalizes to MiB:
def parse_mem_mi(val: str) -> int:
    if val.endswith("Ki"):
        return int(val[:-2]) // 1024
    if val.endswith("Mi"):
        return int(val[:-2])
    if val.endswith("Gi"):
        return int(val[:-2]) * 1024
    return int(val) // (1024 * 1024)
Output is always MiB, truncated by integer division. A node allocatable memory: 14833444Ki becomes 14485 MiB (int); metrics memory: 15160959320 (bytes) becomes 14458 MiB (int).
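Kubernetes quantities can also carry decimal suffixes (k, M, G, T) and larger binary ones such as Ti. A table-driven variant covers these with little extra code. This is a sketch: `parse_mem_bytes` and `to_mib` are illustrative names, and only a subset of the suffixes in the quantity format is listed.

```python
# Bytes per unit for a subset of Kubernetes quantity suffixes.
_UNITS = {
    "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4,  # binary
    "k": 1000, "M": 1000**2, "G": 1000**3, "T": 1000**4,      # decimal
}


def parse_mem_bytes(val: str) -> int:
    """Convert a memory quantity string to bytes."""
    # Check two-letter suffixes first so "Mi" is not mistaken for a shorter one.
    for suffix in sorted(_UNITS, key=len, reverse=True):
        if val.endswith(suffix):
            return int(val[: -len(suffix)]) * _UNITS[suffix]
    return int(val)  # plain byte count


def to_mib(n_bytes: int) -> int:
    """Bytes to MiB, truncating like the original parser."""
    return n_bytes // (1024 * 1024)


for sample in ("14833444Ki", "15160959320", "2Gi", "512M"):
    print(sample, "->", to_mib(parse_mem_bytes(sample)), "MiB")
```

Converting to bytes first and dividing once keeps the truncation in a single place, which makes the rounding behaviour easier to reason about.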
Endpoint: Node Metrics
The /api/v1/infra/nodes endpoint queries the metrics API for all nodes, enriches them with allocatable resources from the core API, and calculates utilization percentages.
from datetime import datetime, timezone

from fastapi import HTTPException

# `app` (the FastAPI instance) and NODE_POOL are defined elsewhere in the service.
@app.get("/api/v1/infra/nodes", tags=["Infrastructure"])
def infra_nodes():
    if not K8S_AVAILABLE:
        raise HTTPException(503, "Kubernetes API not available")
    api = k8s_client.CustomObjectsApi()
    core = k8s_client.CoreV1Api()
    # Usage comes from metrics.k8s.io; capacity comes from the core API.
    metrics = api.list_cluster_custom_object("metrics.k8s.io", "v1beta1", "nodes")
    nodes_info = core.list_node(label_selector=f"agentpool={NODE_POOL}")
    alloc = {n.metadata.name: n.status.allocatable for n in nodes_info.items}
    result = []
    for item in metrics["items"]:
        name = item["metadata"]["name"]
        if name not in alloc:
            continue  # node belongs to a different nodepool
        cpu_used = parse_cpu(item["usage"]["cpu"])
        mem_used = parse_mem_mi(item["usage"]["memory"])
        cpu_alloc = parse_cpu(alloc[name].get("cpu", "0"))
        mem_alloc = parse_mem_mi(alloc[name].get("memory", "0Ki"))
        result.append({
            "node": name,
            "cpu_millicores": cpu_used,
            "cpu_allocatable": cpu_alloc,
            "cpu_pct": round(cpu_used / cpu_alloc * 100, 1) if cpu_alloc else 0,
            "memory_mi": mem_used,
            "memory_allocatable_mi": mem_alloc,
            "memory_pct": round(mem_used / mem_alloc * 100, 1) if mem_alloc else 0,
        })
    return {"nodes": result, "generated_at": datetime.now(timezone.utc).isoformat()}
Key moves:
- list_cluster_custom_object() queries the metrics API without schema validation. It returns a dict, not a typed object.
- list_node(label_selector=...) filters nodes by the AKS nodepool label (agentpool=<nodepool>).
- For each metric item, look up its allocatable resources in the core API response.
- If a node in metrics isn’t in the filtered core API response (e.g., it’s in a different nodepool), skip it.
- All percentages are rounded to one decimal.
Response example:
{
  "nodes": [
    {
      "node": "aks-<cluster>-12345678-vmss000000",
      "cpu_millicores": 3251,
      "cpu_allocatable": 7750,
      "cpu_pct": 41.9,
      "memory_mi": 9216,
      "memory_allocatable_mi": 14486,
      "memory_pct": 63.6
    }
  ],
  "generated_at": "2026-04-25T10:30:15.123456+00:00"
}
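The join between usage and allocatable resources can be exercised without a cluster by feeding it a hand-written payload. The sketch below assumes the usual shape of a node metrics list (items with metadata, timestamp, window, and usage) and, to stay self-contained, assumes CPU always arrives with an n suffix and memory with Ki. The node names and values are invented.

```python
SAMPLE_METRICS = {
    "items": [
        {"metadata": {"name": "node-a"}, "timestamp": "2026-04-25T10:30:00Z", "window": "20s",
         "usage": {"cpu": "3251000000n", "memory": "9437184Ki"}},
        {"metadata": {"name": "node-b"}, "timestamp": "2026-04-25T10:30:00Z", "window": "20s",
         "usage": {"cpu": "1000000000n", "memory": "1048576Ki"}},
    ]
}

# Allocatable values as the core API would report them, filtered to one nodepool.
SAMPLE_ALLOCATABLE = {"node-a": {"cpu": "7750m", "memory": "14833444Ki"}}


def node_utilization(metrics: dict, allocatable: dict) -> list:
    """Join usage with allocatable per node and compute percentages."""
    rows = []
    for item in metrics["items"]:
        name = item["metadata"]["name"]
        if name not in allocatable:
            continue  # not in the filtered nodepool
        cpu_used = int(item["usage"]["cpu"][:-1]) // 1_000_000   # assumes "n" suffix
        mem_used = int(item["usage"]["memory"][:-2]) // 1024     # assumes "Ki" suffix
        cpu_alloc = int(allocatable[name]["cpu"][:-1])           # assumes "m" suffix
        mem_alloc = int(allocatable[name]["memory"][:-2]) // 1024
        rows.append({
            "node": name,
            "cpu_pct": round(cpu_used / cpu_alloc * 100, 1) if cpu_alloc else 0,
            "memory_pct": round(mem_used / mem_alloc * 100, 1) if mem_alloc else 0,
        })
    return rows


print(node_utilization(SAMPLE_METRICS, SAMPLE_ALLOCATABLE))
```

Only node-a appears in the output because node-b has no entry in the allocatable map, mirroring the nodepool filter in the endpoint.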
Endpoint: Pod Metrics
The /api/v1/infra/pods endpoint queries metrics for all pods in <namespace>, enriches them with phase and restart counts from the core API, and returns a sorted list.
@app.get("/api/v1/infra/pods", tags=["Infrastructure"])
def infra_pods():
    if not K8S_AVAILABLE:
        raise HTTPException(503, "Kubernetes API not available")
    api = k8s_client.CustomObjectsApi()
    core = k8s_client.CoreV1Api()
    metrics = api.list_namespaced_custom_object(
        "metrics.k8s.io", "v1beta1", NAMESPACE, "pods"
    )
    pods = core.list_namespaced_pod(NAMESPACE)
    pod_status = {}
    for p in pods.items:
        restarts = sum(
            cs.restart_count for cs in (p.status.container_statuses or [])
        )
        pod_status[p.metadata.name] = {
            "phase": p.status.phase,
            "restarts": restarts,
            # labels is None for a pod without labels, so guard before .get()
            "app": (p.metadata.labels or {}).get("app", "unknown"),
        }
    result = []
    for item in metrics["items"]:
        name = item["metadata"]["name"]
        # Usage is reported per container; total it for the pod.
        total_cpu = sum(parse_cpu(c["usage"]["cpu"]) for c in item["containers"])
        total_mem = sum(parse_mem_mi(c["usage"]["memory"]) for c in item["containers"])
        info = pod_status.get(name, {})
        result.append({
            "pod": name,
            "app": info.get("app", "unknown"),
            "cpu_millicores": total_cpu,
            "memory_mi": total_mem,
            "phase": info.get("phase", "Unknown"),
            "restarts": info.get("restarts", 0),
        })
    result.sort(key=lambda x: x["cpu_millicores"], reverse=True)
    return {"pods": result, "generated_at": datetime.now(timezone.utc).isoformat()}
Subtleties:
- Metrics are per-container; sum CPU and memory across all containers in a pod.
- Pod phase (Running, Pending, Failed) comes from the core API, not metrics.
- Restart count is the sum across all containers; containers that have restarted appear in container_statuses with restart_count > 0.
- Results are sorted by CPU usage descending, so the heaviest workloads appear first.
Response example:
{
  "pods": [
    {
      "pod": "<client>-api-abc123def456",
      "app": "<client>-api",
      "cpu_millicores": 487,
      "memory_mi": 1024,
      "phase": "Running",
      "restarts": 0
    },
    {
      "pod": "mlflow-pod-xyz789",
      "app": "mlflow",
      "cpu_millicores": 142,
      "memory_mi": 512,
      "phase": "Running",
      "restarts": 2
    }
  ],
  "generated_at": "2026-04-25T10:30:15.123456+00:00"
}
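One detail of the per-container sum is worth a closer look: converting each container to millicores or MiB before adding truncates every term, so small sidecars can vanish from the total. A variation that sums raw units first and converts once is sketched below, assuming n-suffixed CPU and Ki-suffixed memory. The payload and the `pod_totals` name are made up for illustration.

```python
SAMPLE_POD_METRICS = {
    "items": [
        {"metadata": {"name": "mlflow-xyz"},
         "containers": [{"name": "mlflow", "usage": {"cpu": "142000000n", "memory": "524288Ki"}}]},
        {"metadata": {"name": "api-abc"},
         "containers": [
             {"name": "api", "usage": {"cpu": "486600000n", "memory": "1040000Ki"}},
             {"name": "sidecar", "usage": {"cpu": "900000n", "memory": "8576Ki"}},
         ]},
    ]
}


def pod_totals(pod_metrics: dict) -> list:
    """Sum container usage in raw units, convert once, and sort by CPU descending."""
    rows = []
    for item in pod_metrics["items"]:
        cpu_n = sum(int(c["usage"]["cpu"][:-1]) for c in item["containers"])      # nanocores
        mem_ki = sum(int(c["usage"]["memory"][:-2]) for c in item["containers"])  # KiB
        rows.append({
            "pod": item["metadata"]["name"],
            "cpu_millicores": cpu_n // 1_000_000,
            "memory_mi": mem_ki // 1024,
        })
    rows.sort(key=lambda row: row["cpu_millicores"], reverse=True)
    return rows


for row in pod_totals(SAMPLE_POD_METRICS):
    print(row)
```

For api-abc this yields 487 millicores and 1024 MiB. Converting each container first would give 486 and 1023, because the sidecar's 0.9 millicores and the fractional MiB of both containers are dropped before the addition.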
Grafana Integration
Grafana’s JSON datasource plugin (marcusolsson-json-datasource) can consume HTTP endpoints that return JSON and extract values via JSONPath. The datasource is configured in a ConfigMap:
apiVersion: v1
kind: ConfigMap
metadata:
  name: grafana-datasources
  namespace: <namespace>
data:
  datasources.yaml: |
    apiVersion: 1
    datasources:
      - name: <client>API
        type: marcusolsson-json-datasource
        url: http://<client>-api:8000
        access: proxy
        isDefault: true
The dashboard definition uses targets with urlPath and JSONPath fields:
{
  "datasource": "<client>API",
  "targets": [
    {
      "refId": "A",
      "urlPath": "/api/v1/infra/nodes",
      "method": "GET",
      "fields": [
        {"jsonPath": "$.nodes[*].cpu_pct", "type": "number", "name": "CPU %"}
      ]
    }
  ],
  "title": "Node CPU Utilization (%)",
  "type": "gauge"
}
Grafana parses the response, extracts all CPU percentages from the array, and renders them as a gauge. Another panel pulls node details into a table:
{
  "targets": [
    {
      "refId": "A",
      "urlPath": "/api/v1/infra/nodes",
      "method": "GET",
      "fields": [
        {"jsonPath": "$.nodes[*].node", "type": "string", "name": "Node"},
        {"jsonPath": "$.nodes[*].cpu_millicores", "type": "number", "name": "CPU (m)"},
        {"jsonPath": "$.nodes[*].cpu_allocatable", "type": "number", "name": "CPU Alloc (m)"},
        {"jsonPath": "$.nodes[*].cpu_pct", "type": "number", "name": "CPU %"},
        {"jsonPath": "$.nodes[*].memory_mi", "type": "number", "name": "Mem (Mi)"},
        {"jsonPath": "$.nodes[*].memory_allocatable_mi", "type": "number", "name": "Mem Alloc (Mi)"},
        {"jsonPath": "$.nodes[*].memory_pct", "type": "number", "name": "Mem %"}
      ]
    }
  ],
  "title": "Node Details",
  "type": "table"
}
The dashboard auto-refreshes every 30 seconds and shows both gauges (green <60%, yellow 60-85%, red >85%) and tables for detailed inspection.
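To make the JSONPath expressions concrete, here is what a path like $.nodes[*].cpu_pct selects, written as ordinary Python over a sample response, together with the gauge thresholds described above. The payload values and the helper names are illustrative; Grafana does this evaluation itself.

```python
import json

SAMPLE_RESPONSE = json.loads("""
{
  "nodes": [
    {"node": "node-a", "cpu_pct": 41.9, "memory_pct": 63.6},
    {"node": "node-b", "cpu_pct": 88.2, "memory_pct": 71.0}
  ]
}
""")


def extract(payload: dict, field: str) -> list:
    """Python equivalent of the JSONPath $.nodes[*].<field>."""
    return [node[field] for node in payload["nodes"]]


def threshold_color(pct: float) -> str:
    """Gauge colors: green below 60, yellow from 60 to 85, red above 85."""
    if pct < 60:
        return "green"
    if pct <= 85:
        return "yellow"
    return "red"


cpu_values = extract(SAMPLE_RESPONSE, "cpu_pct")
print(cpu_values, [threshold_color(v) for v in cpu_values])
```

Each field in a target yields one column of values, which is why the table panel can list seven JSONPath fields against the same endpoint and get one row per node.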
Deployment Context
The FastAPI service runs in the same namespace (<namespace>) as the pipeline workloads. The deployment references the <client>-api ServiceAccount:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: <client>-api
  namespace: <namespace>
spec:
  replicas: 1
  selector:
    matchLabels:
      app: <client>-api
  template:
    metadata:
      labels:
        app: <client>-api
    spec:
      serviceAccountName: <client>-api
      containers:
        - name: api
          image: <acr-registry>.azurecr.io/<client>-api:1ab6d45ceab959439d84d94917a8df48e85abfcd
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
The token and CA are automatically mounted at /var/run/secrets/kubernetes.io/serviceaccount/. The Python client finds them without explicit configuration.
Why Not Prometheus?
Prometheus is the right tool for long-term metrics retention, complex alerting, and time-series analytics. But it requires:
- A scrape config and service monitors to tell Prometheus what to scrape.
- Storage (local disk or object storage like S3).
- A retention policy (how long to keep data).
- Alertmanager integration.
For a demo that needs to show “node CPU is at 42%, pods are running, no surprises,” Prometheus is overengineered. The metrics-server component ships by default on AKS (though not on every Kubernetes distribution) and keeps only the most recent sample for each node and pod in memory. Querying it directly via a simple HTTP API and exposing it to Grafana gives you real-time visibility with near-zero operational burden.
Metrics API Gotchas
API group scoping
Metrics are in metrics.k8s.io, not the core API. Forgetting this in the ClusterRole means the pod gets a 403 Forbidden error. Nodes are cluster-scoped, so node metrics require a ClusterRole; a namespaced Role can only cover pod metrics within its own namespace.
Metric precision
The metrics API reports short-window averages sampled periodically (each item carries timestamp and window fields), so it’s not precise enough for billing or SLA guarantees. Use it for dashboards and rough alerting, not for auditing resource usage.
Unit confusion
The metrics API returns CPU in nanocores but allocatable resources are often in millicores. Always parse and normalize; don’t assume.
Container-level aggregation
Pod metrics are per-container. If you have a sidecar or multiple app containers, you must sum them. The endpoint does this automatically.
Refresh latency
Grafana’s JSON datasource doesn’t cache by default and respects the API response’s timestamp. If your FastAPI service is slow, dashboard refresh will feel sluggish. Keep the metric queries fast.
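One way to keep the endpoints fast when several panels hit the same URL on every refresh is a small time-based cache in front of the Kubernetes calls. The following is a sketch of that idea, not something the service described here implements; `ttl_cached` is a hypothetical helper and it is not thread-safe as written.

```python
import time
from typing import Any, Callable, Optional, Tuple


def ttl_cached(
    fetch: Callable[[], Any],
    ttl_seconds: float,
    clock: Callable[[], float] = time.monotonic,
) -> Callable[[], Any]:
    """Wrap a zero-argument fetch function so results are reused for ttl_seconds."""
    cached: Optional[Tuple[float, Any]] = None  # (time stored, value)

    def wrapper() -> Any:
        nonlocal cached
        now = clock()
        if cached is not None and now - cached[0] < ttl_seconds:
            return cached[1]  # still fresh, skip the API call
        value = fetch()
        cached = (now, value)
        return value

    return wrapper


# Usage with a stand-in for the real metrics query.
get_nodes = ttl_cached(lambda: {"nodes": []}, ttl_seconds=10)
print(get_nodes())
```

A TTL well below the 30-second dashboard refresh keeps panels current while collapsing bursts of identical requests into a single call to the API server.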
This approach scales to tens of nodes and hundreds of pods without breaking. For larger clusters, Prometheus becomes necessary.