Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 142 additions & 8 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::client::{
use crate::types::{
CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest,
ListNamespaceResponse, ListTableResponse, LoadTableResponse, NamespaceSerde,
RenameTableRequest,
RegisterTableRequest, RenameTableRequest,
};

const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
Expand Down Expand Up @@ -101,6 +101,10 @@ impl RestCatalogConfig {
self.url_prefixed(&["tables", "rename"])
}

fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String {
self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"])
}

fn table_endpoint(&self, table: &TableIdent) -> String {
self.url_prefixed(&[
"namespaces",
Expand Down Expand Up @@ -238,7 +242,7 @@ struct RestContext {
pub struct RestCatalog {
/// User config is stored as-is and never be changed.
///
/// It's could be different from the config fetched from the server and used at runtime.
/// It could be different from the config fetched from the server and used at runtime.
user_config: RestCatalogConfig,
ctx: OnceCell<RestContext>,
/// Extensions for the FileIOBuilder.
Expand Down Expand Up @@ -755,13 +759,60 @@ impl Catalog for RestCatalog {

async fn register_table(
&self,
_table_ident: &TableIdent,
_metadata_location: String,
table_ident: &TableIdent,
metadata_location: String,
) -> Result<Table> {
Err(Error::new(
ErrorKind::FeatureUnsupported,
"Registering a table is not supported yet",
))
let context = self.context().await?;

let request = context
.client
.request(
Method::POST,
context
.config
.register_table_endpoint(table_ident.namespace()),
)
.json(&RegisterTableRequest {
name: table_ident.name.clone(),
metadata_location: metadata_location.clone(),
overwrite: Some(false),
})
.build()?;

let http_response = context.client.query_catalog(request).await?;

let response: LoadTableResponse = match http_response.status() {
StatusCode::OK => {
deserialize_catalog_response::<LoadTableResponse>(http_response).await?
}
StatusCode::NOT_FOUND => {
return Err(Error::new(
ErrorKind::NamespaceNotFound,
"The namespace specified does not exist.",
));
}
StatusCode::CONFLICT => {
return Err(Error::new(
ErrorKind::TableAlreadyExists,
"The given table already exists.",
));
}
_ => return Err(deserialize_unexpected_catalog_error(http_response).await),
};

let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
ErrorKind::DataInvalid,
"Metadata location missing in `register_table` response!",
))?;

let file_io = self.load_file_io(Some(metadata_location), None).await?;

Table::builder()
.identifier(table_ident.clone())
.file_io(file_io)
.metadata(response.metadata)
.metadata_location(metadata_location.clone())
.build()
}

async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
Expand Down Expand Up @@ -2470,4 +2521,87 @@ mod tests {
update_table_mock.assert_async().await;
load_table_mock.assert_async().await;
}

#[tokio::test]
async fn test_register_table() {
let mut server = Server::new_async().await;

let config_mock = create_config_mock(&mut server).await;

let register_table_mock = server
.mock("POST", "/v1/namespaces/ns1/register")
.with_status(200)
.with_body_from_file(format!(
"{}/testdata/{}",
env!("CARGO_MANIFEST_DIR"),
"load_table_response.json"
))
.create_async()
.await;

let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
let table_ident =
TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
let metadata_location = String::from(
"s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
);

let table = catalog
.register_table(&table_ident, metadata_location)
.await
.unwrap();

assert_eq!(
&TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
table.identifier()
);
assert_eq!(
"s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
table.metadata_location().unwrap()
);

config_mock.assert_async().await;
register_table_mock.assert_async().await;
}

#[tokio::test]
async fn test_register_table_404() {
let mut server = Server::new_async().await;

let config_mock = create_config_mock(&mut server).await;

let register_table_mock = server
.mock("POST", "/v1/namespaces/ns1/register")
.with_status(404)
.with_body(
r#"
{
"error": {
"message": "The namespace specified does not exist",
"type": "NoSuchNamespaceErrorException",
"code": 404
}
}
"#,
)
.create_async()
.await;

let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());

let table_ident =
TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
let metadata_location = String::from(
"s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
);
let table = catalog
.register_table(&table_ident, metadata_location)
.await;

assert!(table.is_err());
assert!(table.err().unwrap().message().contains("does not exist"));

config_mock.assert_async().await;
register_table_mock.assert_async().await;
}
}
8 changes: 8 additions & 0 deletions crates/catalog/rest/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,11 @@ pub(super) struct CommitTableResponse {
pub(super) metadata_location: String,
pub(super) metadata: TableMetadata,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub(super) struct RegisterTableRequest {
pub(super) name: String,
pub(super) metadata_location: String,
pub(super) overwrite: Option<bool>,
}
36 changes: 36 additions & 0 deletions crates/catalog/rest/tests/rest_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,3 +407,39 @@ async fn test_list_empty_multi_level_namespace() {
.unwrap();
assert!(nss.is_empty());
}

#[tokio::test]
async fn test_register_table() {
let catalog = get_catalog().await;

// Create namespace
let ns = NamespaceIdent::from_strs(["ns"]).unwrap();
catalog.create_namespace(&ns, HashMap::new()).await.unwrap();

// Create the table, store the metadata location, drop the table
let empty_schema = Schema::builder().build().unwrap();
let table_creation = TableCreation::builder()
.name("t1".to_string())
.schema(empty_schema)
.build();

let table = catalog.create_table(&ns, table_creation).await.unwrap();

let metadata_location = table.metadata_location().unwrap();
catalog.drop_table(table.identifier()).await.unwrap();

let new_table_identifier = TableIdent::from_strs(["ns", "t2"]).unwrap();
let table_registered = catalog
.register_table(&new_table_identifier, metadata_location.to_string())
.await
.unwrap();

assert_eq!(
table.metadata_location(),
table_registered.metadata_location()
);
assert_ne!(
table.identifier().to_string(),
table_registered.identifier().to_string()
);
}
Loading