23 changes: 22 additions & 1 deletion datafusion-cli/src/functions.rs
@@ -703,6 +703,23 @@ impl TableFunctionImpl for StatisticsCacheFunc {
}
}

/// Implementation of the `list_files_cache` table function in datafusion-cli.
Contributor: ❤️

///
/// This function returns the cached results of running a LIST command on a particular
/// object store path for a table. The object metadata is returned as a List of Structs,
/// with one Struct for each object.
/// DataFusion uses these cached results to plan queries against external tables.
/// # Schema
/// ```sql
/// > describe select * from list_files_cache();
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
/// | column_name | data_type | is_nullable |
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
/// | table | Utf8 | NO |
/// | path | Utf8 | NO |
/// | metadata_size_bytes | UInt64 | NO |
/// | expires_in | Duration(ms) | YES |
/// | metadata_list | List(Struct("file_path": non-null Utf8, "file_modified": non-null Timestamp(ms), "file_size_bytes": non-null UInt64, "e_tag": Utf8, "version": Utf8), field: 'metadata') | YES |
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
/// ```
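/// # Example
/// An illustrative query, assuming the list files cache is configured and at least one
/// external table has been listed:
/// `SELECT path, expires_in FROM list_files_cache();`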
#[derive(Debug)]
struct ListFilesCacheTable {
schema: SchemaRef,
@@ -771,6 +788,7 @@ impl TableFunctionImpl for ListFilesCacheFunc {
Field::new("metadata", DataType::Struct(nested_fields.clone()), true);

let schema = Arc::new(Schema::new(vec![
Field::new("table", DataType::Utf8, false),
Field::new("path", DataType::Utf8, false),
Field::new("metadata_size_bytes", DataType::UInt64, false),
// The expires field in ListFilesEntry has type Instant when set, from which we cannot derive an absolute wall-clock timestamp, hence Duration is used instead of Timestamp as the data type.
@@ -786,6 +804,7 @@
),
]));

let mut table_arr = vec![];
let mut path_arr = vec![];
let mut metadata_size_bytes_arr = vec![];
let mut expires_arr = vec![];
@@ -802,7 +821,8 @@
let mut current_offset: i32 = 0;

for (path, entry) in list_files_cache.list_entries() {
path_arr.push(path.to_string());
table_arr.push(path.table.map_or("NULL".to_string(), |t| t.to_string()));
path_arr.push(path.path.to_string());
metadata_size_bytes_arr.push(entry.size_bytes as u64);
// calculates time left before entry expires
expires_arr.push(
@@ -841,6 +861,7 @@ impl TableFunctionImpl for ListFilesCacheFunc {
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(table_arr)),
Arc::new(StringArray::from(path_arr)),
Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
Arc::new(DurationMillisecondArray::from(expires_arr)),
31 changes: 0 additions & 31 deletions datafusion-cli/src/main.rs
@@ -848,35 +848,4 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_list_files_cache_not_set() -> Result<(), DataFusionError> {
let rt = RuntimeEnvBuilder::new()
.with_cache_manager(CacheManagerConfig::default().with_list_files_cache(None))
.build_arc()
.unwrap();

let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);

ctx.register_udtf(
"list_files_cache",
Arc::new(ListFilesCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);

let rbs = ctx
.sql("SELECT * FROM list_files_cache()")
.await?
.collect()
.await?;
assert_snapshot!(batches_to_string(&rbs),@r"
+------+---------------------+------------+---------------+
| path | metadata_size_bytes | expires_in | metadata_list |
+------+---------------------+------------+---------------+
+------+---------------------+------------+---------------+
");

Ok(())
}
}
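For context, the removed snapshot covers the previous four-column schema. An equivalent empty-cache snapshot against the new schema would presumably gain a leading table column, along these lines (illustrative only):

+-------+------+---------------------+------------+---------------+
| table | path | metadata_size_bytes | expires_in | metadata_list |
+-------+------+---------------------+------------+---------------+
+-------+------+---------------------+------------+---------------+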
7 changes: 6 additions & 1 deletion datafusion/catalog-listing/src/table.rs
@@ -34,6 +34,7 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
};
use datafusion_execution::cache::TableScopedPath;
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::dml::InsertOp;
@@ -565,7 +566,11 @@ impl TableProvider for ListingTable {

// Invalidate cache entries for this table if they exist
if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
let _ = lfc.remove(table_path.prefix());
let key = TableScopedPath {
table: table_path.get_table_ref().clone(),
path: table_path.prefix().clone(),
};
let _ = lfc.remove(&key);
}

// Sink-related options, apart from format
7 changes: 5 additions & 2 deletions datafusion/core/src/datasource/listing_table_factory.rs
@@ -63,7 +63,8 @@ impl TableProviderFactory for ListingTableFactory {
))?
.create(session_state, &cmd.options)?;

let mut table_path = ListingTableUrl::parse(&cmd.location)?;
let mut table_path =
ListingTableUrl::parse(&cmd.location)?.with_table_ref(cmd.name.clone());
let file_extension = match table_path.is_collection() {
// Setting the extension to be empty instead of allowing the default extension seems
// odd, but was done to ensure existing behavior isn't modified. It seems like this
@@ -160,7 +161,9 @@ impl TableProviderFactory for ListingTableFactory {
}
None => format!("*.{}", cmd.file_type.to_lowercase()),
};
table_path = table_path.with_glob(glob.as_ref())?;
table_path = table_path
.with_glob(glob.as_ref())?
.with_table_ref(cmd.name.clone());
Contributor (author): with_glob drops the table_ref in the url, hence setting it again:

pub fn with_glob(self, glob: &str) -> Result<Self> {
let glob =
Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
Self::try_new(self.url, Some(glob))
}
}

Contributor: I think we could make table_path maintain the table_ref through with_glob. I did this change in

And it seems to have worked well
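For illustration, a minimal sketch of that suggestion (the actual linked change may differ): keep the existing table_ref when rebuilding the URL inside with_glob, so callers no longer need a second with_table_ref call.

pub fn with_glob(self, glob: &str) -> Result<Self> {
    let glob =
        Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
    // Preserve the table_ref that try_new would otherwise reset to None.
    let table_ref = self.table_ref.clone();
    let mut url = Self::try_new(self.url, Some(glob))?;
    url.table_ref = table_ref;
    Ok(url)
}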

Contributor (author): Thank you so much! I think I can merge the change, or would you like that in a separate PR?

}
let schema = options.infer_schema(session_state, &table_path).await?;
let df_schema = Arc::clone(&schema).to_dfschema()?;
7 changes: 6 additions & 1 deletion datafusion/core/src/execution/context/mod.rs
@@ -1315,7 +1315,7 @@ impl SessionContext {
let table = table_ref.table().to_owned();
let maybe_schema = {
let state = self.state.read();
let resolved = state.resolve_table_ref(table_ref);
let resolved = state.resolve_table_ref(table_ref.clone());
state
.catalog_list()
.catalog(&resolved.catalog)
@@ -1327,6 +1327,11 @@
&& table_provider.table_type() == table_type
{
schema.deregister_table(&table)?;
if table_type == TableType::Base
&& let Some(lfc) = self.runtime_env().cache_manager.get_list_files_cache()
{
lfc.drop_table_entries(&Some(table_ref))?;
}
return Ok(true);
}

49 changes: 43 additions & 6 deletions datafusion/datasource/src/url.rs
@@ -17,7 +17,8 @@

use std::sync::Arc;

use datafusion_common::{DataFusionError, Result};
use datafusion_common::{DataFusionError, Result, TableReference};
use datafusion_execution::cache::TableScopedPath;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_session::Session;

@@ -41,6 +42,8 @@ pub struct ListingTableUrl {
prefix: Path,
/// An optional glob expression used to filter files
glob: Option<Pattern>,

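/// An optional table reference identifying the table this URL was registered for, used to scope list-files cache entries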
table_ref: Option<TableReference>,
}

impl ListingTableUrl {
@@ -145,7 +148,12 @@ impl ListingTableUrl {
/// to create a [`ListingTableUrl`].
pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
let prefix = Path::from_url_path(url.path())?;
Ok(Self { url, prefix, glob })
Ok(Self {
url,
prefix,
glob,
table_ref: None,
})
}

/// Returns the URL scheme
@@ -255,7 +263,14 @@ impl ListingTableUrl {
};

let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await?
list_with_cache(
ctx,
store,
self.table_ref.as_ref(),
&self.prefix,
prefix.as_ref(),
)
.await?
} else {
match store.head(&full_prefix).await {
Ok(meta) => futures::stream::once(async { Ok(meta) })
@@ -264,7 +279,14 @@
// If the head command fails, it is likely that object doesn't exist.
// Retry as though it were a prefix (aka a collection)
Err(object_store::Error::NotFound { .. }) => {
list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await?
list_with_cache(
ctx,
store,
self.table_ref.as_ref(),
&self.prefix,
prefix.as_ref(),
)
.await?
}
Err(e) => return Err(e.into()),
}
@@ -323,6 +345,15 @@ impl ListingTableUrl {
Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
Self::try_new(self.url, Some(glob))
}

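/// Associates this URL with a table reference so that list-files cache entries are scoped to that table.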
pub fn with_table_ref(mut self, table_ref: TableReference) -> Self {
self.table_ref = Some(table_ref);
self
}

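/// Returns the table reference associated with this URL, if any.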
pub fn get_table_ref(&self) -> &Option<TableReference> {
&self.table_ref
}
}

/// Lists files with cache support, using prefix-aware lookups.
@@ -345,6 +376,7 @@
async fn list_with_cache<'b>(
ctx: &'b dyn Session,
store: &'b dyn ObjectStore,
table_ref: Option<&TableReference>,
table_base_path: &Path,
prefix: Option<&Path>,
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
@@ -367,9 +399,14 @@ async fn list_with_cache<'b>(
// Convert prefix to Option<Path> for cache lookup
let prefix_filter = prefix.cloned();

let table_scoped_base_path = TableScopedPath {
table: table_ref.cloned(),
path: table_base_path.clone(),
};

// Try cache lookup with optional prefix filter
let vec = if let Some(res) =
cache.get_with_extra(table_base_path, &prefix_filter)
cache.get_with_extra(&table_scoped_base_path, &prefix_filter)
{
debug!("Hit list files cache");
res.as_ref().clone()
@@ -380,7 +417,7 @@
.list(Some(table_base_path))
.try_collect::<Vec<ObjectMeta>>()
.await?;
cache.put(table_base_path, Arc::new(vec.clone()));
cache.put(&table_scoped_base_path, Arc::new(vec.clone()));

// If a prefix filter was requested, apply it to the results
if prefix.is_some() {
8 changes: 6 additions & 2 deletions datafusion/execution/src/cache/cache_manager.rs
@@ -17,7 +17,9 @@

use crate::cache::cache_unit::DefaultFilesMetadataCache;
use crate::cache::list_files_cache::ListFilesEntry;
use crate::cache::list_files_cache::TableScopedPath;
use crate::cache::{CacheAccessor, DefaultListFilesCache};
use datafusion_common::TableReference;
use datafusion_common::stats::Precision;
use datafusion_common::{Result, Statistics};
use object_store::ObjectMeta;
@@ -81,7 +83,7 @@ pub struct FileStatisticsCacheEntry {
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
pub trait ListFilesCache:
CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
{
/// Returns the cache's memory limit in bytes.
fn cache_limit(&self) -> usize;
@@ -96,7 +98,9 @@ pub trait ListFilesCache:
fn update_cache_ttl(&self, ttl: Option<Duration>);

/// Retrieves the information about the entries currently cached.
fn list_entries(&self) -> HashMap<Path, ListFilesEntry>;
fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>;

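/// Removes all cached entries belonging to the given table reference.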
fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
}
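TableScopedPath itself is not shown in this diff; judging from how it is constructed in table.rs and url.rs above, its shape is presumably along these lines (a sketch only; derives and visibility are assumptions):

use datafusion_common::TableReference;
use object_store::path::Path;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableScopedPath {
    /// The table this listing was produced for, if known.
    pub table: Option<TableReference>,
    /// The object store path that was listed.
    pub path: Path,
}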

/// Generic file-embedded metadata used with [`FileMetadataCache`].