"""ASGI application exposing access-log entries as Prometheus text metrics.

On each request to ``/metrics`` the log file named by the ``UNIT_LOG_FILE``
environment variable is read, emptied, and every parseable JSON line is
rendered as one ``request_duration_seconds`` sample.
"""

import contextlib
import os
from datetime import datetime

import aiofiles
import orjson
from environs import Env

env = Env()
env.read_env()

# Path of the JSON-lines log file to drain; required, read once at import.
unit_log_file: str = env.str("UNIT_LOG_FILE")

# Layout of the "datetime" field, e.g. "10/Oct/2023:13:55:36 +0000".
_LOG_DATETIME_FORMAT = "%d/%b/%Y:%H:%M:%S %z"


def escape_label_value(value):
    """Escape *value* for use inside a double-quoted metric label.

    Backslashes are doubled first so that the backslashes introduced for
    quotes and newlines are not escaped a second time.
    """
    return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def _format_metric_line(line):
    """Render one JSON log line as a ``request_duration_seconds`` sample.

    The line must decode to a mapping with the keys ``request_time``,
    ``url``, ``http_method`` and ``datetime``.  Any decoding, lookup or
    conversion error propagates to the caller.
    """
    log_data = orjson.loads(line)
    request_time = float(log_data["request_time"])
    url = log_data["url"]
    http_method = log_data["http_method"]

    # Sample timestamps are emitted as Unix time in milliseconds.
    dt = datetime.strptime(log_data["datetime"], _LOG_DATETIME_FORMAT)
    timestamp_ms = int(dt.timestamp() * 1000)

    labels = (
        f'url="{escape_label_value(url)}",'
        f'http_method="{escape_label_value(http_method)}"'
    )
    return f"request_duration_seconds{{{labels}}} {request_time} {timestamp_ms}"


async def read_and_process_logs():
    """Drain the log file and return one metric line per parseable entry.

    The file is read in full and then truncated to zero length, so each
    entry is reported at most once.  A missing file yields an empty list.
    Lines that cannot be parsed are skipped silently.

    Returns:
        list[str]: sample lines, in the order they appear in the file.
    """
    # Open directly instead of checking os.path.exists() first: the file may
    # disappear between a check and the open, which would otherwise surface
    # as an unhandled FileNotFoundError.
    try:
        async with aiofiles.open(unit_log_file, mode="r+") as f:
            lines = await f.readlines()
            # NOTE(review): anything appended by the log writer between the
            # read above and this truncate is discarded - confirm acceptable.
            await f.truncate(0)
    except FileNotFoundError:
        return []

    metric_lines = []
    for line in lines:
        # Best effort by design: one malformed entry must not prevent the
        # remaining entries from being exported.
        with contextlib.suppress(Exception):
            metric_lines.append(_format_metric_line(line))
    return metric_lines


async def generate_metrics_text():
    """Return the full metrics payload: HELP/TYPE header plus all samples.

    The result always ends with a newline, even when there are no samples.
    """
    lines = [
        "# HELP request_duration_seconds Request duration in seconds",
        "# TYPE request_duration_seconds gauge",
    ]
    lines.extend(await read_and_process_logs())
    return "\n".join(lines) + "\n"


async def _send_plain_text(send, status, body):
    """Send a complete UTF-8 ``text/plain`` HTTP response through *send*."""
    await send(
        {
            "type": "http.response.start",
            "status": status,
            "headers": [(b"content-type", b"text/plain; charset=utf-8")],
        }
    )
    await send({"type": "http.response.body", "body": body})


async def app(scope, receive, send):
    """ASGI entry point.

    Serves the metrics payload on ``/metrics`` and a 404 on every other HTTP
    path.  Scopes whose type is not ``"http"`` are ignored.
    """
    if scope["type"] != "http":
        return

    if scope["path"] == "/metrics":
        # Build the payload before sending anything, so a failure while
        # reading the log does not leave a half-started response.
        metrics_text = await generate_metrics_text()
        await _send_plain_text(send, 200, metrics_text.encode("utf-8"))
    else:
        await _send_plain_text(send, 404, b"Not Found")