diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index 7837032e..05d3d6a1 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -257,7 +257,7 @@ fn get_decimal_type(idx: usize, schema: Option<&fcore::metadata::Schema>) -> Res .ok_or_else(|| anyhow!("Schema not available for decimal column {idx}"))?; match col.data_type() { fcore::metadata::DataType::Decimal(dt) => Ok((dt.precision(), dt.scale())), - other => Err(anyhow!("Column {idx} is {:?}, not Decimal", other)), + other => Err(anyhow!("Column {idx} is {other:?}, not Decimal")), } } diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 50f3b20b..a9ef828a 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -121,6 +121,45 @@ class FlussConnection: def __repr__(self) -> str: ... class FlussAdmin: + async def create_database( + self, + database_name: str, + ignore_if_exists: bool = False, + database_descriptor: Optional["DatabaseDescriptor"] = None, + ) -> None: + """Create a database.""" + ... + async def drop_database( + self, + database_name: str, + ignore_if_not_exists: bool = False, + cascade: bool = True, + ) -> None: + """Drop a database.""" + ... + async def list_databases(self) -> List[str]: + """List all databases.""" + ... + async def database_exists(self, database_name: str) -> bool: + """Check if a database exists.""" + ... + async def get_database_info(self, database_name: str) -> "DatabaseInfo": + """Get database information.""" + ... + async def list_tables(self, database_name: str) -> List[str]: + """List all tables in a database.""" + ... + async def table_exists(self, table_path: TablePath) -> bool: + """Check if a table exists.""" + ... + async def drop_partition( + self, + table_path: TablePath, + partition_spec: Dict[str, str], + ignore_if_not_exists: bool = False, + ) -> None: + """Drop a partition from a partitioned table.""" + ... async def create_table( self, table_path: TablePath, @@ -203,6 +242,33 @@ class FlussAdmin: ... def __repr__(self) -> str: ... + +class DatabaseDescriptor: + """Descriptor for a Fluss database (comment and custom properties).""" + + def __init__( + self, + comment: Optional[str] = None, + custom_properties: Optional[Dict[str, str]] = None, + ) -> None: ... + @property + def comment(self) -> Optional[str]: ... + def get_custom_properties(self) -> Dict[str, str]: ... + def __repr__(self) -> str: ... + + +class DatabaseInfo: + """Information about a Fluss database.""" + + @property + def database_name(self) -> str: ... + def get_database_descriptor(self) -> DatabaseDescriptor: ... + @property + def created_time(self) -> int: ... + @property + def modified_time(self) -> int: ... + def __repr__(self) -> str: ... + class TableScan: """Builder for creating log scanners with flexible configuration. diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs index d28c9c06..335aa248 100644 --- a/bindings/python/src/admin.rs +++ b/bindings/python/src/admin.rs @@ -17,6 +17,7 @@ use crate::*; use fcore::rpc::message::OffsetSpec; +use pyo3::conversion::IntoPyObject; use pyo3_async_runtimes::tokio::future_into_py; use std::sync::Arc; @@ -38,8 +39,7 @@ fn parse_offset_spec(offset_type: &str, timestamp: Option) -> PyResult Err(FlussError::new_err(format!( - "Invalid offset_type: '{}'. Must be 'earliest', 'latest', or 'timestamp'", - offset_type + "Invalid offset_type: '{offset_type}'. Must be 'earliest', 'latest', or 'timestamp'" ))), } } @@ -49,8 +49,7 @@ fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> { for &bucket_id in bucket_ids { if bucket_id < 0 { return Err(FlussError::new_err(format!( - "Invalid bucket_id: {}. Bucket IDs must be non-negative", - bucket_id + "Invalid bucket_id: {bucket_id}. Bucket IDs must be non-negative" ))); } } @@ -59,6 +58,226 @@ fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> { #[pymethods] impl FlussAdmin { + /// Create a database. + /// + /// Args: + /// database_name: Name of the database + /// ignore_if_exists: If True, don't raise error if database already exists + /// database_descriptor: Optional descriptor (comment, custom_properties) + /// + /// Returns: + /// None + #[pyo3(signature = (database_name, ignore_if_exists=false, database_descriptor=None))] + pub fn create_database<'py>( + &self, + py: Python<'py>, + database_name: &str, + ignore_if_exists: bool, + database_descriptor: Option<&DatabaseDescriptor>, + ) -> PyResult> { + let admin = self.__admin.clone(); + let name = database_name.to_string(); + let descriptor = database_descriptor.map(|d| d.to_core().clone()); + + future_into_py(py, async move { + admin + .create_database(&name, ignore_if_exists, descriptor.as_ref()) + .await + .map_err(|e| FlussError::new_err(format!("Failed to create database: {e}")))?; + + Python::attach(|py| Ok(py.None())) + }) + } + + /// Drop a database. + /// + /// Args: + /// database_name: Name of the database + /// ignore_if_not_exists: If True, don't raise error if database does not exist + /// cascade: If True, drop tables in the database first + /// + /// Returns: + /// None + #[pyo3(signature = (database_name, ignore_if_not_exists=false, cascade=true))] + pub fn drop_database<'py>( + &self, + py: Python<'py>, + database_name: &str, + ignore_if_not_exists: bool, + cascade: bool, + ) -> PyResult> { + let admin = self.__admin.clone(); + let name = database_name.to_string(); + + future_into_py(py, async move { + admin + .drop_database(&name, ignore_if_not_exists, cascade) + .await + .map_err(|e| FlussError::new_err(format!("Failed to drop database: {e}")))?; + + Python::attach(|py| Ok(py.None())) + }) + } + + /// List all databases. + /// + /// Returns: + /// List[str]: Names of all databases + pub fn list_databases<'py>(&self, py: Python<'py>) -> PyResult> { + let admin = self.__admin.clone(); + + future_into_py(py, async move { + let names = admin + .list_databases() + .await + .map_err(|e| FlussError::new_err(format!("Failed to list databases: {e}")))?; + + Python::attach(|py| { + let py_list = pyo3::types::PyList::empty(py); + for name in names { + py_list.append(name)?; + } + Ok(py_list.unbind()) + }) + }) + } + + /// Check if a database exists. + /// + /// Args: + /// database_name: Name of the database + /// + /// Returns: + /// bool: True if the database exists + pub fn database_exists<'py>( + &self, + py: Python<'py>, + database_name: &str, + ) -> PyResult> { + let admin = self.__admin.clone(); + let name = database_name.to_string(); + + future_into_py(py, async move { + let exists = admin.database_exists(&name).await.map_err(|e| { + FlussError::new_err(format!("Failed to check database exists: {e}")) + })?; + + Python::attach(|py| Ok(exists.into_pyobject(py)?.to_owned().into_any().unbind())) + }) + } + + /// Get database information. + /// + /// Args: + /// database_name: Name of the database + /// + /// Returns: + /// DatabaseInfo: Database metadata + pub fn get_database_info<'py>( + &self, + py: Python<'py>, + database_name: &str, + ) -> PyResult> { + let admin = self.__admin.clone(); + let name = database_name.to_string(); + + future_into_py(py, async move { + let info = admin + .get_database_info(&name) + .await + .map_err(|e| FlussError::new_err(format!("Failed to get database info: {e}")))?; + + Python::attach(|py| Py::new(py, DatabaseInfo::from_core(info))) + }) + } + + /// List all tables in a database. + /// + /// Args: + /// database_name: Name of the database + /// + /// Returns: + /// List[str]: Names of all tables in the database + pub fn list_tables<'py>( + &self, + py: Python<'py>, + database_name: &str, + ) -> PyResult> { + let admin = self.__admin.clone(); + let name = database_name.to_string(); + + future_into_py(py, async move { + let names = admin + .list_tables(&name) + .await + .map_err(|e| FlussError::new_err(format!("Failed to list tables: {e}")))?; + + Python::attach(|py| { + let py_list = pyo3::types::PyList::empty(py); + for name in names { + py_list.append(name)?; + } + Ok(py_list.unbind()) + }) + }) + } + + /// Check if a table exists. + /// + /// Args: + /// table_path: Path to the table (database, table) + /// + /// Returns: + /// bool: True if the table exists + pub fn table_exists<'py>( + &self, + py: Python<'py>, + table_path: &TablePath, + ) -> PyResult> { + let core_table_path = table_path.to_core(); + let admin = self.__admin.clone(); + + future_into_py(py, async move { + let exists = admin + .table_exists(&core_table_path) + .await + .map_err(|e| FlussError::new_err(format!("Failed to check table exists: {e}")))?; + + Python::attach(|py| Ok(exists.into_pyobject(py)?.to_owned().into_any().unbind())) + }) + } + + /// Drop a partition from a partitioned table. + /// + /// Args: + /// table_path: Path to the table + /// partition_spec: Dict mapping partition column name to value (e.g., {"region": "US"}) + /// ignore_if_not_exists: If True, don't raise error if partition does not exist + /// + /// Returns: + /// None + #[pyo3(signature = (table_path, partition_spec, ignore_if_not_exists=false))] + pub fn drop_partition<'py>( + &self, + py: Python<'py>, + table_path: &TablePath, + partition_spec: std::collections::HashMap, + ignore_if_not_exists: bool, + ) -> PyResult> { + let core_table_path = table_path.to_core(); + let admin = self.__admin.clone(); + let core_partition_spec = fcore::metadata::PartitionSpec::new(partition_spec); + + future_into_py(py, async move { + admin + .drop_partition(&core_table_path, &core_partition_spec, ignore_if_not_exists) + .await + .map_err(|e| FlussError::new_err(format!("Failed to drop partition: {e}")))?; + + Python::attach(|py| Ok(py.None())) + }) + } + /// Create a table with the given schema #[pyo3(signature = (table_path, table_descriptor, ignore_if_exists=None))] pub fn create_table<'py>( diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index f1f4ee6b..41f8de54 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -91,6 +91,8 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; // Register constants m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?; diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs index f39f9d44..d6b122d2 100644 --- a/bindings/python/src/metadata.rs +++ b/bindings/python/src/metadata.rs @@ -53,7 +53,7 @@ impl ChangeType { } fn __repr__(&self) -> String { - format!("ChangeType.{:?}", self) + format!("ChangeType.{self:?}") } } @@ -657,3 +657,105 @@ impl LakeSnapshot { } } } + +/// Descriptor for a Fluss database (comment and custom properties) +#[pyclass] +#[derive(Clone)] +pub struct DatabaseDescriptor { + __descriptor: fcore::metadata::DatabaseDescriptor, +} + +#[pymethods] +impl DatabaseDescriptor { + /// Create a new DatabaseDescriptor + #[new] + #[pyo3(signature = (comment=None, custom_properties=None))] + pub fn new( + comment: Option, + custom_properties: Option>, + ) -> PyResult { + let mut builder = fcore::metadata::DatabaseDescriptor::builder(); + if let Some(c) = comment { + builder = builder.comment(&c); + } + if let Some(props) = custom_properties { + builder = builder.custom_properties(props); + } + let __descriptor = builder.build(); + Ok(Self { __descriptor }) + } + + /// Get comment if set + #[getter] + pub fn comment(&self) -> Option { + self.__descriptor.comment().map(|s| s.to_string()) + } + + /// Get custom properties + pub fn get_custom_properties(&self) -> HashMap { + self.__descriptor.custom_properties().clone() + } + + fn __repr__(&self) -> String { + format!( + "DatabaseDescriptor(comment={:?}, custom_properties={:?})", + self.comment(), + self.get_custom_properties() + ) + } +} + +impl DatabaseDescriptor { + pub fn to_core(&self) -> &fcore::metadata::DatabaseDescriptor { + &self.__descriptor + } +} + +/// Information about a Fluss database +#[pyclass] +pub struct DatabaseInfo { + __info: fcore::metadata::DatabaseInfo, +} + +#[pymethods] +impl DatabaseInfo { + /// Get the database name + #[getter] + pub fn database_name(&self) -> String { + self.__info.database_name().to_string() + } + + /// Get the database descriptor + pub fn get_database_descriptor(&self) -> DatabaseDescriptor { + DatabaseDescriptor { + __descriptor: self.__info.database_descriptor().clone(), + } + } + + /// Get created time + #[getter] + pub fn created_time(&self) -> i64 { + self.__info.created_time() + } + + /// Get modified time + #[getter] + pub fn modified_time(&self) -> i64 { + self.__info.modified_time() + } + + fn __repr__(&self) -> String { + format!( + "DatabaseInfo(database_name='{}', created_time={}, modified_time={})", + self.database_name(), + self.created_time(), + self.modified_time() + ) + } +} + +impl DatabaseInfo { + pub fn from_core(info: fcore::metadata::DatabaseInfo) -> Self { + Self { __info: info } + } +} diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index d9265963..cb203dc6 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -315,7 +315,7 @@ fn resolve_projection_indices( let idx = columns .iter() .position(|c| c.name() == name) - .ok_or_else(|| FlussError::new_err(format!("Column '{}' not found", name)))?; + .ok_or_else(|| FlussError::new_err(format!("Column '{name}' not found")))?; indices.push(idx); } Ok(Some(indices)) @@ -796,9 +796,9 @@ pub fn python_pk_to_generic_row( let field: &fcore::metadata::DataField = &fields[*pk_idx]; let value = dict .get_item(pk_name)? - .ok_or_else(|| FlussError::new_err(format!("Missing PK field: {}", pk_name)))?; + .ok_or_else(|| FlussError::new_err(format!("Missing PK field: {pk_name}")))?; datums[*pk_idx] = python_value_to_datum(&value, field.data_type()) - .map_err(|e| FlussError::new_err(format!("PK field '{}': {}", pk_name, e)))?; + .map_err(|e| FlussError::new_err(format!("PK field '{pk_name}': {e}")))?; } }