
Graceful shutdown of a stream for a single subscription #1201

Open · wants to merge 134 commits into base: master

Commits (134)
7a9a6f8
Draft of interface changes
svroonland Mar 24, 2024
c8196b3
Remove deprecated for now
svroonland Mar 24, 2024
583d112
Draft implementation
svroonland Mar 24, 2024
f55c898
Fix implementation + first test
svroonland Mar 24, 2024
6f86958
More tests copied from stopConsumption
svroonland Mar 27, 2024
9eb989b
Alternative interface, workaround inability to deconstruct tuples in …
svroonland Mar 28, 2024
9a32c74
Formatting
svroonland Mar 28, 2024
98df970
Fix doc
svroonland Mar 28, 2024
29f8e44
Add test
svroonland Mar 28, 2024
1215c43
Tweak docs
svroonland Mar 28, 2024
97d7e6f
Add test
svroonland Mar 28, 2024
336aa8d
Move to separate file
svroonland Mar 29, 2024
361cfec
runWithGracefulShutdown
svroonland Mar 30, 2024
4ee9696
Add timeout
svroonland Mar 30, 2024
dfa0afa
Process PR comments
svroonland Apr 1, 2024
15ec438
Fix type constraints
svroonland Apr 1, 2024
885d9c9
Only offer *streamWithGracefulShutdown methods
svroonland Apr 3, 2024
1408148
Pause a partition when its stream is ended
erikvanoosten Apr 13, 2024
ed326a2
More tests
svroonland Apr 14, 2024
252cc3a
Add default value for bufferSize consistently
svroonland Apr 14, 2024
9954dda
Fix race condition between join and timeout, leading to unwanted inte…
svroonland Apr 14, 2024
add21e9
Fix test
svroonland Apr 14, 2024
8ec18c0
Make SubscriptionStreamControl a case class
svroonland Apr 14, 2024
c409368
Update doc
svroonland Apr 14, 2024
2dbddfc
Cleanup
svroonland Apr 14, 2024
7838929
Simplify subscribe
svroonland Apr 15, 2024
c9e48ab
requireRunning false
svroonland Apr 15, 2024
5d334e0
Log unexpected interruption
svroonland Apr 20, 2024
60464b6
Log more
svroonland Apr 20, 2024
258168d
Use partitionedStream
svroonland Apr 21, 2024
6e25cd4
Update pendingRequests and assignedStreams
svroonland Apr 21, 2024
8d19e16
Formatting
erikvanoosten Apr 27, 2024
f1edcab
Fix linting
erikvanoosten Apr 27, 2024
f19417b
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland May 10, 2024
5365aa0
Merge branch 'master' into subscription-stream-control
svroonland May 11, 2024
f977001
Merge branch 'master' into subscription-stream-control
svroonland May 19, 2024
c54b3e9
This works
svroonland May 20, 2024
33a6d82
This works with timeout
svroonland May 20, 2024
a155267
Remove unused annotation
svroonland May 20, 2024
15e041f
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Jun 5, 2024
7c21f82
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Jun 16, 2024
f5e42c5
Merge branch 'master' into subscription-stream-control
svroonland Jul 14, 2024
9a31569
Small improvements to the Producer (#1272)
erikvanoosten Jul 14, 2024
27f033e
Document metrics and consumer tuning based on metrics (#1280)
erikvanoosten Jul 14, 2024
108b285
Add alternative fetch strategy for many partitions (#1281)
erikvanoosten Jul 16, 2024
eaae8af
Alternative producer implementation (#1285)
erikvanoosten Jul 18, 2024
c862686
Prevent users from enabling auto commit (#1290)
erikvanoosten Jul 24, 2024
ff4ea7f
Update scalafmt-core to 3.8.3 (#1291)
zio-scala-steward[bot] Jul 26, 2024
bbbfe48
Upgrade to 2.1.7+11-854102ae-SNAPSHOT with ZStream finalization fix
svroonland Aug 10, 2024
a75a78e
Add sonatype snapshots
svroonland Aug 10, 2024
5fec195
Bump ZIO version
svroonland Oct 10, 2024
699e6e8
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Oct 10, 2024
aafd4ec
Revert stuff
svroonland Oct 10, 2024
34f5110
Bump
svroonland Oct 20, 2024
95f9bef
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Oct 20, 2024
f33dc34
Fix race condition when removing subscription
svroonland Nov 2, 2024
3cb1eee
Tweak
svroonland Nov 2, 2024
87eadd8
Tweak
svroonland Nov 2, 2024
31cd086
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 2, 2024
a6d2afa
Increase timeout
svroonland Nov 2, 2024
1835758
This seems to work
svroonland Nov 2, 2024
6c28b89
Cleanup
svroonland Nov 2, 2024
e649753
Cleanup
svroonland Nov 2, 2024
348b01e
Restore old methods so we can offer this as experimental functionality
svroonland Nov 2, 2024
c9f1597
Rename methods + add some doc
svroonland Nov 2, 2024
4a9b6f6
More renames
svroonland Nov 2, 2024
3c0cfd1
Fix rebalanceSafeCommits test timing out
svroonland Nov 2, 2024
90ca347
Forgot to commit this file
svroonland Nov 2, 2024
edb7005
Fix shutdown behavior
svroonland Nov 2, 2024
e400012
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 2, 2024
e83bc89
Apply suggestions from code review
svroonland Nov 2, 2024
efd3937
Restore stuff
svroonland Nov 2, 2024
e29d63e
Cleanup
svroonland Nov 2, 2024
f19f90d
Update docs/consuming-kafka-topics-using-zio-streams.md
svroonland Nov 3, 2024
c71a08f
PR comments
svroonland Nov 3, 2024
5c27da7
Do not empty commits when stopping all streams
svroonland Nov 5, 2024
9901b16
Revert change
svroonland Nov 5, 2024
a9925d5
Add assertion before poll, add some documenting comments
svroonland Nov 9, 2024
dac1865
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 9, 2024
ddbd576
Stronger test
svroonland Nov 9, 2024
8e461ee
Do not clear assignedStreams when ending streams by subscription (ins…
svroonland Nov 9, 2024
5848e9c
Fix interruption issue
svroonland Nov 9, 2024
143f914
Log timeout error + cleanup
svroonland Nov 9, 2024
d5fc7bb
Fix test compilation withFilter issue
svroonland Nov 9, 2024
d6f485c
Fix doc syntax
svroonland Nov 9, 2024
5ef97ef
Fix test
svroonland Nov 9, 2024
bec0e25
Update comment
svroonland Nov 10, 2024
c7b6879
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 16, 2024
5e2031e
Comment workaround
svroonland Nov 16, 2024
4c02d08
Merge branch 'master' into subscription-stream-control
svroonland Nov 16, 2024
e4f4e11
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Feb 16, 2025
46f2142
Tweak
svroonland Feb 16, 2025
43a0c8a
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Feb 22, 2025
a346981
Fix merge error + test
svroonland Feb 22, 2025
585ee42
Add experimental notes
svroonland Feb 22, 2025
f899758
Allow returning a value from with*Stream methods
svroonland Feb 22, 2025
07c0bf3
More logging
svroonland Mar 1, 2025
00d868d
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Mar 1, 2025
4e822fa
Make debugging easier
svroonland Mar 1, 2025
af901eb
No infinite retry
svroonland Mar 1, 2025
298c293
Restore log level
svroonland Mar 1, 2025
f9f31ed
Use KafkaTestUtils for creating topic
svroonland Mar 1, 2025
d87acd1
Unused imports
svroonland Mar 1, 2025
efe709a
Remove usage of stopConsumption
svroonland Mar 1, 2025
83c48fd
Rearrange all stopConsumption tests in one suite
svroonland Mar 1, 2025
f1610f9
Revert "Allow returning a value from with*Stream methods"
svroonland Mar 1, 2025
be05e25
Small refactoring
svroonland Mar 1, 2025
09a6375
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Mar 9, 2025
adf3965
Alternative type parameters for StreamControl
svroonland Mar 9, 2025
f032ad8
Remove workaround
svroonland Mar 15, 2025
67a4a90
Improve error messages
svroonland Mar 15, 2025
804d86c
Undo default parameter change
svroonland Mar 15, 2025
2d19709
Update zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
svroonland Mar 15, 2025
0e4eb31
Add comment
svroonland Mar 15, 2025
3c1a90c
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Mar 15, 2025
b8d79fb
Fix
svroonland Mar 15, 2025
454af70
Restore
svroonland Mar 15, 2025
d4bb91d
Subscription stream control - alternative API (#1501)
svroonland Mar 16, 2025
cb312fa
Mark as experimental API
svroonland Mar 16, 2025
41e679c
Update doc
svroonland Mar 16, 2025
610be5a
Formatting
svroonland Mar 16, 2025
33d28b9
Do not fetch data or accept pending requests for ended partition streams
svroonland Mar 17, 2025
e9ce9f3
Test calling end more than once
svroonland Mar 17, 2025
5569055
PR comments
svroonland Mar 17, 2025
cee0f89
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Mar 17, 2025
0ae0254
Formatting
svroonland Mar 17, 2025
565ccbf
Address some PR comments
svroonland Mar 21, 2025
44283e5
Undo some unnecessary changes
svroonland Mar 21, 2025
5c657f2
Update docs
svroonland Mar 21, 2025
6fc2186
Merge branch 'master' into subscription-stream-control
svroonland Mar 21, 2025
a6341c0
Undo haltWhen change
svroonland Mar 22, 2025
0ac6623
Doc fix
svroonland Mar 22, 2025
d8f898d
This does work
svroonland Mar 22, 2025
59701e9
Test for runWithGracefulShutdown
svroonland Mar 23, 2025
67 changes: 67 additions & 0 deletions docs/consuming-kafka-topics-using-zio-streams.md
Original file line number Diff line number Diff line change
@@ -74,3 +74,70 @@ Each time a new partition is assigned to the consumer, a new partition stream is
ensures only a single stream is doing commits.

In this example, each partition is processed in parallel, on separate fibers, while a single fiber is doing commits.

## Controlled shutdown (experimental)

The examples above keep processing records forever, or until the fiber is interrupted, typically at application shutdown. When interrupted, some records may be 'in-flight', e.g. being processed by one of the stages of your consumer stream's user code. Those records will be only partly processed and their offsets may not be committed. For fast shutdown in an at-least-once processing scenario this is fine.

zio-kafka also supports a _graceful shutdown_, where the fetching of records for the subscribed topics/partitions is stopped, the streams are ended and all downstream stages are completed, allowing in-flight records to be fully processed.

Use the `*StreamWithControl` variants of `plainStream`, `partitionedStream` and `partitionedAssignmentStream` for this purpose. These methods return a `StreamControl` object allowing you to stop fetching records and terminate the execution of the stream gracefully.

The `StreamControl` can be used with `Consumer.runWithGracefulShutdown`, which can gracefully terminate the stream upon fiber interruption. This is useful for a controlled shutdown when your application is terminated:

```scala
import zio.Console.printLine
import zio.kafka.consumer._
import zio.kafka.serde._
import zio._

ZIO.scoped {
  for {
    control <- consumer.partitionedStreamWithControl(
                 Subscription.topics("topic150"),
                 Serde.string,
                 Serde.string
               )
    _ <- Consumer.runWithGracefulShutdown(control, shutdownTimeout = 30.seconds) { stream =>
           stream
             .flatMapPar(Int.MaxValue) { case (topicPartition, partitionStream) =>
               partitionStream
                 .tap(record => printLine(s"key: ${record.key}, value: ${record.value}"))
                 .map(_.offset)
             }
             .aggregateAsync(Consumer.offsetBatches)
             .mapZIO(_.commit)
             .runDrain
         }
  } yield ()
}
```

For more control over when to end the stream, use the `StreamControl` construct like this:

```scala
import zio.Console.printLine
import zio.kafka.consumer._
import zio.kafka.serde._
import zio._

ZIO.scoped {
  for {
    control <- consumer.partitionedStreamWithControl(
                 Subscription.topics("topic150"),
                 Serde.string,
                 Serde.string
               )
    fiber <- control.stream
               .flatMapPar(Int.MaxValue) { case (topicPartition, partitionStream) =>
                 partitionStream
                   .tap(record => printLine(s"key: ${record.key}, value: ${record.value}"))
                   .map(_.offset)
               }
               .aggregateAsync(Consumer.offsetBatches)
               .mapZIO(_.commit)
               .runDrain
               .forkScoped
    // At some point in your code, when some condition is met:
    _ <- control.end // Stop fetching more records
    _ <- fiber.join  // Wait for graceful completion of the stream
  } yield ()
}
```

As of zio-kafka 3.0, this functionality is experimental. If no issues are reported and the API has good usability, it will eventually be marked as stable.
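The plain (non-partitioned) variant follows the same pattern. A minimal sketch, assuming the same `consumer`, topic, and serdes as the examples above (this example is not part of the original docs; `plainStreamWithControl` merges all partition streams into one, per the signature in the diff below):

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde._

ZIO.scoped {
  for {
    control <- consumer.plainStreamWithControl(
                 Subscription.topics("topic150"),
                 Serde.string,
                 Serde.string
               )
    fiber <- control.stream
               .mapZIO(record => ZIO.logInfo(s"${record.key}: ${record.value}").as(record.offset))
               .aggregateAsync(Consumer.offsetBatches)
               .mapZIO(_.commit)
               .runDrain
               .forkScoped
    _ <- control.end // stop fetching; the merged stream ends once buffered records are drained
    _ <- fiber.join
  } yield ()
}
```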
728 changes: 606 additions & 122 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala

Large diffs are not rendered by default.

@@ -56,7 +56,17 @@ object PartitionStreamControlSpec extends ZIOSpecDefault {
        _             <- control.offerRecords(records)
        _             <- control.end
        pulledRecords <- control.stream.runCollect
        hasEnded      <- control.hasEnded
      } yield assertTrue(records == pulledRecords && hasEnded)
    },
    test("offering records after end will fail") {
      for {
        control <- createTestControl
        records  = createTestRecords(3)
        _       <- control.offerRecords(records)
        _       <- control.end
        exit    <- control.offerRecords(records).exit
      } yield assertTrue(exit.isFailure)
    },
    test("halt completes the stream with a TimeoutException") {
      for {
163 changes: 162 additions & 1 deletion zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -122,6 +122,63 @@ trait Consumer {
bufferSize: Int = 4
): ZStream[R, Throwable, CommittableRecord[K, V]]

/**
* Like [[plainStream]] but returns an object to control the stream.
*
* The Scope provided to the returned effect controls the lifetime of the subscription. The subscription is
* unsubscribed when the scope is ended. Calling [[StreamControl.end]] stops fetching data for the subscription
* partitions but will not unsubscribe until the Scope is ended.
*
* WARNING: this is an EXPERIMENTAL API and may disappear or change in an incompatible way without notice in any
* zio-kafka version.
*/
def plainStreamWithControl[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
bufferSize: Int = 4
): ZIO[Scope with R, Throwable, StreamControl[R, Throwable, CommittableRecord[K, V]]]

/**
* Like [[partitionedStream]] but returns an object to control the stream.
*
* The Scope provided to the returned effect controls the lifetime of the subscription. The subscription is
* unsubscribed when the scope is ended. Calling [[StreamControl.end]] stops fetching data for the subscription
* partitions but will not unsubscribe until the Scope is ended.
*
* WARNING: this is an EXPERIMENTAL API and may disappear or change in an incompatible way without notice in any
* zio-kafka version.
*/
def partitionedStreamWithControl[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZIO[
Scope with R,
Throwable,
StreamControl[R, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]
]

/**
* Like [[partitionedAssignmentStream]] but returns an object to control the stream.
*
* The Scope provided to the returned effect controls the lifetime of the subscription. The subscription is
* unsubscribed when the scope is ended. Calling [[StreamControl.end]] stops fetching data for the subscription
* partitions but will not unsubscribe until the Scope is ended.
*
* WARNING: this is an EXPERIMENTAL API and may disappear or change in an incompatible way without notice in any
* zio-kafka version.
*/
def partitionedAssignmentStreamWithControl[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZIO[Scope, Throwable, StreamControl[
R,
Throwable,
Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]
]]

/**
* Stops consumption of data, drains buffered records, and ends the attached streams while still serving commit
* requests.
@@ -341,6 +398,56 @@ object Consumer {
case object Latest extends AutoOffsetStrategy
case object None extends AutoOffsetStrategy
}

/**
* Takes a StreamControl for some stream and runs the given ZIO workflow on that stream such that, when interrupted,
* it stops fetching records and gracefully waits for the ZIO workflow to complete.
*
* This is useful for running streams from within your application's Main class, such that streams are cleanly
* stopped when the application is shut down (for example by your container runtime).
*
* WARNING: this is an EXPERIMENTAL API and may disappear or change in an incompatible way without notice in any
* zio-kafka version.
*
* @param control
* Result of one of the Consumer's methods returning a [[StreamControl]]
* @param shutdownTimeout
* Timeout for the workflow to complete after initiating the graceful shutdown
* @param withStream
* Takes the stream as input and returns a ZIO workflow that processes the stream. Since in most programs the given
* workflow runs until it is interrupted externally, the result value (of type Any) is not meaningful. `withStream`
* is typically something like `stream => stream.mapZIO(record => ZIO.debug(record)).mapZIO(_.offset.commit)`
*/
def runWithGracefulShutdown[R, E, A](
control: StreamControl[R, E, A],
shutdownTimeout: Duration
)(
withStream: ZStream[R, E, A] => ZIO[R, E, Any]
): ZIO[R, E, Any] =
ZIO.scoped[R] {
for {
fib <-
withStream(control.stream)
.onInterrupt(
ZIO.logError("withStream in runWithGracefulShutdown was interrupted, this should not happen")
)
.tapErrorCause(cause => ZIO.logErrorCause("Error in withStream fiber in runWithGracefulShutdown", cause))
.forkScoped
result <-
fib.join.onInterrupt(
control.end *>
fib.join
.timeout(shutdownTimeout)
.someOrElseZIO(
ZIO.logError(
"Timeout waiting for `withStream` to shut down gracefully. Not all in-flight records may have been processed."
)
)
.tapErrorCause(cause => ZIO.logErrorCause("Stream failed while awaiting its graceful shutdown", cause))
.ignore
)
} yield result
}
}

private[consumer] final class ConsumerLive private[consumer] (
@@ -392,7 +499,8 @@ private[consumer] final class ConsumerLive private[consumer] (

ZStream.unwrapScoped {
for {
streamControl <- runloopAccess.subscribe(subscription)
stream = streamControl.stream
} yield stream
.map(_.exit)
.flattenExitOption
@@ -410,6 +518,33 @@ private[consumer] final class ConsumerLive private[consumer] (
}
}

override def partitionedAssignmentStreamWithControl[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZIO[Scope, Throwable, StreamControl[
R,
Throwable,
Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]
]] = {
val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray)
for {
streamControl <- runloopAccess.subscribe(subscription)
} yield streamControl.map(
_.map(_.exit).flattenExitOption.map {
_.collect {
case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) =>
val stream: ZStream[R, Throwable, CommittableRecord[K, V]] =
if (onlyByteArraySerdes)
partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]]
else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer)))

tp -> stream
}
}
)
}

override def partitionedStream[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
@@ -428,6 +563,32 @@ private[consumer] final class ConsumerLive private[consumer] (
bufferSize = bufferSize
)(_._2)

override def plainStreamWithControl[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
bufferSize: Int = 4
): ZIO[Scope with R, Throwable, StreamControl[R, Throwable, CommittableRecord[K, V]]] =
partitionedStreamWithControl(subscription, keyDeserializer, valueDeserializer).map(
_.map(
_.flatMapPar(
n = Int.MaxValue,
bufferSize = bufferSize
)(_._2)
)
)

override def partitionedStreamWithControl[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZIO[
Scope with R,
Throwable,
StreamControl[R, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]
] = partitionedAssignmentStreamWithControl(subscription, keyDeserializer, valueDeserializer)
.map(_.map(_.flattenChunks))

override def stopConsumption: UIO[Unit] =
ZIO.logDebug("stopConsumption called") *>
runloopAccess.stopConsumption
33 changes: 33 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/StreamControl.scala
@@ -0,0 +1,33 @@
package zio.kafka.consumer
import zio.UIO
import zio.stream.ZStream

trait StreamControl[-R, +E, +A] {

/**
* The stream associated with this subscription.
*
* The stream should be run at most once. Running it more than once will result in chunks of records being divided
* over the streams. After ending, running the stream another time will not produce records.
*/
def stream: ZStream[R, E, A]

/**
* Stops fetching data for all partitions associated with this subscription. The stream will end and the effect
* running the stream will eventually complete.
*
* @return
* Effect that will complete immediately.
*/
def end: UIO[Unit]
}

object StreamControl {
implicit class StreamControlOps[-R, +E, +A](streamControl: StreamControl[R, E, A]) {
def map[R1 <: R, E1 >: E, B](f: ZStream[R, E, A] => ZStream[R1, E1, B]): StreamControl[R1, E1, B] =
new StreamControl[R1, E1, B] {
def stream = f(streamControl.stream)
def end = streamControl.end
}
}
}
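The `map` combinator above transforms only the stream side of a `StreamControl` and forwards `end` unchanged; this is how `plainStreamWithControl` and `partitionedStreamWithControl` are derived from `partitionedAssignmentStreamWithControl` in the diff. A dependency-free sketch of that shape (`MiniControl`, `List`, and `() => Unit` are hypothetical stand-ins for `StreamControl`, `ZStream`, and `UIO`, so this illustrates the pattern rather than the real API):

```scala
// A value paired with a way to signal "stop fetching".
// List[A] models ZStream[R, E, A]; () => Unit models UIO[Unit].
trait MiniControl[+A] {
  def stream: List[A]
  def end(): Unit
}

object MiniControl {
  implicit class MiniControlOps[A](private val self: MiniControl[A]) extends AnyVal {
    // Transform only the stream; the end signal is passed through untouched,
    // mirroring StreamControl.StreamControlOps.map in this PR.
    def map[B](f: List[A] => List[B]): MiniControl[B] = new MiniControl[B] {
      def stream: List[B] = f(self.stream)
      def end(): Unit = self.end()
    }
  }
}
```

Because `end` is shared across all mapped views, calling `end` on a derived control stops the original subscription's fetching, just as ending a plain-stream control stops the underlying partitioned subscription.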