1
- use futures:: { future, StreamExt } ;
1
+ use futures:: { future, stream , StreamExt } ;
2
2
use k8s_openapi:: api:: {
3
3
apps:: v1:: Deployment ,
4
4
core:: v1:: { ConfigMap , Secret } ,
5
5
} ;
6
6
use kube:: {
7
+ api:: { ApiResource , DynamicObject , GroupVersionKind } ,
8
+ core:: TypedResource ,
7
9
runtime:: {
8
- reflector,
9
- reflector:: { ObjectRef , Store } ,
10
- watcher, WatchStreamExt ,
10
+ reflector:: {
11
+ store:: { CacheWriter , Writer } ,
12
+ ObjectRef , Store ,
13
+ } ,
14
+ watcher:: { self , dynamic_watcher} ,
15
+ WatchStreamExt ,
11
16
} ,
12
- Api , Client ,
17
+ Api , Client , Resource ,
13
18
} ;
19
+ use parking_lot:: RwLock ;
20
+ use serde:: de:: DeserializeOwned ;
14
21
use std:: sync:: Arc ;
15
22
use tracing:: * ;
16
23
17
- // This does not work because Resource trait is not dyn safe.
18
- /*
19
- use std::any::TypeId;
20
24
use std:: collections:: HashMap ;
21
- use k8s_openapi::NamespaceResourceScope;
22
- use kube::api::{Resource, ResourceExt};
23
- struct MultiStore {
24
- stores: HashMap<TypeId, Store<dyn Resource<DynamicType = (), Scope = NamespaceResourceScope>>>,
25
- }
26
- impl MultiStore {
27
- fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<K>> {
28
- let oref = ObjectRef::<K>::new(name).within(ns);
29
- if let Some(store) = self.stores.get(&TypeId::of::<K>()) {
30
- store.get(oref)
31
- } else {
32
- None
33
- }
34
- }
35
- }*/
36
25
37
- // explicit store can work
26
+ type Cache = Arc < RwLock < HashMap < GroupVersionKind , Writer < DynamicObject > > > > ;
27
+
28
+ #[ derive( Default , Clone ) ]
29
+ struct MultiWriter {
30
+ store : Cache ,
31
+ buffer : Cache ,
32
+ }
33
+
34
+ #[ derive( Default , Clone ) ]
38
35
struct MultiStore {
39
- deploys : Store < Deployment > ,
40
- cms : Store < ConfigMap > ,
41
- secs : Store < Secret > ,
36
+ store : Cache ,
42
37
}
43
- // but using generics to help out won't because the K needs to be concretised
44
- /*
45
- impl MultiStore {
46
- fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<Option<K>>> {
47
- let oref = ObjectRef::<K>::new(name).within(ns);
48
- let kind = K::kind(&()).to_owned();
49
- match kind.as_ref() {
50
- "Deployment" => self.deploys.get(&ObjectRef::new(name).within(ns)),
51
- "ConfigMap" => self.cms.get(&ObjectRef::new(name).within(ns)),
52
- "Secret" => self.secs.get(&ObjectRef::new(name).within(ns)),
53
- _ => None,
38
+
39
+ impl MultiWriter {
40
+ fn as_reader ( & self ) -> MultiStore {
41
+ MultiStore {
42
+ store : self . store . clone ( ) ,
54
43
}
55
- None
56
44
}
57
45
}
58
- */
59
- // so left with this
60
46
61
47
impl MultiStore {
62
- fn get_deploy ( & self , name : & str , ns : & str ) -> Option < Arc < Deployment > > {
63
- self . deploys . get ( & ObjectRef :: < Deployment > :: new ( name) . within ( ns) )
48
+ fn get < K : Resource < DynamicType = impl Default > + DeserializeOwned + Clone > (
49
+ & self ,
50
+ name : & str ,
51
+ ns : & str ,
52
+ ) -> Option < Arc < K > > {
53
+ let oref = ObjectRef :: < K > :: new ( name) . within ( ns) . erase ( ) ;
54
+ let store = self . get_store :: < K > ( ) ?;
55
+ let obj = store. get ( & oref) ?. as_ref ( ) . clone ( ) ;
56
+ obj. try_parse ( ) . ok ( ) . map ( Arc :: new)
64
57
}
65
58
66
- fn get_secret ( & self , name : & str , ns : & str ) -> Option < Arc < Secret > > {
67
- self . secs . get ( & ObjectRef :: < Secret > :: new ( name) . within ( ns) )
59
+ fn get_store < K : Resource < DynamicType = impl Default > + DeserializeOwned + Clone > (
60
+ & self ,
61
+ ) -> Option < Store < DynamicObject > > {
62
+ Some ( self . store . read ( ) . get ( & K :: gvk ( & Default :: default ( ) ) ) ?. as_reader ( ) )
68
63
}
64
+ }
69
65
70
- fn get_cm ( & self , name : & str , ns : & str ) -> Option < Arc < ConfigMap > > {
71
- self . cms . get ( & ObjectRef :: < ConfigMap > :: new ( name) . within ( ns) )
66
+ impl CacheWriter < DynamicObject > for MultiWriter {
67
+ /// Applies a single watcher event to the store
68
+ fn apply_watcher_event ( & mut self , event : & watcher:: Event < DynamicObject > ) {
69
+ match event {
70
+ watcher:: Event :: Init | watcher:: Event :: InitDone ( None ) => { }
71
+ watcher:: Event :: Apply ( obj) | watcher:: Event :: Delete ( obj) => {
72
+ let mut stores = self . store . write ( ) ;
73
+ if stores. get ( & obj. gvk ( ) ) . is_none ( ) {
74
+ let store = Writer :: new ( ApiResource :: from_gvk ( & obj. gvk ( ) ) ) ;
75
+ stores. insert ( obj. gvk ( ) , store) ;
76
+ } ;
77
+ if let Some ( store) = stores. get_mut ( & obj. gvk ( ) ) {
78
+ store. apply_watcher_event ( event) ;
79
+ } ;
80
+ }
81
+ watcher:: Event :: InitApply ( obj) => {
82
+ let mut buffer = self . buffer . write ( ) ;
83
+ if buffer. get ( & obj. gvk ( ) ) . is_none ( ) {
84
+ let store = Writer :: new ( ApiResource :: from_gvk ( & obj. gvk ( ) ) ) ;
85
+ buffer. insert ( obj. gvk ( ) , store) ;
86
+ } ;
87
+ if let Some ( store) = buffer. get_mut ( & obj. gvk ( ) ) {
88
+ store. apply_watcher_event ( event) ;
89
+ } ;
90
+ }
91
+ watcher:: Event :: InitDone ( Some ( obj) ) => {
92
+ let mut buffer = self . buffer . write ( ) ;
93
+ if let Some ( mut store) = buffer. remove ( & obj. gvk ( ) ) {
94
+ store. apply_watcher_event ( event) ;
95
+ self . store . write ( ) . insert ( obj. gvk ( ) , store) ;
96
+ }
97
+ }
98
+ }
72
99
}
73
100
}
74
101
@@ -77,60 +104,43 @@ async fn main() -> anyhow::Result<()> {
77
104
tracing_subscriber:: fmt:: init ( ) ;
78
105
let client = Client :: try_default ( ) . await ?;
79
106
80
- let deploys: Api < Deployment > = Api :: default_namespaced ( client. clone ( ) ) ;
81
- let cms: Api < ConfigMap > = Api :: default_namespaced ( client. clone ( ) ) ;
82
- let secret: Api < Secret > = Api :: default_namespaced ( client. clone ( ) ) ;
83
-
84
- let ( dep_reader, dep_writer) = reflector:: store :: < Deployment > ( ) ;
85
- let ( cm_reader, cm_writer) = reflector:: store :: < ConfigMap > ( ) ;
86
- let ( sec_reader, sec_writer) = reflector:: store :: < Secret > ( ) ;
107
+ // multistore
108
+ let combo_stream = stream:: select_all ( vec ! [
109
+ dynamic_watcher( Api :: <Deployment >:: all( client. clone( ) ) , Default :: default ( ) ) . boxed( ) ,
110
+ dynamic_watcher( Api :: <ConfigMap >:: all( client. clone( ) ) , Default :: default ( ) ) . boxed( ) ,
111
+ dynamic_watcher( Api :: <Secret >:: all( client. clone( ) ) , Default :: default ( ) ) . boxed( ) ,
112
+ ] ) ;
87
113
88
- let cfg = watcher:: Config :: default ( ) ;
89
- let dep_watcher = watcher ( deploys, cfg. clone ( ) )
90
- . reflect ( dep_writer)
91
- . applied_objects ( )
92
- . for_each ( |_| future:: ready ( ( ) ) ) ;
93
- let cm_watcher = watcher ( cms, cfg. clone ( ) )
94
- . reflect ( cm_writer)
114
+ let multi_writer = MultiWriter :: default ( ) ;
115
+ let watcher = combo_stream
116
+ . reflect ( multi_writer. clone ( ) )
95
117
. applied_objects ( )
96
118
. for_each ( |_| future:: ready ( ( ) ) ) ;
97
- let sec_watcher = watcher ( secret, cfg)
98
- . reflect ( sec_writer)
99
- . applied_objects ( )
100
- . for_each ( |_| future:: ready ( ( ) ) ) ;
101
- // poll these forever
102
-
103
- // multistore
104
- let stores = MultiStore {
105
- deploys : dep_reader,
106
- cms : cm_reader,
107
- secs : sec_reader,
108
- } ;
109
119
110
120
// simulate doing stuff with the stores from some other thread
111
121
tokio:: spawn ( async move {
112
- // Show state every 5 seconds of watching
113
- info ! ( "waiting for them to be ready" ) ;
114
- stores. deploys . wait_until_ready ( ) . await . unwrap ( ) ;
115
- stores. cms . wait_until_ready ( ) . await . unwrap ( ) ;
116
- stores. secs . wait_until_ready ( ) . await . unwrap ( ) ;
117
- info ! ( "stores initialised" ) ;
118
122
// can use helper accessors
119
- info ! (
120
- "common cm: {:?}" ,
121
- stores. get_cm( "kube-root-ca.crt" , "kube-system" ) . unwrap( )
122
- ) ;
123
123
loop {
124
124
tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 5 ) ) . await ;
125
+ info ! (
126
+ "cache content: {:?}" ,
127
+ multi_writer. as_reader( ) . store. read( ) . keys( )
128
+ ) ;
129
+ info ! (
130
+ "common cm: {:?}" ,
131
+ multi_writer
132
+ . as_reader( )
133
+ . get:: <ConfigMap >( "kube-root-ca.crt" , "kube-system" )
134
+ ) ;
125
135
// access individual sub stores
126
- info ! ( "Current deploys count: {}" , stores. deploys. state( ) . len( ) ) ;
136
+ if let Some ( deploys) = multi_writer. as_reader ( ) . get_store :: < Deployment > ( ) {
137
+ info ! ( "Current deploys count: {}" , deploys. state( ) . len( ) ) ;
138
+ }
127
139
}
128
140
} ) ;
129
- // info!("long watches starting");
141
+ info ! ( "long watches starting" ) ;
130
142
tokio:: select! {
131
- r = dep_watcher => println!( "dep watcher exit: {r:?}" ) ,
132
- r = cm_watcher => println!( "cm watcher exit: {r:?}" ) ,
133
- r = sec_watcher => println!( "sec watcher exit: {r:?}" ) ,
143
+ r = watcher => println!( "watcher exit: {r:?}" ) ,
134
144
}
135
145
136
146
Ok ( ( ) )
0 commit comments