Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2166 databricks direct loading #2219

Open
wants to merge 27 commits into
base: devel
Choose a base branch
from

Conversation

donotpush
Copy link
Collaborator

Copy link

netlify bot commented Jan 15, 2025

Deploy Preview for dlt-hub-docs canceled.

Name Link
🔨 Latest commit 18b2bd8
🔍 Latest deploy log https://app.netlify.com/sites/dlt-hub-docs/deploys/679cac52b902c300080d1e38

@donotpush donotpush changed the title #2166 databricks direct loading WIP: #2166 databricks direct loading Jan 16, 2025
@donotpush donotpush marked this pull request as draft January 16, 2025 12:22
@rudolfix rudolfix self-requested a review January 20, 2025 11:21
Copy link
Collaborator

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On top of this review:
We should be able to run databricks without configured staging so we can enable many tests that will do that.

We have destination config in destinations_configs test util:

  1. remove databricks here:
destination_configs += [
            DestinationTestConfiguration(destination_type=destination)
            for destination in SQL_DESTINATIONS
            if destination
            not in ("athena", "synapse", "databricks", "dremio", "clickhouse", "sqlalchemy")
        ]
  1. add staging to this and move it in with other staging databricks setup
destination_configs += [
            DestinationTestConfiguration(
                destination_type="databricks",
                file_format="parquet",
                bucket_url=AZ_BUCKET,
                extra_info="az-authorization",
            )
        ]

you can also ping me and I can prepare a valid commit

# databricks authentication: get context config
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code is correct but you must handle the situation when default credentials do not exist (ie. outside of notebook). I get this exception in this case:

ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.

just skip the code that assign values

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, I haven't tested yet on the notebook, and you are also right about that exception context - I have to catch that exception, and add some tests.

else:
return "", file_name

volume_path = f"/Volumes/{self._sql_client.database_name}/{self._sql_client.dataset_name}/{self._sql_client.volume_name}/{time.time_ns()}"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how IMO we should handle volumes:

  1. Allow to define staging_volume_name in DatabricksClientConfiguration. This should be (I think) fully qualified name.
  2. If staging_volume_name is empty: create here (ad hoc) a volume with _dlt_temp_load_volume
  3. we do not need to handle volumes on the level of sql_client. you can drop additional method you added
  4. we do not need to care to drop _dlt_temp_load_volume. it belong to current schema. so if schema is dropped, the volume will be dropped as well (I hope!)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need time_ns?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree about the volume_name

I'll answer the path (time_ns) and file_name in another comment

return "", file_name

volume_path = f"/Volumes/{self._sql_client.database_name}/{self._sql_client.dataset_name}/{self._sql_client.volume_name}/{time.time_ns()}"
volume_file_name = ( # replace file_name for random hex code - databricks loading fails when file_name starts with - or .
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the same name as in file_name cant be used here? it will be unique (will contain file_id part which is uniq_id() already)


file_name = FileStorage.get_file_name_from_file_path(local_file_path)
file_format = ""
if file_name.endswith(".parquet"):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think that you need to know the file format here. just upload a file we have. it has proper extension. also keep the file_name as mentioned above

@@ -63,6 +63,7 @@ def iter_df(self, chunk_size: int) -> Generator[DataFrame, None, None]:

class DatabricksSqlClient(SqlClientBase[DatabricksSqlConnection], DBTransaction):
dbapi: ClassVar[DBApi] = databricks_lib
volume_name: str = "_dlt_temp_load_volume"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to DatabricksConfiguration (as mentioned above)

@@ -102,6 +103,18 @@ def close_connection(self) -> None:
self._conn.close()
self._conn = None

def create_volume(self) -> None:
self.execute_sql(f"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create volume ad hoc before uploading file

@@ -176,6 +176,7 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
self.sql_client.create_dataset()
elif truncate_tables:
self.sql_client.truncate_tables(*truncate_tables)
self.sql_client.create_volume()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can drop all of those

@@ -1,4 +1,6 @@
[runtime]
log_level="DEBUG"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remember to remove before final push

" and the server_hostname."
)

self.direct_load = True
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think we need this. if we have a local file we do direct load. we do not need to be in a notebook context to do it. just the default access token needs notebook

@donotpush
Copy link
Collaborator Author

On top of this review: We should be able to run databricks without configured staging so we can enable many tests that will do that.

We have destination config in destinations_configs test util:

  1. remove databricks here:
destination_configs += [
            DestinationTestConfiguration(destination_type=destination)
            for destination in SQL_DESTINATIONS
            if destination
            not in ("athena", "synapse", "databricks", "dremio", "clickhouse", "sqlalchemy")
        ]
  1. add staging to this and move it in with other staging databricks setup
destination_configs += [
            DestinationTestConfiguration(
                destination_type="databricks",
                file_format="parquet",
                bucket_url=AZ_BUCKET,
                extra_info="az-authorization",
            )
        ]

you can also ping me and I can prepare a valid commit

I have managed to run tests without staging this way:

@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(default_sql_configs=True, subset=("databricks",)),
    ids=lambda x: x.name,
)
def test_databricks_direct_load(destination_config: DestinationTestConfiguration) -> None:
    os.environ["DESTINATION__DATABRICKS__CREDENTIALS__CLIENT_ID"] = ""
    os.environ["DESTINATION__DATABRICKS__CREDENTIALS__CLIENT_SECRET"] = ""

    # direct_load
    os.environ["DESTINATION__DATABRICKS__CREDENTIALS__DIRECT_LOAD"] = "True"

    bricks = databricks()
    config = bricks.configuration(None, accept_partial=True)
    assert config.credentials.access_token

    dataset_name = "test_databricks_token" + uniq_id()
    pipeline = destination_config.setup_pipeline(
        "test_databricks_token", dataset_name=dataset_name, destination=bricks
    )

    info = pipeline.run([1, 2, 3], table_name="digits", **destination_config.run_kwargs)
    assert info.has_failed_jobs is False

    with pipeline.sql_client() as client:
        rows = client.execute_sql(f"select * from {dataset_name}.digits")
        assert len(rows) == 3

@donotpush
Copy link
Collaborator Author

donotpush commented Jan 20, 2025

Answering (1) CREATE/DROP volume and (2) file name issues

(1) CREATE/DROP Volume

I agree that we can implement your proposed approach for CREATE/DROP VOLUME. However, it will be less efficient as the volume will remain until either the user decides to clean it or we implement a cleanup process (e.g., deleting files after copy statement).

The reason I introduced the create_volume and drop_volume methods is that the DatabricksLoadJob.run method is executed for every single file in parallel, treating each file as a separate COPY statement. This approach is inefficient for “direct loading” since we could load multiple files into a single table using one COPY statement. Additionally, the COPY statement from a volume isn’t file-based—the path must refer to a directory that contains multiple files. This design constraint necessitates creating a unique directory for each file in the current implementation.

Your proposal to run multiple CREATE VOLUME statements might not heavily impact ODBC performance, but it was problem for when I was creating the volume with the SDK (API rate limits) . While this might not be an issue with ODBC since it supports CREATE VOLUME IF NOT EXISTS, it will execute a lot of redudant statements but ODBC will handle it.

Another reason for introducing these methods was to allow the DROP VOLUME statement to be executed outside the run_pool or worker context, which is not feasible otherwise. That said, I’m flexible. If you prefer, I can remove these methods, execute CREATE VOLUME as many times as there are files in the pipeline, and leave the volumes as is, relying on the user for cleanup.

(2) file name issues

I spent an entire day troubleshooting this issue:

[COPY_INTO_SOURCE_SCHEMA_INFERENCE_FAILED] The source directory did not contain any parsable files of type PARQUET. Please check the contents of '/Volumes/test/test_databricks_tokenb3337f88ee667396b15f4e5b2dd5dbb0/_temp_load_volume/file_1737154045813408000'. The error can be silenced by setting 'spark.databricks.delta.copyInto.emptySourceCheck.enabled' to 'false'.

After extensive debugging, I discovered that Databricks runs everything on Spark, and files with names starting with _ or . are treated as hidden files. This caused the COPY command to fail. As a result, we need a way to rename files to avoid conflicts, ensuring file names never depend on the exact table name. For instance, it always failed on the DLT table _dlt_pipeline_state (file _dlt_pipeline_state.1b87dba623.0.parquet).

Since the COPY statement requires a directory (not a file path) and each directory can contain 1 to N files, our current design creates a directory per file. I’ve been using time.time_ns() to create unique directories inside the volume and a random UID as a hex code for the file name. Changing the file name also required determining the correct file extension or format.

To summarize:
• For (1), we can remove the methods and use CREATE VOLUME IF NOT EXISTS. However, for large loads, this will generate many unnecessary SQL statements.
•For (2), we need to rename files to avoid conflicts with Spark/Databricks limitations, and each COPY statement must point to a unique directory.

I’m open to adjusting the path format to something more standardized or continuing with random UIDs. Also, let me know your thoughts on deleting files from the volume after loading. Should we leave them for the user to clean, or implement automated cleanup?

@donotpush donotpush marked this pull request as ready for review January 24, 2025 18:15
@donotpush donotpush changed the title WIP: #2166 databricks direct loading #2166 databricks direct loading Jan 24, 2025
@donotpush donotpush requested a review from rudolfix January 24, 2025 21:24
@donotpush
Copy link
Collaborator Author

@rudolfix I’ve implemented most of your recommendations, but I ran into an issue with enabling the non-staging tests. Unfortunately, your suggested approach didn’t work as expected, the destination gets added, but the files are not local. I’m unsure if it’s possible to achieve this by only changing the configuration without modifying how dlt.destination.databricks is instantiated in the tests

That said, I did add a new test that uses the local file successfully.

Let me know if you have further suggestions or ideas on this!

Copy link
Collaborator

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I enables no staging mode as a default for databricks.

  1. please update the documentation
  2. is it hard to delete staging files from databrick volumes? I bet you can do that from SQL... we have similar functionality for snowflake. IMO should be pretty easy
    otherwise LGTM! good job

w = WorkspaceClient()
self.access_token = w.dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None) # type: ignore[union-attr]

# pick the first warehouse on the list
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please take a list of warehouses when server_hostname or http_path and then set only the empty. configuration takes precedence. and if we have both values we skip getting warehouses altogether

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

fully_qualified_volume_name = self._job_client.config.staging_volume_name
volume_catalog, volume_database, volume_name = fully_qualified_volume_name.split(".")

self._sql_client.execute_sql(f"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create only unnamed volumes. if staging_volume_name assume that it exists. please also document this. mention that if you go for production it is better to use named volume

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

CREATE VOLUME IF NOT EXISTS {fully_qualified_volume_name}
""")

volume_path = f"/Volumes/{volume_catalog}/{volume_database}/{volume_name}/{time.time_ns()}"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe you should take uniq_id() from dlt common instead of time? for example on windows you get timer resolution of 20ms. you may get clashing names

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@donotpush
Copy link
Collaborator Author

@rudolfix

Thanks for the review, and helping out with enabling the tests.

I've implemented all your recommendations.

  • Docs, I've named this feature "Direct Load", feel free to suggest changes or change it directly, perhaps it is better to add it just as another staging flavour: databricks managed volumes.

  • Files deleted from volume right after loading using SQL: REMOVE volume_path - executing that command required a new conn parameter to be set: self.staging_allowed_local_path = get_dlt_pipelines_dir() - I didn't know which path to use, perhaps is better to use something different (it also works just setting a non-empty string), I don't see any data being generated in the provided path.

  • About the tests, I think that a few aren't working propertly, for example test_databricks_external_location is now only running on destination "databricks-no-staging" (but files aren't local) instead of databricks-*-staging-az-authorization

@donotpush donotpush requested a review from rudolfix January 29, 2025 23:19
rudolfix
rudolfix previously approved these changes Jan 30, 2025
Copy link
Collaborator

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! thanks for the tests and docs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Load data into databricks without external staging and auth.
2 participants