Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 111 additions & 48 deletions AI/libs/database/fetcher.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,133 @@
# AI/libs/database/fetcher.py
from __future__ import annotations
from typing import Optional
#AI/libs/database/fetcher.py
from typing import Dict
import pandas as pd
from sqlalchemy import text

# DB용 유틸: SQLAlchemy Engine 생성 함수 사용 (get_engine)
from .connection import get_engine

def fetch_ohlcv(
def fetch_price_data(
ticker: str,
start: str,
end: str,
interval: str = "1d",
db_name: str = "db",
start_date: str,
db_name: str = "db"
) -> pd.DataFrame:
"""
특정 티커, 날짜 범위의 OHLCV 데이터를 DB에서 불러오기

Args:
ticker (str): 종목 코드 (예: "AAPL")
start (str): 시작일자 'YYYY-MM-DD'
end (str): 종료일자 'YYYY-MM-DD'
interval (str): 데이터 간격 (현재 일봉만 지원)
db_name (str): DB 설정 이름

Returns:
pd.DataFrame: [ticker, date, open, high, low, close, adjusted_close, volume]
[기본] 종목별 시세 데이터 조회 (Price Data)
"""

engine = get_engine(db_name)

# adjusted_close가 중요하다면 쿼리 단계에서 확실히 가져옵니다.
query = text("""
SELECT ticker, date, open, high, low, close, adjusted_close, volume
SELECT date, ticker, open, high, low, close, adjusted_close, volume, amount
FROM public.price_data
WHERE ticker = :ticker
AND date BETWEEN :start AND :end
ORDER BY date;
AND date >= :start_date
ORDER BY date ASC;
""")

with engine.connect() as conn:
df = pd.read_sql(
query,
con=conn,
params={"ticker": ticker, "start": start, "end": end},
)
df = pd.read_sql(query, conn, params={"ticker": ticker, "start_date": start_date})

if not df.empty:
df["date"] = pd.to_datetime(df["date"])
# 수정주가 처리
if "adjusted_close" in df.columns:
df["close"] = df["adjusted_close"].fillna(df["close"])

return df

# 빈 데이터 처리
if df is None or df.empty:
return pd.DataFrame(columns=["ticker", "date", "open", "high", "low", "close", "adjusted_close", "volume"])
def fetch_macro_indicators(
start_date: str,
db_name: str = "db"
) -> pd.DataFrame:
"""
[공통] 거시경제 지표 조회 (모든 종목 공통 적용)
"""
engine = get_engine(db_name)
query = text("""
SELECT date,
us10y, us2y, yield_spread,
vix_close, dxy_close, wti_price, gold_price,
credit_spread_hy
FROM public.macroeconomic_indicators
WHERE date >= :start_date
ORDER BY date ASC;
""")

with engine.connect() as conn:
df = pd.read_sql(query, conn, params={"start_date": start_date})

if not df.empty:
df["date"] = pd.to_datetime(df["date"])

return df

def fetch_market_breadth(
start_date: str,
db_name: str = "db"
) -> pd.DataFrame:
"""
[공통] 시장 건전성 지표 (Market Breadth)
"""
engine = get_engine(db_name)
query = text("""
SELECT date,
advance_decline_ratio, fear_greed_index,
new_highs, new_lows, above_ma200_pct
FROM public.market_breadth
WHERE date >= :start_date
ORDER BY date ASC;
""")

# 날짜 변환
if "date" in df.columns:
with engine.connect() as conn:
df = pd.read_sql(query, conn, params={"start_date": start_date})

if not df.empty:
df["date"] = pd.to_datetime(df["date"])

# 데이터 보정 로직 추가
# 1. adjusted_close가 없는 경우(NaN) -> close 값으로 대체 (결측치 방지)
if "adjusted_close" in df.columns and "close" in df.columns:
df["adjusted_close"] = df["adjusted_close"].fillna(df["close"])
elif "adjusted_close" not in df.columns and "close" in df.columns:
# 컬럼 자체가 없으면 close를 복사해서 생성
df["adjusted_close"] = df["close"]
return df

def fetch_news_sentiment(
ticker: str,
start_date: str,
db_name: str = "db"
) -> pd.DataFrame:
"""
[개별] 뉴스 심리 지수 조회
"""
engine = get_engine(db_name)
query = text("""
SELECT date, sentiment_score, risk_keyword_cnt, article_count
FROM public.news_sentiment
WHERE ticker = :ticker
AND date >= :start_date
ORDER BY date ASC;
""")

# 컬럼 순서 정리
desired_cols = ["ticker", "date", "open", "high", "low", "close", "adjusted_close", "volume"]
cols_present = [c for c in desired_cols if c in df.columns]
df = df.loc[:, cols_present]
with engine.connect() as conn:
df = pd.read_sql(query, conn, params={"ticker": ticker, "start_date": start_date})

if not df.empty:
df["date"] = pd.to_datetime(df["date"])

return df

def fetch_fundamentals(
ticker: str,
db_name: str = "db"
) -> pd.DataFrame:
"""
[개별] 기업 펀더멘털 데이터 (재무제표)
"""
engine = get_engine(db_name)
# 재무제표는 start_date 제한 없이 가져와서 ffill 하는 것이 안전함
query = text("""
SELECT date, per, pbr, roe, debt_ratio, operating_cash_flow
FROM public.company_fundamentals
WHERE ticker = :ticker
ORDER BY date ASC;
""")

with engine.connect() as conn:
df = pd.read_sql(query, conn, params={"ticker": ticker})

if not df.empty:
df["date"] = pd.to_datetime(df["date"])

return df
27 changes: 22 additions & 5 deletions AI/modules/data_collector/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
# AI/modules/collector/__init__.py
from .market_data import update_market_data
from .news_data import collect_news
# AI/modules/data_collector/__init__.py

from .market_data import MarketDataCollector
from .stock_info_collector import StockInfoCollector
from .company_fundamentals_data import FundamentalsDataCollector
from .macro_data import MacroDataCollector
from .crypto_data import CryptoDataCollector
from .index_data import IndexDataCollector
from .event_data import EventDataCollector
from .market_breadth_data import MarketBreadthCollector
from .market_breadth_stats import MarketBreadthStatsCollector
# from .news_data import NewsDataCollector # 뉴스 모듈 구현 시 주석 해제

__all__ = [
"update_market_data",
"collect_news",
"MarketDataCollector",
"StockInfoCollector",
"FundamentalsDataCollector",
"MacroDataCollector",
"CryptoDataCollector"
"IndexDataCollector",
"EventDataCollector",
"MarketBreadthCollector",
"MarketBreadthStatsCollector",
# "NewsDataCollector", # 뉴스 모듈 구현 시 주석 해제
]
Loading