Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,10 +703,13 @@ impl TableFunctionImpl for StatisticsCacheFunc {
}
}

// Implementation of the `list_files_cache` table function in datafusion-cli.
/// Implementation of the `list_files_cache` table function in datafusion-cli.
///
/// This function returns the cached results of running a LIST command on a
/// particular object store path for a table. The object metadata is returned as
/// a List of Structs, with one Struct for each object. DataFusion uses these
/// cached results to plan queries against external tables.
///
/// This function returns the cached results of running a LIST command on a particular object store path for a table. The object metadata is returned as a List of Structs, with one Struct for each object.
/// DataFusion uses these cached results to plan queries against external tables.
/// # Schema
/// ```sql
/// > describe select * from list_files_cache();
Expand Down Expand Up @@ -788,7 +791,7 @@ impl TableFunctionImpl for ListFilesCacheFunc {
Field::new("metadata", DataType::Struct(nested_fields.clone()), true);

let schema = Arc::new(Schema::new(vec![
Field::new("table", DataType::Utf8, false),
Field::new("table", DataType::Utf8, true),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This uses NULL (rather than "NULL") in list_files_cache

Field::new("path", DataType::Utf8, false),
Field::new("metadata_size_bytes", DataType::UInt64, false),
// expires field in ListFilesEntry has type Instant when set, from which we cannot get "the number of seconds", hence using Duration instead of Timestamp as data type.
Expand Down Expand Up @@ -821,7 +824,7 @@ impl TableFunctionImpl for ListFilesCacheFunc {
let mut current_offset: i32 = 0;

for (path, entry) in list_files_cache.list_entries() {
table_arr.push(path.table.map_or("NULL".to_string(), |t| t.to_string()));
table_arr.push(path.table.map(|t| t.to_string()));
path_arr.push(path.path.to_string());
metadata_size_bytes_arr.push(entry.size_bytes as u64);
// calculates time left before entry expires
Expand Down
4 changes: 1 addition & 3 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ impl TableProviderFactory for ListingTableFactory {
}
None => format!("*.{}", cmd.file_type.to_lowercase()),
};
table_path = table_path
.with_glob(glob.as_ref())?
.with_table_ref(cmd.name.clone());
table_path = table_path.with_glob(glob.as_ref())?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be simplified due to the change in with_glob

}
let schema = options.infer_schema(session_state, &table_path).await?;
let df_schema = Arc::clone(&schema).to_dfschema()?;
Expand Down
12 changes: 7 additions & 5 deletions datafusion/datasource/src/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct ListingTableUrl {
prefix: Path,
/// An optional glob expression used to filter files
glob: Option<Pattern>,

/// Optional table reference for the table this url belongs to
table_ref: Option<TableReference>,
}

Expand Down Expand Up @@ -340,17 +340,19 @@ impl ListingTableUrl {
}

/// Returns a copy of current [`ListingTableUrl`] with a specified `glob`
pub fn with_glob(self, glob: &str) -> Result<Self> {
let glob =
Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
Self::try_new(self.url, Some(glob))
pub fn with_glob(mut self, glob: &str) -> Result<Self> {
self.glob =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to call the constructor again as self.path hasn't changed and the constructor doesn't do anything with glob

pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
let prefix = Path::from_url_path(url.path())?;
Ok(Self {
url,
prefix,
glob,
table_ref: None,
})
}

This then simplifies the code in datafusion/core/src/datasource/listing_table_factory.rs

Some(Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?);
Ok(self)
}

/// Set the table reference for this [`ListingTableUrl`]
pub fn with_table_ref(mut self, table_ref: TableReference) -> Self {
self.table_ref = Some(table_ref);
self
}

/// Return the table reference for this [`ListingTableUrl`]
pub fn get_table_ref(&self) -> &Option<TableReference> {
&self.table_ref
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/execution/src/cache/cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub trait ListFilesCache:
/// Retrieves the information about the entries currently cached.
fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>;

/// Drop all entries for the given table reference.
fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
}

Expand Down
5 changes: 5 additions & 0 deletions datafusion/execution/src/cache/list_files_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB
/// The default cache TTL for the [`DefaultListFilesCache`]
pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None; // Infinite

/// Key for [`DefaultListFilesCache`]
///
/// Each entry is scoped to its use within a specific table so that the cache
/// can differentiate between identical paths in different tables, and
/// table-level cache invalidation.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct TableScopedPath {
pub table: Option<TableReference>,
Expand Down