All checks were successful
Build Docker Image / build-and-deploy-image (push) Successful in 43s
92 lines
3.1 KiB
Python
92 lines
3.1 KiB
Python
import contextlib
|
|
import os
|
|
from collections.abc import Awaitable, Callable
|
|
from typing import Any
|
|
|
|
import aiofiles
|
|
import orjson
|
|
from environs import Env
|
|
|
|
# Module configuration: build an environs reader, let it load a .env file,
# then resolve the path of the request log that this exporter consumes.
env: Env = Env()
env.read_env()

# Read once at import time via env.str("UNIT_LOG_FILE").
unit_log_file: str = env.str("UNIT_LOG_FILE")
|
|
|
|
|
|
def escape_label_value(value: str) -> str:
    """Escape *value* for use inside a double-quoted metric label.

    A backslash becomes ``\\\\``, a double quote becomes ``\\"`` and a line
    feed becomes the two characters ``\\n``; every other character is kept.
    """
    # One pass over the string; each special character maps to its escape.
    escape_table = str.maketrans(
        {
            "\\": "\\\\",
            '"': '\\"',
            "\n": "\\n",
        }
    )
    return value.translate(escape_table)
|
|
|
|
|
|
async def read_and_process_logs() -> dict[tuple[str, str], dict[str, float]]:
    """Drain the request log and aggregate timings per ``(url, http_method)``.

    Every line of ``unit_log_file`` is read, the file is truncated to zero
    length, and each line is then parsed as a JSON object carrying the
    ``request_time``, ``url`` and ``http_method`` fields. Lines that cannot be
    parsed, lack a field, or carry a non-string ``url``/``http_method`` are
    skipped.

    Returns:
        A mapping from ``(url, http_method)`` to a dict with two entries:
        ``"count"`` (number of lines seen for that key) and ``"sum"`` (sum of
        their ``request_time`` values). The mapping is empty when the log file
        does not exist or holds no usable lines.

    Raises:
        OSError: If the log file exists but cannot be opened, read or
            truncated (for example because of missing permissions).
    """
    metrics_data: dict[tuple[str, str], dict[str, float]] = {}

    # Open directly and treat a missing file as "no data" instead of checking
    # for existence first: the file could vanish between a check and the open.
    try:
        async with aiofiles.open(unit_log_file, mode="r+") as f:
            lines: list[str] = await f.readlines()
            # NOTE: anything appended to the file between the read above and
            # this truncate is discarded without being counted.
            await f.truncate(0)
    except FileNotFoundError:
        return metrics_data

    for line in lines:
        # Best effort per line: a malformed entry must not abort the batch.
        # ValueError covers invalid JSON and non-numeric request_time,
        # KeyError a missing field, TypeError a payload that is not an object
        # or a value float() cannot convert.
        with contextlib.suppress(ValueError, KeyError, TypeError):
            log_data: dict[str, Any] = orjson.loads(line)
            request_time: float = float(log_data["request_time"])
            url = log_data["url"]
            http_method = log_data["http_method"]

            # Both values end up as label text; anything that is not a string
            # (e.g. JSON null) would break label escaping later on.
            if not isinstance(url, str) or not isinstance(http_method, str):
                continue

            key: tuple[str, str] = (url, http_method)
            bucket = metrics_data.get(key)
            if bucket is None:
                metrics_data[key] = {"count": 1, "sum": request_time}
            else:
                bucket["count"] += 1
                bucket["sum"] += request_time

    return metrics_data
|
|
|
|
|
|
async def generate_metrics_text() -> str:
    """Render the aggregated request log as metrics exposition text.

    Calls ``read_and_process_logs()`` and emits two counter families,
    ``request_duration_seconds_total`` and ``request_duration_seconds_count``,
    each labelled with ``url`` and ``http_method``. The HELP/TYPE header of a
    family is immediately followed by all of that family's samples, so every
    metric forms one contiguous group in the output.

    Returns:
        The exposition text, terminated by a newline. When there is no data
        only the HELP/TYPE headers are returned.
    """
    metrics_data: dict[tuple[str, str], dict[str, float]] = await read_and_process_logs()

    total_samples: list[str] = []
    count_samples: list[str] = []

    for (url, http_method), data in metrics_data.items():
        labels: str = (
            f'url="{escape_label_value(url)}",'
            f'http_method="{escape_label_value(http_method)}"'
        )
        # The count is stored in a float-typed dict; emit it as an integer.
        count: int = int(data["count"])
        sum_values: float = data["sum"]

        total_samples.append(f"request_duration_seconds_total{{{labels}}} {sum_values}")
        count_samples.append(f"request_duration_seconds_count{{{labels}}} {count}")

    # Keep each metric's header and samples together instead of interleaving
    # the two families per key.
    lines: list[str] = [
        "# HELP request_duration_seconds_total Total request duration in seconds",
        "# TYPE request_duration_seconds_total counter",
        *total_samples,
        "# HELP request_duration_seconds_count Total number of requests",
        "# TYPE request_duration_seconds_count counter",
        *count_samples,
    ]

    return "\n".join(lines) + "\n"
|
|
|
|
|
|
async def app(
    scope: dict[str, Any],
    _: Callable[[], Awaitable[dict[str, Any]]],
    send: Callable[[dict[str, Any]], Awaitable[None]],
) -> None:
    """ASGI entry point that serves the metrics text on ``/metrics``.

    Args:
        scope: Connection scope; only ``scope["type"]`` and, for HTTP,
            ``scope["path"]`` are read.
        _: The receive callable; it is never used.
        send: Callable used to emit the response start and body messages.

    Scopes whose type is not ``"http"`` are ignored without sending anything.
    Any HTTP path other than ``/metrics`` gets a 404 with body ``Not Found``.
    """
    if scope["type"] != "http":
        return

    # Both responses are plain UTF-8 text.
    text_headers: list[tuple[bytes, bytes]] = [(b"content-type", b"text/plain; charset=utf-8")]

    if scope["path"] != "/metrics":
        not_found_start: dict[str, Any] = {
            "type": "http.response.start",
            "status": 404,
            "headers": text_headers,
        }
        await send(not_found_start)
        await send({"type": "http.response.body", "body": b"Not Found"})
        return

    # Build the whole payload before the first message goes out.
    rendered: str = await generate_metrics_text()
    payload: bytes = rendered.encode("utf-8")

    await send({"type": "http.response.start", "status": 200, "headers": text_headers})
    await send({"type": "http.response.body", "body": payload})
|