From bb10575891dbcd0ed30bb5857ebe6f3a5ce97f2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EC=84=B1=EC=9D=BC?= Date: Sun, 8 Mar 2026 22:19:52 +0900 Subject: [PATCH 1/6] =?UTF-8?q?[AI][FEAT]:=20TCN=20=EB=AA=A8=EB=8D=B8=20?= =?UTF-8?q?=EB=BC=88=EB=8C=80=20=EB=B0=8F=20=ED=95=99=EC=8A=B5/=EC=B6=94?= =?UTF-8?q?=EB=A1=A0=20=EC=96=B4=EB=8C=91=ED=84=B0=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 4 +- AI/modules/signal/models/TCN/architecture.py | 102 ++++++++ AI/modules/signal/models/TCN/train.py | 241 +++++++++++++++++++ AI/modules/signal/models/TCN/wrapper.py | 235 ++++++++++++++---- 4 files changed, 535 insertions(+), 47 deletions(-) create mode 100644 AI/modules/signal/models/TCN/architecture.py create mode 100644 AI/modules/signal/models/TCN/train.py diff --git a/.gitignore b/.gitignore index 6b86470f..c268f163 100644 --- a/.gitignore +++ b/.gitignore @@ -28,9 +28,11 @@ __pycache__/ /env /.vs /.venv/ +AI/.venv/ +AI/data/weights/tcn/ # ===== Backend ===== backend/src/main/java/org/sejongisc/backend/stock/TestController.java # ===== windows ===== -.desktop.ini \ No newline at end of file +.desktop.ini diff --git a/AI/modules/signal/models/TCN/architecture.py b/AI/modules/signal/models/TCN/architecture.py new file mode 100644 index 00000000..6412d154 --- /dev/null +++ b/AI/modules/signal/models/TCN/architecture.py @@ -0,0 +1,102 @@ +import torch +import torch.nn as nn +from typing import List + + +class Chomp1d(nn.Module): + # Causal padding 뒤에 생기는 미래 시점 누수를 잘라냅니다. + def __init__(self, chomp_size: int): + super().__init__() + self.chomp_size = chomp_size + + def forward(self, x: torch.Tensor) -> torch.Tensor: + if self.chomp_size == 0: + return x + return x[:, :, :-self.chomp_size].contiguous() + + +class TemporalBlock(nn.Module): + # 두 개의 dilated Conv1d와 residual connection으로 TCN의 기본 블록을 구성합니다. + def __init__( + self, + in_channels: int, + out_channels: int, + kernel_size: int, + dilation: int, + dropout: float, + ): + super().__init__() + padding = (kernel_size - 1) * dilation + + self.net = nn.Sequential( + nn.Conv1d( + in_channels, + out_channels, + kernel_size, + padding=padding, + dilation=dilation, + ), + Chomp1d(padding), + nn.ReLU(), + nn.Dropout(dropout), + nn.Conv1d( + out_channels, + out_channels, + kernel_size, + padding=padding, + dilation=dilation, + ), + Chomp1d(padding), + nn.ReLU(), + nn.Dropout(dropout), + ) + self.downsample = ( + nn.Conv1d(in_channels, out_channels, kernel_size=1) + if in_channels != out_channels + else None + ) + self.activation = nn.ReLU() + + def forward(self, x: torch.Tensor) -> torch.Tensor: + residual = x if self.downsample is None else self.downsample(x) + return self.activation(self.net(x) + residual) + + +class TCNClassifier(nn.Module): + # 여러 개의 TemporalBlock을 쌓아 멀티 호라이즌 이진 분류 logits를 출력합니다. 
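+    # NOTE: rough receptive-field arithmetic (a sketch, assuming the default
+    # hyperparameters, not part of the original commit): each TemporalBlock
+    # applies two convs with dilation 2**i, so the stack sees
+    # 1 + 2*(kernel_size-1)*(2**num_blocks - 1) past steps. With kernel_size=3
+    # and the default three blocks that is 1 + 4*7 = 29 steps, i.e. less than
+    # the default seq_len=60; five blocks would give 125 and cover the window.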
+ def __init__( + self, + input_size: int, + output_size: int, + num_channels: List[int], + kernel_size: int = 3, + dropout: float = 0.2, + ): + super().__init__() + + layers = [] + for i, out_channels in enumerate(num_channels): + in_channels = input_size if i == 0 else num_channels[i - 1] + dilation = 2 ** i + layers.append( + TemporalBlock( + in_channels=in_channels, + out_channels=out_channels, + kernel_size=kernel_size, + dilation=dilation, + dropout=dropout, + ) + ) + + self.backbone = nn.Sequential(*layers) + self.head = nn.Sequential( + nn.AdaptiveAvgPool1d(1), + nn.Flatten(), + nn.Linear(num_channels[-1], output_size), + ) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + # 입력은 [Batch, Seq, Features]이며 Conv1d에 맞게 [Batch, Features, Seq]로 바꿉니다. + x = x.permute(0, 2, 1) + x = self.backbone(x) + return self.head(x) diff --git a/AI/modules/signal/models/TCN/train.py b/AI/modules/signal/models/TCN/train.py new file mode 100644 index 00000000..4adc3d8d --- /dev/null +++ b/AI/modules/signal/models/TCN/train.py @@ -0,0 +1,241 @@ +import argparse +import json +import os +import pickle +import sys +from typing import Dict, List, Tuple + +import numpy as np +import pandas as pd +import torch +import torch.nn as nn +from sklearn.preprocessing import StandardScaler +from torch.utils.data import DataLoader as TorchDataLoader +from torch.utils.data import TensorDataset + +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../../../../..")) +if project_root not in sys.path: + sys.path.append(project_root) + +from AI.modules.features.legacy.technical_features import add_technical_indicators +from AI.modules.signal.core.data_loader import DataLoader +from AI.modules.signal.models.TCN.architecture import TCNClassifier + + +# TCN은 명세에 맞춰 개별 기술적 지표만 사용합니다. +FEATURE_COLUMNS = [ + "log_return", + "open_ratio", + "high_ratio", + "low_ratio", + "vol_change", + "ma5_ratio", + "ma20_ratio", + "ma60_ratio", + "rsi", + "macd_ratio", + "bb_position", +] + +# 서비스 추론 결과와 맞추기 위해 1/3/5/7일 방향성을 동시에 학습합니다. +HORIZONS = [1, 3, 5, 7] + + +def build_sequences( + df: pd.DataFrame, + seq_len: int, + feature_cols: List[str], + horizons: List[int], +) -> Tuple[np.ndarray, np.ndarray]: + # 종목별 시계열을 순회하면서 [seq_len, features] 윈도우와 멀티 호라이즌 라벨을 만듭니다. + features = [] + labels = [] + max_horizon = max(horizons) + + for _, sub_df in df.groupby("ticker"): + sub_df = sub_df.sort_values("date").copy() + sub_df = add_technical_indicators(sub_df) + sub_df = sub_df.dropna(subset=["close"]) + sub_df = sub_df.replace([np.inf, -np.inf], np.nan).fillna(0) + + if len(sub_df) < seq_len + max_horizon: + continue + + feature_values = sub_df[feature_cols].to_numpy(dtype=np.float32) + closes = sub_df["close"].to_numpy(dtype=np.float32) + + for start in range(len(sub_df) - seq_len - max_horizon + 1): + end = start + seq_len + current_close = closes[end - 1] + target = [] + for horizon in horizons: + future_close = closes[end + horizon - 1] + target.append(1.0 if future_close > current_close else 0.0) + + features.append(feature_values[start:end]) + labels.append(target) + + if not features: + raise ValueError("No training sequences were created. Check DB data coverage.") + + return np.array(features, dtype=np.float32), np.array(labels, dtype=np.float32) + + +def fit_scaler(X_train: np.ndarray) -> StandardScaler: + # 시퀀스 축을 펼쳐 feature 단위로 표준화 스케일러를 학습합니다. 
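+    # NOTE: shape sketch, assuming X_train is [N, seq_len, F], e.g. (1000, 60, 11):
+    # reshape(-1, F) stacks every timestep row-wise into (60000, 11), so the
+    # scaler learns one global mean/std per feature column rather than
+    # per-window statistics.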
+ scaler = StandardScaler() + scaler.fit(X_train.reshape(-1, X_train.shape[-1])) + return scaler + + +def transform_sequences(X: np.ndarray, scaler: StandardScaler) -> np.ndarray: + # 학습 시 저장한 스케일러를 시퀀스 전체에 동일하게 적용합니다. + shape = X.shape + flat = X.reshape(-1, shape[-1]) + scaled = scaler.transform(flat) + return scaled.reshape(shape).astype(np.float32) + + +def train_model(args: argparse.Namespace): + # 공통 DataLoader로 DB에서 가격 데이터를 읽고 TCN 전용 입력만 추립니다. + loader = DataLoader(lookback=args.seq_len, horizons=HORIZONS) + raw_df = loader.load_data_from_db( + start_date=args.start_date, + end_date=args.end_date, + tickers=args.tickers, + ) + + if raw_df.empty: + raise ValueError("No raw price data loaded from DB.") + + X, y = build_sequences(raw_df, args.seq_len, FEATURE_COLUMNS, HORIZONS) + + # 시계열 순서를 유지한 채 뒤쪽 구간을 검증셋으로 둡니다. + split_idx = max(int(len(X) * 0.8), 1) + if split_idx >= len(X): + split_idx = len(X) - 1 + + X_train, X_val = X[:split_idx], X[split_idx:] + y_train, y_val = y[:split_idx], y[split_idx:] + + scaler = fit_scaler(X_train) + X_train = transform_sequences(X_train, scaler) + X_val = transform_sequences(X_val, scaler) + + train_dataset = TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train)) + val_dataset = TensorDataset(torch.from_numpy(X_val), torch.from_numpy(y_val)) + + train_loader = TorchDataLoader(train_dataset, batch_size=args.batch_size, shuffle=True) + val_loader = TorchDataLoader(val_dataset, batch_size=args.batch_size, shuffle=False) + + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + # output_size는 horizon 개수와 동일합니다. + model = TCNClassifier( + input_size=len(FEATURE_COLUMNS), + output_size=len(HORIZONS), + num_channels=args.channels, + kernel_size=args.kernel_size, + dropout=args.dropout, + ).to(device) + + criterion = nn.BCEWithLogitsLoss() + optimizer = torch.optim.Adam(model.parameters(), lr=args.learning_rate) + + best_val_loss = float("inf") + best_state = None + + for epoch in range(args.epochs): + model.train() + train_loss = 0.0 + for batch_x, batch_y in train_loader: + batch_x = batch_x.to(device) + batch_y = batch_y.to(device) + + optimizer.zero_grad() + logits = model(batch_x) + loss = criterion(logits, batch_y) + loss.backward() + optimizer.step() + + train_loss += loss.item() * batch_x.size(0) + + model.eval() + val_loss = 0.0 + with torch.no_grad(): + for batch_x, batch_y in val_loader: + batch_x = batch_x.to(device) + batch_y = batch_y.to(device) + logits = model(batch_x) + loss = criterion(logits, batch_y) + val_loss += loss.item() * batch_x.size(0) + + train_loss /= len(train_dataset) + val_loss /= len(val_dataset) + print( + f"Epoch {epoch + 1}/{args.epochs} " + f"- train_loss: {train_loss:.4f} val_loss: {val_loss:.4f}" + ) + + if val_loss < best_val_loss: + best_val_loss = val_loss + best_state = model.state_dict() + + if best_state is None: + best_state = model.state_dict() + + # wrapper가 바로 사용할 수 있도록 가중치, scaler, 메타데이터를 함께 저장합니다. 
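+    # NOTE: metadata.json written below records model_path/scaler_path as paths
+    # under --output-dir, so the file is machine-specific; the wrapper accepts
+    # explicit model_path/scaler_path/metadata_path keys in its config when the
+    # artifacts are relocated.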
+ os.makedirs(args.output_dir, exist_ok=True) + model_path = os.path.join(args.output_dir, "model.pt") + scaler_path = os.path.join(args.output_dir, "scaler.pkl") + metadata_path = os.path.join(args.output_dir, "metadata.json") + + torch.save(best_state, model_path) + with open(scaler_path, "wb") as f: + pickle.dump(scaler, f) + + metadata: Dict[str, object] = { + "feature_columns": FEATURE_COLUMNS, + "horizons": HORIZONS, + "seq_len": args.seq_len, + "kernel_size": args.kernel_size, + "dropout": args.dropout, + "channels": args.channels, + "model_path": model_path, + "scaler_path": scaler_path, + } + with open(metadata_path, "w", encoding="utf-8") as f: + json.dump(metadata, f, ensure_ascii=True, indent=2) + + print(f"Saved model to: {model_path}") + print(f"Saved scaler to: {scaler_path}") + print(f"Saved metadata to: {metadata_path}") + + +def parse_args() -> argparse.Namespace: + # 실험 시 종목, 기간, 모델 폭을 CLI에서 바로 바꿀 수 있게 둡니다. + parser = argparse.ArgumentParser(description="Train TCN signal model.") + parser.add_argument("--start-date", default="2018-01-01") + parser.add_argument("--end-date", default=None) + parser.add_argument("--tickers", nargs="*", default=None) + parser.add_argument("--seq-len", type=int, default=60) + parser.add_argument("--epochs", type=int, default=20) + parser.add_argument("--batch-size", type=int, default=64) + parser.add_argument("--learning-rate", type=float, default=1e-3) + parser.add_argument("--kernel-size", type=int, default=3) + parser.add_argument("--dropout", type=float, default=0.2) + parser.add_argument( + "--channels", + type=int, + nargs="+", + default=[32, 64, 64], + ) + parser.add_argument( + "--output-dir", + default=os.path.join(project_root, "AI", "data", "weights", "tcn"), + ) + return parser.parse_args() + + +if __name__ == "__main__": + train_model(parse_args()) diff --git a/AI/modules/signal/models/TCN/wrapper.py b/AI/modules/signal/models/TCN/wrapper.py index 6981249d..43ecaed8 100644 --- a/AI/modules/signal/models/TCN/wrapper.py +++ b/AI/modules/signal/models/TCN/wrapper.py @@ -1,66 +1,209 @@ -# AI/modules/signal/models/TCN/wrapper.py +import json +import os +import pickle +from typing import Any, Dict, Optional + +import numpy as np +import pandas as pd import torch import torch.nn as nn -import numpy as np -from typing import Optional, Dict, Any -from ...core.base_model import BaseSignalModel - -# (간단한 TCN 아키텍처 내부 클래스 혹은 별도 파일 import) -class SimpleTCN(nn.Module): - def __init__(self, input_size, output_size, num_channels, kernel_size, dropout): - super(SimpleTCN, self).__init__() - # 예시: 1D Conv 레이어 스택 - self.net = nn.Sequential( - nn.Conv1d(input_size, num_channels[0], kernel_size, padding=(kernel_size-1)//2), - nn.ReLU(), - nn.Dropout(dropout), - nn.AdaptiveAvgPool1d(1), # Global Pooling - nn.Flatten(), - nn.Linear(num_channels[0], output_size) - ) - def forward(self, x): - # x: [Batch, Seq, Feat] -> [Batch, Feat, Seq] (Conv1d 입력) - x = x.permute(0, 2, 1) - return self.net(x) + +from AI.modules.features.legacy.technical_features import add_technical_indicators +from AI.modules.signal.core.base_model import BaseSignalModel +from AI.modules.signal.models.TCN.architecture import TCNClassifier + + +DEFAULT_FEATURE_COLUMNS = [ + "log_return", + "open_ratio", + "high_ratio", + "low_ratio", + "vol_change", + "ma5_ratio", + "ma20_ratio", + "ma60_ratio", + "rsi", + "macd_ratio", + "bb_position", +] + +DEFAULT_HORIZONS = [1, 3, 5, 7] + class TCNWrapper(BaseSignalModel): """ - [TCN 구현체] BaseSignalModel 인터페이스 준수 - - 용도: 단기 패턴 포착 (Local Pattern) + 
TCN inference adapter for the service pipeline. + predict(df)가 호출되면 feature 생성, scaling, window slicing, 추론까지 내부에서 처리합니다. """ + def __init__(self, config: Dict[str, Any]): super().__init__(config) - self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') - self.model = None + self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + self.seq_len = int(config.get("seq_len", 60)) + self.feature_columns = config.get("feature_columns", DEFAULT_FEATURE_COLUMNS) + self.horizons = config.get("horizons", DEFAULT_HORIZONS) + self.channels = config.get("channels", [32, 64, 64]) + self.kernel_size = int(config.get("kernel_size", 3)) + self.dropout = float(config.get("dropout", 0.2)) + self.scaler = None + self.metadata = {} + + base_dir = config.get( + "weights_dir", + os.path.join( + os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../..")), + "AI", + "data", + "weights", + "tcn", + ), + ) + self.weights_dir = base_dir + self.model_path = config.get("model_path", os.path.join(base_dir, "model.pt")) + self.scaler_path = config.get("scaler_path", os.path.join(base_dir, "scaler.pkl")) + self.metadata_path = config.get("metadata_path", os.path.join(base_dir, "metadata.json")) def build(self, input_shape: tuple): - # input_shape: (seq_len, num_features) - self.model = SimpleTCN( - input_size=input_shape[1], # features - output_size=1, # binary classification - num_channels=[self.config.get('hidden_dim', 64)], - kernel_size=self.config.get('kernel_size', 3), - dropout=self.config.get('dropout', 0.2) + # 학습 메타데이터 기준 shape로 TCN 본체를 복원합니다. + self.model = TCNClassifier( + input_size=input_shape[1], + output_size=len(self.horizons), + num_channels=self.channels, + kernel_size=self.kernel_size, + dropout=self.dropout, ).to(self.device) - def train(self, X_train: np.ndarray, y_train: np.ndarray, **kwargs): - if self.model is None: self.build(X_train.shape[1:]) - # (PatchTST와 유사한 학습 루프 - 생략 또는 공통화 가능) - pass + def train( + self, + X_train: np.ndarray, + y_train: np.ndarray, + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None, + **kwargs, + ): + # wrapper 단독 테스트용 학습 루프입니다. 실제 대규모 학습은 train.py 사용을 기준으로 둡니다. + if self.model is None: + self.build(X_train.shape[1:]) + + criterion = nn.BCEWithLogitsLoss() + optimizer = torch.optim.Adam( + self.model.parameters(), + lr=float(kwargs.get("learning_rate", self.config.get("learning_rate", 1e-3))), + ) + epochs = int(kwargs.get("epochs", self.config.get("epochs", 20))) + batch_size = int(kwargs.get("batch_size", self.config.get("batch_size", 64))) + + X_tensor = torch.from_numpy(X_train).float().to(self.device) + y_tensor = torch.from_numpy(y_train).float().to(self.device) + + for epoch in range(epochs): + self.model.train() + permutation = torch.randperm(X_tensor.size(0), device=self.device) + epoch_loss = 0.0 + + for i in range(0, X_tensor.size(0), batch_size): + indices = permutation[i : i + batch_size] + batch_x = X_tensor[indices] + batch_y = y_tensor[indices] + + optimizer.zero_grad() + logits = self.model(batch_x) + loss = criterion(logits, batch_y) + loss.backward() + optimizer.step() + epoch_loss += loss.item() + + if (epoch + 1) % 5 == 0: + print(f"Epoch [{epoch + 1}/{epochs}] Loss: {epoch_loss:.4f}") + + def _load_artifacts(self): + # metadata -> scaler -> model 순서로 읽어 추론에 필요한 상태를 복원합니다. 
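+        # NOTE: the torch.load call below unpickles the checkpoint, so weight
+        # files should come from a trusted source; map_location keeps CPU-only
+        # hosts working even when the weights were saved from a CUDA run.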
+ if self.metadata_path and os.path.exists(self.metadata_path): + with open(self.metadata_path, "r", encoding="utf-8") as f: + self.metadata = json.load(f) + self.feature_columns = self.metadata.get("feature_columns", self.feature_columns) + self.horizons = self.metadata.get("horizons", self.horizons) + self.seq_len = int(self.metadata.get("seq_len", self.seq_len)) + self.channels = self.metadata.get("channels", self.channels) + self.kernel_size = int(self.metadata.get("kernel_size", self.kernel_size)) + self.dropout = float(self.metadata.get("dropout", self.dropout)) + + if self.scaler is None and os.path.exists(self.scaler_path): + with open(self.scaler_path, "rb") as f: + self.scaler = pickle.load(f) + + if self.model is None: + self.build((self.seq_len, len(self.feature_columns))) + + if self.model is not None and os.path.exists(self.model_path): + state_dict = torch.load(self.model_path, map_location=self.device) + self.model.load_state_dict(state_dict) + self.model.eval() + + def _prepare_dataframe(self, df: pd.DataFrame) -> pd.DataFrame: + # 서비스 파이프라인이 넘겨준 원본 df에서 TCN용 기술지표를 생성합니다. + if df is None or df.empty: + raise ValueError("Input dataframe is empty.") + + prepared = add_technical_indicators(df.copy()) + missing = [col for col in self.feature_columns if col not in prepared.columns] + if missing: + raise ValueError(f"Missing required TCN feature columns: {missing}") + + prepared = prepared.replace([np.inf, -np.inf], np.nan).fillna(0) + return prepared + + def _prepare_input_tensor(self, df: pd.DataFrame) -> torch.Tensor: + # 최근 seq_len 구간만 잘라서 학습 때와 동일한 feature 순서/스케일로 맞춥니다. + prepared = self._prepare_dataframe(df) + feature_frame = prepared[self.feature_columns] + + if len(feature_frame) < self.seq_len: + raise ValueError( + f"Not enough rows for TCN inference. Required {self.seq_len}, got {len(feature_frame)}." + ) + + latest_window = feature_frame.tail(self.seq_len).to_numpy(dtype=np.float32) + if self.scaler is not None: + latest_window = self.scaler.transform(latest_window).astype(np.float32) + + batch = np.expand_dims(latest_window, axis=0) + return torch.from_numpy(batch).float().to(self.device) + + def predict(self, X_input) -> Dict[str, float]: + # DataFrame 입력이 기본 경로이며, 테스트 편의를 위해 ndarray도 허용합니다. + self._load_artifacts() + + if self.model is None: + raise ValueError("TCN model is not available.") + + if isinstance(X_input, pd.DataFrame): + tensor_x = self._prepare_input_tensor(X_input) + else: + array_x = np.asarray(X_input, dtype=np.float32) + if array_x.ndim == 2: + array_x = np.expand_dims(array_x, axis=0) + tensor_x = torch.from_numpy(array_x).float().to(self.device) - def predict(self, X_input: np.ndarray) -> np.ndarray: - if self.model is None: raise Exception("Model not built") self.model.eval() with torch.no_grad(): - tensor_x = torch.FloatTensor(X_input).to(self.device) - out = self.model(tensor_x) - return torch.sigmoid(out).cpu().numpy() + logits = self.model(tensor_x) + probs = torch.sigmoid(logits).cpu().numpy().flatten() + + # 포트폴리오 파이프라인이 바로 읽을 수 있도록 horizon별 dict 형태로 반환합니다. + return { + f"tcn_{horizon}d": float(prob) + for horizon, prob in zip(self.horizons, probs) + } def save(self, filepath: str): + # 수동 저장이 필요한 경우 wrapper에서도 state_dict 저장이 가능합니다. 
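+        # NOTE: this persists only the network state_dict; the scaler and
+        # metadata that predict() depends on are written by train.py, so a
+        # complete artifact set always comes from the training script.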
+        if self.model is None:
+            raise ValueError("No TCN model to save.")
+        os.makedirs(os.path.dirname(filepath), exist_ok=True)
         torch.save(self.model.state_dict(), filepath)
 
     def load(self, filepath: str):
-        # 빌드 후 로드
-        if self.model is None:
-            self.build((self.config.get('seq_len', 30), self.config.get('enc_in', 7)))
-        self.model.load_state_dict(torch.load(filepath, map_location=self.device))
\ No newline at end of file
+        # 외부 경로의 가중치를 덮어쓸 수 있게 model_path를 갱신한 뒤 공통 로더를 재사용합니다.
+        self.model_path = filepath
+        self._load_artifacts()

From 3d5359c11a26740929da1b4ca4e59026b423f98e Mon Sep 17 00:00:00 2001
From: twq110
Date: Thu, 12 Mar 2026 16:41:02 +0900
Subject: [PATCH 3/6] =?UTF-8?q?[AI]=20SISC-290=20[FIX]=20=20TCN=20wrapper?=
 =?UTF-8?q?=20=EC=B5=9C=EC=A0=81=ED=99=94(=EC=A4=91=EB=B3=B5=20=EB=A3=A8?=
 =?UTF-8?q?=ED=94=84=20=EC=A0=9C=EA=B1=B0)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 AI/modules/signal/models/TCN/wrapper.py | 20 +++++++++++++++++---
 1 file changed, 17 insertions(+), 3 deletions(-)

diff --git a/AI/modules/signal/models/TCN/wrapper.py b/AI/modules/signal/models/TCN/wrapper.py
index 43ecaed8..a2deab0a 100644
--- a/AI/modules/signal/models/TCN/wrapper.py
+++ b/AI/modules/signal/models/TCN/wrapper.py
@@ -1,7 +1,7 @@
 import json
 import os
 import pickle
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Union
 
 import numpy as np
 import pandas as pd
@@ -47,6 +47,7 @@ def __init__(self, config: Dict[str, Any]):
         self.dropout = float(config.get("dropout", 0.2))
         self.scaler = None
         self.metadata = {}
+        self.is_loaded = False #중복로딩 방지 플래그
 
         base_dir = config.get(
             "weights_dir",
@@ -118,6 +119,9 @@ def train(
 
     def _load_artifacts(self):
         # metadata -> scaler -> model 순서로 읽어 추론에 필요한 상태를 복원합니다.
+
+        if self.is_loaded:
+            return # 이미 로드된 상태라면 중복 로딩을 방지합니다.
         if self.metadata_path and os.path.exists(self.metadata_path):
             with open(self.metadata_path, "r", encoding="utf-8") as f:
                 self.metadata = json.load(f)
@@ -140,6 +144,8 @@ def _load_artifacts(self):
             self.model.load_state_dict(state_dict)
             self.model.eval()
 
+        self.is_loaded = True
+
     def _prepare_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
         # 서비스 파이프라인이 넘겨준 원본 df에서 TCN용 기술지표를 생성합니다.
         if df is None or df.empty:
@@ -170,7 +176,7 @@ def _prepare_input_tensor(self, df: pd.DataFrame) -> torch.Tensor:
         batch = np.expand_dims(latest_window, axis=0)
         return torch.from_numpy(batch).float().to(self.device)
 
-    def predict(self, X_input) -> Dict[str, float]:
+    def predict(self, X_input: Union[pd.DataFrame, np.ndarray]) -> Dict[str, float]:
         # DataFrame 입력이 기본 경로이며, 테스트 편의를 위해 ndarray도 허용합니다.
         self._load_artifacts()
 
@@ -204,6 +210,14 @@ def save(self, filepath: str):
         torch.save(self.model.state_dict(), filepath)
 
     def load(self, filepath: str):
-        # 외부 경로의 가중치를 덮어쓸 수 있게 model_path를 갱신한 뒤 공통 로더를 재사용합니다.
+        """
+        외부 경로의 가중치를 불러옵니다.
+        가중치가 위치한 동일 폴더 내의 scaler 및 metadata를 읽어오도록 경로를 동기화합니다.
+        """
         self.model_path = filepath
+        target_dir = os.path.dirname(filepath)
+        self.scaler_path = os.path.join(target_dir, "scaler.pkl")
+        self.metadata_path = os.path.join(target_dir, "metadata.json")
+
+        self.is_loaded = False # 새 경로로 로드할 때는 중복 로딩 방지 플래그를 초기화합니다.
self._load_artifacts() From 762b34a57cb13995013e7b03aa3b7ba6ab00542a Mon Sep 17 00:00:00 2001 From: twq110 Date: Thu, 12 Mar 2026 17:20:30 +0900 Subject: [PATCH 4/6] =?UTF-8?q?[AI]=20SISC-290=20[FIX]=20=20TCN=20wrapper?= =?UTF-8?q?=20=EA=B8=B0=EC=88=A0=EC=A0=81=20=EC=A7=80=ED=91=9C=20=EB=A0=88?= =?UTF-8?q?=EA=B1=B0=EC=8B=9C=20=EB=B2=84=EC=A0=84=20=EC=82=AC=EC=9A=A9=20?= =?UTF-8?q?=ED=94=BD=EC=8A=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AI/modules/features/market_derived.py | 27 ++++++++--- AI/modules/signal/core/dataset_builder.py | 56 +++++++++++++++++------ AI/modules/signal/models/TCN/wrapper.py | 4 +- 3 files changed, 64 insertions(+), 23 deletions(-) diff --git a/AI/modules/features/market_derived.py b/AI/modules/features/market_derived.py index a8d0f7d8..9ce0b602 100644 --- a/AI/modules/features/market_derived.py +++ b/AI/modules/features/market_derived.py @@ -4,11 +4,21 @@ from .technical import compute_rsi, compute_atr, compute_macd, compute_bollinger_bands def add_market_changes(df: pd.DataFrame) -> pd.DataFrame: - """가격 및 거래량 기반 변화율 계산 [명세서 준수]""" + """가격 및 거래량 기반 변화율 계산 [명세서 및 레거시 하위 호환]""" + epsilon = 1e-9 + + # --- [레거시 호환] 기존 모델 학습에 사용되었던 캔들 모양 및 거래량 피처 복구 --- + prev_close = df['close'].shift(1) + df['open_ratio'] = (df['open'] - prev_close) / (prev_close + epsilon) + df['high_ratio'] = (df['high'] - prev_close) / (prev_close + epsilon) + df['low_ratio'] = (df['low'] - prev_close) / (prev_close + epsilon) + df['vol_change'] = df['volume'].pct_change() + + # --- [신규 명세서] 일간 수익률 및 일중 변동성 --- df['ret_1d'] = df['close'].pct_change() df['log_return'] = np.log(df['close'] / df['close'].shift(1)) - # 일중변동성비율 : (High - Low) / Close - df['intraday_vol'] = (df['high'] - df['low']) / (df['close'] + 1e-9) + df['intraday_vol'] = (df['high'] - df['low']) / (df['close'] + epsilon) + return df def add_macro_changes(df: pd.DataFrame) -> pd.DataFrame: @@ -32,8 +42,10 @@ def add_standard_technical_features(df: pd.DataFrame) -> pd.DataFrame: """레거시 로직 + 명세서 신규 지표 통합""" epsilon = 1e-9 - # 1. RSI (rsi_14) + # 1. RSI + # 명세서 기준(rsi_14)과 레거시 모델 호환용(rsi) 컬럼을 모두 생성합니다. df['rsi_14'] = compute_rsi(df['close'], 14) / 100.0 + df['rsi'] = df['rsi_14'] # 2. MACD (macd, macd_signal) df['macd'], df['macd_signal'] = compute_macd(df['close']) @@ -48,11 +60,12 @@ def add_standard_technical_features(df: pd.DataFrame) -> pd.DataFrame: # 모델 입력용 포지션 df['bb_position'] = (df['close'] - df['bollinger_lb']) / ( (df['bollinger_ub'] - df['bollinger_lb']).replace(0, epsilon) ) - # 4. ATR (atr_14) + # 4. ATR (atr_14) - [신규 명세서 지표] df['atr_14'] = compute_atr(df['high'], df['low'], df['close'], 14) - # 5. Moving Averages (ma_20, ma_60) - for w in [20, 60]: + # 5. Moving Averages (ma_5, ma_20, ma_60) + # [레거시 호환] 기존에 사용하던 5일 이평선(ma_5)을 복구하여 배열에 추가했습니다. 
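+    # NOTE: ma{w}_ratio computed below is the usual disparity measure
+    # (close - MA_w) / MA_w; e.g. close=110 with MA_20=100 gives ma20_ratio=0.1.
+    # epsilon only guards the division against a zero moving average.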
+ for w in [5, 20, 60]: df[f'ma_{w}'] = df['close'].rolling(window=w).mean() # 모델 입력용 이격도 (Standard Key: ma_trend_score 등에 활용) df[f'ma{w}_ratio'] = (df['close'] - df[f'ma_{w}']) / (df[f'ma_{w}'] + epsilon) diff --git a/AI/modules/signal/core/dataset_builder.py b/AI/modules/signal/core/dataset_builder.py index ebf13480..26d8f6c1 100644 --- a/AI/modules/signal/core/dataset_builder.py +++ b/AI/modules/signal/core/dataset_builder.py @@ -1,25 +1,53 @@ # AI/modules/signal/core/dataset_builder.py +import pandas as pd +from typing import Union, Optional from AI.modules.signal.core.data_loader import DataLoader -from AI.modules.features.technical import add_technical_indicators -from AI.modules.features.market_derived import add_macro_features +from AI.modules.features.processor import FeatureProcessor +from AI.modules.features.market_derived import add_market_changes, add_macro_changes -def get_standard_training_data(start_date: str, end_date: str) -> pd.DataFrame: +def apply_strict_nan_rules(df: pd.DataFrame) -> pd.DataFrame: """ - 데이터 수집/전처리 코드를 짤 필요 없이 이 함수만 호출하면 됩니다. - 명세서에 정의된 모든 원천/파생 피처가 포함된 DataFrame을 반환합니다. + SISC 데이터 명세서에 따른 엄격한 결측치 처리 규칙을 적용합니다. """ - # 1. DB에서 원천 데이터(Raw) 로드 - loader = DataLoader() - raw_df = loader.load_data_from_db(start_date, end_date) + df_clean = df.copy() + + macro_cols = ['vix_close', 'vix_change_rate', 'us10y', 'us10y_chg', 'dxy_close', 'dxy_chg'] + available_macro = [col for col in macro_cols if col in df_clean.columns] + + if available_macro: + df_clean[available_macro] = df_clean[available_macro].ffill() + + df_clean = df_clean.dropna().reset_index(drop=True) + return df_clean + +def get_standard_training_data( + start_date_or_df: Union[str, pd.DataFrame], + end_date: Optional[str] = None +) -> pd.DataFrame: + """ + SISC 파이프라인 표준 학습 데이터셋 생성기. (학습/추론 겸용) + - 사용법 1 (학습용): get_standard_training_data('2020-01-01', '2024-01-01') -> DB 조회 + - 사용법 2 (추론용): get_standard_training_data(df) -> DB 생략하고 전처리만 수행 + """ + # 1. 입력 타입에 따른 분기 처리 (DB 로드 vs 직접 주입) + if isinstance(start_date_or_df, pd.DataFrame): + df = start_date_or_df.copy() + else: + loader = DataLoader() + df = loader.load_data_from_db(start_date_or_df, end_date) + if df is None or df.empty: + raise ValueError(f"지정된 기간({start_date_or_df} ~ {end_date})의 데이터를 불러오지 못했습니다.") + + # 2. 파생 피처 레이어 계산 (1차: 기초 변화율 연산) + df = add_market_changes(df) + df = add_macro_changes(df) - # 2. 파생 피처 레이어 계산 - # (팀장님이 기존 features 모듈을 활용해 모든 지표를 미리 계산해서 붙여줌) - df = add_technical_indicators(raw_df) # rsi_14, macd 등 추가 - df = add_macro_features(df) # vix_z_score, us10y_chg 등 추가 + # 3. 파생 피처 레이어 계산 (2차: FeatureProcessor를 통한 심화 지표 일괄 연산) + processor = FeatureProcessor(df) + df = processor.execute_pipeline() - # 3. 결측치(NaN) 처리 - # Market은 Drop, Macro는 ffill 등... + # 4. 
결측치(NaN) 처리 규칙 적용 df = apply_strict_nan_rules(df) return df \ No newline at end of file diff --git a/AI/modules/signal/models/TCN/wrapper.py b/AI/modules/signal/models/TCN/wrapper.py index a2deab0a..6f7c0979 100644 --- a/AI/modules/signal/models/TCN/wrapper.py +++ b/AI/modules/signal/models/TCN/wrapper.py @@ -8,7 +8,7 @@ import torch import torch.nn as nn -from AI.modules.features.legacy.technical_features import add_technical_indicators +from AI.modules.signal.core.dataset_builder import get_standard_training_data from AI.modules.signal.core.base_model import BaseSignalModel from AI.modules.signal.models.TCN.architecture import TCNClassifier @@ -151,7 +151,7 @@ def _prepare_dataframe(self, df: pd.DataFrame) -> pd.DataFrame: if df is None or df.empty: raise ValueError("Input dataframe is empty.") - prepared = add_technical_indicators(df.copy()) + prepared = get_standard_training_data(df.copy()) missing = [col for col in self.feature_columns if col not in prepared.columns] if missing: raise ValueError(f"Missing required TCN feature columns: {missing}") From 30a51272d588f2bc3362ffd59cce9788a2ddbba3 Mon Sep 17 00:00:00 2001 From: twq110 Date: Thu, 12 Mar 2026 17:23:12 +0900 Subject: [PATCH 5/6] =?UTF-8?q?[AI]=20SISC-290=20[FEAT]=20TCN=20=EB=AA=A8?= =?UTF-8?q?=EB=8D=B8=20=EB=9E=98=ED=8D=BC=20=ED=85=90=EC=84=9C=20=ED=95=99?= =?UTF-8?q?=EC=8A=B5=20=EC=BD=94=EB=93=9C=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AI/modules/signal/models/TCN/wrapper.py | 60 +++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/AI/modules/signal/models/TCN/wrapper.py b/AI/modules/signal/models/TCN/wrapper.py index 6f7c0979..78596318 100644 --- a/AI/modules/signal/models/TCN/wrapper.py +++ b/AI/modules/signal/models/TCN/wrapper.py @@ -201,6 +201,66 @@ def predict(self, X_input: Union[pd.DataFrame, np.ndarray]) -> Dict[str, float]: f"tcn_{horizon}d": float(prob) for horizon, prob in zip(self.horizons, probs) } + + def predict_batch(self, ticker_data_map: Dict[str, pd.DataFrame]) -> Dict[str, Dict[str, float]]: + """ + [Batch 추론] 여러 종목의 DataFrame을 받아 한 번의 GPU 연산으로 결과를 반환합니다. + 입력: {"AAPL": df_aapl, "MSFT": df_msft, ...} + 출력: {"AAPL": {"tcn_1d": 0.55, ...}, "MSFT": {"tcn_1d": 0.61, ...}} + """ + self._load_artifacts() + + if self.model is None: + raise ValueError("TCN model is not available.") + + valid_tickers = [] + tensor_list = [] + + # 1. 딕셔너리로 받은 종목별 데이터를 순회하며 전처리 및 윈도우 추출 + for ticker, df in ticker_data_map.items(): + try: + # dataset_builder의 다형성을 활용하여 전처리 (에러 발생 종목은 스킵) + prepared = get_standard_training_data(df.copy()) + feature_frame = prepared[self.feature_columns] + + if len(feature_frame) < self.seq_len: + continue # 시퀀스 길이가 부족한 신규 상장 종목 등은 건너뜁니다. + + latest_window = feature_frame.tail(self.seq_len).to_numpy(dtype=np.float32) + + # [참고] Global Scaler를 가정하고 일괄 적용합니다. + if self.scaler is not None: + latest_window = self.scaler.transform(latest_window).astype(np.float32) + + tensor_list.append(latest_window) + valid_tickers.append(ticker) + + except Exception as e: + # 특정 종목 데이터 불량 시 전체 파이프라인이 멈추지 않도록 예외 처리 + print(f"⚠️ [{ticker}] 전처리 실패로 배치 추론에서 제외됨: {e}") + + if not tensor_list: + return {} # 유효한 종목이 없으면 빈 결과 반환 + + # 2. 리스트에 모인 2D 배열들을 3D 텐서로 조립 [Batch, Seq, Features] + batch_array = np.stack(tensor_list, axis=0) + batch_tensor = torch.from_numpy(batch_array).float().to(self.device) + + # 3. 
GPU 병렬 추론 (단 1번의 호출로 전체 종목 예측) + self.model.eval() + with torch.no_grad(): + logits = self.model(batch_tensor) + probs = torch.sigmoid(logits).cpu().numpy() # 형태: [Batch, Horizons] + + # 4. 결과를 포트폴리오 모듈이 인식할 수 있도록 딕셔너리로 매핑 + results = {} + for i, ticker in enumerate(valid_tickers): + results[ticker] = { + f"tcn_{horizon}d": float(probs[i, j]) + for j, horizon in enumerate(self.horizons) + } + + return results def save(self, filepath: str): # 수동 저장이 필요한 경우 wrapper에서도 state_dict 저장이 가능합니다. From 6937d90b88ea93a93b9e672cdf7a44cc013e9f86 Mon Sep 17 00:00:00 2001 From: twq110 Date: Thu, 12 Mar 2026 17:34:07 +0900 Subject: [PATCH 6/6] =?UTF-8?q?[AI]=20SISC-290=20[FEAT]=20TCN=20train=20?= =?UTF-8?q?=EB=8D=B0=EC=9D=B4=ED=84=B0=EC=85=8B=20=EC=9D=B8=EB=8D=B1?= =?UTF-8?q?=EC=8A=A4=20=EC=8A=AC=EB=9D=BC=EC=9D=B4=EC=8B=B1=EC=9D=84=20?= =?UTF-8?q?=EB=82=A0=EC=A7=9C=EB=8B=A8=EC=9C=84=EB=A1=9C=20=EC=B2=98?= =?UTF-8?q?=EB=A6=AC=ED=95=98=EB=8F=84=EB=A1=9D=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AI/modules/signal/models/TCN/train.py | 109 +++++++++++--------------- 1 file changed, 46 insertions(+), 63 deletions(-) diff --git a/AI/modules/signal/models/TCN/train.py b/AI/modules/signal/models/TCN/train.py index 4adc3d8d..3c272fb8 100644 --- a/AI/modules/signal/models/TCN/train.py +++ b/AI/modules/signal/models/TCN/train.py @@ -18,12 +18,11 @@ if project_root not in sys.path: sys.path.append(project_root) -from AI.modules.features.legacy.technical_features import add_technical_indicators +from AI.modules.signal.core.dataset_builder import get_standard_training_data from AI.modules.signal.core.data_loader import DataLoader from AI.modules.signal.models.TCN.architecture import TCNClassifier -# TCN은 명세에 맞춰 개별 기술적 지표만 사용합니다. FEATURE_COLUMNS = [ "log_return", "open_ratio", @@ -38,7 +37,6 @@ "bb_position", ] -# 서비스 추론 결과와 맞추기 위해 1/3/5/7일 방향성을 동시에 학습합니다. HORIZONS = [1, 3, 5, 7] @@ -48,16 +46,16 @@ def build_sequences( feature_cols: List[str], horizons: List[int], ) -> Tuple[np.ndarray, np.ndarray]: - # 종목별 시계열을 순회하면서 [seq_len, features] 윈도우와 멀티 호라이즌 라벨을 만듭니다. + """ + 미리 스케일링이 완료된 DataFrame을 받아 [Batch, Seq, Features] 형태의 윈도우와 라벨을 생성합니다. + """ features = [] labels = [] max_horizon = max(horizons) for _, sub_df in df.groupby("ticker"): sub_df = sub_df.sort_values("date").copy() - sub_df = add_technical_indicators(sub_df) sub_df = sub_df.dropna(subset=["close"]) - sub_df = sub_df.replace([np.inf, -np.inf], np.nan).fillna(0) if len(sub_df) < seq_len + max_horizon: continue @@ -68,60 +66,55 @@ def build_sequences( for start in range(len(sub_df) - seq_len - max_horizon + 1): end = start + seq_len current_close = closes[end - 1] + target = [] for horizon in horizons: future_close = closes[end + horizon - 1] + # 미래 종가가 현재 종가보다 크면 상승(1), 아니면 하락(0) target.append(1.0 if future_close > current_close else 0.0) features.append(feature_values[start:end]) labels.append(target) + # 빈 배열 처리 if not features: - raise ValueError("No training sequences were created. Check DB data coverage.") + return np.empty((0, seq_len, len(feature_cols)), dtype=np.float32), np.empty((0, len(horizons)), dtype=np.float32) return np.array(features, dtype=np.float32), np.array(labels, dtype=np.float32) -def fit_scaler(X_train: np.ndarray) -> StandardScaler: - # 시퀀스 축을 펼쳐 feature 단위로 표준화 스케일러를 학습합니다. 
- scaler = StandardScaler() - scaler.fit(X_train.reshape(-1, X_train.shape[-1])) - return scaler - - -def transform_sequences(X: np.ndarray, scaler: StandardScaler) -> np.ndarray: - # 학습 시 저장한 스케일러를 시퀀스 전체에 동일하게 적용합니다. - shape = X.shape - flat = X.reshape(-1, shape[-1]) - scaled = scaler.transform(flat) - return scaled.reshape(shape).astype(np.float32) - - def train_model(args: argparse.Namespace): - # 공통 DataLoader로 DB에서 가격 데이터를 읽고 TCN 전용 입력만 추립니다. - loader = DataLoader(lookback=args.seq_len, horizons=HORIZONS) - raw_df = loader.load_data_from_db( - start_date=args.start_date, - end_date=args.end_date, - tickers=args.tickers, - ) + # 1. DB에서 원시 데이터 로드 및 파이프라인 표준 전처리 적용 (단일 데이터프레임 반환) + # [주의] get_standard_training_data가 원본 DB 로드 기능까지 수행하므로 DataLoader 별도 호출 불필요 + raw_df = get_standard_training_data(args.start_date, args.end_date) if raw_df.empty: raise ValueError("No raw price data loaded from DB.") - X, y = build_sequences(raw_df, args.seq_len, FEATURE_COLUMNS, HORIZONS) + # 예: 전체 기간의 80% 시점을 기준으로 날짜를 분할합니다. + dates = raw_df['date'].sort_values().unique() + split_date_idx = int(len(dates) * 0.8) # 전체 날짜의 80% 지점에서 분할 날짜 인덱스 계산 + split_date = dates[split_date_idx] + + train_df = raw_df[raw_df['date'] <= split_date].copy() + val_df = raw_df[raw_df['date'] > split_date].copy() + + print(f"Data Split - Train: ~{split_date}, Validation: {split_date}~") + + # 2D 형태에서 스케일링 수행 (안전하고 직관적) + scaler = StandardScaler() + # Train 데이터로만 스케일러 학습 (Validation 데이터 유출 방지) + scaler.fit(train_df[FEATURE_COLUMNS]) - # 시계열 순서를 유지한 채 뒤쪽 구간을 검증셋으로 둡니다. - split_idx = max(int(len(X) * 0.8), 1) - if split_idx >= len(X): - split_idx = len(X) - 1 + train_df[FEATURE_COLUMNS] = scaler.transform(train_df[FEATURE_COLUMNS]) + val_df[FEATURE_COLUMNS] = scaler.transform(val_df[FEATURE_COLUMNS]) - X_train, X_val = X[:split_idx], X[split_idx:] - y_train, y_val = y[:split_idx], y[split_idx:] + # 4. 스케일링된 DataFrame으로 3D 시퀀스(윈도우) 텐서 구축 + X_train, y_train = build_sequences(train_df, args.seq_len, FEATURE_COLUMNS, HORIZONS) + X_val, y_val = build_sequences(val_df, args.seq_len, FEATURE_COLUMNS, HORIZONS) - scaler = fit_scaler(X_train) - X_train = transform_sequences(X_train, scaler) - X_val = transform_sequences(X_val, scaler) + if len(X_train) == 0 or len(X_val) == 0: + raise ValueError("Insufficient data to create sequences for either train or validation set.") train_dataset = TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train)) val_dataset = TensorDataset(torch.from_numpy(X_val), torch.from_numpy(y_val)) @@ -130,7 +123,8 @@ def train_model(args: argparse.Namespace): val_loader = TorchDataLoader(val_dataset, batch_size=args.batch_size, shuffle=False) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - # output_size는 horizon 개수와 동일합니다. + print(f"Using device: {device}") + model = TCNClassifier( input_size=len(FEATURE_COLUMNS), output_size=len(HORIZONS), @@ -145,12 +139,12 @@ def train_model(args: argparse.Namespace): best_val_loss = float("inf") best_state = None + # 5. 
모델 학습 루프 for epoch in range(args.epochs): model.train() train_loss = 0.0 for batch_x, batch_y in train_loader: - batch_x = batch_x.to(device) - batch_y = batch_y.to(device) + batch_x, batch_y = batch_x.to(device), batch_y.to(device) optimizer.zero_grad() logits = model(batch_x) @@ -164,19 +158,16 @@ def train_model(args: argparse.Namespace): val_loss = 0.0 with torch.no_grad(): for batch_x, batch_y in val_loader: - batch_x = batch_x.to(device) - batch_y = batch_y.to(device) + batch_x, batch_y = batch_x.to(device), batch_y.to(device) logits = model(batch_x) loss = criterion(logits, batch_y) val_loss += loss.item() * batch_x.size(0) train_loss /= len(train_dataset) val_loss /= len(val_dataset) - print( - f"Epoch {epoch + 1}/{args.epochs} " - f"- train_loss: {train_loss:.4f} val_loss: {val_loss:.4f}" - ) + print(f"Epoch {epoch + 1}/{args.epochs} - train_loss: {train_loss:.4f} val_loss: {val_loss:.4f}") + # 검증 손실이 최저일 때 모델 가중치 저장 (Early Stopping 준비) if val_loss < best_val_loss: best_val_loss = val_loss best_state = model.state_dict() @@ -184,7 +175,7 @@ def train_model(args: argparse.Namespace): if best_state is None: best_state = model.state_dict() - # wrapper가 바로 사용할 수 있도록 가중치, scaler, 메타데이터를 함께 저장합니다. + # 6. 아티팩트(가중치, 스케일러, 메타데이터) 파일 시스템 저장 os.makedirs(args.output_dir, exist_ok=True) model_path = os.path.join(args.output_dir, "model.pt") scaler_path = os.path.join(args.output_dir, "scaler.pkl") @@ -205,18 +196,18 @@ def train_model(args: argparse.Namespace): "scaler_path": scaler_path, } with open(metadata_path, "w", encoding="utf-8") as f: - json.dump(metadata, f, ensure_ascii=True, indent=2) + json.dump(metadata, f, ensure_ascii=False, indent=2) + print("\n✅ Training Complete!") print(f"Saved model to: {model_path}") print(f"Saved scaler to: {scaler_path}") print(f"Saved metadata to: {metadata_path}") def parse_args() -> argparse.Namespace: - # 실험 시 종목, 기간, 모델 폭을 CLI에서 바로 바꿀 수 있게 둡니다. parser = argparse.ArgumentParser(description="Train TCN signal model.") parser.add_argument("--start-date", default="2018-01-01") - parser.add_argument("--end-date", default=None) + parser.add_argument("--end-date", default="2024-01-01") parser.add_argument("--tickers", nargs="*", default=None) parser.add_argument("--seq-len", type=int, default=60) parser.add_argument("--epochs", type=int, default=20) @@ -224,18 +215,10 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--learning-rate", type=float, default=1e-3) parser.add_argument("--kernel-size", type=int, default=3) parser.add_argument("--dropout", type=float, default=0.2) - parser.add_argument( - "--channels", - type=int, - nargs="+", - default=[32, 64, 64], - ) - parser.add_argument( - "--output-dir", - default=os.path.join(project_root, "AI", "data", "weights", "tcn"), - ) + parser.add_argument("--channels", type=int, nargs="+", default=[32, 64, 64]) + parser.add_argument("--output-dir", default=os.path.join(project_root, "AI", "data", "weights", "tcn")) return parser.parse_args() if __name__ == "__main__": - train_model(parse_args()) + train_model(parse_args()) \ No newline at end of file