Skip to content

Conversation

ericm-db
Copy link
Contributor

@ericm-db ericm-db commented Jul 29, 2025

What changes were proposed in this pull request?

Currently, RocksDB memory is untracked and not included in memory decisions in Spark. We want to factor the RocksDB memory usage into memory allocations so we don't hit OOMs. This change introduces a background memory polling thread from the MemoryManager that queries RocksDB memory every X seconds (configurable via SQLConf).

Why are the changes needed?

This helps us avoid OOMs when RocksDB is used as the StateStoreProvider by taking other Spark allocations into account.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

No

@LuciferYang
Copy link
Contributor

Besides Structured Streaming, RocksDB is also used in other areas like the Live UI. Don’t they require similar handling?

@ericm-db
Copy link
Contributor Author

@LuciferYang Sorry, I think I'm missing something. Could you elaborate on what the suggestion is here?

@anishshri-db
Copy link
Contributor

@LuciferYang - which components are you referring to ?

@LuciferYang
Copy link
Contributor

@LuciferYang - which components are you referring to ?

For instance, Spark Live UI can also utilize RocksDB as its storage backend, and I'm not sure if it encounters similar issues as well.

@LuciferYang
Copy link
Contributor

@LuciferYang - which components are you referring to ?

For instance, Spark Live UI can also utilize RocksDB as its storage backend, and I'm not sure if it encounters similar issues as well.

I'm just quite curious about this. Even if such issues exist, we can still fix them in a separate PR.

@ericm-db
Copy link
Contributor Author

I'm just quite curious about this. Even if such issues exist, we can still fix them in a separate PR.

Sure, sounds good

"Setting this to 0 disables unmanaged memory polling.")
.version("4.1.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
Copy link
Member

@dongjoon-hyun dongjoon-hyun Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be safe and avoid a regression, shall we start with 0 by default, @ericm-db , @anishshri-db, @gatorsmile ?

* @param unmanagedMemoryConsumer The consumer to register for memory tracking
*/
def registerUnmanagedMemoryConsumer(
unmanagedMemoryConsumer: UnmanagedMemoryConsumer): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation, @ericm-db ?

case class UnmanagedMemoryConsumerId(
componentType: String,
instanceKey: String
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation, @ericm-db ?

* - Native libraries with custom memory allocation
* - Off-heap caches managed outside of Spark
*/
trait UnmanagedMemoryConsumer {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we move this into a separate file, UnmanagedMemoryConsumer.scala?

* @return Total memory usage in bytes across all tracked components
*/
def getMemoryUsage: Long = {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. Let's remove this redundant empty line.

require(db != null && !db.isClosed, "RocksDB must be open to get memory usage")
RocksDB.mainMemorySources.map { memorySource =>
getDBProperty(memorySource)
}.sum
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a one-liner? Maybe, the following style?

- RocksDB.mainMemorySources.map { memorySource =>
-   getDBProperty(memorySource)
- }.sum
+ RocksDB.mainMemorySources.map(getDBProperty).sum

* Updates the cached memory usage if enough time has passed.
* This is called from task thread operations, so it's already thread-safe.
*/
def updateMemoryUsageIfNeeded(): Unit = {
Copy link
Member

@dongjoon-hyun dongjoon-hyun Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like being invoked frequently in several places. What is the overload of this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's minimal, on the order of ns

*/
object RocksDBMemoryManager extends Logging {
object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little surprised because Scala linter didn't catch this, with UnmanagedMemoryConsumer{. Could you add a space like with UnmanagedMemoryConsumer {?

boundedMemoryEnabled.toString)) {

import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.sql.streaming.Trigger
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have special reasons why we have these import statements in the test code body? Otherwise, please move this to the file header.


try {
// Let the stream run to establish RocksDB instances and generate state operations
Thread.sleep(2000) // 2 seconds should be enough for several processing cycles
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks a little risky. Can we use eventually instead of Thread.sleep?

val maxAttempts = 15 // 15 attempts with 1-second intervals = 15 seconds max

while (rocksDBMemory <= 0L && attempts < maxAttempts) {
Thread.sleep(1000) // Wait between checks to allow memory updates
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

}

// Verify memory tracking remains stable during continued operation
Thread.sleep(2000) // Let stream continue running
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

@ericm-db
Copy link
Contributor Author

ericm-db commented Aug 1, 2025

@dongjoon-hyun thank you for all the feedback, I will address this

@ericm-db
Copy link
Contributor Author

ericm-db commented Aug 1, 2025

@dongjoon-hyun Can you PTAL at this PR: #51778

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants