Skip to content

Commit

Permalink
feat: add lazy loading of tables
Browse files Browse the repository at this point in the history
  • Loading branch information
PengLiVectra committed Dec 1, 2023
1 parent 8ca8d65 commit b6fba58
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 15 deletions.
7 changes: 7 additions & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ class RawDeltaTable:
log_buffer_size: Optional[int],
) -> None: ...
# Alternate constructor for a lazily initialised table handle.
# The Rust binding behind this stub calls the builder's `build()` instead of
# `load()`, and it takes no `version` argument (unlike the regular constructor).
# NOTE(review): the Rust side is declared `#[classmethod]` and gives defaults to
# every argument except `table_uri`; this stub lists them all as required.
@staticmethod
def load_lazy(
table_uri: str,
storage_options: Optional[Dict[str, str]],
without_files: bool,
log_buffer_size: Optional[int],
) -> RawDeltaTable: ...
@staticmethod
def get_table_uri_from_data_catalog(
data_catalog: str,
database_name: str,
Expand Down
14 changes: 13 additions & 1 deletion python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ def __init__(
storage_options: Optional[Dict[str, str]] = None,
without_files: bool = False,
log_buffer_size: Optional[int] = None,
lazy_load: bool = False,
):
"""
Create the Delta Table from a path with an optional version.
Expand All @@ -248,9 +249,18 @@ def __init__(
This can decrease latency if there are many files in the log since the last checkpoint,
but will also increase memory usage. Possible rate limits of the storage backend should
also be considered for optimal performance. Defaults to 4 * number of cpus.
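lazy_load: when True, the table is constructed without loading its state; `version` is
not used in this mode, and the metadata is only created on the first call to `metadata()`.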
"""
self._storage_options = storage_options
if lazy_load:
self._table = RawDeltaTable.load_lazy(
str(table_uri),
storage_options=storage_options,
without_files=without_files,
log_buffer_size=log_buffer_size,
)
self._metadata = None
return
self._table = RawDeltaTable(
str(table_uri),
version=version,
Expand Down Expand Up @@ -426,6 +436,8 @@ def metadata(self) -> Metadata:
Returns:
the current Metadata registered in the transaction log
"""
if not self._metadata:
self._metadata = Metadata(self._table)
return self._metadata

def protocol(self) -> ProtocolVersions:
Expand Down
60 changes: 46 additions & 14 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,28 @@ struct RawDeltaTableMetaData {
configuration: HashMap<String, Option<String>>,
}

/// Assembles a `DeltaTableBuilder` for `table_uri` from the optional settings.
///
/// Despite the name, this returns the configured *builder*; the caller decides
/// what to do with it afterwards.
///
/// * `storage_options` - forwarded to `with_storage_options` when present.
/// * `without_files` - when `true`, `without_files()` is applied to the builder.
/// * `log_buffer_size` - forwarded to `with_log_buffer_size` when present.
///
/// # Errors
///
/// Returns a `PythonError` when `with_log_buffer_size` reports an error.
#[inline]
fn build_table(
    table_uri: &str,
    storage_options: Option<HashMap<String, String>>,
    without_files: bool,
    log_buffer_size: Option<usize>,
) -> Result<DeltaTableBuilder, PythonError> {
    let builder = deltalake::DeltaTableBuilder::from_uri(table_uri);

    let builder = match storage_options {
        Some(opts) => builder.with_storage_options(opts),
        None => builder,
    };

    let builder = if without_files {
        builder.without_files()
    } else {
        builder
    };

    // Setting the buffer size is the only fallible step, so its result is the
    // function's result.
    match log_buffer_size {
        Some(size) => builder
            .with_log_buffer_size(size)
            .map_err(PythonError::from),
        None => Ok(builder),
    }
}

#[pymethods]
impl RawDeltaTable {
#[new]
Expand All @@ -96,24 +118,34 @@ impl RawDeltaTable {
without_files: bool,
log_buffer_size: Option<usize>,
) -> PyResult<Self> {
let mut builder = deltalake::DeltaTableBuilder::from_uri(table_uri);
let options = storage_options.clone().unwrap_or_default();
if let Some(storage_options) = storage_options {
builder = builder.with_storage_options(storage_options)
}
let mut builder = build_table(table_uri, storage_options, without_files, log_buffer_size);
if let Some(version) = version {
builder = builder.with_version(version)
}
if without_files {
builder = builder.without_files()
}
if let Some(buf_size) = log_buffer_size {
builder = builder
.with_log_buffer_size(buf_size)
.map_err(PythonError::from)?;
builder = Ok(builder?.with_version(version))
}

let table = rt()?.block_on(builder.load()).map_err(PythonError::from)?;
let table = rt()?.block_on(builder?.load()).map_err(PythonError::from)?;
Ok(RawDeltaTable {
_table: table,
_config: FsConfig {
root_url: table_uri.into(),
options,
},
})
}

#[classmethod]
#[pyo3(signature = (table_uri, storage_options = None, without_files = false, log_buffer_size = None))]
fn load_lazy(
_cls: &PyType,
table_uri: &str,
storage_options: Option<HashMap<String, String>>,
without_files: bool,
log_buffer_size: Option<usize>,
) -> PyResult<Self> {
let options = storage_options.clone().unwrap_or_default();
let builder = build_table(table_uri, storage_options, without_files, log_buffer_size);
let table = builder?.build().map_err(PythonError::from)?;
Ok(RawDeltaTable {
_table: table,
_config: FsConfig {
Expand Down
19 changes: 19 additions & 0 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,25 @@ def test_read_simple_table_using_options_to_dict():
assert dt.to_pyarrow_dataset().to_table().to_pydict() == {"value": [1, 2, 3]}


def test_simple_table_lazy_loading():
    """A lazily constructed table can be moved to a specific version afterwards."""
    lazy_table = DeltaTable(
        "../crates/deltalake-core/tests/data/simple_table",
        lazy_load=True,
    )
    lazy_table.load_version(2)
    loaded_version = lazy_table.version()
    assert loaded_version == 2


def test_simple_table_lazy_loading_with_options():
    """Lazy construction accepts the storage, file and log-buffer options."""
    lazy_table = DeltaTable(
        "../crates/deltalake-core/tests/data/simple_table",
        storage_options={},
        without_files=False,
        log_buffer_size=1,
        lazy_load=True,
    )
    # Only checks that construction succeeds and yields a DeltaTable.
    assert isinstance(lazy_table, DeltaTable)


def test_load_with_datetime():
log_dir = "../crates/deltalake-core/tests/data/simple_table/_delta_log"
log_mtime_pair = [
Expand Down

0 comments on commit b6fba58

Please sign in to comment.