Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 23 additions & 22 deletions AI/libs/database/fetcher.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# libs/database/fetcher.py
# AI/libs/database/fetcher.py
from __future__ import annotations
from typing import Optional
import pandas as pd
Expand All @@ -15,25 +15,22 @@ def fetch_ohlcv(
db_name: str = "db",
) -> pd.DataFrame:
"""
특정 티커, 날짜 범위의 OHLCV 데이터를 DB에서 불러오기 (SQLAlchemy 엔진 사용)
특정 티커, 날짜 범위의 OHLCV 데이터를 DB에서 불러오기

Args:
ticker (str): 종목 코드 (예: "AAPL")
start (str): 시작일자 'YYYY-MM-DD' (inclusive)
end (str): 종료일자 'YYYY-MM-DD' (inclusive)
interval (str): 데이터 간격 ('1d' 등) - 현재 테이블이 일봉만 제공하면 무시됨
db_name (str): get_engine()가 참조할 설정 블록 이름 (예: "db", "report_DB")
start (str): 시작일자 'YYYY-MM-DD'
end (str): 종료일자 'YYYY-MM-DD'
interval (str): 데이터 간격 (현재 일봉만 지원)
db_name (str): DB 설정 이름

Returns:
pd.DataFrame: 컬럼 = [ticker, date, open, high, low, close, adjusted_close, volume]
(date 컬럼은 pandas datetime으로 변환됨)
pd.DataFrame: [ticker, date, open, high, low, close, adjusted_close, volume]
"""

# 1) SQLAlchemy engine 얻기 (configs/config.json 기준)
engine = get_engine(db_name)

# 2) 쿼리: named parameter(:ticker 등) 사용 -> 안전하고 가독성 좋음
# - interval 분기가 필요하면 테이블/파티션 구조에 따라 쿼리를 분기하도록 확장 가능
# adjusted_close가 중요하다면 쿼리 단계에서 확실히 가져옵니다.
query = text("""
SELECT ticker, date, open, high, low, close, adjusted_close, volume
FROM public.price_data
Expand All @@ -42,28 +39,32 @@ def fetch_ohlcv(
ORDER BY date;
""")

# 3) DB에서 읽기 (with 문으로 커넥션 자동 정리)
with engine.connect() as conn:
df = pd.read_sql(
query,
con=conn, # 꼭 키워드 인자로 con=conn
params={"ticker": ticker, "start": start, "end": end}, # 튜플 X, 딕셔너리 O
)
con=conn,
params={"ticker": ticker, "start": start, "end": end},
)

# 4) 후처리: 컬럼 정렬 및 date 타입 통일
# 빈 데이터 처리
if df is None or df.empty:
# 빈 DataFrame이면 일관된 컬럼 스키마로 반환
return pd.DataFrame(columns=["ticker", "date", "open", "high", "low", "close", "adjusted_close", "volume"])

# date 컬럼을 datetime으로 변경 (UTC로 맞추고 싶으면 pd.to_datetime(..., utc=True) 사용)
# 날짜 변환
if "date" in df.columns:
df["date"] = pd.to_datetime(df["date"])

# 선택: 컬럼 순서 고정 (일관성 유지)
# 데이터 보정 로직 추가
# 1. adjusted_close가 없는 경우(NaN) -> close 값으로 대체 (결측치 방지)
if "adjusted_close" in df.columns and "close" in df.columns:
df["adjusted_close"] = df["adjusted_close"].fillna(df["close"])
elif "adjusted_close" not in df.columns and "close" in df.columns:
# 컬럼 자체가 없으면 close를 복사해서 생성
df["adjusted_close"] = df["close"]

# 컬럼 순서 정리
desired_cols = ["ticker", "date", "open", "high", "low", "close", "adjusted_close", "volume"]
# 존재하는 컬럼만 가져오기
cols_present = [c for c in desired_cols if c in df.columns]
df = df.loc[:, cols_present]

return df

return df
51 changes: 30 additions & 21 deletions AI/modules/signal/core/features.py
Original file line number Diff line number Diff line change
@@ -1,69 +1,78 @@
# AI/modules/signal/core/features.py
"""
[피처 엔지니어링 모듈]
- OHLCV 데이터를 입력받아 학습에 필요한 기술적 지표(RSI, MACD, 볼린저밴드 등)를 추가합니다.
- 데이터 로더(DataLoader)에서 이 함수를 호출하여 전처리를 수행합니다.
[피처 엔지니어링 모듈 - Adjusted Close 통합 버전]
- 데이터에 'adjusted_close'가 있다면 이를 'close'로 덮어씌웁니다.
- 이렇게 하면 모든 지표(RSI, MACD 등)가 자연스럽게 '조정 종가' 기준으로 계산됩니다.
- 학습 시 'close'와 'adjusted_close'가 중복되는 문제도 해결됩니다.
"""

import pandas as pd
import numpy as np

def add_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
"""
데이터프레임에 기술적 지표 컬럼을 추가합니다.

Args:
df (pd.DataFrame): OHLCV 데이터 (필수 컬럼: 'close', 'high', 'low', 'volume')

Returns:
pd.DataFrame: 지표가 추가된 데이터프레임
1. 조정 종가(Adjusted Close)를 종가(Close)로 통합합니다.
2. 기술적 지표를 계산하여 추가합니다.
"""
if df.empty:
return df

df = df.copy()

# ★ [핵심 수정] 조정 종가 우선 정책
# adjusted_close가 있으면, 이를 close에 덮어쓰고 adjusted_close 컬럼은 삭제합니다.
if 'adjusted_close' in df.columns:
# 결측치 방지 (혹시 adjusted_close가 비어있으면 close 값 사용)
df['adjusted_close'] = df['adjusted_close'].fillna(df['close'])

# 덮어쓰기
df['close'] = df['adjusted_close']

# 중복 방지를 위해 삭제 (이제 close가 adjusted_close 역할을 함)
df.drop(columns=['adjusted_close'], inplace=True)

# --- 이하 모든 계산은 'close'(실제로는 조정 종가)를 기준으로 수행됨 ---

# 1. 이동평균선 (Simple Moving Average)
df['ma5'] = df['close'].rolling(window=5).mean()
df['ma20'] = df['close'].rolling(window=20).mean()
df['ma60'] = df['close'].rolling(window=60).mean()

# 2. RSI (Relative Strength Index)
# CodeRabbit 리뷰 반영: 엣지 케이스(횡보, 상승지속) 정밀 처리
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()

# division by zero 등 경고 억제
with np.errstate(divide='ignore', invalid='ignore'):
rs = gain / loss

# 기본 RSI 계산 (loss=0인 경우 rs=inf가 되며, 100/(1+inf)=0 이므로 RSI=100이 됨 -> 정상)
df['rsi'] = 100 - (100 / (1 + rs))

# [보정 1] 가격 변동이 아예 없는 경우 (Gain=0, Loss=0) -> NaN 발생 -> 50(중립)으로 설정
# RSI 보정
df.loc[(gain == 0) & (loss == 0), 'rsi'] = 50

# [보정 2] 하락 없이 상승만 한 경우 (Loss=0, Gain>0) -> 100(강세)으로 설정 (수식상 자동 처리되나 명시)
df.loc[(loss == 0) & (gain > 0), 'rsi'] = 100

# 3. 볼린저 밴드 (Bollinger Bands)
df['std20'] = df['close'].rolling(window=20).std()
df['upper_band'] = df['ma20'] + (df['std20'] * 2)
df['lower_band'] = df['ma20'] - (df['std20'] * 2)

# 4. MACD (Moving Average Convergence Divergence)
# 4. MACD
exp12 = df['close'].ewm(span=12, adjust=False).mean()
exp26 = df['close'].ewm(span=26, adjust=False).mean()
df['macd'] = exp12 - exp26
df['signal_line'] = df['macd'].ewm(span=9, adjust=False).mean()

# 5. 거래량 변화율
df['vol_change'] = df['volume'].pct_change()
if 'volume' in df.columns:
df['vol_change'] = df['volume'].pct_change()
df['vol_change'] = df['vol_change'].replace([np.inf, -np.inf], 0)
else:
df['vol_change'] = 0

# 6. 결측치 처리 (지표 계산 초반 구간)
# [수정] FutureWarning 해결: fillna(method='bfill') -> bfill()
# === [데이터 정제] ===
df.replace([np.inf, -np.inf], np.nan, inplace=True)
df = df.bfill()
df = df.fillna(0) # 앞부분 bfill로도 안 채워지는 경우 0 처리
df = df.fillna(0)

return df
10 changes: 10 additions & 0 deletions AI/modules/trader/backtest/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
[Backtest Execution Package]
- 단일 종목 및 포트폴리오 단위의 백테스트 실행 함수들을 제공합니다.
"""

# 함수 이름이 겹치지 않게 alias(별칭)를 주어 명확히 구분합니다.
from .run_portfolio import run_backtest as run_portfolio_backtest
from .run_backtrader_single import run_single_backtest

__all__ = ['run_portfolio_backtest', 'run_single_backtest']
188 changes: 188 additions & 0 deletions AI/modules/trader/backtest/run_backtrader_single.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
# AI/modules/trader/backtest/run_backtrader_single.py
"""
[Backtrader 기반 단일 종목 정밀 백테스트]
- Walk-Forward Validation 지원
- strategies/rule_based.py 의 RuleBasedStrategy 클래스 사용
- AI Score 시각화 기능 포함
"""

import sys
import os
import backtrader as bt
import pandas as pd
import numpy as np

current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(current_dir, "../../../.."))
if project_root not in sys.path:
sys.path.append(project_root)

from AI.modules.signal.core.data_loader import SignalDataLoader
from AI.modules.signal.models import get_model
# ★ [수정] 클래스 기반 전략 불러오기
from AI.modules.trader.strategies.rule_based import RuleBasedStrategy

class AIScoreObserver(bt.Observer):
"""차트 하단에 AI 모델 점수를 그리기 위한 클래스"""
lines = ('score', 'limit_buy', 'limit_sell')
plotinfo = dict(plot=True, subplot=True, plotname='AI Probability')
plotlines = dict(
score=dict(marker='o', markersize=3.0, color='blue', _fill_gt=(0.5, 'red'), _fill_lt=(0.5, 'green')),
limit_buy=dict(color='red', linestyle='--'),
limit_sell=dict(color='green', linestyle='--')
)

def next(self):
score = getattr(self._owner, 'current_score', 0.5)
self.lines.score[0] = score
self.lines.limit_buy[0] = 0.65
self.lines.limit_sell[0] = 0.40

class TransformerWalkForwardStrategy(bt.Strategy):
params = (
('model_weights_path', None),
('raw_df', None),
('features', None),
('loader', None),
('seq_len', 60),
('model_name', "transformer"),
)

def __init__(self):
self.model = self._load_model()
self.order = None
self.current_score = 0.5
# ★ [수정] 전략 객체 초기화
self.strategy_logic = RuleBasedStrategy(buy_threshold=0.65, sell_threshold=0.40)

def log(self, txt, dt=None):
dt = dt or self.datas[0].datetime.date(0)
print(f'[{dt.isoformat()}] {txt}')

def _load_model(self):
path = self.p.model_weights_path
if not path or not os.path.exists(path):
self.log("⚠️ 모델 가중치 파일 없음.")
return None

default_config = {
"head_size": 256, "num_heads": 4, "ff_dim": 4,
"num_blocks": 4, "mlp_units": [128], "dropout": 0.1
}
try:
model = get_model(self.p.model_name, default_config)
model.build((None, self.p.seq_len, len(self.p.features)))
if hasattr(model, 'model'):
model.model.load_weights(path)
else:
model.load_weights(path)
return model
except Exception as e:
self.log(f"⚠️ 모델 로드 에러: {e}")
return None

def notify_order(self, order):
if order.status in [order.Completed]:
if order.isbuy():
self.log(f"🔵 BUY 체결 @ {order.executed.price:,.0f}")
elif order.issell():
self.log(f"🔴 SELL 체결 @ {order.executed.price:,.0f}")
self.order = None

def next(self):
if len(self) < self.p.seq_len:
return

current_date = self.datas[0].datetime.datetime(0)
past_data = self.p.raw_df.loc[:current_date]
if len(past_data) < self.p.seq_len:
return

# 1. Walk-Forward Prediction
self.p.loader.scaler.fit(past_data[self.p.features])
recent_data = past_data.iloc[-self.p.seq_len:]
input_seq = np.expand_dims(self.p.loader.scaler.transform(recent_data[self.p.features]), axis=0)

if self.model:
pred = self.model.predict(input_seq, verbose=0)
score = float(pred[0][0])
else:
score = 0.5

self.current_score = score

# 2. 매매 판단 (RuleBasedStrategy 사용)
if self.order: return # 이미 주문 중이면 패스

position_qty = self.position.size
# ★ [수정] 클래스 메서드 호출로 변경 (코드가 훨씬 깔끔해짐)
decision = self.strategy_logic.get_action(score, position_qty)

if decision['type'] == 'BUY':
# 보유 현금의 95%만큼 매수 계산 (Backtrader 로직)
cash = self.broker.get_cash()
price = self.datas[0].close[0]
# 수수료 고려하여 안전하게 계산
size = int((cash * 0.95) / price)
if size > 0:
self.log(f"BUY 신호 (Score: {score:.2f})")
self.order = self.buy(size=size)

elif decision['type'] == 'SELL':
if position_qty > 0:
self.log(f"SELL 신호 (Score: {score:.2f})")
self.order = self.close() # 전량 청산

def run_single_backtest(ticker="AAPL", start_date="2024-01-01", end_date="2024-06-01", enable_plot=True):
print(f"\n=== [{ticker}] 단일 종목 백테스트 시작 ===")

weight_path = os.path.join(project_root, "AI/data/weights/transformer/universal_transformer.keras")
loader = SignalDataLoader(sequence_length=60)
df = loader.load_data(ticker, start_date, end_date)

if df is None or len(df) < 100:
print("❌ 데이터 로드 실패")
return

if 'date' in df.columns:
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)

data_feed = bt.feeds.PandasData(dataname=df)
features = df.select_dtypes(include=[np.number]).columns.tolist()

cerebro = bt.Cerebro()
cerebro.adddata(data_feed)

cerebro.addstrategy(
TransformerWalkForwardStrategy,
model_weights_path=weight_path,
raw_df=df,
features=features,
loader=loader
)

if enable_plot:
cerebro.addobserver(AIScoreObserver)

cerebro.broker.setcash(10_000_000)
cerebro.broker.setcommission(commission=0.0015)
cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe', riskfreerate=0.0)
cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')

print(f"💰 초기 자산: {cerebro.broker.getvalue():,.0f}원")
results = cerebro.run()

strat = results[0]
final_val = cerebro.broker.getvalue()
mdd = strat.analyzers.drawdown.get_analysis()['max']['drawdown']
sharpe = strat.analyzers.sharpe.get_analysis().get('sharperatio', 0.0)

print(f"💰 최종 자산: {final_val:,.0f}원 ({(final_val/10000000 - 1)*100:.2f}%)")
print(f"📉 MDD: {mdd:.2f}% | 📊 Sharpe: {sharpe:.4f}")

if enable_plot:
cerebro.plot(style='candlestick', volume=False)

if __name__ == "__main__":
run_single_backtest()
Loading