From 25a94ddd52bc5d0daf495650aa5f1c557b606387 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 5 Feb 2025 12:33:13 +0100 Subject: [PATCH 01/20] CSV Document Splitter --- haystack/components/preprocessors/__init__.py | 3 +- .../preprocessors/csv_document_splitter.py | 181 ++++++++++++++++++ ...sv-document-splitter-426dcc0392c08f62.yaml | 5 + .../converters/test_csv_document_splitter.py | 86 +++++++++ 4 files changed, 274 insertions(+), 1 deletion(-) create mode 100644 haystack/components/preprocessors/csv_document_splitter.py create mode 100644 releasenotes/notes/csv-document-splitter-426dcc0392c08f62.yaml create mode 100644 test/components/converters/test_csv_document_splitter.py diff --git a/haystack/components/preprocessors/__init__.py b/haystack/components/preprocessors/__init__.py index 26d30c1520..92e466c63d 100644 --- a/haystack/components/preprocessors/__init__.py +++ b/haystack/components/preprocessors/__init__.py @@ -2,9 +2,10 @@ # # SPDX-License-Identifier: Apache-2.0 +from .csv_document_splitter import CSVDocumentSplitter from .document_cleaner import DocumentCleaner from .document_splitter import DocumentSplitter from .recursive_splitter import RecursiveDocumentSplitter from .text_cleaner import TextCleaner -__all__ = ["DocumentSplitter", "DocumentCleaner", "RecursiveDocumentSplitter", "TextCleaner"] +__all__ = ["DocumentSplitter", "DocumentCleaner", "RecursiveDocumentSplitter", "TextCleaner", "CSVDocumentSplitter"] diff --git a/haystack/components/preprocessors/csv_document_splitter.py b/haystack/components/preprocessors/csv_document_splitter.py new file mode 100644 index 0000000000..865e5680e4 --- /dev/null +++ b/haystack/components/preprocessors/csv_document_splitter.py @@ -0,0 +1,181 @@ +from io import StringIO +from typing import Dict, List, Literal + +import pandas as pd + +from haystack import Document, component, logging + +logger = logging.getLogger(__name__) + + +@component +class CSVDocumentSplitter: + """ + A component for splitting CSV documents + """ + + def __init__(self, row_split_threshold: int = 2, column_split_threshold: int = 2) -> None: + """ + Initializes the CSVDocumentSplitter component. + + :param row_split_threshold: + The minimum number of consecutive empty rows required to trigger a split. + A higher threshold prevents excessive splitting, while a lower threshold may lead + to more fragmented sub-tables. + :param column_split_threshold: + The minimum number of consecutive empty columns required to trigger a split. + A higher threshold prevents excessive splitting, while a lower threshold may lead + to more fragmented sub-tables. + """ + if row_split_threshold < 1: + raise ValueError("split_threshold must be greater than 0") + self.row_split_threshold = row_split_threshold + if column_split_threshold < 1: + raise ValueError("split_threshold must be greater than 0") + self.column_split_threshold = column_split_threshold + + @component.output_types(documents=List[Document]) + def run(self, documents: List[Document]) -> Dict[str, List[Document]]: + """ + Processes and splits a list of CSV documents into multiple sub-tables. + + **Splitting Process:** + 1. Row Splitting: Detects empty rows and separates tables stacked vertically. + 2. Column Splitting: Detects empty columns and separates side-by-side tables. + 3. Recursive Row Check: After splitting by columns, it checks for new row splits + introduced by the column split. + + :param documents: A list of Documents containing CSV-formatted content. + Each document is assumed to contain one or more tables separated by empty rows or columns. + + :return: + A dictionary with a key `"documents"`, mapping to a list of new `Document` objects, + each representing an extracted sub-table from the original CSV. + + - If a document cannot be processed, it is returned unchanged. + - The `meta` field from the original document is preserved in the split documents. + """ + cleaned_documents = [] + for document in documents: + try: + df = pd.read_csv(StringIO(document.content), header=None, dtype=object) # type: ignore + except Exception as e: + logger.error(f"Error processing document {document.id}. Keeping it, but skipping splitting. Error: {e}") + cleaned_documents.append(document) + continue + + split_dfs = self._recursive_split( + df=df, row_split_threshold=self.row_split_threshold, column_split_threshold=self.column_split_threshold + ) + for split_df in split_dfs: + cleaned_documents.append( + Document(content=split_df.to_csv(index=False, header=False), meta=document.meta.copy()) + ) + + return {"documents": cleaned_documents} + + def _find_split_indices(self, df: pd.DataFrame, split_threshold: int, axis: Literal["row", "column"]) -> List[int]: + """ + Finds the indices of consecutive empty rows or columns in a DataFrame. + + :param df: DataFrame to split. + :param split_threshold: Minimum number of consecutive empty rows or columns to trigger a split. + :param axis: Axis along which to find empty elements. Either "row" or "column". + :return: List of indices where consecutive empty rows or columns start. + """ + if axis == "row": + empty_elements = df[df.isnull().all(axis=1)].index.tolist() + else: + empty_elements = df.columns[df.isnull().all(axis=0)].tolist() + + # Identify groups of consecutive empty elements + split_indices = [] + consecutive_count = 1 + for i in range(1, len(empty_elements)): + if empty_elements[i] == empty_elements[i - 1] + 1: + consecutive_count += 1 + else: + if consecutive_count >= split_threshold: + split_indices.append(empty_elements[i - 1]) + consecutive_count = 1 + + if consecutive_count >= split_threshold: + split_indices.append(empty_elements[-1]) + + return split_indices + + def _split_dataframe( + self, df: pd.DataFrame, split_threshold: int, axis: Literal["row", "column"] + ) -> List[pd.DataFrame]: + """ + Splits a DataFrame into sub-tables based on consecutive empty rows or columns exceeding `split_threshold`. + + :param df: DataFrame to split. + :param split_threshold: Minimum number of consecutive empty rows or columns to trigger a split. + :param axis: Axis along which to split. Either "row" or "column". + :return: List of split DataFrames. + """ + # Find indices of consecutive empty rows or columns + split_indices = self._find_split_indices(df=df, split_threshold=split_threshold, axis=axis) + + # Split the DataFrame at identified indices + sub_tables = [] + start_idx = 0 + df_length = df.shape[0] if axis == "row" else df.shape[1] + for end_idx in split_indices + [df_length]: + # Avoid empty splits + if end_idx - start_idx > 1: + if axis == "row": + sub_table = df.iloc[start_idx:end_idx].dropna(how="all", axis=0) + else: + sub_table = df.iloc[:, start_idx:end_idx].dropna(how="all", axis=1) + if not sub_table.empty: + sub_tables.append(sub_table) + start_idx = end_idx + 1 + + return sub_tables + + def _recursive_split( + self, df: pd.DataFrame, row_split_threshold: int, column_split_threshold: int + ) -> List[pd.DataFrame]: + """ + Recursively splits a DataFrame. + + Recursively splits a DataFrame first by empty rows, then by empty columns, and repeats the process + until no more splits are possible. Returns a list of DataFrames, each representing a fully separated sub-table. + + :param df: A Pandas DataFrame representing a table (or multiple tables) extracted from a CSV. + :param row_split_threshold: The minimum number of consecutive empty rows required to trigger a split. + :param column_split_threshold: The minimum number of consecutive empty columns to trigger a split. + + **Splitting Process:** + 1. Row Splitting: Detects empty rows and separates tables stacked vertically. + 2. Column Splitting: Detects empty columns and separates side-by-side tables. + 3. Recursive Row Check: After splitting by columns, it checks for new row splits + introduced by the column split. + + Termination Condition: If no further splits are detected, the recursion stops. + """ + + # Step 1: Split by rows + new_sub_tables = self._split_dataframe(df=df, split_threshold=row_split_threshold, axis="row") + + # Step 2: Split by columns + final_tables = [] + for table in new_sub_tables: + final_tables.extend(self._split_dataframe(df=table, split_threshold=column_split_threshold, axis="column")) + + # Step 3: Recursively reapply splitting checked by whether any new empty rows appear after column split + result = [] + for table in final_tables: + # Check if there are consecutive rows >= row_split_threshold now present + if len(self._find_split_indices(df=table, split_threshold=row_split_threshold, axis="row")) > 0: + result.extend( + self._recursive_split( + df=table, row_split_threshold=row_split_threshold, column_split_threshold=column_split_threshold + ) + ) + else: + result.append(table) + + return result diff --git a/releasenotes/notes/csv-document-splitter-426dcc0392c08f62.yaml b/releasenotes/notes/csv-document-splitter-426dcc0392c08f62.yaml new file mode 100644 index 0000000000..9f59c03d12 --- /dev/null +++ b/releasenotes/notes/csv-document-splitter-426dcc0392c08f62.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Introducing CSVDocumentSplitter: The CSVDocumentSplitter splits CSV documents into structured sub-tables by recursively splitting by empty rows and columns larger than a specified threshold. + This is particularly useful when converting Excel files which can often have multiple tables within one sheet. diff --git a/test/components/converters/test_csv_document_splitter.py b/test/components/converters/test_csv_document_splitter.py new file mode 100644 index 0000000000..77058a5a8d --- /dev/null +++ b/test/components/converters/test_csv_document_splitter.py @@ -0,0 +1,86 @@ +import pytest +from haystack import Document +from haystack.components.preprocessors.csv_document_splitter import CSVDocumentSplitter + + +@pytest.fixture +def splitter() -> CSVDocumentSplitter: + return CSVDocumentSplitter() + + +def test_single_table_no_split(splitter: CSVDocumentSplitter) -> None: + csv_content = """A,B,C +1,2,3 +4,5,6 +""" + doc = Document(content=csv_content) + result = splitter.run([doc])["documents"] + assert len(result) == 1 + assert result[0].content == csv_content + + +def test_row_split(splitter: CSVDocumentSplitter) -> None: + csv_content = """A,B,C +1,2,3 +,, +,, +X,Y,Z +7,8,9 +""" + doc = Document(content=csv_content) + result = splitter.run([doc])["documents"] + assert len(result) == 2 + expected_tables = ["A,B,C\n1,2,3\n", "X,Y,Z\n7,8,9\n"] + for i, table in enumerate(result): + assert table.content == expected_tables[i] + + +def test_column_split(splitter: CSVDocumentSplitter) -> None: + csv_content = """A,B,,,X,Y +1,2,,,7,8 +3,4,,,9,10 +""" + doc = Document(content=csv_content) + result = splitter.run([doc])["documents"] + assert len(result) == 2 + expected_tables = ["A,B\n1,2\n3,4\n", "X,Y\n7,8\n9,10\n"] + for i, table in enumerate(result): + assert table.content == expected_tables[i] + + +def test_recursive_split(splitter: CSVDocumentSplitter) -> None: + csv_content = """A,B,,,X,Y +1,2,,,7,8 +,,,,, +,,,,, +P,Q,,,M,N +3,4,,,9,10 +""" + doc = Document(content=csv_content) + result = splitter.run([doc])["documents"] + assert len(result) == 4 + expected_tables = ["A,B\n1,2\n", "X,Y\n7,8\n", "P,Q\n3,4\n", "M,N\n9,10\n"] + for i, table in enumerate(result): + assert table.content == expected_tables[i] + + +def test_threshold_no_effect() -> None: + splitter = CSVDocumentSplitter(row_split_threshold=3) + csv_content = """A,B,C +1,2,3 +,, +,, +X,Y,Z +7,8,9 +""" + doc = Document(content=csv_content) + result = splitter.run([doc])["documents"] + assert len(result) == 1 + + +def test_empty_input(splitter: CSVDocumentSplitter) -> None: + csv_content = "" + doc = Document(content=csv_content) + result = splitter.run([doc])["documents"] + assert len(result) == 1 + assert result[0].content == csv_content From d8f62c9b83c3fcab6fb09c6944d9f603ca4badfd Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 5 Feb 2025 12:40:20 +0100 Subject: [PATCH 02/20] Add license header --- haystack/components/preprocessors/csv_document_splitter.py | 3 +++ test/components/converters/test_csv_document_splitter.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/haystack/components/preprocessors/csv_document_splitter.py b/haystack/components/preprocessors/csv_document_splitter.py index 865e5680e4..509c8fead8 100644 --- a/haystack/components/preprocessors/csv_document_splitter.py +++ b/haystack/components/preprocessors/csv_document_splitter.py @@ -1,3 +1,6 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 from io import StringIO from typing import Dict, List, Literal diff --git a/test/components/converters/test_csv_document_splitter.py b/test/components/converters/test_csv_document_splitter.py index 77058a5a8d..7eade2aace 100644 --- a/test/components/converters/test_csv_document_splitter.py +++ b/test/components/converters/test_csv_document_splitter.py @@ -1,3 +1,6 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 import pytest from haystack import Document from haystack.components.preprocessors.csv_document_splitter import CSVDocumentSplitter From 92a14408ceb90326fb8ff06e013e420b37dc66cf Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 5 Feb 2025 12:42:07 +0100 Subject: [PATCH 03/20] Add newline --- haystack/components/preprocessors/csv_document_splitter.py | 1 + test/components/converters/test_csv_document_splitter.py | 1 + 2 files changed, 2 insertions(+) diff --git a/haystack/components/preprocessors/csv_document_splitter.py b/haystack/components/preprocessors/csv_document_splitter.py index 509c8fead8..2ed6154e14 100644 --- a/haystack/components/preprocessors/csv_document_splitter.py +++ b/haystack/components/preprocessors/csv_document_splitter.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: 2022-present deepset GmbH # # SPDX-License-Identifier: Apache-2.0 + from io import StringIO from typing import Dict, List, Literal diff --git a/test/components/converters/test_csv_document_splitter.py b/test/components/converters/test_csv_document_splitter.py index 7eade2aace..967ae024d4 100644 --- a/test/components/converters/test_csv_document_splitter.py +++ b/test/components/converters/test_csv_document_splitter.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: 2022-present deepset GmbH # # SPDX-License-Identifier: Apache-2.0 + import pytest from haystack import Document from haystack.components.preprocessors.csv_document_splitter import CSVDocumentSplitter From f8bdfdc483b85e7cad7988dd9199a7b93409570a Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 5 Feb 2025 12:50:01 +0100 Subject: [PATCH 04/20] Add to docs --- docs/pydoc/config/preprocessors_api.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/pydoc/config/preprocessors_api.yml b/docs/pydoc/config/preprocessors_api.yml index d5a0df24c6..ab560d157f 100644 --- a/docs/pydoc/config/preprocessors_api.yml +++ b/docs/pydoc/config/preprocessors_api.yml @@ -1,7 +1,7 @@ loaders: - type: haystack_pydoc_tools.loaders.CustomPythonLoader search_path: [../../../haystack/components/preprocessors] - modules: ["document_cleaner", "document_splitter", "recursive_splitter", "text_cleaner"] + modules: ["csv_document_splitter", "document_cleaner", "document_splitter", "recursive_splitter", "text_cleaner"] ignore_when_discovered: ["__init__"] processors: - type: filter From a5abe52a80c8f6e1a44154cb601ffd96ccd250c6 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 5 Feb 2025 13:26:03 +0100 Subject: [PATCH 05/20] Add lineterminator --- haystack/components/preprocessors/csv_document_splitter.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/haystack/components/preprocessors/csv_document_splitter.py b/haystack/components/preprocessors/csv_document_splitter.py index 2ed6154e14..920801c41c 100644 --- a/haystack/components/preprocessors/csv_document_splitter.py +++ b/haystack/components/preprocessors/csv_document_splitter.py @@ -73,7 +73,10 @@ def run(self, documents: List[Document]) -> Dict[str, List[Document]]: ) for split_df in split_dfs: cleaned_documents.append( - Document(content=split_df.to_csv(index=False, header=False), meta=document.meta.copy()) + Document( + content=split_df.to_csv(index=False, header=False, lineterminator="\n"), + meta=document.meta.copy(), + ) ) return {"documents": cleaned_documents} From 5d94f7699980add6ee632d8e14849d3c9cd25e65 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Fri, 7 Feb 2025 12:51:17 +0100 Subject: [PATCH 06/20] Updated csv splitter to allow user to specify to split by row, column or both --- .../preprocessors/csv_document_splitter.py | 60 ++++++++++++------- .../test_csv_document_splitter.py | 0 2 files changed, 40 insertions(+), 20 deletions(-) rename test/components/{converters => preprocessors}/test_csv_document_splitter.py (100%) diff --git a/haystack/components/preprocessors/csv_document_splitter.py b/haystack/components/preprocessors/csv_document_splitter.py index 920801c41c..d58e4a6a9b 100644 --- a/haystack/components/preprocessors/csv_document_splitter.py +++ b/haystack/components/preprocessors/csv_document_splitter.py @@ -3,11 +3,13 @@ # SPDX-License-Identifier: Apache-2.0 from io import StringIO -from typing import Dict, List, Literal - -import pandas as pd +from typing import Dict, List, Literal, Optional from haystack import Document, component, logging +from haystack.lazy_imports import LazyImport + +with LazyImport("Run 'pip install pandas'") as pandas_import: + import pandas as pd logger = logging.getLogger(__name__) @@ -18,7 +20,7 @@ class CSVDocumentSplitter: A component for splitting CSV documents """ - def __init__(self, row_split_threshold: int = 2, column_split_threshold: int = 2) -> None: + def __init__(self, row_split_threshold: Optional[int] = 2, column_split_threshold: Optional[int] = 2) -> None: """ Initializes the CSVDocumentSplitter component. @@ -31,11 +33,17 @@ def __init__(self, row_split_threshold: int = 2, column_split_threshold: int = 2 A higher threshold prevents excessive splitting, while a lower threshold may lead to more fragmented sub-tables. """ - if row_split_threshold < 1: - raise ValueError("split_threshold must be greater than 0") + pandas_import.check() + if row_split_threshold is not None and row_split_threshold < 1: + raise ValueError("row_split_threshold must be greater than 0") + + if column_split_threshold is not None and column_split_threshold < 1: + raise ValueError("column_split_threshold must be greater than 0") + + if row_split_threshold is None and column_split_threshold is None: + raise ValueError("At least one of row_split_threshold or column_split_threshold must be specified.") + self.row_split_threshold = row_split_threshold - if column_split_threshold < 1: - raise ValueError("split_threshold must be greater than 0") self.column_split_threshold = column_split_threshold @component.output_types(documents=List[Document]) @@ -59,29 +67,41 @@ def run(self, documents: List[Document]) -> Dict[str, List[Document]]: - If a document cannot be processed, it is returned unchanged. - The `meta` field from the original document is preserved in the split documents. """ - cleaned_documents = [] + split_documents = [] for document in documents: try: df = pd.read_csv(StringIO(document.content), header=None, dtype=object) # type: ignore except Exception as e: logger.error(f"Error processing document {document.id}. Keeping it, but skipping splitting. Error: {e}") - cleaned_documents.append(document) + split_documents.append(document) continue - split_dfs = self._recursive_split( - df=df, row_split_threshold=self.row_split_threshold, column_split_threshold=self.column_split_threshold - ) + if self.row_split_threshold is not None: + # split by rows + split_dfs = self._split_dataframe(df=df, split_threshold=self.row_split_threshold, axis="row") + elif self.column_split_threshold is not None: + # split by columns + split_dfs = self._split_dataframe(df=df, split_threshold=self.column_split_threshold, axis="column") + else: + # recursive split + split_dfs = self._recursive_split( + df=df, + row_split_threshold=self.row_split_threshold, + column_split_threshold=self.column_split_threshold, + ) + for split_df in split_dfs: - cleaned_documents.append( + split_documents.append( Document( content=split_df.to_csv(index=False, header=False, lineterminator="\n"), meta=document.meta.copy(), ) ) - return {"documents": cleaned_documents} + return {"documents": split_documents} - def _find_split_indices(self, df: pd.DataFrame, split_threshold: int, axis: Literal["row", "column"]) -> List[int]: + @staticmethod + def _find_split_indices(df: "pd.DataFrame", split_threshold: int, axis: Literal["row", "column"]) -> List[int]: """ Finds the indices of consecutive empty rows or columns in a DataFrame. @@ -112,8 +132,8 @@ def _find_split_indices(self, df: pd.DataFrame, split_threshold: int, axis: Lite return split_indices def _split_dataframe( - self, df: pd.DataFrame, split_threshold: int, axis: Literal["row", "column"] - ) -> List[pd.DataFrame]: + self, df: "pd.DataFrame", split_threshold: int, axis: Literal["row", "column"] + ) -> List["pd.DataFrame"]: """ Splits a DataFrame into sub-tables based on consecutive empty rows or columns exceeding `split_threshold`. @@ -143,8 +163,8 @@ def _split_dataframe( return sub_tables def _recursive_split( - self, df: pd.DataFrame, row_split_threshold: int, column_split_threshold: int - ) -> List[pd.DataFrame]: + self, df: "pd.DataFrame", row_split_threshold: Optional[int], column_split_threshold: Optional[int] + ) -> List["pd.DataFrame"]: """ Recursively splits a DataFrame. diff --git a/test/components/converters/test_csv_document_splitter.py b/test/components/preprocessors/test_csv_document_splitter.py similarity index 100% rename from test/components/converters/test_csv_document_splitter.py rename to test/components/preprocessors/test_csv_document_splitter.py From cb9b766cd6dc2b3e6811c6134dc56f8d53a50fb4 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Fri, 7 Feb 2025 13:16:54 +0100 Subject: [PATCH 07/20] Adding more tests --- .../preprocessors/csv_document_splitter.py | 5 + .../test_csv_document_splitter.py | 138 +++++++++++------- 2 files changed, 92 insertions(+), 51 deletions(-) diff --git a/haystack/components/preprocessors/csv_document_splitter.py b/haystack/components/preprocessors/csv_document_splitter.py index d58e4a6a9b..0257bf0d3a 100644 --- a/haystack/components/preprocessors/csv_document_splitter.py +++ b/haystack/components/preprocessors/csv_document_splitter.py @@ -67,6 +67,9 @@ def run(self, documents: List[Document]) -> Dict[str, List[Document]]: - If a document cannot be processed, it is returned unchanged. - The `meta` field from the original document is preserved in the split documents. """ + if len(documents) == 0: + return {"documents": documents} + split_documents = [] for document in documents: try: @@ -153,8 +156,10 @@ def _split_dataframe( # Avoid empty splits if end_idx - start_idx > 1: if axis == "row": + # TODO Shouldn't drop all empty rows just the ones in the range sub_table = df.iloc[start_idx:end_idx].dropna(how="all", axis=0) else: + # TODO Shouldn't drop all empty columns just the ones in the range sub_table = df.iloc[:, start_idx:end_idx].dropna(how="all", axis=1) if not sub_table.empty: sub_tables.append(sub_table) diff --git a/test/components/preprocessors/test_csv_document_splitter.py b/test/components/preprocessors/test_csv_document_splitter.py index 967ae024d4..5c2bdbc934 100644 --- a/test/components/preprocessors/test_csv_document_splitter.py +++ b/test/components/preprocessors/test_csv_document_splitter.py @@ -3,6 +3,8 @@ # SPDX-License-Identifier: Apache-2.0 import pytest +import pandas as pd +from io import StringIO from haystack import Document from haystack.components.preprocessors.csv_document_splitter import CSVDocumentSplitter @@ -12,79 +14,113 @@ def splitter() -> CSVDocumentSplitter: return CSVDocumentSplitter() -def test_single_table_no_split(splitter: CSVDocumentSplitter) -> None: - csv_content = """A,B,C +@pytest.fixture +def two_tables_sep_by_two_empty_rows() -> str: + return """A,B,C 1,2,3 -4,5,6 +,, +,, +X,Y,Z +7,8,9 """ - doc = Document(content=csv_content) - result = splitter.run([doc])["documents"] - assert len(result) == 1 - assert result[0].content == csv_content -def test_row_split(splitter: CSVDocumentSplitter) -> None: - csv_content = """A,B,C +class TestFindSplitIndices: + def test_find_split_indices_row_two_tables( + self, splitter: CSVDocumentSplitter, two_tables_sep_by_two_empty_rows: str + ) -> None: + df = pd.read_csv(StringIO(two_tables_sep_by_two_empty_rows), header=None, dtype=object) # type: ignore + result = splitter._find_split_indices(df, split_threshold=2, axis="row") + assert result == [3] + + def test_find_split_indices_row_two_tables_with_empty_row(self, splitter: CSVDocumentSplitter) -> None: + csv_content = """A,B,C +,, +1,2,3 +,, +,, +X,Y,Z +7,8,9 +""" + df = pd.read_csv(StringIO(csv_content), header=None, dtype=object) # type: ignore + result = splitter._find_split_indices(df, split_threshold=2, axis="row") + assert result == [4] + + def test_find_split_indices_row_three_tables(self, splitter: CSVDocumentSplitter) -> None: + csv_content = """A,B,C 1,2,3 ,, ,, X,Y,Z 7,8,9 +,, +,, +P,Q,R +""" + df = pd.read_csv(StringIO(csv_content), header=None, dtype=object) # type: ignore + result = splitter._find_split_indices(df, split_threshold=2, axis="row") + assert result == [3, 7] + + +class TestCSVDocumentSplitter: + def test_single_table_no_split(self, splitter: CSVDocumentSplitter) -> None: + csv_content = """A,B,C +1,2,3 +4,5,6 """ - doc = Document(content=csv_content) - result = splitter.run([doc])["documents"] - assert len(result) == 2 - expected_tables = ["A,B,C\n1,2,3\n", "X,Y,Z\n7,8,9\n"] - for i, table in enumerate(result): - assert table.content == expected_tables[i] + doc = Document(content=csv_content) + result = splitter.run([doc])["documents"] + assert len(result) == 1 + assert result[0].content == csv_content + def test_row_split(self, splitter: CSVDocumentSplitter, two_tables_sep_by_two_empty_rows: str) -> None: + doc = Document(content=two_tables_sep_by_two_empty_rows) + result = splitter.run([doc])["documents"] + assert len(result) == 2 + expected_tables = ["A,B,C\n1,2,3\n", "X,Y,Z\n7,8,9\n"] + for i, table in enumerate(result): + assert table.content == expected_tables[i] -def test_column_split(splitter: CSVDocumentSplitter) -> None: - csv_content = """A,B,,,X,Y + def test_column_split(self, splitter: CSVDocumentSplitter) -> None: + csv_content = """A,B,,,X,Y 1,2,,,7,8 3,4,,,9,10 """ - doc = Document(content=csv_content) - result = splitter.run([doc])["documents"] - assert len(result) == 2 - expected_tables = ["A,B\n1,2\n3,4\n", "X,Y\n7,8\n9,10\n"] - for i, table in enumerate(result): - assert table.content == expected_tables[i] - + doc = Document(content=csv_content) + result = splitter.run([doc])["documents"] + assert len(result) == 2 + expected_tables = ["A,B\n1,2\n3,4\n", "X,Y\n7,8\n9,10\n"] + for i, table in enumerate(result): + assert table.content == expected_tables[i] -def test_recursive_split(splitter: CSVDocumentSplitter) -> None: - csv_content = """A,B,,,X,Y + def test_recursive_split(self, splitter: CSVDocumentSplitter) -> None: + csv_content = """A,B,,,X,Y 1,2,,,7,8 ,,,,, ,,,,, P,Q,,,M,N 3,4,,,9,10 """ - doc = Document(content=csv_content) - result = splitter.run([doc])["documents"] - assert len(result) == 4 - expected_tables = ["A,B\n1,2\n", "X,Y\n7,8\n", "P,Q\n3,4\n", "M,N\n9,10\n"] - for i, table in enumerate(result): - assert table.content == expected_tables[i] + doc = Document(content=csv_content) + result = splitter.run([doc])["documents"] + assert len(result) == 4 + expected_tables = ["A,B\n1,2\n", "X,Y\n7,8\n", "P,Q\n3,4\n", "M,N\n9,10\n"] + for i, table in enumerate(result): + assert table.content == expected_tables[i] + def test_threshold_no_effect(self, two_tables_sep_by_two_empty_rows: str) -> None: + splitter = CSVDocumentSplitter(row_split_threshold=3) + doc = Document(content=two_tables_sep_by_two_empty_rows) + result = splitter.run([doc])["documents"] + assert len(result) == 1 -def test_threshold_no_effect() -> None: - splitter = CSVDocumentSplitter(row_split_threshold=3) - csv_content = """A,B,C -1,2,3 -,, -,, -X,Y,Z -7,8,9 -""" - doc = Document(content=csv_content) - result = splitter.run([doc])["documents"] - assert len(result) == 1 - + def test_empty_input(self, splitter: CSVDocumentSplitter) -> None: + csv_content = "" + doc = Document(content=csv_content) + result = splitter.run([doc])["documents"] + assert len(result) == 1 + assert result[0].content == csv_content -def test_empty_input(splitter: CSVDocumentSplitter) -> None: - csv_content = "" - doc = Document(content=csv_content) - result = splitter.run([doc])["documents"] - assert len(result) == 1 - assert result[0].content == csv_content + def test_empty_documents(self, splitter: CSVDocumentSplitter) -> None: + result = splitter.run([])["documents"] + assert len(result) == 0 From 78f71b5947f3808c515eafe01eb21c293580c475 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Fri, 7 Feb 2025 13:22:38 +0100 Subject: [PATCH 08/20] Column tests --- .../test_csv_document_splitter.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/test/components/preprocessors/test_csv_document_splitter.py b/test/components/preprocessors/test_csv_document_splitter.py index 5c2bdbc934..a357d8d3b3 100644 --- a/test/components/preprocessors/test_csv_document_splitter.py +++ b/test/components/preprocessors/test_csv_document_splitter.py @@ -25,6 +25,14 @@ def two_tables_sep_by_two_empty_rows() -> str: """ +@pytest.fixture +def two_tables_sep_by_two_empty_columns() -> str: + return """A,B,,,X,Y +1,2,,,7,8 +3,4,,,9,10 +""" + + class TestFindSplitIndices: def test_find_split_indices_row_two_tables( self, splitter: CSVDocumentSplitter, two_tables_sep_by_two_empty_rows: str @@ -61,6 +69,31 @@ def test_find_split_indices_row_three_tables(self, splitter: CSVDocumentSplitter result = splitter._find_split_indices(df, split_threshold=2, axis="row") assert result == [3, 7] + def test_find_split_indices_column_two_tables( + self, splitter: CSVDocumentSplitter, two_tables_sep_by_two_empty_columns: str + ) -> None: + df = pd.read_csv(StringIO(two_tables_sep_by_two_empty_columns), header=None, dtype=object) # type: ignore + result = splitter._find_split_indices(df, split_threshold=1, axis="column") + assert result == [3] + + def test_find_split_indices_column_two_tables_with_empty_column(self, splitter: CSVDocumentSplitter) -> None: + csv_content = """A,,B,,,X,Y +1,,2,,,7,8 +3,,4,,,9,10 +""" + df = pd.read_csv(StringIO(csv_content), header=None, dtype=object) # type: ignore + result = splitter._find_split_indices(df, split_threshold=2, axis="column") + assert result == [4] + + def test_find_split_indices_column_three_tables(self, splitter: CSVDocumentSplitter) -> None: + csv_content = """A,B,,,X,Y,,,P,Q +1,2,,,7,8,,,11,12 +3,4,,,9,10,,,13,14 +""" + df = pd.read_csv(StringIO(csv_content), header=None, dtype=object) # type: ignore + result = splitter._find_split_indices(df, split_threshold=2, axis="column") + assert result == [3, 7] + class TestCSVDocumentSplitter: def test_single_table_no_split(self, splitter: CSVDocumentSplitter) -> None: From 990062cb2a553994a21e1a38cee08f1e7c033bf5 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Fri, 7 Feb 2025 13:38:19 +0100 Subject: [PATCH 09/20] Some refactoring to remove incorrect dropna call --- .../preprocessors/csv_document_splitter.py | 36 ++++++++++++------- .../test_csv_document_splitter.py | 12 +++---- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/haystack/components/preprocessors/csv_document_splitter.py b/haystack/components/preprocessors/csv_document_splitter.py index 0257bf0d3a..3e60d26f6a 100644 --- a/haystack/components/preprocessors/csv_document_splitter.py +++ b/haystack/components/preprocessors/csv_document_splitter.py @@ -3,7 +3,7 @@ # SPDX-License-Identifier: Apache-2.0 from io import StringIO -from typing import Dict, List, Literal, Optional +from typing import Dict, List, Literal, Optional, Tuple from haystack import Document, component, logging from haystack.lazy_imports import LazyImport @@ -104,7 +104,9 @@ def run(self, documents: List[Document]) -> Dict[str, List[Document]]: return {"documents": split_documents} @staticmethod - def _find_split_indices(df: "pd.DataFrame", split_threshold: int, axis: Literal["row", "column"]) -> List[int]: + def _find_split_indices( + df: "pd.DataFrame", split_threshold: int, axis: Literal["row", "column"] + ) -> List[Tuple[int, int]]: """ Finds the indices of consecutive empty rows or columns in a DataFrame. @@ -118,19 +120,27 @@ def _find_split_indices(df: "pd.DataFrame", split_threshold: int, axis: Literal[ else: empty_elements = df.columns[df.isnull().all(axis=0)].tolist() + # If no empty elements found, return empty list + if len(empty_elements) == 0: + return [] + # Identify groups of consecutive empty elements split_indices = [] consecutive_count = 1 + start_index = empty_elements[0] if empty_elements else None + for i in range(1, len(empty_elements)): if empty_elements[i] == empty_elements[i - 1] + 1: consecutive_count += 1 else: if consecutive_count >= split_threshold: - split_indices.append(empty_elements[i - 1]) + split_indices.append((start_index, empty_elements[i - 1])) consecutive_count = 1 + start_index = empty_elements[i] + # Handle the last group of consecutive elements if consecutive_count >= split_threshold: - split_indices.append(empty_elements[-1]) + split_indices.append((start_index, empty_elements[-1])) return split_indices @@ -148,22 +158,24 @@ def _split_dataframe( # Find indices of consecutive empty rows or columns split_indices = self._find_split_indices(df=df, split_threshold=split_threshold, axis=axis) + # If no split_indices are found, return the original DataFrame + if len(split_indices) == 0: + return [df] + # Split the DataFrame at identified indices sub_tables = [] - start_idx = 0 + table_start_idx = 0 df_length = df.shape[0] if axis == "row" else df.shape[1] - for end_idx in split_indices + [df_length]: + for empty_start_idx, empty_end_idx in split_indices + [(df_length, df_length)]: # Avoid empty splits - if end_idx - start_idx > 1: + if empty_start_idx - table_start_idx > 1: if axis == "row": - # TODO Shouldn't drop all empty rows just the ones in the range - sub_table = df.iloc[start_idx:end_idx].dropna(how="all", axis=0) + sub_table = df.iloc[table_start_idx:empty_start_idx] else: - # TODO Shouldn't drop all empty columns just the ones in the range - sub_table = df.iloc[:, start_idx:end_idx].dropna(how="all", axis=1) + sub_table = df.iloc[:, table_start_idx:empty_start_idx] if not sub_table.empty: sub_tables.append(sub_table) - start_idx = end_idx + 1 + table_start_idx = empty_end_idx + 1 return sub_tables diff --git a/test/components/preprocessors/test_csv_document_splitter.py b/test/components/preprocessors/test_csv_document_splitter.py index a357d8d3b3..2f5fdbff26 100644 --- a/test/components/preprocessors/test_csv_document_splitter.py +++ b/test/components/preprocessors/test_csv_document_splitter.py @@ -39,7 +39,7 @@ def test_find_split_indices_row_two_tables( ) -> None: df = pd.read_csv(StringIO(two_tables_sep_by_two_empty_rows), header=None, dtype=object) # type: ignore result = splitter._find_split_indices(df, split_threshold=2, axis="row") - assert result == [3] + assert result == [(2, 3)] def test_find_split_indices_row_two_tables_with_empty_row(self, splitter: CSVDocumentSplitter) -> None: csv_content = """A,B,C @@ -52,7 +52,7 @@ def test_find_split_indices_row_two_tables_with_empty_row(self, splitter: CSVDoc """ df = pd.read_csv(StringIO(csv_content), header=None, dtype=object) # type: ignore result = splitter._find_split_indices(df, split_threshold=2, axis="row") - assert result == [4] + assert result == [(3, 4)] def test_find_split_indices_row_three_tables(self, splitter: CSVDocumentSplitter) -> None: csv_content = """A,B,C @@ -67,14 +67,14 @@ def test_find_split_indices_row_three_tables(self, splitter: CSVDocumentSplitter """ df = pd.read_csv(StringIO(csv_content), header=None, dtype=object) # type: ignore result = splitter._find_split_indices(df, split_threshold=2, axis="row") - assert result == [3, 7] + assert result == [(2, 3), (6, 7)] def test_find_split_indices_column_two_tables( self, splitter: CSVDocumentSplitter, two_tables_sep_by_two_empty_columns: str ) -> None: df = pd.read_csv(StringIO(two_tables_sep_by_two_empty_columns), header=None, dtype=object) # type: ignore result = splitter._find_split_indices(df, split_threshold=1, axis="column") - assert result == [3] + assert result == [(2, 3)] def test_find_split_indices_column_two_tables_with_empty_column(self, splitter: CSVDocumentSplitter) -> None: csv_content = """A,,B,,,X,Y @@ -83,7 +83,7 @@ def test_find_split_indices_column_two_tables_with_empty_column(self, splitter: """ df = pd.read_csv(StringIO(csv_content), header=None, dtype=object) # type: ignore result = splitter._find_split_indices(df, split_threshold=2, axis="column") - assert result == [4] + assert result == [(3, 4)] def test_find_split_indices_column_three_tables(self, splitter: CSVDocumentSplitter) -> None: csv_content = """A,B,,,X,Y,,,P,Q @@ -92,7 +92,7 @@ def test_find_split_indices_column_three_tables(self, splitter: CSVDocumentSplit """ df = pd.read_csv(StringIO(csv_content), header=None, dtype=object) # type: ignore result = splitter._find_split_indices(df, split_threshold=2, axis="column") - assert result == [3, 7] + assert result == [(2, 3), (6, 7)] class TestCSVDocumentSplitter: From 4ecaea67e7383f4d36dada71247e45f132557945 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Fri, 7 Feb 2025 13:52:01 +0100 Subject: [PATCH 10/20] Fix --- .../preprocessors/csv_document_splitter.py | 4 +-- .../test_csv_document_splitter.py | 34 ++++++++++--------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/haystack/components/preprocessors/csv_document_splitter.py b/haystack/components/preprocessors/csv_document_splitter.py index 3e60d26f6a..c62f1c2655 100644 --- a/haystack/components/preprocessors/csv_document_splitter.py +++ b/haystack/components/preprocessors/csv_document_splitter.py @@ -79,10 +79,10 @@ def run(self, documents: List[Document]) -> Dict[str, List[Document]]: split_documents.append(document) continue - if self.row_split_threshold is not None: + if self.row_split_threshold is not None and self.column_split_threshold is None: # split by rows split_dfs = self._split_dataframe(df=df, split_threshold=self.row_split_threshold, axis="row") - elif self.column_split_threshold is not None: + elif self.column_split_threshold is not None and self.row_split_threshold is None: # split by columns split_dfs = self._split_dataframe(df=df, split_threshold=self.column_split_threshold, axis="column") else: diff --git a/test/components/preprocessors/test_csv_document_splitter.py b/test/components/preprocessors/test_csv_document_splitter.py index 2f5fdbff26..e29e5f423e 100644 --- a/test/components/preprocessors/test_csv_document_splitter.py +++ b/test/components/preprocessors/test_csv_document_splitter.py @@ -25,6 +25,18 @@ def two_tables_sep_by_two_empty_rows() -> str: """ +@pytest.fixture +def three_tables_sep_by_empty_rows() -> str: + return """A,B,C +,, +1,2,3 +,, +,, +X,Y,Z +7,8,9 +""" + + @pytest.fixture def two_tables_sep_by_two_empty_columns() -> str: return """A,B,,,X,Y @@ -41,16 +53,10 @@ def test_find_split_indices_row_two_tables( result = splitter._find_split_indices(df, split_threshold=2, axis="row") assert result == [(2, 3)] - def test_find_split_indices_row_two_tables_with_empty_row(self, splitter: CSVDocumentSplitter) -> None: - csv_content = """A,B,C -,, -1,2,3 -,, -,, -X,Y,Z -7,8,9 -""" - df = pd.read_csv(StringIO(csv_content), header=None, dtype=object) # type: ignore + def test_find_split_indices_row_two_tables_with_empty_row( + self, splitter: CSVDocumentSplitter, three_tables_sep_by_empty_rows: str + ) -> None: + df = pd.read_csv(StringIO(three_tables_sep_by_empty_rows), header=None, dtype=object) # type: ignore result = splitter._find_split_indices(df, split_threshold=2, axis="row") assert result == [(3, 4)] @@ -114,12 +120,8 @@ def test_row_split(self, splitter: CSVDocumentSplitter, two_tables_sep_by_two_em for i, table in enumerate(result): assert table.content == expected_tables[i] - def test_column_split(self, splitter: CSVDocumentSplitter) -> None: - csv_content = """A,B,,,X,Y -1,2,,,7,8 -3,4,,,9,10 -""" - doc = Document(content=csv_content) + def test_column_split(self, splitter: CSVDocumentSplitter, two_tables_sep_by_two_empty_columns: str) -> None: + doc = Document(content=two_tables_sep_by_two_empty_columns) result = splitter.run([doc])["documents"] assert len(result) == 2 expected_tables = ["A,B\n1,2\n3,4\n", "X,Y\n7,8\n9,10\n"] From 1003904e05ff1f5ccf2849d7cf0b868f10cc116c Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Fri, 7 Feb 2025 13:58:34 +0100 Subject: [PATCH 11/20] More complicated test --- .../preprocessors/test_csv_document_splitter.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/test/components/preprocessors/test_csv_document_splitter.py b/test/components/preprocessors/test_csv_document_splitter.py index e29e5f423e..bf44357e88 100644 --- a/test/components/preprocessors/test_csv_document_splitter.py +++ b/test/components/preprocessors/test_csv_document_splitter.py @@ -128,7 +128,7 @@ def test_column_split(self, splitter: CSVDocumentSplitter, two_tables_sep_by_two for i, table in enumerate(result): assert table.content == expected_tables[i] - def test_recursive_split(self, splitter: CSVDocumentSplitter) -> None: + def test_recursive_split_one_level(self, splitter: CSVDocumentSplitter) -> None: csv_content = """A,B,,,X,Y 1,2,,,7,8 ,,,,, @@ -143,6 +143,21 @@ def test_recursive_split(self, splitter: CSVDocumentSplitter) -> None: for i, table in enumerate(result): assert table.content == expected_tables[i] + def test_recursive_split_two_levels(self, splitter: CSVDocumentSplitter) -> None: + csv_content = """A,B,,,X,Y +1,2,,,7,8 +,,,,M,N +,,,,9,10 +P,Q,,,, +3,4,,,, +""" + doc = Document(content=csv_content) + result = splitter.run([doc])["documents"] + assert len(result) == 3 + expected_tables = ["A,B\n1,2\n", "P,Q\n3,4\n", "X,Y\n7,8\nM,N\n9,10\n"] + for i, table in enumerate(result): + assert table.content == expected_tables[i] + def test_threshold_no_effect(self, two_tables_sep_by_two_empty_rows: str) -> None: splitter = CSVDocumentSplitter(row_split_threshold=3) doc = Document(content=two_tables_sep_by_two_empty_rows) From 626b6ba0f47db9fc5312beabe30379b74b743711 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Fri, 7 Feb 2025 14:29:11 +0100 Subject: [PATCH 12/20] Adding more relevant metadata to match whats provided in our other splitters --- .../preprocessors/csv_document_splitter.py | 19 ++++++++-- .../test_csv_document_splitter.py | 36 +++++++++++++++---- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/haystack/components/preprocessors/csv_document_splitter.py b/haystack/components/preprocessors/csv_document_splitter.py index c62f1c2655..98719a3c58 100644 --- a/haystack/components/preprocessors/csv_document_splitter.py +++ b/haystack/components/preprocessors/csv_document_splitter.py @@ -63,6 +63,12 @@ def run(self, documents: List[Document]) -> Dict[str, List[Document]]: :return: A dictionary with a key `"documents"`, mapping to a list of new `Document` objects, each representing an extracted sub-table from the original CSV. + The metadata of each document includes: + - A field `source_id` to track the original document. + - A field `row_idx_start` to indicate the starting row index of the sub-table in the original table. + - A field `col_idx_start` to indicate the starting column index of the sub-table in the original table. + - A field `split_id` to indicate the order of the split in the original document. + - All other metadata copied from the original document. - If a document cannot be processed, it is returned unchanged. - The `meta` field from the original document is preserved in the split documents. @@ -93,11 +99,20 @@ def run(self, documents: List[Document]) -> Dict[str, List[Document]]: column_split_threshold=self.column_split_threshold, ) - for split_df in split_dfs: + # Sort split_dfs first by row index, then by column index + split_dfs.sort(key=lambda dataframe: (dataframe.index[0], dataframe.columns[0])) + + for split_id, split_df in enumerate(split_dfs): split_documents.append( Document( content=split_df.to_csv(index=False, header=False, lineterminator="\n"), - meta=document.meta.copy(), + meta={ + **document.meta.copy(), + "source_id": document.id, + "row_idx_start": int(split_df.index[0]), + "col_idx_start": int(split_df.columns[0]), + "split_id": split_id, + }, ) ) diff --git a/test/components/preprocessors/test_csv_document_splitter.py b/test/components/preprocessors/test_csv_document_splitter.py index bf44357e88..697178539a 100644 --- a/test/components/preprocessors/test_csv_document_splitter.py +++ b/test/components/preprocessors/test_csv_document_splitter.py @@ -107,26 +107,37 @@ def test_single_table_no_split(self, splitter: CSVDocumentSplitter) -> None: 1,2,3 4,5,6 """ - doc = Document(content=csv_content) + doc = Document(content=csv_content, id="test_id") result = splitter.run([doc])["documents"] assert len(result) == 1 assert result[0].content == csv_content + assert result[0].meta == {"source_id": "test_id", "row_idx_start": 0, "col_idx_start": 0, "split_id": 0} def test_row_split(self, splitter: CSVDocumentSplitter, two_tables_sep_by_two_empty_rows: str) -> None: - doc = Document(content=two_tables_sep_by_two_empty_rows) + doc = Document(content=two_tables_sep_by_two_empty_rows, id="test_id") result = splitter.run([doc])["documents"] assert len(result) == 2 expected_tables = ["A,B,C\n1,2,3\n", "X,Y,Z\n7,8,9\n"] + expected_meta = [ + {"source_id": "test_id", "row_idx_start": 0, "col_idx_start": 0, "split_id": 0}, + {"source_id": "test_id", "row_idx_start": 4, "col_idx_start": 0, "split_id": 1}, + ] for i, table in enumerate(result): assert table.content == expected_tables[i] + assert table.meta == expected_meta[i] def test_column_split(self, splitter: CSVDocumentSplitter, two_tables_sep_by_two_empty_columns: str) -> None: - doc = Document(content=two_tables_sep_by_two_empty_columns) + doc = Document(content=two_tables_sep_by_two_empty_columns, id="test_id") result = splitter.run([doc])["documents"] assert len(result) == 2 expected_tables = ["A,B\n1,2\n3,4\n", "X,Y\n7,8\n9,10\n"] + expected_meta = [ + {"source_id": "test_id", "row_idx_start": 0, "col_idx_start": 0, "split_id": 0}, + {"source_id": "test_id", "row_idx_start": 0, "col_idx_start": 4, "split_id": 1}, + ] for i, table in enumerate(result): assert table.content == expected_tables[i] + assert table.meta == expected_meta[i] def test_recursive_split_one_level(self, splitter: CSVDocumentSplitter) -> None: csv_content = """A,B,,,X,Y @@ -136,12 +147,19 @@ def test_recursive_split_one_level(self, splitter: CSVDocumentSplitter) -> None: P,Q,,,M,N 3,4,,,9,10 """ - doc = Document(content=csv_content) + doc = Document(content=csv_content, id="test_id") result = splitter.run([doc])["documents"] assert len(result) == 4 expected_tables = ["A,B\n1,2\n", "X,Y\n7,8\n", "P,Q\n3,4\n", "M,N\n9,10\n"] + expected_meta = [ + {"source_id": "test_id", "row_idx_start": 0, "col_idx_start": 0, "split_id": 0}, + {"source_id": "test_id", "row_idx_start": 0, "col_idx_start": 4, "split_id": 1}, + {"source_id": "test_id", "row_idx_start": 4, "col_idx_start": 0, "split_id": 2}, + {"source_id": "test_id", "row_idx_start": 4, "col_idx_start": 4, "split_id": 3}, + ] for i, table in enumerate(result): assert table.content == expected_tables[i] + assert table.meta == expected_meta[i] def test_recursive_split_two_levels(self, splitter: CSVDocumentSplitter) -> None: csv_content = """A,B,,,X,Y @@ -151,12 +169,18 @@ def test_recursive_split_two_levels(self, splitter: CSVDocumentSplitter) -> None P,Q,,,, 3,4,,,, """ - doc = Document(content=csv_content) + doc = Document(content=csv_content, id="test_id") result = splitter.run([doc])["documents"] assert len(result) == 3 - expected_tables = ["A,B\n1,2\n", "P,Q\n3,4\n", "X,Y\n7,8\nM,N\n9,10\n"] + expected_tables = ["A,B\n1,2\n", "X,Y\n7,8\nM,N\n9,10\n", "P,Q\n3,4\n"] + expected_meta = [ + {"source_id": "test_id", "row_idx_start": 0, "col_idx_start": 0, "split_id": 0}, + {"source_id": "test_id", "row_idx_start": 0, "col_idx_start": 4, "split_id": 1}, + {"source_id": "test_id", "row_idx_start": 4, "col_idx_start": 0, "split_id": 2}, + ] for i, table in enumerate(result): assert table.content == expected_tables[i] + assert table.meta == expected_meta[i] def test_threshold_no_effect(self, two_tables_sep_by_two_empty_rows: str) -> None: splitter = CSVDocumentSplitter(row_split_threshold=3) From 1c5368a896df27345bd3869eb8f5ab90fe321409 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Fri, 7 Feb 2025 14:37:51 +0100 Subject: [PATCH 13/20] value error tests --- .../preprocessors/test_csv_document_splitter.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/components/preprocessors/test_csv_document_splitter.py b/test/components/preprocessors/test_csv_document_splitter.py index 697178539a..37b589d87e 100644 --- a/test/components/preprocessors/test_csv_document_splitter.py +++ b/test/components/preprocessors/test_csv_document_splitter.py @@ -101,6 +101,22 @@ def test_find_split_indices_column_three_tables(self, splitter: CSVDocumentSplit assert result == [(2, 3), (6, 7)] +class TestInit: + def test_row_split_threshold_raises_error(self) -> None: + with pytest.raises(ValueError, match="row_split_threshold must be greater than 0"): + CSVDocumentSplitter(row_split_threshold=-1) + + def test_column_split_threshold_raises_error(self) -> None: + with pytest.raises(ValueError, match="column_split_threshold must be greater than 0"): + CSVDocumentSplitter(column_split_threshold=-1) + + def test_row_split_threshold_and_row_column_threshold_none(self) -> None: + with pytest.raises( + ValueError, match="At least one of row_split_threshold or column_split_threshold must be specified." + ): + CSVDocumentSplitter(row_split_threshold=None, column_split_threshold=None) + + class TestCSVDocumentSplitter: def test_single_table_no_split(self, splitter: CSVDocumentSplitter) -> None: csv_content = """A,B,C From 18d6e4062d907b5eb93797f538175149c40ffc5c Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Fri, 7 Feb 2025 14:48:17 +0100 Subject: [PATCH 14/20] Fix mypy --- .../components/preprocessors/csv_document_splitter.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/haystack/components/preprocessors/csv_document_splitter.py b/haystack/components/preprocessors/csv_document_splitter.py index 98719a3c58..90d4fd9519 100644 --- a/haystack/components/preprocessors/csv_document_splitter.py +++ b/haystack/components/preprocessors/csv_document_splitter.py @@ -95,8 +95,8 @@ def run(self, documents: List[Document]) -> Dict[str, List[Document]]: # recursive split split_dfs = self._recursive_split( df=df, - row_split_threshold=self.row_split_threshold, - column_split_threshold=self.column_split_threshold, + row_split_threshold=self.row_split_threshold, # type: ignore + column_split_threshold=self.column_split_threshold, # type: ignore ) # Sort split_dfs first by row index, then by column index @@ -142,7 +142,7 @@ def _find_split_indices( # Identify groups of consecutive empty elements split_indices = [] consecutive_count = 1 - start_index = empty_elements[0] if empty_elements else None + start_index = empty_elements[0] for i in range(1, len(empty_elements)): if empty_elements[i] == empty_elements[i - 1] + 1: @@ -195,7 +195,7 @@ def _split_dataframe( return sub_tables def _recursive_split( - self, df: "pd.DataFrame", row_split_threshold: Optional[int], column_split_threshold: Optional[int] + self, df: "pd.DataFrame", row_split_threshold: int, column_split_threshold: int ) -> List["pd.DataFrame"]: """ Recursively splits a DataFrame. From 34a7dc4f0207203d311a7aaa02d4bfefc7e63af7 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Fri, 7 Feb 2025 15:05:30 +0100 Subject: [PATCH 15/20] Docstring updates --- .../preprocessors/csv_document_splitter.py | 26 +++++++------------ 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/haystack/components/preprocessors/csv_document_splitter.py b/haystack/components/preprocessors/csv_document_splitter.py index 90d4fd9519..fd687dbabb 100644 --- a/haystack/components/preprocessors/csv_document_splitter.py +++ b/haystack/components/preprocessors/csv_document_splitter.py @@ -17,7 +17,10 @@ @component class CSVDocumentSplitter: """ - A component for splitting CSV documents + A component for splitting CSV documents into sub-tables based on empty rows and columns. + + The splitter identifies consecutive empty rows or columns that exceed a given threshold + and uses them as delimiters to segment the document into smaller tables. """ def __init__(self, row_split_threshold: Optional[int] = 2, column_split_threshold: Optional[int] = 2) -> None: @@ -26,12 +29,8 @@ def __init__(self, row_split_threshold: Optional[int] = 2, column_split_threshol :param row_split_threshold: The minimum number of consecutive empty rows required to trigger a split. - A higher threshold prevents excessive splitting, while a lower threshold may lead - to more fragmented sub-tables. :param column_split_threshold: The minimum number of consecutive empty columns required to trigger a split. - A higher threshold prevents excessive splitting, while a lower threshold may lead - to more fragmented sub-tables. """ pandas_import.check() if row_split_threshold is not None and row_split_threshold < 1: @@ -52,10 +51,11 @@ def run(self, documents: List[Document]) -> Dict[str, List[Document]]: Processes and splits a list of CSV documents into multiple sub-tables. **Splitting Process:** - 1. Row Splitting: Detects empty rows and separates tables stacked vertically. - 2. Column Splitting: Detects empty columns and separates side-by-side tables. - 3. Recursive Row Check: After splitting by columns, it checks for new row splits - introduced by the column split. + 1. Applies a row-based split if `row_split_threshold` is provided. + 2. Applies a column-based split if `column_split_threshold` is provided. + 3. If both thresholds are specified, performs a recursive split by rows first, then columns, ensuring + further fragmentation of any sub-tables that still contain empty sections. + 4. Sorts the resulting sub-tables based on their original positions within the document. :param documents: A list of Documents containing CSV-formatted content. Each document is assumed to contain one or more tables separated by empty rows or columns. @@ -206,14 +206,6 @@ def _recursive_split( :param df: A Pandas DataFrame representing a table (or multiple tables) extracted from a CSV. :param row_split_threshold: The minimum number of consecutive empty rows required to trigger a split. :param column_split_threshold: The minimum number of consecutive empty columns to trigger a split. - - **Splitting Process:** - 1. Row Splitting: Detects empty rows and separates tables stacked vertically. - 2. Column Splitting: Detects empty columns and separates side-by-side tables. - 3. Recursive Row Check: After splitting by columns, it checks for new row splits - introduced by the column split. - - Termination Condition: If no further splits are detected, the recursion stops. """ # Step 1: Split by rows From e26389877e9c53c60a37bf75173e4e1c377bf9f9 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Mon, 10 Feb 2025 10:12:09 +0100 Subject: [PATCH 16/20] Add skip_blank_lines=False --- haystack/components/preprocessors/csv_document_splitter.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/haystack/components/preprocessors/csv_document_splitter.py b/haystack/components/preprocessors/csv_document_splitter.py index fd687dbabb..55238b952c 100644 --- a/haystack/components/preprocessors/csv_document_splitter.py +++ b/haystack/components/preprocessors/csv_document_splitter.py @@ -79,7 +79,9 @@ def run(self, documents: List[Document]) -> Dict[str, List[Document]]: split_documents = [] for document in documents: try: - df = pd.read_csv(StringIO(document.content), header=None, dtype=object) # type: ignore + df = pd.read_csv( # type: ignore + StringIO(document.content), header=None, skip_blank_lines=False, dtype=object + ) except Exception as e: logger.error(f"Error processing document {document.id}. Keeping it, but skipping splitting. Error: {e}") split_documents.append(document) From 3d5df705891bc58b7c98b424700818f297c32a4d Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Mon, 10 Feb 2025 10:28:05 +0100 Subject: [PATCH 17/20] Add to dict test --- .../preprocessors/csv_document_splitter.py | 32 +++++++++++----- .../test_csv_document_splitter.py | 38 +++++++++++++++++++ 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/haystack/components/preprocessors/csv_document_splitter.py b/haystack/components/preprocessors/csv_document_splitter.py index 55238b952c..89b9efebb5 100644 --- a/haystack/components/preprocessors/csv_document_splitter.py +++ b/haystack/components/preprocessors/csv_document_splitter.py @@ -3,7 +3,7 @@ # SPDX-License-Identifier: Apache-2.0 from io import StringIO -from typing import Dict, List, Literal, Optional, Tuple +from typing import Any, Dict, List, Literal, Optional, Tuple from haystack import Document, component, logging from haystack.lazy_imports import LazyImport @@ -23,14 +23,23 @@ class CSVDocumentSplitter: and uses them as delimiters to segment the document into smaller tables. """ - def __init__(self, row_split_threshold: Optional[int] = 2, column_split_threshold: Optional[int] = 2) -> None: + def __init__( + self, + row_split_threshold: Optional[int] = 2, + column_split_threshold: Optional[int] = 2, + read_csv_kwargs: Optional[Dict[str, Any]] = None, + ) -> None: """ Initializes the CSVDocumentSplitter component. - :param row_split_threshold: - The minimum number of consecutive empty rows required to trigger a split. - :param column_split_threshold: - The minimum number of consecutive empty columns required to trigger a split. + :param row_split_threshold: The minimum number of consecutive empty rows required to trigger a split. + :param column_split_threshold: The minimum number of consecutive empty columns required to trigger a split. + :param read_csv_kwargs: Additional keyword arguments to pass to `pandas.read_csv`. + By default, the component with options: + - `header=None` + - `skip_blank_lines=False` to preserve blank lines + - `dtype=object` to prevent type inference (e.g., converting numbers to floats). + See https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html for more information. """ pandas_import.check() if row_split_threshold is not None and row_split_threshold < 1: @@ -44,6 +53,7 @@ def __init__(self, row_split_threshold: Optional[int] = 2, column_split_threshol self.row_split_threshold = row_split_threshold self.column_split_threshold = column_split_threshold + self.read_csv_kwargs = read_csv_kwargs or {} @component.output_types(documents=List[Document]) def run(self, documents: List[Document]) -> Dict[str, List[Document]]: @@ -79,9 +89,13 @@ def run(self, documents: List[Document]) -> Dict[str, List[Document]]: split_documents = [] for document in documents: try: - df = pd.read_csv( # type: ignore - StringIO(document.content), header=None, skip_blank_lines=False, dtype=object - ) + resolved_read_csv_kwargs = { + "header": None, + "skip_blank_lines": False, + "dtype": object, + **self.read_csv_kwargs, + } + df = pd.read_csv(StringIO(document.content), **resolved_read_csv_kwargs) # type: ignore except Exception as e: logger.error(f"Error processing document {document.id}. Keeping it, but skipping splitting. Error: {e}") split_documents.append(document) diff --git a/test/components/preprocessors/test_csv_document_splitter.py b/test/components/preprocessors/test_csv_document_splitter.py index 37b589d87e..de41372d49 100644 --- a/test/components/preprocessors/test_csv_document_splitter.py +++ b/test/components/preprocessors/test_csv_document_splitter.py @@ -6,6 +6,7 @@ import pandas as pd from io import StringIO from haystack import Document +from haystack.core.serialization import component_from_dict, component_to_dict from haystack.components.preprocessors.csv_document_splitter import CSVDocumentSplitter @@ -198,6 +199,34 @@ def test_recursive_split_two_levels(self, splitter: CSVDocumentSplitter) -> None assert table.content == expected_tables[i] assert table.meta == expected_meta[i] + def test_csv_with_blank_lines(self, splitter: CSVDocumentSplitter) -> None: + csv_data = """ID,LeftVal,,,RightVal,Extra +1,Hello,,,World,Joined +2,StillLeft,,,StillRight,Bridge + +A,B,,,C,D +E,F,,,G,H +""" + splitter = CSVDocumentSplitter(row_split_threshold=1, column_split_threshold=1) + result = splitter.run([Document(content=csv_data, id="test_id")]) + docs = result["documents"] + assert len(docs) == 4 + expected_tables = [ + "ID,LeftVal\n1,Hello\n2,StillLeft\n", + "RightVal,Extra\nWorld,Joined\nStillRight,Bridge\n", + "A,B\nE,F\n", + "C,D\nG,H\n", + ] + expected_meta = [ + {"source_id": "test_id", "row_idx_start": 0, "col_idx_start": 0, "split_id": 0}, + {"source_id": "test_id", "row_idx_start": 0, "col_idx_start": 4, "split_id": 1}, + {"source_id": "test_id", "row_idx_start": 4, "col_idx_start": 0, "split_id": 2}, + {"source_id": "test_id", "row_idx_start": 4, "col_idx_start": 4, "split_id": 3}, + ] + for i, table in enumerate(docs): + assert table.content == expected_tables[i] + assert table.meta == expected_meta[i] + def test_threshold_no_effect(self, two_tables_sep_by_two_empty_rows: str) -> None: splitter = CSVDocumentSplitter(row_split_threshold=3) doc = Document(content=two_tables_sep_by_two_empty_rows) @@ -214,3 +243,12 @@ def test_empty_input(self, splitter: CSVDocumentSplitter) -> None: def test_empty_documents(self, splitter: CSVDocumentSplitter) -> None: result = splitter.run([])["documents"] assert len(result) == 0 + + def test_default_to_dict(self) -> None: + splitter = CSVDocumentSplitter() + config_serialized = component_to_dict(splitter, name="CSVDocumentSplitter") + config = { + "type": "haystack.components.preprocessors.csv_document_splitter.CSVDocumentSplitter", + "init_parameters": {"row_split_threshold": 2, "column_split_threshold": 2, "read_csv_kwargs": {}}, + } + assert config_serialized == config From 8b5ad103d528264fae3bbd7490e69e60dad262db Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Mon, 10 Feb 2025 11:26:01 +0100 Subject: [PATCH 18/20] More from and to dict tests --- haystack/core/serialization.py | 2 +- .../test_csv_document_splitter.py | 47 ++++++++++++++++++- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/haystack/core/serialization.py b/haystack/core/serialization.py index 3477f0c806..32b9ce492b 100644 --- a/haystack/core/serialization.py +++ b/haystack/core/serialization.py @@ -82,7 +82,7 @@ def component_to_dict(obj: Any, name: str) -> Dict[str, Any]: def _validate_component_to_dict_output(component: Any, name: str, data: Dict[str, Any]) -> None: # Ensure that only basic Python types are used in the serde data. def is_allowed_type(obj: Any) -> bool: - return isinstance(obj, (str, int, float, bool, list, dict, set, tuple, type(None))) + return isinstance(obj, (str, int, float, bool, list, dict, set, tuple, type(None), object)) def check_iterable(l: Iterable[Any]): for v in l: diff --git a/test/components/preprocessors/test_csv_document_splitter.py b/test/components/preprocessors/test_csv_document_splitter.py index de41372d49..a5d2a19636 100644 --- a/test/components/preprocessors/test_csv_document_splitter.py +++ b/test/components/preprocessors/test_csv_document_splitter.py @@ -244,7 +244,7 @@ def test_empty_documents(self, splitter: CSVDocumentSplitter) -> None: result = splitter.run([])["documents"] assert len(result) == 0 - def test_default_to_dict(self) -> None: + def test_to_dict_with_defaults(self) -> None: splitter = CSVDocumentSplitter() config_serialized = component_to_dict(splitter, name="CSVDocumentSplitter") config = { @@ -252,3 +252,48 @@ def test_default_to_dict(self) -> None: "init_parameters": {"row_split_threshold": 2, "column_split_threshold": 2, "read_csv_kwargs": {}}, } assert config_serialized == config + + def test_to_dict_non_defaults(self) -> None: + splitter = CSVDocumentSplitter( + row_split_threshold=1, column_split_threshold=None, read_csv_kwargs={"sep": ";", "dtype": object} + ) + config_serialized = component_to_dict(splitter, name="CSVDocumentSplitter") + config = { + "type": "haystack.components.preprocessors.csv_document_splitter.CSVDocumentSplitter", + "init_parameters": { + "row_split_threshold": 1, + "column_split_threshold": None, + "read_csv_kwargs": {"sep": ";", "dtype": object}, + }, + } + assert config_serialized == config + + def test_from_dict_defaults(self) -> None: + splitter = component_from_dict( + CSVDocumentSplitter, + data={ + "type": "haystack.components.preprocessors.csv_document_splitter.CSVDocumentSplitter", + "init_parameters": {}, + }, + name="CSVDocumentSplitter", + ) + assert splitter.row_split_threshold == 2 + assert splitter.column_split_threshold == 2 + assert splitter.read_csv_kwargs == {} + + def test_from_dict_non_defaults(self) -> None: + splitter = component_from_dict( + CSVDocumentSplitter, + data={ + "type": "haystack.components.preprocessors.csv_document_splitter.CSVDocumentSplitter", + "init_parameters": { + "row_split_threshold": 1, + "column_split_threshold": None, + "read_csv_kwargs": {"sep": ";", "dtype": object}, + }, + }, + name="CSVDocumentSplitter", + ) + assert splitter.row_split_threshold == 1 + assert splitter.column_split_threshold is None + assert splitter.read_csv_kwargs == {"sep": ";", "dtype": object} From 4d4ea34021654569db00f4182f8c515e3a08f119 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Mon, 10 Feb 2025 12:13:16 +0100 Subject: [PATCH 19/20] Fixes --- haystack/core/serialization.py | 2 +- .../preprocessors/test_csv_document_splitter.py | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/haystack/core/serialization.py b/haystack/core/serialization.py index 32b9ce492b..3477f0c806 100644 --- a/haystack/core/serialization.py +++ b/haystack/core/serialization.py @@ -82,7 +82,7 @@ def component_to_dict(obj: Any, name: str) -> Dict[str, Any]: def _validate_component_to_dict_output(component: Any, name: str, data: Dict[str, Any]) -> None: # Ensure that only basic Python types are used in the serde data. def is_allowed_type(obj: Any) -> bool: - return isinstance(obj, (str, int, float, bool, list, dict, set, tuple, type(None), object)) + return isinstance(obj, (str, int, float, bool, list, dict, set, tuple, type(None))) def check_iterable(l: Iterable[Any]): for v in l: diff --git a/test/components/preprocessors/test_csv_document_splitter.py b/test/components/preprocessors/test_csv_document_splitter.py index a5d2a19636..e94efd349a 100644 --- a/test/components/preprocessors/test_csv_document_splitter.py +++ b/test/components/preprocessors/test_csv_document_splitter.py @@ -5,7 +5,7 @@ import pytest import pandas as pd from io import StringIO -from haystack import Document +from haystack import Document, Pipeline from haystack.core.serialization import component_from_dict, component_to_dict from haystack.components.preprocessors.csv_document_splitter import CSVDocumentSplitter @@ -254,16 +254,14 @@ def test_to_dict_with_defaults(self) -> None: assert config_serialized == config def test_to_dict_non_defaults(self) -> None: - splitter = CSVDocumentSplitter( - row_split_threshold=1, column_split_threshold=None, read_csv_kwargs={"sep": ";", "dtype": object} - ) + splitter = CSVDocumentSplitter(row_split_threshold=1, column_split_threshold=None, read_csv_kwargs={"sep": ";"}) config_serialized = component_to_dict(splitter, name="CSVDocumentSplitter") config = { "type": "haystack.components.preprocessors.csv_document_splitter.CSVDocumentSplitter", "init_parameters": { "row_split_threshold": 1, "column_split_threshold": None, - "read_csv_kwargs": {"sep": ";", "dtype": object}, + "read_csv_kwargs": {"sep": ";"}, }, } assert config_serialized == config @@ -289,11 +287,11 @@ def test_from_dict_non_defaults(self) -> None: "init_parameters": { "row_split_threshold": 1, "column_split_threshold": None, - "read_csv_kwargs": {"sep": ";", "dtype": object}, + "read_csv_kwargs": {"sep": ";"}, }, }, name="CSVDocumentSplitter", ) assert splitter.row_split_threshold == 1 assert splitter.column_split_threshold is None - assert splitter.read_csv_kwargs == {"sep": ";", "dtype": object} + assert splitter.read_csv_kwargs == {"sep": ";"} From dc12957818983787e2ce6946f4de4e416c6e6725 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Mon, 10 Feb 2025 17:53:42 +0100 Subject: [PATCH 20/20] Move dict creation outside of for loop --- .../components/preprocessors/csv_document_splitter.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/haystack/components/preprocessors/csv_document_splitter.py b/haystack/components/preprocessors/csv_document_splitter.py index 89b9efebb5..4809bf8381 100644 --- a/haystack/components/preprocessors/csv_document_splitter.py +++ b/haystack/components/preprocessors/csv_document_splitter.py @@ -86,15 +86,11 @@ def run(self, documents: List[Document]) -> Dict[str, List[Document]]: if len(documents) == 0: return {"documents": documents} + resolved_read_csv_kwargs = {"header": None, "skip_blank_lines": False, "dtype": object, **self.read_csv_kwargs} + split_documents = [] for document in documents: try: - resolved_read_csv_kwargs = { - "header": None, - "skip_blank_lines": False, - "dtype": object, - **self.read_csv_kwargs, - } df = pd.read_csv(StringIO(document.content), **resolved_read_csv_kwargs) # type: ignore except Exception as e: logger.error(f"Error processing document {document.id}. Keeping it, but skipping splitting. Error: {e}")