Skip to content

Commit

Permalink
Merge pull request #40 from mfreeman451/39-bug-not-tracking-recovery
Browse files Browse the repository at this point in the history
refactored, cleaning up a bunch of stuff..
  • Loading branch information
mfreeman451 authored Jan 19, 2025
2 parents 166e925 + 20755b3 commit 28d1efc
Show file tree
Hide file tree
Showing 16 changed files with 657 additions and 367 deletions.
9 changes: 6 additions & 3 deletions buildAll.sh
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#!/bin/bash

VERSION=${VERSION:-1.0.1}


./setup-deb-poller.sh
./setup-deb-dusk-checker.sh
./setup-deb-agent.sh

scp release-artifacts/serviceradar-poller_1.0.0.deb [email protected]:~/
scp release-artifacts/serviceradar-agent_1.0.0.deb [email protected]:~/
scp release-artifacts/serviceradar-dusk-checker_1.0.0.deb [email protected]:~/
scp release-artifacts/serviceradar-poller_${VERSION}.deb [email protected]:~/
scp release-artifacts/serviceradar-agent_${VERSION}.deb [email protected]:~/
scp release-artifacts/serviceradar-dusk-checker_${VERSION}.deb [email protected]:~/
250 changes: 137 additions & 113 deletions pkg/agent/sweep_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,56 +13,124 @@ import (
"github.com/mfreeman451/serviceradar/proto"
)

// SweepService implements sweeper.SweepService and provides network scanning capabilities
type SweepService struct {
sweeper sweeper.Sweeper
mu sync.RWMutex
closed chan struct{}
config sweeper.Config
results []sweeper.Result
scanner sweeper.Scanner
store sweeper.Store
processor sweeper.ResultProcessor
mu sync.RWMutex
closed chan struct{}
config sweeper.Config
}

// NewSweepService creates a new sweep service.
// NewSweepService creates a new sweep service with default configuration
func NewSweepService(config sweeper.Config) (*SweepService, error) {
// Ensure we have default sweep modes if none specified
// Apply default configuration
config = applyDefaultConfig(config)

log.Printf("Creating sweep service with config: %+v", config)

// Create components
scanner := sweeper.NewCombinedScanner(config.Timeout, config.Concurrency, config.ICMPCount)
store := sweeper.NewInMemoryStore()
processor := sweeper.NewDefaultProcessor()

return &SweepService{
scanner: scanner,
store: store,
processor: processor,
closed: make(chan struct{}),
config: config,
}, nil
}

func applyDefaultConfig(config sweeper.Config) sweeper.Config {
// Ensure we have default sweep modes
if len(config.SweepModes) == 0 {
config.SweepModes = []sweeper.SweepMode{
sweeper.ModeTCP, // Always enable TCP scanning
sweeper.ModeICMP, // Enable ICMP for host discovery
sweeper.ModeTCP,
sweeper.ModeICMP,
}
}

// Ensure reasonable defaults
// Set reasonable defaults
if config.Timeout == 0 {
config.Timeout = 2 * time.Second
}
if config.Concurrency == 0 {
config.Concurrency = 100
}
if config.ICMPCount == 0 {
config.ICMPCount = 3 // Default to 3 ICMP attempts
config.ICMPCount = 3
}

log.Printf("Creating sweep service with config: %+v", config)

// Create network sweeper instance
sw := sweeper.NewNetworkSweeper(config)

return &SweepService{
sweeper: sw,
closed: make(chan struct{}),
config: config,
results: make([]sweeper.Result, 0),
}, nil
return config
}

func (s *SweepService) Start(ctx context.Context) error {
log.Printf("Starting sweep service with config: %+v", s.config)
return s.sweeper.Start(ctx)

// Start periodic sweeps
go s.sweepLoop(ctx)

return nil
}

func (s *SweepService) sweepLoop(ctx context.Context) {
ticker := time.NewTicker(s.config.Interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-s.closed:
return
case <-ticker.C:
if err := s.performSweep(ctx); err != nil {
log.Printf("Error during sweep: %v", err)
}
}
}
}

func (s *SweepService) performSweep(ctx context.Context) error {
// Generate targets based on configuration
targets, err := generateTargets(s.config)
if err != nil {
return fmt.Errorf("failed to generate targets: %w", err)
}

// Reset processor state
s.processor.Reset()

// Start the scan
results, err := s.scanner.Scan(ctx, targets)
if err != nil {
return fmt.Errorf("scan failed: %w", err)
}

// Process results as they come in
for result := range results {
// Store the result
if err := s.store.SaveResult(ctx, &result); err != nil {
log.Printf("Failed to save result: %v", err)
continue
}

// Process the result
if err := s.processor.Process(&result); err != nil {
log.Printf("Failed to process result: %v", err)
continue
}
}

return nil
}

func (s *SweepService) Stop() error {
close(s.closed)
return s.sweeper.Stop()
return s.scanner.Stop()
}

// identifyService maps common port numbers to service names
Expand Down Expand Up @@ -102,95 +170,11 @@ func (s *SweepService) GetStatus(ctx context.Context) (*proto.StatusResponse, er
}, nil
}

// Get latest results and log them
results, err := s.sweeper.GetResults(ctx, &sweeper.ResultFilter{
StartTime: time.Now().Add(-s.config.Interval),
})
// Get current summary from processor
summary, err := s.processor.GetSummary()
if err != nil {
log.Printf("Error getting sweep results: %v", err)
return nil, fmt.Errorf("failed to get sweep results: %w", err)
}

log.Printf("Processing %d sweep results", len(results))

// Aggregate results by host
hostMap := make(map[string]*sweeper.HostResult)
portCounts := make(map[int]int)
totalHosts := 0

for _, result := range results {
log.Printf("Processing result for host %s (port %d): available=%v time=%v",
result.Target.Host, result.Target.Port, result.Available, result.RespTime)

// Update port counts
if result.Available {
portCounts[result.Target.Port]++
}

// Get or create host result
host, exists := hostMap[result.Target.Host]
if !exists {
totalHosts++
host = &sweeper.HostResult{
Host: result.Target.Host,
FirstSeen: result.FirstSeen,
LastSeen: result.LastSeen,
Available: false,
PortResults: make([]*sweeper.PortResult, 0),
}
hostMap[result.Target.Host] = host
}

// Update host details
if result.Available {
host.Available = true
if result.Target.Mode == sweeper.ModeTCP {
portResult := &sweeper.PortResult{
Port: result.Target.Port,
Available: true,
RespTime: result.RespTime,
Service: identifyService(result.Target.Port),
}
host.PortResults = append(host.PortResults, portResult)
log.Printf("Found open port %d on host %s (%s)",
result.Target.Port, host.Host, portResult.Service)
}
}
}

// Create the summary
hosts := make([]sweeper.HostResult, 0, len(hostMap))
availableHosts := 0
for _, host := range hostMap {
if host.Available {
availableHosts++
}
hosts = append(hosts, *host)
}

now := time.Now()
summary := sweeper.SweepSummary{
Network: s.config.Networks[0],
TotalHosts: totalHosts,
AvailableHosts: availableHosts,
LastSweep: now,
Hosts: hosts,
Ports: make([]sweeper.PortCount, 0, len(portCounts)),
}

// Add port statistics
for port, count := range portCounts {
summary.Ports = append(summary.Ports, sweeper.PortCount{
Port: port,
Available: count,
})
}

// Log the final summary
log.Printf("Sweep summary: %d total hosts, %d available, %d ports scanned",
summary.TotalHosts, summary.AvailableHosts, len(summary.Ports))
for _, port := range summary.Ports {
log.Printf("Port %d: %d hosts available", port.Port, port.Available)
log.Printf("Error getting sweep summary: %v", err)
return nil, fmt.Errorf("failed to get sweep summary: %w", err)
}

// Convert to JSON for the message field
Expand All @@ -200,6 +184,7 @@ func (s *SweepService) GetStatus(ctx context.Context) (*proto.StatusResponse, er
return nil, fmt.Errorf("failed to marshal sweep status: %w", err)
}

// Return status response
return &proto.StatusResponse{
Available: true,
Message: string(statusJSON),
Expand All @@ -213,12 +198,51 @@ func (s *SweepService) UpdateConfig(config sweeper.Config) error {
s.mu.Lock()
defer s.mu.Unlock()

// Apply default configuration
config = applyDefaultConfig(config)
s.config = config

return s.sweeper.UpdateConfig(config)
return nil
}

// Close implements io.Closer.
func (s *SweepService) Close() error {
return s.Stop()
}

func generateTargets(config sweeper.Config) ([]sweeper.Target, error) {
var targets []sweeper.Target

// For each network
for _, network := range config.Networks {
// Generate IP addresses for the network
ips, err := sweeper.GenerateIPsFromCIDR(network)
if err != nil {
return nil, fmt.Errorf("failed to generate IPs for %s: %w", network, err)
}

// For each IP, create appropriate targets
for _, ip := range ips {
// Add ICMP target if enabled
if sweeper.ContainsMode(config.SweepModes, sweeper.ModeICMP) {
targets = append(targets, sweeper.Target{
Host: ip.String(),
Mode: sweeper.ModeICMP,
})
}

// Add TCP targets if enabled
if sweeper.ContainsMode(config.SweepModes, sweeper.ModeTCP) {
for _, port := range config.Ports {
targets = append(targets, sweeper.Target{
Host: ip.String(),
Port: port,
Mode: sweeper.ModeTCP,
})
}
}
}
}

return targets, nil
}
Loading

0 comments on commit 28d1efc

Please sign in to comment.