diff --git a/AI/libs/database/fetcher.py b/AI/libs/database/fetcher.py index 69a5e050..ea864a8e 100644 --- a/AI/libs/database/fetcher.py +++ b/AI/libs/database/fetcher.py @@ -1,70 +1,133 @@ -# AI/libs/database/fetcher.py -from __future__ import annotations -from typing import Optional +#AI/libs/database/fetcher.py +from typing import Dict import pandas as pd from sqlalchemy import text - -# DB용 유틸: SQLAlchemy Engine 생성 함수 사용 (get_engine) from .connection import get_engine -def fetch_ohlcv( +def fetch_price_data( ticker: str, - start: str, - end: str, - interval: str = "1d", - db_name: str = "db", + start_date: str, + db_name: str = "db" ) -> pd.DataFrame: """ - 특정 티커, 날짜 범위의 OHLCV 데이터를 DB에서 불러오기 - - Args: - ticker (str): 종목 코드 (예: "AAPL") - start (str): 시작일자 'YYYY-MM-DD' - end (str): 종료일자 'YYYY-MM-DD' - interval (str): 데이터 간격 (현재 일봉만 지원) - db_name (str): DB 설정 이름 - - Returns: - pd.DataFrame: [ticker, date, open, high, low, close, adjusted_close, volume] + [기본] 종목별 시세 데이터 조회 (Price Data) """ - engine = get_engine(db_name) - - # adjusted_close가 중요하다면 쿼리 단계에서 확실히 가져옵니다. query = text(""" - SELECT ticker, date, open, high, low, close, adjusted_close, volume + SELECT date, ticker, open, high, low, close, adjusted_close, volume, amount FROM public.price_data WHERE ticker = :ticker - AND date BETWEEN :start AND :end - ORDER BY date; + AND date >= :start_date + ORDER BY date ASC; """) with engine.connect() as conn: - df = pd.read_sql( - query, - con=conn, - params={"ticker": ticker, "start": start, "end": end}, - ) + df = pd.read_sql(query, conn, params={"ticker": ticker, "start_date": start_date}) + + if not df.empty: + df["date"] = pd.to_datetime(df["date"]) + # 수정주가 처리 + if "adjusted_close" in df.columns: + df["close"] = df["adjusted_close"].fillna(df["close"]) + + return df - # 빈 데이터 처리 - if df is None or df.empty: - return pd.DataFrame(columns=["ticker", "date", "open", "high", "low", "close", "adjusted_close", "volume"]) +def fetch_macro_indicators( + start_date: str, + db_name: str = "db" +) -> pd.DataFrame: + """ + [공통] 거시경제 지표 조회 (모든 종목 공통 적용) + """ + engine = get_engine(db_name) + query = text(""" + SELECT date, + us10y, us2y, yield_spread, + vix_close, dxy_close, wti_price, gold_price, + credit_spread_hy + FROM public.macroeconomic_indicators + WHERE date >= :start_date + ORDER BY date ASC; + """) + + with engine.connect() as conn: + df = pd.read_sql(query, conn, params={"start_date": start_date}) + + if not df.empty: + df["date"] = pd.to_datetime(df["date"]) + + return df + +def fetch_market_breadth( + start_date: str, + db_name: str = "db" +) -> pd.DataFrame: + """ + [공통] 시장 건전성 지표 (Market Breadth) + """ + engine = get_engine(db_name) + query = text(""" + SELECT date, + advance_decline_ratio, fear_greed_index, + new_highs, new_lows, above_ma200_pct + FROM public.market_breadth + WHERE date >= :start_date + ORDER BY date ASC; + """) - # 날짜 변환 - if "date" in df.columns: + with engine.connect() as conn: + df = pd.read_sql(query, conn, params={"start_date": start_date}) + + if not df.empty: df["date"] = pd.to_datetime(df["date"]) - # 데이터 보정 로직 추가 - # 1. adjusted_close가 없는 경우(NaN) -> close 값으로 대체 (결측치 방지) - if "adjusted_close" in df.columns and "close" in df.columns: - df["adjusted_close"] = df["adjusted_close"].fillna(df["close"]) - elif "adjusted_close" not in df.columns and "close" in df.columns: - # 컬럼 자체가 없으면 close를 복사해서 생성 - df["adjusted_close"] = df["close"] + return df + +def fetch_news_sentiment( + ticker: str, + start_date: str, + db_name: str = "db" +) -> pd.DataFrame: + """ + [개별] 뉴스 심리 지수 조회 + """ + engine = get_engine(db_name) + query = text(""" + SELECT date, sentiment_score, risk_keyword_cnt, article_count + FROM public.news_sentiment + WHERE ticker = :ticker + AND date >= :start_date + ORDER BY date ASC; + """) - # 컬럼 순서 정리 - desired_cols = ["ticker", "date", "open", "high", "low", "close", "adjusted_close", "volume"] - cols_present = [c for c in desired_cols if c in df.columns] - df = df.loc[:, cols_present] + with engine.connect() as conn: + df = pd.read_sql(query, conn, params={"ticker": ticker, "start_date": start_date}) + + if not df.empty: + df["date"] = pd.to_datetime(df["date"]) + + return df +def fetch_fundamentals( + ticker: str, + db_name: str = "db" +) -> pd.DataFrame: + """ + [개별] 기업 펀더멘털 데이터 (재무제표) + """ + engine = get_engine(db_name) + # 재무제표는 start_date 제한 없이 가져와서 ffill 하는 것이 안전함 + query = text(""" + SELECT date, per, pbr, roe, debt_ratio, operating_cash_flow + FROM public.company_fundamentals + WHERE ticker = :ticker + ORDER BY date ASC; + """) + + with engine.connect() as conn: + df = pd.read_sql(query, conn, params={"ticker": ticker}) + + if not df.empty: + df["date"] = pd.to_datetime(df["date"]) + return df \ No newline at end of file diff --git a/AI/modules/data_collector/__init__.py b/AI/modules/data_collector/__init__.py index 515cabdf..2bb18233 100644 --- a/AI/modules/data_collector/__init__.py +++ b/AI/modules/data_collector/__init__.py @@ -1,8 +1,25 @@ -# AI/modules/collector/__init__.py -from .market_data import update_market_data -from .news_data import collect_news +# AI/modules/data_collector/__init__.py + +from .market_data import MarketDataCollector +from .stock_info_collector import StockInfoCollector +from .company_fundamentals_data import FundamentalsDataCollector +from .macro_data import MacroDataCollector +from .crypto_data import CryptoDataCollector +from .index_data import IndexDataCollector +from .event_data import EventDataCollector +from .market_breadth_data import MarketBreadthCollector +from .market_breadth_stats import MarketBreadthStatsCollector +# from .news_data import NewsDataCollector # 뉴스 모듈 구현 시 주석 해제 __all__ = [ - "update_market_data", - "collect_news", + "MarketDataCollector", + "StockInfoCollector", + "FundamentalsDataCollector", + "MacroDataCollector", + "CryptoDataCollector" + "IndexDataCollector", + "EventDataCollector", + "MarketBreadthCollector", + "MarketBreadthStatsCollector", + # "NewsDataCollector", # 뉴스 모듈 구현 시 주석 해제 ] \ No newline at end of file diff --git a/AI/modules/data_collector/company_fundamentals_data.py b/AI/modules/data_collector/company_fundamentals_data.py index 0da49d66..9b875769 100644 --- a/AI/modules/data_collector/company_fundamentals_data.py +++ b/AI/modules/data_collector/company_fundamentals_data.py @@ -1,21 +1,14 @@ -# AI/modules/data_collector/fundamentals_data.py -""" -[기업 펀더멘털 데이터 수집기] -- yfinance를 통해 기업의 재무제표(손익계산서, 대차대조표) 데이터를 수집합니다. -- 수집된 데이터는 'company_fundamentals' 테이블에 저장됩니다. -- 주로 분기(Quarterly) 데이터를 기준으로 수집합니다. -""" - +#AI/modules/data_collector/company_fundamentals_data.py import sys import os import yfinance as yf import pandas as pd import numpy as np -from datetime import datetime -from typing import List, Optional +from datetime import timedelta +from typing import List from psycopg2.extras import execute_values -# 프로젝트 루트 경로 추가 (기존 스타일 유지) +# 프로젝트 루트 경로 설정 current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.abspath(os.path.join(current_dir, "../../..")) if project_root not in sys.path: @@ -23,130 +16,207 @@ from AI.libs.database.connection import get_db_conn - -def update_company_fundamentals(tickers: List[str], db_name: str = "db"): +class FundamentalsDataCollector: """ - 지정된 종목들의 재무제표(펀더멘털) 데이터를 수집하여 DB에 저장합니다. + 기업의 재무제표(손익계산서, 대차대조표, 현금흐름표)를 수집하고 + 주요 퀀트 투자 지표(PER, PBR, ROE, 이자보상배율 등)를 계산하여 DB에 저장하는 클래스 """ - print(f"[Fundamentals] {len(tickers)}개 종목 재무 데이터 업데이트 시작...") - - conn = get_db_conn(db_name) - cursor = conn.cursor() - - try: - for ticker in tickers: - print(f" [{ticker}] 재무 정보 수집 중...") - + + def __init__(self, db_name: str = "db"): + self.db_name = db_name + + def get_safe_value(self, row: pd.Series, keys: List[str]) -> float: + """ + 여러 키 후보 중 존재하는 값을 찾아 반환합니다. (결측치 처리 포함) + """ + for k in keys: + if k in row and pd.notna(row[k]): + return float(row[k]) + return None + + def fetch_and_calculate_metrics(self, ticker: str): + """ + 개별 종목의 재무 데이터를 수집, 병합 및 지표를 계산합니다. + """ + stock = yf.Ticker(ticker) + + # 1. 재무 데이터 가져오기 (분기 기준) + try: + fin_df = stock.quarterly_financials.T + bal_df = stock.quarterly_balance_sheet.T + cash_df = stock.quarterly_cashflow.T + except Exception as e: + # print(f" [{ticker}] yfinance 데이터 로드 실패: {e}") + return pd.DataFrame() + + if fin_df.empty or bal_df.empty: + return pd.DataFrame() + + # 2. 인덱스(날짜) 통일 + fin_df.index = pd.to_datetime(fin_df.index) + bal_df.index = pd.to_datetime(bal_df.index) + cash_df.index = pd.to_datetime(cash_df.index) + + # 3. 데이터프레임 병합 (Inner Join) + merged_df = fin_df.join(bal_df, lsuffix='_fin', rsuffix='_bal', how='inner') + merged_df = merged_df.join(cash_df, rsuffix='_cash', how='left') + + # 4. 주가 데이터 로드 (PER/PBR 계산용) + if not merged_df.empty: + start_date = merged_df.index.min() - timedelta(days=5) + end_date = merged_df.index.max() + timedelta(days=5) try: - stock = yf.Ticker(ticker) - - # 1. 재무 데이터 가져오기 (분기 데이터 우선) - # yfinance API: quarterly_financials(손익), quarterly_balance_sheet(대차대조표) - # 데이터는 컬럼이 '날짜(Date)'로 되어 있으므로 전치(T)하여 행을 날짜로 만듦 - fin_df = stock.quarterly_financials.T - bal_df = stock.quarterly_balance_sheet.T - - if fin_df.empty or bal_df.empty: - print(f" [{ticker}] 재무 데이터 없음 (Skip).") - continue - - # 2. 인덱스(날짜) 통일 및 병합 - # 인덱스 이름이 다를 수 있으므로 날짜 포맷 통일 - fin_df.index = pd.to_datetime(fin_df.index) - bal_df.index = pd.to_datetime(bal_df.index) - - # 날짜(index) 기준으로 병합 (Inner Join: 손익/대차대조표 모두 있는 날짜만) - merged_df = fin_df.join(bal_df, lsuffix='_fin', rsuffix='_bal', how='inner') - - # 3. 데이터 매핑 및 전처리 - # yfinance 필드명 -> DB 컬럼명 매핑 - # (존재하지 않는 필드는 NaN 처리됨) - - data_to_insert = [] - - for date_idx, row in merged_df.iterrows(): - date_val = date_idx.date() - - # 안전하게 값 가져오기 헬퍼 함수 - def get_val(keys): - for k in keys: - if k in row and pd.notna(row[k]): - return float(row[k]) - return None - - # 매핑 로직 (yfinance 필드명이 종종 변경되므로 여러 후보군 확인) - revenue = get_val(['Total Revenue', 'Operating Revenue']) - net_income = get_val(['Net Income', 'Net Income Common Stockholders']) - total_assets = get_val(['Total Assets']) - - # 부채총계: Total Liabilities Net Minority Interest 또는 Total Liabilities - total_liabilities = get_val(['Total Liabilities Net Minority Interest', 'Total Liabilities']) - - # 자본총계: Stockholders Equity - equity = get_val(['Stockholders Equity', 'Total Equity Gross Minority Interest']) - - # EPS - eps = get_val(['Basic EPS', 'Diluted EPS']) - - # P/E Ratio - # 재무제표 발표 시점의 P/E는 과거 주가가 필요하므로, - # 여기서는 간단히 NULL로 두거나, 필요시 price_data 테이블 조인해서 계산해야 함. - # 일단은 NULL로 저장. - pe_ratio = None - - data_to_insert.append(( - str(ticker), - date_val, - revenue, - net_income, - total_assets, - total_liabilities, - equity, - eps, - pe_ratio - )) - - # 4. DB 저장 (Upsert) - # company_fundamentals_pkey: (ticker, date) - insert_query = """ - INSERT INTO public.company_fundamentals ( - ticker, date, revenue, net_income, total_assets, - total_liabilities, equity, eps, pe_ratio - ) - VALUES %s - ON CONFLICT (ticker, date) - DO UPDATE SET - revenue = EXCLUDED.revenue, - net_income = EXCLUDED.net_income, - total_assets = EXCLUDED.total_assets, - total_liabilities = EXCLUDED.total_liabilities, - equity = EXCLUDED.equity, - eps = EXCLUDED.eps, - pe_ratio = EXCLUDED.pe_ratio - """ - - if data_to_insert: - execute_values(cursor, insert_query, data_to_insert) - conn.commit() - print(f" [{ticker}] {len(data_to_insert)}건 재무 데이터 저장 완료.") - else: - print(f" [{ticker}] 저장할 유효한 데이터가 없습니다.") + hist_df = stock.history(start=start_date, end=end_date) + except: + hist_df = pd.DataFrame() + else: + hist_df = pd.DataFrame() - except Exception as e: - print(f" [{ticker}] 처리 중 에러 발생: {e}") - conn.rollback() - continue + processed_data = [] + + for date_idx, row in merged_df.iterrows(): + date_val = date_idx.date() + + # --- A. 기본 재무 데이터 매핑 --- + revenue = self.get_safe_value(row, ['Total Revenue', 'Operating Revenue']) + net_income = self.get_safe_value(row, ['Net Income', 'Net Income Common Stockholders']) + total_assets = self.get_safe_value(row, ['Total Assets']) + total_liabilities = self.get_safe_value(row, ['Total Liabilities Net Minority Interest', 'Total Liabilities']) + equity = self.get_safe_value(row, ['Stockholders Equity', 'Total Equity Gross Minority Interest']) + eps = self.get_safe_value(row, ['Basic EPS', 'Diluted EPS']) + operating_cash_flow = self.get_safe_value(row, ['Operating Cash Flow', 'Total Cash From Operating Activities']) + shares_issued = self.get_safe_value(row, ['Share Issued', 'Ordinary Shares Number']) + + # --- [추가] 이자보상배율 계산을 위한 항목 --- + op_income = self.get_safe_value(row, ['Operating Income', 'EBIT']) + int_expense = self.get_safe_value(row, ['Interest Expense', 'Interest Expense Non Operating']) + + # --- B. 파생 지표 계산 --- + + # 1. ROE (자기자본이익률) + roe = None + if net_income is not None and equity is not None and equity != 0: + roe = net_income / equity + + # 2. 부채비율 (Debt Ratio) + debt_ratio = None + if total_liabilities is not None and equity is not None and equity != 0: + debt_ratio = total_liabilities / equity - except Exception as e: - conn.rollback() - print(f"[Fundamentals][Error] 치명적 오류: {e}") - finally: - cursor.close() - conn.close() + # 3. [신규] 이자보상배율 (Interest Coverage) = 영업이익 / |이자비용| + interest_coverage = None + if op_income is not None and int_expense is not None: + abs_int = abs(int_expense) + if abs_int > 0: + interest_coverage = op_income / abs_int + # 4. 주가 기반 지표 (PER, PBR) + close_price = None + if not hist_df.empty: + try: + target_ts = pd.Timestamp(date_val) + if target_ts in hist_df.index: + close_price = float(hist_df.loc[target_ts]['Close']) + else: + idx = hist_df.index.get_indexer([target_ts], method='pad') + if idx[0] != -1: + close_price = float(hist_df.iloc[idx[0]]['Close']) + except: + close_price = None + + # PER + per = None + if close_price is not None and eps is not None and eps != 0: + per = close_price / eps + + # PBR + pbr = None + if close_price is not None and equity is not None and shares_issued is not None and shares_issued != 0: + bps = equity / shares_issued + pbr = close_price / bps + + processed_data.append(( + str(ticker), + date_val, + revenue, + net_income, + total_assets, + total_liabilities, + equity, + eps, + per, + pbr, + roe, + debt_ratio, + interest_coverage, # [추가됨] + operating_cash_flow + )) + + return processed_data + + def save_to_db(self, ticker: str, data: List[tuple]): + """ + 처리된 데이터를 DB에 저장(Upsert)합니다. + """ + if not data: + # print(f" [{ticker}] 저장할 유효한 데이터가 없습니다.") + return + + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + + try: + # [SQL 수정] interest_coverage 컬럼 추가 + insert_query = """ + INSERT INTO public.financial_statements ( + ticker, date, revenue, net_income, total_assets, + total_liabilities, equity, eps, per, pbr, roe, debt_ratio, + interest_coverage, operating_cash_flow + ) + VALUES %s + ON CONFLICT (ticker, date) + DO UPDATE SET + revenue = EXCLUDED.revenue, + net_income = EXCLUDED.net_income, + total_assets = EXCLUDED.total_assets, + total_liabilities = EXCLUDED.total_liabilities, + equity = EXCLUDED.equity, + eps = EXCLUDED.eps, + per = EXCLUDED.per, + pbr = EXCLUDED.pbr, + roe = EXCLUDED.roe, + debt_ratio = EXCLUDED.debt_ratio, + interest_coverage = EXCLUDED.interest_coverage, + operating_cash_flow = EXCLUDED.operating_cash_flow; + """ + + execute_values(cursor, insert_query, data) + conn.commit() + print(f" [{ticker}] {len(data)}건 펀더멘털 데이터(이자보상배율 포함) 저장 완료.") + + except Exception as e: + conn.rollback() + print(f" [{ticker}][Error] DB 저장 실패: {e}") + finally: + cursor.close() + conn.close() + + def update_tickers(self, tickers: List[str]): + """ + 주어진 종목 리스트에 대해 업데이트를 수행합니다. + """ + print(f"[Fundamentals] {len(tickers)}개 종목 재무 데이터 업데이트 시작...") + + for ticker in tickers: + # print(f" [{ticker}] 재무 정보 분석 및 수집 중...") + try: + data = self.fetch_and_calculate_metrics(ticker) + self.save_to_db(ticker, data) + except Exception as e: + print(f" [{ticker}][Error] 처리 중 예외 발생: {e}") # ---------------------------------------------------------------------- -# [수동 실행 모드] +# [실행 모드] # ---------------------------------------------------------------------- if __name__ == "__main__": import argparse @@ -158,22 +228,23 @@ def get_val(keys): args = parser.parse_args() target_tickers = args.tickers - - # --all 옵션 처리 + + # DB 연결 테스트 및 종목 로드 + conn = get_db_conn(args.db) + if args.all: try: - # ticker_loader가 없다면 직접 쿼리 - conn = get_db_conn(args.db) cur = conn.cursor() - cur.execute("SELECT DISTINCT ticker FROM public.price_data") # 가격 데이터가 있는 종목 기준 + cur.execute("SELECT DISTINCT ticker FROM public.price_data") rows = cur.fetchall() target_tickers = [r[0] for r in rows] cur.close() - conn.close() print(f">> DB에서 {len(target_tickers)}개 종목을 조회했습니다.") except Exception as e: print(f"[Error] 종목 로드 실패: {e}") sys.exit(1) + + conn.close() if not target_tickers: print("\n>> 수집할 종목 코드를 입력하세요 (예: AAPL TSLA)") @@ -188,5 +259,6 @@ def get_val(keys): sys.exit(0) if target_tickers: - update_company_fundamentals(target_tickers, db_name=args.db) + collector = FundamentalsDataCollector(db_name=args.db) + collector.update_tickers(target_tickers) print("\n[완료] 작업이 끝났습니다.") \ No newline at end of file diff --git a/AI/modules/data_collector/crypto_data.py b/AI/modules/data_collector/crypto_data.py new file mode 100644 index 00000000..f1566de4 --- /dev/null +++ b/AI/modules/data_collector/crypto_data.py @@ -0,0 +1,220 @@ +#AI/modules/data_collector/crypto_data.py +import sys +import os +import yfinance as yf +import pandas as pd +from datetime import datetime, timedelta +from typing import List +from psycopg2.extras import execute_values + +# 프로젝트 루트 경로 설정 +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../../..")) +if project_root not in sys.path: + sys.path.append(project_root) + +from AI.libs.database.connection import get_db_conn + +class CryptoDataCollector: + """ + 암호화폐(Crypto)의 시세 데이터를 수집하여 DB에 적재하는 클래스 + - 대상 테이블: crypto_price_data + - yfinance 티커 예시: BTC-USD, ETH-USD + """ + + def __init__(self, db_name: str = "db"): + self.db_name = db_name + # 암호화폐 데이터가 없을 경우 시작할 기본 날짜 (비트코인 등 주요 코인 고려) + self.FIXED_START_DATE = "2018-01-01" + + def get_start_date(self, ticker: str, repair_mode: bool) -> str: + """ + DB를 조회하여 수집 시작 날짜를 결정합니다. + """ + if repair_mode: + return self.FIXED_START_DATE + + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + try: + # 티커에서 '-USD'를 제거하거나 그대로 사용할지 정책에 따라 다르지만, + # 여기서는 입력받은 티커 그대로 DB에 저장한다고 가정합니다. + cursor.execute("SELECT MAX(date) FROM public.crypto_price_data WHERE ticker = %s", (ticker,)) + last_date = cursor.fetchone()[0] + + if last_date: + # 마지막 데이터 다음 날부터 수집 + return (last_date + timedelta(days=1)).strftime("%Y-%m-%d") + else: + return self.FIXED_START_DATE + except Exception as e: + print(f" [Warning] 시작 날짜 조회 실패 ({ticker}): {e}") + return self.FIXED_START_DATE + finally: + cursor.close() + conn.close() + + def fetch_crypto_ohlcv(self, ticker: str, start_date: str) -> pd.DataFrame: + """ + yfinance에서 암호화폐 OHLCV 데이터를 다운로드합니다. + """ + try: + # Crypto는 24/7 거래되므로 interval='1d'로 일봉 수집 + df = yf.download(ticker, start=start_date, progress=False, auto_adjust=False, threads=False) + + if df.empty: + return pd.DataFrame() + + # MultiIndex 컬럼 평탄화 + if isinstance(df.columns, pd.MultiIndex): + df.columns = df.columns.get_level_values(0) + + # yfinance history 데이터에는 과거 Market Cap이 포함되지 않는 경우가 많음. + # 스키마 요구사항에 따라 컬럼은 존재해야 하므로 None으로 초기화하거나 + # 별도 로직(Close * Circulating Supply)이 필요함. 현재는 None 처리. + if 'Market Cap' not in df.columns: + df['Market Cap'] = None + + return df + except Exception as e: + print(f" [Error] {ticker} 다운로드 중 에러: {e}") + return pd.DataFrame() + + def save_to_db(self, ticker: str, df: pd.DataFrame): + """ + 데이터프레임을 crypto_price_data 테이블에 저장(Upsert)합니다. + """ + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + + insert_query = """ + INSERT INTO public.crypto_price_data ( + ticker, date, open, high, low, close, volume, market_cap + ) + VALUES %s + ON CONFLICT (ticker, date) DO UPDATE SET + open = EXCLUDED.open, + high = EXCLUDED.high, + low = EXCLUDED.low, + close = EXCLUDED.close, + volume = EXCLUDED.volume, + market_cap = EXCLUDED.market_cap; + """ + + try: + data_to_insert = [] + has_adj = 'Adj Close' in df.columns # Crypto는 보통 Close == Adj Close + + for index, row in df.iterrows(): + # Timestamp to datetime + date_val = index.to_pydatetime() + + def get_val(col): + val = row.get(col, 0) + if hasattr(val, 'iloc'): + return float(val.iloc[0]) + # None(Market Cap) 처리 + if pd.isna(val) or val is None: + return None + return float(val) + + open_val = get_val('Open') + high_val = get_val('High') + low_val = get_val('Low') + close_val = get_val('Close') + vol_val = get_val('Volume') + mkt_cap_val = get_val('Market Cap') + + data_to_insert.append(( + ticker, date_val, open_val, high_val, low_val, close_val, vol_val, mkt_cap_val + )) + + if data_to_insert: + execute_values(cursor, insert_query, data_to_insert) + conn.commit() + print(f" [{ticker}] {len(data_to_insert)}건 저장 완료.") + else: + print(f" [{ticker}] 저장할 데이터가 없습니다.") + + except Exception as e: + conn.rollback() + print(f" [{ticker}][Error] DB 저장 실패: {e}") + finally: + cursor.close() + conn.close() + + def update_tickers(self, tickers: List[str], repair_mode: bool = False): + """ + 여러 암호화폐 티커에 대해 수집 작업을 수행합니다. + """ + mode_msg = "[Repair Mode]" if repair_mode else "[Update Mode]" + print(f"[CryptoData] {mode_msg} {len(tickers)}개 코인 업데이트 시작...") + + today = datetime.now().strftime("%Y-%m-%d") + + for ticker in tickers: + # 1. 수집 시작 날짜 결정 + start_date = self.get_start_date(ticker, repair_mode) + + # 이미 최신이면 스킵 (Repair 모드가 아닐 때만) + if not repair_mode and start_date > today: + print(f" [{ticker}] 이미 최신 데이터입니다.") + continue + + print(f" [{ticker}] 수집 시작 ({start_date} ~ )...") + + # 2. 데이터 수집 + df = self.fetch_crypto_ohlcv(ticker, start_date) + + # 3. DB 저장 + if not df.empty: + self.save_to_db(ticker, df) + else: + print(f" [{ticker}] 수집된 데이터가 없습니다.") + +# ---------------------------------------------------------------------- +# [실행 모드] +# ---------------------------------------------------------------------- +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="[수동] 암호화폐 데이터 수집기") + parser.add_argument("tickers", nargs='*', help="수집할 티커 리스트 (예: BTC-USD ETH-USD)") + # --all 옵션 추가: BTC-USD와 ETH-USD만 대상으로 설정 + parser.add_argument("--all", action="store_true", help="주요 코인(BTC, ETH) 데이터 일괄 업데이트") + parser.add_argument("--repair", action="store_true", help="[복구모드] 전체 기간 재수집") + parser.add_argument("--db", default="db", help="DB 이름") + + args = parser.parse_args() + target_tickers = args.tickers + + # --all 옵션 처리: BTC와 ETH만 강제 지정 + if args.all: + target_tickers = ["BTC-USD", "ETH-USD"] + print(">> [Info] --all 옵션 활성화: BTC-USD, ETH-USD 만 수집합니다.") + + # 입력이 없을 경우 인터랙티브 모드 + if not target_tickers: + print("\n========================================================") + print(" [Manual Mode] 암호화폐 데이터 수집기") + print("========================================================") + print(" 사용 예시:") + print(" 1) 특정 코인 수집 : python crypto_data.py BTC-USD XRP-USD") + print(" 2) 주요 코인(BTC/ETH) 업데이트 : python crypto_data.py --all") + print(" 3) 누락 데이터 복구 : python crypto_data.py --all --repair") + print("========================================================") + print(">> 수집할 티커를 입력하세요 (종료: Enter)") + + try: + input_str = sys.stdin.readline().strip() + if input_str: + target_tickers = input_str.split() + else: + sys.exit(0) + except KeyboardInterrupt: + sys.exit(0) + + if target_tickers: + collector = CryptoDataCollector(db_name=args.db) + collector.update_tickers(target_tickers, repair_mode=args.repair) + print("\n[완료] 작업이 끝났습니다.") \ No newline at end of file diff --git a/AI/modules/data_collector/event_data.py b/AI/modules/data_collector/event_data.py new file mode 100644 index 00000000..7040bfa2 --- /dev/null +++ b/AI/modules/data_collector/event_data.py @@ -0,0 +1,266 @@ +import sys +import os +import time +import requests +import pandas as pd +import numpy as np +import yfinance as yf +from datetime import datetime, timedelta, date +from typing import List, Optional +from psycopg2.extras import execute_values + +# 프로젝트 루트 경로 설정 +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../../..")) +if project_root not in sys.path: + sys.path.append(project_root) + +from AI.libs.database.connection import get_db_conn + +# FMP API 키 +FMP_API_KEY = os.getenv("FMP_API_KEY", "your_fmp_api_key_here") + +class EventDataCollector: + """ + [이벤트 일정 및 서프라이즈 수집기] + - Macro: FMP API를 통해 Forecast/Actual 수집 + - Earnings: yfinance를 통해 EPS Estimate/Reported 수집 + - Schema: event_date, event_type, ticker, description, forecast, actual + """ + + def __init__(self, db_name: str = "db"): + self.db_name = db_name + self.base_url = "https://financialmodelingprep.com/api/v3/economic_calendar" + + def _has_sufficient_macro_data(self) -> bool: + """DB에 미래 데이터가 충분한지 확인""" + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + try: + query = """ + SELECT COUNT(*) + FROM public.event_calendar + WHERE ticker = 'MACRO' AND event_date > CURRENT_DATE + """ + cursor.execute(query) + count = cursor.fetchone()[0] + # 미래 데이터가 5개 이상이면 Skip (단, Actual 업데이트를 위해 force_update 필요할 수 있음) + return count >= 5 + except Exception: + return False + finally: + cursor.close() + conn.close() + + def fetch_macro_from_api(self): + """FMP API 호출""" + print("[Event] FMP API로 경제 지표(Surprise) 수집 중...") + # 과거 데이터 업데이트(Actual 채우기)를 위해 시작일을 30일 전으로 설정 + start_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d") + end_date = (datetime.now() + timedelta(days=365)).strftime("%Y-%m-%d") + + url = f"{self.base_url}?from={start_date}&to={end_date}&apikey={FMP_API_KEY}" + try: + response = requests.get(url, timeout=10) + if response.status_code != 200: + print(f" [Error] API 호출 실패: {response.status_code}") + return [] + return response.json() + except Exception as e: + print(f" [Error] API 연동 오류: {e}") + return [] + + def update_macro_events(self, force_update: bool = False): + """거시경제 일정 및 수치 저장""" + # force_update가 False여도, 지난달의 Actual 값이 비어있을 수 있으므로 + # API 호출을 아예 막기보다, API 호출 부담이 적다면 주기적으로 실행하는 것이 좋습니다. + # 여기서는 '미래 일정'만 체크하는 로직 유지하되, 필요시 force_update=True로 실행 권장. + if not force_update and self._has_sufficient_macro_data(): + # print(" >> Macro 일정 충분함 (Skip). Actual 업데이트 필요시 --repair 사용.") + return + + if "your_fmp_api_key_here" in FMP_API_KEY and not os.getenv("FMP_API_KEY"): + print("[Error] FMP_API_KEY 미설정.") + return + + events = self.fetch_macro_from_api() + if not events: return + + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + + # [SQL 수정] actual, forecast 추가 + insert_query = """ + INSERT INTO public.event_calendar + (event_date, event_type, ticker, description, forecast, actual) + VALUES %s + ON CONFLICT (event_date, event_type, ticker) + DO UPDATE SET + description = EXCLUDED.description, + forecast = EXCLUDED.forecast, + actual = EXCLUDED.actual; + """ + + target_keywords = { + 'FOMC': ['FOMC', 'Fed Interest Rate Decision'], + 'CPI': ['CPI', 'Consumer Price Index'], + 'GDP': ['GDP Growth Rate', 'Gross Domestic Product'], + 'PCE': ['PCE', 'Personal Consumption Expenditures'] + } + + data_to_insert = [] + seen = set() + + for item in events: + if item.get('country') != 'US': continue + + evt_date = item.get('date', '')[:10] + evt_name = item.get('event', '') + + # 수치 파싱 (None 처리) + estimate_val = item.get('estimate') + actual_val = item.get('actual') + + detected_type = None + for key, keywords in target_keywords.items(): + if any(k in evt_name for k in keywords): + detected_type = key + break + + if detected_type: + # 키: 날짜+타입 + if (evt_date, detected_type) not in seen: + data_to_insert.append(( + evt_date, + detected_type, + 'MACRO', + evt_name, + estimate_val, # forecast + actual_val # actual + )) + seen.add((evt_date, detected_type)) + + try: + if data_to_insert: + execute_values(cursor, insert_query, data_to_insert) + conn.commit() + print(f" >> Macro 일정 및 서프라이즈 {len(data_to_insert)}건 저장 완료.") + else: + print(" >> 저장할 주요 이벤트가 없습니다.") + except Exception as e: + conn.rollback() + print(f" [Error] DB 저장 실패: {e}") + finally: + cursor.close() + conn.close() + + def update_earnings_dates(self, tickers: List[str]): + """ + 기업 실적 발표일 및 EPS 예측치/실제치 저장 + yfinance의 get_earnings_dates() 사용 + """ + if not tickers: return + print(f"[Event] 기업 {len(tickers)}개 실적 서프라이즈 데이터 수집 중...") + + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + + insert_query = """ + INSERT INTO public.event_calendar + (event_date, event_type, ticker, description, forecast, actual) + VALUES %s + ON CONFLICT (event_date, event_type, ticker) + DO UPDATE SET + forecast = EXCLUDED.forecast, + actual = EXCLUDED.actual; + """ + + data_buffer = [] + + for i, ticker in enumerate(tickers): + try: + yf_ticker = yf.Ticker(ticker) + + # [변경] calendar 대신 get_earnings_dates 사용 (과거+미래 데이터 포함) + # limit=12 (최근 1년 ~ 미래 1년 정도) + df_earnings = yf_ticker.get_earnings_dates(limit=8) + + if df_earnings is None or df_earnings.empty: + continue + + # 인덱스가 날짜임. 컬럼: 'EPS Estimate', 'Reported EPS', 'Surprise(%)' + # 미래 데이터는 Reported EPS가 NaN임. + + for dt_idx, row in df_earnings.iterrows(): + evt_date = dt_idx.date() + + # 너무 먼 과거 데이터는 스킵 (최근 6개월 ~ 미래 1년만 저장) + if evt_date < (date.today() - timedelta(days=180)): + continue + if evt_date > (date.today() + timedelta(days=365)): + continue + + estimate = row.get('EPS Estimate') + reported = row.get('Reported EPS') + + # NaN -> None 변환 (DB 저장을 위해) + if pd.isna(estimate): estimate = None + else: estimate = float(estimate) + + if pd.isna(reported): reported = None + else: reported = float(reported) + + description = f"{ticker} Earnings Release" + + data_buffer.append(( + evt_date, + 'EARNINGS', + ticker, + description, + estimate, # forecast + reported # actual + )) + + # API 호출 속도 조절 + time.sleep(0.1) + + if len(data_buffer) >= 50: + execute_values(cursor, insert_query, data_buffer) + conn.commit() + data_buffer = [] + + except Exception: + # yfinance 에러 발생 시 조용히 넘어감 (데이터 없는 경우 다수) + continue + + if data_buffer: + execute_values(cursor, insert_query, data_buffer) + conn.commit() + + cursor.close() + conn.close() + print(f" >> 실적 데이터 업데이트 완료.") + + def run(self, tickers: List[str] = None): + # 기본적으로 Macro는 업데이트 + self.update_macro_events(force_update=False) + if tickers: + self.update_earnings_dates(tickers) + +if __name__ == "__main__": + import argparse + from AI.libs.database.ticker_loader import load_all_tickers_from_db + + parser = argparse.ArgumentParser() + parser.add_argument("--all", action="store_true") + parser.add_argument("--force", action="store_true") + parser.add_argument("--db", default="db") + args = parser.parse_args() + + collector = EventDataCollector(db_name=args.db) + + if args.force: + collector.update_macro_events(force_update=True) + + targets = load_all_tickers_from_db() if args.all else None + collector.run(tickers=targets) \ No newline at end of file diff --git a/AI/modules/data_collector/index_data.py b/AI/modules/data_collector/index_data.py new file mode 100644 index 00000000..bac8fed5 --- /dev/null +++ b/AI/modules/data_collector/index_data.py @@ -0,0 +1,166 @@ +#AI/modules/data_collector/index_data.py +import sys +import os +import yfinance as yf +import pandas as pd +from datetime import datetime, timedelta +from psycopg2.extras import execute_values + +# 프로젝트 루트 경로 설정 +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../../..")) +if project_root not in sys.path: + sys.path.append(project_root) + +from AI.libs.database.connection import get_db_conn + +class IndexDataCollector: + """ + 주요 주식 시장 지수(Benchmark Index)를 수집하여 price_data 테이블에 저장하는 클래스 + - S&P 500, NASDAQ, KOSPI, KOSDAQ 등 + - 개별 종목과 동일하게 price_data에 저장되지만, 티커명이 '^'로 시작함 + """ + + def __init__(self, db_name: str = "db"): + self.db_name = db_name + # 관리할 주요 지수 목록 (필요에 따라 추가) + self.INDICES = { + '^GSPC': 'S&P 500', + '^IXIC': 'NASDAQ Composite', + '^DJI': 'Dow Jones Industrial Average', + '^KS11': 'KOSPI Composite', + '^KQ11': 'KOSDAQ Composite', + '^RUT': 'Russell 2000' + } + self.FIXED_START_DATE = "2010-01-01" + + def get_start_date(self, ticker: str, repair_mode: bool) -> str: + """DB에서 마지막 수집일을 조회""" + if repair_mode: + return self.FIXED_START_DATE + + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + try: + cursor.execute("SELECT MAX(date) FROM public.price_data WHERE ticker = %s", (ticker,)) + last_date = cursor.fetchone()[0] + if last_date: + return (last_date + timedelta(days=1)).strftime("%Y-%m-%d") + return self.FIXED_START_DATE + except Exception: + return self.FIXED_START_DATE + finally: + cursor.close() + conn.close() + + def fetch_index_data(self, ticker: str, start_date: str) -> pd.DataFrame: + """yfinance를 통해 지수 데이터 수집""" + try: + # 지수는 수정주가(Adj Close) 개념이 명확하지 않으나 통일성을 위해 수집 + df = yf.download(ticker, start=start_date, progress=False, auto_adjust=False, threads=False) + if df.empty: + return pd.DataFrame() + + if isinstance(df.columns, pd.MultiIndex): + df.columns = df.columns.get_level_values(0) + + # 거래대금 계산 (지수는 Volume이 0이거나 없는 경우가 많음) + if 'Close' in df.columns and 'Volume' in df.columns: + # 지수의 'Volume'은 거래량이 아닌 경우가 많아(계약수 등) 단순 참고용 + df['Amount'] = df['Close'] * df['Volume'] + else: + df['Amount'] = 0 + + return df + except Exception as e: + print(f" [Error] 지수 {ticker} 수집 실패: {e}") + return pd.DataFrame() + + def save_to_db(self, ticker: str, df: pd.DataFrame): + """DB 저장 (market_data와 동일한 price_data 테이블 사용)""" + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + + insert_query = """ + INSERT INTO public.price_data ( + date, ticker, open, high, low, close, volume, adjusted_close, amount + ) + VALUES %s + ON CONFLICT (date, ticker) DO UPDATE SET + open = EXCLUDED.open, + high = EXCLUDED.high, + low = EXCLUDED.low, + close = EXCLUDED.close, + volume = EXCLUDED.volume, + adjusted_close = EXCLUDED.adjusted_close; + """ + + try: + data_to_insert = [] + has_adj = 'Adj Close' in df.columns + + for index, row in df.iterrows(): + try: + date_val = index.date() + + def get_val(col): + val = row.get(col, 0) + if hasattr(val, 'iloc'): return float(val.iloc[0]) + return float(val) if pd.notnull(val) else 0 + + open_val = get_val('Open') + high_val = get_val('High') + low_val = get_val('Low') + close_val = get_val('Close') + vol_val = int(get_val('Volume')) + adj_close_val = get_val('Adj Close') if has_adj else close_val + amount_val = get_val('Amount') + + data_to_insert.append(( + date_val, ticker, open_val, high_val, low_val, + close_val, vol_val, adj_close_val, amount_val + )) + except: + continue + + if data_to_insert: + execute_values(cursor, insert_query, data_to_insert) + conn.commit() + print(f" [{self.INDICES[ticker]}] {len(data_to_insert)}일치 저장 완료.") + else: + print(f" [{self.INDICES[ticker]}] 최신 데이터임.") + + except Exception as e: + conn.rollback() + print(f" [Error] DB 저장 오류: {e}") + finally: + cursor.close() + conn.close() + + def run(self, repair_mode: bool = False): + """전체 지수 업데이트 실행""" + mode_msg = "[Repair Mode]" if repair_mode else "[Update Mode]" + print(f"[IndexCollector] {mode_msg} 주요 시장 지수 업데이트 시작...") + + today = datetime.now().strftime("%Y-%m-%d") + + for ticker, name in self.INDICES.items(): + start_date = self.get_start_date(ticker, repair_mode) + + if not repair_mode and start_date > today: + print(f" [{name}] 이미 최신입니다.") + continue + + print(f" [{name}({ticker})] 수집 시작 ({start_date} ~ )...") + df = self.fetch_index_data(ticker, start_date) + if not df.empty: + self.save_to_db(ticker, df) + +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser() + parser.add_argument("--repair", action="store_true", help="전체 기간 재수집") + args = parser.parse_args() + + collector = IndexDataCollector() + collector.run(repair_mode=args.repair) \ No newline at end of file diff --git a/AI/modules/data_collector/macro_data.py b/AI/modules/data_collector/macro_data.py index 001692d9..44e982ef 100644 --- a/AI/modules/data_collector/macro_data.py +++ b/AI/modules/data_collector/macro_data.py @@ -1,9 +1,11 @@ +#AI/modules/data_collector/macro_data.py import sys import os import pandas as pd +import numpy as np import yfinance as yf from fredapi import Fred -from datetime import datetime +from datetime import datetime, timedelta from psycopg2.extras import execute_values # 프로젝트 루트 경로 설정 @@ -14,105 +16,225 @@ from AI.libs.database.connection import get_db_conn -# FRED API 키 설정 (발급받은 키를 입력하거나 환경변수 사용) -# 테스트용 예시 키입니다. 실제 사용 시 본인 키로 교체하세요. -FRED_API_KEY = os.getenv("FRED_API_KEY", "your_fred_api_key_here") +# 환경 변수 로드 +FRED_API_KEY = os.getenv("FRED_API_KEY") -def update_macro_data(db_name: str = "db"): +class MacroDataCollector: """ - 거시경제 지표(금리, CPI, 환율 등)를 수집하여 DB에 저장합니다. + 거시경제 지표 및 시장 위험 지표를 수집하여 DB에 적재하는 클래스 """ - print(f"[Macro Collector] 거시경제 데이터 업데이트 시작...") - - conn = get_db_conn(db_name) - cursor = conn.cursor() - - try: - fred = Fred(api_key=FRED_API_KEY) + def __init__(self, db_name="db"): + self.db_name = db_name + self.fred = Fred(api_key=FRED_API_KEY) + + def fetch_fred_data(self, start_date): + """ + FRED에서 거시경제 데이터를 수집합니다. + """ + print("[Macro] FRED 데이터 수집 시작...") - # 1. FRED 데이터 매핑 (DB컬럼명 : FRED Series ID) + # 스키마 컬럼명 : FRED Series ID 매핑 fred_map = { - 'interest_rate': 'FEDFUNDS', # 기준금리 - 'cpi': 'CPIAUCSL', # 소비자물가지수 - 'unemployment_rate': 'UNRATE', # 실업률 - 'gdp': 'GDP', # GDP - 'consumer_sentiment': 'UMCSENT', # 소비자심리지수 - 'ppi': 'PPIACO' # 생산자물가지수 + # --- 물가 및 경제성장 --- + 'cpi': 'CPIAUCSL', # 소비자물가지수 (Monthly) + 'core_cpi': 'CPILFESL', # 근원 소비자물가지수 (Monthly) + 'ppi': 'PPIACO', # 생산자물가지수 (Monthly) + 'gdp': 'GDP', # 명목 GDP (Quarterly) + 'real_gdp': 'GDPC1', # 실질 GDP (Quarterly) + 'pce': 'PCEPI', # 개인소비지출 물가지수 (Monthly) + 'core_pce': 'PCEPILFE', # 근원 PCE (Monthly) + + # --- 고용 및 소비 심리 --- + 'unemployment_rate': 'UNRATE', # 실업률 (Monthly) + 'jolt': 'JTSJOL', # 구인 건수 (Monthly) + 'consumer_sentiment': 'UMCSENT', # 미시간대 소비자심리지수 (Monthly) + 'cci': 'CSCICP03USM665S', # OECD 기준 소비자신뢰지수 (Monthly) + + # --- 금리 및 통화 정책 --- + 'interest_rate': 'DFF', # 연방기금 실효금리 (Daily, FEDFUNDS는 월간) + 'ff_targetrate_upper': 'DFEDTARU', # 연방기금금리 목표 상단 (Daily) + 'ff_targetrate_lower': 'DFEDTARL', # 연방기금금리 목표 하단 (Daily) + 'us10y': 'DGS10', # 미국채 10년물 금리 (Daily) + 'us2y': 'DGS2', # 미국채 2년물 금리 (Daily) + 'credit_spread_hy': 'BAMLH0A0HYM2', # 하이일드 채권 스프레드 (Daily) + + # --- 무역 --- + 'trade_balance': 'BOPGSTB', # 상품 및 서비스 무역수지 (Monthly) + 'tradebalance_goods': 'BOPGTB', # 상품 무역수지 (Monthly) + 'trade_import': 'IMPGS', # 수입액 (Monthly) + 'trade_export': 'EXPGS' # 수출액 (Monthly) } - # 데이터프레임 병합을 위한 리스트 dfs = [] - - # FRED 데이터 가져오기 for col, series_id in fred_map.items(): try: - # 최근 5년치 데이터 (필요 시 수정) - series = fred.get_series(series_id, observation_start='2020-01-01') + series = self.fred.get_series(series_id, observation_start=start_date) df = pd.DataFrame(series, columns=[col]) dfs.append(df) except Exception as e: - print(f" [FRED] {col} 수집 실패: {e}") - - # 2. Yahoo Finance 데이터 (환율, 유가 등) - # 스키마에 컬럼이 없다면 필요한 것만 추가하거나, 기존 컬럼에 매핑 - # 여기서는 예시로 추가적인 데이터를 수집한다고 가정 - yf_map = { - # 'col_name': 'ticker' - # 현재 schema.sql에는 환율 컬럼이 명시되어 있지 않으나, - # 필요하다면 ALTER TABLE로 추가 후 사용 권장 - } - - # 3. 데이터 병합 (날짜 기준 Outer Join) - if dfs: - macro_df = pd.concat(dfs, axis=1) - else: - macro_df = pd.DataFrame() + print(f" [Warning] FRED '{col}' ({series_id}) 수집 실패: {e}") + + if not dfs: + return pd.DataFrame() - # FRED 데이터는 월/분기 단위이므로 빈 날짜가 많음 -> 필요시 ffill()로 채우거나 그대로 저장 - # 여기서는 원본 날짜 그대로 저장 (발표일 기준) - macro_df.dropna(how='all', inplace=True) + # 인덱스(날짜) 기준으로 병합 (Outer Join) + fred_df = pd.concat(dfs, axis=1) + return fred_df + + def fetch_yahoo_data(self, start_date): + """ + Yahoo Finance에서 시장 지표(VIX, 달러, 유가, 금)를 수집합니다. + """ + print("[Macro] Yahoo Finance 데이터 수집 시작...") - # 4. DB 저장 + # 스키마 컬럼명 : Yahoo Ticker 매핑 + yahoo_map = { + 'vix_close': '^VIX', # 변동성 지수 + 'dxy_close': 'DX-Y.NYB', # 달러 인덱스 + 'wti_price': 'CL=F', # WTI 원유 선물 + 'gold_price': 'GC=F' # 금 선물 + } + + dfs = [] + for col, ticker in yahoo_map.items(): + try: + # yfinance는 데이터프레임으로 반환 + data = yf.download(ticker, start=start_date, progress=False) + if not data.empty: + # 'Close' 컬럼만 추출하여 이름 변경 + # yfinance 버전 이슈로 인해 MultiIndex 컬럼일 수 있음 처리 + if isinstance(data.columns, pd.MultiIndex): + price_series = data['Close'][ticker] + else: + price_series = data['Close'] + + df = pd.DataFrame(price_series) + df.columns = [col] + dfs.append(df) + except Exception as e: + print(f" [Warning] Yahoo '{col}' ({ticker}) 수집 실패: {e}") + + if not dfs: + return pd.DataFrame() + + yahoo_df = pd.concat(dfs, axis=1) + # Timezone 정보 제거 (FRED 데이터와 인덱스 타입을 맞추기 위함) + if yahoo_df.index.tz is not None: + yahoo_df.index = yahoo_df.index.tz_localize(None) + + return yahoo_df + + def save_to_db(self, combined_df): + """ + 통합된 데이터를 DB에 저장합니다. + """ + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + insert_query = """ INSERT INTO public.macroeconomic_indicators ( - date, interest_rate, cpi, unemployment_rate, gdp, - consumer_sentiment, ppi + date, cpi, gdp, ppi, jolt, cci, interest_rate, trade_balance, + core_cpi, real_gdp, unemployment_rate, consumer_sentiment, + ff_targetrate_upper, ff_targetrate_lower, pce, core_pce, + tradebalance_goods, trade_import, trade_export, + us10y, us2y, yield_spread, vix_close, dxy_close, + wti_price, gold_price, credit_spread_hy ) VALUES %s ON CONFLICT (date) DO UPDATE SET - interest_rate = EXCLUDED.interest_rate, cpi = EXCLUDED.cpi, - unemployment_rate = EXCLUDED.unemployment_rate, gdp = EXCLUDED.gdp, + ppi = EXCLUDED.ppi, + jolt = EXCLUDED.jolt, + cci = EXCLUDED.cci, + interest_rate = EXCLUDED.interest_rate, + trade_balance = EXCLUDED.trade_balance, + core_cpi = EXCLUDED.core_cpi, + real_gdp = EXCLUDED.real_gdp, + unemployment_rate = EXCLUDED.unemployment_rate, consumer_sentiment = EXCLUDED.consumer_sentiment, - ppi = EXCLUDED.ppi; + ff_targetrate_upper = EXCLUDED.ff_targetrate_upper, + ff_targetrate_lower = EXCLUDED.ff_targetrate_lower, + pce = EXCLUDED.pce, + core_pce = EXCLUDED.core_pce, + tradebalance_goods = EXCLUDED.tradebalance_goods, + trade_import = EXCLUDED.trade_import, + trade_export = EXCLUDED.trade_export, + us10y = EXCLUDED.us10y, + us2y = EXCLUDED.us2y, + yield_spread = EXCLUDED.yield_spread, + vix_close = EXCLUDED.vix_close, + dxy_close = EXCLUDED.dxy_close, + wti_price = EXCLUDED.wti_price, + gold_price = EXCLUDED.gold_price, + credit_spread_hy = EXCLUDED.credit_spread_hy; """ + + try: + data_to_insert = [] + for date_idx, row in combined_df.iterrows(): + # NaN 값을 None(SQL NULL)으로 변환 + row = row.where(pd.notnull(row), None) + + data_to_insert.append(( + date_idx.date(), + row.get('cpi'), row.get('gdp'), row.get('ppi'), row.get('jolt'), row.get('cci'), + row.get('interest_rate'), row.get('trade_balance'), + row.get('core_cpi'), row.get('real_gdp'), row.get('unemployment_rate'), row.get('consumer_sentiment'), + row.get('ff_targetrate_upper'), row.get('ff_targetrate_lower'), + row.get('pce'), row.get('core_pce'), + row.get('tradebalance_goods'), row.get('trade_import'), row.get('trade_export'), + row.get('us10y'), row.get('us2y'), row.get('yield_spread'), + row.get('vix_close'), row.get('dxy_close'), + row.get('wti_price'), row.get('gold_price'), row.get('credit_spread_hy') + )) + + if data_to_insert: + execute_values(cursor, insert_query, data_to_insert) + conn.commit() + print(f"[Macro] {len(data_to_insert)}일치 데이터 저장/업데이트 완료.") + else: + print("[Macro] 저장할 데이터가 없습니다.") + + except Exception as e: + conn.rollback() + print(f"[Macro][Error] DB 저장 실패: {e}") + finally: + cursor.close() + conn.close() + + def run(self, lookback_days=365*2): + """ + 데이터 수집 및 저장 실행 메인 메소드 + """ + # 시작 날짜 계산 + start_date = (datetime.now() - timedelta(days=lookback_days)).strftime('%Y-%m-%d') - data_to_insert = [] - for date_idx, row in macro_df.iterrows(): - data_to_insert.append(( - date_idx.date(), - row.get('interest_rate'), - row.get('cpi'), - row.get('unemployment_rate'), - row.get('gdp'), - row.get('consumer_sentiment'), - row.get('ppi') - )) - - if data_to_insert: - execute_values(cursor, insert_query, data_to_insert) - conn.commit() - print(f"[Macro Collector] {len(data_to_insert)}건 저장 완료.") + # 1. 데이터 수집 + df_fred = self.fetch_fred_data(start_date) + df_yahoo = self.fetch_yahoo_data(start_date) + + # 2. 데이터 병합 (Outer Join) + if df_fred.empty and df_yahoo.empty: + print("[Macro] 수집된 데이터가 없습니다.") + return + + combined_df = pd.concat([df_fred, df_yahoo], axis=1) + + # 3. 파생 변수 계산: 장단기 금리차 (10Y - 2Y) + # 두 데이터가 모두 존재할 때만 계산 + if 'us10y' in combined_df.columns and 'us2y' in combined_df.columns: + combined_df['yield_spread'] = combined_df['us10y'] - combined_df['us2y'] else: - print("[Macro Collector] 저장할 데이터가 없습니다.") + combined_df['yield_spread'] = np.nan - except Exception as e: - conn.rollback() - print(f"[Macro Collector][Error] 실패: {e}") - finally: - cursor.close() - conn.close() + # 날짜 내림차순 정렬 (필수는 아니지만 확인용) + combined_df.sort_index(inplace=True) + + # 4. DB 저장 + self.save_to_db(combined_df) if __name__ == "__main__": - update_macro_data() \ No newline at end of file + # 최근 5년치 데이터 업데이트 + collector = MacroDataCollector() + collector.run(lookback_days=365*5) \ No newline at end of file diff --git a/AI/modules/data_collector/market_breadth_data.py b/AI/modules/data_collector/market_breadth_data.py new file mode 100644 index 00000000..7e9acced --- /dev/null +++ b/AI/modules/data_collector/market_breadth_data.py @@ -0,0 +1,161 @@ +#AI/modules/data_collector/market_breadth_data.py +import sys +import os +import yfinance as yf +import pandas as pd +from datetime import datetime, timedelta +from psycopg2.extras import execute_values + +# 프로젝트 루트 경로 설정 +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../../..")) +if project_root not in sys.path: + sys.path.append(project_root) + +from AI.libs.database.connection import get_db_conn + +class MarketBreadthCollector: + """ + [시장 폭 및 섹터 데이터 수집기] + - GICS 11개 섹터 + SPY(시장 전체) 수집 + - 매핑되지 않는 섹터의 '보험(Fallback)'으로 SPY를 사용 + """ + + def __init__(self, db_name: str = "db"): + self.db_name = db_name + + # [핵심] 섹터 매핑 정의 (표준화) + # Key: DB에 저장될 표준 섹터명 + # Value: 대표 ETF 티커 + self.SECTOR_ETF_MAP = { + # 1. GICS 11개 표준 섹터 + 'Technology': 'XLK', + 'Financial Services': 'XLF', + 'Healthcare': 'XLV', + 'Consumer Cyclical': 'XLY', + 'Consumer Defensive': 'XLP', + 'Energy': 'XLE', + 'Basic Materials': 'XLB', + 'Industrials': 'XLI', + 'Utilities': 'XLU', + 'Real Estate': 'XLRE', + 'Communication Services': 'XLC', + + # 2. [추가] 매핑 실패 시 사용할 '시장 전체' (Fallback) + 'Market': 'SPY' + } + + # [보완] yfinance의 변칙적인 섹터명을 표준명으로 연결하는 사전 + # (나중에 전처리나 stock_info 수집 시 활용 가능하지만, + # 여기서는 '어떤 ETF 데이터를 어디에 매핑할지'가 중요하므로 참고용 주석) + # 예: 'Information Technology' -> 'Technology' + # 'Materials' -> 'Basic Materials' + + def fetch_and_save_sector_returns(self, days_back: int = 365): + # ... (이전 코드와 로직 동일) ... + # self.SECTOR_ETF_MAP에 'Market': 'SPY'가 추가되었으므로 + # 자동으로 SPY 데이터도 'Market'이라는 섹터명으로 저장됩니다. + + print("[Breadth] 섹터 ETF(+SPY) 데이터 수집 시작...") + + start_date = (datetime.now() - timedelta(days=days_back + 10)).strftime('%Y-%m-%d') + tickers = list(self.SECTOR_ETF_MAP.values()) + ticker_to_sector = {v: k for k, v in self.SECTOR_ETF_MAP.items()} + + try: + # 1. 데이터 다운로드 + data = yf.download(tickers, start=start_date, progress=False, auto_adjust=True, threads=True) + + if data.empty: + print(" >> 수집된 데이터가 없습니다.") + return + + # MultiIndex 처리 (yfinance 버전에 따른 안전장치) + if isinstance(data.columns, pd.MultiIndex): + # 'Close' 레벨이 있으면 가져오고, 없으면 전체가 Close라고 가정 시도 + try: + closes = data.xs('Close', axis=1, level=0) + except KeyError: + # columns 구조가 (Ticker, PriceType)이 아니라 (PriceType, Ticker) 일수도 있음 + # yfinance 최근 버전은 (Price, Ticker) 구조임. + # 여기서는 간단히 'Adj Close'나 'Close'를 찾아서 처리 + if 'Close' in data.columns.get_level_values(0): + closes = data.xs('Close', axis=1, level=0) + else: + # 구조를 알 수 없을 때 + print(" [Warning] 데이터 컬럼 구조 인식 불가. Skip.") + return + else: + closes = data['Close'] + + # 2. 수익률 계산 + returns = closes.pct_change() + + # 3. DB 저장 + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + + insert_query = """ + INSERT INTO public.sector_returns (date, sector, etf_ticker, return, close) + VALUES %s + ON CONFLICT (date, sector) + DO UPDATE SET + return = EXCLUDED.return, + close = EXCLUDED.close, + etf_ticker = EXCLUDED.etf_ticker; + """ + + data_to_insert = [] + + for date_idx, row in returns.iterrows(): + date_val = date_idx.date() + + for etf_ticker in tickers: + if etf_ticker not in row: continue + ret_val = row[etf_ticker] + if pd.isna(ret_val): continue + + # 종가 (closes 데이터프레임에서 가져옴) + if etf_ticker in closes.columns: + close_val = closes.loc[date_idx, etf_ticker] + else: + close_val = 0.0 + + sector_name = ticker_to_sector.get(etf_ticker, 'Unknown') + + data_to_insert.append(( + date_val, + sector_name, + etf_ticker, + float(ret_val), + float(close_val) + )) + + if data_to_insert: + batch_size = 1000 + for i in range(0, len(data_to_insert), batch_size): + execute_values(cursor, insert_query, data_to_insert[i:i+batch_size]) + conn.commit() + print(f" >> 섹터(+Market) 수익률 {len(data_to_insert)}건 저장 완료.") + + except Exception as e: + conn.rollback() + print(f" [Error] 섹터 데이터 처리 중 오류: {e}") + finally: + if 'cursor' in locals(): cursor.close() + if 'conn' in locals(): conn.close() + + def run(self, repair_mode: bool = False): + # Repair 모드면 2010년부터, 아니면 최근 2년치 + days = 365 * 15 if repair_mode else 365 * 2 + self.fetch_and_save_sector_returns(days_back=days) + +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser() + parser.add_argument("--repair", action="store_true") + parser.add_argument("--db", default="db") + args = parser.parse_args() + + collector = MarketBreadthCollector(db_name=args.db) + collector.run(repair_mode=args.repair) \ No newline at end of file diff --git a/AI/modules/data_collector/market_breadth_stats.py b/AI/modules/data_collector/market_breadth_stats.py new file mode 100644 index 00000000..6b79e7ee --- /dev/null +++ b/AI/modules/data_collector/market_breadth_stats.py @@ -0,0 +1,165 @@ +#AI/modules/data_collector/market_breadth_stats.py +import sys +import os +import pandas as pd +import numpy as np +from datetime import datetime, timedelta +from psycopg2.extras import execute_values + +# 프로젝트 루트 경로 설정 +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.abspath(os.path.join(current_dir, "../../..")) +if project_root not in sys.path: + sys.path.append(project_root) + +from AI.libs.database.connection import get_db_conn + +class MarketBreadthStatsCollector: + """ + [시장 통계 계산기] + - 외부 수집 없이 DB 내부 데이터(price_data)를 집계하여 파생 지표 생성 + - 1. NH-NL Index (52주 신고가 - 신저가) + - 2. Stocks > MA200 (200일선 상회 비율) + """ + + def __init__(self, db_name: str = "db"): + self.db_name = db_name + + def load_price_data(self, lookback_days: int = 400) -> pd.DataFrame: + """ + 통계 계산을 위해 최근 주가 데이터를 로드합니다. + (MA200, 52주(252일) 신고가 계산을 위해 최소 1년 이상의 데이터 필요) + """ + print(f"[Stats] 최근 {lookback_days}일치 주가 데이터 로딩 중 (메모리 최적화)...") + conn = get_db_conn(self.db_name) + + # 필요한 컬럼만 로드하여 메모리 절약 + query = f""" + SELECT date, ticker, close + FROM public.price_data + WHERE date >= CURRENT_DATE - INTERVAL '{lookback_days} days' + ORDER BY ticker, date + """ + + try: + df = pd.read_sql(query, conn) + df['date'] = pd.to_datetime(df['date']) + return df + except Exception as e: + print(f" [Error] 데이터 로드 실패: {e}") + return pd.DataFrame() + finally: + conn.close() + + def calculate_stats(self, df: pd.DataFrame) -> pd.DataFrame: + """Pandas를 이용해 종목별 지표 계산 후 날짜별로 집계(Groupby)""" + print("[Stats] 시장 지표(NH-NL, MA200%) 계산 중...") + + if df.empty: + return pd.DataFrame() + + # 1. 종목별 보조지표 계산 (Rolling) + # 52주 = 약 252 거래일, MA200 = 200 거래일 + df = df.sort_values(['ticker', 'date']) + + # GroupBy 객체 생성 + grouped = df.groupby('ticker')['close'] + + # (1) 52주 고가/저가 (현재가 포함) + df['high_52w'] = grouped.rolling(window=252, min_periods=200).max().reset_index(0, drop=True) + df['low_52w'] = grouped.rolling(window=252, min_periods=200).min().reset_index(0, drop=True) + + # (2) 200일 이동평균 + df['ma_200'] = grouped.rolling(window=200, min_periods=150).mean().reset_index(0, drop=True) + + # 2. 상태 판별 (Boolean) + # 신고가: 현재가가 52주 최고가와 같음 (또는 근접 99% 등 조정 가능하나 여기선 엄격하게 적용) + # 신저가: 현재가가 52주 최저가와 같음 + # 데이터 정밀도 문제로 약간의 오차 허용 (np.isclose) 또는 단순 비교 + + # 단순 비교 (Close >= High_52w) + # 주의: 당일 종가가 갱신되면 당일 High_52w도 당일 종가로 변함. 따라서 == 비교가 맞음. + df['is_nh'] = df['close'] >= df['high_52w'] + df['is_nl'] = df['close'] <= df['low_52w'] + df['is_above_ma200'] = df['close'] > df['ma_200'] + + # 3. 날짜별 집계 (Cross-sectional Aggregation) + stats = df.groupby('date').agg( + total_count=('ticker', 'count'), + nh_count=('is_nh', 'sum'), + nl_count=('is_nl', 'sum'), + above_ma200_count=('is_above_ma200', 'sum') + ).reset_index() + + # 4. 최종 메트릭 계산 + stats['nh_nl_index'] = stats['nh_count'] - stats['nl_count'] + stats['ma200_pct'] = (stats['above_ma200_count'] / stats['total_count']) * 100 + + # NaN 처리 (종목 수가 너무 적거나 초기 데이터 구간) + stats = stats.dropna() + + # 유효한 통계만 필터링 (최소 50종목 이상 거래된 날만) + stats = stats[stats['total_count'] > 50] + + return stats[['date', 'nh_nl_index', 'ma200_pct']] + + def save_to_db(self, stats_df: pd.DataFrame): + """계산된 통계를 market_breadth 테이블에 저장""" + if stats_df.empty: + print(" >> 계산된 통계 데이터가 없습니다.") + return + + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + + query = """ + INSERT INTO public.market_breadth (date, nh_nl_index, ma200_pct) + VALUES %s + ON CONFLICT (date) + DO UPDATE SET + nh_nl_index = EXCLUDED.nh_nl_index, + ma200_pct = EXCLUDED.ma200_pct; + """ + + # DataFrame -> Tuples + data_values = [] + for _, row in stats_df.iterrows(): + data_values.append(( + row['date'].date(), + int(row['nh_nl_index']), + float(row['ma200_pct']) + )) + + try: + execute_values(cursor, query, data_values) + conn.commit() + print(f" >> 시장 통계(NH-NL, MA200%) {len(data_values)}일치 저장 완료.") + except Exception as e: + conn.rollback() + print(f" [Error] DB 저장 실패: {e}") + finally: + cursor.close() + conn.close() + + def run(self, repair_mode: bool = False): + # Repair 모드면 전체 기간(예: 10년), 아니면 최근 400일(MA200 계산 여유분 포함) 로드 + lookback = 365 * 10 if repair_mode else 400 + + # 1. 데이터 로드 + df = self.load_price_data(lookback_days=lookback) + + # 2. 계산 + stats_df = self.calculate_stats(df) + + # 3. 저장 + self.save_to_db(stats_df) + +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser() + parser.add_argument("--repair", action="store_true") + parser.add_argument("--db", default="db") + args = parser.parse_args() + + collector = MarketBreadthStatsCollector(db_name=args.db) + collector.run(repair_mode=args.repair) \ No newline at end of file diff --git a/AI/modules/data_collector/market_data.py b/AI/modules/data_collector/market_data.py index 924b29a6..b1d7277c 100644 --- a/AI/modules/data_collector/market_data.py +++ b/AI/modules/data_collector/market_data.py @@ -1,5 +1,4 @@ -# AI/modules/collector/market_data.py - +#AI/modules/data_collector/market_data.py import sys import os import yfinance as yf @@ -8,7 +7,7 @@ from typing import List from psycopg2.extras import execute_values -# 프로젝트 루트 경로 추가 +# 프로젝트 루트 경로 설정 current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.abspath(os.path.join(current_dir, "../../..")) if project_root not in sys.path: @@ -16,167 +15,214 @@ from AI.libs.database.connection import get_db_conn -# repair_mode 인자 추가 -def update_market_data(tickers: List[str], db_name: str = "db", repair_mode: bool = False): +class MarketDataCollector: """ - 지정된 종목들의 데이터를 수집하여 DB에 업데이트합니다. - - Args: - tickers: 종목 코드 리스트 - db_name: DB 설정 이름 - repair_mode: True일 경우, DB의 마지막 날짜를 무시하고 2015년부터 전체 재수집 (누락 데이터 복구용) + 주식 종목의 시세(OHLCV) 데이터를 수집하여 DB에 적재하는 클래스 + - yfinance를 사용하여 데이터 수집 + - 수리(Repair) 모드 지원 (누락 데이터 복구) + - 거래대금(Amount) 자동 계산 """ - mode_msg = "[Repair Mode]" if repair_mode else "[Update Mode]" - print(f"[Collector]{mode_msg} {len(tickers)}개 종목 데이터 작업 시작...") - - conn = get_db_conn(db_name) - cursor = conn.cursor() - # [설정] 전체 복구 시 시작할 기준 연도 - FIXED_START_DATE = "2015-01-01" - - try: - for ticker in tickers: - start_date = FIXED_START_DATE - - # 1. 일반 모드일 때만 DB에서 마지막 날짜 체크 - if not repair_mode: - cursor.execute("SELECT MAX(date) FROM public.price_data WHERE ticker = %s", (ticker,)) - last_date = cursor.fetchone()[0] - - if last_date: - start_date = (last_date + timedelta(days=1)).strftime("%Y-%m-%d") - else: - start_date = FIXED_START_DATE # 데이터 없으면 초기값 - - # 2. 오늘 날짜와 비교 - today = datetime.now().strftime("%Y-%m-%d") + def __init__(self, db_name: str = "db"): + self.db_name = db_name + # 수집 시작 기준일 (데이터가 아예 없을 경우) + self.FIXED_START_DATE = "2015-01-01" + + def get_start_date(self, ticker: str, repair_mode: bool) -> str: + """ + DB를 조회하여 수집 시작 날짜를 결정합니다. + """ + # 복구 모드이거나 강제 업데이트인 경우 고정 시작일 반환 + if repair_mode: + return self.FIXED_START_DATE + + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + try: + cursor.execute("SELECT MAX(date) FROM public.price_data WHERE ticker = %s", (ticker,)) + last_date = cursor.fetchone()[0] - if start_date > today: - print(f" [{ticker}] 이미 최신 데이터입니다.") - continue - - # 3. yfinance 데이터 다운로드 - # repair_mode일 때는 2015년부터 오늘까지 전부 다운로드 (이미 있는건 DB가 무시함) - print(f" [{ticker}] 수집 중 ({start_date} ~ Today)...") - try: - # auto_adjust=False: Adj Close 확보, threads=False: 안정성 우선 - df = yf.download(ticker, start=start_date, progress=False, auto_adjust=False, threads=False) - except Exception as download_err: - print(f" [{ticker}] 다운로드 실패: {download_err}") - continue + if last_date: + # 마지막 데이터 다음 날부터 수집 + return (last_date + timedelta(days=1)).strftime("%Y-%m-%d") + else: + return self.FIXED_START_DATE + except Exception as e: + print(f" [Warning] 시작 날짜 조회 실패 ({ticker}): {e}") + return self.FIXED_START_DATE + finally: + cursor.close() + conn.close() + + def fetch_ohlcv(self, ticker: str, start_date: str) -> pd.DataFrame: + """ + yfinance에서 OHLCV 데이터를 다운로드하고 전처리합니다. + """ + try: + # auto_adjust=False: Adj Close 별도 확보 + df = yf.download(ticker, start=start_date, progress=False, auto_adjust=False, threads=False) if df.empty: - print(f" [{ticker}] 데이터 없음.") - continue + return pd.DataFrame() - # MultiIndex 컬럼 평탄화 + # MultiIndex 컬럼 평탄화 (yfinance 버전 이슈 대응) if isinstance(df.columns, pd.MultiIndex): df.columns = df.columns.get_level_values(0) - - # 4. DB 저장 - insert_query = """ - INSERT INTO public.price_data (date, ticker, open, high, low, close, volume, adjusted_close) - VALUES %s - ON CONFLICT (date, ticker) DO NOTHING - """ - # ON CONFLICT ... DO NOTHING 덕분에 - # Repair 모드에서 중복 데이터를 넣으려 해도 에러 없이 무시되고, - # '비어있는 날짜'만 정상적으로 INSERT 됩니다. + # 거래대금(Amount) 계산: 종가 * 거래량 (근사치) + # yfinance는 원화/달러 여부에 따라 단위가 다르지만, raw value 유지 + if 'Close' in df.columns and 'Volume' in df.columns: + df['Amount'] = df['Close'] * df['Volume'] + else: + df['Amount'] = 0 + + return df + except Exception as e: + print(f" [Error] {ticker} 다운로드 중 에러: {e}") + return pd.DataFrame() + + def save_to_db(self, ticker: str, df: pd.DataFrame): + """ + 데이터프레임을 DB에 저장(Upsert)합니다. + """ + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + + insert_query = """ + INSERT INTO public.price_data ( + date, ticker, open, high, low, close, volume, adjusted_close, amount + ) + VALUES %s + ON CONFLICT (date, ticker) DO UPDATE SET + open = EXCLUDED.open, + high = EXCLUDED.high, + low = EXCLUDED.low, + close = EXCLUDED.close, + volume = EXCLUDED.volume, + adjusted_close = EXCLUDED.adjusted_close, + amount = EXCLUDED.amount; + """ + # repair_mode에서도 기존 값이 수정될 수 있도록 DO UPDATE로 변경하거나, + # 원본 보존을 원하면 DO NOTHING을 유지할 수 있음. + # 여기서는 최신 데이터로 갱신하는 DO UPDATE 전략 채택. + + try: data_to_insert = [] - has_adj_close = 'Adj Close' in df.columns + has_adj = 'Adj Close' in df.columns for index, row in df.iterrows(): - try: - date_val = index.date() - - def get_val(col_name): - val = row[col_name] - return val.iloc[0] if isinstance(val, pd.Series) else val - - open_val = float(get_val('Open')) - high_val = float(get_val('High')) - low_val = float(get_val('Low')) - close_val = float(get_val('Close')) - vol_val = int(get_val('Volume')) - - if has_adj_close: - adj_close_val = float(get_val('Adj Close')) - else: - adj_close_val = close_val - - data_to_insert.append(( - date_val, ticker, open_val, high_val, low_val, close_val, vol_val, adj_close_val - )) - except Exception: - continue - + date_val = index.date() + + # 안전한 값 추출 (Series일 경우 첫 번째 값 사용) + def get_val(col): + val = row.get(col, 0) + if hasattr(val, 'iloc'): + return float(val.iloc[0]) + return float(val) if pd.notnull(val) else 0 + + open_val = get_val('Open') + high_val = get_val('High') + low_val = get_val('Low') + close_val = get_val('Close') + vol_val = int(get_val('Volume')) + amount_val = get_val('Amount') + + adj_close_val = get_val('Adj Close') if has_adj else close_val + + data_to_insert.append(( + date_val, ticker, open_val, high_val, low_val, close_val, vol_val, adj_close_val, amount_val + )) + if data_to_insert: execute_values(cursor, insert_query, data_to_insert) conn.commit() - # Repair 모드일 땐 중복은 0건으로 잡히므로, 실제로 들어간 갯수를 정확히 알긴 어렵지만 - # execute_values는 에러 없이 수행됩니다. - print(f" [{ticker}] 처리 완료 (수집: {len(data_to_insert)}건).") - - except Exception as e: - conn.rollback() - print(f"[Collector][Error] 실패: {e}") - finally: - if cursor: cursor.close() - if conn: conn.close() + print(f" [{ticker}] {len(data_to_insert)}건 저장 완료.") + else: + print(f" [{ticker}] 저장할 데이터가 없습니다.") + + except Exception as e: + conn.rollback() + print(f" [{ticker}][Error] DB 저장 실패: {e}") + finally: + cursor.close() + conn.close() + + def update_tickers(self, tickers: List[str], repair_mode: bool = False): + """ + 여러 종목에 대해 수집 작업을 수행하는 메인 로직 + """ + mode_msg = "[Repair Mode]" if repair_mode else "[Update Mode]" + print(f"[MarketData] {mode_msg} {len(tickers)}개 종목 업데이트 시작...") + + today = datetime.now().strftime("%Y-%m-%d") + + for ticker in tickers: + # 1. 수집 시작 날짜 결정 + start_date = self.get_start_date(ticker, repair_mode) + + # 이미 최신이면 스킵 (Repair 모드가 아닐 때만) + if not repair_mode and start_date > today: + print(f" [{ticker}] 이미 최신 데이터입니다.") + continue + + print(f" [{ticker}] 수집 시작 ({start_date} ~ )...") + + # 2. 데이터 수집 + df = self.fetch_ohlcv(ticker, start_date) + + # 3. DB 저장 + if not df.empty: + self.save_to_db(ticker, df) + else: + print(f" [{ticker}] 수집된 데이터가 없습니다.") # ---------------------------------------------------------------------- -# [수동 실행 모드] +# [실행 모드] # ---------------------------------------------------------------------- if __name__ == "__main__": import argparse + from AI.libs.database.ticker_loader import load_all_tickers_from_db - # 1. 터미널 인자 파서 설정 parser = argparse.ArgumentParser(description="[수동] 주식 데이터 수집기") parser.add_argument("tickers", nargs='*', help="수집할 종목 티커 리스트") - parser.add_argument("--all", action="store_true", help="DB 전 종목 업데이트") - parser.add_argument("--repair", action="store_true", help="[복구모드] 누락 확인을 위해 전체 기간 재수집") + parser.add_argument("--all", action="store_true", help="DB에 등록된 모든 종목 업데이트") + parser.add_argument("--repair", action="store_true", help="[복구모드] 전체 기간 재수집") parser.add_argument("--db", default="db", help="DB 이름") args = parser.parse_args() target_tickers = args.tickers - # 2. --all 옵션 처리 + # --all 옵션 처리 if args.all: try: - from AI.libs.database.ticker_loader import load_all_tickers_from_db print(">> DB에서 전체 종목 리스트를 조회합니다...") target_tickers = load_all_tickers_from_db(verbose=True) except Exception as e: print(f"[Error] 종목 로드 실패: {e}") sys.exit(1) - # 3. 인자가 없을 때: 사용법 안내 및 입력 대기 - if not target_tickers and not args.all: + # 입력이 없을 경우 인터랙티브 모드 + if not target_tickers: print("\n========================================================") - print(" [Manual Mode] 주식 데이터 수집기") + print(" [Manual Mode] 주식 시장 데이터 수집기") print("========================================================") print(" 사용 예시:") print(" 1) 특정 종목 수집 : python market_data.py AAPL TSLA") print(" 2) 전체 종목 업데이트: python market_data.py --all") print(" 3) 누락 데이터 복구 : python market_data.py --all --repair") print("========================================================") - print(">> 수집할 종목 코드를 공백으로 구분하여 입력하세요.") - print(" (종료하려면 그냥 엔터를 누르세요)") + print(">> 수집할 종목 코드를 공백으로 구분하여 입력하세요 (종료: Enter)") try: input_str = sys.stdin.readline().strip() if input_str: target_tickers = input_str.split() + else: + sys.exit(0) except KeyboardInterrupt: - print("\n취소되었습니다.") sys.exit(0) - # 4. 수집 실행 if target_tickers: - # update_market_data 함수에 repair_mode 전달 - update_market_data(target_tickers, db_name=args.db, repair_mode=args.repair) - print("\n[완료] 모든 작업이 끝났습니다.") - else: - print(">> 입력된 종목이 없어 종료합니다.") \ No newline at end of file + collector = MarketDataCollector(db_name=args.db) + collector.update_tickers(target_tickers, repair_mode=args.repair) + print("\n[완료] 모든 작업이 끝났습니다.") \ No newline at end of file diff --git a/AI/modules/data_collector/run.py b/AI/modules/data_collector/run.py index c6725c19..1e87b687 100644 --- a/AI/modules/data_collector/run.py +++ b/AI/modules/data_collector/run.py @@ -1,4 +1,3 @@ -# AI/modules/data_collector/run.py import sys import os import argparse @@ -6,9 +5,7 @@ from datetime import datetime # ----------------------------------------------------------- -# [경로 설정 수정] -# 파일 위치가 깊어졌으므로 프로젝트 루트를 찾기 위해 3단계 상위로 이동합니다. -# (run.py -> data_collector -> modules -> AI -> 프로젝트 루트) +# [경로 설정] # ----------------------------------------------------------- current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.abspath(os.path.join(current_dir, "../../..")) @@ -20,113 +17,221 @@ # [모듈 임포트] # ----------------------------------------------------------- from AI.libs.database.connection import get_db_conn -# 같은 폴더 내에 있지만, 프로젝트 루트를 기준으로 절대 경로 import를 권장합니다. -from AI.modules.data_collector.market_data import update_market_data -from AI.modules.data_collector.stock_info_collector import update_stock_info -from AI.modules.data_collector.company_fundamentals_data import update_company_fundamentals -from AI.modules.data_collector.macro_data import update_macro_data -def get_target_tickers(db_name="db"): +from AI.modules.data_collector.market_data import MarketDataCollector +from AI.modules.data_collector.stock_info_collector import StockInfoCollector +from AI.modules.data_collector.company_fundamentals_data import FundamentalsDataCollector +from AI.modules.data_collector.macro_data import MacroDataCollector +from AI.modules.data_collector.crypto_data import CryptoDataCollector +from AI.modules.data_collector.index_data import IndexDataCollector +from AI.modules.data_collector.event_data import EventDataCollector +from AI.modules.data_collector.market_breadth_data import MarketBreadthCollector +from AI.modules.data_collector.market_breadth_stats import MarketBreadthStatsCollector + +def get_stock_tickers(db_name="db"): """ - 수집 대상 티커 리스트를 DB에서 조회합니다. + 주식 수집 대상 티커 리스트를 DB에서 조회합니다. """ conn = get_db_conn(db_name) cursor = conn.cursor() try: + # price_data 테이블에 있는 종목들 조회 cursor.execute("SELECT DISTINCT ticker FROM public.price_data") rows = cursor.fetchall() - tickers = [r[0] for r in rows] + tickers = [r[0] for r in rows if not r[0].startswith('^') and '-USD' not in r[0]] if not tickers: - print(">> DB에 종목이 없습니다. 기본 종목(AAPL, TSLA 등)으로 시작합니다.") + print(">> [Init] DB에 주식 종목이 없습니다. 기본 종목(Big Tech)으로 시작합니다.") return ["AAPL", "TSLA", "NVDA", "MSFT", "GOOGL", "AMD", "INTC"] return tickers except Exception as e: print(f"[Error] 티커 목록 조회 실패: {e}") - return [] + return ["AAPL", "TSLA"] finally: - cursor.close() - conn.close() + if cursor: cursor.close() + if conn: conn.close() def main(): parser = argparse.ArgumentParser(description="[SISC AI] 통합 데이터 수집 파이프라인") parser.add_argument("--db", default="db", help="DB 연결 정보 키") - parser.add_argument("--tickers", nargs="*", help="특정 티커만 수집 (생략 시 전체)") + parser.add_argument("--tickers", nargs="*", help="특정 주식 티커만 수집 (생략 시 DB 전체)") + + # 모듈별 스킵 옵션 + parser.add_argument("--skip-price", action="store_true", help="주식 시세 수집 Skip") + parser.add_argument("--skip-index", action="store_true", help="시장 지수 수집 Skip") + parser.add_argument("--skip-info", action="store_true", help="주식 정보 수집 Skip") + parser.add_argument("--skip-fund", action="store_true", help="재무제표 수집 Skip") + parser.add_argument("--skip-macro", action="store_true", help="거시경제 수집 Skip") + parser.add_argument("--skip-crypto", action="store_true", help="암호화폐 수집 Skip") + parser.add_argument("--skip-event", action="store_true", help="이벤트 데이터 수집 Skip") + parser.add_argument("--skip-breadth", action="store_true", help="시장 폭(ETF) 데이터 수집 Skip") + parser.add_argument("--skip-stats", action="store_true", help="시장 통계(NH-NL) 계산 Skip") + + parser.add_argument("--repair", action="store_true", help="데이터 누락 복구 모드 (전체 기간 재수집)") - # 개별 수집 스킵 옵션 - parser.add_argument("--skip-price", action="store_true", help="주가 데이터 수집 건너뛰기") - parser.add_argument("--skip-info", action="store_true", help="주식 정보 수집 건너뛰기") - parser.add_argument("--skip-fund", action="store_true", help="재무제표 수집 건너뛰기") - parser.add_argument("--skip-macro", action="store_true", help="거시경제 지표 수집 건너뛰기") + # 단독 실행 모드 옵션 + parser.add_argument("--event-only", action="store_true", help="이벤트 데이터만 수집 (나머지 Skip)") + parser.add_argument("--market-breadth-only", action="store_true", help="시장 폭(ETF) 데이터만 수집 (나머지 Skip)") + parser.add_argument("--stats-only", action="store_true", help="시장 통계 계산만 수행 (나머지 Skip)") args = parser.parse_args() - # 1. 대상 종목 선정 - if args.tickers: - target_tickers = args.tickers - else: - print(">> 전체 관리 종목을 조회합니다...") - target_tickers = get_target_tickers(args.db) + # ------------------------------------------------------- + # [Only 모드 로직 처리] + # 특정 모드가 켜지면 나머지 skip 옵션을 강제로 True로 설정 + # ------------------------------------------------------- + + # 1. Stats Only + if args.stats_only: + args.skip_price = True + args.skip_index = True + args.skip_info = True + args.skip_fund = True + args.skip_macro = True + args.skip_crypto = True + args.skip_event = True + args.skip_breadth = True + args.skip_stats = False # 통계만 실행 + print(">> [Mode] Market Stats Calculation Only 모드") - if not target_tickers: - print(">> 수집할 종목이 없어 종료합니다.") - return + # 2. Event Only + if args.event_only: + args.skip_price = True + args.skip_index = True + args.skip_info = True + args.skip_fund = True + args.skip_macro = True + args.skip_crypto = True + args.skip_breadth = True + args.skip_stats = True # [수정] 통계 계산도 Skip 해야 함 + args.skip_event = False + print(">> [Mode] Event Data Only 모드로 실행합니다.") + + # 3. Market Breadth Only (ETF Sector Returns) + if args.market_breadth_only: + args.skip_price = True + args.skip_index = True + args.skip_info = True + args.skip_fund = True + args.skip_macro = True + args.skip_crypto = True + args.skip_event = True + args.skip_stats = True # [수정] 통계 계산도 Skip 해야 함 + args.skip_breadth = False + print(">> [Mode] Market Breadth Only 모드로 실행합니다.") print(f"\n========================================================") print(f" [SISC Data Collector] 통합 수집 시작 ({datetime.now()})") - print(f" 대상 종목 수: {len(target_tickers)}개") print(f"========================================================\n") start_time = time.time() + # 1. 주식 종목 선정 (Price, Info, Fund, Event 단계에서만 필요) + # Stats 단계는 내부 데이터만 쓰므로 티커 리스트 불필요 + need_stock_tickers = not (args.skip_price and args.skip_info and args.skip_fund and args.skip_event) + + stock_tickers = [] + if need_stock_tickers: + if args.tickers: + stock_tickers = args.tickers + else: + stock_tickers = get_stock_tickers(args.db) + print(f">> 타겟 주식 종목 수: {len(stock_tickers)}개") + else: + print(">> 개별 주식 티커 조회를 건너뜁니다.") + # ------------------------------------------------------- # 2. 순차적 데이터 수집 실행 # ------------------------------------------------------- - # (1) 주가 데이터 (OHLCV) - if not args.skip_price: + # (1) 거시경제 지표 (Macro) + if not args.skip_macro: + try: + print("\n>>> [Step 1] 거시경제 지표(Macro) 업데이트") + collector = MacroDataCollector(db_name=args.db) + collector.run(lookback_days=365*5 if args.repair else 365*2) + except Exception as e: + print(f"[Error] Macro Data 수집 중단: {e}") + + # (2) 시장 지수 (Index) + if not args.skip_index: + try: + print("\n>>> [Step 2] 시장 지수(Index) 업데이트") + collector = IndexDataCollector(db_name=args.db) + collector.run(repair_mode=args.repair) + except Exception as e: + print(f"[Error] Index Data 수집 중단: {e}") + + # (3) 주가 데이터 (Stocks OHLCV) + if not args.skip_price and stock_tickers: try: - print("\n>>> [Step 1] 주가 데이터(OHLCV) 업데이트") - update_market_data(target_tickers, db_name=args.db, repair_mode=False) + print("\n>>> [Step 3] 개별 주식 시세(OHLCV) 업데이트") + collector = MarketDataCollector(db_name=args.db) + collector.update_tickers(stock_tickers, repair_mode=args.repair) except Exception as e: print(f"[Error] Market Data 수집 중단: {e}") - else: - print("\n>>> [Step 1] 주가 데이터 수집 Skip") - # (2) 주식 기본 정보 (Stock Info) - if not args.skip_info: + # (4) 암호화폐 데이터 (Crypto) + if not args.skip_crypto: try: - print("\n>>> [Step 2] 주식 기본 정보(Stock Info) 업데이트") - update_stock_info(target_tickers, db_name=args.db) + print("\n>>> [Step 4] 암호화폐(Crypto) 업데이트") + target_crypto = ["BTC-USD", "ETH-USD"] + collector = CryptoDataCollector(db_name=args.db) + collector.update_tickers(target_crypto, repair_mode=args.repair) except Exception as e: - print(f"[Error] Stock Info 수집 중단: {e}") - else: - print("\n>>> [Step 2] 주식 정보 수집 Skip") + print(f"[Error] Crypto Data 수집 중단: {e}") - # (3) 재무제표 (Fundamentals) - if not args.skip_fund: + # (5) 재무제표 (Fundamentals) + if not args.skip_fund and stock_tickers: try: - print("\n>>> [Step 3] 재무제표(Fundamentals) 업데이트") - update_company_fundamentals(target_tickers, db_name=args.db) + print("\n>>> [Step 5] 기업 재무제표(Fundamentals) 업데이트") + collector = FundamentalsDataCollector(db_name=args.db) + collector.update_tickers(stock_tickers) except Exception as e: print(f"[Error] Fundamentals 수집 중단: {e}") - else: - print("\n>>> [Step 3] 재무제표 수집 Skip") - # (4) 거시경제 지표 (Macro) - if not args.skip_macro: + # (6) 주식 기본 정보 (Stock Info) + if not args.skip_info and stock_tickers: try: - print("\n>>> [Step 4] 거시경제 지표(Macro) 업데이트") - update_macro_data(db_name=args.db) + print("\n>>> [Step 6] 주식 정보(Stock Info) 업데이트") + collector = StockInfoCollector(db_name=args.db) + collector.update_tickers(stock_tickers) except Exception as e: - print(f"[Error] Macro Data 수집 중단: {e}") - else: - print("\n>>> [Step 4] 거시경제 지표 수집 Skip") + print(f"[Error] Stock Info 수집 중단: {e}") + + # (7) 이벤트 일정 (Earnings, Macro Events) + if not args.skip_event: + try: + print("\n>>> [Step 7] 이벤트 일정(Event Data) 업데이트") + collector = EventDataCollector(db_name=args.db) + collector.update_macro_events(force_update=args.repair) + if stock_tickers: + collector.update_earnings_dates(stock_tickers) + except Exception as e: + print(f"[Error] Event Data 수집 중단: {e}") + + # (8) 시장 폭 및 섹터 데이터 (Market Breadth - Sector Returns) + if not args.skip_breadth: + try: + print("\n>>> [Step 8] 시장 폭 및 섹터 데이터(Sector Returns) 업데이트") + collector = MarketBreadthCollector(db_name=args.db) + collector.run(repair_mode=args.repair) + except Exception as e: + print(f"[Error] Sector Data 수집 중단: {e}") + + # (9) 시장 통계 계산 (Market Breadth Stats - Internal Aggregation) + if not args.skip_stats: + try: + print("\n>>> [Step 9] 시장 통계(NH-NL, MA200%) 계산 및 저장") + collector = MarketBreadthStatsCollector(db_name=args.db) + collector.run(repair_mode=args.repair) + except Exception as e: + print(f"[Error] Market Stats 계산 중단: {e}") elapsed = time.time() - start_time print(f"\n========================================================") - print(f" [완료] 모든 데이터 수집 종료 (총 소요시간: {elapsed:.2f}초)") + print(f" [완료] 모든 작업 종료 (총 소요시간: {elapsed:.2f}초)") print(f"========================================================") if __name__ == "__main__": diff --git a/AI/modules/data_collector/stock_info_collector.py b/AI/modules/data_collector/stock_info_collector.py index e1ca300a..de8211e6 100644 --- a/AI/modules/data_collector/stock_info_collector.py +++ b/AI/modules/data_collector/stock_info_collector.py @@ -1,11 +1,4 @@ # AI/modules/collector/stock_info_collector.py -""" -[종목 정보 수집기] -- yfinance의 Ticker.info를 사용하여 섹터(Sector), 산업(Industry), 시가총액 등을 수집합니다. -- 수집된 정보는 public.stock_info 테이블에 저장됩니다. -- 이 작업은 API 호출 속도가 느리므로, 자주 실행하지 않고 월 1회 또는 필요시 실행을 권장합니다. -""" - import sys import os import time @@ -13,7 +6,7 @@ from typing import List from datetime import datetime -# 프로젝트 루트 경로 추가 +# 프로젝트 루트 경로 설정 current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.abspath(os.path.join(current_dir, "../../..")) if project_root not in sys.path: @@ -21,87 +14,103 @@ from AI.libs.database.connection import get_db_conn -def update_stock_info(tickers: List[str], db_name: str = "db"): +class StockInfoCollector: """ - 지정된 티커들의 메타 정보(Sector, Industry 등)를 수집하여 DB에 Upsert 합니다. + [종목 정보 수집기] + - yfinance의 Ticker.info를 사용하여 기업의 섹터(Sector), 산업(Industry), 시가총액(Market Cap) 등을 수집합니다. + - 수집된 데이터는 'stock_info' 테이블에 저장되며, 재무 분석 및 포트폴리오 구성 시 메타 데이터로 활용됩니다. + - API 호출 속도 제한을 고려하여 적절한 대기 시간(sleep)을 가집니다. """ - print(f"[Info Collector] 총 {len(tickers)}개 종목 정보 업데이트 시작...") - - conn = get_db_conn(db_name) - cursor = conn.cursor() - - # Upsert 쿼리 (이미 있으면 업데이트) - upsert_query = """ - INSERT INTO public.stock_info (ticker, sector, industry, market_cap, updated_at) - VALUES (%s, %s, %s, %s, %s) - ON CONFLICT (ticker) - DO UPDATE SET - sector = EXCLUDED.sector, - industry = EXCLUDED.industry, - market_cap = EXCLUDED.market_cap, - updated_at = EXCLUDED.updated_at; - """ - - success_count = 0 - fail_count = 0 - - try: - for i, ticker in enumerate(tickers): - try: - # 진행 상황 출력 (10개 단위) - if i % 10 == 0: - print(f" >> 진행 중... ({i}/{len(tickers)})") - # yfinance API 호출 (네트워크 통신 발생) - yf_ticker = yf.Ticker(ticker) - info = yf_ticker.info - - # 필요한 정보 추출 (없으면 None) - sector = info.get('sector') - industry = info.get('industry') - market_cap = info.get('marketCap') - - # 유효성 체크: 최소한 섹터 정보라도 있어야 저장 가치가 있음 - if not sector and not industry: - # 정보가 아예 없으면 건너뜀 (ETF나 상장폐지 종목일 수도 있음) - # print(f" [{ticker}] 정보 없음(Skip)") - fail_count += 1 - continue + def __init__(self, db_name: str = "db"): + self.db_name = db_name + + def save_to_db(self, ticker: str, info: dict, cursor): + """ + 단일 종목의 정보를 DB에 저장(Upsert)합니다. + """ + # 필요한 정보 추출 (없으면 None) + sector = info.get('sector') + industry = info.get('industry') + market_cap = info.get('marketCap') + + # 유효성 체크: 최소한 섹터 정보라도 있어야 저장 가치가 있음 + if not sector and not industry: + return False - # DB 실행 - cursor.execute(upsert_query, ( - ticker, - sector, - industry, - market_cap, - datetime.now() - )) - success_count += 1 - - # 너무 빠른 요청 방지 (0.2초 대기) - time.sleep(0.2) - - except Exception as e: - print(f" [{ticker}] 에러 발생: {e}") - fail_count += 1 - continue + upsert_query = """ + INSERT INTO public.stock_info (ticker, sector, industry, market_cap, updated_at) + VALUES (%s, %s, %s, %s, %s) + ON CONFLICT (ticker) + DO UPDATE SET + sector = EXCLUDED.sector, + industry = EXCLUDED.industry, + market_cap = EXCLUDED.market_cap, + updated_at = EXCLUDED.updated_at; + """ + + cursor.execute(upsert_query, ( + ticker, + sector, + industry, + market_cap, + datetime.now() + )) + return True - conn.commit() - print(f"\n[Info Collector] 완료! (성공: {success_count}건, 실패/없음: {fail_count}건)") + def update_tickers(self, tickers: List[str]): + """ + 지정된 티커 리스트의 메타 정보를 수집하여 업데이트합니다. + """ + print(f"[StockInfo] 총 {len(tickers)}개 종목 정보 업데이트 시작...") + + conn = get_db_conn(self.db_name) + cursor = conn.cursor() + + success_count = 0 + fail_count = 0 - except Exception as e: - conn.rollback() - print(f"[Info Collector][Fatal Error] 작업 중단: {e}") - finally: - cursor.close() - conn.close() + try: + for i, ticker in enumerate(tickers): + try: + # 진행 상황 출력 (10개 단위) + if i > 0 and i % 10 == 0: + print(f" >> 진행 중... ({i}/{len(tickers)})") + + # yfinance API 호출 (네트워크 통신 발생) + yf_ticker = yf.Ticker(ticker) + info = yf_ticker.info + + if self.save_to_db(ticker, info, cursor): + success_count += 1 + else: + # 정보가 없는 경우 (ETF나 상장폐지 등) + fail_count += 1 + + # API 과부하 방지 (0.2초 대기) + time.sleep(0.2) + + except Exception as e: + print(f" [{ticker}] 수집 실패: {e}") + fail_count += 1 + continue + + conn.commit() + print(f"\n[StockInfo] 완료! (성공: {success_count}건, 실패/없음: {fail_count}건)") + + except Exception as e: + conn.rollback() + print(f"[StockInfo][Fatal Error] 작업 중단: {e}") + finally: + cursor.close() + conn.close() # ---------------------------------------------------------------------- # [실행 모드] -# 사용법: python stock_info_collector.py --all # ---------------------------------------------------------------------- if __name__ == "__main__": import argparse + from AI.libs.database.ticker_loader import load_all_tickers_from_db parser = argparse.ArgumentParser(description="주식 종목 정보(섹터/산업) 수집기") parser.add_argument("tickers", nargs='*', help="수집할 종목 티커 (예: AAPL)") @@ -113,7 +122,6 @@ def update_stock_info(tickers: List[str], db_name: str = "db"): if args.all: try: - from AI.libs.database.ticker_loader import load_all_tickers_from_db print(">> DB에서 종목 리스트를 불러옵니다...") target_tickers = load_all_tickers_from_db(verbose=True) except Exception as e: @@ -121,6 +129,9 @@ def update_stock_info(tickers: List[str], db_name: str = "db"): sys.exit(1) if target_tickers: - update_stock_info(target_tickers, db_name=args.db) + collector = StockInfoCollector(db_name=args.db) + collector.update_tickers(target_tickers) else: - print(">> 실행할 종목이 없습니다. (사용법: python stock_info_collector.py --all)") \ No newline at end of file + print("\n[Manual Mode] 사용법:") + print(" 1) 특정 종목 : python stock_info_collector.py AAPL MSFT") + print(" 2) 전체 종목 : python stock_info_collector.py --all") \ No newline at end of file diff --git a/AI/modules/finder/evaluator.py b/AI/modules/finder/evaluator.py index 6559f7b9..2618a248 100644 --- a/AI/modules/finder/evaluator.py +++ b/AI/modules/finder/evaluator.py @@ -17,7 +17,7 @@ if project_root not in sys.path: sys.path.append(project_root) -from AI.libs.database.fetcher import fetch_ohlcv +from AI.libs.database.fetcher import fetch_price_data from AI.modules.signal.core.features import add_technical_indicators def evaluate_ticker(ticker: str) -> Dict[str, float]: @@ -31,7 +31,7 @@ def evaluate_ticker(ticker: str) -> Dict[str, float]: start_str = start_dt.strftime("%Y-%m-%d") end_str = end_dt.strftime("%Y-%m-%d") - df = fetch_ohlcv(ticker, start=start_str, end=end_str) + df = fetch_price_data(ticker, start_date=start_str, end_date=end_str) if df.empty or len(df) < 60: return {'total_score': 0.0, 'reason': '데이터 부족'} diff --git a/AI/modules/signal/core/data_loader.py b/AI/modules/signal/core/data_loader.py index 0d4e5fee..f76c5f42 100644 --- a/AI/modules/signal/core/data_loader.py +++ b/AI/modules/signal/core/data_loader.py @@ -1,187 +1,290 @@ # AI/modules/signal/core/data_loader.py """ -[데이터 로더 - Multi-Horizon Version] -- 1, 3, 5, 7일 뒤의 등락을 한 번에 모두 라벨링합니다. -- 정답(y)의 모양이 (N,)에서 (N, 4)로 바뀝니다. +[Data Loader - Integrated & Dynamic Version] +- 주가(Price), 거시경제(Macro), 시장지표(Breadth), 뉴스심리(Sentiment), 펀더멘털(Fundamental) 데이터를 통합 로드합니다. +- 테이블별로 데이터를 조회한 뒤, Pandas Merge를 통해 시계열을 정렬합니다. +- Multi-Horizon (예: 1, 3, 5, 7일) 예측을 동적으로 설정하여 라벨링을 수행합니다. """ +import sys +import os import numpy as np import pandas as pd -from typing import Tuple, Dict +from typing import Tuple, Dict, List, Optional from sklearn.preprocessing import MinMaxScaler +from sqlalchemy import text from tqdm import tqdm -import sys -import os - +# 프로젝트 루트 경로 설정 current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.abspath(os.path.join(current_dir, "../../../..")) if project_root not in sys.path: sys.path.append(project_root) -from AI.libs.database.connection import get_db_conn -from AI.modules.signal.core.features import add_technical_indicators, add_multi_timeframe_features +# DB 연결 및 Fetcher 모듈 import +from AI.libs.database.connection import get_engine +from AI.libs.database.fetcher import ( + fetch_macro_indicators, + fetch_market_breadth, + fetch_news_sentiment, + fetch_fundamentals +) +from AI.modules.signal.core.features import add_technical_indicators class DataLoader: - def __init__(self, db_name="db", lookback=60): + def __init__(self, db_name="db", lookback=60, horizons: List[int] = None): + """ + :param db_name: DB 연결 설정 이름 + :param lookback: 시퀀스 길이 (과거 며칠을 볼 것인가) + :param horizons: 예측할 미래 시점 리스트 (예: [1, 3, 5] -> 1일뒤, 3일뒤, 5일뒤 예측) + None일 경우 기본값 [1, 3, 5, 7] 사용 + """ self.db_name = db_name self.lookback = lookback + self.horizons = horizons if horizons else [1, 3, 5, 7] self.scaler = MinMaxScaler() + # 메타데이터 ID 매핑 self.ticker_to_id: Dict[str, int] = {} self.sector_to_id: Dict[str, int] = {} self.ticker_sector_map: Dict[str, int] = {} + # 공통 데이터 캐싱 (Macro, Market Breadth) + self.macro_df: pd.DataFrame = pd.DataFrame() + self.breadth_df: pd.DataFrame = pd.DataFrame() + + # 초기화 시 메타데이터 로드 self._load_metadata() def _load_metadata(self): - conn = get_db_conn(self.db_name) - cursor = conn.cursor() + """종목 및 섹터 정보를 로드하여 ID 매핑 생성""" + engine = get_engine(self.db_name) try: - query = "SELECT ticker, COALESCE(sector, 'Unknown') FROM public.stock_info" - cursor.execute(query) - rows = cursor.fetchall() - unique_sectors = sorted(list(set([row[1] for row in rows]))) + query = text("SELECT ticker, COALESCE(sector, 'Unknown') as sector FROM public.stock_info") + with engine.connect() as conn: + df_meta = pd.read_sql(query, conn) + + if df_meta.empty: + print("[DataLoader] Warning: stock_info 테이블이 비어있습니다.") + return + + unique_sectors = sorted(df_meta['sector'].unique().tolist()) self.sector_to_id = {sec: i for i, sec in enumerate(unique_sectors)} - self.ticker_sector_map = {row[0]: self.sector_to_id[row[1]] for row in rows} - self.ticker_to_id = {row[0]: i for i, row in enumerate(rows)} + + for _, row in df_meta.iterrows(): + self.ticker_sector_map[row['ticker']] = self.sector_to_id[row['sector']] + + self.ticker_to_id = {t: i for i, t in enumerate(df_meta['ticker'])} print(f"[DataLoader] 메타데이터 로드 완료: {len(self.ticker_to_id)}개 종목") + except Exception as e: print(f"[DataLoader] 메타데이터 로드 실패: {e}") - finally: - if cursor: cursor.close() - if conn: conn.close() + + def _prepare_common_data(self, start_date: str): + """ + [최적화] 모든 종목에 공통으로 적용되는 거시경제/시장지표를 미리 한 번만 로드합니다. + """ + try: + print("[DataLoader] 공통 데이터(Macro, Breadth) 로드 중...") + self.macro_df = fetch_macro_indicators(start_date, self.db_name) + self.breadth_df = fetch_market_breadth(start_date, self.db_name) + except Exception as e: + print(f"[DataLoader] 공통 데이터 로드 중 오류 발생 (무시하고 진행): {e}") def load_data_from_db(self, start_date="2018-01-01") -> pd.DataFrame: - conn = get_db_conn(self.db_name) - query = f""" - SELECT date, ticker, open, high, low, close, volume, adjusted_close + """ + 1. 공통 데이터를 먼저 로드합니다. + 2. 전체 종목의 주가 데이터(Price Data)를 대량으로 조회합니다. + """ + # 1. 공통 데이터 준비 + self._prepare_common_data(start_date) + + # 2. 주가 데이터 Bulk Load + print(f"[DataLoader] {start_date} 부터 전체 주가 데이터를 조회합니다...") + engine = get_engine(self.db_name) + + # 거래대금(amount) 포함 + query = text(""" + SELECT date, ticker, open, high, low, close, volume, adjusted_close, amount FROM public.price_data - WHERE date >= '{start_date}' + WHERE date >= :start_date ORDER BY ticker, date ASC - """ - df = pd.read_sql(query, conn) - conn.close() - df['date'] = pd.to_datetime(df['date']) - return df + """) + + with engine.connect() as conn: + df_price = pd.read_sql(query, conn, params={"start_date": start_date}) + + if not df_price.empty: + df_price['date'] = pd.to_datetime(df_price['date']) + # 수정주가(adjusted_close) 우선 사용 + if 'adjusted_close' in df_price.columns: + df_price['close'] = df_price['adjusted_close'].fillna(df_price['close']) + + print(f"[DataLoader] 주가 데이터 로드 완료: {len(df_price)} rows") + return df_price def create_dataset(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, Dict]: - X_ts_list = [] - X_ticker_list = [] - X_sector_list = [] - y_class_list = [] # 이제 여기가 2차원 리스트가 됨 - y_reg_list = [] - - # ----------------------------------------------------------- - # [설정] Multi-Horizon (1, 3, 5, 7일 뒤 예측) - # ----------------------------------------------------------- - HORIZONS = [1, 3, 5, 7] - max_horizon = max(HORIZONS) - print(f"🎯 예측 목표: {HORIZONS}일 뒤의 등락을 동시 예측") - + """ + 로드된 주가 데이터(df)를 순회하며: + 1. 공통 데이터(Macro, Breadth) 병합 + 2. 개별 데이터(News, Fundamental) 조회 및 병합 + 3. 기술적 지표 생성 및 스케일링 + 4. 시퀀스 데이터셋 생성 (X, y) + """ + X_ts_list, X_ticker_list, X_sector_list = [], [], [] + y_class_list, y_reg_list = [], [] + + # 동적으로 설정된 호라이즌 사용 + max_horizon = max(self.horizons) + tickers = df['ticker'].unique() - print(f"[DataLoader] {len(tickers)}개 종목에 대해 Feature Engineering 시작...") - + print(f"🎯 예측 목표: {self.horizons}일 뒤 등락 동시 예측 (Max Horizon: {max_horizon}일)") + processed_dfs = [] - for ticker in tqdm(tickers, desc="Adding Indicators"): + + # ------------------------------------------------------------------------- + # Step 1: 종목별 데이터 병합 및 전처리 (Merging & Feature Engineering) + # ------------------------------------------------------------------------- + for ticker in tqdm(tickers, desc="Processing Tickers"): + # 해당 종목 데이터 추출 sub_df = df[df['ticker'] == ticker].copy().sort_values('date') - if len(sub_df) < 200: continue - if sub_df['close'].std() == 0: continue # 좀비 데이터 제거 + # 데이터 최소 길이 확인 (lookback + max_horizon 보다 작으면 시퀀스 생성 불가) + if len(sub_df) <= self.lookback + max_horizon: continue + if sub_df['close'].std() == 0: continue # 변동성 없는 데이터 제외 - sub_df = add_technical_indicators(sub_df) + # [Merge 1] 공통 데이터 병합 (Left Join) + if not self.macro_df.empty: + sub_df = pd.merge(sub_df, self.macro_df, on='date', how='left') + if not self.breadth_df.empty: + sub_df = pd.merge(sub_df, self.breadth_df, on='date', how='left') + + # [Merge 2] 개별 데이터 조회 및 병합 (News, Fundamentals) try: - sub_df = add_multi_timeframe_features(sub_df) + # 2-1. 뉴스 심리 (종목별) + df_news = fetch_news_sentiment(ticker, sub_df['date'].min().strftime('%Y-%m-%d'), self.db_name) + if not df_news.empty: + sub_df = pd.merge(sub_df, df_news, on='date', how='left') + sub_df[['sentiment_score', 'risk_keyword_cnt']] = sub_df[['sentiment_score', 'risk_keyword_cnt']].fillna(0) + + # 2-2. 펀더멘털 (종목별) - ffill 사용 + df_fund = fetch_fundamentals(ticker, self.db_name) + if not df_fund.empty: + sub_df = pd.merge(sub_df, df_fund, on='date', how='left') + fund_cols = ['per', 'pbr', 'roe', 'debt_ratio'] + cols_to_fill = [c for c in fund_cols if c in sub_df.columns] + sub_df[cols_to_fill] = sub_df[cols_to_fill].ffill().fillna(0) + except Exception: + pass # 부가 데이터 로드 실패 시 무시하고 진행 + + # [Preprocessing] 결측치 보간 (Macro 주말 데이터 등) + sub_df = sub_df.ffill().bfill() + + # [Feature Engineering] 기술적 지표 생성 + try: + sub_df = add_technical_indicators(sub_df) except Exception: continue processed_dfs.append(sub_df) - if not processed_dfs: raise ValueError("[Error] 유효한 데이터가 없습니다!") + if not processed_dfs: + raise ValueError("[Error] 전처리된 유효 데이터가 없습니다.") + full_df = pd.concat(processed_dfs) + full_df['raw_close'] = full_df['close'] # 스케일링 전 원본 가격 보존 - full_df['raw_close'] = full_df['close'] - - feature_cols = [ - 'log_return', - 'open_ratio', 'high_ratio', 'low_ratio', - 'vol_change', - 'ma5_ratio', 'ma20_ratio', 'ma60_ratio', - 'rsi', - 'macd_ratio', - 'bb_position', - 'week_ma20_ratio', 'week_rsi', 'week_bb_pos', 'week_vol_change', - 'month_ma12_ratio', 'month_rsi' + # 사용 가능한 Feature 자동 감지 + potential_features = [ + # 1. Technical + 'log_return', 'open_ratio', 'high_ratio', 'low_ratio', 'vol_change', + 'ma_5_ratio', 'ma_20_ratio', 'ma_60_ratio', + 'rsi', 'macd_ratio', 'bb_position', + # 2. Macro (New) + 'us10y', 'yield_spread', 'vix_close', 'dxy_close', 'credit_spread_hy', + # 3. Breadth (New) + 'advance_decline_ratio', 'fear_greed_index', + # 4. Sentiment (New) + 'sentiment_score', 'risk_keyword_cnt', + # 5. Fundamental (New) + 'per', 'pbr', 'roe' ] - available_cols = [c for c in feature_cols if c in full_df.columns] + available_cols = [c for c in potential_features if c in full_df.columns] full_df = full_df.dropna(subset=available_cols) - print(f">> 데이터 스케일링 중... (Features: {len(available_cols)}개)") + print(f">> Scaling Features: {len(available_cols)} columns selected") + print(f" (Included: {available_cols})") + + # Scaling (전체 데이터 기준 fitting) full_df[available_cols] = self.scaler.fit_transform(full_df[available_cols]) - print(">> 시퀀스 및 라벨 생성 중...") - - debug_printed = False + # ------------------------------------------------------------------------- + # Step 2: 시퀀스 생성 (Sequencing) + # ------------------------------------------------------------------------- + print(">> Generating Sequences & Labels...") + # 속도 최적화를 위해 numpy 변환 후 루프 수행 + # (종목별로 group하여 처리) for ticker in tqdm(full_df['ticker'].unique(), desc="Sequencing"): sub_df = full_df[full_df['ticker'] == ticker] - # 데이터가 (lookback + 가장 먼 미래) 보다 많아야 함 + + # 시퀀스 생성 가능 길이 확인 (위에서 했지만 dropna 등으로 줄어들었을 수 있으므로 재확인) if len(sub_df) <= self.lookback + max_horizon: continue + # 메타데이터 매핑 t_id = self.ticker_to_id.get(ticker, 0) s_id = self.ticker_sector_map.get(ticker, 0) - values = sub_df[available_cols].values - raw_closes = sub_df['raw_close'].values + # Numpy 변환 (속도 최적화) + feature_vals = sub_df[available_cols].values + raw_closes = sub_df['raw_close'].values - if not debug_printed: - print(f"\n[DEBUG Sample] Ticker: {ticker}") - print(f" - Raw Closes (First 5): {raw_closes[:5]}") - debug_printed = True - - # 루프 범위: 끝에서 max_horizon 만큼은 정답을 알 수 없으므로 제외 + # 루프 범위 계산: 마지막 데이터에서 max_horizon 만큼은 정답을 알 수 없음 num_samples = len(sub_df) - self.lookback - max_horizon + 1 if num_samples <= 0: continue for i in range(num_samples): - window = values[i : i + self.lookback] - curr_raw = raw_closes[i + self.lookback - 1] + # X: 과거 데이터 Window (Sequence) + window = feature_vals[i : i + self.lookback] - # [핵심] 1, 3, 5, 7일 뒤 정답을 모두 구해서 리스트로 만듦 - multi_labels = [] + # y: 미래 예측 (Multi-Horizon) + curr_price = raw_closes[i + self.lookback - 1] - for h in HORIZONS: - next_raw = raw_closes[i + self.lookback + h - 1] - threshold = 0.000 - - if curr_raw == 0: - label = 0 - else: - label = 1 if next_raw > curr_raw * (1 + threshold) else 0 + multi_labels = [] + # 동적 horizons 변수를 사용하여 라벨 생성 + for h in self.horizons: + future_price = raw_closes[i + self.lookback + h - 1] + # 등락 라벨 (1: 상승, 0: 하락/보합) + label = 1 if future_price > curr_price else 0 multi_labels.append(label) - # 회귀 라벨은 대표값(가장 먼 7일) 하나만 씀 (여기선 분류가 메인이므로) - label_reg = 0.0 - + # Regression Target (예시: 가장 먼 미래 수익률) + label_reg = 0.0 + if curr_price != 0: + label_reg = (raw_closes[i + self.lookback + max_horizon - 1] - curr_price) / curr_price + X_ts_list.append(window) X_ticker_list.append(t_id) X_sector_list.append(s_id) - y_class_list.append(multi_labels) # [0, 1, 1, 0] 형태 저장 + y_class_list.append(multi_labels) y_reg_list.append(label_reg) + # 결과 변환 X_ts = np.array(X_ts_list) X_ticker = np.array(X_ticker_list) X_sector = np.array(X_sector_list) - y_class = np.array(y_class_list) # Shape: (N, 4) + y_class = np.array(y_class_list) # Shape: (N, len(horizons)) y_reg = np.array(y_reg_list) info = { "n_tickers": len(self.ticker_to_id), "n_sectors": len(self.sector_to_id), - "scaler": self.scaler, + "feature_names": available_cols, "n_features": len(available_cols), - "horizons": HORIZONS # 메타데이터에 추가 + "horizons": self.horizons, # 메타데이터에 사용된 horizons 기록 + "scaler": self.scaler } + print(f"[Dataset Ready] Samples: {len(y_class)}, Features: {len(available_cols)}") return X_ts, X_ticker, X_sector, y_class, y_reg, info \ No newline at end of file diff --git a/schema.sql b/schema.sql index 967e5ede..e58ee7e1 100644 --- a/schema.sql +++ b/schema.sql @@ -1,238 +1,268 @@ ---------------------------------------------------------------------- --- Schema: public --- - 퀀트 트레이딩 / 백테스트 / XAI / 포트폴리오 관리용 메인 스키마 +-- Schema: public / neon_auth +-- - 퀀트 트레이딩, 백테스트, XAI 분석 및 포트폴리오 관리 시스템의 핵심 데이터 모델 ---------------------------------------------------------------------- -CREATE SCHEMA "public"; -CREATE SCHEMA "neon_auth"; +CREATE SCHEMA IF NOT EXISTS "public"; +CREATE SCHEMA IF NOT EXISTS "neon_auth"; ---------------------------------------------------------------------- -- 1. price_data --- - 개별 종목의 일별 시세(OHLCV)를 저장하는 기본 원천 테이블 --- - 모든 시계열 데이터의 기준 축 (기술지표, 펀더멘털, 공매도 등) --- - (date, ticker) 기준으로 하루 1행 보장 +-- - 개별 주식 종목의 일별 시세(OHLCV) 및 거래 정보를 저장 +-- - 기술적 분석 및 모델 Feature 엔지니어링의 기본 원천 데이터 ---------------------------------------------------------------------- - CREATE TABLE "price_data" ( -"adjusted_close" numeric(38, 2), -- 조정 종가 (배당/액분/병합 반영) -"close" numeric(38, 2), -- 종가 -"date" date, -- 거래일 기준 날짜 -"high" numeric(38, 2), -- 고가 -"low" numeric(38, 2), -- 저가 -"open" numeric(38, 2), -- 시가 -"volume" bigint, -- 거래량 (체결 주식 수) -"ticker" varchar(255), -- 종목 티커 (확장성 고려, 자르지 않음) -CONSTRAINT "price_data_pkey" PRIMARY KEY("date","ticker") + "adjusted_close" numeric(38, 2), -- 조정 종가 (배당, 액면분할 등이 반영된 주가) + "close" numeric(38, 2), -- 당일 종가 + "date" date, -- 거래 일자 + "high" numeric(38, 2), -- 당일 고가 + "low" numeric(38, 2), -- 당일 저가 + "open" numeric(38, 2), -- 당일 시가 + "volume" bigint, -- 일일 거래량 (주식 수) + "ticker" varchar(255), -- 종목 티커 심볼 + "amount" numeric(38, 2), -- 일일 거래대금 + CONSTRAINT "price_data_pkey" PRIMARY KEY("date", "ticker") ); ---------------------------------------------------------------------- --- 2. technical_indicators --- - price_data 기반으로 계산된 기술적 지표 저장 --- - ticker + date 기준으로 price_data와 1:1 매핑 --- - 모델 feature 소스로 활용 ----------------------------------------------------------------------- - -CREATE TABLE "technical_indicators" ( -"ticker" varchar(10), -- 종목 티커 -"date" date, -- 기준 날짜 -"rsi" numeric(18, 6), -- RSI -"macd" numeric(18, 6), -- MACD -"bollinger_bands_upper" numeric(18, 6), -- 볼린저 밴드 상단 -"bollinger_bands_lower" numeric(18, 6), -- 볼린저 밴드 하단 -"atr" numeric(18, 6), -- Average True Range -"obv" numeric(18, 6), -- On-Balance Volume -"stochastic" numeric(18, 6), -- 스토캐스틱 오실레이터 -"mfi" numeric(18, 6), -- Money Flow Index -"ma_5" numeric(18, 6), -- 5일 이동평균 -"ma_20" numeric(18, 6), -- 20일 이동평균 -"ma_50" numeric(18, 6), -- 50일 이동평균 -"ma_200" numeric(18, 6), -- 200일 이동평균 -CONSTRAINT "technical_indicators_pkey" PRIMARY KEY("ticker","date") +-- 2. crypto_price_data +-- - 암호화폐(가상자산)의 시계열 가격 및 마켓 데이터를 저장 +-- - 주식 데이터와 분리하여 자산군별 특화 분석 수행 +---------------------------------------------------------------------- +CREATE TABLE "crypto_price_data" ( + "ticker" varchar(20), -- 코인/토큰 티커 + "date" timestamp, -- 거래 시점 (타임스탬프) + "open" numeric(38, 8), -- 시가 + "high" numeric(38, 8), -- 고가 + "low" numeric(38, 8), -- 저가 + "close" numeric(38, 8), -- 종가 + "volume" numeric(38, 8), -- 거래량 + "market_cap" numeric(38, 2), -- 시가총액 + CONSTRAINT "crypto_price_data_pkey" PRIMARY KEY("date", "ticker") ); ---------------------------------------------------------------------- -- 3. macroeconomic_indicators --- - 날짜 단위 거시경제 지표 저장 --- - 모든 종목에 공통 적용되는 글로벌 변수 --- - price_data와는 date 기준으로 조인 +-- - 국가별 거시경제 지표 및 시장 위험 지표(VIX 등) 저장 +-- - 시장 국면(Regime) 판단 및 멀티모달 AI 모델의 외부 변수로 활용 ---------------------------------------------------------------------- - CREATE TABLE "macroeconomic_indicators" ( -"date" date PRIMARY KEY, -- 기준 날짜 (발표일 등) -"cpi" numeric(18, 2), -- 소비자물가지수 (CPI) -"gdp" numeric(18, 2), -- GDP -"ppi" numeric(18, 2), -- 생산자물가지수 (PPI) -"jolt" numeric(18, 2), -- JOLTs -"cci" numeric(18, 2), -- 소비자신뢰지수 -"interest_rate" numeric(18, 2), -- 기준금리 / 연방기금금리 -"trade_balance" numeric(18, 2), -- 무역수지 -"core_cpi" numeric, -- 근원 CPI -"real_gdp" numeric, -- 실질 GDP -"unemployment_rate" numeric, -- 실업률 -"consumer_sentiment" numeric, -- 소비자심리지수 -"ff_targetrate_upper" numeric, -- FOMC 목표금리 상단 -"ff_targetrate_lower" numeric, -- FOMC 목표금리 하단 -"pce" numeric, -- 개인소비지출 물가지수 -"core_pce" numeric, -- 근원 PCE -"tradebalance_goods" numeric, -- 상품 무역수지 -"trade_import" numeric, -- 수입 -"trade_export" numeric -- 수출 + "date" date PRIMARY KEY, -- 지표 발표 또는 기준 일자 + "cpi" numeric(18, 2), -- 소비자물가지수 (CPI) + "gdp" numeric(18, 2), -- 국내총생산 (GDP) + "ppi" numeric(18, 2), -- 생산자물가지수 (PPI) + "jolt" numeric(18, 2), -- 구인/이직 보고서 (JOLTs) 수치 + "cci" numeric(18, 2), -- 소비자신뢰지수 (CCI) + "interest_rate" numeric(18, 2), -- 기준 금리 + "trade_balance" numeric(18, 2), -- 무역수지 + "core_cpi" numeric, -- 근원 소비자물가지수 + "real_gdp" numeric, -- 실질 GDP + "unemployment_rate" numeric, -- 실업률 + "consumer_sentiment" numeric, -- 소비자심리지수 + "ff_targetrate_upper" numeric, -- 연방기금금리 상단 목표치 + "ff_targetrate_lower" numeric, -- 연방기금금리 하단 목표치 + "pce" numeric, -- 개인소비지출(PCE) 물가지수 + "core_pce" numeric, -- 근원 PCE + "tradebalance_goods" numeric, -- 상품무역수지 + "trade_import" numeric, -- 수입액 + "trade_export" numeric, -- 수출액 + "us10y" numeric(10, 4), -- 미국채 10년물 금리 + "us2y" numeric(10, 4), -- 미국채 2y 금리 + "yield_spread" numeric(10, 4), -- 장단기 금리차 (10Y-2Y) + "vix_close" numeric(10, 2), -- 변동성 지수(VIX) 종가 + "dxy_close" numeric(10, 4), -- 달러 인덱스 종가 + "wti_price" numeric(10, 2), -- WTI 유가 + "gold_price" numeric(10, 2), -- 국제 금 가격 + "credit_spread_hy" numeric(10, 4) -- 하이일드 채권 신용 스프레드 ); ---------------------------------------------------------------------- -- 4. company_fundamentals --- - 개별 종목의 펀더멘털(재무제표 수치) 저장 --- - 분기/연간 데이터를 특정 date에 매핑 --- - price_data와 결합해 펀더멘털 + 가격 분석 +-- - 종목별 재무제표 수치 및 주요 투자 보조 지표 저장 +-- - 가치 투자 전략 및 퀀트 팩터 모델의 핵심 소스 ---------------------------------------------------------------------- - CREATE TABLE "company_fundamentals" ( -"ticker" varchar(10), -- 종목 티커 -"date" date, -- 기준 날짜 (보고서/발표 기준) -"revenue" numeric(30, 6), -- 매출액 -"net_income" numeric(30, 6), -- 순이익 -"total_assets" numeric(30, 6), -- 총자산 -"total_liabilities" numeric(30, 6), -- 총부채 -"equity" numeric(30, 6), -- 자본 -"eps" numeric(18, 6), -- 주당순이익 (EPS) -"pe_ratio" numeric(18, 6), -- 주가수익비율 (P/E) -CONSTRAINT "company_fundamentals_pkey" PRIMARY KEY("ticker","date") + "ticker" varchar(255), -- 종목 티커 + "date" date, -- 데이터 기준일 (분기/연간 보고서 발표일 등) + "revenue" numeric(30, 6), -- 매출액 + "net_income" numeric(30, 6), -- 당기순이익 + "total_assets" numeric(30, 6), -- 총자산 + "total_liabilities" numeric(30, 6),-- 총부채 + "equity" numeric(30, 6), -- 자본총계 + "eps" numeric(18, 6), -- 주당순이익 (EPS) + "per" numeric(18, 6), -- 주가수익비율 (PER) + "pbr" numeric(18, 6), -- 주가순자산비율 (PBR) + "roe" numeric(18, 6), -- 자기자본이익률 (ROE) + "debt_ratio" numeric(18, 6), -- 부채비율 + "operating_cash_flow" numeric(30, 6), -- 영업활동현금흐름 + "interest_coverage" numeric(10, 2);, -- 이자보상배율 + CONSTRAINT "company_fundamentals_pkey" PRIMARY KEY("ticker","date") ); ---------------------------------------------------------------------- --- 5. short_interest --- - 공매도 관련 데이터 저장 --- - 수급/포지셔닝 보조 지표 +-- 5. Market Breadth Stats +-- - 전 종목 대상 통계 데이터 저장 +-- - NH-NL (신고가-신저가), MA200 상회 비율 등 ---------------------------------------------------------------------- - -CREATE TABLE "short_interest" ( -"ticker" varchar(10), -- 종목 티커 -"date" date, -- 기준 날짜 -"short_interest" double precision, -- 공매도 비율 또는 잔고 -"short_volume" bigint, -- 공매도 거래량 -CONSTRAINT "short_interest_pkey" PRIMARY KEY("ticker","date") +CREATE TABLE IF NOT EXISTS "market_breadth" ( + "date" date PRIMARY KEY, + + -- 1. 신고가 - 신저가 지수 (Net High - Net Low) + -- 52주 신고가 종목 수 - 52주 신저가 종목 수 + "nh_nl_index" integer, + + -- 2. 200일 이동평균선 상회 비율 (Market Momemtum) + -- (현재가 > MA200 인 종목 수) / 전체 종목 수 * 100 + "ma200_pct" numeric(5, 2), + + "created_at" timestamp DEFAULT now() ); +-- 인덱스 +CREATE INDEX IF NOT EXISTS "idx_market_breadth_date" ON "market_breadth" ("date"); + ---------------------------------------------------------------------- --- 6. xai_reports --- - XAI 모듈이 생성한 설명 가능한 자연어 리포트 저장 --- - (ticker, date, signal) 기준으로 1개 리포트만 유지 +-- 6. news_sentiment +-- - 비정형 데이터(뉴스/기사)를 분석한 정량적 감성 점수 저장 +-- - 시장 심리 및 이벤트 드리븐 전략에 활용 ---------------------------------------------------------------------- +CREATE TABLE "news_sentiment" ( + "id" bigserial PRIMARY KEY, -- 감성 분석 로그 ID + "date" date NOT NULL, -- 뉴스 기준 날짜 + "ticker" varchar(20) NOT NULL, -- 관련 종목 티커 + "sentiment_score" numeric(5, 4), -- 감성 점수 (-1 ~ 1) + "impact_score" numeric(5, 4), -- 시장 영향력 점수 + "risk_keyword_cnt" integer, -- 위험 키워드 빈도수 + "article_count" integer, -- 분석된 기사 수 + "created_at" timestamp DEFAULT now() -- 분석 기록 생성 시각 +); +---------------------------------------------------------------------- +-- 7. xai_reports +-- - AI 모델이 생성한 의사결정 근거(텍스트)와 매매 신호를 저장 +-- - 투자자가 모델의 판단을 이해할 수 있도록 설명 가능성(Explainability) 제공 +---------------------------------------------------------------------- CREATE TABLE "xai_reports" ( -"id" bigserial PRIMARY KEY, -- XAI 리포트 고유 ID -"ticker" varchar(255) NOT NULL, -- 종목 티커 -"signal" varchar(255) NOT NULL, -- 신호 (BUY / SELL / HOLD) -"price" numeric(38, 2) NOT NULL, -- 신호 발생 시점 가격 -"date" date NOT NULL, -- 신호 발생 날짜 -"report" text, -- 자연어 설명 리포트 -"created_at" timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL, -- 생성 시각 -"run_id" varchar(64), -- 생성된 실행(run) 식별자 -CONSTRAINT "uq_xai_reports_ticker_date_signal" -UNIQUE("ticker","date","signal"), -CONSTRAINT "ck_xai_reports_signal" -CHECK ((signal)::text = ANY (ARRAY['BUY','SELL','HOLD'])) + "id" bigserial PRIMARY KEY, -- XAI 리포트 고유 ID + "ticker" varchar(255) NOT NULL, -- 종목 티커 + "signal" varchar(255) NOT NULL, -- 발생 신호 (BUY, SELL, HOLD) + "price" numeric(38, 2) NOT NULL, -- 신호 발생 당시 가격 + "date" date NOT NULL, -- 신호 발생 날짜 + "report" text, -- LLM/XAI 기반 자연어 리포트 본문 + "created_at" timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL, -- 리포트 생성 시각 + "run_id" varchar(64), -- 분석 실행 프로세스 식별 ID + CONSTRAINT "uq_xai_reports_ticker_date_signal" UNIQUE("ticker","date","signal"), + CONSTRAINT "ck_xai_reports_signal" CHECK (signal = ANY (ARRAY['BUY', 'SELL', 'HOLD'])) ); ---------------------------------------------------------------------- --- 7. executions --- - 백테스트 / 실험 실행 시 발생한 체결 로그 --- - 전략 검증, 성과 분석의 핵심 데이터 +-- 8. executions +-- - 실제 거래 또는 백테스트 시뮬레이션에서 발생한 개별 체결 이력 +-- - 자산 추적 및 성과 평가를 위한 가장 세밀한 로그 데이터 ---------------------------------------------------------------------- - CREATE TABLE "executions" ( -"id" bigserial PRIMARY KEY, -- 체결 로그 ID -"run_id" varchar(64), -- 실행 ID -"ticker" varchar(255) NOT NULL, -- 종목 티커 -"signal_date" date NOT NULL, -- 신호 발생일 -"signal_price" numeric(38, 2), -- 신호 당시 가격 -"signal" varchar(255) NOT NULL, -- 신호 타입 -"fill_date" date NOT NULL, -- 체결일 -"fill_price" numeric(38, 2) NOT NULL, -- 체결가 -"qty" integer NOT NULL, -- 체결 수량 -"side" varchar(255) NOT NULL, -- BUY / SELL -"value" numeric(38, 2) NOT NULL, -- 체결 금액 -"commission" numeric(38, 2) NOT NULL, -- 수수료 -"cash_after" numeric(38, 2) NOT NULL, -- 체결 후 현금 -"position_qty" integer NOT NULL, -- 체결 후 보유 수량 -"avg_price" numeric(38, 2) NOT NULL, -- 평균 매입가 -"pnl_realized" numeric(38, 2) NOT NULL, -- 실현 손익 -"pnl_unrealized" numeric(38, 2) NOT NULL, -- 미실현 손익 -"created_at" timestamp with time zone DEFAULT now() NOT NULL, -- 기록 시각 -"xai_report_id" bigint -- 연결된 XAI 리포트 ID + "id" bigserial PRIMARY KEY, -- 체결 로그 ID + "run_id" varchar(64), -- 백테스트/실행 회차 ID + "ticker" varchar(255) NOT NULL, -- 종목 티커 + "signal_date" date NOT NULL, -- 전략 신호 발생일 + "signal_price" numeric(38, 2), -- 전략 신호 당시 가격 + "signal" varchar(255) NOT NULL, -- 발생한 신호 타입 + "fill_date" date NOT NULL, -- 실제 주문 체결일 + "fill_price" numeric(38, 2) NOT NULL, -- 실제 체결 가격 + "qty" integer NOT NULL, -- 체결 수량 + "side" varchar(255) NOT NULL, -- 매수/매도 구분 (BUY/SELL) + "value" numeric(38, 2) NOT NULL, -- 총 체결 금액 + "commission" numeric(38, 2) NOT NULL, -- 거래 수수료 + "cash_after" numeric(38, 2) NOT NULL, -- 체결 후 잔여 현금 + "position_qty" integer NOT NULL, -- 체결 후 총 보유 수량 + "avg_price" numeric(38, 2) NOT NULL, -- 체결 후 평균 매입가 + "pnl_realized" numeric(38, 2) NOT NULL, -- 해당 거래로 확정된 실현 손익 + "pnl_unrealized" numeric(38, 2) NOT NULL, -- 현재 시점 기준 미실현 손익 + "created_at" timestamp with time zone DEFAULT now() NOT NULL, -- DB 기록 시각 + "xai_report_id" bigint, -- 연관된 XAI 리포트 참조 ID + CONSTRAINT "fk_executions_xai_reports" FOREIGN KEY ("xai_report_id") REFERENCES "xai_reports"("id") ON DELETE SET NULL ); -ALTER TABLE "executions" -ADD CONSTRAINT "fk_executions_xai_reports" -FOREIGN KEY ("xai_report_id") -REFERENCES "xai_reports"("id"); - ---------------------------------------------------------------------- --- 8. portfolio_positions --- - 현재 시점 기준 종목별 포지션 스냅샷 --- - 실시간/웹 조회용 +-- 9. portfolio_summary +-- - 전체 자산의 일별 성과 요약 (Equity Curve 생성용) ---------------------------------------------------------------------- - -CREATE TABLE "portfolio_positions" ( -"id" bigserial PRIMARY KEY, -"ticker" varchar(20) NOT NULL, -- 종목 티커 -"position_qty" integer NOT NULL, -- 보유 수량 -"avg_price" numeric(18, 6) NOT NULL, -- 평균 매입가 -"current_price" numeric(18, 6) NOT NULL, -- 현재가 -"market_value" numeric(20, 6) NOT NULL, -- 평가금액 -"pnl_unrealized" numeric(20, 6) NOT NULL, -- 미실현 손익 -"pnl_realized_cum" numeric(20, 6) NOT NULL, -- 누적 실현 손익 -"updated_at" timestamp with time zone DEFAULT now() -- 갱신 시각 +CREATE TABLE "portfolio_summary" ( + "date" date PRIMARY KEY, -- 기준 날짜 + "total_asset" numeric(20, 6) NOT NULL, -- 총 자산 (현금 + 평가금액) + "cash" numeric(20, 6) NOT NULL, -- 보유 현금 + "market_value" numeric(20, 6) NOT NULL, -- 보유 주식 총 평가금액 + "pnl_unrealized" numeric(20, 6) NOT NULL, -- 전체 미실현 손익 + "pnl_realized_cum" numeric(20, 6) NOT NULL, -- 누적 확정 실현 손익 + "initial_capital" numeric(20, 6) NOT NULL, -- 투자 원금 + "return_rate" numeric(10, 6) NOT NULL, -- 누적 수익률 + "created_at" timestamp with time zone DEFAULT now() -- 기록 시각 ); ---------------------------------------------------------------------- --- 9. portfolio_summary --- - 계좌 전체 기준 일별 요약 (equity curve) +-- 10. stock_info / company_names +-- - 종목의 기본 정보 및 메타데이터 관리 ---------------------------------------------------------------------- +CREATE TABLE "company_names" ( + "company_name" varchar(100) PRIMARY KEY, -- 기업 한글/영문 정식 명칭 + "ticker" varchar(255) NOT NULL UNIQUE -- 종목 티커 +); -CREATE TABLE "portfolio_summary" ( -"date" date PRIMARY KEY, -- 기준 날짜 -"total_asset" numeric(20, 6) NOT NULL,-- 총 자산 -"cash" numeric(20, 6) NOT NULL, -- 현금 -"market_value" numeric(20, 6) NOT NULL,-- 평가금액 합 -"pnl_unrealized" numeric(20, 6) NOT NULL,-- 미실현 손익 -"pnl_realized_cum" numeric(20, 6) NOT NULL,-- 누적 실현 손익 -"initial_capital" numeric(20, 6) NOT NULL,-- 시작 자본 -"return_rate" numeric(10, 6) NOT NULL,-- 수익률 -"created_at" timestamp with time zone DEFAULT now() -- 기록 시각 +CREATE TABLE "stock_info" ( + "ticker" varchar(20) PRIMARY KEY, -- 종목 티커 + "sector" varchar(100), -- 섹터 분류 (예: IT, 금융) + "industry" varchar(200), -- 세부 산업 분류 + "market_cap" bigint, -- 시가총액 (필터링/비중 계산용) + "updated_at" timestamp DEFAULT now() -- 정보 갱신 일시 ); ----------------------------------------------------------------------- --- 10. stock_info --- - 종목의 섹터, 산업, 시가총액 등 정적인 정보를 저장 --- - 종목별 기본 정보를 제공하고, 필터링 및 분석에 활용 +---------------------------------------------------------------------- +-- 11. event_calendar +-- - 주요 경제 일정(FOMC, CPI, GDP) 및 기업 실적 발표일 저장 +-- - AI 모델의 'Event' 피처(D-Day 계산 등)를 위한 원천 데이터 ---------------------------------------------------------------------- -CREATE TABLE public.stock_info ( - ticker VARCHAR(20) PRIMARY KEY, -- 종목 티커 - sector VARCHAR(100), -- 대분류 (예: Technology) - industry VARCHAR(200), -- 세부 분류 (예: Consumer Electronics) - market_cap BIGINT, -- 시가총액 (필터링용) - updated_at TIMESTAMP DEFAULT NOW() -- 마지막 갱신 시각 +CREATE TABLE IF NOT EXISTS "event_calendar" ( + "id" bigserial PRIMARY KEY, + "event_date" date NOT NULL, -- 이벤트 예정일 (YYYY-MM-DD) + "event_type" varchar(50) NOT NULL, -- 이벤트 타입 ('FOMC', 'CPI', 'EARNINGS', 'GDP' 등) + "ticker" varchar(20), -- 관련 티커 (거시지표는 'MACRO' 저장) + "description" text, -- 상세 설명 (예: 'FOMC Rate Decision', 'AAPL Earnings') + "forecast" numeric(18, 5), -- 시장 예측치 (Consensus) + "actual" numeric(18, 5), -- 실제 발표치 + "created_at" timestamp DEFAULT now(), -- 레코드 생성 시각 + + -- 중복 방지: 같은 날짜, 같은 타입, 같은 대상(티커)의 이벤트는 중복될 수 없음 + CONSTRAINT "uq_event_calendar" UNIQUE ("event_date", "event_type", "ticker") ); ---------------------------------------------------------------------- --- 11. neon_auth.users_sync --- - Neon 인증 시스템과 동기화된 사용자 정보 --- - raw_json 기반으로 주요 필드를 generated column으로 추출 +-- 12. sector_returns +-- - stock_info의 'sector'와 매핑되는 ETF의 일별 수익률 저장 +-- - Wide Format이 아닌 Long Format (Date, Sector) 구조 ---------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS "sector_returns" ( + "date" date NOT NULL, -- 기준 일자 + "sector" varchar(100) NOT NULL, -- 섹터명 (stock_info의 sector와 일치, 예: 'Technology') + "etf_ticker" varchar(20), -- 대표 ETF 티커 (예: 'XLK') + "return" numeric(10, 6), -- 일일 등락률 (0.0123 = 1.23%) + "close" numeric(10, 2), -- ETF 종가 (참고용) + "created_at" timestamp DEFAULT now(), + + -- PK: 날짜와 섹터의 조합은 유일해야 함 + CONSTRAINT "pk_sector_returns" PRIMARY KEY ("date", "sector") +); +---------------------------------------------------------------------- +-- 13. neon_auth.users_sync +-- - 인증 서비스(Neon/Clerk 등)와 동기화된 사용자 데이터 정보 +---------------------------------------------------------------------- CREATE TABLE "neon_auth"."users_sync" ( -"raw_json" jsonb NOT NULL, -- 원본 사용자 JSON -"id" text PRIMARY KEY GENERATED ALWAYS AS ((raw_json ->> 'id')) STORED, -"name" text GENERATED ALWAYS AS ((raw_json ->> 'display_name')) STORED, -"email" text GENERATED ALWAYS AS ((raw_json ->> 'primary_email')) STORED, -"created_at" timestamp with time zone -GENERATED ALWAYS AS ( -to_timestamp( -trunc(((raw_json ->> 'signed_up_at_millis')::bigint)::double precision / 1000) -) -) STORED, -"updated_at" timestamp with time zone, -"deleted_at" timestamp with time zone -); + "raw_json" jsonb NOT NULL, -- 인증 서버에서 전달받은 원본 JSON 데이터 + "id" text PRIMARY KEY GENERATED ALWAYS AS ((raw_json ->> 'id')) STORED, -- 사용자 UUID + "name" text GENERATED ALWAYS AS ((raw_json ->> 'display_name')) STORED, -- 사용자 이름 + "email" text GENERATED ALWAYS AS ((raw_json ->> 'primary_email')) STORED, -- 사용자 이메일 + "created_at" timestamp with time zone GENERATED ALWAYS AS (to_timestamp((trunc((((raw_json ->> 'signed_up_at_millis'::text))::bigint)::double precision) / (1000)::double precision))) STORED, -- 가입 시각 + "updated_at" timestamp with time zone, -- 정보 수정 시각 + "deleted_at" timestamp with time zone -- 계정 삭제 시각 (Soft Delete) +); \ No newline at end of file