import asyncio import atexit import httpx import orjson import uvloop from environs import Env env = Env() env.read_env() unit_status_url: str = env.str("UNIT_STATUS_URL") uvloop.install() client: httpx.AsyncClient = httpx.AsyncClient() async def fetch_json_metrics() -> dict[str, str | int | float | dict]: response = await client.get(unit_status_url) response.raise_for_status() return orjson.loads(response.content) def json_to_prometheus_metrics(data: dict[str, str | int | float | dict]) -> list[str]: lines: list[str] = [] stack: list[tuple[str, dict[str, str | int | float | dict]]] = [("", data)] while stack: prefix, current_data = stack.pop() for key, value in current_data.items(): metric_name = f"{prefix}_{key}" if prefix else key if isinstance(value, dict): stack.append((metric_name, value)) elif isinstance(value, (int | float)): sanitized_name = metric_name.replace(".", "_").replace("-", "_") lines.append(f"{sanitized_name} {value}") return lines async def app(scope: dict[str, str | dict], receive: callable, send: callable) -> None: if scope["type"] == "http": if scope["path"] == "/metrics": data = await fetch_json_metrics() metrics_lines = json_to_prometheus_metrics(data) metrics_text = "\n".join(metrics_lines) + "\n" headers = [(b"content-type", b"text/plain; version=0.0.4; charset=utf-8")] await send({"type": "http.response.start", "status": 200, "headers": headers}) await send({"type": "http.response.body", "body": metrics_text.encode("utf-8")}) else: await send( { "type": "http.response.start", "status": 404, "headers": [(b"content-type", b"text/plain; charset=utf-8")], } ) await send({"type": "http.response.body", "body": b"Not Found"}) @atexit.register def close_client() -> None: asyncio.get_event_loop().run_until_complete(client.aclose())