Skip to content

Commit 9e936a7

Browse files
committed
WIP events...
1 parent f0fd184 commit 9e936a7

File tree

6 files changed

+134
-46
lines changed

6 files changed

+134
-46
lines changed

cmd/localstack/main.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ func main() {
197197
logCollector := logging.NewLogCollector()
198198
localStackLogsEgressApi := logging.NewLocalStackLogsEgressAPI(logCollector)
199199
tracer := tracing.NewLocalStackTracer()
200-
eventsListener := events.NewEventsListener(lsAdapter, &telemetry.NoOpEventsAPI{})
200+
eventsListener := events.NewEventsListener(lsAdapter, &telemetry.NoOpEventsAPI{}, logCollector, functionConf)
201201

202202
defaultSupv := supv.NewLocalSupervisor()
203203
wrappedSupv := supervisor.NewLocalStackSupervisor(ctx, defaultSupv, eventsListener, interopServer.InternalState)
@@ -215,7 +215,8 @@ func main() {
215215
SetLogsEgressAPI(localStackLogsEgressApi).
216216
SetTracer(tracer).
217217
SetInteropServer(interopServer).
218-
SetSupervisor(wrappedSupv)
218+
SetSupervisor(wrappedSupv).
219+
SetEventsAPI(eventsListener)
219220

220221
// SetEventsAPI(eventsListener)
221222

@@ -255,8 +256,7 @@ func main() {
255256
// notification channels and status fields are properly initialized before `AwaitInitialized`
256257
log.Debugln("Starting runtime init.")
257258
if err := localStackService.Initialize(bootstrap); err != nil {
258-
log.Fatalf("Failed to initialize runtime: %s", err)
259-
return
259+
log.WithError(err).Warnf("Failed to initialize runtime. Initialization will be retried in the next invoke.")
260260
}
261261

262262
invokeServer := server.NewServer(lsOpts.InteropPort, localStackService)

internal/events/events.go

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,33 @@ package events
33
import (
44
"encoding/json"
55
"fmt"
6+
"time"
67

8+
"github.com/localstack/lambda-runtime-init/internal/logging"
79
"github.com/localstack/lambda-runtime-init/internal/server"
10+
log "github.com/sirupsen/logrus"
11+
812
"go.amzn.com/lambda/interop"
13+
"go.amzn.com/lambda/telemetry"
914
)
1015

1116
type EventsListener struct {
1217
interop.EventsAPI
1318

14-
adapter *server.LocalStackAdapter
19+
adapter *server.LocalStackAdapter
20+
logCollector *logging.LogCollector
21+
22+
function server.FunctionConfig
23+
24+
initStart time.Time
25+
initEnd time.Time
1526
}
1627

17-
func NewEventsListener(adapter *server.LocalStackAdapter, eventsAPI interop.EventsAPI) *EventsListener {
28+
func NewEventsListener(adapter *server.LocalStackAdapter, eventsAPI interop.EventsAPI, logger *logging.LogCollector, cfg server.FunctionConfig) *EventsListener {
1829
return &EventsListener{
19-
adapter: adapter,
20-
EventsAPI: eventsAPI,
30+
adapter: adapter,
31+
EventsAPI: eventsAPI,
32+
logCollector: logger,
2133
}
2234

2335
}
@@ -35,3 +47,60 @@ func (ev *EventsListener) SendFault(data interop.FaultData) error {
3547

3648
return ev.adapter.SendStatus(server.Error, payload)
3749
}
50+
51+
func (ev *EventsListener) SendInitStart(_ interop.InitStartData) error {
52+
ev.initStart = time.Now() // need to use this event to retrieve when the INIT start happens
53+
return nil
54+
}
55+
56+
func (ev *EventsListener) SendInitRuntimeDone(done interop.InitRuntimeDoneData) error {
57+
ev.initEnd = time.Now()
58+
initDurationMs := float64(ev.initEnd.Sub(ev.initStart)) / float64(time.Millisecond) // convert µs -> ms
59+
switch done.Status {
60+
case telemetry.RuntimeDoneSuccess:
61+
return ev.adapter.SendStatus(server.Ready, []byte{})
62+
case telemetry.RuntimeDoneError:
63+
if done.ErrorType != nil {
64+
if done.Phase != telemetry.InitInsideInitPhase {
65+
log.WithField("ErrorType", *done.ErrorType).Debug("Failed to initialise runtime during INIT phase. This will be retried during the next INVOKE.")
66+
}
67+
_, _ = fmt.Fprintf(ev.logCollector, "INIT_REPORT Init Duration: %.2f ms Phase: init Status: %s", initDurationMs, done.Status)
68+
// return ev.adapter.SendStatus(server.Error, []byte(*done.ErrorType))
69+
}
70+
}
71+
return nil
72+
}
73+
74+
func (ev *EventsListener) SendInvokeStart(invoke interop.InvokeStartData) error {
75+
_, _ = fmt.Fprintf(ev.logCollector, "START RequestId: %s Version: %s\n", invoke.RequestID, invoke.Version)
76+
return nil
77+
}
78+
79+
func (ev *EventsListener) SendInvokeRuntimeDone(invoke interop.InvokeRuntimeDoneData) error {
80+
initDurationMs := float64(ev.initEnd.Sub(ev.initStart)) / float64(time.Millisecond) // convert µs -> ms
81+
memUsed := invoke.Metrics.ProducedBytes / (1024 * 1024) // bytes -> mb
82+
83+
report := fmt.Sprintf("REPORT RequestId: %s\t"+
84+
"Duration: %.2f ms\t"+
85+
"Billed Duration: %.0f ms\t"+
86+
"Memory Size: %s MB\t"+
87+
"Max Memory Used: %d MB\t"+
88+
"Init Duration: %.2f ms",
89+
invoke.RequestID, invoke.Metrics.DurationMs, ev.function.FunctionMemorySizeMb, memUsed, initDurationMs)
90+
91+
_, _ = fmt.Fprintln(ev.logCollector, report)
92+
93+
// we dont send the logs from here since we may want to delay sending logs before we know for certain that
94+
// the /response has been sent
95+
96+
return nil
97+
}
98+
99+
func (ev *EventsListener) Reset() {
100+
ev.initStart = time.Time{}
101+
ev.initEnd = time.Time{}
102+
}
103+
104+
// func (ev *EventsListener) SendInitReport(data interop.InitReportData) error {
105+
106+
// }

internal/logging/collector.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,18 @@ type LogCollector struct {
1414
RuntimeLogs []string
1515
}
1616

17-
func (lc *LogCollector) Write(p []byte) (n int, err error) {
18-
lc.Put(string(p))
19-
return len(p), nil
20-
}
21-
2217
func NewLogCollector() *LogCollector {
2318
return &LogCollector{
2419
RuntimeLogs: []string{},
2520
mutex: &sync.Mutex{},
2621
}
2722
}
23+
24+
func (lc *LogCollector) Write(p []byte) (n int, err error) {
25+
lc.Put(string(p))
26+
return len(p), nil
27+
}
28+
2829
func (lc *LogCollector) Put(line string) {
2930
lc.mutex.Lock()
3031
defer lc.mutex.Unlock()

internal/server/interop.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,13 @@ func (c *CustomInteropServer) Invoke(responseWriter http.ResponseWriter, invoke
7777
defer close(releaseRespChan)
7878

7979
go func() {
80-
reserveResp, err := c.Server.Reserve(invoke.ID, invoke.TraceID, invoke.LambdaSegmentID)
80+
_, err := c.Server.Reserve(invoke.ID, invoke.TraceID, invoke.LambdaSegmentID)
8181
if err != nil && !errors.Is(err, rapidcore.ErrAlreadyReserved) {
8282
releaseRespChan <- err
8383
return
8484
}
8585

86-
invoke.DeadlineNs = fmt.Sprintf("%d", metering.Monotime()+reserveResp.Token.FunctionTimeout.Nanoseconds())
86+
invoke.DeadlineNs = fmt.Sprintf("%d", metering.Monotime()+c.Server.GetInvokeTimeout().Nanoseconds())
8787
go func() {
8888
isDirect := directinvoke.MaxDirectResponseSize > interop.MaxPayloadSize
8989
if err := c.Server.FastInvoke(responseWriter, invoke, isDirect); err != nil {

internal/server/reporter.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,19 @@ func (r *InvokeReport) Print(w io.Writer) error {
4242

4343
return nil
4444
}
45+
46+
type InitReport struct {
47+
InvokeId string
48+
DurationMs float64
49+
Status string
50+
}
51+
52+
func (r *InitReport) Print(w io.Writer) error {
53+
_, err := fmt.Fprintf(w,
54+
"INIT_REPORT Init Duration: %.2f ms Phase: init Status: %s",
55+
r.DurationMs,
56+
r.Status,
57+
)
58+
59+
return err
60+
}

internal/server/service.go

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"context"
55
"encoding/base64"
66
"encoding/json"
7-
"fmt"
8-
"math"
97
"strconv"
108
"strings"
119
"time"
@@ -14,8 +12,6 @@ import (
1412
"github.com/localstack/lambda-runtime-init/internal/logging"
1513
log "github.com/sirupsen/logrus"
1614
"go.amzn.com/lambda/interop"
17-
"go.amzn.com/lambda/metering"
18-
"go.amzn.com/lambda/rapidcore"
1915
"go.amzn.com/lambda/rapidcore/env"
2016
)
2117

@@ -62,12 +58,13 @@ func NewLocalStackService(
6258
}
6359

6460
func (ls *LocalStackService) Initialize(bs interop.Bootstrap) error {
65-
timeout, err := time.ParseDuration(ls.function.FunctionTimeoutSec + "s")
61+
initTimeout := time.Second * 10
62+
invokeTimeout, err := time.ParseDuration(ls.function.FunctionTimeoutSec + "s")
6663
if err != nil {
6764
log.WithError(err).
6865
WithField("AWS_LAMBDA_FUNCTION_TIMEOUT", ls.function.FunctionTimeoutSec).
6966
Warnf("Failed to set function timeout from environment. Defaulting to 30s.")
70-
timeout = time.Second * 30
67+
invokeTimeout = time.Second * 30
7168
}
7269

7370
memorySize, err := strconv.ParseUint(ls.function.FunctionMemorySizeMb, 10, 64)
@@ -81,8 +78,8 @@ func (ls *LocalStackService) Initialize(bs interop.Bootstrap) error {
8178
initRequest := &interop.Init{
8279
AccountID: ls.aws.Credentials.AccountID,
8380
Handler: ls.function.FunctionHandler,
84-
InvokeTimeoutMs: timeout.Milliseconds(),
85-
InitTimeoutMs: timeout.Milliseconds(),
81+
InvokeTimeoutMs: invokeTimeout.Milliseconds(),
82+
InitTimeoutMs: initTimeout.Milliseconds(),
8683
InstanceMaxMemory: memorySize,
8784
// TODO: LocalStack does not correctly set this to the LS container's <IP>:<PORT>
8885
XRayDaemonAddress: ls.xrayEndpoint,
@@ -101,10 +98,15 @@ func (ls *LocalStackService) Initialize(bs interop.Bootstrap) error {
10198
Bootstrap: bs,
10299
}
103100

104-
initStart := metering.Monotime()
105-
err = ls.sandbox.Init(initRequest, timeout.Milliseconds())
106-
ls.initDuration = float64(metering.Monotime()-initStart) / float64(time.Millisecond)
107-
return err
101+
// initStart := metering.Monotime()
102+
// err = ls.sandbox.Init(initRequest, initRequest.InvokeTimeoutMs)
103+
// ls.initDuration = float64(metering.Monotime()-initStart) / float64(time.Millisecond)
104+
105+
// if err != nil {
106+
// _, _ = fmt.Fprintf(ls.logCollector, "INIT_REPORT Init Duration: %.2f ms Phase: init Status: %s", ls.initDuration)
107+
// }
108+
109+
return ls.sandbox.Init(initRequest, initRequest.InvokeTimeoutMs)
108110
}
109111

110112
func (ls *LocalStackService) SendStatus(status LocalStackStatus, payload []byte) error {
@@ -130,7 +132,7 @@ func (ls *LocalStackService) SendResponse(invokeId string, payload []byte) error
130132
func (ls *LocalStackService) InvokeForward(invoke InvokeRequest) ([]byte, error) {
131133
proxyResponseWriter := NewResponseWriterProxy()
132134

133-
_, _ = fmt.Fprintf(ls.logCollector, "START RequestId: %s Version: %s\n", invoke.InvokeId, ls.function.FunctionVersion)
135+
// _, _ = fmt.Fprintf(ls.logCollector, "START RequestId: %s Version: %s\n", invoke.InvokeId, ls.function.FunctionVersion)
134136

135137
clientContext, err := base64.StdEncoding.DecodeString(invoke.ClientContext)
136138
if err != nil {
@@ -152,28 +154,28 @@ func (ls *LocalStackService) InvokeForward(invoke InvokeRequest) ([]byte, error)
152154
log.WithError(invokeErr).Error("Failed invocation.")
153155
}
154156

155-
invokeStartTime := invokePayload.InvokeReceivedTime
156-
invokeEndTime := metering.Monotime()
157+
// invokeStartTime := invokePayload.InvokeReceivedTime
158+
// invokeEndTime := metering.Monotime()
157159

158-
durationMs := float64(invokeEndTime-invokeStartTime) / float64(time.Millisecond)
160+
// durationMs := float64(invokeEndTime-invokeStartTime) / float64(time.Millisecond)
159161

160-
report := InvokeReport{
161-
InvokeId: invoke.InvokeId,
162-
DurationMs: durationMs,
163-
BilledDurationMs: math.Ceil(durationMs),
164-
MemorySizeMB: ls.function.FunctionMemorySizeMb,
165-
MaxMemoryUsedMB: ls.function.FunctionMemorySizeMb,
166-
InitDurationMs: ls.initDuration,
167-
}
162+
// report := InvokeReport{
163+
// InvokeId: invoke.InvokeId,
164+
// DurationMs: durationMs,
165+
// BilledDurationMs: math.Ceil(durationMs),
166+
// MemorySizeMB: ls.function.FunctionMemorySizeMb,
167+
// MaxMemoryUsedMB: ls.function.FunctionMemorySizeMb,
168+
// InitDurationMs: ls.initDuration,
169+
// }
168170

169-
switch invokeErr {
170-
case rapidcore.ErrInvokeTimeout:
171-
report.Status = "timeout"
172-
}
171+
// switch invokeErr {
172+
// case rapidcore.ErrInvokeTimeout:
173+
// report.Status = "timeout"
174+
// }
173175

174-
if err := report.Print(ls.logCollector); err != nil {
175-
log.WithError(err).Error("Failed to write END report.")
176-
}
176+
// if err := report.Print(ls.logCollector); err != nil {
177+
// log.WithError(err).Error("Failed to write END report.")
178+
// }
177179

178180
return proxyResponseWriter.Body(), invokeErr
179181
}

0 commit comments

Comments
 (0)