diff --git a/docs/components/Loki.mdx b/docs/components/Loki.mdx new file mode 100644 index 0000000000..2a5f71c6b5 --- /dev/null +++ b/docs/components/Loki.mdx @@ -0,0 +1,101 @@ +--- +title: "Loki" +--- + +Push and query logs in Grafana Loki + +import { CardGrid, LinkCard } from "@astrojs/starlight/components"; + +## Actions + + + + + + +## Instructions + +To configure Loki to work with SuperPlane: + +1. **Loki URL**: Provide the base URL of your Loki instance (e.g. `https://loki.example.com`) +2. **Authentication**: Choose the authentication method: + - **None**: No authentication (e.g. local or internal Loki) + - **Basic**: Username and password (e.g. Grafana Cloud) + - **Bearer**: Bearer token (e.g. Loki behind an auth proxy) +3. **Tenant ID** (optional): If your Loki instance is multi-tenant, provide the tenant ID (sent as `X-Scope-OrgID` header) + + + +## Push Logs + +The Push Logs component sends log entries to a Loki instance via the push API (`POST /loki/api/v1/push`). + +### Configuration + +- **Labels**: Comma-separated key=value pairs used as Loki stream labels (e.g. `job=superplane,env=prod`) +- **Message**: Log message content to push + +### Output + +The component emits an event containing: +- `labels`: The labels attached to the log stream +- `message`: The log message that was pushed + +### Example Output + +```json +{ + "data": { + "labels": { + "job": "superplane", + "env": "prod" + }, + "message": "Deployment completed successfully" + }, + "timestamp": "2026-02-15T10:30:00Z", + "type": "loki.pushLogs" +} +``` + + + +## Query Logs + +The Query Logs component executes a LogQL query against Loki (`GET /loki/api/v1/query_range`). + +### Configuration + +- **Query**: LogQL expression to evaluate (e.g. `{job="superplane"}`) +- **Start** (optional): Start timestamp in RFC3339 or Unix nanosecond format +- **End** (optional): End timestamp in RFC3339 or Unix nanosecond format +- **Limit** (optional): Maximum number of entries to return (default: 100) + +### Output + +The component emits an event containing: +- `resultType`: The type of result (e.g. `streams`) +- `result`: Array of log stream results with their values + +### Example Output + +```json +{ + "data": { + "resultType": "streams", + "result": [ + { + "stream": { + "job": "superplane", + "env": "prod" + }, + "values": [ + ["1708000000000000000", "Deployment started"], + ["1708000015000000000", "Deployment completed successfully"] + ] + } + ] + }, + "timestamp": "2026-02-15T10:30:05Z", + "type": "loki.queryLogs" +} +``` diff --git a/pkg/integrations/loki/client.go b/pkg/integrations/loki/client.go new file mode 100644 index 0000000000..6d6b50f046 --- /dev/null +++ b/pkg/integrations/loki/client.go @@ -0,0 +1,239 @@ +package loki + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "github.com/superplanehq/superplane/pkg/core" +) + +const MaxResponseSize = 1 * 1024 * 1024 // 1MB + +type Client struct { + baseURL string + authType string + username string + password string + bearerToken string + tenantID string + http core.HTTPContext +} + +type PushRequest struct { + Streams []Stream `json:"streams"` +} + +type Stream struct { + Stream map[string]string `json:"stream"` + Values [][]string `json:"values"` +} + +type QueryResponse struct { + Status string `json:"status"` + Data QueryData `json:"data"` +} + +type QueryData struct { + ResultType string `json:"resultType"` + Result json.RawMessage `json:"result"` +} + +func NewClient(httpContext core.HTTPContext, integration core.IntegrationContext) (*Client, error) { + baseURL, err := requiredConfig(integration, "baseURL") + if err != nil { + return nil, err + } + + authType, err := requiredConfig(integration, "authType") + if err != nil { + return nil, err + } + + client := &Client{ + baseURL: normalizeBaseURL(baseURL), + authType: authType, + tenantID: optionalConfig(integration, "tenantID"), + http: httpContext, + } + + switch authType { + case AuthTypeNone: + return client, nil + case AuthTypeBasic: + username, err := requiredConfig(integration, "username") + if err != nil { + return nil, fmt.Errorf("username is required when authentication is basic") + } + password, err := requiredConfig(integration, "password") + if err != nil { + return nil, fmt.Errorf("password is required when authentication is basic") + } + + client.username = username + client.password = password + return client, nil + case AuthTypeBearer: + bearerToken, err := requiredConfig(integration, "bearerToken") + if err != nil { + return nil, fmt.Errorf("bearerToken is required when authentication is bearer") + } + + client.bearerToken = bearerToken + return client, nil + default: + return nil, fmt.Errorf("invalid authType %q", authType) + } +} + +func (c *Client) Ready() error { + _, err := c.execRequest(http.MethodGet, "/ready", nil) + return err +} + +func (c *Client) Push(streams []Stream) error { + payload := PushRequest{Streams: streams} + + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal push request: %w", err) + } + + _, err = c.execRequest(http.MethodPost, "/loki/api/v1/push", bytes.NewReader(body)) + return err +} + +func (c *Client) QueryRange(query, start, end, limit string) (*QueryData, error) { + params := url.Values{} + params.Set("query", query) + + if start != "" { + params.Set("start", start) + } + if end != "" { + params.Set("end", end) + } + if limit != "" { + params.Set("limit", limit) + } + + path := "/loki/api/v1/query_range?" + params.Encode() + + body, err := c.execRequest(http.MethodGet, path, nil) + if err != nil { + return nil, err + } + + var response QueryResponse + if err := json.Unmarshal(body, &response); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + if response.Status != "success" { + return nil, fmt.Errorf("Loki query returned status: %s", response.Status) + } + + return &response.Data, nil +} + +func (c *Client) execRequest(method, path string, body io.Reader) ([]byte, error) { + apiURL := c.baseURL + if strings.HasPrefix(path, "/") { + apiURL += path + } else { + apiURL += "/" + path + } + + req, err := http.NewRequest(method, apiURL, body) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Accept", "application/json") + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + + if c.tenantID != "" { + req.Header.Set("X-Scope-OrgID", c.tenantID) + } + + if err := c.setAuth(req); err != nil { + return nil, err + } + + res, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to execute request: %w", err) + } + defer res.Body.Close() + + limitedReader := io.LimitReader(res.Body, MaxResponseSize+1) + responseBody, err := io.ReadAll(limitedReader) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + if len(responseBody) > MaxResponseSize { + return nil, fmt.Errorf("response too large: exceeds maximum size of %d bytes", MaxResponseSize) + } + + if res.StatusCode < 200 || res.StatusCode >= 300 { + return nil, fmt.Errorf("request failed with status %d: %s", res.StatusCode, string(responseBody)) + } + + return responseBody, nil +} + +func (c *Client) setAuth(req *http.Request) error { + switch c.authType { + case AuthTypeNone: + return nil + case AuthTypeBasic: + req.SetBasicAuth(c.username, c.password) + return nil + case AuthTypeBearer: + req.Header.Set("Authorization", "Bearer "+c.bearerToken) + return nil + default: + return fmt.Errorf("invalid authType %q", c.authType) + } +} + +func optionalConfig(ctx core.IntegrationContext, name string) string { + value, err := ctx.GetConfig(name) + if err != nil { + return "" + } + return string(value) +} + +func requiredConfig(ctx core.IntegrationContext, name string) (string, error) { + value, err := ctx.GetConfig(name) + if err != nil { + return "", fmt.Errorf("%s is required", name) + } + + s := string(value) + if s == "" { + return "", fmt.Errorf("%s is required", name) + } + + return s, nil +} + +func normalizeBaseURL(baseURL string) string { + if baseURL == "/" { + return baseURL + } + + for len(baseURL) > 0 && strings.HasSuffix(baseURL, "/") { + baseURL = baseURL[:len(baseURL)-1] + } + + return baseURL +} diff --git a/pkg/integrations/loki/client_test.go b/pkg/integrations/loki/client_test.go new file mode 100644 index 0000000000..b9883fea93 --- /dev/null +++ b/pkg/integrations/loki/client_test.go @@ -0,0 +1,283 @@ +package loki + +import ( + "io" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/test/support/contexts" +) + +func Test__Client__NewClient(t *testing.T) { + t.Run("no auth -> creates client", func(t *testing.T) { + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeNone, + }, + } + + client, err := NewClient(&contexts.HTTPContext{}, appCtx) + require.NoError(t, err) + assert.Equal(t, "https://loki.example.com", client.baseURL) + assert.Equal(t, AuthTypeNone, client.authType) + }) + + t.Run("basic auth without username -> error", func(t *testing.T) { + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeBasic, + }, + } + + _, err := NewClient(&contexts.HTTPContext{}, appCtx) + require.ErrorContains(t, err, "username is required") + }) + + t.Run("basic auth without password -> error", func(t *testing.T) { + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeBasic, + "username": "admin", + }, + } + + _, err := NewClient(&contexts.HTTPContext{}, appCtx) + require.ErrorContains(t, err, "password is required") + }) + + t.Run("bearer auth without token -> error", func(t *testing.T) { + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeBearer, + }, + } + + _, err := NewClient(&contexts.HTTPContext{}, appCtx) + require.ErrorContains(t, err, "bearerToken is required") + }) + + t.Run("with tenant ID -> sets tenant ID", func(t *testing.T) { + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeNone, + "tenantID": "my-org", + }, + } + + client, err := NewClient(&contexts.HTTPContext{}, appCtx) + require.NoError(t, err) + assert.Equal(t, "my-org", client.tenantID) + }) + + t.Run("normalizes trailing slashes in base URL", func(t *testing.T) { + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com///", + "authType": AuthTypeNone, + }, + } + + client, err := NewClient(&contexts.HTTPContext{}, appCtx) + require.NoError(t, err) + assert.Equal(t, "https://loki.example.com", client.baseURL) + }) +} + +func Test__Client__Ready(t *testing.T) { + t.Run("successful readiness check", func(t *testing.T) { + httpCtx := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("ready")), + }, + }, + } + + client := &Client{ + baseURL: "https://loki.example.com", + authType: AuthTypeNone, + http: httpCtx, + } + + err := client.Ready() + require.NoError(t, err) + require.Len(t, httpCtx.Requests, 1) + assert.Equal(t, "https://loki.example.com/ready", httpCtx.Requests[0].URL.String()) + }) + + t.Run("failed readiness check", func(t *testing.T) { + httpCtx := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusServiceUnavailable, + Body: io.NopCloser(strings.NewReader("not ready")), + }, + }, + } + + client := &Client{ + baseURL: "https://loki.example.com", + authType: AuthTypeNone, + http: httpCtx, + } + + err := client.Ready() + require.Error(t, err) + assert.Contains(t, err.Error(), "503") + }) +} + +func Test__Client__Push(t *testing.T) { + t.Run("successful push", func(t *testing.T) { + httpCtx := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusNoContent, + Body: io.NopCloser(strings.NewReader("")), + }, + }, + } + + client := &Client{ + baseURL: "https://loki.example.com", + authType: AuthTypeNone, + http: httpCtx, + } + + streams := []Stream{ + { + Stream: map[string]string{"job": "test"}, + Values: [][]string{{"1708000000000000000", "hello world"}}, + }, + } + + err := client.Push(streams) + require.NoError(t, err) + require.Len(t, httpCtx.Requests, 1) + assert.Equal(t, http.MethodPost, httpCtx.Requests[0].Method) + assert.Contains(t, httpCtx.Requests[0].URL.String(), "/loki/api/v1/push") + assert.Equal(t, "application/json", httpCtx.Requests[0].Header.Get("Content-Type")) + }) + + t.Run("push failure", func(t *testing.T) { + httpCtx := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(strings.NewReader(`{"message":"invalid stream"}`)), + }, + }, + } + + client := &Client{ + baseURL: "https://loki.example.com", + authType: AuthTypeNone, + http: httpCtx, + } + + streams := []Stream{ + { + Stream: map[string]string{"job": "test"}, + Values: [][]string{{"bad-ts", "hello"}}, + }, + } + + err := client.Push(streams) + require.Error(t, err) + assert.Contains(t, err.Error(), "400") + }) +} + +func Test__Client__QueryRange(t *testing.T) { + t.Run("successful query", func(t *testing.T) { + httpCtx := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(`{ + "status": "success", + "data": { + "resultType": "streams", + "result": [ + { + "stream": {"job": "superplane"}, + "values": [["1708000000000000000", "log line 1"]] + } + ] + } + }`)), + }, + }, + } + + client := &Client{ + baseURL: "https://loki.example.com", + authType: AuthTypeNone, + http: httpCtx, + } + + data, err := client.QueryRange(`{job="superplane"}`, "2026-01-01T00:00:00Z", "2026-01-02T00:00:00Z", "100") + require.NoError(t, err) + assert.Equal(t, "streams", data.ResultType) + assert.NotNil(t, data.Result) + + require.Len(t, httpCtx.Requests, 1) + assert.Equal(t, http.MethodGet, httpCtx.Requests[0].Method) + assert.Contains(t, httpCtx.Requests[0].URL.String(), "/loki/api/v1/query_range") + assert.Contains(t, httpCtx.Requests[0].URL.String(), "query=") + }) + + t.Run("query with non-success status", func(t *testing.T) { + httpCtx := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(`{ + "status": "error", + "data": {"resultType": "", "result": []} + }`)), + }, + }, + } + + client := &Client{ + baseURL: "https://loki.example.com", + authType: AuthTypeNone, + http: httpCtx, + } + + _, err := client.QueryRange(`{invalid}`, "", "", "") + require.Error(t, err) + assert.Contains(t, err.Error(), "status") + }) + + t.Run("query with HTTP error", func(t *testing.T) { + httpCtx := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(strings.NewReader("parse error")), + }, + }, + } + + client := &Client{ + baseURL: "https://loki.example.com", + authType: AuthTypeNone, + http: httpCtx, + } + + _, err := client.QueryRange(`{invalid`, "", "", "") + require.Error(t, err) + assert.Contains(t, err.Error(), "400") + }) +} diff --git a/pkg/integrations/loki/example.go b/pkg/integrations/loki/example.go new file mode 100644 index 0000000000..5714b8bbb3 --- /dev/null +++ b/pkg/integrations/loki/example.go @@ -0,0 +1,28 @@ +package loki + +import ( + _ "embed" + "sync" + + "github.com/superplanehq/superplane/pkg/utils" +) + +//go:embed example_output_push_logs.json +var exampleOutputPushLogsBytes []byte + +var exampleOutputPushLogsOnce sync.Once +var exampleOutputPushLogsData map[string]any + +func exampleOutputPushLogs() map[string]any { + return utils.UnmarshalEmbeddedJSON(&exampleOutputPushLogsOnce, exampleOutputPushLogsBytes, &exampleOutputPushLogsData) +} + +//go:embed example_output_query_logs.json +var exampleOutputQueryLogsBytes []byte + +var exampleOutputQueryLogsOnce sync.Once +var exampleOutputQueryLogsData map[string]any + +func exampleOutputQueryLogs() map[string]any { + return utils.UnmarshalEmbeddedJSON(&exampleOutputQueryLogsOnce, exampleOutputQueryLogsBytes, &exampleOutputQueryLogsData) +} diff --git a/pkg/integrations/loki/example_output_push_logs.json b/pkg/integrations/loki/example_output_push_logs.json new file mode 100644 index 0000000000..a74e3b7999 --- /dev/null +++ b/pkg/integrations/loki/example_output_push_logs.json @@ -0,0 +1 @@ +{"data":{"labels":{"job":"superplane","env":"prod"},"message":"Deployment completed successfully"},"timestamp":"2026-02-15T10:30:00Z","type":"loki.pushLogs"} \ No newline at end of file diff --git a/pkg/integrations/loki/example_output_query_logs.json b/pkg/integrations/loki/example_output_query_logs.json new file mode 100644 index 0000000000..4fb83df140 --- /dev/null +++ b/pkg/integrations/loki/example_output_query_logs.json @@ -0,0 +1 @@ +{"data":{"resultType":"streams","result":[{"stream":{"job":"superplane","env":"prod"},"values":[["1708000000000000000","Deployment started"],["1708000015000000000","Deployment completed successfully"]]}]},"timestamp":"2026-02-15T10:30:05Z","type":"loki.queryLogs"} \ No newline at end of file diff --git a/pkg/integrations/loki/loki.go b/pkg/integrations/loki/loki.go new file mode 100644 index 0000000000..dd3b5da313 --- /dev/null +++ b/pkg/integrations/loki/loki.go @@ -0,0 +1,182 @@ +package loki + +import ( + "fmt" + + "github.com/mitchellh/mapstructure" + "github.com/superplanehq/superplane/pkg/configuration" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/pkg/registry" +) + +const ( + AuthTypeNone = "none" + AuthTypeBasic = "basic" + AuthTypeBearer = "bearer" +) + +const installationInstructions = `### Connection + +1. **Loki URL**: Provide the base URL of your Loki instance (e.g. ` + "`https://loki.example.com`" + `) +2. **Authentication**: Choose the authentication method: + - **None**: No authentication (e.g. local or internal Loki) + - **Basic**: Username and password (e.g. Grafana Cloud) + - **Bearer**: Bearer token (e.g. Loki behind an auth proxy) +3. **Tenant ID** (optional): If your Loki instance is multi-tenant, provide the tenant ID (sent as ` + "`X-Scope-OrgID`" + ` header) +` + +func init() { + registry.RegisterIntegration("loki", &Loki{}) +} + +type Loki struct{} + +type Configuration struct { + BaseURL string `json:"baseURL" mapstructure:"baseURL"` + AuthType string `json:"authType" mapstructure:"authType"` + Username string `json:"username,omitempty" mapstructure:"username"` + Password string `json:"password,omitempty" mapstructure:"password"` + BearerToken string `json:"bearerToken,omitempty" mapstructure:"bearerToken"` + TenantID string `json:"tenantID,omitempty" mapstructure:"tenantID"` +} + +func (l *Loki) Name() string { + return "loki" +} + +func (l *Loki) Label() string { + return "Loki" +} + +func (l *Loki) Icon() string { + return "loki" +} + +func (l *Loki) Description() string { + return "Push and query logs in Grafana Loki" +} + +func (l *Loki) Instructions() string { + return installationInstructions +} + +func (l *Loki) Configuration() []configuration.Field { + return []configuration.Field{ + { + Name: "baseURL", + Label: "Loki Base URL", + Type: configuration.FieldTypeString, + Required: true, + Placeholder: "https://loki.example.com", + Description: "Base URL of your Loki instance", + }, + { + Name: "authType", + Label: "Authentication", + Type: configuration.FieldTypeSelect, + Required: true, + Default: AuthTypeNone, + TypeOptions: &configuration.TypeOptions{ + Select: &configuration.SelectTypeOptions{ + Options: []configuration.FieldOption{ + {Label: "None", Value: AuthTypeNone}, + {Label: "Basic", Value: AuthTypeBasic}, + {Label: "Bearer", Value: AuthTypeBearer}, + }, + }, + }, + }, + { + Name: "username", + Label: "Username", + Type: configuration.FieldTypeString, + Required: false, + VisibilityConditions: []configuration.VisibilityCondition{ + {Field: "authType", Values: []string{AuthTypeBasic}}, + }, + }, + { + Name: "password", + Label: "Password", + Type: configuration.FieldTypeString, + Required: false, + Sensitive: true, + VisibilityConditions: []configuration.VisibilityCondition{ + {Field: "authType", Values: []string{AuthTypeBasic}}, + }, + }, + { + Name: "bearerToken", + Label: "Bearer Token", + Type: configuration.FieldTypeString, + Required: false, + Sensitive: true, + VisibilityConditions: []configuration.VisibilityCondition{ + {Field: "authType", Values: []string{AuthTypeBearer}}, + }, + }, + { + Name: "tenantID", + Label: "Tenant ID", + Type: configuration.FieldTypeString, + Required: false, + Description: "X-Scope-OrgID header value for multi-tenant Loki deployments", + }, + } +} + +func (l *Loki) Components() []core.Component { + return []core.Component{ + &PushLogs{}, + &QueryLogs{}, + } +} + +func (l *Loki) Triggers() []core.Trigger { + return []core.Trigger{} +} + +func (l *Loki) Cleanup(ctx core.IntegrationCleanupContext) error { + return nil +} + +func (l *Loki) Sync(ctx core.SyncContext) error { + config := Configuration{} + if err := mapstructure.Decode(ctx.Configuration, &config); err != nil { + return fmt.Errorf("failed to decode configuration: %v", err) + } + + if config.BaseURL == "" { + return fmt.Errorf("baseURL is required") + } + + if config.AuthType == "" { + return fmt.Errorf("authType is required") + } + + client, err := NewClient(ctx.HTTP, ctx.Integration) + if err != nil { + return fmt.Errorf("failed to create client: %v", err) + } + + if err := client.Ready(); err != nil { + return fmt.Errorf("failed to verify connection: %v", err) + } + + ctx.Integration.Ready() + return nil +} + +func (l *Loki) HandleRequest(ctx core.HTTPRequestContext) {} + +func (l *Loki) ListResources(resourceType string, ctx core.ListResourcesContext) ([]core.IntegrationResource, error) { + return []core.IntegrationResource{}, nil +} + +func (l *Loki) Actions() []core.Action { + return []core.Action{} +} + +func (l *Loki) HandleAction(ctx core.IntegrationActionContext) error { + return nil +} diff --git a/pkg/integrations/loki/loki_test.go b/pkg/integrations/loki/loki_test.go new file mode 100644 index 0000000000..64d97b1772 --- /dev/null +++ b/pkg/integrations/loki/loki_test.go @@ -0,0 +1,260 @@ +package loki + +import ( + "io" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/test/support/contexts" +) + +func Test__Loki__Sync(t *testing.T) { + l := &Loki{} + + t.Run("no baseURL -> error", func(t *testing.T) { + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "", + "authType": AuthTypeNone, + }, + } + + err := l.Sync(core.SyncContext{ + Configuration: appCtx.Configuration, + Integration: appCtx, + }) + + require.ErrorContains(t, err, "baseURL is required") + }) + + t.Run("no authType -> error", func(t *testing.T) { + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": "", + }, + } + + err := l.Sync(core.SyncContext{ + Configuration: appCtx.Configuration, + Integration: appCtx, + }) + + require.ErrorContains(t, err, "authType is required") + }) + + t.Run("successful connection with no auth -> ready", func(t *testing.T) { + httpContext := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("ready")), + }, + }, + } + + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeNone, + }, + } + + err := l.Sync(core.SyncContext{ + Configuration: appCtx.Configuration, + HTTP: httpContext, + Integration: appCtx, + }) + + require.NoError(t, err) + assert.Equal(t, "ready", appCtx.State) + require.Len(t, httpContext.Requests, 1) + assert.Contains(t, httpContext.Requests[0].URL.String(), "/ready") + }) + + t.Run("successful connection with basic auth -> ready", func(t *testing.T) { + httpContext := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("ready")), + }, + }, + } + + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeBasic, + "username": "admin", + "password": "secret", + }, + } + + err := l.Sync(core.SyncContext{ + Configuration: appCtx.Configuration, + HTTP: httpContext, + Integration: appCtx, + }) + + require.NoError(t, err) + assert.Equal(t, "ready", appCtx.State) + require.Len(t, httpContext.Requests, 1) + + username, password, ok := httpContext.Requests[0].BasicAuth() + assert.True(t, ok) + assert.Equal(t, "admin", username) + assert.Equal(t, "secret", password) + }) + + t.Run("successful connection with bearer auth -> ready", func(t *testing.T) { + httpContext := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("ready")), + }, + }, + } + + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeBearer, + "bearerToken": "my-token", + }, + } + + err := l.Sync(core.SyncContext{ + Configuration: appCtx.Configuration, + HTTP: httpContext, + Integration: appCtx, + }) + + require.NoError(t, err) + assert.Equal(t, "ready", appCtx.State) + require.Len(t, httpContext.Requests, 1) + assert.Equal(t, "Bearer my-token", httpContext.Requests[0].Header.Get("Authorization")) + }) + + t.Run("connection with tenant ID sends X-Scope-OrgID header", func(t *testing.T) { + httpContext := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("ready")), + }, + }, + } + + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeNone, + "tenantID": "my-tenant", + }, + } + + err := l.Sync(core.SyncContext{ + Configuration: appCtx.Configuration, + HTTP: httpContext, + Integration: appCtx, + }) + + require.NoError(t, err) + assert.Equal(t, "ready", appCtx.State) + require.Len(t, httpContext.Requests, 1) + assert.Equal(t, "my-tenant", httpContext.Requests[0].Header.Get("X-Scope-OrgID")) + }) + + t.Run("connection failure -> error", func(t *testing.T) { + httpContext := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusServiceUnavailable, + Body: io.NopCloser(strings.NewReader("Loki is not ready")), + }, + }, + } + + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeNone, + }, + } + + err := l.Sync(core.SyncContext{ + Configuration: appCtx.Configuration, + HTTP: httpContext, + Integration: appCtx, + }) + + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to verify connection") + assert.NotEqual(t, "ready", appCtx.State) + }) +} + +func Test__Loki__Components(t *testing.T) { + l := &Loki{} + components := l.Components() + + require.Len(t, components, 2) + assert.Equal(t, "loki.pushLogs", components[0].Name()) + assert.Equal(t, "loki.queryLogs", components[1].Name()) +} + +func Test__Loki__Triggers(t *testing.T) { + l := &Loki{} + triggers := l.Triggers() + + require.Len(t, triggers, 0) +} + +func Test__Loki__Configuration(t *testing.T) { + l := &Loki{} + config := l.Configuration() + + require.Len(t, config, 6) + + baseURLField := config[0] + assert.Equal(t, "baseURL", baseURLField.Name) + assert.True(t, baseURLField.Required) + + authTypeField := config[1] + assert.Equal(t, "authType", authTypeField.Name) + assert.True(t, authTypeField.Required) + + usernameField := config[2] + assert.Equal(t, "username", usernameField.Name) + assert.False(t, usernameField.Required) + assert.Len(t, usernameField.VisibilityConditions, 1) + + passwordField := config[3] + assert.Equal(t, "password", passwordField.Name) + assert.True(t, passwordField.Sensitive) + assert.Len(t, passwordField.VisibilityConditions, 1) + + bearerTokenField := config[4] + assert.Equal(t, "bearerToken", bearerTokenField.Name) + assert.True(t, bearerTokenField.Sensitive) + assert.Len(t, bearerTokenField.VisibilityConditions, 1) + + tenantIDField := config[5] + assert.Equal(t, "tenantID", tenantIDField.Name) + assert.False(t, tenantIDField.Required) +} + +func Test__Loki__Instructions(t *testing.T) { + l := &Loki{} + instructions := l.Instructions() + + assert.NotEmpty(t, instructions) + assert.Contains(t, instructions, "Loki URL") + assert.Contains(t, instructions, "Authentication") +} diff --git a/pkg/integrations/loki/push_logs.go b/pkg/integrations/loki/push_logs.go new file mode 100644 index 0000000000..f866243bdb --- /dev/null +++ b/pkg/integrations/loki/push_logs.go @@ -0,0 +1,215 @@ +package loki + +import ( + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + "github.com/superplanehq/superplane/pkg/configuration" + "github.com/superplanehq/superplane/pkg/core" +) + +type PushLogs struct{} + +type PushLogsSpec struct { + Labels string `json:"labels"` + Message string `json:"message"` +} + +type PushLogsNodeMetadata struct { + Labels string `json:"labels"` +} + +func (c *PushLogs) Name() string { + return "loki.pushLogs" +} + +func (c *PushLogs) Label() string { + return "Push Logs" +} + +func (c *PushLogs) Description() string { + return "Push log entries to Loki" +} + +func (c *PushLogs) Documentation() string { + return `The Push Logs component sends log entries to a Loki instance via the push API (` + "`POST /loki/api/v1/push`" + `). + +## Configuration + +- **Labels**: Required comma-separated key=value pairs used as Loki stream labels (e.g. ` + "`job=superplane,env=prod`" + `) +- **Message**: Required log message content to push + +## Output + +Emits one ` + "`loki.pushLogs`" + ` payload confirming the pushed labels and message.` +} + +func (c *PushLogs) Icon() string { + return "loki" +} + +func (c *PushLogs) Color() string { + return "gray" +} + +func (c *PushLogs) OutputChannels(configuration any) []core.OutputChannel { + return []core.OutputChannel{core.DefaultOutputChannel} +} + +func (c *PushLogs) Configuration() []configuration.Field { + return []configuration.Field{ + { + Name: "labels", + Label: "Labels", + Type: configuration.FieldTypeString, + Required: true, + Placeholder: "job=superplane,env=prod", + Description: "Comma-separated key=value label pairs for the log stream", + }, + { + Name: "message", + Label: "Message", + Type: configuration.FieldTypeText, + Required: true, + Description: "Log message to push", + }, + } +} + +func (c *PushLogs) Setup(ctx core.SetupContext) error { + spec := PushLogsSpec{} + if err := mapstructure.Decode(ctx.Configuration, &spec); err != nil { + return fmt.Errorf("failed to decode configuration: %w", err) + } + + spec = sanitizePushLogsSpec(spec) + + if spec.Labels == "" { + return fmt.Errorf("labels is required") + } + + if spec.Message == "" { + return fmt.Errorf("message is required") + } + + if _, err := parseLabels(spec.Labels); err != nil { + return fmt.Errorf("invalid labels: %w", err) + } + + return nil +} + +func (c *PushLogs) Execute(ctx core.ExecutionContext) error { + spec := PushLogsSpec{} + if err := mapstructure.Decode(ctx.Configuration, &spec); err != nil { + return fmt.Errorf("failed to decode configuration: %w", err) + } + + spec = sanitizePushLogsSpec(spec) + + labels, err := parseLabels(spec.Labels) + if err != nil { + return fmt.Errorf("invalid labels: %w", err) + } + + client, err := NewClient(ctx.HTTP, ctx.Integration) + if err != nil { + return fmt.Errorf("failed to create Loki client: %w", err) + } + + ts := strconv.FormatInt(time.Now().UnixNano(), 10) + + streams := []Stream{ + { + Stream: labels, + Values: [][]string{{ts, spec.Message}}, + }, + } + + if err := client.Push(streams); err != nil { + return fmt.Errorf("failed to push logs: %w", err) + } + + ctx.Metadata.Set(PushLogsNodeMetadata{Labels: spec.Labels}) + + payload := map[string]any{ + "labels": labels, + "message": spec.Message, + } + + return ctx.ExecutionState.Emit( + core.DefaultOutputChannel.Name, + "loki.pushLogs", + []any{payload}, + ) +} + +func (c *PushLogs) ProcessQueueItem(ctx core.ProcessQueueContext) (*uuid.UUID, error) { + return ctx.DefaultProcessing() +} + +func (c *PushLogs) HandleWebhook(ctx core.WebhookRequestContext) (int, error) { + return http.StatusOK, nil +} + +func (c *PushLogs) Actions() []core.Action { + return []core.Action{} +} + +func (c *PushLogs) HandleAction(ctx core.ActionContext) error { + return nil +} + +func (c *PushLogs) Cancel(ctx core.ExecutionContext) error { + return nil +} + +func (c *PushLogs) Cleanup(ctx core.SetupContext) error { + return nil +} + +func (c *PushLogs) ExampleOutput() map[string]any { + return exampleOutputPushLogs() +} + +func parseLabels(labelsStr string) (map[string]string, error) { + labels := make(map[string]string) + + for _, pair := range strings.Split(labelsStr, ",") { + pair = strings.TrimSpace(pair) + if pair == "" { + continue + } + + parts := strings.SplitN(pair, "=", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid label format %q, expected key=value", pair) + } + + key := strings.TrimSpace(parts[0]) + value := strings.TrimSpace(parts[1]) + + if key == "" { + return nil, fmt.Errorf("label key cannot be empty") + } + + labels[key] = value + } + + if len(labels) == 0 { + return nil, fmt.Errorf("at least one label is required") + } + + return labels, nil +} + +func sanitizePushLogsSpec(spec PushLogsSpec) PushLogsSpec { + spec.Labels = strings.TrimSpace(spec.Labels) + spec.Message = strings.TrimSpace(spec.Message) + return spec +} diff --git a/pkg/integrations/loki/push_logs_test.go b/pkg/integrations/loki/push_logs_test.go new file mode 100644 index 0000000000..4f65d07685 --- /dev/null +++ b/pkg/integrations/loki/push_logs_test.go @@ -0,0 +1,228 @@ +package loki + +import ( + "io" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/test/support/contexts" +) + +func Test__PushLogs__Setup(t *testing.T) { + component := &PushLogs{} + + t.Run("invalid configuration -> decode error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: "invalid", + }) + + require.ErrorContains(t, err, "failed to decode configuration") + }) + + t.Run("missing labels -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "labels": "", + "message": "hello", + }, + }) + + require.ErrorContains(t, err, "labels is required") + }) + + t.Run("missing message -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "labels": "job=test", + "message": "", + }, + }) + + require.ErrorContains(t, err, "message is required") + }) + + t.Run("invalid label format -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "labels": "invalid-label", + "message": "hello", + }, + }) + + require.ErrorContains(t, err, "invalid labels") + }) + + t.Run("valid configuration -> success", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "labels": "job=superplane,env=prod", + "message": "test message", + }, + }) + + require.NoError(t, err) + }) +} + +func Test__PushLogs__Execute(t *testing.T) { + component := &PushLogs{} + + t.Run("successful push", func(t *testing.T) { + httpContext := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusNoContent, + Body: io.NopCloser(strings.NewReader("")), + }, + }, + } + + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeNone, + }, + } + + executionState := &contexts.ExecutionStateContext{ + KVs: make(map[string]string), + } + metadataCtx := &contexts.MetadataContext{} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{ + "labels": "job=superplane,env=prod", + "message": "Deployment completed", + }, + HTTP: httpContext, + Integration: appCtx, + ExecutionState: executionState, + Metadata: metadataCtx, + }) + + require.NoError(t, err) + assert.True(t, executionState.Passed) + assert.Equal(t, "default", executionState.Channel) + assert.Equal(t, "loki.pushLogs", executionState.Type) + + require.Len(t, httpContext.Requests, 1) + req := httpContext.Requests[0] + assert.Equal(t, http.MethodPost, req.Method) + assert.Contains(t, req.URL.String(), "/loki/api/v1/push") + }) + + t.Run("push failure -> returns error", func(t *testing.T) { + httpContext := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(strings.NewReader(`{"message":"invalid stream"}`)), + }, + }, + } + + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeNone, + }, + } + + executionState := &contexts.ExecutionStateContext{ + KVs: make(map[string]string), + } + metadataCtx := &contexts.MetadataContext{} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{ + "labels": "job=test", + "message": "hello", + }, + HTTP: httpContext, + Integration: appCtx, + ExecutionState: executionState, + Metadata: metadataCtx, + }) + + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to push logs") + }) + + t.Run("invalid labels -> returns error", func(t *testing.T) { + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeNone, + }, + } + + executionState := &contexts.ExecutionStateContext{ + KVs: make(map[string]string), + } + metadataCtx := &contexts.MetadataContext{} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{ + "labels": "invalid", + "message": "hello", + }, + HTTP: &contexts.HTTPContext{}, + Integration: appCtx, + ExecutionState: executionState, + Metadata: metadataCtx, + }) + + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid labels") + }) +} + +func Test__PushLogs__OutputChannels(t *testing.T) { + component := &PushLogs{} + channels := component.OutputChannels(nil) + + require.Len(t, channels, 1) + assert.Equal(t, "default", channels[0].Name) +} + +func Test__PushLogs__parseLabels(t *testing.T) { + t.Run("single label", func(t *testing.T) { + labels, err := parseLabels("job=superplane") + require.NoError(t, err) + assert.Equal(t, map[string]string{"job": "superplane"}, labels) + }) + + t.Run("multiple labels", func(t *testing.T) { + labels, err := parseLabels("job=superplane,env=prod") + require.NoError(t, err) + assert.Equal(t, map[string]string{"job": "superplane", "env": "prod"}, labels) + }) + + t.Run("labels with spaces", func(t *testing.T) { + labels, err := parseLabels("job = superplane , env = prod") + require.NoError(t, err) + assert.Equal(t, map[string]string{"job": "superplane", "env": "prod"}, labels) + }) + + t.Run("empty string -> error", func(t *testing.T) { + _, err := parseLabels("") + require.Error(t, err) + assert.Contains(t, err.Error(), "at least one label is required") + }) + + t.Run("invalid format -> error", func(t *testing.T) { + _, err := parseLabels("noequalssign") + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid label format") + }) + + t.Run("empty key -> error", func(t *testing.T) { + _, err := parseLabels("=value") + require.Error(t, err) + assert.Contains(t, err.Error(), "label key cannot be empty") + }) +} diff --git a/pkg/integrations/loki/query_logs.go b/pkg/integrations/loki/query_logs.go new file mode 100644 index 0000000000..fddbf6fada --- /dev/null +++ b/pkg/integrations/loki/query_logs.go @@ -0,0 +1,190 @@ +package loki + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + "github.com/superplanehq/superplane/pkg/configuration" + "github.com/superplanehq/superplane/pkg/core" +) + +type QueryLogs struct{} + +type QueryLogsSpec struct { + Query string `json:"query"` + Start string `json:"start"` + End string `json:"end"` + Limit string `json:"limit"` +} + +type QueryLogsNodeMetadata struct { + Query string `json:"query"` +} + +func (c *QueryLogs) Name() string { + return "loki.queryLogs" +} + +func (c *QueryLogs) Label() string { + return "Query Logs" +} + +func (c *QueryLogs) Description() string { + return "Query logs from Loki using LogQL" +} + +func (c *QueryLogs) Documentation() string { + return `The Query Logs component executes a LogQL query against Loki (` + "`GET /loki/api/v1/query_range`" + `). + +## Configuration + +- **Query**: Required LogQL expression (supports expressions). Example: ` + "`{job=\"superplane\"}`" + ` +- **Start**: Optional start timestamp in RFC3339 or Unix nanosecond format (supports expressions) +- **End**: Optional end timestamp in RFC3339 or Unix nanosecond format (supports expressions) +- **Limit**: Optional maximum number of entries to return (default: 100) + +## Output + +Emits one ` + "`loki.queryLogs`" + ` payload with the result type and query results.` +} + +func (c *QueryLogs) Icon() string { + return "loki" +} + +func (c *QueryLogs) Color() string { + return "gray" +} + +func (c *QueryLogs) OutputChannels(configuration any) []core.OutputChannel { + return []core.OutputChannel{core.DefaultOutputChannel} +} + +func (c *QueryLogs) Configuration() []configuration.Field { + return []configuration.Field{ + { + Name: "query", + Label: "Query", + Type: configuration.FieldTypeString, + Required: true, + Placeholder: `{job="superplane"}`, + Description: "LogQL expression to evaluate", + }, + { + Name: "start", + Label: "Start", + Type: configuration.FieldTypeString, + Required: false, + Placeholder: "2026-01-01T00:00:00Z", + Description: "Start timestamp (RFC3339 or Unix nanoseconds)", + }, + { + Name: "end", + Label: "End", + Type: configuration.FieldTypeString, + Required: false, + Placeholder: "2026-01-02T00:00:00Z", + Description: "End timestamp (RFC3339 or Unix nanoseconds)", + }, + { + Name: "limit", + Label: "Limit", + Type: configuration.FieldTypeString, + Required: false, + Default: "100", + Description: "Maximum number of entries to return", + }, + } +} + +func (c *QueryLogs) Setup(ctx core.SetupContext) error { + spec := QueryLogsSpec{} + if err := mapstructure.Decode(ctx.Configuration, &spec); err != nil { + return fmt.Errorf("failed to decode configuration: %w", err) + } + + spec = sanitizeQueryLogsSpec(spec) + + if spec.Query == "" { + return fmt.Errorf("query is required") + } + + return nil +} + +func (c *QueryLogs) Execute(ctx core.ExecutionContext) error { + spec := QueryLogsSpec{} + if err := mapstructure.Decode(ctx.Configuration, &spec); err != nil { + return fmt.Errorf("failed to decode configuration: %w", err) + } + + spec = sanitizeQueryLogsSpec(spec) + + client, err := NewClient(ctx.HTTP, ctx.Integration) + if err != nil { + return fmt.Errorf("failed to create Loki client: %w", err) + } + + data, err := client.QueryRange(spec.Query, spec.Start, spec.End, spec.Limit) + if err != nil { + return fmt.Errorf("failed to query logs: %w", err) + } + + ctx.Metadata.Set(QueryLogsNodeMetadata{Query: spec.Query}) + + var result any + if err := json.Unmarshal(data.Result, &result); err != nil { + result = json.RawMessage(data.Result) + } + + payload := map[string]any{ + "resultType": data.ResultType, + "result": result, + } + + return ctx.ExecutionState.Emit( + core.DefaultOutputChannel.Name, + "loki.queryLogs", + []any{payload}, + ) +} + +func (c *QueryLogs) ProcessQueueItem(ctx core.ProcessQueueContext) (*uuid.UUID, error) { + return ctx.DefaultProcessing() +} + +func (c *QueryLogs) HandleWebhook(ctx core.WebhookRequestContext) (int, error) { + return http.StatusOK, nil +} + +func (c *QueryLogs) Actions() []core.Action { + return []core.Action{} +} + +func (c *QueryLogs) HandleAction(ctx core.ActionContext) error { + return nil +} + +func (c *QueryLogs) Cancel(ctx core.ExecutionContext) error { + return nil +} + +func (c *QueryLogs) Cleanup(ctx core.SetupContext) error { + return nil +} + +func (c *QueryLogs) ExampleOutput() map[string]any { + return exampleOutputQueryLogs() +} + +func sanitizeQueryLogsSpec(spec QueryLogsSpec) QueryLogsSpec { + spec.Query = strings.TrimSpace(spec.Query) + spec.Start = strings.TrimSpace(spec.Start) + spec.End = strings.TrimSpace(spec.End) + spec.Limit = strings.TrimSpace(spec.Limit) + return spec +} diff --git a/pkg/integrations/loki/query_logs_test.go b/pkg/integrations/loki/query_logs_test.go new file mode 100644 index 0000000000..9a66f75054 --- /dev/null +++ b/pkg/integrations/loki/query_logs_test.go @@ -0,0 +1,208 @@ +package loki + +import ( + "io" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/test/support/contexts" +) + +func Test__QueryLogs__Setup(t *testing.T) { + component := &QueryLogs{} + + t.Run("invalid configuration -> decode error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: "invalid", + }) + + require.ErrorContains(t, err, "failed to decode configuration") + }) + + t.Run("missing query -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "query": "", + }, + }) + + require.ErrorContains(t, err, "query is required") + }) + + t.Run("valid configuration -> success", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "query": `{job="superplane"}`, + }, + }) + + require.NoError(t, err) + }) + + t.Run("valid configuration with all fields -> success", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "query": `{job="superplane"}`, + "start": "2026-01-01T00:00:00Z", + "end": "2026-01-02T00:00:00Z", + "limit": "50", + }, + }) + + require.NoError(t, err) + }) +} + +func Test__QueryLogs__Execute(t *testing.T) { + component := &QueryLogs{} + + t.Run("successful query", func(t *testing.T) { + httpContext := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(`{ + "status": "success", + "data": { + "resultType": "streams", + "result": [ + { + "stream": {"job": "superplane"}, + "values": [ + ["1708000000000000000", "log line 1"], + ["1708000015000000000", "log line 2"] + ] + } + ] + } + }`)), + }, + }, + } + + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeNone, + }, + } + + executionState := &contexts.ExecutionStateContext{ + KVs: make(map[string]string), + } + metadataCtx := &contexts.MetadataContext{} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{ + "query": `{job="superplane"}`, + "start": "2026-01-01T00:00:00Z", + "end": "2026-01-02T00:00:00Z", + "limit": "100", + }, + HTTP: httpContext, + Integration: appCtx, + ExecutionState: executionState, + Metadata: metadataCtx, + }) + + require.NoError(t, err) + assert.True(t, executionState.Passed) + assert.Equal(t, "default", executionState.Channel) + assert.Equal(t, "loki.queryLogs", executionState.Type) + + require.Len(t, httpContext.Requests, 1) + req := httpContext.Requests[0] + assert.Equal(t, http.MethodGet, req.Method) + assert.Contains(t, req.URL.String(), "/loki/api/v1/query_range") + }) + + t.Run("query failure -> returns error", func(t *testing.T) { + httpContext := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(strings.NewReader("parse error")), + }, + }, + } + + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeNone, + }, + } + + executionState := &contexts.ExecutionStateContext{ + KVs: make(map[string]string), + } + metadataCtx := &contexts.MetadataContext{} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{ + "query": `{invalid`, + }, + HTTP: httpContext, + Integration: appCtx, + ExecutionState: executionState, + Metadata: metadataCtx, + }) + + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to query logs") + }) + + t.Run("query without optional fields", func(t *testing.T) { + httpContext := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(`{ + "status": "success", + "data": { + "resultType": "streams", + "result": [] + } + }`)), + }, + }, + } + + appCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "baseURL": "https://loki.example.com", + "authType": AuthTypeNone, + }, + } + + executionState := &contexts.ExecutionStateContext{ + KVs: make(map[string]string), + } + metadataCtx := &contexts.MetadataContext{} + + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{ + "query": `{job="superplane"}`, + }, + HTTP: httpContext, + Integration: appCtx, + ExecutionState: executionState, + Metadata: metadataCtx, + }) + + require.NoError(t, err) + assert.True(t, executionState.Passed) + }) +} + +func Test__QueryLogs__OutputChannels(t *testing.T) { + component := &QueryLogs{} + channels := component.OutputChannels(nil) + + require.Len(t, channels, 1) + assert.Equal(t, "default", channels[0].Name) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index efbd4321fb..1b191f4a94 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -59,6 +59,7 @@ import ( _ "github.com/superplanehq/superplane/pkg/integrations/jfrog_artifactory" _ "github.com/superplanehq/superplane/pkg/integrations/jira" _ "github.com/superplanehq/superplane/pkg/integrations/launchdarkly" + _ "github.com/superplanehq/superplane/pkg/integrations/loki" _ "github.com/superplanehq/superplane/pkg/integrations/octopus" _ "github.com/superplanehq/superplane/pkg/integrations/openai" _ "github.com/superplanehq/superplane/pkg/integrations/pagerduty" diff --git a/web_src/src/assets/icons/integrations/loki.svg b/web_src/src/assets/icons/integrations/loki.svg new file mode 100644 index 0000000000..a3877734ce --- /dev/null +++ b/web_src/src/assets/icons/integrations/loki.svg @@ -0,0 +1,6 @@ + + + + + + diff --git a/web_src/src/pages/workflowv2/mappers/index.ts b/web_src/src/pages/workflowv2/mappers/index.ts index 771a963eab..d373ad9195 100644 --- a/web_src/src/pages/workflowv2/mappers/index.ts +++ b/web_src/src/pages/workflowv2/mappers/index.ts @@ -194,6 +194,11 @@ import { triggerRenderers as servicenowTriggerRenderers, eventStateRegistry as servicenowEventStateRegistry, } from "./servicenow/index"; +import { + componentMappers as lokiComponentMappers, + triggerRenderers as lokiTriggerRenderers, + eventStateRegistry as lokiEventStateRegistry, +} from "./loki/index"; import { filterMapper, FILTER_STATE_REGISTRY } from "./filter"; import { sshMapper, SSH_STATE_REGISTRY } from "./ssh"; @@ -265,6 +270,7 @@ const appMappers: Record> = { honeycomb: honeycombComponentMappers, harness: harnessComponentMappers, servicenow: servicenowComponentMappers, + loki: lokiComponentMappers, }; const appTriggerRenderers: Record> = { @@ -302,6 +308,7 @@ const appTriggerRenderers: Record> = { honeycomb: honeycombTriggerRenderers, harness: harnessTriggerRenderers, servicenow: servicenowTriggerRenderers, + loki: lokiTriggerRenderers, }; const appEventStateRegistries: Record> = { @@ -338,6 +345,7 @@ const appEventStateRegistries: Record honeycomb: honeycombEventStateRegistry, harness: harnessEventStateRegistry, servicenow: servicenowEventStateRegistry, + loki: lokiEventStateRegistry, }; const componentAdditionalDataBuilders: Record = { diff --git a/web_src/src/pages/workflowv2/mappers/loki/index.ts b/web_src/src/pages/workflowv2/mappers/loki/index.ts new file mode 100644 index 0000000000..094472c5e9 --- /dev/null +++ b/web_src/src/pages/workflowv2/mappers/loki/index.ts @@ -0,0 +1,16 @@ +import { ComponentBaseMapper, EventStateRegistry, TriggerRenderer } from "../types"; +import { pushLogsMapper } from "./push_logs"; +import { queryLogsMapper } from "./query_logs"; +import { buildActionStateRegistry } from "../utils"; + +export const componentMappers: Record = { + pushLogs: pushLogsMapper, + queryLogs: queryLogsMapper, +}; + +export const triggerRenderers: Record = {}; + +export const eventStateRegistry: Record = { + pushLogs: buildActionStateRegistry("pushed"), + queryLogs: buildActionStateRegistry("queried"), +}; diff --git a/web_src/src/pages/workflowv2/mappers/loki/push_logs.ts b/web_src/src/pages/workflowv2/mappers/loki/push_logs.ts new file mode 100644 index 0000000000..8519953863 --- /dev/null +++ b/web_src/src/pages/workflowv2/mappers/loki/push_logs.ts @@ -0,0 +1,99 @@ +import { ComponentBaseProps, EventSection } from "@/ui/componentBase"; +import { getBackgroundColorClass } from "@/utils/colors"; +import { getState, getStateMap, getTriggerRenderer } from ".."; +import { + ComponentBaseContext, + ComponentBaseMapper, + ExecutionDetailsContext, + ExecutionInfo, + NodeInfo, + OutputPayload, + SubtitleContext, +} from "../types"; +import { MetadataItem } from "@/ui/metadataList"; +import lokiIcon from "@/assets/icons/integrations/loki.svg"; +import { PushLogsConfiguration, PushLogsNodeMetadata, PushLogsPayload } from "./types"; +import { formatTimeAgo } from "@/utils/date"; + +export const pushLogsMapper: ComponentBaseMapper = { + props(context: ComponentBaseContext): ComponentBaseProps { + const lastExecution = context.lastExecutions.length > 0 ? context.lastExecutions[0] : null; + const componentName = context.componentDefinition.name || "unknown"; + + return { + iconSrc: lokiIcon, + collapsedBackground: getBackgroundColorClass(context.componentDefinition.color), + collapsed: context.node.isCollapsed, + title: + context.node.name || + context.componentDefinition.label || + context.componentDefinition.name || + "Unnamed component", + eventSections: lastExecution ? baseEventSections(context.nodes, lastExecution, componentName) : undefined, + metadata: metadataList(context.node), + includeEmptyState: !lastExecution, + eventStateMap: getStateMap(componentName), + }; + }, + + getExecutionDetails(context: ExecutionDetailsContext): Record { + const details: Record = {}; + + if (context.execution.createdAt) { + details["Pushed At"] = new Date(context.execution.createdAt).toLocaleString(); + } + + const outputs = context.execution.outputs as { default: OutputPayload[] }; + if (!outputs?.default?.[0]?.data) { + return details; + } + + const payload = outputs.default[0].data as PushLogsPayload; + + if (payload?.labels) { + const labelStr = Object.entries(payload.labels) + .map(([k, v]) => `${k}=${v}`) + .join(", "); + details["Labels"] = labelStr; + } + + if (payload?.message) { + details["Message"] = payload.message; + } + + return details; + }, + + subtitle(context: SubtitleContext): string { + if (!context.execution.createdAt) return ""; + return formatTimeAgo(new Date(context.execution.createdAt)); + }, +}; + +function metadataList(node: NodeInfo): MetadataItem[] { + const metadata: MetadataItem[] = []; + const nodeMetadata = node.metadata as PushLogsNodeMetadata | undefined; + const configuration = node.configuration as PushLogsConfiguration | undefined; + + const labels = nodeMetadata?.labels || configuration?.labels; + if (labels) { + metadata.push({ icon: "tag", label: labels }); + } + + return metadata.slice(0, 3); +} + +function baseEventSections(nodes: NodeInfo[], execution: ExecutionInfo, componentName: string): EventSection[] { + const rootTriggerNode = nodes.find((n) => n.id === execution.rootEvent?.nodeId); + const rootTriggerRenderer = getTriggerRenderer(rootTriggerNode?.componentName!); + const { title } = rootTriggerRenderer.getTitleAndSubtitle({ event: execution.rootEvent }); + + return [ + { + receivedAt: new Date(execution.createdAt!), + eventTitle: title, + eventState: getState(componentName)(execution), + eventId: execution.rootEvent?.id || "", + }, + ]; +} diff --git a/web_src/src/pages/workflowv2/mappers/loki/query_logs.ts b/web_src/src/pages/workflowv2/mappers/loki/query_logs.ts new file mode 100644 index 0000000000..3faa01fc15 --- /dev/null +++ b/web_src/src/pages/workflowv2/mappers/loki/query_logs.ts @@ -0,0 +1,101 @@ +import { ComponentBaseProps, EventSection } from "@/ui/componentBase"; +import { MetadataItem } from "@/ui/metadataList"; +import { getBackgroundColorClass } from "@/utils/colors"; +import { formatTimeAgo } from "@/utils/date"; +import lokiIcon from "@/assets/icons/integrations/loki.svg"; +import { getState, getStateMap, getTriggerRenderer } from ".."; +import { + ComponentBaseContext, + ComponentBaseMapper, + ExecutionDetailsContext, + ExecutionInfo, + NodeInfo, + OutputPayload, + SubtitleContext, +} from "../types"; +import { QueryLogsConfiguration, QueryLogsNodeMetadata, QueryLogsPayload } from "./types"; + +export const queryLogsMapper: ComponentBaseMapper = { + props(context: ComponentBaseContext): ComponentBaseProps { + const lastExecution = context.lastExecutions.length > 0 ? context.lastExecutions[0] : null; + const componentName = context.componentDefinition.name || context.node.componentName || "unknown"; + + return { + iconSrc: lokiIcon, + collapsedBackground: getBackgroundColorClass(context.componentDefinition.color), + collapsed: context.node.isCollapsed, + title: context.node.name || context.componentDefinition.label || "Unnamed component", + eventSections: lastExecution ? buildEventSections(context.nodes, lastExecution, componentName) : undefined, + metadata: getMetadata(context.node), + includeEmptyState: !lastExecution, + eventStateMap: getStateMap(componentName), + }; + }, + + subtitle(context: SubtitleContext): string { + if (!context.execution.createdAt) { + return ""; + } + + return formatTimeAgo(new Date(context.execution.createdAt)); + }, + + getExecutionDetails(context: ExecutionDetailsContext): Record { + const details: Record = {}; + + if (context.execution.createdAt) { + details["Executed At"] = new Date(context.execution.createdAt).toLocaleString(); + } + + const outputs = context.execution.outputs as { default?: OutputPayload[] } | undefined; + if (!outputs || !outputs.default || outputs.default.length === 0) { + return details; + } + + const queryResult = outputs.default[0].data as QueryLogsPayload; + + const configuration = context.node?.configuration as QueryLogsConfiguration | undefined; + if (configuration?.query) { + details["Query"] = configuration.query; + } + + if (queryResult?.resultType) { + details["Result Type"] = queryResult.resultType; + } + + if (queryResult?.result !== undefined) { + details["Results"] = String(Array.isArray(queryResult.result) ? queryResult.result.length : 0); + } + + return details; + }, +}; + +function getMetadata(node: NodeInfo): MetadataItem[] { + const metadata: MetadataItem[] = []; + const nodeMetadata = node.metadata as QueryLogsNodeMetadata | undefined; + const configuration = node.configuration as QueryLogsConfiguration | undefined; + + const query = nodeMetadata?.query || configuration?.query; + if (query) { + metadata.push({ icon: "search", label: query }); + } + + return metadata.slice(0, 3); +} + +function buildEventSections(nodes: NodeInfo[], execution: ExecutionInfo, componentName: string): EventSection[] { + const rootTriggerNode = nodes.find((n) => n.id === execution.rootEvent?.nodeId); + const rootTriggerRenderer = getTriggerRenderer(rootTriggerNode?.componentName!); + const { title } = rootTriggerRenderer.getTitleAndSubtitle({ event: execution.rootEvent }); + + return [ + { + receivedAt: new Date(execution.createdAt!), + eventTitle: title, + eventSubtitle: execution.createdAt ? formatTimeAgo(new Date(execution.createdAt)) : "", + eventState: getState(componentName)(execution), + eventId: execution.rootEvent!.id!, + }, + ]; +} diff --git a/web_src/src/pages/workflowv2/mappers/loki/types.ts b/web_src/src/pages/workflowv2/mappers/loki/types.ts new file mode 100644 index 0000000000..e21ef7ea59 --- /dev/null +++ b/web_src/src/pages/workflowv2/mappers/loki/types.ts @@ -0,0 +1,29 @@ +export interface PushLogsConfiguration { + labels?: string; + message?: string; +} + +export interface PushLogsPayload { + labels?: Record; + message?: string; +} + +export interface PushLogsNodeMetadata { + labels?: string; +} + +export interface QueryLogsConfiguration { + query?: string; + start?: string; + end?: string; + limit?: string; +} + +export interface QueryLogsNodeMetadata { + query?: string; +} + +export interface QueryLogsPayload { + resultType?: string; + result?: any[]; +} diff --git a/web_src/src/ui/componentSidebar/integrationIcons.tsx b/web_src/src/ui/componentSidebar/integrationIcons.tsx index ad6363141b..94fde0be5d 100644 --- a/web_src/src/ui/componentSidebar/integrationIcons.tsx +++ b/web_src/src/ui/componentSidebar/integrationIcons.tsx @@ -46,6 +46,7 @@ import harnessIcon from "@/assets/icons/integrations/harness.svg"; import servicenowIcon from "@/assets/icons/integrations/servicenow.svg"; import statuspageIcon from "@/assets/icons/integrations/statuspage.svg"; import launchdarklyIcon from "@/assets/icons/integrations/launchdarkly.svg"; +import lokiIcon from "@/assets/icons/integrations/loki.svg"; /** Integration type name (e.g. "github") → logo src. Used for Settings tab and header. */ export const INTEGRATION_APP_LOGO_MAP: Record = { @@ -86,6 +87,7 @@ export const INTEGRATION_APP_LOGO_MAP: Record = { servicenow: servicenowIcon, statuspage: statuspageIcon, launchdarkly: launchdarklyIcon, + loki: lokiIcon, }; /** Block name first part (e.g. "github") or compound (e.g. aws.lambda) → logo src for header. */ @@ -124,6 +126,7 @@ export const APP_LOGO_MAP: Record> = { servicenow: servicenowIcon, statuspage: statuspageIcon, launchdarkly: launchdarklyIcon, + loki: lokiIcon, aws: { cloudwatch: awsCloudwatchIcon, codeArtifact: awsCodeArtifactIcon,