refactor: refactor pipeline engine using ray data #110
Conversation
/gemini summary

Hi @ChenZiHong-Gavin, it looks like you've tried to invoke the …

/gemini summary

/gemini review
Summary of Changes

This pull request introduces a major architectural overhaul to the GraphGen project, transitioning its pipeline execution from a custom threading model to a robust, distributed framework built on Ray Data. This refactoring is designed to enhance the system's scalability, resource efficiency, and maintainability by standardizing operations around Ray's data processing primitives. Key changes include the introduction of a …

Highlights
Changelog
Activity
Code Review
This is an impressive and substantial refactoring of the pipeline execution engine. Migrating from a custom threading-based system to a Ray Data-native framework is a significant improvement for scalability, maintainability, and robustness. The new architecture with a declarative YAML configuration, a Ray Data-based engine, and modular operators is well-designed. The use of Ray actors for managing stateful services like LLMs and storage is a solid pattern for distributed execution.
My review includes a few suggestions to further improve robustness and performance. Specifically, I've commented on improving error handling in data processing, making logging more consistent in storage classes, and optimizing database write patterns.
Overall, this is an excellent piece of work that modernizes the project's core infrastructure.
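For readers unfamiliar with the actor pattern the review praises, here is a minimal sketch of how a stateful service can be held in a Ray actor. The class name, constructor, and `complete` method are illustrative stand-ins, not the PR's actual interfaces:

```python
import ray

@ray.remote
class LLMService:
    """Hypothetical actor that holds an expensive client once per process."""

    def __init__(self, model_name: str):
        # A real implementation would construct the LLM client here.
        self.model_name = model_name

    def complete(self, prompt: str) -> str:
        # A real implementation would forward to the wrapped client.
        return f"[{self.model_name}] response to: {prompt!r}"

# One long-lived actor can then be shared by many pipeline tasks:
# service = LLMService.remote("my-model")
# result = ray.get(service.complete.remote("hello"))
```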
```python
def _should_keep_item(self, item: Dict[str, Any]) -> bool:
    """
    Determine whether to keep the given item based on the text column.
    Filter out entries with empty or missing text in the specified column.
    :param item: Dictionary representing a data entry.
    :return: True if the item should be kept, False otherwise.
    """
    item_type = item.get("type")
    assert item_type in [
        "text",
        "image",
        "table",
        "equation",
        "protein",
    ], f"Unsupported item type: {item_type}"
    if item_type == "text":
        content = item.get(self.text_column, "").strip()
        return bool(content)
    return True
```
In _should_keep_item, using assert to validate item_type will crash the Ray worker on unsupported types. It's more robust to log a warning and return False to filter out invalid items without halting the pipeline.
```python
def _should_keep_item(self, item: Dict[str, Any]) -> bool:
    """
    Determine whether to keep the given item based on the text column.
    :param item: Dictionary representing a data entry.
    :return: True if the item should be kept, False otherwise.
    """
    item_type = item.get("type")
    if item_type not in {
        "text",
        "image",
        "table",
        "equation",
        "protein",
    }:
        # Consider logging a warning for visibility on data quality issues.
        return False
    if item_type == "text":
        content = item.get(self.text_column, "").strip()
        return bool(content)
    return True
```

```diff
     self._graph.nodes[node_id].update(node_data)
 else:
-    logger.warning("Node %s not found in the graph for update.", node_id)
+    print(f"Node {node_id} not found in the graph for update.")
```
Replacing logger.warning with print is a regression in logging practices. print bypasses logging configuration (levels, formatters, handlers), making it difficult to control log output in different environments. It's better to use the standard logging module. Since the context-aware logger might not be available in this class, importing logging and using logging.warning(...) would be a more robust solution. This feedback applies to other print statements in this file and other storage classes (e.g., kuzu_storage.py, json_storage.py).
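A minimal sketch of that fix, assuming a plain module-level logger (the helper function is made up for illustration):

```python
import logging

logger = logging.getLogger(__name__)

def _warn_missing_node(node_id: str) -> None:
    # Unlike print(), this respects whatever levels, formatters, and
    # handlers the application has configured for the logging module.
    logger.warning("Node %s not found in the graph for update.", node_id)
```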
```python
except Exception as e:  # pylint: disable=broad-except
    logger.error("Error in judging description: %s", e)
    logger.info("Use default loss 0.1")
    item["loss"] = -math.log(0.1)
return item
```
Catching a broad Exception can hide bugs and make debugging difficult. If the LLM client library defines more specific exceptions (e.g., for API errors, timeouts, or content filtering), it's better to catch those explicitly. This allows for more granular error handling and reporting. If specific exceptions are not available, consider using logger.exception() to automatically include traceback information in the log, which is very helpful for debugging.
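A sketch of that suggestion; `judge_fn` is a hypothetical callable wrapping the LLM call, and the specific exception types are placeholders for whatever the actual client library raises:

```python
import logging
import math

logger = logging.getLogger(__name__)

def judge_with_fallback(judge_fn, item):
    try:
        item["loss"] = judge_fn(item)
    except (TimeoutError, ConnectionError) as e:
        # Known, recoverable failure modes: log with context, use the default.
        logger.error("Error in judging description: %s", e)
        item["loss"] = -math.log(0.1)
    except Exception:  # pylint: disable=broad-except
        # Unexpected bug: logger.exception() records the full traceback.
        logger.exception("Unexpected failure in judging description")
        item["loss"] = -math.log(0.1)
    return item
```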
```python
def _pre_tokenize(self) -> None:
    """Pre-tokenize all nodes and edges to add token length information."""
    logger.info("Starting pre-tokenization of nodes and edges...")

    nodes = self.kg_instance.get_all_nodes()
    edges = self.kg_instance.get_all_edges()

    # Process nodes
    for node_id, node_data in nodes:
        if "length" not in node_data:
            try:
                description = node_data.get("description", "")
                tokens = self.tokenizer_instance.encode(description)
                node_data["length"] = len(tokens)
                self.kg_instance.update_node(node_id, node_data)
            except Exception as e:
                logger.warning("Failed to tokenize node %s: %s", node_id, e)
                node_data["length"] = 0

    # Process edges
    for u, v, edge_data in edges:
        if "length" not in edge_data:
            try:
                description = edge_data.get("description", "")
                tokens = self.tokenizer_instance.encode(description)
                edge_data["length"] = len(tokens)
                self.kg_instance.update_edge(u, v, edge_data)
            except Exception as e:
                logger.warning("Failed to tokenize edge %s-%s: %s", u, v, e)
                edge_data["length"] = 0

    # Persist changes
    self.kg_instance.index_done_callback()
    logger.info("Pre-tokenization completed.")
```
The _pre_tokenize method updates nodes and edges one by one inside loops. This can lead to a large number of individual write operations to the storage backend, which is inefficient, especially for database-backed storages like KuzuDB. It would be more performant to batch these updates. Consider collecting all node and edge updates into lists and then performing a bulk update operation if the storage backend supports it. This might require adding bulk update methods to the BaseGraphStorage interface for better performance.
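A rough sketch of the batching idea for nodes (edges would follow the same pattern). Note that `update_nodes_batch` is a hypothetical bulk method that would have to be added to BaseGraphStorage:

```python
from typing import Any, Dict, Iterable, List, Tuple

def pre_tokenize_nodes_batched(
    kg_instance, tokenizer, nodes: Iterable[Tuple[str, Dict[str, Any]]]
) -> None:
    updates: List[Tuple[str, Dict[str, Any]]] = []
    for node_id, node_data in nodes:
        if "length" not in node_data:
            description = node_data.get("description", "")
            node_data["length"] = len(tokenizer.encode(description))
            updates.append((node_id, node_data))
    # One bulk write instead of one write per node (hypothetical API).
    kg_instance.update_nodes_batch(updates)
```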
* feat: add config and operator node types
* refactor: refactor readers with ray data
* fix: delete param parallelism for readers
* fix: fix import error
* refactor read and chunk operators with no side effects
* fix: fix import error
* fix: fix return logic
* refactor: rename operator split to chunk
* refactor: refactor build_kg to accomodate ray data
* feat: add StorageFactory & global params
* refactor: refactor quiz to accomodata ray data engine
* fix: reload graph before quizzing
* Potential fix for pull request finding 'Unreachable code' (Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>)
* fix: fix quiz params
* refactor: refactor quiz&judge to ray actors
* fix: fix transferring quizzed data to JudgeService
* refactor: refactor partition to accomodate ray data
* fix: fix lint problem
* refactor: refactor op generate
* feat: write results in output folder
* fix: raise error when no dataset is created
* fix: return generator in ece_partitioner
* fix: return generator in ece_partitioner
* refactor: refactor data format to support multi-modal input
* fix: delete fetching schema to avoid ray's duplicate execution
* fix: fix operators' registry
* feat: refactor schema_guided_extraction & add examples
* feat: seperate ray logs and service logs
* feat: use storage actor
* feat: add kuzu graph database
* feat: add llm as actors
* refactor: delete old runner
* fix: fix vllm wrapper
* docs: update .env.example
* fix: use kuzudb in quiz_service
* fix: update webui
* feat: make storage backend configuragble
* docs: update README

---------

Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
This PR fundamentally refactors GraphGen's pipeline execution engine from a custom threading-based orchestration system to a Ray Data-native distributed processing framework. The changes improve scalability, resource management, and maintainability while preserving the core pipeline semantics.
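For context, a minimal sketch of the Ray Data primitives such a pipeline builds on. The data and transforms here are toy examples, not the PR's actual operators or schemas:

```python
import ray

# Toy records standing in for reader output.
ds = ray.data.from_items(
    [{"type": "text", "content": "alpha"}, {"type": "text", "content": " "}]
)

# Declarative, distributed transforms replace hand-rolled thread pools.
ds = ds.filter(lambda item: bool(item["content"].strip()))
ds = ds.map(lambda item: {**item, "length": len(item["content"])})

print(ds.take_all())  # [{'type': 'text', 'content': 'alpha', 'length': 5}]
```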