diff --git a/README.md b/README.md index 20f380fc..a80933be 100644 --- a/README.md +++ b/README.md @@ -99,7 +99,7 @@ Those are the variables you have to setup: |KAFKA_USER||user| |KAFKA_PASS||supersecret| |KAFKA_BROKER|if set to empty the Async API will be disabled|kafka.example.com:9094| -|KAFKA_TOPICS|Contains a map, using toml format, mapping entities in the Vulcan async API to the kafka topics they wil be pushed to, by now the only available entity is ``assets`` |[assets = "assets-topic"]| +|KAFKA_TOPICS|Contains a map, using toml format, mapping entities in the Vulcan async API to the kafka topics they wil be pushed to, by now the only available entity is ``assets`` |{assets = "assets-topic", findings = "findings-topic"}| First we have to build the `vulcan-api` because the build only copies the file. We need to provide `linux` compiled binary to the docker build command. This won't be necessary when this component has been open sourced. diff --git a/_resources/config/local.toml.example b/_resources/config/local.toml.example index 4fda2edb..c3c9954c 100644 --- a/_resources/config/local.toml.example +++ b/_resources/config/local.toml.example @@ -58,6 +58,12 @@ key = "a key" retries = 4 retry_interval = 2 # seconds +[kafka] +user = "user" +pass = "supersecret" +broker = "kafka.example.com:9094" +topics = '{assets = "assets-topic", findings = "findings-topic"}' + [globalpolicy] # This config policy emulates code policy definition. [globalpolicy.web-scanning-global] diff --git a/docs/asyncapi.yaml b/docs/asyncapi.yaml index f85aefbd..1c13a274 100644 --- a/docs/asyncapi.yaml +++ b/docs/asyncapi.yaml @@ -14,7 +14,11 @@ channels: description: CDC Events of the assets stored in Vulcan. subscribe: message: - $ref: '#/components/messages/asset' + $ref: "#/components/messages/asset" + findings: + subscribe: + message: + $ref: "#/components/messages/finding" components: messages: @@ -29,6 +33,15 @@ components: contentType: application/json payload: $ref: "#/components/schemas/assetPayload" + finding: + headers: + $ref: "#/components/schemas/findingMetadata" + contentType: application/json + payload: + $ref: "#/components/schemas/findingPayload" + summary: Events generated from Vulnerability DB findings state changes + name: Finding + title: Findings state schemas: assetMetadata: @@ -123,3 +136,134 @@ components: - name - description - tag + + findingPayload: + properties: + affected_resource: + type: string + current_exposure: + type: integer + details: + type: string + id: + type: string + impact_details: + type: string + issue: + $ref: '#/components/schemas/issue' + resources: + $ref: '#/components/schemas/resources' + score: + type: number + source: + $ref: '#/components/schemas/source' + status: + type: string + target: + $ref: '#/components/schemas/target' + total_exposure: + type: integer + type: object + additionalProperties: false + + findingMetadata: + type: object + additionalProperties: false + properties: + version: + type: string + description: The value of this field is equal to the value of the field info.version of this document. + required: + - version + + stringArray: + items: + type: string + type: + - array + - "null" + + issue: + properties: + cwe_id: + minimum: 0 + type: integer + description: + type: string + id: + type: string + labels: + items: + type: string + type: + - array + - "null" + recommendations: + $ref: '#/components/schemas/stringArray' + reference_links: + $ref: '#/components/schemas/stringArray' + summary: + type: string + type: object + additionalProperties: false + + resourceGroup: + properties: + attributes: + items: + type: string + type: + - array + - "null" + name: + type: string + resources: + items: + additionalProperties: + type: string + type: object + type: + - array + - "null" + type: object + additionalProperties: false + + resources: + items: + $ref: '#/components/schemas/resourceGroup' + type: + - array + - "null" + + source: + properties: + component: + type: string + id: + type: string + instance: + type: string + name: + type: string + options: + type: string + time: + format: date-time + type: string + type: object + additionalProperties: false + + target: + properties: + id: + type: string + identifier: + type: string + teams: + items: + type: string + type: + - array + - "null" + type: object + additionalProperties: false diff --git a/local.env.example b/local.env.example index d7fb56c8..96af4a79 100644 --- a/local.env.example +++ b/local.env.example @@ -41,4 +41,4 @@ GPC_1_NAME=web-scanning-global KAFKA_USER=user KAFKA_PASS=supersecret KAFKA_BROKER=kafka.example.com:9094 -KAFKA_TOPICS={assets = "assets-topic"} +KAFKA_TOPICS={assets = "assets-topic", findings = "findings-topic"} diff --git a/pkg/api/store/cdc/parser.go b/pkg/api/store/cdc/parser.go index 007b0a2d..c61ce97d 100644 --- a/pkg/api/store/cdc/parser.go +++ b/pkg/api/store/cdc/parser.go @@ -58,6 +58,7 @@ type AsyncTxParser struct { type AsyncAPI interface { PushAsset(asset asyncapi.AssetPayload) error DeleteAsset(asset asyncapi.AssetPayload) error + PushFinding(finding asyncapi.FindingPayload) error } // NewAsyncTxParser builds a new CDC log parser to handle distributed @@ -241,12 +242,7 @@ func (p *AsyncTxParser) processUpdateAsset(data []byte) error { return errInvalidData } asyncAsset := assetToAsyncAsset(dto.NewAsset) - err = p.asyncAPI.PushAsset(asyncAsset) - if err != nil { - return err - } - - return nil + return p.asyncAPI.PushAsset(asyncAsset) } func (p *AsyncTxParser) processDeleteAllAssets(data []byte) error { @@ -276,6 +272,7 @@ func (p *AsyncTxParser) processFindingOverwrite(data []byte) error { return errInvalidData } + // Update finding in vulndb _, err = p.VulnDBClient.UpdateFinding( context.Background(), dto.FindingOverwrite.FindingID, @@ -289,7 +286,28 @@ func (p *AsyncTxParser) processFindingOverwrite(data []byte) error { } return err } - return nil + + // Retrieve current finding status and push event + f, err := p.VulnDBClient.Finding(context.Background(), dto.FindingOverwrite.FindingID) + if err != nil { + if errors.IsKind(err, errors.ErrNotFound) { + return nil + } + } + // TODO: There can be a race condition here between two concurrent state changes for a + // finding between this finding overwrite and a related check processing from vulndb side. + // Currently this can only generate a conflict if the two concurrent events are a "mark as + // false positive" action initiated from Vulcan API and a finding detection event from the + // vulndb side which contains a fingerprint variation, as in that situation the FALSE POSITIVE + // state "preference" does not apply. + // Example: + // API -> mark as false positive -> VulnDB API + // API -> retrieve finding state -> VulnDB API + // VulnDB -> check processing -> Reopen false positive finding + // VulnDB -> push reopened finding -> Kafka + // API -> push false positive finding -> Kafka + asyncFinding := findingToAsyncFinding(f) + return p.asyncAPI.PushFinding(asyncFinding) } // processMergeDiscoveredAssets performs the following actions: @@ -407,3 +425,42 @@ func assetToAsyncAsset(a api.Asset) asyncapi.AssetPayload { } return asyncAsset } + +func findingToAsyncFinding(f *api.Finding) asyncapi.FindingPayload { + findingPayload := asyncapi.FindingPayload{ + AffectedResource: f.Finding.AffectedResource, + Details: f.Finding.Details, + Id: f.Finding.ID, + ImpactDetails: f.Finding.ImpactDetails, + Issue: &asyncapi.Issue{ + CweId: f.Finding.Issue.CWEID, + Description: f.Finding.Issue.Description, + Id: f.Finding.Issue.ID, + Labels: []interface{}{f.Finding.Issue.Labels}, + Recommendations: []interface{}{f.Finding.Issue.Recommendations}, + ReferenceLinks: []interface{}{f.Finding.Issue.ReferenceLinks}, + Summary: f.Finding.Issue.Summary, + }, + Resources: []interface{}{f.Finding.Resources}, + Score: float64(f.Finding.Score), + Source: &asyncapi.Source{ + Component: f.Finding.Source.Component, + Id: f.Finding.Source.ID, + Instance: f.Finding.Source.Instance, + Name: f.Finding.Source.Name, + Options: f.Finding.Source.Options, + Time: f.Finding.Source.Time, + }, + Status: f.Finding.Status, + Target: &asyncapi.Target{ + Id: f.Finding.Target.ID, + Identifier: f.Finding.Target.Identifier, + Teams: []interface{}{f.Finding.Target.Teams}, + }, + TotalExposure: int(f.Finding.TotalExposure), + } + if f.Finding.OpenFinding != nil { + findingPayload.CurrentExposure = int(f.Finding.OpenFinding.CurrentExposure) + } + return findingPayload +} diff --git a/pkg/api/store/cdc/parser_test.go b/pkg/api/store/cdc/parser_test.go index 2dea38c5..4b05ebe2 100644 --- a/pkg/api/store/cdc/parser_test.go +++ b/pkg/api/store/cdc/parser_test.go @@ -141,6 +141,7 @@ type mockVulnDBClient struct { deleteTeamTagF func(ctx context.Context, authTeam, teamID, tag string) error deleteTargetTeamF func(ctx context.Context, authTeam, targetID, teamID string) error deleteTargetTagF func(ctx context.Context, authTeam, targetID, tag string) error + getFindingF func(ctx context.Context, findingID string) (*api.Finding, error) updateFindingF func(ctx context.Context, findingID string, payload *api.UpdateFinding, tag string) (*api.Finding, error) } @@ -162,6 +163,9 @@ func (m *mockVulnDBClient) DeleteTargetTeam(ctx context.Context, authTeam, targe func (m *mockVulnDBClient) DeleteTargetTag(ctx context.Context, authTeam, targetID, tag string) error { return m.deleteTargetTagF(ctx, authTeam, targetID, tag) } +func (m *mockVulnDBClient) Finding(ctx context.Context, findingID string) (*api.Finding, error) { + return m.getFindingF(ctx, findingID) +} func (m *mockVulnDBClient) UpdateFinding(ctx context.Context, findingID string, payload *api.UpdateFinding, tag string) (*api.Finding, error) { return m.updateFindingF(ctx, findingID, payload, tag) } @@ -196,14 +200,15 @@ func init() { func TestParse(t *testing.T) { testCases := []struct { - name string - log []Event - vulnDBClient *mockVulnDBClient - asyncAPI func() (*asyncapi.Vulcan, kafka.Client, error) - loggr *mockLoggr - wantNParsed uint - wantAsyncAssets []testutil.AssetTopicData - wantErr error + name string + log []Event + vulnDBClient *mockVulnDBClient + asyncAPI func() (*asyncapi.Vulcan, kafka.Client, error) + loggr *mockLoggr + wantNParsed uint + wantAsyncAssets []testutil.AssetTopicData + wantAsyncFindings []testutil.FindingTopicData + wantErr error }{ { name: "Happy path", @@ -257,6 +262,12 @@ func TestParse(t *testing.T) { deleteTargetTagF: func(ctx context.Context, authTeam, targetID, tag string) error { return nil }, + getFindingF: func(ctx context.Context, findingID string) (*api.Finding, error) { + return &api.Finding{ + Finding: vulndb.FindingExpanded{ + Finding: vulndb.Finding{ID: "1"}}, + }, nil + }, updateFindingF: func(ctx context.Context, findingID string, payload *api.UpdateFinding, tag string) (*api.Finding, error) { var f = &api.Finding{} return f, nil @@ -307,6 +318,26 @@ func TestParse(t *testing.T) { }, }, }, + wantAsyncFindings: []testutil.FindingTopicData{ + { + Payload: asyncapi.FindingPayload{ + Id: "1", + Issue: &asyncapi.Issue{ + Recommendations: []any{nil}, + ReferenceLinks: []any{nil}, + Labels: []any{nil}, + }, + Source: &asyncapi.Source{}, + Target: &asyncapi.Target{ + Teams: []any{nil}, + }, + Resources: []any{nil}, + }, + Headers: map[string][]byte{ + "version": []byte(asyncapi.Version), + }, + }, + }, wantNParsed: 6, }, { @@ -431,6 +462,8 @@ func TestParse(t *testing.T) { if nParsed != tc.wantNParsed { t.Fatalf("expected nParsed to be %d, but got %d", tc.wantNParsed, nParsed) } + + // Verify async assets topic := kclient.Topics[asyncapi.AssetsEntityName] gotAssets, err := testutil.ReadAllAssetsTopic(topic) if err != nil { @@ -445,6 +478,20 @@ func TestParse(t *testing.T) { t.Fatalf("want!=got, diff: %s", diff) } + // Verify async findings + topic = kclient.Topics[asyncapi.FindingsEntityName] + gotFindings, err := testutil.ReadAllFindingsTopic(topic) + if err != nil { + t.Fatalf("error reading findings from kafka %v", err) + } + wantFindings := tc.wantAsyncFindings + sortSlices = cmpopts.SortSlices(func(a, b testutil.FindingTopicData) bool { + return strings.Compare(a.Payload.Id, b.Payload.Id) < 0 + }) + diff = cmp.Diff(wantFindings, gotFindings, sortSlices) + if diff != "" { + t.Fatalf("want!=got, diff: %s", diff) + } }) } } @@ -464,7 +511,10 @@ func (n nullLogger) Debugf(s string, params ...any) { } func newTestAsyncAPI() (*asyncapi.Vulcan, kafka.Client, error) { - topics := map[string]string{asyncapi.AssetsEntityName: "assets"} + topics := map[string]string{ + asyncapi.AssetsEntityName: "assets", + asyncapi.FindingsEntityName: "findings", + } testTopics, err := testutil.PrepareKafka(topics) if err != nil { return nil, kafka.Client{}, fmt.Errorf("error creating test topics: %v", err) diff --git a/pkg/asyncapi/_gen/gen.sh b/pkg/asyncapi/_gen/gen.sh index 32773938..49241423 100755 --- a/pkg/asyncapi/_gen/gen.sh +++ b/pkg/asyncapi/_gen/gen.sh @@ -27,3 +27,7 @@ docker run \ /bin/sh -c " npm install --silent && node gen.js "${SOURCE_FILE}" "${GO_PACKAGE_NAME}"" + +# Add JSON tags +go install github.com/betacraft/easytags@v1.0.2 +easytags models.go diff --git a/pkg/asyncapi/kafka/client.go b/pkg/asyncapi/kafka/client.go index 01bd1af4..7e0c4918 100644 --- a/pkg/asyncapi/kafka/client.go +++ b/pkg/asyncapi/kafka/client.go @@ -7,6 +7,7 @@ package kafka import ( "errors" "fmt" + "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) @@ -23,6 +24,8 @@ var ( const ( kafkaSecurityProtocol = "sasl_ssl" kafkaSaslMechanisms = "SCRAM-SHA-256" + + maxDeliveryWait = 30 * time.Second ) // Client implements an EventStreamClient using Kafka as the event stream @@ -63,8 +66,10 @@ func (c *Client) Push(entity string, id string, payload []byte, metadata map[str if !ok { return ErrUndefinedEntity } + delivered := make(chan kafka.Event) defer close(delivered) + var headers []kafka.Header for k, v := range metadata { headers = append(headers, kafka.Header{ @@ -72,6 +77,7 @@ func (c *Client) Push(entity string, id string, payload []byte, metadata map[str Value: v, }) } + msg := kafka.Message{ TopicPartition: kafka.TopicPartition{ Topic: &topic, @@ -85,7 +91,14 @@ func (c *Client) Push(entity string, id string, payload []byte, metadata map[str if err != nil { return fmt.Errorf("error producing message: %w", err) } - e := <-delivered + + var e kafka.Event + select { + case <-time.After(maxDeliveryWait): + return fmt.Errorf("error time out waiting for mssg delivery confirmation") + case e = <-delivered: + } + m := e.(*kafka.Message) if m.TopicPartition.Error != nil { return fmt.Errorf("error delivering message: %w", m.TopicPartition.Error) diff --git a/pkg/asyncapi/models.go b/pkg/asyncapi/models.go index 6931366b..19e39342 100644 --- a/pkg/asyncapi/models.go +++ b/pkg/asyncapi/models.go @@ -6,22 +6,22 @@ const Version = "v0.0.2" // AssetPayload represents a AssetPayload model. type AssetPayload struct { - Id string - Team *Team - Alias string - Rolfp string - Scannable bool - AssetType *AssetType - Identifier string - Annotations []*Annotation + Id string `json:"id"` + Team *Team `json:"team"` + Alias string `json:"alias"` + Rolfp string `json:"rolfp"` + Scannable bool `json:"scannable"` + AssetType *AssetType `json:"asset_type"` + Identifier string `json:"identifier"` + Annotations []*Annotation `json:"annotations"` } // Team represents a Team model. type Team struct { - Id string - Name string - Description string - Tag string + Id string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + Tag string `json:"tag"` } // AssetType represents an enum of string. @@ -40,6 +40,62 @@ const ( // Annotation represents a Annotation model. type Annotation struct { - Key string - Value string + Key string `json:"key"` + Value string `json:"value"` +} + +// FindingPayload represents a FindingPayload model. +type FindingPayload struct { + AffectedResource string `json:"affected_resource"` + CurrentExposure int `json:"current_exposure"` + Details string `json:"details"` + Id string `json:"id"` + ImpactDetails string `json:"impact_details"` + Issue *Issue `json:"issue"` + Resources []interface{} `json:"resources"` + Score float64 `json:"score"` + Source *Source `json:"source"` + Status string `json:"status"` + Target *Target `json:"target"` + TotalExposure int `json:"total_exposure"` +} + +// Issue represents a Issue model. +type Issue struct { + CweId int `json:"cwe_id"` + Description string `json:"description"` + Id string `json:"id"` + Labels []interface{} `json:"labels"` + Recommendations []interface{} `json:"recommendations"` + ReferenceLinks []interface{} `json:"reference_links"` + Summary string `json:"summary"` +} + +// ResourceGroup represents a ResourceGroup model. +type ResourceGroup struct { + Attributes []interface{} `json:"attributes"` + Name string `json:"name"` + Resources []interface{} `json:"resources"` +} + +// AnonymousSchema32 represents a AnonymousSchema32 model. +type AnonymousSchema32 struct { + AdditionalProperties map[string]string `json:"additional_properties"` +} + +// Source represents a Source model. +type Source struct { + Component string `json:"component"` + Id string `json:"id"` + Instance string `json:"instance"` + Name string `json:"name"` + Options string `json:"options"` + Time string `json:"time"` +} + +// Target represents a Target model. +type Target struct { + Id string `json:"id"` + Identifier string `json:"identifier"` + Teams []interface{} `json:"teams"` } diff --git a/pkg/asyncapi/vulcan.go b/pkg/asyncapi/vulcan.go index 0a7df49f..cbabfc12 100644 --- a/pkg/asyncapi/vulcan.go +++ b/pkg/asyncapi/vulcan.go @@ -39,9 +39,15 @@ func (l LevelLogger) Debugf(s string, params ...any) { level.Debug(l.Logger).Log("log", v) } -// AssetsEntityName defines the key for the assets entity used by an [EventStreamClient] to -// determine the topic where the assets are send. -const AssetsEntityName = "assets" +const ( + // AssetsEntityName defines the key for the assets entity used by an [EventStreamClient] to + // determine the topic where the assets are sent. + AssetsEntityName = "assets" + + // FindingsEntityName defines the key for the findings entity used by an [EventStreamClient] to + // determine the topic where the findings are sent. + FindingsEntityName = "findings" +) // Vulcan implements the asynchorus API of Vulcan. type Vulcan struct { @@ -104,6 +110,25 @@ func (v *Vulcan) DeleteAsset(asset AssetPayload) error { return err } +// PushFinding publishes the state of a finding in the current point of time +// to the underlying [EventStreamClient]. +func (v *Vulcan) PushFinding(finding FindingPayload) error { + v.logger.Debugf("pushing finding %+v", finding) + payload, err := json.Marshal(finding) + if err != nil { + return fmt.Errorf("error marshaling to json: %w", err) + } + metadata := map[string][]byte{ + "version": []byte(Version), + } + err = v.client.Push(FindingsEntityName, finding.Id, payload, metadata) + if err != nil { + return fmt.Errorf("error pushing finding %v: %w", finding, err) + } + v.logger.Debugf("finding pushed %+v", finding) + return nil +} + // NullVulcan implements an Async Vulcan API interface that does not send the // events to any [EventStreamClient]. It's intended to be used when the async // API is disabled but other components still need to fullfill a dependency @@ -123,6 +148,12 @@ func (v *NullVulcan) PushAsset(asset AssetPayload) error { return nil } +// PushFinding acepts an event indicating that a finding has been modified or +// created and just ignores it. +func (v *NullVulcan) PushFinding(finding FindingPayload) error { + return nil +} + func metadata(asset AssetPayload) map[string][]byte { // The asset type can't be nil. return map[string][]byte{ diff --git a/pkg/asyncapi/vulcan_test.go b/pkg/asyncapi/vulcan_test.go index 4db79a31..6927c20d 100644 --- a/pkg/asyncapi/vulcan_test.go +++ b/pkg/asyncapi/vulcan_test.go @@ -27,6 +27,53 @@ var assetFixtures = map[string]AssetPayload{ }, } +var findingFixtures = map[string]FindingPayload{ + "Finding1": { + AffectedResource: "AffectedResource1", + CurrentExposure: 10, + Details: "Details1", + Id: "FindingId1", + ImpactDetails: "ImpactDetails1", + Issue: &Issue{ + CweId: 1, + Description: "Description1", + Id: "IssueId1", + Labels: []interface{}{[]string{ + "Label1", + "Label2", + }}, + Recommendations: []interface{}{[]string{ + "Recommendation1", + "Recommendation2", + }}, + ReferenceLinks: []interface{}{[]string{ + "ReferenceLink1", + "ReferenceLink2", + }}, + Summary: "Summary1", + }, + Score: 7.0, + Source: &Source{ + Component: "Component1", + Id: "SourceId1", + Instance: "SourceInstance1", + Name: "SourceName1", + Options: "SourceOptions1", + Time: "SourceTime1", + }, + Status: "OPEN", + Target: &Target{ + Id: "TargetId1", + Identifier: "TargetIdentifier1", + Teams: []interface{}{[]string{ + "Team1", + "Team2", + }}, + }, + TotalExposure: 50, + }, +} + type nullLogger struct { } @@ -109,8 +156,57 @@ func TestVulcan_PushAsset(t *testing.T) { } } -func mustJSONMarshal(assset AssetPayload) []byte { - content, err := json.Marshal(assset) +func TestVulcan_PushFinding(t *testing.T) { + tests := []struct { + name string + client *inMemStreamClient + logger Logger + finding FindingPayload + want []streamPayload + wantErr bool + }{ + { + name: "PushesFindings", + client: &inMemStreamClient{ + payloads: []streamPayload{}, + }, + logger: nullLogger{}, + finding: findingFixtures["Finding1"], + want: []streamPayload{ + { + ID: findingFixtures["Finding1"].Id, + Entity: FindingsEntityName, + Content: mustJSONMarshal(findingFixtures["Finding1"]), + Metadata: map[string][]byte{ + "version": []byte(Version), + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + v := &Vulcan{ + client: tt.client, + logger: tt.logger, + } + if err := v.PushFinding(tt.finding); (err != nil) != tt.wantErr { + t.Errorf("Vulcan.PushFinding() error = %v, wantErr %v", err, tt.wantErr) + } + sortOpts := cmpopts.SortSlices(func(a, b streamPayload) bool { + return strings.Compare(a.ID, b.ID) < 0 + }) + got := tt.client.payloads + diff := cmp.Diff(tt.want, got, sortOpts) + if diff != "" { + t.Fatalf("want!=got, diff: %s", diff) + } + }) + } +} + +func mustJSONMarshal(payload any) []byte { + content, err := json.Marshal(payload) if err != nil { panic(err) } diff --git a/pkg/testutil/kafka.go b/pkg/testutil/kafka.go index 37ccd781..ea77fd8c 100644 --- a/pkg/testutil/kafka.go +++ b/pkg/testutil/kafka.go @@ -24,6 +24,7 @@ const KafkaTestBroker = "localhost:29092" // with the entities remapped to new topics created. func PrepareKafka(topics map[string]string) (map[string]string, error) { // Generate a unique deterministic topic name for the caller of this function. + tRef := time.Now().Unix() newTopics := map[string]string{} var newTopicNames []string for entity, topic := range topics { @@ -31,7 +32,7 @@ func PrepareKafka(topics map[string]string) (map[string]string, error) { callerName := strings.Replace(runtime.FuncForPC(pc).Name(), ".", "_", -1) callerName = strings.Replace(callerName, "-", "_", -1) parts := strings.Split(callerName, "/") - name := strings.ToLower(fmt.Sprintf("%s_%s_test", topic, parts[len(parts)-1])) + name := strings.ToLower(fmt.Sprintf("%s_%s_%d_test", topic, parts[len(parts)-1], tRef)) newTopics[entity] = name newTopicNames = append(newTopicNames, name) } @@ -167,3 +168,71 @@ Loop: } return topicAssetsData, nil } + +type FindingTopicData struct { + Payload asyncapi.FindingPayload + Headers map[string][]byte +} + +func ReadAllFindingsTopic(topic string) ([]FindingTopicData, error) { + broker := KafkaTestBroker + config := confluentKafka.ConfigMap{ + "go.events.channel.enable": true, + "bootstrap.servers": broker, + "group.id": "test_" + topic, + "enable.partition.eof": true, + "auto.offset.reset": "earliest", + "enable.auto.commit": false, + } + c, err := confluentKafka.NewConsumer(&config) + if err != nil { + return nil, err + } + defer c.Close() + if err = c.Subscribe(topic, nil); err != nil { + return nil, err + } + + var topicFindingsData []FindingTopicData +Loop: + for ev := range c.Events() { + switch e := ev.(type) { + case *confluentKafka.Message: + data := e.Value + finding := asyncapi.FindingPayload{} + // The data will be empty in case the event is a tombstone. + if len(data) > 0 { + err = json.Unmarshal(data, &finding) + if err != nil { + return nil, err + } + } + headers := map[string][]byte{} + for _, v := range e.Headers { + headers[v.Key] = v.Value + } + topicData := FindingTopicData{ + Payload: finding, + Headers: headers, + } + topicFindingsData = append(topicFindingsData, topicData) + _, err := c.CommitOffsets([]confluentKafka.TopicPartition{ + { + Topic: e.TopicPartition.Topic, + Partition: e.TopicPartition.Partition, + Offset: e.TopicPartition.Offset + 1, + }, + }) + if err != nil { + return nil, err + } + case confluentKafka.Error: + return nil, e + case confluentKafka.PartitionEOF: + break Loop + default: + return nil, fmt.Errorf("received unexpected message %v", e) + } + } + return topicFindingsData, nil +}