diff --git a/.gitignore b/.gitignore index 73d2efc2..16e7ffe9 100644 --- a/.gitignore +++ b/.gitignore @@ -4,7 +4,6 @@ .vscode/ *.iml .DS_Store - # 환경 변수 & 민감 정보 .env *.secret @@ -26,3 +25,5 @@ __pycache__/ .ipynb_checkpoints/ *.pyc /venv/ +/env +/.vs diff --git a/AI/configs/config.json b/AI/configs/config.json index 36e05aff..c66eaa90 100644 --- a/AI/configs/config.json +++ b/AI/configs/config.json @@ -1,4 +1,4 @@ -{ +{ "db": { "host": "ep-misty-lab-adgec0kl-pooler.c-2.us-east-1.aws.neon.tech", "user": "neondb_owner", diff --git a/AI/finder/main.py b/AI/finder/main.py index 7d540122..ac91a307 100644 --- a/AI/finder/main.py +++ b/AI/finder/main.py @@ -1,47 +1,75 @@ +# finder/run_finder.py +import csv import sys import os - -from libs.utils import news_processing -from finder import ticker_selector +import time +import requests import pandas as pd -from langchain_community.llms import Ollama - +# ---- 경로 세팅 ---- project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(project_root) +from libs.utils import news_processing +from finder import ticker_selector +from libs.llm_clients.ollama_client import get_ollama_client # ← 분리된 유틸 임포트 def run_finder(): - ''' + """ 전체 프로세스를 조율하여 최종 Top 3 투자 종목 반환 - ''' + """ # --- 1단계: 의존성 객체 및 데이터 준비 --- - llm = Ollama(model="llama3.2") - try: - stability_df = pd.read_csv('data/stability_score_2025.csv') + llm = get_ollama_client() # ✅ 헬스체크 및 모델 확인 포함 + except Exception as e: + print(str(e)) + return [] + + csv_path = os.path.join(project_root, "data", "stability_score_2025.csv") + + try: + stability_df = pd.read_csv(csv_path) except FileNotFoundError: - print("오류: 'data/stability_score_2025.csv' 파일을 찾을 수 없습니다.") + print(f"오류: {csv_path} 파일을 찾을 수 없습니다.") return [] # --- 2단계: 주간 뉴스 데이터 수집 및 요약 --- - weekly_news_df = news_processing.get_weekly_news_summary(days=5, llm_client=llm) + try: + weekly_news_df = news_processing.get_weekly_news_summary(days=5, llm_client=llm) + except requests.exceptions.ConnectionError as e: + print(f"[LLM 연결 오류] 뉴스 요약 단계에서 LLM 서버 연결 실패: {e}") + return [] + except requests.exceptions.Timeout as e: + print(f"[LLM 타임아웃] 뉴스 요약 단계에서 응답 지연: {e}") + return [] + except Exception as e: + print(f"[예기치 못한 오류] 뉴스 요약 단계: {e}") + return [] - if weekly_news_df.empty: + if weekly_news_df is None or getattr(weekly_news_df, "empty", False): print("분석할 뉴스 데이터가 없어 프로세스를 종료합니다.") return [] # --- 3단계: 뉴스 데이터와 재무 데이터를 기반으로 Top 3 종목 선정 --- - top_3_tickers = ticker_selector.select_top_stocks( - news_summary_df=weekly_news_df, - stability_df=stability_df, - llm_client=llm - ) + try: + top_3_tickers = ticker_selector.select_top_stocks( + news_summary_df=weekly_news_df, + stability_df=stability_df, + llm_client=llm + ) + except requests.exceptions.ConnectionError as e: + print(f"[LLM 연결 오류] 종목 선정 단계에서 LLM 서버 연결 실패: {e}") + return [] + except requests.exceptions.Timeout as e: + print(f"[LLM 타임아웃] 종목 선정 단계에서 응답 지연: {e}") + return [] + except Exception as e: + print(f"[예기치 못한 오류] 종목 선정 단계: {e}") + return [] print("\n🎉 [Finder 모듈 최종 결과] 투자 추천 Top 3 종목 🎉") print(top_3_tickers) - return top_3_tickers if __name__ == '__main__': - run_finder() \ No newline at end of file + run_finder() diff --git a/AI/libs/core/pipeline.py b/AI/libs/core/pipeline.py index 053da095..caca8fc9 100644 --- a/AI/libs/core/pipeline.py +++ b/AI/libs/core/pipeline.py @@ -1,4 +1,4 @@ -import os +import os import sys from typing import List, Dict import json @@ -12,10 +12,10 @@ # --- 모듈 import --- from finder.main import run_finder -from AI.transformer.main import run_transformer -from AI.libs.utils.fetch_ohlcv import fetch_ohlcv +from transformer.main import run_transformer +from libs.utils.fetch_ohlcv import fetch_ohlcv from xai.run_xai import run_xai -from AI.libs.utils.get_db_conn import get_db_conn +from libs.utils.get_db_conn import get_db_conn # --------------------------------- def run_weekly_finder() -> List[str]: @@ -23,8 +23,8 @@ def run_weekly_finder() -> List[str]: 주간 종목 발굴(Finder)을 실행하고 결과(종목 리스트)를 반환합니다. """ print("--- [PIPELINE-STEP 1] Finder 모듈 실행 시작 ---") - top_tickers = run_finder() - # top_tickers = ['AAPL', 'MSFT', 'GOOGL'] # 임시 데이터 + #top_tickers = run_finder() + top_tickers = ['AAPL', 'MSFT', 'GOOGL'] # 임시 데이터 print(f"--- [PIPELINE-STEP 1] Finder 모듈 실행 완료 ---") return top_tickers @@ -145,12 +145,15 @@ def run_pipeline(): """ 전체 파이프라인(Finder -> Transformer -> XAI)을 실행합니다. """ + #--- 설정 파일 로드 --- config : Dict = {} try: with open(os.path.join(project_root, 'configs', 'config.json'), 'r') as f: config = json.load(f) except FileNotFoundError: print("[WARN] configs/config.json 파일을 찾을 수 없어 DB 연결이 필요 없는 기능만 작동합니다.") + + #--- 파이프라인 단계별 실행 --- top_tickers = run_weekly_finder() if not top_tickers: print("Finder에서 종목을 찾지 못해 파이프라인을 중단합니다.") diff --git a/AI/libs/llm_clients/__init__.py b/AI/libs/llm_clients/__init__.py new file mode 100644 index 00000000..9f9fce74 --- /dev/null +++ b/AI/libs/llm_clients/__init__.py @@ -0,0 +1,3 @@ +#AI/libs/llm_clients/ollama_client.py +from libs.llm_clients.ollama_client import get_ollama_client +__all__ = ["get_ollama_client"] \ No newline at end of file diff --git a/AI/libs/llm_clients/ollama_client.py b/AI/libs/llm_clients/ollama_client.py new file mode 100644 index 00000000..8b578e45 --- /dev/null +++ b/AI/libs/llm_clients/ollama_client.py @@ -0,0 +1,68 @@ +# libs/llm_clients/ollama_client.py +import os +import requests +from typing import Optional +from langchain_community.llms import Ollama + +# ---- 기본 설정 (환경변수로 오버라이드 가능) ---- +OLLAMA_BASE_URL = os.environ.get("OLLAMA_BASE_URL", "http://127.0.0.1:11434") +OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "llama3.2") + +def _ollama_alive(base_url: str, timeout: float = 3.0) -> bool: + """ + Ollama 서버 헬스체크: /api/tags 로 간단 확인 + """ + try: + r = requests.get(f"{base_url}/api/tags", timeout=timeout) + return r.ok + except requests.exceptions.RequestException: + return False + +def _model_available(base_url: str, model: str) -> bool: + """ + 지정 모델이 로컬 Ollama에 존재하는지 확인 + """ + try: + r = requests.get(f"{base_url}/api/tags", timeout=5) + r.raise_for_status() + tags = r.json().get("models", []) + names = {m.get("name") for m in tags if isinstance(m, dict)} + # ollama는 "llama3.2" 또는 "llama3.2:latest" 식으로 존재 가능 + return model in names or f"{model}:latest" in names + except Exception: + return False + +def get_ollama_client( + model: Optional[str] = None, + base_url: Optional[str] = None, + # langchain 0.2+에서 request_timeout 인자를 직접 받지 않는 경우가 있어 주석 처리 + # request_timeout: float = 60.0, +) -> Ollama: + """ + Ollama LangChain LLM 클라이언트 생성 + - 서버와 모델 존재 여부를 사전 점검 + """ + model = model or OLLAMA_MODEL + base_url = base_url or OLLAMA_BASE_URL + + if not _ollama_alive(base_url): + raise RuntimeError( + f"[연결 실패] Ollama 서버에 접속할 수 없습니다. llama3.2 설치 여부 확인해주세요.\n" + f"- base_url: {base_url}\n" + f"- 조치: (1) 'ollama serve' 실행 여부 확인 (2) 방화벽/프록시 (NO_PROXY=localhost,127.0.0.1) (3) 11434 포트 개방\n" + f"- 테스트: curl {base_url}/api/tags" + ) + + if not _model_available(base_url, model): + raise RuntimeError( + f"[모델 없음] '{model}' 모델이 Ollama에 없습니다.\n" + f"- 조치: ollama pull {model}\n" + f"- 보유 모델 확인: curl {base_url}/api/tags" + ) + + return Ollama( + model=model, + base_url=base_url, + # 필요 시 model_kwargs로 세부 파라미터 전달 가능 + # model_kwargs={"num_ctx": 4096}, + ) diff --git a/AI/libs/utils/__init__.py b/AI/libs/utils/__init__.py new file mode 100644 index 00000000..f65d84a5 --- /dev/null +++ b/AI/libs/utils/__init__.py @@ -0,0 +1,8 @@ +# AI/libs/utils/__init__.py +from .fetch_ohlcv import fetch_ohlcv +from .get_db_conn import get_db_conn + +__all__ = [ + "fetch_ohlcv", + "get_db_conn", +] diff --git a/AI/libs/utils/fetch_ohlcv.py b/AI/libs/utils/fetch_ohlcv.py index 5b4f1742..9570c771 100644 --- a/AI/libs/utils/fetch_ohlcv.py +++ b/AI/libs/utils/fetch_ohlcv.py @@ -1,8 +1,7 @@ -import psycopg2 -import pandas as pd +import pandas as pd # DB 접속 커넥션 생성 -from AI.libs.utils.get_db_conn import get_db_conn +from .get_db_conn import get_db_conn # OHLCV 데이터 불러오기 def fetch_ohlcv( @@ -23,12 +22,12 @@ def fetch_ohlcv( config (dict): DB 접속 정보 포함한 설정 Returns: - DataFrame: 컬럼 = [date, open, high, low, close, volume] + DataFrame: 컬럼 = [ticker, date, open, high, low, close, volume, adjusted_close] """ conn = get_db_conn(config) query = """ - SELECT date, open, high, low, close, volume + SELECT ticker, date, open, high, low, close, adjusted_close, volume FROM public.price_data WHERE ticker = %s AND date BETWEEN %s AND %s diff --git a/AI/requirements.txt b/AI/requirements.txt index 0757c868..bdeab51d 100644 --- a/AI/requirements.txt +++ b/AI/requirements.txt @@ -1,4 +1,4 @@ -pandas +pandas psycopg2-binary langchain-community tqdm @@ -11,3 +11,4 @@ yfinance groq requests beautifulsoup4 +pathlib \ No newline at end of file diff --git a/AI/tests/quick_db_check.py b/AI/tests/quick_db_check.py new file mode 100644 index 00000000..baa9dc97 --- /dev/null +++ b/AI/tests/quick_db_check.py @@ -0,0 +1,49 @@ +# quick_db_check.py +import os +import sys +import json +from typing import Dict, Union + +import psycopg2 + + +# --- 프로젝트 루트 경로 설정 --------------------------------------------------- +project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(project_root) + +# --- 설정 파일 로드 ------------------------------------------------------------ +cfg_path = os.path.join(project_root, "configs", "config.json") + +config: Dict = {} +if os.path.isfile(cfg_path): + with open(cfg_path, "r", encoding="utf-8") as f: + config = json.load(f) + print("[INFO] configs/config.json 로드 완료") +else: + print(f"[WARN] 설정 파일이 없습니다: {cfg_path}") + +db_cfg: Union[str, Dict] = (config or {}).get("db", {}) + +# --- DB 연결 테스트 ------------------------------------------------------------ +conn = None +try: + # db 설정이 dict면 키워드 인자로, 문자열(DSN)이면 그대로 사용 + if isinstance(db_cfg, dict): + conn = psycopg2.connect(**db_cfg) # 예: {"host": "...", ...} + else: + conn = psycopg2.connect(dsn=str(db_cfg)) + + with conn: + with conn.cursor() as cur: + cur.execute("SELECT version();") + print("✅ 연결 성공:", cur.fetchone()[0]) + + cur.execute("SELECT current_database(), current_user;") + db, user = cur.fetchone() + print(f"ℹ️ DB/USER: {db} / {user}") +except Exception as e: + print("❌ 연결 실패:", repr(e)) +finally: + if conn is not None: + conn.close() + diff --git a/AI/tests/test_transfomer.py b/AI/tests/test_transfomer.py new file mode 100644 index 00000000..8953394c --- /dev/null +++ b/AI/tests/test_transfomer.py @@ -0,0 +1,209 @@ +# AI/tests/test_transformer_live_fetch.py +# -*- coding: utf-8 -*- +""" +[목적] +- Transformer 모듈만 실제 OHLCV로 테스트 (저장 없음, 출력만) +- 데이터 수집은 프로젝트 표준 유틸: libs.utils.fetch_ohlcv.fetch_ohlcv 를 '그대로' 사용 + (즉, DB 우선 조회 + 실패/결측 시 야후 파이낸스 API 폴백 로직은 fetch_ohlcv 내부 정책을 따름) + +[실행] +> cd AI +> python -m tests.test_transformer_live_fetch +또는 +> python tests/test_transformer_live_fetch.py +""" + +import os +import sys +import json +from typing import Dict, List, Optional +from datetime import datetime, timedelta +import time +import random + +import pandas as pd +import numpy as np + +# --- 프로젝트 루트 경로 설정 --------------------------------------------------- +project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(project_root) +# --------------------------------------------------------------------------- + +# --- 모듈 임포트 -------------------------------------------------------------- +from transformer.main import run_transformer # Transformer 단독 테스트 대상 +from libs.utils.fetch_ohlcv import fetch_ohlcv # ★ 표준 OHLCV 수집 유틸(반드시 이걸 사용) +# --------------------------------------------------------------------------- + + +# ============================================================================= +# (옵션) 안전한 fetch 래퍼: 일시적 실패/429 등에 대비한 재시도 +# - fetch_ohlcv 내부에서도 재시도/폴백이 구현되어 있을 수 있으나, +# 테스트 안정성을 위해 여기서 추가로 얇은 재시도를 감쌉니다. +# ============================================================================= +def safe_fetch_ohlcv( + ticker: str, + start: str, + end: str, + config: Optional[Dict] = None, + max_retries: int = 5, + base_sleep: float = 0.8 +) -> pd.DataFrame: + """ + fetch_ohlcv 호출을 얇게 감싸는 재시도 래퍼. + - 429/일시 네트워크 오류 같은 경우를 대비하여 지수 백오프 + 지터 적용 + - fetch_ohlcv가 raise하면 여기서 재시도 후 최종 raise + """ + attempt = 0 + while True: + try: + df = fetch_ohlcv( + ticker=ticker, + start=start, + end=end, + config=(config or {}) + ) + return df + except Exception as e: + attempt += 1 + if attempt >= max_retries: + raise + # 지수 백오프 + 약간의 랜덤 지터 + sleep_s = base_sleep * (2 ** (attempt - 1)) + random.uniform(0, 0.6) + print(f"[WARN] {ticker} fetch_ohlcv 실패({attempt}/{max_retries}) -> {e} | {sleep_s:.2f}s 대기 후 재시도") + time.sleep(sleep_s) + + +# ============================================================================= +# Transformer 단독 라이브 테스트 +# ============================================================================= +def run_transform_only_live_with_fetch(): + """ + - configs/config.json 로드 → db 설정 전달 + - 티커별로 fetch_ohlcv 호출(★ 프로젝트 유틸 사용) → raw_data 결합 + - run_transformer 호출 → 출력만 수행(저장 없음) + """ + print("=== [TEST] Transformer 단독(실데이터) 테스트 시작 — using libs.utils.fetch_ohlcv ===") + + # ---------------------------------------------------------------------- + # (A) 설정/입력 + # ---------------------------------------------------------------------- + cfg_path = os.path.join(project_root, "configs", "config.json") + config: Dict = {} + if os.path.isfile(cfg_path): + try: + with open(cfg_path, "r", encoding="utf-8") as f: + config = json.load(f) + print("[INFO] configs/config.json 로드 완료") + except Exception as e: + print(f"[WARN] config 로드 실패(빈 설정으로 진행): {e}") + + db_config = (config or {}).get("db", {}) # fetch_ohlcv에 그대로 넘김 + + # 테스트 티커/기간 + tickers: List[str] = ["AAPL", "MSFT", "GOOGL"] # 필요 시 교체 + end_dt = datetime.now() + start_dt = end_dt - timedelta(days=600) + start_str = start_dt.strftime("%Y-%m-%d") + end_str = end_dt.strftime("%Y-%m-%d") + + seq_len = 60 + pred_h = 1 + transformer_cfg: Dict = { + "transformer": { + "features": ["open", "high", "low", "close", "volume","adjusted_close"], + "target": "close", + "scaler": "standard" + } + } + + # ---------------------------------------------------------------------- + # (B) fetch_ohlcv 로 실데이터 가져오기 (티커별 → concat) + # ---------------------------------------------------------------------- + raw_parts: List[pd.DataFrame] = [] + for tkr in tickers: + try: + print(f"[INFO] 수집 시작: {tkr} ({start_str} → {end_str})") + df = safe_fetch_ohlcv( + ticker=tkr, + start=start_str, + end=end_str, + config=db_config, # ★ fetch_ohlcv는 내부에서 DB/야후 폴백 처리 + max_retries=5, + base_sleep=0.8 + ) + if df is None or df.empty: + print(f"[WARN] {tkr} 데이터가 비어 있습니다.") + else: + # 스키마 정합성(Transformer가 기대하는 컬럼 존재 검사) + required = ["ticker", "date", "open", "high", "low", "close", "volume", "adjusted_close"] + missing = [c for c in required if c not in df.columns] + if missing: + raise ValueError(f"{tkr} 수집 데이터에 필수 컬럼 누락: {missing}") + # 날짜형 변환 보정 + if not np.issubdtype(df["date"].dtype, np.datetime64): + df["date"] = pd.to_datetime(df["date"], errors="coerce") + raw_parts.append(df.reset_index(drop=True)) + print(f"[INFO] {tkr} 수집 완료: {len(df):,} rows") + finally: + # API rate 제한 완화(티커 사이 간격) + time.sleep(0.6 + random.uniform(0, 0.6)) + + if not raw_parts: + print("[ERROR] 모든 소스에서 OHLCV 확보 실패(fetch_ohlcv 사용).") + return + + raw_data = pd.concat(raw_parts, ignore_index=True) + + # ---------------------------------------------------------------------- + # (C) Transformer 호출 + # ---------------------------------------------------------------------- + finder_df = pd.DataFrame({"ticker": tickers}) + + print("[INFO] run_transformer 호출 중...") + result = run_transformer( + finder_df=finder_df, + seq_len=seq_len, + pred_h=pred_h, + raw_data=raw_data, + config=transformer_cfg + ) + + logs_df = result.get("logs", pd.DataFrame()) if isinstance(result, dict) else pd.DataFrame() + meta = {k: v for k, v in result.items() if k != "logs"} if isinstance(result, dict) else {} + + # ---------------------------------------------------------------------- + # (D) 출력만(저장 없음) + # ---------------------------------------------------------------------- + print("\n--- [RESULT] Transformer 반환 메타 키 ---") + print(list(meta.keys())) + + print("\n--- [RESULT] 결정 로그(logs) 미리보기 ---") + if not logs_df.empty: + if not np.issubdtype(logs_df["date"].dtype, np.datetime64): + logs_df["date"] = pd.to_datetime(logs_df["date"], errors="coerce") + print(logs_df.head(10).to_string(index=False)) + else: + print("logs_df가 비어 있습니다. Transformer 내부 로직을 확인하세요.") + + if not logs_df.empty: + if "action" in logs_df.columns: + print("\n--- [STATS] 액션별 건수 ---") + print(logs_df["action"].value_counts()) + if {"ticker", "date"}.issubset(logs_df.columns): + print("\n--- [STATS] 티커별 최근 신호 2건 ---") + latest = ( + logs_df.sort_values(["ticker", "date"], ascending=[True, False]) + .groupby("ticker") + .head(2) + .reset_index(drop=True) + ) + print(latest.to_string(index=False)) + + print(f"\n=== [TEST] 종료: 총 원시행(raw_data) = {len(raw_data):,} ===") + + +# ----------------------------------------------------------------------------- +# 엔트리포인트 +# ----------------------------------------------------------------------------- +if __name__ == "__main__": + run_transform_only_live_with_fetch() diff --git a/AI/transformer/__init__.py b/AI/transformer/__init__.py index a7eb2341..2f29da46 100644 --- a/AI/transformer/__init__.py +++ b/AI/transformer/__init__.py @@ -1,3 +1,3 @@ -# AI/finder/__init__.py +# AI/transformer/__init__.py from .main import run_transformer __all__ = ["run_transformer"] diff --git a/AI/transformer/datasets/.gitkeep b/AI/transformer/datasets/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/AI/transformer/main.py b/AI/transformer/main.py index ea7dde9a..f998b282 100644 --- a/AI/transformer/main.py +++ b/AI/transformer/main.py @@ -1,163 +1,17 @@ -# transformer/modules/transformer.py +# transformer/main.py from __future__ import annotations -from typing import Dict, List, Optional, Tuple - -import numpy as np +from typing import Dict, Optional import pandas as pd -from sklearn.preprocessing import MinMaxScaler -import tensorflow as tf -from tensorflow.keras import layers, Model - -# from AI.libs.utils.io import _log -_log = print # TODO: 추후 io._log 구현 시 복구 -from .models import build_transformer_classifier # <-- 모델 분리됨 - -# ===== 공개 상수 ===== -FEATURES: List[str] = [ - "RSI", - "MACD", - "Bollinger_Bands_upper", - "Bollinger_Bands_lower", - "ATR", - "OBV", - "Stochastic", # %K - "MFI", - "MA_5", - "MA_20", - "MA_50", - "MA_200", - "CLOSE_RAW", # 마지막에 추가 (스케일 제외, 로그용) -] - -CLASS_NAMES = ["BUY", "HOLD", "SELL"] - -# ====== 기술지표 유틸 ====== -def _ema(s: pd.Series, span: int) -> pd.Series: - return s.ewm(span=span, adjust=False).mean() - -def _rsi_wilder(close: pd.Series, period: int = 14) -> pd.Series: - delta = close.diff() - gain = delta.clip(lower=0.0) - loss = -delta.clip(upper=0.0) - avg_gain = gain.ewm(alpha=1/period, adjust=False).mean() - avg_loss = loss.ewm(alpha=1/period, adjust=False).mean() - rs = avg_gain / avg_loss.replace(0, np.nan) - return 100.0 - (100.0 / (1.0 + rs)) - -def _macd_line(close: pd.Series, fast: int = 12, slow: int = 26) -> pd.Series: - return _ema(close, fast) - _ema(close, slow) - -def _true_range(high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series: - prev_close = close.shift(1) - tr1 = high - low - tr2 = (high - prev_close).abs() - tr3 = (low - prev_close).abs() - return pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) - -def _atr_wilder(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series: - tr = _true_range(high, low, close) - return tr.ewm(alpha=1/period, adjust=False).mean() - -def _obv(close: pd.Series, volume: pd.Series) -> pd.Series: - # 시리즈 보장 + 결측 처리 - close = pd.Series(close) - volume = pd.Series(volume).fillna(0) - - # 전일 대비 가격 변화 - diff = close.diff() - - # 방향: 상승=1, 하락=-1, 보합=0 (NaN 안전 비교) - direction = np.where(diff.gt(0), 1, np.where(diff.lt(0), -1, 0)) - direction = pd.Series(direction, index=close.index) - - # OBV = 방향 * 거래량 누적합 - obv_series = (direction * volume).cumsum().fillna(0) - obv_series.name = "OBV" - return obv_series - -def _stochastic_k(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series: - ll = low.rolling(period).min() - hh = high.rolling(period).max() - denom = (hh - ll).replace(0, np.nan) - return (close - ll) / denom * 100.0 - -def _mfi(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series, period: int = 14) -> pd.Series: - tp = ((high + low + close) / 3.0).astype(float) - rmf = (tp * volume.astype(float)).astype(float) - - delta_tp: pd.Series = tp.diff().astype(float) - - pos_mf = rmf.where(delta_tp.gt(0), 0.0) - neg_mf = rmf.where(delta_tp.lt(0), 0.0).abs() - - pos_sum = pos_mf.rolling(period).sum() - neg_sum = neg_mf.rolling(period).sum().replace(0, np.nan) - mr = pos_sum / neg_sum - return 100.0 - (100.0 / (1.0 + mr)) - +from pathlib import Path -# ====== 피처 빌더 ====== -def build_features(df: pd.DataFrame) -> pd.DataFrame: - cols = {c.lower(): c for c in df.columns} - need = ["open", "high", "low", "close", "volume"] - mapping = {} - for k in need: - if k in cols: - mapping[cols[k]] = k - if mapping: - df = df.rename(columns=mapping) - O = df["open"].astype(float) - H = df["high"].astype(float) - L = df["low"].astype(float) - C = df["close"].astype(float) - V = df["volume"].astype(float) +# (선택) 프로젝트 공용 로거가 있다면 교체: from AI.libs.utils.io import _log +_log = print - feats = pd.DataFrame(index=df.index) - feats["RSI"] = _rsi_wilder(C, period=14) - feats["MACD"] = _macd_line(C, fast=12, slow=26) +# ★ 실제 추론 로직은 modules/inference.run_inference 에 구현되어 있음 +from .modules.inference import run_inference - ma20 = C.rolling(20).mean() - std20 = C.rolling(20).std(ddof=0) - feats["Bollinger_Bands_upper"] = ma20 + 2.0 * std20 - feats["Bollinger_Bands_lower"] = ma20 - 2.0 * std20 - feats["ATR"] = _atr_wilder(H, L, C, period=14) - feats["OBV"] = _obv(C, V) - feats["Stochastic"] = _stochastic_k(H, L, C, period=14) - feats["MFI"] = _mfi(H, L, C, V, period=14) - feats["MA_5"] = C.rolling(5).mean() - feats["MA_20"] = ma20 - feats["MA_50"] = C.rolling(50).mean() - feats["MA_200"] = C.rolling(200).mean() - feats["CLOSE_RAW"] = C - return feats.dropna() -# ====== 모델 로드 ====== -def _load_or_build_model(seq_len: int, n_features: int, model_path: Optional[str]) -> Model: - model = build_transformer_classifier(seq_len, n_features) - if model_path: - try: - model.load_weights(model_path) - _log(f"[TRANSFORMER] Transformer weights loaded: {model_path}") - except Exception as e: - _log(f"[TRANSFORMER][WARN] 모델 가중치 로드 실패 → 랜덤 초기화: {e}") - else: - _log("[TRANSFORMER][WARN] model_path 미지정 → 랜덤 초기화로 진행") - return model - -# ====== 시퀀스/스케일링 ====== -def _make_sequence(feats: pd.DataFrame, use_cols: List[str], seq_len: int) -> Optional[np.ndarray]: - if len(feats) < seq_len: - return None - X = feats[use_cols].iloc[-seq_len:].copy() - return X.values.astype("float32") - -def _scale_per_ticker(seq_arr: np.ndarray) -> Tuple[np.ndarray, MinMaxScaler]: - scaler = MinMaxScaler(feature_range=(0, 1), clip=True) - X_scaled = scaler.fit_transform(seq_arr) - return X_scaled.astype("float32"), scaler - -# ====== 메인 엔트리포인트 ====== def run_transformer( *, finder_df: pd.DataFrame, @@ -168,122 +22,61 @@ def run_transformer( config: Optional[dict] = None, interval: str = "1d", ) -> Dict[str, pd.DataFrame]: - tickers = finder_df["ticker"].astype(str).tolist() - if raw_data is None or raw_data.empty: - _log("[TRANSFORMER] raw_data empty -> empty logs") - return {"logs": pd.DataFrame(columns=[ - "ticker","date","action","price","weight", - "feature1","feature2","feature3","prob1","prob2","prob3" - ])} - - df = raw_data.copy() - ts_col = "ts_local" if "ts_local" in df.columns else ("date" if "date" in df.columns else None) - if ts_col is None: - raise ValueError("raw_data에 'ts_local' 또는 'date' 컬럼이 필요합니다.") - df[ts_col] = pd.to_datetime(df[ts_col]) - df = df.rename(columns={c: c.lower() for c in df.columns}) - df = df[df["ticker"].astype(str).isin(tickers)] - if df.empty: - _log("[TRANSFORMER] 대상 종목 데이터 없음") - return {"logs": pd.DataFrame(columns=[ - "ticker","date","action","price","weight", - "feature1","feature2","feature3","prob1","prob2","prob3" - ])} - - if run_date is None: - end_dt = pd.Timestamp.now(tz="Asia/Seoul").normalize() + pd.Timedelta(days=1) - pd.Timedelta(microseconds=1) - else: - end_dt = pd.to_datetime(run_date).tz_localize("Asia/Seoul", nonexistent="shift_forward").normalize() - end_dt = end_dt + pd.Timedelta(days=1) - pd.Timedelta(microseconds=1) - - if df[ts_col].dt.tz is not None: - end_cut = end_dt.tz_convert(df[ts_col].dt.tz) - else: - end_cut = end_dt.tz_localize(None) - - df = df[df[ts_col] <= end_cut] - df = df.sort_values(["ticker", ts_col]).reset_index(drop=True) - - model_feats = [f for f in FEATURES if f != "CLOSE_RAW"] - n_features = len(model_feats) - - model_path = None - if config and "transformer" in config and "model_path" in config["transformer"]: - model_path = str(config["transformer"]["model_path"]) - - model = _load_or_build_model(seq_len=seq_len, n_features=n_features, model_path=model_path) - - rows: List[dict] = [] - for t, g in df.groupby("ticker", sort=False): - try: - if g.empty: - continue - g = g.rename(columns={ts_col: "date"}).set_index("date") - ohlcv = g[["open", "high", "low", "close", "volume"]].copy() - - feats = build_features(ohlcv) - if feats.empty: - _log(f"[TRANSFORMER] {t} features empty -> skip") - continue - - X_seq = _make_sequence(feats, model_feats, seq_len) - if X_seq is None: - _log(f"[TRANSFORMER] {t} 부족한 길이(seq_len={seq_len}) -> skip") - continue - - X_scaled, _ = _scale_per_ticker(X_seq) - X_scaled = np.expand_dims(X_scaled, axis=0) - - try: - probs = model.predict(X_scaled, verbose=0)[0] - probs = np.clip(probs.astype(float), 1e-6, 1.0) - probs = probs / probs.sum() - buy_p, hold_p, sell_p = float(probs[0]), float(probs[1]), float(probs[2]) - action = CLASS_NAMES[int(np.argmax(probs))] - except Exception as e: - _log(f"[TRANSFORMER][WARN] 모델 예측 실패({t}) → 룰기반 fallback: {e}") - recent = feats.iloc[-1] - rsi = float(recent["RSI"]) - macd = float(recent["MACD"]) - if rsi < 30 and macd > 0: - action = "BUY"; buy_p, hold_p, sell_p = 0.65, 0.30, 0.05 - elif rsi > 70 and macd < 0: - action = "SELL"; buy_p, hold_p, sell_p = 0.05, 0.30, 0.65 - else: - action = "HOLD"; buy_p, hold_p, sell_p = 0.33, 0.34, 0.33 - - p_max = max(buy_p, hold_p, sell_p) - confidence = float(np.clip((p_max - 1/3) * 1.5, 0.0, 1.0)) - ret = 0.0 - if len(feats) > 2: - c_now = float(feats["CLOSE_RAW"].iloc[-1]) - c_prev = float(feats["CLOSE_RAW"].iloc[-2]) - if c_prev: - ret = (c_now / c_prev) - 1.0 - weight = float(np.clip(0.05 + confidence * 0.20 + abs(ret) * 0.05, 0.05, 0.30)) - recent = feats.iloc[-1] - close_price = float(recent["CLOSE_RAW"]) - - rows.append({ - "ticker": str(t), - "date": feats.index[-1].strftime("%Y-%m-%d"), - "action": action, - "price": close_price, - "weight": weight, - "feature1": float(recent["RSI"]), - "feature2": float(recent["MACD"]), - "feature3": float(recent["ATR"]), - "prob1": float(buy_p), - "prob2": float(hold_p), - "prob3": float(sell_p), - }) - except Exception as e: - _log(f"[TRANSFORMER][ERROR] {t}: {e}") - continue - - logs_df = pd.DataFrame(rows, columns=[ - "ticker","date","action","price","weight", - "feature1","feature2","feature3","prob1","prob2","prob3" - ]) - return {"logs": logs_df} - + """ + + Parameters + ---------- + finder_df : pd.DataFrame + ['ticker'] 컬럼 포함. Finder 단계에서 선정된 추론 대상 종목 목록. + seq_len : int + 모델 입력 시퀀스 길이(예: 64). + pred_h : int + 예측 지평(예: 5). 라벨링/정책 기준(로그, 가중치 산정 보조)에 쓰이며 + 추론 확률 계산 자체에는 직접 관여하지 않음. + raw_data : pd.DataFrame + OHLCV 시계열. 필수 컬럼: + ['ticker','open','high','low','close','volume', ('ts_local' or 'date')] + run_date : Optional[str] + 'YYYY-MM-DD' 형식. 지정 시, 해당 날짜(포함)까지의 데이터만 사용해 추론. + 미지정 시, Asia/Seoul 기준 당일 종가까지 사용. + config : Optional[dict] + config["transformer"]["model_path"] 에 학습된 가중치 경로가 존재해야 함. + 예) {"transformer": {"model_path": "artifacts/transformer_cls.h5"}} + (추후 추론 방식 옵션이 늘어나면 이 dict 에 플래그/파라미터를 확장하세요.) + interval : str + 캔들 간격 표기(로그용). 예: '1d', '1h' 등. + + Returns + ------- + Dict[str, pd.DataFrame] + {"logs": DataFrame} 형식. + 컬럼: ["ticker","date","action","price","weight", + "feature1","feature2","feature3","prob1","prob2","prob3"] + + Notes + ----- + - 이 래퍼는 '이름/시그니처의 안정성' 확보가 목적입니다. + 내부 추론 엔진이 변경되어도 외부 호출부 수정 없이 교체가 가능합니다. + """ + + # 1) weights_path 경로지정 + base_dir = Path("/transformer/weights") + candidate = base_dir / "inital.weights.h5" + + weights_path = str(candidate) if candidate.exists() else None + + if not weights_path: + _log("[TRANSFORMER][WARN] weights_path 미설정 → 가중치 없이 랜덤 초기화로 추론될 수 있음(품질 저하).") + _log(" config 예시: {'transformer': {'weights_path': 'weights/inital.weights.h5'}}") + + + # 2) 실제 추론 실행(모듈 위임) + return run_inference( + finder_df=finder_df, + raw_data=raw_data, + seq_len=seq_len, + pred_h=pred_h, + weights_path=weights_path, # ★ 학습 가중치 경로 전달 + run_date=run_date, + interval=interval, + ) diff --git a/AI/transformer/models/__init__.py b/AI/transformer/modules/__init__.py similarity index 100% rename from AI/transformer/models/__init__.py rename to AI/transformer/modules/__init__.py diff --git a/AI/transformer/modules/features.py b/AI/transformer/modules/features.py new file mode 100644 index 00000000..e2104d0f --- /dev/null +++ b/AI/transformer/modules/features.py @@ -0,0 +1,131 @@ +# transformer/modules/features.py +from __future__ import annotations +from typing import List +import numpy as np +import pandas as pd + +# ===== 공개 상수 ===== +FEATURES: List[str] = [ + "RSI", + "MACD", + "Bollinger_Bands_upper", + "Bollinger_Bands_lower", + "ATR", + "OBV", + "Stochastic", # %K + "MFI", + "MA_5", + "MA_20", + "MA_50", + "MA_200", + "CLOSE_RAW", # 마지막에 추가 (스케일 제외, 로그/가격 참조용) +] + +# ===== 기술지표 유틸 ===== +def _ema(s: pd.Series, span: int) -> pd.Series: + """지수이동평균(EMA).""" + return s.ewm(span=span, adjust=False).mean() + +def _rsi_wilder(close: pd.Series, period: int = 14) -> pd.Series: + """Wilder RSI 계산.""" + delta = close.diff() + gain = delta.clip(lower=0.0) + loss = -delta.clip(upper=0.0) + avg_gain = gain.ewm(alpha=1/period, adjust=False).mean() + avg_loss = loss.ewm(alpha=1/period, adjust=False).mean() + rs = avg_gain / avg_loss.replace(0, np.nan) + return 100.0 - (100.0 / (1.0 + rs)) + +def _macd_line(close: pd.Series, fast: int = 12, slow: int = 26) -> pd.Series: + """MACD 라인(시그널은 미사용).""" + return _ema(close, fast) - _ema(close, slow) + +def _true_range(high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series: + """TR(True Range).""" + prev_close = close.shift(1) + tr1 = high - low + tr2 = (high - prev_close).abs() + tr3 = (low - prev_close).abs() + return pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) + +def _atr_wilder(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series: + """Wilder ATR.""" + tr = _true_range(high, low, close) + return tr.ewm(alpha=1/period, adjust=False).mean() + +def _obv(close: pd.Series, volume: pd.Series) -> pd.Series: + """ + OBV(On-Balance Volume). + - 상승일: +거래량, 하락일: -거래량, 보합: 0 → 누적합 + - NaN/보합 처리 안전성 강화 + """ + close = pd.Series(close) + volume = pd.Series(volume).fillna(0) + diff = close.diff() + direction = np.where(diff.gt(0), 1, np.where(diff.lt(0), -1, 0)) + direction = pd.Series(direction, index=close.index) + obv_series = (direction * volume).cumsum().fillna(0) + obv_series.name = "OBV" + return obv_series + +def _stochastic_k(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series: + """스토캐스틱 %K (단순형).""" + ll = low.rolling(period).min() + hh = high.rolling(period).max() + denom = (hh - ll).replace(0, np.nan) + return (close - ll) / denom * 100.0 + +def _mfi(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series, period: int = 14) -> pd.Series: + """MFI (Money Flow Index).""" + tp = ((high + low + close) / 3.0).astype(float) + rmf = (tp * volume.astype(float)).astype(float) + delta_tp: pd.Series = tp.diff().astype(float) + pos_mf = rmf.where(delta_tp.gt(0), 0.0) + neg_mf = rmf.where(delta_tp.lt(0), 0.0).abs() + pos_sum = pos_mf.rolling(period).sum() + neg_sum = neg_mf.rolling(period).sum().replace(0, np.nan) + mr = pos_sum / neg_sum + return 100.0 - (100.0 / (1.0 + mr)) + +def build_features(df: pd.DataFrame) -> pd.DataFrame: + """ + 입력: OHLCV 컬럼을 가진 DataFrame (open, high, low, close, volume) + 출력: 모델 학습/추론용 피처 DataFrame + - 기술지표 계산 후 NaN 행 제거 + - CLOSE_RAW는 스케일링 제외를 위해 마지막에 포함 + """ + # 컬럼 소문자 매핑 + cols = {c.lower(): c for c in df.columns} + need = ["open", "high", "low", "close", "volume"] + mapping = {} + for k in need: + if k in cols: + mapping[cols[k]] = k + if mapping: + df = df.rename(columns=mapping) + + O = df["open"].astype(float) + H = df["high"].astype(float) + L = df["low"].astype(float) + C = df["close"].astype(float) + V = df["volume"].astype(float) + + feats = pd.DataFrame(index=df.index) + feats["RSI"] = _rsi_wilder(C, period=14) + feats["MACD"] = _macd_line(C, fast=12, slow=26) + + ma20 = C.rolling(20).mean() + std20 = C.rolling(20).std(ddof=0) + feats["Bollinger_Bands_upper"] = ma20 + 2.0 * std20 + feats["Bollinger_Bands_lower"] = ma20 - 2.0 * std20 + feats["ATR"] = _atr_wilder(H, L, C, period=14) + feats["OBV"] = _obv(C, V) + feats["Stochastic"] = _stochastic_k(H, L, C, period=14) + feats["MFI"] = _mfi(H, L, C, V, period=14) + feats["MA_5"] = C.rolling(5).mean() + feats["MA_20"] = ma20 + feats["MA_50"] = C.rolling(50).mean() + feats["MA_200"] = C.rolling(200).mean() + feats["CLOSE_RAW"] = C + + return feats.dropna() diff --git a/AI/transformer/modules/inference.py b/AI/transformer/modules/inference.py new file mode 100644 index 00000000..7c7700c0 --- /dev/null +++ b/AI/transformer/modules/inference.py @@ -0,0 +1,198 @@ +# transformer/modules/inference.py +from __future__ import annotations +from typing import Dict, List, Optional, Tuple +import numpy as np +import pandas as pd +from sklearn.preprocessing import MinMaxScaler +from tensorflow.keras import Model + +# from AI.libs.utils.io import _log +_log = print # TODO: 추후 io._log 로 교체 +from transformer.modules.models import build_transformer_classifier +from transformer.modules.features import FEATURES, build_features + +CLASS_NAMES = ["BUY", "HOLD", "SELL"] + +# ===== 내부 유틸 ===== +def _make_sequence(feats: pd.DataFrame, use_cols: List[str], seq_len: int) -> Optional[np.ndarray]: + """마지막 구간(seq_len)만 잘라서 (seq_len, n_features) 배열 생성.""" + if len(feats) < seq_len: + return None + X = feats[use_cols].iloc[-seq_len:].copy() + return X.values.astype("float32") + +def _scale_per_ticker(seq_arr: np.ndarray) -> Tuple[np.ndarray, MinMaxScaler]: + """ + (중요) 추론 단계에서는 학습 시 저장한 스케일러 사용이 가장 바람직. + - 다만, '티커별 미세 스케일링' 전략을 유지하고자 할 때는 아래처럼 + 입력 시퀀스에 대해 개별 MinMax를 적용할 수 있음(일관성↓, 적응성↑). + """ + scaler = MinMaxScaler(feature_range=(0, 1), clip=True) + X_scaled = scaler.fit_transform(seq_arr) + return X_scaled.astype("float32"), scaler + +def _load_or_build_model(seq_len: int, n_features: int, weights_path: Optional[str]) -> Model: + """가중치 로드 전용. 가중치 경로 없으면 경고 후 랜덤 초기화(추론 품질↓).""" + model = build_transformer_classifier(seq_len, n_features) + if weights_path: + try: + model.load_weights(weights_path) + _log(f"[INFER] Transformer weights loaded: {weights_path}") + except Exception as e: + _log(f"[INFER][WARN] 가중치 로드 실패 → 랜덤 초기화: {e}") + else: + _log("[INFER][WARN] weights_path 미지정 → 랜덤 초기화로 진행") + return model + +# ===== 공개 엔트리포인트 (추론) ===== +def run_inference( + *, + finder_df: pd.DataFrame, + raw_data: pd.DataFrame, + seq_len: int, + pred_h: int, # (현재는 미사용; 로그/정책에 남겨두기용) + weights_path: Optional[str], + run_date: Optional[str] = None, + interval: str = "1d", +) -> Dict[str, pd.DataFrame]: + """ + ※ 추론 전용 함수 + - 입력: 선정된 종목 목록(finder_df), OHLCV 원천(raw_data) + - 처리: 피처→시퀀스→스케일링→모델 예측 + - 출력: logs DataFrame (기존 포맷 유지) + + Parameters + ---------- + finder_df : DataFrame + ['ticker'] 컬럼 포함. 추론 대상 종목 목록. + raw_data : DataFrame + OHLCV 시계열. 필수 컬럼: ['ticker','open','high','low','close','volume', ('ts_local' or 'date')] + seq_len : int + 모델 입력 시퀀스 길이. + pred_h : int + 예측 지평 (현재 로깅 목적으로만 사용). + weights_path : str + 학습된 가중치 파일 경로(.h5 또는 checkpoint). + run_date : str, optional + 'YYYY-MM-DD'. 이 날짜(포함)까지의 데이터만 사용. + interval : str + '1d' 등. (로그 용도) + """ + tickers = finder_df["ticker"].astype(str).tolist() + if raw_data is None or raw_data.empty: + _log("[INFER] raw_data empty -> empty logs") + return {"logs": pd.DataFrame(columns=[ + "ticker","date","action","price","weight", + "feature1","feature2","feature3","prob1","prob2","prob3" + ])} + + df = raw_data.copy() + ts_col = "ts_local" if "ts_local" in df.columns else ("date" if "date" in df.columns else None) + if ts_col is None: + raise ValueError("raw_data에 'ts_local' 또는 'date' 컬럼이 필요합니다.") + df[ts_col] = pd.to_datetime(df[ts_col]) + df = df.rename(columns={c: c.lower() for c in df.columns}) + df = df[df["ticker"].astype(str).isin(tickers)] + if df.empty: + _log("[INFER] 대상 종목 데이터 없음") + return {"logs": pd.DataFrame(columns=[ + "ticker","date","action","price","weight", + "feature1","feature2","feature3","prob1","prob2","prob3" + ])} + + # run_date 컷 + if run_date is None: + end_dt = pd.Timestamp.now(tz="Asia/Seoul").normalize() + pd.Timedelta(days=1) - pd.Timedelta(microseconds=1) + else: + end_dt = pd.to_datetime(run_date).tz_localize("Asia/Seoul", nonexistent="shift_forward").normalize() + end_dt = end_dt + pd.Timedelta(days=1) - pd.Timedelta(microseconds=1) + + if df[ts_col].dt.tz is not None: + end_cut = end_dt.tz_convert(df[ts_col].dt.tz) + else: + end_cut = end_dt.tz_localize(None) + + df = df[df[ts_col] <= end_cut].sort_values(["ticker", ts_col]).reset_index(drop=True) + + model_feats = [f for f in FEATURES if f != "CLOSE_RAW"] + n_features = len(model_feats) + + # ★ 추론: 반드시 학습 가중치를 로드 + model = _load_or_build_model(seq_len=seq_len, n_features=n_features, weights_path=weights_path) + + rows: List[dict] = [] + for t, g in df.groupby("ticker", sort=False): + try: + if g.empty: + continue + + g = g.rename(columns={ts_col: "date"}).set_index("date") + ohlcv = g[["open", "high", "low", "close", "volume"]].copy() + + feats = build_features(ohlcv) + if feats.empty: + _log(f"[INFER] {t} features empty -> skip") + continue + + X_seq = _make_sequence(feats, model_feats, seq_len) + if X_seq is None: + _log(f"[INFER] {t} 부족한 길이(seq_len={seq_len}) -> skip") + continue + + X_scaled, _ = _scale_per_ticker(X_seq) + X_scaled = np.expand_dims(X_scaled, axis=0) # (1, seq_len, n_features) + + try: + probs = model.predict(X_scaled, verbose=0)[0] + probs = np.clip(probs.astype(float), 1e-6, 1.0) + probs = probs / probs.sum() + buy_p, hold_p, sell_p = float(probs[0]), float(probs[1]), float(probs[2]) + action = ["BUY","HOLD","SELL"][int(np.argmax(probs))] + except Exception as e: + _log(f"[INFER][WARN] 예측 실패({t}) → 룰기반 fallback: {e}") + recent = feats.iloc[-1] + rsi = float(recent["RSI"]) + macd = float(recent["MACD"]) + if rsi < 30 and macd > 0: + action = "BUY"; buy_p, hold_p, sell_p = 0.65, 0.30, 0.05 + elif rsi > 70 and macd < 0: + action = "SELL"; buy_p, hold_p, sell_p = 0.05, 0.30, 0.65 + else: + action = "HOLD"; buy_p, hold_p, sell_p = 0.33, 0.34, 0.33 + + # 가중치(비중) 산출 로직(간단 정책 유지) + p_max = max(buy_p, hold_p, sell_p) + confidence = float(np.clip((p_max - 1/3) * 1.5, 0.0, 1.0)) + ret = 0.0 + if len(feats) > 2: + c_now = float(feats["CLOSE_RAW"].iloc[-1]) + c_prev = float(feats["CLOSE_RAW"].iloc[-2]) + if c_prev: + ret = (c_now / c_prev) - 1.0 + weight = float(np.clip(0.05 + confidence * 0.20 + abs(ret) * 0.05, 0.05, 0.30)) + + recent = feats.iloc[-1] + close_price = float(recent["CLOSE_RAW"]) + + rows.append({ + "ticker": str(t), + "date": feats.index[-1].strftime("%Y-%m-%d"), + "action": action, + "price": close_price, + "weight": weight, + "feature1": float(recent["RSI"]), + "feature2": float(recent["MACD"]), + "feature3": float(recent["ATR"]), + "prob1": float(buy_p), + "prob2": float(hold_p), + "prob3": float(sell_p), + }) + except Exception as e: + _log(f"[INFER][ERROR] {t}: {e}") + continue + + logs_df = pd.DataFrame(rows, columns=[ + "ticker","date","action","price","weight", + "feature1","feature2","feature3","prob1","prob2","prob3" + ]) + return {"logs": logs_df} diff --git a/AI/transformer/models/models.py b/AI/transformer/modules/models.py similarity index 100% rename from AI/transformer/models/models.py rename to AI/transformer/modules/models.py diff --git a/AI/libs/.gitkeep b/AI/transformer/scaler/.gitkeep similarity index 100% rename from AI/libs/.gitkeep rename to AI/transformer/scaler/.gitkeep diff --git a/AI/transformer/training/__init__.py b/AI/transformer/training/__init__.py new file mode 100644 index 00000000..9e57612b --- /dev/null +++ b/AI/transformer/training/__init__.py @@ -0,0 +1,3 @@ +# AI/training/train_transformer/__init__.py +from .train_transformer import train_transformer_classifier +__all__ = ["run_transformer"] diff --git a/AI/transformer/training/train_transformer.py b/AI/transformer/training/train_transformer.py new file mode 100644 index 00000000..4bda0db2 --- /dev/null +++ b/AI/transformer/training/train_transformer.py @@ -0,0 +1,440 @@ +# transformer/training/train_transformer.py +from __future__ import annotations +from typing import Dict, List, Optional +import os +import time +import pickle +import sys +import requests # ← yfinance SSL 이슈 회피용: REST 직접 호출 +import numpy as np +import pandas as pd +from sklearn.preprocessing import MinMaxScaler +from sklearn.model_selection import train_test_split +import tensorflow as tf +from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau + + +# --- 프로젝트 루트 경로 설정 --------------------------------------------------- +project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(project_root) +# --------------------------------------------------------------------------- + +from modules.features import FEATURES, build_features +from modules.models import build_transformer_classifier + +# from AI.libs.utils.io import _log +_log = print # TODO: io._log 로 교체 + +CLASS_NAMES = ["BUY", "HOLD", "SELL"] + +# ============================================================================= +# 1) 라벨링 정책 (분류) +# ============================================================================= +def _label_by_future_return(close: pd.Series, pred_h: int, hold_thr: float = 0.003) -> pd.Series: + """ + 미래 수익률 기준 다중분류 라벨 생성. + - r = (C[t+pred_h] / C[t]) - 1 + - |r| <= hold_thr → HOLD + - r > hold_thr → BUY + - r < -hold_thr → SELL + ※ pred_h 이후 데이터가 없으면 NaN + + Parameters + ---------- + close : pd.Series + 종가 시계열 (인덱스: datetime) + pred_h : int + 예측 지평(캔들 수) + hold_thr : float + HOLD 구간 임계치 (절댓값 기준) + """ + future = close.shift(-pred_h) + r = (future / close) - 1.0 + + buy = (r > hold_thr).astype(int) + sell = (r < -hold_thr).astype(int) + hold = ((r.abs() <= hold_thr) & r.notna()).astype(int) + + # np.select는 조건을 위에서부터 평가하므로, 우선순위: BUY > HOLD > SELL + label = np.select([buy.eq(1), hold.eq(1), sell.eq(1)], [0, 1, 2], default=np.nan) + return pd.Series(label, index=close.index, dtype="float") + +# ============================================================================= +# 2) 시퀀스/스케일링 유틸 +# ============================================================================= +def _build_sequences(feats: pd.DataFrame, use_cols: List[str], seq_len: int) -> np.ndarray: + """ + 주어진 피처 프레임에서 rolling window 방식으로 (N, seq_len, n_features) 시퀀스 배열 생성. + - NaN 포함 구간은 제외됨 (사전에 dropna 권장) + - feats.index는 datetime(정렬 완료)이어야 함 + """ + X_list = [] + for i in range(seq_len, len(feats) + 1): + window = feats[use_cols].iloc[i - seq_len : i].values.astype("float32") + # NaN이 있으면 스킵 (안전장치) + if np.isnan(window).any(): + continue + X_list.append(window) + if not X_list: + return np.empty((0, seq_len, len(use_cols)), dtype="float32") + return np.stack(X_list, axis=0) + +def _align_labels(feats: pd.DataFrame, labels: pd.Series, seq_len: int) -> np.ndarray: + """ + 시퀀스 끝 시점에 대한 라벨을 맞추기 위해, 시퀀스 시작 오프셋(seq_len-1)만큼 라벨을 잘라서 정렬. + - feats, labels는 동일 인덱스(날짜)여야 함 + """ + return labels.iloc[seq_len - 1 :].values + +def _fit_scaler_on_train(X: np.ndarray) -> MinMaxScaler: + """ + 학습 데이터 전체 분포에 맞춰 스케일러 적합. + - 입력 X: (N, seq_len, n_features) + - 스케일은 feature-wise로 수행하기 위해 2D로 변형 후 적합 + """ + _, _, f = X.shape + scaler = MinMaxScaler(feature_range=(0, 1), clip=True) + X2 = X.reshape(-1, f) # (N*seq_len, n_features) + scaler.fit(X2) + return scaler + +def _apply_scaler(X: np.ndarray, scaler: MinMaxScaler) -> np.ndarray: + """학습/검증/테스트에 동일 스케일 적용.""" + n, s, f = X.shape + X2 = X.reshape(-1, f) + X2 = scaler.transform(X2) + return X2.reshape(n, s, f).astype("float32") + +# ============================================================================= +# 3) 학습 메인 파이프라인 +# ============================================================================= +def train_transformer_classifier( + *, + raw_data: pd.DataFrame, + seq_len: int, + pred_h: int, + model_out_path: str, + scaler_out_path: Optional[str] = None, + tickers: Optional[List[str]] = None, + run_date: Optional[str] = None, + test_size: float = 0.2, + random_state: int = 42, + hold_thr: float = 0.003, + batch_size: int = 64, + epochs: int = 50, +) -> Dict[str, any]: + """ + Transformer 분류기 학습 파이프라인. + - 입력: 원천 OHLCV(raw_data; 여러 티커 혼합 가능) + - 처리: 피처 생성 → 시퀀스 빌드 → 라벨링 → 스케일 → 학습 + - 출력: history, 저장된 가중치 경로 등 + + Parameters + ---------- + raw_data : DataFrame + ['ticker','open','high','low','close','volume', ('ts_local' or 'date')] 포함 + seq_len : int + 입력 시퀀스 길이 + pred_h : int + 미래 라벨링 지평(일수/캔들수) + model_out_path : str + 최종 가중치 저장 경로(.h5 권장): 예) 'artifacts/transformer_cls.h5' + scaler_out_path : str, optional + 스케일러 저장 경로(.pkl). 추론 시 동일 스케일 사용을 원할 때 권장. + tickers : list, optional + 학습 대상 티커 필터. None이면 모두 사용. + run_date : str, optional + 'YYYY-MM-DD'. 이 날짜(포함)까지의 데이터 사용. + test_size : float + train/val 분할 비율. + hold_thr : float + HOLD 라벨 임계값(|r| <= hold_thr). + """ + # ---------- 데이터 준비 ---------- + if raw_data is None or raw_data.empty: + raise ValueError("raw_data가 비어있습니다.") + + df = raw_data.copy() + # 컬럼 소문자화(혼용 방지) + df = df.rename(columns={c: c.lower() for c in df.columns}) + ts_col = "ts_local" if "ts_local" in df.columns else ("date" if "date" in df.columns else None) + if ts_col is None: + raise ValueError("raw_data에 'ts_local' 또는 'date' 컬럼이 필요합니다.") + df[ts_col] = pd.to_datetime(df[ts_col], utc=True, errors="coerce") + if df[ts_col].isna().any(): + raise ValueError("타임스탬프 파싱 중 NaT가 발생했습니다. 원본 데이터를 확인하세요.") + + if tickers is not None: + df["ticker"] = df["ticker"].astype(str) + df = df[df["ticker"].isin([str(t) for t in tickers])] + + # run_date 컷 (Asia/Seoul 기준) + if run_date is not None: + # Asia/Seoul 자정까지 포함되도록 끝점 계산 + end_dt = pd.to_datetime(run_date).tz_localize("Asia/Seoul", nonexistent="shift_forward").normalize() + end_dt = end_dt + pd.Timedelta(days=1) - pd.Timedelta(microseconds=1) + # df는 UTC → 동일 기준으로 비교 + end_cut_utc = end_dt.tz_convert("UTC") + df = df[df[ts_col] <= end_cut_utc] + + df = df.sort_values(["ticker", ts_col]).reset_index(drop=True) + + # ---------- 피처 + 라벨 ---------- + # 모델 입력 피처 후보 (CLOSE_RAW는 라벨링용으로만 사용하고 입력 피처에서는 제외) + model_feats = [f for f in FEATURES if f != "CLOSE_RAW"] + X_all, y_all = [], [] + + for t, g in df.groupby("ticker", sort=False): + g = g.rename(columns={ts_col: "date"}).set_index("date") + ohlcv = g[["open", "high", "low", "close", "volume"]].copy() + + # 사용자 정의 피처 빌드 + feats = build_features(ohlcv) # 반드시 'CLOSE_RAW' 포함한다고 가정 + if len(feats) < (seq_len + pred_h + 1): + # 시퀀스/라벨링 최소 길이 부족 시 스킵 + _log(f"[WARN] {t}: 데이터가 부족하여 스킵 (len={len(feats)})") + continue + + # 라벨 생성 (미래 수익률 기준) + labels = _label_by_future_return(feats["CLOSE_RAW"], pred_h=pred_h, hold_thr=hold_thr) + + # 동시 NaN 제거 및 정렬 + feats = feats.dropna() + labels = labels.reindex(feats.index) + + # 라벨 NaN(미래 없음) 제거 + valid_mask = labels.notna() + feats = feats[valid_mask] + labels = labels[valid_mask] + + # 시퀀스/라벨 정렬 + X_seq = _build_sequences(feats, model_feats, seq_len) + y_seq = _align_labels(feats, labels, seq_len) + + # 마지막 pred_h 구간은 미래가 없어 NaN일 수 있음 → 제거 + valid_idx = ~np.isnan(y_seq) + X_seq = X_seq[valid_idx] + y_seq = y_seq[valid_idx].astype(int) + + if len(X_seq) == 0: + _log(f"[WARN] {t}: 유효 시퀀스 0개 (정책/길이 확인)") + continue + + X_all.append(X_seq) + y_all.append(y_seq) + + if not X_all: + raise ValueError("학습에 사용할 수 있는 시퀀스가 생성되지 않았습니다. (데이터 길이/라벨 정책 확인)") + + X = np.concatenate(X_all, axis=0) # (N, seq_len, n_features) + y = np.concatenate(y_all, axis=0) # (N,) + + # ---------- 스케일링(학습 데이터 기준) ---------- + scaler = _fit_scaler_on_train(X) + X = _apply_scaler(X, scaler) + + # 클래스 불균형이 심할 수 있으니 stratify 분할 + X_train, X_val, y_train, y_val = train_test_split( + X, y, test_size=test_size, random_state=random_state, stratify=y + ) + + # ---------- 모델 ---------- + n_features = X.shape[-1] + model = build_transformer_classifier(seq_len=seq_len, n_features=n_features) + model.compile( + optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3), + loss="sparse_categorical_crossentropy", + metrics=["accuracy"] + ) + + # ---------- 콜백 ---------- + os.makedirs(os.path.dirname(model_out_path), exist_ok=True) + ckpt = ModelCheckpoint( + filepath=model_out_path, + monitor="val_accuracy", + save_best_only=True, + save_weights_only=True, + verbose=1 + ) + es = EarlyStopping(monitor="val_accuracy", patience=8, restore_best_weights=True, verbose=1) + rlrop = ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=4, min_lr=1e-5, verbose=1) + + # ---------- 학습 ---------- + history = model.fit( + X_train, y_train, + validation_data=(X_val, y_val), + epochs=epochs, + batch_size=batch_size, + callbacks=[ckpt, es, rlrop], + verbose=1 + ) + + # ---------- 스케일러 저장(옵션) ---------- + if scaler_out_path: + os.makedirs(os.path.dirname(scaler_out_path), exist_ok=True) + with open(scaler_out_path, "wb") as f: + pickle.dump(scaler, f) + _log(f"[TRAIN] Scaler saved: {scaler_out_path}") + + _log(f"[TRAIN] Weights saved(best): {model_out_path}") + return { + "history": history.history, + "n_samples": int(len(X)), + "class_dist": {int(k): int(v) for k, v in zip(*np.unique(y, return_counts=True))}, + "model_path": model_out_path, + "scaler_path": scaler_out_path + } + +# ============================================================================= +# 4) 야후 파이낸스 REST 폴백: OHLCV 수집 (requests) +# - yfinance SSL/차단 이슈를 피해 직접 엔드포인트 호출 +# ============================================================================= +def _yahoo_interval_str(interval: str) -> str: + """ + 야후 차트 API interval 명세 검증/정규화. + - 허용: '1d','1h','1wk','1mo' 등 + """ + allowed = {"1m","2m","5m","15m","30m","60m","90m","1h","1d","5d","1wk","1mo","3mo"} + if interval not in allowed: + raise ValueError(f"지원하지 않는 interval: {interval} (허용: {sorted(allowed)})") + return interval + +def _fetch_yahoo_ohlcv( + ticker: str, + start: pd.Timestamp, + end: pd.Timestamp, + interval: str = "1d", + retries: int = 3, + sleep_sec: float = 1.0, +) -> pd.DataFrame: + """ + 야후 파이낸스 차트 API(v8)에서 OHLCV를 수집하여 DataFrame 반환. + - 요청 URL: https://query2.finance.yahoo.com/v8/finance/chart/{ticker} + - 파라미터: period1(UNIX), period2(UNIX), interval + - 반환 컬럼: ['ticker','date','open','high','low','close','volume','ts_local(Asia/Seoul)'] + + 주의 + ---- + * period1/period2는 초 단위 UNIX 타임스탬프. + * 반환 timeZone은 종목 거래소 기준이므로, ts_local은 Asia/Seoul로 별도 변환해서 제공. + * 프리마켓/서머타임 등 미세한 체결 시간 차이에 따른 분봉은 케이스별 확인 필요. + """ + interval = _yahoo_interval_str(interval) + base = "https://query2.finance.yahoo.com/v8/finance/chart/{}".format(ticker) + params = { + "period1": int(pd.Timestamp(start).tz_convert("UTC").timestamp()), + "period2": int(pd.Timestamp(end).tz_convert("UTC").timestamp()), + "interval": interval, + "events": "div,splits" + } + headers = { + "User-Agent": "Mozilla/5.0", + "Accept": "application/json, text/plain, */*", + "Connection": "keep-alive", + } + + last_err = None + for _ in range(retries): + try: + resp = requests.get(base, params=params, headers=headers, timeout=10) + resp.raise_for_status() + data = resp.json() + result = data.get("chart", {}).get("result") + if not result: + raise ValueError(f"Yahoo 응답에 result가 없습니다: {data}") + result = result[0] + + ts_list = result["timestamp"] # 초 단위 UNIX + ind = pd.to_datetime(ts_list, unit="s", utc=True) + + q = result["indicators"]["quote"][0] + df = pd.DataFrame({ + "open": q.get("open"), + "high": q.get("high"), + "low": q.get("low"), + "close": q.get("close"), + "volume": q.get("volume"), + }, index=ind) + + # 기본 정리 + df = df.dropna(subset=["open","high","low","close"]) # 완전결측 제거 + df["ticker"] = str(ticker) + + # 로컬(Asia/Seoul) 타임스탬프 컬럼 별도 생성 + df["ts_local"] = df.index.tz_convert("Asia/Seoul") + + # date(UTC), ts_local 둘 다 보유 (학습코드는 ts_local/ date 둘 중 하나만 있으면 동작) + df = df.reset_index().rename(columns={"index": "date"}) + return df[["ticker","date","open","high","low","close","volume","ts_local"]] + except Exception as e: + last_err = e + time.sleep(sleep_sec) + raise RuntimeError(f"야후 차트 API 호출 실패: {last_err}") + +# ============================================================================= +# 5) 단독 실행(초기 가중치 생성)용 CLI 엔트리포인트 +# ============================================================================= +def run_training(config: dict): + """config 딕셔너리 기반 Transformer 학습 실행""" + + # ---- 1) 데이터 수집 ---- + start = pd.Timestamp(config["start"], tz="Asia/Seoul").tz_convert("UTC") + end = pd.Timestamp(config["end"], tz="Asia/Seoul").tz_convert("UTC") + pd.Timedelta(days=1) + + frames = [] + for t in config["tickers"]: + _log(f"[FETCH] {t} {config['interval']} {config['start']}→{config['end']}") + df_t = _fetch_yahoo_ohlcv( + ticker=t, + start=start, + end=end, + interval=config["interval"] + ) + frames.append(df_t) + raw = pd.concat(frames, ignore_index=True) + + # ---- 2) 학습 ---- + os.makedirs(os.path.dirname(config["model_out"]), exist_ok=True) + os.makedirs(os.path.dirname(config["scaler_out"]), exist_ok=True) + + result = train_transformer_classifier( + raw_data=raw, + seq_len=config["seq_len"], + pred_h=config["pred_h"], + model_out_path=config["model_out"], + scaler_out_path=config["scaler_out"], + tickers=config["tickers"], + run_date=config.get("run_date"), + test_size=config["test_size"], + hold_thr=config["hold_thr"], + batch_size=config["batch_size"], + epochs=config["epochs"], + ) + + # ---- 3) 요약 ---- + _log("[DONE] -------- Summary --------") + _log(f"Samples: {result['n_samples']}") + _log(f"Class dist (0:BUY,1:HOLD,2:SELL): {result['class_dist']}") + _log(f"Weights: {result['model_path']}") + _log(f"Scaler : {result['scaler_path']}") + + +if __name__ == "__main__": + # ⚙️ 여기에 원하는 설정만 바꾸면 됨 + config = { + "tickers": ["AAPL", "MSFT"], # 학습 대상 종목 + "start": "2018-01-01", # 시작일 + "end": "2025-10-31", # 종료일 + "interval": "1d", # 일봉 + "seq_len": 64, # 시퀀스 길이 + "pred_h": 5, # 예측 지평(미래 5일) + "hold_thr": 0.003, # HOLD 임계치 + "test_size": 0.2, # 검증셋 비율 + "epochs": 3, # 에폭 수 + "batch_size": 128, # 배치 크기 + "model_out": "transformer/weights/initial.weights.h5", # 가중치 저장 + "scaler_out": "transformer/scaler/scaler.pkl", # 스케일러 저장 + "run_date": None, # 특정 날짜까지만 사용할 경우 지정 + } + + run_training(config) diff --git a/AI/transformer/weights/initial.weights.h5 b/AI/transformer/weights/initial.weights.h5 new file mode 100644 index 00000000..52a04ebd Binary files /dev/null and b/AI/transformer/weights/initial.weights.h5 differ diff --git a/transformer/scaler/scaler.pkl b/transformer/scaler/scaler.pkl new file mode 100644 index 00000000..b71b9ba4 Binary files /dev/null and b/transformer/scaler/scaler.pkl differ