Skip to content

Commit

Permalink
More docs
Browse files Browse the repository at this point in the history
  • Loading branch information
natsukagami committed Mar 12, 2024
1 parent f58b9cc commit f96d361
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 2 deletions.
11 changes: 11 additions & 0 deletions shared/src/main/scala/async/AsyncSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@ package gears.async

import scala.concurrent.duration._

/** The delimited continuation suspension interface. Represents a suspended computation asking for a value of type `T`
* to continue (and eventually returning a value of type `R`).
*/
trait Suspension[-T, +R]:
def resume(arg: T): R

/** Support for suspension capabilities through a delimited continuation interface. */
trait SuspendSupport:
/** A marker for the "limit" of "delimited continuation". */
type Label[R]

/** The provided suspension type. */
type Suspension[-T, +R] <: gears.async.Suspension[T, R]

/** Set the suspension marker as the body's caller, and execute `body`. */
def boundary[R](body: Label[R] ?=> R): R

/** Should return immediately if resume is called from within body */
Expand All @@ -18,12 +26,15 @@ trait SuspendSupport:
trait AsyncSupport extends SuspendSupport:
type Scheduler <: gears.async.Scheduler

/** Resume a [[Suspension]] at some point in the future, scheduled by the scheduler. */
private[async] def resumeAsync[T, R](suspension: Suspension[T, R])(arg: T)(using s: Scheduler): Unit =
s.execute(() => suspension.resume(arg))

/** Schedule a computation with the suspension boundary already created. */
private[async] def scheduleBoundary(body: Label[Unit] ?=> Unit)(using s: Scheduler): Unit =
s.execute(() => boundary(body))

/** A scheduler implementation, with the ability to execute a computation immediately or after a delay. */
trait Scheduler:
def execute(body: Runnable): Unit
def schedule(delay: FiniteDuration, body: Runnable): Cancellable
Expand Down
3 changes: 2 additions & 1 deletion shared/src/main/scala/async/Cancellable.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package gears.async

/** A trait for cancellable entities that can be grouped */
/** A trait for cancellable entities that can be grouped. */
trait Cancellable:

private var group: CompletionGroup = CompletionGroup.Unlinked
Expand Down Expand Up @@ -28,6 +28,7 @@ trait Cancellable:
end Cancellable

object Cancellable:
/** A special [[Cancellable]] object that just tracks whether its linked group was cancelled. */
trait Tracking extends Cancellable:
def isCancelled: Boolean

Expand Down
1 change: 1 addition & 0 deletions shared/src/main/scala/async/CompletionGroup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class CompletionGroup extends Cancellable.Tracking:
members.toSeq
.foreach(_.cancel())

/** Wait for all members of the group to complete and unlink themselves. */
private[async] def waitCompletion()(using Async): Unit =
synchronized:
if members.nonEmpty && cancelWait.isEmpty then cancelWait = Some(Promise())
Expand Down
18 changes: 17 additions & 1 deletion shared/src/main/scala/async/futures.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import scala.util.control.NonFatal
* @see
* [[Future.Promise]] and [[Future.withResolver]] for creating passive futures.
* @see
* [[Future.awaitAll]], [[Future.awaitFirst]] and [[Future.Collector]] for tools to work with multiple futures.
* @see
* [[ScalaConverters.asGears]] and [[ScalaConverters.asScala]] for converting between Scala futures and Gears
* futures.
*/
Expand Down Expand Up @@ -320,7 +322,21 @@ object Future:
future
end withResolver

/** Collects a list of futures into a channel of futures, arriving as they finish. */
/** Collects a list of futures into a channel of futures, arriving as they finish.
* @example
* {{{
* // Sleep sort
* val futs = numbers.map(i => Future(sleep(i.millis)))
* val collector = Collector(futs*)
*
* val output = mutable.ArrayBuffer[Int]()
* for i <- 1 to futs.size:
* output += collector.results.read().await
* }}}
* @see
* [[Future.awaitAll]] and [[Future.awaitFirst]] for simple usage of the collectors to get all results or the first
* succeeding one.
*/
class Collector[T](futures: Future[T]*):
private val ch = UnboundedChannel[Future[T]]()

Expand Down
8 changes: 8 additions & 0 deletions shared/src/main/scala/async/package.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
package gears

/** Asynchronous programming support with direct-style Scala.
* @see
* [[gears.async.Async]] for an introduction to the [[Async]] context and how to create them.
* @see
* [[gears.async.Future]] for a simple interface to spawn concurrent computations.
* @see
* [[gears.async.Channel]] for a simple inter-future communication primitive.
*/
package object async:
type CancellationException = java.util.concurrent.CancellationException

0 comments on commit f96d361

Please sign in to comment.