Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

from data_designer.engine.testing.seed_readers import LineFanoutDirectorySeedReader
from data_designer.engine.testing.stubs import (
StubChoice,
StubHuggingFaceSeedReader,
Expand All @@ -15,6 +16,7 @@
from data_designer.engine.testing.utils import assert_valid_plugin

__all__ = [
LineFanoutDirectorySeedReader.__name__,
"StubChoice",
"StubHuggingFaceSeedReader",
"StubMCPFacade",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from pathlib import Path
from typing import Any

import data_designer.lazy_heavy_imports as lazy
from data_designer.config.seed_source import DirectorySeedSource
from data_designer.engine.resources.seed_reader import FileSystemSeedReader, SeedReaderFileSystemContext


class LineFanoutDirectorySeedReader(FileSystemSeedReader[DirectorySeedSource]):
def __init__(self, *, include_file_name: bool = False) -> None:
self.include_file_name = include_file_name
self.hydrated_relative_paths: list[str] = []
self.output_columns = ["relative_path", "line_index", "line"]
if include_file_name:
self.output_columns.insert(1, "file_name")

def build_manifest(self, *, context: SeedReaderFileSystemContext) -> lazy.pd.DataFrame | list[dict[str, str]]:
matched_paths = self.get_matching_relative_paths(
context=context,
file_pattern=self.source.file_pattern,
recursive=self.source.recursive,
)
return [
{
"relative_path": relative_path,
**({"file_name": Path(relative_path).name} if self.include_file_name else {}),
}
for relative_path in matched_paths
]

def hydrate_row(
self,
*,
manifest_row: dict[str, Any],
context: SeedReaderFileSystemContext,
) -> list[dict[str, Any]]:
relative_path = str(manifest_row["relative_path"])
self.hydrated_relative_paths.append(relative_path)
with context.fs.open(relative_path, "r", encoding="utf-8") as handle:
lines = handle.read().splitlines()
return [
{
"relative_path": relative_path,
**({"file_name": str(manifest_row["file_name"])} if self.include_file_name else {}),
"line_index": line_index,
"line": line,
}
for line_index, line in enumerate(lines)
]
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

from collections.abc import Callable
from pathlib import Path
from typing import Any

Expand All @@ -23,6 +24,7 @@
SeedReaderRegistry,
)
from data_designer.engine.secret_resolver import PlaintextResolver
from data_designer.engine.testing.seed_readers import LineFanoutDirectorySeedReader


class TrackingFileContentsSeedReader(FileContentsSeedReader):
Expand Down Expand Up @@ -134,50 +136,10 @@ def hydrate_row(
}


class FanoutDirectorySeedReader(FileSystemSeedReader[DirectorySeedSource]):
output_columns = ["relative_path", "file_name", "line_index", "line"]

def __init__(self) -> None:
self.hydrated_relative_paths: list[str] = []

def build_manifest(self, *, context: SeedReaderFileSystemContext) -> lazy.pd.DataFrame | list[dict[str, str]]:
matched_paths = self.get_matching_relative_paths(
context=context,
file_pattern=self.source.file_pattern,
recursive=self.source.recursive,
)
return [
{
"relative_path": relative_path,
"file_name": Path(relative_path).name,
}
for relative_path in matched_paths
]

def hydrate_row(
self,
*,
manifest_row: dict[str, str],
context: SeedReaderFileSystemContext,
) -> list[dict[str, Any]]:
relative_path = manifest_row["relative_path"]
self.hydrated_relative_paths.append(relative_path)
with context.fs.open(relative_path, "r", encoding="utf-8") as handle:
lines = handle.read().splitlines()
return [
{
"relative_path": relative_path,
"file_name": manifest_row["file_name"],
"line_index": line_index,
"line": line,
}
for line_index, line in enumerate(lines)
]


class InvalidHydrationReturnSeedReader(FileSystemSeedReader[DirectorySeedSource]):
def __init__(self, hydrated_return: Any) -> None:
class ConfigurableHydrationDirectorySeedReader(FileSystemSeedReader[DirectorySeedSource]):
def __init__(self, *, hydrated_return: Any, output_columns: list[str] | None = None) -> None:
self._hydrated_return = hydrated_return
self.output_columns = output_columns

def build_manifest(self, *, context: SeedReaderFileSystemContext) -> lazy.pd.DataFrame | list[dict[str, str]]:
matched_paths = self.get_matching_relative_paths(
Expand All @@ -190,36 +152,13 @@ def build_manifest(self, *, context: SeedReaderFileSystemContext) -> lazy.pd.Dat
def hydrate_row(
self,
*,
manifest_row: dict[str, str],
manifest_row: dict[str, Any],
context: SeedReaderFileSystemContext,
) -> Any:
del manifest_row, context
return self._hydrated_return


class SchemaMismatchFanoutSeedReader(FileSystemSeedReader[DirectorySeedSource]):
def __init__(self, *, output_columns: list[str], hydrated_rows: list[dict[str, str]]) -> None:
self.output_columns = output_columns
self._hydrated_rows = hydrated_rows

def build_manifest(self, *, context: SeedReaderFileSystemContext) -> lazy.pd.DataFrame | list[dict[str, str]]:
matched_paths = self.get_matching_relative_paths(
context=context,
file_pattern=self.source.file_pattern,
recursive=self.source.recursive,
)
return [{"relative_path": relative_path} for relative_path in matched_paths]

def hydrate_row(
self,
*,
manifest_row: dict[str, str],
context: SeedReaderFileSystemContext,
) -> list[dict[str, str]]:
del manifest_row, context
return self._hydrated_rows


class ContextCountingDirectorySeedReader(FileSystemSeedReader[DirectorySeedSource]):
def __init__(self) -> None:
self.filesystem_context_calls = 0
Expand All @@ -237,6 +176,16 @@ def build_manifest(self, *, context: SeedReaderFileSystemContext) -> lazy.pd.Dat
return [{"relative_path": relative_path} for relative_path in matched_paths]


@pytest.fixture
def write_alpha_beta_text_files(tmp_path: Path) -> Callable[[str, str], Path]:
def _write_alpha_beta_text_files(alpha_contents: str, beta_contents: str) -> Path:
(tmp_path / "alpha.txt").write_text(alpha_contents, encoding="utf-8")
(tmp_path / "beta.txt").write_text(beta_contents, encoding="utf-8")
return tmp_path

return _write_alpha_beta_text_files


def test_one_reader_per_seed_type():
local_1 = LocalFileSeedReader()
local_2 = LocalFileSeedReader()
Expand Down Expand Up @@ -308,7 +257,7 @@ def test_plugin_style_filesystem_seed_reader_can_fan_out_rows(tmp_path: Path) ->
(tmp_path / "alpha.txt").write_text("alpha-0\nalpha-1", encoding="utf-8")
(tmp_path / "beta.txt").write_text("beta-0", encoding="utf-8")

reader = FanoutDirectorySeedReader()
reader = LineFanoutDirectorySeedReader(include_file_name=True)
reader.attach(
DirectorySeedSource(path=str(tmp_path), file_pattern="*.txt"),
PlaintextResolver(),
Expand Down Expand Up @@ -444,13 +393,14 @@ def test_file_contents_seed_reader_hydrates_only_selected_manifest_rows(tmp_path
assert reader.hydrated_relative_paths == ["beta.txt"]


def test_filesystem_seed_reader_fanout_keeps_manifest_based_index_selection(tmp_path: Path) -> None:
(tmp_path / "alpha.txt").write_text("alpha-0\nalpha-1", encoding="utf-8")
(tmp_path / "beta.txt").write_text("beta-0\nbeta-1", encoding="utf-8")
def test_filesystem_seed_reader_fanout_keeps_manifest_based_index_selection(
write_alpha_beta_text_files: Callable[[str, str], Path],
) -> None:
seed_dir = write_alpha_beta_text_files("alpha-0\nalpha-1", "beta-0\nbeta-1")

reader = FanoutDirectorySeedReader()
reader = LineFanoutDirectorySeedReader(include_file_name=True)
reader.attach(
DirectorySeedSource(path=str(tmp_path), file_pattern="*.txt"),
DirectorySeedSource(path=str(seed_dir), file_pattern="*.txt"),
PlaintextResolver(),
)

Expand All @@ -467,14 +417,13 @@ def test_filesystem_seed_reader_fanout_keeps_manifest_based_index_selection(tmp_


def test_filesystem_seed_reader_batch_reader_raises_for_selected_manifest_rows_with_empty_fanout(
tmp_path: Path,
write_alpha_beta_text_files: Callable[[str, str], Path],
) -> None:
(tmp_path / "alpha.txt").write_text("alpha-0", encoding="utf-8")
(tmp_path / "beta.txt").write_text("", encoding="utf-8")
seed_dir = write_alpha_beta_text_files("alpha-0", "")

reader = FanoutDirectorySeedReader()
reader = LineFanoutDirectorySeedReader(include_file_name=True)
reader.attach(
DirectorySeedSource(path=str(tmp_path), file_pattern="*.txt"),
DirectorySeedSource(path=str(seed_dir), file_pattern="*.txt"),
PlaintextResolver(),
)

Expand All @@ -494,14 +443,13 @@ def test_filesystem_seed_reader_batch_reader_raises_for_selected_manifest_rows_w


def test_filesystem_seed_reader_batch_reader_skips_empty_fanout_rows_before_returning_records(
tmp_path: Path,
write_alpha_beta_text_files: Callable[[str, str], Path],
) -> None:
(tmp_path / "alpha.txt").write_text("", encoding="utf-8")
(tmp_path / "beta.txt").write_text("beta-0\nbeta-1", encoding="utf-8")
seed_dir = write_alpha_beta_text_files("", "beta-0\nbeta-1")

reader = FanoutDirectorySeedReader()
reader = LineFanoutDirectorySeedReader(include_file_name=True)
reader.attach(
DirectorySeedSource(path=str(tmp_path), file_pattern="*.txt"),
DirectorySeedSource(path=str(seed_dir), file_pattern="*.txt"),
PlaintextResolver(),
)

Expand All @@ -518,14 +466,13 @@ def test_filesystem_seed_reader_batch_reader_skips_empty_fanout_rows_before_retu


def test_filesystem_seed_reader_batch_reader_stops_cleanly_after_emitting_records_when_only_empty_fanout_rows_remain(
tmp_path: Path,
write_alpha_beta_text_files: Callable[[str, str], Path],
) -> None:
(tmp_path / "alpha.txt").write_text("alpha-0", encoding="utf-8")
(tmp_path / "beta.txt").write_text("", encoding="utf-8")
seed_dir = write_alpha_beta_text_files("alpha-0", "")

reader = FanoutDirectorySeedReader()
reader = LineFanoutDirectorySeedReader(include_file_name=True)
reader.attach(
DirectorySeedSource(path=str(tmp_path), file_pattern="*.txt"),
DirectorySeedSource(path=str(seed_dir), file_pattern="*.txt"),
PlaintextResolver(),
)

Expand All @@ -545,6 +492,21 @@ def test_filesystem_seed_reader_batch_reader_stops_cleanly_after_emitting_record
assert reader.hydrated_relative_paths == ["alpha.txt", "beta.txt"]


def test_filesystem_seed_reader_full_output_raises_when_all_manifest_rows_fan_out_to_empty(
write_alpha_beta_text_files: Callable[[str, str], Path],
) -> None:
seed_dir = write_alpha_beta_text_files("", "")

reader = LineFanoutDirectorySeedReader(include_file_name=True)
reader.attach(
DirectorySeedSource(path=str(seed_dir), file_pattern="*.txt"),
PlaintextResolver(),
)

with pytest.raises(SeedReaderError, match="Seed source at .* did not produce any rows"):
reader.create_duckdb_connection().execute(f"SELECT * FROM '{reader.get_dataset_uri()}'").df()


def test_local_file_seed_reader_uses_load_time_runtime_path_when_cwd_changes(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
Expand Down Expand Up @@ -649,10 +611,11 @@ def test_filesystem_seed_reader_raises_for_undeclared_hydrated_columns(
@pytest.mark.parametrize(
("hydrated_return", "error_pattern"),
[
(None, "Manifest row index 0 returned NoneType"),
(123, "Manifest row index 0 returned int"),
(["not-a-record"], "Manifest row index 0 returned an iterable containing str"),
],
ids=["scalar", "iterable-of-invalid-records"],
ids=["none", "scalar", "iterable-of-invalid-records"],
)
def test_filesystem_seed_reader_rejects_invalid_hydrate_row_returns(
tmp_path: Path,
Expand All @@ -661,15 +624,15 @@ def test_filesystem_seed_reader_rejects_invalid_hydrate_row_returns(
) -> None:
(tmp_path / "alpha.txt").write_text("alpha", encoding="utf-8")

reader = InvalidHydrationReturnSeedReader(hydrated_return)
reader = ConfigurableHydrationDirectorySeedReader(hydrated_return=hydrated_return)
reader.attach(DirectorySeedSource(path=str(tmp_path), file_pattern="*.txt"), PlaintextResolver())

with pytest.raises(SeedReaderError, match=error_pattern):
reader.create_duckdb_connection().execute(f"SELECT * FROM '{reader.get_dataset_uri()}'").df()


@pytest.mark.parametrize(
("output_columns", "hydrated_rows", "error_pattern"),
("output_columns", "hydrated_return", "error_pattern"),
[
(
["relative_path", "content"],
Expand All @@ -693,12 +656,15 @@ def test_filesystem_seed_reader_rejects_invalid_hydrate_row_returns(
def test_filesystem_seed_reader_validates_each_fanout_record_against_output_columns(
tmp_path: Path,
output_columns: list[str],
hydrated_rows: list[dict[str, str]],
hydrated_return: list[dict[str, str]],
error_pattern: str,
) -> None:
(tmp_path / "alpha.txt").write_text("alpha", encoding="utf-8")

reader = SchemaMismatchFanoutSeedReader(output_columns=output_columns, hydrated_rows=hydrated_rows)
reader = ConfigurableHydrationDirectorySeedReader(
output_columns=output_columns,
hydrated_return=hydrated_return,
)
reader.attach(DirectorySeedSource(path=str(tmp_path), file_pattern="*.txt"), PlaintextResolver())

with pytest.raises(SeedReaderError, match=error_pattern):
Expand Down
Loading
Loading