diff --git a/Cargo.lock b/Cargo.lock index d7b47e8ef2..fcc42c11fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -216,6 +216,18 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-broadcast" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532" +dependencies = [ + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-nats" version = "0.40.0" @@ -556,6 +568,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "backon" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" +dependencies = [ + "fastrand", + "gloo-timers", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.75" @@ -1212,6 +1235,15 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "config" version = "0.15.17" @@ -2289,6 +2321,8 @@ dependencies = [ "futures", "humantime", "jsonschema", + "k8s-openapi", + "kube", "local-ip-address", "log", "nid", @@ -2305,6 +2339,7 @@ dependencies = [ "regex", "reqwest 0.12.23", "rstest 0.23.0", + "schemars 1.0.4", "serde", "serde_json", "socket2 0.5.10", @@ -2533,6 +2568,27 @@ dependencies = [ "tower-service", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "eventsource-stream" version = "0.2.3" @@ -3236,6 +3292,18 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.3.27" @@ -3404,6 +3472,26 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "home" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d" +dependencies = [ + "windows-sys 0.61.0", +] + +[[package]] +name = "hostname" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65" +dependencies = [ + "cfg-if 1.0.3", + "libc", + "windows-link 0.1.3", +] + [[package]] name = "hound" version = "3.5.1" @@ -3576,6 +3664,7 @@ dependencies = [ "http 1.3.1", "hyper 1.7.0", "hyper-util", + "log", "rustls", "rustls-native-certs 0.8.1", "rustls-pki-types", @@ -4106,6 +4195,18 @@ dependencies = [ "unicode-general-category", ] +[[package]] +name = "json-patch" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f300e415e2134745ef75f04562dd0145405c2f7fd92065db029ac4b16b57fe90" +dependencies = [ + "jsonptr", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "json5" version = "0.4.1" @@ -4117,6 +4218,29 @@ dependencies = [ "serde", ] +[[package]] +name = "jsonpath-rust" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c00ae348f9f8fd2d09f82a98ca381c60df9e0820d8d79fce43e649b4dc3128b" +dependencies = [ + "pest", + "pest_derive", + "regex", + "serde_json", + "thiserror 2.0.16", +] + +[[package]] +name = "jsonptr" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5a3cc660ba5d72bce0b3bb295bf20847ccbb40fd423f3f05b61273672e561fe" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "jsonschema" version = "0.17.1" @@ -4157,6 +4281,19 @@ dependencies = [ "rayon", ] +[[package]] +name = "k8s-openapi" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d13f06d5326a915becaffabdfab75051b8cdc260c2a5c06c0e90226ede89a692" +dependencies = [ + "base64 0.22.1", + "chrono", + "schemars 1.0.4", + "serde", + "serde_json", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -4167,6 +4304,115 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "kube" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e7bb0b6a46502cc20e4575b6ff401af45cfea150b34ba272a3410b78aa014e" +dependencies = [ + "k8s-openapi", + "kube-client", + "kube-core", + "kube-derive", + "kube-runtime", +] + +[[package]] +name = "kube-client" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4987d57a184d2b5294fdad3d7fc7f278899469d21a4da39a8f6ca16426567a36" +dependencies = [ + "base64 0.22.1", + "bytes", + "chrono", + "either", + "futures", + "home", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.7.0", + "hyper-rustls", + "hyper-timeout", + "hyper-util", + "jsonpath-rust", + "k8s-openapi", + "kube-core", + "pem", + "rustls", + "secrecy", + "serde", + "serde_json", + "serde_yaml", + "thiserror 2.0.16", + "tokio", + "tokio-util", + "tower 0.5.2", + "tower-http", + "tracing", +] + +[[package]] +name = "kube-core" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "914bbb770e7bb721a06e3538c0edd2babed46447d128f7c21caa68747060ee73" +dependencies = [ + "chrono", + "derive_more 2.0.1", + "form_urlencoded", + "http 1.3.1", + "json-patch", + "k8s-openapi", + "schemars 1.0.4", + "serde", + "serde-value", + "serde_json", + "thiserror 2.0.16", +] + +[[package]] +name = "kube-derive" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03dee8252be137772a6ab3508b81cd797dee62ee771112a2453bc85cbbe150d2" +dependencies = [ + "darling 0.21.3", + "proc-macro2", + "quote", + "serde", + "serde_json", + "syn 2.0.106", +] + +[[package]] +name = "kube-runtime" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6aea4de4b562c5cc89ab10300bb63474ae1fa57ff5a19275f2e26401a323e3fd" +dependencies = [ + "ahash", + "async-broadcast", + "async-stream", + "backon", + "educe", + "futures", + "hashbrown 0.15.5", + "hostname", + "json-patch", + "k8s-openapi", + "kube-client", + "parking_lot", + "pin-project", + "serde", + "serde_json", + "thiserror 2.0.16", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "lalrpop-util" version = "0.20.2" @@ -4760,7 +5006,7 @@ dependencies = [ "num-traits", "objc", "once_cell", - "ordered-float", + "ordered-float 5.0.0", "parking_lot", "radix_trie", "rand 0.9.2", @@ -5573,6 +5819,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-float" version = "5.0.0" @@ -5614,6 +5869,12 @@ dependencies = [ "serde", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.5" @@ -5672,6 +5933,16 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64 0.22.1", + "serde_core", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -7186,7 +7457,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615" dependencies = [ "dyn-clone", - "schemars_derive", + "schemars_derive 0.8.22", "serde", "serde_json", ] @@ -7211,6 +7482,7 @@ checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" dependencies = [ "dyn-clone", "ref-cast", + "schemars_derive 1.0.4", "serde", "serde_json", ] @@ -7227,6 +7499,18 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "schemars_derive" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33d020396d1d138dc19f1165df7545479dcd58d93810dc5d646a16e55abefa80" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.106", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -7368,6 +7652,16 @@ dependencies = [ "typeid", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float 2.10.1", + "serde", +] + [[package]] name = "serde_cbor" version = "0.11.2" @@ -8547,6 +8841,7 @@ dependencies = [ "futures-sink", "futures-util", "pin-project-lite", + "slab", "tokio", ] @@ -8863,12 +9158,14 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ + "base64 0.22.1", "bitflags 2.9.4", "bytes", "futures-util", "http 1.3.1", "http-body 1.0.1", "iri-string", + "mime", "pin-project-lite", "tower 0.5.2", "tower-layer", diff --git a/examples/custom_backend/hello_world/client.py b/examples/custom_backend/hello_world/client.py index 71643d3988..4292091efb 100644 --- a/examples/custom_backend/hello_world/client.py +++ b/examples/custom_backend/hello_world/client.py @@ -24,7 +24,7 @@ async def worker(runtime: DistributedRuntime): # Get endpoint endpoint = ( - runtime.namespace("hello_world").component("backend").endpoint("generate") + runtime.namespace("test").component("backend").endpoint("generate") ) # Create client and wait for service to be ready diff --git a/examples/custom_backend/hello_world/hello_world.py b/examples/custom_backend/hello_world/hello_world.py index 0761a19aa6..4bd63fe83e 100644 --- a/examples/custom_backend/hello_world/hello_world.py +++ b/examples/custom_backend/hello_world/hello_world.py @@ -23,7 +23,7 @@ async def content_generator(request: str): @dynamo_worker() async def worker(runtime: DistributedRuntime): - namespace_name = "hello_world" + namespace_name = "test" component_name = "backend" endpoint_name = "generate" diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock index 4586591b15..d523b245cb 100644 --- a/lib/bindings/python/Cargo.lock +++ b/lib/bindings/python/Cargo.lock @@ -55,6 +55,12 @@ dependencies = [ "equator", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -173,6 +179,18 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "async-broadcast" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532" +dependencies = [ + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-channel" version = "2.5.0" @@ -467,6 +485,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "backon" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" +dependencies = [ + "fastrand", + "gloo-timers", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.75" @@ -1153,8 +1182,18 @@ version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.20.11", + "darling_macro 0.20.11", +] + +[[package]] +name = "darling" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" +dependencies = [ + "darling_core 0.21.3", + "darling_macro 0.21.3", ] [[package]] @@ -1171,13 +1210,38 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "darling_core" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.106", +] + [[package]] name = "darling_macro" version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ - "darling_core", + "darling_core 0.20.11", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "darling_macro" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" +dependencies = [ + "darling_core 0.21.3", "quote", "syn 2.0.106", ] @@ -1274,7 +1338,7 @@ version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" dependencies = [ - "darling", + "darling 0.20.11", "proc-macro2", "quote", "syn 2.0.106", @@ -1296,7 +1360,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" dependencies = [ - "derive_more-impl", + "derive_more-impl 1.0.0", +] + +[[package]] +name = "derive_more" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" +dependencies = [ + "derive_more-impl 2.0.1", ] [[package]] @@ -1311,6 +1384,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "derive_more-impl" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "dialoguer" version = "0.11.0" @@ -1614,6 +1698,8 @@ dependencies = [ "figment", "futures", "humantime", + "k8s-openapi", + "kube", "local-ip-address", "log", "nid", @@ -1628,6 +1714,7 @@ dependencies = [ "rand 0.9.2", "rayon", "regex", + "schemars 1.0.4", "serde", "serde_json", "socket2 0.5.10", @@ -2394,6 +2481,18 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.4.12" @@ -2448,6 +2547,8 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ + "allocator-api2", + "equivalent", "foldhash", ] @@ -2510,6 +2611,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "hostname" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65" +dependencies = [ + "cfg-if 1.0.3", + "libc", + "windows-link 0.1.3", +] + [[package]] name = "http" version = "1.3.1" @@ -2594,6 +2706,7 @@ dependencies = [ "http", "hyper", "hyper-util", + "log", "rustls", "rustls-native-certs 0.8.1", "rustls-pki-types", @@ -3067,6 +3180,18 @@ dependencies = [ "unicode-general-category", ] +[[package]] +name = "json-patch" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f300e415e2134745ef75f04562dd0145405c2f7fd92065db029ac4b16b57fe90" +dependencies = [ + "jsonptr", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "json5" version = "0.4.1" @@ -3078,6 +3203,29 @@ dependencies = [ "serde", ] +[[package]] +name = "jsonpath-rust" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c00ae348f9f8fd2d09f82a98ca381c60df9e0820d8d79fce43e649b4dc3128b" +dependencies = [ + "pest", + "pest_derive", + "regex", + "serde_json", + "thiserror 2.0.16", +] + +[[package]] +name = "jsonptr" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5a3cc660ba5d72bce0b3bb295bf20847ccbb40fd423f3f05b61273672e561fe" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "jwalk" version = "0.8.1" @@ -3088,6 +3236,19 @@ dependencies = [ "rayon", ] +[[package]] +name = "k8s-openapi" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d13f06d5326a915becaffabdfab75051b8cdc260c2a5c06c0e90226ede89a692" +dependencies = [ + "base64 0.22.1", + "chrono", + "schemars 1.0.4", + "serde", + "serde_json", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -3098,6 +3259,115 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "kube" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e7bb0b6a46502cc20e4575b6ff401af45cfea150b34ba272a3410b78aa014e" +dependencies = [ + "k8s-openapi", + "kube-client", + "kube-core", + "kube-derive", + "kube-runtime", +] + +[[package]] +name = "kube-client" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4987d57a184d2b5294fdad3d7fc7f278899469d21a4da39a8f6ca16426567a36" +dependencies = [ + "base64 0.22.1", + "bytes", + "chrono", + "either", + "futures", + "home", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-timeout", + "hyper-util", + "jsonpath-rust", + "k8s-openapi", + "kube-core", + "pem", + "rustls", + "secrecy", + "serde", + "serde_json", + "serde_yaml", + "thiserror 2.0.16", + "tokio", + "tokio-util", + "tower", + "tower-http", + "tracing", +] + +[[package]] +name = "kube-core" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "914bbb770e7bb721a06e3538c0edd2babed46447d128f7c21caa68747060ee73" +dependencies = [ + "chrono", + "derive_more 2.0.1", + "form_urlencoded", + "http", + "json-patch", + "k8s-openapi", + "schemars 1.0.4", + "serde", + "serde-value", + "serde_json", + "thiserror 2.0.16", +] + +[[package]] +name = "kube-derive" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03dee8252be137772a6ab3508b81cd797dee62ee771112a2453bc85cbbe150d2" +dependencies = [ + "darling 0.21.3", + "proc-macro2", + "quote", + "serde", + "serde_json", + "syn 2.0.106", +] + +[[package]] +name = "kube-runtime" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6aea4de4b562c5cc89ab10300bb63474ae1fa57ff5a19275f2e26401a323e3fd" +dependencies = [ + "ahash", + "async-broadcast", + "async-stream", + "backon", + "educe", + "futures", + "hashbrown 0.15.5", + "hostname", + "json-patch", + "k8s-openapi", + "kube-client", + "parking_lot", + "pin-project", + "serde", + "serde_json", + "thiserror 2.0.16", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "lalrpop-util" version = "0.20.2" @@ -3278,7 +3548,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d149aaa2965d70381709d9df4c7ee1fc0de1c614a4efc2ee356f5e43d68749f8" dependencies = [ - "derive_more", + "derive_more 1.0.0", "malachite", "num-integer", "num-traits", @@ -4025,6 +4295,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.7.3" @@ -4117,6 +4396,16 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64 0.22.1", + "serde_core", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -5463,10 +5752,23 @@ checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" dependencies = [ "dyn-clone", "ref-cast", + "schemars_derive", "serde", "serde_json", ] +[[package]] +name = "schemars_derive" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33d020396d1d138dc19f1165df7545479dcd58d93810dc5d646a16e55abefa80" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.106", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -5539,10 +5841,11 @@ checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc" [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" dependencies = [ + "serde_core", "serde_derive", ] @@ -5557,11 +5860,41 @@ dependencies = [ "typeid", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float", + "serde", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", @@ -5667,7 +6000,7 @@ version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de90945e6565ce0d9a25098082ed4ee4002e047cb59892c318d66821e14bb30f" dependencies = [ - "darling", + "darling 0.20.11", "proc-macro2", "quote", "syn 2.0.106", @@ -6263,6 +6596,7 @@ dependencies = [ "futures-sink", "futures-util", "pin-project-lite", + "slab", "tokio", ] @@ -6492,12 +6826,14 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ + "base64 0.22.1", "bitflags 2.9.3", "bytes", "futures-util", "http", "http-body", "iri-string", + "mime", "pin-project-lite", "tower", "tower-layer", @@ -6955,7 +7291,7 @@ version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7df16e474ef958526d1205f6dda359fdfab79d9aa6d54bafcb92dcd07673dca" dependencies = [ - "darling", + "darling 0.20.11", "once_cell", "proc-macro-error2", "proc-macro2", diff --git a/lib/bindings/python/rust/lib.rs b/lib/bindings/python/rust/lib.rs index f4449808f3..1f8dd80e42 100644 --- a/lib/bindings/python/rust/lib.rs +++ b/lib/bindings/python/rust/lib.rs @@ -685,9 +685,15 @@ impl Component { /// NATS specific stats/metrics call fn create_service<'p>(&self, py: Python<'p>) -> PyResult> { - let mut inner = self.inner.clone(); + let inner = self.inner.clone(); pyo3_async_runtimes::tokio::future_into_py(py, async move { + // Create the NATS service inner.add_stats_service().await.map_err(to_pyerr)?; + + // Feature flag: register with service discovery if enabled + if rs::config::is_service_discovery_enabled() { + inner.register_instance().await.map_err(to_pyerr)?; + } Ok(()) }) } diff --git a/lib/runtime/Cargo.toml b/lib/runtime/Cargo.toml index cd774ba16e..5c93e3110d 100644 --- a/lib/runtime/Cargo.toml +++ b/lib/runtime/Cargo.toml @@ -56,6 +56,9 @@ uuid = { workspace = true } url = { workspace = true } validator = { workspace = true } xxhash-rust = { workspace = true } +kube = { version = "2.0.1", default-features = false, features = ["runtime", "derive", "client", "rustls-tls", "aws-lc-rs"] } +k8s-openapi = { version = "0.26.0", features = ["latest", "schemars"] } +schemars = { version = "1" } arc-swap = { version = "1" } async-once-cell = { version = "0.5.4" } diff --git a/lib/runtime/src/component.rs b/lib/runtime/src/component.rs index 9bd2c4f39c..62583a25c1 100644 --- a/lib/runtime/src/component.rs +++ b/lib/runtime/src/component.rs @@ -171,6 +171,12 @@ pub struct Component { // fixed at startup time. is_static: bool, + /// Handle for this component's registration with service discovery + /// Wrapped in Arc so it can be set once and shared across all clones + #[builder(default = "Arc::new(async_once_cell::OnceCell::new())")] + #[educe(Debug(ignore))] + instance_handle: Arc>>, + /// This hierarchy's own metrics registry #[builder(default = "crate::MetricsRegistry::new()")] metrics_registry: crate::MetricsRegistry, @@ -268,6 +274,12 @@ impl Component { &self.labels } + /// Get the ServiceDiscovery instance handle for this component + pub fn instance_handle(&self) -> Result<&Box> { + let handle = self.instance_handle.get(); + handle.ok_or_else(|| error!("Component not registered with service discovery")) + } + pub fn endpoint(&self, endpoint: impl Into) -> Endpoint { Endpoint { component: self.clone(), @@ -375,7 +387,7 @@ impl Component { unimplemented!("collect_stats") } - pub async fn add_stats_service(&mut self) -> anyhow::Result<()> { + pub async fn add_stats_service(&self) -> anyhow::Result<()> { let service_name = self.service_name(); // Pre-check to save cost of creating the service, but don't hold the lock @@ -391,6 +403,7 @@ impl Component { anyhow::bail!("Service {service_name} already exists"); } + // Setup NATS service let Some(nats_client) = self.drt.nats_client() else { anyhow::bail!("Cannot create NATS service without NATS."); }; @@ -398,6 +411,7 @@ impl Component { let (nats_service, stats_reg) = service::build_nats_service(nats_client, self, description).await?; + // Update component registry let mut guard = self.drt.component_registry.inner.lock().await; if !guard.services.contains_key(&service_name) { // Normal case @@ -416,6 +430,25 @@ impl Component { if let Err(err) = self.start_scraping_nats_service_component_metrics() { tracing::debug!(service_name, error = %err, "Metrics registration failed"); } + + Ok(()) + } + + /// Register this component instance with service discovery + pub async fn register_instance(&self) -> anyhow::Result<()> { + // Register with service discovery + let instance_handle = { + let discovery = self.drt.service_discovery()?; + discovery + .register_instance(&self.namespace.name(), &self.name) + .await? + }; + + // Store instance handle on component (OnceCell ensures thread-safe init, shared across clones via Arc) + self.instance_handle + .get_or_init(async { instance_handle }) + .await; + Ok(()) } } diff --git a/lib/runtime/src/component/client.rs b/lib/runtime/src/component/client.rs index 987c5002d8..5049386d64 100644 --- a/lib/runtime/src/component/client.rs +++ b/lib/runtime/src/component/client.rs @@ -65,8 +65,31 @@ impl Client { }) } - // Client with auto-discover instances using etcd + // Client with auto-discover instances: ServiceDiscovery or ETCD pub(crate) async fn new_dynamic(endpoint: Endpoint) -> Result { + if crate::config::is_service_discovery_enabled() { + Self::new_dynamic_v2(endpoint).await + } else { + Self::new_dynamic_etcd(endpoint).await + } + } + + // V2: Client with auto-discover instances using ServiceDiscovery + pub(crate) async fn new_dynamic_v2(endpoint: Endpoint) -> Result { + let instance_source = Self::get_or_create_dynamic_instance_source_v2(&endpoint).await?; + + let client = Client { + endpoint, + instance_source: instance_source.clone(), + instance_avail: Arc::new(ArcSwap::from(Arc::new(vec![]))), + instance_free: Arc::new(ArcSwap::from(Arc::new(vec![]))), + }; + client.monitor_instance_source(); + Ok(client) + } + + // Original ETCD-based implementation + async fn new_dynamic_etcd(endpoint: Endpoint) -> Result { const INSTANCE_REFRESH_PERIOD: Duration = Duration::from_secs(1); // create live endpoint watcher @@ -181,9 +204,12 @@ impl Client { // TODO: this resets both tracked available and free instances client.instance_avail.store(Arc::new(instance_ids.clone())); - client.instance_free.store(Arc::new(instance_ids)); + client.instance_free.store(Arc::new(instance_ids.clone())); - tracing::debug!("instance source updated"); + tracing::debug!( + "Instance source updated with {} instances", + instance_ids.len() + ); if let Err(err) = rx.changed().await { tracing::error!("The Sender is dropped: {}", err); @@ -282,4 +308,124 @@ impl Client { instance_sources.insert(endpoint.clone(), Arc::downgrade(&instance_source)); Ok(instance_source) } + + /// V2: Create instance source using ServiceDiscovery interface + async fn get_or_create_dynamic_instance_source_v2( + endpoint: &Endpoint, + ) -> Result> { + let drt = endpoint.drt(); + let instance_sources = drt.instance_sources(); + let mut instance_sources = instance_sources.lock().await; + + // Check if we already have a watcher for this endpoint + if let Some(instance_source) = instance_sources.get(endpoint) { + if let Some(instance_source) = instance_source.upgrade() { + return Ok(instance_source); + } else { + instance_sources.remove(endpoint); + } + } + + let namespace = endpoint.component.namespace.name(); + let component = endpoint.component.name(); + + // Get service discovery interface and set up watch for instance changes + let discovery = drt.service_discovery()?; + let mut instance_watch = discovery.watch(&namespace, component).await?; + + // Watch automatically streams existing instances as ADDED events, so no need to call list_instances() + let (watch_tx, watch_rx) = tokio::sync::watch::channel(vec![]); + + let secondary = endpoint.component.drt.runtime.secondary().clone(); + let endpoint_name = endpoint.name.clone(); + let namespace_clone = namespace.clone(); + let component_clone = component.to_string(); + + // Spawn background task to process instance events + secondary.spawn(async move { + tracing::debug!("Starting ServiceDiscovery watcher for {}/{}", namespace_clone, component_clone); + + // Map instances by their discovery instance_id (string) for proper removal tracking + let mut map: HashMap = HashMap::new(); + + loop { + let event = tokio::select! { + _ = watch_tx.closed() => { + tracing::debug!("All watchers have closed; shutting down ServiceDiscovery watcher for {}/{}", namespace_clone, component_clone); + break; + } + event = instance_watch.recv() => { + match event { + Ok(event) => event, + Err(e) => { + tracing::debug!("Watch stream error: {}; shutting down ServiceDiscovery watcher for {}/{}", e, namespace_clone, component_clone); + break; + } + } + } + }; + + match event { + crate::discovery::InstanceEvent::Added(disc_instance) => { + if let Some(runtime_instance) = Self::convert_discovery_instance_to_runtime( + disc_instance.clone(), + &namespace_clone, + &component_clone, + &endpoint_name, + ) { + // Use discovery instance_id as key for proper removal tracking + map.insert(disc_instance.instance_id.clone(), runtime_instance); + tracing::debug!("Added instance {}, total instances: {}", disc_instance.instance_id, map.len()); + } + } + crate::discovery::InstanceEvent::Removed(instance_id) => { + if map.remove(&instance_id).is_some() { + tracing::debug!("Removed instance {}, total instances: {}", instance_id, map.len()); + } else { + tracing::warn!("Attempted to remove non-existent instance: {}", instance_id); + } + } + } + + let instances: Vec = map.values().cloned().collect(); + + if watch_tx.send(instances).is_err() { + tracing::debug!("Unable to send watch updates; shutting down ServiceDiscovery watcher for {}/{}", namespace_clone, component_clone); + break; + } + } + + tracing::debug!("Completed ServiceDiscovery watcher for {}/{}", namespace_clone, component_clone); + let _ = watch_tx.send(vec![]); + }); + + let instance_source = Arc::new(InstanceSource::Dynamic(watch_rx)); + instance_sources.insert(endpoint.clone(), Arc::downgrade(&instance_source)); + Ok(instance_source) + } + + /// Convert a discovery::Instance to a runtime Instance + fn convert_discovery_instance_to_runtime( + disc_instance: crate::discovery::Instance, + namespace: &str, + component: &str, + endpoint_name: &str, + ) -> Option { + let instance_id = crate::discovery::instance_id_to_u64(&disc_instance.instance_id); + + // Construct NATS subject (must match server side) + use crate::transports::nats::Slug; + let service_name_raw = format!("{}_{}", namespace, component); + let service_name = Slug::slugify(&service_name_raw).to_string(); + let subject = format!("{}.{}-{:x}", service_name, endpoint_name, instance_id); + let transport = TransportType::NatsTcp(subject); + + Some(Instance { + namespace: namespace.to_string(), + component: component.to_string(), + endpoint: endpoint_name.to_string(), + instance_id, + transport, + }) + } } diff --git a/lib/runtime/src/component/endpoint.rs b/lib/runtime/src/component/endpoint.rs index 60773de263..8414e4f49f 100644 --- a/lib/runtime/src/component/endpoint.rs +++ b/lib/runtime/src/component/endpoint.rs @@ -71,9 +71,26 @@ impl EndpointConfigBuilder { let lease = lease.or(endpoint.drt().primary_lease()); let lease_id = lease.as_ref().map(|l| l.id()).unwrap_or(0); + // Feature flag determines registration path: ServiceDiscovery (new) or ETCD lease (legacy) + let use_service_discovery = crate::config::is_service_discovery_enabled(); + + // Get instance_id: from ServiceDiscovery handle or ETCD lease_id + let instance_id = if use_service_discovery { + if let Ok(instance_handle) = endpoint.component.instance_handle() { + crate::discovery::instance_id_to_u64(instance_handle.instance_id()) + } else { + panic!( + "Service discovery enabled but component not registered. Call component.register_instance() first." + ); + } + } else { + lease_id + }; + tracing::debug!( - "Starting endpoint: {}", - endpoint.etcd_path_with_lease_id(lease_id) + "Starting endpoint: {} (instance_id: {})", + endpoint.etcd_path_with_lease_id(lease_id), + instance_id ); let service_name = endpoint.component.service_name(); @@ -107,12 +124,12 @@ impl EndpointConfigBuilder { if let Some(stats_handler) = stats_handler { handler_map .lock() - .insert(endpoint.subject_to(lease_id), stats_handler); + .insert(endpoint.subject_to(instance_id), stats_handler); } // creates an endpoint for the service let service_endpoint = group - .endpoint(&endpoint.name_with_id(lease_id)) + .endpoint(&endpoint.name_with_id(instance_id)) .await .map_err(|e| anyhow::anyhow!("Failed to start endpoint: {e}"))?; @@ -124,7 +141,9 @@ impl EndpointConfigBuilder { let component_name = endpoint.component.name.clone(); let endpoint_name = endpoint.name.clone(); let system_health = endpoint.drt().system_health.clone(); - let subject = endpoint.subject_to(lease_id); + let subject = endpoint.subject_to(instance_id); + + // Legacy path only: ETCD registration variables let etcd_path = endpoint.etcd_path_with_lease_id(lease_id); let etcd_client = endpoint.component.drt.etcd_client.clone(); @@ -134,7 +153,7 @@ impl EndpointConfigBuilder { component: component_name.clone(), endpoint: endpoint_name.clone(), namespace: namespace_name.clone(), - instance_id: lease_id, + instance_id, transport: TransportType::NatsTcp(subject.clone()), }; tracing::debug!(endpoint_name = %endpoint_name, "Registering endpoint health check target"); @@ -210,7 +229,7 @@ impl EndpointConfigBuilder { namespace_name_for_task, component_name_for_task, endpoint_name_for_task, - lease_id, + instance_id, system_health, ) .await; @@ -224,34 +243,57 @@ impl EndpointConfigBuilder { result }); - // make the components service endpoint discovery in etcd - - // client.register_service() - let info = Instance { - component: component_name.clone(), - endpoint: endpoint_name.clone(), - namespace: namespace_name.clone(), - instance_id: lease_id, - transport: TransportType::NatsTcp(subject), - }; - - let info = serde_json::to_vec_pretty(&info)?; + // Register endpoint for discovery: ServiceDiscovery.set_ready() or direct ETCD write + if use_service_discovery { + // New path: mark instance ready via ServiceDiscovery interface + if let Ok(instance_handle) = endpoint.component.instance_handle() { + if let Err(e) = instance_handle + .set_ready(crate::discovery::InstanceStatus::Ready) + .await + { + tracing::error!(component_name, endpoint_name, error = %e, "Unable to mark instance ready"); + cancel_token.cancel(); + return Err(error!( + "Unable to mark instance as ready. Check discovery service status" + )); + } + tracing::info!( + component_name, + endpoint_name, + instance_id, + "Instance marked ready" + ); + } + } else { + // Legacy path: direct ETCD registration with lease + let info = Instance { + component: component_name.clone(), + endpoint: endpoint_name.clone(), + namespace: namespace_name.clone(), + instance_id: lease_id, + transport: TransportType::NatsTcp(subject.clone()), + }; - if let Some(etcd_client) = &etcd_client - && let Err(e) = etcd_client - .kv_create(&etcd_path, info, Some(lease_id)) - .await - { - tracing::error!( + let info = serde_json::to_vec_pretty(&info)?; + + if let Some(etcd_client) = &etcd_client + && let Err(e) = etcd_client + .kv_create(&etcd_path, info, Some(lease_id)) + .await + { + tracing::error!(component_name, endpoint_name, error = %e, "Unable to register in ETCD"); + cancel_token.cancel(); + return Err(error!( + "Unable to register service for discovery. Check ETCD connectivity" + )); + } + tracing::info!( component_name, endpoint_name, - error = %e, - "Unable to register service for discovery" + lease_id, + etcd_path, + "Instance registered in ETCD" ); - cancel_token.cancel(); - return Err(error!( - "Unable to register service for discovery. Check discovery service status" - )); } task.await??; diff --git a/lib/runtime/src/component/service.rs b/lib/runtime/src/component/service.rs index b12e0d8668..df7728c686 100644 --- a/lib/runtime/src/component/service.rs +++ b/lib/runtime/src/component/service.rs @@ -20,7 +20,7 @@ pub type EndpointStatsHandler = Box serde_json::Value + Send + Sync + 'static>; pub const PROJECT_NAME: &str = "Dynamo"; -const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION"); +pub const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION"); pub async fn build_nats_service( nats_client: &crate::transports::nats::Client, diff --git a/lib/runtime/src/config.rs b/lib/runtime/src/config.rs index fbfc457fe7..bc951877fe 100644 --- a/lib/runtime/src/config.rs +++ b/lib/runtime/src/config.rs @@ -421,6 +421,15 @@ pub fn env_is_falsey(env: &str) -> bool { } } +/// Check if service discovery is enabled via USE_SERVICE_DISCOVERY environment variable +/// Returns true if USE_SERVICE_DISCOVERY is set to a truthy value, false otherwise +pub fn is_service_discovery_enabled() -> bool { + std::env::var("USE_SERVICE_DISCOVERY") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(false) +} + /// Check whether JSONL logging enabled /// Set the `DYN_LOGGING_JSONL` environment variable a [`is_truthy`] value pub fn jsonl_logging_enabled() -> bool { diff --git a/lib/runtime/src/discovery/kubernetes.rs b/lib/runtime/src/discovery/kubernetes.rs new file mode 100644 index 0000000000..0e2d2b2c3b --- /dev/null +++ b/lib/runtime/src/discovery/kubernetes.rs @@ -0,0 +1,440 @@ +use super::*; +use async_trait::async_trait; +use futures::StreamExt; +use k8s_openapi::api::discovery::v1::EndpointSlice; +use kube::{ + Client, ResourceExt, + api::{Api, ListParams}, + runtime::{WatchStreamExt, watcher}, +}; +use parking_lot::Mutex; +use serde_json::Value; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use tokio::sync::broadcast; + +/// Kubernetes-based implementation of ServiceDiscovery using EndpointSlices +#[derive(Clone)] +pub struct KubernetesServiceDiscovery { + client: Client, + event_senders: Arc>>>, +} + +impl KubernetesServiceDiscovery { + /// Create a new KubernetesServiceDiscovery + pub async fn new() -> Result { + let client = Client::try_default().await.map_err(|e| { + DiscoveryError::RegistrationError(format!("Failed to create k8s client: {}", e)) + })?; + + Ok(Self { + client, + event_senders: Arc::new(Mutex::new(HashMap::new())), + }) + } + + /// Get or create an event sender for a specific namespace/component pair + fn get_or_create_sender(&self, key: String) -> broadcast::Sender { + let mut senders = self.event_senders.lock(); + senders + .entry(key) + .or_insert_with(|| { + let (tx, _) = broadcast::channel(100); + tx + }) + .clone() + } + + /// Build label selector for namespace and component + fn build_label_selector(namespace: &str, component: &str) -> String { + format!( + "dynamo.namespace={},dynamo.component={}", + namespace, component + ) + } + + /// Watch key for a namespace/component pair + fn watch_key(namespace: &str, component: &str) -> String { + format!("{}:{}", namespace, component) + } + + /// Extract pod names and their ready status from EndpointSlices + fn extract_instances(endpoint_slices: &[EndpointSlice]) -> Vec { + let mut instances = Vec::new(); + + for slice in endpoint_slices { + for endpoint in &slice.endpoints { + // Only include ready endpoints + if let Some(conditions) = &endpoint.conditions + && conditions.ready != Some(true) { + continue; + } + + // Extract pod name from targetRef + if let Some(target_ref) = &endpoint.target_ref + && target_ref.kind.as_deref() == Some("Pod") + && let Some(pod_name) = &target_ref.name { + instances.push(Instance::new(pod_name.clone(), Value::Null)); + } + } + } + + instances + } + + /// Start watching EndpointSlices for a specific namespace/component + fn start_watch(&self, namespace: &str, component: &str, watch_namespace: &str) { + let client = self.client.clone(); + let label_selector = Self::build_label_selector(namespace, component); + let event_tx = self.get_or_create_sender(Self::watch_key(namespace, component)); + let watch_namespace = watch_namespace.to_string(); + let namespace_copy = namespace.to_string(); + let component_copy = component.to_string(); + + tokio::spawn(async move { + println!( + "[K8s Discovery] Starting EndpointSlice watch: namespace={}, component={}, k8s_namespace={}, labels={}", + namespace_copy, component_copy, watch_namespace, label_selector + ); + let api: Api = Api::namespaced(client, &watch_namespace); + let watch_config = watcher::Config::default().labels(&label_selector); + + let mut stream = watcher(api, watch_config).applied_objects().boxed(); + + // Track known ready instances across all slices + // Key: pod name, Value: slice name (for tracking which slice it came from) + let mut known_ready: HashMap = HashMap::new(); + // Track current state of all slices + let mut slice_instances: HashMap> = HashMap::new(); + + while let Some(result) = stream.next().await { + match result { + Ok(endpoint_slice) => { + let slice_name = endpoint_slice.name_any(); + + // Extract ready instances from this slice + let mut slice_ready = HashSet::new(); + + for endpoint in &endpoint_slice.endpoints { + // Check if endpoint is ready + let is_ready = endpoint + .conditions + .as_ref() + .and_then(|c| c.ready) + .unwrap_or(false); + + if is_ready + && let Some(target_ref) = &endpoint.target_ref + && target_ref.kind.as_deref() == Some("Pod") + && let Some(pod_name) = &target_ref.name { + slice_ready.insert(pod_name.clone()); + } + } + + // Update slice_instances map + slice_instances.insert(slice_name.clone(), slice_ready); + + // Rebuild the complete set of ready instances across all slices + // TODO: First pass, entire set of instances is rebuilt across Dynamo. + let mut current_ready: HashMap = HashMap::new(); + for (slice, pods) in &slice_instances { + for pod in pods { + current_ready.insert(pod.clone(), slice.clone()); + } + } + + // Find newly ready instances (Added events) + for pod_name in current_ready.keys() { + if !known_ready.contains_key(pod_name) { + println!( + "[K8s Discovery] ✅ Instance ADDED: pod_name={}, slice={}", + pod_name, slice_name + ); + let instance = Instance::new(pod_name.clone(), Value::Null); + let _ = event_tx.send(InstanceEvent::Added(instance)); + } + } + + // Find no-longer-ready instances (Removed events) + for pod_name in known_ready.keys() { + if !current_ready.contains_key(pod_name) { + println!( + "[K8s Discovery] ❌ Instance REMOVED: pod_name={}", + pod_name + ); + let _ = event_tx.send(InstanceEvent::Removed(pod_name.clone())); + } + } + + known_ready = current_ready; + } + Err(e) => { + eprintln!("[K8s Discovery] ⚠️ Error watching EndpointSlices: {}", e); + // Continue watching despite errors + } + } + } + }); + } +} + +/// Handle for a Kubernetes-registered instance +pub struct KubernetesInstanceHandle { + instance_id: String, +} + +impl KubernetesInstanceHandle { + /// Read pod name from environment variable + fn read_pod_name() -> Result { + std::env::var("POD_NAME").map_err(|_| { + DiscoveryError::RegistrationError("POD_NAME environment variable not set".to_string()) + }) + } +} + +#[async_trait] +impl InstanceHandle for KubernetesInstanceHandle { + fn instance_id(&self) -> &str { + &self.instance_id + } + + async fn set_metadata(&self, _metadata: Value) -> Result<()> { + // Metadata changes are not supported in this implementation + // The Kubernetes operator manages the pod metadata + Ok(()) + } + + async fn set_ready(&self, _status: InstanceStatus) -> Result<()> { + // Readiness is controlled by Kubernetes readiness probes + // The operator and pod's readiness probe determine the actual status + // This is a no-op as the pod's readiness is reflected in EndpointSlices + Ok(()) + } +} + +#[async_trait] +impl ServiceDiscovery for KubernetesServiceDiscovery { + async fn register_instance( + &self, + namespace: &str, + component: &str, + ) -> Result> { + // Read pod name from environment + let instance_id = KubernetesInstanceHandle::read_pod_name()?; + + println!( + "[K8s Discovery] 📝 Registered instance: namespace={}, component={}, pod_name={}", + namespace, component, instance_id + ); + + Ok(Box::new(KubernetesInstanceHandle { instance_id })) + } + + async fn list_instances(&self, namespace: &str, component: &str) -> Result> { + // Query all EndpointSlices with matching labels + let label_selector = Self::build_label_selector(namespace, component); + + // Get the current namespace from env var, or use "default" + let current_namespace = + std::env::var("POD_NAMESPACE").unwrap_or_else(|_| "default".to_string()); + + let api: Api = Api::namespaced(self.client.clone(), ¤t_namespace); + let lp = ListParams::default().labels(&label_selector); + + let slices = api.list(&lp).await.map_err(|e| { + DiscoveryError::MetadataError(format!("Failed to list EndpointSlices: {}", e)) + })?; + + let instances = Self::extract_instances(&slices.items); + + println!( + "[K8s Discovery] 📋 Listed {} instances: namespace={}, component={}, pods={:?}", + instances.len(), + namespace, + component, + instances.iter().map(|i| &i.instance_id).collect::>() + ); + + Ok(instances) + } + + async fn watch( + &self, + namespace: &str, + component: &str, + ) -> Result> { + let key = Self::watch_key(namespace, component); + + // Get or create event sender for this namespace/component + let event_tx = self.get_or_create_sender(key.clone()); + + // Check if we need to start a watcher + let needs_watch = { + let senders = self.event_senders.lock(); + senders.get(&key).map(|tx| tx.receiver_count()).unwrap_or(0) == 0 + }; + + if needs_watch { + // Get the current namespace from env var, or use "default" + let watch_namespace = + std::env::var("POD_NAMESPACE").unwrap_or_else(|_| "default".to_string()); + + println!( + "[K8s Discovery] 👀 Starting new EndpointSlice watcher: namespace={}, component={}", + namespace, component + ); + + self.start_watch(namespace, component, &watch_namespace); + } + + Ok(event_tx.subscribe()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn test_build_label_selector() { + let selector = KubernetesServiceDiscovery::build_label_selector("my-ns", "my-comp"); + assert_eq!(selector, "dynamo.namespace=my-ns,dynamo.component=my-comp"); + } + + #[test] + fn test_watch_key() { + let key = KubernetesServiceDiscovery::watch_key("ns1", "comp1"); + assert_eq!(key, "ns1:comp1"); + } + + /// Integration test for Kubernetes service discovery + /// + /// Prerequisites: + /// 1. Have a Kubernetes cluster accessible via kubectl + /// 2. Set KUBECONFIG environment variable to point to your kubeconfig file: + /// export KUBECONFIG=/path/to/your/kubeconfig + /// 3. Set POD_NAMESPACE environment variable (defaults to "default"): + /// export POD_NAMESPACE=default + /// 4. Create EndpointSlices in your cluster with the following labels: + /// dynamo.namespace=test + /// dynamo.component=worker + /// + /// Example EndpointSlice creation (see kubernetes/endpoint-slice-test.yaml): + /// kubectl apply -f kubernetes/endpoint-slice-test.yaml + /// + /// Run this test with: + /// cargo test --package dynamo-runtime test_kubernetes_discovery -- --ignored --nocapture + #[tokio::test] + #[ignore] // Ignore by default since it requires a running cluster + async fn test_kubernetes_discovery_list_and_watch() { + // Initialize tracing for debugging + let _ = tracing_subscriber::fmt() + .with_env_filter("debug") + .try_init(); + + // Create discovery client + let discovery = KubernetesServiceDiscovery::new() + .await + .expect("Failed to create Kubernetes discovery client. Make sure KUBECONFIG is set."); + + println!("✓ Successfully connected to Kubernetes cluster"); + + // Test list_instances + println!("\n--- Testing list_instances ---"); + let instances = discovery + .list_instances("test", "worker") + .await + .expect("Failed to list instances"); + + println!("Found {} instances:", instances.len()); + for instance in &instances { + println!(" - {}", instance.instance_id); + } + + // Test watch + println!("\n--- Testing watch ---"); + let mut watch = discovery + .watch("test", "worker") + .await + .expect("Failed to create watch"); + + println!("Watching for changes (will wait 30 seconds)..."); + println!("Now you can:"); + println!( + " 1. Scale up/down your deployment: kubectl scale deployment --replicas=N" + ); + println!(" 2. Delete pods: kubectl delete pod "); + println!(" 3. Create new EndpointSlices with matching labels"); + + // Wait for events for 30 seconds + let timeout = Duration::from_secs(30); + let start = tokio::time::Instant::now(); + + let mut event_count = 0; + while start.elapsed() < timeout { + match tokio::time::timeout(Duration::from_secs(1), watch.recv()).await { + Ok(Ok(event)) => { + event_count += 1; + match event { + InstanceEvent::Added(instance) => { + println!(" [ADDED] Instance: {}", instance.instance_id); + } + InstanceEvent::Removed(instance_id) => { + println!(" [REMOVED] Instance: {}", instance_id); + } + } + } + Ok(Err(e)) => { + println!(" Watch error: {:?}", e); + break; + } + Err(_) => { + // Timeout - no event received, continue waiting + } + } + } + + println!("\n--- Test Summary ---"); + println!("Total events received: {}", event_count); + + // Re-list to see final state + let final_instances = discovery + .list_instances("test", "worker") + .await + .expect("Failed to list instances"); + + println!("Final instance count: {}", final_instances.len()); + for instance in &final_instances { + println!(" - {}", instance.instance_id); + } + } + + /// Quick smoke test to verify connection to cluster + /// Run with: cargo test --package dynamo-runtime test_kubernetes_connection -- --ignored + #[tokio::test] + #[ignore] + async fn test_kubernetes_connection() { + let discovery = KubernetesServiceDiscovery::new() + .await + .expect("Failed to create Kubernetes discovery client"); + + // Just try to list - even if there are no results, connection works + let result = discovery.list_instances("test", "worker").await; + + match result { + Ok(instances) => { + println!( + "✓ Connected successfully! Found {} instances", + instances.len() + ); + for instance in &instances { + println!(" - {}", instance.instance_id); + } + } + Err(e) => { + panic!("Failed to list instances: {:?}", e); + } + } + } +} diff --git a/lib/runtime/src/discovery.rs b/lib/runtime/src/discovery/legacy.rs similarity index 100% rename from lib/runtime/src/discovery.rs rename to lib/runtime/src/discovery/legacy.rs diff --git a/lib/runtime/src/discovery/mod.rs b/lib/runtime/src/discovery/mod.rs new file mode 100644 index 0000000000..52a217d9a9 --- /dev/null +++ b/lib/runtime/src/discovery/mod.rs @@ -0,0 +1,100 @@ +use async_trait::async_trait; +use serde_json::Value; +use std::collections::HashMap; +use thiserror::Error; +use tokio::sync::broadcast; + +// Re-export legacy types +mod legacy; +pub use legacy::{DiscoveryClient, Lease}; + +pub mod kubernetes; + +#[derive(Error, Debug)] +pub enum DiscoveryError { + #[error("instance not found: {0}")] + InstanceNotFound(String), + #[error("metadata error: {0}")] + MetadataError(String), + #[error("registration error: {0}")] + RegistrationError(String), + #[error("watch error: {0}")] + WatchError(String), +} + +pub type Result = std::result::Result; + +/// Status of an instance +#[derive(Debug, Clone, PartialEq)] +pub enum InstanceStatus { + Ready, + NotReady, +} + +/// Event emitted when instance status changes +#[derive(Debug, Clone)] +pub enum InstanceEvent { + Added(Instance), + Removed(String), // instance_id +} + +/// Handle returned when registering a new instance +#[async_trait] +pub trait InstanceHandle: Send + Sync { + /// Returns the unique identifier for this instance + fn instance_id(&self) -> &str; + + /// Set metadata associated with this instance + async fn set_metadata(&self, metadata: Value) -> Result<()>; + + /// Set the ready status of this instance + async fn set_ready(&self, status: InstanceStatus) -> Result<()>; +} + +/// Represents a discovered instance +#[derive(Debug, Clone)] +pub struct Instance { + pub instance_id: String, + pub metadata: Value, +} + +// add a getter for the metadata +impl Instance { + pub fn new(instance_id: String, metadata: Value) -> Self { + Self { + instance_id, + metadata, + } + } +} + +/// Convert a string instance_id to u64, either by parsing or hashing +pub fn instance_id_to_u64(id: &str) -> u64 { + id.parse::().unwrap_or_else(|_| { + use std::hash::{Hash, Hasher}; + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + id.hash(&mut hasher); + hasher.finish() + }) +} + +/// Main service discovery interface +#[async_trait] +pub trait ServiceDiscovery: Send + Sync + 'static { + /// Register a new instance + async fn register_instance( + &self, + namespace: &str, + component: &str, + ) -> Result>; + + /// List all instances for a namespace/component + async fn list_instances(&self, namespace: &str, component: &str) -> Result>; + + /// Watch for instance changes + async fn watch( + &self, + namespace: &str, + component: &str, + ) -> Result>; +} diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index d111d5a4d5..5ee85bea11 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 pub use crate::component::Component; +use crate::discovery::{ServiceDiscovery, kubernetes::KubernetesServiceDiscovery}; use crate::storage::key_value_store::{ EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, MemoryStore, }; @@ -84,6 +85,9 @@ impl DistributedRuntime { let nats_client_for_metrics = nats_client.clone(); + // Initialize service discovery only if feature flag is enabled + let service_discovery = Self::create_service_discovery().await?; + let distributed_runtime = Self { runtime, etcd_client, @@ -96,6 +100,7 @@ impl DistributedRuntime { instance_sources: Arc::new(Mutex::new(HashMap::new())), metrics_registry: crate::MetricsRegistry::new(), system_health, + service_discovery, }; if let Some(nats_client_for_metrics) = nats_client_for_metrics { @@ -203,6 +208,22 @@ impl DistributedRuntime { Self::new(runtime, config).await } + /// Create the appropriate ServiceDiscovery implementation based on environment. + /// Only initializes if USE_SERVICE_DISCOVERY feature flag is enabled. + async fn create_service_discovery() -> Result>>> { + if !crate::config::is_service_discovery_enabled() { + tracing::debug!("Service discovery disabled (USE_SERVICE_DISCOVERY not set)"); + return Ok(None); + } + + // Currently only Kubernetes discovery is supported + // In the future, this can be extended to support other backends + tracing::info!("Initializing Kubernetes-based service discovery"); + let discovery: Box = + Box::new(KubernetesServiceDiscovery::new().await?); + Ok(Some(Arc::new(discovery))) + } + pub fn runtime(&self) -> &Runtime { &self.runtime } @@ -291,6 +312,14 @@ impl DistributedRuntime { &self.store } + /// Get the service discovery interface. + /// Returns an error if service discovery is not initialized (USE_SERVICE_DISCOVERY not enabled). + pub fn service_discovery(&self) -> Result>> { + self.service_discovery.clone().ok_or_else(|| { + error!("Service discovery not initialized. Set USE_SERVICE_DISCOVERY=true to enable.") + }) + } + pub fn child_token(&self) -> CancellationToken { self.runtime.child_token() } diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index 9851d91925..d7397db733 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -17,7 +17,7 @@ pub use anyhow::{ use async_once_cell::OnceCell; -mod config; +pub mod config; pub use config::RuntimeConfig; pub mod component; @@ -112,6 +112,9 @@ pub struct DistributedRuntime { // Health Status system_health: Arc>, + // Service discovery interface (only initialized when USE_SERVICE_DISCOVERY is enabled) + service_discovery: Option>>, + // This hierarchy's own metrics registry metrics_registry: MetricsRegistry, }