Skip to content

Commit 2a8768b

Browse files
committed
WIP: Trying to prevent preemptively terminating the process in response to its shutdown signal.
But encountering a bug: JuliaLang/julia#54145
1 parent 2f3e31a commit 2a8768b

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

src/workers.jl

+15-2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ struct Response
4343
# if true, worker is shutting down, so we can stop listening to it.
4444
shutdown::Bool
4545
end
46+
Response(a, b, c) = Response(a, b, c, false)
47+
shutdown_response() = Response(nothing, nothing, rand(UInt64), true)
4648
is_shutdown(r::Response) = r.shutdown
4749

4850
# simple Future that coordinator can wait on until a Response comes back for a Request
@@ -96,6 +98,7 @@ function terminate!(w::Worker, from::Symbol=:manual)
9698
if !(w.socket.status == Base.StatusUninit || w.socket.status == Base.StatusInit || w.socket.handle === C_NULL)
9799
close(w.socket)
98100
end
101+
@debug "Done cleaning up after terminating worker $(w.pid) from $from"
99102
return
100103
end
101104

@@ -210,6 +213,8 @@ function redirect_worker_output(io::IO, w::Worker, fn, proc, ev::Threads.Event)
210213
try
211214
notify(ev) # notify we've started
212215
while !process_exited(proc) && !w.terminated
216+
# Core.println(getpid(proc))
217+
# Core.println(process_exited(proc))
213218
line = readline(proc)
214219
if !isempty(line)
215220
fn(io, w.pid, line)
@@ -237,7 +242,14 @@ function process_responses(w::Worker, ev::Threads.Event)
237242
# get the next Response from the worker
238243
r = deserialize(w.socket)
239244
@assert r isa Response "Received invalid response from worker $(w.pid): $(r)"
240-
is_shutdown(r) && break
245+
if is_shutdown(r)
246+
@debug "Received shutdown response from worker $(w.pid). Waiting for shutdown of $(w.process)"
247+
# TODO(PR): SOMEHOW this wait(p) is not getting interrupted when p dies as a zombie <defunct> process.
248+
wait(w.process)
249+
@debug "shutdown"
250+
terminate!(w, :process_responses)
251+
break
252+
end
241253
# println("Received response $(r) from worker $(w.pid)")
242254
@lock lock begin
243255
@assert haskey(reqs, r.id) "Received response for unknown request $(r.id) from worker $(w.pid)"
@@ -295,6 +307,7 @@ function startworker()
295307
serve_requests(accept(sock))
296308
finally
297309
close(sock)
310+
@debug "Shutting down worker $(getpid())"
298311
exit(0)
299312
end
300313
end
@@ -326,7 +339,7 @@ function serve_requests(io)
326339
@assert req isa Request
327340
if is_shutdown(req)
328341
@debug "Received shutdown request on worker $(getpid())"
329-
resp = Response(nothing, nothing, rand(UInt64), true)
342+
resp = shutdown_response()
330343
@lock iolock begin
331344
# println("sending response: $(resp)")
332345
serialize(io, resp)

test/workers.jl

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ using Test
4141

4242
w = Worker()
4343
@testset "remote_eval/remote_fetch ($w)" begin
44+
@info "starting testset remote_eval/remote_fetch ($w)"
4445
expr = quote
4546
global x
4647
x = 101

0 commit comments

Comments
 (0)