diff --git a/ecommerce-platform/.env.example b/ecommerce-platform/.env.example index 39ff37b..9e599a1 100644 --- a/ecommerce-platform/.env.example +++ b/ecommerce-platform/.env.example @@ -5,9 +5,14 @@ MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin SERVICE_NAME=local-dev SYNC_TASKS=false +LOG_FILE=logs/ecommerce-debug.jsonl BUG_INDEX_ERROR=false +BUG_ORDER_COUPON_KEY=false BUG_RACE_CONDITION=false +BUG_INVENTORY_MISSING_ROW=false +BUG_INVENTORY_BROKEN_PRODUCT_ID=7 BUG_FLOAT_PRECISION=false +BUG_PAYMENT_GATEWAY_KEY=false BUG_NULL_VIP=false AGENT_API_BASE=/api/agent AGENT_CONFIG_PLACEHOLDER=true diff --git a/ecommerce-platform/.gitignore b/ecommerce-platform/.gitignore index cc17739..4e51f1f 100644 --- a/ecommerce-platform/.gitignore +++ b/ecommerce-platform/.gitignore @@ -1 +1,4 @@ .runtime/ +logs/** +!logs/ +!logs/.gitkeep diff --git a/ecommerce-platform/README.md b/ecommerce-platform/README.md index 85ca94b..bb97c9c 100644 --- a/ecommerce-platform/README.md +++ b/ecommerce-platform/README.md @@ -1,29 +1,170 @@ # Ecommerce Platform -独立于仓库现有运行时的电商订单平台子项目,用于演示微服务、异步任务、预埋 Bug 与后续自动修复场景。 +独立于仓库现有运行时的电商订单平台子项目,用于演示微服务、真实流量、统一日志、预埋故障与“根据 traceback 进行热修复”。 -当前阶段已调整范围: +当前范围: -- 实现电商平台主体、基础设施、监控、压测与预埋 Bug -- 保留 `/api/agent/*` 接口契约与网关占位响应 -- 不实现真实 Agent 服务、PR 自动化和飞书通知逻辑 +- 已实现订单、库存、支付、用户四个服务 +- 已实现本地网关、监控大屏、持续流量模拟、统一 JSONL 日志、日志回放 +- 已保留 `/api/agent/*` 接口契约与网关占位响应 +- 不实现真实 Agent 修复、PR 自动化和飞书通知逻辑 ## 目录 -- `services/`: 四个 FastAPI 微服务与共享代码 -- `celery_app/`: Celery 配置与任务 -- `nginx/`: 网关与 `/api/agent/*` 预留接口 -- `monitoring/`: Prometheus 配置 -- `traffic/`: Locust 流量脚本 -- `contracts/`: 预留的 Agent API 契约 -- `scripts/`: 初始化、切流、健康检查脚本 +- `services/`: 四个 FastAPI 微服务与共享基础设施 +- `scripts/local_gateway.py`: 本地网关和监控入口 +- `scripts/traffic_simulator.py`: 双十一风格持续流量模拟器 +- `scripts/replay_log.py`: 日志回放脚本 +- `scripts/generate_log_dataset.py`: 百万级日志集生成器 +- `logs/`: 统一运行日志、回放结果和离线数据集输出目录 +- `contracts/agent-openapi.yaml`: `/api/agent/*` 预留契约 -## 启动 +## 快速启动 1. 
进入目录:`cd ecommerce-platform` -2. 复制环境变量:`cp .env.example .env` -3. 启动:`docker compose up -d` -4. 初始化数据:`docker compose exec order-service-a python /app/scripts/init_db.py` +2. 准备环境:`cp .env.example .env` +3. 安装依赖:`python3 -m venv .venv && .venv/bin/pip install -r requirements.txt` +4. 启动本地热重载栈:`bash scripts/run_local_reload_stack.sh` +5. 启动持续流量:`.venv/bin/python scripts/traffic_simulator.py --base-url http://127.0.0.1:58080` + +如果默认端口被占用,启动脚本会自动改用空闲端口,并把实际端口写到 `.runtime/local-stack/ports.env`。 + +## 本地调试入口 + +- 网关首页:`/` +- 监控大屏:`/monitor` +- 监控摘要:`/api/monitor/summary` +- SSE 实时流:`/api/monitor/stream` +- 统一日志:`logs/ecommerce-debug.jsonl` + +停止本地栈: + +```bash +bash scripts/stop_local_reload_stack.sh +``` + +## 热修复演示流程 + +1. 启动本地热重载栈。 +2. 打开 `.runtime/local-stack/ports.env`,确认实际网关端口。 +3. 按需打开一个故障开关后重启对应服务,或直接在代码里临时改坏实现。 +4. 通过浏览器、流量模拟器或日志回放触发请求。 +5. 在 `logs/ecommerce-debug.jsonl` 中查看 `service_exception` 记录与完整 traceback。 +6. 修改对应服务代码,`uvicorn --reload` 会自动热重载。 +7. 再次触发同一路径,确认 500 消失、流量恢复正常。 + +## 预埋故障与 Traceback 类型 + +默认情况下全部关闭,系统可正常跑通并持续接流量。打开开关后才会进入故障路径。 + +| 开关 | 路径 | 主要异常 | 触发方式 | +| --- | --- | --- | --- | +| `BUG_INDEX_ERROR=true` | `services/order/service.py` | `IndexError` | `GET /api/v1/orders/user/{new_user}` | +| `BUG_ORDER_COUPON_KEY=true` | `services/order/service.py` | `KeyError` | `POST /api/v1/orders` 且使用未知 `coupon_code` | +| `BUG_RACE_CONDITION=true` | `services/inventory/service.py` | 并发超卖,常见业务异常为 `InsufficientStockError` | 高频并发下单 | +| `BUG_INVENTORY_MISSING_ROW=true` | `services/inventory/service.py` | `AttributeError` | `GET /api/v1/inventory/{BUG_INVENTORY_BROKEN_PRODUCT_ID}` | +| `BUG_FLOAT_PRECISION=true` | `services/payment/service.py` | 财务金额偏差,无 traceback,适合对账类问题 | `GET /api/v1/payments/calculate` | +| `BUG_PAYMENT_GATEWAY_KEY=true` | `services/payment/service.py` | `KeyError` | `GET /api/v1/payments/calculate?...&coupon_discount=10` | +| `BUG_NULL_VIP=true` | `services/user/service.py` | `TypeError` | 新注册用户后请求 `/api/v1/users/{id}/discount` | 
+ +推荐做修复演示时优先使用: +- `IndexError` +- `KeyError` +- `AttributeError` +- `TypeError` + +这几类 traceback 更接近真实线上“代码改坏后直接 500”的场景。 + +## 日志 + +所有服务和本地网关都会把结构化事件写入统一 JSONL 文件,主要事件类型包括: + +- `gateway_access`: 网关入口流量,包含正常流量、warning、error +- `service_request`: 服务侧请求完成记录 +- `service_exception`: 服务侧未处理异常,包含 `exception_type` 和完整 `traceback` +- `*_warning`: 404/409/401/400 这类业务告警 + +日志适合两种用途: + +1. 直接排障:从 `trace_id` 串联 gateway 和 service 记录 +2. 离线回放:把历史 JSONL 重新打回网关 + +回放示例: + +```bash +.venv/bin/python scripts/replay_log.py \ + --input logs/ecommerce-debug.jsonl \ + --base-url http://127.0.0.1:58080 \ + --limit 500 +``` + +### 日志目录规则 + +`logs/` 整体已经加入 `.gitignore`,默认不提交任何运行日志、回放结果或离线数据集。 + +- `logs/ecommerce-debug.jsonl`: 本地网关和四个服务写入的统一运行日志 +- `logs/replay-results.jsonl`: `scripts/replay_log.py` 的默认回放输出 +- `logs/replay-*.jsonl`: 其他手工指定名称的回放结果 +- `logs/datasets/{dataset}/manifest.json`: 数据集元信息,记录总量、分片和异常分布 +- `logs/datasets/{dataset}/traffic-shard-01.jsonl` 到 `traffic-shard-XX.jsonl`: 按分片切开的原始 JSONL 流量 + +建议把每一批离线样本单独放到一个 `logs/datasets/{dataset}/` 目录里,不要混用不同批次的 `manifest.json` 和 `traffic-shard-*.jsonl`。 + +### 重新生成运行日志 + +运行日志不是离线脚本直接生成的,而是由真实请求自然写入: + +```bash +bash scripts/run_local_reload_stack.sh +.venv/bin/python scripts/traffic_simulator.py --base-url http://127.0.0.1:58080 +``` + +如果要重新开始一份干净的运行日志,可以先删掉旧文件: + +```bash +rm -f logs/ecommerce-debug.jsonl logs/replay-results.jsonl logs/replay-*.jsonl +``` + +然后重新启动本地栈并重新打流量。 + +## 百万级日志集 + +如果需要大规模调试样本,可以生成新的百万级数据集: + +```bash +.venv/bin/python scripts/generate_log_dataset.py \ + --output-dir logs/datasets/million-traffic-realistic \ + --gateway-records 1000000 \ + --shards 8 \ + --days 7 \ + --clean +``` + +目录命名建议: + +- `logs/datasets/million-traffic-realistic`: 贴近当前服务真实 traceback 类型的数据集 +- `logs/datasets/{date}-{purpose}`: 比如 `logs/datasets/2026-04-24-repair-drill` +- `logs/datasets/{dataset}/manifest.json` 一定和同目录下的 `traffic-shard-*.jsonl` 配套使用 + +如果要重建某一批数据集,直接删除对应目录后重新执行生成命令即可: + +```bash +rm -rf logs/datasets/million-traffic-realistic +.venv/bin/python 
scripts/generate_log_dataset.py \ + --output-dir logs/datasets/million-traffic-realistic \ + --gateway-records 1000000 \ + --shards 8 \ + --days 7 \ + --clean +``` + +该数据集会同时包含: + +- 正常流量 +- warning 流量 +- 带 traceback 的 error 流量 +- 与真实故障路径一致的 `IndexError` / `KeyError` / `AttributeError` / `TypeError` ## 测试 diff --git a/ecommerce-platform/logs/.gitkeep b/ecommerce-platform/logs/.gitkeep new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/ecommerce-platform/logs/.gitkeep @@ -0,0 +1 @@ + diff --git a/ecommerce-platform/scripts/generate_log_dataset.py b/ecommerce-platform/scripts/generate_log_dataset.py new file mode 100644 index 0000000..4a17e76 --- /dev/null +++ b/ecommerce-platform/scripts/generate_log_dataset.py @@ -0,0 +1,821 @@ +from __future__ import annotations + +import argparse +import json +import math +import random +import shutil +import uuid +from datetime import UTC, datetime, timedelta +from pathlib import Path +from typing import Any + + +ROUTE_CATALOG: dict[str, dict[str, Any]] = { + "inventory": { + "method": "GET", + "path": "/api/v1/inventory/{product_id}", + "service": "inventory-service", + "upstream": "http://127.0.0.1:58002", + "success_status": 200, + "warning_status": 404, + "error_status": 502, + }, + "orders_create": { + "method": "POST", + "path": "/api/v1/orders", + "service": "order-service", + "upstream": "http://127.0.0.1:58001", + "success_status": 201, + "warning_status": 409, + "error_status": 500, + }, + "orders_user": { + "method": "GET", + "path": "/api/v1/orders/user/{user_id}", + "service": "order-service", + "upstream": "http://127.0.0.1:58001", + "success_status": 200, + "warning_status": 404, + "error_status": 500, + }, + "orders_detail": { + "method": "GET", + "path": "/api/v1/orders/{order_id}", + "service": "order-service", + "upstream": "http://127.0.0.1:58001", + "success_status": 200, + "warning_status": 404, + "error_status": 500, + }, + "payments_calc": { + "method": "GET", + "path": 
"/api/v1/payments/calculate?total={total}&vip_level={vip_level}&coupon_discount={coupon_discount}", + "service": "payment-service", + "upstream": "http://127.0.0.1:58003", + "success_status": 200, + "warning_status": 400, + "error_status": 500, + }, + "payments_process": { + "method": "POST", + "path": "/api/v1/payments/{order_id}/process", + "service": "payment-service", + "upstream": "http://127.0.0.1:58003", + "success_status": 200, + "warning_status": 404, + "error_status": 500, + }, + "user_profile": { + "method": "GET", + "path": "/api/v1/users/{user_id}", + "service": "user-service", + "upstream": "http://127.0.0.1:58004", + "success_status": 200, + "warning_status": 404, + "error_status": 500, + }, + "user_discount": { + "method": "GET", + "path": "/api/v1/users/{user_id}/discount", + "service": "user-service", + "upstream": "http://127.0.0.1:58004", + "success_status": 200, + "warning_status": 404, + "error_status": 500, + }, + "user_register": { + "method": "POST", + "path": "/api/v1/users/register", + "service": "user-service", + "upstream": "http://127.0.0.1:58004", + "success_status": 201, + "warning_status": 400, + "error_status": 500, + }, + "user_login": { + "method": "POST", + "path": "/api/v1/users/login", + "service": "user-service", + "upstream": "http://127.0.0.1:58004", + "success_status": 200, + "warning_status": 401, + "error_status": 500, + }, +} + +ACTION_WEIGHTS = { + "valley": { + "inventory": 34, + "user_profile": 12, + "orders_create": 10, + "orders_user": 9, + "orders_detail": 8, + "payments_calc": 10, + "user_discount": 8, + "user_register": 5, + "user_login": 4, + }, + "warm": { + "inventory": 27, + "orders_create": 18, + "orders_user": 10, + "orders_detail": 10, + "payments_calc": 13, + "user_profile": 8, + "user_discount": 7, + "user_register": 4, + "user_login": 3, + }, + "peak": { + "inventory": 23, + "orders_create": 24, + "orders_user": 11, + "orders_detail": 12, + "payments_calc": 12, + "payments_process": 4, + 
"user_profile": 5, + "user_discount": 5, + "user_register": 2, + "user_login": 2, + }, + "flash": { + "inventory": 26, + "orders_create": 28, + "orders_user": 8, + "orders_detail": 12, + "payments_calc": 11, + "payments_process": 5, + "user_discount": 4, + "user_profile": 3, + "user_register": 1, + "user_login": 2, + }, +} + +ERROR_SCENARIOS: dict[str, list[dict[str, Any]]] = { + "inventory": [ + { + "exception_type": "AttributeError", + "error": "'NoneType' object has no attribute 'total_qty'", + "event": "service_exception", + "file": "services/inventory/service.py", + "function": "get_inventory", + "line": 41, + "code": "return inventory.total_qty", + } + ], + "orders_create": [ + { + "exception_type": "KeyError", + "error": "'FLASH50'", + "event": "service_exception", + "file": "services/order/service.py", + "function": "_coupon_discount", + "line": 31, + "code": "return COUPON_DISCOUNTS[payload.coupon_code]", + } + ], + "orders_user": [ + { + "exception_type": "IndexError", + "error": "list index out of range", + "event": "service_exception", + "file": "services/order/service.py", + "function": "get_user_orders", + "line": 120, + "code": "_latest = orders[-1]", + } + ], + "orders_detail": [ + { + "exception_type": "ValueError", + "error": "Corrupted order timeline payload", + "event": "service_exception", + "file": "services/order/routes.py", + "function": "get_order_route", + "line": 70, + "code": "return OrderResponse.model_validate(get_order(db, order_id))", + } + ], + "payments_calc": [ + { + "exception_type": "KeyError", + "error": "'settlement_amount'", + "event": "service_exception", + "file": "services/payment/service.py", + "function": "_gateway_settlement_amount", + "line": 33, + "code": "return settlement_quote[\"settlement_amount\"]", + } + ], + "payments_process": [ + { + "exception_type": "KeyError", + "error": "'transaction_id'", + "event": "service_exception", + "file": "services/payment/service.py", + "function": "process_payment", + "line": 
92, + "code": "payment = Payment(..., transaction_id=gateway_result[\"transaction_id\"])", + } + ], + "user_profile": [ + { + "exception_type": "ValueError", + "error": "Corrupted user profile projection", + "event": "service_exception", + "file": "services/user/routes.py", + "function": "get_user_route", + "line": 64, + "code": "return UserResponse.model_validate(get_user(db, user_id))", + } + ], + "user_discount": [ + { + "exception_type": "TypeError", + "error": "list indices must be integers or slices, not NoneType", + "event": "service_exception", + "file": "services/user/service.py", + "function": "get_vip_discount", + "line": 61, + "code": "return discount_rates[user.vip_level]", + } + ], + "user_register": [ + { + "exception_type": "ValueError", + "error": "duplicate email detected during projection", + "event": "service_exception", + "file": "services/user/routes.py", + "function": "register_user_route", + "line": 28, + "code": "return UserResponse.model_validate(register_user(db, payload))", + } + ], + "user_login": [ + { + "exception_type": "ValueError", + "error": "session encoder failed for login payload", + "event": "service_exception", + "file": "services/user/routes.py", + "function": "login_user_route", + "line": 48, + "code": "return LoginResponse(success=True, user_id=user.id)", + } + ], +} + +WARNING_EXCEPTION_TYPES = { + "inventory": "ValueError", + "orders_create": "InsufficientStockError", + "orders_user": "ValueError", + "orders_detail": "OrderNotFoundError", + "payments_calc": "ValueError", + "payments_process": "ValueError", + "user_profile": "ValueError", + "user_discount": "ValueError", + "user_register": "ValueError", + "user_login": "ValueError", +} + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Generate a million-scale ecommerce JSONL log dataset." 
+ ) + parser.add_argument( + "--output-dir", + default="logs/datasets/million-traffic", + help="Directory to write the generated shard files and manifest.", + ) + parser.add_argument( + "--gateway-records", + type=int, + default=1_000_000, + help="Number of gateway_access records to generate.", + ) + parser.add_argument( + "--shards", + type=int, + default=8, + help="How many JSONL shards to split the dataset into.", + ) + parser.add_argument( + "--days", + type=int, + default=7, + help="How many virtual days the dataset should span.", + ) + parser.add_argument( + "--seed", + type=int, + default=20260424, + help="Deterministic random seed.", + ) + parser.add_argument( + "--clean", + action="store_true", + help="Remove the target directory before writing the dataset.", + ) + return parser.parse_args() + + +def resolve_output_dir(path_str: str) -> Path: + path = Path(path_str).expanduser() + if not path.is_absolute(): + path = Path(__file__).resolve().parents[1] / path + return path + + +def isoformat_z(moment: datetime) -> str: + return moment.astimezone(UTC).isoformat(timespec="milliseconds").replace("+00:00", "Z") + + +def traffic_phase(hour: float) -> tuple[str, str, str]: + flash_10 = math.exp(-((hour - 10.0) ** 2) / 0.03) + flash_20 = math.exp(-((hour - 20.0) ** 2) / 0.02) + morning = math.exp(-((hour - 9.0) ** 2) / 1.2) + lunch = math.exp(-((hour - 12.4) ** 2) / 1.5) + evening = math.exp(-((hour - 18.8) ** 2) / 1.0) + if flash_20 > 0.35: + return "flash", "mega-drop", "20点主会场秒杀" + if flash_10 > 0.28: + return "flash", "flash-sale", "10点整点秒杀" + if evening > 0.35: + return "peak", "campaign-wave", "晚高峰冲顶" + if morning > 0.28 or lunch > 0.24: + return "warm", "traffic-rise", "日间成交波段" + return "valley", "idle", "长尾流量" + + +def weighted_action(rng: random.Random, phase: str) -> str: + weights = ACTION_WEIGHTS[phase] + return rng.choices(list(weights), weights=list(weights.values()), k=1)[0] + + +def payload_for_action( + rng: random.Random, + action: str, + 
order_id: int, + user_id: int, +) -> tuple[str, str, Any, Any]: + route = ROUTE_CATALOG[action] + method = route["method"] + if action == "inventory": + product_id = rng.randint(1, 20) + path = route["path"].format(product_id=product_id) + response_body = { + "product_id": product_id, + "total_qty": rng.randint(350, 900), + "reserved_qty": rng.randint(0, 40), + "sold_qty": rng.randint(80, 320), + "updated_at": "2026-04-24T08:00:00", + "available_qty": rng.randint(20, 300), + } + return method, path, None, response_body + if action == "orders_create": + item_count = rng.choices([1, 2, 3, 4], weights=[50, 28, 16, 6], k=1)[0] + items = [ + {"product_id": rng.randint(1, 20), "quantity": rng.choices([1, 2, 3], weights=[60, 28, 12], k=1)[0]} + for _ in range(item_count) + ] + request_body = { + "user_id": user_id, + "items": items, + "coupon_code": rng.choice([None, None, None, "SAVE10", "SAVE20"]), + } + amount = round(sum(item["quantity"] * rng.uniform(9.9, 129.9) for item in items) * 1.03, 2) + response_body = { + "id": order_id, + "order_no": f"ORD-{1777000000 + order_id}-{uuid.uuid4().hex[:8].upper()}", + "user_id": user_id, + "status": "paid", + "total_amount": f"{amount:.2f}", + "discount_amount": f"{amount * 0.05:.2f}", + "tax_amount": f"{amount * 0.08:.2f}", + "final_amount": f"{amount:.2f}", + "created_at": "2026-04-24T08:00:00", + } + return method, route["path"], request_body, response_body + if action == "orders_user": + path = route["path"].format(user_id=user_id) + response_body = [] + return method, path, None, response_body + if action == "orders_detail": + path = route["path"].format(order_id=order_id) + amount = round(rng.uniform(19.9, 299.9), 2) + response_body = { + "id": order_id, + "order_no": f"ORD-{1777000000 + order_id}-{uuid.uuid4().hex[:8].upper()}", + "user_id": user_id, + "status": rng.choice(["paid", "cancelled", "pending"]), + "total_amount": f"{amount:.2f}", + "discount_amount": "0.00", + "tax_amount": f"{amount * 0.08:.2f}", + 
"final_amount": f"{amount:.2f}", + "created_at": "2026-04-24T08:00:00", + } + return method, path, None, response_body + if action == "payments_calc": + total = round(rng.uniform(12.0, 999.99), 2) + vip_level = rng.randint(0, 3) + coupon_discount = rng.choice([0, 0, 10, 20]) + path = route["path"].format(total=total, vip_level=vip_level, coupon_discount=coupon_discount) + response_body = { + "total_amount": f"{total:.2f}", + "discount_amount": f"{total * (vip_level * 0.03):.2f}", + "tax_amount": f"{total * 0.08:.2f}", + "final_amount": f"{total * 1.02:.2f}", + } + return method, path, None, response_body + if action == "payments_process": + path = route["path"].format(order_id=order_id) + response_body = { + "id": order_id, + "order_id": order_id, + "amount": f"{rng.uniform(19.0, 188.0):.2f}", + "method": "manual", + "status": "success", + "transaction_id": f"TXN-{order_id}-{1777000000 + order_id}", + "created_at": "2026-04-24T08:00:00", + } + return method, path, None, response_body + if action == "user_profile": + path = route["path"].format(user_id=user_id) + response_body = { + "id": user_id, + "username": f"user{user_id}", + "email": f"user{user_id}@example.com", + "vip_level": rng.randint(0, 3), + "created_at": "2026-04-20T09:00:00", + } + return method, path, None, response_body + if action == "user_discount": + path = route["path"].format(user_id=user_id) + response_body = {"user_id": user_id, "discount_rate": rng.choice([0.0, 0.05, 0.1, 0.15])} + return method, path, None, response_body + if action == "user_register": + username = f"festival_{user_id}_{uuid.uuid4().hex[:6]}" + request_body = {"username": username, "email": f"{username}@example.com", "password": "PW123456x"} + response_body = { + "id": user_id, + "username": username, + "email": f"{username}@example.com", + "vip_level": 0, + "created_at": "2026-04-24T08:00:00", + } + return method, route["path"], request_body, response_body + username = f"user{user_id}" + request_body = {"username": 
username, "password": "PW123456x"} + response_body = {"success": True, "user_id": user_id} + return method, route["path"], request_body, response_body + + +def render_failure( + action: str, + path: str, + user_id: int, + order_id: int, +) -> tuple[int, str, Any]: + route = ROUTE_CATALOG[action] + if route["warning_status"] >= 400: + if route["warning_status"] == 404: + if "users" in path: + return 404, "warning", {"detail": f"User {user_id} not found"} + if "payments" in path: + return 404, "warning", {"detail": f"Order {order_id} not found"} + return 404, "warning", {"detail": "Resource not found"} + if route["warning_status"] == 409: + return 409, "warning", {"detail": "Product 1: requested 3, available 0"} + if route["warning_status"] == 401: + return 401, "warning", {"detail": "Invalid username or password"} + if route["warning_status"] == 400: + return 400, "warning", {"detail": "Bad request payload"} + return route["error_status"], "error", "Internal Server Error" + + +def write_jsonl(handle, record: dict[str, Any]) -> None: + handle.write(json.dumps(record, ensure_ascii=False) + "\n") + + +def service_request_record( + *, + timestamp: str, + service: str, + level: str, + trace_id: str, + method: str, + path: str, + status_code: int, + latency_ms: float, + request_body: Any, +) -> dict[str, Any]: + return { + "timestamp": timestamp, + "service": service, + "level": level, + "event": "service_request", + "pid": 700000 + abs(hash(service)) % 30000, + "source": "service", + "trace_id": trace_id, + "method": method, + "path": path.split("?", 1)[0], + "query": path.split("?", 1)[1] if "?" 
in path else "", + "status_code": status_code, + "latency_ms": round(latency_ms, 2), + "request_body": request_body, + } + + +def exception_record( + *, + timestamp: str, + service: str, + level: str, + event: str, + trace_id: str, + method: str, + path: str, + status_code: int, + error: str, + exception_type: str, + traceback_text: str, +) -> dict[str, Any]: + return { + "timestamp": timestamp, + "service": service, + "level": level, + "event": event, + "pid": 700000 + abs(hash(service)) % 30000, + "error": error, + "exception_type": exception_type, + "traceback": traceback_text, + "trace_id": trace_id, + "source": "service", + "method": method, + "path": path.split("?", 1)[0], + "status_code": status_code, + } + + +def gateway_access_record( + *, + timestamp: str, + level: str, + trace_id: str, + method: str, + path: str, + status_code: int, + latency_ms: float, + upstream: str, + request_body: Any, + response_body: Any, +) -> dict[str, Any]: + return { + "timestamp": timestamp, + "service": "local-gateway", + "level": level, + "event": "gateway_access", + "pid": 702000, + "trace_id": trace_id, + "source": "gateway", + "method": method, + "path": path.split("?", 1)[0], + "query": path.split("?", 1)[1] if "?" 
in path else "", + "status_code": status_code, + "latency_ms": round(latency_ms, 2), + "upstream": upstream, + "request_body": request_body, + "response_body": response_body, + } + + +def synthetic_traceback( + *, + file_path: str, + function_name: str, + line_no: int, + code_line: str, + error: str, + exception_type: str, +) -> str: + return ( + "Traceback (most recent call last):\n" + f' File "/app/{file_path}", line {line_no}, in {function_name}\n' + f" {code_line}\n" + f"{exception_type}: {error}\n" + ) + + +def warning_exception_type(action: str) -> str: + return WARNING_EXCEPTION_TYPES.get(action, "ValueError") + + +def error_scenario(action: str, rng: random.Random) -> dict[str, Any]: + scenarios = ERROR_SCENARIOS.get(action) + if not scenarios: + return { + "exception_type": "RuntimeError", + "error": "Unhandled service error", + "event": "service_exception", + "file": "services/shared/request_logging.py", + "function": "request_logging_middleware", + "line": 40, + "code": "response = await call_next(request)", + } + return dict(rng.choice(scenarios)) + + +def generate_dataset(args: argparse.Namespace) -> Path: + rng = random.Random(args.seed) + output_dir = resolve_output_dir(args.output_dir) + if args.clean and output_dir.exists(): + shutil.rmtree(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + shards = [output_dir / f"traffic-shard-{index:02d}.jsonl" for index in range(1, args.shards + 1)] + handles = [path.open("w", encoding="utf-8") for path in shards] + + start = datetime(2026, 4, 17, 0, 0, 0, tzinfo=UTC) + total_seconds = max(args.days * 24 * 3600, 1) + total_records = 0 + warning_count = 0 + error_count = 0 + order_id = 180000 + user_id = 1000 + service_counts: dict[str, int] = {} + exception_counts: dict[str, int] = {} + + try: + for index in range(args.gateway_records): + ratio = index / max(args.gateway_records - 1, 1) + moment = start + timedelta(seconds=ratio * total_seconds) + timestamp = isoformat_z(moment) + virtual_hour = 
(ratio * args.days * 24) % 24 + phase, burst, event_name = traffic_phase(virtual_hour) + action = weighted_action(rng, phase) + route = ROUTE_CATALOG[action] + + user_id += 1 if action == "user_register" and rng.random() < 0.55 else 0 + if action == "orders_create": + order_id += 1 + method, path, request_body, success_body = payload_for_action( + rng, + action, + order_id=order_id, + user_id=max(1, user_id - rng.randint(0, 200)), + ) + trace_id = uuid.uuid4().hex + base_latency = rng.uniform(4.0, 18.0) + if phase == "peak": + base_latency *= rng.uniform(1.1, 2.2) + elif phase == "flash": + base_latency *= rng.uniform(1.5, 3.8) + + probability_warning = 0.012 if phase in {"valley", "warm"} else 0.022 + probability_error = 0.0012 if phase in {"valley", "warm"} else 0.003 + roll = rng.random() + if roll < probability_error: + status_code, level, response_body = render_failure(action, path, user_id, order_id) + if level != "error": + status_code = route["error_status"] + level = "error" + response_body = "Internal Server Error" + error_count += 1 + elif roll < probability_error + probability_warning: + status_code, level, response_body = render_failure(action, path, user_id, order_id) + warning_count += 1 + else: + status_code = route["success_status"] + level = "info" + response_body = success_body + + service_latency = max(1.0, base_latency * rng.uniform(0.75, 0.95)) + gateway_latency = max(service_latency + rng.uniform(0.8, 4.2), 1.2) + shard_handle = handles[index % len(handles)] + service_counts[route["service"]] = service_counts.get(route["service"], 0) + 1 + + if level != "info": + error_text = ( + response_body.get("detail", "Internal Server Error") + if isinstance(response_body, dict) + else str(response_body) + ) + if status_code >= 500: + scenario = error_scenario(action, rng) + exception_type = scenario["exception_type"] + error_text = scenario["error"] + event = scenario["event"] + file_path = scenario["file"] + function_name = scenario["function"] + 
line_no = scenario["line"] + code_line = scenario["code"] + else: + exception_type = warning_exception_type(action) + event = f"{action}_warning" + file_path = f"services/{route['service'].replace('-service', '')}/routes.py" + function_name = "route_handler" + line_no = 42 + code_line = f'raise {exception_type}("{error_text}")' + exception_counts[exception_type] = exception_counts.get(exception_type, 0) + 1 + write_jsonl( + shard_handle, + exception_record( + timestamp=timestamp, + service=route["service"], + level=level, + event=event, + trace_id=trace_id, + method=method, + path=path, + status_code=status_code, + error=error_text, + exception_type=exception_type, + traceback_text=synthetic_traceback( + file_path=file_path, + function_name=function_name, + line_no=line_no, + code_line=code_line, + error=error_text, + exception_type=exception_type, + ), + ), + ) + total_records += 1 + + write_jsonl( + shard_handle, + service_request_record( + timestamp=timestamp, + service=route["service"], + level=level, + trace_id=trace_id, + method=method, + path=path, + status_code=status_code, + latency_ms=service_latency, + request_body=request_body, + ), + ) + total_records += 1 + + write_jsonl( + shard_handle, + gateway_access_record( + timestamp=timestamp, + level=level, + trace_id=trace_id, + method=method, + path=path, + status_code=status_code, + latency_ms=gateway_latency, + upstream=route["upstream"], + request_body=request_body, + response_body=response_body, + ), + ) + total_records += 1 + + if (index + 1) % 100000 == 0: + print( + f"[dataset] generated gateway={index + 1} total_records={total_records}", + flush=True, + ) + finally: + for handle in handles: + handle.close() + + manifest = { + "generated_at": isoformat_z(datetime.now(UTC)), + "dataset": output_dir.name, + "gateway_access_records": args.gateway_records, + "total_records": total_records, + "warnings": warning_count, + "errors": error_count, + "days": args.days, + "seed": args.seed, + "shards": 
[path.name for path in shards], + "service_counts": service_counts, + "exception_counts": exception_counts, + "notes": { + "phases": ["valley", "warm", "peak", "flash"], + "description": "Synthetic ecommerce traffic with gateway access, service request, warning, and traceback-bearing error records aligned to live repair-demo bug paths.", + }, + } + (output_dir / "manifest.json").write_text( + json.dumps(manifest, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + return output_dir + + +def main() -> int: + args = parse_args() + output_dir = generate_dataset(args) + print(f"Dataset written to {output_dir}", flush=True) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/ecommerce-platform/scripts/local_gateway.py b/ecommerce-platform/scripts/local_gateway.py index f18e7ec..7db03b9 100644 --- a/ecommerce-platform/scripts/local_gateway.py +++ b/ecommerce-platform/scripts/local_gateway.py @@ -6,16 +6,21 @@ import os import sqlite3 import time +import uuid from collections import Counter, deque -from dataclasses import asdict, dataclass +from dataclasses import dataclass from pathlib import Path from threading import Lock from typing import Any, Final import httpx +import structlog from fastapi import FastAPI, Request, Response from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse +from services.shared.event_log import body_for_log, write_event, write_exception_event +from services.shared.logger import configure_logging + ORDER_BASE: Final[str] = os.getenv("ORDER_BASE_URL", "http://127.0.0.1:58001") INVENTORY_BASE: Final[str] = os.getenv( @@ -34,6 +39,7 @@ MONITOR_HTML = ( Path(__file__).resolve().with_name("monitor_dashboard.html").read_text(encoding="utf-8") ) +logger = configure_logging("local-gateway") @dataclass(slots=True) @@ -577,12 +583,14 @@ async def _proxy(request: Request, base_url: str) -> Response: upstream = f"{base_url}{request.url.path}" if request.url.query: upstream = 
f"{upstream}?{request.url.query}" + trace_id = request.headers.get("x-trace-id") or uuid.uuid4().hex headers = { key: value for key, value in request.headers.items() if key.lower() not in {"host", "content-length"} } content = await request.body() + headers["x-trace-id"] = trace_id assert client is not None started_at = monitor_state.request_started() @@ -616,11 +624,37 @@ async def _proxy(request: Request, base_url: str) -> Response: simulator_stage=simulator_stage, simulator_event=simulator_event, ) + latency_ms = round((time.perf_counter() - started_at) * 1000, 2) + level = "info" + if response.status_code >= 500: + level = "error" + elif response.status_code >= 400: + level = "warning" + event_payload = { + "trace_id": trace_id, + "source": "gateway", + "method": request.method, + "path": request.url.path, + "query": request.url.query, + "status_code": response.status_code, + "latency_ms": latency_ms, + "upstream": base_url, + "request_body": body_for_log(content, request.headers.get("content-type")), + "response_body": body_for_log(body_bytes, response.headers.get("content-type")), + } + getattr(logger, level)("gateway_access", **event_payload) + write_event( + service="local-gateway", + level=level, + event="gateway_access", + **event_payload, + ) response_headers = { key: value for key, value in response.headers.items() if key.lower() not in {"content-length", "transfer-encoding", "connection"} } + response_headers["x-trace-id"] = trace_id return Response( content=body_bytes, status_code=response.status_code, @@ -631,6 +665,28 @@ async def _proxy(request: Request, base_url: str) -> Response: body_bytes = json.dumps( {"detail": f"Upstream request failed: {exc.__class__.__name__}"} ).encode("utf-8") + logger.error( + "gateway_upstream_failed", + trace_id=trace_id, + method=request.method, + path=request.url.path, + upstream=base_url, + exc_info=True, + ) + write_exception_event( + service="local-gateway", + level="error", + event="gateway_upstream_failed", + 
exc=exc, + trace_id=trace_id, + source="gateway", + method=request.method, + path=request.url.path, + query=request.url.query, + status_code=502, + upstream=base_url, + request_body=body_for_log(content, request.headers.get("content-type")), + ) monitor_state.request_finished( started_at=started_at, path=request.url.path, @@ -649,6 +705,7 @@ async def _proxy(request: Request, base_url: str) -> Response: return Response( content=body_bytes, status_code=502, + headers={"x-trace-id": trace_id}, media_type="application/json", ) diff --git a/ecommerce-platform/scripts/replay_log.py b/ecommerce-platform/scripts/replay_log.py new file mode 100644 index 0000000..ee14c1c --- /dev/null +++ b/ecommerce-platform/scripts/replay_log.py @@ -0,0 +1,274 @@ +from __future__ import annotations + +import argparse +import asyncio +import json +import time +import uuid +from collections.abc import Iterator +from datetime import datetime +from pathlib import Path +from typing import Any + +import httpx + +from services.shared.settings import log_file + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Replay ecommerce gateway access logs from a file or a shard directory." + ) + parser.add_argument( + "--input", + default=str(log_file()), + help="Path to a JSONL file or a directory containing JSONL shards.", + ) + parser.add_argument( + "--base-url", + default="http://127.0.0.1:58081", + help="Replay target base URL.", + ) + parser.add_argument( + "--limit", + type=int, + default=0, + help="Replay at most N matching gateway access records. 0 means all.", + ) + parser.add_argument( + "--pace", + choices={"off", "recorded"}, + default="off", + help="Replay immediately or preserve recorded spacing.", + ) + parser.add_argument( + "--speed", + type=float, + default=1.0, + help="Speed multiplier when --pace=recorded. 
2.0 means twice as fast.", + ) + parser.add_argument( + "--only-errors", + action="store_true", + help="Replay only gateway records with warning/error statuses.", + ) + parser.add_argument( + "--workers", + type=int, + default=8, + help="Max concurrent in-flight replay requests when --pace=off.", + ) + parser.add_argument( + "--output", + default="logs/replay-results.jsonl", + help="Where to write replay result records.", + ) + return parser.parse_args() + + +def resolve_path(path_str: str) -> Path: + path = Path(path_str).expanduser() + if not path.is_absolute(): + path = Path(__file__).resolve().parents[1] / path + return path + + +def resolve_output(path_str: str) -> Path: + path = resolve_path(path_str) + path.parent.mkdir(parents=True, exist_ok=True) + return path + + +def iter_input_files(path: Path) -> Iterator[Path]: + if path.is_file(): + yield path + return + if not path.exists(): + raise FileNotFoundError(path) + manifest = path / "manifest.json" + if manifest.exists(): + payload = json.loads(manifest.read_text(encoding="utf-8")) + for name in payload.get("shards", []): + shard = path / name + if shard.exists(): + yield shard + return + for candidate in sorted(path.glob("*.jsonl")): + if candidate.name.startswith("replay-results"): + continue + yield candidate + + +def iter_gateway_records(path: Path, *, only_errors: bool) -> Iterator[dict[str, Any]]: + for input_file in iter_input_files(path): + with input_file.open("r", encoding="utf-8") as handle: + for line in handle: + line = line.strip() + if not line: + continue + try: + record = json.loads(line) + except json.JSONDecodeError: + continue + if record.get("event") != "gateway_access": + continue + if record.get("source") != "gateway": + continue + status_code = int(record.get("status_code", 0) or 0) + if only_errors and status_code < 400: + continue + yield record + + +def build_url(base_url: str, record: dict[str, Any]) -> str: + path = record.get("path", "") + query = record.get("query", "") + if 
query: + return f"{base_url.rstrip('/')}{path}?{query}" + return f"{base_url.rstrip('/')}{path}" + + +def body_and_headers(record: dict[str, Any]) -> tuple[dict[str, str], Any, bytes | None]: + headers = {"x-replay-source": "replay-log", "x-trace-id": f"replay-{uuid.uuid4().hex}"} + request_body = record.get("request_body") + if request_body is None: + return headers, None, None + if isinstance(request_body, (dict, list)): + headers["content-type"] = "application/json" + return headers, request_body, None + if isinstance(request_body, str): + return headers, None, request_body.encode("utf-8") + return headers, None, json.dumps(request_body, ensure_ascii=False).encode("utf-8") + + +def _response_body(response: httpx.Response) -> Any: + content_type = response.headers.get("content-type", "") + text = response.text + if len(text) > 1000: + text = f"{text[:1000]}...[truncated]" + if "json" in content_type.lower(): + try: + return response.json() + except json.JSONDecodeError: + return text + return text + + +def _parse_timestamp(raw: Any) -> float | None: + if not isinstance(raw, str): + return None + try: + return datetime.fromisoformat(raw.replace("Z", "+00:00")).timestamp() + except ValueError: + return None + + +async def replay_one( + client: httpx.AsyncClient, + *, + index: int, + base_url: str, + record: dict[str, Any], +) -> dict[str, Any]: + headers, json_body, raw_body = body_and_headers(record) + started = time.perf_counter() + response = await client.request( + method=record.get("method", "GET"), + url=build_url(base_url, record), + headers=headers, + json=json_body, + content=raw_body, + ) + latency_ms = round((time.perf_counter() - started) * 1000, 2) + result = { + "replay_index": index, + "timestamp": time.time(), + "method": record.get("method"), + "path": record.get("path"), + "recorded_status": record.get("status_code"), + "replayed_status": response.status_code, + "latency_ms": latency_ms, + "trace_id": response.headers.get("x-trace-id"), + 
"response_body": _response_body(response), + } + print( + f"[replay] #{index} {record.get('method')} {record.get('path')} " + f"{record.get('status_code')} -> {response.status_code} {latency_ms:.2f}ms", + flush=True, + ) + return result + + +async def replay_records( + records: Iterator[dict[str, Any]], + *, + base_url: str, + output_path: Path, + limit: int, + pace: str, + speed: float, + workers: int, +) -> int: + timeout = httpx.Timeout(30.0) + completed = 0 + previous_timestamp: float | None = None + semaphore = asyncio.Semaphore(max(1, workers)) + pending: set[asyncio.Task] = set() + + async with httpx.AsyncClient(timeout=timeout) as client: + with output_path.open("w", encoding="utf-8") as output: + async def submit(index: int, record: dict[str, Any]) -> None: + async with semaphore: + result = await replay_one(client, index=index, base_url=base_url, record=record) + output.write(json.dumps(result, ensure_ascii=False) + "\n") + output.flush() + + for index, record in enumerate(records, start=1): + if limit > 0 and index > limit: + break + + current_ts = _parse_timestamp(record.get("timestamp")) + if pace == "recorded": + if previous_timestamp is not None and current_ts is not None: + delta = max(0.0, current_ts - previous_timestamp) + await asyncio.sleep(delta / max(speed, 0.001)) + previous_timestamp = current_ts if current_ts is not None else previous_timestamp + await submit(index, record) + completed += 1 + continue + + task = asyncio.create_task(submit(index, record)) + pending.add(task) + if len(pending) >= max(1, workers) * 2: + done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + completed += len(done) + + if pending: + done, _ = await asyncio.wait(pending) + completed += len(done) + return completed + + +async def main() -> int: + args = parse_args() + input_path = resolve_path(args.input) + output_path = resolve_output(args.output) + records = iter_gateway_records(input_path, only_errors=args.only_errors) + total = 
await replay_records( + records, + base_url=args.base_url, + output_path=output_path, + limit=args.limit, + pace=args.pace, + speed=args.speed, + workers=args.workers, + ) + if total == 0: + print("No matching gateway_access records found.", flush=True) + return 1 + print(f"Replayed {total} request(s). Results written to {output_path}", flush=True) + return 0 + + +if __name__ == "__main__": + raise SystemExit(asyncio.run(main())) diff --git a/ecommerce-platform/scripts/run_local_reload_stack.sh b/ecommerce-platform/scripts/run_local_reload_stack.sh new file mode 100755 index 0000000..a60a261 --- /dev/null +++ b/ecommerce-platform/scripts/run_local_reload_stack.sh @@ -0,0 +1,162 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +RUNTIME_DIR="$ROOT/.runtime/local-stack" +PID_DIR="$RUNTIME_DIR/pids" +LOG_DIR="$RUNTIME_DIR/logs" +PORTS_FILE="$RUNTIME_DIR/ports.env" +DB_FILE="${DB_FILE:-$ROOT/.runtime/ecommerce.db}" +APP_LOG_FILE="${LOG_FILE:-$ROOT/logs/ecommerce-debug.jsonl}" + +mkdir -p "$PID_DIR" "$LOG_DIR" "$(dirname "$DB_FILE")" "$(dirname "$APP_LOG_FILE")" + +if [[ -x "$ROOT/.venv/bin/python" ]]; then + PYTHON_BIN="$ROOT/.venv/bin/python" +else + PYTHON_BIN="${PYTHON_BIN:-python3}" +fi + +if [[ -f "$ROOT/.env" ]]; then + # shellcheck disable=SC1091 + source "$ROOT/.env" +fi + +pick_port() { + local preferred="$1" + shift + "$PYTHON_BIN" - "$preferred" "$@" <<'PY' +import socket +import sys + +preferred = int(sys.argv[1]) +candidates = [preferred, *[int(value) for value in sys.argv[2:]]] + +def available(port: int) -> bool: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return sock.connect_ex(("127.0.0.1", port)) != 0 + +for port in candidates: + if available(port): + print(port) + raise SystemExit(0) + +with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + print(sock.getsockname()[1]) +PY +} 
+ +wait_for_http() { + local url="$1" + local name="$2" + for _ in $(seq 1 80); do + if "$PYTHON_BIN" - "$url" <<'PY' +import sys +import urllib.request + +url = sys.argv[1] +try: + with urllib.request.urlopen(url, timeout=1.5) as resp: + if resp.status < 500: + raise SystemExit(0) +except Exception: + pass +raise SystemExit(1) +PY + then + return 0 + fi + sleep 0.5 + done + echo "Timed out waiting for $name at $url" >&2 + return 1 +} + +start_service() { + local name="$1" + local module="$2" + local port="$3" + local service_name="$4" + shift 4 + local extra_env=("$@") + local logfile="$LOG_DIR/$name.log" + local pidfile="$PID_DIR/$name.pid" + if [[ -f "$pidfile" ]] && kill -0 "$(cat "$pidfile")" 2>/dev/null; then + echo "$name already running on PID $(cat "$pidfile")" + return 0 + fi + + local env_prefix="" + for kv in "${extra_env[@]}"; do + env_prefix+="export ${kv}; " + done + nohup bash -lc " + cd '$ROOT' + export PYTHONPATH='$ROOT' + export DATABASE_URL='sqlite:///$DB_FILE' + export REDIS_URL='${REDIS_URL:-redis://127.0.0.1:6379/0}' + export SYNC_TASKS='true' + export LOG_FILE='$APP_LOG_FILE' + export SERVICE_NAME='$service_name' + ${env_prefix} + exec '$PYTHON_BIN' -m uvicorn '$module' \ + --host 127.0.0.1 \ + --port '$port' \ + --reload \ + --reload-dir '$ROOT/services' \ + --reload-dir '$ROOT/scripts' + " >"$logfile" 2>&1 "$pidfile" +} + +ORDER_PORT="$(pick_port 58001 58011 58101)" +INVENTORY_PORT="$(pick_port 58002 58012 58102)" +PAYMENT_PORT="$(pick_port 58003 58013 58103)" +USER_PORT="$(pick_port 58004 58014 58104)" +GATEWAY_PORT="$(pick_port 58080 58081 58180)" + +cat >"$PORTS_FILE" <"$LOG_DIR/init-db.log" 2>&1 + +start_service order services.order.main:app "$ORDER_PORT" order-service +start_service inventory services.inventory.main:app "$INVENTORY_PORT" inventory-service +start_service payment services.payment.main:app "$PAYMENT_PORT" payment-service +start_service user services.user.main:app "$USER_PORT" user-service + +wait_for_http 
"http://127.0.0.1:$ORDER_PORT/health" "order-service" +wait_for_http "http://127.0.0.1:$INVENTORY_PORT/health" "inventory-service" +wait_for_http "http://127.0.0.1:$PAYMENT_PORT/health" "payment-service" +wait_for_http "http://127.0.0.1:$USER_PORT/health" "user-service" + +start_service gateway scripts.local_gateway:app "$GATEWAY_PORT" local-gateway \ + ORDER_BASE_URL="http://127.0.0.1:$ORDER_PORT" \ + INVENTORY_BASE_URL="http://127.0.0.1:$INVENTORY_PORT" \ + PAYMENT_BASE_URL="http://127.0.0.1:$PAYMENT_PORT" \ + USER_BASE_URL="http://127.0.0.1:$USER_PORT" \ + SIMULATOR_STATUS_FILE="$ROOT/.runtime/traffic-simulator-$GATEWAY_PORT.json" + +wait_for_http "http://127.0.0.1:$GATEWAY_PORT/health" "local-gateway" + +echo "Local reload stack is running." +echo "Gateway: http://127.0.0.1:$GATEWAY_PORT" +echo "Monitor: http://127.0.0.1:$GATEWAY_PORT/monitor" +echo "Ports file: $PORTS_FILE" +echo "Unified log: $APP_LOG_FILE" +echo "PID dir: $PID_DIR" diff --git a/ecommerce-platform/scripts/stop_local_reload_stack.sh b/ecommerce-platform/scripts/stop_local_reload_stack.sh new file mode 100755 index 0000000..fe3e73f --- /dev/null +++ b/ecommerce-platform/scripts/stop_local_reload_stack.sh @@ -0,0 +1,31 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +PID_DIR="$ROOT/.runtime/local-stack/pids" + +if [[ ! -d "$PID_DIR" ]]; then + echo "No local stack PID directory found." 
+ exit 0 +fi + +for pidfile in "$PID_DIR"/*.pid; do + [[ -e "$pidfile" ]] || continue + pid="$(cat "$pidfile")" + name="$(basename "$pidfile" .pid)" + if kill -0 "$pid" 2>/dev/null; then + kill "$pid" 2>/dev/null || true + for _ in $(seq 1 20); do + if kill -0 "$pid" 2>/dev/null; then + sleep 0.25 + else + break + fi + done + if kill -0 "$pid" 2>/dev/null; then + kill -9 "$pid" 2>/dev/null || true + fi + echo "Stopped $name (PID $pid)" + fi + rm -f "$pidfile" +done diff --git a/ecommerce-platform/services/inventory/bugs.py b/ecommerce-platform/services/inventory/bugs.py index eecb66d..67fa136 100644 --- a/ecommerce-platform/services/inventory/bugs.py +++ b/ecommerce-platform/services/inventory/bugs.py @@ -1,9 +1,17 @@ from __future__ import annotations -from services.shared.settings import bool_env +from services.shared.settings import bool_env, env class BugFlags: @staticmethod def race_condition_no_lock() -> bool: return bool_env("BUG_RACE_CONDITION", False) + + @staticmethod + def missing_row_attribute_error() -> bool: + return bool_env("BUG_INVENTORY_MISSING_ROW", False) + + @staticmethod + def broken_product_id() -> int: + return int(env("BUG_INVENTORY_BROKEN_PRODUCT_ID", "7")) diff --git a/ecommerce-platform/services/inventory/main.py b/ecommerce-platform/services/inventory/main.py index e7013d0..b82d865 100644 --- a/ecommerce-platform/services/inventory/main.py +++ b/ecommerce-platform/services/inventory/main.py @@ -7,12 +7,14 @@ from services.inventory.routes import router from services.shared.database import init_database from services.shared.logger import configure_logging +from services.shared.request_logging import install_request_logging from services.shared.settings import service_name logger = configure_logging(service_name("inventory-service")) app = FastAPI(title="Inventory Service", version="1.0.0") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) +install_request_logging(app, 
service=service_name("inventory-service")) Instrumentator().instrument(app).expose(app) app.include_router(router, prefix="/api/v1") diff --git a/ecommerce-platform/services/inventory/routes.py b/ecommerce-platform/services/inventory/routes.py index ff4d478..7be1a21 100644 --- a/ecommerce-platform/services/inventory/routes.py +++ b/ecommerce-platform/services/inventory/routes.py @@ -1,21 +1,40 @@ from __future__ import annotations -from fastapi import APIRouter, Depends, HTTPException +import structlog +from fastapi import APIRouter, Depends, HTTPException, Request from sqlalchemy.orm import Session from services.inventory.schemas import InventoryResponse from services.inventory.service import get_inventory from services.shared.database import get_db +from services.shared.event_log import write_exception_event router = APIRouter() +logger = structlog.get_logger() @router.get("/inventory/{product_id}", response_model=InventoryResponse) -def get_inventory_route(product_id: int, db: Session = Depends(get_db)) -> InventoryResponse: +def get_inventory_route( + product_id: int, + request: Request, + db: Session = Depends(get_db), +) -> InventoryResponse: try: inventory = get_inventory(db, product_id) except ValueError as exc: + logger.warning("inventory_not_found", error=str(exc), exc_info=True) + write_exception_event( + service="inventory-service", + level="warning", + event="inventory_lookup_failed", + exc=exc, + trace_id=getattr(request.state, "trace_id", None), + source="service", + method=request.method, + path=request.url.path, + status_code=404, + ) raise HTTPException(status_code=404, detail=str(exc)) from exc available_qty = inventory.total_qty - inventory.reserved_qty - inventory.sold_qty return InventoryResponse( diff --git a/ecommerce-platform/services/inventory/service.py b/ecommerce-platform/services/inventory/service.py index 597a1c0..46f0e0b 100644 --- a/ecommerce-platform/services/inventory/service.py +++ 
b/ecommerce-platform/services/inventory/service.py @@ -40,12 +40,36 @@ def redis_lock(key: str, timeout: int = 5): def get_inventory(db: Session, product_id: int) -> Inventory: + if ( + BugFlags.missing_row_attribute_error() + and product_id == BugFlags.broken_product_id() + ): + inventory = None + return inventory.total_qty # type: ignore[union-attr,return-value] inventory = db.query(Inventory).filter(Inventory.product_id == product_id).first() if inventory is None: raise ValueError(f"Inventory for product {product_id} not found") return inventory +def _locked_inventory(db: Session, product_id: int) -> Inventory: + if ( + BugFlags.missing_row_attribute_error() + and product_id == BugFlags.broken_product_id() + ): + inventory = None + return inventory.total_qty # type: ignore[union-attr,return-value] + inventory = ( + db.query(Inventory) + .filter(Inventory.product_id == product_id) + .with_for_update() + .first() + ) + if inventory is None: + raise ValueError(f"Inventory for product {product_id} not found") + return inventory + + def reserve_inventory(db: Session, order_id: int, items: list[dict]) -> None: for item in items: product_id = item["product_id"] @@ -62,14 +86,7 @@ def reserve_inventory(db: Session, order_id: int, items: list[dict]) -> None: continue with redis_lock(f"product:{product_id}"): - inventory = ( - db.query(Inventory) - .filter(Inventory.product_id == product_id) - .with_for_update() - .first() - ) - if inventory is None: - raise ValueError(f"Inventory for product {product_id} not found") + inventory = _locked_inventory(db, product_id) available = inventory.total_qty - inventory.reserved_qty - inventory.sold_qty if available < qty: raise InsufficientStockError(product_id, qty, available) @@ -82,14 +99,7 @@ def confirm_inventory(db: Session, order_id: int, items: list[dict]) -> None: product_id = item["product_id"] qty = item["quantity"] with redis_lock(f"product:{product_id}"): - inventory = ( - db.query(Inventory) - 
.filter(Inventory.product_id == product_id) - .with_for_update() - .first() - ) - if inventory is None: - raise ValueError(f"Inventory for product {product_id} not found") + inventory = _locked_inventory(db, product_id) inventory.reserved_qty = max(0, inventory.reserved_qty - qty) inventory.sold_qty += qty db.commit() @@ -100,13 +110,6 @@ def release_inventory(db: Session, order_id: int, items: list[dict]) -> None: product_id = item["product_id"] qty = item["quantity"] with redis_lock(f"product:{product_id}"): - inventory = ( - db.query(Inventory) - .filter(Inventory.product_id == product_id) - .with_for_update() - .first() - ) - if inventory is None: - raise ValueError(f"Inventory for product {product_id} not found") + inventory = _locked_inventory(db, product_id) inventory.reserved_qty = max(0, inventory.reserved_qty - qty) db.commit() diff --git a/ecommerce-platform/services/inventory/tests/test_service.py b/ecommerce-platform/services/inventory/tests/test_service.py index 79bd8ce..b34f8b9 100644 --- a/ecommerce-platform/services/inventory/tests/test_service.py +++ b/ecommerce-platform/services/inventory/tests/test_service.py @@ -1,5 +1,7 @@ from __future__ import annotations +import pytest + from services.inventory.service import get_inventory, reserve_inventory @@ -14,3 +16,12 @@ def test_reserve_inventory_increases_reserved_qty(seeded_db): inventory = get_inventory(db, ids["product_id"]) assert inventory.reserved_qty == 2 + + +def test_bug_missing_row_attribute_error_raises(seeded_db, monkeypatch): + db, ids = seeded_db + monkeypatch.setenv("BUG_INVENTORY_MISSING_ROW", "true") + monkeypatch.setenv("BUG_INVENTORY_BROKEN_PRODUCT_ID", str(ids["product_id"])) + + with pytest.raises(AttributeError): + get_inventory(db, ids["product_id"]) diff --git a/ecommerce-platform/services/order/bugs.py b/ecommerce-platform/services/order/bugs.py index ba28350..2b693cc 100644 --- a/ecommerce-platform/services/order/bugs.py +++ b/ecommerce-platform/services/order/bugs.py @@ 
-7,3 +7,7 @@ class BugFlags: @staticmethod def index_error() -> bool: return bool_env("BUG_INDEX_ERROR", False) + + @staticmethod + def coupon_lookup_key_error() -> bool: + return bool_env("BUG_ORDER_COUPON_KEY", False) diff --git a/ecommerce-platform/services/order/main.py b/ecommerce-platform/services/order/main.py index f6f2948..928b1b4 100644 --- a/ecommerce-platform/services/order/main.py +++ b/ecommerce-platform/services/order/main.py @@ -7,12 +7,14 @@ from services.order.routes import router from services.shared.database import init_database from services.shared.logger import configure_logging +from services.shared.request_logging import install_request_logging from services.shared.settings import service_name logger = configure_logging(service_name("order-service")) app = FastAPI(title="Order Service", version="1.0.0") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) +install_request_logging(app, service=service_name("order-service")) Instrumentator().instrument(app).expose(app) app.include_router(router, prefix="/api/v1") diff --git a/ecommerce-platform/services/order/routes.py b/ecommerce-platform/services/order/routes.py index eacf24b..c6cc8a6 100644 --- a/ecommerce-platform/services/order/routes.py +++ b/ecommerce-platform/services/order/routes.py @@ -1,12 +1,13 @@ from __future__ import annotations import structlog -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, Depends, HTTPException, Request from sqlalchemy.orm import Session from services.order.schemas import OrderCreate, OrderResponse from services.order.service import cancel_order, create_order, get_order, get_user_orders from services.shared.database import get_db +from services.shared.event_log import write_exception_event from services.shared.exceptions import ( InsufficientStockError, InvalidOrderStatusError, @@ -19,40 +20,123 @@ @router.post("/orders", response_model=OrderResponse, status_code=201) -def 
create_order_route(payload: OrderCreate, db: Session = Depends(get_db)) -> OrderResponse: +def create_order_route( + payload: OrderCreate, + request: Request, + db: Session = Depends(get_db), +) -> OrderResponse: try: order = create_order(db, payload) logger.info("order_created", order_id=order.id, order_no=order.order_no) return OrderResponse.model_validate(order) except InsufficientStockError as exc: + logger.warning("insufficient_stock", error=str(exc), exc_info=True) + write_exception_event( + service="order-service", + level="warning", + event="order_create_warning", + exc=exc, + trace_id=getattr(request.state, "trace_id", None), + source="service", + method=request.method, + path=request.url.path, + status_code=409, + ) raise HTTPException(status_code=409, detail=str(exc)) from exc except ValueError as exc: + logger.warning("order_create_invalid", error=str(exc), exc_info=True) + write_exception_event( + service="order-service", + level="warning", + event="order_create_invalid", + exc=exc, + trace_id=getattr(request.state, "trace_id", None), + source="service", + method=request.method, + path=request.url.path, + status_code=400, + ) raise HTTPException(status_code=400, detail=str(exc)) from exc except Exception as exc: logger.error("order_creation_failed", error=str(exc), exc_info=True) + write_exception_event( + service="order-service", + level="error", + event="order_create_failed", + exc=exc, + trace_id=getattr(request.state, "trace_id", None), + source="service", + method=request.method, + path=request.url.path, + status_code=500, + ) raise HTTPException(status_code=500, detail="Internal server error") from exc @router.get("/orders/{order_id}", response_model=OrderResponse) -def get_order_route(order_id: int, db: Session = Depends(get_db)) -> OrderResponse: +def get_order_route(order_id: int, request: Request, db: Session = Depends(get_db)) -> OrderResponse: try: return OrderResponse.model_validate(get_order(db, order_id)) except OrderNotFoundError as 
exc: + logger.warning("order_not_found", error=str(exc), exc_info=True) + write_exception_event( + service="order-service", + level="warning", + event="order_lookup_warning", + exc=exc, + trace_id=getattr(request.state, "trace_id", None), + source="service", + method=request.method, + path=request.url.path, + status_code=404, + ) raise HTTPException(status_code=404, detail=str(exc)) from exc @router.get("/orders/user/{user_id}", response_model=list[OrderResponse]) -def get_user_orders_route(user_id: int, db: Session = Depends(get_db)) -> list[OrderResponse]: +def get_user_orders_route( + user_id: int, + request: Request, + db: Session = Depends(get_db), +) -> list[OrderResponse]: orders = get_user_orders(db, user_id) return [OrderResponse.model_validate(order) for order in orders] @router.post("/orders/{order_id}/cancel") -def cancel_order_route(order_id: int, db: Session = Depends(get_db)) -> dict[str, str]: +def cancel_order_route( + order_id: int, + request: Request, + db: Session = Depends(get_db), +) -> dict[str, str]: try: cancel_order(db, order_id) return {"message": "Order cancelled"} except OrderNotFoundError as exc: + logger.warning("order_cancel_not_found", error=str(exc), exc_info=True) + write_exception_event( + service="order-service", + level="warning", + event="order_cancel_not_found", + exc=exc, + trace_id=getattr(request.state, "trace_id", None), + source="service", + method=request.method, + path=request.url.path, + status_code=404, + ) raise HTTPException(status_code=404, detail=str(exc)) from exc except InvalidOrderStatusError as exc: + logger.warning("order_cancel_invalid_status", error=str(exc), exc_info=True) + write_exception_event( + service="order-service", + level="warning", + event="order_cancel_invalid_status", + exc=exc, + trace_id=getattr(request.state, "trace_id", None), + source="service", + method=request.method, + path=request.url.path, + status_code=409, + ) raise HTTPException(status_code=409, detail=str(exc)) from exc diff 
--git a/ecommerce-platform/services/order/service.py b/ecommerce-platform/services/order/service.py index dbdc714..732a7a1 100644 --- a/ecommerce-platform/services/order/service.py +++ b/ecommerce-platform/services/order/service.py @@ -18,6 +18,11 @@ from services.shared.settings import sync_tasks from services.user.service import get_vip_discount +COUPON_DISCOUNTS = { + "SAVE10": Decimal("10.00"), + "SAVE20": Decimal("20.00"), +} + def generate_order_no() -> str: return f"ORD-{int(time.time())}-{str(uuid.uuid4())[:8].upper()}" @@ -25,10 +30,11 @@ def generate_order_no() -> str: def _coupon_discount(payload: OrderCreate) -> Decimal: code = (payload.coupon_code or "").strip().upper() - if code == "SAVE10": - return Decimal("10.00") - if code == "SAVE20": - return Decimal("20.00") + if BugFlags.coupon_lookup_key_error(): + # Intentional bug path for repair demos: missing/unknown coupons explode as KeyError. + return COUPON_DISCOUNTS[payload.coupon_code] + if code in COUPON_DISCOUNTS: + return COUPON_DISCOUNTS[code] return Decimal("0") diff --git a/ecommerce-platform/services/order/tests/test_service.py b/ecommerce-platform/services/order/tests/test_service.py index f100df0..17581d2 100644 --- a/ecommerce-platform/services/order/tests/test_service.py +++ b/ecommerce-platform/services/order/tests/test_service.py @@ -29,3 +29,18 @@ def test_bug_index_error_raises_for_user_without_orders(seeded_db, monkeypatch): with pytest.raises(IndexError): get_user_orders(db, ids["second_user_id"]) + + +def test_bug_coupon_lookup_key_error_raises_for_unknown_coupon(seeded_db, monkeypatch): + db, ids = seeded_db + monkeypatch.setenv("BUG_ORDER_COUPON_KEY", "true") + + with pytest.raises(KeyError): + create_order( + db, + OrderCreate( + user_id=ids["user_id"], + items=[{"product_id": ids["product_id"], "quantity": 1}], + coupon_code="FLASH50", + ), + ) diff --git a/ecommerce-platform/services/payment/bugs.py b/ecommerce-platform/services/payment/bugs.py index c2d8b2e..62a2ab9 100644 
--- a/ecommerce-platform/services/payment/bugs.py +++ b/ecommerce-platform/services/payment/bugs.py @@ -7,3 +7,7 @@ class BugFlags: @staticmethod def float_precision_error() -> bool: return bool_env("BUG_FLOAT_PRECISION", False) + + @staticmethod + def gateway_contract_key_error() -> bool: + return bool_env("BUG_PAYMENT_GATEWAY_KEY", False) diff --git a/ecommerce-platform/services/payment/main.py b/ecommerce-platform/services/payment/main.py index 8b808b2..e201593 100644 --- a/ecommerce-platform/services/payment/main.py +++ b/ecommerce-platform/services/payment/main.py @@ -7,12 +7,14 @@ from services.payment.routes import router from services.shared.database import init_database from services.shared.logger import configure_logging +from services.shared.request_logging import install_request_logging from services.shared.settings import service_name logger = configure_logging(service_name("payment-service")) app = FastAPI(title="Payment Service", version="1.0.0") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) +install_request_logging(app, service=service_name("payment-service")) Instrumentator().instrument(app).expose(app) app.include_router(router, prefix="/api/v1") diff --git a/ecommerce-platform/services/payment/routes.py b/ecommerce-platform/services/payment/routes.py index e0163ff..dd51fcc 100644 --- a/ecommerce-platform/services/payment/routes.py +++ b/ecommerce-platform/services/payment/routes.py @@ -2,15 +2,18 @@ from decimal import Decimal -from fastapi import APIRouter, Depends, HTTPException, Query +import structlog +from fastapi import APIRouter, Depends, HTTPException, Query, Request from sqlalchemy.orm import Session from services.payment.schemas import PaymentCalculationResponse, PaymentResponse from services.payment.service import calculate_final_amount, process_payment from services.shared.database import get_db +from services.shared.event_log import write_exception_event router = APIRouter() +logger 
= structlog.get_logger() @router.get("/payments/calculate", response_model=PaymentCalculationResponse) @@ -24,6 +27,7 @@ def calculate_payment_route( @router.post("/payments/{order_id}/process", response_model=PaymentResponse) def process_payment_route( + request: Request, order_id: int, method: str = Query(default="manual"), db: Session = Depends(get_db), @@ -32,4 +36,16 @@ def process_payment_route( payment = process_payment(db, order_id, method) return PaymentResponse.model_validate(payment) except ValueError as exc: + logger.warning("payment_process_not_found", error=str(exc), exc_info=True) + write_exception_event( + service="payment-service", + level="warning", + event="payment_process_not_found", + exc=exc, + trace_id=getattr(request.state, "trace_id", None), + source="service", + method=request.method, + path=request.url.path, + status_code=404, + ) raise HTTPException(status_code=404, detail=str(exc)) from exc diff --git a/ecommerce-platform/services/payment/service.py b/ecommerce-platform/services/payment/service.py index bfd055e..ee92a60 100644 --- a/ecommerce-platform/services/payment/service.py +++ b/ecommerce-platform/services/payment/service.py @@ -24,6 +24,21 @@ def _quantize(value: Decimal) -> Decimal: return value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP) +def _gateway_settlement_amount(*, total: Decimal, coupon_discount: Decimal) -> Decimal: + settlement_quote = { + "gross_amount": _quantize(total), + "coupon_discount": _quantize(coupon_discount), + "settlement_amount": _quantize(total - coupon_discount), + } + if BugFlags.gateway_contract_key_error() and coupon_discount > Decimal("0"): + # Intentional bug path for repair demos: gateway payload schema drifts on coupon orders. 
+ settlement_quote = { + "gross_amount": settlement_quote["gross_amount"], + "coupon_discount": settlement_quote["coupon_discount"], + } + return settlement_quote["settlement_amount"] + + def calculate_from_discount_rate( total: Decimal, discount_rate: Decimal, @@ -46,7 +61,7 @@ def calculate_from_discount_rate( "final_amount": Decimal(str(final_amount)), } - after_coupon = total - coupon_discount + after_coupon = _gateway_settlement_amount(total=total, coupon_discount=coupon_discount) discount_amount = _quantize(after_coupon * discount_rate) after_discount = after_coupon - discount_amount tax_amount = _quantize(after_discount * TAX_RATE) diff --git a/ecommerce-platform/services/payment/tests/test_service.py b/ecommerce-platform/services/payment/tests/test_service.py index 7723c4b..0e86c8c 100644 --- a/ecommerce-platform/services/payment/tests/test_service.py +++ b/ecommerce-platform/services/payment/tests/test_service.py @@ -2,6 +2,8 @@ from decimal import Decimal +import pytest + from services.payment.service import calculate_final_amount @@ -19,3 +21,14 @@ def test_bug_float_precision_exposes_unrounded_value(monkeypatch) -> None: result = calculate_final_amount(Decimal("99.99"), vip_level=2) assert result["final_amount"] != Decimal("97.19") + + +def test_bug_payment_gateway_contract_key_error_raises(monkeypatch) -> None: + monkeypatch.setenv("BUG_PAYMENT_GATEWAY_KEY", "true") + + with pytest.raises(KeyError): + calculate_final_amount( + Decimal("299.99"), + vip_level=2, + coupon_discount=Decimal("10.00"), + ) diff --git a/ecommerce-platform/services/shared/event_log.py b/ecommerce-platform/services/shared/event_log.py new file mode 100644 index 0000000..70d4171 --- /dev/null +++ b/ecommerce-platform/services/shared/event_log.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +import json +import os +import traceback +from datetime import UTC, date, datetime +from decimal import Decimal +from pathlib import Path +from typing import Any + +from 
services.shared.settings import log_file + + +def _serialize(value: Any) -> Any: + if isinstance(value, (datetime, date)): + return value.isoformat() + if isinstance(value, Decimal): + return str(value) + if isinstance(value, Path): + return str(value) + if isinstance(value, bytes): + return value.decode("utf-8", errors="replace") + if isinstance(value, dict): + return {str(key): _serialize(item) for key, item in value.items()} + if isinstance(value, (list, tuple, set)): + return [_serialize(item) for item in value] + return value + + +def _normalize_body(body: bytes, content_type: str | None) -> Any: + if not body: + return None + text = body.decode("utf-8", errors="replace") + if len(text) > 5000: + text = f"{text[:5000]}...[truncated]" + if content_type and "json" in content_type.lower(): + try: + return json.loads(text) + except json.JSONDecodeError: + return text + return text + + +def event_log_path() -> Path: + path = log_file() + path.parent.mkdir(parents=True, exist_ok=True) + return path + + +def write_event(*, service: str, level: str, event: str, **payload: Any) -> None: + record = { + "timestamp": datetime.now(UTC).isoformat(timespec="milliseconds").replace("+00:00", "Z"), + "service": service, + "level": level, + "event": event, + "pid": os.getpid(), + **_serialize(payload), + } + path = event_log_path() + with path.open("a", encoding="utf-8") as handle: + handle.write(json.dumps(record, ensure_ascii=False) + "\n") + + +def write_exception_event( + *, + service: str, + level: str, + event: str, + exc: BaseException, + **payload: Any, +) -> None: + write_event( + service=service, + level=level, + event=event, + error=str(exc), + exception_type=type(exc).__name__, + traceback="".join(traceback.format_exception(type(exc), exc, exc.__traceback__)), + **payload, + ) + + +def body_for_log(body: bytes, content_type: str | None) -> Any: + return _normalize_body(body, content_type) diff --git a/ecommerce-platform/services/shared/request_logging.py 
b/ecommerce-platform/services/shared/request_logging.py new file mode 100644 index 0000000..c347aff --- /dev/null +++ b/ecommerce-platform/services/shared/request_logging.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import time +import uuid +from collections.abc import Awaitable, Callable +from typing import Any + +import structlog +from fastapi import FastAPI, Request, Response +from structlog.contextvars import bind_contextvars, clear_contextvars + +from services.shared.event_log import body_for_log, write_event, write_exception_event + + +def install_request_logging(app: FastAPI, *, service: str) -> None: + logger = structlog.get_logger(service) + + @app.middleware("http") + async def request_logging_middleware( + request: Request, + call_next: Callable[[Request], Awaitable[Response]], + ) -> Response: + trace_id = request.headers.get("x-trace-id") or uuid.uuid4().hex + body = await request.body() + request.state.trace_id = trace_id + + async def receive() -> dict[str, Any]: + return {"type": "http.request", "body": body, "more_body": False} + + request = Request(request.scope, receive) + request.state.trace_id = trace_id + bind_contextvars( + trace_id=trace_id, + method=request.method, + path=request.url.path, + service=service, + ) + started = time.perf_counter() + + try: + response = await call_next(request) + except Exception as exc: + latency_ms = round((time.perf_counter() - started) * 1000, 2) + logger.error( + "request_unhandled_exception", + trace_id=trace_id, + status_code=500, + latency_ms=latency_ms, + exc_info=True, + ) + write_exception_event( + service=service, + level="error", + event="service_exception", + exc=exc, + trace_id=trace_id, + source="service", + method=request.method, + path=request.url.path, + query=request.url.query, + status_code=500, + latency_ms=latency_ms, + request_body=body_for_log(body, request.headers.get("content-type")), + ) + clear_contextvars() + raise + + latency_ms = round((time.perf_counter() - started) 
* 1000, 2) + response.headers["x-trace-id"] = trace_id + level = "info" + if response.status_code >= 500: + level = "error" + elif response.status_code >= 400: + level = "warning" + getattr(logger, level)( + "request_completed", + trace_id=trace_id, + status_code=response.status_code, + latency_ms=latency_ms, + ) + write_event( + service=service, + level=level, + event="service_request", + source="service", + trace_id=trace_id, + method=request.method, + path=request.url.path, + query=request.url.query, + status_code=response.status_code, + latency_ms=latency_ms, + request_body=body_for_log(body, request.headers.get("content-type")), + ) + clear_contextvars() + return response diff --git a/ecommerce-platform/services/shared/settings.py b/ecommerce-platform/services/shared/settings.py index 9769068..2a6de52 100644 --- a/ecommerce-platform/services/shared/settings.py +++ b/ecommerce-platform/services/shared/settings.py @@ -1,10 +1,13 @@ from __future__ import annotations import os +from pathlib import Path DEFAULT_DATABASE_URL = "postgresql://ecom:password@postgres:5432/ecommerce" DEFAULT_REDIS_URL = "redis://redis:6379/0" +PROJECT_ROOT = Path(__file__).resolve().parents[2] +DEFAULT_LOG_FILE = PROJECT_ROOT / "logs" / "ecommerce-debug.jsonl" def env(name: str, default: str = "") -> str: @@ -31,3 +34,15 @@ def service_name(default: str) -> str: def sync_tasks() -> bool: return bool_env("SYNC_TASKS", False) + + +def project_root() -> Path: + return PROJECT_ROOT + + +def log_file() -> Path: + raw = env("LOG_FILE", str(DEFAULT_LOG_FILE)) + path = Path(raw).expanduser() + if not path.is_absolute(): + path = PROJECT_ROOT / path + return path diff --git a/ecommerce-platform/services/user/main.py b/ecommerce-platform/services/user/main.py index 2ee1bb6..5d84584 100644 --- a/ecommerce-platform/services/user/main.py +++ b/ecommerce-platform/services/user/main.py @@ -6,6 +6,7 @@ from services.shared.database import init_database from services.shared.logger import 
configure_logging +from services.shared.request_logging import install_request_logging from services.shared.settings import service_name from services.user.routes import router @@ -13,6 +14,7 @@ logger = configure_logging(service_name("user-service")) app = FastAPI(title="User Service", version="1.0.0") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) +install_request_logging(app, service=service_name("user-service")) Instrumentator().instrument(app).expose(app) app.include_router(router, prefix="/api/v1") diff --git a/ecommerce-platform/services/user/routes.py b/ecommerce-platform/services/user/routes.py index c1bdda5..511792d 100644 --- a/ecommerce-platform/services/user/routes.py +++ b/ecommerce-platform/services/user/routes.py @@ -1,9 +1,11 @@ from __future__ import annotations -from fastapi import APIRouter, Depends, HTTPException +import structlog +from fastapi import APIRouter, Depends, HTTPException, Request from sqlalchemy.orm import Session from services.shared.database import get_db +from services.shared.event_log import write_exception_event from services.user.schemas import ( LoginResponse, UserCreate, @@ -15,36 +17,97 @@ router = APIRouter() +logger = structlog.get_logger() @router.post("/users/register", response_model=UserResponse, status_code=201) -def register_user_route(payload: UserCreate, db: Session = Depends(get_db)) -> UserResponse: +def register_user_route( + payload: UserCreate, + request: Request, + db: Session = Depends(get_db), +) -> UserResponse: try: return UserResponse.model_validate(register_user(db, payload)) except Exception as exc: + logger.warning("user_register_failed", error=str(exc), exc_info=True) + write_exception_event( + service="user-service", + level="warning", + event="user_register_failed", + exc=exc, + trace_id=getattr(request.state, "trace_id", None), + source="service", + method=request.method, + path=request.url.path, + status_code=400, + ) raise 
HTTPException(status_code=400, detail=str(exc)) from exc @router.post("/users/login", response_model=LoginResponse) -def login_user_route(payload: UserLogin, db: Session = Depends(get_db)) -> LoginResponse: +def login_user_route( + payload: UserLogin, + request: Request, + db: Session = Depends(get_db), +) -> LoginResponse: try: user = login_user(db, payload) return LoginResponse(success=True, user_id=user.id) except ValueError as exc: + logger.warning("user_login_failed", error=str(exc), exc_info=True) + write_exception_event( + service="user-service", + level="warning", + event="user_login_failed", + exc=exc, + trace_id=getattr(request.state, "trace_id", None), + source="service", + method=request.method, + path=request.url.path, + status_code=401, + ) raise HTTPException(status_code=401, detail=str(exc)) from exc @router.get("/users/{user_id}", response_model=UserResponse) -def get_user_route(user_id: int, db: Session = Depends(get_db)) -> UserResponse: +def get_user_route(user_id: int, request: Request, db: Session = Depends(get_db)) -> UserResponse: try: return UserResponse.model_validate(get_user(db, user_id)) except ValueError as exc: + logger.warning("user_not_found", error=str(exc), exc_info=True) + write_exception_event( + service="user-service", + level="warning", + event="user_lookup_failed", + exc=exc, + trace_id=getattr(request.state, "trace_id", None), + source="service", + method=request.method, + path=request.url.path, + status_code=404, + ) raise HTTPException(status_code=404, detail=str(exc)) from exc @router.get("/users/{user_id}/discount", response_model=UserDiscountResponse) -def get_discount_route(user_id: int, db: Session = Depends(get_db)) -> UserDiscountResponse: +def get_discount_route( + user_id: int, + request: Request, + db: Session = Depends(get_db), +) -> UserDiscountResponse: try: return UserDiscountResponse(user_id=user_id, discount_rate=get_vip_discount(db, user_id)) except Exception as exc: + logger.error("user_discount_failed", 
error=str(exc), exc_info=True) + write_exception_event( + service="user-service", + level="error", + event="user_discount_failed", + exc=exc, + trace_id=getattr(request.state, "trace_id", None), + source="service", + method=request.method, + path=request.url.path, + status_code=500, + ) raise HTTPException(status_code=500, detail=str(exc)) from exc diff --git a/ecommerce-platform/testsupport.py b/ecommerce-platform/testsupport.py index af38340..09b18ee 100644 --- a/ecommerce-platform/testsupport.py +++ b/ecommerce-platform/testsupport.py @@ -18,11 +18,15 @@ def prepare_test_database(tmp_path: Path) -> None: def clear_bug_flags() -> None: for name in ( "BUG_INDEX_ERROR", + "BUG_ORDER_COUPON_KEY", "BUG_RACE_CONDITION", + "BUG_INVENTORY_MISSING_ROW", + "BUG_INVENTORY_BROKEN_PRODUCT_ID", "BUG_FLOAT_PRECISION", + "BUG_PAYMENT_GATEWAY_KEY", "BUG_NULL_VIP", ): - os.environ[name] = "false" + os.environ[name] = "7" if name == "BUG_INVENTORY_BROKEN_PRODUCT_ID" else "false" def seed_basic_data(db) -> dict[str, int]: