diff --git a/AI/libs/database/fetcher.py b/AI/libs/database/fetcher.py index 3ca55a2e..69a5e050 100644 --- a/AI/libs/database/fetcher.py +++ b/AI/libs/database/fetcher.py @@ -1,4 +1,4 @@ -# libs/database/fetcher.py +# AI/libs/database/fetcher.py from __future__ import annotations from typing import Optional import pandas as pd @@ -15,25 +15,22 @@ def fetch_ohlcv( db_name: str = "db", ) -> pd.DataFrame: """ - 특정 티커, 날짜 범위의 OHLCV 데이터를 DB에서 불러오기 (SQLAlchemy 엔진 사용) + 특정 티커, 날짜 범위의 OHLCV 데이터를 DB에서 불러오기 Args: ticker (str): 종목 코드 (예: "AAPL") - start (str): 시작일자 'YYYY-MM-DD' (inclusive) - end (str): 종료일자 'YYYY-MM-DD' (inclusive) - interval (str): 데이터 간격 ('1d' 등) - 현재 테이블이 일봉만 제공하면 무시됨 - db_name (str): get_engine()가 참조할 설정 블록 이름 (예: "db", "report_DB") + start (str): 시작일자 'YYYY-MM-DD' + end (str): 종료일자 'YYYY-MM-DD' + interval (str): 데이터 간격 (현재 일봉만 지원) + db_name (str): DB 설정 이름 Returns: - pd.DataFrame: 컬럼 = [ticker, date, open, high, low, close, adjusted_close, volume] - (date 컬럼은 pandas datetime으로 변환됨) + pd.DataFrame: [ticker, date, open, high, low, close, adjusted_close, volume] """ - # 1) SQLAlchemy engine 얻기 (configs/config.json 기준) engine = get_engine(db_name) - # 2) 쿼리: named parameter(:ticker 등) 사용 -> 안전하고 가독성 좋음 - # - interval 분기가 필요하면 테이블/파티션 구조에 따라 쿼리를 분기하도록 확장 가능 + # adjusted_close가 중요하다면 쿼리 단계에서 확실히 가져옵니다. query = text(""" SELECT ticker, date, open, high, low, close, adjusted_close, volume FROM public.price_data @@ -42,28 +39,32 @@ def fetch_ohlcv( ORDER BY date; """) - # 3) DB에서 읽기 (with 문으로 커넥션 자동 정리) with engine.connect() as conn: df = pd.read_sql( query, - con=conn, # 꼭 키워드 인자로 con=conn - params={"ticker": ticker, "start": start, "end": end}, # 튜플 X, 딕셔너리 O - ) + con=conn, + params={"ticker": ticker, "start": start, "end": end}, + ) - # 4) 후처리: 컬럼 정렬 및 date 타입 통일 + # 빈 데이터 처리 if df is None or df.empty: - # 빈 DataFrame이면 일관된 컬럼 스키마로 반환 return pd.DataFrame(columns=["ticker", "date", "open", "high", "low", "close", "adjusted_close", "volume"]) - # date 컬럼을 datetime으로 변경 (UTC로 맞추고 싶으면 pd.to_datetime(..., utc=True) 사용) + # 날짜 변환 if "date" in df.columns: df["date"] = pd.to_datetime(df["date"]) - # 선택: 컬럼 순서 고정 (일관성 유지) + # 데이터 보정 로직 추가 + # 1. adjusted_close가 없는 경우(NaN) -> close 값으로 대체 (결측치 방지) + if "adjusted_close" in df.columns and "close" in df.columns: + df["adjusted_close"] = df["adjusted_close"].fillna(df["close"]) + elif "adjusted_close" not in df.columns and "close" in df.columns: + # 컬럼 자체가 없으면 close를 복사해서 생성 + df["adjusted_close"] = df["close"] + + # 컬럼 순서 정리 desired_cols = ["ticker", "date", "open", "high", "low", "close", "adjusted_close", "volume"] - # 존재하는 컬럼만 가져오기 cols_present = [c for c in desired_cols if c in df.columns] df = df.loc[:, cols_present] - return df - + return df \ No newline at end of file diff --git a/AI/modules/signal/core/features.py b/AI/modules/signal/core/features.py index d7467ab5..ca3d98e1 100644 --- a/AI/modules/signal/core/features.py +++ b/AI/modules/signal/core/features.py @@ -1,8 +1,9 @@ # AI/modules/signal/core/features.py """ -[피처 엔지니어링 모듈] -- OHLCV 데이터를 입력받아 학습에 필요한 기술적 지표(RSI, MACD, 볼린저밴드 등)를 추가합니다. -- 데이터 로더(DataLoader)에서 이 함수를 호출하여 전처리를 수행합니다. +[피처 엔지니어링 모듈 - Adjusted Close 통합 버전] +- 데이터에 'adjusted_close'가 있다면 이를 'close'로 덮어씌웁니다. +- 이렇게 하면 모든 지표(RSI, MACD 등)가 자연스럽게 '조정 종가' 기준으로 계산됩니다. +- 학습 시 'close'와 'adjusted_close'가 중복되는 문제도 해결됩니다. """ import pandas as pd @@ -10,41 +11,45 @@ def add_technical_indicators(df: pd.DataFrame) -> pd.DataFrame: """ - 데이터프레임에 기술적 지표 컬럼을 추가합니다. - - Args: - df (pd.DataFrame): OHLCV 데이터 (필수 컬럼: 'close', 'high', 'low', 'volume') - - Returns: - pd.DataFrame: 지표가 추가된 데이터프레임 + 1. 조정 종가(Adjusted Close)를 종가(Close)로 통합합니다. + 2. 기술적 지표를 계산하여 추가합니다. """ if df.empty: return df df = df.copy() + # ★ [핵심 수정] 조정 종가 우선 정책 + # adjusted_close가 있으면, 이를 close에 덮어쓰고 adjusted_close 컬럼은 삭제합니다. + if 'adjusted_close' in df.columns: + # 결측치 방지 (혹시 adjusted_close가 비어있으면 close 값 사용) + df['adjusted_close'] = df['adjusted_close'].fillna(df['close']) + + # 덮어쓰기 + df['close'] = df['adjusted_close'] + + # 중복 방지를 위해 삭제 (이제 close가 adjusted_close 역할을 함) + df.drop(columns=['adjusted_close'], inplace=True) + + # --- 이하 모든 계산은 'close'(실제로는 조정 종가)를 기준으로 수행됨 --- + # 1. 이동평균선 (Simple Moving Average) df['ma5'] = df['close'].rolling(window=5).mean() df['ma20'] = df['close'].rolling(window=20).mean() df['ma60'] = df['close'].rolling(window=60).mean() # 2. RSI (Relative Strength Index) - # CodeRabbit 리뷰 반영: 엣지 케이스(횡보, 상승지속) 정밀 처리 delta = df['close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() - # division by zero 등 경고 억제 with np.errstate(divide='ignore', invalid='ignore'): rs = gain / loss - # 기본 RSI 계산 (loss=0인 경우 rs=inf가 되며, 100/(1+inf)=0 이므로 RSI=100이 됨 -> 정상) df['rsi'] = 100 - (100 / (1 + rs)) - # [보정 1] 가격 변동이 아예 없는 경우 (Gain=0, Loss=0) -> NaN 발생 -> 50(중립)으로 설정 + # RSI 보정 df.loc[(gain == 0) & (loss == 0), 'rsi'] = 50 - - # [보정 2] 하락 없이 상승만 한 경우 (Loss=0, Gain>0) -> 100(강세)으로 설정 (수식상 자동 처리되나 명시) df.loc[(loss == 0) & (gain > 0), 'rsi'] = 100 # 3. 볼린저 밴드 (Bollinger Bands) @@ -52,18 +57,22 @@ def add_technical_indicators(df: pd.DataFrame) -> pd.DataFrame: df['upper_band'] = df['ma20'] + (df['std20'] * 2) df['lower_band'] = df['ma20'] - (df['std20'] * 2) - # 4. MACD (Moving Average Convergence Divergence) + # 4. MACD exp12 = df['close'].ewm(span=12, adjust=False).mean() exp26 = df['close'].ewm(span=26, adjust=False).mean() df['macd'] = exp12 - exp26 df['signal_line'] = df['macd'].ewm(span=9, adjust=False).mean() # 5. 거래량 변화율 - df['vol_change'] = df['volume'].pct_change() + if 'volume' in df.columns: + df['vol_change'] = df['volume'].pct_change() + df['vol_change'] = df['vol_change'].replace([np.inf, -np.inf], 0) + else: + df['vol_change'] = 0 - # 6. 결측치 처리 (지표 계산 초반 구간) - # [수정] FutureWarning 해결: fillna(method='bfill') -> bfill() + # === [데이터 정제] === + df.replace([np.inf, -np.inf], np.nan, inplace=True) df = df.bfill() - df = df.fillna(0) # 앞부분 bfill로도 안 채워지는 경우 0 처리 + df = df.fillna(0) return df \ No newline at end of file diff --git a/AI/modules/trader/backtest/__init__.py b/AI/modules/trader/backtest/__init__.py new file mode 100644 index 00000000..c51e16b9 --- /dev/null +++ b/AI/modules/trader/backtest/__init__.py @@ -0,0 +1,10 @@ +""" +[Backtest Execution Package] +- 단일 종목 및 포트폴리오 단위의 백테스트 실행 함수들을 제공합니다. +""" + +# 함수 이름이 겹치지 않게 alias(별칭)를 주어 명확히 구분합니다. +from .run_portfolio import run_backtest as run_portfolio_backtest +from .run_backtrader_single import run_single_backtest + +__all__ = ['run_portfolio_backtest', 'run_single_backtest'] \ No newline at end of file diff --git a/AI/modules/trader/backtest/run_backtrader_single.py b/AI/modules/trader/backtest/run_backtrader_single.py new file mode 100644 index 00000000..dd7b19c5 --- /dev/null +++ b/AI/modules/trader/backtest/run_backtrader_single.py @@ -0,0 +1,188 @@ +# AI/modules/trader/backtest/run_backtrader_single.py +""" +[Backtrader 기반 단일 종목 정밀 백테스트] +- Walk-Forward Validation 지원 +- strategies/rule_based.py 의 RuleBasedStrategy 클래스 사용 +- AI Score 시각화 기능 포함 +""" + +import sys +import os +import backtrader as bt +import pandas as pd +import numpy as np + +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../../../..")) +if project_root not in sys.path: + sys.path.append(project_root) + +from AI.modules.signal.core.data_loader import SignalDataLoader +from AI.modules.signal.models import get_model +# ★ [수정] 클래스 기반 전략 불러오기 +from AI.modules.trader.strategies.rule_based import RuleBasedStrategy + +class AIScoreObserver(bt.Observer): + """차트 하단에 AI 모델 점수를 그리기 위한 클래스""" + lines = ('score', 'limit_buy', 'limit_sell') + plotinfo = dict(plot=True, subplot=True, plotname='AI Probability') + plotlines = dict( + score=dict(marker='o', markersize=3.0, color='blue', _fill_gt=(0.5, 'red'), _fill_lt=(0.5, 'green')), + limit_buy=dict(color='red', linestyle='--'), + limit_sell=dict(color='green', linestyle='--') + ) + + def next(self): + score = getattr(self._owner, 'current_score', 0.5) + self.lines.score[0] = score + self.lines.limit_buy[0] = 0.65 + self.lines.limit_sell[0] = 0.40 + +class TransformerWalkForwardStrategy(bt.Strategy): + params = ( + ('model_weights_path', None), + ('raw_df', None), + ('features', None), + ('loader', None), + ('seq_len', 60), + ('model_name', "transformer"), + ) + + def __init__(self): + self.model = self._load_model() + self.order = None + self.current_score = 0.5 + # ★ [수정] 전략 객체 초기화 + self.strategy_logic = RuleBasedStrategy(buy_threshold=0.65, sell_threshold=0.40) + + def log(self, txt, dt=None): + dt = dt or self.datas[0].datetime.date(0) + print(f'[{dt.isoformat()}] {txt}') + + def _load_model(self): + path = self.p.model_weights_path + if not path or not os.path.exists(path): + self.log("⚠️ 모델 가중치 파일 없음.") + return None + + default_config = { + "head_size": 256, "num_heads": 4, "ff_dim": 4, + "num_blocks": 4, "mlp_units": [128], "dropout": 0.1 + } + try: + model = get_model(self.p.model_name, default_config) + model.build((None, self.p.seq_len, len(self.p.features))) + if hasattr(model, 'model'): + model.model.load_weights(path) + else: + model.load_weights(path) + return model + except Exception as e: + self.log(f"⚠️ 모델 로드 에러: {e}") + return None + + def notify_order(self, order): + if order.status in [order.Completed]: + if order.isbuy(): + self.log(f"🔵 BUY 체결 @ {order.executed.price:,.0f}") + elif order.issell(): + self.log(f"🔴 SELL 체결 @ {order.executed.price:,.0f}") + self.order = None + + def next(self): + if len(self) < self.p.seq_len: + return + + current_date = self.datas[0].datetime.datetime(0) + past_data = self.p.raw_df.loc[:current_date] + if len(past_data) < self.p.seq_len: + return + + # 1. Walk-Forward Prediction + self.p.loader.scaler.fit(past_data[self.p.features]) + recent_data = past_data.iloc[-self.p.seq_len:] + input_seq = np.expand_dims(self.p.loader.scaler.transform(recent_data[self.p.features]), axis=0) + + if self.model: + pred = self.model.predict(input_seq, verbose=0) + score = float(pred[0][0]) + else: + score = 0.5 + + self.current_score = score + + # 2. 매매 판단 (RuleBasedStrategy 사용) + if self.order: return # 이미 주문 중이면 패스 + + position_qty = self.position.size + # ★ [수정] 클래스 메서드 호출로 변경 (코드가 훨씬 깔끔해짐) + decision = self.strategy_logic.get_action(score, position_qty) + + if decision['type'] == 'BUY': + # 보유 현금의 95%만큼 매수 계산 (Backtrader 로직) + cash = self.broker.get_cash() + price = self.datas[0].close[0] + # 수수료 고려하여 안전하게 계산 + size = int((cash * 0.95) / price) + if size > 0: + self.log(f"BUY 신호 (Score: {score:.2f})") + self.order = self.buy(size=size) + + elif decision['type'] == 'SELL': + if position_qty > 0: + self.log(f"SELL 신호 (Score: {score:.2f})") + self.order = self.close() # 전량 청산 + +def run_single_backtest(ticker="AAPL", start_date="2024-01-01", end_date="2024-06-01", enable_plot=True): + print(f"\n=== [{ticker}] 단일 종목 백테스트 시작 ===") + + weight_path = os.path.join(project_root, "AI/data/weights/transformer/universal_transformer.keras") + loader = SignalDataLoader(sequence_length=60) + df = loader.load_data(ticker, start_date, end_date) + + if df is None or len(df) < 100: + print("❌ 데이터 로드 실패") + return + + if 'date' in df.columns: + df['date'] = pd.to_datetime(df['date']) + df.set_index('date', inplace=True) + + data_feed = bt.feeds.PandasData(dataname=df) + features = df.select_dtypes(include=[np.number]).columns.tolist() + + cerebro = bt.Cerebro() + cerebro.adddata(data_feed) + + cerebro.addstrategy( + TransformerWalkForwardStrategy, + model_weights_path=weight_path, + raw_df=df, + features=features, + loader=loader + ) + + if enable_plot: + cerebro.addobserver(AIScoreObserver) + + cerebro.broker.setcash(10_000_000) + cerebro.broker.setcommission(commission=0.0015) + cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe', riskfreerate=0.0) + cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown') + + print(f"💰 초기 자산: {cerebro.broker.getvalue():,.0f}원") + results = cerebro.run() + + strat = results[0] + final_val = cerebro.broker.getvalue() + mdd = strat.analyzers.drawdown.get_analysis()['max']['drawdown'] + sharpe = strat.analyzers.sharpe.get_analysis().get('sharperatio', 0.0) + + print(f"💰 최종 자산: {final_val:,.0f}원 ({(final_val/10000000 - 1)*100:.2f}%)") + print(f"📉 MDD: {mdd:.2f}% | 📊 Sharpe: {sharpe:.4f}") + + if enable_plot: + cerebro.plot(style='candlestick', volume=False) + +if __name__ == "__main__": + run_single_backtest() \ No newline at end of file diff --git a/AI/modules/trader/backtest/run_backtrader_single.py.py b/AI/modules/trader/backtest/run_backtrader_single.py.py deleted file mode 100644 index 5f8db954..00000000 --- a/AI/modules/trader/backtest/run_backtrader_single.py.py +++ /dev/null @@ -1,231 +0,0 @@ -# AI/modules/trader/backtest/run_backtrader_single.py -""" -[Backtrader 기반 단일 종목 백테스트 엔진] -- Walk-Forward Validation (매일 Scaler 재학습) 지원 -- AI Score 시각화 (Observer) 포함 -- 용도: 특정 종목에 대한 모델의 타점 정밀 분석 -""" - -import sys -import os -import backtrader as bt -import pandas as pd -import numpy as np - -# 프로젝트 루트 경로 추가 (모듈 임포트 문제 해결) -current_dir = os.path.dirname(os.path.abspath(__file__)) -project_root = os.path.abspath(os.path.join(current_dir, "../../../..")) -if project_root not in sys.path: - sys.path.append(project_root) - -from AI.modules.signal.core.data_loader import SignalDataLoader -from AI.modules.signal.models import get_model - -# ★ 리팩토링된 경로 확인 (아직 파일명 변경 전이면 policy.py에서 import) -try: - from AI.modules.trader.strategies.rule_based import decide_order -except ImportError: - from AI.modules.trader.policy import decide_order - -# ───────────────────────────────────────────────────────────────────────────── -# 1. AI Score Observer (차트 시각화용) -# ───────────────────────────────────────────────────────────────────────────── -class AIScoreObserver(bt.Observer): - """차트 하단에 AI 모델 점수와 임계값을 그립니다.""" - lines = ('score', 'limit_buy', 'limit_sell') - plotinfo = dict(plot=True, subplot=True, plotname='AI Probability') - plotlines = dict( - score=dict(marker='o', markersize=4.0, color='blue', _fill_gt=(0.5, 'red'), _fill_lt=(0.5, 'green')), - limit_buy=dict(color='red', linestyle='--', linewidth=1.0), - limit_sell=dict(color='green', linestyle='--', linewidth=1.0) - ) - - def next(self): - # Strategy에서 current_score 값을 가져옴 - score = getattr(self._owner, 'current_score', 0.5) - self.lines.score[0] = score - self.lines.limit_buy[0] = 0.65 # 매수 기준선 (Policy와 일치시킴) - self.lines.limit_sell[0] = 0.40 # 매도 기준선 - -# ───────────────────────────────────────────────────────────────────────────── -# 2. Walk-Forward Strategy -# ───────────────────────────────────────────────────────────────────────────── -class TransformerWalkForwardStrategy(bt.Strategy): - params = ( - ('model_weights_path', None), - ('raw_df', None), - ('features', None), - ('loader', None), - ('seq_len', 60), - ('model_name', "transformer"), - ) - - def log(self, txt, dt=None): - dt = dt or self.datas[0].datetime.date(0) - print(f'[{dt.isoformat()}] {txt}') - - def __init__(self): - self.model = self._load_model() - self.order = None - self.current_score = 0.5 - - def _load_model(self): - path = self.p.model_weights_path - if not path or not os.path.exists(path): - self.log("⚠️ 모델 가중치 파일 없음. (Score=0.5 고정)") - return None - - # 모델 Config (학습시 설정과 동일해야 함) - default_config = { - "head_size": 256, "num_heads": 4, "ff_dim": 4, - "num_blocks": 4, "mlp_units": [128], "dropout": 0.1 - } - try: - model = get_model(self.p.model_name, default_config) - # 입력 형태 빌드 (None, 60, features) - model.build((None, self.p.seq_len, len(self.p.features))) - - # Wrapper 대응 - if hasattr(model, 'model'): - model.model.load_weights(path) - else: - model.load_weights(path) - - return model - except Exception as e: - self.log(f"⚠️ 모델 로드 에러: {e}") - return None - - def notify_order(self, order): - if order.status in [order.Completed]: - if order.isbuy(): - self.log(f"🔵 BUY 체결 | 가격: {order.executed.price:,.0f}") - elif order.issell(): - self.log(f"🔴 SELL 체결 | 가격: {order.executed.price:,.0f} | 수익: {order.executed.pnl:,.0f}") - self.order = None - - def next(self): - if len(self) < self.p.seq_len: - return - - # Backtrader의 현재 날짜 가져오기 - current_date = self.datas[0].datetime.datetime(0) - - # [Walk-Forward Logic] - # 미래 데이터(Look-ahead Bias) 방지를 위해 현재 시점까지의 데이터만 슬라이싱 - past_data = self.p.raw_df.loc[:current_date] - if len(past_data) < self.p.seq_len: - return - - # 1. Scaler Fit (과거 데이터로만 학습) - self.p.loader.scaler.fit(past_data[self.p.features]) - - # 2. Transform (최근 60일 데이터 변환) - recent_data = past_data.iloc[-self.p.seq_len:] - input_seq_scaled = self.p.loader.scaler.transform(recent_data[self.p.features]) - input_seq = np.expand_dims(input_seq_scaled, axis=0) - - # 3. Predict - if self.model: - # verbose=0: 진행바 숨김 - pred = self.model.predict(input_seq, verbose=0) - score = float(pred[0][0]) - else: - score = 0.5 - - self.current_score = score # Observer로 전달 - - # 4. 매매 판단 (Rule-Based Policy 사용) - current_close = self.datas[0].close[0] - cash = self.broker.get_cash() - position_size = self.position.size - avg_price = self.position.price - total_value = self.broker.get_value() - - action, qty, reason = decide_order( - "Target", score, current_close, cash, - position_size, avg_price, total_value - ) - - if self.order: return - - if action == "BUY" and qty > 0: - self.log(f"BUY 신호 (점수: {score:.2f}) - {reason}") - self.order = self.buy(size=qty) - elif action == "SELL" and qty > 0: - self.log(f"SELL 신호 (점수: {score:.2f}) - {reason}") - self.order = self.sell(size=qty) - -# ───────────────────────────────────────────────────────────────────────────── -# 실행 함수 -# ───────────────────────────────────────────────────────────────────────────── -def run_single_backtest(ticker="AAPL", start_date="2024-01-01", end_date="2024-06-01", enable_plot=True): - - print(f"\n=== [{ticker}] 정밀 백테스트 시작 (Walk-Forward) ===") - - # 가중치 파일 경로 자동 설정 - weight_path = os.path.join(project_root, "AI/data/weights/transformer/universal_transformer.keras") - - # 1. 데이터 로드 - loader = SignalDataLoader(sequence_length=60) - df = loader.load_data(ticker, start_date, end_date) - - if df is None or len(df) < 100: - print("❌ 데이터 부족 또는 로드 실패") - return - - # Backtrader용 데이터 피드 생성 - if 'date' in df.columns: - df['date'] = pd.to_datetime(df['date']) - df.set_index('date', inplace=True) - - data_feed = bt.feeds.PandasData(dataname=df) - features = df.select_dtypes(include=[np.number]).columns.tolist() - - # 2. 엔진 설정 - cerebro = bt.Cerebro() - cerebro.adddata(data_feed) - - cerebro.addstrategy( - TransformerWalkForwardStrategy, - model_weights_path=weight_path, - raw_df=df, # 원본 DF 전달 (Walk-Forward용) - features=features, - loader=loader - ) - - if enable_plot: - cerebro.addobserver(AIScoreObserver) - - # 3. 자금 설정 - initial_cash = 10_000_000 - cerebro.broker.setcash(initial_cash) - cerebro.broker.setcommission(commission=0.0015) # 0.15% 수수료 - - # 4. 분석기 - cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe', riskfreerate=0.0) - cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown') - - # 5. 실행 - print(f"💰 초기 자산: {initial_cash:,.0f}원") - results = cerebro.run() - final_value = cerebro.broker.getvalue() - - # 6. 결과 출력 - profit_pct = (final_value - initial_cash) / initial_cash * 100 - print(f"💰 최종 자산: {final_value:,.0f}원") - print(f"📈 수익률: {profit_pct:.2f}%") - - strat = results[0] - mdd = strat.analyzers.drawdown.get_analysis()['max']['drawdown'] - sharpe = strat.analyzers.sharpe.get_analysis().get('sharperatio', 0.0) - - print(f"📉 MDD: {mdd:.2f}%") - print(f"📊 Sharpe: {sharpe:.4f}") - - if enable_plot: - # 차트 출력 (브라우저 또는 팝업) - cerebro.plot(style='candlestick', volume=False, iplot=False) - -if __name__ == "__main__": - run_single_backtest() \ No newline at end of file diff --git a/AI/modules/trader/backtest/run_portfolio.py b/AI/modules/trader/backtest/run_portfolio.py new file mode 100644 index 00000000..7e43c58f --- /dev/null +++ b/AI/modules/trader/backtest/run_portfolio.py @@ -0,0 +1,230 @@ +# AI/modules/trader/backtest/run_portfolio.py +""" +[포트폴리오 통합 백테스트 엔진 (Lazy Loading 적용)] +- 더미 데이터 없이, 실제 데이터가 들어오는 시점에 모델을 동적으로 로드합니다. +- 기술적 지표(RSI, MACD 등)를 자동 반영하여 모델 입력 차원을 맞춥니다. +""" + +import sys +import os +import backtrader as bt +import pandas as pd +import numpy as np +from datetime import datetime + +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../../../..")) +if project_root not in sys.path: + sys.path.append(project_root) + +from AI.modules.signal.core.data_loader import SignalDataLoader +from AI.modules.signal.models import get_model +from AI.modules.signal.core.features import add_technical_indicators +from AI.modules.trader.strategies.portfolio_logic import calculate_portfolio_allocation + +class AIPortfolioStrategy(bt.Strategy): + params = ( + ('model_weights_path', None), + ('strategy_config', {'seq_len': 60, 'top_k': 3, 'buy_threshold': 0.6}), + ('rebalance_days', 1), + ) + + def log(self, txt, dt=None): + dt = dt or self.datas[0].datetime.date(0) + print(f'[{dt.isoformat()}] {txt}') + + def __init__(self): + # ★ 개선: 여기서 모델을 로드하지 않습니다. (Lazy Loading) + # 실제 데이터가 들어와서 피처 개수가 확정될 때까지 기다립니다. + self.model = None + self.feature_columns = None + self.daily_value = [] + + def _initialize_model(self, input_dim): + """실제 데이터의 피처 개수(input_dim)를 확인한 후 모델을 로드합니다.""" + print(f"🔄 모델 초기화 중... (감지된 입력 차원: {input_dim})") + path = self.p.model_weights_path + config = self.p.strategy_config + + model_config = { + "head_size": 256, "num_heads": 4, "ff_dim": 4, + "num_blocks": 4, "mlp_units": [128], "dropout": 0.25 + } + + try: + model = get_model("transformer", model_config) + dummy_input = (None, config['seq_len'], input_dim) + model.build(dummy_input) + + if path and os.path.exists(path): + if hasattr(model, 'model') and hasattr(model.model, 'load_weights'): + model.model.load_weights(path) + else: + model.load_weights(path) + print(f"✅ 모델 로드 성공: {path}") + else: + print("⚠️ 모델 가중치 파일 없음. (랜덤 예측 모드)") + return model + except Exception as e: + print(f"❌ 모델 로드 실패: {e}") + return None + + def next(self): + if len(self) % self.p.rebalance_days != 0: + return + + current_date = self.datas[0].datetime.date(0) + lookback = self.p.strategy_config['seq_len'] + fetch_len = lookback + 50 + + # 1. 데이터 수집 및 전처리 + data_map = {} + valid_datas = [] + + for d in self.datas: + ticker = d._name + if len(d) < fetch_len: continue + + try: + o = d.open.get(ago=0, size=fetch_len) + h = d.high.get(ago=0, size=fetch_len) + l = d.low.get(ago=0, size=fetch_len) + c = d.close.get(ago=0, size=fetch_len) + v = d.volume.get(ago=0, size=fetch_len) + + if len(o) < fetch_len: continue + + df = pd.DataFrame({'open': o, 'high': h, 'low': l, 'close': c, 'volume': v}) + + # ★ 피처 엔지니어링 수행 + df = add_technical_indicators(df) + df = df.fillna(0) + + # ★ [핵심] 첫 실행 시점에 모델 초기화 + if self.model is None: + # 'date', 'ticker' 등 불필요한 컬럼 제외하고 숫자형 컬럼만 선택 + feature_cols = [col for col in df.columns if col not in ['date', 'ticker', 'target']] + self.feature_columns = feature_cols + # 여기서 실제 피처 개수를 세서 모델을 만듭니다. + self.model = self._initialize_model(len(feature_cols)) + + if len(df) >= lookback: + data_map[ticker] = df + valid_datas.append(d) + + except Exception: + continue + + if not data_map: return + + # 모델 로드 실패 시 중단 방지 + if self.model is None and self.feature_columns is None: + return + + # 2. 전략 코어 호출 + target_weights, scores = calculate_portfolio_allocation( + data_map=data_map, + model=self.model, + feature_columns=self.feature_columns, + config=self.p.strategy_config + ) + + # 3. 주문 실행 + current_value = self.broker.getvalue() + self.daily_value.append((current_date, current_value)) + + if current_value <= 0: return + + buy_orders = [] + + for d in valid_datas: + ticker = d._name + target_pct = target_weights.get(ticker, 0.0) + pos_value = self.getposition(d).size * d.close[0] + current_pct = pos_value / current_value + + if abs(target_pct - current_pct) > 0.01: + if target_pct < current_pct: + self.order_target_percent(data=d, target=target_pct) + else: + buy_orders.append((d, target_pct)) + + for d, target_pct in buy_orders: + self.order_target_percent(data=d, target=target_pct) + + def notify_order(self, order): + pass + +def run_backtest(): + print("\n=== 🚀 AI 포트폴리오 백테스트 시작 (Lazy Loading) ===") + + cerebro = bt.Cerebro() + start_date = "2023-01-01" + end_date = datetime.now().strftime("%Y-%m-%d") + target_tickers = ["AAPL", "TSLA", "MSFT", "NVDA", "AMD"] + + print(f"대상 종목: {target_tickers}") + + loader = SignalDataLoader(sequence_length=60) + loaded_count = 0 + + for ticker in target_tickers: + try: + df = loader.load_data(ticker, start_date=start_date, end_date=end_date) + if df is None or df.empty or len(df) < 100: + print(f"⚠️ {ticker}: 데이터 부족") + continue + + if 'date' in df.columns: + df['date'] = pd.to_datetime(df['date']) + df.set_index('date', inplace=True) + + data_feed = bt.feeds.PandasData(dataname=df, name=ticker, plot=False) + cerebro.adddata(data_feed) + loaded_count += 1 + except Exception as e: + print(f"❌ {ticker} 로드 에러: {e}") + + if loaded_count == 0: + print("❌ 실행할 데이터가 없습니다.") + return + + model_path = os.path.join(project_root, "AI/data/weights/transformer/universal_transformer.keras") + cerebro.addstrategy( + AIPortfolioStrategy, + model_weights_path=model_path, + strategy_config={"seq_len": 60, "top_k": 3, "buy_threshold": 0.6} + ) + + initial_cash = 100_000_000 + cerebro.broker.setcash(initial_cash) + cerebro.broker.setcommission(commission=0.0015) + + cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe', riskfreerate=0.02, timeframe=bt.TimeFrame.Days) + cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown') + + print(f"초기 자산: {initial_cash:,.0f}원") + print("⏳ 시뮬레이션 진행 중...") + + results = cerebro.run() + strat = results[0] + + final_value = cerebro.broker.getvalue() + profit = final_value - initial_cash + profit_rate = (profit / initial_cash) * 100 + + sharpe_dict = strat.analyzers.sharpe.get_analysis() + sharpe_val = sharpe_dict.get('sharperatio') + sharpe = sharpe_val if sharpe_val is not None else 0.0 + + dd_dict = strat.analyzers.drawdown.get_analysis() + mdd = dd_dict['max']['drawdown'] if dd_dict else 0.0 + + print("\n=== 📊 백테스트 최종 결과 ===") + print(f"최종 자산: {final_value:,.0f}원") + print(f"총 수익금: {profit:+,.0f}원 ({profit_rate:+.2f}%)") + print(f"Sharpe Ratio: {sharpe:.4f}") + print(f"Max Drawdown (MDD): {mdd:.2f}%") + +if __name__ == "__main__": + run_backtest() \ No newline at end of file diff --git a/AI/modules/trader/backtest/run_portfolio.py.py b/AI/modules/trader/backtest/run_portfolio.py.py deleted file mode 100644 index 9c41aed8..00000000 --- a/AI/modules/trader/backtest/run_portfolio.py.py +++ /dev/null @@ -1,287 +0,0 @@ -# AI/modules/trader/backtest/run_portfolio.py -""" -[포트폴리오 통합 백테스트 엔진 (BugFixed)] -- 모델 로드 시 wrapper 객체 구조 대신 model.load() 사용 -- Scikit-Learn UserWarning 해결 (DataFrame 형태로 transform 수행) -- 실시간 데이터 스케일링 적용 -""" - -import sys -import os -import backtrader as bt -import pandas as pd -import numpy as np -from datetime import datetime - -# 프로젝트 루트 경로 추가 -current_dir = os.path.dirname(os.path.abspath(__file__)) -project_root = os.path.abspath(os.path.join(current_dir, "../../../..")) -if project_root not in sys.path: - sys.path.append(project_root) - -from AI.modules.signal.core.data_loader import SignalDataLoader -from AI.modules.signal.models import get_model -from AI.modules.trader.strategies.portfolio_logic import calculate_portfolio_allocation - -class AIPortfolioStrategy(bt.Strategy): - params = ( - ('model_path', None), # .keras 모델 파일 경로 - ('strategy_config', {'seq_len': 60, 'top_k': 3, 'buy_threshold': 0.6}), - ('rebalance_days', 1), - ('raw_data_map', {}), # 원본 데이터프레임 (보조지표 포함) 전달용 - ('scaler', None), # 학습 시 사용한 스케일러 - ('feature_columns', []), # 학습에 사용된 컬럼 순서 리스트 - ) - - def log(self, txt, dt=None): - dt = dt or self.datas[0].datetime.date(0) - print(f'[{dt.isoformat()}] {txt}') - - def __init__(self): - self.model = self._load_model_safe() - self.daily_value = [] - self.order_list = [] # 미체결 주문 관리 - - def _load_model_safe(self): - """AI 모델 로드 (Wrapper의 load 메서드 활용)""" - path = self.p.model_path - - # 껍데기 모델 생성 (Config는 로드 시 덮어써지므로 빈 딕셔너리 전달) - model = get_model("transformer", {}) - - try: - if path and os.path.exists(path): - # ★ 핵심 수정: 가중치만 로드하는 것이 아니라 모델 전체 구조를 로드 - model.load(path) - print(f"✅ 모델 로드 성공: {path}") - return model - else: - print("⚠️ 모델 파일 없음. 랜덤 예측 모드로 동작합니다.") - return None - except Exception as e: - print(f"❌ 모델 로드 중 치명적 오류: {e}") - return None - - def next(self): - # 리밸런싱 주기 체크 - if len(self) % self.p.rebalance_days != 0: - return - - current_date = self.datas[0].datetime.date(0) - ts_date = pd.Timestamp(current_date) - - seq_len = self.p.strategy_config['seq_len'] - - # 1. 모델 입력 데이터 준비 (스케일링 및 시퀀싱) - current_data_map = {} - valid_tickers = [] - - for d in self.datas: - ticker = d._name - full_df = self.p.raw_data_map.get(ticker) - - if full_df is None: - continue - - try: - # 현재 날짜 포함 이전 데이터 가져오기 (Lookahead bias 방지) - # loc[:ts_date]는 해당 날짜까지의 데이터를 포함함 - subset = full_df.loc[:ts_date] - - if len(subset) < seq_len: - continue - - # 최근 seq_len 만큼 추출 - recent_df = subset.iloc[-seq_len:] - - # ★ [수정] .values를 제거하여 DataFrame 형태 유지 (Warning 해결) - # DataFrame을 그대로 전달해야 Scikit-Learn이 컬럼 이름을 확인하고 경고를 띄우지 않음 - if self.p.feature_columns: - features_input = recent_df[self.p.feature_columns] - else: - features_input = recent_df.select_dtypes(include=[np.number]) - - # 스케일링 수행 - if self.p.scaler: - scaled_array = self.p.scaler.transform(features_input) - else: - scaled_array = features_input.values # 스케일러 없으면 값만 사용 - - # DataFrame 형태로 다시 포장 (전략 로직 전달용, 인덱스 유지) - df_prepared = pd.DataFrame( - scaled_array, - index=recent_df.index, - columns=self.p.feature_columns if self.p.feature_columns else None - ) - - current_data_map[ticker] = df_prepared - valid_tickers.append(d) - - except KeyError: - # 해당 날짜 데이터가 없는 종목 (휴장 등) - continue - - if not current_data_map: - return - - # 2. 포트폴리오 비중 계산 (외부 로직) - target_weights, scores = calculate_portfolio_allocation( - data_map=current_data_map, # 스케일링된 데이터프레임들 - model=self.model, - feature_columns=self.p.feature_columns, - config=self.p.strategy_config - ) - - # 3. 주문 실행 - current_val = self.broker.getvalue() - self.daily_value.append((current_date, current_val)) - - if current_val <= 0: return - - # (1) 매도 처리 (현금 확보) - for d in valid_tickers: - ticker = d._name - target_pct = target_weights.get(ticker, 0.0) - - # 현재 포지션 비중 계산 - pos = self.getposition(d) - pos_value = pos.size * d.close[0] - curr_pct = pos_value / current_val if current_val > 0 else 0 - - # 비중 축소 (매도) - 오차 1% 이상일 때만 실행 - if target_pct < curr_pct and abs(target_pct - curr_pct) > 0.01: - self.order_target_percent(d, target=target_pct) - - # (2) 매수 처리 - for d in valid_tickers: - ticker = d._name - target_pct = target_weights.get(ticker, 0.0) - - pos = self.getposition(d) - pos_value = pos.size * d.close[0] - curr_pct = pos_value / current_val if current_val > 0 else 0 - - # 비중 확대 (매수) - if target_pct > curr_pct and abs(target_pct - curr_pct) > 0.01: - self.order_target_percent(d, target=target_pct) - - def notify_order(self, order): - if order.status in [order.Completed]: - # 너무 많은 로그 방지를 위해 주석 처리하거나 필요시 해제 - # type_str = 'BUY' if order.isbuy() else 'SELL' - # self.log(f'{type_str} 체결: {order.data._name} @ {order.executed.price:.2f}') - pass - -def run_backtest(): - print("\n=== 🚀 AI 포트폴리오 백테스트 시작 (Fixed) ===") - - cerebro = bt.Cerebro() - - # 설정 - start_date = "2023-01-01" - end_date = datetime.now().strftime("%Y-%m-%d") - target_tickers = ["AAPL", "TSLA", "MSFT", "NVDA", "AMD"] # 테스트 대상 종목 - - # 1. 데이터 로드 및 전역 저장소 준비 - loader = SignalDataLoader(sequence_length=60) - raw_data_map = {} - all_data_list = [] # 스케일러 학습용 데이터 리스트 - - print("1. 데이터 로드 및 전처리 중...") - - for ticker in target_tickers: - try: - df = loader.load_data(ticker, start_date, end_date) - - if df is not None and not df.empty and len(df) > 100: - # 날짜 인덱스 보장 - if 'date' in df.columns: - df['date'] = pd.to_datetime(df['date']) - df.set_index('date', inplace=True) - - # 원본 데이터 저장 (보조지표 포함) - raw_data_map[ticker] = df - all_data_list.append(df) - - # Backtrader용 데이터 피드 추가 (OHLCV만 사용됨) - data_feed = bt.feeds.PandasData(dataname=df, name=ticker, plot=False) - cerebro.adddata(data_feed) - else: - print(f"⚠️ {ticker}: 데이터 부족 또는 로드 실패") - - except Exception as e: - print(f"❌ {ticker} 로드 중 에러: {e}") - - if not all_data_list: - print("❌ 실행할 데이터가 없습니다.") - return - - # 2. 스케일러 설정 및 피처 컬럼 확정 - # 모든 종목의 데이터를 합쳐서 스케일러를 학습(Fit)시킵니다. - # 주의: 실제 운영 시에는 Training Data로만 Fit 해야 Data Leakage가 없습니다. - # 여기서는 백테스트 편의상 전체 기간 데이터로 Fit 합니다. - sample_df = all_data_list[0] - - # 숫자형 컬럼만 피처로 사용 - feature_columns = sample_df.select_dtypes(include=[np.number]).columns.tolist() - - print(f" - Feature Columns ({len(feature_columns)}개): {feature_columns[:5]} ...") - - # 통합 Fit - combined_df = pd.concat(all_data_list) - # DataFrame 형태로 fit하여 컬럼 이름 저장 - loader.scaler.fit(combined_df[feature_columns]) - print(" - Scaler 학습 완료") - - # 3. 전략 실행 설정 - # 모델 파일 경로 (실제 파일이 있는 경로로 수정 필요) - model_path = os.path.join(project_root, "AI/data/weights/transformer/universal_transformer.keras") - - cerebro.addstrategy( - AIPortfolioStrategy, - model_path=model_path, - strategy_config={"seq_len": 60, "top_k": 3}, - # ★ 핵심: 전략에 원본 데이터와 스케일러, 피처 정보를 주입 - raw_data_map=raw_data_map, - scaler=loader.scaler, - feature_columns=feature_columns, - rebalance_days=1 - ) - - initial_cash = 100_000_000 - cerebro.broker.setcash(initial_cash) - cerebro.broker.setcommission(commission=0.0015) # 수수료 0.15% - - # 분석기 추가 - cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe', riskfreerate=0.02, timeframe=bt.TimeFrame.Days) - cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown') - cerebro.addanalyzer(bt.analyzers.Returns, _name='returns') - - print(f"초기 자산: {initial_cash:,.0f}원") - print("⏳ 시뮬레이션 진행 중...") - - results = cerebro.run() - strat = results[0] - - # 결과 출력 - final_value = cerebro.broker.getvalue() - profit = final_value - initial_cash - profit_rate = (profit / initial_cash) * 100 - - # 샤프지수 안전하게 가져오기 - sharpe_res = strat.analyzers.sharpe.get_analysis() - sharpe = sharpe_res.get('sharperatio', 0.0) - if sharpe is None: sharpe = 0.0 - - # MDD 안전하게 가져오기 - dd_res = strat.analyzers.drawdown.get_analysis() - mdd = dd_res['max']['drawdown'] if dd_res else 0.0 - - print("\n=== 📊 백테스트 최종 결과 ===") - print(f"최종 자산: {final_value:,.0f}원") - print(f"총 수익금: {profit:+,.0f}원 ({profit_rate:+.2f}%)") - print(f"Sharpe Ratio: {sharpe:.4f}") - print(f"Max Drawdown (MDD): {mdd:.2f}%") - -if __name__ == "__main__": - run_backtest() \ No newline at end of file diff --git a/AI/modules/trader/core/__init__.py b/AI/modules/trader/core/__init__.py new file mode 100644 index 00000000..cf2faffe --- /dev/null +++ b/AI/modules/trader/core/__init__.py @@ -0,0 +1,9 @@ +""" +[Trader Core Package] +- 트레이딩 시뮬레이션의 핵심 엔진과 계좌 관리 클래스를 노출합니다. +""" + +from .account import TradingAccount +from .simulator import Simulator + +__all__ = ['TradingAccount', 'Simulator'] \ No newline at end of file diff --git a/AI/modules/trader/core/simulator.py b/AI/modules/trader/core/simulator.py index 57830645..2c490730 100644 --- a/AI/modules/trader/core/simulator.py +++ b/AI/modules/trader/core/simulator.py @@ -10,21 +10,27 @@ from .account import TradingAccount class Simulator: - def __init__(self, ticker: str, data: pd.DataFrame): + def __init__(self, ticker: str, data: pd.DataFrame, initial_balance: float = 10_000_000): self.ticker = ticker self.data = data self.current_idx = 0 self.max_idx = len(data) - 1 + # 초기 자금 저장 (Reset 시 재사용) + self.initial_balance = initial_balance + # 계좌 생성 - self.account = TradingAccount() + self.account = TradingAccount(initial_balance=initial_balance) # 로그 self.history = [] def reset(self): self.current_idx = 0 - self.account = TradingAccount() + + # ★ [수정] 저장해둔 초기 자금으로 리셋 + self.account = TradingAccount(initial_balance=self.initial_balance) + self.history = [] return self._get_state() @@ -72,7 +78,19 @@ def step(self, action: dict): done = self.current_idx >= self.max_idx # 보상(Reward) 계산 (강화학습용): 당일 수익률 - prev_asset = self.history[-2]['asset'] if len(self.history) > 1 else self.account.initial_balance - reward = (total_asset - prev_asset) / prev_asset + # history가 비어있거나 막 시작했을 땐 초기 자금과 비교 + if len(self.history) > 1: + prev_asset = self.history[-2]['asset'] + else: + prev_asset = self.account.initial_balance + + # 0으로 나누기 방지 + if prev_asset == 0: + reward = 0 + else: + reward = (total_asset - prev_asset) / prev_asset + + # 다음 상태 반환 (종료 시 None) + next_state = self._get_state() if not done else None - return self._get_state() if not done else None, reward, done \ No newline at end of file + return next_state, reward, done \ No newline at end of file diff --git a/AI/modules/trader/strategies/__init__.py b/AI/modules/trader/strategies/__init__.py new file mode 100644 index 00000000..346fb51e --- /dev/null +++ b/AI/modules/trader/strategies/__init__.py @@ -0,0 +1,9 @@ +""" +[Trading Strategies Package] +- 다양한 투자 전략(Rule-based, Portfolio Allocation, RL Agent 등)을 모아둡니다. +""" + +from .rule_based import RuleBasedStrategy +from .portfolio_logic import calculate_portfolio_allocation + +__all__ = ['RuleBasedStrategy', 'calculate_portfolio_allocation'] \ No newline at end of file diff --git a/AI/modules/trader/strategies/rl_agent.py b/AI/modules/trader/strategies/rl_agent.py index e69de29b..9d06788e 100644 --- a/AI/modules/trader/strategies/rl_agent.py +++ b/AI/modules/trader/strategies/rl_agent.py @@ -0,0 +1,46 @@ +# AI/modules/trader/strategies/rl_agent.py +""" +[RL 기반 전략] +- 학습된 PPO 모델 파일을 로드하여 행동을 결정합니다. +- 백테스트(run_portfolio.py)나 실전 매매에서 이 클래스를 호출하여 사용합니다. +""" + +import os +import numpy as np +from stable_baselines3 import PPO + +class RLAgentStrategy: + def __init__(self, model_path=None): + if model_path is None: + # 기본 경로 설정 + current_dir = os.path.dirname(os.path.abspath(__file__)) + project_root = os.path.abspath(os.path.join(current_dir, "../../../..")) + model_path = os.path.join(project_root, "AI/data/weights/rl_agent_ppo.zip") + + if os.path.exists(model_path): + self.model = PPO.load(model_path) + print(f"🤖 RL 에이전트 로드 성공: {model_path}") + else: + print(f"⚠️ RL 모델 파일 없음: {model_path} (랜덤 모드로 동작)") + self.model = None + + def get_action(self, obs_vector: np.ndarray) -> dict: + """ + Args: + obs_vector (np.array): [현금비율, 수익률, RSI, MACD, 변동성, 거래량] + Returns: + dict: {'type': 'BUY'/'SELL', 'amount': float} + """ + if self.model: + action, _ = self.model.predict(obs_vector, deterministic=True) + val = float(action[0]) + else: + val = np.random.uniform(-1, 1) # 모델 없으면 랜덤 + + # 행동 해석 (rl_env.py의 로직과 동일해야 함) + if val > 0.05: + return {'type': 'BUY', 'amount': val} + elif val < -0.05: + return {'type': 'SELL', 'amount': abs(val)} + else: + return {'type': 'HOLD', 'amount': 0} \ No newline at end of file diff --git a/AI/modules/trader/strategies/rule_based.py b/AI/modules/trader/strategies/rule_based.py index 562e641f..ad18e6a4 100644 --- a/AI/modules/trader/strategies/rule_based.py +++ b/AI/modules/trader/strategies/rule_based.py @@ -1,97 +1,31 @@ -# AI/modules/trader/policy.py +# AI/modules/trader/strategies/rule_based.py """ -[주문 정책 모듈] -- AI 모델의 예측 점수(Score)와 현재 포지션 상태를 기반으로 매수/매도 여부와 수량을 결정합니다. -- 자금 관리(Money Management)와 리스크 관리(Risk Management) 로직이 포함됩니다. +[룰 기반 전략] +- AI Score를 입력받아 매수/매도/관망을 결정하는 고전적인 전략 로직입니다. """ -from typing import Tuple +class RuleBasedStrategy: + def __init__(self, buy_threshold=0.65, sell_threshold=0.40): + self.buy_threshold = buy_threshold + self.sell_threshold = sell_threshold -def decide_order( - ticker: str, - score: float, - current_price: float, - cash: float, - position_qty: int, - avg_price: float, - total_asset: float -) -> Tuple[str, int, str]: - """ - 매매 의사결정 함수 - - Args: - ticker (str): 종목 코드 - score (float): AI 모델 예측 점수 (0.0 ~ 1.0, 높을수록 상승 확률 높음) - current_price (float): 현재가 - cash (float): 현재 보유 현금 - position_qty (int): 현재 보유 수량 - avg_price (float): 평균 매입 단가 - total_asset (float): 총 자산 가치 + def get_action(self, score: float, position_qty: float) -> dict: + """ + AI 점수(score)를 보고 행동을 결정합니다. + Return: {'type': str, 'amount': float} + """ - Returns: - Tuple[str, int, str]: (주문종류 'BUY'/'SELL'/'HOLD', 수량, 로그메시지) - """ - - # 1. 정책 설정 (상수) - BUY_THRESHOLD = 0.65 # 매수 기준 점수 - SELL_THRESHOLD = 0.40 # 매도 기준 점수 - STOP_LOSS_PCT = 0.05 # 손절매 기준 (-5%) - TAKE_PROFIT_PCT = 0.10 # 익절매 기준 (+10%) - MAX_INVEST_RATIO = 0.95 # 최대 투자 비중 (현금 5% 남김) - - action = "HOLD" - qty = 0 - reason = "" - - # 2. 리스크 관리 (손절/익절 우선 체크) - if position_qty > 0: - if avg_price <= 0: - # 평단가가 없으면 리스크 관리 스킵 - reason = "평단가 정보 없음 (리스크 관리 불가)" - return "HOLD", 0, reason - else: - pnl_rate = (current_price - avg_price) / avg_price - - if pnl_rate <= -STOP_LOSS_PCT: - action = "SELL" - qty = position_qty - reason = f"손절매 발동 (수익률 {pnl_rate*100:.2f}%)" - return action, qty, reason - - elif pnl_rate >= TAKE_PROFIT_PCT: - action = "SELL" - qty = position_qty - reason = f"익절매 발동 (수익률 {pnl_rate*100:.2f}%)" - return action, qty, reason - - - # 3. AI 점수 기반 매매 판단 - # (1) 매수 조건 - if score >= BUY_THRESHOLD: - if position_qty == 0: # 포지션 없을 때만 진입 (단타 전략 예시) - # 가용 현금의 95%까지 매수 - invest_amount = cash * MAX_INVEST_RATIO - buy_qty = int(invest_amount // current_price) - - if buy_qty > 0: - action = "BUY" - qty = buy_qty - reason = f"강력 매수 신호 (점수: {score:.4f})" - else: - reason = "매수 신호 발생했으나 현금 부족" - else: - reason = "이미 포지션 보유 중 (추가 매수 없음)" - - # (2) 매도 조건 - elif score <= SELL_THRESHOLD: - if position_qty > 0: - action = "SELL" - qty = position_qty - reason = f"매도 신호 (점수: {score:.4f})" - else: - reason = "매도 신호 발생했으나 보유 물량 없음" - - else: - reason = f"관망 (점수: {score:.4f})" - - return action, qty, reason \ No newline at end of file + # 1. 매수 조건 + if score >= self.buy_threshold: + if position_qty == 0: + # 점수가 높으면 풀매수 (현금 99%) + return {'type': 'BUY', 'amount': 0.99} + + # 2. 매도 조건 + elif score <= self.sell_threshold: + if position_qty > 0: + # 점수가 낮으면 전량 매도 + return {'type': 'SELL', 'amount': 1.0} + + # 3. 관망 + return {'type': 'HOLD', 'amount': 0} \ No newline at end of file diff --git a/AI/modules/trader/train/rl_env.py b/AI/modules/trader/train/rl_env.py index e69de29b..4b044e44 100644 --- a/AI/modules/trader/train/rl_env.py +++ b/AI/modules/trader/train/rl_env.py @@ -0,0 +1,105 @@ +# AI/modules/trader/train/rl_env.py +""" +[강화학습 환경] +- core.simulator를 OpenAI Gym 환경으로 래핑합니다. +- PPO 알고리즘이 학습할 수 있도록 상태(State)와 보상(Reward)을 제공합니다. +""" + +import gymnasium as gym +from gymnasium import spaces +import numpy as np +import pandas as pd + +from AI.modules.trader.core.simulator import Simulator +from AI.modules.signal.core.data_loader import SignalDataLoader + +class StockTradingEnv(gym.Env): + metadata = {'render_modes': ['human']} + + def __init__(self, ticker: str, start_date: str, end_date: str, initial_balance=10_000_000): + super(StockTradingEnv, self).__init__() + + # 1. 데이터 준비 + self.loader = SignalDataLoader() + # 학습 속도를 위해 전처리된 데이터를 메모리에 로드해둡니다. + self.df = self.loader.load_data(ticker, start_date, end_date) + if self.df is None or len(self.df) < 100: + raise ValueError(f"데이터 부족: {ticker}") + + # Simulator 인스턴스 생성 (우리가 만든 엔진 사용) + self.simulator = Simulator(ticker, self.df, initial_balance=initial_balance) + + # 2. Action Space (행동 정의) + # Continuous: -1.0(전량매도) ~ +1.0(전량매수) + self.action_space = spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float32) + + # 3. Observation Space (관측 정의) + # [현금비율, 수익률, RSI, MACD, 변동성, 거래량변화] (6개 특징) + self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32) + + def reset(self, seed=None, options=None): + super().reset(seed=seed) + + # 시뮬레이터 초기화 + first_row = self.simulator.reset() + return self._get_observation(first_row), {} + + def step(self, action): + # AI의 행동(float) -> 시뮬레이터 행동(dict) 변환 + action_val = float(action[0]) + + sim_action = {'type': 'HOLD', 'amount': 0} + + # 임계값(0.05)을 두어 불필요한 매매 방지 + if action_val > 0.05: + sim_action = {'type': 'BUY', 'amount': action_val} + elif action_val < -0.05: + sim_action = {'type': 'SELL', 'amount': abs(action_val)} + + # 시뮬레이터 진행 (핵심!) + next_row, reward, done = self.simulator.step(sim_action) + + # 상태 관측 생성 + obs = self._get_observation(next_row) if not done else np.zeros(6, dtype=np.float32) + + # 추가 정보 + info = { + 'asset': self.simulator.account.get_total_asset({self.simulator.ticker: next_row['close']}) if not done else 0 + } + + return obs, reward, done, False, info + + def _get_observation(self, row): + """현재 시점의 시장 데이터 + 내 계좌 상태를 벡터로 변환""" + if row is None: + return np.zeros(6, dtype=np.float32) + + # 1. 계좌 상태 + acc = self.simulator.account + current_price = row['close'] + total_asset = acc.get_total_asset({self.simulator.ticker: current_price}) + cash_ratio = acc.cash / total_asset if total_asset > 0 else 0 + + profit_rate = 0.0 + if self.simulator.ticker in acc.positions: + avg_price = acc.positions[self.simulator.ticker]['avg_price'] + profit_rate = (current_price - avg_price) / avg_price + + # 2. 시장 데이터 (SignalDataLoader가 만든 지표들 사용) + # 만약 데이터프레임에 해당 컬럼이 없으면 0으로 처리 + rsi = row.get('rsi', 50) / 100.0 + macd = row.get('macd', 0) + volatility = row.get('volatility', 0) # 예: ATR 등 + vol_change = row.get('volume_change', 0) + + # 정규화하여 AI에게 전달 + obs = np.array([ + cash_ratio, # 0~1 + profit_rate, # -inf ~ inf + rsi, # 0~1 + macd, # -inf ~ inf + volatility, # 0 ~ inf + vol_change # -inf ~ inf + ], dtype=np.float32) + + return obs diff --git a/AI/modules/trader/train/train_ppo.py b/AI/modules/trader/train/train_ppo.py index e69de29b..f8dd02d8 100644 --- a/AI/modules/trader/train/train_ppo.py +++ b/AI/modules/trader/train/train_ppo.py @@ -0,0 +1,58 @@ +# AI/modules/trader/train/train_ppo.py +""" +[PPO 학습 실행기] +- rl_env.py 환경을 불러와 AI를 훈련시킵니다. +- 학습된 모델(.zip)을 'AI/data/weights/rl_agent_ppo' 경로에 저장합니다. +""" + +import os +import sys +from stable_baselines3 import PPO +from stable_baselines3.common.vec_env import DummyVecEnv + +# 프로젝트 루트 경로 추가 +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../../../..")) +if project_root not in sys.path: + sys.path.append(project_root) + +from AI.modules.trader.train.rl_env import StockTradingEnv + +def train_agent(): + print("🚀 [RL] PPO 트레이딩 에이전트 학습 시작") + + ticker = "AAPL" # 학습할 종목 + start_date = "2020-01-01" + end_date = "2023-12-31" + + # 1. 환경 생성 (벡터화된 환경) + env = DummyVecEnv([lambda: StockTradingEnv(ticker, start_date, end_date)]) + + # 2. 모델 설정 (MlpPolicy: 수치 데이터용 신경망) + model = PPO( + "MlpPolicy", + env, + verbose=1, + learning_rate=0.0003, + batch_size=64, + gamma=0.99, # 미래 보상 할인율 + ent_coef=0.01 # 탐험 장려 + ) + + # 3. 학습 (Timesteps: 학습 횟수) + total_timesteps = 50_000 + print(f" - 학습 기간: {start_date} ~ {end_date}") + print(f" - 총 스텝: {total_timesteps}") + + model.learn(total_timesteps=total_timesteps) + + # 4. 저장 + save_dir = os.path.join(project_root, "AI", "data", "weights") + os.makedirs(save_dir, exist_ok=True) + save_path = os.path.join(save_dir, "rl_agent_ppo") + + model.save(save_path) + print(f"✅ 모델 저장 완료: {save_path}.zip") + +if __name__ == "__main__": + train_agent() \ No newline at end of file diff --git a/AI/requirements.txt b/AI/requirements.txt index 8d53caa6..4dc5d820 100644 --- a/AI/requirements.txt +++ b/AI/requirements.txt @@ -14,4 +14,7 @@ beautifulsoup4 pathlib fredapi matplotlib -python-dotenv \ No newline at end of file +python-dotenv +stable-baselines3 +shimmy +gymnasium \ No newline at end of file