Skip to content

Commit

Permalink
Merge pull request #30 from mfreeman451/29-improve-ui-to-handle-more-…
Browse files Browse the repository at this point in the history
…devices

29 improve UI to handle more devices
  • Loading branch information
mfreeman451 authored Jan 18, 2025
2 parents 9786f71 + 0b04d3e commit 3662887
Show file tree
Hide file tree
Showing 22 changed files with 1,953 additions and 531 deletions.
1 change: 0 additions & 1 deletion buildAll.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
./setup-deb-poller.sh
./setup-deb-dusk-checker.sh
./setup-deb-agent.sh
./setup-deb-cloud.sh

scp release-artifacts/homemon-poller_1.0.0.deb [email protected]:~/
scp release-artifacts/homemon-agent_1.0.0.deb [email protected]:~/
Expand Down
58 changes: 33 additions & 25 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package main

import (
"context"
"flag"
"log"
"os"
Expand Down Expand Up @@ -29,61 +30,68 @@ func main() {
listenAddr := flag.String("listen", ":50051", "gRPC listen address")
flag.Parse()

// Create gRPC server
// Create main context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create agent server first
server, err := agent.NewServer(*configDir)
if err != nil {
log.Fatalf("Failed to create agent server: %v", err)
}

// Create and configure gRPC server
grpcServer := grpc.NewServer(*listenAddr,
grpc.WithMaxRecvSize(maxRecvSize),
grpc.WithMaxSendSize(maxSendSize),
)

// Setup health check
hs := health.NewServer()
hs.SetServingStatus("AgentService", healthpb.HealthCheckResponse_SERVING)
err := grpcServer.RegisterHealthServer(hs)

if err != nil {
if err := grpcServer.RegisterHealthServer(hs); err != nil {
log.Fatalf("Failed to register health server: %v", err)
}

// Create agent server
server, err := agent.NewServer(*configDir)
if err != nil {
log.Fatalf("Failed to create agent server: %v", err)
}

defer func(server *agent.Server) {
err := server.Close()
if err != nil {
log.Printf("Failed to close agent server: %v", err)
}
}(server)

// Register agent service with gRPC server
// Register agent service
proto.RegisterAgentServiceServer(grpcServer.GetGRPCServer(), server)

// Start gRPC server in a goroutine
// Start the gRPC server
errChan := make(chan error, 1)

go func() {
log.Printf("gRPC server listening on %s", *listenAddr)

log.Printf("Starting gRPC server on %s", *listenAddr)
if err := grpcServer.Start(); err != nil {
errChan <- err
}
}()

// Handle shutdown signals
// Start the agent services
if err := server.Start(ctx); err != nil {
log.Printf("Warning: Failed to start some agent services: %v", err)
}

// Handle shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

// Wait for shutdown signal or error
select {
case err := <-errChan:
log.Printf("Server error: %v", err)
case sig := <-sigChan:
log.Printf("Received signal %v, initiating shutdown", sig)
case err := <-errChan:
log.Printf("Server error: %v, initiating shutdown", err)
}

// Begin graceful shutdown
log.Printf("Starting graceful shutdown...")

// Stop gRPC server
grpcServer.Stop()

// Close agent server
if err := server.Close(); err != nil {
log.Printf("Error during server shutdown: %v", err)
}

log.Printf("Shutdown complete")
}
17 changes: 17 additions & 0 deletions pkg/agent/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package agent

import (
"context"

"github.com/mfreeman451/homemon/proto"
)

type Service interface {
Start(context.Context) error
Stop() error
}

// SweepStatusProvider is an interface for services that can provide sweep status
type SweepStatusProvider interface {
GetStatus(context.Context) (*proto.StatusResponse, error)
}
163 changes: 137 additions & 26 deletions pkg/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/mfreeman451/homemon/pkg/checker"
"github.com/mfreeman451/homemon/pkg/sweeper"
"github.com/mfreeman451/homemon/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -34,6 +35,15 @@ var (

type Duration time.Duration

// SweepConfig represents sweep service configuration from JSON.
type SweepConfig struct {
Networks []string `json:"networks"`
Ports []int `json:"ports"`
Interval Duration `json:"interval"`
Concurrency int `json:"concurrency"`
Timeout Duration `json:"timeout"`
}

// CheckerConfig represents the configuration for a checker.
type CheckerConfig struct {
Name string `json:"name"`
Expand All @@ -52,6 +62,7 @@ type Server struct {
checkers map[string]checker.Checker
checkerConfs map[string]CheckerConfig
configDir string
services []Service
}

// NewServer creates a new agent server.
Expand All @@ -66,9 +77,111 @@ func NewServer(configDir string) (*Server, error) {
return nil, fmt.Errorf("failed to load checker configs: %w", err)
}

// Load optional services
if err := s.loadServices(); err != nil {
log.Printf("Warning: some services failed to load: %v", err)
}

return s, nil
}

// Start starts all registered services
func (s *Server) Start(ctx context.Context) error {
var startupErrs []error

// Start each service in its own goroutine
for _, svc := range s.services {
go func(svc Service) {
if err := svc.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
startupErrs = append(startupErrs, err)
log.Printf("Service startup error: %v", err)
}
}(svc)
}

// If we want to wait for services to start, we could add a startup channel pattern here

if len(startupErrs) > 0 {
return fmt.Errorf("some services failed to start: %v", startupErrs)
}

return nil
}

func (s *Server) GetStatus(ctx context.Context, req *proto.StatusRequest) (*proto.StatusResponse, error) {
// Special handling for sweep status requests
if req.ServiceType == "sweep" {
return s.getSweepStatus(ctx)
}

// Get the appropriate checker
c, err := s.getChecker(ctx, req)
if err != nil {
return nil, err
}

// Execute the check
available, message := c.Check(ctx)

// Return the status response
return &proto.StatusResponse{
Available: available,
Message: message,
ServiceName: req.ServiceName,
ServiceType: req.ServiceType,
}, nil
}

func loadSweepService(configDir string) (Service, error) {
sweepConfigPath := filepath.Join(configDir, "sweep.json")

// Check if config exists
if _, err := os.Stat(sweepConfigPath); os.IsNotExist(err) {
return nil, err
}

// Load and parse config
data, err := os.ReadFile(sweepConfigPath)
if err != nil {
return nil, fmt.Errorf("failed to read sweep config: %w", err)
}

var sweepConfig SweepConfig
if err = json.Unmarshal(data, &sweepConfig); err != nil {
return nil, fmt.Errorf("failed to parse sweep config: %w", err)
}

// Convert to sweeper.Config
config := sweeper.Config{
Networks: sweepConfig.Networks,
Ports: sweepConfig.Ports,
Interval: time.Duration(sweepConfig.Interval),
Concurrency: sweepConfig.Concurrency,
Timeout: time.Duration(sweepConfig.Timeout),
}

// Create service
return NewSweepService(config)
}

// loadServices initializes any optional services found in the config directory
func (s *Server) loadServices() error {
// Try to load sweep service if configured
sweepService, err := loadSweepService(s.configDir)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("failed to load sweep service: %w", err)
}
} else if sweepService != nil {
s.services = append(s.services, sweepService)
}

// Additional services can be loaded here in the future
// Each service should follow the Service interface

return nil
}

func (d *Duration) UnmarshalJSON(b []byte) error {
var s string

Expand All @@ -91,7 +204,7 @@ func (d *Duration) UnmarshalJSON(b []byte) error {
return err
}

*d = Duration(time.Duration(n))
*d = Duration(n)

return nil
}
Expand Down Expand Up @@ -158,23 +271,21 @@ func (*Server) initializeChecker(
}
}

// GetStatus returns the status of a service.
func (s *Server) GetStatus(ctx context.Context, req *proto.StatusRequest) (*proto.StatusResponse, error) {
// logs, etc.
check, err := s.getChecker(ctx, req) // pass the entire request
if err != nil {
return nil, err
// getSweepStatus handles status requests specifically for the sweep service.
func (s *Server) getSweepStatus(ctx context.Context) (*proto.StatusResponse, error) {
// Find sweep service among registered services
for _, svc := range s.services {
if provider, ok := svc.(SweepStatusProvider); ok {
return provider.GetStatus(ctx)
}
}

// Run the check
timeoutCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()

available, msg := check.Check(timeoutCtx)

// Return a response indicating sweep service is not configured
return &proto.StatusResponse{
Available: available,
Message: msg,
Available: false,
Message: "Sweep service not configured",
ServiceName: "network_sweep",
ServiceType: "sweep",
}, nil
}

Expand Down Expand Up @@ -322,21 +433,21 @@ func (s *Server) ListServices() []string {
return services
}

// Close handles cleanup when the server shuts down.
// Close stops all services and cleans up resources.
func (s *Server) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
var closeErrs []error

var lastErr error
for _, svc := range s.services {
if err := svc.Stop(); err != nil {
closeErrs = append(closeErrs, err)

for name, check := range s.checkers {
if closer, ok := check.(interface{ Close() error }); ok {
if err := closer.Close(); err != nil {
log.Printf("Error closing checker %s: %v", name, err)
lastErr = err
}
log.Printf("Error stopping service: %v", err)
}
}

return lastErr
if len(closeErrs) > 0 {
return fmt.Errorf("errors during shutdown: %v", closeErrs)
}

return nil
}
Loading

0 comments on commit 3662887

Please sign in to comment.