Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shared stream interfaces #1449

Merged
merged 40 commits into from
Apr 18, 2024
Merged
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
15ba09d
Add a simple controller example
mateiidavid Jan 31, 2024
fc2fc42
Merge branch 'main' of github.com:kube-rs/kube into matei/arc-watcher
mateiidavid Jan 31, 2024
1657ddc
Add shared stream controller example
mateiidavid Feb 23, 2024
98255dc
Try to get something working
mateiidavid Feb 23, 2024
3685b9e
Rm my notes
mateiidavid Feb 23, 2024
683e77d
Results or objectefs
mateiidavid Feb 29, 2024
af7a309
Working shared stream
mateiidavid Feb 29, 2024
8d4d694
Different way of doing it
mateiidavid Feb 29, 2024
8534770
Switch to async_broadcast
mateiidavid Mar 2, 2024
9bbe8e1
Remove old, unused code
mateiidavid Mar 2, 2024
3f874ce
Remove unused examples
mateiidavid Mar 2, 2024
1e1e347
Gotta state machine this stuff
mateiidavid Mar 7, 2024
15f6e1d
Take 1 with try_recv
mateiidavid Mar 8, 2024
49eaf12
try_recv take 2
mateiidavid Mar 8, 2024
e7aad76
Working on names next
mateiidavid Mar 11, 2024
b6ff97f
Ok surprising this worked
mateiidavid Mar 13, 2024
7a570fd
Write tests and rename file to reflect dispatch
mateiidavid Mar 25, 2024
0256cb0
WIP
mateiidavid Mar 25, 2024
74f09f7
WIP 2
mateiidavid Mar 26, 2024
2d5a3b0
Start working on store side
mateiidavid Mar 26, 2024
0cb816b
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid Mar 26, 2024
9bf111c
Tests are green
mateiidavid Mar 26, 2024
04a53d1
rm redundant trait bounds
mateiidavid Mar 26, 2024
6b5bd31
Update example with new interfaces
mateiidavid Mar 26, 2024
def0011
Add comments and a small todo
mateiidavid Mar 27, 2024
d69213a
Remove dispatch mod from utils
mateiidavid Mar 27, 2024
21dbbae
@clux's feedback
mateiidavid Apr 3, 2024
c7fc333
@clux's feedback
mateiidavid Apr 3, 2024
1b81f4c
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid Apr 3, 2024
c6d1027
Fix tests & clippy warns
mateiidavid Apr 3, 2024
a30f2e6
Run fmt
mateiidavid Apr 3, 2024
fef5d83
Update examples/shared_stream_controllers.rs
mateiidavid Apr 8, 2024
44c441e
@clux's feedback on examples
mateiidavid Apr 11, 2024
8347103
Fix name in ns
mateiidavid Apr 15, 2024
9f7edd1
Add comments and feature flags
mateiidavid Apr 15, 2024
e2399f1
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid Apr 16, 2024
a14d6b4
Fix CI checks
mateiidavid Apr 16, 2024
de2eda1
Run rustfmt
mateiidavid Apr 16, 2024
276b75e
@clux's feedback
mateiidavid Apr 17, 2024
eca6be1
Run fmt
mateiidavid Apr 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Rm my notes
Signed-off-by: Matei David <[email protected]>
mateiidavid committed Feb 23, 2024
commit 3685b9e703dea32d9c47036fbf7774c977837371
73 changes: 0 additions & 73 deletions examples/shared_stream_controllers.rs
Original file line number Diff line number Diff line change
@@ -43,79 +43,6 @@ async fn main() -> anyhow::Result<()> {
let reflector = shared_reflector(writer, watcher(pods.clone(), Default::default()));


// Building a controller
//
// 1. Controller uses a builder pattern to instantiate it with different
// configuration values. Each method called on it will return and consume
// Self.
//
// 2. When a controller is created (`new()` and `new_with()`) it will:
// - create a new store (and return a writeable handle).
// - create a new "trigger_selector"
// - create a "trigger_self"
// - a "trigger_self" accepts a reflector (created using a watch and
// our store handle)
//
// What are triggers:
//
// * Each controller has a trigger_selector. It is a stream union. It
// accepts a bunch of streams.
// - there are some internals to how this is done, which I guess I'll
// briefly cover, but `SelectAll` is a buffer.
// - e.g. it buffers futures. Calling `next()` on it will return
// whatever future resolves first.
// - how do we compose a buffer with streams? we turn every stream into a
// future. We turn them into a functional list, (car, cdr).
// - e.g. each stream is turned into a future, when polled it returns (Item,
// Tail)
// - this is a `StreamFuture`, its Poll function is easy to understand
// - and now, the buffer will drive them all into completion
// * A trigger itself is just a ReconciliationRequest. It is a stream that
// wraps an inner stream and maps the object to a ReconciliationRequest,
// something that triggers a reconciliation.
//
//
// Trigger helpers:
// * Trigger with: is an adapter that will take a stream and transform it
// according to a predicate / mapper.
//
//
// 3. When the controller is run:
// - It accepts a function that will reconcile. This is a callback that
// is called with an object that has triggered a reconciliation.
// - It will take a function that handles errors (an error callback)
//
// We can think of the applier as an actor. It owns the reconciliation
// state. It has a function (reconciler) and a stream, for each element
// in the stream, it applies the function.
//
// The complexity for the applier comes from managing streams and
// requeues. It has to deal with:
// * Shutdown signals
// * Requeues
//
// Requeuing is done through a buffer structured as a channel.
//
// The applier will start a "Runner". The runner is a scheduler that can
// debounce events. The runner wraps a stream that is handled by the
// applier;
// * Basically, our input stream and the rescheduled stream both get
// polled, events get sent to the runner.
// * Events are then debounced
// * Scheduler will get the relevant item from the store, and apply the
// callback to the object.
// - if it fails, it reschedules
// - if it does not, it moves on.
//
//
// So we create another stream from all streams that trigger
// reconciliations and the scheduler, and that stream of everything will
// end up being run through a debouncer.
//
// The scheduler is a bit tricky since it needs to deal with concurrent
// invocations and messages that have already been processed.
//

tokio::spawn(
Controller::for_shared_stream(reflector.applied_objects(), reader, ())
.with_config(config.clone())