|
1 | 1 | import json |
2 | 2 | import os |
3 | | -from typing import Any, Dict, Iterator, List, Union |
| 3 | +from typing import Any, Dict, List, Union |
4 | 4 |
|
5 | 5 | import ray |
6 | 6 | import ray.data |
@@ -44,41 +44,6 @@ def read(self, input_path: Union[str, List[str]]) -> ray.data.Dataset: |
44 | 44 | ds = ds.filter(self._should_keep_item) |
45 | 45 | return ds |
46 | 46 |
|
47 | | - def read_stream(self, file_path: str) -> Iterator[Dict[str, Any]]: |
48 | | - """ |
49 | | - Stream read JSONL files line by line without loading entire file into memory. |
50 | | - Returns an iterator that yields filtered documents. |
51 | | -
|
52 | | - :param file_path: Path to the JSONL file. |
53 | | - :return: Iterator of dictionaries containing the data. |
54 | | - """ |
55 | | - if not file_path.endswith(".jsonl"): |
56 | | - raise ValueError("read_stream only supports JSONL files, not JSON files") |
57 | | - |
58 | | - with open(file_path, "r", encoding="utf-8") as f: |
59 | | - for line in f: |
60 | | - try: |
61 | | - doc = json.loads(line) |
62 | | - assert "type" in doc, f"Missing 'type' in document: {doc}" |
63 | | - if doc.get("type") == "text" and self.text_column not in doc: |
64 | | - raise ValueError( |
65 | | - f"Missing '{self.text_column}' in document: {doc}" |
66 | | - ) |
67 | | - |
68 | | - # Apply filtering logic inline (similar to BaseReader.filter) |
69 | | - if doc.get("type") == "text": |
70 | | - content = doc.get(self.text_column, "").strip() |
71 | | - if content: |
72 | | - yield doc |
73 | | - elif doc.get("type") in ("image", "table", "equation"): |
74 | | - img_path = doc.get("img_path") |
75 | | - if self._image_exists(img_path): |
76 | | - yield doc |
77 | | - else: |
78 | | - yield doc |
79 | | - except json.JSONDecodeError as e: |
80 | | - logger.error("Error decoding JSON line: %s. Error: %s", line, e) |
81 | | - |
82 | 47 | @staticmethod |
83 | 48 | def _image_exists(path_or_url: str, timeout: int = 3) -> bool: |
84 | 49 | """ |
|
0 commit comments