Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ Those are the variables you have to setup:
|KAFKA_USER||user|
|KAFKA_PASS||supersecret|
|KAFKA_BROKER|if set to empty the Async API will be disabled|kafka.example.com:9094|
|KAFKA_TOPICS|Contains a map, using toml format, mapping entities in the Vulcan async API to the kafka topics they wil be pushed to, by now the only available entity is ``assets`` |[assets = "assets-topic"]|
|KAFKA_TOPICS|Contains a map, using toml format, mapping entities in the Vulcan async API to the kafka topics they wil be pushed to, by now the only available entity is ``assets`` |{assets = "assets-topic", findings = "findings-topic"}|
First we have to build the `vulcan-api` because the build only copies the file.

We need to provide `linux` compiled binary to the docker build command. This won't be necessary when this component has been open sourced.
Expand Down
6 changes: 6 additions & 0 deletions _resources/config/local.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ key = "a key"
retries = 4
retry_interval = 2 # seconds

[kafka]
user = "user"
pass = "supersecret"
broker = "kafka.example.com:9094"
topics = '{assets = "assets-topic", findings = "findings-topic"}'

[globalpolicy]
# This config policy emulates code policy definition.
[globalpolicy.web-scanning-global]
Expand Down
146 changes: 145 additions & 1 deletion docs/asyncapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ channels:
description: CDC Events of the assets stored in Vulcan.
subscribe:
message:
$ref: '#/components/messages/asset'
$ref: "#/components/messages/asset"
findings:
subscribe:
message:
$ref: "#/components/messages/finding"

components:
messages:
Expand All @@ -29,6 +33,15 @@ components:
contentType: application/json
payload:
$ref: "#/components/schemas/assetPayload"
finding:
headers:
$ref: "#/components/schemas/findingMetadata"
contentType: application/json
payload:
$ref: "#/components/schemas/findingPayload"
summary: Events generated from Vulnerability DB findings state changes
name: Finding
title: Findings state

schemas:
assetMetadata:
Expand Down Expand Up @@ -123,3 +136,134 @@ components:
- name
- description
- tag

findingPayload:
properties:
affected_resource:
type: string
current_exposure:
type: integer
details:
type: string
id:
type: string
impact_details:
type: string
issue:
$ref: '#/components/schemas/issue'
resources:
$ref: '#/components/schemas/resources'
score:
type: number
source:
$ref: '#/components/schemas/source'
status:
type: string
target:
$ref: '#/components/schemas/target'
total_exposure:
type: integer
type: object
additionalProperties: false

findingMetadata:
type: object
additionalProperties: false
properties:
version:
type: string
description: The value of this field is equal to the value of the field info.version of this document.
required:
- version

stringArray:
items:
type: string
type:
- array
- "null"

issue:
properties:
cwe_id:
minimum: 0
type: integer
description:
type: string
id:
type: string
labels:
items:
type: string
type:
- array
- "null"
recommendations:
$ref: '#/components/schemas/stringArray'
reference_links:
$ref: '#/components/schemas/stringArray'
summary:
type: string
type: object
additionalProperties: false

resourceGroup:
properties:
attributes:
items:
type: string
type:
- array
- "null"
name:
type: string
resources:
items:
additionalProperties:
type: string
type: object
type:
- array
- "null"
type: object
additionalProperties: false

resources:
items:
$ref: '#/components/schemas/resourceGroup'
type:
- array
- "null"

source:
properties:
component:
type: string
id:
type: string
instance:
type: string
name:
type: string
options:
type: string
time:
format: date-time
type: string
type: object
additionalProperties: false

target:
properties:
id:
type: string
identifier:
type: string
teams:
items:
type: string
type:
- array
- "null"
type: object
additionalProperties: false
2 changes: 1 addition & 1 deletion local.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ GPC_1_NAME=web-scanning-global
KAFKA_USER=user
KAFKA_PASS=supersecret
KAFKA_BROKER=kafka.example.com:9094
KAFKA_TOPICS={assets = "assets-topic"}
KAFKA_TOPICS={assets = "assets-topic", findings = "findings-topic"}
71 changes: 64 additions & 7 deletions pkg/api/store/cdc/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type AsyncTxParser struct {
type AsyncAPI interface {
PushAsset(asset asyncapi.AssetPayload) error
DeleteAsset(asset asyncapi.AssetPayload) error
PushFinding(finding asyncapi.FindingPayload) error
}

// NewAsyncTxParser builds a new CDC log parser to handle distributed
Expand Down Expand Up @@ -241,12 +242,7 @@ func (p *AsyncTxParser) processUpdateAsset(data []byte) error {
return errInvalidData
}
asyncAsset := assetToAsyncAsset(dto.NewAsset)
err = p.asyncAPI.PushAsset(asyncAsset)
if err != nil {
return err
}

return nil
return p.asyncAPI.PushAsset(asyncAsset)
}

func (p *AsyncTxParser) processDeleteAllAssets(data []byte) error {
Expand Down Expand Up @@ -276,6 +272,7 @@ func (p *AsyncTxParser) processFindingOverwrite(data []byte) error {
return errInvalidData
}

// Update finding in vulndb
_, err = p.VulnDBClient.UpdateFinding(
context.Background(),
dto.FindingOverwrite.FindingID,
Expand All @@ -289,7 +286,28 @@ func (p *AsyncTxParser) processFindingOverwrite(data []byte) error {
}
return err
}
return nil

// Retrieve current finding status and push event
f, err := p.VulnDBClient.Finding(context.Background(), dto.FindingOverwrite.FindingID)
if err != nil {
if errors.IsKind(err, errors.ErrNotFound) {
return nil
}
}
// TODO: There can be a race condition here between two concurrent state changes for a
// finding between this finding overwrite and a related check processing from vulndb side.
// Currently this can only generate a conflict if the two concurrent events are a "mark as
// false positive" action initiated from Vulcan API and a finding detection event from the
// vulndb side which contains a fingerprint variation, as in that situation the FALSE POSITIVE
// state "preference" does not apply.
// Example:
// API -> mark as false positive -> VulnDB API
// API -> retrieve finding state -> VulnDB API
// VulnDB -> check processing -> Reopen false positive finding
// VulnDB -> push reopened finding -> Kafka
// API -> push false positive finding -> Kafka
asyncFinding := findingToAsyncFinding(f)
return p.asyncAPI.PushFinding(asyncFinding)
}

// processMergeDiscoveredAssets performs the following actions:
Expand Down Expand Up @@ -407,3 +425,42 @@ func assetToAsyncAsset(a api.Asset) asyncapi.AssetPayload {
}
return asyncAsset
}

func findingToAsyncFinding(f *api.Finding) asyncapi.FindingPayload {
findingPayload := asyncapi.FindingPayload{
AffectedResource: f.Finding.AffectedResource,
Details: f.Finding.Details,
Id: f.Finding.ID,
ImpactDetails: f.Finding.ImpactDetails,
Issue: &asyncapi.Issue{
CweId: f.Finding.Issue.CWEID,
Description: f.Finding.Issue.Description,
Id: f.Finding.Issue.ID,
Labels: []interface{}{f.Finding.Issue.Labels},
Recommendations: []interface{}{f.Finding.Issue.Recommendations},
ReferenceLinks: []interface{}{f.Finding.Issue.ReferenceLinks},
Summary: f.Finding.Issue.Summary,
},
Resources: []interface{}{f.Finding.Resources},
Score: float64(f.Finding.Score),
Source: &asyncapi.Source{
Component: f.Finding.Source.Component,
Id: f.Finding.Source.ID,
Instance: f.Finding.Source.Instance,
Name: f.Finding.Source.Name,
Options: f.Finding.Source.Options,
Time: f.Finding.Source.Time,
},
Status: f.Finding.Status,
Target: &asyncapi.Target{
Id: f.Finding.Target.ID,
Identifier: f.Finding.Target.Identifier,
Teams: []interface{}{f.Finding.Target.Teams},
},
TotalExposure: int(f.Finding.TotalExposure),
}
if f.Finding.OpenFinding != nil {
findingPayload.CurrentExposure = int(f.Finding.OpenFinding.CurrentExposure)
}
return findingPayload
}
Loading