diff --git a/AGENTS.md b/AGENTS.md index 572f0341..5d4a167f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -160,6 +160,9 @@ Copy `config/config-sample.json` to `config/config.json`. All keys from the orig - `LEGACY_STATUS_PROJECTION_ENABLE`: Keep v1 `site_status` / `last_status_change` projection updated during shadow-v2-state migration - `LOG_FORMAT`: `text` (default, drop-in compatible) or `json` (structured logging) - `USE_VARIABLE_CHECK_INTERVALS`: Respect per-site `check_interval`; the scheduler uses a short idle poll and maintained `next_check_at` timestamps control which sites are ready +- `DNS_MONITOR_ENABLE`: Enable the independent recursive DNS probe stream +- `DNS_MONITOR_INTERVAL_SEC`: Per-site DNS cadence; initial schedule rows are hash-jittered across this interval +- `DNS_MONITOR_BATCH_SIZE`, `DNS_MONITOR_MAX_WORKERS`, `DNS_MONITOR_SCHEDULE_BATCH_SIZE`: Optional DNS guardrails; 0 means auto-size from `NUM_WORKERS` - `DASHBOARD_PORT`: Internal port for the operator dashboard (0 to disable) - `DEBUG_PORT`: localhost-only pprof port, default 6060 (0 to disable; never exposed remotely) @@ -206,6 +209,15 @@ Every HTTPS check inspects `tls.ConnectionState` for: - TLS version — flags TLS 1.0/1.1 as deprecated - Cipher suite — recorded in audit log +**DNS Monitoring:** +When `DNS_MONITOR_ENABLE` is true, Jetmon runs a separate recursive DNS probe +stream from `jetmon_dns_probe_state`. DNS schedules are spread over +`DNS_MONITOR_INTERVAL_SEC`, lookup workers and batches auto-size from +`NUM_WORKERS` by default, and failures open Degraded `dns` events with resolver +evidence (NXDOMAIN, SERVFAIL, timeout, resolver error). The first DNS rollout +slice is advisory: DNS events do not update the legacy HTTP `site_status` +projection and do not send WPCOM downtime notifications. + **Downtime Verification:** 1. Local check fails → open a `Seems Down` event (severity 3) and enter the local retry queue. The event opens on the **first** failure so `started_at` reflects the actual incident start. Subsequent failures during retry are no-ops on the events table (idempotent dedup). 2. After `NUM_OF_CHECKS` local failures → dispatch to Verifliers (event stays Seems Down) @@ -256,6 +268,7 @@ New tables introduced by Jetmon 2: | `jetmon_event_transitions` | Append-only history of every mutation to `jetmon_events` (open, severity change, state change, cause link, close) | | `jetmon_audit_log` | Operational trail — WPCOM notifications, retry dispatch, verifier RPCs, alert/maintenance suppression, config reloads. 
Site-state changes do **not** flow through here | | `jetmon_check_history` | RTT and timing samples for trending | +| `jetmon_dns_probe_state` | Independent DNS probe schedule plus latest recursive resolver evidence | | `jetmon_false_positives` | Veriflier non-confirmation events | ## Multi-Host Bucket Coordination diff --git a/config/config-sample.json b/config/config-sample.json index ff3c3363..4221a8a1 100644 --- a/config/config-sample.json +++ b/config/config-sample.json @@ -35,6 +35,14 @@ "KEYWORD_READ_MAX_MS" : 0, "USE_VARIABLE_CHECK_INTERVALS" : true, + "DNS_MONITOR_ENABLE" : false, + "DNS_MONITOR_INTERVAL_SEC" : 900, + "DNS_MONITOR_TIMEOUT_MS" : 2000, + "DNS_MONITOR_BATCH_SIZE" : 0, + "DNS_MONITOR_MAX_WORKERS" : 0, + "DNS_MONITOR_SCHEDULE_BATCH_SIZE" : 0, + "DNS_MONITOR_RESOLVERS" : [], + "LOG_FORMAT" : "text", "DASHBOARD_PORT" : 8080, "DASHBOARD_BIND_ADDR" : "127.0.0.1", diff --git a/config/config.readme b/config/config.readme index 13ada750..5dd60310 100644 --- a/config/config.readme +++ b/config/config.readme @@ -116,6 +116,40 @@ follow-up when the normal interval is longer. Default in the sample config: true. Minimal configs that omit the key retain the compatibility default of false. +DNS_MONITOR_ENABLE +Set to true to run the independent recursive DNS monitor. DNS probes use their +own schedule table and bounded worker loop so DNS health checks do not add DNS +load to every HTTP probe. Default: false. + +DNS_MONITOR_INTERVAL_SEC +Seconds between DNS probes for each active site when DNS monitoring is enabled. +Initial schedules are spread across this interval with stable hash jitter so +enabling DNS monitoring does not create a synchronized lookup wave. Default: +900. + +DNS_MONITOR_TIMEOUT_MS +Per-hostname recursive lookup timeout in milliseconds. Default: 2000. + +DNS_MONITOR_BATCH_SIZE +Maximum due DNS probes to process in one scheduler pass. Set to 0 for automatic +sizing based on NUM_WORKERS. Default: 0. + +DNS_MONITOR_MAX_WORKERS +Maximum DNS lookup workers. Set to 0 for automatic sizing based on NUM_WORKERS. +Default: 0. + +DNS_MONITOR_SCHEDULE_BATCH_SIZE +Maximum missing DNS schedule rows to backfill in one scheduler pass. Set to 0 +for automatic sizing based on the DNS batch size. Default: 0. + +DNS_MONITOR_RESOLVERS +Optional JSON array of recursive resolver addresses used by the DNS monitor +instead of the host's system resolver. Entries may be hostnames/IPs or +host:port values; port defaults to 53. Leave empty to use the system resolver. +This is primarily for controlled test environments and production deployments +that require a known recursive resolver path. Example: +["1.1.1.1:53", "8.8.8.8:53"]. + LOG_FORMAT Log output format. Set to "json" for structured logging (e.g. for log aggregators), or "text" for human-readable output. Default: "text". 
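For reference, a minimal `config/config.json` fragment that switches the DNS monitor on might look like the sketch below. It is assembled from the sample defaults above; only `DNS_MONITOR_ENABLE` is flipped to true, and the resolver list simply reuses the illustrative addresses from the `DNS_MONITOR_RESOLVERS` description rather than recommending specific upstreams.

```json
{
    "DNS_MONITOR_ENABLE"              : true,
    "DNS_MONITOR_INTERVAL_SEC"        : 900,
    "DNS_MONITOR_TIMEOUT_MS"          : 2000,
    "DNS_MONITOR_BATCH_SIZE"          : 0,
    "DNS_MONITOR_MAX_WORKERS"         : 0,
    "DNS_MONITOR_SCHEDULE_BATCH_SIZE" : 0,
    "DNS_MONITOR_RESOLVERS"           : ["1.1.1.1:53", "8.8.8.8:53"]
}
```

With the batch, worker, and schedule-batch keys left at 0, the orchestrator derives bounded DNS batch and worker sizes from `NUM_WORKERS`, as described in the operations-guide entries later in this diff; leaving `DNS_MONITOR_RESOLVERS` empty instead falls back to the host's system resolver.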
diff --git a/docs/events.md b/docs/events.md index 5e39bc5d..ed3748d1 100644 --- a/docs/events.md +++ b/docs/events.md @@ -204,6 +204,7 @@ The split exists because the two trails have different consumers and different r | `jetmon_events` + `jetmon_event_transitions` | Public API incident timelines, SLA reports | Long — 30/90 days at full fidelity, then rolled up | | `jetmon_audit_log` | Operators investigating "why did the alert fire" | Short — aggressive pruning is fine once the incident is closed | | `jetmon_check_history` | Response-time trending, baseline learning | Medium — granular timing is high volume | +| `jetmon_dns_probe_state` | DNS scheduler and latest recursive resolver evidence | Medium — one current row per monitored site | ## Causal links diff --git a/docs/operations-guide.md b/docs/operations-guide.md index ecba1e8f..d3ff67e3 100644 --- a/docs/operations-guide.md +++ b/docs/operations-guide.md @@ -27,6 +27,13 @@ Key settings: | `BODY_READ_MAX_MS` | 250 | Post-header body-phase budget in milliseconds for budgeted reads (unknown/large responses) | | `KEYWORD_READ_MAX_BYTES` | 1048576 | Max bytes scanned when keyword checks are enabled | | `KEYWORD_READ_MAX_MS` | 0 | Keyword read budget in milliseconds, 0 inherits full request timeout envelope | +| `DNS_MONITOR_ENABLE` | false | Enable the independent recursive DNS monitor | +| `DNS_MONITOR_INTERVAL_SEC` | 900 | Per-site DNS probe cadence when DNS monitoring is enabled | +| `DNS_MONITOR_TIMEOUT_MS` | 2000 | Per-hostname recursive DNS lookup timeout | +| `DNS_MONITOR_BATCH_SIZE` | 0 | Due DNS probes per scheduler pass; 0 auto-sizes from `NUM_WORKERS` | +| `DNS_MONITOR_MAX_WORKERS` | 0 | DNS lookup worker cap; 0 auto-sizes from `NUM_WORKERS` | +| `DNS_MONITOR_SCHEDULE_BATCH_SIZE` | 0 | Missing DNS schedule rows to backfill per pass; 0 auto-sizes | +| `DNS_MONITOR_RESOLVERS` | empty | Optional recursive resolver list for the DNS monitor; empty uses the host system resolver | | `PEER_OFFLINE_LIMIT` | 3 | Veriflier agreements required to confirm downtime | | `WORKER_MAX_MEM_MB` | 0 | Optional Go runtime memory threshold that triggers worker-pool drain; 0 disables the artificial cap | | `BUCKET_TOTAL` | 1000 | Total bucket range across all hosts | @@ -65,6 +72,24 @@ Scheduler behavior: reporting queries do not run on every short scheduler poll. Use `scheduler.round.due_count_sampled.count` to distinguish sampled polls from intentionally skipped reporting polls. +- DNS monitoring uses `jetmon_dns_probe_state` for its own schedule and latest + recursive resolver evidence. Initial schedule rows are jittered across + `DNS_MONITOR_INTERVAL_SEC`, and `DNS_MONITOR_BATCH_SIZE` / + `DNS_MONITOR_MAX_WORKERS` can stay at 0 so the orchestrator auto-sizes DNS + load from the HTTP worker pool. DNS failures open Degraded `dns` events with + resolver metadata but do not change the legacy `site_status` projection or + send WPCOM downtime notifications in this first rollout slice. +- `DNS_MONITOR_RESOLVERS` can pin DNS probes to a known recursive resolver path + for staging, uptime-bench, or production resolver policy. Use recursive + resolvers that can answer normal A/AAAA/CNAME lookups; pointing this at an + authoritative-only nameserver is only useful when that server is known to + answer every monitored name in the test. When several resolvers are listed, + Jetmon picks a stable resolver per hostname to distribute load without + synchronizing all sites onto one upstream. 
+- DNS probe metrics include scheduler gauges such as `dns.selected.count` and + status counters such as `dns.check.ok.count`, `dns.check.nxdomain.count`, + and `dns.check.timeout.count`. These should appear under the normal + `com.jetpack.jetmon.` StatsD prefix when StatsD is configured. See [../config/config.readme](../config/config.readme) for the full option reference. diff --git a/docs/project.md b/docs/project.md index 14236cde..328bf4a8 100644 --- a/docs/project.md +++ b/docs/project.md @@ -129,6 +129,15 @@ Add `maintenance_start` and `maintenance_end` (nullable `DATETIME`) columns to ` **Granular Timing Breakdown** Go's `net/http/httptrace` provides discrete callbacks for DNS start/done, TCP connect start/done, TLS handshake start/done, request written, and first response byte. Each check records composite RTT plus DNS, TCP, TLS, and TTFB timings. The raw samples are stored in `jetmon_check_history` for response-time trending and API statistics; scheduler-level StatsD metrics report round/page phase timing and write volume. +**Recursive DNS Monitoring** +Jetmon can run a separate recursive DNS probe stream on a staggered schedule. +DNS probes are stored in `jetmon_dns_probe_state`, auto-size their worker/batch +limits from the HTTP worker pool by default, and open Degraded `dns` events +with NXDOMAIN / SERVFAIL / timeout / resolver-error evidence when resolution +fails. The first rollout slice is intentionally advisory: DNS events do not +update the legacy HTTP `site_status` projection and do not send WPCOM downtime +notifications until product semantics for DNS-to-site rollup are finalized. + When the HTTP probe fails during resolver lookup, Jetmon records structured DNS diagnostics in event metadata when Go exposes them: NXDOMAIN, SERVFAIL, timeout, or a generic resolver error, plus the queried name and resolver server when @@ -366,8 +375,13 @@ Benefits over the current static configuration: These are intentionally out of scope for the initial rewrite. They represent the path to making Jetmon 2 a fully competitive standalone monitoring platform rather than a reliable internal Jetpack service. -**DNS Monitoring** -Check that a domain resolves to expected IPs on a schedule, using Go's `net.LookupHost()`. Alert when the answer changes or when resolution fails. Particularly valuable for detecting DNS hijacking and nameserver misconfigurations before they cause HTTP failures. New monitor type stored as a separate DB table. +**Advanced DNS Monitoring** +Build on the recursive DNS probe stream with explicit DNS-record expectations, +DNSSEC checks, split-horizon checks, full CNAME-chain capture, authoritative +nameserver probes, and DNS latency baselines. These need product semantics for +which failures are advisory, which roll up to site-level downtime, and how +monitor-side resolver impairment is reported as Unknown rather than customer +site downtime. **TCP Port Monitoring** Attempt a TCP connection to an arbitrary host:port on a schedule. No HTTP layer — a successful connection is "up". Useful for database ports, SMTP, and custom application services. A small extension of the existing connection logic. diff --git a/docs/roadmap.md b/docs/roadmap.md index 6b9b9f59..eba72256 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -108,27 +108,35 @@ No active candidate branch is queued here right now. 
stable in production because dynamic WordPress pages need normalization, training, approval/reset workflows, and operator-visible evidence before Jetmon can safely alert on "content changed unexpectedly." -- [x] Improve DNS diagnostics on HTTP lookup failures before building explicit - DNS monitors. The v2 HTTP checker already records DNS timing and classifies - lookup failures as connect failures; event metadata now distinguishes - NXDOMAIN, SERVFAIL, timeout, and resolver errors where Go/runtime resolver - data can support it. This is the recommended near-term step because it helps - HEs explain failures without creating a new monitor type. -- [ ] Track DNS-specific benchmark scenarios separately from HTTP DNS failures. - Explicit DNS-record, DNSSEC, split-horizon, CNAME-chain, authoritative - nameserver, and DNS-latency monitors need a dedicated check type and event - taxonomy before they should be exposed as production uptime signals. Defer - this larger feature until the product semantics are designed: some DNS - failures should be `Warning` or `Degraded`, some should roll up to site-level - `Down`, and monitor-side resolver impairment must remain `Unknown`. -- [ ] Decide whether Jetmon should add an explicit DNS monitor that bypasses or - complements recursive resolver cache visibility. The 2026-05-05 all-services - gapfill run showed every service, including Jetmon v2, missing short - authoritative DNS failure windows, which is consistent with recursive cache - TTLs hiding the outage from HTTP probes. This needs product semantics before - implementation: direct authoritative checks can catch short DNS outages, but - they also increase query load and can report a failure that many end users do - not observe until caches expire. +- [x] Add the first explicit DNS monitor slice as an independent recursive + DNS probe stream. The implementation has its own schedule table, jittered + initial due times, auto-sized batch/worker guardrails, latest resolver + evidence, and `dns` events that do not mutate the legacy HTTP up/down + projection or send WPCOM downtime notifications yet. +- [x] Harden the DNS monitor after the focused uptime-bench smoke test: + configurable recursive resolvers for controlled test/prod resolver paths, + CNAME evidence preserved on address-lookup failures, DNS status counters, and + causal links from active HTTP events to DNS root-cause events when both are + open for the same site. +- [x] Improve DNS diagnostics on HTTP lookup failures. The v2 HTTP checker + already records DNS timing and classifies lookup failures as connect failures; + event metadata now distinguishes NXDOMAIN, SERVFAIL, timeout, and resolver + errors where Go/runtime resolver data can support it. This remains useful + even with explicit DNS probes because it ties a failed HTTP check directly to + the resolver failure seen on that request path. +- [ ] Expand DNS-specific benchmark coverage beyond the first recursive probe + stream. DNS-record expectation checks, DNSSEC, split-horizon, full CNAME-chain + capture, authoritative nameserver probes, and DNS-latency monitors need + product semantics before they should be exposed as production uptime signals: + some DNS failures should be `Warning` or `Degraded`, some should roll up to + site-level `Down`, and monitor-side resolver impairment must remain `Unknown`. +- [ ] Decide whether Jetmon should later add authoritative DNS probes that + bypass or complement recursive resolver cache visibility. 
The 2026-05-05 + all-services gapfill run showed every service, including Jetmon v2, missing + short authoritative DNS failure windows, which is consistent with recursive + cache TTLs hiding the outage from HTTP probes. Direct authoritative checks can + catch short DNS outages, but they also increase query load and can report a + failure that many end users do not observe until caches expire. - [ ] Validate geo-scoped benchmark assumptions before changing Jetmon production behavior for `http-geo-503`. Confirm the probe source ranges, intended Jetmon region semantics, and support story for partial regional diff --git a/internal/config/config.go b/internal/config/config.go index 7b8947c4..4b0c4851 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -4,7 +4,9 @@ import ( "encoding/json" "fmt" "log" + "net" "os" + "strconv" "strings" "sync" ) @@ -87,6 +89,17 @@ type Config struct { KeywordReadMaxMS int `json:"KEYWORD_READ_MAX_MS"` UseVariableCheckIntervals bool `json:"USE_VARIABLE_CHECK_INTERVALS"` + // DNS monitoring is a separate scheduled probe stream. Batch/worker values + // default to 0, which lets the orchestrator derive bounded values from the + // HTTP worker count instead of requiring per-host tuning. + DNSMonitorEnable bool `json:"DNS_MONITOR_ENABLE"` + DNSMonitorIntervalSec int `json:"DNS_MONITOR_INTERVAL_SEC"` + DNSMonitorTimeoutMS int `json:"DNS_MONITOR_TIMEOUT_MS"` + DNSMonitorBatchSize int `json:"DNS_MONITOR_BATCH_SIZE"` + DNSMonitorMaxWorkers int `json:"DNS_MONITOR_MAX_WORKERS"` + DNSMonitorScheduleBatchSize int `json:"DNS_MONITOR_SCHEDULE_BATCH_SIZE"` + DNSMonitorResolvers []string `json:"DNS_MONITOR_RESOLVERS"` + LogFormat string `json:"LOG_FORMAT"` DashboardPort int `json:"DASHBOARD_PORT"` DashboardBindAddr string `json:"DASHBOARD_BIND_ADDR"` @@ -221,6 +234,8 @@ func defaults() *Config { BodyReadMaxMS: 250, KeywordReadMaxBytes: 1048576, KeywordReadMaxMS: 0, + DNSMonitorIntervalSec: 900, + DNSMonitorTimeoutMS: 2000, LogFormat: "text", DashboardPort: 8080, DashboardBindAddr: "127.0.0.1", @@ -304,6 +319,26 @@ func validate(cfg *Config) error { if cfg.KeywordReadMaxMS < 0 { return fmt.Errorf("KEYWORD_READ_MAX_MS must be >= 0") } + if cfg.DNSMonitorEnable && cfg.DNSMonitorIntervalSec <= 0 { + return fmt.Errorf("DNS_MONITOR_INTERVAL_SEC must be > 0 when DNS_MONITOR_ENABLE is true") + } + if cfg.DNSMonitorEnable && cfg.DNSMonitorTimeoutMS <= 0 { + return fmt.Errorf("DNS_MONITOR_TIMEOUT_MS must be > 0 when DNS_MONITOR_ENABLE is true") + } + if cfg.DNSMonitorBatchSize < 0 { + return fmt.Errorf("DNS_MONITOR_BATCH_SIZE must be >= 0") + } + if cfg.DNSMonitorMaxWorkers < 0 { + return fmt.Errorf("DNS_MONITOR_MAX_WORKERS must be >= 0") + } + if cfg.DNSMonitorScheduleBatchSize < 0 { + return fmt.Errorf("DNS_MONITOR_SCHEDULE_BATCH_SIZE must be >= 0") + } + for i, resolver := range cfg.DNSMonitorResolvers { + if err := validateDNSResolverAddr(resolver); err != nil { + return fmt.Errorf("DNS_MONITOR_RESOLVERS[%d]: %w", i, err) + } + } if cfg.MinTimeBetweenRoundsSec < 0 { return fmt.Errorf("MIN_TIME_BETWEEN_ROUNDS_SEC must be >= 0") } @@ -376,6 +411,36 @@ func validatePinnedBucketRange(cfg *Config) error { return nil } +func validateDNSResolverAddr(addr string) error { + addr = strings.TrimSpace(addr) + if addr == "" { + return fmt.Errorf("resolver address must not be empty") + } + normalized := addr + if _, _, err := net.SplitHostPort(normalized); err != nil { + if strings.Contains(normalized, ":") { + normalized = net.JoinHostPort(strings.Trim(normalized, "[]"), "53") + 
} else { + normalized = net.JoinHostPort(normalized, "53") + } + } + host, port, err := net.SplitHostPort(normalized) + if err != nil { + return fmt.Errorf("resolver address must be host or host:port") + } + if strings.TrimSpace(host) == "" { + return fmt.Errorf("resolver host must not be empty") + } + if strings.TrimSpace(port) == "" { + return fmt.Errorf("resolver port must not be empty") + } + portNum, err := strconv.Atoi(port) + if err != nil || portNum < 1 || portNum > 65535 { + return fmt.Errorf("resolver port must be a number between 1 and 65535") + } + return nil +} + func displayName(v VerifierConfig, i int) string { if v.Name != "" { return v.Name diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 9ceafcf8..a9c08e8b 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -172,6 +172,60 @@ func TestValidate(t *testing.T) { mutate: func(c *Config) { c.KeywordReadMaxMS = -1 }, wantErr: true, }, + { + name: "dns monitor requires interval when enabled", + mutate: func(c *Config) { + c.DNSMonitorEnable = true + c.DNSMonitorIntervalSec = 0 + c.DNSMonitorTimeoutMS = 2000 + }, + wantErr: true, + }, + { + name: "dns monitor requires timeout when enabled", + mutate: func(c *Config) { + c.DNSMonitorEnable = true + c.DNSMonitorIntervalSec = 900 + c.DNSMonitorTimeoutMS = 0 + }, + wantErr: true, + }, + { + name: "dns monitor accepts auto sizing", + mutate: func(c *Config) { + c.DNSMonitorEnable = true + c.DNSMonitorIntervalSec = 900 + c.DNSMonitorTimeoutMS = 2000 + c.DNSMonitorBatchSize = 0 + c.DNSMonitorMaxWorkers = 0 + c.DNSMonitorScheduleBatchSize = 0 + }, + }, + { + name: "dns monitor batch size negative", + mutate: func(c *Config) { c.DNSMonitorBatchSize = -1 }, + wantErr: true, + }, + { + name: "dns monitor resolver list accepts host and host port", + mutate: func(c *Config) { + c.DNSMonitorResolvers = []string{"1.1.1.1", "8.8.8.8:53", "[2001:4860:4860::8888]:53"} + }, + }, + { + name: "dns monitor resolver list rejects empty entry", + mutate: func(c *Config) { + c.DNSMonitorResolvers = []string{"1.1.1.1", " "} + }, + wantErr: true, + }, + { + name: "dns monitor resolver list rejects invalid port", + mutate: func(c *Config) { + c.DNSMonitorResolvers = []string{"1.1.1.1:99999"} + }, + wantErr: true, + }, { name: "min time between rounds negative", mutate: func(c *Config) { c.MinTimeBetweenRoundsSec = -1 }, diff --git a/internal/db/migrations.go b/internal/db/migrations.go index f0337f2d..a3030611 100644 --- a/internal/db/migrations.go +++ b/internal/db/migrations.go @@ -484,6 +484,24 @@ var migrations = []migration{ // strings without overloading one column. {33, `ALTER TABLE jetpack_monitor_sites ADD COLUMN forbidden_keywords JSON NULL AFTER forbidden_keyword`}, + + // Migration 34 stores the independent DNS probe schedule and latest DNS + // evidence. Keeping this outside jetpack_monitor_sites avoids putting DNS + // cadence writes on the HTTP scheduler's hot row. 
+ {34, `CREATE TABLE IF NOT EXISTS jetmon_dns_probe_state ( + blog_id BIGINT UNSIGNED NOT NULL PRIMARY KEY, + hostname VARCHAR(255) NOT NULL DEFAULT '', + interval_seconds INT UNSIGNED NOT NULL, + last_checked_at DATETIME NULL, + next_check_at DATETIME NOT NULL, + last_result VARCHAR(32) NULL, + last_error VARCHAR(255) NULL, + last_addresses JSON NULL, + last_cname_chain JSON NULL, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_dns_next_check (next_check_at, blog_id), + INDEX idx_dns_hostname (hostname) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4`}, } // Migrate applies all pending migrations idempotently. diff --git a/internal/db/queries.go b/internal/db/queries.go index d1c0e943..f4d6613f 100644 --- a/internal/db/queries.go +++ b/internal/db/queries.go @@ -3,8 +3,12 @@ package db import ( "context" "database/sql" + "encoding/json" "errors" "fmt" + "hash/fnv" + "net" + "net/url" "sort" "strings" "time" @@ -12,6 +16,40 @@ import ( const batchWriteChunkSize = 500 +// DNSProbeTarget is one site-level hostname check selected by the independent +// DNS scheduler. It intentionally carries only the fields needed by the DNS +// probe path so the HTTP scheduler row remains compact. +type DNSProbeTarget struct { + BlogID int64 + BucketNo int + MonitorURL string + Hostname string + IntervalSeconds int + LastCheckedAt *time.Time + NextCheckAt time.Time +} + +// DNSProbeStateUpdate records the result and next due time for one DNS probe. +type DNSProbeStateUpdate struct { + BlogID int64 + Hostname string + IntervalSeconds int + CheckedAt time.Time + NextCheckAt time.Time + Result string + Error string + Addresses []string + CNAMEChain []string +} + +type dnsScheduleSeed struct { + blogID int64 + hostname string + nextCheck time.Time + lastResult string + lastError string +} + // GetSitesForBucket fetches active sites within the given bucket range. func GetSitesForBucket(ctx context.Context, bucketMin, bucketMax, batchSize int, useVariableIntervals bool) ([]Site, error) { query := ` @@ -132,6 +170,306 @@ func CountDueSitesForBucketRange(ctx context.Context, bucketMin, bucketMax int, return count, nil } +// EnsureDNSProbeSchedules creates DNS schedule rows for active sites that do +// not have one yet. Initial due times are spread across the configured interval +// with a stable hash so enabling DNS monitoring does not create a synchronized +// first-run wave. +func EnsureDNSProbeSchedules(ctx context.Context, bucketMin, bucketMax, limit, intervalSec int, now time.Time) (int, error) { + if limit <= 0 { + return 0, nil + } + if intervalSec <= 0 { + intervalSec = 900 + } + if now.IsZero() { + now = time.Now() + } + rows, err := db.QueryContext(ctx, ` + SELECT s.blog_id, s.monitor_url + FROM jetpack_monitor_sites s + LEFT JOIN jetmon_dns_probe_state d + ON d.blog_id = s.blog_id + WHERE s.monitor_active = 1 + AND s.bucket_no BETWEEN ? AND ? 
+ AND d.blog_id IS NULL + ORDER BY s.blog_id ASC + LIMIT ?`, + bucketMin, bucketMax, limit, + ) + if err != nil { + return 0, fmt.Errorf("query missing DNS schedules: %w", err) + } + defer rows.Close() + + seeds := make([]dnsScheduleSeed, 0, limit) + for rows.Next() { + var blogID int64 + var monitorURL string + if err := rows.Scan(&blogID, &monitorURL); err != nil { + return 0, fmt.Errorf("scan missing DNS schedule: %w", err) + } + hostname, hostErr := hostnameFromMonitorURL(monitorURL) + seed := dnsScheduleSeed{ + blogID: blogID, + hostname: hostname, + nextCheck: InitialDNSNextCheckAt(now.UTC(), intervalSec, blogID, hostname), + } + if hostErr != nil { + seed.lastResult = "invalid_url" + seed.lastError = truncateDBString(hostErr.Error(), 255) + } + seeds = append(seeds, seed) + } + if err := rows.Err(); err != nil { + return 0, fmt.Errorf("iterate missing DNS schedules: %w", err) + } + if len(seeds) == 0 { + return 0, nil + } + + for start := 0; start < len(seeds); start += batchWriteChunkSize { + end := min(start+batchWriteChunkSize, len(seeds)) + if err := insertDNSProbeScheduleChunk(ctx, seeds[start:end], intervalSec); err != nil { + return 0, err + } + } + return len(seeds), nil +} + +func insertDNSProbeScheduleChunk(ctx context.Context, seeds []dnsScheduleSeed, intervalSec int) error { + if len(seeds) == 0 { + return nil + } + var query strings.Builder + query.WriteString(`INSERT INTO jetmon_dns_probe_state + (blog_id, hostname, interval_seconds, next_check_at, last_result, last_error) + VALUES `) + args := make([]any, 0, len(seeds)*6) + for i, seed := range seeds { + if i > 0 { + query.WriteByte(',') + } + query.WriteString("(?, ?, ?, ?, ?, ?)") + args = append(args, + seed.blogID, + seed.hostname, + intervalSec, + seed.nextCheck.UTC(), + nullableString(seed.lastResult), + nullableString(seed.lastError), + ) + } + query.WriteString(" ON DUPLICATE KEY UPDATE blog_id = blog_id") + if _, err := db.ExecContext(ctx, query.String(), args...); err != nil { + return fmt.Errorf("insert DNS schedules: %w", err) + } + return nil +} + +// GetDueDNSProbes returns DNS schedule rows due for this host's active bucket +// range. A monitor_url hostname change is corrected when the result is written +// back; the query keeps scheduling independent from the HTTP hot path. +func GetDueDNSProbes(ctx context.Context, bucketMin, bucketMax, limit int) ([]DNSProbeTarget, error) { + if limit <= 0 { + return nil, nil + } + rows, err := db.QueryContext(ctx, ` + SELECT s.blog_id, + s.bucket_no, + s.monitor_url, + d.hostname, + d.interval_seconds, + d.last_checked_at, + d.next_check_at + FROM jetmon_dns_probe_state d + JOIN jetpack_monitor_sites s + ON s.blog_id = d.blog_id + WHERE s.monitor_active = 1 + AND s.bucket_no BETWEEN ? AND ? + AND d.next_check_at <= NOW() + ORDER BY d.next_check_at ASC, s.blog_id ASC + LIMIT ?`, + bucketMin, bucketMax, limit, + ) + if err != nil { + return nil, fmt.Errorf("query due DNS probes: %w", err) + } + defer rows.Close() + + targets := make([]DNSProbeTarget, 0, limit) + for rows.Next() { + var target DNSProbeTarget + if err := rows.Scan( + &target.BlogID, + &target.BucketNo, + &target.MonitorURL, + &target.Hostname, + &target.IntervalSeconds, + &target.LastCheckedAt, + &target.NextCheckAt, + ); err != nil { + return nil, fmt.Errorf("scan due DNS probe: %w", err) + } + targets = append(targets, target) + } + return targets, rows.Err() +} + +// CountDueDNSProbes returns the number of currently due DNS schedule rows for +// this host's active bucket range. 
+func CountDueDNSProbes(ctx context.Context, bucketMin, bucketMax int) (int, error) { + var count int + err := db.QueryRowContext(ctx, ` + SELECT COUNT(*) + FROM jetmon_dns_probe_state d + JOIN jetpack_monitor_sites s + ON s.blog_id = d.blog_id + WHERE s.monitor_active = 1 + AND s.bucket_no BETWEEN ? AND ? + AND d.next_check_at <= NOW()`, + bucketMin, bucketMax, + ).Scan(&count) + if err != nil { + return 0, fmt.Errorf("count due DNS probes: %w", err) + } + return count, nil +} + +// UpdateDNSProbeStates writes the latest DNS evidence and next due time for a +// batch of probe results. +func UpdateDNSProbeStates(ctx context.Context, updates []DNSProbeStateUpdate) error { + if len(updates) == 0 { + return nil + } + updates = append([]DNSProbeStateUpdate(nil), updates...) + sort.Slice(updates, func(i, j int) bool { + return updates[i].BlogID < updates[j].BlogID + }) + for start := 0; start < len(updates); start += batchWriteChunkSize { + end := min(start+batchWriteChunkSize, len(updates)) + if err := updateDNSProbeStatesChunk(ctx, updates[start:end]); err != nil { + return err + } + } + return nil +} + +func updateDNSProbeStatesChunk(ctx context.Context, updates []DNSProbeStateUpdate) error { + var query strings.Builder + query.WriteString(`INSERT INTO jetmon_dns_probe_state + (blog_id, hostname, interval_seconds, last_checked_at, next_check_at, last_result, last_error, last_addresses, last_cname_chain) + VALUES `) + args := make([]any, 0, len(updates)*9) + for i, update := range updates { + if i > 0 { + query.WriteByte(',') + } + addresses, err := json.Marshal(update.Addresses) + if err != nil { + return fmt.Errorf("marshal DNS addresses blog_id=%d: %w", update.BlogID, err) + } + cnameChain, err := json.Marshal(update.CNAMEChain) + if err != nil { + return fmt.Errorf("marshal DNS cname chain blog_id=%d: %w", update.BlogID, err) + } + query.WriteString("(?, ?, ?, ?, ?, ?, ?, ?, ?)") + args = append(args, + update.BlogID, + update.Hostname, + update.IntervalSeconds, + update.CheckedAt.UTC(), + update.NextCheckAt.UTC(), + truncateDBString(update.Result, 32), + nullableString(truncateDBString(update.Error, 255)), + string(addresses), + string(cnameChain), + ) + } + query.WriteString(` ON DUPLICATE KEY UPDATE + hostname = VALUES(hostname), + interval_seconds = VALUES(interval_seconds), + last_checked_at = VALUES(last_checked_at), + next_check_at = VALUES(next_check_at), + last_result = VALUES(last_result), + last_error = VALUES(last_error), + last_addresses = VALUES(last_addresses), + last_cname_chain = VALUES(last_cname_chain)`) + if _, err := db.ExecContext(ctx, query.String(), args...); err != nil { + return fmt.Errorf("update DNS probe states: %w", err) + } + return nil +} + +// InitialDNSNextCheckAt returns a stable first due slot within intervalSec for +// a site. The first slot is always in the future relative to now. 
+func InitialDNSNextCheckAt(now time.Time, intervalSec int, blogID int64, hostname string) time.Time { + if intervalSec <= 0 { + intervalSec = 900 + } + interval := time.Duration(intervalSec) * time.Second + offset := time.Duration(stableDNSOffsetSeconds(blogID, hostname, intervalSec)) * time.Second + next := now.UTC().Truncate(interval).Add(offset) + if !next.After(now.UTC()) { + next = next.Add(interval) + } + return next +} + +func stableDNSOffsetSeconds(blogID int64, hostname string, intervalSec int) int { + if intervalSec <= 1 { + return 0 + } + h := fnv.New64a() + _, _ = fmt.Fprintf(h, "%d:%s", blogID, strings.ToLower(hostname)) + return int(h.Sum64() % uint64(intervalSec)) +} + +func hostnameFromMonitorURL(raw string) (string, error) { + value := strings.TrimSpace(raw) + if value == "" { + return "", fmt.Errorf("monitor_url is empty") + } + host := "" + if u, err := url.Parse(value); err == nil { + host = u.Hostname() + } + if host == "" && !strings.Contains(value, "://") { + if u, err := url.Parse("http://" + value); err == nil { + host = u.Hostname() + } + } + if host == "" { + trimmed := strings.Trim(value, "[]") + if splitHost, _, err := net.SplitHostPort(trimmed); err == nil { + host = splitHost + } else { + if strings.ContainsAny(trimmed, "/?#") { + return "", fmt.Errorf("monitor_url has no hostname") + } + host = trimmed + } + } + host = strings.Trim(strings.ToLower(host), ".") + if host == "" { + return "", fmt.Errorf("monitor_url has no hostname") + } + return host, nil +} + +func truncateDBString(value string, maxLen int) string { + if maxLen <= 0 || len(value) <= maxLen { + return value + } + return value[:maxLen] +} + +func nullableString(value string) any { + if value == "" { + return nil + } + return value +} + // UpdateSiteStatus updates site_status and last_status_change for a site. func UpdateSiteStatus(ctx context.Context, blogID int64, status int, changedAt time.Time) error { _, err := db.ExecContext(ctx, diff --git a/internal/db/queries_test.go b/internal/db/queries_test.go index 0f25218b..9a682bbe 100644 --- a/internal/db/queries_test.go +++ b/internal/db/queries_test.go @@ -225,6 +225,83 @@ func TestCountDueSitesForBucketRangeVariableIntervalsUsesNextCheckAt(t *testing. } } +func TestInitialDNSNextCheckAtSpreadsIntoFuture(t *testing.T) { + now := time.Date(2026, 5, 1, 12, 7, 0, 0, time.UTC) + got := InitialDNSNextCheckAt(now, 900, 42, "example.com") + if !got.After(now) { + t.Fatalf("InitialDNSNextCheckAt = %s, want after %s", got, now) + } + if got.Sub(now) > 900*time.Second { + t.Fatalf("InitialDNSNextCheckAt delta = %s, want <= 15m", got.Sub(now)) + } + again := InitialDNSNextCheckAt(now, 900, 42, "example.com") + if !got.Equal(again) { + t.Fatalf("InitialDNSNextCheckAt not stable: %s != %s", got, again) + } +} + +func TestGetDueDNSProbesScansRows(t *testing.T) { + mock, cleanup := withMockDB(t) + defer cleanup() + + now := time.Now().UTC() + rows := sqlmock.NewRows([]string{ + "blog_id", "bucket_no", "monitor_url", "hostname", "interval_seconds", "last_checked_at", "next_check_at", + }).AddRow(int64(42), 7, "https://example.com", "example.com", 900, now.Add(-time.Hour), now.Add(-time.Minute)) + + mock.ExpectQuery("jetmon_dns_probe_state"). + WithArgs(0, 99, 10). 
+ WillReturnRows(rows) + + targets, err := GetDueDNSProbes(context.Background(), 0, 99, 10) + if err != nil { + t.Fatalf("GetDueDNSProbes: %v", err) + } + if len(targets) != 1 || targets[0].BlogID != 42 || targets[0].Hostname != "example.com" { + t.Fatalf("targets = %+v", targets) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet sql expectations: %v", err) + } +} + +func TestUpdateDNSProbeStatesBatchesJSONEvidence(t *testing.T) { + mock, cleanup := withMockDB(t) + defer cleanup() + + now := time.Now().UTC() + mock.ExpectExec("INSERT INTO jetmon_dns_probe_state"). + WithArgs( + int64(42), + "example.com", + 900, + now, + now.Add(15*time.Minute), + "ok", + nil, + `["192.0.2.10"]`, + `["origin.example.com"]`, + ). + WillReturnResult(sqlmock.NewResult(1, 1)) + + err := UpdateDNSProbeStates(context.Background(), []DNSProbeStateUpdate{{ + BlogID: 42, + Hostname: "example.com", + IntervalSeconds: 900, + CheckedAt: now, + NextCheckAt: now.Add(15 * time.Minute), + Result: "ok", + Addresses: []string{"192.0.2.10"}, + CNAMEChain: []string{"origin.example.com"}, + }}) + if err != nil { + t.Fatalf("UpdateDNSProbeStates: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet sql expectations: %v", err) + } +} + func TestCountRecentlyCheckedActiveSitesForBucketRange(t *testing.T) { mock, cleanup := withMockDB(t) defer cleanup() diff --git a/internal/dnsprobe/dnsprobe.go b/internal/dnsprobe/dnsprobe.go new file mode 100644 index 00000000..f1f48abe --- /dev/null +++ b/internal/dnsprobe/dnsprobe.go @@ -0,0 +1,220 @@ +package dnsprobe + +import ( + "context" + "errors" + "fmt" + "hash/fnv" + "net" + "sort" + "strings" + "time" +) + +const ( + StatusOK = "ok" + StatusNXDomain = "nxdomain" + StatusTimeout = "timeout" + StatusNoRecords = "no_records" + StatusSERVFAIL = "servfail" + StatusResolverError = "resolver_error" + StatusInvalidHost = "invalid_host" +) + +// Resolver captures the net.Resolver methods used by DNS checks. Tests can +// provide a fake resolver without changing production behavior. +type Resolver interface { + LookupCNAME(ctx context.Context, host string) (string, error) + LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error) +} + +// Request describes one recursive DNS probe. +type Request struct { + BlogID int64 + Hostname string + Timeout time.Duration + ResolverAddrs []string +} + +// Result contains the latest DNS evidence for a monitored hostname. +type Result struct { + BlogID int64 + Hostname string + Success bool + Status string + Error string + Addresses []string + CNAMEChain []string + Resolver string + Duration time.Duration + Timestamp time.Time +} + +var defaultResolver Resolver = net.DefaultResolver + +// Check performs a recursive DNS lookup. By default it uses the system resolver; +// when Request.ResolverAddrs is set it uses a stable per-hostname resolver from +// that list so tests and operators can point DNS monitoring at a known recursive +// path without creating synchronized resolver load. +func Check(ctx context.Context, req Request) Result { + resolver, label := resolverForRequest(req) + return checkWithResolver(ctx, resolver, req, label) +} + +// CheckWithResolver performs a recursive DNS lookup with a supplied resolver. 
+func CheckWithResolver(ctx context.Context, resolver Resolver, req Request) Result { + return checkWithResolver(ctx, resolver, req, "injected") +} + +func checkWithResolver(ctx context.Context, resolver Resolver, req Request, resolverLabel string) Result { + hostname := NormalizeHostname(req.Hostname) + start := time.Now() + res := Result{ + BlogID: req.BlogID, + Hostname: hostname, + Status: StatusResolverError, + Resolver: resolverLabel, + Timestamp: start.UTC(), + } + defer func() { + res.Duration = time.Since(start) + }() + + if hostname == "" { + res.Status = StatusInvalidHost + res.Error = "hostname is empty" + return res + } + if resolver == nil { + res.Status = StatusResolverError + res.Error = "resolver is nil" + return res + } + + timeout := req.Timeout + if timeout <= 0 { + timeout = 2 * time.Second + } + probeCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + if canonical, err := resolver.LookupCNAME(probeCtx, hostname); err == nil { + canonical = NormalizeHostname(canonical) + if canonical != "" && canonical != hostname { + res.CNAMEChain = []string{canonical} + } + } + + addrs, err := resolver.LookupIPAddr(probeCtx, hostname) + if err != nil { + res.Status, res.Error = classifyError(err) + return res + } + if len(addrs) == 0 { + res.Status = StatusNoRecords + res.Error = "no A or AAAA records returned" + return res + } + res.Addresses = normalizeAddresses(addrs) + res.Success = true + res.Status = StatusOK + return res +} + +func resolverForRequest(req Request) (Resolver, string) { + addrs := normalizeResolverAddrs(req.ResolverAddrs) + if len(addrs) == 0 { + return defaultResolver, "system" + } + addr := addrs[resolverIndex(req, len(addrs))] + dialer := &net.Dialer{} + return &net.Resolver{ + PreferGo: true, + Dial: func(ctx context.Context, network, _ string) (net.Conn, error) { + return dialer.DialContext(ctx, network, addr) + }, + }, addr +} + +// NormalizeHostname returns a lower-case hostname without a trailing root dot. 
+func NormalizeHostname(hostname string) string { + return strings.Trim(strings.ToLower(strings.TrimSpace(hostname)), ".") +} + +func normalizeResolverAddrs(addrs []string) []string { + out := make([]string, 0, len(addrs)) + seen := make(map[string]struct{}, len(addrs)) + for _, raw := range addrs { + addr := strings.TrimSpace(raw) + if addr == "" { + continue + } + normalized := normalizeResolverAddr(addr) + if _, ok := seen[normalized]; ok { + continue + } + seen[normalized] = struct{}{} + out = append(out, normalized) + } + return out +} + +func normalizeResolverAddr(addr string) string { + if _, _, err := net.SplitHostPort(addr); err == nil { + return addr + } + if strings.Contains(addr, ":") { + return net.JoinHostPort(strings.Trim(addr, "[]"), "53") + } + return net.JoinHostPort(addr, "53") +} + +func resolverIndex(req Request, n int) int { + if n <= 1 { + return 0 + } + h := fnv.New32a() + _, _ = fmt.Fprintf(h, "%d/%s", req.BlogID, NormalizeHostname(req.Hostname)) + return int(h.Sum32() % uint32(n)) +} + +func normalizeAddresses(addrs []net.IPAddr) []string { + out := make([]string, 0, len(addrs)) + seen := make(map[string]struct{}, len(addrs)) + for _, addr := range addrs { + if addr.IP == nil { + continue + } + value := addr.IP.String() + if _, ok := seen[value]; ok { + continue + } + seen[value] = struct{}{} + out = append(out, value) + } + sort.Strings(out) + return out +} + +func classifyError(err error) (string, string) { + if err == nil { + return StatusOK, "" + } + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return StatusTimeout, err.Error() + } + var dnsErr *net.DNSError + if errors.As(err, &dnsErr) { + switch { + case dnsErr.IsTimeout: + return StatusTimeout, dnsErr.Error() + case dnsErr.IsNotFound: + return StatusNXDomain, dnsErr.Error() + case dnsErr.IsTemporary: + return StatusSERVFAIL, dnsErr.Error() + default: + return StatusResolverError, dnsErr.Error() + } + } + return StatusResolverError, err.Error() +} diff --git a/internal/dnsprobe/dnsprobe_test.go b/internal/dnsprobe/dnsprobe_test.go new file mode 100644 index 00000000..e2308d76 --- /dev/null +++ b/internal/dnsprobe/dnsprobe_test.go @@ -0,0 +1,113 @@ +package dnsprobe + +import ( + "context" + "fmt" + "net" + "testing" + "time" +) + +type fakeResolver struct { + addrs []net.IPAddr + cname string + cnameErr error + err error +} + +func (r fakeResolver) LookupIPAddr(context.Context, string) ([]net.IPAddr, error) { + return r.addrs, r.err +} + +func (r fakeResolver) LookupCNAME(context.Context, string) (string, error) { + if r.cnameErr != nil { + return "", r.cnameErr + } + if r.cname == "" { + return "", &net.DNSError{Err: "no such host", IsNotFound: true} + } + return r.cname, nil +} + +func TestCheckWithResolverSuccessNormalizesEvidence(t *testing.T) { + res := CheckWithResolver(context.Background(), fakeResolver{ + addrs: []net.IPAddr{ + {IP: net.ParseIP("2001:db8::1")}, + {IP: net.ParseIP("192.0.2.10")}, + {IP: net.ParseIP("192.0.2.10")}, + }, + cname: "Origin.Example.COM.", + }, Request{BlogID: 42, Hostname: "WWW.Example.COM.", Timeout: time.Second}) + + if !res.Success || res.Status != StatusOK { + t.Fatalf("result = %+v, want success ok", res) + } + wantAddrs := []string{"192.0.2.10", "2001:db8::1"} + if fmt.Sprint(res.Addresses) != fmt.Sprint(wantAddrs) { + t.Fatalf("addresses = %v, want %v", res.Addresses, wantAddrs) + } + if got := fmt.Sprint(res.CNAMEChain); got != "[origin.example.com]" { + t.Fatalf("CNAMEChain = %s", got) + } +} + +func 
TestCheckWithResolverPreservesCNAMEEvidenceOnAddressFailure(t *testing.T) { + res := CheckWithResolver(context.Background(), fakeResolver{ + cname: "Target.Example.COM.", + err: &net.DNSError{Err: "no such host", IsNotFound: true}, + }, Request{BlogID: 42, Hostname: "Alias.Example.COM.", Timeout: time.Second}) + + if res.Success || res.Status != StatusNXDomain { + t.Fatalf("result = %+v, want nxdomain failure", res) + } + if got := fmt.Sprint(res.CNAMEChain); got != "[target.example.com]" { + t.Fatalf("CNAMEChain = %s, want target evidence", got) + } +} + +func TestCheckWithResolverClassifiesDNSErrors(t *testing.T) { + tests := []struct { + name string + err error + want string + }{ + {name: "not found", err: &net.DNSError{Err: "no such host", IsNotFound: true}, want: StatusNXDomain}, + {name: "timeout", err: &net.DNSError{Err: "timeout", IsTimeout: true}, want: StatusTimeout}, + {name: "temporary", err: &net.DNSError{Err: "server misbehaving", IsTemporary: true}, want: StatusSERVFAIL}, + {name: "generic", err: fmt.Errorf("resolver failed"), want: StatusResolverError}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res := CheckWithResolver(context.Background(), fakeResolver{err: tt.err}, Request{Hostname: "example.com", Timeout: time.Second}) + if res.Success || res.Status != tt.want { + t.Fatalf("status = %q success=%t, want %q false", res.Status, res.Success, tt.want) + } + }) + } +} + +func TestCheckWithResolverRejectsEmptyHostname(t *testing.T) { + res := CheckWithResolver(context.Background(), fakeResolver{}, Request{}) + if res.Success || res.Status != StatusInvalidHost { + t.Fatalf("result = %+v, want invalid host failure", res) + } +} + +func TestNormalizeResolverAddrsAddsPortsAndDeduplicates(t *testing.T) { + got := normalizeResolverAddrs([]string{"1.1.1.1", "1.1.1.1:53", "[2001:db8::1]", ""}) + want := []string{"1.1.1.1:53", "[2001:db8::1]:53"} + if fmt.Sprint(got) != fmt.Sprint(want) { + t.Fatalf("normalizeResolverAddrs = %v, want %v", got, want) + } +} + +func TestResolverForRequestUsesConfiguredResolver(t *testing.T) { + _, label := resolverForRequest(Request{ + BlogID: 42, + Hostname: "example.com", + ResolverAddrs: []string{"192.0.2.53"}, + }) + if label != "192.0.2.53:53" { + t.Fatalf("resolver label = %q, want configured resolver", label) + } +} diff --git a/internal/eventstore/eventstore.go b/internal/eventstore/eventstore.go index 6f566205..a974a8bb 100644 --- a/internal/eventstore/eventstore.go +++ b/internal/eventstore/eventstore.go @@ -409,9 +409,10 @@ func (t *Tx) Close(ctx context.Context, eventID int64, resolutionReason, source // found it via FindActiveByBlog and now want to close, promote, or otherwise // mutate it without a second round-trip to read its state. type ActiveEvent struct { - ID int64 - Severity uint8 - State string + ID int64 + Severity uint8 + State string + CauseEventID *int64 } // FindActiveByBlog returns the open event for (blog_id, check_type) — the @@ -424,18 +425,23 @@ func (t *Tx) FindActiveByBlog(ctx context.Context, blogID int64, checkType strin return ActiveEvent{}, nil } var ae ActiveEvent + var cause sql.NullInt64 err := t.tx.QueryRowContext(ctx, ` - SELECT id, severity, state FROM jetmon_events + SELECT id, severity, state, cause_event_id FROM jetmon_events WHERE blog_id = ? AND check_type = ? 
AND ended_at IS NULL ORDER BY started_at ASC LIMIT 1`, blogID, checkType, - ).Scan(&ae.ID, &ae.Severity, &ae.State) + ).Scan(&ae.ID, &ae.Severity, &ae.State, &cause) if errors.Is(err, sql.ErrNoRows) { return ActiveEvent{}, ErrEventNotFound } if err != nil { return ActiveEvent{}, fmt.Errorf("find active event: %w", err) } + if cause.Valid { + causeID := cause.Int64 + ae.CauseEventID = &causeID + } return ae, nil } diff --git a/internal/eventstore/eventstore_test.go b/internal/eventstore/eventstore_test.go index 00a490dc..e9a6bc46 100644 --- a/internal/eventstore/eventstore_test.go +++ b/internal/eventstore/eventstore_test.go @@ -348,9 +348,10 @@ func TestTxFindActiveByBlog(t *testing.T) { defer db.Close() mock.ExpectBegin() - mock.ExpectQuery("SELECT id, severity, state FROM jetmon_events"). + mock.ExpectQuery("SELECT id, severity, state, cause_event_id FROM jetmon_events"). WithArgs(int64(42), "http"). - WillReturnRows(sqlmock.NewRows([]string{"id", "severity", "state"}).AddRow(int64(99), SeverityDown, StateDown)) + WillReturnRows(sqlmock.NewRows([]string{"id", "severity", "state", "cause_event_id"}). + AddRow(int64(99), SeverityDown, StateDown, int64(12))) mock.ExpectRollback() tx, err := New(db).Begin(context.Background()) @@ -361,7 +362,8 @@ func TestTxFindActiveByBlog(t *testing.T) { if err != nil { t.Fatalf("FindActiveByBlog: %v", err) } - if active.ID != 99 || active.Severity != SeverityDown || active.State != StateDown { + if active.ID != 99 || active.Severity != SeverityDown || active.State != StateDown || + active.CauseEventID == nil || *active.CauseEventID != 12 { t.Fatalf("active = %+v", active) } if err := tx.Rollback(); err != nil { diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 14d79e8c..44ef318b 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -7,6 +7,8 @@ import ( "errors" "fmt" "log" + "net" + "net/url" runtimemetrics "runtime/metrics" "sort" "strings" @@ -19,6 +21,7 @@ import ( "github.com/Automattic/jetmon/internal/checker" "github.com/Automattic/jetmon/internal/config" "github.com/Automattic/jetmon/internal/db" + "github.com/Automattic/jetmon/internal/dnsprobe" "github.com/Automattic/jetmon/internal/eventstore" "github.com/Automattic/jetmon/internal/metrics" "github.com/Automattic/jetmon/internal/veriflier" @@ -40,6 +43,7 @@ const ( // constants alongside. 
const ( checkTypeHTTP = "http" + checkTypeDNS = "dns" checkTypeTLSExpiry = "tls_expiry" checkTypeTLSDeprecated = "tls_deprecated" ) @@ -83,7 +87,12 @@ var ( dbRecordFalsePositive = db.RecordFalsePositive dbUpdateLastAlertSent = db.UpdateLastAlertSent dbCountDueSites = db.CountDueSitesForBucketRange + dbEnsureDNSSchedules = db.EnsureDNSProbeSchedules + dbGetDueDNSProbes = db.GetDueDNSProbes + dbCountDueDNSProbes = db.CountDueDNSProbes + dbUpdateDNSStates = db.UpdateDNSProbeStates dbCountProjectionDrift = db.CountLegacyProjectionDrift + dnsProbeCheckFunc = dnsprobe.Check veriflierCheckFunc = func(c *veriflier.VeriflierClient, ctx stdctx.Context, req veriflier.CheckRequest) (*veriflier.CheckResult, error) { return c.Check(ctx, req) } @@ -146,6 +155,19 @@ type roundSummary struct { checkRedirects int checkKeywords int checkTLSDeprecated int + + dnsSchedulesCreated int + dnsSelected int + dnsCompleted int + dnsFailures int + dnsDueAtStart int + dnsDueRemaining int + dnsScheduleErrors int + dnsFetchErrors int + dnsUpdateErrors int + dnsEventErrors int + dnsDispatchDuration time.Duration + dnsProcessDuration time.Duration } func (s *roundSummary) add(other roundSummary) { @@ -185,6 +207,18 @@ func (s *roundSummary) add(other roundSummary) { s.checkRedirects += other.checkRedirects s.checkKeywords += other.checkKeywords s.checkTLSDeprecated += other.checkTLSDeprecated + s.dnsSchedulesCreated += other.dnsSchedulesCreated + s.dnsSelected += other.dnsSelected + s.dnsCompleted += other.dnsCompleted + s.dnsFailures += other.dnsFailures + s.dnsDueAtStart += other.dnsDueAtStart + s.dnsDueRemaining += other.dnsDueRemaining + s.dnsScheduleErrors += other.dnsScheduleErrors + s.dnsFetchErrors += other.dnsFetchErrors + s.dnsUpdateErrors += other.dnsUpdateErrors + s.dnsEventErrors += other.dnsEventErrors + s.dnsDispatchDuration += other.dnsDispatchDuration + s.dnsProcessDuration += other.dnsProcessDuration if other.oldestSelectedAge > s.oldestSelectedAge { s.oldestSelectedAge = other.oldestSelectedAge } @@ -224,6 +258,11 @@ type siteCheckResult struct { res checker.Result } +type dnsCheckResult struct { + target db.DNSProbeTarget + res dnsprobe.Result +} + // Orchestrator drives the main check loop. 
type Orchestrator struct { pool *checker.Pool @@ -433,6 +472,10 @@ func (o *Orchestrator) runRound() roundSummary { } } + if cfg.DNSMonitorEnable && !summary.interrupted { + summary.add(o.runDNSProbes(cfg)) + } + if cfg.UseVariableCheckIntervals && dueCountsSampled { if due, err := dbCountDueSites(o.ctx, o.bucketMin, o.bucketMax, true); err != nil { summary.dueCountErrors++ @@ -524,6 +567,349 @@ process: return summary } +func (o *Orchestrator) runDNSProbes(cfg *config.Config) roundSummary { + summary := roundSummary{} + now := nowFunc().UTC() + intervalSec := cfg.DNSMonitorIntervalSec + if intervalSec <= 0 { + intervalSec = 900 + } + + if due, err := dbCountDueDNSProbes(o.ctx, o.bucketMin, o.bucketMax); err != nil { + summary.dnsFetchErrors++ + log.Printf("orchestrator: count due DNS probes failed: %v", err) + } else { + summary.dnsDueAtStart = due + } + + if created, err := dbEnsureDNSSchedules( + o.ctx, + o.bucketMin, + o.bucketMax, + dnsScheduleBatchSize(cfg), + intervalSec, + now, + ); err != nil { + summary.dnsScheduleErrors++ + log.Printf("orchestrator: ensure DNS probe schedules failed: %v", err) + } else { + summary.dnsSchedulesCreated = created + } + + targets, err := dbGetDueDNSProbes(o.ctx, o.bucketMin, o.bucketMax, dnsBatchSize(cfg)) + if err != nil { + summary.dnsFetchErrors++ + log.Printf("orchestrator: fetch due DNS probes failed: %v", err) + return summary + } + if len(targets) == 0 { + return summary + } + summary.dnsSelected = len(targets) + + start := time.Now() + results := o.checkDNSTargets(cfg, targets) + summary.dnsDispatchDuration = time.Since(start) + summary.dnsCompleted = len(results) + for _, result := range results { + if !result.res.Success { + summary.dnsFailures++ + } + } + + processStart := time.Now() + o.processDNSResults(cfg, results, &summary) + summary.dnsProcessDuration = time.Since(processStart) + + if due, err := dbCountDueDNSProbes(o.ctx, o.bucketMin, o.bucketMax); err != nil { + summary.dnsFetchErrors++ + log.Printf("orchestrator: count remaining DNS probes failed: %v", err) + } else { + summary.dnsDueRemaining = due + } + logDNSSummary(summary) + return summary +} + +func (o *Orchestrator) checkDNSTargets(cfg *config.Config, targets []db.DNSProbeTarget) []dnsCheckResult { + workerCount := min(dnsWorkerLimit(cfg), len(targets)) + if workerCount < 1 { + workerCount = 1 + } + jobs := make(chan db.DNSProbeTarget) + results := make(chan dnsCheckResult, len(targets)) + var wg sync.WaitGroup + for range workerCount { + wg.Add(1) + go func() { + defer wg.Done() + for target := range jobs { + hostname := hostnameForDNSTarget(target) + req := dnsprobe.Request{ + BlogID: target.BlogID, + Hostname: hostname, + Timeout: time.Duration(cfg.DNSMonitorTimeoutMS) * time.Millisecond, + ResolverAddrs: cfg.DNSMonitorResolvers, + } + results <- dnsCheckResult{ + target: target, + res: dnsProbeCheckFunc(o.ctx, req), + } + } + }() + } + go func() { + defer close(jobs) + for _, target := range targets { + select { + case jobs <- target: + case <-o.ctx.Done(): + return + } + } + }() + wg.Wait() + close(results) + + out := make([]dnsCheckResult, 0, len(targets)) + for result := range results { + out = append(out, result) + } + sort.Slice(out, func(i, j int) bool { + return out[i].target.BlogID < out[j].target.BlogID + }) + return out +} + +func (o *Orchestrator) processDNSResults(cfg *config.Config, results []dnsCheckResult, summary *roundSummary) { + updates := make([]db.DNSProbeStateUpdate, 0, len(results)) + for _, record := range results { + 
emitCounter("dns.check."+metricSegment(record.res.Status)+".count", 1) + checkedAt := record.res.Timestamp + if checkedAt.IsZero() { + checkedAt = nowFunc().UTC() + } + intervalSec := record.target.IntervalSeconds + if intervalSec <= 0 { + intervalSec = cfg.DNSMonitorIntervalSec + } + if intervalSec <= 0 { + intervalSec = 900 + } + updates = append(updates, db.DNSProbeStateUpdate{ + BlogID: record.target.BlogID, + Hostname: record.res.Hostname, + IntervalSeconds: intervalSec, + CheckedAt: checkedAt, + NextCheckAt: checkedAt.Add(time.Duration(intervalSec) * time.Second), + Result: record.res.Status, + Error: record.res.Error, + Addresses: record.res.Addresses, + CNAMEChain: record.res.CNAMEChain, + }) + if err := o.handleDNSResult(record.res); err != nil { + summary.dnsEventErrors++ + log.Printf("orchestrator: DNS event update blog_id=%d hostname=%s status=%s: %v", + record.target.BlogID, record.res.Hostname, record.res.Status, err) + } + } + if err := dbUpdateDNSStates(o.ctx, updates); err != nil { + summary.dnsUpdateErrors++ + log.Printf("orchestrator: update DNS probe states rows=%d: %v", len(updates), err) + } +} + +func (o *Orchestrator) handleDNSResult(res dnsprobe.Result) error { + if res.Success { + return o.closeDNSIfOpen(res) + } + meta, _ := json.Marshal(dnsEventMetadata(res)) + tx, err := o.ev().Begin(o.ctx) + if err != nil { + return err + } + defer func() { _ = tx.Rollback() }() + openRes, err := tx.Open(o.ctx, eventstore.OpenInput{ + Identity: eventstore.Identity{ + BlogID: res.BlogID, + CheckType: checkTypeDNS, + }, + Severity: eventstore.SeverityDegraded, + State: eventstore.StateDegraded, + Source: o.hostname, + Metadata: meta, + }) + if err != nil { + return err + } + linked := false + if openRes.EventID > 0 { + linked, err = o.linkActiveHTTPToDNSCause(tx, res.BlogID, openRes.EventID) + if err != nil { + return err + } + } + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit DNS event: %w", err) + } + if openRes.Opened { + emitCounter("dns.event.open.count", 1) + emitCounter("dns.event.open."+metricSegment(res.Status)+".count", 1) + } + if linked { + emitCounter("dns.event.cause_linked.count", 1) + } + return nil +} + +func (o *Orchestrator) closeDNSIfOpen(res dnsprobe.Result) error { + tx, err := o.ev().Begin(o.ctx) + if err != nil { + return err + } + defer func() { _ = tx.Rollback() }() + active, err := tx.FindActiveByBlog(o.ctx, res.BlogID, checkTypeDNS) + if errors.Is(err, eventstore.ErrEventNotFound) { + return nil + } + if err != nil { + return err + } + meta, _ := json.Marshal(dnsEventMetadata(res)) + if err := tx.Close(o.ctx, active.ID, eventstore.ReasonProbeCleared, o.hostname, meta); err != nil { + return err + } + unlinked, err := o.unlinkHTTPIfCausedByDNS(tx, res.BlogID, active.ID) + if err != nil { + return err + } + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit DNS recovery: %w", err) + } + emitCounter("dns.event.close.count", 1) + if unlinked { + emitCounter("dns.event.cause_unlinked.count", 1) + } + return nil +} + +func dnsEventMetadata(res dnsprobe.Result) map[string]any { + return map[string]any{ + "hostname": res.Hostname, + "status": res.Status, + "error": res.Error, + "addresses": res.Addresses, + "cname_chain": res.CNAMEChain, + "duration_ms": res.Duration.Milliseconds(), + "checked_at": res.Timestamp.UTC().Format(time.RFC3339), + "resolver": resolverLabel(res), + } +} + +func (o *Orchestrator) linkActiveHTTPToDNSCause(tx *eventstore.Tx, blogID, dnsEventID int64) (bool, error) { + httpEvent, err := 
tx.FindActiveByBlog(o.ctx, blogID, checkTypeHTTP) + if errors.Is(err, eventstore.ErrEventNotFound) { + return false, nil + } + if err != nil { + return false, err + } + if httpEvent.CauseEventID != nil && *httpEvent.CauseEventID != dnsEventID { + return false, nil + } + return tx.LinkCause(o.ctx, httpEvent.ID, dnsEventID, o.hostname) +} + +func (o *Orchestrator) unlinkHTTPIfCausedByDNS(tx *eventstore.Tx, blogID, dnsEventID int64) (bool, error) { + httpEvent, err := tx.FindActiveByBlog(o.ctx, blogID, checkTypeHTTP) + if errors.Is(err, eventstore.ErrEventNotFound) { + return false, nil + } + if err != nil { + return false, err + } + if httpEvent.CauseEventID == nil || *httpEvent.CauseEventID != dnsEventID { + return false, nil + } + return tx.LinkCause(o.ctx, httpEvent.ID, 0, o.hostname) +} + +func resolverLabel(res dnsprobe.Result) string { + if res.Resolver != "" { + return res.Resolver + } + return "system" +} + +func dnsWorkerLimit(cfg *config.Config) int { + if cfg.DNSMonitorMaxWorkers > 0 { + return cfg.DNSMonitorMaxWorkers + } + workers := cfg.NumWorkers / 4 + if workers < 4 { + workers = 4 + } + if workers > 64 { + workers = 64 + } + return workers +} + +func dnsBatchSize(cfg *config.Config) int { + if cfg.DNSMonitorBatchSize > 0 { + return cfg.DNSMonitorBatchSize + } + return max(100, dnsWorkerLimit(cfg)*8) +} + +func dnsScheduleBatchSize(cfg *config.Config) int { + if cfg.DNSMonitorScheduleBatchSize > 0 { + return cfg.DNSMonitorScheduleBatchSize + } + return max(500, dnsBatchSize(cfg)*4) +} + +func hostnameForDNSTarget(target db.DNSProbeTarget) string { + if strings.TrimSpace(target.MonitorURL) != "" { + if hostname, err := hostnameFromMonitorURL(target.MonitorURL); err == nil { + return hostname + } + return "" + } + return dnsprobe.NormalizeHostname(target.Hostname) +} + +func hostnameFromMonitorURL(raw string) (string, error) { + value := strings.TrimSpace(raw) + if value == "" { + return "", fmt.Errorf("monitor_url is empty") + } + if u, err := url.Parse(value); err == nil { + if host := dnsprobe.NormalizeHostname(u.Hostname()); host != "" { + return host, nil + } + } + if !strings.Contains(value, "://") { + if u, err := url.Parse("http://" + value); err == nil { + if host := dnsprobe.NormalizeHostname(u.Hostname()); host != "" { + return host, nil + } + } + } + if host, _, err := net.SplitHostPort(value); err == nil { + if normalized := dnsprobe.NormalizeHostname(host); normalized != "" { + return normalized, nil + } + } + if strings.ContainsAny(value, "/?#") { + return "", fmt.Errorf("monitor_url has no hostname") + } + if host := dnsprobe.NormalizeHostname(value); host != "" { + return host, nil + } + return "", fmt.Errorf("monitor_url has no hostname") +} + func emitPageMetrics(summary roundSummary) { m := metricsClientFunc() if m == nil { @@ -785,6 +1171,18 @@ func (o *Orchestrator) finishRound(cfg *config.Config, summary roundSummary) { m.Increment("scheduler.round.check.redirect.count", summary.checkRedirects) m.Increment("scheduler.round.check.keyword.count", summary.checkKeywords) m.Increment("scheduler.round.check.tls_deprecated.count", summary.checkTLSDeprecated) + m.Increment("dns.schedule.created.count", summary.dnsSchedulesCreated) + m.Gauge("dns.due_start.count", summary.dnsDueAtStart) + m.Gauge("dns.selected.count", summary.dnsSelected) + m.Gauge("dns.completed.count", summary.dnsCompleted) + m.Gauge("dns.failure.count", summary.dnsFailures) + m.Gauge("dns.due_remaining.count", summary.dnsDueRemaining) + m.Increment("dns.schedule.error.count", 
summary.dnsScheduleErrors) + m.Increment("dns.fetch.error.count", summary.dnsFetchErrors) + m.Increment("dns.update.error.count", summary.dnsUpdateErrors) + m.Increment("dns.event.error.count", summary.dnsEventErrors) + m.Timing("dns.dispatch.time", summary.dnsDispatchDuration) + m.Timing("dns.process.time", summary.dnsProcessDuration) if cfg.StatsdSendMemUsage { m.EmitMemStats() @@ -795,17 +1193,51 @@ func (o *Orchestrator) finishRound(cfg *config.Config, summary roundSummary) { logRoundSummary(summary, roundDuration, sps) } +func logDNSSummary(summary roundSummary) { + if summary.dnsSelected == 0 && + summary.dnsSchedulesCreated == 0 && + summary.dnsDueRemaining == 0 && + summary.dnsScheduleErrors == 0 && + summary.dnsFetchErrors == 0 && + summary.dnsUpdateErrors == 0 && + summary.dnsEventErrors == 0 { + return + } + log.Printf( + "orchestrator: DNS summary schedules_created=%d due_start=%d selected=%d completed=%d failures=%d due_remaining=%d schedule_errors=%d fetch_errors=%d update_errors=%d event_errors=%d dispatch=%s process=%s", + summary.dnsSchedulesCreated, + summary.dnsDueAtStart, + summary.dnsSelected, + summary.dnsCompleted, + summary.dnsFailures, + summary.dnsDueRemaining, + summary.dnsScheduleErrors, + summary.dnsFetchErrors, + summary.dnsUpdateErrors, + summary.dnsEventErrors, + summary.dnsDispatchDuration.Round(time.Millisecond), + summary.dnsProcessDuration.Round(time.Millisecond), + ) +} + func logRoundSummary(summary roundSummary, roundDuration time.Duration, sps int) { if summary.selected == 0 && + summary.dnsSelected == 0 && + summary.dnsSchedulesCreated == 0 && summary.dueRemaining == 0 && + summary.dnsDueRemaining == 0 && summary.outstanding == 0 && summary.backpressureWaits == 0 && summary.fetchErrors == 0 && - summary.dueCountErrors == 0 { + summary.dueCountErrors == 0 && + summary.dnsScheduleErrors == 0 && + summary.dnsFetchErrors == 0 && + summary.dnsUpdateErrors == 0 && + summary.dnsEventErrors == 0 { return } log.Printf( - "orchestrator: round summary pages=%d due_count_sampled=%t due_start=%d selected=%d dispatched=%d completed=%d outstanding=%d due_remaining=%d backpressure_waits=%d stale_results=%d duplicate_results=%d never_checked=%d oldest_selected_age_sec=%d dispatch=%s wait=%s process=%s mark_checked=%s history=%s ssl=%s events=%s checks_success=%d checks_failure=%d checks_http_failure=%d checks_timeout=%d checks_connect_error=%d checks_ssl_error=%d checks_redirect=%d checks_keyword=%d checks_tls_deprecated=%d mark_checked_rows=%d history_rows=%d ssl_rows=%d mark_checked_errors=%d history_errors=%d ssl_errors=%d duration=%s sps=%d", + "orchestrator: round summary pages=%d due_count_sampled=%t due_start=%d selected=%d dispatched=%d completed=%d outstanding=%d due_remaining=%d backpressure_waits=%d stale_results=%d duplicate_results=%d never_checked=%d oldest_selected_age_sec=%d dispatch=%s wait=%s process=%s mark_checked=%s history=%s ssl=%s events=%s checks_success=%d checks_failure=%d checks_http_failure=%d checks_timeout=%d checks_connect_error=%d checks_ssl_error=%d checks_redirect=%d checks_keyword=%d checks_tls_deprecated=%d mark_checked_rows=%d history_rows=%d ssl_rows=%d mark_checked_errors=%d history_errors=%d ssl_errors=%d dns_schedules_created=%d dns_due_start=%d dns_selected=%d dns_completed=%d dns_failures=%d dns_due_remaining=%d dns_schedule_errors=%d dns_fetch_errors=%d dns_update_errors=%d dns_event_errors=%d duration=%s sps=%d", summary.pagesFetched, summary.dueCountsSampled, summary.dueAtStart, @@ -841,6 +1273,16 @@ func 
logRoundSummary(summary roundSummary, roundDuration time.Duration, sps int) summary.markCheckedErrors, summary.historyErrors, summary.sslErrors, + summary.dnsSchedulesCreated, + summary.dnsDueAtStart, + summary.dnsSelected, + summary.dnsCompleted, + summary.dnsFailures, + summary.dnsDueRemaining, + summary.dnsScheduleErrors, + summary.dnsFetchErrors, + summary.dnsUpdateErrors, + summary.dnsEventErrors, roundDuration.Round(time.Millisecond), sps, ) diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index a3f6f4cf..d18b118b 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -17,6 +17,7 @@ import ( "github.com/Automattic/jetmon/internal/checker" "github.com/Automattic/jetmon/internal/config" "github.com/Automattic/jetmon/internal/db" + "github.com/Automattic/jetmon/internal/dnsprobe" "github.com/Automattic/jetmon/internal/eventstore" "github.com/Automattic/jetmon/internal/veriflier" "github.com/Automattic/jetmon/internal/wpcom" @@ -59,6 +60,237 @@ func TestTimeoutForSite(t *testing.T) { } } +func TestDNSAutoSizingUsesHTTPWorkers(t *testing.T) { + cfg := &config.Config{NumWorkers: 60} + if got := dnsWorkerLimit(cfg); got != 15 { + t.Fatalf("dnsWorkerLimit = %d, want 15", got) + } + if got := dnsBatchSize(cfg); got != 120 { + t.Fatalf("dnsBatchSize = %d, want 120", got) + } + + cfg.DNSMonitorMaxWorkers = 200 + cfg.DNSMonitorBatchSize = 25 + if got := dnsWorkerLimit(cfg); got != 200 { + t.Fatalf("dnsWorkerLimit override = %d, want 200", got) + } + if got := dnsBatchSize(cfg); got != 25 { + t.Fatalf("dnsBatchSize override = %d, want 25", got) + } +} + +func TestRunDNSProbesSchedulesChecksAndUpdatesState(t *testing.T) { + restore := stubOrchestratorDeps() + defer restore() + + cfg := &config.Config{ + NumWorkers: 20, + DNSMonitorEnable: true, + DNSMonitorIntervalSec: 900, + DNSMonitorTimeoutMS: 2000, + DNSMonitorBatchSize: 10, + DNSMonitorScheduleBatchSize: 50, + DNSMonitorResolvers: []string{"192.0.2.53:53"}, + LegacyStatusProjectionEnable: false, + } + o := &Orchestrator{ + hostname: "test-host", + bucketMin: 0, + bucketMax: 99, + ctx: context.Background(), + } + rec := newRecordingMetrics() + metricsClientFunc = func() metricsClient { return rec } + + var scheduleLimit int + dbEnsureDNSSchedules = func(_ context.Context, _, _ int, limit, intervalSec int, _ time.Time) (int, error) { + scheduleLimit = limit + if intervalSec != 900 { + t.Fatalf("intervalSec = %d, want 900", intervalSec) + } + return 2, nil + } + dbCountDueDNSProbes = func(context.Context, int, int) (int, error) { return 1, nil } + dbGetDueDNSProbes = func(_ context.Context, _, _ int, limit int) ([]db.DNSProbeTarget, error) { + if limit != 10 { + t.Fatalf("limit = %d, want 10", limit) + } + return []db.DNSProbeTarget{{ + BlogID: 42, + BucketNo: 7, + MonitorURL: "https://Example.COM/path", + Hostname: "old.example.com", + IntervalSeconds: 900, + }}, nil + } + dnsProbeCheckFunc = func(_ context.Context, req dnsprobe.Request) dnsprobe.Result { + if req.Hostname != "example.com" { + t.Fatalf("Hostname = %q, want example.com", req.Hostname) + } + if fmt.Sprint(req.ResolverAddrs) != "[192.0.2.53:53]" { + t.Fatalf("ResolverAddrs = %v", req.ResolverAddrs) + } + return dnsprobe.Result{ + BlogID: req.BlogID, + Hostname: req.Hostname, + Success: true, + Status: dnsprobe.StatusOK, + Addresses: []string{"192.0.2.10"}, + Timestamp: time.Now().UTC(), + } + } + var updates []db.DNSProbeStateUpdate + dbUpdateDNSStates = func(_ context.Context, 
got []db.DNSProbeStateUpdate) error { + updates = append([]db.DNSProbeStateUpdate(nil), got...) + return nil + } + + summary := o.runDNSProbes(cfg) + if scheduleLimit != 50 { + t.Fatalf("scheduleLimit = %d, want 50", scheduleLimit) + } + if summary.dnsSchedulesCreated != 2 || summary.dnsSelected != 1 || summary.dnsCompleted != 1 || summary.dnsFailures != 0 { + t.Fatalf("summary = %+v", summary) + } + if len(updates) != 1 || updates[0].BlogID != 42 || updates[0].Hostname != "example.com" || updates[0].Result != dnsprobe.StatusOK { + t.Fatalf("updates = %+v", updates) + } + if got := rec.counter("dns.check.ok.count"); got != 1 { + t.Fatalf("dns.check.ok.count = %d, want 1", got) + } +} + +func TestHandleDNSResultLinksActiveHTTPEventToDNSCause(t *testing.T) { + restore := stubOrchestratorDeps() + defer restore() + + sqlDB, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer sqlDB.Close() + + rec := newRecordingMetrics() + metricsClientFunc = func() metricsClient { return rec } + + mock.ExpectBegin() + mock.ExpectExec("INSERT INTO jetmon_events"). + WithArgs(int64(42), nil, checkTypeDNS, nil, eventstore.SeverityDegraded, eventstore.StateDegraded, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(300, 1)) + mock.ExpectExec("INSERT INTO jetmon_event_transitions"). + WithArgs(int64(300), int64(42), nil, eventstore.SeverityDegraded, nil, eventstore.StateDegraded, eventstore.ReasonOpened, "test-host", sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectQuery("SELECT id, severity, state, cause_event_id FROM jetmon_events"). + WithArgs(int64(42), checkTypeHTTP). + WillReturnRows(sqlmock.NewRows([]string{"id", "severity", "state", "cause_event_id"}). + AddRow(int64(200), eventstore.SeverityDown, eventstore.StateDown, nil)) + mock.ExpectQuery("SELECT blog_id, severity, state, ended_at, cause_event_id"). + WithArgs(int64(200)). + WillReturnRows(sqlmock.NewRows([]string{"blog_id", "severity", "state", "ended_at", "cause_event_id"}). + AddRow(int64(42), eventstore.SeverityDown, eventstore.StateDown, nil, nil)) + mock.ExpectExec("UPDATE jetmon_events SET cause_event_id"). + WithArgs(int64(300), int64(200)). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO jetmon_event_transitions"). + WithArgs(int64(200), int64(42), eventstore.SeverityDown, eventstore.SeverityDown, eventstore.StateDown, eventstore.StateDown, eventstore.ReasonCauseLinked, "test-host", sqlmock.AnyArg()). 
+ WillReturnResult(sqlmock.NewResult(2, 1)) + mock.ExpectCommit() + + o := &Orchestrator{ + events: eventstore.New(sqlDB), + hostname: "test-host", + ctx: context.Background(), + } + err = o.handleDNSResult(dnsprobe.Result{ + BlogID: 42, + Hostname: "example.com", + Status: dnsprobe.StatusNXDomain, + Error: "no such host", + Resolver: "192.0.2.53:53", + }) + if err != nil { + t.Fatalf("handleDNSResult: %v", err) + } + if got := rec.counter("dns.event.open.nxdomain.count"); got != 1 { + t.Fatalf("dns.event.open.nxdomain.count = %d, want 1", got) + } + if got := rec.counter("dns.event.cause_linked.count"); got != 1 { + t.Fatalf("dns.event.cause_linked.count = %d, want 1", got) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet sql expectations: %v", err) + } +} + +func TestCloseDNSIfOpenUnlinksHTTPEventCausedByDNS(t *testing.T) { + restore := stubOrchestratorDeps() + defer restore() + + sqlDB, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer sqlDB.Close() + + rec := newRecordingMetrics() + metricsClientFunc = func() metricsClient { return rec } + + mock.ExpectBegin() + mock.ExpectQuery("SELECT id, severity, state, cause_event_id FROM jetmon_events"). + WithArgs(int64(42), checkTypeDNS). + WillReturnRows(sqlmock.NewRows([]string{"id", "severity", "state", "cause_event_id"}). + AddRow(int64(300), eventstore.SeverityDegraded, eventstore.StateDegraded, nil)) + mock.ExpectQuery("SELECT blog_id, severity, state, ended_at, cause_event_id"). + WithArgs(int64(300)). + WillReturnRows(sqlmock.NewRows([]string{"blog_id", "severity", "state", "ended_at", "cause_event_id"}). + AddRow(int64(42), eventstore.SeverityDegraded, eventstore.StateDegraded, nil, nil)) + mock.ExpectExec("UPDATE jetmon_events"). + WithArgs(eventstore.ReasonProbeCleared, int64(300)). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO jetmon_event_transitions"). + WithArgs(int64(300), int64(42), eventstore.SeverityDegraded, nil, eventstore.StateDegraded, eventstore.StateResolved, eventstore.ReasonProbeCleared, "test-host", sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectQuery("SELECT id, severity, state, cause_event_id FROM jetmon_events"). + WithArgs(int64(42), checkTypeHTTP). + WillReturnRows(sqlmock.NewRows([]string{"id", "severity", "state", "cause_event_id"}). + AddRow(int64(200), eventstore.SeverityDown, eventstore.StateDown, int64(300))) + mock.ExpectQuery("SELECT blog_id, severity, state, ended_at, cause_event_id"). + WithArgs(int64(200)). + WillReturnRows(sqlmock.NewRows([]string{"blog_id", "severity", "state", "ended_at", "cause_event_id"}). + AddRow(int64(42), eventstore.SeverityDown, eventstore.StateDown, nil, int64(300))) + mock.ExpectExec("UPDATE jetmon_events SET cause_event_id"). + WithArgs(nil, int64(200)). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO jetmon_event_transitions"). + WithArgs(int64(200), int64(42), eventstore.SeverityDown, eventstore.SeverityDown, eventstore.StateDown, eventstore.StateDown, eventstore.ReasonCauseUnlinked, "test-host", sqlmock.AnyArg()). 
+ WillReturnResult(sqlmock.NewResult(2, 1)) + mock.ExpectCommit() + + o := &Orchestrator{ + events: eventstore.New(sqlDB), + hostname: "test-host", + ctx: context.Background(), + } + err = o.closeDNSIfOpen(dnsprobe.Result{ + BlogID: 42, + Hostname: "example.com", + Success: true, + Status: dnsprobe.StatusOK, + }) + if err != nil { + t.Fatalf("closeDNSIfOpen: %v", err) + } + if got := rec.counter("dns.event.close.count"); got != 1 { + t.Fatalf("dns.event.close.count = %d, want 1", got) + } + if got := rec.counter("dns.event.cause_unlinked.count"); got != 1 { + t.Fatalf("dns.event.cause_unlinked.count = %d, want 1", got) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet sql expectations: %v", err) + } +} + func TestInMaintenance(t *testing.T) { origNow := nowFunc defer func() { nowFunc = origNow }() @@ -525,7 +757,12 @@ func stubOrchestratorDeps() func() { origDBUpdateSSLExpiry := dbUpdateSSLExpiry origDBUpdateSSLExpiries := dbUpdateSSLExpiries origDBCountDueSites := dbCountDueSites + origDBEnsureDNSSchedules := dbEnsureDNSSchedules + origDBGetDueDNSProbes := dbGetDueDNSProbes + origDBCountDueDNSProbes := dbCountDueDNSProbes + origDBUpdateDNSStates := dbUpdateDNSStates origDBCountProjectionDrift := dbCountProjectionDrift + origDNSProbeCheck := dnsProbeCheckFunc origNotify := wpcomNotifyFunc origVeriflierCheck := veriflierCheckFunc origMetricsClient := metricsClientFunc @@ -546,7 +783,12 @@ func stubOrchestratorDeps() func() { dbUpdateSSLExpiry = func(context.Context, int64, time.Time) error { return nil } dbUpdateSSLExpiries = func(context.Context, []db.SiteSSLExpiry) error { return nil } dbCountDueSites = func(context.Context, int, int, bool) (int, error) { return 0, nil } + dbEnsureDNSSchedules = func(context.Context, int, int, int, int, time.Time) (int, error) { return 0, nil } + dbGetDueDNSProbes = func(context.Context, int, int, int) ([]db.DNSProbeTarget, error) { return nil, nil } + dbCountDueDNSProbes = func(context.Context, int, int) (int, error) { return 0, nil } + dbUpdateDNSStates = func(context.Context, []db.DNSProbeStateUpdate) error { return nil } dbCountProjectionDrift = func(context.Context, int, int) (int, error) { return 0, nil } + dnsProbeCheckFunc = func(context.Context, dnsprobe.Request) dnsprobe.Result { return dnsprobe.Result{} } wpcomNotifyFunc = func(_ *wpcom.Client, _ wpcom.Notification) error { return nil } veriflierCheckFunc = func(c *veriflier.VeriflierClient, ctx context.Context, req veriflier.CheckRequest) (*veriflier.CheckResult, error) { return c.Check(ctx, req) @@ -569,7 +811,12 @@ func stubOrchestratorDeps() func() { dbUpdateSSLExpiry = origDBUpdateSSLExpiry dbUpdateSSLExpiries = origDBUpdateSSLExpiries dbCountDueSites = origDBCountDueSites + dbEnsureDNSSchedules = origDBEnsureDNSSchedules + dbGetDueDNSProbes = origDBGetDueDNSProbes + dbCountDueDNSProbes = origDBCountDueDNSProbes + dbUpdateDNSStates = origDBUpdateDNSStates dbCountProjectionDrift = origDBCountProjectionDrift + dnsProbeCheckFunc = origDNSProbeCheck wpcomNotifyFunc = origNotify veriflierCheckFunc = origVeriflierCheck metricsClientFunc = origMetricsClient @@ -1240,10 +1487,10 @@ func TestCheckTLSDeprecatedClosesWarningOnModernTLS(t *testing.T) { defer sqlDB.Close() mock.ExpectBegin() - mock.ExpectQuery("SELECT id, severity, state FROM jetmon_events"). + mock.ExpectQuery("SELECT id, severity, state, cause_event_id FROM jetmon_events"). WithArgs(int64(73), checkTypeTLSDeprecated). - WillReturnRows(sqlmock.NewRows([]string{"id", "severity", "state"}). 
-			AddRow(int64(202), eventstore.SeverityWarning, eventstore.StateWarning))
+		WillReturnRows(sqlmock.NewRows([]string{"id", "severity", "state", "cause_event_id"}).
+			AddRow(int64(202), eventstore.SeverityWarning, eventstore.StateWarning, nil))
 	mock.ExpectQuery("SELECT blog_id, severity, state, ended_at, cause_event_id").
 		WithArgs(int64(202)).
 		WillReturnRows(sqlmock.NewRows([]string{"blog_id", "severity", "state", "ended_at", "cause_event_id"}).
@@ -1305,10 +1552,10 @@ func TestCloseSSLExpiryUsesProbeCleared(t *testing.T) {
 	defer sqlDB.Close()
 	mock.ExpectBegin()
-	mock.ExpectQuery("SELECT id, severity, state FROM jetmon_events").
+	mock.ExpectQuery("SELECT id, severity, state, cause_event_id FROM jetmon_events").
 		WithArgs(int64(74), checkTypeTLSExpiry).
-		WillReturnRows(sqlmock.NewRows([]string{"id", "severity", "state"}).
-			AddRow(int64(303), eventstore.SeverityWarning, eventstore.StateWarning))
+		WillReturnRows(sqlmock.NewRows([]string{"id", "severity", "state", "cause_event_id"}).
+			AddRow(int64(303), eventstore.SeverityWarning, eventstore.StateWarning, nil))
 	mock.ExpectQuery("SELECT blog_id, severity, state, ended_at, cause_event_id").
 		WithArgs(int64(303)).
 		WillReturnRows(sqlmock.NewRows([]string{"blog_id", "severity", "state", "ended_at", "cause_event_id"}).