Skip to content
Merged
594 changes: 350 additions & 244 deletions AI/libs/database/repository.py

Large diffs are not rendered by default.

Empty file added AI/modules/__init__.py
Empty file.
23 changes: 20 additions & 3 deletions AI/modules/data_collector/ticker_master_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import List, Dict
import requests
from datetime import datetime
from io import StringIO

# 프로젝트 루트 경로 설정
current_dir = os.path.dirname(os.path.abspath(__file__))
Expand Down Expand Up @@ -34,7 +35,14 @@ def fetch_sp500_tickers(self) -> List[Dict]:
print("[Master] S&P 500 리스트 다운로드 중 (Wikipedia)...")
try:
url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
tables = pd.read_html(url)
# 봇 차단 우회를 위한 User-Agent 헤더 추가
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'}

response = requests.get(url, headers=headers)
response.raise_for_status()

# StringIO로 텍스트를 감싸서 pandas로 읽기
tables = pd.read_html(StringIO(response.text))
df = tables[0]

# yfinance 호환성을 위해 티커 변경 (예: BRK.B -> BRK-B)
Expand All @@ -59,14 +67,23 @@ def fetch_nasdaq100_tickers(self) -> List[Dict]:
print("[Master] NASDAQ 100 리스트 다운로드 중...")
try:
url = 'https://en.wikipedia.org/wiki/Nasdaq-100'
tables = pd.read_html(url)
# 봇 차단 우회를 위한 User-Agent 헤더 추가
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'}

response = requests.get(url, headers=headers)
response.raise_for_status()

tables = pd.read_html(StringIO(response.text))

# 보통 5번째 테이블이 구성 종목 (Wikipedia 구조 변경 시 확인 필요)
# 안전하게 컬럼명으로 찾기
df = None
for table in tables:
if 'Ticker' in table.columns and 'Company' in table.columns:
df = table
break
else:

if df is None:
return []

df['Ticker'] = df['Ticker'].str.replace('.', '-', regex=False)
Expand Down
Empty file.
4 changes: 2 additions & 2 deletions AI/modules/features/event_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
import pandas as pd

def add_date_distance(df: pd.DataFrame, event_dates: pd.Series, col_name: str) -> pd.DataFrame:
"""asof_date 기준 특정 이벤트 후 경과일 계산 [명세서 준수]"""
"""asof_date 기준 특정 이벤트 후 경과일 계산"""
# event_dates는 각 행(날짜)별로 가장 최근의 이벤트 날짜를 가지고 있어야 함
df[f'days_since_{col_name}'] = (df.index - pd.to_datetime(event_dates)).dt.days
# 음수(미래)는 0 처리
df[f'days_since_{col_name}'] = df[f'days_since_{col_name}'].clip(lower=0)
return df

def add_event_window_flags(df: pd.DataFrame, event_dates_list: list, col_name: str) -> pd.DataFrame:
"""FOMC/CPI 전후 1일 여부 (True/False) [명세서 준수]"""
"""FOMC/CPI 전후 1일 여부 (True/False)"""
# 이벤트 당일 플래그
df[f'event_window_flag_{col_name}'] = df.index.isin(event_dates_list)
# 전후 1일로 확장 (rolling max)
Expand Down
1 change: 0 additions & 1 deletion AI/modules/features/legacy/technical_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
레거시 버전입니다.
[Stationary Multi-Timeframe Features - Fixed]
- 절대 가격(Price)을 Ratio로 변환합니다.
- [수정] 주가 데이터를 파괴하던 clip(-10, 10) 로직을 제거했습니다.
- 무한대(inf)는 NaN -> 0 처리하여 안전하게 만듭니다.
"""

Expand Down
2 changes: 1 addition & 1 deletion AI/modules/features/market_derived.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# AI/modules/features/market_derived.py
import pandas as pd
import numpy as np
from features.technical import compute_rsi, compute_atr, compute_macd, compute_bollinger_bands
from .technical import compute_rsi, compute_atr, compute_macd, compute_bollinger_bands
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: 패키지 구조를 위한 __init__.py 파일 존재 여부 확인

echo "=== Checking for __init__.py files ==="
echo ""

# Check AI/modules/__init__.py
if [ -f "AI/modules/__init__.py" ]; then
    echo "✓ AI/modules/__init__.py exists"
else
    echo "✗ AI/modules/__init__.py NOT FOUND (Required for package structure)"
fi

# Check AI/modules/features/__init__.py
if [ -f "AI/modules/features/__init__.py" ]; then
    echo "✓ AI/modules/features/__init__.py exists"
else
    echo "✗ AI/modules/features/__init__.py NOT FOUND (Required for relative imports)"
fi

echo ""
echo "=== Checking if technical.py exists ==="
if [ -f "AI/modules/features/technical.py" ]; then
    echo "✓ AI/modules/features/technical.py exists"
else
    echo "✗ AI/modules/features/technical.py NOT FOUND"
fi

echo ""
echo "=== Searching for files that import from market_derived.py ==="
rg -n "from AI\.modules\.features\.market_derived import|from \.market_derived import" --type py -C 1

Repository: SISC-IT/sisc-web

Length of output: 944


__init__.py 파일이 누락되어 상대 임포트가 작동하지 않습니다.

상대 임포트(.technical)를 사용하려면 AI/modules/AI/modules/features/ 디렉토리에 __init__.py 파일이 필요합니다. 현재 두 파일 모두 존재하지 않으므로 ImportError가 발생합니다.

또한 AI/modules/signal/core/dataset_builder.py의 절대 임포트(from AI.modules.features.market_derived import ...)도 패키지 구조가 완성되지 않으면 실패합니다.

다음 파일들을 생성하세요:

  • AI/modules/__init__.py (빈 파일)
  • AI/modules/features/__init__.py (빈 파일)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@AI/modules/features/market_derived.py` at line 4, Create two empty package
initializer files so Python treats the directories as packages: add an empty
AI/modules/__init__.py and an empty AI/modules/features/__init__.py. This will
allow the relative import in AI/modules/features/market_derived.py (the "from
.technical import compute_rsi, compute_atr, compute_macd,
compute_bollinger_bands" statement) to resolve and also enable the absolute
import used in AI/modules/signal/core/dataset_builder.py ("from
AI.modules.features.market_derived import ...") to succeed.


def add_market_changes(df: pd.DataFrame) -> pd.DataFrame:
"""가격 및 거래량 기반 변화율 계산 [명세서 준수]"""
Expand Down
27 changes: 18 additions & 9 deletions AI/modules/features/processor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# AI/modules/features/processor.py
import pandas as pd
from .market_derived import add_standard_technical_features, add_multi_timeframe_features
from .event_features import add_event_features
from .technical import compute_correlation_spike, compute_recent_loss_ema
from AI.modules.features.market_derived import add_standard_technical_features, add_multi_timeframe_features
from AI.modules.features.event_features import add_date_distance, add_event_window_flags
from AI.modules.features.technical import compute_correlation_spike, compute_recent_loss_ema

class FeatureProcessor:
"""
Expand All @@ -15,22 +15,31 @@ def __init__(self, df: pd.DataFrame):
if 'date' in self.df.columns:
self.df['date'] = pd.to_datetime(self.df['date'])
self.df = self.df.sort_values('date')
self.df.set_index('date', inplace=True)

def execute_pipeline(self, event_info=None, sector_df=None):
"""전체 파생 피처 생성 파이프라인 실행"""

# 1. 일봉 기준 표준 기술적 지표 및 수익률 계산 (Standard Key 생성)
# 1. 일봉 기준 표준 기술적 지표 및 수익률 계산
self.df = add_standard_technical_features(self.df)

# 2. 주봉/월봉 멀티 타임프레임 피처 결합 (Legacy 로직 완벽 대체)
# 2. 주봉/월봉 멀티 타임프레임 피처 결합
self.df = add_multi_timeframe_features(self.df)

# 3. 이벤트 기반 피처 (IPO 경과일, 실적발표 등)
# 3. 이벤트 기반 피처
if event_info:
self.df = add_event_features(self.df, event_info)
self.df = add_date_distance(self.df, event_info.get('ipo_dates', pd.Series()), 'ipo')
self.df = add_event_window_flags(self.df, event_info.get('fomc_dates', []), 'fomc')
Comment on lines +29 to +32
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

ipo_dates 기본값이 helper 입력 계약을 깨고 있습니다.

add_date_distance()는 각 행과 정렬된 이벤트 날짜 Series를 전제로 하는데, 여기서는 key가 없을 때 길이 0짜리 pd.Series()를 넘기고 있습니다. event_infofomc_dates만 있고 ipo_dates가 없으면 인덱스가 맞지 않아 잘못된 계산이나 예외로 이어질 수 있습니다.

제안 수정안
-            self.df = add_date_distance(self.df, event_info.get('ipo_dates', pd.Series()), 'ipo')
+            ipo_dates = event_info.get('ipo_dates')
+            if ipo_dates is None:
+                ipo_dates = pd.Series(pd.NaT, index=self.df.index, dtype='datetime64[ns]')
+            self.df = add_date_distance(self.df, ipo_dates, 'ipo')
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@AI/modules/features/processor.py` around lines 29 - 32, The code passes a
bare pd.Series() as the default for event_info['ipo_dates'], violating
add_date_distance's contract that expects a datetime Series aligned to the
DataFrame index; change the default to an empty datetime Series indexed to
self.df (e.g. a Series with dtype datetime64[ns] and index=self.df.index) when
calling add_date_distance so add_date_distance(self.df, ipo_dates, 'ipo') always
receives an index-aligned Series; keep the existing call to
add_event_window_flags for fomc_dates as-is.


# 사용자님이 완벽하게 고치신 부분! 👍
if 'vix_close' in self.df.columns:
self.df['correlation_spike'] = compute_correlation_spike(self.df['close'], self.df['vix_close'])

self.df['recent_loss_ema'] = compute_recent_loss_ema(self.df['close'], self.df['close'].shift(1))

# 4. 데이터 정제 (Legacy 안정성 로직)
# 4. 데이터 정제
self.df = self.finalize_data()
self.df.reset_index(inplace=True)

return self.df

Expand All @@ -39,4 +48,4 @@ def finalize_data(self):
import numpy as np
self.df.replace([np.inf, -np.inf], np.nan, inplace=True)
self.df = self.df.fillna(0)
return self.df
return self.df
2 changes: 1 addition & 1 deletion AI/modules/finder/screener.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class DynamicScreener:
"""
def __init__(self, db_name="db"):
self.db_name = db_name
self.config_path = os.path.join(project_root, "AI", "config", "watchlist.json")
self.config_path = os.path.join(project_root, "config", "watchlist.json")

def update_watchlist(self, target_date: str, top_n: int = 30) -> list:
"""
Expand Down
13 changes: 13 additions & 0 deletions AI/modules/signal/core/base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ def predict(self, X_input: np.ndarray) -> np.ndarray:
"""
pass

@abstractmethod
def get_signals(self, df: pd.DataFrame, ticker_id: int, sector_id: int) -> Dict[str, float]:
"""
모델별 시그널을 딕셔너리 형태로 반환하는 메서드
Args:
df (pd.DataFrame): 종목별 시계열 데이터
ticker_id (int): 종목 ID
sector_id (int): 섹터 ID
Returns:
Dict[str, float]: 모델 이름을 키로 하고 예측 확률을 값으로 하는 딕셔너리
"""
pass

@abstractmethod
def save(self, filepath: str):
"""모델 가중치 저장"""
Expand Down
6 changes: 3 additions & 3 deletions AI/modules/signal/core/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(self, db_name="db", lookback=60, horizons: List[int] = None):
# 메타데이터 ID 매핑
self.ticker_to_id: Dict[str, int] = {}
self.sector_to_id: Dict[str, int] = {}
self.ticker_sector_map: Dict[str, int] = {}
self.ticker_to_sector_id: Dict[str, int] = {}

# 공통 데이터 캐싱 (Macro, Market Breadth)
self.macro_df: pd.DataFrame = pd.DataFrame()
Expand All @@ -72,7 +72,7 @@ def _load_metadata(self):
self.sector_to_id = {sec: i for i, sec in enumerate(unique_sectors)}

for _, row in df_meta.iterrows():
self.ticker_sector_map[row['ticker']] = self.sector_to_id[row['sector']]
self.ticker_to_sector_id[row['ticker']] = self.sector_to_id[row['sector']]

self.ticker_to_id = {t: i for i, t in enumerate(df_meta['ticker'])}
print(f"[DataLoader] 메타데이터 로드 완료: {len(self.ticker_to_id)}개 종목")
Expand Down Expand Up @@ -254,7 +254,7 @@ def create_dataset(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.n

# 메타데이터 매핑
t_id = self.ticker_to_id.get(ticker, 0)
s_id = self.ticker_sector_map.get(ticker, 0)
s_id = self.ticker_to_sector_id.get(ticker, 0)

# Numpy 변환 (속도 최적화)
feature_vals = sub_df[available_cols].values
Expand Down
95 changes: 62 additions & 33 deletions AI/modules/signal/models/transformer/wrapper.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
# AI/modules/signal/models/transformer/wrapper.py
#AI/modules/signal/models/transformer/wrapper.py
"""
[Transformer 모델 래퍼]
[Transformer 모델 래퍼] - Meta-Ensemble 업그레이드 버전
- BaseSignalModel 인터페이스를 구현한 실제 실행 클래스입니다.
- architecture.py에서 정의한 모델을 빌드하고, 학습/예측/저장 로직을 수행합니다.
- 기존 Numpy 텐서 기반의 추론과 신규 DataFrame 기반의 앙상블 추론을 모두 지원합니다.
"""

import os
import pickle
import numpy as np
import pandas as pd
import tensorflow as tf
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, Union
from AI.modules.signal.core.base_model import BaseSignalModel
from .architecture import build_transformer_model

Expand All @@ -17,57 +19,56 @@ def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.model_name = "transformer"

self.seq_len = config.get("seq_len", 60)
self.features = config.get("features", []) # 사용할 17개 피처 리스트
self.scaler = None

def load_scaler(self, filepath: str):
"""[추가] 추론 시 사용할 데이터 정규화 스케일러 로드"""
if not os.path.exists(filepath):
raise FileNotFoundError(f"스케일러 파일이 없습니다: {filepath}")
with open(filepath, "rb") as f:
self.scaler = pickle.load(f)
print(f"✅ 스케일러 로드 완료: {filepath}")

def build(self, input_shape: tuple):
"""설정(config)에 따라 모델 아키텍처 생성"""
# 차원 검증
if len(input_shape) != 2:
# input_shape가 (timesteps, features) 2차원이 아니라면 경고 또는 에러
# 일부 환경에서 (None, timesteps, features)로 올 수 있으므로 유연하게 처리
if len(input_shape) == 3 and input_shape[0] is None:
input_shape = input_shape[1:]
else:
raise ValueError(f"입력 차원은 (timesteps, features) 2차원이어야 합니다. 현재: {input_shape}")

self.model = build_transformer_model(
input_shape=input_shape,
n_tickers=self.config.get("n_tickers", 1000),
n_sectors=self.config.get("n_sectors", 50),
n_outputs=4, # 1일, 3일, 5일, 7일 예측
head_size=self.config.get("head_size", 256),
num_heads=self.config.get("num_heads", 4),
ff_dim=self.config.get("ff_dim", 4),
ff_dim=self.config.get("ff_dim", 4),
num_transformer_blocks=self.config.get("num_blocks", 4),
mlp_units=self.config.get("mlp_units", [128]),
dropout=self.config.get("dropout", 0.4),
mlp_dropout=self.config.get("mlp_dropout", 0.25)
)

# 컴파일
learning_rate = self.config.get("learning_rate", 1e-4)
self.model.compile(
loss="binary_crossentropy",
optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
metrics=["accuracy", "AUC"]
)

def train(
self,
X_train: np.ndarray,
y_train: np.ndarray,
X_val: Optional[np.ndarray] = None,
y_val: Optional[np.ndarray] = None,
**kwargs
):
def train(self, X_train: np.ndarray, y_train: np.ndarray, X_val: Optional[np.ndarray] = None, y_val: Optional[np.ndarray] = None, **kwargs):
"""모델 학습 수행"""
if self.model is None:
raise ValueError("모델이 빌드되지 않았습니다. build()를 먼저 호출하세요.")

# ✅ 호출자가 주면 우선, 없으면 config, 없으면 default
epochs = int(kwargs.pop("epochs", self.config.get("epochs", 50)))
batch_size = int(kwargs.pop("batch_size", self.config.get("batch_size", 32)))
verbose = int(kwargs.pop("verbose", 1))

# callbacks는 pop으로 빼서 중복 전달 방지
callbacks = kwargs.pop("callbacks", [])

# validation_data는 (X_val, y_val)이 둘 다 있을 때만
validation_data = (X_val, y_val) if (X_val is not None and y_val is not None) else None

history = self.model.fit(
Expand All @@ -81,33 +82,61 @@ def train(
)
return history




def predict(self, X_input: np.ndarray, **kwargs) -> np.ndarray:
"""추론 수행"""
def predict(self, X_input: np.ndarray, ticker_id: int = 0, sector_id: int = 0, **kwargs) -> np.ndarray:
"""
[기본 계약 준수]
- 순수 Numpy 배열을 받아 예측을 수행하고 Numpy 배열을 반환합니다.
"""
if self.model is None:
raise ValueError("모델이 없습니다. load()하거나 build() 하세요.")

# Keras 모델은 (batch, time, feat) 형태를 기대하므로 차원 확인
if len(X_input.shape) == 2:
X_input = np.expand_dims(X_input, axis=0)

return self.model.predict(X_input, **kwargs)
t_id_tensor = np.array([[ticker_id]])
s_id_tensor = np.array([[sector_id]])

return self.model.predict([X_input, t_id_tensor, s_id_tensor], **kwargs)

def get_signals(self, df: pd.DataFrame, ticker_id: int = 0, sector_id: int = 0) -> Dict[str, float]:
"""
[파이프라인 전용 메서드]
- DataFrame을 입력받아 전처리 후 포트폴리오 로직에 맞는 딕셔너리로 반환합니다.
"""
if not self.features:
raise ValueError("추론에 필요한 features(컬럼 리스트)가 설정되지 않았습니다.")
if self.scaler is None:
raise ValueError("스케일러가 로드되지 않았습니다. load_scaler()를 먼저 호출하세요.")

# 1. 피처 추출 및 시퀀스 길이만큼 자르기
data = df[self.features].iloc[-self.seq_len:].values

# 2. 스케일링
scaled_data = self.scaler.transform(data)

# 3. 모델 예측 (내부의 predict 재사용)
pred_array = self.predict(scaled_data, ticker_id=ticker_id, sector_id=sector_id, verbose=0)
probs = pred_array[0]

# 4. 딕셔너리로 반환
return {
f"{self.model_name}_1d": float(probs[0]),
f"{self.model_name}_3d": float(probs[1]),
f"{self.model_name}_5d": float(probs[2]),
f"{self.model_name}_7d": float(probs[3])
}
def save(self, filepath: str):
"""모델 저장"""
if self.model is None:
print("저장할 모델이 없습니다.")
return

os.makedirs(os.path.dirname(filepath), exist_ok=True)
self.model.save(filepath)
print(f"모델 저장 완료: {filepath}")
print(f"모델 저장 완료: {filepath}")

def load(self, filepath: str):
"""모델 로드"""
if not os.path.exists(filepath):
raise FileNotFoundError(f"모델 파일이 없습니다: {filepath}")
self.model = tf.keras.models.load_model(filepath)
self.model = tf.keras.models.load_model(filepath)
print(f"✅ 모델 로드 완료: {filepath}")
Loading