diff --git a/AI/libs/core/pipeline.py b/AI/libs/core/pipeline.py
index ebeab552..fdf166b4 100644
--- a/AI/libs/core/pipeline.py
+++ b/AI/libs/core/pipeline.py
@@ -1,126 +1,150 @@
 import os
+import sys
+from typing import List, Dict
 import json
-import datetime as dt
+from datetime import datetime, timedelta
 import pandas as pd
-from typing import Dict
 
-# ==============================================
-# Internal modules (assumed to be implemented already)
-# ==============================================
-from finder.modules.finder import run_finder_with_scores  # includes ticker selection + scoring
-from transform.modules.transform import run_transform
-from xai.modules.xai import run_xai
-from AI.libs.utils.data import fetch_ohlcv
-from AI.libs.utils.io import _log
+# --- Project root path setup ---
+project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+sys.path.append(project_root)
+# ------------------------------
 
-# ==============================================
-# Helper: convert Finder results to JSON
-# ==============================================
-def make_reasons_json(finder_df: pd.DataFrame, run_date: str) -> Dict:
+# --- Module imports ---
+from finder.main import run_finder
+from transform.modules.main import run_transform
+from libs.utils.data.fetch_ohlcv import fetch_ohlcv
+from xai.run_xai import run_xai
+# ---------------------------------
+
+def run_weekly_finder() -> List[str]:
     """
-    Convert the ticker-selection reasons in the Finder results into a JSON structure:
-    { "YYYY-MM-DD": { "TICKER1": "reason summary", "TICKER2": "..." } }
+    Runs the weekly stock discovery (Finder) and returns the resulting list of tickers.
     """
-    reasons = {}
-    daily_reasons = {}
-    for _, row in finder_df.iterrows():
-        daily_reasons[row["ticker"]] = row.get("reason", "no selection reason")
-    reasons[run_date] = daily_reasons
-    return reasons
-
-# ==============================================
-# Weekly Finder (runs once, on Mondays)
-# ==============================================
-def run_weekly_finder(config: dict, run_date: str) -> pd.DataFrame:
-    _log(f"[FINDER] Running weekly ticker selection ({run_date})")
-
-    finder_df = run_finder_with_scores(config)  # DataFrame with tickers + scores + reasons
-
-    out_dir = os.path.join(config["storage"]["out_dir"], "finder")
-    os.makedirs(out_dir, exist_ok=True)
-
-    # Save as parquet
-    finder_path = os.path.join(out_dir, f"finder_{run_date}.parquet")
-    finder_df.to_parquet(finder_path, index=False)
-
-    # Save selection reasons as JSON (append)
-    reasons_path = os.path.join(out_dir, "reasons.json")
-    reasons = make_reasons_json(finder_df, run_date)
-    if os.path.exists(reasons_path):
-        with open(reasons_path, "r", encoding="utf-8") as f:
-            prev = json.load(f)
-    else:
-        prev = {}
-    prev.update(reasons)
-    with open(reasons_path, "w", encoding="utf-8") as f:
-        json.dump(prev, f, ensure_ascii=False, indent=2)
+    print("--- [PIPELINE-STEP 1] Finder module started ---")
+    top_tickers = run_finder()
+    # top_tickers = ['AAPL', 'MSFT', 'GOOGL']  # placeholder data
+    print("--- [PIPELINE-STEP 1] Finder module finished ---")
+    return top_tickers
 
-    return finder_df
-
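# Note: if the finder package is unavailable locally, run_weekly_finder can be
# smoke-tested against a stub like the sketch below. The tickers mirror the
# placeholder list commented out above; they are not real Finder output.
def run_finder_stub() -> List[str]:
    # Fixed watchlist so the downstream Transform/XAI steps can be exercised.
    return ['AAPL', 'MSFT', 'GOOGL']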
-# ==============================================
-# Daily Transform + XAI
-# ==============================================
-def run_daily_tasks(config: dict, run_date: str, finder_df: pd.DataFrame) -> None:
-    _log(f"[DAILY] Running Transform + XAI ({run_date})")
-
-    # Data collection
-    tickers = finder_df["ticker"].tolist()
-    window_days = int(config.get("data", {}).get("window_days", 252 * 5))
-    interval = str(config.get("data", {}).get("interval", "1d"))
-    cache_dir = str(config.get("storage", {}).get("cache_dir", ""))
-
-    market_data = fetch_ohlcv(tickers, period_days=window_days, interval=interval, cache_dir=cache_dir)
-
-    # Transform (training + log generation)
-    tr = run_transform(
-        finder_df,
-        seq_len=config["transform"]["seq_len"],
-        pred_h=config["transform"]["pred_h"],
+def run_signal_transform(tickers: List[str], config: Dict) -> pd.DataFrame:
+    """
+    Takes a list of tickers, runs the Transform module, and returns the signals (decision log).
+    """
+    print("--- [PIPELINE-STEP 2] Transform module started ---")
+
+    # --- Call the actual Transform module ---
+    end_date = datetime.now()
+    start_date = end_date - timedelta(days=600)
+    all_ohlcv_df = []
+    for ticker in tickers:
+        ohlcv_df = fetch_ohlcv(
+            ticker=ticker,
+            start=start_date.strftime('%Y-%m-%d'),
+            end=end_date.strftime('%Y-%m-%d'),
+            config=config
+        )
+        if ohlcv_df is None or ohlcv_df.empty:
+            continue  # skip tickers with no price data
+        ohlcv_df['ticker'] = ticker
+        all_ohlcv_df.append(ohlcv_df)
+    if not all_ohlcv_df:
+        print("Failed to fetch any OHLCV data.")
+        return pd.DataFrame()
+    raw_data = pd.concat(all_ohlcv_df, ignore_index=True)
+    finder_df = pd.DataFrame(tickers, columns=['ticker'])
+    transform_result = run_transform(
+        finder_df=finder_df,
+        seq_len=60,
+        pred_h=1,
+        raw_data=raw_data,
+        config=config
     )
-    logs_df: pd.DataFrame = tr["logs"]  # (ticker, date, action, price, weight, features..., probabilities...)
-
-    # Save Transform logs (Parquet)
-    out_dir = os.path.join(config["storage"]["out_dir"], "transform")
-    os.makedirs(out_dir, exist_ok=True)
-    log_path = os.path.join(out_dir, f"logs_{run_date}.parquet")
-    logs_df.to_parquet(log_path, index=False)
-
-    # Generate + save XAI reports (one JSON per ticker)
-    xai_out_dir = os.path.join(config["storage"]["out_dir"], "xai", run_date)
-    os.makedirs(xai_out_dir, exist_ok=True)
-
-    xai_reports = run_xai(logs_df)
-    for ticker, report in xai_reports.items():
-        with open(os.path.join(xai_out_dir, f"{ticker}.json"), "w", encoding="utf-8") as f:
-            json.dump(report, f, ensure_ascii=False, indent=2)
-
-    _log(f"[DAILY] Transform logs + XAI reports saved ({run_date})")
-
-# ==============================================
-# Main pipeline
-# ==============================================
-def run_pipeline(config: dict) -> bool:
-    run_date = dt.datetime.now(dt.timezone(dt.timedelta(hours=9))).strftime("%Y-%m-%d")
-
+    logs_df = transform_result.get("logs", pd.DataFrame())
+
+    # --- Sample decision-log data (kept for reference, commented out) ---
+    # data = {
+    #     'ticker': ['AAPL', 'GOOGL', 'MSFT'],
+    #     'date': ['2025-09-17', '2025-09-17', '2025-09-17'],
+    #     'action': ['SELL', 'BUY', 'SELL'],
+    #     'price': [238.99, 249.52, 510.01],
+    #     'weight': [0.16, 0.14, 0.15],
+    #     'feature1': ['RSI', 'Stochastic', 'MACD'],
+    #     'feature2': ['MACD', 'MA_5', 'ATR'],
+    #     'feature3': ['Bollinger_Bands_lower', 'RSI', 'MA_200'],
+    #     'prob1': [0.5, 0.4, 0.6],
+    #     'prob2': [0.3, 0.25, 0.2],
+    #     'prob3': [0.1, 0.15, 0.1]
+    # }
+    # logs_df = pd.DataFrame(data)
+
+    print("--- [PIPELINE-STEP 2] Transform module finished ---")
+    return logs_df
+
+def run_xai_report(decision_log: pd.DataFrame) -> List[str]:
+    """
+    Generates the actual XAI reports from the decision log.
+    """
+    print("--- [PIPELINE-STEP 3] XAI report generation started ---")
+    api_key = os.environ.get("GROQ_API_KEY")
+    if not api_key:
+        raise ValueError("Set the GROQ_API_KEY environment variable to generate XAI reports.")
+    reports = []
+    if decision_log.empty:
+        return reports
+    for _, row in decision_log.iterrows():
+        decision = {
+            "ticker": row['ticker'],
+            "date": row['date'],
+            "signal": row['action'],
+            "price": row['price'],
+            "evidence": [
+                {"feature_name": row['feature1'], "contribution": row['prob1']},
+                {"feature_name": row['feature2'], "contribution": row['prob2']},
+                {"feature_name": row['feature3'], "contribution": row['prob3']},
+            ]
+        }
+        try:
+            report = run_xai(decision, api_key)
+            reports.append(report)
+            print(f"--- XAI report for {row['ticker']} generated ---")
+        except Exception as e:
+            error_message = f"--- Error while generating XAI report for {row['ticker']}: {e} ---"
+            print(error_message)
+            reports.append(error_message)
+    print("--- [PIPELINE-STEP 3] XAI report generation finished ---")
+    return reports
+
+# --- Full pipeline execution ---
+def run_pipeline():
+    """
+    Runs the full pipeline (Finder -> Transform -> XAI).
+    """
+    config = None
     try:
-        _log(f"=== Batch started: {run_date} ===")
-
-        # 1) Weekly Finder (re-run on Mondays only)
-        finder_out_dir = os.path.join(config["storage"]["out_dir"], "finder")
-        if dt.datetime.now().weekday() == 0:  # Monday
-            finder_df = run_weekly_finder(config, run_date)
-        else:
-            last_file = sorted(
-                [f for f in os.listdir(finder_out_dir) if f.startswith("finder_")]
-            )[-1]
-            finder_df = pd.read_parquet(os.path.join(finder_out_dir, last_file))
-
-        # 2) Daily Transform + XAI
-        run_daily_tasks(config, run_date, finder_df)
-
-        _log("=== Batch succeeded ===")
-        return True
-
-    except Exception as e:
-        _log(f"[ERROR] Batch failed: {e}")
-        return False
+        with open(os.path.join(project_root, 'configs', 'config.json'), 'r') as f:
+            config = json.load(f)
+    except FileNotFoundError:
+        print("[WARN] configs/config.json not found; only features that do not need a DB connection will run.")
+    top_tickers = run_weekly_finder()
+    if not top_tickers:
+        print("Finder returned no tickers; stopping the pipeline.")
+        return None
+    if config is None:
+        print("Transform requires the DB settings from configs/config.json; stopping the pipeline.")
+        return None
+    decision_log = run_signal_transform(top_tickers, config)
+    if decision_log.empty:
+        print("Transform produced no signals; stopping the pipeline.")
+        return None
+    xai_reports = run_xai_report(decision_log)
+    return xai_reports
+
+# --- Test driver ---
+if __name__ == "__main__":
+    print(">>> Starting the pipeline (Finder -> Transform -> XAI) test.")
+    final_reports = run_pipeline()
+    print("\n>>> Final result (XAI Reports):")
+    if final_reports:
+        for report in final_reports:
+            print(report)
+    else:
+        print("No reports were generated.")
+    print("\n---")
+    print("If the test ran correctly, an XAI report for each ticker should appear under 'Final result' above.")
+    print("---")
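run_xai_report above indexes a fixed set of decision-log columns (ticker, date, action, price, feature1-3, prob1-3). A small guard such as the following sketch (a hypothetical helper, not part of the patch) fails fast with a clear message if the Transform log schema ever drifts:

    # Sketch: validate the decision-log schema before the XAI step.
    # REQUIRED_COLS mirrors the columns run_xai_report reads.
    import pandas as pd

    REQUIRED_COLS = [
        "ticker", "date", "action", "price",
        "feature1", "feature2", "feature3",
        "prob1", "prob2", "prob3",
    ]

    def validate_decision_log(decision_log: pd.DataFrame) -> None:
        # Raise early with a clear message instead of a KeyError mid-loop.
        missing = [c for c in REQUIRED_COLS if c not in decision_log.columns]
        if missing:
            raise ValueError(f"decision log is missing columns: {missing}")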
diff --git a/AI/libs/utils/data/fetch_ohlcv.py b/AI/libs/utils/data/fetch_ohlcv.py
index a0752d3d..5eda2b7e 100644
--- a/AI/libs/utils/data/fetch_ohlcv.py
+++ b/AI/libs/utils/data/fetch_ohlcv.py
@@ -38,15 +38,14 @@ def fetch_ohlcv(
     query = """
         SELECT date, open, high, low, close, volume
-        FROM stock_prices
+        FROM public.price_data
         WHERE ticker = %s
-          AND interval = %s
           AND date BETWEEN %s AND %s
         ORDER BY date;
     """
 
     # Use parameter binding (%s) to prevent SQL injection
-    df = pd.read_sql(query, conn, params=(ticker, interval, start, end))
+    df = pd.read_sql(query, conn, params=(ticker, start, end))
 
     conn.close()
     return df
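For reference, run_signal_transform calls the updated function as shown below; the exact config keys used for the Postgres connection live inside fetch_ohlcv and are not visible in this diff, so treat them as assumptions:

    # Minimal usage sketch, assuming `config` carries the DB credentials
    # that fetch_ohlcv uses to open its psycopg2 connection.
    from libs.utils.data.fetch_ohlcv import fetch_ohlcv

    df = fetch_ohlcv(ticker="AAPL", start="2024-01-01", end="2024-06-30", config=config)
    # Columns returned by the SELECT above: date, open, high, low, close, volume
    print(df.tail())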
diff --git a/AI/libs/utils/news_processing_requests.py b/AI/libs/utils/news_processing_requests.py
new file mode 100644
index 00000000..931c5a9a
--- /dev/null
+++ b/AI/libs/utils/news_processing_requests.py
@@ -0,0 +1,212 @@
+import pandas as pd
+import time
+import re
+from tqdm import tqdm
+from datetime import datetime, timedelta
+from typing import List
+from urllib.parse import urlparse, parse_qs
+import warnings
+warnings.filterwarnings("ignore", category=FutureWarning)
+
+from langchain_community.llms import Ollama
+
+import requests
+from bs4 import BeautifulSoup
+
+# --- private functions ---
+
+def _news_href_crawl(target_date: str) -> pd.DataFrame:
+    '''
+    Collects overseas-market news links from Naver Finance
+    (requests + bs4, paginated via the page parameter).
+    target_date is a 'YYYYMMDD' string.
+    '''
+    base_url = "https://finance.naver.com/news/news_list.naver"
+    base_params = {
+        "mode": "LSS3D",
+        "section_id": "101",
+        "section_id2": "258",
+        "section_id3": "403",
+        "date": target_date
+    }
+
+    href_list, title_list, time_list = [], [], []
+    page = 1
+    prev_first_href = None
+
+    while True:
+        params = base_params.copy()
+        params['page'] = page
+
+        response = requests.get(base_url, params=params, headers={'User-Agent': 'Mozilla/5.0'})
+        response.raise_for_status()
+        soup = BeautifulSoup(response.content, 'html.parser')
+
+        articles = soup.select(".realtimeNewsList .articleSubject")
+        if not articles:
+            break  # no articles -> stop
+
+        # Naver serves the last page again for out-of-range page numbers,
+        # so stop when the first article repeats to avoid an infinite loop.
+        first_link = articles[0].find('a')
+        first_href = first_link['href'] if first_link and first_link.has_attr('href') else None
+        if first_href is not None and first_href == prev_first_href:
+            break
+        prev_first_href = first_href
+
+        summaries = soup.select(".realtimeNewsList .articleSummary")
+        for i, article in enumerate(articles):
+            link_tag = article.find('a')
+            if link_tag and link_tag.has_attr('href'):
+                # Extract office_id and article_id from the query string
+                parsed_url = urlparse(link_tag['href'])
+                query_params = parse_qs(parsed_url.query)
+
+                office_id = query_params.get('office_id', [None])[0]
+                article_id = query_params.get('article_id', [None])[0]
+
+                if office_id and article_id:
+                    # Rebuild the canonical article URL
+                    clean_url = f"https://n.news.naver.com/mnews/article/{office_id}/{article_id}"
+                    href_list.append(clean_url)
+                    title_list.append(link_tag.get('title', ''))
+                    if i < len(summaries):
+                        time_list.append(summaries[i].find('span', class_='wdate').text.strip())
+                    else:
+                        time_list.append('')
+
+        page += 1  # next page
+
+    return pd.DataFrame({"href": href_list, "title": title_list, "date": time_list})
+
+def _news_content_crawl(url_list: List[str]) -> List[str]:
+    '''
+    Fetches the article bodies for the links collected by _news_href_crawl (requests + bs4).
+    '''
+    content_list = []
+    for url in tqdm(url_list):
+        try:
+            response = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'})
+            response.raise_for_status()
+            soup = BeautifulSoup(response.content, 'html.parser')
+
+            # Select the article body
+            article_body = soup.find('article')
+            if article_body:
+                text = article_body.get_text()
+            else:
+                # Fall back for news pages with a different structure
+                article_body = soup.select_one('#newsct_article')
+                if article_body:
+                    text = article_body.get_text()
+                else:
+                    text = ""  # body not found
+
+            # Clean the text with regular expressions
+            text = re.sub(r"\([^)]*기자\)|[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", "", text)
+            text = re.sub(r"\['.*?']", "", text)
+            text = re.sub(r"\s+", " ", text).strip()
+
+            content_list.append(text)
+        except requests.exceptions.RequestException as e:
+            print(f"Error fetching {url}: {e}")
+            content_list.append("")  # append an empty string on error
+
+    return content_list
+
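# Note: `time` is imported at the top of this module but never used. If Naver
# starts rate-limiting the crawl, the usual remedy is a short pause between
# requests. A sketch (the 0.3s delay is an arbitrary choice, tune to taste):
def _fetch_with_delay(url: str, delay_sec: float = 0.3) -> requests.Response:
    response = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=10)
    response.raise_for_status()
    time.sleep(delay_sec)  # back off before the next request
    return response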
+def _llm_prompt(text: str) -> str:
+    '''
+    Builds the prompt for news summarization.
+    '''
+    prompt = f"""You're an expert analyst. Extract key information from the news article below. Use only information from the news article; do not add information that is not in it.
+
+News:
+{text}
+
+Respond in English, and output nothing except the JSON format below.
+Output format (JSON):
+{{
+    "Stock": "",
+    "Event": "",
+    "Factor": "",
+    "Reason": "",
+    "Sentiment": "",
+    "Confidence": ""
+}}
+"""
+
+    return prompt
+
+def _llm_summary(time_list: List, content_list: List, llm_client) -> pd.DataFrame:
+    '''
+    Summarizes the article bodies with the LLM.
+    '''
+    summary_list = []
+
+    for text in tqdm(content_list):
+        input_txt = _llm_prompt(text)
+        summary_list.append(llm_client.invoke(input_txt))
+
+    return pd.DataFrame({"date": time_list,
+                         "summary": summary_list
+                         })
+
+def _collect_and_summarize_news(target_date: str, llm_client) -> pd.DataFrame:
+    '''
+    Collects the overseas-market news for the given date and summarizes it with the LLM.
+    '''
+    print(f"===== [News Processing] collecting and summarizing news for {target_date} =====")
+
+    href_df = _news_href_crawl(target_date)
+
+    content_list = _news_content_crawl(list(href_df['href']))
+    href_df['content'] = content_list
+
+    # _news_content_crawl appends "" on failure (never null), so filter out
+    # empty bodies instead of checking isnull().
+    valid_df = href_df[href_df['content'] != ""].reset_index(drop=True)
+
+    summary_df = _llm_summary(valid_df['date'], valid_df['content'], llm_client=llm_client)
+
+    print("===== [News Processing] done =====")
+
+    return summary_df
+
+# --- public functions ---
+def get_weekly_news_summary(days: int, llm_client) -> pd.DataFrame:
+    '''
+    Collects and summarizes news one day at a time over the given period (in days)
+    and concatenates the results.
+    '''
+    print(f"===== [Weekly News Summary] summarizing the last {days} day(s) of news =====")
+
+    all_summaries = []
+
+    for i in range(1, days + 1):
+        date = datetime.now() - timedelta(days=i)
+        target_date = date.strftime('%Y%m%d')
+
+        try:
+            daily_summary_df = _collect_and_summarize_news(
+                target_date=target_date,
+                llm_client=llm_client
+            )
+            all_summaries.append(daily_summary_df)
+            print(f"--- {target_date} news summary done ---")
+
+        except Exception as e:
+            print(f"!!! Error while processing {target_date}: {e} !!!")
+            continue
+
+    if not all_summaries:
+        print("No news summaries were collected.")
+        return pd.DataFrame()
+
+    weekly_summary_df = pd.concat(all_summaries, ignore_index=True)
+
+    print(f"===== [Weekly News Summary] {len(weekly_summary_df)} news items summarized =====")
+
+    return weekly_summary_df
+
+# --- test code ---
+if __name__ == '__main__':
+    print("--- news_processing_requests.py test mode (weekly collection) ---")
+
+    my_llm = Ollama(model="llama3.2")
+    DAYS_TO_COLLECT = 1  # collect only one day for testing
+
+    try:
+        weekly_output_df = get_weekly_news_summary(days=DAYS_TO_COLLECT, llm_client=my_llm)
+
+        print(f"\n[Final {DAYS_TO_COLLECT}-day summary (top 5 rows)]")
+        print(weekly_output_df.head())
+        print(f"\nTotal summaries: {len(weekly_output_df)}")
+
+    except Exception as e:
+        print(f"\nError during test: {e}")
diff --git a/AI/requirements.txt b/AI/requirements.txt
new file mode 100644
index 00000000..0757c868
--- /dev/null
+++ b/AI/requirements.txt
@@ -0,0 +1,13 @@
+pandas
+psycopg2-binary
+langchain-community
+tqdm
+selenium
+webdriver-manager
+numpy
+scikit-learn
+tensorflow
+yfinance
+groq
+requests
+beautifulsoup4
diff --git a/AI/transform/modules/main.py b/AI/transform/modules/main.py
index c4e281ee..eed8d0d1 100644
--- a/AI/transform/modules/main.py
+++ b/AI/transform/modules/main.py
@@ -5,10 +5,11 @@
 import numpy as np
 import pandas as pd
 from sklearn.preprocessing import MinMaxScaler
 
-import tensorflow as tf
 from tensorflow.keras import layers, Model
+import tensorflow as tf
 
-from AI.libs.utils.io import _log
+# from AI.libs.utils.io import _log
+_log = print  # TODO: restore once io._log is implemented
 from .models import build_transformer_classifier  # <-- model split into its own module
 
 # ===== public constants =====
diff --git a/AI/xai/requirements.txt b/AI/xai/requirements.txt
deleted file mode 100644
index 34150119..00000000
--- a/AI/xai/requirements.txt
+++ /dev/null
@@ -1,4 +0,0 @@
-pandas
-yfinance
-numpy
-groq
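_llm_summary stores the raw LLM strings even though the prompt in _llm_prompt requests JSON, so downstream consumers will likely want structured fields. A parsing sketch (the keys follow the prompt's schema; the regex fallback is an assumption about the model wrapping its JSON in extra prose):

    # Sketch: turn a raw LLM summary string into structured fields.
    import json
    import re

    EXPECTED_KEYS = ("Stock", "Event", "Factor", "Reason", "Sentiment", "Confidence")

    def parse_summary(raw: str) -> dict:
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            # Fallback: grab the first {...} block in case the model added prose.
            match = re.search(r"\{.*\}", raw, re.DOTALL)
            if match:
                try:
                    return json.loads(match.group(0))
                except json.JSONDecodeError:
                    pass
        # Last resort: an empty record with the expected keys.
        return {k: "" for k in EXPECTED_KEYS}

Applied column-wise, `weekly_summary_df['summary'].apply(parse_summary)` yields dicts that pd.json_normalize can expand into one column per key.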