diff --git a/lance_ray/compaction.py b/lance_ray/compaction.py index da49286..89c7feb 100644 --- a/lance_ray/compaction.py +++ b/lance_ray/compaction.py @@ -6,23 +6,13 @@ from lance.optimize import Compaction, CompactionOptions, CompactionTask from ray.util.multiprocessing import Pool -logger = logging.getLogger(__name__) - - -def _create_storage_options_provider( - namespace_impl: Optional[str], - namespace_properties: Optional[dict[str, str]], - table_id: Optional[list[str]], -): - """Create a LanceNamespaceStorageOptionsProvider if namespace parameters are provided.""" - if namespace_impl is None or namespace_properties is None or table_id is None: - return None - - import lance_namespace as ln - from lance import LanceNamespaceStorageOptionsProvider +from .utils import ( + create_storage_options_provider, + get_or_create_namespace, + validate_uri_or_namespace, +) - namespace = ln.connect(namespace_impl, namespace_properties) - return LanceNamespaceStorageOptionsProvider(namespace=namespace, table_id=table_id) +logger = logging.getLogger(__name__) def _handle_compaction_task( @@ -50,7 +40,7 @@ def func(task: CompactionTask) -> dict[str, Any]: """ try: # Create storage options provider in worker for credentials refresh - storage_options_provider = _create_storage_options_provider( + storage_options_provider = create_storage_options_provider( namespace_impl, namespace_properties, table_id ) @@ -88,14 +78,14 @@ def func(task: CompactionTask) -> dict[str, Any]: def compact_files( - uri: str, + uri: Optional[str] = None, *, + table_id: Optional[list[str]] = None, compaction_options: Optional[CompactionOptions] = None, num_workers: int = 4, storage_options: Optional[dict[str, str]] = None, namespace_impl: Optional[str] = None, namespace_properties: Optional[dict[str, str]] = None, - table_id: Optional[list[str]] = None, ray_remote_args: Optional[dict[str, Any]] = None, ) -> Optional[CompactionMetrics]: """ @@ -106,18 +96,18 @@ def compact_files( committed as a single compaction operation. Args: - uri: The URI of the Lance dataset to compact. + uri: The URI of the Lance dataset to compact. Either uri OR + (namespace_impl + table_id) must be provided. + table_id: The table identifier as a list of strings. Must be provided + together with namespace_impl. compaction_options: Options for the compaction operation. num_workers: Number of Ray workers to use (default: 4). storage_options: Storage options for the dataset. namespace_impl: The namespace implementation type (e.g., "rest", "dir"). - Used together with namespace_properties and table_id for credentials - vending in distributed workers. + Used together with table_id for resolving the dataset location and + credentials vending in distributed workers. namespace_properties: Properties for connecting to the namespace. - Used together with namespace_impl and table_id for credentials vending. - table_id: The table identifier as a list of strings. - Used together with namespace_impl and namespace_properties for - credentials vending. + Used together with namespace_impl and table_id. ray_remote_args: Options for Ray tasks (e.g., num_cpus, resources). Returns: @@ -127,17 +117,31 @@ def compact_files( ValueError: If input parameters are invalid. RuntimeError: If compaction fails. """ - storage_options = storage_options or {} + validate_uri_or_namespace(uri, namespace_impl, table_id) + + merged_storage_options: dict[str, Any] = {} + if storage_options: + merged_storage_options.update(storage_options) + + # Resolve URI and get storage options from namespace if provided + namespace = get_or_create_namespace(namespace_impl, namespace_properties) + if namespace is not None and table_id is not None: + from lance_namespace import DescribeTableRequest + + describe_response = namespace.describe_table(DescribeTableRequest(id=table_id)) + uri = describe_response.location + if describe_response.storage_options: + merged_storage_options.update(describe_response.storage_options) # Create storage options provider for local operations - storage_options_provider = _create_storage_options_provider( + storage_options_provider = create_storage_options_provider( namespace_impl, namespace_properties, table_id ) # Load dataset dataset = lance.LanceDataset( uri, - storage_options=storage_options, + storage_options=merged_storage_options, storage_options_provider=storage_options_provider, ) @@ -163,7 +167,7 @@ def compact_files( # Create the task handler function task_handler = _handle_compaction_task( dataset_uri=uri, - storage_options=storage_options, + storage_options=merged_storage_options, namespace_impl=namespace_impl, namespace_properties=namespace_properties, table_id=table_id, diff --git a/lance_ray/datasink.py b/lance_ray/datasink.py index 6c757b1..ebe2ba7 100644 --- a/lance_ray/datasink.py +++ b/lance_ray/datasink.py @@ -15,15 +15,14 @@ from ray.data.datasource.datasink import Datasink from .fragment import write_fragment +from .utils import create_storage_options_provider, get_or_create_namespace if TYPE_CHECKING: - from lance_namespace import LanceNamespace - import pandas as pd def _declare_table_with_fallback( - namespace: "LanceNamespace", table_id: list[str] + namespace, table_id: list[str] ) -> tuple[str, Optional[dict[str, str]]]: """Declare a table using declare_table, falling back to create_empty_table. @@ -51,12 +50,13 @@ class _BaseLanceDatasink(Datasink): def __init__( self, uri: Optional[str] = None, - namespace: Optional["LanceNamespace"] = None, table_id: Optional[list[str]] = None, *args: Any, schema: Optional[pa.Schema] = None, mode: Literal["create", "append", "overwrite"] = "create", storage_options: Optional[dict[str, Any]] = None, + namespace_impl: Optional[str] = None, + namespace_properties: Optional[dict[str, str]] = None, **kwargs: Any, ): super().__init__(*args, **kwargs) @@ -65,8 +65,12 @@ def __init__( if storage_options: merged_storage_options.update(storage_options) - # Handle namespace-based table writing - self.storage_options_provider = None + # Store namespace_impl and namespace_properties for worker reconstruction + self._namespace_impl = namespace_impl + self._namespace_properties = namespace_properties + + # Construct namespace from impl and properties (cached per worker) + namespace = get_or_create_namespace(namespace_impl, namespace_properties) if namespace is not None and table_id is not None: self.table_id = table_id @@ -107,22 +111,27 @@ def __init__( merged_storage_options.update(ns_storage_options) has_namespace_storage_options = True - # Create storage options provider for credentials vending - if has_namespace_storage_options: - from lance import LanceNamespaceStorageOptionsProvider - - self.storage_options_provider = LanceNamespaceStorageOptionsProvider( - namespace=namespace, table_id=table_id - ) + # Mark that we have namespace storage options for provider creation + self._has_namespace_storage_options = has_namespace_storage_options else: self.table_id = None self.uri = uri + self._has_namespace_storage_options = False self.schema = schema self.mode = mode self.read_version: Optional[int] = None self.storage_options = merged_storage_options + @property + def storage_options_provider(self): + """Lazily create storage options provider using namespace_impl/properties.""" + if not self._has_namespace_storage_options: + return None + return create_storage_options_provider( + self._namespace_impl, self._namespace_properties, self.table_id + ) + @property def supports_distributed_writes(self) -> bool: return True @@ -221,6 +230,13 @@ class LanceDatasink(_BaseLanceDatasink): for more details. storage_options : Dict[str, Any], optional The storage options for the writer. Default is None. + namespace_impl : str, optional + The namespace implementation type (e.g., "rest", "dir"). + Used together with namespace_properties and table_id for credentials + vending in distributed workers. + namespace_properties : Dict[str, str], optional + Properties for connecting to the namespace. + Used together with namespace_impl and table_id for credentials vending. """ NAME = "Lance" @@ -231,7 +247,6 @@ class LanceDatasink(_BaseLanceDatasink): def __init__( self, uri: Optional[str] = None, - namespace: Optional["LanceNamespace"] = None, table_id: Optional[list[str]] = None, *args: Any, schema: Optional[pa.Schema] = None, @@ -240,16 +255,19 @@ def __init__( max_rows_per_file: int = 64 * 1024 * 1024, data_storage_version: Optional[str] = None, storage_options: Optional[dict[str, Any]] = None, + namespace_impl: Optional[str] = None, + namespace_properties: Optional[dict[str, str]] = None, **kwargs: Any, ): super().__init__( uri, - namespace, table_id, *args, schema=schema, mode=mode, storage_options=storage_options, + namespace_impl=namespace_impl, + namespace_properties=namespace_properties, **kwargs, ) @@ -296,7 +314,9 @@ def write( max_rows_per_file=self.max_rows_per_file, data_storage_version=self.data_storage_version, storage_options=self.storage_options, - storage_options_provider=self.storage_options_provider, + namespace_impl=self._namespace_impl, + namespace_properties=self._namespace_properties, + table_id=self.table_id, retry_params=self._retry_params, ) return [ diff --git a/lance_ray/datasource.py b/lance_ray/datasource.py index a2c69cf..c323f97 100644 --- a/lance_ray/datasource.py +++ b/lance_ray/datasource.py @@ -9,11 +9,10 @@ from ray.data.datasource import Datasource from ray.data.datasource.datasource import ReadTask -from .utils import array_split +from .utils import array_split, create_storage_options_provider, get_or_create_namespace if TYPE_CHECKING: import lance - from lance_namespace import LanceNamespace class LanceDatasource(Datasource): @@ -29,7 +28,6 @@ class LanceDatasource(Datasource): def __init__( self, uri: Optional[str] = None, - namespace: Optional["LanceNamespace"] = None, table_id: Optional[list[str]] = None, columns: Optional[list[str]] = None, filter: Optional[str] = None, @@ -37,6 +35,8 @@ def __init__( scanner_options: Optional[dict[str, Any]] = None, dataset_options: Optional[dict[str, Any]] = None, fragment_ids: Optional[list[int]] = None, + namespace_impl: Optional[str] = None, + namespace_properties: Optional[dict[str, str]] = None, ): _check_import(self, module="lance", package="pylance") @@ -48,10 +48,17 @@ def __init__( self._scanner_options["filter"] = filter self._uri = uri - self._namespace = namespace self._table_id = table_id self._storage_options = storage_options + # Store namespace_impl and namespace_properties for worker reconstruction. + # Workers will use these to reconstruct the namespace and storage options provider. + self._namespace_impl = namespace_impl + self._namespace_properties = namespace_properties + + # Construct namespace from impl and properties (cached per worker) + self._namespace = get_or_create_namespace(namespace_impl, namespace_properties) + match = [] match.extend(self.READ_FRAGMENTS_ERRORS_TO_RETRY) match.extend(DataContext.get_current().retried_io_errors) @@ -76,6 +83,9 @@ def lance_dataset(self) -> "lance.LanceDataset": dataset_options["namespace"] = self._namespace dataset_options["table_id"] = self._table_id dataset_options["storage_options"] = self._storage_options + # Note: lance.dataset() doesn't accept storage_options_provider. + # When namespace is provided, it handles credential refresh internally. + # For workers, we pass namespace_impl/properties to reconstruct the provider. self._lance_ds = lance.dataset(**dataset_options) return self._lance_ds @@ -95,6 +105,18 @@ def get_read_tasks(self, parallelism: int, **kwargs) -> list[ReadTask]: read_tasks = [] + # Extract dataset components for worker reconstruction. + # We pass namespace_impl/properties/table_id instead of the provider object + # because namespace objects are not serializable. Workers will reconstruct + # the namespace and provider using these serializable parameters. + dataset_uri = self.lance_dataset.uri + dataset_version = self.lance_dataset.version + dataset_storage_options = self._lance_ds._storage_options + serialized_manifest = self._lance_ds._ds.serialized_manifest() + namespace_impl = self._namespace_impl + namespace_properties = self._namespace_properties + table_id = self._table_id + for fragments in array_split(self.fragments, parallelism): if len(fragments) == 0: continue @@ -134,10 +156,25 @@ def get_read_tasks(self, parallelism: int, **kwargs) -> list[ReadTask]: read_task = ReadTask( lambda fids=fragment_ids, - lance_ds=self.lance_dataset, + uri=dataset_uri, + version=dataset_version, + storage_options=dataset_storage_options, + manifest=serialized_manifest, + ns_impl=namespace_impl, + ns_props=namespace_properties, + tbl_id=table_id, scanner_options=self._scanner_options, retry_params=self._retry_params: _read_fragments_with_retry( - fids, lance_ds, scanner_options, retry_params + fids, + uri, + version, + storage_options, + manifest, + ns_impl, + ns_props, + tbl_id, + scanner_options, + retry_params, ), metadata, ) @@ -160,10 +197,31 @@ def estimate_inmemory_data_size(self) -> Optional[int]: def _read_fragments_with_retry( fragment_ids: list[int], - lance_ds: "lance.LanceDataset", + uri: str, + version: int, + storage_options: Optional[dict[str, str]], + manifest: bytes, + namespace_impl: Optional[str], + namespace_properties: Optional[dict[str, str]], + table_id: Optional[list[str]], scanner_options: dict[str, Any], retry_params: dict[str, Any], ) -> Iterator[pa.Table]: + # Reconstruct storage options provider on worker for credential refresh + storage_options_provider = create_storage_options_provider( + namespace_impl, namespace_properties, table_id + ) + + import lance + + lance_ds = lance.LanceDataset( + uri, + version=version, + storage_options=storage_options, + serialized_manifest=manifest, + storage_options_provider=storage_options_provider, + ) + return call_with_retry( lambda: _read_fragments(fragment_ids, lance_ds, scanner_options), **retry_params, diff --git a/lance_ray/fragment.py b/lance_ray/fragment.py index 8ce47f3..5f8432b 100644 --- a/lance_ray/fragment.py +++ b/lance_ray/fragment.py @@ -26,6 +26,7 @@ ] from .pandas import pd_to_arrow +from .utils import create_storage_options_provider def write_fragment( @@ -38,7 +39,9 @@ def write_fragment( max_rows_per_group: int = 1024, # Only useful for v1 writer. data_storage_version: Optional[str] = None, storage_options: Optional[dict[str, Any]] = None, - storage_options_provider=None, + namespace_impl: Optional[str] = None, + namespace_properties: Optional[dict[str, str]] = None, + table_id: Optional[list[str]] = None, retry_params: Optional[dict[str, Any]] = None, ) -> list[tuple["FragmentMetadata", pa.Schema]]: from lance.dependencies import _PANDAS_AVAILABLE @@ -80,6 +83,11 @@ def record_batch_converter(): "max_backoff_s": 0, } + # Create storage options provider from namespace parameters + storage_options_provider = create_storage_options_provider( + namespace_impl, namespace_properties, table_id + ) + fragments = call_with_retry( lambda: write_fragments( reader, @@ -134,6 +142,17 @@ class LanceFragmentWriter: `data_storage_version` parameter instead. storage_options : Dict[str, Any], optional The storage options for the writer. Default is None. + namespace_impl : str, optional + The namespace implementation type (e.g., "rest", "dir"). + Used together with namespace_properties and table_id for credentials + vending in distributed workers. + namespace_properties : Dict[str, str], optional + Properties for connecting to the namespace. + Used together with namespace_impl and table_id for credentials vending. + table_id : List[str], optional + The table identifier as a list of strings. + Used together with namespace_impl and namespace_properties for + credentials vending. retry_params : Dict[str, Any], optional Retry parameters for write operations. Default is None. If provided, should contain keys like 'description', 'match', @@ -153,7 +172,9 @@ def __init__( data_storage_version: Optional[str] = None, use_legacy_format: Optional[bool] = False, storage_options: Optional[dict[str, Any]] = None, - storage_options_provider=None, + namespace_impl: Optional[str] = None, + namespace_properties: Optional[dict[str, str]] = None, + table_id: Optional[list[str]] = None, retry_params: Optional[dict[str, Any]] = None, ): if use_legacy_format is not None: @@ -175,7 +196,9 @@ def __init__( self.max_bytes_per_file = max_bytes_per_file self.data_storage_version = data_storage_version self.storage_options = storage_options - self.storage_options_provider = storage_options_provider + self.namespace_impl = namespace_impl + self.namespace_properties = namespace_properties + self.table_id = table_id self.retry_params = retry_params def __call__(self, batch: Union[pa.Table, "pd.DataFrame", dict]) -> pa.Table: @@ -199,7 +222,9 @@ def __call__(self, batch: Union[pa.Table, "pd.DataFrame", dict]) -> pa.Table: max_bytes_per_file=self.max_bytes_per_file, data_storage_version=self.data_storage_version, storage_options=self.storage_options, - storage_options_provider=self.storage_options_provider, + namespace_impl=self.namespace_impl, + namespace_properties=self.namespace_properties, + table_id=self.table_id, retry_params=self.retry_params, ) return pa.Table.from_pydict( diff --git a/lance_ray/index.py b/lance_ray/index.py index cb2aeac..8fab46d 100755 --- a/lance_ray/index.py +++ b/lance_ray/index.py @@ -11,23 +11,13 @@ from packaging import version from ray.util.multiprocessing import Pool -logger = logging.getLogger(__name__) - - -def _create_storage_options_provider( - namespace_impl: Optional[str], - namespace_properties: Optional[dict[str, str]], - table_id: Optional[list[str]], -): - """Create a LanceNamespaceStorageOptionsProvider if namespace parameters are provided.""" - if namespace_impl is None or namespace_properties is None or table_id is None: - return None - - import lance_namespace as ln - from lance import LanceNamespaceStorageOptionsProvider +from .utils import ( + create_storage_options_provider, + get_or_create_namespace, + validate_uri_or_namespace, +) - namespace = ln.connect(namespace_impl, namespace_properties) - return LanceNamespaceStorageOptionsProvider(namespace=namespace, table_id=table_id) +logger = logging.getLogger(__name__) def _distribute_fragments_balanced( @@ -160,7 +150,7 @@ def func(fragment_ids: list[int]) -> dict[str, Any]: raise ValueError(f"Invalid fragment_id: {fragment_id}") # Create storage options provider in worker for credentials refresh - storage_options_provider = _create_storage_options_provider( + storage_options_provider = create_storage_options_provider( namespace_impl, namespace_properties, table_id ) @@ -232,7 +222,7 @@ def merge_index_metadata_compat(dataset, index_id, index_type, **kwargs): def create_scalar_index( - uri: str, + uri: Optional[str] = None, *, column: str, index_type: Literal["BTREE"] @@ -243,6 +233,7 @@ def create_scalar_index( | Literal["NGRAM"] | Literal["ZONEMAP"] | IndexConfig, + table_id: Optional[list[str]] = None, name: Optional[str] = None, replace: bool = True, train: bool = True, @@ -252,7 +243,6 @@ def create_scalar_index( storage_options: Optional[dict[str, str]] = None, namespace_impl: Optional[str] = None, namespace_properties: Optional[dict[str, str]] = None, - table_id: Optional[list[str]] = None, ray_remote_args: Optional[dict[str, Any]] = None, **kwargs: Any, ) -> "lance.LanceDataset": @@ -264,10 +254,13 @@ def create_scalar_index( merged and committed as a single index. Args: - uri: The URI of the Lance dataset to build index on. + uri: The URI of the Lance dataset to build index on. Either uri OR + (namespace_impl + table_id) must be provided. column: Column name to index. index_type: Type of index to build ("BTREE", "BITMAP", "LABEL_LIST", "INVERTED", "FTS", "NGRAM", "ZONEMAP") or IndexConfig object. + table_id: The table identifier as a list of strings. Must be provided + together with namespace_impl. name: Name of the index (generated if None). replace: Whether to replace existing index with the same name (default: True). train: Whether to train the index (default: True). @@ -276,13 +269,10 @@ def create_scalar_index( num_workers: Number of Ray workers to use (keyword-only). storage_options: Storage options for the dataset (keyword-only). namespace_impl: The namespace implementation type (e.g., "rest", "dir"). - Used together with namespace_properties and table_id for credentials - vending in distributed workers. + Used together with table_id for resolving the dataset location and + credentials vending in distributed workers. namespace_properties: Properties for connecting to the namespace. - Used together with namespace_impl and table_id for credentials vending. - table_id: The table identifier as a list of strings. - Used together with namespace_impl and namespace_properties for - credentials vending. + Used together with namespace_impl and table_id. ray_remote_args: Options for Ray tasks (e.g., num_cpus, resources) (keyword-only). **kwargs: Additional arguments to pass to create_scalar_index. @@ -318,6 +308,9 @@ def create_scalar_index( index_id = str(uuid.uuid4()) logger.info(f"Starting distributed index build with ID: {index_id}") + # Validate uri or namespace params + validate_uri_or_namespace(uri, namespace_impl, table_id) + # Basic input validation if not column: raise ValueError("Column name cannot be empty") @@ -356,17 +349,29 @@ def create_scalar_index( # Note: Ray initialization is now handled by the Pool, following the pattern from io.py # This removes the need for explicit ray.init() calls - storage_options = storage_options or {} + merged_storage_options: dict[str, Any] = {} + if storage_options: + merged_storage_options.update(storage_options) + + # Resolve URI and get storage options from namespace if provided + namespace = get_or_create_namespace(namespace_impl, namespace_properties) + if namespace is not None and table_id is not None: + from lance_namespace import DescribeTableRequest + + describe_response = namespace.describe_table(DescribeTableRequest(id=table_id)) + uri = describe_response.location + if describe_response.storage_options: + merged_storage_options.update(describe_response.storage_options) # Create storage options provider for local operations - storage_options_provider = _create_storage_options_provider( + storage_options_provider = create_storage_options_provider( namespace_impl, namespace_properties, table_id ) # Load dataset dataset = LanceDataset( uri, - storage_options=storage_options, + storage_options=merged_storage_options, storage_options_provider=storage_options_provider, ) @@ -464,7 +469,7 @@ def create_scalar_index( index_uuid=index_id, replace=replace, train=train, - storage_options=storage_options, + storage_options=merged_storage_options, namespace_impl=namespace_impl, namespace_properties=namespace_properties, table_id=table_id, @@ -496,7 +501,7 @@ def create_scalar_index( # Reload dataset to get the latest state after fragment index creation dataset = LanceDataset( uri, - storage_options=storage_options, + storage_options=merged_storage_options, storage_options_provider=storage_options_provider, ) @@ -534,7 +539,7 @@ def create_scalar_index( uri, create_index_op, read_version=dataset.version, - storage_options=storage_options, + storage_options=merged_storage_options, storage_options_provider=storage_options_provider, ) diff --git a/lance_ray/io.py b/lance_ray/io.py index b1d76de..bac101b 100644 --- a/lance_ray/io.py +++ b/lance_ray/io.py @@ -14,10 +14,14 @@ from .datasink import LanceDatasink from .datasource import LanceDatasource +from .utils import ( + create_storage_options_provider, + has_namespace_params, + validate_uri_or_namespace, +) if TYPE_CHECKING: from lance.types import ReaderLike - from lance_namespace import LanceNamespace TransformType = ( dict[str, str] @@ -30,7 +34,6 @@ def read_lance( uri: Optional[str] = None, *, - namespace: Optional["LanceNamespace"] = None, table_id: Optional[list[str]] = None, columns: Optional[list[str]] = None, filter: Optional[str] = None, @@ -38,6 +41,8 @@ def read_lance( scanner_options: Optional[dict[str, Any]] = None, dataset_options: Optional[dict[str, Any]] = None, fragment_ids: Optional[list[int]] = None, + namespace_impl: Optional[str] = None, + namespace_properties: Optional[dict[str, str]] = None, ray_remote_args: Optional[dict[str, Any]] = None, concurrency: Optional[int] = None, override_num_blocks: Optional[int] = None, @@ -55,22 +60,20 @@ def read_lance( ... filter="label = 2 AND text IS NOT NULL", ... ) - Using a LanceNamespace and table ID: - >>> import lance_namespace as ln - >>> namespace = ln.connect("dir", {"root": "/path/to/tables"}) # doctest: +SKIP + Using namespace_impl and namespace_properties: >>> ds = lr.read_lance( # doctest: +SKIP - ... namespace=namespace, + ... namespace_impl="dir", + ... namespace_properties={"root": "/path/to/tables"}, ... table_id=["my_table"], ... columns=["image", "label"], ... ) Args: uri: The URI of the Lance dataset to read from. Local file paths, S3, and GCS - are supported. Either uri OR (namespace + table_id) must be provided. - namespace: A LanceNamespace instance to load the table from. Must be provided - together with table_id. + are supported. Either uri OR (namespace_impl + namespace_properties + table_id) + must be provided. table_id: The table identifier as a list of strings. Must be provided together - with namespace. + with namespace_impl and namespace_properties. columns: The columns to read. By default, all columns are read. filter: Read returns only the rows matching the filter. By default, no filter is applied. @@ -84,6 +87,10 @@ def read_lance( This can include options like `version`, `block_size`, etc. For more information, see `Lance API doc `_. fragment_ids: The fragment IDs to read. If provided, only the fragments with the given IDs will be read. + namespace_impl: The namespace implementation type (e.g., "rest", "dir"). + Used together with namespace_properties and table_id. + namespace_properties: Properties for connecting to the namespace. + Used together with namespace_impl and table_id. ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks. concurrency: The maximum number of Ray tasks to run concurrently. Set this to control number of tasks to run concurrently. This doesn't change the @@ -97,11 +104,10 @@ def read_lance( Returns: A :class:`~ray.data.Dataset` producing records read from the Lance dataset. """ # noqa: E501 - _validate_uri_or_namespace_args(uri, namespace, table_id) + validate_uri_or_namespace(uri, namespace_impl, table_id) datasource = LanceDatasource( uri=uri, - namespace=namespace, table_id=table_id, columns=columns, filter=filter, @@ -109,6 +115,8 @@ def read_lance( scanner_options=scanner_options, dataset_options=dataset_options, fragment_ids=fragment_ids, + namespace_impl=namespace_impl, + namespace_properties=namespace_properties, ) return read_datasource( @@ -123,7 +131,6 @@ def write_lance( ds: Dataset, uri: Optional[str] = None, *, - namespace: Optional["LanceNamespace"] = None, table_id: Optional[list[str]] = None, schema: Optional[pa.Schema] = None, mode: Literal["create", "append", "overwrite"] = "create", @@ -131,6 +138,8 @@ def write_lance( max_rows_per_file: int = 64 * 1024 * 1024, data_storage_version: Optional[str] = None, storage_options: Optional[dict[str, Any]] = None, + namespace_impl: Optional[str] = None, + namespace_properties: Optional[dict[str, str]] = None, ray_remote_args: Optional[dict[str, Any]] = None, concurrency: Optional[int] = None, ) -> None: @@ -146,22 +155,26 @@ def write_lance( ds = ray.data.from_pandas(pd.DataFrame(docs)) lr.write_lance(ds, "/tmp/data/") - Using a LanceNamespace and table ID: + Using namespace_impl and namespace_properties: .. testcode:: - import lance_namespace as ln import lance_ray as lr import pandas as pd docs = [{"title": "Lance data sink test"} for key in range(4)] ds = ray.data.from_pandas(pd.DataFrame(docs)) - namespace = ln.connect("dir", {"root": "/tmp/tables"}) # doctest: +SKIP - lr.write_lance(ds, namespace=namespace, table_id=["my_table"]) # doctest: +SKIP + lr.write_lance( # doctest: +SKIP + ds, + namespace_impl="dir", + namespace_properties={"root": "/tmp/tables"}, + table_id=["my_table"], + ) Args: ds: The Ray dataset to write. - uri: The path to the destination Lance dataset. Can only be provided together with namespace/table_id when creating a new dataset (mode='create' or 'overwrite'). - namespace: A LanceNamespace instance to write the table to. Must be provided together with table_id. - table_id: The table identifier as a list of strings. Must be provided together with namespace. + uri: The path to the destination Lance dataset. Can only be provided together + with namespace parameters when creating a new dataset (mode='create' or 'overwrite'). + table_id: The table identifier as a list of strings. Must be provided together + with namespace_impl and namespace_properties. schema: The schema of the dataset. If not provided, it is inferred from the data. mode: The write mode. Can be "create", "append", or "overwrite". min_rows_per_file: The minimum number of rows per file. @@ -171,12 +184,15 @@ def write_lance( "legacy" which will use the legacy v1 version. See the user guide for more details. storage_options: The storage options for the writer. Default is None. + namespace_impl: The namespace implementation type (e.g., "rest", "dir"). + Used together with namespace_properties and table_id. + namespace_properties: Properties for connecting to the namespace. + Used together with namespace_impl and table_id. """ - _validate_write_args(uri, namespace, table_id, mode) + _validate_write_args(uri, namespace_impl, table_id, mode) datasink = LanceDatasink( uri, - namespace=namespace, table_id=table_id, schema=schema, mode=mode, @@ -184,6 +200,8 @@ def write_lance( max_rows_per_file=max_rows_per_file, data_storage_version=data_storage_version, storage_options=storage_options, + namespace_impl=namespace_impl, + namespace_properties=namespace_properties, ) ds.write_datasink( @@ -193,22 +211,6 @@ def write_lance( ) -def _create_storage_options_provider( - namespace_impl: Optional[str], - namespace_properties: Optional[dict[str, str]], - table_id: Optional[list[str]], -): - """Create a LanceNamespaceStorageOptionsProvider if namespace parameters are provided.""" - if namespace_impl is None or namespace_properties is None or table_id is None: - return None - - import lance_namespace as ln - from lance import LanceNamespaceStorageOptionsProvider - - namespace = ln.connect(namespace_impl, namespace_properties) - return LanceNamespaceStorageOptionsProvider(namespace=namespace, table_id=table_id) - - def _handle_fragment( uri: str, transform: "TransformType", @@ -227,7 +229,7 @@ def _handle_fragment( def func(fragment_id: int): # Create storage options provider in worker for credentials refresh - storage_options_provider = _create_storage_options_provider( + storage_options_provider = create_storage_options_provider( namespace_impl, namespace_properties, table_id ) @@ -306,7 +308,7 @@ def add_columns( storage_options = storage_options or {} # Create storage options provider for local operations - storage_options_provider = _create_storage_options_provider( + storage_options_provider = create_storage_options_provider( namespace_impl, namespace_properties, table_id ) @@ -360,49 +362,29 @@ def add_columns( pool.close() -def _validate_uri_or_namespace_args( - uri: Optional[str], - namespace: Optional["LanceNamespace"], - table_id: Optional[list[str]], -) -> None: - """Validate that either uri OR (namespace + table_id) is provided.""" - if uri is not None and (namespace is not None or table_id is not None): - raise ValueError( - "Cannot provide both 'uri' and 'namespace'/'table_id'. " - "Use either 'uri' OR ('namespace' + 'table_id')." - ) - - if uri is None and (namespace is None or table_id is None): - raise ValueError( - "Must provide either 'uri' OR both 'namespace' and 'table_id'." - ) - - def _validate_write_args( uri: Optional[str], - namespace: Optional["LanceNamespace"], + namespace_impl: Optional[str], table_id: Optional[list[str]], mode: str, ) -> None: """Validate write arguments. - For create/overwrite modes, allows both uri and namespace/table_id to be provided + For create/overwrite modes, allows both uri and namespace parameters to be provided together (to create at a specific location and register with namespace). - For append mode, requires exactly one of uri OR (namespace + table_id). + For append mode, requires exactly one of uri OR namespace parameters. """ - # namespace and table_id must be provided together - if (namespace is None) != (table_id is None): - raise ValueError("Both 'namespace' and 'table_id' must be provided together.") + has_ns = has_namespace_params(namespace_impl, table_id) # For append mode, use the same validation as read operations - if mode == "append" and uri is not None and namespace is not None: + if mode == "append" and uri is not None and has_ns: raise ValueError( - "For append mode, cannot provide both 'uri' and 'namespace'/'table_id'. " - "Use either 'uri' OR ('namespace' + 'table_id')." + "For append mode, cannot provide both 'uri' and namespace parameters. " + "Use either 'uri' OR ('namespace_impl' + 'table_id')." ) # Must provide at least one way to identify the dataset - if uri is None and namespace is None: + if uri is None and not has_ns: raise ValueError( - "Must provide either 'uri' OR both 'namespace' and 'table_id'." + "Must provide either 'uri' OR ('namespace_impl' + 'table_id')." ) diff --git a/lance_ray/utils.py b/lance_ray/utils.py index e2b4787..146f75e 100644 --- a/lance_ray/utils.py +++ b/lance_ray/utils.py @@ -1,9 +1,139 @@ +import os import sys from collections.abc import Iterable, Sequence -from typing import TypeVar +from functools import lru_cache +from typing import Optional, TypeVar T = TypeVar("T") +# Cache size for namespace clients per worker, configurable via environment variable +_NAMESPACE_CACHE_SIZE = int(os.environ.get("LANCE_RAY_NAMESPACE_CACHE_SIZE", "16")) + + +def has_namespace_params( + namespace_impl: Optional[str], + table_id: Optional[list[str]], +) -> bool: + """Check if namespace parameters are provided. + + Only namespace_impl and table_id are required; namespace_properties can be None. + + Args: + namespace_impl: The namespace implementation type (e.g., "rest", "dir"). + table_id: The table identifier as a list of strings. + + Returns: + True if both namespace_impl and table_id are provided, False otherwise. + """ + return namespace_impl is not None and table_id is not None + + +def validate_uri_or_namespace( + uri: Optional[str], + namespace_impl: Optional[str], + table_id: Optional[list[str]], +) -> None: + """Validate that either uri OR (namespace_impl + table_id) is provided. + + Args: + uri: The URI of the dataset. + namespace_impl: The namespace implementation type. + table_id: The table identifier. + + Raises: + ValueError: If both uri and namespace params are provided, or neither. + """ + has_ns = has_namespace_params(namespace_impl, table_id) + + if uri is not None and has_ns: + raise ValueError( + "Cannot provide both 'uri' and namespace parameters. " + "Use either 'uri' OR ('namespace_impl' + 'table_id')." + ) + + if uri is None and not has_ns: + raise ValueError( + "Must provide either 'uri' OR ('namespace_impl' + 'table_id')." + ) + + +@lru_cache(maxsize=_NAMESPACE_CACHE_SIZE) +def _get_cached_namespace( + namespace_impl: str, + namespace_properties_tuple: Optional[tuple[tuple[str, str], ...]], +): + """Internal cached namespace loader. Use get_or_create_namespace() instead.""" + import lance_namespace as ln + + namespace_properties = ( + dict(namespace_properties_tuple) if namespace_properties_tuple else {} + ) + return ln.connect(namespace_impl, namespace_properties) + + +def get_or_create_namespace( + namespace_impl: Optional[str], + namespace_properties: Optional[dict[str, str]], +): + """Get or create a cached namespace client. + + This function loads a namespace client from cache or creates a new one. + The namespace client is cached per-worker using lru_cache. Module-level state + persists across task invocations within the same Ray worker process, avoiding + redundant network calls to recreate namespace connections. + + Args: + namespace_impl: The namespace implementation type (e.g., "rest", "dir"). + namespace_properties: Properties for connecting to the namespace (can be None). + + Returns: + A namespace client instance, or None if namespace_impl is not provided. + """ + if namespace_impl is None: + return None + + # Convert dict to hashable tuple for lru_cache (None if no properties) + namespace_properties_tuple = ( + tuple(sorted(namespace_properties.items())) if namespace_properties else None + ) + return _get_cached_namespace(namespace_impl, namespace_properties_tuple) + + +def create_storage_options_provider( + namespace_impl: Optional[str], + namespace_properties: Optional[dict[str, str]], + table_id: Optional[list[str]], +): + """Create a LanceNamespaceStorageOptionsProvider if namespace parameters are provided. + + This function reconstructs a namespace connection and creates a storage options + provider for credential refresh in distributed workers. Workers receive serializable + namespace_impl/properties/table_id instead of the non-serializable namespace object. + + The namespace client is cached per-worker to avoid redundant connection overhead + across multiple task invocations within the same Ray worker. + + Args: + namespace_impl: The namespace implementation type (e.g., "rest", "dir"). + namespace_properties: Properties for connecting to the namespace (can be None). + table_id: The table identifier as a list of strings. + + Returns: + LanceNamespaceStorageOptionsProvider if namespace_impl and table_id are provided, + None otherwise. + """ + if not has_namespace_params(namespace_impl, table_id): + return None + + namespace = get_or_create_namespace(namespace_impl, namespace_properties) + if namespace is None: + return None + + from lance import LanceNamespaceStorageOptionsProvider + + return LanceNamespaceStorageOptionsProvider(namespace=namespace, table_id=table_id) + + if sys.version_info >= (3, 12): from itertools import batched diff --git a/tests/test_basic_read_write.py b/tests/test_basic_read_write.py index 00755f1..dff3cb2 100644 --- a/tests/test_basic_read_write.py +++ b/tests/test_basic_read_write.py @@ -320,15 +320,21 @@ class TestNamespaceReadWrite: def test_write_and_read_with_directory_namespace(self, sample_data, temp_dir): """Test write and read using DirectoryNamespace.""" - import lance_namespace as ln - - namespace = ln.connect("dir", {"root": temp_dir}) table_id = ["test_table"] original_dataset = ray.data.from_pandas(sample_data) - lr.write_lance(original_dataset, namespace=namespace, table_id=table_id) + lr.write_lance( + original_dataset, + namespace_impl="dir", + namespace_properties={"root": temp_dir}, + table_id=table_id, + ) - read_dataset = lr.read_lance(namespace=namespace, table_id=table_id) + read_dataset = lr.read_lance( + namespace_impl="dir", + namespace_properties={"root": temp_dir}, + table_id=table_id, + ) read_df = read_dataset.to_pandas() original_sorted = sample_data.sort_values("id").reset_index(drop=True) diff --git a/tests/test_distributed_compaction.py b/tests/test_distributed_compaction.py index 15709d3..028daad 100644 --- a/tests/test_distributed_compaction.py +++ b/tests/test_distributed_compaction.py @@ -428,7 +428,6 @@ def test_compaction_with_directory_namespace(self, temp_dir): """Test compaction using DirectoryNamespace for credentials vending.""" import lance_namespace as ln - namespace = ln.connect("dir", {"root": temp_dir}) table_id = ["compaction_test_table"] fragment1 = pd.DataFrame( @@ -447,7 +446,8 @@ def test_compaction_with_directory_namespace(self, temp_dir): first_ray_ds = ray.data.from_pandas(fragment1) lr.write_lance( first_ray_ds, - namespace=namespace, + namespace_impl="dir", + namespace_properties={"root": temp_dir}, table_id=table_id, min_rows_per_file=10, max_rows_per_file=10, @@ -456,7 +456,8 @@ def test_compaction_with_directory_namespace(self, temp_dir): second_ray_ds = ray.data.from_pandas(fragment2) lr.write_lance( second_ray_ds, - namespace=namespace, + namespace_impl="dir", + namespace_properties={"root": temp_dir}, table_id=table_id, mode="append", min_rows_per_file=10, @@ -465,6 +466,7 @@ def test_compaction_with_directory_namespace(self, temp_dir): from lance_namespace import DescribeTableRequest + namespace = ln.connect("dir", {"root": temp_dir}) describe_response = namespace.describe_table(DescribeTableRequest(id=table_id)) uri = describe_response.location @@ -477,8 +479,8 @@ def test_compaction_with_directory_namespace(self, temp_dir): num_threads=1, ) + # Use namespace params only - compact_files will resolve URI internally metrics = lr.compact_files( - uri=uri, compaction_options=compaction_options, num_workers=2, namespace_impl="dir", @@ -490,5 +492,7 @@ def test_compaction_with_directory_namespace(self, temp_dir): assert metrics.fragments_added == 1, "Should add 1 fragment" dataset = lance.dataset(uri) - assert len(dataset.get_fragments()) == 1, "Should have 1 fragment after compaction" + assert len(dataset.get_fragments()) == 1, ( + "Should have 1 fragment after compaction" + ) assert dataset.count_rows() == 20, "Should still have 20 total rows" diff --git a/tests/test_distributed_indexing.py b/tests/test_distributed_indexing.py index ebc5e52..7d693fe 100755 --- a/tests/test_distributed_indexing.py +++ b/tests/test_distributed_indexing.py @@ -897,9 +897,6 @@ class TestNamespaceIndexing: def test_distributed_fts_index_with_directory_namespace(self, temp_dir): """Test distributed FTS index building using DirectoryNamespace.""" - import lance_namespace as ln - - namespace = ln.connect("dir", {"root": temp_dir}) table_id = ["fts_index_test_table"] data = pd.DataFrame( @@ -913,19 +910,15 @@ def test_distributed_fts_index_with_directory_namespace(self, temp_dir): dataset = ray.data.from_pandas(data) lr.write_lance( dataset, - namespace=namespace, + namespace_impl="dir", + namespace_properties={"root": temp_dir}, table_id=table_id, min_rows_per_file=25, max_rows_per_file=25, ) - from lance_namespace import DescribeTableRequest - - describe_response = namespace.describe_table(DescribeTableRequest(id=table_id)) - uri = describe_response.location - + # Use namespace params only - create_scalar_index will resolve URI internally updated_dataset = lr.create_scalar_index( - uri=uri, column="text", index_type="INVERTED", name="fts_namespace_idx", @@ -954,9 +947,6 @@ def test_distributed_fts_index_with_directory_namespace(self, temp_dir): def test_distributed_btree_index_with_directory_namespace(self, temp_dir): """Test distributed BTREE index building using DirectoryNamespace.""" - import lance_namespace as ln - - namespace = ln.connect("dir", {"root": temp_dir}) table_id = ["btree_index_test_table"] data = pd.DataFrame( @@ -968,19 +958,15 @@ def test_distributed_btree_index_with_directory_namespace(self, temp_dir): dataset = ray.data.from_pandas(data) lr.write_lance( dataset, - namespace=namespace, + namespace_impl="dir", + namespace_properties={"root": temp_dir}, table_id=table_id, min_rows_per_file=50, max_rows_per_file=50, ) - from lance_namespace import DescribeTableRequest - - describe_response = namespace.describe_table(DescribeTableRequest(id=table_id)) - uri = describe_response.location - + # Use namespace params only - create_scalar_index will resolve URI internally updated_dataset = lr.create_scalar_index( - uri=uri, column="id", index_type="BTREE", name="btree_namespace_idx",