Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(policy): Add http protocol configuration #13721

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions policy-controller/core/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub type RouteSet<T> = HashMap<GroupKindNamespaceName, T>;

#[derive(Debug, Clone, PartialEq)]
pub enum AppProtocol {
Http1,
Http2,
Opaque,
Unknown(Arc<str>),
}
Expand All @@ -53,6 +55,8 @@ impl FromStr for AppProtocol {

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let protocol = match s.to_ascii_lowercase().as_str() {
"http" => AppProtocol::Http1,
"kubernetes.io/h2c" => AppProtocol::Http2,
"linkerd.io/tcp" | "linkerd.io/opaque" => AppProtocol::Opaque,
protocol => AppProtocol::Unknown(Arc::from(protocol)),
};
Expand Down
65 changes: 46 additions & 19 deletions policy-controller/grpc/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,37 +372,64 @@ fn to_proto(
) -> outbound::OutboundPolicy {
let backend: outbound::Backend = default_backend(&policy, original_dst);

let accrual = policy.accrual.map(|accrual| outbound::FailureAccrual {
kind: Some(match accrual {
linkerd_policy_controller_core::outbound::FailureAccrual::Consecutive {
max_failures,
backoff,
} => outbound::failure_accrual::Kind::ConsecutiveFailures(
outbound::failure_accrual::ConsecutiveFailures {
max_failures,
backoff: Some(outbound::ExponentialBackoff {
min_backoff: convert_duration("min_backoff", backoff.min_penalty),
max_backoff: convert_duration("max_backoff", backoff.max_penalty),
jitter_ratio: backoff.jitter,
}),
},
),
}),
});

let mut http_routes = policy.http_routes.clone().into_iter().collect::<Vec<_>>();

let kind = match &policy.app_protocol {
Some(AppProtocol::Opaque) => {
outbound::proxy_protocol::Kind::Opaque(outbound::proxy_protocol::Opaque {
routes: vec![default_outbound_opaq_route(backend, &policy.parent_info)],
})
}
Some(AppProtocol::Http1) => {
http_routes.sort_by(timestamp_then_name);
http::http1_only_protocol(
backend,
http_routes.into_iter(),
accrual,
policy.http_retry.clone(),
policy.timeouts.clone(),
allow_l5d_request_headers,
&policy.parent_info,
original_dst,
)
}
Some(AppProtocol::Http2) => {
http_routes.sort_by(timestamp_then_name);
http::http2_only_protocol(
backend,
http_routes.into_iter(),
accrual,
policy.http_retry.clone(),
policy.timeouts.clone(),
allow_l5d_request_headers,
&policy.parent_info,
original_dst,
)
}
None | Some(AppProtocol::Unknown(_)) => {
if let Some(AppProtocol::Unknown(protocol)) = &policy.app_protocol {
tracing::debug!(resource = ?policy.parent_info, port = policy.port.get(), "Unknown appProtocol \"{protocol}\"");
}

let accrual = policy.accrual.map(|accrual| outbound::FailureAccrual {
kind: Some(match accrual {
linkerd_policy_controller_core::outbound::FailureAccrual::Consecutive {
max_failures,
backoff,
} => outbound::failure_accrual::Kind::ConsecutiveFailures(
outbound::failure_accrual::ConsecutiveFailures {
max_failures,
backoff: Some(outbound::ExponentialBackoff {
min_backoff: convert_duration("min_backoff", backoff.min_penalty),
max_backoff: convert_duration("max_backoff", backoff.max_penalty),
jitter_ratio: backoff.jitter,
}),
},
),
}),
});

let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::<Vec<_>>();
let mut http_routes = policy.http_routes.clone().into_iter().collect::<Vec<_>>();
let mut tls_routes = policy.tls_routes.clone().into_iter().collect::<Vec<_>>();
let mut tcp_routes = policy.tcp_routes.clone().into_iter().collect::<Vec<_>>();

Expand Down
97 changes: 97 additions & 0 deletions policy-controller/grpc/src/outbound/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,103 @@
})
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn http1_only_protocol(
default_backend: outbound::Backend,
routes: impl Iterator<Item = (GroupKindNamespaceName, HttpRoute)>,
accrual: Option<outbound::FailureAccrual>,
service_retry: Option<RouteRetry<HttpRetryCondition>>,
service_timeouts: RouteTimeouts,
allow_l5d_request_headers: bool,
parent_info: &ParentInfo,
original_dst: Option<SocketAddr>,
) -> outbound::proxy_protocol::Kind {
outbound::proxy_protocol::Kind::Http1(outbound::proxy_protocol::Http1 {
routes: base_http_routes(
default_backend,
routes,
service_retry,
service_timeouts,
allow_l5d_request_headers,
parent_info,
original_dst,
),
failure_accrual: accrual.clone(),

Check failure on line 108 in policy-controller/grpc/src/outbound/http.rs

View workflow job for this annotation

GitHub Actions / clippy

error: using `clone` on type `Option<FailureAccrual>` which implements the `Copy` trait --> policy-controller/grpc/src/outbound/http.rs:108:26 | 108 | failure_accrual: accrual.clone(), | ^^^^^^^^^^^^^^^ help: try removing the `clone` call: `accrual` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#clone_on_copy note: the lint level is defined here --> policy-controller/grpc/src/lib.rs:1:9 | 1 | #![deny(warnings, rust_2018_idioms)] | ^^^^^^^^ = note: `#[deny(clippy::clone_on_copy)]` implied by `#[deny(warnings)]`

Check failure on line 108 in policy-controller/grpc/src/outbound/http.rs

View workflow job for this annotation

GitHub Actions / clippy

error: using `clone` on type `Option<FailureAccrual>` which implements the `Copy` trait --> policy-controller/grpc/src/outbound/http.rs:108:26 | 108 | failure_accrual: accrual.clone(), | ^^^^^^^^^^^^^^^ help: try removing the `clone` call: `accrual` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#clone_on_copy note: the lint level is defined here --> policy-controller/grpc/src/lib.rs:1:9 | 1 | #![deny(warnings, rust_2018_idioms)] | ^^^^^^^^ = note: `#[deny(clippy::clone_on_copy)]` implied by `#[deny(warnings)]`
})
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn http2_only_protocol(
default_backend: outbound::Backend,
routes: impl Iterator<Item = (GroupKindNamespaceName, HttpRoute)>,
accrual: Option<outbound::FailureAccrual>,
service_retry: Option<RouteRetry<HttpRetryCondition>>,
service_timeouts: RouteTimeouts,
allow_l5d_request_headers: bool,
parent_info: &ParentInfo,
original_dst: Option<SocketAddr>,
) -> outbound::proxy_protocol::Kind {
outbound::proxy_protocol::Kind::Http2(outbound::proxy_protocol::Http2 {
routes: base_http_routes(
default_backend,
routes,
service_retry,
service_timeouts,
allow_l5d_request_headers,
parent_info,
original_dst,
),
failure_accrual: accrual.clone(),

Check failure on line 133 in policy-controller/grpc/src/outbound/http.rs

View workflow job for this annotation

GitHub Actions / clippy

error: using `clone` on type `Option<FailureAccrual>` which implements the `Copy` trait --> policy-controller/grpc/src/outbound/http.rs:133:26 | 133 | failure_accrual: accrual.clone(), | ^^^^^^^^^^^^^^^ help: try removing the `clone` call: `accrual` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#clone_on_copy

Check failure on line 133 in policy-controller/grpc/src/outbound/http.rs

View workflow job for this annotation

GitHub Actions / clippy

error: using `clone` on type `Option<FailureAccrual>` which implements the `Copy` trait --> policy-controller/grpc/src/outbound/http.rs:133:26 | 133 | failure_accrual: accrual.clone(), | ^^^^^^^^^^^^^^^ help: try removing the `clone` call: `accrual` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#clone_on_copy
})
}

fn base_http_routes(
default_backend: outbound::Backend,
routes: impl Iterator<Item = (GroupKindNamespaceName, HttpRoute)>,
service_retry: Option<RouteRetry<HttpRetryCondition>>,
service_timeouts: RouteTimeouts,
allow_l5d_request_headers: bool,
parent_info: &ParentInfo,
original_dst: Option<SocketAddr>,
) -> Vec<outbound::HttpRoute> {
let mut routes = routes
.map(|(gknn, route)| {
convert_outbound_route(
gknn,
route,
default_backend.clone(),
service_retry.clone(),
service_timeouts.clone(),
allow_l5d_request_headers,
parent_info,
original_dst,
)
})
.collect::<Vec<_>>();

match parent_info {
ParentInfo::Service { .. } => {
if routes.is_empty() {
routes.push(default_outbound_service_route(
default_backend,
service_retry.clone(),
service_timeouts.clone(),
));
}
}
ParentInfo::EgressNetwork { traffic_policy, .. } => {
routes.push(default_outbound_egress_route(
default_backend,
service_retry.clone(),
service_timeouts.clone(),
traffic_policy,
));
}
}

routes
}

#[allow(clippy::too_many_arguments)]
fn convert_outbound_route(
gknn: GroupKindNamespaceName,
Expand Down
40 changes: 40 additions & 0 deletions policy-test/src/outbound_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,46 @@ where
}
}

#[track_caller]
pub fn http1_routes(config: &grpc::outbound::OutboundPolicy) -> &[grpc::outbound::HttpRoute] {
let kind = config
.protocol
.as_ref()
.expect("must have proxy protocol")
.kind
.as_ref()
.expect("must have kind");
if let grpc::outbound::proxy_protocol::Kind::Http1(grpc::outbound::proxy_protocol::Http1 {
routes,
failure_accrual: _,
}) = kind
{
routes
} else {
panic!("proxy protocol must be Grpc; actually got:\n{kind:#?}")
}
}

#[track_caller]
pub fn http2_routes(config: &grpc::outbound::OutboundPolicy) -> &[grpc::outbound::HttpRoute] {
let kind = config
.protocol
.as_ref()
.expect("must have proxy protocol")
.kind
.as_ref()
.expect("must have kind");
if let grpc::outbound::proxy_protocol::Kind::Http2(grpc::outbound::proxy_protocol::Http2 {
routes,
failure_accrual: _,
}) = kind
{
routes
} else {
panic!("proxy protocol must be Grpc; actually got:\n{kind:#?}")
}
}

#[track_caller]
pub fn grpc_routes(config: &grpc::outbound::OutboundPolicy) -> &[grpc::outbound::GrpcRoute] {
let kind = config
Expand Down
75 changes: 74 additions & 1 deletion policy-test/tests/outbound_api_app_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
use linkerd_policy_test::{
assert_resource_meta, create,
outbound_api::{
assert_route_is_default, assert_singleton, retry_watch_outbound_policy, tcp_routes,
assert_route_is_default, assert_singleton, http1_routes, http2_routes,
retry_watch_outbound_policy, tcp_routes,
},
test_route::TestParent,
with_temp_ns,
Expand Down Expand Up @@ -45,3 +46,75 @@

test::<k8s::Service>().await;
}

#[tokio::test(flavor = "current_thread")]
async fn http1_parent() {
async fn test<P: TestParent>() {
tracing::debug!(
parent = %P::kind(&P::DynamicType::default()),
);
with_temp_ns(|client, ns| async move {
let port = 4191;
// Create a parent with no routes.
// let parent = P::create_parent(&client.clone(), &ns).await;
let parent = create(
&client,
P::make_parent_with_protocol(&ns, Some("http".to_string())),
)
.await;

let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);

assert_resource_meta(&config.metadata, parent.obj_ref(), port);

let routes = http1_routes(&config);
let route = assert_singleton(routes);
assert_route_is_default::<gateway::HttpRoute>(route, &parent.obj_ref(), port);

Check failure on line 78 in policy-test/tests/outbound_api_app_protocol.rs

View workflow job for this annotation

GitHub Actions / check

error[E0412]: cannot find type `HttpRoute` in module `gateway` --> policy-test/tests/outbound_api_app_protocol.rs:78:48 | 78 | assert_route_is_default::<gateway::HttpRoute>(route, &parent.obj_ref(), port); | ^^^^^^^^^ | ::: /usr/local/cargo/registry/src/index.crates.io-6f17d22bba15001f/gateway-api-0.14.2/src/apis/experimental/httproutes.rs:15:10 | 15 | #[derive(CustomResource, Serialize, Deserialize, Clone, Debug, JsonSchema, Default, PartialEq)] | -------------- similarly named struct `HTTPRoute` defined here | help: a struct with a similar name exists | 78 | assert_route_is_default::<gateway::HTTPRoute>(route, &parent.obj_ref(), port); | ~~~~~~~~~ help: consider importing one of these items | 1 + use crate::k8s::policy::HttpRoute; | 1 + use k8s_gateway_api::HttpRoute; | 1 + use linkerd2_proxy_api::inbound::HttpRoute; | 1 + use linkerd2_proxy_api::outbound::HttpRoute; | and 5 other candidates help: if you import `HttpRoute`, refer to it directly | 78 - assert_route_is_default::<gateway::HttpRoute>(route, &parent.obj_ref(), port); 78 + assert_route_is_default::<HttpRoute>(route, &parent.obj_ref(), port); |
})
.await;
}

test::<k8s::Service>().await;
}

#[tokio::test(flavor = "current_thread")]
async fn http2_parent() {
async fn test<P: TestParent>() {
tracing::debug!(
parent = %P::kind(&P::DynamicType::default()),
);
with_temp_ns(|client, ns| async move {
let port = 4191;
// Create a parent with no routes.
// let parent = P::create_parent(&client.clone(), &ns).await;
let parent = create(
&client,
P::make_parent_with_protocol(&ns, Some("kubernetes.io/h2c".to_string())),
)
.await;

let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);

assert_resource_meta(&config.metadata, parent.obj_ref(), port);

let routes = http2_routes(&config);
let route = assert_singleton(routes);
assert_route_is_default::<gateway::HttpRoute>(route, &parent.obj_ref(), port);

Check failure on line 114 in policy-test/tests/outbound_api_app_protocol.rs

View workflow job for this annotation

GitHub Actions / check

error[E0412]: cannot find type `HttpRoute` in module `gateway` --> policy-test/tests/outbound_api_app_protocol.rs:114:48 | 114 | assert_route_is_default::<gateway::HttpRoute>(route, &parent.obj_ref(), port); | ^^^^^^^^^ | ::: /usr/local/cargo/registry/src/index.crates.io-6f17d22bba15001f/gateway-api-0.14.2/src/apis/experimental/httproutes.rs:15:10 | 15 | #[derive(CustomResource, Serialize, Deserialize, Clone, Debug, JsonSchema, Default, PartialEq)] | -------------- similarly named struct `HTTPRoute` defined here | help: a struct with a similar name exists | 114 | assert_route_is_default::<gateway::HTTPRoute>(route, &parent.obj_ref(), port); | ~~~~~~~~~ help: consider importing one of these items | 1 + use crate::k8s::policy::HttpRoute; | 1 + use k8s_gateway_api::HttpRoute; | 1 + use linkerd2_proxy_api::inbound::HttpRoute; | 1 + use linkerd2_proxy_api::outbound::HttpRoute; | and 5 other candidates help: if you import `HttpRoute`, refer to it directly | 114 - assert_route_is_default::<gateway::HttpRoute>(route, &parent.obj_ref(), port); 114 + assert_route_is_default::<HttpRoute>(route, &parent.obj_ref(), port); |
})
.await;
}

test::<k8s::Service>().await;
}
Loading
Loading