diff --git a/code/python/data_loading/db_load.py b/code/python/data_loading/db_load.py
old mode 100644
new mode 100755
index c44fb5701..953863cf0
--- a/code/python/data_loading/db_load.py
+++ b/code/python/data_loading/db_load.py
@@ -15,6 +15,10 @@
import tempfile
import traceback
from urllib.parse import urlparse
+import textwrap
+import copy
+import re
+from bs4 import BeautifulSoup, Tag
from typing import List, Dict, Any, Tuple, Union, Optional
@@ -24,6 +28,8 @@
read_file_lines,
prepare_documents_from_json,
documents_from_csv_line,
+ int64_hash,
+ get_item_name
)
# Import vector database client directly
@@ -499,59 +505,234 @@ async def process_csv_file(file_path: str, site: str) -> List[Dict[str, Any]]:
return documents
-
async def process_rss_feed(file_path: str, site: str) -> List[Dict[str, Any]]:
"""
Process an RSS/Atom feed into document objects.
-
+
Args:
file_path: Path to the RSS file or URL
site: Site identifier
-
+
Returns:
List of document objects
"""
print(f"Processing RSS/Atom feed: {file_path}")
-
+
try:
# Convert feed to schema.org format
- podcast_episodes = rss2schema.feed_to_schema(file_path)
-
+ items = rss2schema.feed_to_schema(file_path)
+
documents = []
-
- # Process each episode in the feed
- for episode in podcast_episodes:
- # Extract URL
+
+ # Process each item in the feed
+ for episode in items:
+ # Extract (or synthesise) the canonical URL
url = episode.get("url")
-
- # Generate a synthetic URL if needed
if not url and "name" in episode:
url = f"synthetic:{site}:{episode['name']}"
episode["url"] = url
- print(f"Generated synthetic URL for episode: {episode['name']}")
- elif not url:
- # Skip items without any identifiable information
+ print(f"Generated synthetic URL for item: {episode['name']}")
+ elif not url: # Skip items with no identifier at all
continue
+
+ # CHUNKING Introduction ------------------------------------------------------------------------------
+ """
+ Heading-aware chunking (v2) — Misha
+ - Splits long HTML content into retrieval-friendly chunks
+ - H1 → pageTitle (metadata); H2 → parentSection (context only).
+            - Preserves <pre> blocks intact
+            - Promotes <h4>/<h5> to their own “lowerHeading” chunks
+            - Never cuts inside a <p>, <pre>, <table>, or <ul>
+ - Adds pageTitle / parentSection / heading / lowerHeading
+ - Uses stable anchor slugs in URLs.
+ Trade-offs
+ - Slightly higher embedding/storage cost (negligible).
+ - Significantly better recall and section-level precision.
+
+ Fallbacks
+            - If no suitable headings are found or the body HTML is missing, a single unchunked document is emitted.
+ """
+
+            MAX_CHARS = 2_000            # hard upper limit per chunk
+            docs_before = len(documents) # to detect whether this item produces chunks
+
+ # choose the longest body-like field available
+ body_html = (
+ episode.get("articleBody")
+ or episode.get("description")
+ or episode.get("content")
+ )
+
+ # if nothing to chunk, fall back to single-doc path
+ if not body_html:
+ pass # drop through to “single-doc” below
+ else:
+ soup = BeautifulSoup(body_html, "html.parser")
+
+ # debug
+ # print("DEBUG: body-chars:", len(body_html))
+ # print("DEBUG: h2:", len(soup.find_all("h2")))
+ # print("DEBUG: h3:", len(soup.find_all("h3")))
+ # print("DEBUG: h4:", len(soup.find_all("h4")))
+ # print("DEBUG: h5:", len(soup.find_all("h5")))
+
+ # util → slug for URL anchors
+ def slugify(txt: str) -> str:
+ txt = txt.lower()
+ txt = re.sub(r"[^\w\s-]", "", txt)
+ txt = re.sub(r"\s+", "-", txt)
+ return txt[:80].strip("-")
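+                # e.g. slugify("Getting Started!") -> "getting-started"  (illustrative)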
+
+ page_title = soup.find("h1")
+ page_title = (
+ page_title.get_text(strip=True)
+ if page_title
+ else episode.get("name", "")
+ )
+
+ current_h2: Optional[str] = None
+ current_h3: Optional[str] = None
+
+                # helper: collect every sibling until the next heading
+                heading_tags = {"h2", "h3", "h4", "h5"}
+
+                def collect_block_siblings(start_node: Tag) -> List[str]:
+                    """Return HTML strings of all siblings up to the next heading."""
+                    parts: List[str] = []
+                    for sib in start_node.next_siblings:
+                        if isinstance(sib, Tag) and sib.name in heading_tags:
+                            break
+                        if isinstance(sib, Tag):
+                            parts.append(str(sib))          # keep <p>, <pre>, <table>, …
+                    return parts
+
+                # iterate over every heading *wherever it lives* in DOM order
+ for heading in soup.find_all(list(heading_tags)):
+ tag = heading.name
+
+ # H2
+ if tag == "h2":
+ current_h2 = heading.get_text(strip=True)
+ current_h3 = None
+ continue
+
+ # H3 (main section block)
+ if tag == "h3":
+ current_h3 = heading.get_text(strip=True)
+ chunk_html_parts = collect_block_siblings(heading)
+
+ if not chunk_html_parts:
+ continue # nothing to store
+
+ full_html = "".join(chunk_html_parts)
+
+ # paragraph-aware split when > MAX_CHARS
+ if len(full_html) > MAX_CHARS:
+ temp_chunks: List[str] = []
+ buf = ""
+ for p_html in chunk_html_parts:
+                                if buf and len(buf) + len(p_html) > MAX_CHARS:
+ temp_chunks.append(buf)
+ buf = ""
+ buf += p_html
+ if buf:
+ temp_chunks.append(buf)
+ else:
+ temp_chunks = [full_html]
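+                        # temp_chunks now holds pieces of at most ~MAX_CHARS; a single
+                        # element longer than MAX_CHARS is kept whole rather than split mid-tag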
+
+ for idx, html_piece in enumerate(temp_chunks):
+ anchor = slugify(current_h3)
+ sub_url = f"{url}#{anchor}" if len(temp_chunks) == 1 else f"{url}#{anchor}-{idx}"
+
+ sub = copy.deepcopy(episode)
+ sub.update(
+ {
+ "articleBody" : html_piece,
+ "pageTitle" : page_title,
+ "parentSection": current_h2 or "",
+ "heading" : current_h3,
+ }
+ )
+
+ sub_json = json.dumps(sub, ensure_ascii=False).replace("\n", " ")
+ documents.append(
+ {
+ "id" : str(int64_hash(sub_url)),
+ "schema_json": sub_json,
+ "url" : sub_url,
+ "name" : get_item_name(sub) or page_title,
+ "site" : site,
+ }
+ )
+ print(f"CHUNKED -> {sub_url} ({len(html_piece)} chars)")
+                        continue  # done with this <h3> block
+
+ # H4 / H5 (promoted lowerHeading)
+ if tag in {"h4", "h5"}:
+ lower_heading = heading.get_text(strip=True)
+ chunk_html_parts = collect_block_siblings(heading)
+
+ if not chunk_html_parts:
+ continue
+
+ full_html = "".join(chunk_html_parts)
+ # simple wrap if still too long (rare for h4 blocks)
+ parts = (
+ textwrap.wrap(full_html, MAX_CHARS, break_long_words=False)
+ if len(full_html) > MAX_CHARS
+ else [full_html]
+ )
+
+ for idx, html_piece in enumerate(parts):
+ anchor = slugify(lower_heading)
+ sub_url = f"{url}#{anchor}" if len(parts) == 1 else f"{url}#{anchor}-{idx}"
+
+ sub = copy.deepcopy(episode)
+ sub.update(
+ {
+ "articleBody" : html_piece,
+ "pageTitle" : page_title,
+ "parentSection": current_h2 or "",
+ "heading" : current_h3 or "",
+ "lowerHeading" : lower_heading,
+ }
+ )
+
+ sub_json = json.dumps(sub, ensure_ascii=False).replace("\n", " ")
+ documents.append(
+ {
+ "id" : str(int64_hash(sub_url)),
+ "schema_json": sub_json,
+ "url" : sub_url,
+ "name" : get_item_name(sub) or page_title,
+ "site" : site,
+ }
+ )
+ print(f"CHUNKED -> {sub_url} ({len(html_piece)} chars)")
+
+            # if this item produced any heading-based chunks, skip the single-doc path
+            if len(documents) > docs_before:
+                continue
+ # END heading-aware chunking -----------------------------------------------------------
+
# Convert to JSON - ensure no newlines in the JSON
json_data = json.dumps(episode, ensure_ascii=False).replace("\n", " ")
-
+
# Extract name
- name = episode.get("name", "Untitled Episode")
-
- # Create document
- document = {
- "id": str(hash(url) % (2**63)), # Create a stable ID from the URL
+ name = episode.get("name", "Untitled Item")
+
+ # Create single (short) document
+ documents.append({
+ "id" : str(int64_hash(url)),
"schema_json": json_data,
- "url": url,
- "name": name,
- "site": site
- }
-
- documents.append(document)
-
- print(f"Processed {len(documents)} episodes from RSS/Atom feed")
+ "url" : url,
+ "name" : name,
+ "site" : site
+ })
+
+ print(f"Processed {len(documents)} items from RSS/Atom feed")
return documents
+
except Exception as e:
print(f"Error processing RSS/Atom feed: {str(e)}")
traceback.print_exc()
diff --git a/code/python/data_loading/rss2schema.py b/code/python/data_loading/rss2schema.py
old mode 100644
new mode 100755
index 0de740ab2..c3071c36c
--- a/code/python/data_loading/rss2schema.py
+++ b/code/python/data_loading/rss2schema.py
@@ -244,73 +244,150 @@ def parse_rss_2_0(root: ET.Element, feed_url: Optional[str] = None) -> List[Dict
print("Warning: No channel element found in RSS feed")
return result
- # Extract podcast (feed) information
- podcast_title = safe_get_text(channel.find('title'))
- podcast_description = safe_get_text(channel.find('description'))
- podcast_link = safe_get_text(channel.find('link'))
- podcast_language = safe_get_text(channel.find('language'))
+ # Extract feed information
+ title = safe_get_text(channel.find('title'))
+ description = safe_get_text(channel.find('description'))
+ link = safe_get_text(channel.find('link'))
+ language = safe_get_text(channel.find('language'))
# Extract image
- podcast_image = None
+ image_obj = None
image_elem = channel.find('image')
if image_elem is not None:
image_url = safe_get_text(image_elem.find('url'))
if image_url:
- podcast_image = {"@type": "ImageObject", "url": fix_url(image_url)}
+ image_obj = {"@type": "ImageObject", "url": fix_url(image_url)}
# iTunes image (higher quality)
for ns_prefix, ns_uri in NAMESPACES.items():
if ns_prefix == 'itunes':
itunes_image = channel.find(f".//{{{ns_uri}}}image")
if itunes_image is not None and 'href' in itunes_image.attrib:
- podcast_image = {"@type": "ImageObject", "url": fix_url(itunes_image.get('href'))}
-
- # Create basic podcast series schema
- podcast_series = {
- "@type": "PodcastSeries",
- "name": podcast_title,
- "description": podcast_description,
- "url": fix_url(podcast_link) or feed_url or ""
+ image_obj = {"@type": "ImageObject", "url": fix_url(itunes_image.get('href'))}
+
+ # Create container schema for the feed
+ schema = {
+ "@type": "WebSite",
+ "name": title,
+ "description": description,
+ "url": fix_url(link) or feed_url or ""
}
- if podcast_image:
- podcast_series["image"] = podcast_image
+ if image_obj:
+ schema["image"] = image_obj
- if podcast_language:
- podcast_series["inLanguage"] = podcast_language
+ if language:
+ schema["inLanguage"] = language
- # Process each item (episode)
+ # Process each item
for item in channel.findall('item'):
try:
# Basic fields
- title = safe_get_text(item.find('title'))
+ title = safe_get_text(item.find('title'))
description = safe_get_text(item.find('description'))
- pub_date = safe_get_text(item.find('pubDate'))
-
- # URL (critical field)
- url = extract_best_url(item, feed_url)
-
- if not url and not title:
- # Skip items without any identifiable information
- continue
+ pub_date = safe_get_text(item.find('pubDate'))
- # Create episode schema
+            # <articleType> (default "Article")
+ art_type_elem = item.find('articleType')
+ article_type_raw = art_type_elem.text.strip() if art_type_elem is not None and art_type_elem.text else "Article"
+
+ # split CSV → list; single value stays string
+            article_types = [t.strip() for t in article_type_raw.split(',') if t.strip()] or ["Article"]
+ article_type = article_types if len(article_types) > 1 else article_types[0]
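+            # e.g. "Article, TechArticle" -> ["Article", "TechArticle"]; a single value stays a plain string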
+ # ------------------------------------------------------------
+ # Build the Article object **first** so we can write into it
+ # ------------------------------------------------------------
episode = {
- "@type": "PodcastEpisode",
- "name": title,
- "description": description,
+ "@type" : article_type,
+ "name" : title,
+ "description" : description,
"datePublished": pub_date
}
+ # Language
+ lang_elem = item.find('language')
+ if lang_elem is not None and lang_elem.text:
+ episode["inLanguage"] = lang_elem.text.strip()
+
+            # <content:encoded> -> articleBody
+ content_encoded = item.find('{http://purl.org/rss/1.0/modules/content/}encoded')
+ if content_encoded is not None and content_encoded.text:
+ episode["articleBody"] = content_encoded.text
+
+            # <dc:creator> -> author
+ creator = item.find('{http://purl.org/dc/elements/1.1/}creator')
+ author_type_elem = item.find('authorType')
+ author_type = author_type_elem.text.strip() if author_type_elem is not None and author_type_elem.text else "Organization"
+
+ if creator is not None and creator.text:
+ episode["author"] = {
+ "@type": "Person" if author_type.lower() == "person" else "Organization",
+ "name" : creator.text.strip()
+ }
+
+            # <articleSection> tag (leaf category) → articleSection
+ sec_elem = item.find('articleSection')
+ if sec_elem is not None and sec_elem.text:
+ episode["articleSection"] = sec_elem.text.strip()
+
+            # <category> tags → about[] (all categories/tags)
+ cats = [c.text.strip() for c in item.findall('category') if c.text]
+ if cats:
+ episode["about"] = cats
+                # Fallback: if articleSection wasn’t filled by the <articleSection> tag, use the first category
+ if "articleSection" not in episode:
+ episode["articleSection"] = cats[0]
+
+            # <midSection> and <parentSection> → isPartOf (nested)
+ mid_elem = item.find('midSection')
+ parent_elem = item.find('parentSection')
+ mid_name = mid_elem.text.strip() if mid_elem is not None and mid_elem.text else None
+ parent_name = parent_elem.text.strip() if parent_elem is not None and parent_elem.text else None
+
+ if mid_name and parent_name:
+ episode["isPartOf"] = {
+ "@type": "CreativeWorkSeries",
+ "name": mid_name,
+ "isPartOf": {
+ "@type": "CreativeWorkSeries",
+ "name": parent_name
+ }
+ }
+ elif parent_name:
+ episode["isPartOf"] = {
+ "@type": "CreativeWorkSeries",
+ "name": parent_name
+ }
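+            # e.g. (hypothetical values) midSection "Guides" under parentSection "Docs" becomes
+            #      {"@type": "CreativeWorkSeries", "name": "Guides",
+            #       "isPartOf": {"@type": "CreativeWorkSeries", "name": "Docs"}}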
+
+            # <featuredImage> -> image (as an ImageObject)
+ feat_img = item.find('featuredImage')
+ if feat_img is not None and feat_img.text:
+ episode["image"] = {
+ "@type": "ImageObject",
+ "url": fix_url(feat_img.text.strip())
+ }
+
+            # <keywords> tag -> keywords[]
+ kw_elem = item.find('keywords')
+ if kw_elem is not None and kw_elem.text:
+ episode["keywords"] = list(dict.fromkeys(
+ w.strip() for w in kw_elem.text.split(',') if w.strip()
+ ))
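+            # dict.fromkeys de-duplicates the keywords while preserving their original order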
+
+ # URL (critical field)
+ url = extract_best_url(item, feed_url)
if url:
episode["url"] = url
+ elif not title:
+                # Nothing identifiable – skip this <item>
+ continue
- # Add GUID if available
+ # GUID → identifier (optional but useful)
guid = extract_guid(item)
if guid and guid != url:
episode["identifier"] = guid
- # Add enclosure (audio file)
+ # Enclosure (audio file)
enclosure = item.find('enclosure')
if enclosure is not None:
enclosure_url = enclosure.get('url')
@@ -334,7 +411,7 @@ def parse_rss_2_0(root: ET.Element, feed_url: Optional[str] = None) -> List[Dict
episode["associatedMedia"] = audio_object
- # Add iTunes specific fields
+ # iTunes-specific fields
for ns_prefix, ns_uri in NAMESPACES.items():
if ns_prefix == 'itunes':
# Duration
@@ -363,7 +440,7 @@ def parse_rss_2_0(root: ET.Element, feed_url: Optional[str] = None) -> List[Dict
except ValueError:
pass
- # Add image if available
+ # iTunes image (per-item)
for ns_prefix, ns_uri in NAMESPACES.items():
if ns_prefix == 'itunes':
itunes_image = item.find(f".//{{{ns_uri}}}image")
@@ -373,8 +450,8 @@ def parse_rss_2_0(root: ET.Element, feed_url: Optional[str] = None) -> List[Dict
"url": fix_url(itunes_image.get('href'))
}
- # Add podcast series reference
- episode["partOf"] = podcast_series
+ # Reference the feed container
+ episode["partOf"] = schema
# Add to result
result.append(episode)
@@ -414,9 +491,9 @@ def parse_atom(root: ET.Element, feed_url: Optional[str] = None) -> List[Dict[st
feed_link = fix_url(feed_link or "")
- # Create podcast series schema
- podcast_series = {
- "@type": "PodcastSeries",
+ # Create container schema for the feed
+ schema = {
+ "@type": "WebSite",
"name": feed_title,
"description": feed_subtitle,
"url": feed_link
@@ -452,7 +529,7 @@ def parse_atom(root: ET.Element, feed_url: Optional[str] = None) -> List[Dict[st
# Skip entries without any identifiable information
continue
- # Create episode schema
+ # Create item schema
episode = {
"@type": "PodcastEpisode",
"name": title,
@@ -493,8 +570,8 @@ def parse_atom(root: ET.Element, feed_url: Optional[str] = None) -> List[Dict[st
episode["associatedMedia"] = audio_object
break
- # Add podcast series reference
- episode["partOf"] = podcast_series
+ # Reference the feed container
+ episode["partOf"] = schema
# Add to result
result.append(episode)
diff --git a/code/python/requirements.txt b/code/python/requirements.txt
old mode 100644
new mode 100755
index 260ef3a3a..fc9849975
--- a/code/python/requirements.txt
+++ b/code/python/requirements.txt
@@ -18,6 +18,7 @@ httpx>=0.28.1
seaborn>=0.13.0
openai>=1.12.0
PyJWT>=2.8.0
+beautifulsoup4>=4.12.3
# Optional LLM provider dependencies
# NOTE: These packages will be installed AUTOMATICALLY at runtime when you first use a provider.