Skip to content

Commit

Permalink
Merge pull request #28 from deepfence/sensor-plugin-output
Browse files Browse the repository at this point in the history
sensor: Add a possibility to output directly to plugins
  • Loading branch information
vadorovsky authored Jun 2, 2022
2 parents 83cb8f2 + e9dd214 commit cbc3c1b
Show file tree
Hide file tree
Showing 16 changed files with 233 additions and 89 deletions.
15 changes: 11 additions & 4 deletions cmd/sensor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package cmd

import (
"context"
"log"
"os"
"os/signal"
"syscall"

"github.com/spf13/cobra"

Expand All @@ -19,17 +23,20 @@ var sensorCmd = &cobra.Command{
log.Fatalf("Invalid configuration: %v", err)
}

mainSignalChannel := make(chan bool)

proto := "tcp"
if err := streamer.InitOutput(cfg, proto); err != nil {
log.Fatalf("Failed to connect: %v", err)
}

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())

log.Println("Start sending")
streamer.StartSensor(cfg, mainSignalChannel)
streamer.StartSensor(ctx, cfg)
log.Println("Now waiting in main")
<-mainSignalChannel
<-sigs
cancel()
},
}

Expand Down
2 changes: 0 additions & 2 deletions contrib/config/receiver-s3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ input:
address: 0.0.0.0
port: 8081
output:
file:
path: /dev/null
plugins:
s3:
region: eu-west-1
Expand Down
10 changes: 10 additions & 0 deletions contrib/config/sensor-s3.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
output:
plugins:
s3:
region: eu-west-1
bucket: foo-pcap
totalFileSize: 10MB
uploadChunkSize: 5MB
uploadTimeout: 1m
cannedACL: bucket-owner-full-control
pcapMode: all
2 changes: 2 additions & 0 deletions docs/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
- [Using with Docker](./quickstart/docker.md)
- [Using on Kubernetes](./quickstart/kubernetes.md)
- [Using on Vagrant](./quickstart/vagrant.md)
- [Plugins](./plugins/README.md)
- [S3](./plugins/s3.md)
- [Using with other tools](./tools/README.md)
- [Suricata](./tools/suricata.md)
- [Configuration](./configuration.md)
32 changes: 20 additions & 12 deletions docs/src/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,37 @@
`packetstreamer` is configured using a yaml-formatted configuration file.

```yaml
input: # required in 'receiver' mode
input: # required in 'receiver' mode
address: _ip-address_
port: _listen-port_
output:
server: # required in 'sensor' mode
server: # required in 'sensor' mode
address: _ip-address_
port: _listen-port_
file: # required in 'receiver' mode
path: _filename_|stdout # 'stdout' is a reserved name. Receiver will write to stdout
tls: # optional
file: # required in 'receiver' mode
path: _filename_|stdout # 'stdout' is a reserved name. Receiver will write to stdout
plugins: # optional
s3:
bucket: _string_
region: _string_
totalFileSize: _file_size_ # optional; default: 10 MB
uploadChunkSize: _file_size_ # optional; default: 5 MB
uploadTimeout: _timeout_ # optional; default: 1m
cannedACL: _acl_ # optional; default: Bucket owner enforced
tls: # optional
enable: _true_|_false_
certfile: _filename_
keyfile: _filename_
auth: # optional; receiver and sensor must use same shared key
auth: # optional; receiver and sensor must use same shared key
enable: _true_|_false_
key: _string_
compressBlockSize: _integer_ # optional; default: 65
inputPacketLen: _integer_ # optional; default: 65535
logFilename: _filename_ # optional
pcapMode: _Allow_|_Deny_|_All_ # optional
capturePorts: _list-of-ports_ # optional
compressBlockSize: _integer_ # optional; default: 65
inputPacketLen: _integer_ # optional; default: 65535
logFilename: _filename_ # optional
pcapMode: _Allow_|_Deny_|_All_ # optional
capturePorts: _list-of-ports_ # optional
captureInterfacesPorts: _map: interface-name:port_ # optional
ignorePorts: _list-of-ports_ # optional
ignorePorts: _list-of-ports_ # optional
```
You can find example configuration files in the [`/contrib/config/`](https://github.com/deepfence/PacketStreamer/tree/main/contrib/config) directory.
Expand Down
15 changes: 15 additions & 0 deletions docs/src/plugins/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Plugins

This documentation section is about plugins which allow streaming packets to
various external storage services.

Plugins can be used both from:

- **sensor** - in that case, locally captured packets are streamed through the
plugin
- **receiver** - all packets retrieved from (potentially multiple) sensors are
streamed through the plugin

Currently the plugins are:

- [S3](./s3.md)
62 changes: 62 additions & 0 deletions docs/src/plugins/s3.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# S3

The S3 plugin allows streaming packets to the given S3 buckets.

## Configuration

### AWS credentials

Before running PacketStreamer, AWS credentials need to be configured in one of
the following ways:

- `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables
- `~/.aws/config` file - it can be created by `aws configure`

The first way might be more convenient when running as root (required when
running a sensor).

### Configuration scheme

S3 plugin configuration has the following syntax:

```yaml
output:
plugins: # optional
s3:
bucket: _string_
region: _string_
totalFileSize: _file_size_ # optional; default: 10 MB
uploadChunkSize: _file_size_ # optional; default: 5 MB
uploadTimeout: _timeout_ # optional; default: 1m
cannedACL: _acl_ # optional; default: Bucket owner enforced
```

### Sensor configuration

If you want to stream locally captured packets from a sensor to S3, you can use
the following example configuration from
[contrib/config/sensor-s3.yaml](https://raw.githubusercontent.com/deepfence/PacketStreamer/main/contrib/config/sensor-s3.yaml):

```yaml
{{#rustdoc_include ../../../contrib/config/sensor-s3.yaml}}
```

And run PacketStreamer with it:

```bash
sudo packetstreamer sensor --config ./contrib/config/sensor-s3.yaml
```

### Receiver configuration

If you want to stream packets from a receiver to S3, you can use the following
example configuration from
[contrib/config/receiver-s3.yaml](https://raw.githubusercontent.com/deepfence/PacketStreamer/main/contrib/config/receiver-s3.yaml):

```yaml
{{#rustdoc_include ../../../contrib/config/receiver-s3.yaml}}
```

And run PacketStreamer with it:

```bash
packetstreamer receiver --config ./contrib/config/receiver-s3.yaml
```
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/deepfence/PacketStreamer
go 1.17

require (
github.com/aws/aws-sdk-go-v2 v1.16.2
github.com/aws/aws-sdk-go-v2 v1.16.4
github.com/aws/aws-sdk-go-v2/config v1.15.3
github.com/aws/aws-sdk-go-v2/service/s3 v1.26.3
github.com/confluentinc/confluent-kafka-go v1.8.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/aws/aws-sdk-go-v2 v1.16.2 h1:fqlCk6Iy3bnCumtrLz9r3mJ/2gUT0pJ0wLFVIdWh+JA=
github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU=
github.com/aws/aws-sdk-go-v2 v1.16.4 h1:swQTEQUyJF/UkEA94/Ga55miiKFoXmm/Zd67XHgmjSg=
github.com/aws/aws-sdk-go-v2 v1.16.4/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 h1:SdK4Ppk5IzLs64ZMvr6MrSficMtjY2oS0WOORXTlxwU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1/go.mod h1:n8Bs1ElDD2wJ9kCRTczA83gYbBmjSwZp3umc6zF4EeM=
github.com/aws/aws-sdk-go-v2/config v1.15.3 h1:5AlQD0jhVXlGzwo+VORKiUuogkG7pQcLJNzIzK7eodw=
Expand Down
4 changes: 3 additions & 1 deletion pkg/config/sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ var (
)

func ValidateSensorConfig(config *Config) error {
if config.Output.File == nil && config.Output.Server == nil {
if config.Output.File == nil && config.Output.Server == nil &&
(config.Output.Plugins == nil ||
(config.Output.Plugins.S3 == nil && config.Output.Plugins.Kafka == nil)) {
return ErrNoOutputConfigured
}
if config.Output.Server != nil && config.Output.Server.Port == nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package plugins
import (
"context"
"fmt"
"log"

"github.com/deepfence/PacketStreamer/pkg/config"
"github.com/deepfence/PacketStreamer/pkg/plugins/kafka"
"github.com/deepfence/PacketStreamer/pkg/plugins/s3"
Expand All @@ -18,7 +20,8 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) {
var plugins []chan<- string

if config.Output.Plugins.S3 != nil {
s3plugin, err := s3.NewPlugin(ctx, config.Output.Plugins.S3)
log.Println("Starting S3 plugin")
s3plugin, err := s3.NewPlugin(ctx, config)

if err != nil {
return nil, fmt.Errorf("error starting S3 plugin, %v", err)
Expand All @@ -29,6 +32,7 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) {
}

if config.Output.Plugins.Kafka != nil {
log.Println("Starting Kafka plugin")
kafkaPlugin, err := kafka.NewPlugin(config.Output.Plugins.Kafka)

if err != nil {
Expand Down
49 changes: 25 additions & 24 deletions pkg/plugins/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ package s3
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"log"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcapgo"

"github.com/deepfence/PacketStreamer/pkg/config"
"github.com/deepfence/PacketStreamer/pkg/file"
"log"
"time"
)

const (
Expand All @@ -23,6 +25,7 @@ type Plugin struct {
S3Client *s3.Client
Region string
Bucket string
InputPacketLen int
TotalFileSize uint64
UploadChunkSize uint64
UploadTimeout time.Duration
Expand All @@ -36,8 +39,9 @@ type MultipartUpload struct {
TotalDataSent int
}

func NewPlugin(ctx context.Context, config *config.S3PluginConfig) (*Plugin, error) {
awsCfg, err := awsConfig.LoadDefaultConfig(ctx, awsConfig.WithRegion(config.Region))
func NewPlugin(ctx context.Context, config *config.Config) (*Plugin, error) {
awsCfg, err := awsConfig.LoadDefaultConfig(ctx,
awsConfig.WithRegion(config.Output.Plugins.S3.Region))

if err != nil {
return nil, fmt.Errorf("error loading AWS config when creating S3 client, %v", err)
Expand All @@ -51,12 +55,12 @@ func NewPlugin(ctx context.Context, config *config.S3PluginConfig) (*Plugin, err

return &Plugin{
S3Client: s3Client,
Region: config.Region,
Bucket: config.Bucket,
TotalFileSize: uint64(*config.TotalFileSize),
UploadChunkSize: uint64(*config.UploadChunkSize),
UploadTimeout: config.UploadTimeout,
CannedACL: config.CannedACL,
Region: config.Output.Plugins.S3.Region,
Bucket: config.Output.Plugins.S3.Bucket,
TotalFileSize: uint64(*config.Output.Plugins.S3.TotalFileSize),
UploadChunkSize: uint64(*config.Output.Plugins.S3.UploadChunkSize),
UploadTimeout: config.Output.Plugins.S3.UploadTimeout,
CannedACL: config.Output.Plugins.S3.CannedACL,
}, nil
}

Expand All @@ -78,7 +82,6 @@ func (mpu *MultipartUpload) appendToBuffer(data []byte) {
func (p *Plugin) Start(ctx context.Context) chan<- string {
inputChan := make(chan string)
go func() {
payloadMarker := []byte{0x0, 0x0, 0x0, 0x0}
var mpu *MultipartUpload

for {
Expand All @@ -92,18 +95,8 @@ func (p *Plugin) Start(ctx context.Context) chan<- string {
log.Printf("error creating multipart upload, stopping... - %v\n", err)
return
}

mpu.appendToBuffer(file.Header)

if err != nil {
log.Printf("error adding header to buffer, stopping... - %v\n", err)
return
}
}
data := []byte(chunk)
dataLen := len(data)
binary.LittleEndian.PutUint32(payloadMarker[:], uint32(dataLen))
mpu.appendToBuffer(payloadMarker)
mpu.appendToBuffer(data)

if uint64(len(mpu.Buffer)) >= p.UploadChunkSize {
Expand Down Expand Up @@ -230,5 +223,13 @@ func (p *Plugin) createMultipartUpload(ctx context.Context) (*MultipartUpload, e
return nil, fmt.Errorf("error creating multipart upload, %v", err)
}

return newMultipartUpload(output), nil
mpu := newMultipartUpload(output)

var pcapBuffer bytes.Buffer
pcapWriter := pcapgo.NewWriter(&pcapBuffer)
pcapWriter.WriteFileHeader(uint32(p.InputPacketLen), layers.LinkTypeEthernet)

mpu.appendToBuffer(pcapBuffer.Bytes())

return mpu, nil
}
13 changes: 7 additions & 6 deletions pkg/streamer/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ var (
hdrData = [...]byte{0xde, 0xef, 0xec, 0xe0}
)

func writeOutput(config *config.Config, tmpData []byte) int {
func writeOutput(config *config.Config, tmpData []byte) error {
if outputFd == nil {
return nil
}

var numAttempts = 0
reconnectAttempt := false
Expand All @@ -38,15 +41,13 @@ func writeOutput(config *config.Config, tmpData []byte) int {
reconnectAttempt = true
err := InitOutput(config, "tcp")
if err != nil {
log.Printf("Tried to reconnect but got: %v\n", err)
return 1
return fmt.Errorf("tried to reconnect but got: %w", err)
}
log.Printf("Tried to write for %d times. Reconnecting once. \n", numAttempts)
numAttempts = 0
continue
}
log.Printf("Tried to write for %d times. Bailing out. \n", numAttempts)
return 1
return fmt.Errorf("tried to write for %d times", numAttempts)
}

bytesWritten, err := outputFd.Write(tmpData[totalBytesWritten:])
Expand All @@ -63,7 +64,7 @@ func writeOutput(config *config.Config, tmpData []byte) int {
continue
}

return 0
return nil
}
}

Expand Down
Loading

0 comments on commit cbc3c1b

Please sign in to comment.