Skip to content

Commit

Permalink
Merge pull request #3425 from fredfp/remove_printStackTrace
Browse files Browse the repository at this point in the history
reactive-streams: report errors using the ExecutionContext
  • Loading branch information
armanbilge authored Apr 23, 2024
2 parents 8f87f2b + 7fe473a commit 9213dce
Showing 1 changed file with 18 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,7 @@ object StreamSubscriber {
case object DownstreamCancellation extends State
case class UpstreamError(err: Throwable) extends State

def reportFailure(e: Throwable): Unit =
Thread.getDefaultUncaughtExceptionHandler match {
case null => e.printStackTrace()
case h => h.uncaughtException(Thread.currentThread(), e)
}

def step(in: Input): State => (State, () => Unit) =
def step(in: Input)(reportFailure: Throwable => Unit): State => (State, () => Unit) =
in match {
case OnSubscribe(s) => {
case RequestBeforeSubscription(req) =>
Expand Down Expand Up @@ -206,24 +200,25 @@ object StreamSubscriber {
}
}

F.delay(new AtomicReference[(State, () => Unit)]((Uninitialized, () => ()))).map { ref =>
new FSM[F, A] {
def nextState(in: Input): Unit = {
val (_, effect) = ref.updateAndGet { case (state, _) =>
step(in)(state)
}
effect()
for {
ref <- F.delay(new AtomicReference[(State, () => Unit)]((Uninitialized, () => ())))
executionContext <- F.executionContext
} yield new FSM[F, A] {
def nextState(in: Input): Unit = {
val (_, effect) = ref.updateAndGet { case (state, _) =>
step(in)(executionContext.reportFailure)(state)
}
def onSubscribe(s: Subscription): Unit = nextState(OnSubscribe(s))
def onNext(a: A): Unit = nextState(OnNext(a))
def onError(t: Throwable): Unit = nextState(OnError(t))
def onComplete(): Unit = nextState(OnComplete)
def onFinalize: F[Unit] = F.delay(nextState(OnFinalize))
def dequeue1: F[Either[Throwable, Option[Chunk[A]]]] =
F.async_[Either[Throwable, Option[Chunk[A]]]] { cb =>
nextState(OnDequeue(out => cb(Right(out))))
}
effect()
}
def onSubscribe(s: Subscription): Unit = nextState(OnSubscribe(s))
def onNext(a: A): Unit = nextState(OnNext(a))
def onError(t: Throwable): Unit = nextState(OnError(t))
def onComplete(): Unit = nextState(OnComplete)
def onFinalize: F[Unit] = F.delay(nextState(OnFinalize))
def dequeue1: F[Either[Throwable, Option[Chunk[A]]]] =
F.async_[Either[Throwable, Option[Chunk[A]]]] { cb =>
nextState(OnDequeue(out => cb(Right(out))))
}
}
}
}

0 comments on commit 9213dce

Please sign in to comment.