diff --git a/pkg/epp/flowcontrol/README.md b/pkg/epp/flowcontrol/README.md new file mode 100644 index 000000000..5db86f446 --- /dev/null +++ b/pkg/epp/flowcontrol/README.md @@ -0,0 +1,116 @@ +# Flow Control Module + +## Introduction + +In a multi-tenant, heterogeneous inference serving environment, managing diverse SLOs and fairness requirements is +critical. Today, the serving stack often relies on a simple "best-effort" or FIFO (First-In, First-Out) basis for +handling requests. This is insufficient and leads to significant problems: + +* **Head-of-Line Blocking**: A long-running, low-priority request can block short, high-priority requests, violating + SLOs. +* **Lack of Predictability**: Without proper queuing and prioritization, it's impossible to provide predictable latency + guarantees to different tenants. +* **Inability to Handle Saturation**: Under heavy load, the system has no graceful way to manage overload, leading to + cascading failures instead of controlled degradation. + +The Flow Controller is a sophisticated library designed to solve these problems. It acts as a crucial gatekeeper that +decides *if* and *when* a request should proceed to be scheduled. Its primary mission is to enable predictable, fair, +and efficient utilization of shared backend resources by enforcing prioritization, applying fairness policies, managing +request queuing under saturation, and orchestrating displacement (the eviction of lower-priority queued items to make +space for higher-priority ones). + +It is designed for extensibility, allowing custom logic for policies and queuing mechanisms to be plugged into a robust, +high-performance orchestration engine. + +### Role in the Gateway API Inference Extension + +Within the Gateway API Inference Extension's Endpoint Picker (EPP), the Flow Controller acts as a crucial gatekeeper +between the Routing and Scheduling layers. It decides *if* and *when* a request, already assigned to a logical flow +(e.g., a specific workload or tenant), should proceed to be scheduled onto a backend resource. It is the primary +mechanism for managing diverse SLOs, ensuring fairness among competing workloads, and maintaining system stability under +high load. + +### High Level Architecture + +The following diagram illustrates the high-level dependency model and request flow for the system. It shows how +concurrent client requests are managed by the central `FlowController`, which in turn relies on a set of decoupled +components to make its decisions. Each component package in this module will contain its own more detailed architectural +diagrams. + +```mermaid +graph LR + %% Style Definitions + classDef default fill:#fff,stroke:#333,stroke-width:1.5px,color:#000; + classDef client fill:#dcfce7,stroke:#333; + classDef system_entry fill:#fef9c3,stroke:#333; + classDef downstream_ok fill:#dbeafe,stroke:#333; + classDef downstream_err fill:#fee2e2,stroke:#333; + + %% Client Goroutines (Fan-In) + subgraph Client Goroutines + direction TB + R1(Goroutine 1); + R2(Goroutine N); + end + + %% Flow Control System + subgraph Flow Control System + C{Flow Controller Engine}; + + subgraph Internal Interactions + direction LR + D(Ports) -- "abstracts state" --> E(Flow Registry); + D -- "abstracts load" --> SD(Saturation Detector); + E -- "configures" --> F(Framework); + F -- "defines" --> P(Plugins: Queues & Policies); + end + + C -- "Orchestrates via
abstractions" --> D; + end + + %% Downstream Actions (Fan-Out) + subgraph Downstream Actions + direction TB + A1(Outcome: Dispatched
Proceed to Scheduler); + A2(Outcome: Rejected
Return Error); + end + + %% Connections + R1 -- "calls & blocks" --> C; + R2 -- "calls & blocks" --> C; + C -- "unblocks 'goroutine 1'" --> A1; + C -- "unblocks 'goroutine N'" --> A2; + + %% Apply Classes + class R1,R2 client; + class C system_entry; + class A1 downstream_ok; + class A2 downstream_err; + class D,E,F,P,SD default; +``` + +## Architectural Pillars + +The Flow Controller framework is built on several key components that work in concert. This architecture is designed to +be highly modular and scalable, with clear separation of concerns. For a deep dive into the specific design choices and +their justifications, please refer to the detailed documentation within the relevant sub-packages. + +1. **The `FlowController` Engine (`./controller`)**: The central, sharded orchestrator responsible for the main request + processing loop. It manages a pool of workers that distribute incoming requests, apply policies, and dispatch + requests to the backends. Its design focuses on high throughput and backpressure. + +2. **Pluggable `Policy` Framework (`./framework`)**: This defines the core interfaces for all pluggable logic. It + features a two-tier policy system for `InterFlow` (decisions *between* different flows) and `IntraFlow` + (decisions *within* a single flow) logic, covering both request dispatch and displacement. + +3. **Extensible `SafeQueue` System (`./framework`)**: This defines the `framework.SafeQueue` interface for + concurrent-safe request storage. It uses a `QueueCapability` system that allows for diverse and extensible queue + implementations (e.g., FIFO, Priority Heap) while maintaining a stable interface. + +4. **The `FlowRegistry` (`./registry`, `./ports`)**: This is the stateful control plane of the system. It manages the + configuration and lifecycle of all flows, policies, and queues. It presents a sharded view of its state to the + `FlowController` workers to enable parallel operation with minimal lock contention. + +5. **Core Types and Service Ports (`./types`, `./ports`)**: These packages define the foundational data structures + (e.g., `FlowControlRequest`), errors, and service interfaces that decouple the engine from its dependencies, + following a "Ports and Adapters" architectural style. diff --git a/pkg/epp/flowcontrol/types/README.md b/pkg/epp/flowcontrol/types/README.md new file mode 100644 index 000000000..a75e976fc --- /dev/null +++ b/pkg/epp/flowcontrol/types/README.md @@ -0,0 +1,33 @@ +# Flow Control Core Types + +This package defines the fundamental data structures, interfaces, and errors that form the vocabulary of the Flow +Control system. It establishes the core concepts of the request lifecycle and its final, reportable outcomes. + +## Request Lifecycle Interfaces + +A request's journey through the Flow Controller is represented by a series of interfaces that define its state as it +moves through the system: + +1. **`FlowControlRequest`**: The initial, "raw" contract for an incoming request. It carries the essential data + provided by the client, such as its `FlowID` and `ByteSize`. +2. **`QueueItemAccessor`**: The internal, enriched, and read-only view of a request once it has been accepted by the + controller. This interface is the primary means by which policy plugins inspect items. +3. **`QueueItemHandle`**: An opaque, queue-specific handle to a queued item. The controller uses this handle to perform + targeted operations, such as removing a specific item, without needing to know the queue's internal implementation + details. + +## Final State Reporting: Outcomes and Errors + +The final state of every request is reported using a combination of a `QueueOutcome` enum and a corresponding `error`. +This provides a clear, machine-inspectable way to understand the result. + +* **`QueueOutcome`**: A concise enum summarizing the final result (e.g., `QueueOutcomeDispatched`, + `QueueOutcomeRejectedCapacity`, `QueueOutcomeEvictedDisplaced`). This is ideal for metrics. + +* **Errors**: For any non-dispatch outcome, a specific sentinel error is returned. These are nested to provide detailed + context: + * `ErrRejected`: The parent error for any request rejected *before* being enqueued. + * `ErrEvicted`: The parent error for any request removed *after* being enqueued for reasons other than dispatch. + +Callers of `FlowController.EnqueueAndWait()` can first use `errors.Is()` to check for the general class of failure +(`ErrRejected` or `ErrEvicted`), and then unwrap the error to find the specific cause (e.g., `ErrQueueAtCapacity`). diff --git a/pkg/epp/flowcontrol/types/errors.go b/pkg/epp/flowcontrol/types/errors.go new file mode 100644 index 000000000..9e7882f94 --- /dev/null +++ b/pkg/epp/flowcontrol/types/errors.go @@ -0,0 +1,78 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +import ( + "errors" +) + +// --- High Level Queue Outcome Errors --- +var ( + // ErrRejected is a sentinel error indicating a request was rejected by the Flow Controller *before* being formally + // enqueued. Errors returned by `FlowController.EnqueueAndWait()` that signify pre-queue rejection will wrap this + // error. + // Callers should use `errors.Is(err, ErrRejected)` to check for this general class of failure. + ErrRejected = errors.New("request rejected pre-queue") + + // ErrEvicted is a sentinel error indicating a request was removed from a queue *after* being successfully enqueued, + // but for reasons other than successful dispatch (e.g., TTL expiry, displacement). + // Errors returned by `FlowController.EnqueueAndWait()` that signify post-queue eviction will wrap this error. + // Callers should use `errors.Is(err, ErrEvicted)` to check for this general class of failure. + ErrEvicted = errors.New("request evicted from queue") +) + +// --- Pre-Enqueue Rejection Errors --- +// Errors that can occur before a request is formally added to a `framework.SafeQueue`. +// When returned by `FlowController.EnqueueAndWait()`, these specific errors will typically be wrapped by `ErrRejected`. +var ( + // ErrNilRequest indicates that a nil `types.FlowControlRequest` was provided. + ErrNilRequest = errors.New("FlowControlRequest cannot be nil") + + // ErrFlowIDEmpty indicates that a flow ID was empty when one was required. + ErrFlowIDEmpty = errors.New("flow ID cannot be empty") + + // ErrQueueAtCapacity indicates that a request could not be enqueued because queue capacity limits were met and + // displacement (if applicable) failed to make space. + ErrQueueAtCapacity = errors.New("queue at capacity and displacement failed to make space") +) + +// --- Post-Enqueue Eviction Errors --- +// Errors that occur when a request, already in a `framework.SafeQueue`, is removed for reasons other than dispatch. +// When returned by `FlowController.EnqueueAndWait()`, these specific errors will typically be wrapped by `ErrEvicted`. +var ( + // ErrTTLExpired indicates a request was evicted from a queue because its effective Time-To-Live expired. + ErrTTLExpired = errors.New("request TTL expired") + + // ErrContextCancelled indicates a request was evicted because its associated context (from + // `FlowControlRequest.Context()`) was cancelled. This error typically wraps the underlying `context.Canceled` or + // `context.DeadlineExceeded` error. + ErrContextCancelled = errors.New("request context cancelled") + + // ErrDisplaced indicates a request was evicted from a queue because it was chosen as a victim by a displacement + // policy to make space for another request. + ErrDisplaced = errors.New("request displaced") +) + +// --- General FlowController Errors --- +// General runtime errors for the Flow Controller. +var ( + // ErrFlowControllerShutdown indicates that an operation could not complete or an item was evicted because the Flow + // Controller is shutting down or has stopped. + // When returned by `FlowController.EnqueueAndWait()`, this will be wrapped by `ErrRejected` (if rejection happens + // before internal queuing) or `ErrEvicted` (if eviction happens after internal queuing). + ErrFlowControllerShutdown = errors.New("FlowController is shutting down") +) diff --git a/pkg/epp/flowcontrol/types/flow.go b/pkg/epp/flowcontrol/types/flow.go new file mode 100644 index 000000000..031a83799 --- /dev/null +++ b/pkg/epp/flowcontrol/types/flow.go @@ -0,0 +1,33 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package types defines the core data structures and service contracts for the Flow Controller system. It establishes +// the "vocabulary" of the system, including the request lifecycle interfaces, final outcomes, and standard error types. +package types + +// FlowSpecification defines the configuration of a logical flow, encapsulating its identity and registered priority. +// +// A FlowSpecification acts as the registration key for a flow within the Flow Registry. +type FlowSpecification interface { + // ID returns the unique name or identifier for this flow (e.g., model name, tenant ID), corresponding to the value + // from `FlowControlRequest.FlowID()`. + ID() string + + // Priority returns the numerical priority level currently associated with this flow within the Flow Registry. + // + // Convention: Lower numerical values indicate higher priority. + Priority() uint +} diff --git a/pkg/epp/flowcontrol/types/outcomes.go b/pkg/epp/flowcontrol/types/outcomes.go new file mode 100644 index 000000000..7c52db934 --- /dev/null +++ b/pkg/epp/flowcontrol/types/outcomes.go @@ -0,0 +1,93 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +import "strconv" + +// QueueOutcome represents the high-level final state of a request's lifecycle within the Flow Controller. +// +// It is returned by `FlowController.EnqueueAndWait()` along with a corresponding error. This enum is designed to be a +// low-cardinality label ideal for metrics, while the error provides fine-grained details for non-dispatched outcomes. +type QueueOutcome int + +const ( + // QueueOutcomeDispatched indicates the request was successfully processed by the Flow Controller and unblocked for + // the caller to proceed. + // The associated error from `FlowController.EnqueueAndWait()` will be nil. + QueueOutcomeDispatched QueueOutcome = iota + + // --- Pre-Enqueue Rejection Outcomes (request never entered a `framework.SafeQueue`) --- + // For these outcomes, the error from `FlowController.EnqueueAndWait()` will wrap `ErrRejected`. + + // QueueOutcomeRejectedCapacity indicates rejection because queue capacity limits were met and displacement (if + // applicable) failed to make space. + // The associated error will wrap `ErrQueueAtCapacity` (and `ErrRejected`). + QueueOutcomeRejectedCapacity + + // QueueOutcomeRejectedOther indicates rejection for reasons other than capacity before the request was formally + // enqueued. + // The specific underlying cause can be determined from the associated error (e.g., a nil request, an unregistered + // flow ID, or a controller shutdown), which will be wrapped by `ErrRejected`. + QueueOutcomeRejectedOther + + // --- Post-Enqueue Eviction Outcomes (request was in a `framework.SafeQueue` but not dispatched) --- + // For these outcomes, the error from `FlowController.EnqueueAndWait()` will wrap `ErrEvicted`. + + // QueueOutcomeEvictedTTL indicates eviction from a queue because the request's effective Time-To-Live expired. + // The associated error will wrap `ErrTTLExpired` (and `ErrEvicted`). + QueueOutcomeEvictedTTL + + // QueueOutcomeEvictedContextCancelled indicates eviction from a queue because the request's own context (from + // `FlowControlRequest.Context()`) was cancelled. + // The associated error will wrap `ErrContextCancelled` (which may further wrap the underlying `context.Canceled` or + // `context.DeadlineExceeded` error) (and `ErrEvicted`). + QueueOutcomeEvictedContextCancelled + + // QueueOutcomeEvictedDisplaced indicates eviction from a queue to make space for another request due to a + // displacement policy. + // The associated error will wrap `ErrDisplaced` (and `ErrEvicted`). + QueueOutcomeEvictedDisplaced + + // QueueOutcomeEvictedOther indicates eviction from a queue for reasons not covered by more specific eviction + // outcomes. + // The specific underlying cause can be determined from the associated error (e.g., a controller shutdown while the + // item was queued), which will be wrapped by `ErrEvicted`. + QueueOutcomeEvictedOther +) + +// String returns a human-readable string representation of the QueueOutcome. +func (o QueueOutcome) String() string { + switch o { + case QueueOutcomeDispatched: + return "Dispatched" + case QueueOutcomeRejectedCapacity: + return "RejectedCapacity" + case QueueOutcomeRejectedOther: + return "RejectedOther" + case QueueOutcomeEvictedTTL: + return "EvictedTTL" + case QueueOutcomeEvictedContextCancelled: + return "EvictedContextCancelled" + case QueueOutcomeEvictedDisplaced: + return "EvictedDisplaced" + case QueueOutcomeEvictedOther: + return "EvictedOther" + default: + // Return the integer value for unknown outcomes to aid in debugging. + return "UnknownOutcome(" + strconv.Itoa(int(o)) + ")" + } +} diff --git a/pkg/epp/flowcontrol/types/request.go b/pkg/epp/flowcontrol/types/request.go new file mode 100644 index 000000000..bf6960507 --- /dev/null +++ b/pkg/epp/flowcontrol/types/request.go @@ -0,0 +1,121 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +import ( + "context" + "time" +) + +// FlowControlRequest is the contract for an incoming request submitted to the Flow Controller. It represents the "raw" +// user-provided data and context for a single unit of work. +// +// An object implementing this interface is the primary input to `FlowController.EnqueueAndWait()`. The controller then +// wraps this object with its own internal structures (which implement `QueueItemAccessor`) to manage the request's +// lifecycle without modifying the original. +type FlowControlRequest interface { + // Context returns the request's context. The Flow Controller uses this for monitoring cancellation (e.g., if the + // client disconnects or a request-scoped timeout occurs), which can lead to the request being evicted from a queue. + Context() context.Context + + // FlowID returns the unique identifier for the flow this request belongs to (e.g., model name, tenant ID). The + // Flow Controller uses this ID, in conjunction with the flow's registered priority, to look up the active + // `ports.ManagedQueue` from the Flow Registry's `ports.RegistryShard`. + FlowID() string + + // ByteSize returns the request's size in bytes (e.g., prompt size). This is used by the Flow Controller and for + // managing byte-based capacity limits and for Flow Registry statistics. + ByteSize() uint64 + + // InitialEffectiveTTL returns the suggested Time-To-Live for this request. + // This value is treated as a hint; the Flow Controller may override it based on its own configuration or policies. + // A zero value indicates the request has no specific TTL preference, and a system-wide default should be applied. + InitialEffectiveTTL() time.Duration + + // ID returns an optional, user-facing unique identifier for this specific request. It is intended for logging, + // tracing, and observability. The core flow control logic does not use this ID for dispatching decisions; it uses + // the internal, opaque `QueueItemHandle`. + ID() string +} + +// QueueItemHandle is an opaque handle to an item that has been successfully added to a `framework.SafeQueue`. It acts +// as a key, allowing the Flow Controller to perform targeted operations (like removal) on a specific item without +// needing to know the queue's internal structure. +// +// A handle is created by and bound to the specific `framework.SafeQueue` instance that stores the item. +type QueueItemHandle interface { + // Handle returns the underlying, queue-specific raw handle (e.g., *list.Element). + // This method is intended for internal use by the `framework.SafeQueue` implementation that created it. + // Callers outside the queue implementation should treat the returned value as opaque. + Handle() any + + // Invalidate marks this handle as no longer valid for future operations. + // This method MUST be called by the `framework.SafeQueue` implementation itself after the item associated with this + // handle has been removed. + // + // Conformance: Implementations of this method MUST be idempotent. + Invalidate() + + // IsInvalidated returns true if this handle has been marked as invalid (e.g., by a call to `Invalidate`). + // A `framework.SafeQueue` MUST reject any operation that attempts to use an invalidated handle, typically by + // returning `framework.ErrInvalidQueueItemHandle`. + IsInvalidated() bool +} + +// QueueItemAccessor provides the internal, enriched, read-only view of a request being managed within the Flow +// Controller's queues. It is the primary interface through which `framework.SafeQueue` implementations and policy +// plugins interact with request data and its associated flow control metadata. +// +// The Flow Controller creates an object that implements this interface by wrapping an incoming `FlowControlRequest`. +type QueueItemAccessor interface { + // EnqueueTime is the timestamp when the item was logically accepted by the Flow Controller for queuing (i.e., when + // `FlowController.EnqueueAndWait()` was called. + EnqueueTime() time.Time + + // ByteSize returns the byte size of the original request, cached from `FlowControlRequest.ByteSize()`. + ByteSize() uint64 + + // FlowID returns the unique identifier of the flow this item belongs to, cached from `FlowControlRequest.FlowID()`. + FlowID() string + + // EffectiveTTL is the actual Time-To-Live assigned to this item by the Flow Controller, taking into account the + // request's preference (`FlowControlRequest.InitialEffectiveTTL()`) and any Flow Controller or per-flow + // defaults/policies. + EffectiveTTL() time.Duration + + // RequestID is the user-facing ID from the original request (`FlowControlRequest.ID()`). + RequestID() string + + // OriginalRequest returns the underlying `FlowControlRequest` that this accessor provides a view of. + // This method serves as an escape hatch, allowing policies or components that are aware of specific + // `FlowControlRequest` implementations to perform type assertions and access richer, application-specific data. + OriginalRequest() FlowControlRequest + + // Handle returns the `QueueItemHandle` associated with this item once it has been successfully added to a + // `framework.SafeQueue`. It returns nil if the item is not yet in a queue. + Handle() QueueItemHandle + + // SetHandle associates a `QueueItemHandle` with this item. + // + // Conformance: This method MUST be called by a `framework.SafeQueue` implementation within its `Add` method, + // immediately after a new `QueueItemHandle` is created for the item. This ensures that the item always carries a + // valid handle while it is in a queue. This method is not intended for use outside of `framework.SafeQueue` + // implementations. + // + //go:doc + SetHandle(handle QueueItemHandle) +}