
Commit d93526e

sumedhsakdeo and claude committed
Move batch_size parameter to ArrivalOrder for better semantic design
- Add batch_size parameter to ArrivalOrder class with comprehensive documentation
- Include memory formula: Peak memory ≈ concurrent_streams × batch_size × max_buffered_batches × (average row size)
- Update default concurrent_streams from 1 to 8 for better performance out-of-the-box
- Remove batch_size parameter from to_arrow_batch_reader() and to_record_batches() methods
- Simplify API by putting batch_size where it has direct memory impact (streaming orders)
- TaskOrder uses PyArrow defaults, ArrivalOrder provides full memory control

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
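A minimal usage sketch of the API this commit describes. The catalog and table names are placeholders (not from the commit), and the default-order behaviour is assumed from the docstrings in the diff below.

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.table import ArrivalOrder

# Hypothetical catalog/table, used only for illustration.
table = load_catalog("default").load_table("examples.events")

# Default path: TaskOrder() yields batches one file at a time, in task order,
# using PyArrow's default batch size (per the updated docstring).
reader = table.scan().to_arrow_batch_reader()

# Streaming path: batch_size now lives on ArrivalOrder, where it bounds memory.
reader = table.scan().to_arrow_batch_reader(
    order=ArrivalOrder(concurrent_streams=4, batch_size=32_768, max_buffered_batches=8)
)
for batch in reader:
    ...  # process each pa.RecordBatch incrementally
```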
1 parent b5cfb78 commit d93526e

File tree: 2 files changed (+33, -20 lines)

pyiceberg/io/pyarrow.py

Lines changed: 11 additions & 12 deletions
@@ -144,7 +144,7 @@
     visit,
     visit_with_partner,
 )
-from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, ArrivalOrder, ScanOrder, TableProperties, TaskOrder
+from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, ArrivalOrder, ScanOrder, TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
@@ -1837,7 +1837,6 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
     def to_record_batches(
         self,
         tasks: Iterable[FileScanTask],
-        batch_size: int | None = None,
         order: ScanOrder | None = None,
     ) -> Iterator[pa.RecordBatch]:
         """Scan the Iceberg table and return an Iterator[pa.RecordBatch].
@@ -1853,11 +1852,15 @@ def to_record_batches(
 
         Args:
             tasks: FileScanTasks representing the data files and delete files to read from.
-            batch_size: The number of rows per batch. If None, PyArrow's default is used.
             order: Controls the order in which record batches are returned.
                 TaskOrder() (default) yields batches one file at a time in task order.
-                ArrivalOrder(concurrent_streams=N, max_buffered_batches=M) yields batches
-                as they are produced without materializing entire files into memory.
+                ArrivalOrder(concurrent_streams=N, batch_size=B, max_buffered_batches=M)
+                yields batches as they are produced without materializing entire files
+                into memory. Peak memory ≈ concurrent_streams × batch_size × max_buffered_batches
+                × (average row size in bytes). batch_size is the number of rows per batch.
+                For example (if average row size ≈ 32 bytes):
+                - ArrivalOrder(concurrent_streams=4, batch_size=32768, max_buffered_batches=8)
+                - Peak memory ≈ 4 × 32768 rows × 8 × 32 bytes ≈ ~32 MB (plus Arrow overhead)
 
         Returns:
             An Iterator of PyArrow RecordBatches.
@@ -1868,9 +1871,6 @@ def to_record_batches(
             ValueError: When a field type in the file cannot be projected to the schema type,
                 or when an invalid order value is provided, or when concurrent_streams < 1.
         """
-        if order is None:
-            order = TaskOrder()
-
         if not isinstance(order, ScanOrder):
             raise ValueError(f"Invalid order: {order!r}. Must be a ScanOrder instance (TaskOrder() or ArrivalOrder()).")
 
@@ -1881,11 +1881,11 @@ def to_record_batches(
                 raise ValueError(f"concurrent_streams must be >= 1, got {order.concurrent_streams}")
             return self._apply_limit(
                 self._iter_batches_arrival(
-                    task_list, deletes_per_file, batch_size, order.concurrent_streams, order.max_buffered_batches
+                    task_list, deletes_per_file, order.batch_size, order.concurrent_streams, order.max_buffered_batches
                 )
             )
 
-        return self._apply_limit(self._iter_batches_materialized(task_list, deletes_per_file, batch_size))
+        return self._apply_limit(self._iter_batches_materialized(task_list, deletes_per_file))
 
     def _prepare_tasks_and_deletes(
         self, tasks: Iterable[FileScanTask]
@@ -1914,13 +1914,12 @@ def _iter_batches_materialized(
         self,
         task_list: list[FileScanTask],
         deletes_per_file: dict[str, list[ChunkedArray]],
-        batch_size: int | None,
     ) -> Iterator[pa.RecordBatch]:
         """Yield batches using executor.map with full file materialization."""
         executor = ExecutorFactory.get_or_create()
 
         def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
-            return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, batch_size))
+            return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
 
         for batches in executor.map(batches_for_task, task_list):
             yield from batches
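The peak-memory formula added to the docstring is plain arithmetic, so it can be sanity-checked directly. Below is a small, hypothetical helper (not part of the commit) that reproduces the ~32 MB example from the docstring.

```python
def estimate_peak_bytes(
    concurrent_streams: int,
    batch_size: int,
    max_buffered_batches: int,
    avg_row_size_bytes: float,
) -> float:
    """Apply the documented formula:
    peak ≈ concurrent_streams × batch_size × max_buffered_batches × average row size."""
    return concurrent_streams * batch_size * max_buffered_batches * avg_row_size_bytes


# Docstring example: 4 × 32768 × 8 × 32 bytes = 33,554,432 bytes ≈ 32 MiB (plus Arrow overhead).
assert estimate_peak_bytes(4, 32_768, 8, 32) == 32 * 1024 * 1024
```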

pyiceberg/table/__init__.py

Lines changed: 22 additions & 8 deletions
@@ -174,9 +174,23 @@ class ArrivalOrder(ScanOrder):
 
     Batches are yielded as they are produced without materializing entire
     files into memory. Supports concurrent processing of multiple files.
+
+    Memory Usage:
+        Peak memory ≈ concurrent_streams × batch_size × max_buffered_batches
+        × (average row size in bytes). batch_size is the number of rows per batch.
+
+        For example (if average row size ≈ 32 bytes):
+        - ArrivalOrder(concurrent_streams=4, batch_size=32768, max_buffered_batches=8)
+        - Peak memory ≈ 4 × 32768 rows × 8 × 32 bytes ≈ ~32 MB (plus Arrow overhead)
+
+    Args:
+        concurrent_streams: Number of files to read concurrently (default: 8)
+        batch_size: Number of rows per batch, controls memory per stream (default: None, uses PyArrow default ~131K)
+        max_buffered_batches: Maximum batches buffered per stream (default: 16)
     """
 
-    concurrent_streams: int = 1
+    concurrent_streams: int = 8
+    batch_size: int | None = None
     max_buffered_batches: int = 16
 
 
@@ -2181,7 +2195,7 @@ def to_arrow(self) -> pa.Table:
             self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
         ).to_table(self.plan_files())
 
-    def to_arrow_batch_reader(self, batch_size: int | None = None, order: ScanOrder | None = None) -> pa.RecordBatchReader:
+    def to_arrow_batch_reader(self, order: ScanOrder | None = None) -> pa.RecordBatchReader:
         """Return an Arrow RecordBatchReader from this DataScan.
 
         For large results, using a RecordBatchReader requires less memory than
@@ -2194,12 +2208,12 @@ def to_arrow_batch_reader(self, batch_size: int | None = None, order: ScanOrder
         Within each file, batch ordering follows row order.
 
         Args:
-            batch_size: The number of rows per batch. If None, PyArrow's default is used.
             order: Controls the order in which record batches are returned.
-                TaskOrder() (default) yields batches one file at a time in task order.
-                ArrivalOrder(concurrent_streams=N, max_buffered_batches=M) yields batches
-                as they are produced without materializing entire files into memory.
-                concurrent_streams controls parallelism, max_buffered_batches controls memory.
+                TaskOrder() (default) yields batches one file at a time in task order using
+                PyArrow's default batch size.
+                ArrivalOrder(concurrent_streams=N, batch_size=B, max_buffered_batches=M)
+                yields batches as they are produced without materializing entire files
+                into memory. Memory usage ≈ concurrent_streams × batch_size × max_buffered_batches × (average row size in bytes).
 
         Returns:
             pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
@@ -2215,7 +2229,7 @@
         target_schema = schema_to_pyarrow(self.projection())
         batches = ArrowScan(
            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
-        ).to_record_batches(self.plan_files(), batch_size=batch_size, order=order)
+        ).to_record_batches(self.plan_files(), order=order)
 
         return pa.RecordBatchReader.from_batches(
             target_schema,
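Read the other way around, the documented formula can also be used to pick a batch_size for a memory budget. A hypothetical sketch (the helper name and the 32-byte row assumption are not from the commit):

```python
def batch_size_for_budget(
    budget_bytes: int,
    concurrent_streams: int = 8,     # new default introduced by this commit
    max_buffered_batches: int = 16,  # existing default
    avg_row_size_bytes: int = 32,    # assumed average row size
) -> int:
    """Invert the documented formula to choose rows-per-batch for a peak-memory budget."""
    return max(1, budget_bytes // (concurrent_streams * max_buffered_batches * avg_row_size_bytes))


# Example: a 256 MiB budget with the defaults and 32-byte rows allows 65,536-row batches.
print(batch_size_for_budget(256 * 1024 * 1024))  # 65536
```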
