Skip to content

Commit

Permalink
Cherrypick #32276 to 2.59.0 release branch. (#32280)
Browse files Browse the repository at this point in the history
* Add an idle shutdown timout to prism binary.

* Correct flag text.

Co-authored-by: Jack McCluskey <[email protected]>

---------

Co-authored-by: lostluck <[email protected]>
Co-authored-by: Jack McCluskey <[email protected]>
  • Loading branch information
3 people authored Aug 22, 2024
1 parent efa8dbc commit 4173f54
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 18 deletions.
25 changes: 16 additions & 9 deletions sdks/go/cmd/prism/prism.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,34 @@ import (
)

var (
jobPort = flag.Int("job_port", 8073, "specify the job management service port")
webPort = flag.Int("web_port", 8074, "specify the web ui port")
jobManagerEndpoint = flag.String("jm_override", "", "set to only stand up a web ui that refers to a seperate JobManagement endpoint")
serveHTTP = flag.Bool("serve_http", true, "enable or disable the web ui")
jobPort = flag.Int("job_port", 8073, "specify the job management service port")
webPort = flag.Int("web_port", 8074, "specify the web ui port")
jobManagerEndpoint = flag.String("jm_override", "", "set to only stand up a web ui that refers to a seperate JobManagement endpoint")
serveHTTP = flag.Bool("serve_http", true, "enable or disable the web ui")
idleShutdownTimeout = flag.Duration("idle_shutdown_timeout", -1, "duration that prism will wait for a new job before shutting itself down. Negative durations disable auto shutdown. Defaults to never shutting down.")
)

func main() {
flag.Parse()
ctx := context.Background()
cli, err := makeJobClient(ctx, prism.Options{Port: *jobPort}, *jobManagerEndpoint)
ctx, cancel := context.WithCancelCause(context.Background())

cli, err := makeJobClient(ctx,
prism.Options{
Port: *jobPort,
IdleShutdownTimeout: *idleShutdownTimeout,
CancelFn: cancel,
},
*jobManagerEndpoint)
if err != nil {
log.Fatalf("error creating job server: %v", err)
}
if *serveHTTP {
if err := prism.CreateWebServer(ctx, cli, prism.Options{Port: *webPort}); err != nil {
log.Fatalf("error creating web server: %v", err)
}
} else {
// Block main thread forever to keep main from exiting.
<-(chan struct{})(nil) // receives on nil channels block.
}
// Block main thread forever to keep main from exiting.
<-ctx.Done()
}

func makeJobClient(ctx context.Context, opts prism.Options, endpoint string) (jobpb.JobServiceClient, error) {
Expand Down
22 changes: 17 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,31 @@ func (e *joinError) Error() string {
return string(b)
}

func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jobpb.PrepareJobResponse, error) {
func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ *jobpb.PrepareJobResponse, err error) {
s.mu.Lock()
defer s.mu.Unlock()

// Since jobs execute in the background, they should not be tied to a request's context.
rootCtx, cancelFn := context.WithCancelCause(context.Background())
// Wrap in a Once so it will only be invoked a single time for the job.
terminalOnceWrap := sync.OnceFunc(s.jobTerminated)
job := &Job{
key: s.nextId(),
Pipeline: req.GetPipeline(),
jobName: req.GetJobName(),
options: req.GetPipelineOptions(),
streamCond: sync.NewCond(&sync.Mutex{}),
RootCtx: rootCtx,
CancelFn: cancelFn,

CancelFn: func(err error) {
cancelFn(err)
terminalOnceWrap()
},
artifactEndpoint: s.Endpoint(),
}
// Stop the idle timer when a new job appears.
if idleTimer := s.idleTimer.Load(); idleTimer != nil {
idleTimer.Stop()
}

// Queue initial state of the job.
job.state.Store(jobpb.JobState_STOPPED)
Expand Down Expand Up @@ -155,7 +163,9 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
case urns.TransformParDo:
var pardo pipepb.ParDoPayload
if err := proto.Unmarshal(t.GetSpec().GetPayload(), &pardo); err != nil {
return nil, fmt.Errorf("unable to unmarshal ParDoPayload for %v - %q: %w", tid, t.GetUniqueName(), err)
wrapped := fmt.Errorf("unable to unmarshal ParDoPayload for %v - %q: %w", tid, t.GetUniqueName(), err)
job.Failed(wrapped)
return nil, wrapped
}

isStateful := false
Expand All @@ -181,7 +191,9 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
case urns.TransformTestStream:
var testStream pipepb.TestStreamPayload
if err := proto.Unmarshal(t.GetSpec().GetPayload(), &testStream); err != nil {
return nil, fmt.Errorf("unable to unmarshal TestStreamPayload for %v - %q: %w", tid, t.GetUniqueName(), err)
wrapped := fmt.Errorf("unable to unmarshal TestStreamPayload for %v - %q: %w", tid, t.GetUniqueName(), err)
job.Failed(wrapped)
return nil, wrapped
}

t.EnvironmentId = "" // Unset the environment, to ensure it's handled prism side.
Expand Down
53 changes: 52 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
package jobservices

import (
"context"
"fmt"
"math"
"net"
"os"
"sync"
"sync/atomic"
"time"

fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
Expand All @@ -39,9 +43,17 @@ type Server struct {

// Job Management
mu sync.Mutex
index uint32
index uint32 // Use with atomics.
jobs map[string]*Job

// IdleShutdown management. Needs to use atomics, since they
// may be both while already holding the lock, or when not
// (eg via job state).
idleTimer atomic.Pointer[time.Timer]
terminatedJobCount uint32 // Use with atomics.
idleTimeout time.Duration
cancelFn context.CancelCauseFunc

// execute defines how a job is executed.
execute func(*Job)

Expand Down Expand Up @@ -91,3 +103,42 @@ func (s *Server) Serve() {
func (s *Server) Stop() {
s.server.GracefulStop()
}

// IdleShutdown allows the server to call the cancelFn if there have been no active jobs
// for at least the given timeout.
func (s *Server) IdleShutdown(timeout time.Duration, cancelFn context.CancelCauseFunc) {
s.mu.Lock()
defer s.mu.Unlock()
s.idleTimeout = timeout
s.cancelFn = cancelFn

// Stop gap to kill the process less gracefully.
if s.cancelFn == nil {
s.cancelFn = func(cause error) {
os.Exit(1)
}
}

s.idleTimer.Store(time.AfterFunc(timeout, s.idleShutdownCallback))
}

// idleShutdownCallback is called by the AfterFunc timer for idle shutdown.
func (s *Server) idleShutdownCallback() {
index := atomic.LoadUint32(&s.index)
terminated := atomic.LoadUint32(&s.terminatedJobCount)
if index == terminated {
slog.Info("shutting down after being idle", "idleTimeout", s.idleTimeout)
s.cancelFn(nil)
}
}

// jobTerminated marks that the job has been terminated, and if there are no active jobs, starts the idle timer.
func (s *Server) jobTerminated() {
if s.idleTimer.Load() != nil {
terminated := atomic.AddUint32(&s.terminatedJobCount, 1)
total := atomic.LoadUint32(&s.index)
if total == terminated {
s.idleTimer.Store(time.AfterFunc(s.idleTimeout, s.idleShutdownCallback))
}
}
}
8 changes: 5 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ import (
"embed"
"encoding/json"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"html/template"
"io"
"net/http"
"sort"
"strings"
"sync"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/metricsx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
Expand Down Expand Up @@ -435,5 +436,6 @@ func Initialize(ctx context.Context, port int, jobcli jobpb.JobServiceClient) er
endpoint := fmt.Sprintf("localhost:%d", port)

slog.Info("Serving WebUI", slog.String("endpoint", "http://"+endpoint))
return http.ListenAndServe(endpoint, mux)
go http.ListenAndServe(endpoint, mux)
return nil
}
12 changes: 12 additions & 0 deletions sdks/go/pkg/beam/runners/prism/prism.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package prism

import (
"context"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
Expand Down Expand Up @@ -58,13 +59,24 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)

// Options for in process server creation.
type Options struct {
// Port the Job Management Server should start on.
Port int

// The time prism will wait for new jobs before shuting itself down.
IdleShutdownTimeout time.Duration
// CancelFn allows Prism to terminate the program due to it's internal state, such as via the idle shutdown timeout.
// If unset, os.Exit(1) will be called instead.
CancelFn context.CancelCauseFunc
}

// CreateJobServer returns a Beam JobServicesClient connected to an in memory JobServer.
// This call is non-blocking.
func CreateJobServer(ctx context.Context, opts Options) (jobpb.JobServiceClient, error) {
s := jobservices.NewServer(opts.Port, internal.RunPipeline)

if opts.IdleShutdownTimeout > 0 {
s.IdleShutdown(opts.IdleShutdownTimeout, opts.CancelFn)
}
go s.Serve()
clientConn, err := grpc.DialContext(ctx, s.Endpoint(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
Expand Down

0 comments on commit 4173f54

Please sign in to comment.