"""ASGI application that exposes request metrics aggregated from a JSON-lines log.

The log file path is read from the ``UNIT_LOG_FILE`` environment variable.
Every HTTP request to ``/metrics`` reads the whole log, truncates it, and
returns per-``(url, http_method)`` request counts and summed request times in
the Prometheus text exposition format.
"""

import contextlib
import os
from collections.abc import Awaitable, Callable
from typing import Any, Optional

import aiofiles
import orjson
from environs import Env

env: Env = Env()
env.read_env()

# Path of the JSON-lines log to aggregate; read once at import time.
unit_log_file: str = env.str("UNIT_LOG_FILE")


def escape_label_value(value: str) -> str:
    """Escape *value* for use inside a double-quoted Prometheus label value.

    Backslash, double quote and newline are replaced by ``\\\\``, ``\\"`` and
    ``\\n`` respectively. Backslashes are handled first so that the
    backslashes introduced by the later replacements are not doubled.
    """
    return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def _parse_log_line(line: str) -> Optional[tuple[tuple[str, str], float]]:
    """Return ``((url, http_method), request_time)`` for one log line.

    Returns ``None`` when the line is not usable: invalid JSON, not a JSON
    object, a missing key, a ``request_time`` that cannot be converted to
    ``float``, or a ``url`` / ``http_method`` that is not a string.
    """
    # orjson.JSONDecodeError is a ValueError subclass, so it is covered here.
    with contextlib.suppress(ValueError, KeyError, TypeError, OverflowError):
        record: Any = orjson.loads(line)
        request_time: float = float(record["request_time"])
        url: Any = record["url"]
        http_method: Any = record["http_method"]
        # Labels are later passed to escape_label_value(), which needs str;
        # rejecting other types here keeps one bad line from failing a scrape.
        if isinstance(url, str) and isinstance(http_method, str):
            return (url, http_method), request_time
    return None


async def read_and_process_logs() -> dict[tuple[str, str], dict[str, float]]:
    """Read, truncate and aggregate the log file.

    Returns:
        A mapping from ``(url, http_method)`` to ``{"count": n, "sum": s}``,
        where ``n`` is the number of usable log lines for that key and ``s``
        is the sum of their ``request_time`` values. The mapping is empty when
        the log file does not exist. Unusable lines are skipped.

    NOTE(review): the file is truncated right after it is read, so anything
    appended between ``readlines()`` and ``truncate(0)`` is discarded, and the
    returned aggregates only cover lines written since the previous call.
    """
    metrics_data: dict[tuple[str, str], dict[str, float]] = {}
    if not os.path.exists(unit_log_file):
        return metrics_data

    async with aiofiles.open(unit_log_file, mode="r+") as f:
        lines: list[str] = await f.readlines()
        await f.truncate(0)

    for line in lines:
        parsed = _parse_log_line(line)
        if parsed is None:
            continue
        key, request_time = parsed
        bucket = metrics_data.get(key)
        if bucket is None:
            metrics_data[key] = {"count": 1, "sum": request_time}
        else:
            bucket["count"] += 1
            bucket["sum"] += request_time
    return metrics_data


async def generate_metrics_text() -> str:
    """Render the current aggregates in the Prometheus text exposition format.

    The samples of each metric are emitted as one contiguous group directly
    after that metric's ``# HELP`` / ``# TYPE`` lines, as the format requires.

    NOTE(review): both metrics are declared as ``counter``, but the values
    come from ``read_and_process_logs()``, which starts from an empty mapping
    on every call, so they restart on every scrape rather than accumulating.
    Confirm that consumers expect this before changing it.
    """
    metrics_data: dict[tuple[str, str], dict[str, float]] = await read_and_process_logs()

    total_lines: list[str] = [
        "# HELP request_duration_seconds_total Total request duration in seconds",
        "# TYPE request_duration_seconds_total counter",
    ]
    count_lines: list[str] = [
        "# HELP request_duration_seconds_count Total number of requests",
        "# TYPE request_duration_seconds_count counter",
    ]

    for (url, http_method), data in metrics_data.items():
        count: int = int(data["count"])
        sum_values: float = data["sum"]
        labels: str = (
            f'url="{escape_label_value(url)}",http_method="{escape_label_value(http_method)}"'
        )
        total_lines.append(f"request_duration_seconds_total{{{labels}}} {sum_values}")
        count_lines.append(f"request_duration_seconds_count{{{labels}}} {count}")

    return "\n".join(total_lines + count_lines) + "\n"


async def app(
    scope: dict[str, Any],
    _: Callable[[], Awaitable[dict[str, Any]]],
    send: Callable[[dict[str, Any]], Awaitable[None]],
) -> None:
    """ASGI entry point.

    For ``http`` scopes, responds ``200`` with the metrics text when the path
    is ``/metrics`` and ``404`` with ``Not Found`` otherwise. Scopes of any
    other type are ignored. The receive callable (second argument) is unused.
    """
    if scope["type"] != "http":
        return

    headers: list[tuple[bytes, bytes]] = [(b"content-type", b"text/plain; charset=utf-8")]
    if scope["path"] == "/metrics":
        # Build the body before sending anything, so a failure while reading
        # the log does not leave a half-started response.
        metrics_text: str = await generate_metrics_text()
        status: int = 200
        body: bytes = metrics_text.encode("utf-8")
    else:
        status = 404
        body = b"Not Found"

    await send({"type": "http.response.start", "status": status, "headers": headers})
    await send({"type": "http.response.body", "body": body})