Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rust/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ pub enum ErrorCode {
DatasetCreateScalarIndex = 46,
DatasetCalculateDataStats = 47,
DatasetTake = 48,

NamespaceDescribeTableInfo = 49,
NamespaceCreateEmptyTable = 50,
NamespaceDropTable = 51,
}

struct LastError {
Expand Down
251 changes: 250 additions & 1 deletion rust/ffi/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ use std::ptr;
use std::sync::Arc;

use lance::dataset::builder::DatasetBuilder;
use lance_core::Error as LanceError;

use lance_namespace::models::{DescribeTableRequest, ListTablesRequest};
use lance_namespace::models::{
CreateEmptyTableRequest, DescribeTableRequest, DropTableRequest, ListTablesRequest,
};
use lance_namespace::LanceNamespace;
use lance_namespace_impls::RestNamespaceBuilder;

Expand Down Expand Up @@ -43,6 +46,19 @@ fn build_config(
builder
}

/// Flattens storage options into `key<TAB>value` records separated by
/// newlines, ordered by key so the output is deterministic.
///
/// An empty map produces an empty string. Keys and values are written
/// verbatim; no escaping of tab or newline characters is performed.
fn storage_options_to_tsv(storage_options: std::collections::HashMap<String, String>) -> String {
    // A BTreeMap iterates in ascending key order, which stands in for an
    // explicit sort of the (unique) keys.
    let ordered: std::collections::BTreeMap<String, String> =
        storage_options.into_iter().collect();

    let mut tsv = String::new();
    for (index, (key, value)) in ordered.iter().enumerate() {
        if index > 0 {
            tsv.push('\n');
        }
        tsv.push_str(key);
        tsv.push('\t');
        tsv.push_str(value);
    }
    tsv
}

fn list_tables_inner(
endpoint: *const c_char,
namespace_id: *const c_char,
Expand Down Expand Up @@ -113,6 +129,239 @@ pub unsafe extern "C" fn lance_namespace_list_tables(
}
}

/// Looks up a table through a REST namespace and returns where it lives.
///
/// # Arguments
/// * `endpoint`, `table_id` - required C strings, converted with `cstr_to_str`.
/// * `bearer_token`, `api_key` - optional C strings forwarded to `build_config`.
/// * `delimiter` - optional C string; `"$"` is used when it is absent.
///
/// # Returns
/// `(location, storage_options_tsv)`, where the second element is the
/// response's storage options rendered by `storage_options_to_tsv` (empty
/// when the response carries none).
///
/// # Errors
/// * Argument conversion failures from `cstr_to_str` / `optional_cstr_to_string`.
/// * `ErrorCode::NamespaceDescribeTableInfo` when the namespace call fails,
///   the response has no location, or a returned string contains an interior
///   NUL byte (it could not be handed to C callers as a C string).
/// * `ErrorCode::Runtime` when `runtime::block_on` itself fails.
fn describe_table_info_inner(
    endpoint: *const c_char,
    table_id: *const c_char,
    bearer_token: *const c_char,
    api_key: *const c_char,
    delimiter: *const c_char,
) -> FfiResult<(String, String)> {
    // SAFETY: the pointers come straight from the C caller; the helpers are
    // expected to validate them (presumably including null) — TODO confirm.
    let endpoint = unsafe { cstr_to_str(endpoint, "endpoint")? };
    let table_id = unsafe { cstr_to_str(table_id, "table_id")? };
    let delimiter = unsafe { optional_cstr_to_string(delimiter, "delimiter")? };
    let bearer_token = unsafe { optional_cstr_to_string(bearer_token, "bearer_token")? };
    let api_key = unsafe { optional_cstr_to_string(api_key, "api_key")? };

    let delimiter = delimiter.unwrap_or_else(|| "$".to_string());
    let namespace = build_config(endpoint, bearer_token.as_deref(), api_key.as_deref())
        .delimiter(delimiter)
        .build();

    runtime::block_on(async move {
        let mut req = DescribeTableRequest::new();
        req.id = Some(vec![table_id.to_string()]);
        let resp = namespace.describe_table(req).await.map_err(|err| {
            FfiError::new(
                ErrorCode::NamespaceDescribeTableInfo,
                format!("namespace describe_table: {err}"),
            )
        })?;
        let location = resp.location.ok_or_else(|| {
            FfiError::new(
                ErrorCode::NamespaceDescribeTableInfo,
                "namespace describe_table: missing location",
            )
        })?;
        let storage_options_tsv = storage_options_to_tsv(resp.storage_options.unwrap_or_default());

        // Both strings are later exported as C strings. Reject interior NUL
        // bytes here so the failure is reported as an error instead of being
        // replaced by placeholder text under a success return code.
        if location.contains('\0') {
            return Err(FfiError::new(
                ErrorCode::NamespaceDescribeTableInfo,
                "namespace describe_table: location contains an interior NUL byte",
            ));
        }
        if storage_options_tsv.contains('\0') {
            return Err(FfiError::new(
                ErrorCode::NamespaceDescribeTableInfo,
                "namespace describe_table: storage options contain an interior NUL byte",
            ));
        }

        Ok::<_, FfiError>((location, storage_options_tsv))
    })
    // The outer error is the runtime's; the inner result is returned as-is.
    .map_err(|err| FfiError::new(ErrorCode::Runtime, format!("runtime: {err}")))?
}

/// C entry point that describes a namespace table.
///
/// Returns `0` on success and `-1` on failure; on failure the error is
/// recorded with `set_last_error`. Each non-null output pointer is first set
/// to null, and on success receives a string produced by
/// `CString::into_raw` (the table location and the storage options TSV).
///
/// # Safety
/// Every non-null output pointer must be valid for writing one pointer
/// (alignment is not required). The input pointers are passed unchanged to
/// `describe_table_info_inner`, which performs their conversion.
#[no_mangle]
pub unsafe extern "C" fn lance_namespace_describe_table(
    endpoint: *const c_char,
    table_id: *const c_char,
    bearer_token: *const c_char,
    api_key: *const c_char,
    delimiter: *const c_char,
    out_location: *mut *const c_char,
    out_storage_options_tsv: *mut *const c_char,
) -> i32 {
    // Reset both outputs up front so a failed call never leaves stale values.
    for out in [out_location, out_storage_options_tsv] {
        if !out.is_null() {
            unsafe {
                std::ptr::write_unaligned(out, ptr::null());
            }
        }
    }

    let (location, storage_options_tsv) =
        match describe_table_info_inner(endpoint, table_id, bearer_token, api_key, delimiter) {
            Ok(described) => described,
            Err(err) => {
                set_last_error(err.code, err.message);
                return -1;
            }
        };

    clear_last_error();

    if !out_location.is_null() {
        let location_c = match CString::new(location) {
            Ok(text) => text,
            Err(_) => to_c_string("invalid location"),
        };
        unsafe {
            std::ptr::write_unaligned(out_location, location_c.into_raw() as *const c_char);
        }
    }

    if !out_storage_options_tsv.is_null() {
        let options_c = match CString::new(storage_options_tsv) {
            Ok(text) => text,
            Err(_) => to_c_string("invalid storage options"),
        };
        unsafe {
            std::ptr::write_unaligned(
                out_storage_options_tsv,
                options_c.into_raw() as *const c_char,
            );
        }
    }

    0
}

/// Registers an empty table through a REST namespace and returns where it
/// was placed.
///
/// # Arguments
/// * `endpoint`, `table_id` - required C strings, converted with `cstr_to_str`.
/// * `bearer_token`, `api_key` - optional C strings forwarded to `build_config`.
/// * `delimiter` - optional C string; `"$"` is used when it is absent.
///
/// # Returns
/// `(location, storage_options_tsv)`, where the second element is the
/// response's storage options rendered by `storage_options_to_tsv` (empty
/// when the response carries none).
///
/// # Errors
/// * Argument conversion failures from `cstr_to_str` / `optional_cstr_to_string`.
/// * `ErrorCode::NamespaceCreateEmptyTable` when the namespace call fails,
///   the response has no location, or a returned string contains an interior
///   NUL byte (it could not be handed to C callers as a C string).
/// * `ErrorCode::Runtime` when `runtime::block_on` itself fails.
fn create_empty_table_inner(
    endpoint: *const c_char,
    table_id: *const c_char,
    bearer_token: *const c_char,
    api_key: *const c_char,
    delimiter: *const c_char,
) -> FfiResult<(String, String)> {
    // SAFETY: the pointers come straight from the C caller; the helpers are
    // expected to validate them (presumably including null) — TODO confirm.
    let endpoint = unsafe { cstr_to_str(endpoint, "endpoint")? };
    let table_id = unsafe { cstr_to_str(table_id, "table_id")? };
    let delimiter = unsafe { optional_cstr_to_string(delimiter, "delimiter")? };
    let bearer_token = unsafe { optional_cstr_to_string(bearer_token, "bearer_token")? };
    let api_key = unsafe { optional_cstr_to_string(api_key, "api_key")? };

    let delimiter = delimiter.unwrap_or_else(|| "$".to_string());
    let namespace = build_config(endpoint, bearer_token.as_deref(), api_key.as_deref())
        .delimiter(delimiter)
        .build();

    runtime::block_on(async move {
        let mut req = CreateEmptyTableRequest::new();
        req.id = Some(vec![table_id.to_string()]);
        let resp = namespace.create_empty_table(req).await.map_err(|err| {
            FfiError::new(
                ErrorCode::NamespaceCreateEmptyTable,
                format!("namespace create_empty_table: {err}"),
            )
        })?;
        let location = resp.location.ok_or_else(|| {
            FfiError::new(
                ErrorCode::NamespaceCreateEmptyTable,
                "namespace create_empty_table: missing location",
            )
        })?;
        let storage_options_tsv = storage_options_to_tsv(resp.storage_options.unwrap_or_default());

        // Both strings are later exported as C strings. Reject interior NUL
        // bytes here so the failure is reported as an error instead of being
        // replaced by placeholder text under a success return code.
        if location.contains('\0') {
            return Err(FfiError::new(
                ErrorCode::NamespaceCreateEmptyTable,
                "namespace create_empty_table: location contains an interior NUL byte",
            ));
        }
        if storage_options_tsv.contains('\0') {
            return Err(FfiError::new(
                ErrorCode::NamespaceCreateEmptyTable,
                "namespace create_empty_table: storage options contain an interior NUL byte",
            ));
        }

        Ok::<_, FfiError>((location, storage_options_tsv))
    })
    // The outer error is the runtime's; the inner result is returned as-is.
    .map_err(|err| FfiError::new(ErrorCode::Runtime, format!("runtime: {err}")))?
}

/// C entry point that creates an empty table in a namespace.
///
/// Returns `0` on success and `-1` on failure; on failure the error is
/// recorded with `set_last_error`. Each non-null output pointer is first set
/// to null, and on success receives a string produced by
/// `CString::into_raw` (the table location and the storage options TSV).
///
/// # Safety
/// Every non-null output pointer must be valid for writing one pointer
/// (alignment is not required). The input pointers are passed unchanged to
/// `create_empty_table_inner`, which performs their conversion.
#[no_mangle]
pub unsafe extern "C" fn lance_namespace_create_empty_table(
    endpoint: *const c_char,
    table_id: *const c_char,
    bearer_token: *const c_char,
    api_key: *const c_char,
    delimiter: *const c_char,
    out_location: *mut *const c_char,
    out_storage_options_tsv: *mut *const c_char,
) -> i32 {
    // Reset both outputs up front so a failed call never leaves stale values.
    for out in [out_location, out_storage_options_tsv] {
        if !out.is_null() {
            unsafe {
                std::ptr::write_unaligned(out, ptr::null());
            }
        }
    }

    let (location, storage_options_tsv) =
        match create_empty_table_inner(endpoint, table_id, bearer_token, api_key, delimiter) {
            Ok(created) => created,
            Err(err) => {
                set_last_error(err.code, err.message);
                return -1;
            }
        };

    clear_last_error();

    if !out_location.is_null() {
        let location_c = match CString::new(location) {
            Ok(text) => text,
            Err(_) => to_c_string("invalid location"),
        };
        unsafe {
            std::ptr::write_unaligned(out_location, location_c.into_raw() as *const c_char);
        }
    }

    if !out_storage_options_tsv.is_null() {
        let options_c = match CString::new(storage_options_tsv) {
            Ok(text) => text,
            Err(_) => to_c_string("invalid storage options"),
        };
        unsafe {
            std::ptr::write_unaligned(
                out_storage_options_tsv,
                options_c.into_raw() as *const c_char,
            );
        }
    }

    0
}

/// Drops a table through a REST namespace.
///
/// `endpoint` and `table_id` are required C strings; `bearer_token`,
/// `api_key` and `delimiter` are optional, with `"$"` used when no delimiter
/// is given. A `LanceError::NotFound` from the namespace is treated as
/// success, so dropping a missing table is not an error.
///
/// # Errors
/// * Argument conversion failures from `cstr_to_str` / `optional_cstr_to_string`.
/// * `ErrorCode::NamespaceDropTable` for any other namespace failure.
/// * `ErrorCode::Runtime` when `runtime::block_on` itself fails.
fn drop_table_inner(
    endpoint: *const c_char,
    table_id: *const c_char,
    bearer_token: *const c_char,
    api_key: *const c_char,
    delimiter: *const c_char,
) -> FfiResult<()> {
    let endpoint = unsafe { cstr_to_str(endpoint, "endpoint")? };
    let table_id = unsafe { cstr_to_str(table_id, "table_id")? };
    let delimiter = unsafe { optional_cstr_to_string(delimiter, "delimiter")? };
    let bearer_token = unsafe { optional_cstr_to_string(bearer_token, "bearer_token")? };
    let api_key = unsafe { optional_cstr_to_string(api_key, "api_key")? };

    let namespace = build_config(endpoint, bearer_token.as_deref(), api_key.as_deref())
        .delimiter(delimiter.unwrap_or_else(|| "$".to_string()))
        .build();

    let outcome = runtime::block_on(async move {
        let mut request = DropTableRequest::new();
        request.id = Some(vec![table_id.to_string()]);
        namespace
            .drop_table(request)
            .await
            .map(|_| ())
            .or_else(|failure| match failure {
                // The table is already gone, which is the state the caller wanted.
                LanceError::NotFound { .. } => Ok(()),
                err => Err(FfiError::new(
                    ErrorCode::NamespaceDropTable,
                    format!("namespace drop_table '{table_id}': {err}"),
                )),
            })
    });

    // The outer error is the runtime's; the inner result is returned as-is.
    outcome.map_err(|err| FfiError::new(ErrorCode::Runtime, format!("runtime: {err}")))?
}

/// C entry point that drops a namespace table.
///
/// Returns `0` on success (after clearing the last error) and `-1` on
/// failure (after recording the error with `set_last_error`).
///
/// # Safety
/// The pointers are passed unchanged to `drop_table_inner`, which performs
/// their conversion.
#[no_mangle]
pub unsafe extern "C" fn lance_namespace_drop_table(
    endpoint: *const c_char,
    table_id: *const c_char,
    bearer_token: *const c_char,
    api_key: *const c_char,
    delimiter: *const c_char,
) -> i32 {
    if let Err(err) = drop_table_inner(endpoint, table_id, bearer_token, api_key, delimiter) {
        set_last_error(err.code, err.message);
        return -1;
    }
    clear_last_error();
    0
}

fn open_dataset_in_namespace_inner(
endpoint: *const c_char,
table_id: *const c_char,
Expand Down
11 changes: 4 additions & 7 deletions rust/ffi/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,7 @@ fn rewrite_rows_update_transaction_inner(
let arrow_schema: Arc<arrow_schema::Schema> = Arc::new(dataset.schema().into());
let planner = Planner::new(arrow_schema.clone());
let predicate_expr = if let Some(predicate) = predicate.as_deref() {
let predicate_expr = planner
.parse_filter(predicate)
.map_err(|e| e.to_string())?;
let predicate_expr = planner.parse_filter(predicate).map_err(|e| e.to_string())?;
let predicate_expr = planner
.optimize_expr(predicate_expr)
.map_err(|e| e.to_string())?;
Expand Down Expand Up @@ -300,17 +298,16 @@ fn rewrite_rows_update_transaction_inner(
.ok_or_else(|| format!("column does not exist: {}", column))?;

let value_sql = if value_sql.trim().eq_ignore_ascii_case("DEFAULT") {
field.metadata
field
.metadata
.get("duckdb_default_expr")
.map(|s| s.as_str())
.unwrap_or("NULL")
} else {
value_sql.as_str()
};

let mut value_expr = planner
.parse_expr(value_sql)
.map_err(|e| e.to_string())?;
let mut value_expr = planner.parse_expr(value_sql).map_err(|e| e.to_string())?;

let dest_type = field.data_type();
let src_type = value_expr.get_type(&df_schema).map_err(|e| e.to_string())?;
Expand Down
8 changes: 6 additions & 2 deletions rust/filter_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,15 @@ fn contains_udf() -> Arc<ScalarUDF> {
}

fn lower_udf() -> Arc<ScalarUDF> {
LOWER_UDF.get_or_init(datafusion_functions::string::lower).clone()
LOWER_UDF
.get_or_init(datafusion_functions::string::lower)
.clone()
}

fn upper_udf() -> Arc<ScalarUDF> {
UPPER_UDF.get_or_init(datafusion_functions::string::upper).clone()
UPPER_UDF
.get_or_init(datafusion_functions::string::upper)
.clone()
}

pub fn parse_filter_ir(filter_ir: &[u8]) -> Result<Expr> {
Expand Down
35 changes: 35 additions & 0 deletions src/include/lance_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,41 @@ LanceOpenDatasetInNamespace(ClientContext &context, const string &endpoint,
const string &api_key, const string &delimiter,
string &out_table_uri);

bool TryLanceNamespaceDescribeTable(
ClientContext &context, const string &endpoint, const string &table_id,
const string &bearer_token, const string &api_key, const string &delimiter,
string &out_location, vector<string> &out_option_keys,
vector<string> &out_option_values, string &out_error);

bool TryLanceNamespaceCreateEmptyTable(
ClientContext &context, const string &endpoint, const string &table_id,
const string &bearer_token, const string &api_key, const string &delimiter,
string &out_location, vector<string> &out_option_keys,
vector<string> &out_option_values, string &out_error);

bool TryLanceNamespaceDropTable(ClientContext &context, const string &endpoint,
const string &table_id,
const string &bearer_token,
const string &api_key, const string &delimiter,
string &out_error);

class LanceTableEntry;

void *LanceOpenDatasetForTable(ClientContext &context,
const LanceTableEntry &table,
string &out_display_uri);

void ResolveLanceStorageOptionsForTable(ClientContext &context,
const LanceTableEntry &table,
string &out_open_path,
vector<string> &out_option_keys,
vector<string> &out_option_values,
string &out_display_uri);

int64_t LanceTruncateDatasetWithStorageOptions(
const string &open_path, const vector<string> &option_keys,
const vector<string> &option_values, const string &display_uri);

int64_t LanceTruncateDataset(ClientContext &context, const string &dataset_uri);

} // namespace duckdb
12 changes: 12 additions & 0 deletions src/include/lance_ffi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ const char *lance_namespace_list_tables(const char *endpoint,
const char *bearer_token,
const char *api_key,
const char *delimiter);
// Describes a table in a REST namespace.
// Returns 0 on success and -1 on failure (the Rust side records the error as
// its "last error" before returning -1).
// bearer_token, api_key and delimiter are optional on the Rust side; "$" is
// used when no delimiter is supplied (presumably NULL means "not supplied" —
// confirm against the Rust string helpers).
// Each non-null output is first set to NULL. On success out_location receives
// the table location and out_storage_options_tsv receives "key<TAB>value"
// lines sorted by key (an empty string when there are no options).
// NOTE(review): the output strings are allocated by Rust (CString::into_raw),
// so they presumably must be released through the library's string-free
// function rather than free() — confirm.
int32_t
lance_namespace_describe_table(const char *endpoint, const char *table_id,
const char *bearer_token, const char *api_key,
const char *delimiter, const char **out_location,
const char **out_storage_options_tsv);
// Creates an empty table in a REST namespace.
// Returns 0 on success and -1 on failure (the Rust side records the error as
// its "last error" before returning -1).
// bearer_token, api_key and delimiter are optional on the Rust side; "$" is
// used when no delimiter is supplied (presumably NULL means "not supplied" —
// confirm against the Rust string helpers).
// Each non-null output is first set to NULL. On success out_location receives
// the new table's location and out_storage_options_tsv receives
// "key<TAB>value" lines sorted by key (an empty string when there are none).
// NOTE(review): the output strings are allocated by Rust (CString::into_raw),
// so they presumably must be released through the library's string-free
// function rather than free() — confirm.
int32_t lance_namespace_create_empty_table(
const char *endpoint, const char *table_id, const char *bearer_token,
const char *api_key, const char *delimiter, const char **out_location,
const char **out_storage_options_tsv);
// Drops a table from a REST namespace.
// Returns 0 on success and -1 on failure (the Rust side records the error as
// its "last error" before returning -1). A "not found" result from the
// namespace is reported as success, so dropping a missing table returns 0.
// bearer_token, api_key and delimiter are optional on the Rust side; "$" is
// used when no delimiter is supplied (presumably NULL means "not supplied" —
// confirm against the Rust string helpers).
int32_t lance_namespace_drop_table(const char *endpoint, const char *table_id,
const char *bearer_token,
const char *api_key, const char *delimiter);
void *lance_open_dataset_in_namespace(
const char *endpoint, const char *table_id, const char *bearer_token,
const char *api_key, const char *delimiter, const char **out_table_uri);
Expand Down
Loading