Conversation

@ueshin (Member) commented Nov 14, 2025

What changes were proposed in this pull request?

Kills the worker if flush fails in daemon.py.

  • Spark conf: spark.python.daemon.killWorkerOnFlushFailure (default true)
  • SQL conf: spark.sql.execution.pyspark.udf.daemonKillWorkerOnFlushFailure (falls back to the above; see the sketch below)
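
A minimal sketch of setting these, assuming a running SparkSession named spark (nothing below is part of the patch itself):

# Sketch only: the SQL conf can be toggled at runtime and falls back to the
# core Spark conf when unset.
spark.conf.set(
    "spark.sql.execution.pyspark.udf.daemonKillWorkerOnFlushFailure", "false"
)

# The core conf would typically be set at launch time, e.g.:
#   spark-submit --conf spark.python.daemon.killWorkerOnFlushFailure=true ...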

Before the worker dies, it reuses the faulthandler feature to record the thread dump, which will appear in the error message if faulthandler is enabled.

WARN TaskSetManager: Lost task 3.0 in stage 1.0 (TID 8) (127.0.0.1 executor 1): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Current thread 0x00000001f0796140 (most recent call first):
  File "/.../python/pyspark/daemon.py", line 95 in worker
  File "/.../python/pyspark/daemon.py", line 228 in manager
  File "/.../python/pyspark/daemon.py", line 253 in <module>
  File "<frozen runpy>", line 88 in _run_code
  File "<frozen runpy>", line 198 in _run_module_as_main

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:679)
...

Even when faulthandler is not enabled, the error will appear in the executor's stderr file.

Traceback (most recent call last):
  File "/.../python/pyspark/daemon.py", line 228, in manager
    code = worker(sock, authenticated)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.../python/pyspark/daemon.py", line 88, in worker
    raise Exception("test")
Exception: test

When this is disabled, the behavior is the same as before, except that the failure is logged.

Why are the changes needed?

Currently an exception caused by an outfile.flush() failure in daemon.py is ignored, but if the last command from worker_main has not been flushed yet, the UDF can get stuck on the Java side waiting for the response from the Python worker.

It should just die and let Spark retry the task.
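
A minimal sketch of the pattern described above (not the exact patch; flush_or_die is a hypothetical helper name): if the flush fails, dump the current stack via faulthandler so it shows up in the error message, then re-raise so the worker dies and Spark can retry the task.

import faulthandler
import os


def flush_or_die(outfile):
    # Hypothetical helper illustrating the pattern; not the actual patch.
    try:
        outfile.flush()
    except Exception:
        faulthandler_log_dir = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
        if faulthandler_log_dir is not None:
            # Record the thread dump so it can be attached to the JVM-side error message.
            log_path = os.path.join(faulthandler_log_dir, str(os.getpid()))
            with open(log_path, "w") as f:
                faulthandler.dump_traceback(file=f)
        # Re-raise instead of swallowing, so the worker dies and Spark retries the task.
        raise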

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Manually.

Tested with the following patch to emulate the case:
% git diff
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 54c9507e625..e107216d769 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -84,6 +84,8 @@ def worker(sock, authenticated):
         exit_code = compute_real_exit_code(exc.code)
     finally:
         try:
+            if worker_main.__globals__.get("TEST", False):
+                raise Exception("test")
             outfile.flush()
         except Exception:
             faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 6e34b041665..ff210f4fd97 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -3413,7 +3413,14 @@ def main(infile, outfile):

     # check end of stream
     if read_int(infile) == SpecialLengths.END_OF_STREAM:
-        write_int(SpecialLengths.END_OF_STREAM, outfile)
+        import random
+
+        if random.random() < 0.1:
+            # emulate the last command is not flushed yet
+            global TEST
+            TEST = True
+        else:
+            write_int(SpecialLengths.END_OF_STREAM, outfile)
     else:
         # write a different value to tell JVM to not reuse this worker
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
@@ -3423,6 +3430,9 @@ def main(infile, outfile):
     faulthandler.cancel_dump_traceback_later()


+TEST = False
+
+
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.
     conn_info = os.environ.get(

With just pass (the behavior before this change), the task gets stuck; with this change, the worker dies and Spark retries the task.

Was this patch authored or co-authored using generative AI tooling?

No.

@ueshin (Member, Author) commented Nov 14, 2025

I'd mark this as a draft for now, as I'm not super confident about this.
We may want a flag to disable this in case the situation gets even worse.
cc @HyukjinKwon @zhengruifeng @cloud-fan

@ueshin ueshin requested a review from zhengruifeng November 14, 2025 02:47
faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
with open(faulthandler_log_path, "w") as faulthandler_log_file:
    faulthandler.dump_traceback(file=faulthandler_log_file)
raise
Contributor

Is the behavior change here intentional? The original code swallows the exception and returns the actual exit code, whereas this will raise.

Member Author

Yes, it's intentional.
If it returns a non-zero exit code, it waits for Java to send a kind of "ACK" command for graceful shutdown, which is part of the protocol, but Java is still waiting for the reply from Python, so it won't recover anyway.
I don't think it can recover in this case, as something must already be wrong with the connection.

if not reuse or code:
    # wait for closing
    try:
        while sock.recv(1024):
            pass
    except Exception:
        pass
    break

Member Author

For now, it may be good enough to just write out the exception, without this behavior change, to see whether this actually happens.

@ueshin ueshin marked this pull request as ready for review November 15, 2025 00:03
Comment on lines 142 to 152
  val PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE =
    ConfigBuilder("spark.python.daemon.killWorkerOnFlushFailure")
      .doc("When enabled, exceptions raised during output flush operations in the Python " +
        "worker managed under Python daemon are not caught, causing the worker to terminate " +
        "with the exception. This allows Spark to detect the failure and retry the task. " +
        "When disabled (default), flush exceptions are caught and logged, " +
        "but the worker continues, " +
        "which could cause the worker to get stuck due to protocol mismatch.")
      .version("4.1.0")
      .booleanConf
      .createWithDefault(false)
Contributor

The default of false here feels weird given we're saying we expect issues when it's disabled.

Member Author

Updated it to true. Thanks!

"with the exception. This allows Spark to detect the failure and retry the task. " +
"When disabled, flush exceptions are caught and logged but the worker continues, " +
"which could cause the worker to get stuck due to protocol mismatch.")
.version("4.1.0")
Member

Is it targeting Apache Spark 4.1.0 as a bug fix?

Member Author

Personally, yes, but it depends on how the reviews go. Also, I'll defer to you as the release manager. Thanks.
