Skip to content

Commit

Permalink
Merge pull request #91 from mfreeman451/bug/network_sweeper
Browse files Browse the repository at this point in the history
Bug/network sweeper
  • Loading branch information
mfreeman451 authored Jan 22, 2025
2 parents b7815bd + 777a799 commit 37c9c4f
Show file tree
Hide file tree
Showing 16 changed files with 417 additions and 347 deletions.
24 changes: 24 additions & 0 deletions cmd/agent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# agent

## Startup Sequence

```mermaid
sequenceDiagram
participant A as Agent Server
participant S as Sweep Service
participant M as Monitor
participant CS as Combined Scanner
A->>S: Start(ctx)
Note over A,S: loadServices() -> loadSweepService()
A->>S: startNodeMonitoring(ctx)
Note over S: Sleep 30s (nodeDiscoveryTimeout)
Note over S: checkInitialStates()
Note over S: Sleep 30s (nodeNeverReportedTimeout)
Note over S: checkNeverReportedPollers()
S->>M: MonitorPollers(ctx)
Note over M: ticker := time.NewTicker(pollerTimeout)
M->>CS: Scan(ctx, targets)
```
21 changes: 14 additions & 7 deletions cmd/poller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@ func main() {
if err != nil {
log.Printf("Failed to create poller: %v", err)
cancel()

return
}

defer func(p *poller.Poller) {
err := p.Close()
if err != nil {
// Ensure poller is closed after main exits
defer func() {
log.Printf("Closing poller...")

if err := p.Close(); err != nil {
log.Printf("Failed to close poller: %v", err)
}
}(p)
}()

// Handle shutdown signals
sigChan := make(chan os.Signal, 1)
Expand All @@ -46,7 +50,12 @@ func main() {
// Start poller in a goroutine
errChan := make(chan error, 1)
go func() {
log.Printf("Starting poller...")
errChan <- p.Start(ctx)

log.Printf("Poller Start() goroutine finished")

close(errChan) // Ensure channel is closed after Start returns
}()

// Wait for either error or shutdown signal
Expand All @@ -57,11 +66,9 @@ func main() {
cancel()
}
case sig := <-sigChan:
log.Printf("Received signal %v, shutting down", sig)
log.Printf("Received signal %v, initiating shutdown", sig)
cancel()
}

// Wait for shutdown to complete
<-errChan
log.Println("Shutdown complete")
}
62 changes: 40 additions & 22 deletions pkg/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,20 @@ const (
processConfigurationName = "process"
)

var (
errInvalidDuration = errors.New("invalid duration")
)

type Duration time.Duration

// SweepConfig represents sweep service configuration from JSON.
type SweepConfig struct {
Networks []string `json:"networks"`
Ports []int `json:"ports"`
Interval Duration `json:"interval"`
Concurrency int `json:"concurrency"`
Timeout Duration `json:"timeout"`
Networks []string `json:"networks"`
Ports []int `json:"ports"`
SweepModes []models.SweepMode `json:"sweep_modes"`
Interval Duration `json:"interval"`
Concurrency int `json:"concurrency"`
Timeout Duration `json:"timeout"`
}

// CheckerConfig represents the configuration for a checker.
Expand Down Expand Up @@ -126,34 +131,49 @@ func (s *Server) GetStatus(ctx context.Context, req *proto.StatusRequest) (*prot

func loadSweepService(configDir string) (Service, error) {
sweepConfigPath := filepath.Join(configDir, "sweep.json")
log.Printf("Looking for sweep config at: %s", sweepConfigPath)

// Check if config exists
if _, err := os.Stat(sweepConfigPath); os.IsNotExist(err) {
log.Printf("Sweep config not found at %s", sweepConfigPath)
return nil, err
}

// Load and parse config
data, err := os.ReadFile(sweepConfigPath)
if err != nil {
log.Printf("Failed to read sweep config: %v", err)
return nil, fmt.Errorf("failed to read sweep config: %w", err)
}

var sweepConfig SweepConfig
if err = json.Unmarshal(data, &sweepConfig); err != nil {
log.Printf("Failed to parse sweep config: %v", err)
return nil, fmt.Errorf("failed to parse sweep config: %w", err)
}

log.Printf("Successfully loaded sweep config: %+v", sweepConfig)

// Convert to sweeper.Config
config := &models.Config{
Networks: sweepConfig.Networks,
Ports: sweepConfig.Ports,
SweepModes: sweepConfig.SweepModes,
Interval: time.Duration(sweepConfig.Interval),
Concurrency: sweepConfig.Concurrency,
Timeout: time.Duration(sweepConfig.Timeout),
}

// Create service
return NewSweepService(config)
service, err := NewSweepService(config)
if err != nil {
log.Printf("Failed to create sweep service: %v", err)
return nil, err
}

log.Printf("Successfully created sweep service with config: %+v", config)

return service, nil
}

// loadServices initializes any optional services found in the config directory.
Expand All @@ -174,31 +194,29 @@ func (s *Server) loadServices() error {
return nil
}

// UnmarshalJSON implements json.Unmarshaler for Duration.
func (d *Duration) UnmarshalJSON(b []byte) error {
var s string
var v interface{}
if err := json.Unmarshal(b, &v); err != nil {
return err
}

if err := json.Unmarshal(b, &s); err == nil {
// user wrote e.g. "5m"
parsed, err := time.ParseDuration(s)
switch value := v.(type) {
case float64:
*d = Duration(time.Duration(value))
return nil
case string:
tmp, err := time.ParseDuration(value)
if err != nil {
return err
}

*d = Duration(parsed)
*d = Duration(tmp)

return nil
default:
return errInvalidDuration
}

// fallback to number-of-nanoseconds if needed
var n int64

if err := json.Unmarshal(b, &n); err != nil {
return err
}

*d = Duration(n)

return nil
}

// loadCheckerConfigs loads all checker configurations from the config directory.
Expand Down
Loading

0 comments on commit 37c9c4f

Please sign in to comment.