Skip to content
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ __pycache__/
/env
/.vs
/.venv/
AI/.venv/
AI/data/weights/tcn/

# ===== Backend =====
backend/src/main/java/org/sejongisc/backend/stock/TestController.java

# ===== windows =====
.desktop.ini
.desktop.ini
27 changes: 20 additions & 7 deletions AI/modules/features/market_derived.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,21 @@
from .technical import compute_rsi, compute_atr, compute_macd, compute_bollinger_bands

def add_market_changes(df: pd.DataFrame) -> pd.DataFrame:
    """Compute price/volume change-rate features (spec + legacy backward compat).

    Adds columns in place on *df* and returns it:
      - open_ratio / high_ratio / low_ratio: candle shape vs. previous close
      - vol_change: day-over-day volume change rate
      - ret_1d / log_return: simple and log daily returns
      - intraday_vol: (high - low) / close

    Assumes rows are sorted chronologically for a single instrument —
    shift()/pct_change() would otherwise mix unrelated rows.
    """
    epsilon = 1e-9  # guards divisions against zero denominators

    # --- [legacy compat] candle-shape and volume features the older model was trained on ---
    prev_close = df['close'].shift(1)
    df['open_ratio'] = (df['open'] - prev_close) / (prev_close + epsilon)
    df['high_ratio'] = (df['high'] - prev_close) / (prev_close + epsilon)
    df['low_ratio'] = (df['low'] - prev_close) / (prev_close + epsilon)
    df['vol_change'] = df['volume'].pct_change()

    # --- [spec] daily returns and intraday volatility ---
    df['ret_1d'] = df['close'].pct_change()
    df['log_return'] = np.log(df['close'] / df['close'].shift(1))
    # Intraday volatility ratio: (high - low) / close
    df['intraday_vol'] = (df['high'] - df['low']) / (df['close'] + epsilon)

    return df

def add_macro_changes(df: pd.DataFrame) -> pd.DataFrame:
Expand All @@ -32,8 +42,10 @@ def add_standard_technical_features(df: pd.DataFrame) -> pd.DataFrame:
"""레거시 로직 + 명세서 신규 지표 통합"""
epsilon = 1e-9

# 1. RSI (rsi_14)
# 1. RSI
# 명세서 기준(rsi_14)과 레거시 모델 호환용(rsi) 컬럼을 모두 생성합니다.
df['rsi_14'] = compute_rsi(df['close'], 14) / 100.0
df['rsi'] = df['rsi_14']

# 2. MACD (macd, macd_signal)
df['macd'], df['macd_signal'] = compute_macd(df['close'])
Expand All @@ -48,11 +60,12 @@ def add_standard_technical_features(df: pd.DataFrame) -> pd.DataFrame:
# 모델 입력용 포지션
df['bb_position'] = (df['close'] - df['bollinger_lb']) / ( (df['bollinger_ub'] - df['bollinger_lb']).replace(0, epsilon) )

# 4. ATR (atr_14)
# 4. ATR (atr_14) - [신규 명세서 지표]
df['atr_14'] = compute_atr(df['high'], df['low'], df['close'], 14)

# 5. Moving Averages (ma_20, ma_60)
for w in [20, 60]:
# 5. Moving Averages (ma_5, ma_20, ma_60)
# [레거시 호환] 기존에 사용하던 5일 이평선(ma_5)을 복구하여 배열에 추가했습니다.
for w in [5, 20, 60]:
df[f'ma_{w}'] = df['close'].rolling(window=w).mean()
# 모델 입력용 이격도 (Standard Key: ma_trend_score 등에 활용)
df[f'ma{w}_ratio'] = (df['close'] - df[f'ma_{w}']) / (df[f'ma_{w}'] + epsilon)
Expand Down
56 changes: 42 additions & 14 deletions AI/modules/signal/core/dataset_builder.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,53 @@
# AI/modules/signal/core/dataset_builder.py
import pandas as pd
from typing import Union, Optional

from AI.modules.signal.core.data_loader import DataLoader
from AI.modules.features.technical import add_technical_indicators
from AI.modules.features.market_derived import add_macro_features
from AI.modules.features.processor import FeatureProcessor
from AI.modules.features.market_derived import add_market_changes, add_macro_changes

def get_standard_training_data(start_date: str, end_date: str) -> pd.DataFrame:
def apply_strict_nan_rules(df: pd.DataFrame) -> pd.DataFrame:
    """Apply the strict NaN-handling rules from the SISC data specification.

    Macro columns are forward-filled (macro series update less frequently
    than market data), then any row that still contains a NaN is dropped
    and the index is rebuilt.

    Returns a cleaned copy; the input frame is not mutated.
    """
    df_clean = df.copy()

    # Macro series eligible for forward-fill; only those actually present are touched.
    macro_cols = ['vix_close', 'vix_change_rate', 'us10y', 'us10y_chg', 'dxy_close', 'dxy_chg']
    available_macro = [col for col in macro_cols if col in df_clean.columns]

    if available_macro:
        df_clean[available_macro] = df_clean[available_macro].ffill()

    # Remaining NaNs (e.g. rolling-window warm-up rows) are dropped outright.
    df_clean = df_clean.dropna().reset_index(drop=True)
    return df_clean

def get_standard_training_data(
    start_date_or_df: Union[str, pd.DataFrame],
    end_date: Optional[str] = None
) -> pd.DataFrame:
    """SISC standard training-dataset builder (dual-use for training/inference).

    Usage 1 (training):  get_standard_training_data('2020-01-01', '2024-01-01') -> loads from DB
    Usage 2 (inference): get_standard_training_data(df) -> skips the DB, preprocessing only

    Raises:
        ValueError: when the DB query returns no data for the requested range.
    """
    # 1. Branch on input type (DB load vs. directly injected frame)
    if isinstance(start_date_or_df, pd.DataFrame):
        df = start_date_or_df.copy()
    else:
        loader = DataLoader()
        df = loader.load_data_from_db(start_date_or_df, end_date)
        if df is None or df.empty:
            raise ValueError(f"지정된 기간({start_date_or_df} ~ {end_date})의 데이터를 불러오지 못했습니다.")

    # Sort chronologically BEFORE any shift()/pct_change()-based features are
    # computed, otherwise change rates are taken over unordered rows.
    # NOTE(review): 'date'/'ticker' column names assumed — confirm against the
    # DataLoader schema.
    if 'date' in df.columns:
        sort_keys = ['ticker', 'date'] if 'ticker' in df.columns else ['date']
        df = df.sort_values(sort_keys).reset_index(drop=True)

    # 2. First derived-feature layer (basic change rates).
    # For multi-ticker frames, compute per ticker so shift()/pct_change()
    # never leak a previous row from a different instrument.
    if 'ticker' in df.columns and df['ticker'].nunique() > 1:
        df = (
            df.groupby('ticker', group_keys=False)
              .apply(lambda g: add_macro_changes(add_market_changes(g)))
        )
        df = df.reset_index(drop=True)
    else:
        df = add_market_changes(df)
        df = add_macro_changes(df)

    # 3. Second derived-feature layer (advanced indicators via FeatureProcessor).
    # NOTE(review): FeatureProcessor is assumed to tolerate pre-sorted,
    # per-ticker input — verify its pipeline contract.
    processor = FeatureProcessor(df)
    df = processor.execute_pipeline()

    # 4. Apply the spec's NaN-handling rules (macro ffill, then dropna)
    df = apply_strict_nan_rules(df)

    return df
102 changes: 102 additions & 0 deletions AI/modules/signal/models/TCN/architecture.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import torch
import torch.nn as nn
from typing import List


class Chomp1d(nn.Module):
    """Trims the trailing time steps introduced by causal padding.

    After a left-and-right padded Conv1d, the last ``chomp_size`` positions
    would see future inputs; this module cuts them off so the convolution
    stays strictly causal.
    """

    def __init__(self, chomp_size: int):
        super().__init__()
        self.chomp_size = chomp_size

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Nothing to trim when no causal padding was applied.
        if not self.chomp_size:
            return x
        keep = x.size(2) - self.chomp_size
        return x.narrow(2, 0, keep).contiguous()


class TemporalBlock(nn.Module):
    """Basic TCN unit: two dilated causal Conv1d stages plus a residual link.

    Each stage is Conv1d -> Chomp1d -> ReLU -> Dropout; a 1x1 convolution
    aligns channel counts on the skip path when in/out widths differ.
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int,
        dilation: int,
        dropout: float,
    ):
        super().__init__()
        # Pad enough for causality; Chomp1d removes the future-looking excess.
        causal_pad = (kernel_size - 1) * dilation

        stages = []
        for conv_in in (in_channels, out_channels):
            stages.extend([
                nn.Conv1d(
                    conv_in,
                    out_channels,
                    kernel_size,
                    padding=causal_pad,
                    dilation=dilation,
                ),
                Chomp1d(causal_pad),
                nn.ReLU(),
                nn.Dropout(dropout),
            ])
        self.net = nn.Sequential(*stages)

        # Skip-path projection only when channel counts differ.
        self.downsample = None
        if in_channels != out_channels:
            self.downsample = nn.Conv1d(in_channels, out_channels, kernel_size=1)
        self.activation = nn.ReLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        skip = x if self.downsample is None else self.downsample(x)
        return self.activation(self.net(x) + skip)


class TCNClassifier(nn.Module):
    """Stack of TemporalBlocks producing multi-horizon binary-classification logits.

    Dilation doubles at each level (1, 2, 4, ...), so the receptive field
    grows exponentially with depth. The head global-average-pools over time
    and projects to ``output_size`` logits.
    """

    def __init__(
        self,
        input_size: int,
        output_size: int,
        num_channels: List[int],
        kernel_size: int = 3,
        dropout: float = 0.2,
    ):
        super().__init__()

        blocks = []
        prev_width = input_size
        for level, width in enumerate(num_channels):
            blocks.append(
                TemporalBlock(
                    in_channels=prev_width,
                    out_channels=width,
                    kernel_size=kernel_size,
                    dilation=2 ** level,  # exponentially growing receptive field
                    dropout=dropout,
                )
            )
            prev_width = width

        self.backbone = nn.Sequential(*blocks)
        # Pool over the time axis, flatten, then a linear read-out to logits.
        self.head = nn.Sequential(
            nn.AdaptiveAvgPool1d(1),
            nn.Flatten(),
            nn.Linear(num_channels[-1], output_size),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Input arrives as [batch, seq, features]; Conv1d expects [batch, features, seq].
        features_first = x.permute(0, 2, 1)
        return self.head(self.backbone(features_first))
Loading