Skip to content

Commit f0fd184

Browse files
committed
WIP: refactor
1 parent a6f393e commit f0fd184

File tree

11 files changed

+758
-339
lines changed

11 files changed

+758
-339
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ tags
77
.idea
88
.DS_Store
99
.venv
10+
.vscode

cmd/localstack/main.go

Lines changed: 76 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,32 @@ package main
44

55
import (
66
"context"
7+
"fmt"
8+
"net"
79
"os"
10+
"os/signal"
811
"runtime/debug"
912
"strconv"
1013
"strings"
14+
"sync"
1115

1216
"github.com/aws/aws-sdk-go-v2/config"
1317
"github.com/localstack/lambda-runtime-init/internal/aws/xray"
1418
"github.com/localstack/lambda-runtime-init/internal/bootstrap"
19+
"github.com/localstack/lambda-runtime-init/internal/events"
1520
"github.com/localstack/lambda-runtime-init/internal/hotreloading"
1621
"github.com/localstack/lambda-runtime-init/internal/logging"
1722
"github.com/localstack/lambda-runtime-init/internal/server"
23+
24+
"github.com/localstack/lambda-runtime-init/internal/supervisor"
1825
"github.com/localstack/lambda-runtime-init/internal/tracing"
1926
"github.com/localstack/lambda-runtime-init/internal/utils"
2027
log "github.com/sirupsen/logrus"
2128
"go.amzn.com/lambda/core/directinvoke"
29+
"go.amzn.com/lambda/interop"
2230
"go.amzn.com/lambda/rapidcore"
31+
supv "go.amzn.com/lambda/supervisor"
32+
"go.amzn.com/lambda/telemetry"
2333
)
2434

2535
func InitLsOpts() *server.LsOpts {
@@ -97,6 +107,7 @@ func main() {
97107
lsOpts := InitLsOpts()
98108
functionConf := InitFunctionConfig()
99109
awsEnvConf, _ := config.NewEnvConfig()
110+
awsEnvConf.Credentials.AccountID = lsOpts.AccountId
100111

101112
UnsetLsEnvs()
102113

@@ -136,6 +147,10 @@ func main() {
136147
log.Panicln("Please specify a number for LOCALSTACK_MAX_PAYLOAD_SIZE")
137148
}
138149
directinvoke.MaxDirectResponseSize = int64(payloadSize)
150+
if directinvoke.MaxDirectResponseSize > interop.MaxPayloadSize {
151+
log.Infof("Large response size detected (%d bytes), forcing streaming mode", directinvoke.MaxDirectResponseSize)
152+
directinvoke.InvokeResponseMode = interop.InvokeResponseModeStreaming
153+
}
139154

140155
// download code archive if env variable is set
141156
if err := utils.DownloadCodeArchives(lsOpts.CodeArchives); err != nil {
@@ -166,13 +181,26 @@ func main() {
166181
}
167182
}
168183

184+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
185+
defer stop()
186+
169187
// file watcher for hot-reloading
170-
fileWatcherContext, cancelFileWatcher := context.WithCancel(context.Background())
188+
fileWatcherContext, cancelFileWatcher := context.WithCancel(ctx)
189+
defer cancelFileWatcher()
171190

191+
// Custom Interop Server
192+
defaultServer := rapidcore.NewServer()
193+
lsAdapter := server.NewLocalStackAdapter(lsOpts.RuntimeEndpoint, lsOpts.RuntimeId)
194+
interopServer := server.NewInteropServer(defaultServer, lsAdapter)
195+
196+
// Services required for Sandbox environment
172197
logCollector := logging.NewLogCollector()
173198
localStackLogsEgressApi := logging.NewLocalStackLogsEgressAPI(logCollector)
174199
tracer := tracing.NewLocalStackTracer()
175-
// localSupervisor := supervisor.NewLocalSupervisor()
200+
eventsListener := events.NewEventsListener(lsAdapter, &telemetry.NoOpEventsAPI{})
201+
202+
defaultSupv := supv.NewLocalSupervisor()
203+
wrappedSupv := supervisor.NewLocalStackSupervisor(ctx, defaultSupv, eventsListener, interopServer.InternalState)
176204

177205
// build sandbox
178206
sandbox := rapidcore.
@@ -185,22 +213,26 @@ func main() {
185213
SetExtensionsFlag(true).
186214
SetInitCachingFlag(true).
187215
SetLogsEgressAPI(localStackLogsEgressApi).
188-
SetTracer(tracer)
216+
SetTracer(tracer).
217+
SetInteropServer(interopServer).
218+
SetSupervisor(wrappedSupv)
219+
220+
// SetEventsAPI(eventsListener)
189221

190222
// Externally set supervisor for metrics tracking
191223
// sandbox.SetSupervisor(localSupervisor)
192224
// sandbox.SetRuntimeFsRootPath(localSupervisor.RootPath)
193225

194226
// xray daemon
195-
endpoint := "http://" + lsOpts.LocalstackIP + ":" + lsOpts.EdgePort
227+
endpoint := "http://" + net.JoinHostPort(lsOpts.LocalstackIP, lsOpts.EdgePort)
196228
xrayConfig := xray.NewConfig(endpoint, xRayLogLevel)
197229
d := xray.NewDaemon(xrayConfig, lsOpts.EnableXRayTelemetry == "1")
198-
sandbox.AddShutdownFunc(func() {
230+
defer func() {
199231
log.Debugln("Shutting down xray daemon")
200232
d.Stop()
201233
log.Debugln("Flushing segments in xray daemon")
202234
d.Close()
203-
})
235+
}()
204236
d.Run() // async
205237

206238
if len(handler) > 0 {
@@ -214,37 +246,63 @@ func main() {
214246
// initialize all flows and start runtime API
215247
sandboxContext, internalStateFn := sandbox.Create()
216248
// Populate our interop server
217-
sandbox.DefaultInteropServer().SetSandboxContext(sandboxContext)
218-
sandbox.DefaultInteropServer().SetInternalStateGetter(internalStateFn)
249+
interopServer.SetSandboxContext(sandboxContext)
250+
interopServer.SetInternalStateGetter(internalStateFn)
219251

220-
localStackService := server.NewLocalStackAPI(sandbox.LambdaInvokeAPI(), bootstrap, logCollector, xrayConfig.Endpoint, lsOpts, functionConf, awsEnvConf)
252+
localStackService := server.NewLocalStackService(interopServer, logCollector, lsAdapter, xrayConfig.Endpoint, lsOpts, functionConf, awsEnvConf)
221253

222254
// start runtime init. It is important to start `InitHandler` synchronously because we need to ensure the
223255
// notification channels and status fields are properly initialized before `AwaitInitialized`
224256
log.Debugln("Starting runtime init.")
225-
localStackService.Initialize()
257+
if err := localStackService.Initialize(bootstrap); err != nil {
258+
log.Fatalf("Failed to initialize runtime: %s", err)
259+
return
260+
}
226261

227262
invokeServer := server.NewServer(lsOpts.InteropPort, localStackService)
228263
invokeServer.RegisterOnShutdown(localStackService.Close)
229264

230265
defer invokeServer.Shutdown(context.Background())
231266

232-
go invokeServer.ListenAndServe()
233-
go hotreloading.RunHotReloadingListener(sandbox.DefaultInteropServer(), lsOpts.HotReloadingPaths, fileWatcherContext, lsOpts.FileWatcherStrategy)
267+
var wg sync.WaitGroup
268+
269+
wg.Add(1)
270+
go func() {
271+
defer wg.Done()
272+
listener, err := net.Listen("tcp", fmt.Sprintf(":%s", lsOpts.InteropPort))
273+
274+
if err != nil {
275+
log.Fatalf("failed to start listener for custom interops server: %s", err)
276+
}
277+
go invokeServer.Serve(listener)
278+
log.Debugf("LocalStack API gateway listening on %s", listener.Addr().String())
279+
}()
280+
281+
wg.Add(1)
282+
go func() {
283+
defer wg.Done()
284+
hotreloading.RunHotReloadingListener(interopServer, lsOpts.HotReloadingPaths, fileWatcherContext, lsOpts.FileWatcherStrategy)
285+
}()
286+
287+
wg.Wait()
234288

235289
log.Debugln("Awaiting initialization of runtime init.")
236-
if err := sandbox.DefaultInteropServer().AwaitInitialized(); err != nil {
290+
if err := interopServer.AwaitInitialized(); err != nil {
237291
// Error cases: ErrInitDoneFailed or ErrInitResetReceived
238292
log.Errorln("Runtime init failed to initialize: " + err.Error() + ". Exiting.")
239293
// NOTE: Sending the error status to LocalStack is handled beforehand in the custom_interop.go through the
240294
// callback SendInitErrorResponse because it contains the correct error response payload.
241-
return
295+
// return
296+
} else {
297+
log.Debugln("Completed initialization of runtime init. Sending status ready to LocalStack.")
298+
if err := localStackService.SendStatus(server.Ready, []byte{}); err != nil {
299+
log.Fatalln("Failed to send status ready to LocalStack " + err.Error() + ". Exiting.")
300+
}
242301
}
243302

244-
log.Debugln("Completed initialization of runtime init. Sending status ready to LocalStack.")
245-
if err := localStackService.SendStatus(server.Ready, []byte{}); err != nil {
246-
log.Fatalln("Failed to send status ready to LocalStack " + err.Error() + ". Exiting.")
303+
select {
304+
case <-ctx.Done():
305+
case <-exitChan:
247306
}
248307

249-
<-exitChan
250308
}

cmd/ls-api/main.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ import (
55
"bytes"
66
"encoding/json"
77
"fmt"
8+
"io"
9+
"net/http"
10+
811
"github.com/go-chi/chi"
912
"github.com/go-chi/chi/middleware"
1013
log "github.com/sirupsen/logrus"
11-
"io"
12-
"net/http"
1314
)
1415

1516
const apiPort = 9563
@@ -33,7 +34,7 @@ func main() {
3334
invokeRequest, _ := json.Marshal(InvokeRequest{InvokeId: uid, Payload: "{\"counter\":0}"})
3435
_, err := http.Post(invokeUrl, "application/json", bytes.NewReader(invokeRequest))
3536
if err != nil {
36-
log.Fatal(err)
37+
log.Error(err)
3738
}
3839

3940
w.WriteHeader(200)
@@ -47,7 +48,7 @@ func main() {
4748
invokeRequest, _ := json.Marshal(InvokeRequest{InvokeId: uid, Payload: "{\"counter\":0, \"fail\": \"yes\"}"})
4849
_, err := http.Post(invokeUrl, "application/json", bytes.NewReader(invokeRequest))
4950
if err != nil {
50-
log.Fatal(err)
51+
log.Error(err)
5152
}
5253

5354
w.WriteHeader(200)
@@ -57,6 +58,7 @@ func main() {
5758
}
5859
})
5960

61+
log.Infof("Listening on port :%d", listenPort)
6062
err := http.ListenAndServe(fmt.Sprintf(":%d", listenPort), router)
6163
if err != nil {
6264
log.Fatal(err)
@@ -74,7 +76,7 @@ func invokeLogsHandler(w http.ResponseWriter, r *http.Request) {
7476
}
7577

7678
type InvokeRequest struct {
77-
InvokeId string `json:"invoke-id"`
79+
InvokeId string `json:"request-id"`
7880
Payload string `json:"payload"`
7981
}
8082

internal/events/events.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package events
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
7+
"github.com/localstack/lambda-runtime-init/internal/server"
8+
"go.amzn.com/lambda/interop"
9+
)
10+
11+
type EventsListener struct {
12+
interop.EventsAPI
13+
14+
adapter *server.LocalStackAdapter
15+
}
16+
17+
func NewEventsListener(adapter *server.LocalStackAdapter, eventsAPI interop.EventsAPI) *EventsListener {
18+
return &EventsListener{
19+
adapter: adapter,
20+
EventsAPI: eventsAPI,
21+
}
22+
23+
}
24+
25+
func (ev *EventsListener) SendFault(data interop.FaultData) error {
26+
resp := server.ErrorResponse{
27+
ErrorMessage: fmt.Sprintf("RequestId: %s Error: %s", data.RequestID, data.ErrorMessage),
28+
ErrorType: string(data.ErrorType),
29+
}
30+
31+
payload, err := json.Marshal(resp)
32+
if err != nil {
33+
return err
34+
}
35+
36+
return ev.adapter.SendStatus(server.Error, payload)
37+
}

0 commit comments

Comments
 (0)