Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure build_query_plan() cancels when the request cancels #6840

Merged
merged 4 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changesets/fix_stance_intercom_body_director.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### Ensure query planning exits when its request times out ([PR #6840](https://github.com/apollographql/router/pull/6840))

When a request times out/is cancelled, resources allocated to that request should be released. Previously, query planning could potentially keep running after the request is cancelled. After this fix, query planning now exits shortly after the request is cancelled. If you need query planning to run longer, you can [increase the request timeout](https://www.apollographql.com/docs/graphos/routing/performance/traffic-shaping#timeouts).

By [@SimonSapin](https://github.com/SimonSapin) and [@sachindshinde](https://github.com/sachindshinde) in https://github.com/apollographql/router/pull/6840
3 changes: 3 additions & 0 deletions apollo-federation/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ pub enum SingleFederationError {
DeferredSubscriptionUnsupported,
#[error("{message}")]
QueryPlanComplexityExceeded { message: String },
#[error("the caller requested cancellation")]
PlanningCancelled,
}

impl SingleFederationError {
Expand Down Expand Up @@ -494,6 +496,7 @@ impl SingleFederationError {
SingleFederationError::QueryPlanComplexityExceeded { .. } => {
ErrorCode::QueryPlanComplexityExceededError
}
SingleFederationError::PlanningCancelled => ErrorCode::Internal,
}
}
}
Expand Down
25 changes: 19 additions & 6 deletions apollo-federation/src/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ pub(crate) use contains::*;
pub(crate) use directive_list::DirectiveList;
pub(crate) use merging::*;
pub(crate) use rebase::*;
#[cfg(test)]
pub(crate) use tests::never_cancel;

pub(crate) const TYPENAME_FIELD: Name = name!("__typename");

Expand Down Expand Up @@ -1541,9 +1543,12 @@ impl SelectionSet {
Ok(())
}

pub(crate) fn expand_all_fragments(&self) -> Result<SelectionSet, FederationError> {
pub(crate) fn expand_all_fragments(
&self,
check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>,
) -> Result<SelectionSet, FederationError> {
let mut expanded_selections = vec![];
SelectionSet::expand_selection_set(&mut expanded_selections, self)?;
SelectionSet::expand_selection_set(&mut expanded_selections, self, check_cancellation)?;

let mut expanded = SelectionSet {
schema: self.schema.clone(),
Expand All @@ -1557,12 +1562,14 @@ impl SelectionSet {
fn expand_selection_set(
destination: &mut Vec<Selection>,
selection_set: &SelectionSet,
check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>,
) -> Result<(), FederationError> {
for value in selection_set.selections.values() {
check_cancellation()?;
match value {
Selection::Field(field_selection) => {
let selections = match &field_selection.selection_set {
Some(s) => Some(s.expand_all_fragments()?),
Some(s) => Some(s.expand_all_fragments(check_cancellation)?),
None => None,
};
destination.push(Selection::from_field(
Expand All @@ -1580,12 +1587,14 @@ impl SelectionSet {
SelectionSet::expand_selection_set(
destination,
&spread_selection.selection_set,
check_cancellation,
)?;
} else {
// convert to inline fragment
let expanded = InlineFragmentSelection::from_fragment_spread_selection(
selection_set.type_position.clone(), // the parent type of this inline selection
spread_selection,
check_cancellation,
)?;
destination.push(Selection::InlineFragment(Arc::new(expanded)));
}
Expand All @@ -1594,7 +1603,9 @@ impl SelectionSet {
destination.push(
InlineFragmentSelection::new(
inline_selection.inline_fragment.clone(),
inline_selection.selection_set.expand_all_fragments()?,
inline_selection
.selection_set
.expand_all_fragments(check_cancellation)?,
)
.into(),
);
Expand Down Expand Up @@ -2716,6 +2727,7 @@ impl InlineFragmentSelection {
pub(crate) fn from_fragment_spread_selection(
parent_type_position: CompositeTypeDefinitionPosition,
fragment_spread_selection: &Arc<FragmentSpreadSelection>,
check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>,
) -> Result<InlineFragmentSelection, FederationError> {
let schema = fragment_spread_selection.spread.schema.schema();
for directive in fragment_spread_selection.spread.directives.iter() {
Expand Down Expand Up @@ -2753,7 +2765,7 @@ impl InlineFragmentSelection {
},
fragment_spread_selection
.selection_set
.expand_all_fragments()?,
.expand_all_fragments(check_cancellation)?,
))
}

Expand Down Expand Up @@ -3749,10 +3761,11 @@ pub(crate) fn normalize_operation(
named_fragments: NamedFragments,
schema: &ValidFederationSchema,
interface_types_with_interface_objects: &IndexSet<InterfaceTypeDefinitionPosition>,
check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>,
) -> Result<Operation, FederationError> {
let mut normalized_selection_set =
SelectionSet::from_selection_set(&operation.selection_set, &named_fragments, schema)?;
normalized_selection_set = normalized_selection_set.expand_all_fragments()?;
normalized_selection_set = normalized_selection_set.expand_all_fragments(check_cancellation)?;
// We clear up the fragments since we've expanded all.
// Also note that expanding fragment usually generate unnecessary fragments/inefficient
// selections, so it basically always make sense to flatten afterwards. Besides, fragment
Expand Down
39 changes: 38 additions & 1 deletion apollo-federation/src/operation/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use super::SelectionKey;
use super::SelectionSet;
use super::normalize_operation;
use crate::error::FederationError;
use crate::error::SingleFederationError;
use crate::query_graph::graph_path::OpPathElement;
use crate::schema::ValidFederationSchema;
use crate::schema::position::InterfaceTypeDefinitionPosition;
Expand Down Expand Up @@ -60,7 +61,19 @@ pub(super) fn parse_and_expand(
.expect("must have anonymous operation");
let fragments = NamedFragments::new(&doc.fragments, schema);

normalize_operation(operation, fragments, schema, &Default::default())
normalize_operation(
operation,
fragments,
schema,
&Default::default(),
&never_cancel,
)
}

/// The `normalize_operation()` function has a `check_cancellation` parameter that we'll want to
/// configure to never cancel during tests. We create a convenience function here for that purpose.
pub(crate) fn never_cancel() -> Result<(), SingleFederationError> {
Ok(())
}

#[test]
Expand Down Expand Up @@ -100,6 +113,7 @@ type Foo {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
normalized_operation.named_fragments = Default::default();
Expand Down Expand Up @@ -154,6 +168,7 @@ type Foo {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
normalized_operation.named_fragments = Default::default();
Expand Down Expand Up @@ -196,6 +211,7 @@ type Query {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();

Expand Down Expand Up @@ -231,6 +247,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test {
Expand Down Expand Up @@ -274,6 +291,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skipIf: Boolean!) {
Expand Down Expand Up @@ -320,6 +338,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skipIf: Boolean!) {
Expand Down Expand Up @@ -364,6 +383,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skipIf: Boolean!) {
Expand Down Expand Up @@ -410,6 +430,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skip1: Boolean!, $skip2: Boolean!) {
Expand Down Expand Up @@ -461,6 +482,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test {
Expand Down Expand Up @@ -527,6 +549,7 @@ type V {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test {
Expand Down Expand Up @@ -586,6 +609,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test {
Expand Down Expand Up @@ -632,6 +656,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skipIf: Boolean!) {
Expand Down Expand Up @@ -682,6 +707,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skipIf: Boolean!) {
Expand Down Expand Up @@ -730,6 +756,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skipIf: Boolean!) {
Expand Down Expand Up @@ -778,6 +805,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test($skip1: Boolean!, $skip2: Boolean!) {
Expand Down Expand Up @@ -830,6 +858,7 @@ type T {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test {
Expand Down Expand Up @@ -898,6 +927,7 @@ type V {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query Test {
Expand Down Expand Up @@ -944,6 +974,7 @@ type Foo {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query TestQuery {
Expand Down Expand Up @@ -983,6 +1014,7 @@ type Foo {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
let expected = r#"query TestQuery {
Expand Down Expand Up @@ -1033,6 +1065,7 @@ scalar FieldSet
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&interface_objects,
&never_cancel,
)
.unwrap();
let expected = r#"query TestQuery {
Expand Down Expand Up @@ -1172,6 +1205,7 @@ mod make_selection_tests {
Default::default(),
&schema,
&Default::default(),
&never_cancel,
)
.unwrap();

Expand Down Expand Up @@ -1271,6 +1305,7 @@ mod lazy_map_tests {
Default::default(),
&schema,
&Default::default(),
&never_cancel,
)
.unwrap();

Expand Down Expand Up @@ -1329,6 +1364,7 @@ mod lazy_map_tests {
Default::default(),
&schema,
&Default::default(),
&never_cancel,
)
.unwrap();

Expand Down Expand Up @@ -1534,6 +1570,7 @@ fn test_expand_all_fragments1() {
NamedFragments::new(&executable_document.fragments, &schema),
&schema,
&IndexSet::default(),
&never_cancel,
)
.unwrap();
normalized_operation.named_fragments = Default::default();
Expand Down
Loading