diff --git a/Cargo.lock b/Cargo.lock index 2ee907b30cf02..103ca4040d68f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2144,6 +2144,7 @@ dependencies = [ "datafusion-expr-common", "datafusion-functions-aggregate-common", "datafusion-functions-window-common", + "datafusion-macros", "datafusion-physical-expr-common", "env_logger", "indexmap 2.12.1", diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index 75aa59595bed5..a9bba5ef5d6d4 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -64,5 +64,10 @@ sqlparser = { workspace = true, optional = true } [dev-dependencies] ctor = { workspace = true } +datafusion-macros = { workspace = true } env_logger = { workspace = true } insta = { workspace = true } + +[package.metadata.cargo-machete] +# See: https://github.com/bnjbvr/cargo-machete/issues/163 +ignored = ["datafusion-macros"] diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs index 8f2b8a0d9bfe5..6c9d2a2689dfd 100644 --- a/datafusion/expr/src/udwf.rs +++ b/datafusion/expr/src/udwf.rs @@ -52,7 +52,7 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr; /// A UDWF is different from a user defined function (UDF) in that it is /// stateful across batches. /// -/// See the documentation on [`PartitionEvaluator`] for more details +/// See the documentation on [`PartitionEvaluator`] for more details. /// /// 1. For simple use cases, use [`create_udwf`] (examples in /// [`simple_udwf.rs`]). @@ -61,10 +61,9 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr; /// access (examples in [`advanced_udwf.rs`]). /// /// # API Note -/// This is a separate struct from `WindowUDFImpl` to maintain backwards +/// This is a separate struct from [`WindowUDFImpl`] to maintain backwards /// compatibility with the older API. 
/// -/// [`PartitionEvaluator`]: crate::PartitionEvaluator /// [`create_udwf`]: crate::expr_fn::create_udwf /// [`simple_udwf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/simple_udwf.rs /// [`advanced_udwf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udwf.rs @@ -73,7 +72,6 @@ pub struct WindowUDF { inner: Arc, } -/// Defines how the WindowUDF is shown to users impl Display for WindowUDF { fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!(f, "{}", self.name()) @@ -95,9 +93,9 @@ impl Hash for WindowUDF { } impl WindowUDF { - /// Create a new `WindowUDF` from a `[WindowUDFImpl]` trait object + /// Create a new [`WindowUDF`] from a [`WindowUDFImpl`] trait object. /// - /// Note this is the same as using the `From` impl (`WindowUDF::from`) + /// This is the same as using [`WindowUDF::from`]. pub fn new_from_impl(fun: F) -> WindowUDF where F: WindowUDFImpl + 'static, @@ -105,25 +103,26 @@ impl WindowUDF { Self::new_from_shared_impl(Arc::new(fun)) } - /// Create a new `WindowUDF` from a `[WindowUDFImpl]` trait object + /// Create a new [`WindowUDF`] from a [`WindowUDFImpl`] trait object. pub fn new_from_shared_impl(fun: Arc) -> WindowUDF { Self { inner: fun } } - /// Return the underlying [`WindowUDFImpl`] trait object for this function + /// Return the underlying [`WindowUDFImpl`] trait object for this function. pub fn inner(&self) -> &Arc { &self.inner } /// Adds additional names that can be used to invoke this function, in - /// addition to `name` + /// addition to `name`. /// - /// If you implement [`WindowUDFImpl`] directly you should return aliases directly. + /// If you implement [`WindowUDFImpl`] directly you should return aliases via + /// [`WindowUDFImpl::aliases`] instead. 
pub fn with_aliases(self, aliases: impl IntoIterator) -> Self { Self::new_from_impl(AliasedWindowUDFImpl::new(Arc::clone(&self.inner), aliases)) } - /// creates a [`Expr`] that calls the window function with default + /// Creates an [`Expr`] that calls the window function with default /// values for `order_by`, `partition_by`, `window_frame`. /// /// See [`ExprFunctionExt`] for details on setting these values. @@ -138,7 +137,7 @@ impl WindowUDF { Expr::from(WindowFunction::new(fun, args)) } - /// Returns this function's name + /// Returns this function's name. /// /// See [`WindowUDFImpl::name`] for more details. pub fn name(&self) -> &str { @@ -146,18 +145,20 @@ impl WindowUDF { } /// Returns the aliases for this function. + /// + /// See [`WindowUDFImpl::aliases`] for more details. pub fn aliases(&self) -> &[String] { self.inner.aliases() } - /// Returns this function's signature (what input types are accepted) + /// Returns this function's signature (what input types are accepted). /// /// See [`WindowUDFImpl::signature`] for more details. pub fn signature(&self) -> &Signature { self.inner.signature() } - /// Do the function rewrite + /// Retrieve function simplification rule, if present. /// /// See [`WindowUDFImpl::simplify`] for more details. pub fn simplify(&self) -> Option { @@ -170,7 +171,10 @@ impl WindowUDF { pub fn expressions(&self, expr_args: ExpressionArgs) -> Vec> { self.inner.expressions(expr_args) } - /// Return a `PartitionEvaluator` for evaluating this window function + + /// Return a [`PartitionEvaluator`] for evaluating this window function. + /// + /// See [`WindowUDFImpl::partition_evaluator`] for more details. pub fn partition_evaluator_factory( &self, partition_evaluator_args: PartitionEvaluatorArgs, @@ -236,69 +240,67 @@ where /// See [`advanced_udwf.rs`] for a full example with complete implementation and /// [`WindowUDF`] for other available options. 
/// -/// /// [`advanced_udwf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udwf.rs /// # Basic Example /// ``` /// # use std::any::Any; -/// # use std::sync::LazyLock; /// # use arrow::datatypes::{DataType, Field, FieldRef}; -/// # use datafusion_common::{DataFusionError, plan_err, Result}; -/// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, WindowFrame, ExprFunctionExt, Documentation, LimitEffect}; -/// # use datafusion_expr::{WindowUDFImpl, WindowUDF}; +/// # use datafusion_common::Result; +/// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, WindowFrame, ExprFunctionExt, Documentation, WindowUDFImpl, WindowUDF}; /// # use datafusion_functions_window_common::field::WindowUDFFieldArgs; /// # use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; -/// # use datafusion_expr::window_doc_sections::DOC_SECTION_ANALYTICAL; -/// # use datafusion_physical_expr_common::physical_expr; +/// # use datafusion_macros::user_doc; /// # use std::sync::Arc; -/// +/// #[user_doc( +/// doc_section(label = "Analytical Functions"), +/// description = "Smooths the windows", +/// syntax_example = "smooth_it(2)", +/// argument( +/// name = "arg1", +/// description = "The int32 number to smooth by" +/// ) +/// )] /// #[derive(Debug, Clone, PartialEq, Eq, Hash)] /// struct SmoothIt { -/// signature: Signature, +/// signature: Signature, /// } /// /// impl SmoothIt { -/// fn new() -> Self { -/// Self { -/// signature: Signature::uniform(1, vec![DataType::Int32], Volatility::Immutable), -/// } -/// } -/// } -/// -/// static DOCUMENTATION: LazyLock = LazyLock::new(|| { -/// Documentation::builder(DOC_SECTION_ANALYTICAL, "smooths the windows", "smooth_it(2)") -/// .with_argument("arg1", "The int32 number to smooth by") -/// .build() -/// }); -/// -/// fn get_doc() -> &'static Documentation { -/// &DOCUMENTATION +/// fn new() -> Self { +/// Self { +/// signature: 
Signature::exact(vec![DataType::Int32], Volatility::Immutable), +/// } +/// } /// } /// /// /// Implement the WindowUDFImpl trait for SmoothIt /// impl WindowUDFImpl for SmoothIt { -/// fn as_any(&self) -> &dyn Any { self } -/// fn name(&self) -> &str { "smooth_it" } -/// fn signature(&self) -> &Signature { &self.signature } -/// // The actual implementation would smooth the window -/// fn partition_evaluator( -/// &self, -/// _partition_evaluator_args: PartitionEvaluatorArgs, -/// ) -> Result> { -/// unimplemented!() -/// } -/// fn field(&self, field_args: WindowUDFFieldArgs) -> Result { -/// if let Some(DataType::Int32) = field_args.get_input_field(0).map(|f| f.data_type().clone()) { -/// Ok(Field::new(field_args.name(), DataType::Int32, false).into()) -/// } else { -/// plan_err!("smooth_it only accepts Int32 arguments") -/// } -/// } -/// fn documentation(&self) -> Option<&Documentation> { -/// Some(get_doc()) -/// } -/// fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { -/// LimitEffect::Unknown +/// fn as_any(&self) -> &dyn Any { +/// self +/// } +/// +/// fn name(&self) -> &str { +/// "smooth_it" +/// } +/// +/// fn signature(&self) -> &Signature { +/// &self.signature +/// } +/// +/// fn partition_evaluator( +/// &self, +/// _partition_evaluator_args: PartitionEvaluatorArgs, +/// ) -> Result> { +/// // The actual implementation would smooth the window +/// unimplemented!() +/// } +/// +/// fn field(&self, field_args: WindowUDFFieldArgs) -> Result { +/// Ok(Field::new(field_args.name(), DataType::Int32, false).into()) +/// } +/// +/// fn documentation(&self) -> Option<&Documentation> { +/// self.doc() /// } /// } /// @@ -315,22 +317,24 @@ where /// .unwrap(); /// ``` pub trait WindowUDFImpl: Debug + DynEq + DynHash + Send + Sync { - /// Returns this object as an [`Any`] trait object + /// Returns this object as an [`Any`] trait object. fn as_any(&self) -> &dyn Any; - /// Returns this function's name + /// Returns this function's name. 
fn name(&self) -> &str; /// Returns any aliases (alternate names) for this function. /// /// Note: `aliases` should only include names other than [`Self::name`]. - /// Defaults to `[]` (no aliases) + /// Defaults to no aliases (`&[]`). fn aliases(&self) -> &[String] { &[] } /// Returns the function's [`Signature`] for information about what input - /// types are accepted and the function's Volatility. + /// types are accepted and the function's [`Volatility`]. + /// + /// [`Volatility`]: crate::Volatility fn signature(&self) -> &Signature; /// Returns the expressions that are passed to the [`PartitionEvaluator`]. @@ -338,7 +342,7 @@ pub trait WindowUDFImpl: Debug + DynEq + DynHash + Send + Sync { expr_args.input_exprs().into() } - /// Invoke the function, returning the [`PartitionEvaluator`] instance + /// Invoke the function, returning the [`PartitionEvaluator`] instance. fn partition_evaluator( &self, partition_evaluator_args: PartitionEvaluatorArgs, @@ -349,27 +353,33 @@ pub trait WindowUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// This can be used to apply function specific simplification rules during /// optimization. The default implementation does nothing. /// - /// Note that DataFusion handles simplifying arguments and "constant + /// Note that DataFusion handles simplifying arguments and "constant /// folding" (replacing a function call with constant arguments such as /// `my_add(1,2) --> 3` ). Thus, there is no need to implement such /// optimizations manually for specific UDFs. /// - /// Example: - /// `advanced_udwf.rs`: + /// See an example implementation in [`advanced_udwf.rs`]. 
/// # Returns - /// [None] if simplify is not defined or, /// - /// Or, a closure with two arguments: - /// * 'window_function': [crate::expr::WindowFunction] for which simplified has been invoked - /// * 'info': [crate::simplify::SimplifyContext] + /// `None` if there are no simplification rules, otherwise a closure with + /// two arguments: + /// * `window_function`: The [`WindowFunction`] for which simplification is + /// being invoked + /// * `info`: [`SimplifyContext`] + /// + /// The closure will perform the simplification and return an [`Expr`]. /// /// # Notes + /// /// The returned expression must have the same schema as the original /// expression, including both the data type and nullability. For example, /// if the original expression is nullable, the returned expression must /// also be nullable, otherwise it may lead to schema verification errors /// later in query planning. + /// + /// [`advanced_udwf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udwf.rs + /// [`SimplifyContext`]: crate::simplify::SimplifyContext fn simplify(&self) -> Option { None } @@ -392,23 +402,26 @@ pub trait WindowUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// Coerce arguments of a function call to types that the function can evaluate. /// - /// This function is only called if [`WindowUDFImpl::signature`] returns [`crate::TypeSignature::UserDefined`]. Most - /// UDWFs should return one of the other variants of `TypeSignature` which handle common - /// cases + /// This function is only called if [`WindowUDFImpl::signature`] is defined as + /// [`TypeSignature::UserDefined`]. Most UDWFs should return one of the other + /// variants of [`TypeSignature`] which handle common cases. /// - /// See the [type coercion module](crate::type_coercion) - /// documentation for more details on type coercion + /// See the [type coercion module] documentation for more details.
/// - /// For example, if your function requires a floating point arguments, but the user calls - /// it like `my_func(1::int)` (aka with `1` as an integer), coerce_types could return `[DataType::Float64]` - /// to ensure the argument was cast to `1::double` + /// For example, if your function requires floating point arguments, but the + /// user calls it like `my_func(1::int)` (aka with `1` as an integer), coerce_types + /// could return [`DataType::Float64`] to ensure the argument was cast to `1::double`. /// /// # Parameters - /// * `arg_types`: The argument types of the arguments this function with + /// * `arg_types`: The types of the arguments this function is called with /// /// # Return value /// A Vec the same length as `arg_types`. DataFusion will `CAST` the function call /// arguments to these specific types. + /// + /// [type coercion module]: crate::type_coercion + /// [`TypeSignature::UserDefined`]: crate::TypeSignature::UserDefined + /// [`TypeSignature`]: crate::TypeSignature fn coerce_types(&self, _arg_types: &[DataType]) -> Result> { not_impl_err!("Function {} does not implement coerce_types", self.name()) } @@ -427,21 +440,130 @@ pub trait WindowUDFImpl: Debug + DynEq + DynHash + Send + Sync { None } - /// If not causal, returns the effect this function will have on the window + /// Returns the effect this function has on limit pushdowns. See [`LimitEffect`] + /// for more details. + /// + /// Defaults to [`LimitEffect::Unknown`]. fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { LimitEffect::Unknown } } -/// the effect this function will have on the limit pushdown +/// The effect this function will have on limit pushdowns through a window bound. +/// +/// # Categories +/// +/// Window functions will fall into one of three categories: +/// +/// ## Causal +/// +/// A causal window function is one that is not affected by a limit pushdown. +/// For example, `row_number()` is causal.
Given a query: +/// +/// ```sql +/// SELECT row_number() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) +/// FROM employees +/// ORDER BY empno +/// LIMIT 5 +/// ``` +/// +/// A pseudo logical plan would look like: +/// +/// ```text +/// Sort: empno, fetch=5 +/// --WindowAggr: row_number() +/// ----TableScan: employees +/// ``` +/// +/// We can leverage the information that `row_number()` is causal to optimize the +/// physical plan to push through the limit like so: +/// +/// ```text +/// WindowAggr: row_number() +/// --Sort: empno, fetch=5 +/// ----TableScan: employees +/// ``` +/// +/// This optimization ensures `row_number()` maintains correctness whilst limiting +/// the actual input to it, otherwise it would require computing `row_number()` +/// on the whole file before applying the limit. `row_number()` does not require +/// extra data beyond the input limited data, nor access to the entire +/// dataset, to calculate its results therefore it is considered causal. +/// +/// Causal window functions correspond to [`LimitEffect::None`]. +/// +/// Aggregate window functions are all considered causal. +/// +/// ## Acausal +/// +/// An acausal function requires lookahead to compute its results. They still allow +/// limit pushdowns, but must grow the limit by an amount to permit this optimization. +/// For example, `lead()` is an acausal window function. 
Given a query: +/// +/// ```sql +/// SELECT lead(empno, 2) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) +/// FROM employees +/// ORDER BY empno +/// LIMIT 5 +/// ``` +/// +/// A pseudo logical plan would look like: +/// +/// ```text +/// Sort: empno, fetch=5 +/// --WindowAggr: lead() +/// ----TableScan: employees +/// ``` +/// +/// Because `lead()` needs to see rows offset by 2 to calculate its results, +/// we must grow the limit by that amount: +/// +/// ```text +/// Sort: empno, fetch=5 +/// --WindowAggr: lead() +/// ----Sort: empno, fetch=7 +/// ------TableScan: employees +/// ``` +/// +/// For the eventual 5 rows we see, dictated by the global limit, `lead()` needs +/// at least 7 rows to calculate the correct lookahead for each row, hence we push +/// down a limit of 7 (the limit grew by 2). +/// +/// It is worth noting that its sibling `lag()` is *not* acausal and is instead +/// causal. This is because `lag()` only needs lookbehind to calculate its results. +/// That is, within a limited window frame, `lag()` already has enough information +/// to calculate its results and does not need to grow the window frame if we push +/// down a limit. +/// +/// Acausal window functions correspond to [`LimitEffect::Relative`] and [`LimitEffect::Absolute`], +/// where `lead()` is [`LimitEffect::Relative`] since it grows the limit relative +/// to how much lookahead it needs (2 in the above example). [`LimitEffect::Absolute`] +/// is for functions which require a fixed number of rows to compute their results; +/// the limit will be grown to at least the specified amount if pushing down through +/// such a function. +/// +/// ## Unknown +/// +/// This prevents a limit being pushed down through the window function, in case +/// it is dynamic (only evaluatable at run time) or simply undeclared. This is a +/// safe default if you are unsure which is applicable.
+/// +/// # Multiple window functions +/// +/// If there are multiple window functions with varying limit effects present in +/// a single projection, the most conservative effect applies. That is, if any +/// function is unknown then no limit is pushed down; if any function is acausal +/// then the limit is grown to the maximum required limit across all functions +/// before being pushed down; otherwise limit is pushed down as is if all functions +/// are causal. pub enum LimitEffect { - /// Does not affect the limit (i.e. this is causal) + /// Does not affect the limit (i.e. this is causal). None, - /// Either undeclared, or dynamic (only evaluatable at run time) + /// Either undeclared, or dynamic (only evaluatable at run time). Unknown, - /// Grow the limit by N rows + /// Grow the limit by N rows. Relative(usize), - /// Limit needs to be at least N rows + /// Limit needs to be at least N rows. Absolute(usize), }