239 changes: 210 additions & 29 deletions code/python/data_loading/db_load.py
100644 → 100755
@@ -15,6 +15,10 @@
import tempfile
import traceback
from urllib.parse import urlparse
import textwrap
import copy
import re
from bs4 import BeautifulSoup, Tag

from typing import List, Dict, Any, Tuple, Union, Optional

@@ -24,6 +28,8 @@
read_file_lines,
prepare_documents_from_json,
documents_from_csv_line,
int64_hash,
get_item_name
)

# Import vector database client directly
@@ -499,59 +505,234 @@ async def process_csv_file(file_path: str, site: str) -> List[Dict[str, Any]]:
return documents



async def process_rss_feed(file_path: str, site: str) -> List[Dict[str, Any]]:
"""
Process an RSS/Atom feed into document objects.

Args:
file_path: Path to the RSS file or URL
site: Site identifier

Returns:
List of document objects
"""
print(f"Processing RSS/Atom feed: {file_path}")

try:
# Convert feed to schema.org format
-        podcast_episodes = rss2schema.feed_to_schema(file_path)
+        items = rss2schema.feed_to_schema(file_path)

documents = []
-        # Process each episode in the feed
-        for episode in podcast_episodes:
-            # Extract URL
+
+        # Process each item in the feed
+        for episode in items:
+            # Extract (or synthesise) the canonical URL
url = episode.get("url")

# Generate a synthetic URL if needed
if not url and "name" in episode:
url = f"synthetic:{site}:{episode['name']}"
episode["url"] = url
-                print(f"Generated synthetic URL for episode: {episode['name']}")
-            elif not url:
-                # Skip items without any identifiable information
+                print(f"Generated synthetic URL for item: {episode['name']}")
+            elif not url:  # Skip items with no identifier at all
continue

            # BEGIN heading-aware chunking ------------------------------------------------------------------------
"""
Heading-aware chunking (v2) — Misha
- Splits long HTML content into retrieval-friendly chunks
- H1 → pageTitle (metadata); H2 → parentSection (context only).
- Preserves <h3> blocks intact
- Promotes <h4>/<h5> to their own “lowerHeading” chunks
- Never cuts inside a <p>, <ul>, <ol>, or <div>
- Adds pageTitle / parentSection / heading / lowerHeading
- Uses stable anchor slugs in URLs.

Trade-offs
- Slightly higher embedding/storage cost (negligible).
- Significantly better recall and section-level precision.

Fallbacks
            - If no suitable headings are found, or body HTML is missing, emits a single unchunked document.
"""

            MAX_CHARS = 2_000          # hard upper limit per chunk
            chunks_before = len(documents)  # used below to tell whether *this* item produced chunks
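            # Illustrative sketch (hypothetical markup, not taken from any real feed): for a body like
            #   <h1>Guide</h1><h2>Setup</h2><h3>Install</h3><p>...</p><h4>Pip</h4><p>...</p>
            # the code below emits one chunk with pageTitle="Guide", parentSection="Setup",
            # heading="Install" at "<url>#install", plus a promoted chunk with
            # lowerHeading="Pip" at "<url>#pip".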

# choose the longest body-like field available
body_html = (
episode.get("articleBody")
or episode.get("description")
or episode.get("content")
)
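            # ("articleBody" is the usual field for Article-like items, "description" for
            #  podcast episodes; "content" is kept as a last-resort fallback)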

# if nothing to chunk, fall back to single-doc path
if not body_html:
pass # drop through to “single-doc” below
else:
soup = BeautifulSoup(body_html, "html.parser")

# debug
# print("DEBUG: body-chars:", len(body_html))
# print("DEBUG: h2:", len(soup.find_all("h2")))
# print("DEBUG: h3:", len(soup.find_all("h3")))
# print("DEBUG: h4:", len(soup.find_all("h4")))
# print("DEBUG: h5:", len(soup.find_all("h5")))

# util → slug for URL anchors
def slugify(txt: str) -> str:
txt = txt.lower()
txt = re.sub(r"[^\w\s-]", "", txt)
txt = re.sub(r"\s+", "-", txt)
return txt[:80].strip("-")
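                # e.g. slugify("Getting Started: Part 2!") -> "getting-started-part-2"
                # (lowercased, punctuation stripped, whitespace runs collapsed to "-", capped at 80 chars)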

page_title = soup.find("h1")
page_title = (
page_title.get_text(strip=True)
if page_title
else episode.get("name", "")
)

current_h2: Optional[str] = None
current_h3: Optional[str] = None

# helper: collect every sibling until the next heading
heading_tags = {"h2", "h3", "h4", "h5"}

def collect_block_siblings(start_node: Tag) -> List[str]:
"""Return HTML strings of all siblings up to the next heading."""
parts: List[str] = []
for sib in start_node.next_siblings:
if isinstance(sib, Tag) and sib.name in heading_tags:
break
if isinstance(sib, Tag):
parts.append(str(sib)) # keep <p>, <ul>, <ol>, <div>…
return parts
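                # e.g. called on the first <h3> of (hypothetical markup)
                #   <h3>A</h3><p>x</p><ul><li>y</li></ul><h3>B</h3>
                # this returns ["<p>x</p>", "<ul><li>y</li></ul>"] and stops at the
                # second <h3>; bare text nodes between tags are skipped.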

# iterate over every heading *wherever it lives* in DOM order
for heading in soup.find_all(list(heading_tags)):
tag = heading.name

# H2
if tag == "h2":
current_h2 = heading.get_text(strip=True)
current_h3 = None
continue

# H3 (main section block)
if tag == "h3":
current_h3 = heading.get_text(strip=True)
chunk_html_parts = collect_block_siblings(heading)

if not chunk_html_parts:
continue # nothing to store

full_html = "".join(chunk_html_parts)

# paragraph-aware split when > MAX_CHARS
if len(full_html) > MAX_CHARS:
temp_chunks: List[str] = []
buf = ""
for p_html in chunk_html_parts:
                                if buf and len(buf) + len(p_html) > MAX_CHARS:  # flush only a non-empty buffer
temp_chunks.append(buf)
buf = ""
buf += p_html
if buf:
temp_chunks.append(buf)
else:
temp_chunks = [full_html]
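                        # e.g. sibling blocks of roughly 900/900/900 chars become two chunks
                        # (~1800 and ~900 chars); a chunk can exceed MAX_CHARS only when a
                        # single block is itself longer than the limit, since blocks are
                        # never split internally here.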

for idx, html_piece in enumerate(temp_chunks):
anchor = slugify(current_h3)
sub_url = f"{url}#{anchor}" if len(temp_chunks) == 1 else f"{url}#{anchor}-{idx}"

sub = copy.deepcopy(episode)
sub.update(
{
"articleBody" : html_piece,
"pageTitle" : page_title,
"parentSection": current_h2 or "",
"heading" : current_h3,
}
)

sub_json = json.dumps(sub, ensure_ascii=False).replace("\n", " ")
documents.append(
{
"id" : str(int64_hash(sub_url)),
"schema_json": sub_json,
"url" : sub_url,
"name" : get_item_name(sub) or page_title,
"site" : site,
}
)
print(f"CHUNKED -> {sub_url} ({len(html_piece)} chars)")
continue # done with this <h3>

# H4 / H5 (promoted lowerHeading)
if tag in {"h4", "h5"}:
lower_heading = heading.get_text(strip=True)
chunk_html_parts = collect_block_siblings(heading)

if not chunk_html_parts:
continue

full_html = "".join(chunk_html_parts)
# simple wrap if still too long (rare for h4 blocks)
parts = (
textwrap.wrap(full_html, MAX_CHARS, break_long_words=False)
if len(full_html) > MAX_CHARS
else [full_html]
)
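                        # NB: textwrap.wrap breaks at whitespace only, so an over-long block
                        # may end up with a tag split across chunks; tolerated here because,
                        # per the comment above, such blocks are rare for <h4>/<h5> sections.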

for idx, html_piece in enumerate(parts):
anchor = slugify(lower_heading)
sub_url = f"{url}#{anchor}" if len(parts) == 1 else f"{url}#{anchor}-{idx}"

sub = copy.deepcopy(episode)
sub.update(
{
"articleBody" : html_piece,
"pageTitle" : page_title,
"parentSection": current_h2 or "",
"heading" : current_h3 or "",
"lowerHeading" : lower_heading,
}
)

sub_json = json.dumps(sub, ensure_ascii=False).replace("\n", " ")
documents.append(
{
"id" : str(int64_hash(sub_url)),
"schema_json": sub_json,
"url" : sub_url,
"name" : get_item_name(sub) or page_title,
"site" : site,
}
)
print(f"CHUNKED -> {sub_url} ({len(html_piece)} chars)")

            # if this item produced any heading-based chunks, skip the single-doc path
            if len(documents) > chunks_before:
                continue
# END heading-aware chunking -----------------------------------------------------------

# Convert to JSON - ensure no newlines in the JSON
json_data = json.dumps(episode, ensure_ascii=False).replace("\n", " ")

-            # Extract name
-            name = episode.get("name", "Untitled Episode")
-            # Create document
-            document = {
-                "id": str(hash(url) % (2**63)),  # Create a stable ID from the URL
-                "schema_json": json_data,
-                "url": url,
-                "name": name,
-                "site": site
-            }
-
-            documents.append(document)
-
-        print(f"Processed {len(documents)} episodes from RSS/Atom feed")
+            name = episode.get("name", "Untitled Item")
+
+            # Create single (short) document
+            documents.append({
+                "id" : str(int64_hash(url)),
+                "schema_json": json_data,
+                "url" : url,
+                "name" : name,
+                "site" : site
+            })
+
+        print(f"Processed {len(documents)} items from RSS/Atom feed")
return documents

except Exception as e:
print(f"Error processing RSS/Atom feed: {str(e)}")
traceback.print_exc()