Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add/fix lots of documentation #52

Merged
merged 8 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 150 additions & 34 deletions shared/src/main/scala/async/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,40 @@ import gears.async.Listener.withLock
import gears.async.Listener.NumberedLock
import scala.util.boundary

/** A context that allows to suspend waiting for asynchronous data sources
/** The async context: provides the capability to asynchronously [[Async.await await]] for [[Async.Source Source]]s, and
* defines a scope for structured concurrency through a [[CompletionGroup]].
*
* As both a context and a capability, the idiomatic way of using [[Async]] is to be implicitly passed around
* functions, as an `using` parameter:
* {{{
* def function()(using Async): T = ???
* }}}
*
* It is not recommended to store [[Async]] in a class field, since it complicates scoping rules.
*
* @param support
* An implementation of the underlying asynchronous operations (suspend and resume). See [[AsyncSupport]].
* @param scheduler
* An implementation of a scheduler, for scheduling computation as they are spawned or resumed. See [[Scheduler]].
*
* @see
* [[Async$.blocking Async.blocking]] for a way to construct an [[Async]] instance.
* @see
* [[Async$.group Async.group]] and [[Future$.apply Future.apply]] for [[Async]]-subscoping operations.
*/
trait Async(using val support: AsyncSupport, val scheduler: support.Scheduler):
/** Wait for completion of async source `src` and return the result */
/** Waits for completion of source `src` and returns the result. Suspends the computation.
*
* @see
* [[Async.Source.awaitResult]] and [[Async$.await]] for extension methods calling [[Async!.await]] from the source
* itself.
*/
def await[T](src: Async.Source[T]): T

/** The cancellation group for this Async */
/** Returns the cancellation group for this [[Async]] context. */
def group: CompletionGroup

/** An Async of the same kind as this one, with a new cancellation group */
/** Returns an [[Async]] context of the same kind as this one, with a new cancellation group. */
def withGroup(group: CompletionGroup): Async

object Async:
Expand Down Expand Up @@ -53,7 +77,7 @@ object Async:
def blocking[T](body: Async.Spawn ?=> T)(using support: AsyncSupport, scheduler: support.Scheduler): T =
group(body)(using Blocking(CompletionGroup.Unlinked))

/** The currently executing Async context */
/** Returns the currently executing Async context. Equivalent to `summon[Async]`. */
inline def current(using async: Async): Async = async

/** [[Async.Spawn]] is a special subtype of [[Async]], also capable of spawning runnable [[Future]]s.
Expand All @@ -63,8 +87,8 @@ object Async:
*/
opaque type Spawn <: Async = Async

/** Runs [[body]] inside a spawnable context where it is allowed to spawning concurrently runnable [[Future]]s. When
* the body returns, all spawned futures are cancelled and waited for.
/** Runs `body` inside a spawnable context where it is allowed to spawn concurrently runnable [[Future]]s. When the
* body returns, all spawned futures are cancelled and waited for.
*/
def group[T](body: Async.Spawn ?=> T)(using Async): T =
withNewCompletionGroup(CompletionGroup().link())(body)
Expand All @@ -86,51 +110,85 @@ object Async:
group.waitCompletion()(using completionAsync)

/** An asynchronous data source. Sources can be persistent or ephemeral. A persistent source will always pass same
* data to calls of `poll and `onComplete`. An ephemeral source can pass new data in every call. An example of a
* persistent source is `Future`. An example of an ephemeral source is `Channel`.
* data to calls of [[Source!.poll]] and [[Source!.onComplete]]. An ephemeral source can pass new data in every call.
*
* @see
* An example of a persistent source is [[gears.async.Future]].
* @see
* An example of an ephemeral source is [[gears.async.Channel]].
*/
trait Source[+T]:

/** Check whether data is available at present and pass it to k if so. If no element is available, does not lock k
* and returns false immediately. If there is (or may be) data available, the listener is locked and if it fails,
* true is returned to signal this source's general availability. If locking k succeeds, only return true iff k's
* complete is called. Calls to `poll` are always synchronous.
/** Checks whether data is available at present and pass it to `k` if so. Calls to `poll` are always synchronous and
* non-blocking.
*
* The process is as follows:
* - If no data is immediately available, return `false` immediately.
* - If there is data available, attempt to lock `k`.
* - If `k` is no longer available, `true` is returned to signal this source's general availability.
* - If locking `k` succeeds:
* - If data is still available, complete `k` and return true.
* - Otherwise, unlock `k` and return false.
*
* Note that in all cases, a return value of `false` indicates that `k` should be put into `onComplete` to receive
* data in a later point in time.
*
* @return
* Whether poll was able to pass data to `k`. Note that this is regardless of `k` being available to receive the
* data. In most cases, one should pass `k` into [[Source!.onComplete]] if `poll` returns `false`.
*/
def poll(k: Listener[T]): Boolean

/** Once data is available, pass it to function `k`. `k` returns true iff the data was consumed in an async block.
* Calls to `onComplete` are usually asynchronous, meaning that the passed continuation `k` is a suspension.
/** Once data is available, pass it to the listener `k`. `onComplete` is always non-blocking.
*
* Note that `k`'s methods will be executed on the same thread as the [[Source]], usually in sequence. It is hence
* important that the listener itself does not perform expensive operations.
*/
def onComplete(k: Listener[T]): Unit

/** Signal that listener `k` is dead (i.e. will always return `false` from now on). This permits original, (i.e.
* non-derived) sources like futures or channels to drop the listener from their waiting sets.
/** Signal that listener `k` is dead (i.e. will always fail to acquire locks from now on), and should be removed
* from `onComplete` queues.
*
* This permits original, (i.e. non-derived) sources like futures or channels to drop the listener from their
* waiting sets.
*/
def dropListener(k: Listener[T]): Unit

/** Utility method for direct polling. */
/** Similar to [[Async.Source!.poll(k:Listener[T])* poll]], but instead of passing in a listener, directly return
* the value `T` if it is available.
*/
def poll(): Option[T] =
var resultOpt: Option[T] = None
poll(Listener.acceptingListener { (x, _) => resultOpt = Some(x) })
resultOpt

/** Utility method for direct waiting with `Async`. */
/** Waits for an item to arrive from the source. Suspends until an item returns.
*
* This is an utility method for direct waiting with `Async`, instead of going through listeners.
*/
final def awaitResult(using ac: Async) = ac.await(this)
end Source

extension [T](src: Source[scala.util.Try[T]])
/** Waits for an item to arrive from the source, then automatically unwraps it. */
/** Waits for an item to arrive from the source, then automatically unwraps it. Suspends until an item returns.
* @see
* [[Source!.awaitResult awaitResult]] for non-unwrapping await.
*/
inline def await(using Async) = src.awaitResult.get
extension [E, T](src: Source[Either[E, T]])
/** Waits for an item to arrive from the source, then automatically unwraps it. */
/** Waits for an item to arrive from the source, then automatically unwraps it. Suspends until an item returns.
* @see
* [[Source!.awaitResult awaitResult]] for non-unwrapping await.
*/
inline def await(using Async) = src.awaitResult.right.get

/** An original source has a standard definition of `onComplete` in terms of `poll` and `addListener`. Implementations
* should be the resource owner to handle listener queue and completion using an object monitor on the instance.
/** An original source has a standard definition of [[Source.onComplete onComplete]] in terms of [[Source.poll poll]]
* and [[OriginalSource.addListener addListener]].
*
* Implementations should be the resource owner to handle listener queue and completion using an object monitor on
* the instance.
*/
abstract class OriginalSource[+T] extends Source[T]:

/** Add `k` to the listener set of this source */
/** Add `k` to the listener set of this source. */
protected def addListener(k: Listener[T]): Unit

def onComplete(k: Listener[T]): Unit = synchronized:
Expand All @@ -139,7 +197,12 @@ object Async:
end OriginalSource

object Source:
/** Create a [[Source]] containing the given values, resolved once for each. */
/** Create a [[Source]] containing the given values, resolved once for each.
*
* @return
* an ephemeral source of values arriving to listeners in a queue. Once all values are received, attaching a
* listener with [[Source!.onComplete onComplete]] will be a no-op (i.e. the listener will never be called).
*/
def values[T](values: T*) =
import scala.collection.JavaConverters._
val q = java.util.concurrent.ConcurrentLinkedQueue[T]()
Expand All @@ -163,8 +226,14 @@ object Async:

extension [T](src: Source[T])
/** Create a new source that requires the original source to run the given transformation function on every value
* received. Note that [[f]] is **always** run on the computation that produces the values from the original
* source, so this is very likely to run **sequentially** and be a performance bottleneck.
* received.
*
* Note that `f` is **always** run on the computation that produces the values from the original source, so this is
* very likely to run **sequentially** and be a performance bottleneck.
*
* @param f
* the transformation function to be run on every value. `f` is run *before* the item is passed to the
* [[Listener]].
*/
def transformValuesWith[U](f: T => U) =
new Source[U]:
Expand All @@ -182,7 +251,23 @@ object Async:
def dropListener(k: Listener[U]): Unit =
src.dropListener(transform(k))

/** Creates a source that "races" a list of sources.
*
* Listeners attached to this source is resolved with the first item arriving from one of the sources. If multiple
* sources are available at the same time, one of the items will be returned with no priority. Items that are not
* returned are '''not''' consumed from the upstream sources.
*
* @see
* [[raceWithOrigin]] for a race source that also returns the upstream origin of the item.
* @see
* [[Async$.select Async.select]] for a convenient syntax to race sources and awaiting them with [[Async]].
*/
def race[T](sources: Source[T]*): Source[T] = raceImpl[T, T]((v, _) => v)(sources*)

/** Like [[race]], but the returned value includes a reference to the upstream source that the item came from.
* @see
* [[Async$.select Async.select]] for a convenient syntax to race sources and awaiting them with [[Async]].
*/
def raceWithOrigin[T](sources: Source[T]*): Source[(T, Source[T])] =
raceImpl[(T, Source[T]), T]((v, src) => (v, src))(sources*)

Expand Down Expand Up @@ -260,28 +345,59 @@ object Async:

/** Cases for handling async sources in a [[select]]. [[SelectCase]] can be constructed by extension methods `handle`
* of [[Source]].
*
* @see
* [[handle Source.handle]] (and its operator alias [[~~> ~~>]])
* @see
* [[Async$.select Async.select]] where [[SelectCase]] is used.
*/
opaque type SelectCase[T] = (Source[?], Nothing => T)
// ^ unsafe types, but we only construct SelectCase from `handle` which is safe

extension [T](src: Source[T])
/** Attach a handler to [[src]], creating a [[SelectCase]]. */
/** Attach a handler to `src`, creating a [[SelectCase]].
* @see
* [[Async$.select Async.select]] where [[SelectCase]] is used.
*/
inline def handle[U](f: T => U): SelectCase[U] = (src, f)

/** Alias for [[handle]] */
/** Alias for [[handle]]
* @see
* [[Async$.select Async.select]] where [[SelectCase]] is used.
*/
inline def ~~>[U](f: T => U): SelectCase[U] = src.handle(f)

/** Race a list of sources with the corresponding handler functions, once an item has come back. Like [[race]],
* [[select]] guarantees exactly one of the sources are polled. Unlike `map`ping a [[Source]], the handler in
* [[select]] guarantees exactly one of the sources are polled. Unlike [[transformValuesWith]], the handler in
* [[select]] is run in the same async context as the calling context of [[select]].
*
* @see
* [[handle Source.handle]] (and its operator alias [[~~> ~~>]]) for methods to create [[SelectCase]]s.
* @example
* {{{
* // Race a channel read with a timeout
* val ch = SyncChannel[Int]()
* // ...
* val timeout = Future(sleep(1500.millis))
*
* Async.select(
* ch.readSrc.handle: item =>
* Some(item * 2),
* timeout ~~> _ => None
* )
* }}}
*/
def select[T](cases: SelectCase[T]*)(using Async) =
val (input, which) = raceWithOrigin(cases.map(_._1)*).awaitResult
val (_, handler) = cases.find(_._1 == which).get
handler.asInstanceOf[input.type => T](input)

/** If left (respectively, right) source succeeds with `x`, pass `Left(x)`, (respectively, Right(x)) on to the
* continuation.
/** Race two sources, wrapping them respectively in [[Left]] and [[Right]] cases.
* @return
* a new [[Source]] that resolves with [[Left]] if `src1` returns an item, [[Right]] if `src2` returns an item,
* whichever comes first.
* @see
* [[race]] and [[select]] for racing more than two sources.
*/
def either[T1, T2](src1: Source[T1], src2: Source[T2]): Source[Either[T1, T2]] =
race(src1.transformValuesWith(Left(_)), src2.transformValuesWith(Right(_)))
Expand Down
13 changes: 9 additions & 4 deletions shared/src/main/scala/async/AsyncOperations.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@ import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.TimeoutException
import gears.async.AsyncOperations.sleep

/** Defines fundamental operations that require the support of the scheduler. This is commonly provided alongside with
* the given implementation of [[Scheduler]].
* @see
* [[Scheduler]] for the definition of the scheduler itself.
*/
trait AsyncOperations:
/** Suspends the current [[Async]] context for at least [[millis]] milliseconds. */
/** Suspends the current [[Async]] context for at least `millis` milliseconds. */
def sleep(millis: Long)(using Async): Unit

object AsyncOperations:
/** Suspends the current [[Async]] context for at least [[millis]] milliseconds.
/** Suspends the current [[Async]] context for at least `millis` milliseconds.
* @param millis
* The duration to suspend. Must be a positive integer.
* The duration to suspend, in milliseconds. Must be a positive integer.
*/
inline def sleep(millis: Long)(using AsyncOperations, Async): Unit =
summon[AsyncOperations].sleep(millis)

/** Suspends the current [[Async]] context for at least [[millis]] milliseconds.
/** Suspends the current [[Async]] context for `duration`.
* @param duration
* The duration to suspend. Must be positive.
*/
Expand Down
11 changes: 11 additions & 0 deletions shared/src/main/scala/async/AsyncSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@ package gears.async

import scala.concurrent.duration._

/** The delimited continuation suspension interface. Represents a suspended computation asking for a value of type `T`
* to continue (and eventually returning a value of type `R`).
*/
trait Suspension[-T, +R]:
def resume(arg: T): R

/** Support for suspension capabilities through a delimited continuation interface. */
trait SuspendSupport:
/** A marker for the "limit" of "delimited continuation". */
type Label[R]

/** The provided suspension type. */
type Suspension[-T, +R] <: gears.async.Suspension[T, R]

/** Set the suspension marker as the body's caller, and execute `body`. */
def boundary[R](body: Label[R] ?=> R): R

/** Should return immediately if resume is called from within body */
Expand All @@ -18,12 +26,15 @@ trait SuspendSupport:
trait AsyncSupport extends SuspendSupport:
type Scheduler <: gears.async.Scheduler

/** Resume a [[Suspension]] at some point in the future, scheduled by the scheduler. */
private[async] def resumeAsync[T, R](suspension: Suspension[T, R])(arg: T)(using s: Scheduler): Unit =
s.execute(() => suspension.resume(arg))

/** Schedule a computation with the suspension boundary already created. */
private[async] def scheduleBoundary(body: Label[Unit] ?=> Unit)(using s: Scheduler): Unit =
s.execute(() => boundary(body))

/** A scheduler implementation, with the ability to execute a computation immediately or after a delay. */
trait Scheduler:
def execute(body: Runnable): Unit
def schedule(delay: FiniteDuration, body: Runnable): Cancellable
Expand Down
3 changes: 2 additions & 1 deletion shared/src/main/scala/async/Cancellable.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package gears.async

/** A trait for cancellable entities that can be grouped */
/** A trait for cancellable entities that can be grouped. */
trait Cancellable:

private var group: CompletionGroup = CompletionGroup.Unlinked
Expand Down Expand Up @@ -28,6 +28,7 @@ trait Cancellable:
end Cancellable

object Cancellable:
/** A special [[Cancellable]] object that just tracks whether its linked group was cancelled. */
trait Tracking extends Cancellable:
def isCancelled: Boolean

Expand Down
1 change: 1 addition & 0 deletions shared/src/main/scala/async/CompletionGroup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class CompletionGroup extends Cancellable.Tracking:
members.toSeq
.foreach(_.cancel())

/** Wait for all members of the group to complete and unlink themselves. */
private[async] def waitCompletion()(using Async): Unit =
synchronized:
if members.nonEmpty && cancelWait.isEmpty then cancelWait = Some(Promise())
Expand Down
Loading
Loading