diff --git a/examples/kubernetes-service-discovery/Cargo.toml b/examples/kubernetes-service-discovery/Cargo.toml index 9b26205..bee94dd 100644 --- a/examples/kubernetes-service-discovery/Cargo.toml +++ b/examples/kubernetes-service-discovery/Cargo.toml @@ -8,8 +8,8 @@ publish = false # This pulls version from main branch so that docker build works (docker was confused by paths) #groupcache = { git = "https://github.com/Petroniuss/groupcache.git" } groupcache = { path = "../../groupcache" } -tonic = "0.12.3" -axum = "0.7.7" +tonic = "0.14.2" +axum = "0.8.0" tower = { version = "0.5.1", features = ["steer"] } tower-http = { version = "0.6.1", features = ["trace"] } @@ -20,10 +20,10 @@ serde_json = "1" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } -axum-prometheus = "0.7.0" +axum-prometheus = "0.9.0" anyhow = "1" async-trait = "0.1" -kube = { version = "0.96.0", features = ["runtime", "derive"] } -k8s-openapi = { version = "0.23.0", features = ["latest"] } +kube = { version = "2.0.1", features = ["runtime", "derive"] } +k8s-openapi = { version = "0.26.0", features = ["latest"] } diff --git a/examples/simple-multiple-instances/Cargo.toml b/examples/simple-multiple-instances/Cargo.toml index f02cbfd..e3696a3 100644 --- a/examples/simple-multiple-instances/Cargo.toml +++ b/examples/simple-multiple-instances/Cargo.toml @@ -7,7 +7,7 @@ publish = false [dependencies] groupcache = { path = "../../groupcache" } -tonic = "0.12.3" +tonic = "0.14.2" tokio = { version = "1.34", features = ["full"] } tower-http = { version = "0.6.1", features = ["trace"] } diff --git a/groupcache-pb/Cargo.toml b/groupcache-pb/Cargo.toml index 8911605..388c98a 100644 --- a/groupcache-pb/Cargo.toml +++ b/groupcache-pb/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "groupcache-pb" -version = "0.1.0" +version = "0.3.0" edition = "2021" authors = ["Patryk Wojtyczek"] categories = ["caching", "web-programming", "concurrency", "asynchronous"] @@ -12,7 +12,10 @@ readme = "../readme.md" repository = "https://github.com/Petroniuss/groupcache" [dependencies] -prost = "0.13.3" -tonic = "0.12.3" -anyhow = "1" -tonic-build = { version= "0.12.3", features = ["prost"] } +prost = "0.14.1" +tonic = "0.14.2" +tonic-prost = "0.14.2" +tonic-prost-build = { version = "0.14.2" } + +[build-dependencies] +tonic-prost-build = "*" \ No newline at end of file diff --git a/groupcache-pb/src/main.rs b/groupcache-pb/build.rs similarity index 71% rename from groupcache-pb/src/main.rs rename to groupcache-pb/build.rs index 6c50c97..145c5bb 100644 --- a/groupcache-pb/src/main.rs +++ b/groupcache-pb/build.rs @@ -1,16 +1,19 @@ -use anyhow::anyhow; - fn main() -> Result<(), Box> { + // skip codegen if there hasn't been any updates. + if true { + return Ok(()); + } + let current_dir = std::env::current_dir()?; if !current_dir.ends_with("groupcache-pb") { - return Err(anyhow!( + return Err(format!( "must be run from the root of the crate, instead was {:#?}", current_dir ) .into()); } - tonic_build::configure() + tonic_prost_build::configure() .out_dir("src/") .compile_protos(&["protos/groupcache.proto"], &["protos/"])?; Ok(()) diff --git a/groupcache-pb/src/groupcache_pb.rs b/groupcache-pb/src/groupcache_pb.rs index ab3bdfd..321cd44 100644 --- a/groupcache-pb/src/groupcache_pb.rs +++ b/groupcache-pb/src/groupcache_pb.rs @@ -1,20 +1,20 @@ // This file is @generated by prost-build. -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct GetRequest { #[prost(string, tag = "1")] pub key: ::prost::alloc::string::String, } -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct GetResponse { #[prost(bytes = "vec", optional, tag = "1")] pub value: ::core::option::Option<::prost::alloc::vec::Vec>, } -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct RemoveRequest { #[prost(string, tag = "1")] pub key: ::prost::alloc::string::String, } -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct RemoveResponse {} /// Generated client implementations. pub mod groupcache_client { @@ -23,10 +23,10 @@ pub mod groupcache_client { dead_code, missing_docs, clippy::wildcard_imports, - clippy::let_unit_value + clippy::let_unit_value, )] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; #[derive(Debug, Clone)] pub struct GroupcacheClient { inner: tonic::client::Grpc, @@ -44,7 +44,7 @@ pub mod groupcache_client { } impl GroupcacheClient where - T: tonic::client::GrpcService, + T: tonic::client::GrpcService, T::Error: Into, T::ResponseBody: Body + std::marker::Send + 'static, ::Error: Into + std::marker::Send, @@ -65,13 +65,14 @@ pub mod groupcache_client { F: tonic::service::Interceptor, T::ResponseBody: Default, T: tonic::codegen::Service< - http::Request, + http::Request, Response = http::Response< - >::ResponseBody, + >::ResponseBody, >, >, - >>::Error: - Into + std::marker::Send + std::marker::Sync, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, { GroupcacheClient::new(InterceptedService::new(inner, interceptor)) } @@ -110,11 +111,18 @@ pub mod groupcache_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::unknown(format!("Service was not ready: {}", e.into())) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/groupcache_pb.Groupcache/Get"); + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/groupcache_pb.Groupcache/Get", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("groupcache_pb.Groupcache", "Get")); @@ -124,11 +132,18 @@ pub mod groupcache_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::unknown(format!("Service was not ready: {}", e.into())) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/groupcache_pb.Groupcache/Remove"); + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/groupcache_pb.Groupcache/Remove", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("groupcache_pb.Groupcache", "Remove")); @@ -143,7 +158,7 @@ pub mod groupcache_server { dead_code, missing_docs, clippy::wildcard_imports, - clippy::let_unit_value + clippy::let_unit_value, )] use tonic::codegen::*; /// Generated trait containing gRPC methods that should be implemented for use with GroupcacheServer. @@ -179,7 +194,10 @@ pub mod groupcache_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -220,7 +238,7 @@ pub mod groupcache_server { B: Body + std::marker::Send + 'static, B::Error: Into + std::marker::Send + 'static, { - type Response = http::Response; + type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; fn poll_ready( @@ -234,15 +252,21 @@ pub mod groupcache_server { "/groupcache_pb.Groupcache/Get" => { #[allow(non_camel_case_types)] struct GetSvc(pub Arc); - impl tonic::server::UnaryService for GetSvc { + impl tonic::server::UnaryService + for GetSvc { type Response = super::GetResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::get(&inner, request).await }; + let fut = async move { + ::get(&inner, request).await + }; Box::pin(fut) } } @@ -253,7 +277,7 @@ pub mod groupcache_server { let inner = self.inner.clone(); let fut = async move { let method = GetSvc(inner); - let codec = tonic::codec::ProstCodec::default(); + let codec = tonic_prost::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( accept_compression_encodings, @@ -271,16 +295,21 @@ pub mod groupcache_server { "/groupcache_pb.Groupcache/Remove" => { #[allow(non_camel_case_types)] struct RemoveSvc(pub Arc); - impl tonic::server::UnaryService for RemoveSvc { + impl tonic::server::UnaryService + for RemoveSvc { type Response = super::RemoveResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = - async move { ::remove(&inner, request).await }; + let fut = async move { + ::remove(&inner, request).await + }; Box::pin(fut) } } @@ -291,7 +320,7 @@ pub mod groupcache_server { let inner = self.inner.clone(); let fut = async move { let method = RemoveSvc(inner); - let codec = tonic::codec::ProstCodec::default(); + let codec = tonic_prost::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( accept_compression_encodings, @@ -306,19 +335,25 @@ pub mod groupcache_server { }; Box::pin(fut) } - _ => Box::pin(async move { - let mut response = http::Response::new(empty_body()); - let headers = response.headers_mut(); - headers.insert( - tonic::Status::GRPC_STATUS, - (tonic::Code::Unimplemented as i32).into(), - ); - headers.insert( - http::header::CONTENT_TYPE, - tonic::metadata::GRPC_CONTENT_TYPE, - ); - Ok(response) - }), + _ => { + Box::pin(async move { + let mut response = http::Response::new( + tonic::body::Body::default(), + ); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } } } } diff --git a/groupcache-pb/src/lib.rs b/groupcache-pb/src/lib.rs index 350cff9..9d62e3f 100644 --- a/groupcache-pb/src/lib.rs +++ b/groupcache-pb/src/lib.rs @@ -1,12 +1,13 @@ //! Contains generated tonic code for peer to peer communication. //! //! Run main.rs to regenerate the code when updating .proto or bumping tonic/prost version. +#[rustfmt::skip] mod groupcache_pb; -pub use groupcache_pb::groupcache_client::GroupcacheClient; -pub use groupcache_pb::groupcache_server::Groupcache; -use groupcache_pb::groupcache_server::GroupcacheServer as GroupcacheGRPCServer; -pub use groupcache_pb::{GetRequest, GetResponse, RemoveRequest, RemoveResponse}; +pub use crate::groupcache_pb::groupcache_client::GroupcacheClient; +pub use crate::groupcache_pb::groupcache_server::Groupcache; +use crate::groupcache_pb::groupcache_server::GroupcacheServer as GroupcacheGRPCServer; +pub use crate::groupcache_pb::{GetRequest, GetResponse, RemoveRequest, RemoveResponse}; /// gRPC server implementing groupcache GET to retrieve values. pub type GroupcacheServer = GroupcacheGRPCServer; diff --git a/groupcache/Cargo.toml b/groupcache/Cargo.toml index b2d944a..c68c725 100644 --- a/groupcache/Cargo.toml +++ b/groupcache/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "groupcache" -version = "0.2.1" +version = "0.3.0" authors = ["Patryk Wojtyczek"] edition = "2021" categories = ["caching", "web-programming", "concurrency", "asynchronous"] @@ -16,24 +16,24 @@ readme = "readme.md" repository = "https://github.com/Petroniuss/groupcache" [dependencies] -groupcache-pb = { path = "../groupcache-pb", version = "0.1.0" } -tonic = "0.12.3" +groupcache-pb = { path = "../groupcache-pb", version = "0.3.0" } +tonic = "0.14.2" hashring = "0.3.3" anyhow = "1.0" thiserror = "2.0" -axum = "0.7.7" +axum = "0.8.0" tokio = { version = "1.34" , features = ["rt"]} serde = { version = "1.0", features = ["derive"] } rmp-serde = "1.1" async-trait = "0.1.74" -singleflight-async = "0.1.1" +singleflight-async = "0.2.0" moka = { version = "0.12.1", features = ["future"] } log = "0.4.20" -metrics = "0.22.0" +metrics = "0.24.0" [dev-dependencies] cargo-husky = { workspace = true } pretty_assertions = "1.4.0" tokio-stream = { version = "0.1.14", features = ["net"] } tokio = { version = "1.34", features = ["time", "test-util", "rt", "macros"] } -rstest = "0.18.2" +rstest = "0.26.1" diff --git a/groupcache/src/groupcache_inner.rs b/groupcache/src/groupcache_inner.rs index 9cf3ce9..e9ac1c9 100644 --- a/groupcache/src/groupcache_inner.rs +++ b/groupcache/src/groupcache_inner.rs @@ -25,7 +25,7 @@ use tonic::IntoRequest; /// Core implementation of groupcache API. pub struct GroupcacheInner { routing_state: Arc>, - single_flight_group: SingleFlight>, + single_flight_group: SingleFlight>, main_cache: Cache, hot_cache: Cache, loader: Box>, @@ -102,7 +102,7 @@ impl GroupcacheInner { peer: GroupcachePeerWithClient, ) -> Result { self.single_flight_group - .work(key, || async { + .work(key.to_owned(), || async { self.get_deduped(key, peer) .await .map_err(|e| DedupedGroupcacheError(Arc::new(e))) @@ -143,9 +143,8 @@ impl GroupcacheInner { self.loader .load(key) .await - .map_err(|e| { + .inspect_err(|_| { counter!(METRIC_LOCAL_LOAD_ERROR_TOTAL).increment(1); - e }) .map_err(InternalGroupcacheError::LocalLoader) } @@ -156,10 +155,9 @@ impl GroupcacheInner { client: &mut GroupcachePeerClient, ) -> Result { counter!(METRIC_REMOTE_LOAD_TOTAL).increment(1); - self.load_remotely(key, client).await.map_err(|e| { - counter!(METRIC_REMOTE_LOAD_ERROR).increment(1); - e - }) + self.load_remotely(key, client) + .await + .inspect_err(|_| counter!(METRIC_REMOTE_LOAD_ERROR).increment(1)) } async fn load_remotely(