[SPARK-52238][PYTHON] Python client for Declarative Pipelines #50963

Status: Open. Wants to merge 1 commit into master.
Conversation

@sryza (Contributor) commented on May 21, 2025:

What changes were proposed in this pull request?

Adds the Python client for Declarative Pipelines. This implements the command line interface and Python APIs described in the Declarative Pipelines SPIP.

Python API for defining pipeline graph elements

The Python API consists of the following functions and decorators for defining flows and datasets in a pipeline dataflow graph (see their docstrings for more details):

  • create_streaming_table
  • @append_flow
  • @materialized_view
  • @table
  • @temporary_view

Example file of definitions:

from pyspark.sql import SparkSession
from pyspark.sql import pipelines as sdp

spark = SparkSession.getActiveSession()

@sdp.materialized_view
def baby_names_raw():
    return (
        spark.read.option("header", "true").csv("babynames.csv")
        .withColumnRenamed("First Name", "First_Name")
    )
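
The other APIs follow a similar pattern. As a rough illustration only (the exact signatures below are assumptions based on the API names above, not excerpts from this PR), a streaming table fed by an append flow might be defined like this:

from pyspark.sql import SparkSession
from pyspark.sql import pipelines as sdp

spark = SparkSession.getActiveSession()

# Declare a streaming table; its contents are produced by one or more flows
# that target it.
sdp.create_streaming_table("baby_names_stream")

# An append flow incrementally appends the rows returned by the decorated
# query function to its target table.
@sdp.append_flow(target="baby_names_stream")
def ingest_baby_names():
    # Illustrative streaming source; any streaming DataFrame would do here.
    return spark.readStream.format("rate").load()

# @table and @temporary_view work like @materialized_view: the function name
# becomes the dataset name, and the returned DataFrame defines its contents.
@sdp.temporary_view
def baby_names_recent():
    return spark.read.table("baby_names_raw").where("Year > 2015")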

Command line interface

The CLI is implemented as a Spark Connect client. It enables launching runs of declarative pipelines. It accepts a YAML spec, which specifies where on the local filesystem to look for the Python and SQL files that contain the definitions of the flows and datasets that make up the pipeline dataflow graph.
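
The spec file might look roughly like the following. The exact schema is not spelled out in this description, so the field names below are illustrative assumptions chosen to match the globs in the example output further down:

# pipeline.yml (hypothetical; field names are illustrative assumptions)
name: baby_names_pipeline
definitions:
  - glob:
      include: transformations/**/*.py
  - glob:
      include: transformations/**/*.sql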

Example usage:

bin/spark-pipelines run --remote sc://localhost --spec pipeline.yml

Example output:

Loading pipeline spec from pipeline.yaml...
Creating Spark session...
Creating dataflow graph...
Registering graph elements...
Loading definitions. Root directory: ..
Found 1 files matching glob 'transformations/**/*.py'
Importing transformations/baby_names_raw.py...
Found 1 files matching glob 'transformations/**/*.sql'
Registering SQL file transformations/baby_names_prepared.sql...
Starting run...
Starting execution...
2025-05-20T15:08:01.395Z: Flow `spark_catalog`.`default`.`baby_names_raw` is QUEUED.
2025-05-20T15:08:01.398Z: Flow `spark_catalog`.`default`.`baby_names_prepared` is QUEUED.
2025-05-20T15:08:01.402Z: Flow 'spark_catalog.default.baby_names_raw' is PLANNING.
2025-05-20T15:08:01.403Z: Flow `spark_catalog`.`default`.`baby_names_raw` is STARTING.
2025-05-20T15:08:01.404Z: Flow `spark_catalog`.`default`.`baby_names_raw` is RUNNING.
2025-05-20T15:08:03.096Z: Flow 'spark_catalog.default.baby_names_raw' has COMPLETED.
2025-05-20T15:08:03.422Z: Flow 'spark_catalog.default.baby_names_prepared' is PLANNING.
2025-05-20T15:08:03.422Z: Flow `spark_catalog`.`default`.`baby_names_prepared` is STARTING.
2025-05-20T15:08:03.422Z: Flow `spark_catalog`.`default`.`baby_names_prepared` is RUNNING.
2025-05-20T15:08:03.875Z: Flow 'spark_catalog.default.baby_names_prepared' has COMPLETED.
2025-05-20T15:08:05.492Z: Run has COMPLETED.

Architecture diagram

[image: architecture diagram]

Why are the changes needed?

In order to implement Declarative Pipelines, as described in the SPIP.

Does this PR introduce any user-facing change?

No previous behavior is changed, but new user-facing APIs and a CLI are introduced.

How was this patch tested?

Unit testing

Includes unit tests for:

  • Python API error cases – test_decorators.py
  • Command line functionality – test_cli.py
  • The harness for registering graph elements while evaluating pipeline definition Python files – test_graph_element_registry.py
  • Code for blocking execution and analysis within decorated query functions – test_block_connect_access.py

Note that, once the backend is wired up, we will submit additional unit tests that cover end-to-end pipeline execution with Python.

CLI testing

With the Declarative Pipelines Spark Connect backend (coming in a future PR), I ran the CLI and confirmed that it executed a pipeline as expected.

Was this patch authored or co-authored using generative AI tooling?

@HyukjinKwon HyukjinKwon changed the title [SPARK-52238] Python client for Declarative Pipelines [SPARK-52238][PYTHON] Python client for Declarative Pipelines May 21, 2025
@zhengruifeng (Contributor) left a comment:
Is Declarative Pipelines supposed to be only supported in connect mode?

from pyspark.sql.pipelines.block_connect_access import block_spark_connect_execution_and_analysis


class BlockSparkConnectAccessTests(ReusedConnectTestCase):

Contributor:
new tests should be registered in dev/sparktestsupport/modules.py, otherwise they are skipped

Contributor:
Do we plan to also test this file in classic mode?

Contributor (author):
This should only be tested in connect mode – do I need to add something to the file to set that up?


class GraphElementRegistryTest(unittest.TestCase):
    def test_graph_element_registry(self):
        spark = SparkSession.builder.getOrCreate()

Contributor:
Why not reuse ReusedSQLTestCase (for classic) and ReusedConnectTestCase (for connect)?

@sryza (Contributor, Author) commented on May 21, 2025:

@zhengruifeng this initial implementation is just for Connect. Connect is more straightforward to support, because Connect DataFrames are lazier than classic DataFrames. This means we can evaluate the user's decorated query function immediately rather than call back after all upstream datasets have been resolved.

However, it's designed in a way that can support classic in the future – by implementing a GraphElementRegistry that registers graph elements over Py4J instead of Connect.
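
For context, a rough sketch of what that registry abstraction could look like; the names and signatures here are illustrative assumptions, not the PR's actual code:

from abc import ABC, abstractmethod


class GraphElementRegistry(ABC):
    """Collects dataset and flow definitions as pipeline definition files are
    evaluated. Illustrative sketch only; the PR's actual interface may differ."""

    @abstractmethod
    def register_dataset(self, dataset) -> None:
        """Record a table, materialized view, streaming table, or view."""

    @abstractmethod
    def register_flow(self, flow) -> None:
        """Record a flow that populates one of the registered datasets."""


class SparkConnectGraphElementRegistry(GraphElementRegistry):
    """Current backend: forwards each definition to the server over Spark Connect."""

    def register_dataset(self, dataset) -> None:
        ...  # send a define-dataset command over the Connect channel

    def register_flow(self, flow) -> None:
        ...  # send a define-flow command over the Connect channel


# A future classic-mode backend would implement the same interface but
# register elements over Py4J, as described above.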

sryza requested a review from zhengruifeng on May 22, 2025.
The github-actions bot added the BUILD label on May 22, 2025.
sryza self-assigned this on May 25, 2025.