Skip to content

Commit

Permalink
minor remarks
Browse files Browse the repository at this point in the history
  • Loading branch information
m8nmueller committed Dec 8, 2023
1 parent 8f41bc3 commit 25a1300
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 30 deletions.
32 changes: 14 additions & 18 deletions shared/src/main/scala/async/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ object Async:
/** Create a [[Source]] containing the given values, resolved once for each. */
def values[T](values: T*) =
import scala.collection.JavaConverters._
val q = java.util.concurrent.ConcurrentLinkedQueue[T]()
q.addAll(values.asJavaCollection)
val q = java.util.concurrent.ConcurrentLinkedQueue[T](values.asJavaCollection)
new Source[T]:
override def poll(k: Listener[T]): Boolean =
if q.isEmpty() then return false
Expand Down Expand Up @@ -174,8 +173,8 @@ object Async:
def dropListener(k: Listener[U]): Unit =
src.dropListener(transform(k))

def race[T](sources: Source[T]*) = raceImpl[T, T]((v, _) => v)(sources*)
def raceWithOrigin[T](sources: Source[T]*) =
def race[T](sources: Source[T]*): Source[T] = raceImpl[T, T]((v, _) => v)(sources*)
def raceWithOrigin[T](sources: Source[T]*): Source[(T, Source[T])] =
raceImpl[(T, Source[T]), T]((v, src) => (v, src))(sources*)

/** Pass first result from any of `sources` to the continuation */
Expand All @@ -185,13 +184,14 @@ object Async:
val it = sources.iterator
var found = false

val listener = new Listener.ForwardingListener[U](this, k):
val lock = withLock(k) { inner => new ListenerLockWrapper(inner, selfSrc) }
def complete(data: U, source: Async.Source[U]) =
k.complete(map(data, source), selfSrc)
end listener

while it.hasNext && !found do
found = it.next.poll(
new Listener.ForwardingListener[U](this, k):
val lock = withLock(k) { inner => new ListenerLockWrapper(inner, selfSrc) }
def complete(data: U, source: Async.Source[U]) =
k.complete(map(data, source), selfSrc)
)
found = it.next.poll(listener)
found

def onComplete(k: Listener[T]): Unit =
Expand Down Expand Up @@ -245,23 +245,19 @@ object Async:
}
end raceImpl

/** Cases for handling async sources in a [[select]]. [[SelectCase]] can be constructed by extension methods `handle`
* and `handleVal` of [[Source]].
/** Cases for handling async sources in a [[select]]. [[SelectCase]] can be
* constructed by extension methods `handle` of [[Source]].
*/
opaque type SelectCase[T] = (Source[?], Nothing => T)
// ^ unsafe types, but we only construct SelectCase from `handle` and `handleVal` which are safe
// ^ unsafe types, but we only construct SelectCase from `handle` which is safe

extension [T](src: Source[T])
/** Attach a handler to [[src]], creating a [[SelectCase]]. */
inline def handle[U](f: T => U): SelectCase[U] = (src, f)

/** Alias for [[handle]] */
inline def ~~>[U](f: T => U): SelectCase[U] = src.handle(f)

// /** Attach a handler to [[src]] that takes a [[T]] and throws if [[Failure]] was returned from the source, creating
// * a [[SelectCase]].
// */
// inline def handleVal[U](f: T => U): SelectCase[U] = (src, t => f(t.get))

/** Race a list of sources with the corresponding handler functions, once an item has come back. Like [[race]],
* [[select]] guarantees exactly one of the sources are polled. Unlike `map`ping a [[Source]], the handler in
* [[select]] is run in the same async context as the calling context of [[select]].
Expand Down
19 changes: 9 additions & 10 deletions shared/src/main/scala/async/channels.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ private type Res[T] = Either[Closed, T]

/** The part of a channel one can send values to. Blocking behavior depends on the implementation.
*/
trait SendableChannel[T]:
trait SendableChannel[-T]:
/** Create an [[Async.Source]] representing the send action of value [[x]]. Note that *each* listener attached to and
* accepting a [[Sent]] value corresponds to [[x]] being sent once.
*
Expand All @@ -34,12 +34,12 @@ trait SendableChannel[T]:
*/
def send(x: T)(using Async): Unit = Async.await(sendSource(x)) match
case Right(_) => ()
case Left(_) => throw channelClosedException
case Left(_) => throw ChannelClosedException()
end SendableChannel

/** The part of a channel one can read values from. Blocking behavior depends on the implementation.
*/
trait ReadableChannel[T]:
trait ReadableChannel[+T]:
/** An [[Async.Source]] corresponding to items being sent over the channel. Note that *each* listener attached to and
* accepting a [[Read]] value corresponds to one value received over the channel.
*
Expand Down Expand Up @@ -88,7 +88,6 @@ trait UnboundedChannel[T] extends BufferedChannel[T]:
* subscribers when it receives a `close()` signal.
*/
class ChannelClosedException extends Exception
private val channelClosedException = ChannelClosedException()

object SyncChannel:
def apply[T](): SyncChannel[T] = Impl()
Expand Down Expand Up @@ -147,7 +146,7 @@ object UnboundedChannel:
override def sendImmediately(x: T): Unit =
var result: SendResult = Left(Closed)
pollSend(CanSend(x), acceptingListener((r, _) => result = r))
if result.isLeft then throw channelClosedException
if result.isLeft then throw ChannelClosedException()

override def pollRead(r: Reader): Boolean = synchronized:
if checkClosed(readSource, r) then true
Expand Down Expand Up @@ -327,7 +326,7 @@ object ChannelMultiplexer:
case Left(_) | Right(Message.Quit) =>
ChannelMultiplexer.this.synchronized:
subscribersCopy = subscribers.toList
for (s <- subscribersCopy) s.send(Failure(channelClosedException))
for (s <- subscribersCopy) s.send(Failure(ChannelClosedException()))
shouldTerminate = true
case Right(Message.Refresh) => ()
}) +:
Expand Down Expand Up @@ -359,24 +358,24 @@ object ChannelMultiplexer:

override def removePublisher(c: ReadableChannel[T]): Unit =
ChannelMultiplexer.this.synchronized:
if (isClosed) throw channelClosedException
if (isClosed) throw ChannelClosedException()
publishers -= c
infoChannel.sendImmediately(Message.Refresh)

override def removeSubscriber(c: SendableChannel[Try[T]]): Unit =
ChannelMultiplexer.this.synchronized:
if (isClosed) throw channelClosedException
if (isClosed) throw ChannelClosedException()
subscribers -= c

override def addPublisher(c: ReadableChannel[T]): Unit =
ChannelMultiplexer.this.synchronized:
if (isClosed) throw channelClosedException
if (isClosed) throw ChannelClosedException()
publishers += c
infoChannel.sendImmediately(Message.Refresh)

override def addSubscriber(c: SendableChannel[Try[T]]): Unit =
ChannelMultiplexer.this.synchronized:
if (isClosed) throw channelClosedException
if (isClosed) throw ChannelClosedException()
subscribers += c

end ChannelMultiplexer
5 changes: 3 additions & 2 deletions shared/src/main/scala/async/futures.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import java.util.concurrent.CancellationException
import scala.annotation.tailrec
import scala.util
import java.util.concurrent.atomic.AtomicLong
import scala.util.control.NonFatal

/** A cancellable future that can suspend waiting for other asynchronous sources
*/
Expand Down Expand Up @@ -294,7 +295,7 @@ object Future:
/** Like [[Collector]], but exposes the ability to add futures after creation. */
class MutableCollector[T](futures: Future[T]*) extends Collector[T](futures*):
/** Add a new [[Future]] into the collector. */
def add(future: Future[T]) = addFuture(future)
inline def add(future: Future[T]) = addFuture(future)
inline def +=(future: Future[T]) = add(future)

extension [T](fs: Seq[Future[T]])
Expand All @@ -311,7 +312,7 @@ object Future:
for _ <- fs do collector.results.read().right.get.value
fs.map(_.value)
catch
case e: Exception =>
case NonFatal(e) =>
fs.foreach(_.cancel())
throw e

Expand Down

0 comments on commit 25a1300

Please sign in to comment.