Skip to content

Commit

Permalink
Fix flaky "race successful with wait" test
Browse files Browse the repository at this point in the history
This used to fail sometimes with the second source not getting rid of the listener.
Occurs when the second listener call won the race, which is quite rare.

Due to the contract of the Source, it will remove the listener, but the second
listener is called bare, so cleanup does not happen.
  • Loading branch information
natsukagami committed Feb 20, 2024
1 parent 07989ff commit a8e6aa3
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions shared/src/test/scala/ListenerBehavior.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class ListenerBehavior extends munit.FunSuite:
test("race successful without wait"):
val source1 = TSource()
val source2 = TSource()
assert(source1 != source2)
val listener = TestListener(1)
Async.race(source1, source2).onComplete(listener)

Expand Down Expand Up @@ -123,11 +124,15 @@ class ListenerBehavior extends munit.FunSuite:

Async.blocking:
val f1 = Future(source1.completeNowWith(1))
listener.waitWaiter()
listener.sleeping.await
listener.continue()
val f2 = Future(l2.completeNow(1, source2))
val f2 = Future:
val completed = l2.completeNow(1, source2)
if completed then source2.dropListener(l2) // usually the source will do it by default
completed
assert(f1.await || f2.await)
assert(!f1.await || !f2.await)
println(s"${f1.await} ${f2.await}")

assert(source1.listener.isEmpty)
assert(source2.listener.isEmpty)
Expand Down Expand Up @@ -163,7 +168,7 @@ class ListenerBehavior extends munit.FunSuite:

Async.blocking:
val f1 = Future(lockBoth(s1listener, other))
other.waitWaiter()
other.sleeping.await
assert(source2.listener.get.completeNow(1, source2))
other.continue()
assertEquals(f1.await, s1listener)
Expand Down Expand Up @@ -247,7 +252,13 @@ private class TestListener(expected: Int)(using asst: munit.Assertions) extends

private class NumberedTestListener private (sleep: AtomicBoolean, fail: Boolean, expected: Int)(using munit.Assertions)
extends TestListener(expected):
private var waiter: Option[Promise[Unit]] = None
// A promise that is waited for inside `lock` until `continue` is called.
private val waiter = if sleep.get() then Some(Promise[Unit]()) else None
// A promise that is resolved right before the lock starts waiting for `waiter`.
private val sleepPromise = Promise[Unit]()

/** A [[Future]] that resolves when the listener goes to sleep. */
val sleeping = sleepPromise.asFuture

def this(sleep: Boolean, fail: Boolean, expected: Int)(using munit.Assertions) =
this(AtomicBoolean(sleep), fail, expected)
Expand All @@ -257,18 +268,12 @@ private class NumberedTestListener private (sleep: AtomicBoolean, fail: Boolean,
def acquire() =
if sleep.getAndSet(false) then
Async.blocking:
waiter = Some(Promise())
sleepPromise.complete(Success(()))
waiter.get.await
waiter.foreach: promise =>
promise.complete(Success(()))
waiter = None
if fail then false
else true
def release() = ()

def waitWaiter() =
while waiter.isEmpty do Thread.`yield`()

def continue() = waiter.get.complete(Success(()))

/** Dummy source that never completes */
Expand Down

0 comments on commit a8e6aa3

Please sign in to comment.