Skip to content

Commit 13b4104

Browse files
Use WorkStealingDispatcher in runtime, behind a flag.
1 parent 9cb6cfe commit 13b4104

File tree

4 files changed

+161
-2
lines changed

4 files changed

+161
-2
lines changed

workflow-core/api/workflow-core.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ public final class com/squareup/workflow1/RuntimeConfigOptions : java/lang/Enum
167167
public static final field PARTIAL_TREE_RENDERING Lcom/squareup/workflow1/RuntimeConfigOptions;
168168
public static final field RENDER_ONLY_WHEN_STATE_CHANGES Lcom/squareup/workflow1/RuntimeConfigOptions;
169169
public static final field STABLE_EVENT_HANDLERS Lcom/squareup/workflow1/RuntimeConfigOptions;
170+
public static final field WORK_STEALING_DISPATCHER Lcom/squareup/workflow1/RuntimeConfigOptions;
170171
public static fun getEntries ()Lkotlin/enums/EnumEntries;
171172
public static fun valueOf (Ljava/lang/String;)Lcom/squareup/workflow1/RuntimeConfigOptions;
172173
public static fun values ()[Lcom/squareup/workflow1/RuntimeConfigOptions;
@@ -179,6 +180,7 @@ public final class com/squareup/workflow1/RuntimeConfigOptions$Companion {
179180
}
180181

181182
public final class com/squareup/workflow1/RuntimeConfigOptions$Companion$RuntimeOptions : java/lang/Enum {
183+
public static final field ALL Lcom/squareup/workflow1/RuntimeConfigOptions$Companion$RuntimeOptions;
182184
public static final field CONFLATE Lcom/squareup/workflow1/RuntimeConfigOptions$Companion$RuntimeOptions;
183185
public static final field DEFAULT Lcom/squareup/workflow1/RuntimeConfigOptions$Companion$RuntimeOptions;
184186
public static final field RENDER_ONLY Lcom/squareup/workflow1/RuntimeConfigOptions$Companion$RuntimeOptions;

workflow-core/src/commonMain/kotlin/com/squareup/workflow1/RuntimeConfig.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ public enum class RuntimeConfigOptions {
7474
// */
7575
// @WorkflowExperimentalRuntime
7676
// DRAIN_EXCLUSIVE_ACTIONS,
77+
78+
/**
79+
* Wrap the dispatcher passed to the runtime with a special dispatcher that can be advanced
80+
* explicitly, to allow any tasks scheduled by the workflow runtime to run before certain phases.
81+
*/
82+
@WorkflowExperimentalRuntime
83+
WORK_STEALING_DISPATCHER,
7784
;
7885

7986
public companion object {
@@ -136,6 +143,7 @@ public enum class RuntimeConfigOptions {
136143
STABLE_EVENT_HANDLERS,
137144
)
138145
),
146+
139147
// RENDER_ONLY_CONFLATE_PARTIAL_DEA(
140148
// setOf(
141149
// RENDER_ONLY_WHEN_STATE_CHANGES,
@@ -161,6 +169,8 @@ public enum class RuntimeConfigOptions {
161169
// DRAIN_EXCLUSIVE_ACTIONS,
162170
// )
163171
// ),
172+
173+
ALL(RuntimeConfigOptions.entries.toSet())
164174
}
165175
}
166176
}

workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/RenderWorkflow.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import com.squareup.workflow1.RuntimeConfigOptions.CONFLATE_STALE_RENDERINGS
44
import com.squareup.workflow1.RuntimeConfigOptions.RENDER_ONLY_WHEN_STATE_CHANGES
55
import com.squareup.workflow1.WorkflowInterceptor.RenderPassSkipped
66
import com.squareup.workflow1.WorkflowInterceptor.RenderPassesComplete
7+
import com.squareup.workflow1.internal.WorkStealingDispatcher
78
import com.squareup.workflow1.internal.WorkflowRunner
89
import com.squareup.workflow1.internal.chained
910
import kotlinx.coroutines.CancellationException
@@ -14,6 +15,7 @@ import kotlinx.coroutines.flow.MutableStateFlow
1415
import kotlinx.coroutines.flow.StateFlow
1516
import kotlinx.coroutines.isActive
1617
import kotlinx.coroutines.launch
18+
import kotlinx.coroutines.plus
1719

1820
/**
1921
* Launches the [workflow] in a new coroutine in [scope] and returns a [StateFlow] of its
@@ -117,6 +119,15 @@ public fun <PropsT, OutputT, RenderingT> renderWorkflowIn(
117119
): StateFlow<RenderingAndSnapshot<RenderingT>> {
118120
val chainedInterceptor = interceptors.chained()
119121

122+
val dispatcher = if (RuntimeConfigOptions.WORK_STEALING_DISPATCHER in runtimeConfig) {
123+
WorkStealingDispatcher.wrapDispatcherFrom(scope.coroutineContext)
124+
} else {
125+
null
126+
}
127+
128+
@Suppress("NAME_SHADOWING")
129+
val scope = dispatcher?.let { scope + dispatcher } ?: scope
130+
120131
val runner = WorkflowRunner(
121132
scope,
122133
workflow,
@@ -202,6 +213,13 @@ public fun <PropsT, OutputT, RenderingT> renderWorkflowIn(
202213
conflationHasChangedState = conflationHasChangedState || actionResult.stateChanged
203214
// We may have more actions we can process, this rendering could be stale.
204215
// This will check for any actions that are immediately available and apply them.
216+
// We advance the dispatcher first to allow any coroutines that were launched by the last
217+
// render pass to start up and potentially enqueue actions.
218+
dispatcher?.let {
219+
workflowTracer.trace("AdvancingWorkflowDispatcher") {
220+
dispatcher.advanceUntilIdle()
221+
}
222+
}
205223
actionResult = runner.applyNextAvailableTreeAction()
206224

207225
// If no actions processed, then no new rendering needed. Pass on to UI.

workflow-runtime/src/commonTest/kotlin/com/squareup/workflow1/RenderWorkflowInTest.kt

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,29 @@ import com.squareup.workflow1.RuntimeConfigOptions.Companion.RuntimeOptions
66
import com.squareup.workflow1.RuntimeConfigOptions.Companion.RuntimeOptions.DEFAULT
77
import com.squareup.workflow1.RuntimeConfigOptions.PARTIAL_TREE_RENDERING
88
import com.squareup.workflow1.RuntimeConfigOptions.RENDER_ONLY_WHEN_STATE_CHANGES
9+
import com.squareup.workflow1.RuntimeConfigOptions.WORK_STEALING_DISPATCHER
910
import com.squareup.workflow1.WorkflowInterceptor.RenderPassSkipped
1011
import com.squareup.workflow1.WorkflowInterceptor.RenderPassesComplete
1112
import com.squareup.workflow1.WorkflowInterceptor.RuntimeLoopOutcome
1213
import kotlinx.coroutines.CancellationException
1314
import kotlinx.coroutines.CompletableDeferred
1415
import kotlinx.coroutines.CoroutineExceptionHandler
16+
import kotlinx.coroutines.Dispatchers
1517
import kotlinx.coroutines.ExperimentalCoroutinesApi
18+
import kotlinx.coroutines.awaitCancellation
1619
import kotlinx.coroutines.cancel
20+
import kotlinx.coroutines.cancelAndJoin
1721
import kotlinx.coroutines.channels.Channel
1822
import kotlinx.coroutines.flow.MutableSharedFlow
1923
import kotlinx.coroutines.flow.MutableStateFlow
2024
import kotlinx.coroutines.flow.StateFlow
2125
import kotlinx.coroutines.flow.map
26+
import kotlinx.coroutines.flow.produceIn
2227
import kotlinx.coroutines.flow.receiveAsFlow
2328
import kotlinx.coroutines.isActive
29+
import kotlinx.coroutines.job
2430
import kotlinx.coroutines.launch
31+
import kotlinx.coroutines.plus
2532
import kotlinx.coroutines.suspendCancellableCoroutine
2633
import kotlinx.coroutines.sync.Mutex
2734
import kotlinx.coroutines.test.StandardTestDispatcher
@@ -45,7 +52,7 @@ import kotlin.test.assertTrue
4552
@Burst
4653
class RenderWorkflowInTest(
4754
useTracer: Boolean = false,
48-
useUnconfined: Boolean = true,
55+
private val useUnconfined: Boolean = true,
4956
private val runtime: RuntimeOptions = DEFAULT
5057
) {
5158

@@ -1494,7 +1501,9 @@ class RenderWorkflowInTest(
14941501

14951502
@Test
14961503
fun for_conflate_we_do_not_conflate_stacked_actions_into_one_rendering_if_output() {
1497-
if (runtimeConfig.contains(CONFLATE_STALE_RENDERINGS)) {
1504+
if (CONFLATE_STALE_RENDERINGS in runtimeConfig &&
1505+
WORK_STEALING_DISPATCHER !in runtimeConfig
1506+
) {
14981507
runTest(dispatcherUsed) {
14991508
check(runtimeConfig.contains(CONFLATE_STALE_RENDERINGS))
15001509

@@ -1738,6 +1747,126 @@ class RenderWorkflowInTest(
17381747
}
17391748
}
17401749

1750+
/**
1751+
* When the [CONFLATE_STALE_RENDERINGS] flag is specified, the runtime will repeatedly run all
1752+
* enqueued WorkflowActions after a render pass, before emitting the rendering to the external
1753+
* flow. When the [WORK_STEALING_DISPATCHER] flag is specified at the same time, any coroutines
1754+
* launched (or even resumed) since the render pass will be allowed to run _before_ checking for
1755+
* actions. This means that any new side effects or workers started by the render pass will be
1756+
* allowed to run to their first suspension point before the rendering is emitted. And if they
1757+
* happen to emit more actions as part of that, then those actions will also be processed, etc.
1758+
* until no more actions are available – only then will the rendering actually be emitted.
1759+
*/
1760+
@Test
1761+
fun new_effect_coroutines_dispatched_before_rendering_emitted_when_work_stealing_dispatcher() {
1762+
// This tests is specifically for standard dispatching behavior. It currently only works when
1763+
// CSR is enabled, although an additional test for DEA should be added.
1764+
if (WORK_STEALING_DISPATCHER !in runtimeConfig ||
1765+
CONFLATE_STALE_RENDERINGS !in runtimeConfig ||
1766+
useUnconfined
1767+
) {
1768+
return
1769+
}
1770+
1771+
runTest(dispatcherUsed) {
1772+
val workflow = Workflow.stateful<Int, Nothing, Unit>(initialState = 0) { effectCount ->
1773+
// Because of the WSD, this effect will be allowed to run after the render pass but before
1774+
// emitting the rendering OR checking for new actions, in the CSR loop. Since it emits an
1775+
// action, that action will be processed and trigger a second render pass.
1776+
runningSideEffect("sender") {
1777+
actionSink.send(
1778+
action("0") {
1779+
expect(2)
1780+
this.state++
1781+
}
1782+
)
1783+
}
1784+
1785+
if (effectCount >= 1) {
1786+
// This effect will be started by the first action and cancelled only when the runtime
1787+
// is cancelled.
1788+
// It will also start in the CSR loop, and trigger a third render pass before emitting the
1789+
// rendering.
1790+
runningSideEffect("0") {
1791+
expect(3)
1792+
actionSink.send(
1793+
action("1") {
1794+
expect(4)
1795+
this.state++
1796+
}
1797+
)
1798+
awaitCancellation {
1799+
expect(9)
1800+
}
1801+
}
1802+
}
1803+
1804+
if (effectCount >= 2) {
1805+
// This effect will be started by the second action, and cancelled by its own action in
1806+
// the same run of the CSR loop again.
1807+
runningSideEffect("1") {
1808+
expect(5)
1809+
actionSink.send(
1810+
action("-1") {
1811+
expect(6)
1812+
this.state--
1813+
}
1814+
)
1815+
awaitCancellation {
1816+
expect(7)
1817+
}
1818+
}
1819+
}
1820+
}
1821+
1822+
// We collect the renderings flow to a channel to drive the runtime loop by receiving from the
1823+
// channel. We can't use testScheduler.advanceUntilIdle() et al because we only want the test
1824+
// scheduler to run tasks until a rendering is available, not indefinitely.
1825+
val renderings = renderWorkflowIn(
1826+
workflow = workflow,
1827+
// Run in this scope so it is advanced by advanceUntilIdle.
1828+
scope = backgroundScope,
1829+
props = MutableStateFlow(Unit),
1830+
runtimeConfig = runtimeConfig,
1831+
workflowTracer = testTracer,
1832+
onOutput = {}
1833+
).produceIn(backgroundScope + Dispatchers.Unconfined)
1834+
1835+
expect(0)
1836+
// Receiving the first rendering allows the runtime coroutine to start. The first rendering
1837+
// is returned synchronously.
1838+
renderings.receive()
1839+
expect(1)
1840+
// Receiving the second rendering will allow the runtime to continue until the rendering is
1841+
// emitted. Since the CSR loop will start all our effects before emitting the next rendering,
1842+
// only one rendering will be emitted for all those render passes.
1843+
renderings.receive()
1844+
expect(8)
1845+
1846+
// No more renderings should be produced.
1847+
testScheduler.advanceUntilIdle()
1848+
assertTrue(renderings.isEmpty)
1849+
1850+
// Cancel the whole workflow runtime, including all effects.
1851+
backgroundScope.coroutineContext.job.cancelAndJoin()
1852+
expect(10)
1853+
}
1854+
}
1855+
1856+
private suspend fun awaitCancellation(onFinally: () -> Unit) {
1857+
try {
1858+
awaitCancellation()
1859+
} finally {
1860+
onFinally()
1861+
}
1862+
}
1863+
1864+
private var expectCounter = 0
1865+
private fun expect(expected: Int) {
1866+
assertEquals(expected, expectCounter)
1867+
expectCounter++
1868+
}
1869+
17411870
private class ExpectedException : RuntimeException()
17421871

17431872
companion object {

0 commit comments

Comments
 (0)