diff --git a/shared/src/main/scala/async/Async.scala b/shared/src/main/scala/async/Async.scala index 5ae129da..2e959e81 100644 --- a/shared/src/main/scala/async/Async.scala +++ b/shared/src/main/scala/async/Async.scala @@ -7,16 +7,40 @@ import gears.async.Listener.withLock import gears.async.Listener.NumberedLock import scala.util.boundary -/** A context that allows to suspend waiting for asynchronous data sources +/** The async context: provides the capability to asynchronously [[Async.await await]] for [[Async.Source Source]]s, and + * defines a scope for structured concurrency through a [[CompletionGroup]]. + * + * As both a context and a capability, the idiomatic way of using [[Async]] is to be implicitly passed around + * functions, as an `using` parameter: + * {{{ + * def function()(using Async): T = ??? + * }}} + * + * It is not recommended to store [[Async]] in a class field, since it complicates scoping rules. + * + * @param support + * An implementation of the underlying asynchronous operations (suspend and resume). See [[AsyncSupport]]. + * @param scheduler + * An implementation of a scheduler, for scheduling computation as they are spawned or resumed. See [[Scheduler]]. + * + * @see + * [[Async$.blocking Async.blocking]] for a way to construct an [[Async]] instance. + * @see + * [[Async$.group Async.group]] and [[Future$.apply Future.apply]] for [[Async]]-subscoping operations. */ trait Async(using val support: AsyncSupport, val scheduler: support.Scheduler): - /** Wait for completion of async source `src` and return the result */ + /** Waits for completion of source `src` and returns the result. Suspends the computation. + * + * @see + * [[Async.Source.awaitResult]] and [[Async$.await]] for extension methods calling [[Async!.await]] from the source + * itself. + */ def await[T](src: Async.Source[T]): T - /** The cancellation group for this Async */ + /** Returns the cancellation group for this [[Async]] context. */ def group: CompletionGroup - /** An Async of the same kind as this one, with a new cancellation group */ + /** Returns an [[Async]] context of the same kind as this one, with a new cancellation group. */ def withGroup(group: CompletionGroup): Async object Async: @@ -53,7 +77,7 @@ object Async: def blocking[T](body: Async.Spawn ?=> T)(using support: AsyncSupport, scheduler: support.Scheduler): T = group(body)(using Blocking(CompletionGroup.Unlinked)) - /** The currently executing Async context */ + /** Returns the currently executing Async context. Equivalent to `summon[Async]`. */ inline def current(using async: Async): Async = async /** [[Async.Spawn]] is a special subtype of [[Async]], also capable of spawning runnable [[Future]]s. @@ -63,8 +87,8 @@ object Async: */ opaque type Spawn <: Async = Async - /** Runs [[body]] inside a spawnable context where it is allowed to spawning concurrently runnable [[Future]]s. When - * the body returns, all spawned futures are cancelled and waited for. + /** Runs `body` inside a spawnable context where it is allowed to spawn concurrently runnable [[Future]]s. When the + * body returns, all spawned futures are cancelled and waited for. */ def group[T](body: Async.Spawn ?=> T)(using Async): T = withNewCompletionGroup(CompletionGroup().link())(body) @@ -86,51 +110,85 @@ object Async: group.waitCompletion()(using completionAsync) /** An asynchronous data source. Sources can be persistent or ephemeral. A persistent source will always pass same - * data to calls of `poll and `onComplete`. An ephemeral source can pass new data in every call. An example of a - * persistent source is `Future`. An example of an ephemeral source is `Channel`. + * data to calls of [[Source!.poll]] and [[Source!.onComplete]]. An ephemeral source can pass new data in every call. + * + * @see + * An example of a persistent source is [[gears.async.Future]]. + * @see + * An example of an ephemeral source is [[gears.async.Channel]]. */ trait Source[+T]: - - /** Check whether data is available at present and pass it to k if so. If no element is available, does not lock k - * and returns false immediately. If there is (or may be) data available, the listener is locked and if it fails, - * true is returned to signal this source's general availability. If locking k succeeds, only return true iff k's - * complete is called. Calls to `poll` are always synchronous. + /** Checks whether data is available at present and pass it to `k` if so. Calls to `poll` are always synchronous and + * non-blocking. + * + * The process is as follows: + * - If no data is immediately available, return `false` immediately. + * - If there is data available, attempt to lock `k`. + * - If `k` is no longer available, `true` is returned to signal this source's general availability. + * - If locking `k` succeeds: + * - If data is still available, complete `k` and return true. + * - Otherwise, unlock `k` and return false. + * + * Note that in all cases, a return value of `false` indicates that `k` should be put into `onComplete` to receive + * data in a later point in time. + * + * @return + * Whether poll was able to pass data to `k`. Note that this is regardless of `k` being available to receive the + * data. In most cases, one should pass `k` into [[Source!.onComplete]] if `poll` returns `false`. */ def poll(k: Listener[T]): Boolean - /** Once data is available, pass it to function `k`. `k` returns true iff the data was consumed in an async block. - * Calls to `onComplete` are usually asynchronous, meaning that the passed continuation `k` is a suspension. + /** Once data is available, pass it to the listener `k`. `onComplete` is always non-blocking. + * + * Note that `k`'s methods will be executed on the same thread as the [[Source]], usually in sequence. It is hence + * important that the listener itself does not perform expensive operations. */ def onComplete(k: Listener[T]): Unit - /** Signal that listener `k` is dead (i.e. will always return `false` from now on). This permits original, (i.e. - * non-derived) sources like futures or channels to drop the listener from their waiting sets. + /** Signal that listener `k` is dead (i.e. will always fail to acquire locks from now on), and should be removed + * from `onComplete` queues. + * + * This permits original, (i.e. non-derived) sources like futures or channels to drop the listener from their + * waiting sets. */ def dropListener(k: Listener[T]): Unit - /** Utility method for direct polling. */ + /** Similar to [[Async.Source!.poll(k:Listener[T])* poll]], but instead of passing in a listener, directly return + * the value `T` if it is available. + */ def poll(): Option[T] = var resultOpt: Option[T] = None poll(Listener.acceptingListener { (x, _) => resultOpt = Some(x) }) resultOpt - /** Utility method for direct waiting with `Async`. */ + /** Waits for an item to arrive from the source. Suspends until an item returns. + * + * This is an utility method for direct waiting with `Async`, instead of going through listeners. + */ final def awaitResult(using ac: Async) = ac.await(this) end Source extension [T](src: Source[scala.util.Try[T]]) - /** Waits for an item to arrive from the source, then automatically unwraps it. */ + /** Waits for an item to arrive from the source, then automatically unwraps it. Suspends until an item returns. + * @see + * [[Source!.awaitResult awaitResult]] for non-unwrapping await. + */ inline def await(using Async) = src.awaitResult.get extension [E, T](src: Source[Either[E, T]]) - /** Waits for an item to arrive from the source, then automatically unwraps it. */ + /** Waits for an item to arrive from the source, then automatically unwraps it. Suspends until an item returns. + * @see + * [[Source!.awaitResult awaitResult]] for non-unwrapping await. + */ inline def await(using Async) = src.awaitResult.right.get - /** An original source has a standard definition of `onComplete` in terms of `poll` and `addListener`. Implementations - * should be the resource owner to handle listener queue and completion using an object monitor on the instance. + /** An original source has a standard definition of [[Source.onComplete onComplete]] in terms of [[Source.poll poll]] + * and [[OriginalSource.addListener addListener]]. + * + * Implementations should be the resource owner to handle listener queue and completion using an object monitor on + * the instance. */ abstract class OriginalSource[+T] extends Source[T]: - - /** Add `k` to the listener set of this source */ + /** Add `k` to the listener set of this source. */ protected def addListener(k: Listener[T]): Unit def onComplete(k: Listener[T]): Unit = synchronized: @@ -139,7 +197,12 @@ object Async: end OriginalSource object Source: - /** Create a [[Source]] containing the given values, resolved once for each. */ + /** Create a [[Source]] containing the given values, resolved once for each. + * + * @return + * an ephemeral source of values arriving to listeners in a queue. Once all values are received, attaching a + * listener with [[Source!.onComplete onComplete]] will be a no-op (i.e. the listener will never be called). + */ def values[T](values: T*) = import scala.collection.JavaConverters._ val q = java.util.concurrent.ConcurrentLinkedQueue[T]() @@ -163,8 +226,14 @@ object Async: extension [T](src: Source[T]) /** Create a new source that requires the original source to run the given transformation function on every value - * received. Note that [[f]] is **always** run on the computation that produces the values from the original - * source, so this is very likely to run **sequentially** and be a performance bottleneck. + * received. + * + * Note that `f` is **always** run on the computation that produces the values from the original source, so this is + * very likely to run **sequentially** and be a performance bottleneck. + * + * @param f + * the transformation function to be run on every value. `f` is run *before* the item is passed to the + * [[Listener]]. */ def transformValuesWith[U](f: T => U) = new Source[U]: @@ -182,7 +251,23 @@ object Async: def dropListener(k: Listener[U]): Unit = src.dropListener(transform(k)) + /** Creates a source that "races" a list of sources. + * + * Listeners attached to this source is resolved with the first item arriving from one of the sources. If multiple + * sources are available at the same time, one of the items will be returned with no priority. Items that are not + * returned are '''not''' consumed from the upstream sources. + * + * @see + * [[raceWithOrigin]] for a race source that also returns the upstream origin of the item. + * @see + * [[Async$.select Async.select]] for a convenient syntax to race sources and awaiting them with [[Async]]. + */ def race[T](sources: Source[T]*): Source[T] = raceImpl[T, T]((v, _) => v)(sources*) + + /** Like [[race]], but the returned value includes a reference to the upstream source that the item came from. + * @see + * [[Async$.select Async.select]] for a convenient syntax to race sources and awaiting them with [[Async]]. + */ def raceWithOrigin[T](sources: Source[T]*): Source[(T, Source[T])] = raceImpl[(T, Source[T]), T]((v, src) => (v, src))(sources*) @@ -260,28 +345,59 @@ object Async: /** Cases for handling async sources in a [[select]]. [[SelectCase]] can be constructed by extension methods `handle` * of [[Source]]. + * + * @see + * [[handle Source.handle]] (and its operator alias [[~~> ~~>]]) + * @see + * [[Async$.select Async.select]] where [[SelectCase]] is used. */ opaque type SelectCase[T] = (Source[?], Nothing => T) // ^ unsafe types, but we only construct SelectCase from `handle` which is safe extension [T](src: Source[T]) - /** Attach a handler to [[src]], creating a [[SelectCase]]. */ + /** Attach a handler to `src`, creating a [[SelectCase]]. + * @see + * [[Async$.select Async.select]] where [[SelectCase]] is used. + */ inline def handle[U](f: T => U): SelectCase[U] = (src, f) - /** Alias for [[handle]] */ + /** Alias for [[handle]] + * @see + * [[Async$.select Async.select]] where [[SelectCase]] is used. + */ inline def ~~>[U](f: T => U): SelectCase[U] = src.handle(f) /** Race a list of sources with the corresponding handler functions, once an item has come back. Like [[race]], - * [[select]] guarantees exactly one of the sources are polled. Unlike `map`ping a [[Source]], the handler in + * [[select]] guarantees exactly one of the sources are polled. Unlike [[transformValuesWith]], the handler in * [[select]] is run in the same async context as the calling context of [[select]]. + * + * @see + * [[handle Source.handle]] (and its operator alias [[~~> ~~>]]) for methods to create [[SelectCase]]s. + * @example + * {{{ + * // Race a channel read with a timeout + * val ch = SyncChannel[Int]() + * // ... + * val timeout = Future(sleep(1500.millis)) + * + * Async.select( + * ch.readSrc.handle: item => + * Some(item * 2), + * timeout ~~> _ => None + * ) + * }}} */ def select[T](cases: SelectCase[T]*)(using Async) = val (input, which) = raceWithOrigin(cases.map(_._1)*).awaitResult val (_, handler) = cases.find(_._1 == which).get handler.asInstanceOf[input.type => T](input) - /** If left (respectively, right) source succeeds with `x`, pass `Left(x)`, (respectively, Right(x)) on to the - * continuation. + /** Race two sources, wrapping them respectively in [[Left]] and [[Right]] cases. + * @return + * a new [[Source]] that resolves with [[Left]] if `src1` returns an item, [[Right]] if `src2` returns an item, + * whichever comes first. + * @see + * [[race]] and [[select]] for racing more than two sources. */ def either[T1, T2](src1: Source[T1], src2: Source[T2]): Source[Either[T1, T2]] = race(src1.transformValuesWith(Left(_)), src2.transformValuesWith(Right(_))) diff --git a/shared/src/main/scala/async/AsyncOperations.scala b/shared/src/main/scala/async/AsyncOperations.scala index 9e05b5b4..0494ce0a 100644 --- a/shared/src/main/scala/async/AsyncOperations.scala +++ b/shared/src/main/scala/async/AsyncOperations.scala @@ -4,19 +4,24 @@ import scala.concurrent.duration.FiniteDuration import java.util.concurrent.TimeoutException import gears.async.AsyncOperations.sleep +/** Defines fundamental operations that require the support of the scheduler. This is commonly provided alongside with + * the given implementation of [[Scheduler]]. + * @see + * [[Scheduler]] for the definition of the scheduler itself. + */ trait AsyncOperations: - /** Suspends the current [[Async]] context for at least [[millis]] milliseconds. */ + /** Suspends the current [[Async]] context for at least `millis` milliseconds. */ def sleep(millis: Long)(using Async): Unit object AsyncOperations: - /** Suspends the current [[Async]] context for at least [[millis]] milliseconds. + /** Suspends the current [[Async]] context for at least `millis` milliseconds. * @param millis - * The duration to suspend. Must be a positive integer. + * The duration to suspend, in milliseconds. Must be a positive integer. */ inline def sleep(millis: Long)(using AsyncOperations, Async): Unit = summon[AsyncOperations].sleep(millis) - /** Suspends the current [[Async]] context for at least [[millis]] milliseconds. + /** Suspends the current [[Async]] context for `duration`. * @param duration * The duration to suspend. Must be positive. */ diff --git a/shared/src/main/scala/async/AsyncSupport.scala b/shared/src/main/scala/async/AsyncSupport.scala index 9effcaeb..0505ebf4 100644 --- a/shared/src/main/scala/async/AsyncSupport.scala +++ b/shared/src/main/scala/async/AsyncSupport.scala @@ -2,13 +2,21 @@ package gears.async import scala.concurrent.duration._ +/** The delimited continuation suspension interface. Represents a suspended computation asking for a value of type `T` + * to continue (and eventually returning a value of type `R`). + */ trait Suspension[-T, +R]: def resume(arg: T): R +/** Support for suspension capabilities through a delimited continuation interface. */ trait SuspendSupport: + /** A marker for the "limit" of "delimited continuation". */ type Label[R] + + /** The provided suspension type. */ type Suspension[-T, +R] <: gears.async.Suspension[T, R] + /** Set the suspension marker as the body's caller, and execute `body`. */ def boundary[R](body: Label[R] ?=> R): R /** Should return immediately if resume is called from within body */ @@ -18,12 +26,15 @@ trait SuspendSupport: trait AsyncSupport extends SuspendSupport: type Scheduler <: gears.async.Scheduler + /** Resume a [[Suspension]] at some point in the future, scheduled by the scheduler. */ private[async] def resumeAsync[T, R](suspension: Suspension[T, R])(arg: T)(using s: Scheduler): Unit = s.execute(() => suspension.resume(arg)) + /** Schedule a computation with the suspension boundary already created. */ private[async] def scheduleBoundary(body: Label[Unit] ?=> Unit)(using s: Scheduler): Unit = s.execute(() => boundary(body)) +/** A scheduler implementation, with the ability to execute a computation immediately or after a delay. */ trait Scheduler: def execute(body: Runnable): Unit def schedule(delay: FiniteDuration, body: Runnable): Cancellable diff --git a/shared/src/main/scala/async/Cancellable.scala b/shared/src/main/scala/async/Cancellable.scala index 59b3bf16..15dc07b4 100644 --- a/shared/src/main/scala/async/Cancellable.scala +++ b/shared/src/main/scala/async/Cancellable.scala @@ -1,6 +1,6 @@ package gears.async -/** A trait for cancellable entities that can be grouped */ +/** A trait for cancellable entities that can be grouped. */ trait Cancellable: private var group: CompletionGroup = CompletionGroup.Unlinked @@ -28,6 +28,7 @@ trait Cancellable: end Cancellable object Cancellable: + /** A special [[Cancellable]] object that just tracks whether its linked group was cancelled. */ trait Tracking extends Cancellable: def isCancelled: Boolean diff --git a/shared/src/main/scala/async/CompletionGroup.scala b/shared/src/main/scala/async/CompletionGroup.scala index c2f3cece..22daafb6 100644 --- a/shared/src/main/scala/async/CompletionGroup.scala +++ b/shared/src/main/scala/async/CompletionGroup.scala @@ -20,6 +20,7 @@ class CompletionGroup extends Cancellable.Tracking: members.toSeq .foreach(_.cancel()) + /** Wait for all members of the group to complete and unlink themselves. */ private[async] def waitCompletion()(using Async): Unit = synchronized: if members.nonEmpty && cancelWait.isEmpty then cancelWait = Some(Promise()) diff --git a/shared/src/main/scala/async/Listener.scala b/shared/src/main/scala/async/Listener.scala index 73347ec4..113d9792 100644 --- a/shared/src/main/scala/async/Listener.scala +++ b/shared/src/main/scala/async/Listener.scala @@ -10,19 +10,17 @@ import java.util.concurrent.locks.ReentrantLock * or [[Listener.acceptingListener]]. * * However, should the listener want to attempt synchronization, it has to expose some locking-related interfaces. See - * `lock`. + * [[Listener.lock]]. */ trait Listener[-T]: import Listener._ /** Complete the listener with the given item, from the given source. **If the listener exposes a - * [[Listener.ListenerLock]]**, it is required to acquire this lock completely (either through [[lockCompletely]] or - * through manual locking of every layer) before calling [[complete]]. This can also be done conveniently with - * [[completeNow]]. For performance reasons, this condition is usually not checked and will end up causing unexpected - * behavior if not satisfied. + * [[Listener.ListenerLock]]**, it is required to acquire this lock before calling [[complete]]. This can also be + * done conveniently with [[completeNow]]. For performance reasons, this condition is usually not checked and will + * end up causing unexpected behavior if not satisfied. * - * The listener must automatically release the lock of itself and any underlying listeners, however this usually is - * done automatically by calling the inner listener's [[complete]] recursively. + * The listener must automatically release its own lock upon completion. */ def complete(data: T, source: Async.Source[T]): Unit @@ -32,7 +30,7 @@ trait Listener[-T]: val lock: Listener.ListenerLock | Null /** Attempts to acquire locks and then calling [[complete]] with the given item and source. If locking fails, - * [[release]] is automatically called. + * [[releaseLock]] is automatically called. */ def completeNow(data: T, source: Async.Source[T]): Boolean = if acquireLock() then @@ -40,11 +38,10 @@ trait Listener[-T]: true else false - /** Release the listener lock up to the given [[Listener.LockMarker]], if it exists. */ + /** Release the listener's lock if it exists. */ inline final def releaseLock(): Unit = if lock != null then lock.release() - /** Attempts to completely lock the listener, if such a lock exists. Succeeds with [[true]] immediately if there is no - * [[Listener.ListenerLock]]. If locking fails, [[release]] is automatically called. + /** Attempts to lock the listener, if such a lock exists. Succeeds with `true` immediately if [[lock]] is `null`. */ inline final def acquireLock(): Boolean = if lock != null then lock.acquire() else true @@ -76,14 +73,11 @@ object Listener: * * Some implementations are provided for ease of implementations: * - For custom listener implementations involving locks: [[NumberedLock]] provides uniquely numbered locks. - * - For source transformation implementations: [[ListenerLockWrapper]] provides a ListenerLock instance that only - * forwards the requests to the underlying lock. [[withLock]] is a convenient `.map` for `[[ListenerLock]] | - * Null`. + * - For source transformation implementations: [[withLock]] is a convenient `.map` for `[[ListenerLock]] | Null`. */ trait ListenerLock: - /** The assigned number of the lock. If the listener holds inner listeners underneath that utilizes locks, it is - * **required** that [[selfNumber]] must be greater or equal any [[PartialLock.nextNumber]] of any returned - * [[PartialLock]]s. + /** The assigned number of the lock. It is required that listeners that can be locked together to have different + * [[selfNumber numbers]]. This requirement can be simply done by using a lock created using [[NumberedLock]]. */ val selfNumber: Long @@ -91,9 +85,7 @@ object Listener: */ def acquire(): Boolean - /** Release the current lock without resolving the listener with any items, if the current listener lock is before - * or the same as the current [[Listener.LockMarker]]. - */ + /** Release the current lock. */ def release(): Unit end ListenerLock diff --git a/shared/src/main/scala/async/ScalaConverters.scala b/shared/src/main/scala/async/ScalaConverters.scala index ce92e319..2cc72029 100644 --- a/shared/src/main/scala/async/ScalaConverters.scala +++ b/shared/src/main/scala/async/ScalaConverters.scala @@ -7,8 +7,9 @@ import scala.util.Try /** Converters from Gears types to Scala API types and back. */ object ScalaConverters: extension [T](fut: StdFuture[T]) - /** Converts a [[StdFuture]] into a gears [[Future]]. Requires an [[ExecutionContext]], as the job of completing the - * returned [[Future]] will be done through this context. Since [[StdFuture]] cannot be cancelled, the returned + /** Converts a [[scala.concurrent.Future Scala Future]] into a gears [[Future]]. Requires an + * [[scala.concurrent.ExecutionContext ExecutionContext]], as the job of completing the returned [[Future]] will be + * done through this context. Since [[scala.concurrent.Future Scala Future]] cannot be cancelled, the returned * [[Future]] will *not* clean up the pending job when cancelled. */ def asGears(using ExecutionContext): Future[T] = @@ -16,8 +17,9 @@ object ScalaConverters: fut.andThen(result => resolver.complete(result)) extension [T](fut: Future[T]) - /** Converts a gears [[Future]] into a Scala [[StdFuture]]. Note that if [[fut]] is cancelled, the returned - * [[StdFuture]] will also be completed with `Failure(CancellationException)`. + /** Converts a gears [[Future]] into a Scala [[scala.concurrent.Future Scala Future]]. Note that if `fut` is + * cancelled, the returned [[scala.concurrent.Future Scala Future]] will also be completed with + * `Failure(CancellationException)`. */ def asScala: StdFuture[T] = val p = StdPromise[T]() diff --git a/shared/src/main/scala/async/channels.scala b/shared/src/main/scala/async/channels.scala index 074abd36..2d0cf083 100644 --- a/shared/src/main/scala/async/channels.scala +++ b/shared/src/main/scala/async/channels.scala @@ -14,16 +14,26 @@ import Channel.{Closed, Res} /** The part of a channel one can send values to. Blocking behavior depends on the implementation. */ trait SendableChannel[-T]: - /** Create an [[Async.Source]] representing the send action of value [[x]]. Note that *each* listener attached to and - * accepting a [[Sent]] value corresponds to [[x]] being sent once. + /** Create an [[Async.Source]] representing the send action of value `x`. + * + * Note that *each* listener attached to and accepting an [[Unit]] value corresponds to `x` being sent once. * * To create an [[Async.Source]] that sends the item exactly once regardless of listeners attached, wrap the [[send]] - * operation inside a [[gears.async.Future]]. + * operation inside a [[gears.async.Future]]: + * {{{ + * val sendOnce = Future(ch.send(x)) + * }}} + * + * @return + * an [[Async.Source]] that resolves with `Right(())` when `x` is sent to the channel, or `Left(Closed)` if the + * channel is already closed. This source will perform a send operation every time a listener is attached to it, or + * every time it is [[Async$.await]]ed on. */ def sendSource(x: T): Async.Source[Res[Unit]] - /** Send [[x]] over the channel, blocking (asynchronously with [[Async]]) until the item has been sent or, if the - * channel is buffered, queued. Throws [[ChannelClosedException]] if the channel was closed. + /** Send `x` over the channel, suspending until the item has been sent or, if the channel is buffered, queued. + * @throws ChannelClosedException + * if the channel was closed. */ def send(x: T)(using Async): Unit = sendSource(x).awaitResult match case Right(_) => () @@ -34,55 +44,98 @@ end SendableChannel */ trait ReadableChannel[+T]: /** An [[Async.Source]] corresponding to items being sent over the channel. Note that *each* listener attached to and - * accepting a [[Read]] value corresponds to one value received over the channel. + * accepting a [[Right]] value corresponds to one value received over the channel. * * To create an [[Async.Source]] that reads *exactly one* item regardless of listeners attached, wrap the [[read]] * operation inside a [[gears.async.Future]]. + * {{{ + * val readOnce = Future(ch.read(x)) + * }}} */ val readSource: Async.Source[Res[T]] - /** Read an item from the channel, blocking (asynchronously with [[Async]]) until the item has been received. Returns + /** Read an item from the channel, suspending until the item has been received. Returns * `Failure(ChannelClosedException)` if the channel was closed. */ def read()(using Async): Res[T] = readSource.awaitResult end ReadableChannel -/** A generic channel that can be sent to, received from and closed. */ +/** A generic channel that can be sent to, received from and closed. + * @example + * {{{ + * // send from one Future, read from multiple + * val ch = SyncChannel[Int]() + * val sender = Future: + * for i <- 1 to 20 do + * ch.send(i) + * ch.close() + * val receivers = (1 to 5).map: n => + * Future: + * boundary: + * while true: + * ch.read() match + * case Right(k) => println(s"Receiver $n got: $k") + * case Left(_) => boundary.break() + * + * receivers.awaitAll + * }}} + * @see + * [[SyncChannel]], [[BufferedChannel]] and [[UnboundedChannel]] for channel implementations. + */ trait Channel[T] extends SendableChannel[T], ReadableChannel[T], java.io.Closeable: + /** Restrict this channel to send-only. */ inline final def asSendable: SendableChannel[T] = this + + /** Restrict this channel to read-only. */ inline final def asReadable: ReadableChannel[T] = this + + /** Restrict this channel to close-only. */ inline final def asCloseable: java.io.Closeable = this protected type Reader = Listener[Res[T]] protected type Sender = Listener[Res[Unit]] end Channel -/** SyncChannel, sometimes called a rendez-vous channel has the following semantics: - * - `send` to an unclosed channel blocks until a reader commits to receiving the value (via successfully locking). +/** Synchronous channels, sometimes called rendez-vous channels, has the following semantics: + * - [[Channel.send send]] to an unclosed channel blocks until a [[Channel.read read]] listener commits to receiving + * the value (via successfully locking). + * + * See [[SyncChannel$.apply]] for creation of synchronous channels. */ trait SyncChannel[T] extends Channel[T] -/** BufferedChannel(size: Int) is a version of a channel with an internal value buffer (represented internally as an - * array with positive size). It has the following semantics: - * - `send` if the buffer is not full appends the value to the buffer and success immediately. - * - `send` if the buffer is full blocks until some buffer slot is freed and assigned to this sender. +/** Buffered channels are channels with an internal value buffer (represented internally as an array with positive + * size). They have the following semantics: + * - [[Channel.send send]], when the buffer is not full, appends the value to the buffer and success immediately. + * - [[Channel.send send]], when the buffer is full, suspends until some buffer slot is freed and assigned to this + * sender. + * + * See [[BufferedChannel$.apply]] for creation of buffered channels. */ trait BufferedChannel[T] extends Channel[T] -/** UnboundedChannel are buffered channels that do not bound the number of items in the channel. In other words, the - * buffer is treated as never being full and will expand as needed. +/** Unbounded channels are buffered channels that do not have an upper bound on the number of items in the channel. In + * other words, the buffer is treated as never being full and will expand as needed. + * + * See [[UnboundedChannel$.apply]] for creation of unbounded channels. */ trait UnboundedChannel[T] extends BufferedChannel[T]: - /** Send the item immediately. Throws [[ChannelClosedException]] if the channel is closed. */ + /** Sends the item immediately. + * + * @throws ChannelClosedException + * if the channel is closed. + */ def sendImmediately(x: T): Unit -/** This exception is being raised by [[Channel.send]] on closed [[Channel]], it is also returned wrapped in `Failure` - * when reading form a closed channel. [[ChannelMultiplexer]] sends `Failure(ChannelClosedException)` to all - * subscribers when it receives a `close()` signal. +/** The exception raised by [[Channel.send send]] (or [[UnboundedChannel.sendImmediately]]) on a closed [[Channel]]. + * + * It is also returned wrapped in `Failure` when reading form a closed channel. [[ChannelMultiplexer]] sends + * `Failure(ChannelClosedException)` to all subscribers when it receives a `close()` signal. */ class ChannelClosedException extends Exception object SyncChannel: + /** Creates a new [[SyncChannel]]. */ def apply[T](): SyncChannel[T] = Impl() private class Impl[T] extends Channel.Impl[T] with SyncChannel[T]: @@ -99,6 +152,7 @@ end SyncChannel object BufferedChannel: /** Create a new buffered channel with the given buffer size. */ def apply[T](size: Int = 10): BufferedChannel[T] = Impl(size) + private class Impl[T](size: Int) extends Channel.Impl[T] with BufferedChannel[T]: require(size > 0, "Buffered channels must have a buffer size greater than 0") val buf = new mutable.Queue[T](size) @@ -131,6 +185,7 @@ object BufferedChannel: end BufferedChannel object UnboundedChannel: + /** Creates a new [[UnboundedChannel]]. */ def apply[T](): UnboundedChannel[T] = Impl[T]() private final class Impl[T]() extends Channel.Impl[T] with UnboundedChannel[T] { @@ -309,11 +364,11 @@ end Channel * all messages sent by the publishers. The only guarantee on the order of the values the subscribers see is that * values from the same publisher will arrive in order. * - * Channel multiplexer can also be closed, in that case all subscribers will receive Failure(ChannelClosedException) + * Channel multiplexer can also be closed, in that case all subscribers will receive `Failure(ChannelClosedException)` * but no attempt at closing either publishers or subscribers will be made. */ trait ChannelMultiplexer[T] extends java.io.Closeable: - /** Run the multiplexer synchronously. This call only returns after this multiplexer has been cancelled. */ + /** Run the multiplexer. Returns after this multiplexer has been cancelled. */ def run()(using Async): Unit def addPublisher(c: ReadableChannel[T]): Unit diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index e767c060..abc665e8 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -1,26 +1,46 @@ package gears.async -import TaskSchedule.ExponentialBackoff -import AsyncOperations.sleep - import scala.collection.mutable -import mutable.ListBuffer - import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.CancellationException import scala.compiletime.uninitialized -import scala.util.{Failure, Success, Try} import scala.annotation.unchecked.uncheckedVariance import scala.annotation.tailrec import scala.util +import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal -/** A cancellable future that can suspend waiting for other asynchronous sources +/** Futures are [[Async.Source Source]]s that has the following properties: + * - They represent a single value: Once resolved, [[Async.await await]]-ing on a [[Future]] should always return the + * same value. + * - They can potentially be cancelled, via [[Cancellable.cancel the cancel method]]. + * + * There are two kinds of futures, active and passive. + * - '''Active''' futures are ones that are spawned with [[Future.apply]] and [[Task.start]]. They require the + * [[Async.Spawn]] context, and run on their own (as long as the [[Async.Spawn]] scope has not ended). Active + * futures represent concurrent computations within Gear's structured concurrency tree. Idiomatic Gears code should + * ''never'' return active futures. Should a function be async (i.e. takes an [[Async]] context parameter), they + * should return values or throw exceptions directly. + * - '''Passive''' futures are ones that are created by [[Future.Promise]] (through + * [[Future.Promise.asFuture asFuture]]) and [[Future.withResolver]]. They represent yet-arrived values coming from + * ''outside'' of Gear's structured concurrency tree (for example, from network or the file system, or even from + * another concurrency system like [[scala.concurrent.Future Scala standard library futures]]). Idiomatic Gears + * libraries should return this kind of [[Future]] if deemed neccessary, but functions returning passive futures + * should ''not'' take an [[Async]] context. + * + * @see + * [[Future.apply]] and [[Task.start]] for creating active futures. + * @see + * [[Future.Promise]] and [[Future.withResolver]] for creating passive futures. + * @see + * [[Future.awaitAll]], [[Future.awaitFirst]] and [[Future.Collector]] for tools to work with multiple futures. + * @see + * [[ScalaConverters.asGears]] and [[ScalaConverters.asScala]] for converting between Scala futures and Gears + * futures. */ trait Future[+T] extends Async.OriginalSource[Try[T]], Cancellable object Future: - /** A future that is completed explicitly by calling its `complete` method. There are three public implementations * * - RunnableFuture: Completion is done by running a block of code @@ -89,6 +109,11 @@ object Future: */ private class RunnableFuture[+T](body: Async.Spawn ?=> T)(using ac: Async) extends CoreFuture[T]: + /** RunnableFuture maintains its own inner [[CompletionGroup]], that is separated from the provided Async + * instance's. When the future is cancelled, we only cancel this CompletionGroup. This effectively means any + * `.await` operations within the future is cancelled *only if they link into this group*. The future body run with + * this inner group by default, but it can always opt-out (e.g. with [[uninterruptible]]). + */ private var innerGroup: CompletionGroup = CompletionGroup() private def checkCancellation(): Unit = @@ -150,13 +175,13 @@ object Future: end RunnableFuture - /** Create a future that asynchronously executes [[body]] that defines its result value in a [[Try]] or returns - * [[Failure]] if an exception was thrown. + /** Create a future that asynchronously executes `body` that wraps its execution in a [[scala.util.Try]]. The returned + * future is linked to the given [[Async.Spawn]] scope by default, i.e. it is cancelled when this scope ends. */ def apply[T](body: Async.Spawn ?=> T)(using async: Async, spawnable: Async.Spawn & async.type): Future[T] = RunnableFuture(body) - /** A future that immediately terminates with the given result. */ + /** A future that is immediately completed with the given result. */ def now[T](result: Try[T]): Future[T] = val f = CoreFuture[T]() f.complete(result) @@ -172,7 +197,6 @@ object Future: inline def rejected(exception: Throwable): Future[Nothing] = now(Failure(exception)) extension [T](f1: Future[T]) - /** Parallel composition of two futures. If both futures succeed, succeed with their values in a pair. Otherwise, * fail with the failure that was returned first. */ @@ -190,23 +214,24 @@ object Future: case Right(Failure(ex)) => r.reject(ex) }) - /** Parallel composition of tuples of futures. Future.Success(EmptyTuple) might be treated as Nil. - */ - def *:[U <: Tuple](f2: Future[U]): Future[T *: U] = Future.withResolver: r => - Async - .either(f1, f2) - .onComplete(Listener { (v, _) => - v match - case Left(Success(x1)) => - f2.onComplete(Listener { (x2, _) => r.complete(x2.map(x1 *: _)) }) - case Right(Success(x2)) => - f1.onComplete(Listener { (x1, _) => r.complete(x1.map(_ *: x2)) }) - case Left(Failure(ex)) => r.reject(ex) - case Right(Failure(ex)) => r.reject(ex) - }) + // /** Parallel composition of tuples of futures. Disabled since scaladoc is crashing with it. (https://github.com/scala/scala3/issues/19925) */ + // def *:[U <: Tuple](f2: Future[U]): Future[T *: U] = Future.withResolver: r => + // Async + // .either(f1, f2) + // .onComplete(Listener { (v, _) => + // v match + // case Left(Success(x1)) => + // f2.onComplete(Listener { (x2, _) => r.complete(x2.map(x1 *: _)) }) + // case Right(Success(x2)) => + // f1.onComplete(Listener { (x1, _) => r.complete(x1.map(_ *: x2)) }) + // case Left(Failure(ex)) => r.reject(ex) + // case Right(Failure(ex)) => r.reject(ex) + // }) /** Alternative parallel composition of this task with `other` task. If either task succeeds, succeed with the * success that was returned first. Otherwise, fail with the failure that was returned last. + * @see + * [[orWithCancel]] for an alternative version where the slower future is cancelled. */ def or(f2: Future[T]): Future[T] = orImpl(false)(f2) @@ -229,7 +254,11 @@ object Future: end extension - /** A promise defines a future that is be completed via the `complete` method. + /** A promise is a [[Future]] that is be completed manually via the `complete` method. + * @see + * [[Promise$.apply]] to create a new, empty promise. + * @see + * [[Future.withResolver]] to create a passive [[Future]] from callback-style asynchronous calls. */ trait Promise[T] extends Future[T]: inline def asFuture: Future[T] = this @@ -238,6 +267,7 @@ object Future: def complete(result: Try[T]): Unit object Promise: + /** Create a new, unresolved [[Promise]]. */ def apply[T](): Promise[T] = new CoreFuture[T] with Promise[T]: override def cancel(): Unit = @@ -292,7 +322,21 @@ object Future: future end withResolver - /** Collects a list of futures into a channel of futures, arriving as they finish. */ + /** Collects a list of futures into a channel of futures, arriving as they finish. + * @example + * {{{ + * // Sleep sort + * val futs = numbers.map(i => Future(sleep(i.millis))) + * val collector = Collector(futs*) + * + * val output = mutable.ArrayBuffer[Int]() + * for i <- 1 to futs.size: + * output += collector.results.read().await + * }}} + * @see + * [[Future.awaitAll]] and [[Future.awaitFirst]] for simple usage of the collectors to get all results or the first + * succeeding one. + */ class Collector[T](futures: Future[T]*): private val ch = UnboundedChannel[Future[T]]() @@ -389,7 +433,7 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T): if (maxRepetitions == 1) ret else { while (maxRepetitions == 0 || repetitions < maxRepetitions) { - sleep(millis) + AsyncOperations.sleep(millis) ret = body repetitions += 1 } @@ -408,7 +452,7 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T): else { var timeToSleep = millis while (maxRepetitions == 0 || repetitions < maxRepetitions) { - sleep(timeToSleep) + AsyncOperations.sleep(timeToSleep) timeToSleep *= exponentialBase ret = body repetitions += 1 @@ -427,7 +471,7 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T): repetitions += 1 if (maxRepetitions == 1) ret else { - sleep(millis) + AsyncOperations.sleep(millis) ret = body repetitions += 1 if (maxRepetitions == 2) ret @@ -436,7 +480,7 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T): val aOld = a a = b b = aOld + b - sleep(b * millis) + AsyncOperations.sleep(b * millis) ret = body repetitions += 1 } @@ -451,7 +495,7 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T): @tailrec def helper(repetitions: Long = 0): T = if (repetitions > 0 && millis > 0) - sleep(millis) + AsyncOperations.sleep(millis) val ret: T = body ret match { case Failure(_) => ret @@ -467,7 +511,7 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T): @tailrec def helper(repetitions: Long = 0): T = if (repetitions > 0 && millis > 0) - sleep(millis) + AsyncOperations.sleep(millis) val ret: T = body ret match { case Success(_) => ret @@ -480,6 +524,11 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T): end Task +/** Runs the `body` inside in an [[Async]] context that does *not* propagate cancellation until the end. + * + * In other words, `body` is never notified of the cancellation of the `ac` context; but `uninterruptible` would still + * throw a [[CancellationException]] ''after `body` finishes running'' if `ac` was cancelled. + */ def uninterruptible[T](body: Async ?=> T)(using ac: Async): T = val tracker = Cancellable.Tracking().link() @@ -492,7 +541,12 @@ def uninterruptible[T](body: Async ?=> T)(using ac: Async): T = if tracker.isCancelled then throw new CancellationException() r -def cancellationScope[T](cancel: Cancellable)(fn: => T)(using a: Async): T = - cancel.link() +/** Link `cancellable` to the completion group of the current [[Async]] context during `fn`. + * + * If the [[Async]] context is cancelled during the execution of `fn`, `cancellable` will also be immediately + * cancelled. + */ +def cancellationScope[T](cancellable: Cancellable)(fn: => T)(using a: Async): T = + cancellable.link() try fn - finally cancel.unlink() + finally cancellable.unlink() diff --git a/shared/src/main/scala/async/listeners/locking.scala b/shared/src/main/scala/async/listeners/locking.scala index 213e7bfd..d7902e2d 100644 --- a/shared/src/main/scala/async/listeners/locking.scala +++ b/shared/src/main/scala/async/listeners/locking.scala @@ -5,9 +5,7 @@ import gears.async._ import Listener.ListenerLock import scala.annotation.tailrec -/** Two listeners being locked at the same time, while holding the same lock on their listener chains. This happens if - * you attempt to lockBoth two listeners with a common downstream listener, e.g., two derived listeners of the same - * race. +/** Two listeners being locked at the same time, while having the same [[Listener.ListenerLock.selfNumber lock number]]. */ case class ConflictingLocksException( listeners: (Listener[?], Listener[?]) @@ -16,10 +14,11 @@ case class ConflictingLocksException( /** Attempt to lock both listeners belonging to possibly different sources at the same time. Lock orders are respected * by comparing numbers on every step. * - * Returns [[Locked]] on success, or the listener that fails first. + * Returns `true` on success, or the listener that fails first. * - * In the case that two locks sharing the same number is encountered, [[ConflictingLocksException]] is thrown with the - * base listeners and conflicting listeners. + * @throws ConflictingLocksException + * In the case that two locks sharing the same number is encountered, this exception is thrown with the conflicting + * listeners. */ def lockBoth[T, U]( lt: Listener[T], diff --git a/shared/src/main/scala/async/package.scala b/shared/src/main/scala/async/package.scala index 3ed9eb08..1b97ea02 100644 --- a/shared/src/main/scala/async/package.scala +++ b/shared/src/main/scala/async/package.scala @@ -1,4 +1,12 @@ package gears +/** Asynchronous programming support with direct-style Scala. + * @see + * [[gears.async.Async]] for an introduction to the [[Async]] context and how to create them. + * @see + * [[gears.async.Future]] for a simple interface to spawn concurrent computations. + * @see + * [[gears.async.Channel]] for a simple inter-future communication primitive. + */ package object async: type CancellationException = java.util.concurrent.CancellationException diff --git a/shared/src/test/scala/ChannelBehavior.scala b/shared/src/test/scala/ChannelBehavior.scala index e2e565c1..afbc340a 100644 --- a/shared/src/test/scala/ChannelBehavior.scala +++ b/shared/src/test/scala/ChannelBehavior.scala @@ -11,7 +11,7 @@ import gears.async.{ } import gears.async.default.given import gears.async.AsyncOperations.* -import Future.{*:, zip} +import Future.zip import java.util.concurrent.CancellationException import scala.collection.mutable diff --git a/shared/src/test/scala/FutureBehavior.scala b/shared/src/test/scala/FutureBehavior.scala index cfc754f1..9aa53d92 100644 --- a/shared/src/test/scala/FutureBehavior.scala +++ b/shared/src/test/scala/FutureBehavior.scala @@ -1,6 +1,6 @@ import gears.async.{Async, Future, Task, TaskSchedule, uninterruptible} import gears.async.default.given -import gears.async.Future.{*:, Promise, zip} +import gears.async.Future.{Promise, zip} import gears.async.AsyncOperations.* import java.util.concurrent.CancellationException @@ -215,40 +215,40 @@ class FutureBehavior extends munit.FunSuite { assertEquals(zombieModifiedThis, true) } - test("zip on tuples with EmptyTuple") { - Async.blocking: - val z1 = Future { sleep(500); 10 } *: Future { sleep(10); 222 } *: Future { sleep(150); 333 } *: Future { - EmptyTuple - } - assertEquals(z1.await, (10, 222, 333)) - } - - test("zip on tuples with last zip") { - Async.blocking: - val z1 = Future { 10 } *: Future { 222 }.zip(Future { 333 }) - assertEquals(z1.await, (10, 222, 333)) - } - - test("zip(3) first error") { - for (_ <- 1 to 20) - Async.blocking: - val e1 = AssertionError(111) - val e2 = AssertionError(211) - val e3 = AssertionError(311) - assertEquals( - (Future { - sleep(Random.between(200, 300)); - throw e1 - } *: Future { - sleep(Random.between(200, 300)); - throw e2 - } *: Future { - sleep(Random.between(50, 100)); - throw e3 - } *: Future.now(Success(EmptyTuple))).awaitResult, - Failure(e3) - ) - } + // test("zip on tuples with EmptyTuple") { + // Async.blocking: + // val z1 = Future { sleep(500); 10 } *: Future { sleep(10); 222 } *: Future { sleep(150); 333 } *: Future { + // EmptyTuple + // } + // assertEquals(z1.await, (10, 222, 333)) + // } + + // test("zip on tuples with last zip") { + // Async.blocking: + // val z1 = Future { 10 } *: Future { 222 }.zip(Future { 333 }) + // assertEquals(z1.await, (10, 222, 333)) + // } + + // test("zip(3) first error") { + // for (_ <- 1 to 20) + // Async.blocking: + // val e1 = AssertionError(111) + // val e2 = AssertionError(211) + // val e3 = AssertionError(311) + // assertEquals( + // (Future { + // sleep(Random.between(200, 300)); + // throw e1 + // } *: Future { + // sleep(Random.between(200, 300)); + // throw e2 + // } *: Future { + // sleep(Random.between(50, 100)); + // throw e3 + // } *: Future.now(Success(EmptyTuple))).awaitResult, + // Failure(e3) + // ) + // } test("cancelled futures return the same constant CancellationException with no stack attached".ignore) { Async.blocking: @@ -434,4 +434,17 @@ class FutureBehavior extends munit.FunSuite { assert(!lastFutureFinished) } + test("uninterruptible should continue even when Future is cancelled") { + Async.blocking: + val ch = gears.async.UnboundedChannel[Int]() + val reader = Future: + gears.async.uninterruptible: + val i = ch.read().right.get + println(i) + reader.cancel() + ch.sendImmediately(1) + ch.sendImmediately(2) + reader.awaitResult + assertEquals(ch.read(), Right(2)) + } } diff --git a/shared/src/test/scala/RetryBehavior.scala b/shared/src/test/scala/RetryBehavior.scala index fe161bac..53a791b0 100644 --- a/shared/src/test/scala/RetryBehavior.scala +++ b/shared/src/test/scala/RetryBehavior.scala @@ -3,7 +3,7 @@ import Retry.Delay import scala.concurrent.duration.* import FiniteDuration as Duration import gears.async.default.given -import Future.{*:, zip} +import Future.zip import scala.concurrent.ExecutionContext import scala.util.{Failure, Success, Try} diff --git a/shared/src/test/scala/TaskScheduleBehavior.scala b/shared/src/test/scala/TaskScheduleBehavior.scala index 5247af49..06e26253 100644 --- a/shared/src/test/scala/TaskScheduleBehavior.scala +++ b/shared/src/test/scala/TaskScheduleBehavior.scala @@ -1,6 +1,6 @@ import gears.async.{Async, Future, Task, TaskSchedule} import gears.async.default.given -import Future.{*:, zip} +import Future.zip import scala.concurrent.ExecutionContext import scala.util.{Failure, Success, Try}