Skip to content

Conversation

@micheal-o
Copy link
Contributor

@micheal-o micheal-o commented Nov 14, 2025

What changes were proposed in this pull request?

Introducing the API for offline repartitioning of streaming state. This is currently not exposed, since it is still in development. Also implemented some of the core functionalities of the repartition batch runner, that validates the checkpoint, creates the repartition batch and commits. Subsequent PRs will build on this. Also Spark connect and pyspark APIs will be added in subsequent PRs.

Also introduce the streamingCheckpointManager for performing operations on the streaming checkpoint. This is currently not exposed, since it is still in development.

Why are the changes needed?

Streaming state repartitioning

Does this PR introduce any user-facing change?

No

How was this patch tested?

New test suite added

Was this patch authored or co-authored using generative AI tooling?

No

},
"sqlState" : "55019"
},
"STATE_REPARTITION_INVALID_PARAMETER" : {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: INVALID_OPTIONS ?

Copy link
Contributor Author

@micheal-o micheal-o Nov 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to avoid confusion, since options in spark means .option(). So using parameter instead.

/**
* Returns a `StreamingCheckpointManager` that allows managing any streaming checkpoint.
*/
private[spark] def streamingCheckpointManager: StreamingCheckpointManager =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to add to Spark connect also ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark connect and pyspark will be added in subsequent PRs.

/** @inheritdoc */
override private[spark] def repartition(
checkpointLocation: String,
numPartitions: Int,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the underlying recorded value is Int, but should we consider bumping this to Long eventually - probably unlikely for users to have those many partitions though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets keep it as int since that is what we record in checkpoint. Also using Long is very unrealistic.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object StreamingUtils {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put this under this directory -

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

throw OfflineStateRepartitionErrors.parameterIsNotGreaterThanZeroError("numPartitions")
}

val runner = new OfflineStateRepartitionRunner(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we encapsulate this whole block in a try-catch in case we want to catch and log any warnings ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see run method

numPartitions,
enforceExactlyOnceSink
)
runner.run()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add some logging to indicate the repartition started/ended and the time it took to complete the operation along with other identifying information about the query ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see the runner run method


val newBatchId = createNewBatchIfNeeded(lastBatchId, lastCommittedBatchId)

// todo: Do the repartitioning here, in subsequent PR
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create SPARK JIRAs and link them here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants