Skip to content

Commit 81c8844

Browse files
committed
Small refactor
1 parent 527faa0 commit 81c8844

File tree

13 files changed

+236
-201
lines changed

13 files changed

+236
-201
lines changed

cmd/localstack/main.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ import (
1414
"sync"
1515

1616
"github.com/aws/aws-sdk-go-v2/config"
17+
"github.com/localstack/lambda-runtime-init/internal/aws/lambda"
1718
"github.com/localstack/lambda-runtime-init/internal/aws/xray"
1819
"github.com/localstack/lambda-runtime-init/internal/bootstrap"
1920
"github.com/localstack/lambda-runtime-init/internal/events"
2021
"github.com/localstack/lambda-runtime-init/internal/hotreloading"
22+
"github.com/localstack/lambda-runtime-init/internal/localstack"
2123
"github.com/localstack/lambda-runtime-init/internal/logging"
2224
"github.com/localstack/lambda-runtime-init/internal/server"
2325

@@ -31,8 +33,8 @@ import (
3133
supv "go.amzn.com/lambda/supervisor"
3234
)
3335

34-
func InitLsOpts() *server.LsOpts {
35-
return &server.LsOpts{
36+
func InitLsOpts() *localstack.Config {
37+
return &localstack.Config{
3638
// required
3739
RuntimeEndpoint: utils.MustGetEnv("LOCALSTACK_RUNTIME_ENDPOINT"),
3840
RuntimeId: utils.MustGetEnv("LOCALSTACK_RUNTIME_ID"),
@@ -55,8 +57,8 @@ func InitLsOpts() *server.LsOpts {
5557
}
5658
}
5759

58-
func InitFunctionConfig() server.FunctionConfig {
59-
return server.FunctionConfig{
60+
func InitFunctionConfig() lambda.FunctionConfig {
61+
return lambda.FunctionConfig{
6062
FunctionName: utils.GetEnvWithDefault("AWS_LAMBDA_FUNCTION_NAME", "test_function"),
6163
FunctionVersion: utils.GetEnvWithDefault("AWS_LAMBDA_FUNCTION_VERSION", "$LATEST"),
6264
FunctionTimeoutSec: utils.GetEnvWithDefault("AWS_LAMBDA_FUNCTION_TIMEOUT", "30"),
@@ -189,14 +191,14 @@ func main() {
189191

190192
// Custom Interop Server
191193
defaultServer := rapidcore.NewServer()
192-
lsAdapter := server.NewLocalStackAdapter(lsOpts.RuntimeEndpoint, lsOpts.RuntimeId)
193-
interopServer := server.NewInteropServer(defaultServer, lsAdapter)
194+
lsClient := localstack.NewLocalStackClient(lsOpts.RuntimeEndpoint, lsOpts.RuntimeId)
195+
interopServer := server.NewInteropServer(defaultServer, lsClient)
194196

195197
// Services required for Sandbox environment
196198
logCollector := logging.NewLogCollector()
197199
localStackLogsEgressApi := logging.NewLocalStackLogsEgressAPI(logCollector)
198200
tracer := tracing.NewLocalStackTracer()
199-
eventsListener := events.NewEventsListener(lsAdapter)
201+
eventsListener := events.NewLocalStackEventsAPI(lsClient)
200202

201203
defaultSupv := supv.NewLocalSupervisor()
202204
wrappedSupv := supervisor.NewLocalStackSupervisor(ctx, defaultSupv, eventsListener, interopServer.InternalState)
@@ -246,7 +248,7 @@ func main() {
246248
interopServer.SetSandboxContext(sandboxContext)
247249
interopServer.SetInternalStateGetter(internalStateFn)
248250

249-
localStackService := server.NewLocalStackService(interopServer, logCollector, lsAdapter, xrayConfig.Endpoint, lsOpts, functionConf, awsEnvConf)
251+
localStackService := server.NewLocalStackService(interopServer, logCollector, lsClient, xrayConfig.Endpoint, lsOpts, functionConf, awsEnvConf)
250252

251253
// start runtime init. It is important to start `InitHandler` synchronously because we need to ensure the
252254
// notification channels and status fields are properly initialized before `AwaitInitialized`
@@ -285,7 +287,7 @@ func main() {
285287
// return
286288
} else {
287289
log.Debugln("Completed initialization of runtime init. Sending status ready to LocalStack.")
288-
if err := localStackService.SendStatus(server.Ready, []byte{}); err != nil {
290+
if err := localStackService.SendStatus(localstack.Ready, []byte{}); err != nil {
289291
log.Fatalln("Failed to send status ready to LocalStack " + err.Error() + ". Exiting.")
290292
}
291293
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ require (
3333
github.com/google/uuid v1.6.0 // indirect
3434
github.com/jmespath/go-jmespath v0.4.0 // indirect
3535
golang.org/x/net v0.38.0 // indirect
36+
golang.org/x/sync v0.15.0 // indirect
3637
golang.org/x/text v0.23.0 // indirect
3738
gopkg.in/yaml.v2 v2.2.8 // indirect
3839
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
7878
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
7979
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
8080
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
81+
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
82+
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
8183
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
8284
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
8385
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

internal/aws/lambda/config.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package lambda
2+
3+
type FunctionConfig struct {
4+
FunctionName string // AWS_LAMBDA_FUNCTION_NAME
5+
FunctionMemorySizeMb string // AWS_LAMBDA_FUNCTION_MEMORY_SIZE
6+
FunctionVersion string // AWS_LAMBDA_FUNCTION_VERSION
7+
FunctionTimeoutSec string // AWS_LAMBDA_FUNCTION_TIMEOUT
8+
InitializationType string // AWS_LAMBDA_INITIALIZATION_TYPE
9+
LogGroupName string // AWS_LAMBDA_LOG_GROUP_NAME
10+
LogStreamName string // AWS_LAMBDA_LOG_STREAM_NAME
11+
FunctionHandler string // AWS_LAMBDA_FUNCTION_HANDLER || _HANDLER
12+
}

internal/events/events.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,31 @@ import (
44
"encoding/json"
55
"fmt"
66

7-
"github.com/localstack/lambda-runtime-init/internal/server"
7+
"github.com/localstack/lambda-runtime-init/internal/localstack"
88

99
"go.amzn.com/lambda/interop"
10-
"go.amzn.com/lambda/telemetry"
10+
"go.amzn.com/lambda/rapidcore/standalone/telemetry"
1111
)
1212

13-
type EventsListener struct {
13+
// LocalStackEventsAPI handles internally emitted rapid events.
14+
// TODO: Logs should all be collected here
15+
type LocalStackEventsAPI struct {
1416
interop.EventsAPI
15-
adapter *server.LocalStackAdapter
17+
adapter *localstack.LocalStackClient
1618
}
1719

18-
func NewEventsListener(adapter *server.LocalStackAdapter) *EventsListener {
19-
return &EventsListener{
20-
adapter: adapter,
21-
// For now just use the no-ops API to satisfy the EventsAPI interface
22-
EventsAPI: &telemetry.NoOpEventsAPI{},
20+
func NewLocalStackEventsAPI(adapter *localstack.LocalStackClient) *LocalStackEventsAPI {
21+
return &LocalStackEventsAPI{
22+
adapter: adapter,
23+
EventsAPI: new(telemetry.StandaloneEventsAPI),
2324
}
24-
2525
}
2626

27-
func (ev *EventsListener) SendFault(data interop.FaultData) error {
28-
resp := server.ErrorResponse{
27+
func (ev *LocalStackEventsAPI) SendFault(data interop.FaultData) error {
28+
// We can ignore whatever errors are returned here
29+
_ = ev.EventsAPI.SendFault(data)
30+
31+
resp := localstack.ErrorResponse{
2932
ErrorMessage: fmt.Sprintf("RequestId: %s Error: %s", data.RequestID, data.ErrorMessage),
3033
ErrorType: string(data.ErrorType),
3134
}
@@ -35,5 +38,5 @@ func (ev *EventsListener) SendFault(data interop.FaultData) error {
3538
return err
3639
}
3740

38-
return ev.adapter.SendStatus(server.Error, payload)
41+
return ev.adapter.SendStatus(localstack.Error, payload)
3942
}

internal/localstack/client.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package localstack
2+
3+
import (
4+
"bytes"
5+
"net/http"
6+
"net/url"
7+
8+
"github.com/sirupsen/logrus"
9+
)
10+
11+
type LocalStackClient struct {
12+
UpstreamEndpoint string
13+
RuntimeId string
14+
}
15+
16+
func NewLocalStackClient(endpoint, runtimeId string) *LocalStackClient {
17+
return &LocalStackClient{
18+
UpstreamEndpoint: endpoint,
19+
RuntimeId: runtimeId,
20+
}
21+
}
22+
23+
func (ls *LocalStackClient) sendCallback(namespace, invokeId, dest string, payload []byte) error {
24+
endpoint, err := url.JoinPath(ls.UpstreamEndpoint, namespace, invokeId, dest)
25+
if err != nil {
26+
return err
27+
}
28+
logrus.WithField("url", endpoint).WithField("invoke-id", invokeId).Debugf("Sending callback.")
29+
30+
if _, err := http.Post(endpoint, "application/json", bytes.NewReader(payload)); err != nil {
31+
return err
32+
}
33+
34+
return nil
35+
}
36+
37+
func (ls *LocalStackClient) SendStatus(status LocalStackStatus, payload []byte) error {
38+
return ls.sendCallback("/status", ls.RuntimeId, string(status), payload)
39+
}
40+
41+
func (ls *LocalStackClient) SendInvocation(invokeId, dest string, payload []byte) error {
42+
return ls.sendCallback("invocations", invokeId, dest, payload)
43+
}

internal/localstack/config.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package localstack
2+
3+
// Original implementation: lambda/rapidcore/server.go includes Server struct with state
4+
// Server interface between Runtime API and this init: lambda/interop/model.go:Server
5+
6+
type Config struct {
7+
InteropPort string
8+
RuntimeEndpoint string
9+
RuntimeId string
10+
AccountId string
11+
InitTracingPort string
12+
User string
13+
CodeArchives string
14+
HotReloadingPaths []string
15+
FileWatcherStrategy string
16+
ChmodPaths string
17+
LocalstackIP string
18+
InitLogLevel string
19+
EdgePort string
20+
EnableXRayTelemetry string
21+
PostInvokeWaitMS string
22+
MaxPayloadSize string
23+
}
24+
25+
// The InvokeRequest is sent by LocalStack to trigger an invocation
26+
type InvokeRequest struct {
27+
InvokeId string `json:"request-id"`
28+
InvokedFunctionArn string `json:"invoked-function-arn"`
29+
Payload string `json:"payload"`
30+
TraceId string `json:"trace-id"`
31+
ClientContext string `json:"client-context"`
32+
}
33+
34+
// The ErrorResponse is sent TO LocalStack when encountering an error
35+
type ErrorResponse struct {
36+
ErrorMessage string `json:"errorMessage"`
37+
ErrorType string `json:"errorType,omitempty"`
38+
RequestId *string `json:"requestId,omitempty"`
39+
StackTrace []string `json:"stackTrace,omitempty"`
40+
Trace []string `json:"trace,omitempty"`
41+
}

internal/localstack/model.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package localstack
2+
3+
type LocalStackStatus string
4+
5+
const (
6+
Ready LocalStackStatus = "ready"
7+
Error LocalStackStatus = "error"
8+
)

internal/server/handler.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"reflect"
1010

1111
"github.com/go-chi/chi"
12+
"github.com/localstack/lambda-runtime-init/internal/localstack"
1213
log "github.com/sirupsen/logrus"
1314
"go.amzn.com/lambda/fatalerror"
1415
"go.amzn.com/lambda/rapidcore"
@@ -35,7 +36,7 @@ func InvokeHandler(api *LocalStackService) http.HandlerFunc {
3536

3637
// TODO: We shouldn't be using a custom request body here,
3738
// instead we should be forwarding the entire boto3/AWS request
38-
var req InvokeRequest
39+
var req localstack.InvokeRequest
3940
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
4041
log.WithError(err).Error("Failed to decode invoke request")
4142
}
@@ -46,7 +47,7 @@ func InvokeHandler(api *LocalStackService) http.HandlerFunc {
4647
// we can actually just continue here, error message is sent below
4748
case errors.Is(err, rapidcore.ErrInvokeTimeout):
4849
log.Debugf("Got invoke timeout")
49-
errorResponse := ErrorResponse{
50+
errorResponse := localstack.ErrorResponse{
5051
ErrorMessage: fmt.Sprintf(
5152
"RequestId: %s Error: Task timed out after %s.00 seconds",
5253
req.InvokeId,
@@ -70,7 +71,7 @@ func InvokeHandler(api *LocalStackService) http.HandlerFunc {
7071

7172
// If the response cannot be decoded into an ErrorResponse type,
7273
// we assume no error is present.
73-
var errorResponse ErrorResponse
74+
var errorResponse localstack.ErrorResponse
7475
if err := invokeRespDecoder.Decode(&errorResponse); err != nil || reflect.ValueOf(errorResponse).IsZero() {
7576
api.SendResponse(req.InvokeId, response)
7677
return

0 commit comments

Comments
 (0)