@@ -19,6 +19,8 @@ use std::sync::Arc;
1919
2020use futures:: StreamExt ;
2121use futures:: stream:: BoxStream ;
22+ use tracing:: Instrument ;
23+ use tracing:: field:: Empty ;
2224
2325use crate :: delete_file_index:: DeleteFileIndex ;
2426use crate :: expr:: { Bind , BoundPredicate , Predicate } ;
@@ -31,6 +33,7 @@ use crate::spec::{
3133 ManifestContentType , ManifestEntryRef , ManifestFile , ManifestList , SchemaRef , SnapshotRef ,
3234 TableMetadataRef ,
3335} ;
36+ use crate :: traced_stream:: TracedStream ;
3437use crate :: { Error , ErrorKind , Result } ;
3538
3639/// Wraps a [`ManifestFile`] alongside the objects that are needed
@@ -53,14 +56,25 @@ pub(crate) struct ManifestEntryContext {
5356 pub bound_predicates : Option < Arc < BoundPredicates > > ,
5457 pub partition_spec_id : i32 ,
5558 pub snapshot_schema : SchemaRef ,
59+ pub ( crate ) span : tracing:: Span ,
5660}
5761
5862impl ManifestFileContext {
5963 /// Consumes this [`ManifestFileContext`], fetching its Manifest from FileIO and then
6064 /// streaming its constituent [`ManifestEntries`]
6165 pub ( crate ) async fn fetch_manifest_and_stream_entries (
6266 self ,
67+ parent_span : tracing:: Span ,
6368 ) -> Result < BoxStream < ' static , Result < ManifestEntryContext > > > {
69+ let manifest_span = tracing:: debug_span!(
70+ parent: & parent_span,
71+ "iceberg.scan.plan.process_manifest" ,
72+ iceberg. scan. plan. manifest. file_path = self . manifest_file. manifest_path,
73+ iceberg. scan. plan. manifest. entries_count = Empty ,
74+ ) ;
75+
76+ let span = manifest_span. clone ( ) ;
77+
6478 let ManifestFileContext {
6579 object_cache,
6680 manifest_file,
@@ -71,21 +85,46 @@ impl ManifestFileContext {
7185 ..
7286 } = self ;
7387
74- let manifest = object_cache. get_manifest ( & manifest_file) . await ?;
88+ let ( manifest, manifest_file) = async move {
89+ let manifest = object_cache. get_manifest ( & manifest_file) . await ;
90+ ( manifest, manifest_file)
91+ }
92+ . instrument ( manifest_span. clone ( ) )
93+ . await ;
94+ let manifest = manifest?;
95+
96+ span. record (
97+ "iceberg.scan.plan.manifest.entries_count" ,
98+ manifest. entries ( ) . len ( ) ,
99+ ) ;
75100
76- Ok ( async_stream:: stream! {
101+ let stream = async_stream:: stream! {
77102 for manifest_entry in manifest. entries( ) {
103+ let manifest_entry_span = tracing:: debug_span!(
104+ parent: span. clone( ) ,
105+ "iceberg.scan.plan.process_data_file" ,
106+ iceberg. scan. plan. data_file. file_path = manifest_entry. file_path( ) ,
107+ "iceberg.scan.plan.data_file.type" = Empty ,
108+ iceberg. scan. plan. data_file. skipped = Empty ,
109+ iceberg. scan. plan. data_file. skipped_reason = Empty ,
110+ ) ;
111+
78112 yield Ok ( ManifestEntryContext {
79113 manifest_entry: manifest_entry. clone( ) ,
80114 expression_evaluator_cache: expression_evaluator_cache. clone( ) ,
81115 field_ids: field_ids. clone( ) ,
82116 partition_spec_id: manifest_file. partition_spec_id,
83117 bound_predicates: bound_predicates. clone( ) ,
84118 snapshot_schema: snapshot_schema. clone( ) ,
119+ span: manifest_entry_span,
85120 } ) ;
86121 }
87122 }
88- . boxed ( ) )
123+ . boxed ( ) ;
124+
125+ Ok ( Box :: pin ( TracedStream :: new ( stream, vec ! [
126+ manifest_span. clone( ) ,
127+ ] ) ) )
89128 }
90129
91130 pub ( crate ) fn is_delete ( & self ) -> bool {
@@ -144,7 +183,11 @@ pub(crate) struct PlanContext {
144183}
145184
146185impl PlanContext {
147- #[ tracing:: instrument( skip_all) ]
186+ #[ tracing:: instrument(
187+ skip_all,
188+ level = "debug" ,
189+ fields( iceberg. scan. plan. manifest_list. file_path = ?self . snapshot. manifest_list( ) ) ,
190+ ) ]
148191 pub ( crate ) async fn get_manifest_list ( & self ) -> Result < Arc < ManifestList > > {
149192 self . object_cache
150193 . as_ref ( )
@@ -175,14 +218,19 @@ impl PlanContext {
175218
176219 #[ tracing:: instrument(
177220 skip_all,
221+ level = "debug" ,
222+ name = "iceberg.scan.plan.process_manifest_list" ,
178223 fields(
179- manifest_list. len = manifest_list. entries( ) . len( ) ,
224+ iceberg . scan . plan . manifest_list. entries_count = manifest_list. entries( ) . len( ) ,
180225 )
181226 ) ]
182- pub ( crate ) fn build_manifest_file_context_iter (
227+ pub ( crate ) fn build_manifest_file_contexts (
183228 & self ,
184229 manifest_list : Arc < ManifestList > ,
185- ) -> impl Iterator < Item = Result < ManifestFileContext > > {
230+ ) -> (
231+ Vec < Result < ManifestFileContext > > ,
232+ Vec < Result < ManifestFileContext > > ,
233+ ) {
186234 let has_predicate = self . predicate . is_some ( ) ;
187235
188236 ( 0 ..manifest_list. entries ( ) . len ( ) )
@@ -198,24 +246,28 @@ impl PlanContext {
198246 . get ( manifest_file. partition_spec_id , predicate. clone ( ) )
199247 . eval ( & manifest_file) ?
200248 {
201- tracing:: trace!( file_path = manifest_file. manifest_path, "iceberg.scan.manifest_file.skipped" ) ;
202- metrics:: counter!( "iceberg.scan.manifest_file.skipped" , "reason" => "partition" ) . increment ( 1 ) ;
249+ tracing:: debug!(
250+ iceberg. scan. plan. manifest. file_path = manifest_file. manifest_path,
251+ iceberg. scan. plan. manifest. skip_reason = "partition" ,
252+ "iceberg.scan.plan.manifest_file.skipped"
253+ ) ;
254+ metrics:: counter!( "iceberg.scan.plan.manifest_file.skipped" , "reason" => "partition" ) . increment ( 1 ) ;
203255 return Ok ( None ) ; // Skip this file.
204256 }
205257 Some ( predicate)
206258 } else {
207259 None
208260 } ;
209261
210- tracing:: trace!( file_path = manifest_file. manifest_path, "iceberg.scan.manifest_file.included" ) ;
211- metrics:: counter!( "iceberg.scan.manifest_file.included" ) . increment ( 1 ) ;
262+ metrics:: counter!( "iceberg.scan.plan.manifest_file.included" ) . increment ( 1 ) ;
212263
213264 let context = self
214265 . create_manifest_file_context ( manifest_file, partition_bound_predicate) ?;
215266 Ok ( Some ( context) )
216267 } ) ( )
217268 . transpose ( )
218269 } )
270+ . partition ( |ctx| ctx. as_ref ( ) . map_or ( true , |ctx| ctx. is_delete ( ) ) )
219271 }
220272
221273 fn create_manifest_file_context (
0 commit comments