diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ed9bc77618e9d..a98449e14a676 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -708,6 +708,11 @@ jobs: # If you encounter an error, run './dev/update_function_docs.sh' and commit ./dev/update_function_docs.sh git diff --exit-code + - name: Check if metrics.md has been modified + run: | + # If you encounter an error, run './dev/update_metric_docs.sh' and commit + ./dev/update_metric_docs.sh + git diff --exit-code examples-docs-check: name: check example README is up-to-date diff --git a/Cargo.lock b/Cargo.lock index 1675f26e8a0f0..b4c3409dee639 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1895,6 +1895,7 @@ dependencies = [ "tempfile", "test-utils", "tokio", + "toml", "url", "uuid", "zstd", @@ -2051,8 +2052,10 @@ dependencies = [ "criterion", "datafusion-common", "datafusion-common-runtime", + "datafusion-doc", "datafusion-execution", "datafusion-expr", + "datafusion-macros", "datafusion-physical-expr", "datafusion-physical-expr-adapter", "datafusion-physical-expr-common", @@ -2167,9 +2170,11 @@ dependencies = [ "datafusion-common", "datafusion-common-runtime", "datafusion-datasource", + "datafusion-doc", "datafusion-execution", "datafusion-expr", "datafusion-functions-aggregate-common", + "datafusion-macros", "datafusion-physical-expr", "datafusion-physical-expr-adapter", "datafusion-physical-expr-common", @@ -2188,6 +2193,10 @@ dependencies = [ [[package]] name = "datafusion-doc" version = "51.0.0" +dependencies = [ + "datafusion-macros", + "inventory", +] [[package]] name = "datafusion-examples" @@ -2237,7 +2246,9 @@ dependencies = [ "chrono", "dashmap", "datafusion-common", + "datafusion-doc", "datafusion-expr", + "datafusion-macros", "futures", "insta", "log", @@ -2450,6 +2461,7 @@ name = "datafusion-macros" version = "51.0.0" dependencies = [ "datafusion-doc", + "proc-macro2", "quote", "syn 2.0.111", ] @@ -2568,6 +2580,7 @@ dependencies = [ "criterion", "datafusion-common", "datafusion-common-runtime", + "datafusion-doc", "datafusion-execution", "datafusion-expr", "datafusion-functions", @@ -2575,6 +2588,7 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-functions-window", "datafusion-functions-window-common", + "datafusion-macros", "datafusion-physical-expr", "datafusion-physical-expr-common", "futures", @@ -3827,6 +3841,15 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "inventory" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc61209c082fbeb19919bee74b176221b27223e27b65d781eb91af24eb1fb46e" +dependencies = [ + "rustversion", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -5720,6 +5743,15 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "serde_spanned" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8bbf91e5a4d6315eee45e704372590b30e260ee83af6639d64557f51b067776" +dependencies = [ + "serde_core", +] + [[package]] name = "serde_tokenstream" version = "0.2.2" @@ -6450,6 +6482,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.9.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00e5e5d9bf2475ac9d4f0d9edab68cc573dc2fd644b0dba36b0c30a92dd9eaa0" +dependencies = [ + "indexmap 2.12.1", + "serde_core", + "serde_spanned", + "toml_datetime", + "toml_parser", + 
"toml_writer", + "winnow", +] + [[package]] name = "toml_datetime" version = "0.7.2" @@ -6480,6 +6527,12 @@ dependencies = [ "winnow", ] +[[package]] +name = "toml_writer" +version = "1.0.6+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607" + [[package]] name = "tonic" version = "0.14.2" diff --git a/ci/scripts/check_metric_docs.sh b/ci/scripts/check_metric_docs.sh new file mode 100755 index 0000000000000..37b915d883924 --- /dev/null +++ b/ci/scripts/check_metric_docs.sh @@ -0,0 +1,61 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under this License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +SCRIPT_PATH="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/$(basename "${BASH_SOURCE[0]}")" + +echo "$SCRIPT_PATH: Checking if metrics.md is up-to-date" + +# Ensure we're in the project root +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" +cd "$PROJECT_ROOT" + +METRICS_FILE="docs/source/user-guide/metrics.md" +METRICS_BACKUP=$(mktemp) + +if [ ! -f "$METRICS_FILE" ]; then + echo "Warning: $METRICS_FILE not found, skipping check." >&2 + rm -f "$METRICS_BACKUP" + exit 0 +fi + +# Backup the current file +cp "$METRICS_FILE" "$METRICS_BACKUP" + +# Run the update script (this will modify the file) +# Suppress output but capture exit code +if ! ./dev/update_metric_docs.sh > /dev/null 2>&1; then + echo "Error: Failed to run update_metric_docs.sh. Check permissions and try again." >&2 + # Restore the original file + mv "$METRICS_BACKUP" "$METRICS_FILE" + exit 1 +fi + +# Compare the updated file with the backup +if ! diff -q "$METRICS_BACKUP" "$METRICS_FILE" > /dev/null; then + echo "Error: metrics.md is not up-to-date. Run './dev/update_metric_docs.sh' and commit the changes." >&2 + # Restore the original file + mv "$METRICS_BACKUP" "$METRICS_FILE" + exit 1 +fi + +# Clean up the backup file +rm -f "$METRICS_BACKUP" +exit 0 + diff --git a/ci/scripts/doc_prettier_check.sh b/ci/scripts/doc_prettier_check.sh index d94a0d1c96171..3e31efcb96e4c 100755 --- a/ci/scripts/doc_prettier_check.sh +++ b/ci/scripts/doc_prettier_check.sh @@ -40,7 +40,7 @@ if ! 
command -v npx >/dev/null 2>&1; then fi # Ignore subproject CHANGELOG.md because it is machine generated -npx prettier@2.7.1 $MODE \ +npx prettier@3.7.4 $MODE \ '{datafusion,datafusion-cli,datafusion-examples,dev,docs}/**/*.md' \ '!datafusion/CHANGELOG.md' \ README.md \ diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index bd88ed3b9ca1e..9bfae9f392b4a 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -108,6 +108,7 @@ unicode_expressions = [ "datafusion-sql?/unicode_expressions", "datafusion-functions/unicode_expressions", ] +docs = [] extended_tests = [] [dependencies] @@ -167,8 +168,8 @@ criterion = { workspace = true, features = ["async_tokio", "async_futures"] } ctor = { workspace = true } dashmap = "6.1.0" datafusion-doc = { workspace = true } -datafusion-functions-window-common = { workspace = true } datafusion-macros = { workspace = true } +datafusion-functions-window-common = { workspace = true } datafusion-physical-optimizer = { workspace = true } doc-comment = { workspace = true } env_logger = { workspace = true } @@ -184,6 +185,9 @@ sysinfo = "0.37.2" test-utils = { path = "../../test-utils" } tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot", "fs"] } +[build-dependencies] +toml = "0.9" + [package.metadata.cargo-machete] ignored = ["datafusion-doc", "datafusion-macros", "dashmap"] diff --git a/datafusion/core/build.rs b/datafusion/core/build.rs new file mode 100644 index 0000000000000..5fe0ccaedbbd3 --- /dev/null +++ b/datafusion/core/build.rs @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::env; +use std::fs; +use std::io::{Read, Write}; +use std::path::Path; +use toml::value::Table; + +fn main() { + let out_dir = env::var("OUT_DIR").unwrap(); + let dest_path = Path::new(&out_dir).join("link_metrics.rs"); + let mut f = fs::File::create(&dest_path).unwrap(); + + // Identify crates that use #[metric_doc] + // We scan sibling directories in `datafusion/` + let core_crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap(); + let datafusion_dir = Path::new(&core_crate_dir).parent().unwrap(); + + let mut crates_with_metrics = Vec::new(); + + // Read Cargo.toml to check dependencies + let cargo_toml_path = Path::new(&core_crate_dir).join("Cargo.toml"); + let cargo_toml_content = + fs::read_to_string(&cargo_toml_path).expect("core Cargo.toml must be readable"); + let cargo_manifest: toml::Value = + toml::from_str(&cargo_toml_content).expect("core Cargo.toml must be valid TOML"); + let dependencies = manifest_table(&cargo_manifest, "dependencies"); + let dev_dependencies = manifest_table(&cargo_manifest, "dev-dependencies"); + let features = manifest_table(&cargo_manifest, "features"); + + if let Ok(entries) = fs::read_dir(datafusion_dir) { + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() { + // Check if it's a crate (has Cargo.toml) + if path.join("Cargo.toml").exists() { + let dir_name = path.file_name().unwrap().to_str().unwrap(); + let crate_name = format!("datafusion-{dir_name}"); + + // Skip self (core), macros (definition only), and doc (dev-dep only) + if dir_name == "core" || dir_name == "macros" || dir_name == "doc" { + continue; + } + + let is_dependency = dependencies.contains_key(&crate_name) + || dev_dependencies.contains_key(&crate_name); + if !is_dependency { + continue; + } + + if has_metric_doc(&path) { + crates_with_metrics.push(crate_name); + } + } + } + } + } + + // Sort for deterministic output + crates_with_metrics.sort(); + + writeln!( + f, + "/// Automatically generated by build.rs to link crates with metrics" + ) + .unwrap(); + writeln!(f, "pub fn link_metrics() {{").unwrap(); + for krate in crates_with_metrics { + let krate_snake = krate.replace("-", "_"); + + if is_optional_dependency(&dependencies, &krate) { + let feature = dependency_feature(&features, &krate); + writeln!(f, " #[cfg(feature = \"{feature}\")]").unwrap(); + } + + writeln!(f, " {{").unwrap(); + writeln!(f, " // Force link {krate}").unwrap(); + writeln!(f, " use {krate_snake} as _;").unwrap(); + writeln!(f, " }}").unwrap(); + } + writeln!(f, "}}").unwrap(); + + println!("cargo:rerun-if-changed=build.rs"); + println!("cargo:rerun-if-changed=Cargo.toml"); +} + +fn manifest_table(manifest: &toml::Value, key: &str) -> Table { + manifest + .get(key) + .and_then(|value| value.as_table()) + .cloned() + .unwrap_or_default() +} + +fn is_optional_dependency(dependencies: &Table, crate_name: &str) -> bool { + dependencies + .get(crate_name) + .and_then(|dep| dep.as_table()) + .and_then(|dep| dep.get("optional")) + .and_then(|value| value.as_bool()) + .unwrap_or(false) +} + +fn dependency_feature(features: &Table, crate_name: &str) -> String { + let dep_feature = format!("dep:{crate_name}"); + for (feature, deps) in features { + if let Some(array) = deps.as_array() { + for dep in array { + if let Some(dep_str) = dep.as_str() { + if dep_str == crate_name || dep_str == dep_feature { + return feature.clone(); + } + } + } + } + } + crate_name.to_string() +} + +fn has_metric_doc(dir: &Path) -> bool { + if let Ok(entries) = fs::read_dir(dir) { + for entry in 
entries.flatten() { + let path = entry.path(); + if path.is_dir() { + // Skip target, .git, etc + if let Some(name) = path.file_name().and_then(|n| n.to_str()) + && (name == "target" || name.starts_with('.')) + { + continue; + } + if has_metric_doc(&path) { + return true; + } + } else if let Some(ext) = path.extension() + && ext == "rs" + && let Ok(mut file) = fs::File::open(&path) + { + let mut content = String::new(); + if file.read_to_string(&mut content).is_ok() + && content.contains("#[metric_doc") + { + return true; + } + } + } + } + false +} diff --git a/datafusion/core/src/bin/print_metric_docs.rs b/datafusion/core/src/bin/print_metric_docs.rs new file mode 100644 index 0000000000000..47c31227f5a82 --- /dev/null +++ b/datafusion/core/src/bin/print_metric_docs.rs @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Print metrics documentation collected via `DocumentedMetrics`/`DocumentedExec`. +//! Called from doc generation scripts to refresh `docs/source/user-guide/metrics.md`. 
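The first thing this binary does is call `datafusion::doc::link_metrics()`. For orientation, the `link_metrics.rs` file that the `build.rs` above writes into `OUT_DIR` comes out roughly like the sketch below; the crate list and the `parquet` feature gate are illustrative guesses, since the real output depends on which `datafusion-*` dependencies actually contain `#[metric_doc]`:

```rust
/// Automatically generated by build.rs to link crates with metrics
pub fn link_metrics() {
    // Hypothetical entry for an optional dependency: gated on the core feature
    // assumed here to be named "parquet"
    #[cfg(feature = "parquet")]
    {
        // Force link datafusion-datasource-parquet
        use datafusion_datasource_parquet as _;
    }
    {
        // Force link datafusion-physical-plan
        use datafusion_physical_plan as _;
    }
}
```

The underscore imports exist only to force those crates to be linked, so that the `inventory` registrations they contain are still present when the docs are collected at runtime.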
+
+use datafusion_expr::metric_doc_sections::{
+    ExecDoc, MetricDoc, MetricDocPosition, exec_docs, metric_docs,
+};
+
+fn main() -> std::io::Result<()> {
+    datafusion::doc::link_metrics();
+
+    let mut content = String::new();
+    let mut metrics: Vec<&MetricDoc> = metric_docs().collect();
+    metrics.sort_by(|a, b| a.name.cmp(b.name));
+
+    let mut execs: Vec<&ExecDoc> = exec_docs().collect();
+    execs.sort_by(|a, b| a.name.cmp(b.name));
+
+    let common: Vec<&MetricDoc> = metrics
+        .iter()
+        .copied()
+        .filter(|m| m.position == MetricDocPosition::Common)
+        .collect();
+
+    if !common.is_empty() {
+        content.push_str("## Common Metrics\n\n");
+        for metric in common {
+            render_metric_doc(&mut content, metric, 3);
+        }
+    }
+
+    if !execs.is_empty() {
+        content.push_str("## Operator-specific Metrics\n\n");
+        for exec in execs {
+            render_exec_doc(&mut content, exec);
+        }
+    }
+
+    println!("{content}");
+    Ok(())
+}
+
+fn render_exec_doc(out: &mut String, exec: &ExecDoc) {
+    out.push_str(&heading(3, exec.name));
+    out.push_str("\n\n");
+
+    if let Some(doc) = summarize(exec.doc)
+        && !doc.is_empty()
+    {
+        out.push_str(&sanitize(doc));
+        out.push_str("\n\n");
+    }
+
+    // Filter to operator-specific metrics only (common metrics are documented separately)
+    let mut metrics: Vec<&MetricDoc> = exec
+        .metrics
+        .iter()
+        .copied()
+        .filter(|metric| metric.position != MetricDocPosition::Common)
+        .collect();
+    metrics.sort_by(|a, b| a.name.cmp(b.name));
+
+    if metrics.is_empty() {
+        out.push_str(
+            "_No operator-specific metrics documented (see Common Metrics)._\n\n",
+        );
+    } else {
+        for metric in metrics {
+            render_metric_doc(out, metric, 4);
+        }
+    }
+}
+
+fn render_metric_doc(out: &mut String, metric: &MetricDoc, heading_level: usize) {
+    out.push_str(&heading(heading_level, metric.name));
+    out.push_str("\n\n");
+
+    if let Some(doc) = summarize(metric.doc)
+        && !doc.is_empty()
+    {
+        out.push_str(&sanitize(doc));
+        out.push_str("\n\n");
+    }
+
+    out.push_str("| Metric | Description |\n");
+    out.push_str("| --- | --- |\n");
+    for field in metric.fields {
+        out.push_str(&format!("| {} | {} |\n", field.name, sanitize(field.doc)));
+    }
+    out.push('\n');
+}
+
+fn heading(level: usize, title: &str) -> String {
+    format!("{} {}", "#".repeat(level), title)
+}
+
+fn summarize(doc: &str) -> Option<&str> {
+    let trimmed = doc.trim();
+    if trimmed.is_empty() {
+        return None;
+    }
+    trimmed.split("\n\n").next().map(str::trim)
+}
+
+fn sanitize(doc: &str) -> String {
+    doc.split_whitespace().collect::<Vec<&str>>().join(" ")
+}
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index e83934a8e281d..a6d5d1534e52a 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -900,6 +900,11 @@ pub mod test;
 mod schema_equivalence;
 pub mod test_util;
 
+/// Documentation generation helpers
+pub mod doc {
+    include!(concat!(env!("OUT_DIR"), "/link_metrics.rs"));
+}
+
 #[cfg(doctest)]
 doc_comment::doctest!("../../../README.md", readme_example_test);
 
diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml
index a5f6f56ac6f33..314645fa0763e 100644
--- a/datafusion/datasource-parquet/Cargo.toml
+++ b/datafusion/datasource-parquet/Cargo.toml
@@ -37,9 +37,11 @@ bytes = { workspace = true }
 datafusion-common = { workspace = true, features = ["object_store", "parquet"] }
 datafusion-common-runtime = { workspace = true }
 datafusion-datasource = { workspace = true }
+datafusion-doc = { workspace = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true
} datafusion-functions-aggregate-common = { workspace = true } +datafusion-macros = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-adapter = { workspace = true } datafusion-physical-expr-common = { workspace = true } diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index 5eaa137e9a456..722d6672bd211 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use datafusion_macros::metric_doc; use datafusion_physical_plan::metrics::{ Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics, RatioMergeStrategy, RatioMetrics, Time, @@ -26,6 +27,7 @@ use datafusion_physical_plan::metrics::{ /// through [`ParquetFileReaderFactory`]. /// /// [`ParquetFileReaderFactory`]: super::ParquetFileReaderFactory +#[metric_doc] #[derive(Debug, Clone)] pub struct ParquetFileMetrics { /// Number of file **ranges** pruned or matched by partition or file level statistics. diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml index 48bf30f7a448f..58690639ff09a 100644 --- a/datafusion/datasource/Cargo.toml +++ b/datafusion/datasource/Cargo.toml @@ -49,8 +49,10 @@ bzip2 = { workspace = true, optional = true } chrono = { workspace = true } datafusion-common = { workspace = true, features = ["object_store"] } datafusion-common-runtime = { workspace = true } +datafusion-doc = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-macros = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-adapter = { workspace = true } datafusion-physical-expr-common = { workspace = true } diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index c8090382094ef..55392a4cd2bdc 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -32,6 +32,7 @@ use crate::file_scan_config::FileScanConfig; use arrow::datatypes::SchemaRef; use datafusion_common::error::Result; use datafusion_execution::RecordBatchStream; +use datafusion_macros::metric_doc; use datafusion_physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time, }; @@ -360,6 +361,7 @@ impl StartableTime { /// as other operators. /// /// [`FileStream`]: +#[metric_doc] pub struct FileStreamMetrics { /// Wall clock time elapsed for file opening. /// diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index a3892dfac9778..6639125720a1b 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -36,9 +36,11 @@ use datafusion_physical_plan::{ use itertools::Itertools; use crate::file_scan_config::FileScanConfig; +use crate::file_stream::FileStreamMetrics; use datafusion_common::config::ConfigOptions; use datafusion_common::{Constraints, Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_macros::metric_doc; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_plan::SortOrderPushdownResult; @@ -224,6 +226,7 @@ pub trait DataSource: Send + Sync + Debug { /// the [`FileSource`] trait. 
/// /// [`FileSource`]: crate::file::FileSource +#[metric_doc(FileStreamMetrics)] #[derive(Clone, Debug)] pub struct DataSourceExec { /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig` diff --git a/datafusion/doc/Cargo.toml b/datafusion/doc/Cargo.toml index c1368c1531533..0c58b16fb8883 100644 --- a/datafusion/doc/Cargo.toml +++ b/datafusion/doc/Cargo.toml @@ -39,3 +39,9 @@ workspace = true [lib] name = "datafusion_doc" + +[dependencies] +inventory = "0.3" + +[dev-dependencies] +datafusion-macros = { workspace = true } diff --git a/datafusion/doc/src/lib.rs b/datafusion/doc/src/lib.rs index 836cb9345b51f..b91e68b317a34 100644 --- a/datafusion/doc/src/lib.rs +++ b/datafusion/doc/src/lib.rs @@ -23,10 +23,12 @@ )] #![cfg_attr(docsrs, feature(doc_cfg))] +mod metrics; mod udaf; mod udf; mod udwf; +pub use metrics::metric_doc_sections; pub use udaf::aggregate_doc_sections; pub use udf::scalar_doc_sections; pub use udwf::window_doc_sections; diff --git a/datafusion/doc/src/metrics.rs b/datafusion/doc/src/metrics.rs new file mode 100644 index 0000000000000..70ce4f4cc281e --- /dev/null +++ b/datafusion/doc/src/metrics.rs @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Documentation structures for execution metrics and operators. + +/// Groupings and exports for metrics documentation (mirrors how function doc sections are exposed). +pub mod metric_doc_sections { + pub use super::{ + DocumentedExec, DocumentedMetrics, ExecDoc, ExecDocEntry, MetricDoc, + MetricDocEntry, MetricDocPosition, MetricFieldDoc, exec_docs, metric_docs, + }; + pub use inventory; +} + +/// Whether a metrics struct should be documented as common or operator-specific. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MetricDocPosition { + /// Metrics that are reused across operators (for example `BaselineMetrics`). + Common, + /// Metrics that are tied to a specific operator. + Operator, +} + +/// Documentation for a single metric field. +#[derive(Debug)] +pub struct MetricFieldDoc { + /// Name of the metric. + pub name: &'static str, + /// Documentation for the metric. + pub doc: &'static str, + /// Type name of the metric field. + pub type_name: &'static str, +} + +/// Documentation attached to a metrics struct. +#[derive(Debug)] +pub struct MetricDoc { + /// Name of the metrics struct (usually ends with `Metrics`). + pub name: &'static str, + /// Documentation from the struct-level doc comment. + pub doc: &'static str, + /// Documentation for each metric field. + pub fields: &'static [MetricFieldDoc], + /// Whether the metrics are common or operator-specific. + pub position: MetricDocPosition, +} + +/// Documentation for an execution plan implementation. 
+#[derive(Debug)]
+pub struct ExecDoc {
+    /// Name of the execution plan struct (usually ends with `Exec`).
+    pub name: &'static str,
+    /// Documentation from the struct-level doc comment.
+    pub doc: &'static str,
+    /// Metrics exposed by this operator.
+    pub metrics: &'static [&'static MetricDoc],
+}
+
+/// Trait implemented for metrics structs to expose their documentation.
+pub trait DocumentedMetrics {
+    /// Static documentation for this metrics struct.
+    const DOC: &'static MetricDoc;
+
+    /// Returns the documentation for this metrics struct.
+    fn metric_doc() -> &'static MetricDoc {
+        Self::DOC
+    }
+}
+
+/// Trait implemented for execution plan structs to expose their documentation.
+pub trait DocumentedExec {
+    /// Returns the documentation for this operator.
+    fn exec_doc() -> &'static ExecDoc;
+}
+
+#[derive(Debug)]
+pub struct MetricDocEntry(pub &'static MetricDoc);
+
+#[derive(Debug)]
+pub struct ExecDocEntry(pub &'static ExecDoc);
+
+/// Iterate over all registered metrics docs.
+pub fn metric_docs() -> impl Iterator<Item = &'static MetricDoc> {
+    inventory::iter::<MetricDocEntry>
+        .into_iter()
+        .map(|entry| entry.0)
+}
+
+/// Iterate over all registered execution plan docs.
+pub fn exec_docs() -> impl Iterator<Item = &'static ExecDoc> {
+    inventory::iter::<ExecDocEntry>
+        .into_iter()
+        .map(|entry| entry.0)
+}
+
+inventory::collect!(MetricDocEntry);
+inventory::collect!(ExecDocEntry);
diff --git a/datafusion/doc/tests/metric_doc_tests.rs b/datafusion/doc/tests/metric_doc_tests.rs
new file mode 100644
index 0000000000000..981a484f342e0
--- /dev/null
+++ b/datafusion/doc/tests/metric_doc_tests.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for the `#[metric_doc]` macro.
+//!
+//! These tests verify:
+//! 1. Cross-file usage: metrics structs defined in one file, exec in another
+//! 2.
Multiple metrics groups: one exec referencing multiple metrics structs + +mod test_helpers; + +use datafusion_doc::metric_doc_sections::{DocumentedExec, DocumentedMetrics}; +use test_helpers::separate_exec::UserDefinedExec; +use test_helpers::separate_metrics::{MetricsGroupA, MetricsGroupB}; + +/// Test that metrics structs in a separate file correctly implement DocumentedMetrics +#[test] +fn test_cross_file_metrics_have_documented_metrics_trait() { + // MetricsGroupA should implement DocumentedMetrics + let doc_a = MetricsGroupA::metric_doc(); + assert_eq!(doc_a.name, "MetricsGroupA"); + assert!(doc_a.doc.contains("First group of metrics")); + assert_eq!(doc_a.fields.len(), 2); + + // MetricsGroupB should implement DocumentedMetrics + let doc_b = MetricsGroupB::metric_doc(); + assert_eq!(doc_b.name, "MetricsGroupB"); + assert!(doc_b.doc.contains("Second group of metrics")); + assert_eq!(doc_b.fields.len(), 2); +} + +/// Test that an exec with multiple metrics groups correctly implements DocumentedExec +#[test] +fn test_exec_with_multiple_metrics_groups() { + let exec_doc = UserDefinedExec::exec_doc(); + + // Verify exec documentation + assert_eq!(exec_doc.name, "UserDefinedExec"); + assert!(exec_doc.doc.contains("user-defined execution plan")); + + // Verify that both metrics groups are linked + assert_eq!( + exec_doc.metrics.len(), + 2, + "Expected 2 metrics groups, got {}", + exec_doc.metrics.len() + ); + + // Verify the metrics are the correct ones (order should match declaration order) + let metric_names: Vec<&str> = exec_doc.metrics.iter().map(|m| m.name).collect(); + assert_eq!( + metric_names[0], "MetricsGroupA", + "Expected MetricsGroupA in metrics, got {}", + metric_names[0] + ); + assert_eq!( + metric_names[1], "MetricsGroupB", + "Expected MetricsGroupB in metrics, got {}", + metric_names[1] + ); +} + +/// Test that field documentation is correctly extracted from metrics structs +#[test] +fn test_metrics_field_documentation() { + let doc_a = MetricsGroupA::metric_doc(); + + // Check that field docs are extracted + let field_names: Vec<&str> = doc_a.fields.iter().map(|f| f.name).collect(); + assert_eq!(field_names[0], "phase_a_time"); + assert_eq!(field_names[1], "phase_a_rows"); + + // Check that field descriptions are captured + let time_field = doc_a.fields.iter().find(|f| f.name == "phase_a_time"); + assert_eq!( + time_field.unwrap().doc.trim(), + "Time spent executing phase A" + ); +} diff --git a/datafusion/doc/tests/test_helpers/mod.rs b/datafusion/doc/tests/test_helpers/mod.rs new file mode 100644 index 0000000000000..9a5116c072bfc --- /dev/null +++ b/datafusion/doc/tests/test_helpers/mod.rs @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Test helper modules for metric_doc macro tests. 
+ +pub mod separate_exec; +pub mod separate_metrics; diff --git a/datafusion/doc/tests/test_helpers/separate_exec.rs b/datafusion/doc/tests/test_helpers/separate_exec.rs new file mode 100644 index 0000000000000..960519e16c904 --- /dev/null +++ b/datafusion/doc/tests/test_helpers/separate_exec.rs @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Exec struct that references metrics from a different file. +//! This demonstrates cross-file usage and multiple metrics groups. + +#![allow(dead_code)] + +use datafusion_macros::metric_doc; + +// Import metrics from the separate file +use super::separate_metrics::{MetricsGroupA, MetricsGroupB}; + +/// A user-defined execution plan that demonstrates: +/// 1. Referencing metrics from a different file +/// 2. Having multiple metrics groups +#[metric_doc(MetricsGroupA, MetricsGroupB)] +pub struct UserDefinedExec { + /// Some internal state + pub state: String, +} diff --git a/datafusion/doc/tests/test_helpers/separate_metrics.rs b/datafusion/doc/tests/test_helpers/separate_metrics.rs new file mode 100644 index 0000000000000..387f72dae9e7c --- /dev/null +++ b/datafusion/doc/tests/test_helpers/separate_metrics.rs @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metrics structs defined in a separate file from the exec. +//! This demonstrates that metric_doc works across file boundaries. + +#![allow(dead_code)] + +use datafusion_macros::metric_doc; + +/// First group of metrics for UserDefinedExec. +/// Tracks phase A execution statistics. +#[metric_doc] +pub struct MetricsGroupA { + /// Time spent executing phase A + pub phase_a_time: u64, + /// Number of rows processed in phase A + pub phase_a_rows: usize, +} + +/// Second group of metrics for UserDefinedExec. +/// Tracks phase B execution statistics. 
+#[metric_doc] +pub struct MetricsGroupB { + /// Time spent executing phase B + pub phase_b_time: u64, + /// Number of rows processed in phase B + pub phase_b_rows: usize, +} diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index ca1fba07cae2d..b56b67eabd2e8 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -54,7 +54,9 @@ async-trait = { workspace = true } chrono = { workspace = true } dashmap = { workspace = true } datafusion-common = { workspace = true, default-features = false } +datafusion-doc = { workspace = true } datafusion-expr = { workspace = true, default-features = false } +datafusion-macros = { workspace = true } futures = { workspace = true } log = { workspace = true } object_store = { workspace = true, features = ["fs"] } diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs index aced2f46d7224..1bd7635d16758 100644 --- a/datafusion/execution/src/lib.rs +++ b/datafusion/execution/src/lib.rs @@ -45,6 +45,7 @@ pub mod registry { }; } +pub use datafusion_doc::metric_doc_sections; pub use disk_manager::DiskManager; pub use registry::FunctionRegistry; pub use stream::{RecordBatchStream, SendableRecordBatchStream}; diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 4fb78933d7a5c..c1e8d77a8300e 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -36,8 +36,6 @@ //! //! The [expr_fn] module contains functions for creating expressions. -extern crate core; - mod literal; mod operation; mod partition_evaluator; @@ -89,7 +87,7 @@ pub mod window_state; pub use datafusion_doc::{ DocSection, Documentation, DocumentationBuilder, aggregate_doc_sections, - scalar_doc_sections, window_doc_sections, + metric_doc_sections, scalar_doc_sections, window_doc_sections, }; pub use datafusion_expr_common::accumulator::Accumulator; pub use datafusion_expr_common::columnar_value::ColumnarValue; diff --git a/datafusion/macros/Cargo.toml b/datafusion/macros/Cargo.toml index 53691edf5a979..b39f743622917 100644 --- a/datafusion/macros/Cargo.toml +++ b/datafusion/macros/Cargo.toml @@ -39,11 +39,10 @@ workspace = true [lib] name = "datafusion_macros" -# lib.rs to be re-added in the future -path = "src/user_doc.rs" proc-macro = true [dependencies] datafusion-doc = { workspace = true } -quote = "1.0.41" -syn = { version = "2.0.111", features = ["full"] } +proc-macro2 = "1.0" +quote = "1.0" +syn = { version = "2.0", features = ["full"] } diff --git a/datafusion/macros/src/lib.rs b/datafusion/macros/src/lib.rs new file mode 100644 index 0000000000000..4cd8cee0130ea --- /dev/null +++ b/datafusion/macros/src/lib.rs @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#![doc( + html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg", + html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg" +)] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![deny(clippy::allow_attributes)] + +extern crate proc_macro; + +mod metric_doc_impl; +mod user_doc_impl; + +use proc_macro::TokenStream; + +/// This procedural macro is intended to parse a rust custom attribute and create user documentation +/// from it by constructing a `DocumentBuilder()` automatically. The `Documentation` can be +/// retrieved from the `documentation()` method +/// declared on `AggregateUDF`, `WindowUDFImpl`, `ScalarUDFImpl` traits. +/// For `doc_section`, this macro will try to find corresponding predefined `DocSection` by label field +/// Predefined `DocSection` can be found in datafusion/expr/src/udf.rs +/// Example: +/// ```ignore +/// #[user_doc( +/// doc_section(label = "Time and Date Functions"), +/// description = r"Converts a value to a date (`YYYY-MM-DD`).", +/// syntax_example = "to_date('2017-05-31', '%Y-%m-%d')", +/// sql_example = r#"```sql +/// > select to_date('2023-01-31'); +/// +-----------------------------+ +/// | to_date(Utf8(\"2023-01-31\")) | +/// +-----------------------------+ +/// | 2023-01-31 | +/// +-----------------------------+ +/// ```"#, +/// standard_argument(name = "expression", prefix = "String"), +/// argument( +/// name = "format_n", +/// description = r"Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order +/// they appear with the first successful one being returned. If none of the formats successfully parse the expression +/// an error will be returned." +/// ) +/// )] +/// #[derive(Debug)] +/// pub struct ToDateFunc { +/// signature: Signature, +/// } +/// ``` +/// will generate the following code +/// ```ignore +/// pub struct ToDateFunc { +/// signature: Signature, +/// } +/// impl ToDateFunc { +/// fn doc(&self) -> Option<&datafusion_doc::Documentation> { +/// static DOCUMENTATION: std::sync::LazyLock< +/// datafusion_doc::Documentation, +/// > = std::sync::LazyLock::new(|| { +/// datafusion_doc::Documentation::builder( +/// datafusion_doc::DocSection { +/// include: true, +/// label: "Time and Date Functions", +/// description: None, +/// }, +/// r"Converts a value to a date (`YYYY-MM-DD`).".to_string(), +/// "to_date('2017-05-31', '%Y-%m-%d')".to_string(), +/// ) +/// .with_sql_example( +/// r#"```sql +/// > select to_date('2023-01-31'); +/// +-----------------------------+ +/// | to_date(Utf8(\"2023-01-31\")) | +/// +-----------------------------+ +/// | 2023-01-31 | +/// +-----------------------------+ +/// ```"#, +/// ) +/// .with_standard_argument("expression", "String".into()) +/// .with_argument( +/// "format_n", +/// r"Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order +/// they appear with the first successful one being returned. 
If none of the formats successfully parse the expression +/// an error will be returned.", +/// ) +/// .build() +/// }); +/// Some(&DOCUMENTATION) +/// } +/// } +/// ``` +#[proc_macro_attribute] +pub fn user_doc(args: TokenStream, input: TokenStream) -> TokenStream { + user_doc_impl::user_doc(args, input) +} + +/// Helper attribute to register metrics structs or execs for documentation generation. +/// +/// # Usage +/// +/// ## On Metrics Structs +/// +/// Use `#[metric_doc]` (no arguments) for operator-specific metrics: +/// ```ignore +/// #[metric_doc] +/// struct FilterExecMetrics { +/// /// Common metrics for most operators +/// baseline_metrics: BaselineMetrics, +/// /// Selectivity of the filter +/// selectivity: RatioMetrics, +/// } +/// ``` +/// +/// Use `#[metric_doc(common)]` for reusable metrics shared across operators: +/// ```ignore +/// #[metric_doc(common)] +/// pub struct BaselineMetrics { +/// /// Amount of time the operator was actively using the CPU +/// elapsed_compute: Time, +/// /// Total output rows +/// output_rows: Count, +/// } +/// ``` +/// +/// ## On Exec Structs +/// +/// Reference a single metrics struct: +/// ```ignore +/// #[metric_doc(FilterExecMetrics)] +/// pub struct FilterExec { /* ... */ } +/// ``` +/// +/// Reference multiple metrics structs (for operators with multiple metric groups): +/// ```ignore +/// #[metric_doc(MetricsGroupA, MetricsGroupB)] +/// pub struct UserDefinedExec { /* ... */ } +/// ``` +/// +/// ## Cross-File Usage +/// +/// Metrics structs can be defined in different files/modules from the exec. +/// The macro resolves references at compile time, so metrics can live anywhere +/// as long as they are in scope where the exec is defined: +/// ```ignore +/// // In metrics/groups.rs +/// #[metric_doc] +/// pub struct MetricsGroupA { +/// /// Time spent in phase A +/// phase_a_time: Time, +/// } +/// +/// #[metric_doc] +/// pub struct MetricsGroupB { +/// /// Time spent in phase B +/// phase_b_time: Time, +/// } +/// +/// // In exec/user_defined.rs +/// use crate::metrics::groups::{MetricsGroupA, MetricsGroupB}; +/// +/// #[metric_doc(MetricsGroupA, MetricsGroupB)] +/// pub struct UserDefinedExec { /* ... */ } +/// ``` +#[proc_macro_attribute] +pub fn metric_doc(args: TokenStream, input: TokenStream) -> TokenStream { + metric_doc_impl::metric_doc(&args, input) +} + +/// Derive macro used on `*Metrics` structs to expose doc comments for documentation generation. +#[proc_macro_derive(DocumentedMetrics, attributes(metric_doc, metric_doc_attr))] +pub fn derive_documented_metrics(input: TokenStream) -> TokenStream { + metric_doc_impl::derive_documented_metrics(input) +} + +/// Derive macro used on `*Exec` structs to link them to the metrics they expose. +#[proc_macro_derive(DocumentedExec, attributes(metric_doc, metric_doc_attr))] +pub fn derive_documented_exec(input: TokenStream) -> TokenStream { + metric_doc_impl::derive_documented_exec(input) +} diff --git a/datafusion/macros/src/metric_doc_impl.rs b/datafusion/macros/src/metric_doc_impl.rs new file mode 100644 index 0000000000000..2888c784cebb5 --- /dev/null +++ b/datafusion/macros/src/metric_doc_impl.rs @@ -0,0 +1,246 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics documentation macros and helpers.
+
+use proc_macro::TokenStream;
+use quote::{format_ident, quote};
+use syn::{
+    Attribute, Data, DeriveInput, Expr, ExprLit, Lit, Path, Token, parse::Parser,
+    spanned::Spanned,
+};
+
+pub fn metric_doc(args: &TokenStream, input: TokenStream) -> TokenStream {
+    let args_ts = proc_macro2::TokenStream::from(args.clone());
+    let parsed: syn::punctuated::Punctuated<syn::Meta, Token![,]> =
+        match syn::punctuated::Punctuated::parse_terminated.parse2(args_ts.clone()) {
+            Ok(p) => p,
+            Err(err) => return err.to_compile_error().into(),
+        };
+
+    let is_metrics_struct = parsed.is_empty() || parsed.iter().all(is_position_arg);
+    let derive_attr = if is_metrics_struct {
+        quote!(#[derive(datafusion_macros::DocumentedMetrics)])
+    } else {
+        quote!(#[derive(datafusion_macros::DocumentedExec)])
+    };
+    let meta_attr = quote!(#[metric_doc_attr(#args_ts)]);
+    let input_ts = proc_macro2::TokenStream::from(input);
+
+    quote!(#derive_attr #meta_attr #input_ts).into()
+}
+
+fn is_position_arg(meta: &syn::Meta) -> bool {
+    matches!(meta, syn::Meta::Path(p) if p.is_ident("common"))
+}
+
+pub fn derive_documented_metrics(input: TokenStream) -> TokenStream {
+    match documented_metrics_impl(input) {
+        Ok(tokens) => tokens,
+        Err(err) => err.to_compile_error().into(),
+    }
+}
+
+pub fn derive_documented_exec(input: TokenStream) -> TokenStream {
+    match documented_exec_impl(input) {
+        Ok(tokens) => tokens,
+        Err(err) => err.to_compile_error().into(),
+    }
+}
+
+fn documented_metrics_impl(input: TokenStream) -> syn::Result<TokenStream> {
+    let input = syn::parse::<DeriveInput>(input)?;
+    let ident = input.ident;
+
+    let Data::Struct(data) = input.data else {
+        return Err(syn::Error::new(
+            ident.span(),
+            "DocumentedMetrics can only be derived for structs",
+        ));
+    };
+
+    let doc = doc_expr(&input.attrs);
+    let position = match metrics_position(&input.attrs)? {
+        MetricDocPosition::Common => {
+            quote!(datafusion_doc::metric_doc_sections::MetricDocPosition::Common)
+        }
+        MetricDocPosition::Operator => {
+            quote!(datafusion_doc::metric_doc_sections::MetricDocPosition::Operator)
+        }
+    };
+
+    let mut fields_docs = vec![];
+    for field in data.fields {
+        let Some(field_ident) = field.ident else {
+            return Err(syn::Error::new(
+                field.span(),
+                "DocumentedMetrics does not support tuple structs",
+            ));
+        };
+
+        let field_type = &field.ty;
+        let doc = doc_expr(&field.attrs);
+        fields_docs.push(
+            quote! { datafusion_doc::metric_doc_sections::MetricFieldDoc {
+                name: stringify!(#field_ident),
+                doc: #doc,
+                type_name: stringify!(#field_type),
+            } },
+        );
+    }
+
+    let mod_ident = format_ident!("__datafusion_doc_metrics_{}", ident);
+
+    Ok(quote! {
+        #[allow(non_snake_case)]
+        mod #mod_ident {
+            use super::*;
+
+            static DOCUMENTATION: datafusion_doc::metric_doc_sections::MetricDoc =
+                datafusion_doc::metric_doc_sections::MetricDoc {
+                    name: stringify!(#ident),
+                    doc: #doc,
+                    fields: &[#(#fields_docs),*],
+                    position: #position,
+                };
+
+            impl datafusion_doc::metric_doc_sections::DocumentedMetrics for super::#ident {
+                const DOC: &'static datafusion_doc::metric_doc_sections::MetricDoc =
+                    &DOCUMENTATION;
+            }
+
+            datafusion_doc::metric_doc_sections::inventory::submit! {
+                datafusion_doc::metric_doc_sections::MetricDocEntry(&DOCUMENTATION)
+            }
+        }
+    }
+    .into())
+}
+
+fn documented_exec_impl(input: TokenStream) -> syn::Result<TokenStream> {
+    let input = syn::parse::<DeriveInput>(input)?;
+    let ident = input.ident;
+
+    let Data::Struct(_) = input.data else {
+        return Err(syn::Error::new(
+            ident.span(),
+            "DocumentedExec can only be derived for structs",
+        ));
+    };
+
+    let doc = doc_expr(&input.attrs);
+    let metrics = metrics_list(&input.attrs)?;
+    let metrics_refs = metrics.iter().map(|metric| {
+        quote! { <#metric as datafusion_doc::metric_doc_sections::DocumentedMetrics>::DOC }
+    });
+    let mod_ident = format_ident!("__datafusion_doc_exec_{}", ident);
+
+    Ok(quote! {
+        #[allow(non_snake_case)]
+        mod #mod_ident {
+            use super::*;
+
+            static DOCUMENTATION: datafusion_doc::metric_doc_sections::ExecDoc =
+                datafusion_doc::metric_doc_sections::ExecDoc {
+                    name: stringify!(#ident),
+                    doc: #doc,
+                    metrics: &[#(#metrics_refs),*],
+                };
+
+            impl datafusion_doc::metric_doc_sections::DocumentedExec for super::#ident {
+                fn exec_doc(
+                ) -> &'static datafusion_doc::metric_doc_sections::ExecDoc {
+                    &DOCUMENTATION
+                }
+            }
+
+            datafusion_doc::metric_doc_sections::inventory::submit! {
+                datafusion_doc::metric_doc_sections::ExecDocEntry(&DOCUMENTATION)
+            }
+        }
+    }
+    .into())
+}
+
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+enum MetricDocPosition {
+    Common,
+    Operator,
+}
+
+fn metrics_position(attrs: &[Attribute]) -> syn::Result<MetricDocPosition> {
+    let mut position = MetricDocPosition::Operator;
+    for attr in attrs {
+        if is_metrics_attr(attr) {
+            attr.parse_nested_meta(|meta| {
+                if meta.path.is_ident("common") {
+                    position = MetricDocPosition::Common;
+                    return Ok(());
+                }
+                Err(meta.error("unsupported metric_doc attribute"))
+            })?;
+        }
+    }
+    Ok(position)
+}
+
+fn metrics_list(attrs: &[Attribute]) -> syn::Result<Vec<Path>> {
+    let mut metrics = vec![];
+    for attr in attrs {
+        if is_metrics_attr(attr) {
+            attr.parse_nested_meta(|meta| {
+                metrics.push(meta.path);
+                Ok(())
+            })?;
+        }
+    }
+
+    Ok(metrics)
+}
+
+fn doc_expr(attrs: &[Attribute]) -> proc_macro2::TokenStream {
+    let mut doc_lines = Vec::new();
+
+    for attr in attrs {
+        if !attr.path().is_ident("doc") {
+            continue;
+        }
+
+        if let syn::Meta::NameValue(meta) = &attr.meta
+            && let Expr::Lit(ExprLit {
+                lit: Lit::Str(lit_str),
+                ..
+            }) = &meta.value
+        {
+            doc_lines.push(lit_str.value());
+        }
+    }
+
+    if doc_lines.is_empty() {
+        quote!("")
+    } else {
+        let sanitized = doc_lines
+            .into_iter()
+            .map(|line| line.trim_end().to_string())
+            .collect::<Vec<_>>();
+        quote!(concat!(#(#sanitized, "\n"),*))
+    }
+}
+
+fn is_metrics_attr(attr: &Attribute) -> bool {
+    attr.path().is_ident("metric_doc") || attr.path().is_ident("metric_doc_attr")
+}
diff --git a/datafusion/macros/src/user_doc.rs b/datafusion/macros/src/user_doc_impl.rs
similarity index 65%
rename from datafusion/macros/src/user_doc.rs
rename to datafusion/macros/src/user_doc_impl.rs
index 27f73fd955380..8723d650f8445 100644
--- a/datafusion/macros/src/user_doc.rs
+++ b/datafusion/macros/src/user_doc_impl.rs
@@ -15,95 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#![doc(
-    html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
-    html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
-)]
-#![cfg_attr(docsrs, feature(doc_cfg))]
-#![deny(clippy::allow_attributes)]
-
-extern crate proc_macro;
 use datafusion_doc::scalar_doc_sections::doc_sections_const;
 use proc_macro::TokenStream;
 use quote::quote;
 use syn::{DeriveInput, LitStr, parse_macro_input};
 
-/// This procedural macro is intended to parse a rust custom attribute and create user documentation
-/// from it by constructing a `DocumentBuilder()` automatically. The `Documentation` can be
-/// retrieved from the `documentation()` method
-/// declared on `AggregateUDF`, `WindowUDFImpl`, `ScalarUDFImpl` traits.
-/// For `doc_section`, this macro will try to find corresponding predefined `DocSection` by label field
-/// Predefined `DocSection` can be found in datafusion/expr/src/udf.rs
-/// Example:
-/// ```ignore
-/// #[user_doc(
-///     doc_section(label = "Time and Date Functions"),
-///     description = r"Converts a value to a date (`YYYY-MM-DD`).",
-///     syntax_example = "to_date('2017-05-31', '%Y-%m-%d')",
-///     sql_example = r#"```sql
-/// > select to_date('2023-01-31');
-/// +-----------------------------+
-/// | to_date(Utf8(\"2023-01-31\")) |
-/// +-----------------------------+
-/// | 2023-01-31 |
-/// +-----------------------------+
-/// ```"#,
-///     standard_argument(name = "expression", prefix = "String"),
-///     argument(
-///         name = "format_n",
-///         description = r"Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order
-/// they appear with the first successful one being returned. If none of the formats successfully parse the expression
-/// an error will be returned."
-///     )
-/// )]
-/// #[derive(Debug)]
-/// pub struct ToDateFunc {
-///     signature: Signature,
-/// }
-/// ```
-/// will generate the following code
-/// ```ignore
-/// pub struct ToDateFunc {
-///     signature: Signature,
-/// }
-/// impl ToDateFunc {
-///     fn doc(&self) -> Option<&datafusion_doc::Documentation> {
-///         static DOCUMENTATION: std::sync::LazyLock<
-///             datafusion_doc::Documentation,
-///         > = std::sync::LazyLock::new(|| {
-///             datafusion_doc::Documentation::builder(
-///                 datafusion_doc::DocSection {
-///                     include: true,
-///                     label: "Time and Date Functions",
-///                     description: None,
-///                 },
-///                 r"Converts a value to a date (`YYYY-MM-DD`).".to_string(),
-///                 "to_date('2017-05-31', '%Y-%m-%d')".to_string(),
-///             )
-///             .with_sql_example(
-///                 r#"```sql
-/// > select to_date('2023-01-31');
-/// +-----------------------------+
-/// | to_date(Utf8(\"2023-01-31\")) |
-/// +-----------------------------+
-/// | 2023-01-31 |
-/// +-----------------------------+
-/// ```"#,
-///             )
-///             .with_standard_argument("expression", "String".into())
-///             .with_argument(
-///                 "format_n",
-///                 r"Optional [Chrono format](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order
-/// they appear with the first successful one being returned. If none of the formats successfully parse the expression
-/// an error will be returned.",
-///             )
-///             .build()
-///         });
-///         Some(&DOCUMENTATION)
-///     }
-/// }
-/// ```
-#[proc_macro_attribute]
 pub fn user_doc(args: TokenStream, input: TokenStream) -> TokenStream {
     let mut doc_section_lbl: Option<LitStr> = None;
 
diff --git a/datafusion/physical-expr-common/src/metrics/baseline.rs b/datafusion/physical-expr-common/src/metrics/baseline.rs
index 0de8e26494931..9867abdbb30f8 100644
--- a/datafusion/physical-expr-common/src/metrics/baseline.rs
+++ b/datafusion/physical-expr-common/src/metrics/baseline.rs
@@ -21,6 +21,7 @@ use std::task::Poll;
 
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{Result, utils::memory::get_record_batch_memory_size};
+use datafusion_macros::metric_doc;
 
 use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
 
@@ -45,6 +46,7 @@ use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
 /// // when operator is finished:
 /// baseline_metrics.done();
 /// ```
+#[metric_doc(common)]
 #[derive(Debug, Clone)]
 pub struct BaselineMetrics {
     /// end_time is set when `BaselineMetrics::done()` is called
@@ -66,8 +68,6 @@ pub struct BaselineMetrics {
 
     /// output batches: the total output batch count
     output_batches: Count,
-
-    // Remember to update `docs/source/user-guide/metrics.md` when updating comments
-    // or adding new metrics
 }
 
 impl BaselineMetrics {
@@ -180,6 +180,7 @@ impl Drop for BaselineMetrics {
 
 /// Helper for creating and tracking spill-related metrics for
 /// each operator
+#[metric_doc(common)]
 #[derive(Debug, Clone)]
 pub struct SpillMetrics {
     /// count of spills during the execution of the operator
@@ -204,6 +205,7 @@ impl SpillMetrics {
 }
 
 /// Metrics for tracking batch splitting activity
+#[metric_doc(common)]
 #[derive(Debug, Clone)]
 pub struct SplitMetrics {
     /// Number of times an input [`RecordBatch`] was split
diff --git a/datafusion/physical-expr-common/src/metrics/value.rs b/datafusion/physical-expr-common/src/metrics/value.rs
index 9a14b804a20b5..f543f76237885 100644
--- a/datafusion/physical-expr-common/src/metrics/value.rs
+++ b/datafusion/physical-expr-common/src/metrics/value.rs
@@ -22,6 +22,7 @@ use chrono::{DateTime, Utc};
 use datafusion_common::{
     human_readable_count, human_readable_duration, human_readable_size, instant::Instant,
 };
+use datafusion_macros::metric_doc;
 use parking_lot::Mutex;
 use std::{
     borrow::{Borrow, Cow},
@@ -368,6 +369,7 @@ impl Drop for ScopedTimerGuard<'_> {
 /// 8 of them using statistics, the pruning metrics would look like: 10 total -> 2 matched
 ///
 /// Note `clone`ing update the same underlying metrics
+#[metric_doc(common)]
 #[derive(Debug, Clone)]
 pub struct PruningMetrics {
     pruned: Arc<AtomicUsize>,
@@ -438,6 +440,7 @@ impl PruningMetrics {
 /// Counters tracking ratio metrics (e.g. matched vs total)
 ///
 /// The counters are thread-safe and shared across clones.
+#[metric_doc(common)]
 #[derive(Debug, Clone, Default)]
 pub struct RatioMetrics {
     part: Arc<AtomicUsize>,
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index 68e67fa018f08..94da61c914796 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -31,6 +31,9 @@ rust-version = { workspace = true }
 [package.metadata.docs.rs]
 all-features = true
 
+[package.metadata.cargo-machete]
+ignored = ["datafusion-doc"]
+
 # Note: add additional linter rules in lib.rs.
 # Rust does not support workspace + new linter rules in subcrates yet
 # https://github.com/rust-lang/cargo/issues/13157
@@ -54,11 +57,13 @@ arrow-schema = { workspace = true }
 async-trait = { workspace = true }
 datafusion-common = { workspace = true }
 datafusion-common-runtime = { workspace = true, default-features = true }
+datafusion-doc = { workspace = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-functions = { workspace = true }
 datafusion-functions-aggregate-common = { workspace = true }
 datafusion-functions-window-common = { workspace = true }
+datafusion-macros = { workspace = true }
 datafusion-physical-expr = { workspace = true, default-features = true }
 datafusion-physical-expr-common = { workspace = true }
 futures = { workspace = true }
diff --git a/datafusion/physical-plan/src/aggregates/group_values/metrics.rs b/datafusion/physical-plan/src/aggregates/group_values/metrics.rs
index b6c32204e85f0..496158ac2c86c 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/metrics.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/metrics.rs
@@ -18,7 +18,9 @@
 //! Metrics for the various group-by implementations.
 
 use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
+use datafusion_macros::metric_doc;
 
+#[metric_doc]
 pub(crate) struct GroupByMetrics {
     /// Time spent calculating the group IDs from the evaluated grouping columns.
pub(crate) time_calculating_group_ids: Time, diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 06f12a90195d2..7190aaf6b7745 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -22,8 +22,8 @@ use std::sync::Arc; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties}; use crate::aggregates::{ - no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream, - topk_stream::GroupedTopKAggregateStream, + group_values::GroupByMetrics, no_grouping::AggregateStream, + row_hash::GroupedHashAggregateStream, topk_stream::GroupedTopKAggregateStream, }; use crate::execution_plan::{CardinalityEffect, EmissionType}; use crate::filter_pushdown::{ @@ -50,6 +50,7 @@ use datafusion_common::{ }; use datafusion_execution::TaskContext; use datafusion_expr::{Accumulator, Aggregate}; +use datafusion_macros::metric_doc; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit}; @@ -490,6 +491,7 @@ enum DynamicFilterAggregateType { } /// Hash aggregate execution plan +#[metric_doc(GroupByMetrics)] #[derive(Debug, Clone)] pub struct AggregateExec { /// Aggregation mode (full, partial) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 674fe6692adf5..637b2deebb12a 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -64,6 +64,7 @@ use datafusion_physical_expr::{ conjunction, split_conjunction, }; +use datafusion_macros::metric_doc; use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -73,6 +74,7 @@ const FILTER_EXEC_DEFAULT_BATCH_SIZE: usize = 8192; /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to /// include in its output batches. +#[metric_doc(FilterExecMetrics)] #[derive(Debug, Clone)] pub struct FilterExec { /// The expression to filter on. This expression must evaluate to a boolean value. @@ -691,13 +693,12 @@ struct FilterExecStream { } /// The metrics for `FilterExec` +#[metric_doc] struct FilterExecMetrics { /// Common metrics for most operators baseline_metrics: BaselineMetrics, /// Selectivity of the filter, calculated as output_rows / input_rows selectivity: RatioMetrics, - // Remember to update `docs/source/user-guide/metrics.md` when adding new metrics, - // or modifying metrics comments } impl FilterExecMetrics { diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 4f32b6176ec39..cc046c7effb4b 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -46,6 +46,7 @@ use datafusion_common::{ }; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion_macros::metric_doc; use datafusion_physical_expr::equivalence::join_equivalence_properties; use async_trait::async_trait; @@ -76,6 +77,7 @@ struct JoinLeftData { /// Note this structure includes a [`OnceAsync`] that is used to coordinate the /// loading of the left side with the processing in each output stream. 
/// Therefore it can not be [`Clone`] +#[metric_doc(BuildProbeJoinMetrics)] #[derive(Debug)] pub struct CrossJoinExec { /// left (build) side which gets loaded in memory diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 91fc1ee4436ee..e5509441690ab 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -74,6 +74,7 @@ use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_expr::Accumulator; use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator}; +use datafusion_macros::metric_doc; use datafusion_physical_expr::equivalence::{ ProjectionMapping, join_equivalence_properties, }; @@ -315,6 +316,7 @@ impl JoinLeftData { /// Note this structure includes a [`OnceAsync`] that is used to coordinate the /// loading of the left side with the processing in each output stream. /// Therefore it can not be [`Clone`] +#[metric_doc(BuildProbeJoinMetrics)] pub struct HashJoinExec { /// left (build) side which gets hashed pub left: Arc, diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 76dca7239114b..021139833bc42 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -47,6 +47,7 @@ use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, }; +use datafusion_macros::metric_doc; use arrow::array::{ Array, BooleanArray, BooleanBufferBuilder, RecordBatchOptions, UInt32Array, @@ -169,6 +170,7 @@ use parking_lot::Mutex; /// Note this structure includes a [`OnceAsync`] that is used to coordinate the /// loading of the left side with the processing in each output stream. 
/// Therefore it can not be [`Clone`] +#[metric_doc(NestedLoopJoinMetrics)] #[derive(Debug)] pub struct NestedLoopJoinExec { /// left side @@ -797,6 +799,7 @@ pub(crate) struct NestedLoopJoinStream { current_right_batch_matched: Option, } +#[metric_doc] pub(crate) struct NestedLoopJoinMetrics { /// Join execution metrics pub(crate) join_metrics: BuildProbeJoinMetrics, diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs index 508be2e3984f4..e23e931120769 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs @@ -29,6 +29,7 @@ use datafusion_execution::{ memory_pool::{MemoryConsumer, MemoryReservation}, }; use datafusion_expr::{JoinType, Operator}; +use datafusion_macros::metric_doc; use datafusion_physical_expr::equivalence::join_equivalence_properties; use datafusion_physical_expr::{ Distribution, LexOrdering, OrderingRequirements, PhysicalExpr, PhysicalExprRef, @@ -248,6 +249,7 @@ use crate::{ /// /// # Further Reference Material /// DuckDB blog on Range Joins: [Range Joins in DuckDB](https://duckdb.org/2022/05/27/iejoin.html) +#[metric_doc(BuildProbeJoinMetrics)] #[derive(Debug)] pub struct PiecewiseMergeJoinExec { /// Left buffered execution plan diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index 5560c29d546b3..1f2d1d43ad508 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -50,6 +50,7 @@ use datafusion_common::{ }; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryConsumer; +use datafusion_macros::metric_doc; use datafusion_physical_expr::equivalence::join_equivalence_properties; use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql}; use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements}; @@ -102,6 +103,7 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequiremen /// /// Helpful short video demonstration: /// . 
+#[metric_doc(SortMergeJoinMetrics)] #[derive(Debug, Clone)] pub struct SortMergeJoinExec { /// Left sorted joining execution plan diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs b/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs index 8457408919e63..543a6ca060222 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs @@ -21,8 +21,10 @@ use crate::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, SpillMetrics, Time, }; +use datafusion_macros::metric_doc; /// Metrics for SortMergeJoinExec +#[metric_doc] pub(super) struct SortMergeJoinMetrics { /// Total time for joining probe-side batches to the build-side batches join_time: Time, diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index 22cc82a22db5f..d2b586065f4a4 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -39,6 +39,7 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::{HashSet, JoinSide, Result, ScalarValue, arrow_datafusion_err}; use datafusion_expr::interval_arithmetic::Interval; +use datafusion_macros::metric_doc; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph; use datafusion_physical_expr::utils::collect_columns; @@ -671,6 +672,7 @@ pub fn record_visited_indices( } } +#[metric_doc] #[derive(Debug)] pub struct StreamJoinSideMetrics { /// Number of batches consumed by this operator @@ -680,6 +682,7 @@ pub struct StreamJoinSideMetrics { } /// Metrics for HashJoinExec +#[metric_doc] #[derive(Debug)] pub struct StreamJoinMetrics { /// Number of left batches/rows consumed by this operator diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 1f6bc703a0300..e7bb80196be9b 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -73,6 +73,7 @@ use datafusion_common::{ use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_expr::interval_arithmetic::Interval; +use datafusion_macros::metric_doc; use datafusion_physical_expr::equivalence::join_equivalence_properties; use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph; use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql}; @@ -170,6 +171,7 @@ const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4; /// making the smallest value in 'left_sorted' 1231 and any rows below (since ascending) /// than that can be dropped from the inner buffer. 
/// ``` +#[metric_doc(StreamJoinMetrics)] #[derive(Debug, Clone)] pub struct SymmetricHashJoinExec { /// Left side stream diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index e0cdda6d5a729..6b0e2c9df3566 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -34,6 +34,7 @@ use crate::projection::{ProjectionExec, ProjectionExpr}; use crate::{ ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics, }; +use datafusion_macros::metric_doc; // compatibility pub use super::join_filter::JoinFilter; pub use super::join_hash_map::JoinHashMapType; @@ -1322,6 +1323,7 @@ fn append_probe_indices_in_order( } /// Metrics for build & probe joins +#[metric_doc(common)] #[derive(Clone, Debug)] pub(crate) struct BuildProbeJoinMetrics { pub(crate) baseline: BaselineMetrics, diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 1efdaaabc7d6a..e0b0b7a8e7296 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -54,6 +54,7 @@ use datafusion_common::{Result, not_impl_err}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryConsumer; +use datafusion_macros::metric_doc; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; @@ -674,6 +675,7 @@ impl BatchPartitioner { /// system Paper](https://dl.acm.org/doi/pdf/10.1145/93605.98720) /// which uses the term "Exchange" for the concept of repartitioning /// data across threads. +#[metric_doc(RepartitionMetrics)] #[derive(Debug, Clone)] pub struct RepartitionExec { /// Input execution plan @@ -690,6 +692,7 @@ pub struct RepartitionExec { cache: PlanProperties, } +#[metric_doc] #[derive(Debug, Clone)] struct RepartitionMetrics { /// Time in nanos to execute child operator and fetch batches diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 3e8fdf1f3ed7e..00c76966f002f 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -45,6 +45,7 @@ use crate::stream::RecordBatchStreamAdapter; use crate::stream::ReservationStream; use crate::topk::TopK; use crate::topk::TopKDynamicFilters; +use crate::topk::TopKMetrics; use crate::{ DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, @@ -62,6 +63,7 @@ use datafusion_common::{ use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_macros::metric_doc; use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; @@ -69,6 +71,7 @@ use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; +#[metric_doc] struct ExternalSorterMetrics { /// metrics baseline: BaselineMetrics, @@ -935,6 +938,7 @@ pub fn sort_batch_chunked( /// /// Support sorting datasets that are larger than the memory allotted /// by the memory manager, by spilling to disk. 
+#[metric_doc(ExternalSorterMetrics, TopKMetrics)] #[derive(Debug, Clone)] pub struct SortExec { /// Input schema diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index ebac497f4fbc3..ab0ace889f2c4 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -41,6 +41,7 @@ use datafusion_execution::{ memory_pool::{MemoryConsumer, MemoryReservation}, runtime_env::RuntimeEnv, }; +use datafusion_macros::metric_doc; use datafusion_physical_expr::{ PhysicalExpr, expressions::{BinaryExpr, DynamicFilterPhysicalExpr, is_not_null, is_null, lit}, @@ -631,7 +632,8 @@ impl TopK { } } -struct TopKMetrics { +#[metric_doc] +pub(crate) struct TopKMetrics { /// metrics pub baseline: BaselineMetrics, diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 5fef754e80780..c009c824e02e4 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -47,6 +47,7 @@ use datafusion_common::{ internal_err, }; use datafusion_execution::TaskContext; +use datafusion_macros::metric_doc; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_expr::expressions::Column; @@ -59,6 +60,7 @@ use log::trace; /// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m') /// /// See [`UnnestOptions`] for more details and an example. +#[metric_doc(UnnestMetrics)] #[derive(Debug, Clone)] pub struct UnnestExec { /// Input execution plan @@ -269,6 +271,7 @@ impl ExecutionPlan for UnnestExec { } } +#[metric_doc] #[derive(Clone, Debug)] struct UnnestMetrics { /// Execution metrics diff --git a/dev/rust_lint.sh b/dev/rust_lint.sh index 21d4611846413..674778d5c02d3 100755 --- a/dev/rust_lint.sh +++ b/dev/rust_lint.sh @@ -50,3 +50,4 @@ ci/scripts/rust_docs.sh ci/scripts/license_header.sh ci/scripts/typos_check.sh ci/scripts/doc_prettier_check.sh +ci/scripts/check_metric_docs.sh diff --git a/dev/update_config_docs.sh b/dev/update_config_docs.sh index 90bbc5d3bad06..0e12f4ccc52d0 100755 --- a/dev/update_config_docs.sh +++ b/dev/update_config_docs.sh @@ -239,6 +239,6 @@ EOF echo "Running prettier" -npx prettier@2.3.2 --write "$TARGET_FILE" +npx prettier@3.7.4 --write "$TARGET_FILE" echo "'$TARGET_FILE' successfully updated!" diff --git a/dev/update_function_docs.sh b/dev/update_function_docs.sh index 63f4f2475c471..68e9635ded8c1 100755 --- a/dev/update_function_docs.sh +++ b/dev/update_function_docs.sh @@ -25,7 +25,7 @@ cd "${SOURCE_DIR}/../" && pwd TARGET_FILE="docs/source/user-guide/sql/aggregate_functions.md" -PRINT_AGGREGATE_FUNCTION_DOCS_COMMAND="cargo run --manifest-path datafusion/core/Cargo.toml --bin print_functions_docs -- aggregate" +PRINT_AGGREGATE_FUNCTION_DOCS_COMMAND="cargo run --manifest-path datafusion/core/Cargo.toml --features docs --bin print_functions_docs -- aggregate" echo "Inserting header" cat <<'EOF' > "$TARGET_FILE" @@ -114,12 +114,12 @@ echo "Running CLI and inserting aggregate function docs table" $PRINT_AGGREGATE_FUNCTION_DOCS_COMMAND >> "$TARGET_FILE" echo "Running prettier" -npx prettier@2.3.2 --write "$TARGET_FILE" +npx prettier@3.7.4 --write "$TARGET_FILE" echo "'$TARGET_FILE' successfully updated!" 
TARGET_FILE="docs/source/user-guide/sql/scalar_functions.md" -PRINT_SCALAR_FUNCTION_DOCS_COMMAND="cargo run --manifest-path datafusion/core/Cargo.toml --bin print_functions_docs -- scalar" +PRINT_SCALAR_FUNCTION_DOCS_COMMAND="cargo run --manifest-path datafusion/core/Cargo.toml --features docs --bin print_functions_docs -- scalar" echo "Inserting header" cat <<'EOF' > "$TARGET_FILE" @@ -158,12 +158,12 @@ echo "Running CLI and inserting scalar function docs table" $PRINT_SCALAR_FUNCTION_DOCS_COMMAND >> "$TARGET_FILE" echo "Running prettier" -npx prettier@2.3.2 --write "$TARGET_FILE" +npx prettier@3.7.4 --write "$TARGET_FILE" echo "'$TARGET_FILE' successfully updated!" TARGET_FILE="docs/source/user-guide/sql/window_functions.md" -PRINT_WINDOW_FUNCTION_DOCS_COMMAND="cargo run --manifest-path datafusion/core/Cargo.toml --bin print_functions_docs -- window" +PRINT_WINDOW_FUNCTION_DOCS_COMMAND="cargo run --manifest-path datafusion/core/Cargo.toml --features docs --bin print_functions_docs -- window" echo "Inserting header" cat <<'EOF' > "$TARGET_FILE" @@ -336,6 +336,6 @@ echo "Running CLI and inserting window function docs table" $PRINT_WINDOW_FUNCTION_DOCS_COMMAND >> "$TARGET_FILE" echo "Running prettier" -npx prettier@2.3.2 --write "$TARGET_FILE" +npx prettier@3.7.4 --write "$TARGET_FILE" echo "'$TARGET_FILE' successfully updated!" diff --git a/dev/update_metric_docs.sh b/dev/update_metric_docs.sh new file mode 100755 index 0000000000000..580e49887a86c --- /dev/null +++ b/dev/update_metric_docs.sh @@ -0,0 +1,68 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +set -euo pipefail + +SOURCE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "${SOURCE_DIR}/../" && pwd + +TARGET_FILE="docs/source/user-guide/metrics.md" +PRINT_METRIC_DOCS_COMMAND="cargo run --manifest-path datafusion/core/Cargo.toml --features docs --bin print_metric_docs" + +echo "Inserting header" +cat <<'EOF' > "$TARGET_FILE" + + + + +# Metrics + +DataFusion operators expose runtime metrics so you can understand where time is spent and how much data flows through the pipeline. See more in [EXPLAIN ANALYZE](sql/explain.md#explain-analyze). + +EOF + +echo "Running CLI and inserting metric docs table" +$PRINT_METRIC_DOCS_COMMAND >> "$TARGET_FILE" + +echo "Running prettier" +npx prettier@3.7.4 --write "$TARGET_FILE" + +echo "'$TARGET_FILE' successfully updated!" 
diff --git a/docs/source/library-user-guide/query-optimizer.md b/docs/source/library-user-guide/query-optimizer.md index 8ed6593d56203..d960aa3b3d30c 100644 --- a/docs/source/library-user-guide/query-optimizer.md +++ b/docs/source/library-user-guide/query-optimizer.md @@ -443,13 +443,11 @@ When analyzing expressions, DataFusion runs boundary analysis using interval ari Consider a simple predicate like age > 18 AND age <= 25. The analysis flows as follows: 1. Context Initialization - - Begin with known column statistics - Set up initial boundaries based on column constraints - Initialize the shared analysis context 2. Expression Tree Walk - - Analyze each node in the expression tree - Propagate boundary information upward - Allow child nodes to influence parent boundaries diff --git a/docs/source/user-guide/concepts-readings-events.md b/docs/source/user-guide/concepts-readings-events.md index ad444ef91c474..3b5a244f04ca9 100644 --- a/docs/source/user-guide/concepts-readings-events.md +++ b/docs/source/user-guide/concepts-readings-events.md @@ -70,7 +70,6 @@ This is a list of DataFusion related blog posts, articles, and other resources. - **2024-10-16** [Blog: Candle Image Segmentation](https://www.letsql.com/posts/candle-image-segmentation/) - **2024-09-23 → 2024-12-02** [Talks: Carnegie Mellon University: Database Building Blocks Seminar Series - Fall 2024](https://db.cs.cmu.edu/seminar2024/) - - **2024-11-12** [Video: Building InfluxDB 3.0 with the FDAP Stack: Apache Flight, DataFusion, Arrow and Parquet (Paul Dix)](https://www.youtube.com/watch?v=AGS4GNGDK_4) - **2024-11-04** [Video: Synnada: Towards “Unified” Compute Engines: Opportunities and Challenges (Mehmet Ozan Kabak)](https://www.youtube.com/watch?v=z38WY9uZtt4) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index c9222afe8ceb5..18d35cd442f72 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -58,137 +58,137 @@ example, to configure `datafusion.execution.target_partitions`: SET datafusion.execution.target_partitions = '1'; ``` -[`configoptions`]: https://docs.rs/datafusion/latest/datafusion/common/config/struct.ConfigOptions.html -[`configoptions::from_env`]: https://docs.rs/datafusion/latest/datafusion/common/config/struct.ConfigOptions.html#method.from_env +[`ConfigOptions`]: https://docs.rs/datafusion/latest/datafusion/common/config/struct.ConfigOptions.html +[`ConfigOptions::from_env`]: https://docs.rs/datafusion/latest/datafusion/common/config/struct.ConfigOptions.html#method.from_env The following configuration settings are available: -| key | default | description | -| ----------------------------------------------------------------------- | ------------------------- | 
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. | -| datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | -| datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | -| datafusion.catalog.information_schema | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | -| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | -| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | -| datafusion.catalog.has_header | true | Default value for `format.has_header` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. | -| datafusion.catalog.newlines_in_values | false | Specifies whether newlines in (quoted) CSV values are supported. This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. Setting this to `true` ensures that newlines in values are parsed successfully, which may reduce performance. | -| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption | -| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | -| datafusion.execution.collect_statistics | true | Should DataFusion collect statistics when first creating a table. Has no effect after the table is created. Applies to the default `ListingTableProvider` in DataFusion. Defaults to true. | -| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. 
Defaults to the number of CPU cores on the system | -| datafusion.execution.time_zone | NULL | The default time zone Some functions, e.g. `now` return timestamps in this time zone | -| datafusion.execution.parquet.enable_page_index | true | (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. | -| datafusion.execution.parquet.pruning | true | (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | -| datafusion.execution.parquet.skip_metadata | true | (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | -| datafusion.execution.parquet.metadata_size_hint | 524288 | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. | -| datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | -| datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | -| datafusion.execution.parquet.force_filter_selections | false | (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. | -| datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | -| datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | -| datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | -| datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | -| datafusion.execution.parquet.max_predicate_cache_size | NULL | (reading) The maximum predicate cache size, in bytes. 
When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. | -| datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | -| datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | -| datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" | -| datafusion.execution.parquet.skip_arrow_metadata | false | (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. Refer to | -| datafusion.execution.parquet.compression | zstd(3) | (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting. | -| datafusion.execution.parquet.dictionary_enabled | true | (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.dictionary_page_size_limit | 1048576 | (writing) Sets best effort maximum dictionary page size, in bytes | -| datafusion.execution.parquet.statistics_enabled | page | (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.max_row_group_size | 1048576 | (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. | -| datafusion.execution.parquet.created_by | datafusion version 51.0.0 | (writing) Sets "created by" property | -| datafusion.execution.parquet.column_index_truncate_length | 64 | (writing) Sets column index truncate length | -| datafusion.execution.parquet.statistics_truncate_length | 64 | (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.data_page_row_count_limit | 20000 | (writing) Sets best effort maximum number of rows in data page | -| datafusion.execution.parquet.encoding | NULL | (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.bloom_filter_on_write | false | (writing) Write bloom filters for all columns when creating parquet files | -| datafusion.execution.parquet.bloom_filter_fpp | NULL | (writing) Sets bloom filter false positive probability. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.bloom_filter_ndv | NULL | (writing) Sets bloom filter number of distinct values. 
If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | -| datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | -| datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | -| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | -| datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | -| datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. | -| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | -| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | -| datafusion.execution.max_spill_file_size_bytes | 134217728 | Maximum size in bytes for individual spill files before rotating to a new file. 
When operators spill data to disk (e.g., RepartitionExec), they write multiple batches to the same file until this size limit is reached, then rotate to a new file. This reduces syscall overhead compared to one-file-per-batch while preventing files from growing too large. A larger value reduces file creation overhead but may hold more disk space. A smaller value creates more files but allows finer-grained space reclamation as files can be deleted once fully consumed. Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators may create spill files larger than the limit. Default: 128 MB | -| datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | -| datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | -| datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | -| datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption | -| datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). | -| datafusion.execution.listing_table_factory_infer_partitions | true | Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema). | -| datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs | -| datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental | -| datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | -| datafusion.execution.skip_partial_aggregation_probe_ratio_threshold | 0.8 | Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input | -| datafusion.execution.skip_partial_aggregation_probe_rows_threshold | 100000 | Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode | -| datafusion.execution.use_row_number_estimates_to_optimize_partitioning | false | Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. 
if the source of statistics is accurate. We plan to make this the default in the future. | -| datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. | -| datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. | -| datafusion.execution.enable_ansi_mode | false | Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. | -| datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | -| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | -| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | -| datafusion.optimizer.enable_window_limits | true | When set to true, the optimizer will attempt to push limit operations past window functions, if possible | -| datafusion.optimizer.enable_topk_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. | -| datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. | -| datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. 
| -| datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. | -| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | -| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. | -| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | -| datafusion.optimizer.allow_symmetric_joins_without_pruning | true | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. | -| datafusion.optimizer.repartition_file_scans | true | When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism. This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition). For FileSources, only Parquet and CSV formats are currently supported. If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't happen within a single file. If set to `true` for an in-memory source, all memtable's partitions will have their batches repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change the total number of partitions and batches per partition, but does not slice the initial record tables provided to the MemTable on creation. | -| datafusion.optimizer.preserve_file_partitions | 0 | Minimum number of distinct partition values required to group files by their Hive partition column values (enabling Hash partitioning declaration). 
How the option is used: - preserve_file_partitions=0: Disable it. - preserve_file_partitions=1: Always enable it. - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N. This threshold preserves I/O parallelism when file partitioning is below it. Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct partitions is less than the target_partitions. | -| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` | -| datafusion.optimizer.subset_repartition_threshold | 4 | Partition count threshold for subset satisfaction optimization. When the current partition count is >= this threshold, DataFusion will skip repartitioning if the required partitioning expression is a subset of the current partition expression such as Hash(a) satisfies Hash(a, b). When the current partition count is < this threshold, DataFusion will repartition to increase parallelism even when subset satisfaction applies. Set to 0 to always repartition (disable subset satisfaction optimization). Set to a high value to always use subset satisfaction. Example (subset_repartition_threshold = 4): `text Hash([a]) satisfies Hash([a, b]) because (Hash([a, b]) is subset of Hash([a]) If current partitions (3) < threshold (4), repartition: AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)] RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3 AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3) If current partitions (8) >= threshold (4), use subset satisfaction: AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8) ` | -| datafusion.optimizer.prefer_existing_sort | false | When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. | -| datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | -| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | -| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | -| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. 
HashJoin can work more efficiently than SortMergeJoin but consumes more memory | -| datafusion.optimizer.enable_piecewise_merge_join | false | When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter. | -| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | -| datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition | -| datafusion.optimizer.hash_join_inlist_pushdown_max_size | 131072 | Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides larger than this will use hash table lookups instead. Set to 0 to always use hash table lookups. InList pushdown can be more efficient for small build sides because it can result in better statistics pruning as well as use any bloom filters present on the scan side. InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion. On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory. This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` \* `target_partitions` memory. The default is 128kB per partition. This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases but avoids excessive memory usage or overhead for larger joins. | -| datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values | 150 | Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides with more rows than this will use hash table lookups instead. Set to 0 to always use hash table lookups. This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent very large IN lists that might not provide much benefit over hash table lookups. This uses the deduplicated row count once the build side has been evaluated. The default is 150 values per partition. This is inspired by Trino's `max-filter-keys-per-column` setting. See: | -| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | -| datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | -| datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. | -| datafusion.optimizer.enable_sort_pushdown | true | Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). 
Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true | -| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | -| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | -| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans | -| datafusion.explain.show_sizes | true | When set to true, the explain statement will print the partition sizes | -| datafusion.explain.show_schema | false | When set to true, the explain statement will print schema information | -| datafusion.explain.format | indent | Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. | -| datafusion.explain.tree_maximum_render_width | 240 | (format=tree only) Maximum total width of the rendered tree. When set to 0, the tree will have no width limit. | -| datafusion.explain.analyze_level | dev | Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. | -| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | -| datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | -| datafusion.sql_parser.enable_options_value_normalization | false | When set to true, SQL parser will normalize options value (convert value to lowercase). Note that this option is ignored and will be removed in the future. All case-insensitive values are normalized automatically. | -| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks. | -| datafusion.sql_parser.support_varchar_with_length | true | If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but ignore the length. If false, error if a `VARCHAR` with a length is specified. The Arrow type system does not have a notion of maximum string length and thus DataFusion can not enforce such limits. | -| datafusion.sql_parser.map_string_types_to_utf8view | true | If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning. If false, they are mapped to `Utf8`. Default is true. | -| datafusion.sql_parser.collect_spans | false | When set to true, the source locations relative to the original SQL query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected and recorded in the logical plan nodes. | -| datafusion.sql_parser.recursion_limit | 50 | Specifies the recursion depth limit when parsing complex SQL Queries | -| datafusion.sql_parser.default_null_ordering | nulls_max | Specifies the default null ordering for query results. There are 4 options: - `nulls_max`: Nulls appear last in ascending order. - `nulls_min`: Nulls appear first in ascending order. - `nulls_first`: Nulls always be first in any order. - `nulls_last`: Nulls always be last in any order. By default, `nulls_max` is used to follow Postgres's behavior. 
postgres rule: | -| datafusion.format.safe | true | If set to `true` any formatting errors will be written to the output instead of being converted into a [`std::fmt::Error`] | -| datafusion.format.null | | Format string for nulls | -| datafusion.format.date_format | %Y-%m-%d | Date format for date arrays | -| datafusion.format.datetime_format | %Y-%m-%dT%H:%M:%S%.f | Format for DateTime arrays | -| datafusion.format.timestamp_format | %Y-%m-%dT%H:%M:%S%.f | Timestamp format for timestamp arrays | -| datafusion.format.timestamp_tz_format | NULL | Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used. | -| datafusion.format.time_format | %H:%M:%S%.f | Time format for time arrays | -| datafusion.format.duration_format | pretty | Duration format. Can be either `"pretty"` or `"ISO8601"` | -| datafusion.format.types_info | false | Show types in visual representation batches | +| key | default | description | +| ----------------------------------------------------------------------- | ------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. | +| datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | +| datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | +| datafusion.catalog.information_schema | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | +| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | +| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | +| datafusion.catalog.has_header | true | Default value for `format.has_header` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. | +| datafusion.catalog.newlines_in_values | false | Specifies whether newlines in (quoted) CSV values are supported. This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. 
Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. Setting this to `true` ensures that newlines in values are parsed successfully, which may reduce performance. | +| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption | +| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | +| datafusion.execution.collect_statistics | true | Should DataFusion collect statistics when first creating a table. Has no effect after the table is created. Applies to the default `ListingTableProvider` in DataFusion. Defaults to true. | +| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system | +| datafusion.execution.time_zone | NULL | The default time zone Some functions, e.g. `now` return timestamps in this time zone | +| datafusion.execution.parquet.enable_page_index | true | (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. | +| datafusion.execution.parquet.pruning | true | (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | +| datafusion.execution.parquet.skip_metadata | true | (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | +| datafusion.execution.parquet.metadata_size_hint | 524288 | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. | +| datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | +| datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | +| datafusion.execution.parquet.force_filter_selections | false | (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. 
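As a purely illustrative sketch of how the Parquet reader settings above are applied, the same session-level `SET` syntax used elsewhere in this guide can toggle them; the values below are arbitrary examples, not recommendations:

```sql
-- Arbitrary example values: enable late materialization for Parquet scans in this session
SET datafusion.execution.parquet.pushdown_filters = 'true';
SET datafusion.execution.parquet.reorder_filters = 'true';
```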
| +| datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | +| datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | +| datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | +| datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | +| datafusion.execution.parquet.max_predicate_cache_size | NULL | (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. | +| datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | +| datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | +| datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" | +| datafusion.execution.parquet.skip_arrow_metadata | false | (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. Refer to | +| datafusion.execution.parquet.compression | zstd(3) | (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting. | +| datafusion.execution.parquet.dictionary_enabled | true | (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.dictionary_page_size_limit | 1048576 | (writing) Sets best effort maximum dictionary page size, in bytes | +| datafusion.execution.parquet.statistics_enabled | page | (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.max_row_group_size | 1048576 | (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. 
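For illustration, the Parquet writer options above can be adjusted the same way; these values are arbitrary examples (zstd level 3 and page-level statistics happen to match the defaults listed in this table):

```sql
-- Arbitrary example values: tune Parquet files written by COPY or INSERT INTO
SET datafusion.execution.parquet.compression = 'zstd(3)';
SET datafusion.execution.parquet.statistics_enabled = 'page';
SET datafusion.execution.parquet.max_row_group_size = '1048576';
```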
| +| datafusion.execution.parquet.created_by | datafusion version 51.0.0 | (writing) Sets "created by" property | +| datafusion.execution.parquet.column_index_truncate_length | 64 | (writing) Sets column index truncate length | +| datafusion.execution.parquet.statistics_truncate_length | 64 | (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.data_page_row_count_limit | 20000 | (writing) Sets best effort maximum number of rows in data page | +| datafusion.execution.parquet.encoding | NULL | (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.bloom_filter_on_write | false | (writing) Write bloom filters for all columns when creating parquet files | +| datafusion.execution.parquet.bloom_filter_fpp | NULL | (writing) Sets bloom filter false positive probability. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.bloom_filter_ndv | NULL | (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting | +| datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | +| datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | +| datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | +| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | +| datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | +| datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. 
Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. | +| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | +| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | +| datafusion.execution.max_spill_file_size_bytes | 134217728 | Maximum size in bytes for individual spill files before rotating to a new file. When operators spill data to disk (e.g., RepartitionExec), they write multiple batches to the same file until this size limit is reached, then rotate to a new file. This reduces syscall overhead compared to one-file-per-batch while preventing files from growing too large. A larger value reduces file creation overhead but may hold more disk space. A smaller value creates more files but allows finer-grained space reclamation as files can be deleted once fully consumed. Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators may create spill files larger than the limit. Default: 128 MB | +| datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | +| datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | +| datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | +| datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption | +| datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). | +| datafusion.execution.listing_table_factory_infer_partitions | true | Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema). 
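A minimal sketch of adjusting the spill-related settings above for a memory-constrained session; the values are arbitrary examples, not tuned recommendations:

```sql
-- Arbitrary example values for a memory-constrained session
SET datafusion.execution.spill_compression = 'zstd';
-- 64 MiB spill files: smaller files can be reclaimed sooner once fully consumed
SET datafusion.execution.max_spill_file_size_bytes = '67108864';
-- 20 MiB reserved for the in-memory sort/merge that precedes writing a spill file
SET datafusion.execution.sort_spill_reservation_bytes = '20971520';
```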
| +| datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs | +| datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental | +| datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | +| datafusion.execution.skip_partial_aggregation_probe_ratio_threshold | 0.8 | Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input | +| datafusion.execution.skip_partial_aggregation_probe_rows_threshold | 100000 | Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode | +| datafusion.execution.use_row_number_estimates_to_optimize_partitioning | false | Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. | +| datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. | +| datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. | +| datafusion.execution.enable_ansi_mode | false | Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. 
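As an illustration of the experimental ANSI mode described above, a session that wants the stricter error handling could enable it like this (arbitrary example):

```sql
-- Arbitrary example: enable strict ANSI semantics for this session (experimental,
-- currently relevant only for the DataFusion Spark built-in functions)
SET datafusion.execution.enable_ansi_mode = 'true';
-- Per the description above, operations such as division by zero or invalid casts
-- then raise runtime errors instead of returning NULL or adjusted values
```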
| +| datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | +| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | +| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | +| datafusion.optimizer.enable_window_limits | true | When set to true, the optimizer will attempt to push limit operations past window functions, if possible | +| datafusion.optimizer.enable_topk_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. | +| datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. | +| datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. | +| datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. | +| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | +| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | +| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. | +| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | +| datafusion.optimizer.allow_symmetric_joins_without_pruning | true | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. 
This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. | +| datafusion.optimizer.repartition_file_scans | true | When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism. This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition). For FileSources, only Parquet and CSV formats are currently supported. If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't happen within a single file. If set to `true` for an in-memory source, all memtable's partitions will have their batches repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change the total number of partitions and batches per partition, but does not slice the initial record tables provided to the MemTable on creation. | +| datafusion.optimizer.preserve_file_partitions | 0 | Minimum number of distinct partition values required to group files by their Hive partition column values (enabling Hash partitioning declaration). How the option is used: - preserve_file_partitions=0: Disable it. - preserve_file_partitions=1: Always enable it. - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N. This threshold preserves I/O parallelism when file partitioning is below it. Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct partitions is less than the target_partitions. | +| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level | +| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` | +| datafusion.optimizer.subset_repartition_threshold | 4 | Partition count threshold for subset satisfaction optimization. When the current partition count is >= this threshold, DataFusion will skip repartitioning if the required partitioning expression is a subset of the current partition expression such as Hash(a) satisfies Hash(a, b). When the current partition count is < this threshold, DataFusion will repartition to increase parallelism even when subset satisfaction applies. Set to 0 to always repartition (disable subset satisfaction optimization). Set to a high value to always use subset satisfaction. 
Example (subset_repartition_threshold = 4): `text Hash([a]) satisfies Hash([a, b]) because (Hash([a, b]) is subset of Hash([a]) If current partitions (3) < threshold (4), repartition: AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)] RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3 AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3) If current partitions (8) >= threshold (4), use subset satisfaction: AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8) ` | +| datafusion.optimizer.prefer_existing_sort | false | When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. | +| datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | +| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | +| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | +| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | +| datafusion.optimizer.enable_piecewise_merge_join | false | When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter. | +| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | +| datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition | +| datafusion.optimizer.hash_join_inlist_pushdown_max_size | 131072 | Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides larger than this will use hash table lookups instead. Set to 0 to always use hash table lookups. InList pushdown can be more efficient for small build sides because it can result in better statistics pruning as well as use any bloom filters present on the scan side. InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion. On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory. This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` \* `target_partitions` memory. The default is 128kB per partition. This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases but avoids excessive memory usage or overhead for larger joins. 
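To make the interaction between these optimizer flags concrete, a hypothetical session that turns off dynamic filter pushdown entirely and always uses hash table lookups instead of IN-list pushdown might look like this (arbitrary example values):

```sql
-- Arbitrary example values; enable_dynamic_filter_pushdown overrides the per-operator flags
SET datafusion.optimizer.enable_dynamic_filter_pushdown = 'false';
-- A size limit of 0 always uses hash table lookups instead of IN-list pushdown
SET datafusion.optimizer.hash_join_inlist_pushdown_max_size = '0';
```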
| +| datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values | 150 | Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides with more rows than this will use hash table lookups instead. Set to 0 to always use hash table lookups. This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent very large IN lists that might not provide much benefit over hash table lookups. This uses the deduplicated row count once the build side has been evaluated. The default is 150 values per partition. This is inspired by Trino's `max-filter-keys-per-column` setting. See: | +| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | +| datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | +| datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. | +| datafusion.optimizer.enable_sort_pushdown | true | Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true | +| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | +| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | +| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans | +| datafusion.explain.show_sizes | true | When set to true, the explain statement will print the partition sizes | +| datafusion.explain.show_schema | false | When set to true, the explain statement will print schema information | +| datafusion.explain.format | indent | Display format of explain. Default is "indent". When set to "tree", it will print the plan in a tree-rendered format. | +| datafusion.explain.tree_maximum_render_width | 240 | (format=tree only) Maximum total width of the rendered tree. When set to 0, the tree will have no width limit. | +| datafusion.explain.analyze_level | dev | Verbosity level for "EXPLAIN ANALYZE". Default is "dev" "summary" shows common metrics for high-level insights. "dev" provides deep operator-level introspection for developers. | +| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | +| datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | +| datafusion.sql_parser.enable_options_value_normalization | false | When set to true, SQL parser will normalize options value (convert value to lowercase). 
Note that this option is ignored and will be removed in the future. All case-insensitive values are normalized automatically. | +| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks. | +| datafusion.sql_parser.support_varchar_with_length | true | If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but ignore the length. If false, error if a `VARCHAR` with a length is specified. The Arrow type system does not have a notion of maximum string length and thus DataFusion can not enforce such limits. | +| datafusion.sql_parser.map_string_types_to_utf8view | true | If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning. If false, they are mapped to `Utf8`. Default is true. | +| datafusion.sql_parser.collect_spans | false | When set to true, the source locations relative to the original SQL query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected and recorded in the logical plan nodes. | +| datafusion.sql_parser.recursion_limit | 50 | Specifies the recursion depth limit when parsing complex SQL Queries | +| datafusion.sql_parser.default_null_ordering | nulls_max | Specifies the default null ordering for query results. There are 4 options: - `nulls_max`: Nulls appear last in ascending order. - `nulls_min`: Nulls appear first in ascending order. - `nulls_first`: Nulls always be first in any order. - `nulls_last`: Nulls always be last in any order. By default, `nulls_max` is used to follow Postgres's behavior. postgres rule: | +| datafusion.format.safe | true | If set to `true` any formatting errors will be written to the output instead of being converted into a [`std::fmt::Error`] | +| datafusion.format.null | | Format string for nulls | +| datafusion.format.date_format | %Y-%m-%d | Date format for date arrays | +| datafusion.format.datetime_format | %Y-%m-%dT%H:%M:%S%.f | Format for DateTime arrays | +| datafusion.format.timestamp_format | %Y-%m-%dT%H:%M:%S%.f | Timestamp format for timestamp arrays | +| datafusion.format.timestamp_tz_format | NULL | Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used. | +| datafusion.format.time_format | %H:%M:%S%.f | Time format for time arrays | +| datafusion.format.duration_format | pretty | Duration format. Can be either `"pretty"` or `"ISO8601"` | +| datafusion.format.types_info | false | Show types in visual representation batches | # Runtime Configuration Settings @@ -223,7 +223,7 @@ to enable parallelization can dominate the actual computation. You can find out how many cores are being used via the [`EXPLAIN`] command and look at the number of partitions in the plan. -[`explain`]: sql/explain.md +[`EXPLAIN`]: sql/explain.md The `datafusion.optimizer.repartition_file_min_size` option controls the minimum file size the [`ListingTable`] provider will attempt to repartition. 
However, this @@ -237,7 +237,7 @@ than 1MB), we recommend setting `target_partitions` to 1 to avoid repartitioning SET datafusion.execution.target_partitions = '1'; ``` -[`listingtable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html +[`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html ## Memory-limited Queries @@ -263,7 +263,7 @@ SET datafusion.execution.target_partitions = 4; SET datafusion.execution.batch_size = 1024; ``` -[`fairspillpool`]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html +[`FairSpillPool`]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html ## Join Queries diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md index 7e0363f4ceb9b..286a6ec5d17b3 100644 --- a/docs/source/user-guide/metrics.md +++ b/docs/source/user-guide/metrics.md @@ -17,6 +17,12 @@ under the License. --> + + # Metrics DataFusion operators expose runtime metrics so you can understand where time is spent and how much data flows through the pipeline. See more in [EXPLAIN ANALYZE](sql/explain.md#explain-analyze). @@ -25,23 +31,215 @@ DataFusion operators expose runtime metrics so you can understand where time is ### BaselineMetrics -`BaselineMetrics` are available in most physical operators to capture common measurements. +Helper for creating and tracking common "baseline" metrics for each operator + +| Metric | Description | +| --------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| end_time | end_time is set when `BaselineMetrics::done()` is called | +| elapsed_compute | amount of time the operator was actively trying to use the CPU | +| output_rows | output rows: the total output rows | +| output_bytes | Memory usage of all output batches. Note: This value may be overestimated. If multiple output `RecordBatch` instances share underlying memory buffers, their sizes will be counted multiple times. Issue: | +| output_batches | output batches: the total output batch count | + +### BuildProbeJoinMetrics + +Metrics for build & probe joins + +| Metric | Description | +| ------------------- | ------------------------------------------------------------------- | +| baseline | | +| build_time | Total time for collecting build-side of join | +| build_input_batches | Number of batches consumed by build-side | +| build_input_rows | Number of rows consumed by build-side | +| build_mem_used | Memory used by build-side in bytes | +| join_time | Total time for joining probe-side batches to the build-side batches | +| input_batches | Number of batches consumed by probe-side of this operator | +| input_rows | Number of rows consumed by probe-side this operator | +| probe_hit_rate | Fraction of probe rows that found more than one match | +| avg_fanout | Average number of build matches per matched probe row | + +### PruningMetrics + +Counters tracking pruning metrics + +| Metric | Description | +| ------- | ----------- | +| pruned | | +| matched | | + +### RatioMetrics + +Counters tracking ratio metrics (e.g. 
matched vs total) + +| Metric | Description | +| -------------- | ----------- | +| part | | +| total | | +| merge_strategy | | + +### SpillMetrics + +Helper for creating and tracking spill-related metrics for each operator + +| Metric | Description | +| ---------------- | ------------------------------------------------------------------------- | +| spill_file_count | count of spills during the execution of the operator | +| spilled_bytes | total bytes actually written to disk during the execution of the operator | +| spilled_rows | total spilled rows during the execution of the operator | + +### SplitMetrics -| Metric | Description | -| --------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| elapsed_compute | CPU time the operator actively spends processing work. | -| output_rows | Total number of rows the operator produces. | -| output_bytes | Memory usage of all output batches. Note: This value may be overestimated. If multiple output `RecordBatch` instances share underlying memory buffers, their sizes will be counted multiple times. | -| output_batches | Total number of output batches the operator produces. | +Metrics for tracking batch splitting activity + +| Metric | Description | +| ------------- | -------------------------------------------------- | +| batches_split | Number of times an input [`RecordBatch`] was split | ## Operator-specific Metrics +### AggregateExec + +Hash aggregate execution plan + +#### GroupByMetrics + +| Metric | Description | +| -------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| time_calculating_group_ids | Time spent calculating the group IDs from the evaluated grouping columns. | +| aggregate_arguments_time | Time spent evaluating the inputs to the aggregate functions. | +| aggregation_time | Time spent evaluating the aggregate expressions themselves (e.g. summing all elements and counting number of elements for `avg` aggregate). | +| emitting_time | Time spent emitting the final results and constructing the record batch which includes finalizing the grouping expressions (e.g. emit from the hash table in case of hash aggregation) and the accumulators | + +### CrossJoinExec + +Cross Join Execution Plan + +_No operator-specific metrics documented (see Common Metrics)._ + +### DataSourceExec + +[`ExecutionPlan`] that reads one or more files + +#### FileStreamMetrics + +Metrics for [`FileStream`] + +| Metric | Description | +| ------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| time_opening | Wall clock time elapsed for file opening. Time between when [`FileOpener::open`] is called and when the [`FileStream`] receives a stream for reading. If there are multiple files being scanned, the stream will open the next file in the background while scanning the current file. 
This metric will only capture time spent opening while not also scanning. [`FileStream`]: | +| time_scanning_until_data | Wall clock time elapsed for file scanning + first record batch of decompression + decoding Time between when the [`FileStream`] requests data from the stream and when the first [`RecordBatch`] is produced. [`FileStream`]: | +| time_scanning_total | Total elapsed wall clock time for scanning + record batch decompression / decoding Sum of time between when the [`FileStream`] requests data from the stream and when a [`RecordBatch`] is produced for all record batches in the stream. Note that this metric also includes the time of the parent operator's execution. | +| time_processing | Wall clock time elapsed for data decompression + decoding Time spent waiting for the FileStream's input. | +| file_open_errors | Count of errors opening file. If using `OnError::Skip` this will provide a count of the number of files which were skipped and will not be included in the scan results. | +| file_scan_errors | Count of errors scanning file If using `OnError::Skip` this will provide a count of the number of files which were skipped and will not be included in the scan results. | + ### FilterExec -| Metric | Description | -| ----------- | ----------------------------------------------------------------- | -| selectivity | Selectivity of the filter, calculated as output_rows / input_rows | +FilterExec evaluates a boolean predicate against all input batches to determine which rows to include in its output batches. + +#### FilterExecMetrics + +The metrics for `FilterExec` + +| Metric | Description | +| ---------------- | ----------------------------------------------------------------- | +| baseline_metrics | Common metrics for most operators | +| selectivity | Selectivity of the filter, calculated as output_rows / input_rows | + +### HashJoinExec + +Join execution plan: Evaluates equijoin predicates in parallel on multiple partitions using a hash table and an optional filter list to apply post join. + +_No operator-specific metrics documented (see Common Metrics)._ + +### NestedLoopJoinExec + +NestedLoopJoinExec is a build-probe join operator designed for joins that do not have equijoin keys in their `ON` clause. + +#### NestedLoopJoinMetrics + +| Metric | Description | +| ------------ | ---------------------------------------------------------------- | +| join_metrics | Join execution metrics | +| selectivity | Selectivity of the join: output_rows / (left_rows \* right_rows) | + +### PiecewiseMergeJoinExec + +`PiecewiseMergeJoinExec` is a join execution plan that only evaluates single range filter and show much better performance for these workloads than `NestedLoopJoin` + +_No operator-specific metrics documented (see Common Metrics)._ + +### RepartitionExec + +Maps `N` input partitions to `M` output partitions based on a [`Partitioning`] scheme. + +#### RepartitionMetrics + +| Metric | Description | +| ---------------- | ----------------------------------------------------------------------------------------- | +| fetch_time | Time in nanos to execute child operator and fetch batches | +| repartition_time | Repartitioning elapsed time in nanos | +| send_time | Time in nanos for sending resulting batches to channels. One metric per output partition. | + +### SortExec + +Sort execution plan. 
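As a hedged example of where these operator metrics surface, running `EXPLAIN ANALYZE` against a hypothetical table `t` with a sort column `a` (both placeholders, not part of this document) includes the sort, TopK, and spill counters described here in its per-operator output:

```sql
-- t and a are hypothetical placeholders; metrics such as elapsed_compute, output_rows,
-- row_replacements, and the spill counters appear per operator in the EXPLAIN ANALYZE output
SET datafusion.explain.analyze_level = 'dev';
EXPLAIN ANALYZE SELECT * FROM t ORDER BY a DESC LIMIT 10;
```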
+ +#### ExternalSorterMetrics + +| Metric | Description | +| ------------- | ----------- | +| baseline | metrics | +| spill_metrics | | +| split_metrics | | + +#### TopKMetrics + +| Metric | Description | +| ---------------- | ------------------------------------------------ | +| baseline | metrics | +| row_replacements | count of how many rows were replaced in the heap | + +### SortMergeJoinExec + +Join execution plan that executes equi-join predicates on multiple partitions using Sort-Merge join algorithm and applies an optional filter post join. Can be used to join arbitrarily large inputs where one or both of the inputs don't fit in the available memory. + +#### SortMergeJoinMetrics + +Metrics for SortMergeJoinExec + +| Metric | Description | +| ---------------- | --------------------------------------------------------------------------------------------- | +| join_time | Total time for joining probe-side batches to the build-side batches | +| input_batches | Number of batches consumed by this operator | +| input_rows | Number of rows consumed by this operator | +| baseline_metrics | Execution metrics | +| peak_mem_used | Peak memory used for buffered data. Calculated as sum of peak memory values across partitions | +| spill_metrics | Metrics related to spilling | + +### SymmetricHashJoinExec + +A symmetric hash join with range conditions is when both streams are hashed on the join key and the resulting hash tables are used to join the streams. The join is considered symmetric because the hash table is built on the join keys from both streams, and the matching of rows is based on the values of the join keys in both streams. This type of join is efficient in streaming context as it allows for fast lookups in the hash table, rather than having to scan through one or both of the streams to find matching rows, also it only considers the elements from the stream that fall within a certain sliding window (w/ range conditions), making it more efficient and less likely to store stale data. This enables operating on unbounded streaming data without any memory issues. + +#### StreamJoinMetrics + +Metrics for HashJoinExec + +| Metric | Description | +| ------------------- | ------------------------------------------------------ | +| left | Number of left batches/rows consumed by this operator | +| right | Number of right batches/rows consumed by this operator | +| stream_memory_usage | Memory used by sides in bytes | +| baseline_metrics | Number of rows produced by this operator | + +### UnnestExec + +Unnest the given columns (either with type struct or list) For list unnesting, each row is vertically transformed into multiple rows For struct unnesting, each column is horizontally transformed into multiple columns, Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m') -## TODO +#### UnnestMetrics -Add metrics for the remaining operators +| Metric | Description | +| ---------------- | -------------------------- | +| baseline_metrics | Execution metrics | +| input_batches | Number of batches consumed | +| input_rows | Number of rows consumed | diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index ddf32a5066ebe..28bf7d0bfc415 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -2451,7 +2451,6 @@ date_bin(interval, expression, origin-timestamp) - **interval**: Bin interval. - **expression**: Time expression to operate on. 
Can be a constant, column, or function. - **origin-timestamp**: Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported: - - nanoseconds - microseconds - milliseconds @@ -2516,7 +2515,6 @@ date_part(part, expression) #### Arguments - **part**: Part of the date to return. The following date parts are supported: - - year - quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in) - month @@ -2556,7 +2554,6 @@ date_trunc(precision, expression) #### Arguments - **precision**: Time precision to truncate to. The following precisions are supported: - - year / YEAR - quarter / QUARTER - month / MONTH