From b7741139e3924a90a184bc1ac7ce268e5be0ed6a Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 22 Nov 2025 15:48:58 -0500 Subject: [PATCH] Implement FFI_ExtensionOptions --- datafusion/common/src/config.rs | 33 +- .../ffi/src/config/extension_options.rs | 286 ++++++++++++++++++ datafusion/ffi/src/config/mod.rs | 75 +++++ datafusion/ffi/src/lib.rs | 1 + datafusion/ffi/src/session/config.rs | 36 +-- 5 files changed, 398 insertions(+), 33 deletions(-) create mode 100644 datafusion/ffi/src/config/extension_options.rs create mode 100644 datafusion/ffi/src/config/mod.rs diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 95a02147438b0..c9936b8d30b6c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1231,7 +1231,7 @@ impl<'a> TryInto> for &'a FormatOptions } /// A key value pair, with a corresponding description -#[derive(Debug, Hash, PartialEq, Eq)] +#[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct ConfigEntry { /// A unique string to identify this config value pub key: String, @@ -1327,6 +1327,8 @@ impl ConfigField for ConfigOptions { } } +pub const DATAFUSION_FFI_CONFIG_NAMESPACE: &str = "datafusion_ffi"; + impl ConfigOptions { /// Creates a new [`ConfigOptions`] with default values pub fn new() -> Self { @@ -1341,12 +1343,12 @@ impl ConfigOptions { /// Set a configuration option pub fn set(&mut self, key: &str, value: &str) -> Result<()> { - let Some((prefix, key)) = key.split_once('.') else { + let Some((mut prefix, mut inner_key)) = key.split_once('.') else { return _config_err!("could not find config namespace for key \"{key}\""); }; if prefix == "datafusion" { - if key == "optimizer.enable_dynamic_filter_pushdown" { + if inner_key == "optimizer.enable_dynamic_filter_pushdown" { let bool_value = value.parse::().map_err(|e| { DataFusionError::Configuration(format!( "Failed to parse '{value}' as bool: {e}", @@ -1361,13 +1363,23 @@ impl ConfigOptions { } return Ok(()); } - return ConfigField::set(self, key, value); + return ConfigField::set(self, inner_key, value); + } + + if !self.extensions.0.contains_key(prefix) + && self + .extensions + .0 + .contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE) + { + inner_key = key; + prefix = DATAFUSION_FFI_CONFIG_NAMESPACE; } let Some(e) = self.extensions.0.get_mut(prefix) else { return _config_err!("Could not find config namespace \"{prefix}\""); }; - e.0.set(key, value) + e.0.set(inner_key, value) } /// Create new [`ConfigOptions`], taking values from environment variables @@ -2132,7 +2144,7 @@ impl TableOptions { /// /// A result indicating success or failure in setting the configuration option. pub fn set(&mut self, key: &str, value: &str) -> Result<()> { - let Some((prefix, _)) = key.split_once('.') else { + let Some((mut prefix, _)) = key.split_once('.') else { return _config_err!("could not find config namespace for key \"{key}\""); }; @@ -2144,6 +2156,15 @@ impl TableOptions { return Ok(()); } + if !self.extensions.0.contains_key(prefix) + && self + .extensions + .0 + .contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE) + { + prefix = DATAFUSION_FFI_CONFIG_NAMESPACE; + } + let Some(e) = self.extensions.0.get_mut(prefix) else { return _config_err!("Could not find config namespace \"{prefix}\""); }; diff --git a/datafusion/ffi/src/config/extension_options.rs b/datafusion/ffi/src/config/extension_options.rs new file mode 100644 index 0000000000000..c56ed38b17064 --- /dev/null +++ b/datafusion/ffi/src/config/extension_options.rs @@ -0,0 +1,286 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::collections::HashMap; +use std::ffi::c_void; + +use abi_stable::StableAbi; +use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2}; +use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions}; +use datafusion_common::{Result, exec_err}; + +use crate::df_result; + +/// A stable struct for sharing [`ExtensionOptions`] across FFI boundaries. +/// +/// Unlike other FFI structs in this crate, we do not construct a foreign +/// variant of this object. This is due to the typical method for interacting +/// with extension options is by creating a local struct of your concrete type. +/// To support this methodology use the `to_extension` method instead. +/// +/// When using [`FFI_ExtensionOptions`] with multiple extensions, all extension +/// values are stored on a single [`FFI_ExtensionOptions`] object. The keys +/// are stored with the full path prefix to avoid overwriting values when using +/// multiple extensions. +#[repr(C)] +#[derive(Debug, StableAbi)] +pub struct FFI_ExtensionOptions { + /// Return a deep clone of this [`ExtensionOptions`] + pub cloned: unsafe extern "C" fn(&Self) -> FFI_ExtensionOptions, + + /// Set the given `key`, `value` pair + pub set: + unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(), RString>, + + /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`] + pub entries: unsafe extern "C" fn(&Self) -> RVec>, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(&mut Self), + + /// Internal data. This is only to be accessed by the provider of the options. + /// A [`ForeignExtensionOptions`] should never attempt to access this data. + pub private_data: *mut c_void, +} + +unsafe impl Send for FFI_ExtensionOptions {} +unsafe impl Sync for FFI_ExtensionOptions {} + +pub struct ExtensionOptionsPrivateData { + pub options: HashMap, +} + +impl FFI_ExtensionOptions { + #[inline] + fn inner_mut(&mut self) -> &mut HashMap { + let private_data = self.private_data as *mut ExtensionOptionsPrivateData; + unsafe { &mut (*private_data).options } + } + + #[inline] + fn inner(&self) -> &HashMap { + let private_data = self.private_data as *const ExtensionOptionsPrivateData; + unsafe { &(*private_data).options } + } +} + +unsafe extern "C" fn cloned_fn_wrapper( + options: &FFI_ExtensionOptions, +) -> FFI_ExtensionOptions { + options + .inner() + .iter() + .map(|(k, v)| (k.to_owned(), v.to_owned())) + .collect::>() + .into() +} + +unsafe extern "C" fn set_fn_wrapper( + options: &mut FFI_ExtensionOptions, + key: RStr, + value: RStr, +) -> RResult<(), RString> { + let _ = options.inner_mut().insert(key.into(), value.into()); + RResult::ROk(()) +} + +unsafe extern "C" fn entries_fn_wrapper( + options: &FFI_ExtensionOptions, +) -> RVec> { + options + .inner() + .iter() + .map(|(key, value)| (key.to_owned().into(), value.to_owned().into()).into()) + .collect() +} + +unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_ExtensionOptions) { + let private_data = unsafe { + Box::from_raw(options.private_data as *mut ExtensionOptionsPrivateData) + }; + drop(private_data); +} + +impl Default for FFI_ExtensionOptions { + fn default() -> Self { + HashMap::new().into() + } +} + +impl From> for FFI_ExtensionOptions { + fn from(options: HashMap) -> Self { + let private_data = ExtensionOptionsPrivateData { options }; + + Self { + cloned: cloned_fn_wrapper, + set: set_fn_wrapper, + entries: entries_fn_wrapper, + release: release_fn_wrapper, + private_data: Box::into_raw(Box::new(private_data)) as *mut c_void, + } + } +} + +impl Drop for FFI_ExtensionOptions { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +impl Clone for FFI_ExtensionOptions { + fn clone(&self) -> Self { + unsafe { (self.cloned)(self) } + } +} + +impl ConfigExtension for FFI_ExtensionOptions { + const PREFIX: &'static str = + datafusion_common::config::DATAFUSION_FFI_CONFIG_NAMESPACE; +} + +impl ExtensionOptions for FFI_ExtensionOptions { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn cloned(&self) -> Box { + let ffi_options = unsafe { (self.cloned)(self) }; + Box::new(ffi_options) + } + + fn set(&mut self, key: &str, value: &str) -> Result<()> { + if key.split_once('.').is_none() { + return exec_err!("Unable to set FFI config value without namespace set"); + }; + + df_result!(unsafe { (self.set)(self, key.into(), value.into()) }) + } + + fn entries(&self) -> Vec { + unsafe { + (self.entries)(self) + .into_iter() + .map(|entry_tuple| ConfigEntry { + key: entry_tuple.0.into(), + value: Some(entry_tuple.1.into()), + description: "ffi_config_options", + }) + .collect() + } + } +} + +impl FFI_ExtensionOptions { + /// Add all of the values in a concrete configuration extension to the + /// FFI variant. This is safe to call on either side of the FFI + /// boundary. + pub fn add_config(&mut self, config: &C) -> Result<()> { + for entry in config.entries() { + if let Some(value) = entry.value { + let key = format!("{}.{}", C::PREFIX, entry.key); + self.set(key.as_str(), value.as_str())?; + } + } + + Ok(()) + } + + /// Merge another `FFI_ExtensionOptions` configurations into this one. + /// This is safe to call on either side of the FFI boundary. + pub fn merge(&mut self, other: &FFI_ExtensionOptions) -> Result<()> { + for entry in other.entries() { + if let Some(value) = entry.value { + self.set(entry.key.as_str(), value.as_str())?; + } + } + Ok(()) + } + + /// Create a concrete extension type from the FFI variant. + /// This is safe to call on either side of the FFI boundary. + pub fn to_extension(&self) -> Result { + let mut result = C::default(); + + unsafe { + for entry in (self.entries)(self) { + let key = entry.0.as_str(); + let value = entry.1.as_str(); + + if let Some((prefix, inner_key)) = key.split_once('.') + && prefix == C::PREFIX + { + result.set(inner_key, value)?; + } + } + } + + Ok(result) + } +} + +#[cfg(test)] +mod tests { + use datafusion_common::config::{ConfigExtension, ConfigOptions}; + use datafusion_common::extensions_options; + + use crate::config::extension_options::FFI_ExtensionOptions; + + // Define a new configuration struct using the `extensions_options` macro + extensions_options! { + /// My own config options. + pub struct MyConfig { + /// Should "foo" be replaced by "bar"? + pub foo_to_bar: bool, default = true + + /// How many "baz" should be created? + pub baz_count: usize, default = 1337 + } + } + + impl ConfigExtension for MyConfig { + const PREFIX: &'static str = "my_config"; + } + + #[test] + fn round_trip_ffi_extension_options() { + // set up config struct and register extension + let mut config = ConfigOptions::default(); + let mut ffi_options = FFI_ExtensionOptions::default(); + ffi_options.add_config(&MyConfig::default()).unwrap(); + + config.extensions.insert(ffi_options); + + // overwrite config default + config.set("my_config.baz_count", "42").unwrap(); + + // check config state + let returned_ffi_config = + config.extensions.get::().unwrap(); + let my_config: MyConfig = returned_ffi_config.to_extension().unwrap(); + + // check default value + assert!(my_config.foo_to_bar); + + // check overwritten value + assert_eq!(my_config.baz_count, 42); + } +} diff --git a/datafusion/ffi/src/config/mod.rs b/datafusion/ffi/src/config/mod.rs new file mode 100644 index 0000000000000..ead1eed8b654d --- /dev/null +++ b/datafusion/ffi/src/config/mod.rs @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod extension_options; + +use abi_stable::StableAbi; +use abi_stable::std_types::{RHashMap, RString}; +use datafusion_common::config::{ConfigOptions, ExtensionOptions}; +use datafusion_common::{DataFusionError, Result}; + +use crate::config::extension_options::FFI_ExtensionOptions; + +// TODO(tsaucer) add text about how extension options will require user to convert to concrete type. +#[repr(C)] +#[derive(Debug, Clone, StableAbi)] +pub struct FFI_ConfigOptions { + base_options: RHashMap, + + extensions: FFI_ExtensionOptions, +} + +impl From<&ConfigOptions> for FFI_ConfigOptions { + fn from(options: &ConfigOptions) -> Self { + let base_options: RHashMap = options + .entries() + .into_iter() + .filter_map(|entry| entry.value.map(|value| (entry.key, value))) + .map(|(key, value)| (key.into(), value.into())) + .collect(); + + let mut extensions = FFI_ExtensionOptions::default(); + for (extension_name, extension) in options.extensions.iter() { + for entry in extension.entries().iter() { + if let Some(value) = entry.value.as_ref() { + extensions + .set(format!("{extension_name}.{}", entry.key).as_str(), value) + .expect("FFI_ExtensionOptions set should always return Ok"); + } + } + } + + Self { + base_options, + extensions, + } + } +} + +impl TryFrom for ConfigOptions { + type Error = DataFusionError; + fn try_from(ffi_options: FFI_ConfigOptions) -> Result { + let mut options = ConfigOptions::default(); + options.extensions.insert(ffi_options.extensions); + + for kv_tuple in ffi_options.base_options.iter() { + options.set(kv_tuple.0.as_str(), kv_tuple.1.as_str())?; + } + + Ok(options) + } +} diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index bf0cf9b122c1c..75eea9fd0013b 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -29,6 +29,7 @@ pub mod arrow_wrappers; pub mod catalog_provider; pub mod catalog_provider_list; +pub mod config; pub mod execution; pub mod execution_plan; pub mod expr; diff --git a/datafusion/ffi/src/session/config.rs b/datafusion/ffi/src/session/config.rs index eb9c4e2c6986a..c9fb9f7fa0c77 100644 --- a/datafusion/ffi/src/session/config.rs +++ b/datafusion/ffi/src/session/config.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; use std::ffi::c_void; +use crate::config::FFI_ConfigOptions; use abi_stable::StableAbi; -use abi_stable::std_types::{RHashMap, RString}; +use datafusion_common::config::ConfigOptions; use datafusion_common::error::{DataFusionError, Result}; use datafusion_execution::config::SessionConfig; @@ -39,7 +39,7 @@ use datafusion_execution::config::SessionConfig; pub struct FFI_SessionConfig { /// Return a hash map from key to value of the config options represented /// by string values. - pub config_options: unsafe extern "C" fn(config: &Self) -> RHashMap, + pub config_options: FFI_ConfigOptions, /// Used to create a clone on the provider of the execution plan. This should /// only need to be called by the receiver of the plan. @@ -67,21 +67,6 @@ impl FFI_SessionConfig { } } -unsafe extern "C" fn config_options_fn_wrapper( - config: &FFI_SessionConfig, -) -> RHashMap { - let config_options = config.inner().options(); - - let mut options = RHashMap::default(); - for config_entry in config_options.entries() { - if let Some(value) = config_entry.value { - options.insert(config_entry.key.into(), value.into()); - } - } - - options -} - unsafe extern "C" fn release_fn_wrapper(config: &mut FFI_SessionConfig) { unsafe { debug_assert!(!config.private_data.is_null()); @@ -100,7 +85,7 @@ unsafe extern "C" fn clone_fn_wrapper(config: &FFI_SessionConfig) -> FFI_Session let private_data = Box::new(SessionConfigPrivateData { config: old_config }); FFI_SessionConfig { - config_options: config_options_fn_wrapper, + config_options: config.config_options.clone(), private_data: Box::into_raw(private_data) as *mut c_void, clone: clone_fn_wrapper, release: release_fn_wrapper, @@ -119,8 +104,10 @@ impl From<&SessionConfig> for FFI_SessionConfig { config: session.clone(), }); + let config_options = FFI_ConfigOptions::from(session.options().as_ref()); + Self { - config_options: config_options_fn_wrapper, + config_options, private_data: Box::into_raw(private_data) as *mut c_void, clone: clone_fn_wrapper, release: release_fn_wrapper, @@ -149,14 +136,9 @@ impl TryFrom<&FFI_SessionConfig> for SessionConfig { return Ok(config.inner().clone()); } - let config_options = unsafe { (config.config_options)(config) }; - - let mut options_map = HashMap::new(); - config_options.iter().for_each(|kv_pair| { - options_map.insert(kv_pair.0.to_string(), kv_pair.1.to_string()); - }); + let config_options = ConfigOptions::try_from(config.config_options.clone())?; - SessionConfig::from_string_hash_map(&options_map) + Ok(SessionConfig::from(config_options)) } }