Skip to content

Commit

Permalink
Ensure build_query_plan() cancels when the request cancels (#6840)
Browse files Browse the repository at this point in the history
When a request is cancelled, resources (compute time/memory) allocated to that request should be released. However, query planning is an exception, and currently keeps running after the request is cancelled. This PR adds cooperative cancellation to `build_query_plan()`, to ensure that when the request is cancelled, query planning errors shortly after.

Co-authored-by: Sachin D. Shinde <[email protected]>
(cherry picked from commit 2c554fc)

# Conflicts:
#	apollo-federation/src/query_graph/path_tree.rs
#	apollo-router/src/compute_job.rs
#	apollo-router/src/introspection.rs
#	apollo-router/src/query_planner/query_planner_service.rs
#	apollo-router/src/services/layers/query_analysis.rs
  • Loading branch information
SimonSapin committed Mar 3, 2025
1 parent a79e3fe commit 4af2014
Show file tree
Hide file tree
Showing 15 changed files with 291 additions and 24 deletions.
5 changes: 5 additions & 0 deletions .changesets/fix_stance_intercom_body_director.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### Ensure query planning exits when its request times out ([PR #6840](https://github.com/apollographql/router/pull/6840))

When a request times out/is cancelled, resources allocated to that request should be released. Previously, query planning could potentially keep running after the request is cancelled. After this fix, query planning now exits shortly after the request is cancelled. If you need query planning to run longer, you can [increase the request timeout](https://www.apollographql.com/docs/graphos/routing/performance/traffic-shaping#timeouts).

By [@SimonSapin](https://github.com/SimonSapin) and [@sachindshinde](https://github.com/sachindshinde) in https://github.com/apollographql/router/pull/6840
3 changes: 3 additions & 0 deletions apollo-federation/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ pub enum SingleFederationError {
DeferredSubscriptionUnsupported,
#[error("{message}")]
QueryPlanComplexityExceeded { message: String },
#[error("the caller requested cancellation")]
PlanningCancelled,
}

impl SingleFederationError {
Expand Down Expand Up @@ -492,6 +494,7 @@ impl SingleFederationError {
SingleFederationError::QueryPlanComplexityExceeded { .. } => {
ErrorCode::QueryPlanComplexityExceededError
}
SingleFederationError::PlanningCancelled => ErrorCode::Internal,
}
}
}
Expand Down
25 changes: 19 additions & 6 deletions apollo-federation/src/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ pub(crate) use contains::*;
pub(crate) use directive_list::DirectiveList;
pub(crate) use merging::*;
pub(crate) use rebase::*;
#[cfg(test)]
pub(crate) use tests::never_cancel;

pub(crate) const TYPENAME_FIELD: Name = name!("__typename");

Expand Down Expand Up @@ -1541,9 +1543,12 @@ impl SelectionSet {
Ok(())
}

pub(crate) fn expand_all_fragments(&self) -> Result<SelectionSet, FederationError> {
pub(crate) fn expand_all_fragments(
&self,
check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>,
) -> Result<SelectionSet, FederationError> {
let mut expanded_selections = vec![];
SelectionSet::expand_selection_set(&mut expanded_selections, self)?;
SelectionSet::expand_selection_set(&mut expanded_selections, self, check_cancellation)?;

let mut expanded = SelectionSet {
schema: self.schema.clone(),
Expand All @@ -1557,12 +1562,14 @@ impl SelectionSet {
fn expand_selection_set(
destination: &mut Vec<Selection>,
selection_set: &SelectionSet,
check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>,
) -> Result<(), FederationError> {
for value in selection_set.selections.values() {
check_cancellation()?;
match value {
Selection::Field(field_selection) => {
let selections = match &field_selection.selection_set {
Some(s) => Some(s.expand_all_fragments()?),
Some(s) => Some(s.expand_all_fragments(check_cancellation)?),
None => None,
};
destination.push(Selection::from_field(
Expand All @@ -1580,12 +1587,14 @@ impl SelectionSet {
SelectionSet::expand_selection_set(
destination,
&spread_selection.selection_set,
check_cancellation,
)?;
} else {
// convert to inline fragment
let expanded = InlineFragmentSelection::from_fragment_spread_selection(
selection_set.type_position.clone(), // the parent type of this inline selection
spread_selection,
check_cancellation,
)?;
destination.push(Selection::InlineFragment(Arc::new(expanded)));
}
Expand All @@ -1594,7 +1603,9 @@ impl SelectionSet {
destination.push(
InlineFragmentSelection::new(
inline_selection.inline_fragment.clone(),
inline_selection.selection_set.expand_all_fragments()?,
inline_selection
.selection_set
.expand_all_fragments(check_cancellation)?,
)
.into(),
);
Expand Down Expand Up @@ -2718,6 +2729,7 @@ impl InlineFragmentSelection {
pub(crate) fn from_fragment_spread_selection(
parent_type_position: CompositeTypeDefinitionPosition,
fragment_spread_selection: &Arc<FragmentSpreadSelection>,
check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>,
) -> Result<InlineFragmentSelection, FederationError> {
let schema = fragment_spread_selection.spread.schema.schema();
for directive in fragment_spread_selection.spread.directives.iter() {
Expand Down Expand Up @@ -2755,7 +2767,7 @@ impl InlineFragmentSelection {
},
fragment_spread_selection
.selection_set
.expand_all_fragments()?,
.expand_all_fragments(check_cancellation)?,
))
}

Expand Down Expand Up @@ -3751,10 +3763,11 @@ pub(crate) fn normalize_operation(
named_fragments: NamedFragments,
schema: &ValidFederationSchema,
interface_types_with_interface_objects: &IndexSet<InterfaceTypeDefinitionPosition>,
check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>,
) -> Result<Operation, FederationError> {
let mut normalized_selection_set =
SelectionSet::from_selection_set(&operation.selection_set, &named_fragments, schema)?;
normalized_selection_set = normalized_selection_set.expand_all_fragments()?;
normalized_selection_set = normalized_selection_set.expand_all_fragments(check_cancellation)?;
// We clear up the fragments since we've expanded all.
// Also note that expanding fragment usually generate unnecessary fragments/inefficient
// selections, so it basically always make sense to flatten afterwards. Besides, fragment
Expand Down
39 changes: 38 additions & 1 deletion apollo-federation/src/operation/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use super::Selection;
use super::SelectionKey;
use super::SelectionSet;
use crate::error::FederationError;
use crate::error::SingleFederationError;
use crate::query_graph::graph_path::OpPathElement;
use crate::schema::position::InterfaceTypeDefinitionPosition;
use crate::schema::position::ObjectTypeDefinitionPosition;
Expand Down Expand Up @@ -60,7 +61,19 @@ pub(super) fn parse_and_expand(
.expect("must have anonymous operation");
let fragments = NamedFragments::new(&doc.fragments, schema);

normalize_operation(operation, fragments, schema, &Default::default())
normalize_operation(
operation,
fragments,
schema,
&Default::default(),
&never_cancel,
)
}

/// The `normalize_operation()` function has a `check_cancellation` parameter that we'll want to
/// configure to never cancel during tests. We create a convenience function here for that purpose.
pub(crate) fn never_cancel() -> Result<(), SingleFederationError> {
Ok(())
}

#[test]
Expand Down Expand Up @@ -100,6 +113,7 @@ type Foo {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
normalized_operation.named_fragments = Default::default();
Expand Down Expand Up @@ -154,6 +168,7 @@ type Foo {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
normalized_operation.named_fragments = Default::default();
Expand Down Expand Up @@ -196,6 +211,7 @@ type Query {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();

Expand Down Expand Up @@ -231,6 +247,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test {
Expand Down Expand Up @@ -274,6 +291,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skipIf: Boolean!) {
Expand Down Expand Up @@ -320,6 +338,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skipIf: Boolean!) {
Expand Down Expand Up @@ -364,6 +383,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skipIf: Boolean!) {
Expand Down Expand Up @@ -410,6 +430,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skip1: Boolean!, $skip2: Boolean!) {
Expand Down Expand Up @@ -461,6 +482,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test {
Expand Down Expand Up @@ -527,6 +549,7 @@ type V {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test {
Expand Down Expand Up @@ -586,6 +609,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test {
Expand Down Expand Up @@ -632,6 +656,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skipIf: Boolean!) {
Expand Down Expand Up @@ -682,6 +707,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skipIf: Boolean!) {
Expand Down Expand Up @@ -730,6 +756,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skipIf: Boolean!) {
Expand Down Expand Up @@ -778,6 +805,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skip1: Boolean!, $skip2: Boolean!) {
Expand Down Expand Up @@ -830,6 +858,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test {
Expand Down Expand Up @@ -898,6 +927,7 @@ type V {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test {
Expand Down Expand Up @@ -944,6 +974,7 @@ type Foo {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query TestQuery {
Expand Down Expand Up @@ -983,6 +1014,7 @@ type Foo {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query TestQuery {
Expand Down Expand Up @@ -1033,6 +1065,7 @@ scalar FieldSet
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&interface_objects,
&never_cancel,
)
.unwrap();
let expected = r#"query TestQuery {
Expand Down Expand Up @@ -1172,6 +1205,7 @@ mod make_selection_tests {
Default::default(),
&schema,
&Default::default(),
&never_cancel,
)
.unwrap();

Expand Down Expand Up @@ -1271,6 +1305,7 @@ mod lazy_map_tests {
Default::default(),
&schema,
&Default::default(),
&never_cancel,
)
.unwrap();

Expand Down Expand Up @@ -1329,6 +1364,7 @@ mod lazy_map_tests {
Default::default(),
&schema,
&Default::default(),
&never_cancel,
)
.unwrap();

Expand Down Expand Up @@ -1534,6 +1570,7 @@ fn test_expand_all_fragments1() {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
normalized_operation.named_fragments = Default::default();
Expand Down
Loading

0 comments on commit 4af2014

Please sign in to comment.