diff --git a/pkg/agents/agents.go b/pkg/agents/agents.go index 06fdd47e..6241039f 100644 --- a/pkg/agents/agents.go +++ b/pkg/agents/agents.go @@ -30,10 +30,20 @@ func AllFlags() []cli.Flag { // ConfigureAll initializes all configured agents and returns a slice of ToolSets. // Agents that are not configured will return nil and be skipped. -func ConfigureAll(ctx context.Context) ([]interfaces.ToolSet, error) { +// +// storageClient and storagePrefix are the warren-wide shared storage client +// (bucket already bound) and object-key prefix. They are injected into any +// factory implementing StorageAware before Configure is called. storageClient +// may be nil when storage is not configured; storage-aware agents must degrade +// gracefully in that case. +func ConfigureAll(ctx context.Context, storageClient interfaces.StorageClient, storagePrefix string) ([]interfaces.ToolSet, error) { var toolSets []interfaces.ToolSet for _, factory := range All { + if aware, ok := factory.(StorageAware); ok { + aware.SetStorage(storageClient, storagePrefix) + } + ts, err := factory.Configure(ctx) if err != nil { return nil, goerr.Wrap(err, "failed to configure agent") diff --git a/pkg/agents/factory.go b/pkg/agents/factory.go index 602de663..d28f03ba 100644 --- a/pkg/agents/factory.go +++ b/pkg/agents/factory.go @@ -17,3 +17,12 @@ type ToolSetFactory interface { // Returns (nil, nil) if the agent is not configured. Configure(ctx context.Context) (interfaces.ToolSet, error) } + +// StorageAware is an optional interface for factories that need the +// warren-wide storage client and prefix (e.g. for snapshotting large +// result sets to shared object storage). ConfigureAll injects these +// before calling Configure. Factories that do not need storage simply +// omit this interface. +type StorageAware interface { + SetStorage(client interfaces.StorageClient, prefix string) +} diff --git a/pkg/agents/falcon/README.md b/pkg/agents/falcon/README.md index 1e395ac0..01173d9f 100644 --- a/pkg/agents/falcon/README.md +++ b/pkg/agents/falcon/README.md @@ -124,6 +124,15 @@ The agent exposes the following read-only tools to the LLM: > **Note:** Event search uses the Next-Gen SIEM Search API, which runs queries asynchronously. The tool handles job creation and polling internally — results are returned once the search completes. +#### Pagination and result limits + +`falcon_search_events` returns events in pages to avoid flooding the agent with large result sets: + +- At most **100 events** are returned per call (`limit`, max 100). The response includes `total` (number of events in the result set), `offset`, `returned`, and `has_more`. +- A filter query returns at most **200 events by default**. To retrieve more (up to **20,000**), append `| tail(N)` to the query string. For the exact number of matching events, use an aggregation such as `| count()` rather than paging through raw events. +- The first response includes a `result_set_id`. To fetch later pages, call the tool again with that `result_set_id` and an increased `offset` — this serves pages from a stored snapshot **without re-running the query**, so pagination is stable and cheap. +- Snapshots are written to the configured Cloud Storage bucket (shared `--storage-bucket` / `--storage-prefix`) under `falcon/events/`. When storage is not configured, only the first page is available (no `result_set_id`). Snapshot lifetime is governed by the bucket's object lifecycle (TTL) policy. + ## Troubleshooting ### Agent not appearing in available tools diff --git a/pkg/agents/falcon/export_test.go b/pkg/agents/falcon/export_test.go index acef1bff..f0b5c5c6 100644 --- a/pkg/agents/falcon/export_test.go +++ b/pkg/agents/falcon/export_test.go @@ -1,6 +1,10 @@ package falcon -import "context" +import ( + "context" + + "github.com/secmon-lab/warren/pkg/domain/interfaces" +) // TokenProviderForTest wraps tokenProvider for testing purposes. type TokenProviderForTest struct { @@ -29,11 +33,21 @@ type InternalToolForTest struct { tool *internalTool } -// NewInternalToolForTest creates an internalTool wrapper for testing. +// NewInternalToolForTest creates an internalTool wrapper for testing without +// storage (event search returns the first page directly). func NewInternalToolForTest(clientID, clientSecret, baseURL string) *InternalToolForTest { tp := newTokenProvider(clientID, clientSecret, baseURL) return &InternalToolForTest{ - tool: newInternalTool(tp, baseURL), + tool: newInternalTool(tp, baseURL, nil, ""), + } +} + +// NewInternalToolForTestWithStorage creates an internalTool wrapper backed by +// the given storage client, enabling result-set snapshotting and pagination. +func NewInternalToolForTestWithStorage(clientID, clientSecret, baseURL string, storage interfaces.StorageClient, prefix string) *InternalToolForTest { + tp := newTokenProvider(clientID, clientSecret, baseURL) + return &InternalToolForTest{ + tool: newInternalTool(tp, baseURL, storage, prefix), } } @@ -50,3 +64,14 @@ func (t *InternalToolForTest) SpecCount(ctx context.Context) (int, error) { } return len(specs), nil } + +// ParseLimit exposes parseLimit for testing. +func ParseLimit(args map[string]any) int { return parseLimit(args) } + +// ParseOffset exposes parseOffset for testing. +func ParseOffset(args map[string]any) int { return parseOffset(args) } + +// Paginate exposes paginate for testing. +func Paginate(events []any, offset, limit int) ([]any, int, bool) { + return paginate(events, offset, limit) +} diff --git a/pkg/agents/falcon/factory.go b/pkg/agents/falcon/factory.go index 8f4c39ea..f776a93a 100644 --- a/pkg/agents/falcon/factory.go +++ b/pkg/agents/falcon/factory.go @@ -15,6 +15,17 @@ type Factory struct { clientID string clientSecret string baseURL string + + storageClient interfaces.StorageClient + storagePrefix string +} + +// SetStorage implements agents.StorageAware. It receives the warren-wide +// shared storage client and prefix, used to snapshot large event result +// sets for stable pagination. +func (f *Factory) SetStorage(client interfaces.StorageClient, prefix string) { + f.storageClient = client + f.storagePrefix = prefix } // Flags implements agents.ToolSetFactory. @@ -66,6 +77,6 @@ func (f *Factory) Configure(ctx context.Context) (interfaces.ToolSet, error) { ) return &toolSet{ - internal: newInternalTool(tp, baseURL), + internal: newInternalTool(tp, baseURL, f.storageClient, f.storagePrefix), }, nil } diff --git a/pkg/agents/falcon/prompt/system.md b/pkg/agents/falcon/prompt/system.md index f480b665..70b62faa 100644 --- a/pkg/agents/falcon/prompt/system.md +++ b/pkg/agents/falcon/prompt/system.md @@ -164,6 +164,15 @@ CQL is used with `falcon_search_events` to query raw EDR telemetry data. CQL is - PowerShell executions: `FileName="powershell.exe" AND #event_simpleName=ProcessRollup2 | tail(50)` - Events by agent ID in last 24h: `aid=abc123` (use start="1d" parameter) +### Result Limits and Pagination + +`falcon_search_events` caps how many events it returns so the context is not flooded: + +- A filter (non-aggregate) query returns at most **200 events by default**. To retrieve more (up to **20,000**), append `| tail(N)` to the query string (e.g. `... | tail(5000)`). The tool does NOT add `tail` for you. +- When you only need the count of matching events, use an aggregation such as `| count()` instead of paging through raw events — it returns the exact total in a single call. +- Each call returns at most **100 events** (`limit`, max 100). The response includes `total`, `offset`, `returned`, `has_more`, and a `result_set_id`. +- To page through a larger result set, call again with the same `result_set_id` and an increased `offset`. This serves the next page from a stored snapshot without re-running the query, so paging is stable and cheap. Do not re-issue the original query just to get later pages. + ## Standard Investigation Workflow ### 1. Understand the Request diff --git a/pkg/agents/falcon/tool.go b/pkg/agents/falcon/tool.go index 95f77240..67a08bf9 100644 --- a/pkg/agents/falcon/tool.go +++ b/pkg/agents/falcon/tool.go @@ -9,11 +9,14 @@ import ( "io" "net/http" "net/url" + "strconv" "strings" "time" "github.com/gollem-dev/gollem" "github.com/m-mizutani/goerr/v2" + "github.com/secmon-lab/warren/pkg/domain/interfaces" + "github.com/secmon-lab/warren/pkg/utils/errutil" "github.com/secmon-lab/warren/pkg/utils/logging" "github.com/secmon-lab/warren/pkg/utils/msg" "github.com/secmon-lab/warren/pkg/utils/safe" @@ -24,14 +27,23 @@ type internalTool struct { tokenProvider *tokenProvider baseURL string httpClient *http.Client + + // storage holds event search snapshots for stable pagination across + // separate tool calls. It is the warren-wide shared client (bucket + // bound). When nil, event search degrades to returning the first page + // directly without snapshotting. storagePrefix namespaces the objects. + storage interfaces.StorageClient + storagePrefix string } // newInternalTool creates a new internalTool for Falcon API calls. -func newInternalTool(tp *tokenProvider, baseURL string) *internalTool { +func newInternalTool(tp *tokenProvider, baseURL string, storage interfaces.StorageClient, storagePrefix string) *internalTool { return &internalTool{ tokenProvider: tp, baseURL: baseURL, httpClient: &http.Client{Timeout: 60 * time.Second}, + storage: storage, + storagePrefix: storagePrefix, } } @@ -176,13 +188,15 @@ func (t *internalTool) Specs(_ context.Context) ([]gollem.ToolSpec, error) { }, }, { - Name: "falcon_search_events", - Description: "Search EDR telemetry events using CrowdStrike Query Language (CQL). This uses the Next-Gen SIEM Search API to query raw event data (process executions, network connections, file writes, DNS requests, etc.). The search runs asynchronously and this tool automatically polls until results are ready.", + Name: "falcon_search_events", + Description: "Search EDR telemetry events using CrowdStrike Query Language (CQL) via the Next-Gen SIEM Search API (process executions, network connections, file writes, DNS requests, etc.). " + + "Results are returned in pages: at most 100 events are returned per call along with the total count. " + + "NOTE: a filter query returns at most 200 events by default; to retrieve more (up to 20000), append `| tail(N)` to query_string. For the exact number of matching events, use an aggregation like `| count()` instead of paging through raw events. " + + "To page through results, pass the returned result_set_id with an increased offset; this avoids re-running the query.", Parameters: map[string]*gollem.Parameter{ "query_string": { Type: gollem.TypeString, - Description: "CQL query string (e.g., \"aid=abc123\", \"#event_simpleName=ProcessRollup2 AND FileName=cmd.exe\", \"ComputerName=workstation1 | tail(100)\")", - Required: true, + Description: "CQL query string (e.g., \"aid=abc123\", \"#event_simpleName=ProcessRollup2 AND FileName=cmd.exe\", \"ComputerName=workstation1 | tail(1000)\"). Required for a new search; omitted when paging with result_set_id.", }, "repository": { Type: gollem.TypeString, @@ -196,6 +210,18 @@ func (t *internalTool) Specs(_ context.Context) ([]gollem.ToolSpec, error) { Type: gollem.TypeString, Description: "End time for the search (e.g., \"now\", \"2025-01-02T00:00:00Z\"). Default: \"now\"", }, + "limit": { + Type: gollem.TypeNumber, + Description: "Maximum number of events to return in this page (default: 100, max: 100).", + }, + "offset": { + Type: gollem.TypeNumber, + Description: "Zero-based index of the first event to return for pagination (default: 0).", + }, + "result_set_id": { + Type: gollem.TypeString, + Description: "ID of a previously created result set (returned by an earlier call). When set, returns another page from the stored snapshot without re-running the query. Use together with offset/limit.", + }, }, }, }, nil @@ -503,14 +529,35 @@ func (t *internalTool) getCrowdScores(ctx context.Context, args map[string]any) return result, nil } -// searchEvents runs a CQL query via the Next-Gen SIEM Search API. -// It creates a query job and polls until the job completes, returning all events. +const ( + defaultEventLimit = 100 + maxEventLimit = 100 + maxEventPolls = 60 + eventPollInterval = 2 * time.Second +) + +// searchEvents runs a CQL query via the Next-Gen SIEM Search API and returns +// events one page at a time (at most maxEventLimit per call) together with the +// total result-set size. +// +// On a new search it polls the query job until completion and, when storage is +// configured, snapshots the full result set so later pages can be served via +// result_set_id without re-running the query. The query job poll returns the +// cumulative result set on every poll, so only the final (done) response is +// authoritative — earlier polls are not accumulated (doing so would duplicate +// events). func (t *internalTool) searchEvents(ctx context.Context, args map[string]any) (map[string]any, error) { - log := logging.From(ctx) + limit := parseLimit(args) + offset := parseOffset(args) + + // Pagination over an existing snapshot: serve from storage, no query run. + if resultSetID, ok := args["result_set_id"].(string); ok && resultSetID != "" { + return t.paginateEventSnapshot(ctx, resultSetID, offset, limit) + } queryString, ok := args["query_string"].(string) if !ok || queryString == "" { - return nil, goerr.New("query_string is required") + return nil, goerr.New("query_string is required", goerr.T(errutil.TagValidation)) } repository := "search-all" @@ -518,9 +565,49 @@ func (t *internalTool) searchEvents(ctx context.Context, args map[string]any) (m repository = repo } - body := map[string]any{ - "queryString": queryString, + jobID, events, metadata, done, err := t.runEventQuery(ctx, queryString, repository, args) + if err != nil { + return nil, err + } + + // Snapshot for later pagination. Best-effort: a storage failure must not + // fail the search, but it does mean only the first page is reachable. + resultSetID := "" + if t.storage != nil { + if err := t.writeEventSnapshot(ctx, jobID, events); err != nil { + errutil.Handle(ctx, goerr.Wrap(err, "failed to snapshot falcon events for pagination", goerr.V("job_id", jobID))) + msg.Warn(ctx, "⚠️ *[Falcon]* Could not store result set for pagination; only the first page is available") + } else { + resultSetID = jobID + } } + + page, returned, hasMore := paginate(events, offset, limit) + result := buildEventResult(resultSetID, page, len(events), offset, limit, returned, hasMore, done) + result["repository"] = repository + if metadata != nil { + result["metadata"] = metadata + } + // Surface the true match count when the API reports it (it may exceed the + // number of events actually returned, e.g. when the query has no tail()). + if matched, ok := eventCountFromMetadata(metadata); ok && matched != len(events) { + result["total_matched"] = matched + } + if !done { + result["warning"] = "Search did not complete within the polling limit. Partial results returned." + } + return result, nil +} + +// runEventQuery creates a Next-Gen SIEM query job and polls until completion. +// It returns the job ID, the final (cumulative) events, the metaData object, +// and whether the search completed within the poll limit. Intermediate polls +// overwrite (not append) the events, since each poll returns the full result +// set computed so far. +func (t *internalTool) runEventQuery(ctx context.Context, queryString, repository string, args map[string]any) (jobID string, events []any, metadata any, done bool, err error) { + log := logging.From(ctx) + + body := map[string]any{"queryString": queryString} if start, ok := args["start"].(string); ok && start != "" { body["start"] = start } else { @@ -532,13 +619,13 @@ func (t *internalTool) searchEvents(ctx context.Context, args map[string]any) (m body["end"] = "now" } - // Step 1: Create query job msg.Trace(ctx, "🔍 Searching events (query: `%s`, repo: `%s`)", queryString, repository) jobPath := fmt.Sprintf("/humio/api/v1/repositories/%s/queryjobs", repository) jobResp, err := t.doRequest(ctx, http.MethodPost, jobPath, body) if err != nil { msg.Warn(ctx, "⚠️ *[Falcon]* Failed to create event search job (query: `%s`): %v", queryString, err) - return nil, goerr.Wrap(err, "failed to create event search query job", + return "", nil, nil, false, goerr.Wrap(err, "failed to create event search query job", + goerr.T(errutil.TagExternal), goerr.V("repository", repository), goerr.V("query", queryString), ) @@ -547,7 +634,8 @@ func (t *internalTool) searchEvents(ctx context.Context, args map[string]any) (m jobID, ok := jobResp["id"].(string) if !ok || jobID == "" { msg.Warn(ctx, "⚠️ *[Falcon]* No job ID returned from event search (query: `%s`)", queryString) - return nil, goerr.New("no job ID returned from query job creation", + return "", nil, nil, false, goerr.New("no job ID returned from query job creation", + goerr.T(errutil.TagExternal), goerr.V("response", jobResp), ) } @@ -555,81 +643,236 @@ func (t *internalTool) searchEvents(ctx context.Context, args map[string]any) (m log.Debug("Event search query job created", "job_id", jobID, "repository", repository) msg.Trace(ctx, "⏳ Event search job created (job_id: `%s`), polling for results...", jobID) - // Step 2: Poll for results until done resultPath := fmt.Sprintf("/humio/api/v1/repositories/%s/queryjobs/%s", repository, jobID) - const ( - maxPolls = 60 - pollInterval = 2 * time.Second - ) - var allEvents []any + var lastEvents []any + var lastMeta any - for i := range maxPolls { + for i := range maxEventPolls { if i > 0 { select { case <-ctx.Done(): - return nil, goerr.Wrap(ctx.Err(), "context canceled while polling event search") - case <-time.After(pollInterval): + return "", nil, nil, false, goerr.Wrap(ctx.Err(), "context canceled while polling event search", goerr.T(errutil.TagTimeout)) + case <-time.After(eventPollInterval): } } pollResp, err := t.doRequest(ctx, http.MethodGet, resultPath, nil) if err != nil { msg.Warn(ctx, "⚠️ *[Falcon]* Failed to poll event search results (job: `%s`, attempt %d): %v", jobID, i+1, err) - return nil, goerr.Wrap(err, "failed to poll event search results", + return "", nil, nil, false, goerr.Wrap(err, "failed to poll event search results", + goerr.T(errutil.TagExternal), goerr.V("job_id", jobID), goerr.V("poll_attempt", i+1), ) } - // Collect events from this poll - if events, ok := pollResp["events"].([]any); ok { - allEvents = append(allEvents, events...) + // Each poll returns the cumulative result set, so overwrite rather + // than append to avoid duplicating events across polls. + if evs, ok := pollResp["events"].([]any); ok { + lastEvents = evs + } + if meta, ok := pollResp["metaData"]; ok { + lastMeta = meta } - // Check if done - if done, ok := pollResp["done"].(bool); ok && done { - log.Debug("Event search completed", - "job_id", jobID, - "total_events", len(allEvents), - "polls", i+1, - ) - msg.Trace(ctx, "✅ Event search completed: %d events retrieved", len(allEvents)) + if isDone, _ := pollResp["done"].(bool); isDone { + log.Debug("Event search completed", "job_id", jobID, "total_events", len(lastEvents), "polls", i+1) + msg.Trace(ctx, "✅ Event search completed: %d events retrieved", len(lastEvents)) + return jobID, lastEvents, lastMeta, true, nil + } - result := map[string]any{ - "done": true, - "events": allEvents, - "repository": repository, - } + log.Debug("Event search still running, polling...", "job_id", jobID, "poll_attempt", i+1, "events_so_far", len(lastEvents)) + } + + log.Warn("Event search reached max poll limit, returning partial results", "job_id", jobID, "total_events", len(lastEvents)) + msg.Trace(ctx, "⚠️ Event search reached poll limit, returning %d partial results", len(lastEvents)) + return jobID, lastEvents, lastMeta, false, nil +} + +// paginateEventSnapshot serves a page of events from a previously stored +// snapshot, reading the newline-delimited JSON without holding the whole +// document as a single combined structure. +func (t *internalTool) paginateEventSnapshot(ctx context.Context, resultSetID string, offset, limit int) (map[string]any, error) { + if t.storage == nil { + return nil, goerr.New("result set pagination is unavailable because storage is not configured", + goerr.T(errutil.TagInvalidState), goerr.V("result_set_id", resultSetID)) + } + + path := t.eventSnapshotPath(resultSetID) + r, err := t.storage.GetObject(ctx, path) + if err != nil { + msg.Warn(ctx, "⚠️ *[Falcon]* Result set `%s` not found; re-run the search without result_set_id", resultSetID) + return nil, goerr.Wrap(err, "failed to open event snapshot", + goerr.T(errutil.TagNotFound), goerr.V("result_set_id", resultSetID)) + } + defer safe.Close(ctx, r) - // Include metadata if available - if meta, ok := pollResp["metadataResult"]; ok { - result["metadata"] = meta + page, total, err := decodeNDJSON(r, offset, limit) + if err != nil { + return nil, goerr.Wrap(err, "failed to read event snapshot", + goerr.T(errutil.TagInternal), goerr.V("result_set_id", resultSetID)) + } + + returned := len(page) + hasMore := offset+returned < total + msg.Trace(ctx, "📄 Returning events page from result set `%s` (offset: %d, returned: %d, total: %d)", resultSetID, offset, returned, total) + return buildEventResult(resultSetID, page, total, offset, limit, returned, hasMore, true), nil +} + +// writeEventSnapshot streams the events to object storage as newline-delimited +// JSON (one event per line) so later pages can be read back without re-running +// the query. Events are encoded one line at a time rather than as one combined +// document to avoid building a large intermediate payload. +func (t *internalTool) writeEventSnapshot(ctx context.Context, resultSetID string, events []any) error { + w := t.storage.PutObject(ctx, t.eventSnapshotPath(resultSetID)) + + enc := json.NewEncoder(w) + for _, ev := range events { + if err := enc.Encode(ev); err != nil { + // Skip Close so the partial object is never committed (GCS and the + // in-memory client both commit on Close). + return goerr.Wrap(err, "failed to encode event to snapshot", goerr.T(errutil.TagInternal)) + } + } + + if err := w.Close(); err != nil { + return goerr.Wrap(err, "failed to finalize event snapshot", goerr.T(errutil.TagInternal)) + } + return nil +} + +// eventSnapshotPath builds the storage object key for an event result set, +// honoring the warren-wide storage prefix. +func (t *internalTool) eventSnapshotPath(resultSetID string) string { + return fmt.Sprintf("%sfalcon/events/%s.ndjson", t.storagePrefix, resultSetID) +} + +// decodeNDJSON streams newline-delimited JSON events from r, materializing only +// the requested page (events at index [offset, offset+limit)) and counting the +// rest. Events outside the page are decoded as raw bytes and discarded, so peak +// retained memory stays bounded by the page size rather than the full result +// set (which may hold up to 20,000 events). +func decodeNDJSON(r io.Reader, offset, limit int) (page []any, total int, err error) { + page = []any{} + dec := json.NewDecoder(r) + for { + if total >= offset && len(page) < limit { + var ev any + if err := dec.Decode(&ev); err != nil { + if errors.Is(err, io.EOF) { + break + } + return nil, 0, goerr.Wrap(err, "failed to decode snapshot event") + } + page = append(page, ev) + } else { + // Skip events outside the page without parsing them into maps. + var raw json.RawMessage + if err := dec.Decode(&raw); err != nil { + if errors.Is(err, io.EOF) { + break + } + return nil, 0, goerr.Wrap(err, "failed to decode snapshot event") } + } + total++ + } + return page, total, nil +} - return result, nil +// parseLimit returns the page size bounded to [1, maxEventLimit], defaulting to +// defaultEventLimit for missing or invalid input. +func parseLimit(args map[string]any) int { + n, ok := parseIntArg(args["limit"]) + if !ok || n < 1 { + return defaultEventLimit + } + if n > maxEventLimit { + return maxEventLimit + } + return n +} + +// parseOffset returns a non-negative pagination offset, defaulting to 0. +func parseOffset(args map[string]any) int { + n, ok := parseIntArg(args["offset"]) + if !ok || n < 0 { + return 0 + } + return n +} + +// parseIntArg interprets a tool argument as an int. gollem may deliver numbers +// as float64 or as strings, so both are accepted. +func parseIntArg(v any) (int, bool) { + switch n := v.(type) { + case float64: + return int(n), true + case int: + return n, true + case string: + trimmed := strings.TrimSpace(n) + if trimmed == "" { + return 0, false } + i, err := strconv.Atoi(trimmed) + if err != nil { + return 0, false + } + return i, true + default: + return 0, false + } +} - log.Debug("Event search still running, polling...", - "job_id", jobID, - "poll_attempt", i+1, - "events_so_far", len(allEvents), - ) +// paginate returns events[offset:offset+limit] along with the number returned +// and whether any events remain beyond the page. +func paginate(events []any, offset, limit int) (page []any, returned int, hasMore bool) { + total := len(events) + if offset >= total { + return []any{}, 0, false } + end := min(offset+limit, total) + page = events[offset:end] + return page, len(page), end < total +} - // Return partial results if max polls reached - log.Warn("Event search reached max poll limit, returning partial results", - "job_id", jobID, - "total_events", len(allEvents), - ) - msg.Trace(ctx, "⚠️ Event search reached poll limit, returning %d partial results", len(allEvents)) +// eventCountFromMetadata extracts the total matched event count from the query +// job metaData, when present. +func eventCountFromMetadata(metadata any) (int, bool) { + m, ok := metadata.(map[string]any) + if !ok { + return 0, false + } + switch ec := m["eventCount"].(type) { + case float64: + return int(ec), true + case int: + return ec, true + default: + return 0, false + } +} - return map[string]any{ - "done": false, - "events": allEvents, - "repository": repository, - "warning": "Search did not complete within the polling limit. Partial results returned.", - }, nil +// buildEventResult assembles the common paginated event response fields. +func buildEventResult(resultSetID string, page []any, total, offset, limit, returned int, hasMore, done bool) map[string]any { + if page == nil { + page = []any{} + } + result := map[string]any{ + "done": done, + "events": page, + "total": total, + "limit": limit, + "offset": offset, + "returned": returned, + "has_more": hasMore, + } + if resultSetID != "" { + result["result_set_id"] = resultSetID + } + return result } // buildQueryParams constructs URL query parameters from tool arguments. diff --git a/pkg/agents/falcon/tool_test.go b/pkg/agents/falcon/tool_test.go index 7db1c467..99ece412 100644 --- a/pkg/agents/falcon/tool_test.go +++ b/pkg/agents/falcon/tool_test.go @@ -11,6 +11,7 @@ import ( "testing" "github.com/m-mizutani/gt" + "github.com/secmon-lab/warren/pkg/adapter/storage" "github.com/secmon-lab/warren/pkg/agents/falcon" ) @@ -412,8 +413,22 @@ func TestInternalTool_SearchEvents_Immediate(t *testing.T) { events, ok := result["events"].([]any) gt.True(t, ok) gt.Equal(t, len(events), 1) + + // Pagination metadata is present even without storage. + gt.Equal(t, result["total"].(int), 1) + gt.Equal(t, result["returned"].(int), 1) + gt.Equal(t, result["offset"].(int), 0) + gt.Equal(t, result["limit"].(int), 100) + gt.Equal(t, result["has_more"].(bool), false) + // No storage configured -> no result_set_id for further paging. + _, hasID := result["result_set_id"] + gt.False(t, hasID) } +// TestInternalTool_SearchEvents_WithPolling verifies that polls are not +// accumulated: the query job returns the cumulative result set on each poll, +// so only the final (done) response is authoritative and events must not be +// duplicated across polls. func TestInternalTool_SearchEvents_WithPolling(t *testing.T) { var pollCount int @@ -431,26 +446,19 @@ func TestInternalTool_SearchEvents_WithPolling(t *testing.T) { pollCount++ w.Header().Set("Content-Type", "application/json") - if pollCount < 3 { - // Not done yet, return partial results - err := json.NewEncoder(w).Encode(map[string]any{ - "done": false, - "cancelled": false, - "events": []map[string]any{ - {"aid": "test", "event": fmt.Sprintf("event-%d", pollCount)}, - }, - }) - gt.NoError(t, err) - return + // Each poll returns the cumulative result set computed so far. + cumulative := []map[string]any{{"aid": "test", "event": "event-1"}} + if pollCount >= 2 { + cumulative = append(cumulative, map[string]any{"aid": "test", "event": "event-2"}) + } + if pollCount >= 3 { + cumulative = append(cumulative, map[string]any{"aid": "test", "event": "event-3"}) } - // Done on third poll err := json.NewEncoder(w).Encode(map[string]any{ - "done": true, + "done": pollCount >= 3, "cancelled": false, - "events": []map[string]any{ - {"aid": "test", "event": "event-final"}, - }, + "events": cumulative, }) gt.NoError(t, err) return @@ -473,12 +481,23 @@ func TestInternalTool_SearchEvents_WithPolling(t *testing.T) { gt.True(t, ok) gt.True(t, done) - // Should have accumulated events from all polls + // Only the final cumulative result is used: exactly 3 distinct events, + // no duplication from intermediate polls. events, ok := result["events"].([]any) gt.True(t, ok) - gt.Equal(t, len(events), 3) // 1 from poll 1 + 1 from poll 2 + 1 from final + gt.Equal(t, len(events), 3) + gt.Equal(t, result["total"].(int), 3) gt.Equal(t, pollCount, 3) + seen := map[string]bool{} + for _, e := range events { + ev := e.(map[string]any) + name := ev["event"].(string) + gt.False(t, seen[name]) // no duplicates + seen[name] = true + } + gt.True(t, seen["event-1"] && seen["event-2"] && seen["event-3"]) + repo, ok := result["repository"].(string) gt.True(t, ok) gt.Equal(t, repo, "investigate_view") @@ -490,6 +509,183 @@ func TestInternalTool_SearchEvents_MissingQueryString(t *testing.T) { gt.Error(t, err) } +// eventsSearchServer returns a test server whose query job completes +// immediately with the given number of generated events, and counts how many +// search-related (job create + poll) requests it received. +func eventsSearchServer(t *testing.T, jobID string, numEvents int, eventCount any, hits *int) *httptest.Server { + t.Helper() + return newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost && strings.Contains(r.URL.Path, "/queryjobs") { + *hits++ + w.Header().Set("Content-Type", "application/json") + gt.NoError(t, json.NewEncoder(w).Encode(map[string]any{"id": jobID})) + return + } + if r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/queryjobs/"+jobID) { + *hits++ + events := make([]map[string]any, numEvents) + for i := range events { + events[i] = map[string]any{"idx": i, "name": fmt.Sprintf("event-%d", i)} + } + resp := map[string]any{"done": true, "cancelled": false, "events": events} + if eventCount != nil { + resp["metaData"] = map[string]any{"eventCount": eventCount} + } + w.Header().Set("Content-Type", "application/json") + gt.NoError(t, json.NewEncoder(w).Encode(resp)) + return + } + t.Fatalf("unexpected request: %s %s", r.Method, r.URL.Path) + }) +} + +func TestInternalTool_SearchEvents_SnapshotPagination(t *testing.T) { + var hits int + srv := eventsSearchServer(t, "job-page", 250, nil, &hits) + defer srv.Close() + + store := storage.NewMemoryClient() + tool := falcon.NewInternalToolForTestWithStorage("id", "secret", srv.URL, store, "tenant/") + + // First call: runs the query, snapshots, returns first page. + first, err := tool.Run(context.Background(), "falcon_search_events", map[string]any{ + "query_string": "#event_simpleName=ProcessRollup2 | tail(250)", + }) + gt.NoError(t, err) + gt.Equal(t, first["total"].(int), 250) + gt.Equal(t, first["returned"].(int), 100) + gt.Equal(t, first["has_more"].(bool), true) + rsID := first["result_set_id"].(string) + gt.Equal(t, rsID, "job-page") + firstEvents := first["events"].([]any) + gt.Equal(t, len(firstEvents), 100) + gt.Equal(t, firstEvents[0].(map[string]any)["idx"].(float64), float64(0)) + + hitsAfterFirst := hits + + // Second call: paginate from the snapshot, no re-query. + second, err := tool.Run(context.Background(), "falcon_search_events", map[string]any{ + "result_set_id": rsID, + "offset": 100, + }) + gt.NoError(t, err) + gt.Equal(t, second["total"].(int), 250) + gt.Equal(t, second["offset"].(int), 100) + gt.Equal(t, second["returned"].(int), 100) + gt.Equal(t, second["has_more"].(bool), true) + secondEvents := second["events"].([]any) + gt.Equal(t, len(secondEvents), 100) + gt.Equal(t, secondEvents[0].(map[string]any)["idx"].(float64), float64(100)) + + // Last page. + third, err := tool.Run(context.Background(), "falcon_search_events", map[string]any{ + "result_set_id": rsID, + "offset": 200, + }) + gt.NoError(t, err) + gt.Equal(t, third["returned"].(int), 50) + gt.Equal(t, third["has_more"].(bool), false) + gt.Equal(t, third["events"].([]any)[49].(map[string]any)["idx"].(float64), float64(249)) + + // Offset beyond the total still reports the full total with an empty page. + beyond, err := tool.Run(context.Background(), "falcon_search_events", map[string]any{ + "result_set_id": rsID, + "offset": 1000, + }) + gt.NoError(t, err) + gt.Equal(t, beyond["total"].(int), 250) + gt.Equal(t, beyond["returned"].(int), 0) + gt.Equal(t, beyond["has_more"].(bool), false) + gt.Equal(t, len(beyond["events"].([]any)), 0) + + // Pagination must not have triggered any additional Falcon API requests. + gt.Equal(t, hits, hitsAfterFirst) +} + +func TestInternalTool_SearchEvents_LimitClamp(t *testing.T) { + var hits int + srv := eventsSearchServer(t, "job-clamp", 150, nil, &hits) + defer srv.Close() + + store := storage.NewMemoryClient() + tool := falcon.NewInternalToolForTestWithStorage("id", "secret", srv.URL, store, "") + + result, err := tool.Run(context.Background(), "falcon_search_events", map[string]any{ + "query_string": "aid=x | tail(150)", + "limit": 500, // exceeds max, must clamp to 100 + }) + gt.NoError(t, err) + gt.Equal(t, result["limit"].(int), 100) + gt.Equal(t, result["returned"].(int), 100) + gt.Equal(t, result["total"].(int), 150) + gt.Equal(t, result["has_more"].(bool), true) +} + +func TestInternalTool_SearchEvents_TotalMatchedFromMetadata(t *testing.T) { + var hits int + // API returns 200 events but reports 5000 matched via metaData.eventCount. + srv := eventsSearchServer(t, "job-meta", 200, float64(5000), &hits) + defer srv.Close() + + store := storage.NewMemoryClient() + tool := falcon.NewInternalToolForTestWithStorage("id", "secret", srv.URL, store, "") + + result, err := tool.Run(context.Background(), "falcon_search_events", map[string]any{ + "query_string": "aid=x", + }) + gt.NoError(t, err) + // total reflects the pageable snapshot size; total_matched reports the + // true match count from the API. + gt.Equal(t, result["total"].(int), 200) + gt.Equal(t, result["total_matched"].(int), 5000) +} + +func TestInternalTool_SearchEvents_ResultSetNotFound(t *testing.T) { + store := storage.NewMemoryClient() + tool := falcon.NewInternalToolForTestWithStorage("id", "secret", "http://localhost", store, "") + + _, err := tool.Run(context.Background(), "falcon_search_events", map[string]any{ + "result_set_id": "does-not-exist", + "offset": 0, + }) + gt.Error(t, err) +} + +func TestParseLimit(t *testing.T) { + gt.Equal(t, falcon.ParseLimit(map[string]any{}), 100) // default + gt.Equal(t, falcon.ParseLimit(map[string]any{"limit": float64(50)}), 50) // in range + gt.Equal(t, falcon.ParseLimit(map[string]any{"limit": float64(500)}), 100) // clamp + gt.Equal(t, falcon.ParseLimit(map[string]any{"limit": float64(0)}), 100) // invalid -> default + gt.Equal(t, falcon.ParseLimit(map[string]any{"limit": "30"}), 30) // string number + gt.Equal(t, falcon.ParseLimit(map[string]any{"limit": "bad"}), 100) // unparsable -> default +} + +func TestParseOffset(t *testing.T) { + gt.Equal(t, falcon.ParseOffset(map[string]any{}), 0) + gt.Equal(t, falcon.ParseOffset(map[string]any{"offset": float64(100)}), 100) + gt.Equal(t, falcon.ParseOffset(map[string]any{"offset": float64(-5)}), 0) // negative -> 0 + gt.Equal(t, falcon.ParseOffset(map[string]any{"offset": "20"}), 20) +} + +func TestPaginate(t *testing.T) { + events := []any{"a", "b", "c", "d", "e"} + + page, returned, hasMore := falcon.Paginate(events, 0, 2) + gt.Equal(t, returned, 2) + gt.Equal(t, hasMore, true) + gt.Equal(t, page, []any{"a", "b"}) + + page, returned, hasMore = falcon.Paginate(events, 4, 2) + gt.Equal(t, returned, 1) + gt.Equal(t, hasMore, false) + gt.Equal(t, page, []any{"e"}) + + page, returned, hasMore = falcon.Paginate(events, 10, 2) + gt.Equal(t, returned, 0) + gt.Equal(t, hasMore, false) + gt.Equal(t, len(page), 0) +} + func TestInternalTool_SearchDevices(t *testing.T) { srv := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { gt.Equal(t, r.Method, http.MethodGet) diff --git a/pkg/cli/chat.go b/pkg/cli/chat.go index 3ce7f308..7e05cb34 100644 --- a/pkg/cli/chat.go +++ b/pkg/cli/chat.go @@ -135,7 +135,7 @@ func cmdChat() *cli.Command { } // Initialize all configured agents and merge into tool sets - agentToolSets, err := agents.ConfigureAll(ctx) + agentToolSets, err := agents.ConfigureAll(ctx, storageClient, storageCfg.Prefix()) if err != nil { return goerr.Wrap(err, "failed to configure agents") } diff --git a/pkg/cli/serve.go b/pkg/cli/serve.go index e09166f1..093779d0 100644 --- a/pkg/cli/serve.go +++ b/pkg/cli/serve.go @@ -323,7 +323,7 @@ func cmdServe() *cli.Command { tagService := tag.New(repo) // Initialize all configured agents and merge into tool sets - agentToolSets, err := agents.ConfigureAll(ctx) + agentToolSets, err := agents.ConfigureAll(ctx, storageClient, storageCfg.Prefix()) if err != nil { return goerr.Wrap(err, "failed to configure agents") }