Skip to content

Commit fb03c12

Browse files
authored
Merge pull request #24 from famarting/get-work-items-send-timeout
optional timeout to get work items stream send
2 parents e10f6af + 2cabb81 commit fb03c12

File tree

1 file changed

+29
-1
lines changed

1 file changed

+29
-1
lines changed

backend/executor.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type grpcExecutor struct {
5757
onWorkItemConnection func(context.Context) error
5858
onWorkItemDisconnect func(context.Context) error
5959
streamShutdownChan <-chan any
60+
streamSendTimeout *time.Duration
6061
}
6162

6263
type grpcExecutorOptions func(g *grpcExecutor)
@@ -91,6 +92,12 @@ func WithStreamShutdownChannel(c <-chan any) grpcExecutorOptions {
9192
}
9293
}
9394

95+
func WithStreamSendTimeout(d time.Duration) grpcExecutorOptions {
96+
return func(g *grpcExecutor) {
97+
g.streamSendTimeout = &d
98+
}
99+
}
100+
94101
// NewGrpcExecutor returns the Executor object and a method to invoke to register the gRPC server in the executor.
95102
func NewGrpcExecutor(be Backend, logger Logger, opts ...grpcExecutorOptions) (executor Executor, registerServerFn func(grpcServer grpc.ServiceRegistrar)) {
96103
grpcExecutor := &grpcExecutor{
@@ -323,7 +330,7 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
323330
}
324331
}
325332

326-
if err := stream.Send(wi); err != nil {
333+
if err := g.sendWorkItem(stream, wi); err != nil {
327334
g.logger.Errorf("encountered an error while sending work item: %v", err)
328335
return err
329336
}
@@ -337,6 +344,27 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
337344
}
338345
}
339346

347+
func (g *grpcExecutor) sendWorkItem(stream protos.TaskHubSidecarService_GetWorkItemsServer, wi *protos.WorkItem) error {
348+
ctx := stream.Context()
349+
if g.streamSendTimeout != nil {
350+
var cancel context.CancelFunc
351+
ctx, cancel = context.WithTimeout(ctx, *g.streamSendTimeout)
352+
defer cancel()
353+
}
354+
355+
errCh := make(chan error, 2)
356+
go func() {
357+
select {
358+
case errCh <- stream.Send(wi):
359+
case <-ctx.Done():
360+
g.logger.Errorf("timed out while sending work item")
361+
errCh <- ctx.Err()
362+
}
363+
}()
364+
365+
return <-errCh
366+
}
367+
340368
// CompleteOrchestratorTask implements protos.TaskHubSidecarServiceServer
341369
func (g *grpcExecutor) CompleteOrchestratorTask(ctx context.Context, res *protos.OrchestratorResponse) (*protos.CompleteTaskResponse, error) {
342370
iid := api.InstanceID(res.InstanceId)

0 commit comments

Comments
 (0)