Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 53 additions & 24 deletions AI/libs/core/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
import sys
from typing import List, Dict, Optional
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta, timezone
import pandas as pd

Expand All @@ -22,18 +22,21 @@
MARKET_DB_NAME = "db" # 시세/원천 데이터 DB
REPORT_DB_NAME = "report_DB" # 리포트 저장 DB

# === (신규 전용) 필수 컬럼: inference 로그 → XAI 변환에 필요한 것만 강제 ===
REQUIRED_LOG_COLS = {
"ticker", "date", "action", "price",
"feature1", "feature2", "feature3",
"prob1", "prob2", "prob3"
# XAI evidence 구성에 꼭 필요한 신규 컬럼
"feature_name1", "feature_name2", "feature_name3",
"feature_score1", "feature_score2", "feature_score3",
# (원하면 로깅/모니터링용 확률도 계속 받되 필수는 아님)
}

def run_weekly_finder() -> List[str]:
"""
주간 종목 발굴(Finder)을 실행하고 결과(종목 리스트)를 반환합니다.
"""
print("--- [PIPELINE-STEP 1] Finder 모듈 실행 시작 ---")
# top_tickers = run_finder()
# top_tickers = run_finder() # TODO: 종목 선정 이슈 해결 후 사용
top_tickers = ["AAPL", "MSFT", "GOOGL"] # 임시 데이터
print("--- [PIPELINE-STEP 1] Finder 모듈 실행 완료 ---")
return top_tickers
Expand All @@ -51,8 +54,8 @@ def run_signal_transformer(tickers: List[str], db_name: str) -> pd.DataFrame:
print("[WARN] 빈 종목 리스트가 입력되어 Transformer를 건너뜁니다.")
return pd.DataFrame()

#end_date = _utcnow() # 한국 시간 기준 당일 종가까지 사용, 서버 사용시 주석 해제
end_date = datetime.strptime("2024-10-30", "%Y-%m-%d") #임시 고정 날짜
# end_date = _utcnow() # 서버 사용 시
end_date = datetime.strptime("2024-10-30", "%Y-%m-%d") # 임시 고정 날짜
start_date = end_date - timedelta(days=600)

all_ohlcv_df: List[pd.DataFrame] = []
Expand Down Expand Up @@ -92,19 +95,17 @@ def run_signal_transformer(tickers: List[str], db_name: str) -> pd.DataFrame:
print("[WARN] Transformer 결과 로그가 비어 있습니다.")
return pd.DataFrame()

# 필수 컬럼 검증
# === 신규 포맷 강제 체크 ===
missing_cols = REQUIRED_LOG_COLS - set(logs_df.columns)
if missing_cols:
print(f"[ERROR] 결정 로그에 필수 컬럼 누락: {sorted(missing_cols)}")
print(f"[ERROR] 결정 로그에 필수 컬럼 누락(신규 포맷 전용): {sorted(missing_cols)}")
return pd.DataFrame()

print("--- [PIPELINE-STEP 2] Transformer 모듈 실행 완료 ---")
return logs_df

# --- 안전 변환 유틸 ---
def _to_iso_date(v) -> str:
import pandas as pd
from datetime import datetime
try:
if isinstance(v, (pd.Timestamp, datetime)):
return v.strftime("%Y-%m-%d")
Expand All @@ -114,17 +115,30 @@ def _to_iso_date(v) -> str:

def _to_float(v, fallback=0.0) -> float:
try:
return float(v)
f = float(v)
if pd.isna(f):
return float(fallback)
return f
except Exception:
return float(fallback)

# --- XAI 리포트: 5-튜플(rows)로 반환 ---
from typing import List, Tuple

def run_xai_report(decision_log: pd.DataFrame) -> List[Tuple[str, str, float, str, str]]:
"""
save_reports_to_db()가 기대하는 형식:
rows = List[ (ticker, signal, price, date_str, report_text) ]
반환: List[(ticker, signal, price, date, report_text)]
XAI 포맷:
{
"ticker": "...",
"date": "YYYY-MM-DD",
"signal": "BUY|HOLD|SELL",
"price": float,
"evidence": [
{"feature_name": str, "contribution": float}, # 0~1 점수 권장
...
]
}
※ 신규 포맷 전용:
- feature_name1~3, feature_score1~3 필수
"""
print("--- [PIPELINE-STEP 3] XAI 리포트 생성 시작 ---")
api_key = os.environ.get("GROQ_API_KEY")
Expand All @@ -136,25 +150,42 @@ def run_xai_report(decision_log: pd.DataFrame) -> List[Tuple[str, str, float, st
print("[WARN] 비어있는 결정 로그가 입력되어 XAI 리포트를 생성하지 않습니다.")
return []

# 신규 포맷 강제(안전망)
for c in ["feature_name1","feature_name2","feature_name3",
"feature_score1","feature_score2","feature_score3"]:
if c not in decision_log.columns:
print(f"[ERROR] XAI: 신규 포맷 필수 컬럼 누락: {c}")
return []

rows: List[Tuple[str, str, float, str, str]] = []

for _, row in decision_log.iterrows():
ticker = str(row.get("ticker", "UNKNOWN"))
date_s = _to_iso_date(row.get("date", ""))
signal = str(row.get("action", ""))
signal = str(row.get("action", "")) # action -> signal
price = _to_float(row.get("price", 0.0))

# evidence 등은 DB에 안 넣는 설계로 보이므로 내부 호출에만 사용
# === 신규 포맷 전용 evidence ===
evidence: List[Dict[str, float]] = []
for i in (1, 2, 3):
name = row.get(f"feature_name{i}")
score = row.get(f"feature_score{i}")
# 이름/점수 모두 있어야 추가
if name is None or str(name).strip() == "":
continue
if score is None or pd.isna(score):
continue
evidence.append({
"feature_name": str(name),
"contribution": _to_float(score, 0.0) # 0~1 정규화 점수
})

decision_payload = {
"ticker": ticker,
"date": date_s,
"signal": signal,
"price": price,
"evidence": [
{"feature_name": str(row.get("feature1", "")), "contribution": _to_float(row.get("prob1", 0.0))},
{"feature_name": str(row.get("feature2", "")), "contribution": _to_float(row.get("prob2", 0.0))},
{"feature_name": str(row.get("feature3", "")), "contribution": _to_float(row.get("prob3", 0.0))},
],
"evidence": evidence,
}

try:
Expand All @@ -171,8 +202,6 @@ def run_xai_report(decision_log: pd.DataFrame) -> List[Tuple[str, str, float, st
return rows




def run_pipeline() -> Optional[List[str]]:
"""
전체 파이프라인(Finder -> Transformer -> XAI)을 실행합니다.
Expand Down
29 changes: 14 additions & 15 deletions AI/tests/quick_db_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,22 @@
# --- DB 연결 테스트 ------------------------------------------------------------
conn = None
try:
# db 설정이 dict면 키워드 인자로, 문자열(DSN)이면 그대로 사용
if isinstance(db_cfg, dict):
conn = psycopg2.connect(**db_cfg) # 예: {"host": "...", ...}
else:
conn = psycopg2.connect(dsn=str(db_cfg))

with conn:
with conn.cursor() as cur:
cur.execute("SELECT version();")
print("✅ 연결 성공:", cur.fetchone()[0])

cur.execute("SELECT current_database(), current_user;")
db, user = cur.fetchone()
print(f"ℹ️ DB/USER: {db} / {user}")
if isinstance(db_cfg, dict):
with psycopg2.connect(**db_cfg) as conn:
with conn.cursor() as cur:
cur.execute("SELECT version();")
print("✅ 연결 성공:", cur.fetchone()[0])
cur.execute("SELECT current_database(), current_user;")
db, user = cur.fetchone()
print(f"ℹ️ DB/USER: {db} / {user}")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

특수 유니코드 문자 사용

ℹ️ (INFORMATION SOURCE) 문자는 일부 터미널 환경에서 렌더링 문제를 일으킬 수 있습니다. 일반적인 ASCII 문자(예: [INFO])를 사용하는 것을 권장합니다.

원하신다면 다음과 같이 수정할 수 있습니다:

-                print(f"ℹ️ DB/USER: {db} / {user}")
+                print(f"[INFO] DB/USER: {db} / {user}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
print(f"ℹ️ DB/USER: {db} / {user}")
print(f"[INFO] DB/USER: {db} / {user}")
🧰 Tools
🪛 Ruff (0.14.3)

37-37: String contains ambiguous (INFORMATION SOURCE). Did you mean i (LATIN SMALL LETTER I)?

(RUF001)

🤖 Prompt for AI Agents
In AI/tests/quick_db_check.py around line 37 replace the special Unicode
information emoji in the print statement with a plain ASCII tag to avoid
terminal rendering issues; change the output to use a standard marker such as
"[INFO] DB/USER: {db} / {user}" (or better, switch to Python's logging.info for
structured logging) so terminals that don't support the ℹ️ glyph won't break.

else:
with psycopg2.connect(dsn=str(db_cfg)) as conn:
with conn.cursor() as cur:
...
Comment on lines +30 to +41
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Context manager의 변수 스코프 문제 및 DSN 경로 미완성

다음과 같은 심각한 문제들이 있습니다:

  1. 변수 스코프 문제: Line 31과 39의 with psycopg2.connect(...) as conn: 구문에서 conn은 with 블록의 지역 변수입니다. Line 28에서 선언한 외부 스코프의 conn = None을 덮어쓰지 않기 때문에, finally 블록(line 45-47)에서 conn은 항상 None입니다. 결과적으로 finally 블록의 conn.close()는 절대 실행되지 않습니다.

  2. DSN 경로 미완성: Lines 39-41의 DSN string 설정 경로는 ...만 실행하여 실제로 아무런 테스트도 수행하지 않습니다.

다음 diff를 적용하여 문제를 수정하세요:

-conn = None
 try:
     if isinstance(db_cfg, dict):
-        with psycopg2.connect(**db_cfg) as conn:
-            with conn.cursor() as cur:
-                cur.execute("SELECT version();")
-                print("✅ 연결 성공:", cur.fetchone()[0])
-                cur.execute("SELECT current_database(), current_user;")
-                db, user = cur.fetchone()
-                print(f"ℹ️ DB/USER: {db} / {user}")
+        conn = psycopg2.connect(**db_cfg)
+        with conn.cursor() as cur:
+            cur.execute("SELECT version();")
+            print("✅ 연결 성공:", cur.fetchone()[0])
+            cur.execute("SELECT current_database(), current_user;")
+            db, user = cur.fetchone()
+            print(f"ℹ️ DB/USER: {db} / {user}")
     else:
-        with psycopg2.connect(dsn=str(db_cfg)) as conn:
-            with conn.cursor() as cur:
-                ...
+        conn = psycopg2.connect(dsn=str(db_cfg))
+        with conn.cursor() as cur:
+            cur.execute("SELECT version();")
+            print("✅ 연결 성공:", cur.fetchone()[0])
+            cur.execute("SELECT current_database(), current_user;")
+            db, user = cur.fetchone()
+            print(f"ℹ️ DB/USER: {db} / {user}")
 except Exception as e:
     print("❌ 연결 실패:", repr(e))
 finally:
     if conn:
         conn.close()
         print("DB 연결 종료")

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.14.3)

37-37: String contains ambiguous (INFORMATION SOURCE). Did you mean i (LATIN SMALL LETTER I)?

(RUF001)

🤖 Prompt for AI Agents
In AI/tests/quick_db_check.py around lines 30 to 41, the with-statement usage
shadows the outer conn variable so the finally block never sees a live
connection and the DSN branch is left as a placeholder; fix by not shadowing
conn (assign the result of psycopg2.connect(...) to the outer conn variable
inside a try block instead of using "with ... as conn"), run the same test
queries in the DSN branch as in the dict branch, and keep a finally block that
checks "if conn is not None: conn.close()" to ensure the connection is closed on
all paths.

except Exception as e:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

광범위한 예외 처리

모든 Exception을 catch하는 것은 예상치 못한 에러를 숨길 수 있습니다. DB 연결 테스트 스크립트에서는 허용될 수 있지만, 더 구체적인 예외 타입(예: psycopg2.Error)을 catch하는 것이 좋습니다.

더 구체적인 예외 처리를 원하신다면:

-except Exception as e:
+except psycopg2.Error as e:
     print("❌ 연결 실패:", repr(e))

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.14.3)

42-42: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In AI/tests/quick_db_check.py around line 42, the code catches a broad Exception
which can hide unexpected errors; replace the broad except with a
database-specific exception (e.g., except psycopg2.Error as e or the DB client’s
specific error class), ensure that the relevant DB exception class is imported
at top, and optionally keep a narrow fallback that re-raises unexpected
exceptions after logging so only DB-related errors are handled here.

print("❌ 연결 실패:", repr(e))
finally:
if conn is not None:
if conn:
conn.close()
print("DB 연결 종료")

Loading