Skip to content

Commit 9788c79

Browse files
rewrote
1 parent 739fbea commit 9788c79

File tree

12 files changed

+529
-510
lines changed

12 files changed

+529
-510
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package com.squareup.sample.thingy
2+
3+
import com.squareup.workflow1.NullableInitBox
4+
import com.squareup.workflow1.Updater
5+
import com.squareup.workflow1.WorkflowAction
6+
import com.squareup.workflow1.action
7+
8+
internal typealias StateTransformation = (MutableList<BackStackFrame>) -> Unit
9+
10+
internal class ActionQueue {
11+
12+
private val lock = Any()
13+
14+
private val stateTransformations = mutableListOf<StateTransformation>()
15+
private val outputEmissions = mutableListOf<Any?>()
16+
17+
fun enqueueStateTransformation(transformation: StateTransformation) {
18+
synchronized(lock) {
19+
stateTransformations += transformation
20+
}
21+
}
22+
23+
fun enqueueOutputEmission(value: Any?) {
24+
synchronized(lock) {
25+
outputEmissions += value
26+
}
27+
}
28+
29+
/**
30+
* @param onNextEmitOutputAction Called when the returned action is applied if there are more
31+
* outputs to emit. This callback should send another action into the sink to consume those
32+
* outputs.
33+
*/
34+
fun consumeToAction(onNextEmitOutputAction: () -> Unit): WorkflowAction<*, *, *> =
35+
action(name = { "ActionQueue.consumeToAction()" }) {
36+
consume(onNextEmitOutputAction)
37+
}
38+
39+
fun consumeActionsToStack(stack: MutableList<BackStackFrame>) {
40+
val transformations = synchronized(lock) {
41+
stateTransformations.toList().also {
42+
stateTransformations.clear()
43+
}
44+
}
45+
transformations.forEach {
46+
it(stack)
47+
}
48+
}
49+
50+
private fun Updater<Any?, BackStackState, Any?>.consume(
51+
onNextEmitOutputAction: () -> Unit
52+
) {
53+
var transformations: List<StateTransformation>
54+
var output = NullableInitBox<Any?>()
55+
var hasMoreOutputs = false
56+
57+
// The workflow runtime guarantees serialization of WorkflowActions, so we only need to guard
58+
// the actual reading of the lists in this class.
59+
synchronized(lock) {
60+
transformations = stateTransformations.toList()
61+
stateTransformations.clear()
62+
63+
if (outputEmissions.isNotEmpty()) {
64+
// Can't use removeFirst on JVM, it resolves to too-new JVM method.
65+
output = NullableInitBox(outputEmissions.removeAt(0))
66+
hasMoreOutputs = outputEmissions.isNotEmpty()
67+
}
68+
}
69+
70+
if (output.isInitialized) {
71+
setOutput(output)
72+
}
73+
74+
state = state.transformStack(transformations)
75+
76+
if (hasMoreOutputs) {
77+
onNextEmitOutputAction()
78+
}
79+
}
80+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package com.squareup.sample.thingy
2+
3+
import kotlinx.coroutines.CoroutineDispatcher
4+
import kotlinx.coroutines.Dispatchers
5+
import kotlinx.coroutines.Runnable
6+
import kotlinx.coroutines.awaitCancellation
7+
import kotlinx.coroutines.currentCoroutineContext
8+
import kotlin.coroutines.CoroutineContext
9+
10+
// TODO this is rough sketch, there are races
11+
internal class BackStackDispatcher : CoroutineDispatcher() {
12+
13+
private val lock = Any()
14+
private val tasks = mutableListOf<Runnable>()
15+
private var capturingTasks = false
16+
private var delegate: CoroutineDispatcher? = null
17+
private var onIdle: (() -> Unit)? = null
18+
19+
/**
20+
* Runs [block] then immediately runs all dispatched tasks before returning.
21+
*/
22+
fun runThenDispatchImmediately(block: () -> Unit) {
23+
synchronized(lock) {
24+
check(!capturingTasks) { "Cannot capture again" }
25+
capturingTasks = true
26+
}
27+
try {
28+
block()
29+
} finally {
30+
// Drain tasks before clearing capturing tasks so any tasks that dispatch are also captured.
31+
drainTasks()
32+
synchronized(lock) {
33+
capturingTasks = false
34+
}
35+
// Run one last time in case tasks were enqueued while clearing the capture flag.
36+
drainTasks()
37+
}
38+
}
39+
40+
/**
41+
* Suspends this coroutine indefinitely and dispatches any tasks to the current dispatcher.
42+
* [onIdle] is called after processing tasks when there are no more tasks to process.
43+
*/
44+
@OptIn(ExperimentalStdlibApi::class)
45+
suspend fun runDispatch(onIdle: () -> Unit): Nothing {
46+
val delegate = currentCoroutineContext()[CoroutineDispatcher] ?: Dispatchers.Default
47+
synchronized(lock) {
48+
check(this.delegate == null) { "Expected runDispatch to only be called once concurrently" }
49+
this.delegate = delegate
50+
this.onIdle = onIdle
51+
}
52+
53+
try {
54+
awaitCancellation()
55+
} finally {
56+
synchronized(lock) {
57+
this.delegate = null
58+
this.onIdle = null
59+
}
60+
}
61+
}
62+
63+
override fun dispatch(
64+
context: CoroutineContext,
65+
block: Runnable
66+
) {
67+
var isCapturing: Boolean
68+
var isFirstTask: Boolean
69+
var delegate: CoroutineDispatcher?
70+
var onIdle: (() -> Unit)?
71+
72+
synchronized(lock) {
73+
tasks += block
74+
isFirstTask = tasks.size == 1
75+
isCapturing = this.capturingTasks
76+
delegate = this.delegate
77+
onIdle = this.onIdle
78+
}
79+
80+
if (!isCapturing && delegate != null && onIdle != null && isFirstTask) {
81+
delegate!!.dispatch(context) {
82+
// Only run onIdle if work was actually done.
83+
if (drainTasks()) {
84+
onIdle!!()
85+
}
86+
}
87+
}
88+
}
89+
90+
/**
91+
* Returns true if any tasks were executed.
92+
*/
93+
private fun drainTasks(): Boolean {
94+
var didAnything = false
95+
var task = getNextTask()
96+
while (task != null) {
97+
didAnything = true
98+
task.run()
99+
task = getNextTask()
100+
}
101+
return didAnything
102+
}
103+
104+
private fun getNextTask(): Runnable? {
105+
synchronized(lock) {
106+
return tasks.removeFirstOrNull()
107+
}
108+
}
109+
}
Lines changed: 8 additions & 181 deletions
Original file line numberDiff line numberDiff line change
@@ -1,191 +1,18 @@
11
package com.squareup.sample.thingy
22

3-
import com.squareup.workflow1.Sink
43
import com.squareup.workflow1.StatefulWorkflow.RenderContext
5-
import com.squareup.workflow1.Workflow
6-
import com.squareup.workflow1.WorkflowAction
7-
import com.squareup.workflow1.WorkflowAction.Companion
84
import com.squareup.workflow1.ui.Screen
9-
import kotlinx.coroutines.CompletableDeferred
10-
import kotlinx.coroutines.CoroutineScope
11-
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
12-
import kotlinx.coroutines.Job
13-
import kotlinx.coroutines.cancel
14-
import kotlinx.coroutines.currentCoroutineContext
15-
import kotlinx.coroutines.ensureActive
16-
import kotlinx.coroutines.job
17-
import kotlinx.coroutines.launch
185

19-
internal sealed interface BackStackFrame<R> {
20-
fun cancelCaller()
21-
suspend fun awaitResult(): R
22-
suspend fun cancelSelf(): Nothing
23-
fun cancel()
24-
}
25-
26-
/**
27-
* Represents a call to [BackStackScope.showWorkflow].
28-
*/
29-
internal class WorkflowFrame<PropsT, OutputT, ChildPropsT, ChildOutputT, R> private constructor(
30-
private val workflow: Workflow<ChildPropsT, ChildOutputT, Screen>,
31-
private val props: ChildPropsT,
32-
private val callerJob: Job,
33-
private val frameScope: CoroutineScope,
34-
private val onOutput: suspend BackStackWorkflowScope.(ChildOutputT) -> R,
35-
private val actionSink: Sink<WorkflowAction<PropsT, BackStackState, OutputT>>,
36-
private val parent: BackStackFrame<*>?,
37-
private val result: CompletableDeferred<R>,
38-
) : BackStackFrame<R> {
39-
40-
constructor(
41-
workflow: Workflow<ChildPropsT, ChildOutputT, Screen>,
42-
initialProps: ChildPropsT,
43-
callerJob: Job,
44-
frameScope: CoroutineScope,
45-
onOutput: suspend BackStackWorkflowScope.(ChildOutputT) -> R,
46-
actionSink: Sink<WorkflowAction<PropsT, BackStackState, OutputT>>,
47-
parent: BackStackFrame<*>?,
48-
) : this(
49-
workflow = workflow,
50-
props = initialProps,
51-
callerJob = callerJob,
52-
frameScope = frameScope,
53-
onOutput = onOutput,
54-
actionSink = actionSink,
55-
parent = parent,
56-
result = CompletableDeferred(parent = frameScope.coroutineContext.job)
57-
)
58-
59-
fun copy(
60-
props: ChildPropsT = this.props,
61-
): WorkflowFrame<PropsT, OutputT, ChildPropsT, ChildOutputT, R> = WorkflowFrame(
62-
workflow = workflow,
63-
props = props,
64-
callerJob = callerJob,
65-
frameScope = frameScope,
66-
onOutput = onOutput,
67-
actionSink = actionSink,
68-
parent = parent,
69-
result = result
70-
)
71-
72-
override suspend fun awaitResult(): R = result.await()
73-
74-
override fun cancelCaller() {
75-
callerJob.cancel()
76-
}
77-
78-
private suspend fun finishWith(value: R): Nothing {
79-
result.complete(value)
80-
cancelSelf()
81-
}
82-
83-
override suspend fun cancelSelf(): Nothing {
84-
cancel()
85-
val currentContext = currentCoroutineContext()
86-
currentContext.cancel()
87-
currentContext.ensureActive()
88-
error("Nonsense")
89-
}
90-
91-
override fun cancel() {
92-
frameScope.cancel()
93-
}
94-
95-
fun renderWorkflow(
96-
context: RenderContext<PropsT, BackStackState, OutputT>
97-
): Screen = context.renderChild(
98-
child = workflow,
99-
props = props,
100-
handler = ::onOutput
101-
)
6+
internal interface BackStackFrame {
7+
val node: BackStackNode
1028

103-
private fun onOutput(output: ChildOutputT): WorkflowAction<PropsT, BackStackState, OutputT> {
104-
var canAcceptAction = true
105-
var action: WorkflowAction<PropsT, BackStackState, OutputT>? = null
106-
val sink = object : Sink<WorkflowAction<PropsT, BackStackState, OutputT>> {
107-
override fun send(value: WorkflowAction<PropsT, BackStackState, OutputT>) {
108-
val sendToSink = synchronized(result) {
109-
if (canAcceptAction) {
110-
action = value
111-
canAcceptAction = false
112-
false
113-
} else {
114-
true
115-
}
116-
}
117-
if (sendToSink) {
118-
actionSink.send(value)
119-
}
120-
}
121-
}
9+
val isIdle: Boolean
10+
get() = false
12211

123-
// Run synchronously until first suspension point since in many cases it will immediately
124-
// either call showWorkflow, finishWith, or goBack, and so then we can just return that action
125-
// immediately instead of needing a whole separate render pass.
126-
frameScope.launch(start = UNDISPATCHED) {
127-
val showScope = BackStackWorkflowScopeImpl(
128-
actionSink = sink,
129-
coroutineScope = this,
130-
thisFrame = this@WorkflowFrame,
131-
parentFrame = parent
132-
)
133-
finishWith(onOutput(showScope, output))
134-
}
135-
// TODO collect WorkflowAction
136-
137-
// Once the coroutine has suspended, all sends must go to the real sink.
138-
return synchronized(result) {
139-
canAcceptAction = false
140-
action ?: WorkflowAction.noAction()
141-
}
142-
}
143-
}
144-
145-
/**
146-
* Represents a call to [BackStackScope.showScreen].
147-
*/
148-
internal class ScreenFrame<OutputT, R>(
149-
private val callerJob: Job,
150-
private val frameScope: CoroutineScope,
151-
private val actionSink: Sink<WorkflowAction<Any?, BackStackState, OutputT>>,
152-
private val parent: BackStackFrame<*>?,
153-
) : BackStackFrame<R> {
154-
private val result = CompletableDeferred<R>()
155-
156-
lateinit var screen: Screen
157-
private set
158-
159-
fun initScreen(screenFactory: BackStackScreenScope<R>.() -> Screen) {
160-
val factoryScope = BackStackScreenScopeImpl<Any?, OutputT, R>(
161-
actionSink = actionSink,
162-
coroutineScope = frameScope,
163-
thisFrame = this,
164-
parentFrame = parent
165-
)
166-
screen = screenFactory(factoryScope)
167-
}
168-
169-
override suspend fun awaitResult(): R = result.await()
170-
171-
override fun cancelCaller() {
172-
callerJob.cancel()
173-
}
174-
175-
fun continueWith(value: R) {
176-
result.complete(value)
177-
cancel()
12+
fun withIdle(): BackStackFrame = object : BackStackFrame by this {
13+
override val isIdle: Boolean
14+
get() = true
17815
}
17916

180-
override suspend fun cancelSelf(): Nothing {
181-
cancel()
182-
val currentContext = currentCoroutineContext()
183-
currentContext.cancel()
184-
currentContext.ensureActive()
185-
error("Nonsense")
186-
}
187-
188-
override fun cancel() {
189-
frameScope.cancel()
190-
}
17+
fun render(context: RenderContext<Any?, BackStackState, Any?>): Screen
19118
}

0 commit comments

Comments
 (0)