Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 34 additions & 30 deletions lance_ray/compaction.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -6,23 +6,13 @@
from lance.optimize import Compaction, CompactionOptions, CompactionTask
from ray.util.multiprocessing import Pool

logger = logging.getLogger(__name__)


def _create_storage_options_provider(
namespace_impl: Optional[str],
namespace_properties: Optional[dict[str, str]],
table_id: Optional[list[str]],
):
"""Create a LanceNamespaceStorageOptionsProvider if namespace parameters are provided."""
if namespace_impl is None or namespace_properties is None or table_id is None:
return None

import lance_namespace as ln
from lance import LanceNamespaceStorageOptionsProvider
from .utils import (
create_storage_options_provider,
get_or_create_namespace,
validate_uri_or_namespace,
)

namespace = ln.connect(namespace_impl, namespace_properties)
return LanceNamespaceStorageOptionsProvider(namespace=namespace, table_id=table_id)
logger = logging.getLogger(__name__)


def _handle_compaction_task(
Expand Down Expand Up @@ -50,7 +40,7 @@ def func(task: CompactionTask) -> dict[str, Any]:
"""
try:
# Create storage options provider in worker for credentials refresh
storage_options_provider = _create_storage_options_provider(
storage_options_provider = create_storage_options_provider(
namespace_impl, namespace_properties, table_id
)

Expand Down Expand Up @@ -88,14 +78,14 @@ def func(task: CompactionTask) -> dict[str, Any]:


def compact_files(
uri: str,
uri: Optional[str] = None,
*,
table_id: Optional[list[str]] = None,
compaction_options: Optional[CompactionOptions] = None,
num_workers: int = 4,
storage_options: Optional[dict[str, str]] = None,
namespace_impl: Optional[str] = None,
namespace_properties: Optional[dict[str, str]] = None,
table_id: Optional[list[str]] = None,
ray_remote_args: Optional[dict[str, Any]] = None,
) -> Optional[CompactionMetrics]:
"""
Expand All @@ -106,18 +96,18 @@ def compact_files(
committed as a single compaction operation.

Args:
uri: The URI of the Lance dataset to compact.
uri: The URI of the Lance dataset to compact. Either uri OR
(namespace_impl + table_id) must be provided.
table_id: The table identifier as a list of strings. Must be provided
together with namespace_impl.
compaction_options: Options for the compaction operation.
num_workers: Number of Ray workers to use (default: 4).
storage_options: Storage options for the dataset.
namespace_impl: The namespace implementation type (e.g., "rest", "dir").
Used together with namespace_properties and table_id for credentials
vending in distributed workers.
Used together with table_id for resolving the dataset location and
credentials vending in distributed workers.
namespace_properties: Properties for connecting to the namespace.
Used together with namespace_impl and table_id for credentials vending.
table_id: The table identifier as a list of strings.
Used together with namespace_impl and namespace_properties for
credentials vending.
Used together with namespace_impl and table_id.
ray_remote_args: Options for Ray tasks (e.g., num_cpus, resources).

Returns:
Expand All @@ -127,17 +117,31 @@ def compact_files(
ValueError: If input parameters are invalid.
RuntimeError: If compaction fails.
"""
storage_options = storage_options or {}
validate_uri_or_namespace(uri, namespace_impl, table_id)

merged_storage_options: dict[str, Any] = {}
if storage_options:
merged_storage_options.update(storage_options)

# Resolve URI and get storage options from namespace if provided
namespace = get_or_create_namespace(namespace_impl, namespace_properties)
if namespace is not None and table_id is not None:
from lance_namespace import DescribeTableRequest

describe_response = namespace.describe_table(DescribeTableRequest(id=table_id))
uri = describe_response.location
if describe_response.storage_options:
merged_storage_options.update(describe_response.storage_options)

# Create storage options provider for local operations
storage_options_provider = _create_storage_options_provider(
storage_options_provider = create_storage_options_provider(
namespace_impl, namespace_properties, table_id
)

# Load dataset
dataset = lance.LanceDataset(
uri,
storage_options=storage_options,
storage_options=merged_storage_options,
storage_options_provider=storage_options_provider,
)

Expand All @@ -163,7 +167,7 @@ def compact_files(
# Create the task handler function
task_handler = _handle_compaction_task(
dataset_uri=uri,
storage_options=storage_options,
storage_options=merged_storage_options,
namespace_impl=namespace_impl,
namespace_properties=namespace_properties,
table_id=table_id,
Expand Down
52 changes: 36 additions & 16 deletions lance_ray/datasink.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -15,15 +15,14 @@
from ray.data.datasource.datasink import Datasink

from .fragment import write_fragment
from .utils import create_storage_options_provider, get_or_create_namespace

if TYPE_CHECKING:
from lance_namespace import LanceNamespace

import pandas as pd


def _declare_table_with_fallback(
namespace: "LanceNamespace", table_id: list[str]
namespace, table_id: list[str]
) -> tuple[str, Optional[dict[str, str]]]:
"""Declare a table using declare_table, falling back to create_empty_table.

Expand Down Expand Up @@ -51,12 +50,13 @@ class _BaseLanceDatasink(Datasink):
def __init__(
self,
uri: Optional[str] = None,
namespace: Optional["LanceNamespace"] = None,
table_id: Optional[list[str]] = None,
*args: Any,
schema: Optional[pa.Schema] = None,
mode: Literal["create", "append", "overwrite"] = "create",
storage_options: Optional[dict[str, Any]] = None,
namespace_impl: Optional[str] = None,
namespace_properties: Optional[dict[str, str]] = None,
**kwargs: Any,
):
super().__init__(*args, **kwargs)
Expand All @@ -65,8 +65,12 @@ def __init__(
if storage_options:
merged_storage_options.update(storage_options)

# Handle namespace-based table writing
self.storage_options_provider = None
# Store namespace_impl and namespace_properties for worker reconstruction
self._namespace_impl = namespace_impl
self._namespace_properties = namespace_properties

# Construct namespace from impl and properties (cached per worker)
namespace = get_or_create_namespace(namespace_impl, namespace_properties)

if namespace is not None and table_id is not None:
self.table_id = table_id
Expand Down Expand Up @@ -107,22 +111,27 @@ def __init__(
merged_storage_options.update(ns_storage_options)
has_namespace_storage_options = True

# Create storage options provider for credentials vending
if has_namespace_storage_options:
from lance import LanceNamespaceStorageOptionsProvider

self.storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=namespace, table_id=table_id
)
# Mark that we have namespace storage options for provider creation
self._has_namespace_storage_options = has_namespace_storage_options
else:
self.table_id = None
self.uri = uri
self._has_namespace_storage_options = False

self.schema = schema
self.mode = mode
self.read_version: Optional[int] = None
self.storage_options = merged_storage_options

@property
def storage_options_provider(self):
"""Lazily create storage options provider using namespace_impl/properties."""
if not self._has_namespace_storage_options:
return None
return create_storage_options_provider(
self._namespace_impl, self._namespace_properties, self.table_id
)

@property
def supports_distributed_writes(self) -> bool:
return True
Expand Down Expand Up @@ -221,6 +230,13 @@ class LanceDatasink(_BaseLanceDatasink):
for more details.
storage_options : Dict[str, Any], optional
The storage options for the writer. Default is None.
namespace_impl : str, optional
The namespace implementation type (e.g., "rest", "dir").
Used together with namespace_properties and table_id for credentials
vending in distributed workers.
namespace_properties : Dict[str, str], optional
Properties for connecting to the namespace.
Used together with namespace_impl and table_id for credentials vending.
"""

NAME = "Lance"
Expand All @@ -231,7 +247,6 @@ class LanceDatasink(_BaseLanceDatasink):
def __init__(
self,
uri: Optional[str] = None,
namespace: Optional["LanceNamespace"] = None,
table_id: Optional[list[str]] = None,
*args: Any,
schema: Optional[pa.Schema] = None,
Expand All @@ -240,16 +255,19 @@ def __init__(
max_rows_per_file: int = 64 * 1024 * 1024,
data_storage_version: Optional[str] = None,
storage_options: Optional[dict[str, Any]] = None,
namespace_impl: Optional[str] = None,
namespace_properties: Optional[dict[str, str]] = None,
**kwargs: Any,
):
super().__init__(
uri,
namespace,
table_id,
*args,
schema=schema,
mode=mode,
storage_options=storage_options,
namespace_impl=namespace_impl,
namespace_properties=namespace_properties,
**kwargs,
)

Expand Down Expand Up @@ -296,7 +314,9 @@ def write(
max_rows_per_file=self.max_rows_per_file,
data_storage_version=self.data_storage_version,
storage_options=self.storage_options,
storage_options_provider=self.storage_options_provider,
namespace_impl=self._namespace_impl,
namespace_properties=self._namespace_properties,
table_id=self.table_id,
retry_params=self._retry_params,
)
return [
Expand Down
Loading