Changes from all commits (30 commits)
899cf01
feat: add bucket for map and all-reduce
ChenZiHong-Gavin Nov 20, 2025
8d7f2c5
feat: stream reading files
ChenZiHong-Gavin Nov 20, 2025
3927239
fix: fix params in collect_ops
ChenZiHong-Gavin Nov 20, 2025
166ecaf
refactor: refactor engine to dataflow orchestration
ChenZiHong-Gavin Nov 20, 2025
fa23d32
fix: use default if input_stream is not given
ChenZiHong-Gavin Nov 20, 2025
543c5d9
refactor: rename chunk_document.py
ChenZiHong-Gavin Nov 20, 2025
e18fe7a
feat: handle async generator
ChenZiHong-Gavin Nov 20, 2025
9cc1479
wip
ChenZiHong-Gavin Nov 20, 2025
c26bd9d
Merge branch 'main' of https://github.com/open-sciencelab/GraphGen in…
ChenZiHong-Gavin Nov 20, 2025
4b3d9d9
feat: adapt read, chunk, build_kg operators to new optypes
ChenZiHong-Gavin Nov 20, 2025
8eebad1
fix: async_lock when using blast search
ChenZiHong-Gavin Nov 20, 2025
1e69b0e
feat: add search config
ChenZiHong-Gavin Nov 20, 2025
6112d83
feat: add search scripts
ChenZiHong-Gavin Nov 21, 2025
54181d5
Merge branch 'main' of https://github.com/open-sciencelab/GraphGen in…
ChenZiHong-Gavin Nov 21, 2025
a7a0155
style: disable pylint error for catching too general exceptions
ChenZiHong-Gavin Nov 21, 2025
bc487fb
feat: add parallel scan_files
ChenZiHong-Gavin Nov 21, 2025
bd2f7c4
refactor: refactor txt_reader using ray data
ChenZiHong-Gavin Nov 21, 2025
0422bd0
refactor: refactor csv_reader using ray data
ChenZiHong-Gavin Nov 21, 2025
36e80ef
refactor: refactor json_reader using ray data
ChenZiHong-Gavin Nov 21, 2025
db8252c
refactor: refactor parquet_reader using ray data
ChenZiHong-Gavin Nov 21, 2025
97f7e75
refactor: refactor pdf_reader using ray data
ChenZiHong-Gavin Nov 21, 2025
ba865c8
Merge branch 'main' of https://github.com/open-sciencelab/GraphGen in…
ChenZiHong-Gavin Nov 21, 2025
ac99aa8
refactor: refactor pickle_reader using ray data
ChenZiHong-Gavin Nov 22, 2025
3d9185a
refactor: refactor rdf_reader using ray data
ChenZiHong-Gavin Nov 22, 2025
d5924f0
fix: fix scanning file path
ChenZiHong-Gavin Nov 24, 2025
1e71080
fix: fix read_files
ChenZiHong-Gavin Nov 24, 2025
00551e3
fix: fix pylint problems
ChenZiHong-Gavin Nov 24, 2025
cb2833c
fix: fix for pull request finding 'Empty except'
ChenZiHong-Gavin Nov 24, 2025
f391c24
perf: optimize read_files.py by deleting implementation of ray.data.D…
ChenZiHong-Gavin Nov 24, 2025
e2bf272
Merge branch 'feature/map-and-all-reduce' of https://github.com/open-…
ChenZiHong-Gavin Nov 24, 2025
1 change: 1 addition & 0 deletions .pylintrc
@@ -452,6 +452,7 @@ disable=raw-checker-failed,
         R0917, # Too many positional arguments (6/5) (too-many-positional-arguments)
         C0103,
         E0401,
+        W0703, # Catching too general exception Exception
 
 # Enable the message, report, category or checker with the given id(s). You can
 # either give multiple identifier separated by comma (,) or put this option
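For context, W0703 is pylint's broad-except check. A minimal sketch (illustrative only, not code from this PR) of the pattern it flags, which no longer needs per-line suppressions once the rule is disabled globally:

import requests


def head_ok(url: str) -> bool:
    # Catching Exception (rather than, say, requests.RequestException)
    # is exactly what W0703 "Catching too general exception Exception" flags.
    try:
        return requests.head(url, timeout=3).status_code == 200
    except Exception:
        return False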
95 changes: 55 additions & 40 deletions graphgen/bases/base_reader.py
@@ -1,8 +1,10 @@
 import os
 from abc import ABC, abstractmethod
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Union
 
+import pandas as pd
 import requests
+from ray.data import Dataset
 
 
 class BaseReader(ABC):
@@ -14,52 +16,65 @@ def __init__(self, text_column: str = "content"):
         self.text_column = text_column
 
     @abstractmethod
-    def read(self, file_path: str) -> List[Dict[str, Any]]:
+    def read(self, input_path: Union[str, List[str]]) -> Dataset:
         """
         Read data from the specified file path.
 
-        :param file_path: Path to the input file.
-        :return: List of dictionaries containing the data.
+        :param input_path: Path to the input file or list of file paths.
+        :return: Ray Dataset containing the read data.
         """
 
-    @staticmethod
-    def filter(data: List[dict]) -> List[dict]:
+    def _should_keep_item(self, item: Dict[str, Any]) -> bool:
         """
-        Filter out entries with empty or missing text in the specified column.
+        Determine whether to keep the given item based on the text column.
 
-        :param data: List of dictionaries containing the data.
-        :return: Filtered list of dictionaries.
+        :param item: Dictionary representing a data entry.
+        :return: True if the item should be kept, False otherwise.
         """
+        item_type = item.get("type")
+        assert item_type in [
+            "text",
+            "image",
+            "table",
+            "equation",
+            "protein",
+        ], f"Unsupported item type: {item_type}"
+        if item_type == "text":
+            content = item.get(self.text_column, "").strip()
+            return bool(content)
+        return True
 
-        def _image_exists(path_or_url: str, timeout: int = 3) -> bool:
-            """
-            Check if an image exists at the given local path or URL.
-            :param path_or_url: Local file path or remote URL of the image.
-            :param timeout: Timeout for remote URL requests in seconds.
-            :return: True if the image exists, False otherwise.
-            """
-            if not path_or_url:
-                return False
-            if not path_or_url.startswith(("http://", "https://", "ftp://")):
-                path = path_or_url.replace("file://", "", 1)
-                path = os.path.abspath(path)
-                return os.path.isfile(path)
-            try:
-                resp = requests.head(path_or_url, allow_redirects=True, timeout=timeout)
-                return resp.status_code == 200
-            except requests.RequestException:
-                return False
+    def _validate_batch(self, batch: pd.DataFrame) -> pd.DataFrame:
+        """
+        Validate data format.
+        """
+        if "type" not in batch.columns:
+            raise ValueError(f"Missing 'type' column. Found: {list(batch.columns)}")
 
-        filtered_data = []
-        for item in data:
-            if item.get("type") == "text":
-                content = item.get("content", "").strip()
-                if content:
-                    filtered_data.append(item)
-            elif item.get("type") in ("image", "table", "equation"):
-                img_path = item.get("img_path")
-                if _image_exists(img_path):
-                    filtered_data.append(item)
-            else:
-                filtered_data.append(item)
-        return filtered_data
+        if "text" in batch["type"].values:
+            if self.text_column not in batch.columns:
+                raise ValueError(
+                    f"Missing '{self.text_column}' column for text documents"
+                )
+
+        return batch
+
+    @staticmethod
+    def _image_exists(path_or_url: str, timeout: int = 3) -> bool:
+        """
+        Check if an image exists at the given local path or URL.
+        :param path_or_url: Local file path or remote URL of the image.
+        :param timeout: Timeout for remote URL requests in seconds.
+        :return: True if the image exists, False otherwise.
+        """
+        if not path_or_url:
+            return False
+        if not path_or_url.startswith(("http://", "https://", "ftp://")):
+            path = path_or_url.replace("file://", "", 1)
+            path = os.path.abspath(path)
+            return os.path.isfile(path)
+        try:
+            resp = requests.head(path_or_url, allow_redirects=True, timeout=timeout)
+            return resp.status_code == 200
+        except requests.RequestException:
+            return False
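To make the new reader contract concrete, here is a minimal hypothetical subclass; the class name and the specific ray.data calls are assumptions for illustration, not one of the repository's actual readers:

from typing import List, Union

import ray.data
from ray.data import Dataset

from graphgen.bases.base_reader import BaseReader


class PlainTextReader(BaseReader):
    """Sketch: emit one text item per input line."""

    def read(self, input_path: Union[str, List[str]]) -> Dataset:
        # ray.data.read_text yields one row per line, in a "text" column.
        ds = ray.data.read_text(input_path)
        # Normalize rows to the {"type", text_column} schema the base class checks.
        ds = ds.map(lambda row: {"type": "text", self.text_column: row["text"]})
        # Validate the schema per batch, then drop empty text rows.
        ds = ds.map_batches(self._validate_batch, batch_format="pandas")
        return ds.filter(self._should_keep_item)

A design note on the change itself: filtering moves from a list-based @staticmethod to a per-item predicate (_should_keep_item) and a per-batch check (_validate_batch), which are the shapes Ray Data's filter and map_batches operators compose with.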
4 changes: 2 additions & 2 deletions graphgen/bases/base_splitter.py
@@ -33,7 +33,7 @@ def split_text(self, text: str) -> List[str]:
         """
         Split the input text into smaller chunks.
 
-        :param text: The input text to be split.
+        :param text: The input text to be chunked.
         :return: A list of text chunks.
         """
 
@@ -111,7 +111,7 @@ def _merge_splits(self, splits: Iterable[str], separator: str) -> List[str]:
 def _split_text_with_regex(
     text: str, separator: str, keep_separator: Union[bool, Literal["start", "end"]]
 ) -> List[str]:
-    # Now that we have the separator, split the text
+    # Now that we have the separator, chunk the text
     if separator:
         if keep_separator:
             # The parentheses in the pattern keep the delimiters in the result.
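Since the hunk above cuts off at the keep_separator branch, a standalone sketch of the splitting behavior may help; this is reconstructed from the signature and the capturing-parentheses comment, an assumption rather than the repository's exact implementation:

import re
from typing import List, Literal, Union


def split_with_separator(
    text: str, separator: str, keep_separator: Union[bool, Literal["start", "end"]]
) -> List[str]:
    if separator:
        if keep_separator:
            # A capturing group makes re.split return the delimiters too.
            parts = re.split(f"({separator})", text)
            if keep_separator == "end":
                # Attach each separator to the chunk before it.
                parts = ["".join(p) for p in zip(parts[0::2], parts[1::2] + [""])]
            else:  # True or "start": attach each separator to the chunk after it.
                merged = ["".join(p) for p in zip(parts[1::2], parts[2::2])]
                parts = ([parts[0]] + merged) if parts[0] else merged
        else:
            parts = re.split(separator, text)
    else:
        parts = list(text)
    return [p for p in parts if p]


# Example: split_with_separator("a.b.c", r"\.", "end") -> ["a.", "b.", "c"]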
2 changes: 1 addition & 1 deletion graphgen/configs/search_config.yaml
@@ -1,7 +1,7 @@
 pipeline:
   - name: read
     params:
-      input_file: resources/input_examples/search_demo.json # input file path, support json, jsonl, txt, pdf. See resources/input_examples for examples
+      input_file: resources/input_examples/search_demo.jsonl # input file path, support json, jsonl, txt, pdf. See resources/input_examples for examples
 
   - name: search
     params:
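The demo file's exact fields are not shown in this diff, but given the reader schema above, each line of a .jsonl input would plausibly be a self-contained JSON object along these lines (hypothetical example):

{"type": "text", "content": "Example document text to search and expand."}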