Skip to content

Commit 527faa0

Browse files
committed
WIP: snapshotting etc.
1 parent 9e936a7 commit 527faa0

File tree

7 files changed

+125
-189
lines changed

7 files changed

+125
-189
lines changed

cmd/localstack/main.go

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"go.amzn.com/lambda/interop"
3030
"go.amzn.com/lambda/rapidcore"
3131
supv "go.amzn.com/lambda/supervisor"
32-
"go.amzn.com/lambda/telemetry"
3332
)
3433

3534
func InitLsOpts() *server.LsOpts {
@@ -197,32 +196,37 @@ func main() {
197196
logCollector := logging.NewLogCollector()
198197
localStackLogsEgressApi := logging.NewLocalStackLogsEgressAPI(logCollector)
199198
tracer := tracing.NewLocalStackTracer()
200-
eventsListener := events.NewEventsListener(lsAdapter, &telemetry.NoOpEventsAPI{}, logCollector, functionConf)
199+
eventsListener := events.NewEventsListener(lsAdapter)
201200

202201
defaultSupv := supv.NewLocalSupervisor()
203202
wrappedSupv := supervisor.NewLocalStackSupervisor(ctx, defaultSupv, eventsListener, interopServer.InternalState)
204203

205204
// build sandbox
205+
exitChan := make(chan struct{})
206206
sandbox := rapidcore.
207207
NewSandboxBuilder().
208-
//SetTracer(tracer).
209208
AddShutdownFunc(func() {
210209
log.Debugln("Stopping file watcher")
211210
cancelFileWatcher()
212211
}).
212+
AddShutdownFunc(func() {
213+
exitChan <- struct{}{}
214+
}).
213215
SetExtensionsFlag(true).
214216
SetInitCachingFlag(true).
215217
SetLogsEgressAPI(localStackLogsEgressApi).
216218
SetTracer(tracer).
217219
SetInteropServer(interopServer).
218220
SetSupervisor(wrappedSupv).
219-
SetEventsAPI(eventsListener)
221+
SetHandler(handler)
222+
sandbox.AddShutdownFunc(func() {
223+
exitChan <- struct{}{}
224+
})
220225

221-
// SetEventsAPI(eventsListener)
226+
// Start daemons
222227

223-
// Externally set supervisor for metrics tracking
224-
// sandbox.SetSupervisor(localSupervisor)
225-
// sandbox.SetRuntimeFsRootPath(localSupervisor.RootPath)
228+
// Start hot-reloading watcher
229+
go hotreloading.RunHotReloadingListener(interopServer, lsOpts.HotReloadingPaths, fileWatcherContext, lsOpts.FileWatcherStrategy)
226230

227231
// xray daemon
228232
endpoint := "http://" + net.JoinHostPort(lsOpts.LocalstackIP, lsOpts.EdgePort)
@@ -234,15 +238,7 @@ func main() {
234238
log.Debugln("Flushing segments in xray daemon")
235239
d.Close()
236240
}()
237-
d.Run() // async
238-
239-
if len(handler) > 0 {
240-
sandbox.SetHandler(handler)
241-
}
242-
exitChan := make(chan struct{})
243-
sandbox.AddShutdownFunc(func() {
244-
exitChan <- struct{}{}
245-
})
241+
d.Run() // served async
246242

247243
// initialize all flows and start runtime API
248244
sandboxContext, internalStateFn := sandbox.Create()
@@ -278,12 +274,6 @@ func main() {
278274
log.Debugf("LocalStack API gateway listening on %s", listener.Addr().String())
279275
}()
280276

281-
wg.Add(1)
282-
go func() {
283-
defer wg.Done()
284-
hotreloading.RunHotReloadingListener(interopServer, lsOpts.HotReloadingPaths, fileWatcherContext, lsOpts.FileWatcherStrategy)
285-
}()
286-
287277
wg.Wait()
288278

289279
log.Debugln("Awaiting initialization of runtime init.")

internal/events/events.go

Lines changed: 5 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -3,33 +3,23 @@ package events
33
import (
44
"encoding/json"
55
"fmt"
6-
"time"
76

8-
"github.com/localstack/lambda-runtime-init/internal/logging"
97
"github.com/localstack/lambda-runtime-init/internal/server"
10-
log "github.com/sirupsen/logrus"
118

129
"go.amzn.com/lambda/interop"
1310
"go.amzn.com/lambda/telemetry"
1411
)
1512

1613
type EventsListener struct {
1714
interop.EventsAPI
18-
19-
adapter *server.LocalStackAdapter
20-
logCollector *logging.LogCollector
21-
22-
function server.FunctionConfig
23-
24-
initStart time.Time
25-
initEnd time.Time
15+
adapter *server.LocalStackAdapter
2616
}
2717

28-
func NewEventsListener(adapter *server.LocalStackAdapter, eventsAPI interop.EventsAPI, logger *logging.LogCollector, cfg server.FunctionConfig) *EventsListener {
18+
func NewEventsListener(adapter *server.LocalStackAdapter) *EventsListener {
2919
return &EventsListener{
30-
adapter: adapter,
31-
EventsAPI: eventsAPI,
32-
logCollector: logger,
20+
adapter: adapter,
21+
// For now just use the no-ops API to satisfy the EventsAPI interface
22+
EventsAPI: &telemetry.NoOpEventsAPI{},
3323
}
3424

3525
}
@@ -47,60 +37,3 @@ func (ev *EventsListener) SendFault(data interop.FaultData) error {
4737

4838
return ev.adapter.SendStatus(server.Error, payload)
4939
}
50-
51-
func (ev *EventsListener) SendInitStart(_ interop.InitStartData) error {
52-
ev.initStart = time.Now() // need to use this event to retrieve when the INIT start happens
53-
return nil
54-
}
55-
56-
func (ev *EventsListener) SendInitRuntimeDone(done interop.InitRuntimeDoneData) error {
57-
ev.initEnd = time.Now()
58-
initDurationMs := float64(ev.initEnd.Sub(ev.initStart)) / float64(time.Millisecond) // convert µs -> ms
59-
switch done.Status {
60-
case telemetry.RuntimeDoneSuccess:
61-
return ev.adapter.SendStatus(server.Ready, []byte{})
62-
case telemetry.RuntimeDoneError:
63-
if done.ErrorType != nil {
64-
if done.Phase != telemetry.InitInsideInitPhase {
65-
log.WithField("ErrorType", *done.ErrorType).Debug("Failed to initialise runtime during INIT phase. This will be retried during the next INVOKE.")
66-
}
67-
_, _ = fmt.Fprintf(ev.logCollector, "INIT_REPORT Init Duration: %.2f ms Phase: init Status: %s", initDurationMs, done.Status)
68-
// return ev.adapter.SendStatus(server.Error, []byte(*done.ErrorType))
69-
}
70-
}
71-
return nil
72-
}
73-
74-
func (ev *EventsListener) SendInvokeStart(invoke interop.InvokeStartData) error {
75-
_, _ = fmt.Fprintf(ev.logCollector, "START RequestId: %s Version: %s\n", invoke.RequestID, invoke.Version)
76-
return nil
77-
}
78-
79-
func (ev *EventsListener) SendInvokeRuntimeDone(invoke interop.InvokeRuntimeDoneData) error {
80-
initDurationMs := float64(ev.initEnd.Sub(ev.initStart)) / float64(time.Millisecond) // convert µs -> ms
81-
memUsed := invoke.Metrics.ProducedBytes / (1024 * 1024) // bytes -> mb
82-
83-
report := fmt.Sprintf("REPORT RequestId: %s\t"+
84-
"Duration: %.2f ms\t"+
85-
"Billed Duration: %.0f ms\t"+
86-
"Memory Size: %s MB\t"+
87-
"Max Memory Used: %d MB\t"+
88-
"Init Duration: %.2f ms",
89-
invoke.RequestID, invoke.Metrics.DurationMs, ev.function.FunctionMemorySizeMb, memUsed, initDurationMs)
90-
91-
_, _ = fmt.Fprintln(ev.logCollector, report)
92-
93-
// we dont send the logs from here since we may want to delay sending logs before we know for certain that
94-
// the /response has been sent
95-
96-
return nil
97-
}
98-
99-
func (ev *EventsListener) Reset() {
100-
ev.initStart = time.Time{}
101-
ev.initEnd = time.Time{}
102-
}
103-
104-
// func (ev *EventsListener) SendInitReport(data interop.InitReportData) error {
105-
106-
// }

internal/logging/collector.go

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,40 +10,27 @@ type LogResponse struct {
1010
}
1111

1212
type LogCollector struct {
13-
mutex *sync.Mutex
14-
RuntimeLogs []string
13+
logs strings.Builder
14+
mu *sync.Mutex
1515
}
1616

1717
func NewLogCollector() *LogCollector {
1818
return &LogCollector{
19-
RuntimeLogs: []string{},
20-
mutex: &sync.Mutex{},
19+
mu: &sync.Mutex{},
2120
}
2221
}
2322

2423
func (lc *LogCollector) Write(p []byte) (n int, err error) {
25-
lc.Put(string(p))
26-
return len(p), nil
27-
}
28-
29-
func (lc *LogCollector) Put(line string) {
30-
lc.mutex.Lock()
31-
defer lc.mutex.Unlock()
32-
lc.RuntimeLogs = append(lc.RuntimeLogs, line)
33-
}
34-
35-
func (lc *LogCollector) reset() {
36-
lc.mutex.Lock()
37-
defer lc.mutex.Unlock()
38-
lc.RuntimeLogs = []string{}
24+
lc.mu.Lock()
25+
defer lc.mu.Unlock()
26+
n, err = lc.logs.Write(p)
27+
return
3928
}
4029

4130
func (lc *LogCollector) GetLogs() LogResponse {
42-
lc.mutex.Lock()
43-
defer lc.mutex.Unlock()
44-
response := LogResponse{
45-
Logs: strings.Join(lc.RuntimeLogs, ""),
46-
}
47-
lc.RuntimeLogs = []string{}
48-
return response
31+
lc.mu.Lock()
32+
defer lc.mu.Unlock()
33+
resp := LogResponse{Logs: lc.logs.String()}
34+
lc.logs.Reset()
35+
return resp
4936
}

internal/server/handler.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,10 @@ func InvokeHandler(api *LocalStackService) http.HandlerFunc {
6060
log.Fatalln("unable to marshal json timeout response")
6161
}
6262
response = errorResponseJson
63-
// default:
64-
// log.Fatalln(err)
6563
}
6664

6765
api.AfterInvoke(r.Context())
68-
api.CollectAndSendLogs(req.InvokeId)
66+
api.ForwardLogs(req.InvokeId)
6967

7068
invokeRespDecoder := json.NewDecoder(bytes.NewReader(response))
7169
invokeRespDecoder.DisallowUnknownFields()

internal/server/interop.go

Lines changed: 45 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/sirupsen/logrus"
1515
log "github.com/sirupsen/logrus"
1616
"go.amzn.com/lambda/core/directinvoke"
17-
"go.amzn.com/lambda/fatalerror"
1817
"go.amzn.com/lambda/interop"
1918
"go.amzn.com/lambda/metering"
2019
"go.amzn.com/lambda/rapi/model"
@@ -60,13 +59,14 @@ func NewInteropServer(server *rapidcore.Server, ls *LocalStackAdapter) *CustomIn
6059
return &CustomInteropServer{
6160
Server: server,
6261
localStackAdapter: ls,
62+
mutex: &sync.Mutex{},
6363
}
6464
}
6565

6666
type CustomInteropServer struct {
6767
*rapidcore.Server
6868
localStackAdapter *LocalStackAdapter
69-
initOnce sync.Once
69+
mutex *sync.Mutex
7070
}
7171

7272
func (c *CustomInteropServer) Invoke(responseWriter http.ResponseWriter, invoke *interop.Invoke) error {
@@ -77,69 +77,78 @@ func (c *CustomInteropServer) Invoke(responseWriter http.ResponseWriter, invoke
7777
defer close(releaseRespChan)
7878

7979
go func() {
80-
_, err := c.Server.Reserve(invoke.ID, invoke.TraceID, invoke.LambdaSegmentID)
81-
if err != nil && !errors.Is(err, rapidcore.ErrAlreadyReserved) {
80+
reserveResp, err := c.Server.Reserve(invoke.ID, invoke.TraceID, invoke.LambdaSegmentID)
81+
if err != nil {
8282
releaseRespChan <- err
8383
return
8484
}
8585

86-
invoke.DeadlineNs = fmt.Sprintf("%d", metering.Monotime()+c.Server.GetInvokeTimeout().Nanoseconds())
86+
invoke.DeadlineNs = fmt.Sprintf("%d", metering.Monotime()+reserveResp.Token.FunctionTimeout.Nanoseconds())
87+
88+
// Wait for initialization to complete
89+
if err := c.Server.AwaitInitialized(); err != nil {
90+
switch err {
91+
case rapidcore.ErrInitDoneFailed:
92+
// Init failed, reset and continue with suppressed init
93+
if _, resetErr := c.Server.Reset("InitFailed", 2000); resetErr != nil {
94+
log.Errorf("Reset failed: %v", resetErr)
95+
}
96+
// Reserve again after reset for suppressed init
97+
if _, reserveErr := c.Server.Reserve(invoke.ID, invoke.TraceID, invoke.LambdaSegmentID); reserveErr != nil {
98+
releaseRespChan <- reserveErr
99+
return
100+
}
101+
default:
102+
releaseRespChan <- err
103+
return
104+
}
105+
}
106+
87107
go func() {
88108
isDirect := directinvoke.MaxDirectResponseSize > interop.MaxPayloadSize
89109
if err := c.Server.FastInvoke(responseWriter, invoke, isDirect); err != nil {
90110
log.Debugf("FastInvoke() error: %s", err)
91111
}
92112
}()
93113

94-
// Waits for an invocation to finish before calling Release()
95114
_, err = c.Server.AwaitRelease()
96115
if err != nil {
97-
log.Debugf("AwaitRelease() error: %s", err)
98116
switch err {
99117
case rapidcore.ErrReleaseReservationDone:
100-
// not an error, expected return value when Reset is called
118+
// Expected when Reset is called
119+
releaseRespChan <- nil
101120
return
102121
case rapidcore.ErrInitDoneFailed, rapidcore.ErrInvokeDoneFailed:
103-
c.Server.Reset("ReleaseFail", 2000)
122+
if _, resetErr := c.Server.Reset("ReleaseFail", 2000); resetErr != nil {
123+
log.Errorf("Reset failed: %v", resetErr)
124+
}
125+
releaseRespChan <- err
126+
return
127+
default:
128+
if _, resetErr := c.Server.Reset("UnexpectedError", 2000); resetErr != nil {
129+
log.Errorf("Reset failed: %v", resetErr)
130+
}
131+
releaseRespChan <- err
132+
return
104133
}
105134
}
106-
releaseRespChan <- err
135+
136+
releaseRespChan <- nil
107137
}()
108138

109139
select {
110140
case err := <-releaseRespChan:
111-
if err != nil {
112-
return err
113-
}
141+
return err
114142
case <-ctx.Done():
115143
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
116-
c.Server.Reset("Timeout", 2000)
144+
if _, resetErr := c.Server.Reset("Timeout", 2000); resetErr != nil {
145+
log.Errorf("Reset failed: %v", resetErr)
146+
}
117147
return rapidcore.ErrInvokeTimeout
118148
}
119149
}
120150

121-
return c.Server.Release()
122-
}
123-
124-
func (c *CustomInteropServer) AwaitInitialized() error {
125-
err := c.Server.AwaitInitialized()
126-
go func() {
127-
state, stateErr := c.InternalState()
128-
if stateErr != nil {
129-
return
130-
}
131-
132-
doneResponse := &interop.Done{}
133-
if state.FirstFatalError != "" {
134-
doneResponse.ErrorType = fatalerror.GetValidRuntimeOrFunctionErrorType(state.FirstFatalError)
135-
}
136-
137-
c.InitDoneChan <- rapidcore.DoneWithState{
138-
Done: doneResponse,
139-
State: c.InternalStateGetter(),
140-
}
141-
}()
142-
return err
151+
return nil
143152
}
144153

145154
func (c *CustomInteropServer) SendInitErrorResponse(resp *interop.ErrorInvokeResponse) error {

internal/server/model.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type ErrorResponse struct {
6161
ErrorType string `json:"errorType,omitempty"`
6262
RequestId *string `json:"requestId,omitempty"`
6363
StackTrace []string `json:"stackTrace,omitempty"`
64+
Trace []string `json:"trace,omitempty"`
6465
}
6566

6667
type ResponseWriterProxy struct {

0 commit comments

Comments
 (0)