diff --git a/src/proxy.rs b/src/proxy.rs index 5a1336434..408cf441e 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -446,6 +446,12 @@ pub enum Error { #[error("unknown waypoint: {0}")] UnknownWaypoint(String), + #[error("no service or workload for hostname: {0}")] + NoHostname(String), + + #[error("no valid authority pseudo header: {0}")] + NoValidAuthority(String), + #[error("no valid routing destination for workload: {0}")] NoValidDestination(Box), @@ -485,12 +491,16 @@ pub enum Error { DnsEmpty, } +// Custom TLV for proxy protocol for the identity of the source const PROXY_PROTOCOL_AUTHORITY_TLV: u8 = 0xD0; +// Custom TLV for including the svc hostname in the proxy protocol header +const PROXY_PROTOCOL_SVC_HOSTNAME_TLV: u8 = 0xD1; pub async fn write_proxy_protocol( stream: &mut TcpStream, addresses: T, src_id: Option, + hostname_addr: Option, ) -> io::Result<()> where T: Into + std::fmt::Debug, @@ -498,6 +508,10 @@ where use ppp::v2::{Builder, Command, Protocol, Version}; use tokio::io::AsyncWriteExt; + // When the hbone_addr populated from the authority header contains a svc hostname, the address included + // with respect to the hbone_addr is the SocketAddr svc_vip:port (the resolved service VIP plus the hbone_addr port). + // This is done since addresses doesn't support hostnames. + // See ref https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt debug!("writing proxy protocol addresses: {:?}", addresses); let mut builder = Builder::with_addresses(Version::Two | Command::Proxy, Protocol::Stream, addresses); @@ -506,6 +520,11 @@ where builder = builder.write_tlv(PROXY_PROTOCOL_AUTHORITY_TLV, id.to_string().as_bytes())?; } + // hostname_addr is None when the hbone_addr does not contain a svc hostname. 
+ if let Some(svc_host) = hostname_addr { + builder = builder.write_tlv(PROXY_PROTOCOL_SVC_HOSTNAME_TLV, svc_host.as_bytes())?; + } + let header = builder.build()?; stream.write_all(&header).await } @@ -782,6 +801,82 @@ pub fn parse_forwarded_host(input: &str) -> Option { .filter(|host| !host.is_empty()) } +#[derive(Debug, Clone)] +pub enum HboneAddress { + SocketAddr(SocketAddr), + SvcHostname(Strng, u16), +} + +impl HboneAddress { + pub fn port(&self) -> u16 { + match self { + HboneAddress::SocketAddr(s) => s.port(), + HboneAddress::SvcHostname(_, p) => *p, + } + } + + pub fn ip(&self) -> Option { + match self { + HboneAddress::SocketAddr(s) => Some(s.ip()), + HboneAddress::SvcHostname(_, _) => None, + } + } + + pub fn svc_hostname(&self) -> Option { + match self { + HboneAddress::SocketAddr(_) => None, + HboneAddress::SvcHostname(s, _) => Some(s.into()), + } + } + + pub fn hostname_addr(&self) -> Option { + match self { + HboneAddress::SocketAddr(_) => None, + HboneAddress::SvcHostname(_, _) => Some(Strng::from(self.to_string())), + } + } +} + +impl std::fmt::Display for HboneAddress { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + HboneAddress::SocketAddr(addr) => write!(f, "{}", addr), + HboneAddress::SvcHostname(host, port) => write!(f, "{}:{}", host, port), + } + } +} + +impl From for HboneAddress { + fn from(socket_addr: SocketAddr) -> Self { + HboneAddress::SocketAddr(socket_addr) + } +} + +impl From<(Strng, u16)> for HboneAddress { + fn from(svc_hostname: (Strng, u16)) -> Self { + HboneAddress::SvcHostname(svc_hostname.0, svc_hostname.1) + } +} + +impl TryFrom<&http::Uri> for HboneAddress { + type Error = Error; + + fn try_from(value: &http::Uri) -> Result { + match value.to_string().parse::() { + Ok(addr) => Ok(HboneAddress::SocketAddr(addr)), + Err(_) => { + let hbone_host = value + .host() + .ok_or_else(|| Error::NoValidAuthority(value.to_string()))?; + let hbone_port = value + .port_u16() + .ok_or_else(|| 
Error::NoValidAuthority(value.to_string()))?; + Ok(HboneAddress::SvcHostname(hbone_host.into(), hbone_port)) + } + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/proxy/inbound.rs b/src/proxy/inbound.rs index 85e6ae35d..8d1f7ed0a 100644 --- a/src/proxy/inbound.rs +++ b/src/proxy/inbound.rs @@ -30,11 +30,14 @@ use crate::config::Config; use crate::drain::DrainWatcher; use crate::proxy::h2::server::H2Request; use crate::proxy::metrics::{ConnectionOpen, Reporter}; -use crate::proxy::{metrics, ProxyInputs, TraceParent, BAGGAGE_HEADER, TRACEPARENT_HEADER}; +use crate::proxy::{ + metrics, HboneAddress, ProxyInputs, TraceParent, BAGGAGE_HEADER, TRACEPARENT_HEADER, +}; use crate::rbac::Connection; use crate::socket::to_canonical; use crate::state::service::Service; use crate::state::workload::application_tunnel::Protocol as AppProtocol; +use crate::strng::Strng; use crate::{assertions, copy, handle_connection, proxy, socket, strng, tls}; use crate::drain::run_with_drain; @@ -160,6 +163,7 @@ impl Inbound { .unwrap_or_else(TraceParent::new) } + /// serve_connect handles a single connection from a client. #[allow(clippy::too_many_arguments)] async fn serve_connect( pi: Arc, @@ -192,6 +196,7 @@ impl Inbound { // Now we have enough context to properly report logs and metrics. Group everything else that // can fail before we send the OK response here. let rx = async { + // Define a connection guard to ensure rbac conditions are maintained for the duration of the connection let conn_guard = pi .connection_manager .assert_rbac(&pi.state, &ri.rbac_ctx, ri.for_host) @@ -202,6 +207,8 @@ impl Inbound { ))?; let orig_src = enable_original_source.then_some(ri.rbac_ctx.conn.src.ip()); + // Establish upstream connection between original source and destination + // We are allowing a bind to the original source address locally even if the ip address isn't on this node. 
let stream = super::freebind_connect(orig_src, ri.upstream_addr, pi.socket_factory.as_ref()) .await @@ -213,6 +220,7 @@ impl Inbound { debug!("connected to: {}", ri.upstream_addr); Ok((conn_guard, stream)) }; + // Wait on establishing the upstream connection and connection guard before sending the 200 response to the client let (mut conn_guard, mut stream) = match rx.await { Ok(res) => res, Err(InboundFlagError(err, flag, code)) => { @@ -227,16 +235,41 @@ impl Inbound { // At this point, we established the upstream connection and need to send a 200 back to the client. // we may still have failures at this point during the proxying, but we don't need to send these // at the HTTP layer. + // After sending a 200 back to the client, write the proxy protocol to the stream. This ensures + // that the server has all of the necessary information about the connection regardless of the protocol + // See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt for more information about the + // proxy protocol. let send = req .send_response(build_response(StatusCode::OK)) .and_then(|h2_stream| async { + // Required for a custom waypoint so that it doesn't have to support HBONE if ri.inbound_protocol == AppProtocol::PROXY { let Connection { src, src_identity, .. } = ri.rbac_ctx.conn; - super::write_proxy_protocol(&mut stream, (src, ri.hbone_addr), src_identity) - .instrument(trace_span!("proxy protocol")) - .await?; + let protocol_addr = match ri.hbone_addr { + HboneAddress::SocketAddr(addr) => addr, + HboneAddress::SvcHostname(_, _) => { + // If the hbone_addr includes service hostname, we need to use the resolved svc IP address + match ri.upstream_protocol_addr { + Some(addr) => addr, + None => { + // This should never happen, as we should have already validated the destination + // and resolved the hostname to an IP address. 
+ return Err(Error::ConnectAddress(ri.hbone_addr.to_string())); + } + } + } + }; + let hostname_addr: Option = ri.hbone_addr.hostname_addr(); + super::write_proxy_protocol( + &mut stream, + (src, protocol_addr), + src_identity, + hostname_addr, + ) + .instrument(trace_span!("proxy protocol")) + .await?; } copy::copy_bidirectional( h2_stream, @@ -250,8 +283,10 @@ impl Inbound { ri.result_tracker.record(res); } + // build_inbound_request builds up the context for an inbound request. async fn build_inbound_request( pi: &Arc, + // HBONE connection conn: Connection, req: &H2Request, ) -> Result { @@ -261,18 +296,14 @@ impl Inbound { } let start = Instant::now(); - let hbone_addr = req + + // Extract the host or IP from the authority pseudo-header of the URI + let hbone_addr: HboneAddress = req .uri() - .to_string() - .as_str() - .parse::() - .map_err(|_| { - InboundError( - Error::ConnectAddress(req.uri().to_string()), - StatusCode::BAD_REQUEST, - ) - })?; + .try_into() + .map_err(|e: Error| InboundError(e, StatusCode::BAD_REQUEST))?; + // Get the destination workload information of the destination pods (wds) workload (not destination ztunnel) let destination_workload = pi .local_workload_information .get_workload() @@ -280,19 +311,66 @@ impl Inbound { // At this point we already fetched the local workload for TLS, so it should be infallible. .map_err(InboundError::build(StatusCode::SERVICE_UNAVAILABLE))?; - // Check the request is allowed - Self::validate_destination(&pi.cfg, &pi.state, &conn, &destination_workload, hbone_addr) + // Check the request is allowed by verifying the destination + Self::validate_destination( + &pi.cfg, + &pi.state, + &conn, + &destination_workload, + hbone_addr.clone(), + ) + .await + .map_err(InboundError::build(StatusCode::BAD_REQUEST))?; + + // Set to the service IP address if the hbone_addr is a hostname + // Used for the proxy protocol header. The Address header does not support hostname. 
+ let mut upstream_protocol_addr = None; + + // Validate hostname if present + if hbone_addr.svc_hostname().is_some() { + // Get the service address by hostname + let svc = Self::find_service_by_hostname( + &pi.state, + &destination_workload, + hbone_addr.svc_hostname().unwrap(), + ) .await .map_err(InboundError::build(StatusCode::BAD_REQUEST))?; + // Validate the destination pod is a member of the selected service + if !svc.contains_endpoint(&destination_workload) { + // TODO(jaellio): Update error msg/type + return Err(InboundError( + Error::NoHostname(destination_workload.to_string()), + StatusCode::BAD_REQUEST, + )); + } + // Select the VIP by network match + let svc_address = svc + .vips + .iter() + .max_by_key(|a| match a.network == conn.dst_network { + true => 1, + false => 0, + }) + .ok_or_else(|| { + InboundError( + Error::NoResolvedAddresses(destination_workload.to_string()), + StatusCode::BAD_REQUEST, + ) + })?; + upstream_protocol_addr = SocketAddr::new(svc_address.address, hbone_addr.port()).into(); + } + // Determine the next hop. 
let (upstream_addr, inbound_protocol, upstream_service) = - Self::find_inbound_upstream(&pi.state, &conn, &destination_workload, hbone_addr); + Self::find_inbound_upstream(&pi.state, &conn, &destination_workload, hbone_addr.port()); let original_dst = conn.dst; // Connection has 15008, swap with the real port let conn = Connection { dst: upstream_addr, + // copies the rest of the connection fields from conn ..conn }; @@ -346,7 +424,7 @@ impl Inbound { // For consistency with outbound logs, report the original destination (with 15008 port) // as dst.addr, and the target address as dst.hbone_addr original_dst, - Some(hbone_addr), + Some(hbone_addr.clone()), start, ConnectionOpen { reporter: Reporter::destination, @@ -365,26 +443,65 @@ impl Inbound { upstream_addr, inbound_protocol, hbone_addr, + upstream_protocol_addr, }) } + // Selects a service by hostname without the explicit knowledge of the namespace + // There is no explicit mapping from hostname to namespace (e.g. foo.com) + async fn find_service_by_hostname( + state: &DemandProxyState, + local_workload: &Workload, + hbone_host: Strng, + ) -> Result { + // Validate a service exists for the hostname + // TODO(jaellio): Currently returns an error if no service is found. Workload lookup is not supported. + let services = state.read().find_service_by_hostname(&hbone_host)?; + + services + .iter() + .max_by_key(|s| { + let is_local_namespace = s.namespace == local_workload.namespace; + match is_local_namespace { + true => 1, + false => 0, + } + }) + .cloned() + .ok_or_else(|| Error::NoResolvedAddresses(hbone_host.to_string())) + } + /// validate_destination ensures the destination is an allowed request. 
async fn validate_destination( cfg: &Config, state: &DemandProxyState, conn: &Connection, local_workload: &Workload, - hbone_addr: SocketAddr, + hbone_addr: HboneAddress, ) -> Result<(), Error> { let illegal_call = cfg.illegal_ports.contains(&hbone_addr.port()); if illegal_call { return Err(Error::SelfCall); } - if conn.dst.ip() == hbone_addr.ip() { - // Normal case: both are aligned. This is allowed (we really only need the HBONE address for the port.) + let hbone_ip = match hbone_addr.ip() { + Some(ip) => ip, + None => { + if hbone_addr.svc_hostname().is_none() { + // If the hbone_address doesn't have an IP, it must have a hostname + return Err(Error::ConnectAddress(hbone_addr.to_string())); + } + return Ok(()); + } + }; + + if conn.dst.ip() == hbone_ip { + // Normal case: both the connection destination IP and the HBONE address from the authority header are aligned. + // This is allowed (we really only need the HBONE address for the port.) + // Note: HBONE address contains the original port, so we need that to update the port from 15008. return Ok(()); } + if local_workload.application_tunnel.is_some() { // In the case they have their own tunnel, they will get the HBONE target address in the PROXY // header, and their application can decide what to do with it; we don't validate this. @@ -398,14 +515,13 @@ impl Inbound { // To do this, we do a lookup to see if the HBONE target has us as its waypoint. let hbone_dst = &NetworkAddress { network: conn.dst_network.clone(), - address: hbone_addr.ip(), + address: hbone_ip, }; // None means we need to do on-demand lookup let lookup_is_destination_this_waypoint = || -> Option { let state = state.read(); - // TODO Allow HBONE address to be a hostname. We have to respect rules about // hostname scoping. Can we use the client's namespace here to do that? 
let hbone_target = state.find_address(hbone_dst)?; @@ -445,18 +561,19 @@ impl Inbound { }; if res.is_none() || res == Some(false) { - return Err(Error::IPMismatch(conn.dst.ip(), hbone_addr.ip())); + return Err(Error::IPMismatch(conn.dst.ip(), hbone_ip)); } Ok(()) } + /// find_inbound_upstream determines the next hop for an inbound request. fn find_inbound_upstream( state: &DemandProxyState, conn: &Connection, local_workload: &Workload, - hbone_addr: SocketAddr, + hbone_port: u16, ) -> (SocketAddr, AppProtocol, Vec>) { - let upstream_addr = SocketAddr::new(conn.dst.ip(), hbone_addr.port()); + let upstream_addr = SocketAddr::new(conn.dst.ip(), hbone_port); // Application tunnel may override the port. let (upstream_addr, inbound_protocol) = match local_workload.application_tunnel.clone() { @@ -483,8 +600,11 @@ struct InboundRequest { rbac_ctx: ProxyRbacContext, result_tracker: Box, upstream_addr: SocketAddr, - hbone_addr: SocketAddr, + hbone_addr: HboneAddress, inbound_protocol: AppProtocol, + // When the hbone_addr contains a hostname, we need to resolve it to an IP address + // None when hbone_addr is an IP address + upstream_protocol_addr: Option, } /// InboundError represents an error with an associated status code. 
@@ -536,7 +656,7 @@ fn build_response(status: StatusCode) -> Response<()> { #[cfg(test)] mod tests { use super::Inbound; - use crate::{config, strng}; + use crate::{config, proxy::inbound::HboneAddress, strng}; use crate::{ rbac::Connection, @@ -566,6 +686,8 @@ mod tests { const SERVER_POD_IP: &str = "10.0.0.2"; const SERVER_SVC_IP: &str = "10.10.0.1"; + const SERVER_POD_HOSTNAME: &str = "server.default.svc.cluster.local"; + const WAYPOINT_POD_IP: &str = "10.0.0.3"; const WAYPOINT_SVC_IP: &str = "10.10.0.2"; @@ -593,6 +715,8 @@ mod tests { #[test_case(Waypoint::None, WAYPOINT_POD_IP, CLIENT_POD_IP, None; "to waypoint without attachment" )] #[test_case(Waypoint::Service(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_POD_IP , None; "to workload via waypoint with wrong attachment")] #[test_case(Waypoint::Workload(WAYPOINT_POD_IP, None), WAYPOINT_POD_IP, SERVER_SVC_IP , None; "to service via waypoint with wrong attachment")] + // Svc hostname + #[test_case(Waypoint::None, SERVER_POD_IP, SERVER_POD_HOSTNAME, Some((SERVER_POD_IP, TARGET_PORT)); "svc hostname to workload no waypoint")] #[tokio::test] async fn test_find_inbound_upstream( target_waypoint: Waypoint<'_>, @@ -615,18 +739,23 @@ mod tests { }) .await .unwrap(); - let hbone_addr = format!("{hbone_dst}:{TARGET_PORT}").parse().unwrap(); - let res = Inbound::validate_destination(&cfg, &state, &conn, &local_wl, hbone_addr) - .await - .map(|_| Inbound::find_inbound_upstream(&state, &conn, &local_wl, hbone_addr)); + let hbone_addr = + if let Ok(addr) = format!("{hbone_dst}:{TARGET_PORT}").parse::() { + HboneAddress::SocketAddr(addr) + } else { + HboneAddress::SvcHostname(hbone_dst.into(), TARGET_PORT) + }; + let validate_destination = + Inbound::validate_destination(&cfg, &state, &conn, &local_wl, hbone_addr.clone()).await; + let res = Inbound::find_inbound_upstream(&state, &conn, &local_wl, hbone_addr.port()); match want { Some((ip, port)) => { - let got_addr = res.expect("found upstream").0; - assert_eq!(got_addr, 
SocketAddr::new(ip.parse().unwrap(), port)) + let got_addr = res.0; + assert_eq!(got_addr, SocketAddr::new(ip.parse().unwrap(), port)); } None => { - res.expect_err("did not find upstream"); + validate_destination.expect_err("did not find upstream"); } } } diff --git a/src/proxy/metrics.rs b/src/proxy/metrics.rs index 7a34dd4f1..3d4522efe 100644 --- a/src/proxy/metrics.rs +++ b/src/proxy/metrics.rs @@ -28,7 +28,7 @@ use tracing_core::field::Value; use crate::identity::Identity; use crate::metrics::DefaultedUnknown; -use crate::proxy; +use crate::proxy::{self, HboneAddress}; use crate::state::service::ServiceDescription; use crate::state::workload::Workload; @@ -299,7 +299,7 @@ pub struct ConnectionResult { src: (SocketAddr, Option), // Dst address and name dst: (SocketAddr, Option), - hbone_target: Option, + hbone_target: Option, start: Instant, // TODO: storing CommonTrafficLabels adds ~600 bytes retained throughout a connection life time. @@ -381,7 +381,7 @@ impl ConnectionResult { dst: SocketAddr, // If using hbone, the inner HBONE address // That is, dst is the L4 address, while is the :authority. 
- hbone_target: Option, + hbone_target: Option, start: Instant, conn: ConnectionOpen, metrics: Arc, @@ -410,7 +410,7 @@ impl ConnectionResult { src.identity = tl.source_principal.as_ref().filter(|_| mtls).map(to_value_owned), dst.addr = %dst.0, - dst.hbone_addr = hbone_target.map(display), + dst.hbone_addr = hbone_target.as_ref().map(display), dst.service = tl.destination_service.to_value(), dst.workload = dst.1.as_deref().map(to_value), dst.namespace = tl.destination_workload_namespace.to_value(), @@ -504,7 +504,7 @@ impl ConnectionResult { src.identity = tl.source_principal.as_ref().filter(|_| mtls).map(to_value_owned), dst.addr = %self.dst.0, - dst.hbone_addr = self.hbone_target.map(display), + dst.hbone_addr = self.hbone_target.as_ref().map(display), dst.service = tl.destination_service.to_value(), dst.workload = self.dst.1.as_deref().map(to_value), dst.namespace = tl.destination_workload_namespace.to_value(), diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index 5c047ade7..08881e6c6 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -28,7 +28,9 @@ use crate::identity::Identity; use crate::proxy::metrics::Reporter; use crate::proxy::{metrics, pool, ConnectionOpen, ConnectionResult, DerivedWorkload}; -use crate::proxy::{util, Error, ProxyInputs, TraceParent, BAGGAGE_HEADER, TRACEPARENT_HEADER}; +use crate::proxy::{ + util, Error, HboneAddress, ProxyInputs, TraceParent, BAGGAGE_HEADER, TRACEPARENT_HEADER, +}; use crate::drain::run_with_drain; use crate::drain::DrainWatcher; @@ -183,7 +185,7 @@ impl OutboundConnection { ); let metrics = self.pi.metrics.clone(); - let hbone_target = req.hbone_target_destination; + let hbone_target = req.hbone_target_destination.map(HboneAddress::SocketAddr); let result_tracker = Box::new(ConnectionResult::new( source_addr, req.actual_destination, diff --git a/src/state.rs b/src/state.rs index 879a12df8..9b25ef020 100644 --- a/src/state.rs +++ b/src/state.rs @@ -258,6 +258,14 @@ impl ProxyState { }) } + /// 
Find services by hostname. + pub fn find_service_by_hostname(&self, hostname: &Strng) -> Result, Error> { + // Only services are looked up by hostname here; there is no workload fallback, so an unknown hostname yields Error::NoHostname. + self.services.get_by_host(hostname).ok_or_else(|| { + Error::NoHostname(format!("service with hostname {} not found", hostname)) + }) + } + fn find_upstream( &self, network: Strng,