Skip to content

Commit

Permalink
Merge pull request #204 from mfreeman451/updates/snmp_poller
Browse files Browse the repository at this point in the history
fixed issues with json configs
  • Loading branch information
mfreeman451 authored Feb 16, 2025
2 parents 6fb5c1e + 568359a commit fedbdb8
Show file tree
Hide file tree
Showing 18 changed files with 518 additions and 176 deletions.
7 changes: 6 additions & 1 deletion pkg/agent/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ type PortChecker struct {
}

func NewPortChecker(details string) (*PortChecker, error) {
log.Printf("Creating new port checker with details: %s", details)

if details == "" {
return nil, errDetailsRequired
log.Printf("NewPortChecker: %v", errDetailsRequiredPorts)
return nil, errDetailsRequiredPorts
}

// Split the details into host and port
Expand All @@ -91,6 +94,8 @@ func NewPortChecker(details string) (*PortChecker, error) {
return nil, fmt.Errorf("%w: %d", errInvalidPort, port)
}

log.Printf("Successfully created port checker for %s:%d", host, port)

return &PortChecker{
Host: host,
Port: port,
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import "errors"

var (
errInvalidPort = errors.New("invalid port")
errDetailsRequired = errors.New("details field is required for port checks")
errDetailsRequiredPorts = errors.New("details field is required for port checks")
errDetailsRequiredGRPC = errors.New("details field is required for gRPC checks")
errDetailsRequiredSNMP = errors.New("details field is required for SNMP checks")
errInvalidDetailsFormat = errors.New("invalid details format: expected 'host:port'")
)
14 changes: 13 additions & 1 deletion pkg/agent/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,23 @@ func initRegistry() checker.Registry {
// Register the gRPC checker
registry.Register("grpc", func(ctx context.Context, serviceName, details string) (checker.Checker, error) {
if details == "" {
return nil, errDetailsRequired
return nil, errDetailsRequiredGRPC
}

return NewExternalChecker(ctx, serviceName, "grpc", details)
})

registry.Register("snmp", func(ctx context.Context, serviceName, details string) (checker.Checker, error) {
if details == "" {
return nil, errDetailsRequiredSNMP
}

// Parse the address from details
addr := details
return &SNMPChecker{
address: addr,
}, nil
})

return registry
}
33 changes: 24 additions & 9 deletions pkg/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -55,6 +56,7 @@ type CheckerConfig struct {
Timeout Duration `json:"timeout,omitempty"`
ListenAddr string `json:"listen_addr,omitempty"`
Additional json.RawMessage `json:"additional,omitempty"`
Details json.RawMessage `json:"details,omitempty"`
}

// ServerConfig holds the agent server configuration.
Expand Down Expand Up @@ -287,11 +289,6 @@ func (s *Server) GetStatus(ctx context.Context, req *proto.StatusRequest) (*prot
return s.getSweepStatus(ctx)
}

// Validate details field for port checks
if req.ServiceType == "port" && req.Details == "" {
return nil, errDetailsRequired
}

// Get the appropriate checker
c, err := s.getChecker(ctx, req)
if err != nil {
Expand Down Expand Up @@ -348,23 +345,31 @@ func (s *Server) loadCheckerConfigs() error {
}

path := filepath.Join(s.configDir, file.Name())

data, err := os.ReadFile(path)
if err != nil {
log.Printf("Warning: Failed to read config file %s: %v", path, err)
continue
}

// Special handling for SNMP config
if strings.HasPrefix(file.Name(), "snmp") {
var conf CheckerConfig
conf.Name = "snmp-" + strings.TrimSuffix(file.Name(), ".json")
conf.Type = "snmp"
conf.Additional = json.RawMessage(data) // Use the entire file as Additional
s.checkerConfs[conf.Name] = conf
log.Printf("Loaded SNMP checker config: %s", conf.Name)
continue
}

// Handle other configs normally
var conf CheckerConfig
if err := json.Unmarshal(data, &conf); err != nil {
log.Printf("Warning: Failed to parse config file %s: %v", path, err)

continue
}

s.checkerConfs[conf.Name] = conf

log.Printf("Loaded checker config: %s (type: %s)", conf.Name, conf.Type)
}

Expand Down Expand Up @@ -393,15 +398,25 @@ func (s *Server) getChecker(ctx context.Context, req *proto.StatusRequest) (chec
s.mu.Lock()
defer s.mu.Unlock()

log.Printf("Getting checker for request - Type: %s, Name: %s, Details: %s",
req.GetServiceType(),
req.GetServiceName(),
req.GetDetails())

key := fmt.Sprintf("%s:%s:%s", req.GetServiceType(), req.GetServiceName(), req.GetDetails())

// Return existing checker if available
if check, exists := s.checkers[key]; exists {
return check, nil
}

// Use the details from the request
details := req.GetDetails()

log.Printf("Creating new checker with details: %s", details)

// Create new checker using registry
check, err := s.registry.Get(ctx, req.ServiceType, req.ServiceName, req.Details)
check, err := s.registry.Get(ctx, req.ServiceType, req.ServiceName, details)
if err != nil {
return nil, err
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/agent/snmp_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package agent

import (
"context"
"fmt"
"log"
"net"
"time"
)

// SNMPChecker implements the checker.Checker interface.
type SNMPChecker struct {
address string
}

func (c *SNMPChecker) Check(ctx context.Context) (bool, string) {
// Try to connect to the SNMP service
conn, err := net.DialTimeout("tcp", c.address, 5*time.Second)
if err != nil {
return false, fmt.Sprintf("Failed to connect to SNMP service: %v", err)
}
defer func(conn net.Conn) {
err := conn.Close()
if err != nil {
log.Printf("Failed to close connection: %v", err)
}
}(conn)

return true, fmt.Sprintf("SNMP service is running at %s", c.address)
}
131 changes: 83 additions & 48 deletions pkg/checker/snmp/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,12 @@ const (
defaultDataChanBufferMultiplier = 2
)

// SNMPCollector implements the Collector interface.
type SNMPCollector struct {
target *Target
client SNMPClient
dataChan chan DataPoint
errorChan chan error
done chan struct{}
closeOnce sync.Once
mu sync.RWMutex
status TargetStatus
bufferPool *sync.Pool
}

// NewCollector creates a new SNMP collector for a target.
func NewCollector(target *Target) (Collector, error) {
if err := validateTarget(target); err != nil {
return nil, fmt.Errorf("%w %w", ErrInvalidTargetConfig, err)
}

// Initialize the SNMP client
client, err := newSNMPClient(target)
if err != nil {
return nil, fmt.Errorf("%w %w", ErrSNMPConnect, err)
Expand All @@ -44,7 +30,7 @@ func NewCollector(target *Target) (Collector, error) {
collector := &SNMPCollector{
target: target,
client: client,
dataChan: make(chan DataPoint, len(target.OIDs)*defaultDataChanBufferMultiplier), // Buffer for 2 polls per OID
dataChan: make(chan DataPoint, len(target.OIDs)*defaultDataChanBufferMultiplier),
errorChan: make(chan error, defaultErrorChan),
done: make(chan struct{}),
status: TargetStatus{
Expand Down Expand Up @@ -148,40 +134,29 @@ func (c *SNMPCollector) pollTarget(ctx context.Context) error {

// processResult handles a single OID result.
func (c *SNMPCollector) processResult(ctx context.Context, oid string, value interface{}) error {
// Find OID config
var oidConfig *OIDConfig

for _, cfg := range c.target.OIDs {
if cfg.OID == oid {
oidConfig = &cfg

break
}
}

oidConfig := c.findOIDConfig(oid)
if oidConfig == nil {
return fmt.Errorf("%w %s", ErrNoOIDConfig, oid)
}

// Convert value based on type
converted, err := c.convertValue(value, oidConfig)
if err != nil {
return fmt.Errorf("%w - %w", ErrSNMPConvert, err)
}

// Create data point
// Create data point with the proper fields
point := DataPoint{
OIDName: oidConfig.Name,
Value: converted,
Timestamp: time.Now(),
DataType: oidConfig.DataType,
Scale: oidConfig.Scale,
Delta: oidConfig.Delta,
}

log.Printf("Collected data point for %s: %v", point.OIDName, point.Value)

// Update OID status
c.updateOIDStatus(oidConfig.Name, point)

// Send data point
select {
case c.dataChan <- point:
return nil
Expand All @@ -193,18 +168,21 @@ func (c *SNMPCollector) processResult(ctx context.Context, oid string, value int
}

// convertValue converts an SNMP value based on the OID configuration.
// Update convertValue to handle interface{} return
func (c *SNMPCollector) convertValue(value interface{}, config *OIDConfig) (interface{}, error) {
switch config.DataType {
case TypeCounter:
return c.convertCounter(value, config.Scale)
return c.convertCounter(value)
case TypeGauge:
return c.convertGauge(value, config.Scale)
return c.convertGauge(value)
case TypeBoolean:
return c.convertBoolean(value)
case TypeBytes:
return c.convertBytes(value, config.Scale)
return c.convertBytes(value)
case TypeString:
return c.convertString(value)
case TypeFloat:
return c.convertFloat(value)
default:
return nil, fmt.Errorf("%w %v", ErrUnsupportedDataType, config.DataType)
}
Expand Down Expand Up @@ -255,24 +233,73 @@ func (c *SNMPCollector) GetStatus() TargetStatus {
}

// convertCounter converts a counter value to a uint64.
func (*SNMPCollector) convertCounter(value interface{}, scale float64) (uint64, error) {
v, ok := value.(uint64)
if !ok {
return 0, fmt.Errorf("%w %T", ErrInvalidCounterType, value)
func (*SNMPCollector) convertCounter(value interface{}) (uint64, error) {
switch v := value.(type) {
case uint64:
return v, nil
case uint32:
return uint64(v), nil
case int64:
if v < 0 {
return 0, fmt.Errorf("%w: negative value", ErrInvalidCounterType)
}
return uint64(v), nil
case int32:
if v < 0 {
return 0, fmt.Errorf("%w: negative value", ErrInvalidCounterType)
}
return uint64(v), nil
case float64:
if v < 0 {
return 0, fmt.Errorf("%w: negative value", ErrInvalidCounterType)
}
return uint64(v), nil
default:
return 0, fmt.Errorf("%w: %T", ErrInvalidCounterType, value)
}
}

return uint64(float64(v) * scale), nil
func (*SNMPCollector) convertFloat(value interface{}) (float64, error) {
switch v := value.(type) {
case float64:
return v, nil
case float32:
return float64(v), nil
case int64:
return float64(v), nil
case int32:
return float64(v), nil
case uint64:
return float64(v), nil
case uint32:
return float64(v), nil
default:
return 0, fmt.Errorf("%w: %T", ErrInvalidFloatType, value)
}
}

func (c *SNMPCollector) findOIDConfig(oid string) *OIDConfig {
for _, cfg := range c.target.OIDs {
if cfg.OID == oid {
return &cfg
}
}
return nil
}

// convertGauge converts a gauge value to a float64.
func (*SNMPCollector) convertGauge(value interface{}, scale float64) (float64, error) {
func (*SNMPCollector) convertGauge(value interface{}) (float64, error) {
switch v := value.(type) {
case uint64:
return float64(v) * scale, nil
return float64(v), nil
case int64:
return float64(v) * scale, nil
return float64(v), nil
case float64:
return v * scale, nil
return v, nil
case uint32:
return float64(v), nil
case int32:
return float64(v), nil
default:
return 0, fmt.Errorf("%w %T", ErrInvalidGaugeType, value)
}
Expand All @@ -291,13 +318,21 @@ func (*SNMPCollector) convertBoolean(value interface{}) (bool, error) {
}

// convertBytes converts a byte value to a uint64.
func (*SNMPCollector) convertBytes(value interface{}, scale float64) (uint64, error) {
v, ok := value.(uint64)
if !ok {

func (*SNMPCollector) convertBytes(value interface{}) (uint64, error) {
switch v := value.(type) {
case uint64:
return v, nil
case uint32:
return uint64(v), nil
case int64:
if v < 0 {
return 0, fmt.Errorf("%w: negative value", ErrInvalidBytesType)
}
return uint64(v), nil
default:
return 0, fmt.Errorf("%w %T", ErrInvalidBytesType, value)
}

return uint64(float64(v) * scale), nil
}

// convertString converts a string value to a string.
Expand Down
Loading

0 comments on commit fedbdb8

Please sign in to comment.