Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bindings/cpp/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ fn get_decimal_type(idx: usize, schema: Option<&fcore::metadata::Schema>) -> Res
.ok_or_else(|| anyhow!("Schema not available for decimal column {idx}"))?;
match col.data_type() {
fcore::metadata::DataType::Decimal(dt) => Ok((dt.precision(), dt.scale())),
other => Err(anyhow!("Column {idx} is {:?}, not Decimal", other)),
other => Err(anyhow!("Column {idx} is {other:?}, not Decimal")),
}
}

Expand Down
66 changes: 66 additions & 0 deletions bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,45 @@ class FlussConnection:
def __repr__(self) -> str: ...

class FlussAdmin:
async def create_database(
self,
database_name: str,
ignore_if_exists: bool = False,
database_descriptor: Optional["DatabaseDescriptor"] = None,
) -> None:
"""Create a database."""
...
async def drop_database(
self,
database_name: str,
ignore_if_not_exists: bool = False,
cascade: bool = True,
) -> None:
"""Drop a database."""
...
async def list_databases(self) -> List[str]:
"""List all databases."""
...
async def database_exists(self, database_name: str) -> bool:
"""Check if a database exists."""
...
async def get_database_info(self, database_name: str) -> "DatabaseInfo":
"""Get database information."""
...
async def list_tables(self, database_name: str) -> List[str]:
"""List all tables in a database."""
...
async def table_exists(self, table_path: TablePath) -> bool:
"""Check if a table exists."""
...
async def drop_partition(
self,
table_path: TablePath,
partition_spec: Dict[str, str],
ignore_if_not_exists: bool = False,
) -> None:
"""Drop a partition from a partitioned table."""
...
async def create_table(
self,
table_path: TablePath,
Expand Down Expand Up @@ -203,6 +242,33 @@ class FlussAdmin:
...
def __repr__(self) -> str: ...


class DatabaseDescriptor:
"""Descriptor for a Fluss database (comment and custom properties)."""

def __init__(
self,
comment: Optional[str] = None,
custom_properties: Optional[Dict[str, str]] = None,
) -> None: ...
@property
def comment(self) -> Optional[str]: ...
def get_custom_properties(self) -> Dict[str, str]: ...
def __repr__(self) -> str: ...


class DatabaseInfo:
"""Information about a Fluss database."""

@property
def database_name(self) -> str: ...
def get_database_descriptor(self) -> DatabaseDescriptor: ...
@property
def created_time(self) -> int: ...
@property
def modified_time(self) -> int: ...
def __repr__(self) -> str: ...

class TableScan:
"""Builder for creating log scanners with flexible configuration.

Expand Down
227 changes: 223 additions & 4 deletions bindings/python/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use crate::*;
use fcore::rpc::message::OffsetSpec;
use pyo3::conversion::IntoPyObject;
use pyo3_async_runtimes::tokio::future_into_py;
use std::sync::Arc;

Expand All @@ -38,8 +39,7 @@ fn parse_offset_spec(offset_type: &str, timestamp: Option<i64>) -> PyResult<Offs
Ok(OffsetSpec::Timestamp(ts))
}
_ => Err(FlussError::new_err(format!(
"Invalid offset_type: '{}'. Must be 'earliest', 'latest', or 'timestamp'",
offset_type
"Invalid offset_type: '{offset_type}'. Must be 'earliest', 'latest', or 'timestamp'"
))),
}
}
Expand All @@ -49,8 +49,7 @@ fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> {
for &bucket_id in bucket_ids {
if bucket_id < 0 {
return Err(FlussError::new_err(format!(
"Invalid bucket_id: {}. Bucket IDs must be non-negative",
bucket_id
"Invalid bucket_id: {bucket_id}. Bucket IDs must be non-negative"
)));
}
}
Expand All @@ -59,6 +58,226 @@ fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> {

#[pymethods]
impl FlussAdmin {
/// Create a database.
///
/// Args:
/// database_name: Name of the database
/// ignore_if_exists: If True, don't raise error if database already exists
/// database_descriptor: Optional descriptor (comment, custom_properties)
///
/// Returns:
/// None
#[pyo3(signature = (database_name, ignore_if_exists=false, database_descriptor=None))]
pub fn create_database<'py>(
&self,
py: Python<'py>,
database_name: &str,
ignore_if_exists: bool,
database_descriptor: Option<&DatabaseDescriptor>,
) -> PyResult<Bound<'py, PyAny>> {
let admin = self.__admin.clone();
let name = database_name.to_string();
let descriptor = database_descriptor.map(|d| d.to_core().clone());

future_into_py(py, async move {
admin
.create_database(&name, ignore_if_exists, descriptor.as_ref())
.await
.map_err(|e| FlussError::new_err(format!("Failed to create database: {e}")))?;

Python::attach(|py| Ok(py.None()))
})
}

/// Drop a database.
///
/// Args:
/// database_name: Name of the database
/// ignore_if_not_exists: If True, don't raise error if database does not exist
/// cascade: If True, drop tables in the database first
///
/// Returns:
/// None
#[pyo3(signature = (database_name, ignore_if_not_exists=false, cascade=true))]
pub fn drop_database<'py>(
&self,
py: Python<'py>,
database_name: &str,
ignore_if_not_exists: bool,
cascade: bool,
) -> PyResult<Bound<'py, PyAny>> {
let admin = self.__admin.clone();
let name = database_name.to_string();

future_into_py(py, async move {
admin
.drop_database(&name, ignore_if_not_exists, cascade)
.await
.map_err(|e| FlussError::new_err(format!("Failed to drop database: {e}")))?;

Python::attach(|py| Ok(py.None()))
})
}

/// List all databases.
///
/// Returns:
/// List[str]: Names of all databases
pub fn list_databases<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let admin = self.__admin.clone();

future_into_py(py, async move {
let names = admin
.list_databases()
.await
.map_err(|e| FlussError::new_err(format!("Failed to list databases: {e}")))?;

Python::attach(|py| {
let py_list = pyo3::types::PyList::empty(py);
for name in names {
py_list.append(name)?;
}
Ok(py_list.unbind())
})
})
}

/// Check if a database exists.
///
/// Args:
/// database_name: Name of the database
///
/// Returns:
/// bool: True if the database exists
pub fn database_exists<'py>(
&self,
py: Python<'py>,
database_name: &str,
) -> PyResult<Bound<'py, PyAny>> {
let admin = self.__admin.clone();
let name = database_name.to_string();

future_into_py(py, async move {
let exists = admin.database_exists(&name).await.map_err(|e| {
FlussError::new_err(format!("Failed to check database exists: {e}"))
})?;

Python::attach(|py| Ok(exists.into_pyobject(py)?.to_owned().into_any().unbind()))
})
}

/// Get database information.
///
/// Args:
/// database_name: Name of the database
///
/// Returns:
/// DatabaseInfo: Database metadata
pub fn get_database_info<'py>(
&self,
py: Python<'py>,
database_name: &str,
) -> PyResult<Bound<'py, PyAny>> {
let admin = self.__admin.clone();
let name = database_name.to_string();

future_into_py(py, async move {
let info = admin
.get_database_info(&name)
.await
.map_err(|e| FlussError::new_err(format!("Failed to get database info: {e}")))?;

Python::attach(|py| Py::new(py, DatabaseInfo::from_core(info)))
})
}

/// List all tables in a database.
///
/// Args:
/// database_name: Name of the database
///
/// Returns:
/// List[str]: Names of all tables in the database
pub fn list_tables<'py>(
&self,
py: Python<'py>,
database_name: &str,
) -> PyResult<Bound<'py, PyAny>> {
let admin = self.__admin.clone();
let name = database_name.to_string();

future_into_py(py, async move {
let names = admin
.list_tables(&name)
.await
.map_err(|e| FlussError::new_err(format!("Failed to list tables: {e}")))?;

Python::attach(|py| {
let py_list = pyo3::types::PyList::empty(py);
for name in names {
py_list.append(name)?;
}
Ok(py_list.unbind())
})
})
}

/// Check if a table exists.
///
/// Args:
/// table_path: Path to the table (database, table)
///
/// Returns:
/// bool: True if the table exists
pub fn table_exists<'py>(
&self,
py: Python<'py>,
table_path: &TablePath,
) -> PyResult<Bound<'py, PyAny>> {
let core_table_path = table_path.to_core();
let admin = self.__admin.clone();

future_into_py(py, async move {
let exists = admin
.table_exists(&core_table_path)
.await
.map_err(|e| FlussError::new_err(format!("Failed to check table exists: {e}")))?;

Python::attach(|py| Ok(exists.into_pyobject(py)?.to_owned().into_any().unbind()))
})
}

/// Drop a partition from a partitioned table.
///
/// Args:
/// table_path: Path to the table
/// partition_spec: Dict mapping partition column name to value (e.g., {"region": "US"})
/// ignore_if_not_exists: If True, don't raise error if partition does not exist
///
/// Returns:
/// None
#[pyo3(signature = (table_path, partition_spec, ignore_if_not_exists=false))]
pub fn drop_partition<'py>(
&self,
py: Python<'py>,
table_path: &TablePath,
partition_spec: std::collections::HashMap<String, String>,
ignore_if_not_exists: bool,
) -> PyResult<Bound<'py, PyAny>> {
let core_table_path = table_path.to_core();
let admin = self.__admin.clone();
let core_partition_spec = fcore::metadata::PartitionSpec::new(partition_spec);

future_into_py(py, async move {
admin
.drop_partition(&core_table_path, &core_partition_spec, ignore_if_not_exists)
.await
.map_err(|e| FlussError::new_err(format!("Failed to drop partition: {e}")))?;

Python::attach(|py| Ok(py.None()))
})
}

/// Create a table with the given schema
#[pyo3(signature = (table_path, table_descriptor, ignore_if_exists=None))]
pub fn create_table<'py>(
Expand Down
2 changes: 2 additions & 0 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PartitionInfo>()?;
m.add_class::<OffsetType>()?;
m.add_class::<WriteResultHandle>()?;
m.add_class::<DatabaseDescriptor>()?;
m.add_class::<DatabaseInfo>()?;

// Register constants
m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?;
Expand Down
Loading
Loading