Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 178 additions & 122 deletions AI/modules/data_collector/company_fundamentals_data.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#AI/modules/data_collector/company_fundamentals_data.py
# AI/modules/data_collector/company_fundamentals_data.py
import os
import sys
import time
from datetime import datetime
from datetime import timedelta
from typing import List, Optional

import numpy as np
import pandas as pd
import requests
import yfinance as yf
from psycopg2.extras import execute_values

# 프로젝트 루트 경로 설정
Expand All @@ -19,56 +20,48 @@

class FundamentalsDataCollector:
"""
기업의 재무제표(손익계산서, 대차대조표, 현금흐름표)를 수집하고
주요 퀀트 투자 지표(ROE, 부채비율, 이자보상배율 등)를 계산하여 DB에 저장하는 클래스
per과 pbr은 MarketDataCollector에서 계산하여 저장됨(중복 방지)
[하이브리드 재무 데이터 수집기 - Docker 완벽 대응]
1. YFinance 엔진: 빈 DB를 빠르게 채우기 위해 4년 치 연간 데이터를 수집합니다. (제한 없음)
2. FMP API 엔진: 매일 80개씩 10년 치 연간 데이터로 점진적 업그레이드(Backfill)를 수행합니다.
* DB의 stock_info.fmp_completed 상태값을 사용하여 신규 상장주 무한 루프를 방지합니다.
"""

def __init__(self, db_name: str = "db"):
    """Store the collector's configuration.

    Args:
        db_name: Name handed to the database connection helper whenever a
            connection is opened.

    The FMP API key is read from the ``FMP_API_KEY`` environment variable;
    ``api_key`` is ``None`` when that variable is not set.
    """
    # The two attributes are independent of each other.
    self.api_key = os.getenv("FMP_API_KEY")
    self.db_name = db_name

def get_safe_value(self, row: pd.Series, keys: List[str]) -> Optional[float]:
    """Return the first usable value among several candidate labels.

    Candidates are tried in the order given. A candidate is accepted when
    its label is present in ``row`` and its value is not missing according
    to ``pd.notna``.

    Args:
        row: One record whose labels are looked up.
        keys: Candidate labels, highest priority first.

    Returns:
        The accepted value converted with ``float()``, or ``None`` when no
        candidate is present with a non-missing value.
    """
    for label in keys:
        if label not in row:
            continue
        candidate = row[label]
        if pd.notna(candidate):
            return float(candidate)
    # No candidate qualified: report "missing" rather than a fake number.
    return None

def fetch_and_calculate_metrics(self, ticker: str):
"""
개별 종목의 재무 데이터를 수집, 병합 및 지표를 계산합니다.
"""
# ==========================================
# 🚀 엔진 1: YFinance (최근 4년 치 빠른 수집)
# ==========================================
def fetch_yf_metrics(self, ticker: str) -> List[tuple]:
stock = yf.Ticker(ticker)

# 1. 재무 데이터 가져오기 (분기 기준)
try:
fin_df = stock.quarterly_financials.T
bal_df = stock.quarterly_balance_sheet.T
cash_df = stock.quarterly_cashflow.T
except Exception as e:
print(f"[ERROR][{ticker}] yfinance 데이터 로드 실패: {e}")
return pd.DataFrame()
fin_df = stock.financials.T
bal_df = stock.balance_sheet.T
cash_df = stock.cashflow.T
except Exception:
return []

if fin_df.empty or bal_df.empty:
return pd.DataFrame()
return []

# 2. 인덱스(날짜) 통일
fin_df.index = pd.to_datetime(fin_df.index)
bal_df.index = pd.to_datetime(bal_df.index)
cash_df.index = pd.to_datetime(cash_df.index)

# 3. 데이터프레임 병합 (Inner Join)
merged_df = fin_df.join(bal_df, lsuffix='_fin', rsuffix='_bal', how='inner')
merged_df = merged_df.join(cash_df, rsuffix='_cash', how='left')

processed_data = []

for date_idx, row in merged_df.iterrows():
date_val = date_idx.date()

# --- A. 기본 재무 데이터 매핑 ---
revenue = self.get_safe_value(row, ['Total Revenue', 'Operating Revenue'])
net_income = self.get_safe_value(row, ['Net Income', 'Net Income Common Stockholders'])
total_assets = self.get_safe_value(row, ['Total Assets'])
Expand All @@ -78,58 +71,79 @@ def fetch_and_calculate_metrics(self, ticker: str):
operating_cash_flow = self.get_safe_value(row, ['Operating Cash Flow', 'Total Cash From Operating Activities'])
shares_issued = self.get_safe_value(row, ['Share Issued', 'Ordinary Shares Number'])

# --- 이자보상배율 계산을 위한 항목 ---
op_income = self.get_safe_value(row, ['Operating Income', 'EBIT'])
int_expense = self.get_safe_value(row, ['Interest Expense', 'Interest Expense Non Operating'])

# --- B. 파생 지표 계산 ---

# 1. ROE (자기자본이익률)
roe = None
if net_income is not None and equity is not None and equity != 0:
roe = net_income / equity

# 2. 부채비율 (Debt Ratio)
debt_ratio = None
if total_liabilities is not None and equity is not None and equity != 0:
debt_ratio = total_liabilities / equity

# 3. 이자보상배율 (Interest Coverage) = 영업이익 / |이자비용|
interest_coverage = None
if op_income is not None and int_expense is not None:
abs_int = abs(int_expense)
if abs_int > 0:
interest_coverage = op_income / abs_int
roe = (net_income / equity) if net_income and equity else None
debt_ratio = (total_liabilities / equity) if total_liabilities and equity else None
interest_coverage = (op_income / abs(int_expense)) if op_income and int_expense and abs(int_expense) > 0 else None
Comment on lines +77 to +79
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

0 값을 None으로 누락시키고 있습니다.

여기 조건식은 0을 false로 취급해서, 실제 값이 0인 경우에도 `roe`, `debt_ratio`, `interest_coverage`를 `None`으로 저장합니다. 예를 들어 순이익 0, 부채 0, 영업이익 0은 유효한 값인데 현재는 결측치로 바뀝니다.

가능한 수정 예시
-            roe = (net_income / equity) if net_income and equity else None
-            debt_ratio = (total_liabilities / equity) if total_liabilities and equity else None
-            interest_coverage = (op_income / abs(int_expense)) if op_income and int_expense and abs(int_expense) > 0 else None
+            roe = (net_income / equity) if net_income is not None and equity not in (None, 0) else None
+            debt_ratio = (total_liabilities / equity) if total_liabilities is not None and equity not in (None, 0) else None
+            interest_coverage = (
+                op_income / abs(int_expense)
+                if op_income is not None and int_expense is not None and int_expense != 0
+                else None
+            )

Also applies to: 130-132

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@AI/modules/data_collector/company_fundamentals_data.py` around lines 77 - 79,
The current conditional uses truthiness so numeric zeros become None; update the
three computations (roe, debt_ratio, interest_coverage) to test for None
explicitly and check denominators for zero: use "net_income is not None and
equity is not None and equity != 0" for roe, "total_liabilities is not None and
equity is not None and equity != 0" for debt_ratio, and "op_income is not None
and int_expense is not None and int_expense != 0" (or abs(int_expense) != 0) for
interest_coverage; apply the same explicit None checks to the analogous
calculations around lines 130-132.


processed_data.append((
str(ticker),
date_val,
revenue,
net_income,
total_assets,
total_liabilities,
equity,
shares_issued,
eps,
roe,
debt_ratio,
interest_coverage,
operating_cash_flow
str(ticker), date_val, revenue, net_income, total_assets, total_liabilities,
equity, shares_issued, eps, roe, debt_ratio, interest_coverage, operating_cash_flow
))
return processed_data

# ==========================================
# 💎 엔진 2: FMP API (과거 10년 치 고급 수집)
# ==========================================
def fetch_fmp_metrics(self, ticker: str) -> List[tuple]:
if not self.api_key: return []
base_url = "https://financialmodelingprep.com/api/v3"
limit = 10

try:
inc_resp = requests.get(f"{base_url}/income-statement/{ticker}?limit={limit}&apikey={self.api_key}")
bal_resp = requests.get(f"{base_url}/balance-sheet-statement/{ticker}?limit={limit}&apikey={self.api_key}")
cf_resp = requests.get(f"{base_url}/cash-flow-statement/{ticker}?limit={limit}&apikey={self.api_key}")

inc_data, bal_data, cf_data = inc_resp.json(), bal_resp.json(), cf_resp.json()

if not inc_data or (isinstance(inc_data, dict) and 'Error Message' in inc_data):
return []

df_inc, df_bal, df_cf = pd.DataFrame(inc_data), pd.DataFrame(bal_data), pd.DataFrame(cf_data)
if df_inc.empty or df_bal.empty or df_cf.empty: return []

df_merged = pd.merge(df_inc, df_bal, on='date', how='inner', suffixes=('', '_bal'))
df_merged = pd.merge(df_merged, df_cf, on='date', how='inner', suffixes=('', '_cf'))
except:
Comment on lines +96 to +110
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cat -n AI/modules/data_collector/company_fundamentals_data.py | sed -n '80,120p'

Repository: SISC-IT/sisc-web

Length of output: 2403


🏁 Script executed:

rg -n "fetch_fmp_metrics" AI/modules/data_collector/ -A 3 -B 3

Repository: SISC-IT/sisc-web

Length of output: 1424


🏁 Script executed:

cat -n AI/modules/data_collector/company_fundamentals_data.py | sed -n '245,260p'

Repository: SISC-IT/sisc-web

Length of output: 806


FMP API 호출에 timeout 및 HTTP 상태 검증이 부재합니다.

세 개의 요청 모두 timeout 없이 실행되므로 외부 API가 느리거나 응답하지 않을 경우 수집기가 무한정 멈출 수 있습니다. 또한 HTTP 상태 코드(429, 5xx 등)를 확인하지 않으며, 오류 메시지 검증이 첫 번째 응답(`inc_data`)에만 적용되고 `bal_data`와 `cf_data`에는 적용되지 않습니다. 배치 루프(250번 줄)에서 하나의 요청이 타임아웃되면 전체 파이프라인이 블록되므로 이는 배치 안정성에 직접적인 영향을 미칩니다.

권장 수정 예시
         try:
-            inc_resp = requests.get(f"{base_url}/income-statement/{ticker}?limit={limit}&apikey={self.api_key}")
-            bal_resp = requests.get(f"{base_url}/balance-sheet-statement/{ticker}?limit={limit}&apikey={self.api_key}")
-            cf_resp = requests.get(f"{base_url}/cash-flow-statement/{ticker}?limit={limit}&apikey={self.api_key}")
+            inc_resp = requests.get(
+                f"{base_url}/income-statement/{ticker}?limit={limit}&apikey={self.api_key}",
+                timeout=(5, 30),
+            )
+            bal_resp = requests.get(
+                f"{base_url}/balance-sheet-statement/{ticker}?limit={limit}&apikey={self.api_key}",
+                timeout=(5, 30),
+            )
+            cf_resp = requests.get(
+                f"{base_url}/cash-flow-statement/{ticker}?limit={limit}&apikey={self.api_key}",
+                timeout=(5, 30),
+            )
+
+            inc_resp.raise_for_status()
+            bal_resp.raise_for_status()
+            cf_resp.raise_for_status()
 
             inc_data, bal_data, cf_data = inc_resp.json(), bal_resp.json(), cf_resp.json()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
inc_resp = requests.get(f"{base_url}/income-statement/{ticker}?limit={limit}&apikey={self.api_key}")
bal_resp = requests.get(f"{base_url}/balance-sheet-statement/{ticker}?limit={limit}&apikey={self.api_key}")
cf_resp = requests.get(f"{base_url}/cash-flow-statement/{ticker}?limit={limit}&apikey={self.api_key}")
inc_data, bal_data, cf_data = inc_resp.json(), bal_resp.json(), cf_resp.json()
if not inc_data or (isinstance(inc_data, dict) and 'Error Message' in inc_data):
return []
df_inc, df_bal, df_cf = pd.DataFrame(inc_data), pd.DataFrame(bal_data), pd.DataFrame(cf_data)
if df_inc.empty or df_bal.empty or df_cf.empty: return []
df_merged = pd.merge(df_inc, df_bal, on='date', how='inner', suffixes=('', '_bal'))
df_merged = pd.merge(df_merged, df_cf, on='date', how='inner', suffixes=('', '_cf'))
except:
inc_resp = requests.get(
f"{base_url}/income-statement/{ticker}?limit={limit}&apikey={self.api_key}",
timeout=(5, 30),
)
bal_resp = requests.get(
f"{base_url}/balance-sheet-statement/{ticker}?limit={limit}&apikey={self.api_key}",
timeout=(5, 30),
)
cf_resp = requests.get(
f"{base_url}/cash-flow-statement/{ticker}?limit={limit}&apikey={self.api_key}",
timeout=(5, 30),
)
inc_resp.raise_for_status()
bal_resp.raise_for_status()
cf_resp.raise_for_status()
inc_data, bal_data, cf_data = inc_resp.json(), bal_resp.json(), cf_resp.json()
if not inc_data or (isinstance(inc_data, dict) and 'Error Message' in inc_data):
return []
df_inc, df_bal, df_cf = pd.DataFrame(inc_data), pd.DataFrame(bal_data), pd.DataFrame(cf_data)
if df_inc.empty or df_bal.empty or df_cf.empty:
return []
df_merged = pd.merge(df_inc, df_bal, on='date', how='inner', suffixes=('', '_bal'))
df_merged = pd.merge(df_merged, df_cf, on='date', how='inner', suffixes=('', '_cf'))
except:
🧰 Tools
🪛 Ruff (0.15.5)

[error] 96-96: Probable use of requests call without timeout

(S113)


[error] 97-97: Probable use of requests call without timeout

(S113)


[error] 98-98: Probable use of requests call without timeout

(S113)


[error] 106-106: Multiple statements on one line (colon)

(E701)


[error] 110-110: Do not use bare except

(E722)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@AI/modules/data_collector/company_fundamentals_data.py` around lines 96 -
110, The three FMP requests (inc_resp, bal_resp, cf_resp) lack timeouts,
status-code checks and proper error handling and the current except is a bare
catch; update the code that builds inc_resp/bal_resp/cf_resp in
company_fundamentals_data.py to: add a reasonable timeout to each requests.get
call, validate each response.status_code (treat non-2xx, 429, 5xx as failures),
attempt to parse JSON only after a successful status and validate bal_data and
cf_data the same way you validate inc_data, avoid a bare except (catch
requests.Timeout, requests.RequestException, JSONDecodeError), log the specific
error and return [] on failure so df_inc/df_bal/df_cf and df_merged are only
built when all three responses are valid.

return []

processed_data = []
for _, row in df_merged.iterrows():
try: date_val = datetime.strptime(row['date'], '%Y-%m-%d').date()
except: continue

revenue = float(row.get('revenue')) if pd.notna(row.get('revenue')) else None
net_income = float(row.get('netIncome')) if pd.notna(row.get('netIncome')) else None
total_assets = float(row.get('totalAssets')) if pd.notna(row.get('totalAssets')) else None
total_liabilities = float(row.get('totalLiabilities')) if pd.notna(row.get('totalLiabilities')) else None
equity = float(row.get('totalStockholdersEquity', row.get('totalEquity'))) if pd.notna(row.get('totalStockholdersEquity', row.get('totalEquity'))) else None
eps = float(row.get('eps')) if pd.notna(row.get('eps')) else None
shares_issued = float(row.get('weightedAverageShsOutDil', row.get('weightedAverageShsOut'))) if pd.notna(row.get('weightedAverageShsOutDil', row.get('weightedAverageShsOut'))) else None
operating_cash_flow = float(row.get('operatingCashFlow')) if pd.notna(row.get('operatingCashFlow')) else None

op_income = row.get('operatingIncome')
int_expense = row.get('interestExpense')

roe = (net_income / equity) if net_income and equity else None
debt_ratio = (total_liabilities / equity) if total_liabilities and equity else None
interest_coverage = (float(op_income) / abs(float(int_expense))) if pd.notna(op_income) and pd.notna(int_expense) and abs(float(int_expense)) > 0 else None

processed_data.append((
str(ticker), date_val, revenue, net_income, total_assets, total_liabilities,
equity, shares_issued, eps, roe, debt_ratio, interest_coverage, operating_cash_flow
))
return processed_data

# ==========================================
# 💾 DB 저장 및 상태 관리 로직
# ==========================================
def save_to_db(self, ticker: str, data: List[tuple]):
"""
처리된 데이터를 DB에 저장(Upsert)합니다.
"""
if not data:
# print(f" [{ticker}] 저장할 유효한 데이터가 없습니다.")
return

if not data: return
conn = get_db_conn(self.db_name)
cursor = conn.cursor()

try:
insert_query = """
INSERT INTO public.company_fundamentals (
Expand All @@ -138,86 +152,128 @@ def save_to_db(self, ticker: str, data: List[tuple]):
interest_coverage, operating_cash_flow
)
VALUES %s
ON CONFLICT (ticker, date)
DO UPDATE SET
revenue = EXCLUDED.revenue,
net_income = EXCLUDED.net_income,
total_assets = EXCLUDED.total_assets,
total_liabilities = EXCLUDED.total_liabilities,
equity = EXCLUDED.equity,
shares_issued = EXCLUDED.shares_issued,
eps = EXCLUDED.eps,
roe = EXCLUDED.roe,
debt_ratio = EXCLUDED.debt_ratio,
interest_coverage = EXCLUDED.interest_coverage,
operating_cash_flow = EXCLUDED.operating_cash_flow;
ON CONFLICT (ticker, date) DO UPDATE SET
revenue = EXCLUDED.revenue, net_income = EXCLUDED.net_income,
total_assets = EXCLUDED.total_assets, total_liabilities = EXCLUDED.total_liabilities,
equity = EXCLUDED.equity, shares_issued = EXCLUDED.shares_issued,
eps = EXCLUDED.eps, roe = EXCLUDED.roe, debt_ratio = EXCLUDED.debt_ratio,
interest_coverage = EXCLUDED.interest_coverage, operating_cash_flow = EXCLUDED.operating_cash_flow;
"""

execute_values(cursor, insert_query, data)
conn.commit()
print(f" [{ticker}] {len(data)}건 펀더멘털 데이터 저장 완료.")

except Exception as e:
conn.rollback()
print(f" [{ticker}][Error] DB 저장 실패: {e}")
finally:
cursor.close()
conn.close()

def update_tickers(self, tickers: List[str]):
def get_fmp_targets(self, tickers: List[str], limit: int = 80) -> List[str]:
"""
주어진 종목 리스트에 대해 업데이트를 수행합니다.
stock_info 테이블에서 fmp_completed가 FALSE(또는 NULL)인 종목만 limit 개수만큼 추려옵니다.
"""
print(f"[Fundamentals] {len(tickers)}개 종목 재무 데이터 업데이트 시작...")

for ticker in tickers:
# print(f" [{ticker}] 재무 정보 분석 및 수집 중...")
conn = get_db_conn(self.db_name)
cursor = conn.cursor()
try:
query = """
SELECT ticker FROM public.stock_info
WHERE fmp_completed IS NOT TRUE
AND ticker = ANY(%s);
"""
cursor.execute(query, (tickers,))
uncompleted = [row[0] for row in cursor.fetchall()]
return uncompleted[:limit]
except Exception as e:
print(f"[Error] FMP 타겟 DB 조회 실패: {e}")
return []
finally:
cursor.close()
conn.close()

def mark_fmp_completed(self, ticker: str):
    """Set ``fmp_completed = TRUE`` for ``ticker`` in ``public.stock_info``.

    A failure while executing or committing the UPDATE is rolled back and
    printed instead of being raised.

    Args:
        ticker: Ticker whose ``stock_info`` row is updated.
    """
    conn = get_db_conn(self.db_name)
    try:
        # The cursor is created inside the outer try so the connection is
        # still closed if cursor creation itself raises.
        cursor = conn.cursor()
        try:
            query = "UPDATE public.stock_info SET fmp_completed = TRUE WHERE ticker = %s;"
            cursor.execute(query, (ticker,))
            conn.commit()
        except Exception as e:
            conn.rollback()
            print(f" [{ticker}][Error] 완료 상태 업데이트 실패: {e}")
        finally:
            cursor.close()
    finally:
        conn.close()

# ==========================================
# 🔄 메인 업데이트 로직 (YF -> FMP)
# ==========================================
def update_tickers(self, tickers: List[str]):
print(f"\n[Fundamentals] 총 {len(tickers)}개 종목 하이브리드 수집 시작...")

# 1. DB에 이미 데이터가 있는 종목 확인 (yfinance 중복 방지)
conn = get_db_conn(self.db_name)
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT ticker FROM public.company_fundamentals;")
db_filled_tickers = set(row[0] for row in cursor.fetchall())
conn.close()

# [단계 1] 아예 데이터가 없는 종목은 YFinance로 빠르게 베이스라인 채우기
yf_targets = [t for t in tickers if t not in db_filled_tickers]
if yf_targets:
print(f" >> [Phase 1] DB에 없는 {len(yf_targets)}개 종목을 yfinance(4년 치)로 우선 채웁니다.")
for i, ticker in enumerate(yf_targets):
try:
data = self.fetch_yf_metrics(ticker)
if data: self.save_to_db(ticker, data)
if i % 10 == 0 and i > 0: print(f" ... {i}개 yf 수집 완료")
time.sleep(0.5) # yf 과부하 방지
except Exception as e:
print(f" [{ticker}] yf 수집 에러: {e}")
print(" >> [Phase 1] yfinance 베이스라인 수집 완료!\n")

# [단계 2] 매일 80개씩 FMP API 10년 치 업그레이드
if not self.api_key:
print("🚨 FMP_API_KEY가 없어 10년 치 업그레이드는 건너뜁니다.")
return

fmp_targets = self.get_fmp_targets(tickers, limit=80)

if not fmp_targets:
print("🌟 [Phase 2] 모든 종목이 이미 FMP 10년 치로 업그레이드되어 있습니다! (수집 스킵)")
return

print(f" >> [Phase 2] {len(fmp_targets)}개 종목을 FMP API(10년 치)로 업그레이드합니다.")
success_count = 0
for ticker in fmp_targets:
try:
data = self.fetch_and_calculate_metrics(ticker)
self.save_to_db(ticker, data)
data = self.fetch_fmp_metrics(ticker)
if data:
self.save_to_db(ticker, data)

# 수집을 시도했으므로 무조건 완료 도장 찍기 (신규 상장주 무한루프 방지)
self.mark_fmp_completed(ticker)
success_count += 1
except Exception as e:
print(f" [{ticker}][Error] 처리 중 예외 발생: {e}")
print(f" [{ticker}] FMP 수집 에러: {e}")

print(f" >> [Phase 2] 오늘 할당량 끝! ({success_count}개 종목 FMP 업그레이드 및 상태 저장 완료)")

# ----------------------------------------------------------------------
# [실행 모드]
# ----------------------------------------------------------------------
if __name__ == "__main__":
import argparse

parser = argparse.ArgumentParser(description="[수동] 기업 펀더멘털 데이터 수집기")
parser.add_argument("tickers", nargs='*', help="수집할 종목 티커 리스트")
parser = argparse.ArgumentParser(description="하이브리드 펀더멘털 데이터 수집기")
parser.add_argument("tickers", nargs='*', help="수집할 종목")
parser.add_argument("--all", action="store_true", help="DB 전 종목 업데이트")
parser.add_argument("--db", default="db", help="DB 이름")

args = parser.parse_args()
target_tickers = args.tickers

# DB 연결 테스트 및 종목 로드
conn = get_db_conn(args.db)

if args.all:
try:
target_tickers = load_all_tickers_from_db(verbose=False)
print(f">> DB에서 전체 종목 {len(target_tickers)}개를 로드했습니다.")
except Exception as e:
print(f"[Error] 종목 로드 실패: {e}")
sys.exit(1)
target_tickers = load_all_tickers_from_db(verbose=False)

conn.close()

if not target_tickers:
print("\n>> 수집할 종목 코드를 입력하세요 (예: AAPL TSLA)")
print(" (종료하려면 엔터)")
try:
input_str = sys.stdin.readline().strip()
if input_str:
target_tickers = input_str.split()
else:
sys.exit(0)
except KeyboardInterrupt:
sys.exit(0)

if target_tickers:
collector = FundamentalsDataCollector(db_name=args.db)
collector.update_tickers(target_tickers)
print("\n[완료] 작업이 끝났습니다.")
collector.update_tickers(target_tickers)
Loading