diff --git a/Cargo.lock b/Cargo.lock index c6cbb40..0c20af2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,11 +60,58 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "anyhow" +version = "1.0.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" + +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + +[[package]] +name = "bitflags" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" + +[[package]] +name = "bumpalo" +version = "3.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" + [[package]] name = "bytes" -version = "1.9.0" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" + +[[package]] +name = "cfg-if" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" [[package]] name = "clap" @@ -112,6 +159,23 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "env_filter" version = "0.1.3" @@ -135,24 +199,406 @@ dependencies = [ "log", ] +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "form_urlencoded" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" +dependencies = [ + "percent-encoding", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "getrandom" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", +] + +[[package]] +name = "h2" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" + [[package]] name = "heck" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "http" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +dependencies = [ + "bytes", + "itoa", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" + [[package]] name = "humantime" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hyper" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "h2", + "http", + "http-body", + "httparse", + "itoa", + "pin-project-lite", + "pin-utils", + "smallvec", + "tokio", + "want", +] + +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" +dependencies = [ + "base64", + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "hyper", + "ipnet", + "libc", + "percent-encoding", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", +] + +[[package]] +name = "icu_collections" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" +dependencies = [ + "displaydoc", + "potential_utf", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" +dependencies = [ + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" + +[[package]] +name = "icu_properties" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" +dependencies = [ + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" + +[[package]] +name = "icu_provider" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" +dependencies = [ + "displaydoc", + "icu_locale_core", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + +[[package]] +name = "indexmap" +version = "2.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" +dependencies = [ + "equivalent", + "hashbrown", +] + +[[package]] +name = "ipnet" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" + +[[package]] +name = "iri-string" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f867b9d1d896b67beb18518eda36fdb77a32ea590de864f1325b294a6d14397" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + +[[package]] +name = "js-sys" +version = "0.3.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "464a3709c7f55f1f721e5389aa6ea4e3bc6aba669353300af094b29ffbdde1d8" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + [[package]] name = "libc" version = "0.2.178" @@ -170,6 +616,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "litemap" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" + [[package]] name = "log" version = "0.4.22" @@ -194,57 +646,366 @@ dependencies = [ ] [[package]] -name = "pin-project-lite" -version = "0.2.15" +name = "once_cell" +version = "1.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" + +[[package]] +name = "opentelemetry" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" +dependencies = [ + "async-trait", + "bytes", + "http", + "opentelemetry", + "reqwest", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" +dependencies = [ + "http", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest", + "thiserror", + "tokio", + "tonic", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", + "tonic-prost", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" + +[[package]] +name = "opentelemetry_sdk" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "rand", + "thiserror", + "tokio", + "tokio-stream", +] + +[[package]] +name = "percent-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" + +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "potential_utf" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77" +dependencies = [ + "zerovec", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + +[[package]] +name = "proc-macro2" +version = "1.0.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "prost" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "quote" +version = "1.0.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + +[[package]] +name = "rand" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" +dependencies = [ + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom", +] + +[[package]] +name = "regex" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" + +[[package]] +name = "reqwest" +version = "0.12.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b4c14b2d9afca6a60277086b0cc6a6ae0b568f6f7916c943a8cdc79f8be240f" +dependencies = [ + "base64", + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + +[[package]] +name = "ryu" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" [[package]] -name = "proc-macro2" -version = "1.0.92" +name = "serde" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" dependencies = [ - "unicode-ident", + "serde_core", + "serde_derive", ] [[package]] -name = "quote" -version = "1.0.38" +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", + "quote", + "syn", ] [[package]] -name = "regex" -version = "1.11.1" +name = "serde_json" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ - "aho-corasick", + "itoa", "memchr", - "regex-automata", - "regex-syntax", + "ryu", + "serde", + "serde_core", ] [[package]] -name = "regex-automata" -version = "0.4.9" +name = "serde_urlencoded" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" dependencies = [ - "aho-corasick", - "memchr", - "regex-syntax", + "form_urlencoded", + "itoa", + "ryu", + "serde", ] [[package]] -name = "regex-syntax" -version = "0.8.5" +name = "slab" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" [[package]] name = "socket2" @@ -256,6 +1017,12 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" + [[package]] name = "strsim" version = "0.11.1" @@ -273,17 +1040,71 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] + +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "systemd-udp-proxy" -version = "0.1.3" +version = "0.2.0" dependencies = [ "clap", "env_logger", "listenfd", "log", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", "tokio", ] +[[package]] +name = "thiserror" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tinystr" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tokio" version = "1.48.0" @@ -310,12 +1131,177 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tonic" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" +dependencies = [ + "async-trait", + "base64", + "bytes", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "sync_wrapper", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-prost" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +dependencies = [ + "bytes", + "prost", + "tonic", +] + +[[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "indexmap", + "pin-project-lite", + "slab", + "sync_wrapper", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" +dependencies = [ + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "iri-string", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + +[[package]] +name = "tracing" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c" +dependencies = [ + "once_cell", +] + +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "unicode-ident" version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" +[[package]] +name = "url" +version = "2.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -328,12 +1314,98 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasip2" +version = "1.0.1+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" +dependencies = [ + "wit-bindgen", +] + +[[package]] +name = "wasm-bindgen" +version = "0.2.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d759f433fa64a2d763d1340820e46e111a7a5ab75f993d1852d70b03dbb80fd" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "836d9622d604feee9e5de25ac10e3ea5f2d65b41eac0d9ce72eb5deae707ce7c" +dependencies = [ + "cfg-if", + "js-sys", + "once_cell", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48cb0d2638f8baedbc542ed444afc0644a29166f1595371af4fecf8ce1e7eeb3" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cefb59d5cd5f92d9dcf80e4683949f15ca4b511f4ac0a6e14d4e1ac60c6ecd40" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbc538057e648b67f72a982e708d485b2efa771e1ac05fec311f9f63e5800db4" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "web-sys" +version = "0.3.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b32828d774c412041098d182a8b38b16ea816958e07cf40eec2bc080ae137ac" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.3.9" @@ -526,3 +1598,112 @@ name = "windows_x86_64_msvc" version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + +[[package]] +name = "wit-bindgen" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" + +[[package]] +name = "writeable" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" + +[[package]] +name = "yoke" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954" +dependencies = [ + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerocopy" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zerofrom" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerotrie" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index c3d9fbe..a9f4548 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "systemd-udp-proxy" -version = "0.1.3" +version = "0.2.0" edition = "2024" [dependencies] @@ -8,4 +8,8 @@ clap = { version = "4.5", features = ["derive"] } env_logger = "0.11" listenfd = "1.0" log = "0.4" +opentelemetry = "0.31" +opentelemetry_sdk = { version = "0.31", features = ["rt-tokio"] } +opentelemetry-otlp = { version = "0.31", features = ["metrics", "grpc-tonic"] } +opentelemetry-semantic-conventions = { version = "0.31", features = ["semconv_experimental"] } tokio = { version = "1.48", features = ["io-util", "macros", "net", "rt-multi-thread", "sync", "time"] } diff --git a/src/main.rs b/src/main.rs index 6f325df..1165806 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,14 +11,18 @@ use clap::Parser; use listenfd::ListenFd; #[cfg(not(debug_assertions))] use log::warn; -use primary_tasks::{rx_task, tx_task}; +use primary_tasks::{SessionCache, rx_task, tx_task}; use session::SessionReply; -use tokio::{net::UdpSocket, sync::mpsc}; +use tokio::{ + net::UdpSocket, + sync::{RwLock, mpsc}, +}; mod error_util; mod log_config; mod primary_tasks; mod session; +mod telemetry; #[derive(Parser, Debug)] struct ProxyConfig { @@ -34,9 +38,20 @@ struct ProxyConfig { /// How many seconds sessions should be cached before expiring #[arg(short = 't', long, default_value_t = 60)] session_timeout: u64, -} + /// Maximum UDP packet size to receive in bytes (packets larger will be truncated) + #[arg(short = 'm', long, default_value_t = 1500)] + max_packet_size: usize, -const MAX_UDP_PACKET_SIZE: u16 = u16::MAX; + /// The OTel collector endpoint + #[arg(long, default_value = "http://localhost:4317")] + otel_endpoint: String, + /// The service name for OTel tagging + #[arg(long, default_value = "systemd-udp-proxy")] + service_name: String, + /// The deployment environment for OTel tagging + #[arg(long, default_value = "prod")] + environment: String, +} #[tokio::main] async fn main() -> io::Result<()> { @@ -69,10 +84,25 @@ async fn main() -> io::Result<()> { let source_socket = Arc::new(UdpSocket::from_std(std_source_socket)?); let (reply_channel_tx, reply_channel_rx) = mpsc::unbounded_channel::(); - let rx_task = tokio::spawn(rx_task(config, reply_channel_tx, source_socket.clone())); - let tx_task = tokio::spawn(tx_task(reply_channel_rx, source_socket.clone())); + let sessions = Arc::new(RwLock::new(SessionCache::new())); + let (metrics, meter) = telemetry::init_metrics(&config, sessions.clone()).map_err(|err| { + io::Error::other(format!("Failed to initialize OTel metrics exporter: {err}")) + })?; + + let rx_task = tokio::spawn(rx_task( + config, + reply_channel_tx, + source_socket.clone(), + sessions, + metrics.clone(), + )); + let tx_task = tokio::spawn(tx_task( + reply_channel_rx, + source_socket.clone(), + metrics.clone(), + )); rx_task.await??; tx_task.await??; - Ok(()) + meter.shutdown().map_err(io::Error::other) } diff --git a/src/primary_tasks/mod.rs b/src/primary_tasks/mod.rs index 8714810..29ccb5c 100644 --- a/src/primary_tasks/mod.rs +++ b/src/primary_tasks/mod.rs @@ -1,5 +1,6 @@ mod rx_task; mod tx_task; +pub use rx_task::SessionCache; pub use rx_task::rx_task; pub use tx_task::tx_task; diff --git a/src/primary_tasks/rx_task.rs b/src/primary_tasks/rx_task.rs index 2371485..df6dde6 100644 --- a/src/primary_tasks/rx_task.rs +++ b/src/primary_tasks/rx_task.rs @@ -14,13 +14,14 @@ use tokio::{ }; use crate::{ - MAX_UDP_PACKET_SIZE, ProxyConfig, + ProxyConfig, error_util::{ErrorAction, handle_io_error}, session::{Session, SessionReply, SessionSource}, + telemetry::{NetworkDirection, Peer, ProxyMetrics}, }; type SessionChannel = UnboundedSender>; -type SessionCache = HashMap)>; +pub type SessionCache = HashMap)>; /// Loops infinitely over the `rx_socket` to recieve traffic from the original source of the proxy. /// @@ -28,16 +29,23 @@ type SessionCache = HashMap)>; /// tx/rx loop tasks are spawned to proxy traffic for that session to and from the destination. If a [`Session`] /// does not recieve traffic for [`ProxyConfig::session_timeout`] seconds, it will close its tasks and a new one will /// be created if any traffic resumes from it. +/// +/// If a packet arrives after a Session's channel has closed but before the session is removed +/// from the cache, that packet will be dropped and the session will be cleaned up. Subsequent +/// packets from the same source will trigger creation of a new session. pub async fn rx_task( config: ProxyConfig, reply_channel_tx: UnboundedSender, rx_socket: Arc, + sessions: Arc>, + metrics: Arc, ) -> io::Result<()> { let shared_reply_channel = Arc::new(reply_channel_tx); - let sessions = Arc::new(RwLock::new(SessionCache::new())); + let dir = NetworkDirection::Receive; + let peer = Peer::Client; loop { - let mut buf = Vec::with_capacity(MAX_UDP_PACKET_SIZE.into()); + let mut buf = Vec::with_capacity(config.max_packet_size); match rx_socket.recv_buf_from(&mut buf).await { Err(err) => match handle_io_error(err) { ErrorAction::Terminate(err) => return Err(err), @@ -48,13 +56,14 @@ pub async fn rx_task( let session_channel_tx = match session_cache.entry(source.into()) { Entry::Vacant(entry) => { info!("Creating a new session for {source}"); - let session = match Session::new(&config, source.into()).await { - Ok(created_session) => Arc::new(created_session), - Err(err) => { - error!("Failed to create a session for {}: {:?}", source, err); - continue; - } - }; + let session = + match Session::new(&config, source.into(), metrics.clone()).await { + Ok(created_session) => Arc::new(created_session), + Err(err) => { + error!("Failed to create a session for {}: {:?}", source, err); + continue; + } + }; let (tx, rx) = mpsc::unbounded_channel(); @@ -72,7 +81,11 @@ pub async fn rx_task( let rx_reply_channel = shared_reply_channel.clone(); tokio::spawn(async move { if let Err(err) = rx_session - .rx_loop(rx_reply_channel, config.session_timeout) + .rx_loop( + rx_reply_channel, + config.session_timeout, + config.max_packet_size, + ) .await { error!("RX error for {}: {:?}", source, err); @@ -89,12 +102,17 @@ pub async fn rx_task( } }; + let bytes = buf.len() as u64; if session_channel_tx.send(buf).is_err() { error!( "Dropped packet for {} because its proxy session is closed", source ); + metrics.count_dropped_packet(&Peer::Client); sessions.write().await.remove(&source.into()); + } else { + metrics.count_packet(&dir, &peer); + metrics.count_bytes(&dir, &peer, bytes); } } } diff --git a/src/primary_tasks/tx_task.rs b/src/primary_tasks/tx_task.rs index b60f9c0..b2bcdbe 100644 --- a/src/primary_tasks/tx_task.rs +++ b/src/primary_tasks/tx_task.rs @@ -5,6 +5,7 @@ use tokio::{net::UdpSocket, sync::mpsc::UnboundedReceiver}; use crate::{ error_util::{ErrorAction, handle_io_error}, session::SessionReply, + telemetry::{NetworkDirection, Peer, ProxyMetrics}, }; /// Loops infinitely over the `reply_channel_rx` to forward traffic from the destination of the proxy. @@ -15,17 +16,32 @@ use crate::{ pub async fn tx_task( mut reply_channel_rx: UnboundedReceiver, tx_socket: Arc, + metrics: Arc, ) -> io::Result<()> { + let dir = NetworkDirection::Transmit; + let peer = Peer::Client; + while let Some(reply) = reply_channel_rx.recv().await { match tx_socket .send_to(&reply.data, (reply.source.address, reply.source.port)) .await { - Ok(_) => {} - Err(err) => match handle_io_error(err) { - ErrorAction::Terminate(err) => return Err::<(), io::Error>(err), - ErrorAction::Continue => {} - }, + Ok(_) => { + metrics.count_packet(&dir, &peer); + metrics.count_bytes(&dir, &peer, reply.data.len() as u64); + } + Err(err) => { + metrics.count_dropped_packet(&peer); + match handle_io_error(err) { + ErrorAction::Terminate(err) => { + metrics.count_io_error(&dir, &peer, false); + return Err::<(), io::Error>(err); + } + ErrorAction::Continue => { + metrics.count_io_error(&dir, &peer, true); + } + } + } } } Ok(()) diff --git a/src/session.rs b/src/session.rs index 0e4105d..24d3a86 100644 --- a/src/session.rs +++ b/src/session.rs @@ -6,16 +6,17 @@ use std::{ time::Duration, }; -use log::info; +use log::{info, warn}; use tokio::{ net::UdpSocket, sync::mpsc::{UnboundedReceiver, UnboundedSender}, - time::timeout, + time::{Instant, timeout}, }; use crate::{ - MAX_UDP_PACKET_SIZE, ProxyConfig, + ProxyConfig, error_util::{ErrorAction, handle_io_error}, + telemetry::{NetworkDirection, Peer, ProxyMetrics}, }; #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] @@ -27,15 +28,15 @@ pub struct SessionSource { /// Wrapper around a [`UdpSocket`] that handles the boiler plate of establishing a connection to the appropriate /// backend destination. It retains the original [`SessionSource`] of the traffic it will be proxying /// so that replies from the backend can be properly routed back. -#[derive(Debug)] pub struct Session { /// The source that this session is receiving traffic from source: SessionSource, /// The socket that this session is using to communicate with the destination destination_socket: Arc, + metrics: Arc, + start: Instant, } -#[derive(Debug)] pub struct SessionReply { pub source: SessionSource, pub data: Vec, @@ -51,7 +52,11 @@ impl Session { /// Establish a new session that binds to an [`ProxyConfig::source_address`] and establishes /// a connection to [`ProxyConfig::destination_address`] on [`ProxyConfig::destination_port`]. /// Returns an [`io::Error`] if the connection fails to establish. - pub async fn new(config: &ProxyConfig, source: SessionSource) -> io::Result { + pub async fn new( + config: &ProxyConfig, + source: SessionSource, + metrics: Arc, + ) -> io::Result { // Let the OS assign us an available port let destination_socket = Arc::new(UdpSocket::bind((config.source_address, 0)).await?); // Connect to the destination @@ -62,6 +67,8 @@ impl Session { Ok(Session { source, destination_socket, + metrics, + start: Instant::now(), }) } @@ -74,17 +81,33 @@ impl Session { session_timeout: u64, ) -> io::Result<()> { let duration = Duration::from_secs(session_timeout); + let dir = NetworkDirection::Transmit; + let peer = Peer::Backend; + while let Ok(Some(data)) = timeout(duration, source_channel.recv()).await { match self.destination_socket.send(&data).await { - Ok(_) => {} - Err(err) => match err.kind() { - // Destination service hasn't started yet - ErrorKind::ConnectionRefused => {} - _ => match handle_io_error(err) { - ErrorAction::Terminate(cause) => return Err(cause), - ErrorAction::Continue => {} - }, - }, + Ok(_) => { + self.metrics.count_packet(&dir, &peer); + self.metrics.count_bytes(&dir, &peer, data.len() as u64); + } + Err(err) => { + self.metrics.count_dropped_packet(&peer); + match err.kind() { + // Destination service hasn't started yet + ErrorKind::ConnectionRefused => { + warn!("Destination service refused connection"); + } + _ => match handle_io_error(err) { + ErrorAction::Terminate(cause) => { + self.metrics.count_io_error(&dir, &peer, false); + return Err(cause); + } + ErrorAction::Continue => { + self.metrics.count_io_error(&dir, &peer, true); + } + }, + } + } } } info!("Closing tx session for {}", self.source); @@ -97,17 +120,30 @@ impl Session { &self, reply_channel: Arc>, session_timeout: u64, + max_packet_size: usize, ) -> io::Result<()> { let duration = Duration::from_secs(session_timeout); + let dir = NetworkDirection::Receive; + let peer = Peer::Backend; + loop { - let mut buf = Vec::with_capacity(MAX_UDP_PACKET_SIZE.into()); + let mut buf = Vec::with_capacity(max_packet_size); match timeout(duration, self.destination_socket.recv_buf(&mut buf)).await { Ok(result) => { if let Err(err) = result { + self.metrics.count_dropped_packet(&peer); match handle_io_error(err) { - ErrorAction::Terminate(cause) => return Err(cause), - ErrorAction::Continue => {} + ErrorAction::Terminate(cause) => { + self.metrics.count_io_error(&dir, &peer, false); + return Err(cause); + } + ErrorAction::Continue => { + self.metrics.count_io_error(&dir, &peer, true); + } } + } else { + self.metrics.count_packet(&dir, &peer); + self.metrics.count_bytes(&dir, &peer, buf.len() as u64); } } Err(_timeout_exceeded) => { @@ -120,6 +156,7 @@ impl Session { .send(SessionReply::new(self.source, buf)) .is_err() { + self.metrics.count_dropped_packet(&peer); return Err(io::Error::new( ErrorKind::ConnectionAborted, "Primary tx task has stopped listening, dropping reply as the proxy will soon terminate", @@ -129,6 +166,13 @@ impl Session { } } +impl Drop for Session { + fn drop(&mut self) { + self.metrics + .record_session_duration(Instant::now().duration_since(self.start).as_secs_f64()); + } +} + impl From for SessionSource { fn from(value: SocketAddr) -> Self { SessionSource { diff --git a/src/telemetry.rs b/src/telemetry.rs new file mode 100644 index 0000000..9263119 --- /dev/null +++ b/src/telemetry.rs @@ -0,0 +1,207 @@ +use opentelemetry::{ + KeyValue, + metrics::{Counter, Histogram, MeterProvider}, +}; +use opentelemetry_otlp::{ExportConfig, ExporterBuildError, MetricExporter, WithExportConfig}; +use opentelemetry_sdk::{ + Resource, + metrics::{PeriodicReader, SdkMeterProvider}, +}; +use opentelemetry_semantic_conventions::attribute; +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, + time::Duration, +}; +use tokio::sync::RwLock; + +use crate::{ProxyConfig, SessionCache}; + +const SESSION_COUNT: &str = "proxy.session.count"; +const SESSION_DURATION: &str = "proxy.session.duration"; + +const NETWORK_IO_BYTES: &str = "proxy.network.io.bytes"; +const NETWORK_IO_PACKETS: &str = "proxy.network.io.packets"; +const NETWORK_IO_PACKETS_DROPPED: &str = "proxy.network.io.packets.dropped"; +const NETWORK_IO_ERRORS_RECOVERABLE: &str = "proxy.network.io.errors.recoverable"; +const NETWORK_IO_ERRORS_UNRECOVERABLE: &str = "proxy.network.io.errors.unrecoverable"; + +const NETWORK_IO_DIRECTION: &str = "proxy.network.io.direction"; +const NETWORK_PEER_ROLE: &str = "proxy.network.peer.role"; + +const METER_NAME: &str = "proxy"; + +pub struct ProxyMetrics { + pub session_duration: Histogram, + + pub network_io_bytes: Counter, + pub network_io_packets: Counter, + pub network_io_packets_dropped: Counter, + pub network_io_errors_recoverable: Counter, + pub network_io_errors_unrecoverable: Counter, +} + +impl ProxyMetrics { + fn new(meter_provider: &SdkMeterProvider, sessions: Arc>) -> Self { + let meter = meter_provider.meter(METER_NAME); + + // Observable gauge for active sessions + meter + .u64_observable_gauge(SESSION_COUNT) + .with_description("Number of currently active sessions") + .with_callback(move |observer| { + if let Ok(cache) = sessions.try_read() { + observer.observe(cache.len() as u64, &[]); + } + }) + .build(); + + ProxyMetrics { + session_duration: meter + .f64_histogram(SESSION_DURATION) + .with_description("Proxy session duration measured in seconds") + .with_unit("s") + .build(), + network_io_bytes: meter + .u64_counter(NETWORK_IO_BYTES) + .with_description("Network bytes sent and received by the proxy") + .with_unit("By") + .build(), + network_io_packets: meter + .u64_counter(NETWORK_IO_PACKETS) + .with_description("Network packets sent and received by the proxy") + .with_unit("{packet}") + .build(), + network_io_packets_dropped: meter + .u64_counter(NETWORK_IO_PACKETS_DROPPED) + .with_description("Packets dropped due to closing sessions") + .with_unit("{packet}") + .build(), + network_io_errors_recoverable: meter + .u64_counter(NETWORK_IO_ERRORS_RECOVERABLE) + .with_description("Recoverable IO errors encountered by the proxy") + .with_unit("{error}") + .build(), + network_io_errors_unrecoverable: meter + .u64_counter(NETWORK_IO_ERRORS_UNRECOVERABLE) + .with_description("Unrecoverable IO errors encountered by the proxy") + .with_unit("{error}") + .build(), + } + } + + pub fn record_session_duration(&self, duration_secs: f64) { + self.session_duration.record(duration_secs, &[]); + } + + pub fn count_bytes(&self, dir: &NetworkDirection, peer: &Peer, byte_count: u64) { + self.network_io_bytes.add( + byte_count, + &[ + KeyValue::new(NETWORK_IO_DIRECTION, dir.to_string()), + KeyValue::new(NETWORK_PEER_ROLE, peer.to_string()), + ], + ); + } + + pub fn count_packet(&self, dir: &NetworkDirection, peer: &Peer) { + self.network_io_packets.add( + 1, + &[ + KeyValue::new(NETWORK_IO_DIRECTION, dir.to_string()), + KeyValue::new(NETWORK_PEER_ROLE, peer.to_string()), + ], + ); + } + + pub fn count_dropped_packet(&self, peer: &Peer) { + self.network_io_packets_dropped.add( + 1, + &[ + KeyValue::new(NETWORK_IO_DIRECTION, NetworkDirection::Receive.to_string()), + KeyValue::new(NETWORK_PEER_ROLE, peer.to_string()), + ], + ); + } + + pub fn count_io_error(&self, dir: &NetworkDirection, peer: &Peer, recoverable: bool) { + let metric = if recoverable { + &self.network_io_errors_recoverable + } else { + &self.network_io_errors_unrecoverable + }; + metric.add( + 1, + &[ + KeyValue::new(NETWORK_IO_DIRECTION, dir.to_string()), + KeyValue::new(NETWORK_PEER_ROLE, peer.to_string()), + ], + ); + } +} + +pub enum Peer { + Client, + Backend, +} +impl Display for Peer { + fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { + match self { + Self::Client => fmt.write_str("client"), + Self::Backend => fmt.write_str("backend"), + } + } +} + +pub enum NetworkDirection { + Transmit, + Receive, +} +impl Display for NetworkDirection { + fn fmt(&self, fmt: &mut Formatter) -> fmt::Result { + match self { + Self::Transmit => fmt.write_str("transmit"), + Self::Receive => fmt.write_str("receive"), + } + } +} + +pub fn init_metrics( + config: &ProxyConfig, + sessions: Arc>, +) -> Result<(Arc, SdkMeterProvider), ExporterBuildError> { + let export_config = ExportConfig { + endpoint: Some(config.otel_endpoint.clone()), + ..Default::default() + }; + + let exporter = MetricExporter::builder() + .with_tonic() + .with_export_config(export_config) + .build()?; + + let reader = PeriodicReader::builder(exporter) + .with_interval(Duration::from_secs(30)) + .build(); + + let resource = Resource::builder() + .with_attributes(vec![ + KeyValue::new(attribute::SERVICE_NAME, config.service_name.clone()), + KeyValue::new(attribute::SERVICE_VERSION, env!("CARGO_PKG_VERSION")), + KeyValue::new( + attribute::DEPLOYMENT_ENVIRONMENT_NAME, + config.environment.clone(), + ), + ]) + .build(); + + let meter_provider = SdkMeterProvider::builder() + .with_reader(reader) + .with_resource(resource) + .build(); + + Ok(( + Arc::new(ProxyMetrics::new(&meter_provider, sessions)), + meter_provider, + )) +}