44package handler
55
66import (
7+ "bufio"
8+ "bytes"
9+ "encoding/json"
10+ "fmt"
11+ "io/ioutil"
712 "net/http"
13+ "net/url"
14+ "os"
15+ "strings"
16+ "sync"
17+ "time"
818
919 log "github.com/sirupsen/logrus"
1020 "go.amzn.com/lambda/rapi/model"
@@ -16,6 +26,24 @@ const (
1626 telemetryAPIDisabledErrorType = "Telemetry.NotSupported"
1727)
1828
29+ type runtimeTelemetryBuffering struct {
30+ MaxBytes int64 `json:"maxBytes"`
31+ MaxItems int `json:"maxItems"`
32+ TimeoutMs int64 `json:"timeoutMs"`
33+ }
34+
35+ type runtimeTelemetryDestination struct {
36+ URI string `json:"URI"`
37+ Protocol string `json:"protocol"`
38+ }
39+
40+ type runtimeTelemetryRequest struct {
41+ Buffering runtimeTelemetryBuffering `json:"buffering"`
42+ Destination runtimeTelemetryDestination `json:"destination"`
43+ Types []string `json:"types"`
44+ SchemaVersion string `json:"schemaVersion"`
45+ }
46+
1947type runtimeLogsStubAPIHandler struct {}
2048
2149func (h * runtimeLogsStubAPIHandler ) ServeHTTP (writer http.ResponseWriter , request * http.Request ) {
@@ -34,9 +62,36 @@ func NewRuntimeLogsAPIStubHandler() http.Handler {
3462 return & runtimeLogsStubAPIHandler {}
3563}
3664
37- type runtimeTelemetryAPIStubHandler struct {}
65+ type runtimeTelemetryAPIStubHandler struct {
66+ destinations []string
67+ mu sync.Mutex
68+ }
3869
3970func (h * runtimeTelemetryAPIStubHandler ) ServeHTTP (writer http.ResponseWriter , request * http.Request ) {
71+ var runtimeReq runtimeTelemetryRequest
72+ body , err := ioutil .ReadAll (request .Body )
73+ if err != nil {
74+ log .WithError (err ).Warn ("Error while reading request body" )
75+ http .Error (writer , err .Error (), http .StatusInternalServerError )
76+ }
77+ err = json .Unmarshal (body , & runtimeReq )
78+ if err != nil {
79+ log .WithError (err ).Warn ("Error while unmarshaling request" )
80+ http .Error (writer , err .Error (), http .StatusInternalServerError )
81+ }
82+ if len (runtimeReq .Destination .URI ) > 0 && runtimeReq .Destination .Protocol == "HTTP" {
83+ u , err := url .Parse (runtimeReq .Destination .URI )
84+ if err != nil {
85+ log .WithError (err ).Warn ("Error while parsing destination URL" )
86+ http .Error (writer , err .Error (), http .StatusInternalServerError )
87+ }
88+ if sep := strings .IndexRune (u .Host , ':' ); sep != - 1 && u .Host [:sep ] == "sandbox" {
89+ u .Host = "localhost" + u .Host [sep :]
90+ }
91+ h .mu .Lock ()
92+ h .destinations = append (h .destinations , u .String ())
93+ h .mu .Unlock ()
94+ }
4095 if err := rendering .RenderJSON (http .StatusAccepted , writer , request , & model.ErrorResponse {
4196 ErrorType : telemetryAPIDisabledErrorType ,
4297 ErrorMessage : "Telemetry API is not supported" ,
@@ -46,8 +101,55 @@ func (h *runtimeTelemetryAPIStubHandler) ServeHTTP(writer http.ResponseWriter, r
46101 }
47102}
48103
104+ type logMessage struct {
105+ Time string `json:"time"`
106+ Type string `json:"type"`
107+ Record string `json:"record"`
108+ }
109+
49110// NewRuntimeTelemetryAPIStubHandler returns a new instance of http handler
50111// for serving /runtime/logs when a telemetry service implementation is absent
51112func NewRuntimeTelemetryAPIStubHandler () http.Handler {
52- return & runtimeTelemetryAPIStubHandler {}
113+ handler := runtimeTelemetryAPIStubHandler {}
114+ originalStdout := os .Stdout
115+ r , w , _ := os .Pipe ()
116+ os .Stdout = w
117+ os .Stderr = w
118+ scanner := bufio .NewScanner (r )
119+ scanner .Split (bufio .ScanLines )
120+ go func () {
121+ for {
122+ if len (handler .destinations ) > 0 {
123+ var msgs []logMessage
124+ for scanner .Scan () && len (msgs ) < 10 {
125+ line := scanner .Text ()
126+ originalStdout .WriteString (fmt .Sprintf ("%s\n " , line ))
127+ msgs = append (msgs , logMessage {
128+ Time : time .Now ().Format ("2006-01-02T15:04:05.999Z" ),
129+ Type : "function" ,
130+ Record : line ,
131+ })
132+ }
133+ data , err := json .Marshal (msgs )
134+ if err != nil {
135+ originalStdout .WriteString (fmt .Sprintf ("%s\n " , err ))
136+ }
137+ bodyReader := bytes .NewReader (data )
138+ handler .mu .Lock ()
139+ destinations := handler .destinations
140+ handler .mu .Unlock ()
141+ for _ , dest := range destinations {
142+ resp , err := http .Post (dest , "application/json" , bodyReader )
143+ if err != nil {
144+ originalStdout .WriteString (fmt .Sprintf ("%s\n " , err ))
145+ }
146+ if resp .StatusCode > 300 {
147+ originalStdout .WriteString (fmt .Sprintf ("failed to send logs to destination %q: status %d" , dest , resp .StatusCode ))
148+ }
149+ }
150+ }
151+ time .Sleep (5 * time .Second )
152+ }
153+ }()
154+ return & handler
53155}
0 commit comments