164 changes: 162 additions & 2 deletions datafusion-cli/src/functions.rs
@@ -17,13 +17,18 @@

//! Functions that are query-able and searchable via the `\h` command

use datafusion_common::instant::Instant;
use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::array::{
DurationMillisecondArray, GenericListArray, Int64Array, StringArray, StructArray,
TimestampMillisecondArray, UInt64Array,
};
use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::catalog::{Session, TableFunctionImpl};
@@ -697,3 +702,158 @@ impl TableFunctionImpl for StatisticsCacheFunc {
Ok(Arc::new(statistics_cache))
}
}

#[derive(Debug)]
struct ListFilesCacheTable {
Contributor:
I think it might help to add some comments and context on what this function does here, to help future readers

Something like this perhaps:

Suggested change
struct ListFilesCacheTable {
/// Implementation of the `list_files_cache` table function in datafusion-cli
///
/// This function returns the cached results of running a LIST command on a particular object
/// store path. The object metadata is returned as a List of Structs, with one Struct for each object.
/// DataFusion uses these cached results to plan queries against external tables.
///
/// # Schema
/// ```sql
/// > describe select * from list_files_cache();
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
/// | column_name | data_type | is_nullable |
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
/// | path | Utf8 | NO |
/// | metadata_size_bytes | UInt64 | NO |
/// | expires_in | Duration(ms) | YES |
/// | metadata_list | List(Struct("file_path": non-null Utf8, "file_modified": non-null Timestamp(ms), "file_size_bytes": non-null UInt64, "e_tag": Utf8, "version": Utf8), field: 'metadata') | YES |
/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
/// ```
struct ListFilesCacheTable {

schema: SchemaRef,
batch: RecordBatch,
}

#[async_trait]
impl TableProvider for ListFilesCacheTable {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> arrow::datatypes::SchemaRef {
self.schema.clone()
}

fn table_type(&self) -> datafusion::logical_expr::TableType {
datafusion::logical_expr::TableType::Base
}

async fn scan(
&self,
_state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(MemorySourceConfig::try_new_exec(
&[vec![self.batch.clone()]],
TableProvider::schema(self),
projection.cloned(),
)?)
}
}

#[derive(Debug)]
pub struct ListFilesCacheFunc {
cache_manager: Arc<CacheManager>,
}

impl ListFilesCacheFunc {
pub fn new(cache_manager: Arc<CacheManager>) -> Self {
Self { cache_manager }
}
}

impl TableFunctionImpl for ListFilesCacheFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
if !exprs.is_empty() {
return plan_err!("list_files_cache should have no arguments");
}

let nested_fields = Fields::from(vec![
Field::new("file_path", DataType::Utf8, false),
Field::new(
"file_modified",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("file_size_bytes", DataType::UInt64, false),
Field::new("e_tag", DataType::Utf8, true),
Field::new("version", DataType::Utf8, true),
]);

let metadata_field =
Field::new("metadata", DataType::Struct(nested_fields.clone()), true);

let schema = Arc::new(Schema::new(vec![
Field::new("path", DataType::Utf8, false),
Field::new("metadata_size_bytes", DataType::UInt64, false),
// The `expires` field in `ListFilesEntry` is an `Instant`, which is not anchored
// to any epoch, so the remaining time-to-live is reported as a `Duration` rather
// than a `Timestamp`.
Field::new(
"expires_in",
DataType::Duration(TimeUnit::Millisecond),
true,
),
Field::new(
"metadata_list",
DataType::List(Arc::new(metadata_field.clone())),
true,
),
]));

let mut path_arr = vec![];
let mut metadata_size_bytes_arr = vec![];
let mut expires_arr = vec![];

let mut file_path_arr = vec![];
let mut file_modified_arr = vec![];
let mut file_size_bytes_arr = vec![];
let mut etag_arr = vec![];
let mut version_arr = vec![];
let mut offsets: Vec<i32> = vec![0];

if let Some(list_files_cache) = self.cache_manager.get_list_files_cache() {
let now = Instant::now();
let mut current_offset: i32 = 0;

for (path, entry) in list_files_cache.list_entries() {
path_arr.push(path.to_string());
metadata_size_bytes_arr.push(entry.size_bytes as u64);
// Calculate how much time is left before the entry expires.
expires_arr.push(
entry
.expires
.map(|t| t.duration_since(now).as_millis() as i64),
);

for meta in entry.metas.iter() {
file_path_arr.push(meta.location.to_string());
file_modified_arr.push(meta.last_modified.timestamp_millis());
file_size_bytes_arr.push(meta.size);
etag_arr.push(meta.e_tag.clone());
version_arr.push(meta.version.clone());
}
current_offset += entry.metas.len() as i32;
offsets.push(current_offset);
}
}

let struct_arr = StructArray::new(
nested_fields,
vec![
Arc::new(StringArray::from(file_path_arr)),
Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
Arc::new(UInt64Array::from(file_size_bytes_arr)),
Arc::new(StringArray::from(etag_arr)),
Arc::new(StringArray::from(version_arr)),
],
None,
);

let offsets_buffer: OffsetBuffer<i32> =
OffsetBuffer::new(ScalarBuffer::from(Buffer::from_vec(offsets)));

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(path_arr)),
Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
Arc::new(DurationMillisecondArray::from(expires_arr)),
Arc::new(GenericListArray::new(
Arc::new(metadata_field),
offsets_buffer,
Arc::new(struct_arr),
None,
)),
],
)?;

let list_files_cache = ListFilesCacheTable { schema, batch };
Ok(Arc::new(list_files_cache))
}
}
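
For context on the `offsets` bookkeeping above: an Arrow `ListArray` stores every element of every list in one flat child array, and `offsets[i]..offsets[i + 1]` delimits the slice belonging to row i, which is why the loop pushes one cumulative offset per cache entry. A minimal standalone sketch with toy values (not this PR's schema):

use std::sync::Arc;

use arrow::array::{Array, GenericListArray, Int32Array};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field};

fn main() {
    // Flat child array holding every element of every list, back to back.
    let values = Int32Array::from(vec![10, 20, 30, 40, 50]);
    // offsets[i]..offsets[i + 1] is row i's slice of `values`:
    // row 0 -> [10, 20], row 1 -> [] (empty list), row 2 -> [30, 40, 50].
    let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 2, 5]));
    let field = Arc::new(Field::new("item", DataType::Int32, true));
    let list = GenericListArray::<i32>::new(field, offsets, Arc::new(values), None);
    assert_eq!(list.len(), 3);
    assert_eq!(list.value(2).len(), 3);
}

An empty cache therefore still needs the initial 0 offset (hence `vec![0]` above), and a cache entry with no files is simply two equal consecutive offsets.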
144 changes: 141 additions & 3 deletions datafusion-cli/src/main.rs
@@ -32,7 +32,7 @@ use datafusion::logical_expr::ExplainFormat;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::{
MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
};
use datafusion_cli::object_storage::instrumented::{
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
@@ -253,6 +253,13 @@ async fn main_inner() -> Result<()> {
)),
);

ctx.register_udtf(
"list_files_cache",
Arc::new(ListFilesCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);

let mut print_options = PrintOptions {
format: args.format,
quiet: args.quiet,
@@ -431,15 +438,20 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use datafusion::{
common::test_util::batches_to_string,
execution::cache::{
cache_manager::CacheManagerConfig, cache_unit::DefaultFileStatisticsCache,
DefaultListFilesCache, cache_manager::CacheManagerConfig,
cache_unit::DefaultFileStatisticsCache,
},
prelude::ParquetReadOptions,
prelude::{ParquetReadOptions, col, lit, split_part},
};
use insta::assert_snapshot;
use object_store::memory::InMemory;
use url::Url;

fn assert_conversion(input: &str, expected: Result<usize, String>) {
let result = extract_memory_pool_size(input);
@@ -741,4 +753,130 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_list_files_cache() -> Result<(), DataFusionError> {
let list_files_cache = Arc::new(DefaultListFilesCache::new(
1024,
Some(Duration::from_secs(1)),
));

let rt = RuntimeEnvBuilder::new()
.with_cache_manager(
CacheManagerConfig::default()
.with_list_files_cache(Some(list_files_cache)),
)
.build_arc()
.unwrap();

let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);

ctx.register_object_store(
&Url::parse("mem://test_table").unwrap(),
Arc::new(InMemory::new()),
);

ctx.register_udtf(
"list_files_cache",
Arc::new(ListFilesCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);

ctx.sql(
"CREATE EXTERNAL TABLE src_table
STORED AS PARQUET
LOCATION '../parquet-testing/data/alltypes_plain.parquet'",
)
.await?
.collect()
.await?;

ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/0.parquet' STORED AS PARQUET").await?.collect().await?;

ctx.sql("COPY (SELECT * FROM src_table) TO 'mem://test_table/1.parquet' STORED AS PARQUET").await?.collect().await?;

ctx.sql(
"CREATE EXTERNAL TABLE test_table
STORED AS PARQUET
LOCATION 'mem://test_table/'
",
)
.await?
.collect()
.await?;

let sql = "SELECT metadata_size_bytes, expires_in, metadata_list FROM list_files_cache()";
let df = ctx
.sql(sql)
.await?
.unnest_columns(&["metadata_list"])?
.with_column_renamed("metadata_list", "metadata")?
.unnest_columns(&["metadata"])?;

assert_eq!(
2,
df.clone()
.filter(col("expires_in").is_not_null())?
.count()
.await?
);

let df = df
.with_column_renamed(r#""metadata.file_size_bytes""#, "file_size_bytes")?
.with_column_renamed(r#""metadata.e_tag""#, "etag")?
.with_column(
"filename",
split_part(col(r#""metadata.file_path""#), lit("/"), lit(-1)),
)?
.select_columns(&[
"metadata_size_bytes",
"filename",
"file_size_bytes",
"etag",
])?
.sort(vec![col("filename").sort(true, false)])?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
+---------------------+-----------+-----------------+------+
| metadata_size_bytes | filename | file_size_bytes | etag |
+---------------------+-----------+-----------------+------+
| 212 | 0.parquet | 3645 | 0 |
| 212 | 1.parquet | 3645 | 1 |
+---------------------+-----------+-----------------+------+
");

Ok(())
}

#[tokio::test]
async fn test_list_files_cache_not_set() -> Result<(), DataFusionError> {
let rt = RuntimeEnvBuilder::new()
.with_cache_manager(CacheManagerConfig::default().with_list_files_cache(None))
.build_arc()
.unwrap();

let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);

ctx.register_udtf(
"list_files_cache",
Arc::new(ListFilesCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);

let rbs = ctx
.sql("SELECT * FROM list_files_cache()")
.await?
.collect()
.await?;
assert_snapshot!(batches_to_string(&rbs),@r"
+------+---------------------+------------+---------------+
| path | metadata_size_bytes | expires_in | metadata_list |
+------+---------------------+------------+---------------+
+------+---------------------+------------+---------------+
");

Ok(())
}
}
4 changes: 4 additions & 0 deletions datafusion/execution/src/cache/cache_manager.rs
@@ -16,6 +16,7 @@
// under the License.

use crate::cache::cache_unit::DefaultFilesMetadataCache;
use crate::cache::list_files_cache::ListFilesEntry;
use crate::cache::{CacheAccessor, DefaultListFilesCache};
use datafusion_common::stats::Precision;
use datafusion_common::{Result, Statistics};
@@ -93,6 +94,9 @@ pub trait ListFilesCache:

/// Updates the cache with a new TTL (time-to-live).
fn update_cache_ttl(&self, ttl: Option<Duration>);

/// Retrieves information about the currently cached entries.
fn list_entries(&self) -> HashMap<Path, ListFilesEntry>;
}
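
For readers following along, a hypothetical sketch of the `ListFilesEntry` shape, inferred solely from how the new table function consumes it (`size_bytes`, `expires`, and `metas` are the accesses made in functions.rs; the authoritative definition lives in datafusion/execution/src/cache/list_files_cache.rs and may differ):

// Hypothetical reconstruction; see list_files_cache.rs for the real type.
use datafusion_common::instant::Instant; // wasm-friendly alias for std::time::Instant
use object_store::ObjectMeta;

pub struct ListFilesEntry {
    /// Approximate memory footprint of this cache entry, in bytes.
    pub size_bytes: usize,
    /// Deadline after which the entry expires; `None` means no TTL. An
    /// `Instant` has no epoch anchor, which is why `list_files_cache()`
    /// reports a remaining `Duration` instead of an absolute `Timestamp`.
    pub expires: Option<Instant>,
    /// Object metadata returned by the LIST call, one entry per object.
    pub metas: Vec<ObjectMeta>,
}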

/// Generic file-embedded metadata used with [`FileMetadataCache`].