Skip to content

Commit b6f0591

Browse files
committed
Attempt to introduce graceful close from workers back to coordinator
1 parent 0d420f0 commit b6f0591

File tree

2 files changed

+14
-2
lines changed

2 files changed

+14
-2
lines changed

src/workers.jl

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,17 @@ struct Request
3333
# ignoring other fields
3434
shutdown::Bool
3535
end
36+
is_shutdown(r::Request) = r.shutdown
3637

3738
# worker executes Request and returns a serialized Response object *if* Request has an id
3839
struct Response
3940
result
4041
error::Union{Nothing, Exception}
4142
id::UInt64 # matches a corresponding Request.id
43+
# if true, worker is shutting down, so we can stop listening to it.
44+
shutdown::Bool
4245
end
46+
is_shutdown(r::Response) = r.shutdown
4347

4448
# simple Future that coordinator can wait on until a Response comes back for a Request
4549
struct Future
@@ -232,6 +236,7 @@ function process_responses(w::Worker, ev::Threads.Event)
232236
# get the next Response from the worker
233237
r = deserialize(w.socket)
234238
@assert r isa Response "Received invalid response from worker $(w.pid): $(r)"
239+
is_shutdown(r) && break
235240
# println("Received response $(r) from worker $(w.pid)")
236241
@lock lock begin
237242
@assert haskey(reqs, r.id) "Received response for unknown request $(r.id) from worker $(w.pid)"
@@ -318,7 +323,14 @@ function serve_requests(io)
318323
while true
319324
req = deserialize(io)
320325
@assert req isa Request
321-
req.shutdown && break
326+
if is_shutdown(req)
327+
resp = Response(nothing, nothing, rand(UInt64), true)
328+
@lock iolock begin
329+
# println("sending response: $(resp)")
330+
serialize(io, resp)
331+
flush(io)
332+
end
333+
end
322334
# println("received request: $(req)")
323335
Threads.@spawn begin
324336
r = $req

test/workers.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ using Test
1717
@testset "clean shutdown ($w)" begin
1818
close(w)
1919
@test !process_running(w.process)
20-
@test w.process.termsignal == Base.SIGTERM
20+
@test w.process.termsignal == 0
2121
@test w.process.exitcode == 0
2222
@test !isopen(w.socket)
2323
@test w.terminated

0 commit comments

Comments
 (0)