diff --git a/README.md b/README.md index 69e39d4..09e22dc 100644 --- a/README.md +++ b/README.md @@ -201,6 +201,79 @@ Ciężki trening jest odseparowany od CI, ponieważ wymaga danych z Git LFS, więcej czasu i pamięci oraz tworzy duże modele. Z tego samego powodu AutoML nie jest uruchamiany przy każdym Pull Requeście i pozostaje ręczną opcją treningu. +## Serwowanie modelu + +Kod współdzielony między interfejsami serwującymi (Streamlit i FastAPI) znajduje +się w `src/credit_scoring/serving/`: + +```text +src/credit_scoring/serving/schema.py Definicje cech i słowniki kodowania +src/credit_scoring/serving/inference.py Wczytywanie modelu, budowa wektora cech, predykcja +src/credit_scoring/serving/prediction_logger.py Logowanie predykcji + weryfikacja co 10. predykcję +src/credit_scoring/serving/api.py Aplikacja FastAPI +``` + +### Aplikacja Streamlit + +```powershell +streamlit run app.py +``` + +### API FastAPI + +```powershell +uvicorn credit_scoring.serving.api:app --reload --app-dir src +``` + +Dokumentacja interaktywna (Swagger UI): http://127.0.0.1:8000/docs + +Najważniejsze endpointy: + +| Metoda | Endpoint | Opis | +|--------|------------------------|---------------------------------------------------| +| GET | `/health` | Status API i informacja, czy model jest wczytany | +| GET | `/model/metrics` | Zawartość `data/08_reporting/metrics.json` | +| POST | `/predict` | Predykcja dla jednego klienta | +| POST | `/predict/batch` | Predykcja dla listy klientów | +| GET | `/predictions/stats` | Licznik predykcji i wynik ostatniej weryfikacji | + +Przykład żądania `curl`: + +```bash +curl -X POST http://127.0.0.1:8000/predict \ + -H "Content-Type: application/json" \ + -d '{ + "age": 35, "occupation": "Engineer", "annual_income": 50000, + "monthly_salary": 4000, "monthly_balance": 300, "amount_invested": 100, + "num_bank_accounts": 4, "num_credit_card": 4, "num_of_loan": 2, + "interest_rate": 12, "outstanding_debt": 1200, "credit_utilization": 32, + "total_emi": 100, 
"credit_mix": "Standard", "credit_history_age_months": 120, + "delay_from_due_date": 10, "num_delayed_payment": 5, "changed_credit_limit": 8, + "num_credit_inquiries": 4, "payment_min": "No", "loan_types": ["Personal Loan"], + "payment_behaviour": "Low_spent_Small_value_payments" + }' +``` + +### Logowanie predykcji i weryfikacja co 10. predykcję + +Każda predykcja — niezależnie czy wykonana ze Streamlit, czy z FastAPI — +jest logowana przez `PredictionLogger` do: + +```text +data/09_predictions/predictions_log.jsonl jedna linia JSON na predykcję +data/09_predictions/verification_log.jsonl jedna linia JSON co 10. predykcję +``` + +Co 10. predykcję (`verify_every=10`) automatycznie wykonywana jest weryfikacja +ostatniej partii: sprawdzenie zgodności wektora cech ze schematem modelu, brak +wartości NaN/Inf, sumowanie się prawdopodobieństw do 1.0, średnia ufność modelu +oraz rozkład przewidzianych klas. Wynik (`OK`/`WARNING`) jest zapisywany do +`verification_log.jsonl` oraz logowany przez standardowy moduł `logging` +(logger `credit_scoring.predictions`). Licznik predykcji jest odtwarzany z +liczby linii już zapisanych w pliku logu, dzięki czemu przetrwa restart +aplikacji. Pliki `*.jsonl` w `data/09_predictions/` nie są commitowane do +repozytorium. 
+ ## Do zrobienia Kolejne etapy projektu: diff --git a/app.py b/app.py index 6823ebb..28ae629 100644 --- a/app.py +++ b/app.py @@ -1,8 +1,8 @@ from __future__ import annotations import json -import pickle import sys +import uuid from pathlib import Path import numpy as np @@ -11,105 +11,40 @@ # Ścieżki projektu PROJECT_ROOT = Path(__file__).resolve().parent -MODEL_PATH = PROJECT_ROOT / "data" / "06_models" / "baseline_random_forest.pkl" METRICS_PATH = PROJECT_ROOT / "data" / "08_reporting" / "metrics.json" - sys.path.insert(0, str(PROJECT_ROOT / "src")) +# Cechy modelu, wczytywanie modelu i logowanie predykcji +# wspólne dla Streamlit i API FastAPI +from credit_scoring.serving import inference # noqa: E402 +from credit_scoring.serving.prediction_logger import prediction_logger # noqa: E402 +from credit_scoring.serving.schema import ( # noqa: E402 + CREDIT_MIX_MAP, + LOAN_TYPES, + MODEL_FEATURES, + OCCUPATIONS, + PAYMENT_BEHAVIOURS, + PAYMENT_MIN_MAP, + TARGET_LABELS, + TARGET_PL, +) -# Definicje cech -TARGET_LABELS = {0: "Poor", 1: "Standard", 2: "Good"} -TARGET_PL = {0: "Niska (Poor)", 1: "Średnia (Standard)", 2: "Dobra (Good)"} +MODEL_PATH = inference.MODEL_PATH DEFAULT_COLOR = {0: "#35A4E5", 1: "#8378FF"} TARGET_COLOR = {0: "#e53935", 1: "#fb8c00", 2: "#43a047"} -LOAN_TYPES = [ - "Auto Loan", - "Credit-Builder Loan", - "Debt Consolidation Loan", - "Home Equity Loan", - "Mortgage Loan", - "Not Specified", - "Payday Loan", - "Personal Loan", - "Student Loan", -] - -PAYMENT_BEHAVIOURS = [ - "High_spent_Large_value_payments", - "High_spent_Medium_value_payments", - "High_spent_Small_value_payments", - "Low_spent_Large_value_payments", - "Low_spent_Medium_value_payments", - "Low_spent_Small_value_payments", -] - -OCCUPATIONS = [ - "Architect", - "Developer", - "Doctor", - "Engineer", - "Entrepreneur", - "Journalist", - "Lawyer", - "Manager", - "Mechanic", - "Media_Manager", - "Musician", - "Scientist", - "Teacher", - "Writer", -] - -CREDIT_MIX_MAP = {"Bad": 0, 
"Standard": 1, "Good": 2} -PAYMENT_MIN_MAP = {"No": 0, "Yes": 1} - -# Kolejność cech wymagana przez model — identyczna jak przy treningu. -MODEL_FEATURES = [ - "Age", - "Annual_Income", - "Monthly_Inhand_Salary", - "Num_Bank_Accounts", - "Num_Credit_Card", - "Interest_Rate", - "Num_of_Loan", - "Delay_from_due_date", - "Num_of_Delayed_Payment", - "Changed_Credit_Limit", - "Num_Credit_Inquiries", - "Credit_Mix", - "Outstanding_Debt", - "Credit_Utilization_Ratio", - "Credit_History_Age", - "Payment_of_Min_Amount", - "Total_EMI_per_month", - "Amount_invested_monthly", - "Monthly_Balance", - *[f"LoanType_{lt}" for lt in LOAN_TYPES], - *[f"PayBeh_{pb}" for pb in PAYMENT_BEHAVIOURS], - *[f"Occupation_{oc}" for oc in OCCUPATIONS], -] - # Ładowanie modelu i metryk def _is_lfs_pointer(path: Path) -> bool: """Wykrywa, czy plik to wskaźnik Git LFS, a nie faktyczny model.""" - try: - if path.stat().st_size > 5000: # ograniczenie pkl (setki MB) - return False - with open(path, "rb") as handle: - head = handle.read(200) - return b"git-lfs" in head - except OSError: - return False + return inference.is_lfs_pointer(path) @st.cache_resource(show_spinner="Wczytywanie modelu...") def load_model(path_str: str): """Wczytuje wytrenowany model z pliku pickle (cache na czas sesji).""" - with open(path_str, "rb") as handle: - return pickle.load(handle) + return inference.load_model(path_str) @st.cache_data(show_spinner=False) @@ -121,51 +56,56 @@ def load_metrics(path_str: str) -> dict: return {} -# Budowa wektora cech z formularza def build_feature_row(inputs: dict) -> pd.DataFrame: - """Tworzy pojedynczy wiersz cech w kolejności wymaganej przez model.""" - row = {feature: 0 for feature in MODEL_FEATURES} - - # Cechy numeryczne i porządkowe - row["Age"] = inputs["age"] - row["Annual_Income"] = inputs["annual_income"] - row["Monthly_Inhand_Salary"] = inputs["monthly_salary"] - row["Num_Bank_Accounts"] = inputs["num_bank_accounts"] - row["Num_Credit_Card"] = inputs["num_credit_card"] - 
def log_predictions_and_show_verification(
    model,
    features_df: pd.DataFrame,
    preds: np.ndarray,
    proba: np.ndarray | None,
    source: str,
) -> None:
    """Log every prediction and display the latest batch verification, if any.

    Each row of ``features_df`` is passed to ``prediction_logger.log_prediction``
    together with its predicted class and, when ``proba`` is given, a mapping
    of class label to probability. A fresh UUID is used as ``request_id`` for
    every row.

    The logger attaches a ``verification`` entry to the record on which a
    periodic check was run. That record may be any row of a multi-row call,
    not only the last one, so the most recent verification seen during the
    loop is remembered and rendered afterwards.

    Args:
        model: Fitted estimator; its ``classes_`` attribute (default
            ``[0, 1, 2]``) defines the column order of ``proba``.
        features_df: Feature rows that were fed to the model.
        preds: Predicted class per row of ``features_df``.
        proba: Per-row class probabilities, or ``None`` when unavailable.
        source: Free-form tag stored with every logged prediction.
    """
    classes = getattr(model, "classes_", [0, 1, 2])
    # Labels depend only on the model, so resolve them once instead of per row.
    class_labels = [TARGET_LABELS.get(int(c), str(c)) for c in classes]
    verification = None

    for row_idx in range(len(features_df)):
        probabilities = None
        if proba is not None:
            probabilities = {
                label: float(value)
                for label, value in zip(class_labels, proba[row_idx])
            }
        record = prediction_logger.log_prediction(
            features=features_df.iloc[row_idx].to_dict(),
            predicted_class=int(preds[row_idx]),
            probabilities=probabilities,
            source=source,
            request_id=str(uuid.uuid4()),
        )
        # Keep the newest verification even when later rows do not trigger one.
        if "verification" in record:
            verification = record["verification"]

    if verification is None:
        return

    avg_conf = verification["avg_confidence"]
    summary = (
        f"Zweryfikowano ostatnie {verification['n_checked']} predykcji "
        f"(do #{verification['batch_end_index']}) — status: **{verification['status']}**"
        + (f", śr. ufność modelu: {avg_conf:.2f}." if avg_conf is not None else ".")
    )
    if verification["status"] == "OK":
        st.success(f"🔍 {summary}")
    else:
        st.warning(f"🔍 {summary} Uwagi: " + "; ".join(verification["issues"]))
b/requirements.txt @@ -14,4 +14,7 @@ missingno>=0.5 streamlit>=1.38 mlflow>=2.14 +fastapi>=0.115 +uvicorn[standard]>=0.32 + autogluon.tabular[lightgbm,catboost,xgboost,fastai,ray]==1.5.0 \ No newline at end of file diff --git a/src/credit_scoring/serving/__init__.py b/src/credit_scoring/serving/__init__.py new file mode 100644 index 0000000..a04ef22 --- /dev/null +++ b/src/credit_scoring/serving/__init__.py @@ -0,0 +1,11 @@ +"""Warstwa serwowania modelu + +Ten pakiet zawiera kod współdzielony między interfejsem Streamlit (``app.py``) +oraz API REST (``credit_scoring.serving.api``): + +- :mod:`credit_scoring.serving.schema` — definicje cech i słowniki kodowania, +- :mod:`credit_scoring.serving.inference` — wczytywanie modelu i predykcja, +- :mod:`credit_scoring.serving.prediction_logger` — logowanie predykcji + oraz weryfikacja co N-tą predykcję, +- :mod:`credit_scoring.serving.api` — aplikacja FastAPI. +""" diff --git a/src/credit_scoring/serving/api.py b/src/credit_scoring/serving/api.py new file mode 100644 index 0000000..cfb1a5b --- /dev/null +++ b/src/credit_scoring/serving/api.py @@ -0,0 +1,204 @@ +"""API FastAPI dla modelu oceny kredytowej. 
+ +Uruchomienie lokalnie (z katalogu głównego projektu): + + uvicorn credit_scoring.serving.api:app --reload --app-dir src + +Dokumentacja interaktywna (Swagger UI) będzie dostępna pod: + + http://127.0.0.1:8000/docs + +Endpointy: + GET /health — czy API żyje i czy model jest wczytany + GET /model/metrics — metryki modelu zapisane przez pipeline `modeling` + POST /predict — predykcja dla jednego klienta + POST /predict/batch — predykcja dla listy klientów + GET /predictions/stats — statystyki logowania predykcji (licznik, ostatnia weryfikacja) +""" +from __future__ import annotations + +import json +import uuid +from contextlib import asynccontextmanager +from typing import Any, Literal + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel, Field + +from credit_scoring.serving import inference +from credit_scoring.serving.prediction_logger import prediction_logger +from credit_scoring.serving.schema import ( + LOAN_TYPES, + OCCUPATIONS, + PAYMENT_BEHAVIOURS, + TARGET_LABELS, +) + +# Stan aplikacji (model wczytywany raz, przy starcie) ------------------------ + +_app_state: dict[str, Any] = {"model": None, "model_version": None} + + +@asynccontextmanager +async def lifespan(app: FastAPI): + try: + _app_state["model"] = inference.load_model() + # Prosty "identyfikator wersji" modelu — rozmiar + czas modyfikacji pliku. + stat = inference.MODEL_PATH.stat() + _app_state["model_version"] = f"{stat.st_size}-{int(stat.st_mtime)}" + except inference.ModelNotAvailableError as exc: + # Aplikacja wstaje, ale /health i /predict zgłoszą błąd 503 z opisem. 
class CreditApplicationInput(BaseModel):
    """Input data for a single client; the fields correspond to the form fields in app.py.

    Example values are declared with ``examples=[...]``. Passing ``example=``
    to ``Field`` is an extra keyword argument, which Pydantic v2 deprecates.
    """

    age: int = Field(ge=18, le=100, examples=[35])
    occupation: Literal[tuple(OCCUPATIONS)] = Field(examples=["Engineer"])  # type: ignore[valid-type]
    annual_income: float = Field(ge=0, examples=[50000.0])
    monthly_salary: float = Field(ge=0, examples=[4000.0])
    monthly_balance: float = Field(ge=0, examples=[300.0])
    amount_invested: float = Field(ge=0, examples=[100.0])
    num_bank_accounts: int = Field(ge=0, le=20, examples=[4])
    num_credit_card: int = Field(ge=0, le=20, examples=[4])
    num_of_loan: int = Field(ge=0, le=20, examples=[2])
    interest_rate: float = Field(ge=0, le=50, examples=[12.0])
    outstanding_debt: float = Field(ge=0, examples=[1200.0])
    credit_utilization: float = Field(ge=0, le=100, examples=[32.0])
    total_emi: float = Field(ge=0, examples=[100.0])
    credit_mix: Literal["Bad", "Standard", "Good"] = Field(examples=["Standard"])
    credit_history_age_months: int = Field(ge=0, examples=[120])
    delay_from_due_date: int = Field(ge=0, examples=[10])
    num_delayed_payment: int = Field(ge=0, examples=[5])
    # No bounds here: the change of credit limit is accepted as any float.
    changed_credit_limit: float = Field(examples=[8.0])
    num_credit_inquiries: int = Field(ge=0, examples=[4])
    payment_min: Literal["No", "Yes"] = Field(examples=["No"])
    # NOTE(review): omitting `loan_types` yields ["Personal Loan"], not an
    # empty list — confirm that this default is intended.
    loan_types: list[Literal[tuple(LOAN_TYPES)]] = Field(default_factory=lambda: ["Personal Loan"])  # type: ignore[valid-type]
    payment_behaviour: Literal[tuple(PAYMENT_BEHAVIOURS)] = Field(  # type: ignore[valid-type]
        examples=["Low_spent_Small_value_payments"]
    )
@app.get("/model/metrics")
def model_metrics() -> dict[str, Any]:
    """Return the parsed contents of the metrics file at ``inference.METRICS_PATH``.

    The file is opened directly rather than checked with ``exists()`` first,
    so it cannot disappear between the check and the read.

    Raises:
        HTTPException: 404 when the metrics file does not exist, 500 when it
            exists but does not contain valid JSON.
    """
    try:
        with open(inference.METRICS_PATH, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except FileNotFoundError as exc:
        raise HTTPException(
            status_code=404, detail="Plik metrics.json nie istnieje."
        ) from exc
    except json.JSONDecodeError as exc:
        # A truncated or corrupt file is reported explicitly, not as a bare traceback.
        raise HTTPException(
            status_code=500, detail="Plik metrics.json zawiera niepoprawny JSON."
        ) from exc
"metrics.json" + + +class ModelNotAvailableError(RuntimeError): + """Zgłaszany, gdy plik modelu nie istnieje lub jest jedynie wskaźnikiem Git LFS.""" + + +def is_lfs_pointer(path: Path) -> bool: + """Wykrywa, czy plik to wskaźnik Git LFS, a nie faktyczny model.""" + try: + if path.stat().st_size > 5000: # ograniczenie pkl (setki MB) + return False + with open(path, "rb") as handle: + head = handle.read(200) + return b"git-lfs" in head + except OSError: + return False + + +def load_model(path: Path | str = MODEL_PATH) -> Any: + """Wczytuje wytrenowany model z pliku pickle. + + Nie używa cache'owania samodzielnie — w Streamlit owijamy tę funkcję + w `@st.cache_resource`, a w FastAPI model wczytujemy raz przy starcie + aplikacji (zob. `credit_scoring.serving.api`). + """ + path = Path(path) + if not path.exists(): + raise ModelNotAvailableError(f"Nie znaleziono pliku modelu: {path}") + if is_lfs_pointer(path): + raise ModelNotAvailableError( + f"Plik modelu '{path}' to tylko wskaźnik Git LFS. " + "Uruchom `git lfs pull` lub `kedro run --pipeline=modeling`." 
def build_feature_row(inputs: dict) -> pd.DataFrame:
    """Build a single-row feature frame in the column order the model expects.

    Every column listed in ``MODEL_FEATURES`` starts at 0. Numeric and ordinal
    columns are then filled from ``inputs``, and the one-hot columns for the
    selected loan types, payment behaviour and occupation are set to 1.
    A one-hot value with no matching column in ``MODEL_FEATURES`` leaves the
    row unchanged.

    Raises:
        KeyError: If ``inputs`` lacks a required key, or ``credit_mix`` /
            ``payment_min`` holds a value missing from its encoding map.
    """
    # (model column, input key) pairs, in the order the inputs are read.
    column_sources = (
        ("Age", "age"),
        ("Annual_Income", "annual_income"),
        ("Monthly_Inhand_Salary", "monthly_salary"),
        ("Num_Bank_Accounts", "num_bank_accounts"),
        ("Num_Credit_Card", "num_credit_card"),
        ("Interest_Rate", "interest_rate"),
        ("Num_of_Loan", "num_of_loan"),
        ("Delay_from_due_date", "delay_from_due_date"),
        ("Num_of_Delayed_Payment", "num_delayed_payment"),
        ("Changed_Credit_Limit", "changed_credit_limit"),
        ("Num_Credit_Inquiries", "num_credit_inquiries"),
        ("Credit_Mix", "credit_mix"),
        ("Outstanding_Debt", "outstanding_debt"),
        ("Credit_Utilization_Ratio", "credit_utilization"),
        ("Credit_History_Age", "credit_history_age_months"),
        ("Payment_of_Min_Amount", "payment_min"),
        ("Total_EMI_per_month", "total_emi"),
        ("Amount_invested_monthly", "amount_invested"),
        ("Monthly_Balance", "monthly_balance"),
    )
    # Columns whose raw form value is translated to an ordinal code.
    encoders = {
        "Credit_Mix": CREDIT_MIX_MAP,
        "Payment_of_Min_Amount": PAYMENT_MIN_MAP,
    }

    vector = dict.fromkeys(MODEL_FEATURES, 0)
    for column, input_key in column_sources:
        raw_value = inputs[input_key]
        encoder = encoders.get(column)
        vector[column] = raw_value if encoder is None else encoder[raw_value]

    # One-hot flags: loan types (multi-select), payment behaviour, occupation.
    flagged_columns = [f"LoanType_{loan_type}" for loan_type in inputs["loan_types"]]
    flagged_columns.append(f"PayBeh_{inputs['payment_behaviour']}")
    flagged_columns.append(f"Occupation_{inputs['occupation']}")
    for column in flagged_columns:
        # Columns the model does not know would be dropped by the final
        # selection anyway, so they are simply not set.
        if column in vector:
            vector[column] = 1

    frame = pd.DataFrame([vector])
    return frame[MODEL_FEATURES]
prawdopodobieństw.""" + preds = model.predict(features) + proba = model.predict_proba(features) if hasattr(model, "predict_proba") else None + return preds, proba diff --git a/src/credit_scoring/serving/prediction_logger.py b/src/credit_scoring/serving/prediction_logger.py new file mode 100644 index 0000000..1309e40 --- /dev/null +++ b/src/credit_scoring/serving/prediction_logger.py @@ -0,0 +1,264 @@ +"""Logowanie predykcji oraz okresowa weryfikacja (co N-tą predykcję). + +Moduł realizuje dwa zadania, niezależnie od tego, czy predykcja pochodzi +z interfejsu Streamlit (`app.py`) czy z API FastAPI +(`credit_scoring.serving.api`): + +1. Logowanie predykcji — każda predykcja (wejście, wynik, prawdopodobieństwa, + znacznik czasu, źródło) jest dopisywana jako jedna linia JSON do pliku + `data/09_predictions/predictions_log.jsonl`. + +2. **Weryfikacja co N-tą predykcję** (domyślnie N=10) — po zalogowaniu + N-tej, 2N-tej, 3N-tej... predykcji moduł automatycznie: + - sprawdza integralność ostatnich N wpisów (poprawność wektora cech, + sumowanie się prawdopodobieństw do 1.0, brak NaN/Inf), + - wylicza statystyki (średnia ufność modelu, rozkład klas w danej partii), + - zapisuje wynik takiej weryfikacji do + `data/09_predictions/verification_log.jsonl`, + - loguje wynik (OK / WARNING) standardowym modułem `logging`. +""" +from __future__ import annotations + +import json +import logging +import math +import threading +from collections import Counter +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from credit_scoring.serving.schema import MODEL_FEATURES, TARGET_LABELS + +logger = logging.getLogger("credit_scoring.predictions") +if not logger.handlers: + # Domyślny handler konsolowy, gdyby aplikacja (Streamlit/FastAPI/uvicorn) + # nie skonfigurowała własnego logowania. 
class PredictionLogger:
    """Log predictions to JSONL and verify every ``verify_every``-th batch.

    Predictions are appended to ``predictions_log.jsonl`` and verification
    results to ``verification_log.jsonl``, both inside ``log_dir``. The
    prediction counter is restored from the number of non-blank lines already
    in the predictions file. A ``threading.Lock`` guards the counter and the
    file appends within one process.

    NOTE(review): nothing here coordinates separate processes writing to the
    same directory (e.g. Streamlit and uvicorn side by side) — confirm
    whether that setup needs to be supported.
    """

    def __init__(
        self,
        log_dir: Path | str,
        verify_every: int = 10,
        low_confidence_threshold: float = 0.4,
    ) -> None:
        """Create the log directory and restore the prediction counter.

        Args:
            log_dir: Directory for both JSONL files; created if missing.
            verify_every: Size of a verification batch; must be at least 1.
            low_confidence_threshold: Confidence below which a prediction
                counts as low-confidence.

        Raises:
            ValueError: If ``verify_every`` is smaller than 1.
        """
        # A non-positive period would later fail with ZeroDivisionError
        # (or never trigger), so reject it up front.
        if verify_every < 1:
            raise ValueError("verify_every must be a positive integer")

        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)

        self.predictions_path = self.log_dir / "predictions_log.jsonl"
        self.verification_path = self.log_dir / "verification_log.jsonl"

        self.verify_every = verify_every
        self.low_confidence_threshold = low_confidence_threshold

        self._lock = threading.Lock()
        # Restore the counter from the existing log so it survives a restart.
        self._counter = self._count_existing_lines(self.predictions_path)

    @staticmethod
    def _count_existing_lines(path: Path) -> int:
        """Return the number of non-blank lines in ``path`` (0 if it is missing)."""
        if not path.exists():
            return 0
        with open(path, "r", encoding="utf-8") as handle:
            return sum(1 for line in handle if line.strip())

    @staticmethod
    def _tail_lines(path: Path, n: int) -> list[str]:
        """Return up to ``n`` last non-blank lines of ``path``.

        The file is streamed through a bounded deque, so at most ``n`` lines
        are held in memory regardless of the file size.
        """
        from collections import deque  # local import keeps the class self-contained

        if n <= 0 or not path.exists():
            return []
        with open(path, "r", encoding="utf-8") as handle:
            return list(deque((line for line in handle if line.strip()), maxlen=n))

    @property
    def total_predictions(self) -> int:
        """Number of predictions logged so far."""
        with self._lock:
            return self._counter

    def log_prediction(
        self,
        features: dict[str, Any],
        predicted_class: int,
        probabilities: dict[str, float] | None = None,
        source: str = "unknown",
        model_version: str | None = None,
        request_id: str | None = None,
    ) -> dict[str, Any]:
        """Append one prediction and verify the batch on every ``verify_every``-th one.

        Args:
            features: Feature name to value mapping that was fed to the model.
            predicted_class: Predicted class index.
            probabilities: Optional class label to probability mapping; its
                maximum is stored as ``confidence``.
            source: Free-form tag describing where the prediction came from.
            model_version: Optional model version identifier.
            request_id: Optional identifier of the request.

        Returns:
            The logged record, including ``prediction_index`` and — only when
            a verification ran for this prediction — a ``verification`` entry.
            The ``verification`` entry is not part of the line written to the
            predictions file.
        """
        confidence = max(probabilities.values()) if probabilities else None

        record = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "request_id": request_id,
            "source": source,
            "model_version": model_version,
            "features": features,
            "predicted_class": int(predicted_class),
            "predicted_label": TARGET_LABELS.get(int(predicted_class), str(predicted_class)),
            "probabilities": probabilities,
            "confidence": confidence,
        }

        with self._lock:
            index = self._counter + 1
            record["prediction_index"] = index
            self._append_jsonl(self.predictions_path, record)
            # Advance the counter only after the line is written, so a failed
            # write cannot leave the counter ahead of the file.
            self._counter = index
            logger.info(
                "Predykcja #%d (source=%s): klasa=%s, ufność=%s",
                index,
                source,
                record["predicted_label"],
                f"{confidence:.3f}" if confidence is not None else "n/a",
            )

            verification_result = None
            if index % self.verify_every == 0:
                batch = self._read_last_n_records(self.predictions_path, self.verify_every)
                verification_result = self._verify_batch(batch)
                self._append_jsonl(self.verification_path, verification_result)
                self._log_verification(verification_result)

        if verification_result is not None:
            record["verification"] = verification_result
        return record

    @staticmethod
    def _append_jsonl(path: Path, record: dict[str, Any]) -> None:
        """Append ``record`` to ``path`` as a single JSON line."""
        with open(path, "a", encoding="utf-8") as handle:
            handle.write(json.dumps(record, ensure_ascii=False) + "\n")

    @staticmethod
    def _read_last_n_records(path: Path, n: int) -> list[dict[str, Any]]:
        """Parse and return up to ``n`` last records stored in ``path``.

        Returns an empty list when the file is missing or ``n`` is not positive.
        """
        return [json.loads(line) for line in PredictionLogger._tail_lines(path, n)]

    def _verify_batch(self, batch: list[dict[str, Any]]) -> dict[str, Any]:
        """Check integrity and quality of a batch of logged predictions.

        Returns a dict with the verification timestamp, the index of the last
        record, the number of records checked, the status (``"WARNING"`` if
        any issue was found, else ``"OK"``), the average confidence, the
        number of low-confidence predictions, the class distribution and the
        list of issue messages.
        """
        issues: list[str] = []

        # 1) Feature names must match the model schema; records without
        #    features are skipped.
        for rec in batch:
            feature_keys = set(rec.get("features") or {})
            if feature_keys and feature_keys != EXPECTED_FEATURE_SET:
                missing = EXPECTED_FEATURE_SET - feature_keys
                extra = feature_keys - EXPECTED_FEATURE_SET
                issues.append(
                    f"prediction_index={rec.get('prediction_index')}: "
                    f"niezgodny schemat cech (brakuje={sorted(missing)[:5]}, "
                    f"nadmiarowe={sorted(extra)[:5]})"
                )

        # 2) NaN / Inf among the input features. Only floats can be
        #    non-finite; testing ints with math.isnan could overflow on very
        #    large values.
        for rec in batch:
            for name, value in (rec.get("features") or {}).items():
                if isinstance(value, float) and not math.isfinite(value):
                    issues.append(
                        f"prediction_index={rec.get('prediction_index')}: "
                        f"cecha '{name}' ma wartość NaN/Inf"
                    )

        # 3) Probabilities must sum to roughly 1.0.
        for rec in batch:
            proba = rec.get("probabilities")
            if proba:
                total = sum(proba.values())
                if not math.isclose(total, 1.0, abs_tol=1e-2):
                    issues.append(
                        f"prediction_index={rec.get('prediction_index')}: "
                        f"prawdopodobieństwa sumują się do {total:.4f}, a nie 1.0"
                    )

        # 4) Average confidence of the batch compared with the threshold.
        confidences = [rec["confidence"] for rec in batch if rec.get("confidence") is not None]
        avg_confidence = sum(confidences) / len(confidences) if confidences else None
        low_confidence_count = sum(
            1 for c in confidences if c < self.low_confidence_threshold
        )
        if avg_confidence is not None and avg_confidence < self.low_confidence_threshold:
            issues.append(
                f"średnia ufność w partii ({avg_confidence:.3f}) jest niższa "
                f"od progu {self.low_confidence_threshold}"
            )

        # 5) Class distribution; a full batch with a single class is flagged.
        class_counter = Counter(rec.get("predicted_label") for rec in batch)
        if len(batch) >= self.verify_every and len(class_counter) == 1:
            issues.append(
                "wszystkie predykcje w partii należą do tej samej klasy — "
                "sprawdź, czy dane wejściowe się nie powtarzają"
            )

        last_index = batch[-1]["prediction_index"] if batch else None
        status = "WARNING" if issues else "OK"

        return {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "batch_end_index": last_index,
            "n_checked": len(batch),
            "status": status,
            "avg_confidence": avg_confidence,
            "low_confidence_count": low_confidence_count,
            "class_distribution": dict(class_counter),
            "issues": issues,
        }

    @staticmethod
    def _log_verification(result: dict[str, Any]) -> None:
        """Emit the verification result at WARNING or INFO level, depending on status."""
        level = logging.WARNING if result["status"] == "WARNING" else logging.INFO
        logger.log(
            level,
            "Weryfikacja partii kończącej się na predykcji #%s: status=%s, "
            "śr. ufność=%s, rozkład klas=%s%s",
            result["batch_end_index"],
            result["status"],
            f"{result['avg_confidence']:.3f}" if result["avg_confidence"] is not None else "n/a",
            result["class_distribution"],
            f", problemy: {result['issues']}" if result["issues"] else "",
        )

    def stats(self) -> dict[str, Any]:
        """Return a summary of the logging state.

        ``predictions_left_to_next_verification`` is always in the range
        ``1..verify_every``: right after a verification it equals
        ``verify_every``.
        """
        with self._lock:
            total = self._counter
            # Read under the same lock that guards appends, so a half-written
            # last line is never parsed.
            last_lines = self._tail_lines(self.verification_path, 1)
        last_verification = json.loads(last_lines[-1]) if last_lines else None
        return {
            "total_predictions": total,
            "verify_every": self.verify_every,
            "predictions_left_to_next_verification": (
                self.verify_every - total % self.verify_every
            ),
            "last_verification": last_verification,
            "predictions_log_path": str(self.predictions_path),
            "verification_log_path": str(self.verification_path),
        }
+ "Low_spent_Large_value_payments", + "Low_spent_Medium_value_payments", + "Low_spent_Small_value_payments", +] + +OCCUPATIONS: list[str] = [ + "Architect", + "Developer", + "Doctor", + "Engineer", + "Entrepreneur", + "Journalist", + "Lawyer", + "Manager", + "Mechanic", + "Media_Manager", + "Musician", + "Scientist", + "Teacher", + "Writer", +] + +CREDIT_MIX_MAP: dict[str, int] = {"Bad": 0, "Standard": 1, "Good": 2} +PAYMENT_MIN_MAP: dict[str, int] = {"No": 0, "Yes": 1} + +# Kolejność cech wymagana przez model — identyczna jak przy treningu. +MODEL_FEATURES: list[str] = [ + "Age", + "Annual_Income", + "Monthly_Inhand_Salary", + "Num_Bank_Accounts", + "Num_Credit_Card", + "Interest_Rate", + "Num_of_Loan", + "Delay_from_due_date", + "Num_of_Delayed_Payment", + "Changed_Credit_Limit", + "Num_Credit_Inquiries", + "Credit_Mix", + "Outstanding_Debt", + "Credit_Utilization_Ratio", + "Credit_History_Age", + "Payment_of_Min_Amount", + "Total_EMI_per_month", + "Amount_invested_monthly", + "Monthly_Balance", + *[f"LoanType_{lt}" for lt in LOAN_TYPES], + *[f"PayBeh_{pb}" for pb in PAYMENT_BEHAVIOURS], + *[f"Occupation_{oc}" for oc in OCCUPATIONS], +]