@@ -86,75 +86,100 @@ def benchmark_table(tmp_path_factory: pytest.TempPathFactory) -> Table:
8686
@pytest.mark.benchmark
@pytest.mark.parametrize(
    "streaming,concurrent_files,batch_size,max_workers",
    [
        pytest.param(False, 1, None, None, id="default"),
        pytest.param(False, 1, None, 4, id="default-4threads"),
        pytest.param(True, 1, None, None, id="streaming-cf1"),
        pytest.param(True, 2, None, None, id="streaming-cf2"),
        pytest.param(True, 4, None, None, id="streaming-cf4"),
        pytest.param(True, 8, None, None, id="streaming-cf8"),
        pytest.param(True, 16, None, None, id="streaming-cf16"),
    ],
)
def test_read_throughput(
    benchmark_table: Table,
    streaming: bool,
    concurrent_files: int,
    batch_size: int | None,
    max_workers: int | None,
) -> None:
    """Measure records/sec, time to first record, and peak Arrow memory for a scan configuration.

    Args:
        benchmark_table: Pre-populated Iceberg table fixture to scan.
        streaming: Whether to use the streaming batch reader path.
        concurrent_files: Number of files read concurrently (streaming mode only).
        batch_size: Arrow record-batch size; None uses the PyArrow default.
        max_workers: If set, temporarily overrides the shared ExecutorFactory
            thread pool with one of this size for the duration of the test.
    """
    from pyiceberg.utils.concurrent import ExecutorFactory

    effective_batch_size = batch_size or 131_072  # PyArrow default
    if streaming:
        config_str = f"streaming=True, concurrent_files={concurrent_files}, batch_size={effective_batch_size}"
    else:
        workers_str = f", max_workers={max_workers}" if max_workers else ""
        config_str = f"streaming=False (executor.map, all files parallel), batch_size={effective_batch_size}{workers_str}"
    print("\n--- ArrowScan Read Throughput Benchmark ---")
    print(f"Config: {config_str}")
    print(f"  Files: {NUM_FILES}, Rows per file: {ROWS_PER_FILE}, Total rows: {TOTAL_ROWS}")

    elapsed_times: list[float] = []
    throughputs: list[float] = []
    peak_memories: list[int] = []
    ttfr_times: list[float] = []

    # Override the shared executor singleton when the parametrization requests a
    # specific thread count. BUG FIX: track *whether* we overrode (boolean), not
    # whether the previous value was non-None — otherwise, when the factory had
    # never been initialized (original _instance is None), the temporary pool was
    # left installed forever and leaked into subsequent tests. Also shut the
    # temporary pool down on exit so its worker threads don't leak.
    original_instance = None
    temp_executor = None
    overridden = False
    if max_workers is not None:
        from concurrent.futures import ThreadPoolExecutor

        original_instance = ExecutorFactory._instance
        temp_executor = ThreadPoolExecutor(max_workers=max_workers)
        ExecutorFactory._instance = temp_executor
        overridden = True

    try:
        for run in range(NUM_RUNS):
            # Start each run from a clean allocator state so the per-run
            # peak-memory numbers are comparable.
            gc.collect()
            pa.default_memory_pool().release_unused()
            baseline_mem = pa.total_allocated_bytes()
            peak_mem = baseline_mem

            start = timeit.default_timer()
            total_rows = 0
            first_batch_time = None  # time-to-first-record (TTFR), seconds
            for batch in benchmark_table.scan().to_arrow_batch_reader(
                batch_size=batch_size,
                streaming=streaming,
                concurrent_files=concurrent_files,
            ):
                if first_batch_time is None:
                    first_batch_time = timeit.default_timer() - start
                total_rows += len(batch)
                # Sample allocator usage every batch to approximate the peak.
                current_mem = pa.total_allocated_bytes()
                if current_mem > peak_mem:
                    peak_mem = current_mem
            elapsed = timeit.default_timer() - start

            peak_above_baseline = peak_mem - baseline_mem
            rows_per_sec = total_rows / elapsed if elapsed > 0 else 0
            elapsed_times.append(elapsed)
            throughputs.append(rows_per_sec)
            peak_memories.append(peak_above_baseline)
            # A scan that yielded no batches records a TTFR of 0.0.
            ttfr_times.append(first_batch_time or 0.0)

            print(
                f"  Run {run + 1}: {elapsed:.2f}s, {rows_per_sec:,.0f} rows/s, "
                f"TTFR: {(first_batch_time or 0) * 1000:.1f}ms, "
                f"peak arrow mem: {peak_above_baseline / (1024 * 1024):.1f}MB"
            )

            assert total_rows == TOTAL_ROWS, f"Expected {TOTAL_ROWS} rows, got {total_rows}"
    finally:
        if overridden:
            # Restore the saved singleton unconditionally (it may legitimately
            # have been None) and release the temporary pool's threads.
            ExecutorFactory._instance = original_instance
            temp_executor.shutdown(wait=False)

    mean_elapsed = statistics.mean(elapsed_times)
    stdev_elapsed = statistics.stdev(elapsed_times) if len(elapsed_times) > 1 else 0.0
    mean_throughput = statistics.mean(throughputs)
    mean_peak_mem = statistics.mean(peak_memories)
    mean_ttfr = statistics.mean(ttfr_times)

    print(
        f"  Mean: {mean_elapsed:.2f}s ± {stdev_elapsed:.2f}s, {mean_throughput:,.0f} rows/s, "
        f"TTFR: {mean_ttfr * 1000:.1f}ms, "
        f"peak arrow mem: {mean_peak_mem / (1024 * 1024):.1f}MB"
    )
0 commit comments