Skip to content

Commit c86d3d8

Browse files
committed
feat(flowcontrol): add foundational types
Introduces the foundational packages for the new Flow Controller component. This change includes: - The top-level README outlining the motivation, high-level architecture, and component pillars. - The `types` package, which defines the core data contracts, request lifecycle interfaces, error-handling vocabulary, and final outcome enums for the entire module. This foundational PR establishes the core concepts and data models upon which the rest of the Flow Controller implementation will be built.
1 parent aa4989e commit c86d3d8

File tree

6 files changed

+474
-0
lines changed

6 files changed

+474
-0
lines changed

pkg/epp/flowcontrol/README.md

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# Flow Control Module
2+
3+
## Introduction
4+
5+
In a multi-tenant, heterogeneous inference serving environment, managing diverse SLOs and fairness requirements is
6+
critical. Today, the serving stack often relies on a simple "best-effort" or FIFO (First-In, First-Out) basis for
7+
handling requests. This is insufficient and leads to significant problems:
8+
9+
* **Head-of-Line Blocking**: A long-running, low-priority request can block short, high-priority requests, violating
10+
SLOs.
11+
* **Lack of Predictability**: Without proper queuing and prioritization, it's impossible to provide predictable latency
12+
guarantees to different tenants.
13+
* **Inability to Handle Saturation**: Under heavy load, the system has no graceful way to manage overload, leading to
14+
cascading failures instead of controlled degradation.
15+
16+
The Flow Controller is a sophisticated library designed to solve these problems. It acts as a crucial gatekeeper that
17+
decides *if* and *when* a request should proceed to be scheduled. Its primary mission is to enable predictable, fair,
18+
and efficient utilization of shared backend resources by enforcing prioritization, applying fairness policies, managing
19+
request queuing under saturation, and orchestrating displacement (the eviction of lower-priority queued items to make
20+
space for higher-priority ones).
21+
22+
It is designed for extensibility, allowing custom logic for policies and queuing mechanisms to be plugged into a robust,
23+
high-performance orchestration engine.
24+
25+
### Role in the Gateway API Inference Extension
26+
27+
Within the Gateway API Inference Extension's Endpoint Picker (EPP), the Flow Controller acts as a crucial gatekeeper
28+
between the Routing and Scheduling layers. It decides *if* and *when* a request, already assigned to a logical flow
29+
(e.g., a specific workload or tenant), should proceed to be scheduled onto a backend resource. It is the primary
30+
mechanism for managing diverse SLOs, ensuring fairness among competing workloads, and maintaining system stability under
31+
high load.
32+
33+
### High Level Architecture
34+
35+
The following diagram illustrates the high-level dependency model and request flow for the system. It shows how
36+
concurrent client requests are managed by the central `FlowController`, which in turn relies on a set of decoupled
37+
components to make its decisions. Each component package in this module will contain its own more detailed architectural
38+
diagrams.
39+
40+
```mermaid
41+
graph LR
42+
%% Style Definitions
43+
classDef default fill:#fff,stroke:#333,stroke-width:1.5px,color:#000;
44+
classDef client fill:#dcfce7,stroke:#333;
45+
classDef system_entry fill:#fef9c3,stroke:#333;
46+
classDef downstream_ok fill:#dbeafe,stroke:#333;
47+
classDef downstream_err fill:#fee2e2,stroke:#333;
48+
49+
%% Client Goroutines (Fan-In)
50+
subgraph Client Goroutines
51+
direction TB
52+
R1(Goroutine 1);
53+
R2(Goroutine N);
54+
end
55+
56+
%% Flow Control System
57+
subgraph Flow Control System
58+
C{Flow Controller Engine};
59+
60+
subgraph Internal Interactions
61+
direction LR
62+
D(Ports) -- "abstracts state" --> E(Flow Registry);
63+
D -- "abstracts load" --> SD(Saturation Detector);
64+
E -- "configures" --> F(Framework);
65+
F -- "defines" --> P(Plugins: Queues & Policies);
66+
end
67+
68+
C -- "Orchestrates via<br>abstractions" --> D;
69+
end
70+
71+
%% Downstream Actions (Fan-Out)
72+
subgraph Downstream Actions
73+
direction TB
74+
A1(Outcome: Dispatched<br>Proceed to Scheduler);
75+
A2(Outcome: Rejected<br>Return Error);
76+
end
77+
78+
%% Connections
79+
R1 -- "calls & blocks" --> C;
80+
R2 -- "calls & blocks" --> C;
81+
C -- "unblocks 'goroutine 1'" --> A1;
82+
C -- "unblocks 'goroutine N'" --> A2;
83+
84+
%% Apply Classes
85+
class R1,R2 client;
86+
class C system_entry;
87+
class A1 downstream_ok;
88+
class A2 downstream_err;
89+
class D,E,F,P,SD default;
90+
```
91+
92+
## Architectural Pillars
93+
94+
The Flow Controller framework is built on several key components that work in concert. This architecture is designed to
95+
be highly modular and scalable, with clear separation of concerns. For a deep dive into the specific design choices and
96+
their justifications, please refer to the detailed documentation within the relevant sub-packages.
97+
98+
1. **The `FlowController` Engine (`./controller`)**: The central, sharded orchestrator responsible for the main request
99+
processing loop. It manages a pool of workers that distribute incoming requests, apply policies, and dispatch
100+
requests to the backends. Its design focuses on high throughput and backpressure.
101+
102+
2. **Pluggable `Policy` Framework (`./framework`)**: This defines the core interfaces for all pluggable logic. It
103+
features a two-tier policy system for `InterFlow` (decisions *between* different flows) and `IntraFlow`
104+
(decisions *within* a single flow) logic, covering both request dispatch and displacement.
105+
106+
3. **Extensible `SafeQueue` System (`./framework`)**: This defines the `framework.SafeQueue` interface for
107+
concurrent-safe request storage. It uses a `QueueCapability` system that allows for diverse and extensible queue
108+
implementations (e.g., FIFO, Priority Heap) while maintaining a stable interface.
109+
110+
4. **The `FlowRegistry` (`./registry`, `./ports`)**: This is the stateful control plane of the system. It manages the
111+
configuration and lifecycle of all flows, policies, and queues. It presents a sharded view of its state to the
112+
`FlowController` workers to enable parallel operation with minimal lock contention.
113+
114+
5. **Core Types and Service Ports (`./types`, `./ports`)**: These packages define the foundational data structures
115+
(e.g., `FlowControlRequest`), errors, and service interfaces that decouple the engine from its dependencies,
116+
following a "Ports and Adapters" architectural style.

pkg/epp/flowcontrol/types/README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Flow Control Core Types
2+
3+
This package defines the fundamental data structures, interfaces, and errors that form the vocabulary of the Flow
4+
Control system. It establishes the core concepts of the request lifecycle and its final, reportable outcomes.
5+
6+
## Request Lifecycle Interfaces
7+
8+
A request's journey through the Flow Controller is represented by a series of interfaces that define its state as it
9+
moves through the system:
10+
11+
1. **`FlowControlRequest`**: The initial, "raw" contract for an incoming request. It carries the essential data
12+
provided by the client, such as its `FlowID` and `ByteSize`.
13+
2. **`QueueItemAccessor`**: The internal, enriched, and read-only view of a request once it has been accepted by the
14+
controller. This interface is the primary means by which policy plugins inspect items.
15+
3. **`QueueItemHandle`**: An opaque, queue-specific handle to a queued item. The controller uses this handle to perform
16+
targeted operations, such as removing a specific item, without needing to know the queue's internal implementation
17+
details.
18+
19+
## Final State Reporting: Outcomes and Errors
20+
21+
The final state of every request is reported using a combination of a `QueueOutcome` enum and a corresponding `error`.
22+
This provides a clear, machine-inspectable way to understand the result.
23+
24+
* **`QueueOutcome`**: A concise enum summarizing the final result (e.g., `QueueOutcomeDispatched`,
25+
`QueueOutcomeRejectedCapacity`, `QueueOutcomeEvictedDisplaced`). This is ideal for metrics.
26+
27+
* **Errors**: For any non-dispatch outcome, a specific sentinel error is returned. These are nested to provide detailed
28+
context:
29+
* `ErrRejected`: The parent error for any request rejected *before* being enqueued.
30+
* `ErrEvicted`: The parent error for any request removed *after* being enqueued for reasons other than dispatch.
31+
32+
Callers of `FlowController.EnqueueAndWait()` can first use `errors.Is()` to check for the general class of failure
33+
(`ErrRejected` or `ErrEvicted`), and then unwrap the error to find the specific cause (e.g., `ErrQueueAtCapacity`).

pkg/epp/flowcontrol/types/errors.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package types
18+
19+
import (
20+
"errors"
21+
)
22+
23+
// --- High Level Queue Outcome Errors ---
24+
var (
25+
// ErrRejected is a sentinel error indicating a request was rejected by the Flow Controller *before* being formally
26+
// enqueued. Errors returned by `FlowController.EnqueueAndWait()` that signify pre-queue rejection will wrap this
27+
// error.
28+
// Callers should use `errors.Is(err, ErrRejected)` to check for this general class of failure.
29+
ErrRejected = errors.New("request rejected pre-queue")
30+
31+
// ErrEvicted is a sentinel error indicating a request was removed from a queue *after* being successfully enqueued,
32+
// but for reasons other than successful dispatch (e.g., TTL expiry, displacement).
33+
// Errors returned by `FlowController.EnqueueAndWait()` that signify post-queue eviction will wrap this error.
34+
// Callers should use `errors.Is(err, ErrEvicted)` to check for this general class of failure.
35+
ErrEvicted = errors.New("request evicted from queue")
36+
)
37+
38+
// --- Pre-Enqueue Rejection Errors ---
39+
// Errors that can occur before a request is formally added to a `framework.SafeQueue`.
40+
// When returned by `FlowController.EnqueueAndWait()`, these specific errors will typically be wrapped by `ErrRejected`.
41+
var (
42+
// ErrNilRequest indicates that a nil `types.FlowControlRequest` was provided.
43+
ErrNilRequest = errors.New("FlowControlRequest cannot be nil")
44+
45+
// ErrFlowIDEmpty indicates that a flow ID was empty when one was required.
46+
ErrFlowIDEmpty = errors.New("flow ID cannot be empty")
47+
48+
// ErrQueueAtCapacity indicates that a request could not be enqueued because queue capacity limits were met and
49+
// displacement (if applicable) failed to make space.
50+
ErrQueueAtCapacity = errors.New("queue at capacity and displacement failed to make space")
51+
)
52+
53+
// --- Post-Enqueue Eviction Errors ---
54+
// Errors that occur when a request, already in a `framework.SafeQueue`, is removed for reasons other than dispatch.
55+
// When returned by `FlowController.EnqueueAndWait()`, these specific errors will typically be wrapped by `ErrEvicted`.
56+
var (
57+
// ErrTTLExpired indicates a request was evicted from a queue because its effective Time-To-Live expired.
58+
ErrTTLExpired = errors.New("request TTL expired")
59+
60+
// ErrContextCancelled indicates a request was evicted because its associated context (from
61+
// `FlowControlRequest.Context()`) was cancelled. This error typically wraps the underlying `context.Canceled` or
62+
// `context.DeadlineExceeded` error.
63+
ErrContextCancelled = errors.New("request context cancelled")
64+
65+
// ErrDisplaced indicates a request was evicted from a queue because it was chosen as a victim by a displacement
66+
// policy to make space for another request.
67+
ErrDisplaced = errors.New("request displaced")
68+
)
69+
70+
// --- General FlowController Errors ---
71+
// General runtime errors for the Flow Controller.
72+
var (
73+
// ErrFlowControllerShutdown indicates that an operation could not complete or an item was evicted because the Flow
74+
// Controller is shutting down or has stopped.
75+
// When returned by `FlowController.EnqueueAndWait()`, this will be wrapped by `ErrRejected` (if rejection happens
76+
// before internal queuing) or `ErrEvicted` (if eviction happens after internal queuing).
77+
ErrFlowControllerShutdown = errors.New("FlowController is shutting down")
78+
)

pkg/epp/flowcontrol/types/flow.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS ßIS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package types defines the core data structures and service contracts for the Flow Controller system. It establishes
18+
// the "vocabulary" of the system, including the request lifecycle interfaces, final outcomes, and standard error types.
19+
package types
20+
21+
// FlowSpecification defines the configuration of a logical flow, encapsulating its identity and registered priority.
22+
//
23+
// A FlowSpecification acts as the registration key for a flow within the Flow Registry.
24+
type FlowSpecification interface {
25+
// ID returns the unique name or identifier for this flow (e.g., model name, tenant ID), corresponding to the value
26+
// from `FlowControlRequest.FlowID()`.
27+
ID() string
28+
29+
// Priority returns the numerical priority level currently associated with this flow within the Flow Registry.
30+
//
31+
// Convention: Lower numerical values indicate higher priority.
32+
Priority() uint
33+
}

pkg/epp/flowcontrol/types/outcomes.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package types
18+
19+
import "strconv"
20+
21+
// QueueOutcome represents the high-level final state of a request's lifecycle within the Flow Controller.
22+
//
23+
// It is returned by `FlowController.EnqueueAndWait()` along with a corresponding error. This enum is designed to be a
24+
// low-cardinality label ideal for metrics, while the error provides fine-grained details for non-dispatched outcomes.
25+
type QueueOutcome int
26+
27+
const (
28+
// QueueOutcomeDispatched indicates the request was successfully processed by the Flow Controller and unblocked for
29+
// the caller to proceed.
30+
// The associated error from `FlowController.EnqueueAndWait()` will be nil.
31+
QueueOutcomeDispatched QueueOutcome = iota
32+
33+
// --- Pre-Enqueue Rejection Outcomes (request never entered a `framework.SafeQueue`) ---
34+
// For these outcomes, the error from `FlowController.EnqueueAndWait()` will wrap `ErrRejected`.
35+
36+
// QueueOutcomeRejectedCapacity indicates rejection because queue capacity limits were met and displacement (if
37+
// applicable) failed to make space.
38+
// The associated error will wrap `ErrQueueAtCapacity` (and `ErrRejected`).
39+
QueueOutcomeRejectedCapacity
40+
41+
// QueueOutcomeRejectedOther indicates rejection for reasons other than capacity before the request was formally
42+
// enqueued.
43+
// The specific underlying cause can be determined from the associated error (e.g., a nil request, an unregistered
44+
// flow ID, or a controller shutdown), which will be wrapped by `ErrRejected`.
45+
QueueOutcomeRejectedOther
46+
47+
// --- Post-Enqueue Eviction Outcomes (request was in a `framework.SafeQueue` but not dispatched) ---
48+
// For these outcomes, the error from `FlowController.EnqueueAndWait()` will wrap `ErrEvicted`.
49+
50+
// QueueOutcomeEvictedTTL indicates eviction from a queue because the request's effective Time-To-Live expired.
51+
// The associated error will wrap `ErrTTLExpired` (and `ErrEvicted`).
52+
QueueOutcomeEvictedTTL
53+
54+
// QueueOutcomeEvictedContextCancelled indicates eviction from a queue because the request's own context (from
55+
// `FlowControlRequest.Context()`) was cancelled.
56+
// The associated error will wrap `ErrContextCancelled` (which may further wrap the underlying `context.Canceled` or
57+
// `context.DeadlineExceeded` error) (and `ErrEvicted`).
58+
QueueOutcomeEvictedContextCancelled
59+
60+
// QueueOutcomeEvictedDisplaced indicates eviction from a queue to make space for another request due to a
61+
// displacement policy.
62+
// The associated error will wrap `ErrDisplaced` (and `ErrEvicted`).
63+
QueueOutcomeEvictedDisplaced
64+
65+
// QueueOutcomeEvictedOther indicates eviction from a queue for reasons not covered by more specific eviction
66+
// outcomes.
67+
// The specific underlying cause can be determined from the associated error (e.g., a controller shutdown while the
68+
// item was queued), which will be wrapped by `ErrEvicted`.
69+
QueueOutcomeEvictedOther
70+
)
71+
72+
// String returns a human-readable string representation of the QueueOutcome.
73+
func (o QueueOutcome) String() string {
74+
switch o {
75+
case QueueOutcomeDispatched:
76+
return "Dispatched"
77+
case QueueOutcomeRejectedCapacity:
78+
return "RejectedCapacity"
79+
case QueueOutcomeRejectedOther:
80+
return "RejectedOther"
81+
case QueueOutcomeEvictedTTL:
82+
return "EvictedTTL"
83+
case QueueOutcomeEvictedContextCancelled:
84+
return "EvictedContextCancelled"
85+
case QueueOutcomeEvictedDisplaced:
86+
return "EvictedDisplaced"
87+
case QueueOutcomeEvictedOther:
88+
return "EvictedOther"
89+
default:
90+
// Return the integer value for unknown outcomes to aid in debugging.
91+
return "UnknownOutcome(" + strconv.Itoa(int(o)) + ")"
92+
}
93+
}

0 commit comments

Comments
 (0)