Skip to content

Commit 8d7f2c5

Browse files
feat: stream reading files
1 parent 899cf01 commit 8d7f2c5

File tree

1 file changed: +20 -26 lines changed

graphgen/operators/read/read_files.py

Lines changed: 20 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import Any, Dict, List, Optional
+from typing import Iterator, List, Optional
 
 from graphgen.models import (
     CSVReader,
@@ -39,10 +39,10 @@ def read_files(
     input_file: str,
     allowed_suffix: Optional[List[str]] = None,
     cache_dir: Optional[str] = None,
-) -> list[dict]:
+) -> Iterator[list[dict]]:
     path = Path(input_file).expanduser()
     if not path.exists():
-        raise FileNotFoundError(f"input_path not found: {input_file}")
+        raise FileNotFoundError(f"[Reader] input_path not found: {input_file}")
 
     if allowed_suffix is None:
         support_suffix = set(_MAPPING.keys())
@@ -54,33 +54,27 @@ def read_files(
         suffix = path.suffix.lstrip(".").lower()
         if suffix not in support_suffix:
             logger.warning(
-                "Skip file %s (suffix '%s' not in allowed_suffix %s)",
+                "[Reader] Skip file %s (suffix '%s' not in allowed_suffix %s)",
                 path,
                 suffix,
                 support_suffix,
             )
-            return []
+            return
         reader = _build_reader(suffix, cache_dir)
-        return reader.read(str(path))
+        logger.info("[Reader] Reading file %s", path)
+        yield reader.read(str(path))
+        return
 
     # folder
-    files_to_read = [
-        p for p in path.rglob("*") if p.suffix.lstrip(".").lower() in support_suffix
-    ]
-    logger.info(
-        "Found %d eligible file(s) under folder %s (allowed_suffix=%s)",
-        len(files_to_read),
-        input_file,
-        support_suffix,
-    )
-
-    all_docs: List[Dict[str, Any]] = []
-    for p in files_to_read:
-        try:
-            suffix = p.suffix.lstrip(".").lower()
-            reader = _build_reader(suffix, cache_dir)
-            all_docs.extend(reader.read(str(p)))
-        except Exception as e: # pylint: disable=broad-except
-            logger.exception("Error reading %s: %s", p, e)
-
-    return all_docs
+    logger.info("[Reader] Streaming directory %s", path)
+    for p in path.rglob("*"):
+        if p.is_file() and p.suffix.lstrip(".").lower() in support_suffix:
+            try:
+                suffix = p.suffix.lstrip(".").lower()
+                reader = _build_reader(suffix, cache_dir)
+                logger.info("[Reader] Reading file %s", p)
+                docs = reader.read(str(p))
+                if docs:
+                    yield docs
+            except Exception: # pylint: disable=broad-except
+                logger.exception("[Reader] Error reading %s", p)

0 commit comments

Comments
 (0)