feat(flowcontrol): Add Foundational Types and Architecture #997

Open
wants to merge 1 commit into
base: main
116 changes: 116 additions & 0 deletions pkg/epp/flowcontrol/README.md
@@ -0,0 +1,116 @@
# Flow Control Module

## Introduction

In a multi-tenant, heterogeneous inference serving environment, managing diverse SLOs and fairness requirements is
critical. Today, the serving stack often relies on a simple "best-effort" or FIFO (First-In, First-Out) basis for
handling requests. This is insufficient and leads to significant problems:

* **Head-of-Line Blocking**: A long-running, low-priority request can block short, high-priority requests, violating
SLOs.
* **Lack of Predictability**: Without proper queuing and prioritization, it's impossible to provide predictable latency
guarantees to different tenants.
* **Inability to Handle Saturation**: Under heavy load, the system has no graceful way to manage overload, leading to
cascading failures instead of controlled degradation.

The Flow Controller is a library designed to solve these problems. It acts as a gatekeeper that
decides *if* and *when* a request should proceed to be scheduled. Its primary mission is to enable predictable, fair,
and efficient utilization of shared backend resources by enforcing prioritization, applying fairness policies, managing
request queuing under saturation, and orchestrating displacement (the eviction of lower-priority queued items to make
Contributor

I would use the word shedding instead of eviction

Contributor Author

@LukeAVanDrie LukeAVanDrie Jun 20, 2025

I named the specific extension point "displacement". I documented the justification in: https://github.com/LukeAVanDrie/gateway-api-inference-extension/blob/flow-control/pkg/epp/flowcontrol/framework/README.md#terminology-dispatch-vs-displacement.

Eviction, preemption, and shedding were also considered.

space for higher-priority ones).

It is designed for extensibility, allowing custom logic for policies and queuing mechanisms to be plugged into a robust,
high-performance orchestration engine.

### Role in the Gateway API Inference Extension
Contributor

I recommend defining goals and non-goals. Here is my rough list:

Goals:

  • Enable priority and fairness across workloads.
  • Optimize request scheduling by shifting a portion of request queueing from the model server to a centralized queue.
  • Queueing time on the order of seconds on average and single-digit minutes at the tail.

Non-goals:

  • Persistence: Queueing is handled in the endpoint picker's memory.
  • Scale: Scale is limited by available memory and number of ext-proc connections of the endpoint picker and L7LB. While they can horizontally scale, maintaining connections for many minutes to hours is not a goal.
  • A substitute for a message queue: The queue manages requests with open connections via the L7LB and is not intended for asynchronous request handling.


Within the Gateway API Inference Extension's Endpoint Picker (EPP), the Flow Controller acts as a crucial gatekeeper
between the Routing and Scheduling layers. It decides *if* and *when* a request, already assigned to a logical flow
(e.g., a specific workload or tenant), should proceed to be scheduled onto a backend resource. It is the primary
mechanism for managing diverse SLOs, ensuring fairness among competing workloads, and maintaining system stability under
high load.

### High Level Architecture

The following diagram illustrates the high-level dependency model and request flow for the system. It shows how
concurrent client requests are managed by the central `FlowController`, which in turn relies on a set of decoupled
components to make its decisions. Each component package in this module will contain its own more detailed architectural
diagrams.

```mermaid
graph LR
%% Style Definitions
classDef default fill:#fff,stroke:#333,stroke-width:1.5px,color:#000;
classDef client fill:#dcfce7,stroke:#333;
classDef system_entry fill:#fef9c3,stroke:#333;
classDef downstream_ok fill:#dbeafe,stroke:#333;
classDef downstream_err fill:#fee2e2,stroke:#333;

%% Client Goroutines (Fan-In)
subgraph Client Goroutines
direction TB
R1(Goroutine 1);
R2(Goroutine N);
end

%% Flow Control System
subgraph Flow Control System
C{Flow Controller Engine};

subgraph Internal Interactions
direction LR
D(Ports) -- "abstracts state" --> E(Flow Registry);
D -- "abstracts load" --> SD(Saturation Detector);
E -- "configures" --> F(Framework);
F -- "defines" --> P(Plugins: Queues & Policies);
end

C -- "Orchestrates via<br>abstractions" --> D;
end

%% Downstream Actions (Fan-Out)
subgraph Downstream Actions
direction TB
A1(Outcome: Dispatched<br>Proceed to Scheduler);
A2(Outcome: Rejected<br>Return Error);
end

%% Connections
R1 -- "calls & blocks" --> C;
R2 -- "calls & blocks" --> C;
C -- "unblocks 'goroutine 1'" --> A1;
C -- "unblocks 'goroutine N'" --> A2;

%% Apply Classes
class R1,R2 client;
class C system_entry;
class A1 downstream_ok;
class A2 downstream_err;
class D,E,F,P,SD default;
```

Contributor

We should clearly distinguish between request flow (the top one?) and configuration flow (the bottom one?).

Contributor Author

Ack, I will add some controller (e.g., a k8s operator) that configures the Flow Registry.

Contributor

We don't want to completely detach from epp, so I don't think we want to describe a generic k8s operator that does this.

I am just asking to distinguish between the arrows that describe request flow (traversed for each request) and the ones that describe configuration (registering plugins and whatnot). I would actually split this into two diagrams if necessary. I would like to see the data flow separately from the configuration flow.

Collaborator

I would like to see the data flow separately from the configuration flow

++ they are different concerns & should be able to be reasoned about separately

## Architectural Pillars

The Flow Controller framework is built on several key components that work in concert. This architecture is designed to
be highly modular and scalable, with clear separation of concerns. For a deep dive into the specific design choices and
their justifications, please refer to the detailed documentation within the relevant sub-packages.

1. **The `FlowController` Engine (`./controller`)**: The central, sharded orchestrator responsible for the main request
processing loop. It manages a pool of workers that distribute incoming requests, apply policies, and dispatch
requests to the backends. Its design focuses on high throughput and backpressure.

2. **Pluggable `Policy` Framework (`./framework`)**: This defines the core interfaces for all pluggable logic. It
features a two-tier policy system for `InterFlow` (decisions *between* different flows) and `IntraFlow`
(decisions *within* a single flow) logic, covering both request dispatch and displacement.

3. **Extensible `SafeQueue` System (`./framework`)**: This defines the `framework.SafeQueue` interface for
concurrent-safe request storage. It uses a `QueueCapability` system that allows for diverse and extensible queue
implementations (e.g., FIFO, Priority Heap) while maintaining a stable interface.

4. **The `FlowRegistry` (`./registry`, `./ports`)**: This is the stateful control plane of the system. It manages the
Contributor

what is a port in this context?

Contributor Author

I am using a "Ports and Adapters" (or "Hexagonal") architecture pattern here. The ports in this flow control system are documented here: https://github.com/LukeAVanDrie/gateway-api-inference-extension/blob/flow-control/pkg/epp/flowcontrol/ports/README.md.

configuration and lifecycle of all flows, policies, and queues. It presents a sharded view of its state to the
`FlowController` workers to enable parallel operation with minimal lock contention.

5. **Core Types and Service Ports (`./types`, `./ports`)**: These packages define the foundational data structures
(e.g., `FlowControlRequest`), errors, and service interfaces that decouple the engine from its dependencies,
following a "Ports and Adapters" architectural style.
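
As a rough illustration of the "Ports and Adapters" style mentioned in item 5, the sketch below shows an engine that depends only on an interface (the port) while a concrete type (the adapter) supplies the behavior. All names (`saturationDetector`, `thresholdDetector`, `engine`, `admit`) and the threshold rule are assumptions made for this example; the PR's actual ports are documented in its `./ports` package.

```go
package main

import "fmt"

// saturationDetector is a hypothetical port: the engine sees only this interface.
type saturationDetector interface {
	IsSaturated() bool
}

// thresholdDetector is a hypothetical adapter that treats the backend as
// saturated once in-flight work reaches a fixed limit.
type thresholdDetector struct {
	inFlight int
	limit    int
}

func (d thresholdDetector) IsSaturated() bool { return d.inFlight >= d.limit }

// engine stands in for the FlowController; it knows the port, not the adapter.
type engine struct {
	detector saturationDetector
}

// admit reports whether a request may be dispatched right now.
func (e engine) admit() bool { return !e.detector.IsSaturated() }

func main() {
	busy := engine{detector: thresholdDetector{inFlight: 8, limit: 8}}
	idle := engine{detector: thresholdDetector{inFlight: 2, limit: 8}}
	fmt.Println(busy.admit(), idle.admit())
}
```

Because `engine` holds only the interface, a test can substitute a fake detector without touching the engine code, which is the decoupling the README describes.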
33 changes: 33 additions & 0 deletions pkg/epp/flowcontrol/types/README.md
@@ -0,0 +1,33 @@
# Flow Control Core Types
Contributor

I recommend having this documentation in the code as comments, I worry that as the code evolves, such detailed documentation will quickly go out of sync.

Contributor

++.
This PR includes a lot of documentation.
I strongly recommend keeping only the main README and removing all the smaller docs.
Two main patterns I recommend adopting:

  • As Abdullah mentioned, document these kinds of things in the code as comments.
  • Write your code to be as self-explanatory as possible. That means using meaningful function and variable names, structuring the code so that it is easy to follow (and maintain), etc.

This much documentation spread across multiple markdown files usually gets out of sync very quickly.


This package defines the fundamental data structures, interfaces, and errors that form the vocabulary of the Flow
Control system. It establishes the core concepts of the request lifecycle and its final, reportable outcomes.

## Request Lifecycle Interfaces

A request's journey through the Flow Controller is represented by a series of interfaces that define its state as it
moves through the system:

1. **`FlowControlRequest`**: The initial, "raw" contract for an incoming request. It carries the essential data
provided by the client, such as its `FlowID` and `ByteSize`.
2. **`QueueItemAccessor`**: The internal, enriched, and read-only view of a request once it has been accepted by the
controller. This interface is the primary means by which policy plugins inspect items.
3. **`QueueItemHandle`**: An opaque, queue-specific handle to a queued item. The controller uses this handle to perform
targeted operations, such as removing a specific item, without needing to know the queue's internal implementation
details.
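
The idea of an opaque, queue-specific handle (item 3) can be sketched with a list-backed queue. This is an illustration only: `handle`, `fifoQueue`, and their methods are hypothetical names, the queue is not concurrency-safe, and it is not the PR's `SafeQueue` or `QueueItemHandle`.

```go
package main

import (
	"container/list"
	"fmt"
)

// handle is a hypothetical opaque handle; callers never inspect the element.
type handle struct {
	elem *list.Element
}

// fifoQueue is a hypothetical list-backed queue used only for illustration.
type fifoQueue struct {
	items *list.List
}

func newFIFOQueue() *fifoQueue { return &fifoQueue{items: list.New()} }

// add appends a request ID and returns a handle for later targeted removal.
func (q *fifoQueue) add(requestID string) handle {
	return handle{elem: q.items.PushBack(requestID)}
}

// remove deletes exactly the item behind h without scanning the queue.
func (q *fifoQueue) remove(h handle) string {
	return q.items.Remove(h.elem).(string)
}

// peekHead returns the oldest item, or false if the queue is empty.
func (q *fifoQueue) peekHead() (string, bool) {
	front := q.items.Front()
	if front == nil {
		return "", false
	}
	return front.Value.(string), true
}

func main() {
	q := newFIFOQueue()
	q.add("req-a")
	hb := q.add("req-b")
	q.add("req-c")

	removed := q.remove(hb) // targeted removal from the middle of the queue
	head, _ := q.peekHead()
	fmt.Println(removed, head, q.items.Len())
}
```

The caller holds only the handle, so a heap-backed queue could return a different handle type behind the same operations.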

## Final State Reporting: Outcomes and Errors

The final state of every request is reported using a combination of a `QueueOutcome` enum and a corresponding `error`.
This provides a clear, machine-inspectable way to understand the result.

* **`QueueOutcome`**: A concise enum summarizing the final result (e.g., `QueueOutcomeDispatched`,
`QueueOutcomeRejectedCapacity`, `QueueOutcomeEvictedDisplaced`). This is ideal for metrics.

* **Errors**: For any non-dispatch outcome, a specific sentinel error is returned. These are nested to provide detailed
context:
* `ErrRejected`: The parent error for any request rejected *before* being enqueued.
* `ErrEvicted`: The parent error for any request removed *after* being enqueued for reasons other than dispatch.

Callers of `FlowController.EnqueueAndWait()` can first use `errors.Is()` to check for the general class of failure
(`ErrRejected` or `ErrEvicted`), and then unwrap the error to find the specific cause (e.g., `ErrQueueAtCapacity`).
78 changes: 78 additions & 0 deletions pkg/epp/flowcontrol/types/errors.go
@@ -0,0 +1,78 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package types

import (
	"errors"
)

// --- High Level Queue Outcome Errors ---
var (
	// ErrRejected is a sentinel error indicating a request was rejected by the Flow Controller *before* being formally
	// enqueued. Errors returned by `FlowController.EnqueueAndWait()` that signify pre-queue rejection will wrap this
	// error.
	// Callers should use `errors.Is(err, ErrRejected)` to check for this general class of failure.
	ErrRejected = errors.New("request rejected pre-queue")

	// ErrEvicted is a sentinel error indicating a request was removed from a queue *after* being successfully enqueued,
	// but for reasons other than successful dispatch (e.g., TTL expiry, displacement).
	// Errors returned by `FlowController.EnqueueAndWait()` that signify post-queue eviction will wrap this error.
	// Callers should use `errors.Is(err, ErrEvicted)` to check for this general class of failure.
	ErrEvicted = errors.New("request evicted from queue")
)

// --- Pre-Enqueue Rejection Errors ---
// Errors that can occur before a request is formally added to a `framework.SafeQueue`.
// When returned by `FlowController.EnqueueAndWait()`, these specific errors will typically be wrapped by `ErrRejected`.
var (
	// ErrNilRequest indicates that a nil `types.FlowControlRequest` was provided.
	ErrNilRequest = errors.New("FlowControlRequest cannot be nil")
Contributor

Is there any use case where the request is nil? Isn't this verified in higher layers before FlowControl?
I understand your code makes no assumptions about input validity, but we need to decide when it is OK to remove obvious checks. If we initialize the request in GIE, then it probably won't be nil :)


	// ErrFlowIDEmpty indicates that a flow ID was empty when one was required.
	ErrFlowIDEmpty = errors.New("flow ID cannot be empty")

	// ErrQueueAtCapacity indicates that a request could not be enqueued because queue capacity limits were met and
	// displacement (if applicable) failed to make space.
	ErrQueueAtCapacity = errors.New("queue at capacity and displacement failed to make space")
)

// --- Post-Enqueue Eviction Errors ---
// Errors that occur when a request, already in a `framework.SafeQueue`, is removed for reasons other than dispatch.
// When returned by `FlowController.EnqueueAndWait()`, these specific errors will typically be wrapped by `ErrEvicted`.
var (
	// ErrTTLExpired indicates a request was evicted from a queue because its effective Time-To-Live expired.
	ErrTTLExpired = errors.New("request TTL expired")

	// ErrContextCancelled indicates a request was evicted because its associated context (from
	// `FlowControlRequest.Context()`) was cancelled. This error typically wraps the underlying `context.Canceled` or
	// `context.DeadlineExceeded` error.
	ErrContextCancelled = errors.New("request context cancelled")

	// ErrDisplaced indicates a request was evicted from a queue because it was chosen as a victim by a displacement
	// policy to make space for another request.
	ErrDisplaced = errors.New("request displaced")
)

// --- General FlowController Errors ---
// General runtime errors for the Flow Controller.
var (
	// ErrFlowControllerShutdown indicates that an operation could not complete or an item was evicted because the Flow
	// Controller is shutting down or has stopped.
	// When returned by `FlowController.EnqueueAndWait()`, this will be wrapped by `ErrRejected` (if rejection happens
	// before internal queuing) or `ErrEvicted` (if eviction happens after internal queuing).
	ErrFlowControllerShutdown = errors.New("FlowController is shutting down")
)
33 changes: 33 additions & 0 deletions pkg/epp/flowcontrol/types/flow.go
@@ -0,0 +1,33 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package types defines the core data structures and service contracts for the Flow Controller system. It establishes
// the "vocabulary" of the system, including the request lifecycle interfaces, final outcomes, and standard error types.
package types

// FlowSpecification defines the configuration of a logical flow, encapsulating its identity and registered priority.
//
// A FlowSpecification acts as the registration key for a flow within the Flow Registry.
type FlowSpecification interface {
Contributor

@nirrozenbaum nirrozenbaum Jun 22, 2025

nit: why not just Flow?

	// ID returns the unique name or identifier for this flow (e.g., model name, tenant ID), corresponding to the value
	// from `FlowControlRequest.FlowID()`.
	ID() string

	// Priority returns the numerical priority level currently associated with this flow within the Flow Registry.
	//
	// Convention: Lower numerical values indicate higher priority.
	Priority() uint
Contributor

This is great, so we are not limited to discrete values, correct?

Contributor Author

@LukeAVanDrie LukeAVanDrie Jun 20, 2025

Yep; however, the priority bands (defining the possible values of Priority()) are configured at startup time in the Flow Registry. Most of the system supports dynamic updates, just not this.

Flows can migrate between priorities at runtime, you just cannot change what the priority options are.

Contributor

why do we have this limitation?

}
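
To make the "lower value means higher priority" convention concrete, here is a small sketch with a hypothetical implementation of the interface. `staticFlow` and `byUrgency` are illustrative names, and the flow IDs and priority values are made up; the interface is re-declared locally so the example is self-contained.

```go
package main

import (
	"fmt"
	"sort"
)

// FlowSpecification mirrors the interface shown in the diff above.
type FlowSpecification interface {
	ID() string
	Priority() uint
}

// staticFlow is a hypothetical immutable implementation for illustration.
type staticFlow struct {
	id       string
	priority uint
}

func (f staticFlow) ID() string     { return f.id }
func (f staticFlow) Priority() uint { return f.priority }

// byUrgency orders flows so the highest-priority flow (lowest numeric value)
// comes first; ties are broken by ID to keep the order deterministic.
func byUrgency(flows []FlowSpecification) {
	sort.SliceStable(flows, func(i, j int) bool {
		if flows[i].Priority() != flows[j].Priority() {
			return flows[i].Priority() < flows[j].Priority()
		}
		return flows[i].ID() < flows[j].ID()
	})
}

func main() {
	flows := []FlowSpecification{
		staticFlow{id: "batch", priority: 10},
		staticFlow{id: "interactive", priority: 0},
		staticFlow{id: "standard", priority: 5},
	}
	byUrgency(flows)
	for _, f := range flows {
		fmt.Println(f.Priority(), f.ID())
	}
}
```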
93 changes: 93 additions & 0 deletions pkg/epp/flowcontrol/types/outcomes.go
@@ -0,0 +1,93 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package types

import "strconv"

// QueueOutcome represents the high-level final state of a request's lifecycle within the Flow Controller.
//
// It is returned by `FlowController.EnqueueAndWait()` along with a corresponding error. This enum is designed to be a
// low-cardinality label ideal for metrics, while the error provides fine-grained details for non-dispatched outcomes.
type QueueOutcome int

const (
	// QueueOutcomeDispatched indicates the request was successfully processed by the Flow Controller and unblocked for
	// the caller to proceed.
	// The associated error from `FlowController.EnqueueAndWait()` will be nil.
	QueueOutcomeDispatched QueueOutcome = iota

	// --- Pre-Enqueue Rejection Outcomes (request never entered a `framework.SafeQueue`) ---
	// For these outcomes, the error from `FlowController.EnqueueAndWait()` will wrap `ErrRejected`.

	// QueueOutcomeRejectedCapacity indicates rejection because queue capacity limits were met and displacement (if
	// applicable) failed to make space.
	// The associated error will wrap `ErrQueueAtCapacity` (and `ErrRejected`).
	QueueOutcomeRejectedCapacity

	// QueueOutcomeRejectedOther indicates rejection for reasons other than capacity before the request was formally
	// enqueued.
	// The specific underlying cause can be determined from the associated error (e.g., a nil request, an unregistered
	// flow ID, or a controller shutdown), which will be wrapped by `ErrRejected`.
	QueueOutcomeRejectedOther

	// --- Post-Enqueue Eviction Outcomes (request was in a `framework.SafeQueue` but not dispatched) ---
	// For these outcomes, the error from `FlowController.EnqueueAndWait()` will wrap `ErrEvicted`.

	// QueueOutcomeEvictedTTL indicates eviction from a queue because the request's effective Time-To-Live expired.
	// The associated error will wrap `ErrTTLExpired` (and `ErrEvicted`).
	QueueOutcomeEvictedTTL

	// QueueOutcomeEvictedContextCancelled indicates eviction from a queue because the request's own context (from
	// `FlowControlRequest.Context()`) was cancelled.
	// The associated error will wrap `ErrContextCancelled` (which may further wrap the underlying `context.Canceled` or
	// `context.DeadlineExceeded` error) (and `ErrEvicted`).
	QueueOutcomeEvictedContextCancelled

	// QueueOutcomeEvictedDisplaced indicates eviction from a queue to make space for another request due to a
	// displacement policy.
	// The associated error will wrap `ErrDisplaced` (and `ErrEvicted`).
	QueueOutcomeEvictedDisplaced

	// QueueOutcomeEvictedOther indicates eviction from a queue for reasons not covered by more specific eviction
	// outcomes.
	// The specific underlying cause can be determined from the associated error (e.g., a controller shutdown while the
	// item was queued), which will be wrapped by `ErrEvicted`.
	QueueOutcomeEvictedOther
)

// String returns a human-readable string representation of the QueueOutcome.
func (o QueueOutcome) String() string {
	switch o {
	case QueueOutcomeDispatched:
		return "Dispatched"
	case QueueOutcomeRejectedCapacity:
		return "RejectedCapacity"
	case QueueOutcomeRejectedOther:
		return "RejectedOther"
	case QueueOutcomeEvictedTTL:
		return "EvictedTTL"
	case QueueOutcomeEvictedContextCancelled:
		return "EvictedContextCancelled"
	case QueueOutcomeEvictedDisplaced:
		return "EvictedDisplaced"
	case QueueOutcomeEvictedOther:
		return "EvictedOther"
	default:
		// Return the integer value for unknown outcomes to aid in debugging.
		return "UnknownOutcome(" + strconv.Itoa(int(o)) + ")"
	}
}
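
The doc comment on `QueueOutcome` describes it as a low-cardinality metrics label. A minimal sketch of that use follows, with a trimmed local copy of the enum and a hypothetical in-memory `outcomeCounter` standing in for a real metrics library; none of these names come from the PR.

```go
package main

import (
	"fmt"
	"sync"
)

// outcome is a trimmed local copy of the PR's QueueOutcome enum.
type outcome int

const (
	outcomeDispatched outcome = iota
	outcomeRejectedCapacity
	outcomeEvictedTTL
)

// String returns the label used for metrics.
func (o outcome) String() string {
	switch o {
	case outcomeDispatched:
		return "Dispatched"
	case outcomeRejectedCapacity:
		return "RejectedCapacity"
	case outcomeEvictedTTL:
		return "EvictedTTL"
	default:
		return fmt.Sprintf("UnknownOutcome(%d)", int(o))
	}
}

// outcomeCounter is a hypothetical stand-in for a labelled metrics counter.
type outcomeCounter struct {
	mu     sync.Mutex
	counts map[string]int
}

func newOutcomeCounter() *outcomeCounter {
	return &outcomeCounter{counts: make(map[string]int)}
}

// observe increments the counter labelled with the outcome's String() value.
func (c *outcomeCounter) observe(o outcome) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[o.String()]++
}

// count returns the current value for a label (zero if never observed).
func (c *outcomeCounter) count(label string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[label]
}

func main() {
	c := newOutcomeCounter()
	for _, o := range []outcome{outcomeDispatched, outcomeDispatched, outcomeEvictedTTL} {
		c.observe(o)
	}
	fmt.Println(c.count("Dispatched"), c.count("EvictedTTL"), c.count("RejectedCapacity"))
}
```

Keeping the label set to the enum values bounds the number of time series, while the accompanying error carries the per-request detail.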