-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathListStreamConsumersTests.scala
72 lines (67 loc) · 2.23 KB
/
ListStreamConsumersTests.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package kinesis.mock
import scala.jdk.CollectionConverters._
import cats.effect.IO
import cats.syntax.all._
import software.amazon.awssdk.services.kinesis.model._
import kinesis.mock.instances.arbitrary._
import kinesis.mock.models.ConsumerArn
import kinesis.mock.syntax.javaFuture._
import kinesis.mock.syntax.scalacheck._
/** Functional test for the `ListStreamConsumers` operation.
  *
  * Registers several consumers on the fixture's stream through the Kinesis
  * client, lists the stream's consumers, and checks that the listing matches
  * what the register calls returned.
  */
class ListStreamConsumersTests extends AwsFunctionalTests {

  /** Converts an SDK `Consumer` into the project's `models.ConsumerSummary`.
    *
    * Used for both the register responses and the list response so that the
    * two sides of the assertion are built by exactly the same logic.
    *
    * @param consumer
    *   consumer description returned by the Kinesis client
    * @return
    *   the summary, or a failed `IO` carrying a `RuntimeException` when
    *   `ConsumerArn.fromArn` rejects the consumer's ARN
    */
  private def toConsumerSummary(
      consumer: software.amazon.awssdk.services.kinesis.model.Consumer
  ): IO[models.ConsumerSummary] =
    IO.fromEither(
      ConsumerArn
        .fromArn(consumer.consumerARN())
        .leftMap(new RuntimeException(_))
    ).map(consumerArn =>
      models.ConsumerSummary(
        consumerArn,
        consumer.consumerCreationTimestamp(),
        models.ConsumerName(consumer.consumerName()),
        models.ConsumerStatus.withName(consumer.consumerStatusAsString())
      )
    )

  fixture.test("It should list stream consumers") { resources =>
    for {
      // Names are sorted before registration; the final assertion compares the
      // two lists element by element, so it relies on the listing coming back
      // in this same order (NOTE(review): confirm the listing order contract).
      consumerNames <- IO(
        consumerNameGen.take(3).toList.sorted.map(_.consumerName)
      )
      streamSummary <- describeStreamSummary(resources)
      streamArn = streamSummary.streamDescriptionSummary().streamARN()
      // Register the consumers one at a time, in list order.
      registerRes <- consumerNames.traverse(consumerName =>
        resources.kinesisClient
          .registerStreamConsumer(
            RegisterStreamConsumerRequest
              .builder()
              .streamARN(streamArn)
              .consumerName(consumerName)
              .build()
          )
          .toIO
      )
      res <- resources.kinesisClient
        .listStreamConsumers(
          ListStreamConsumersRequest.builder().streamARN(streamArn).build()
        )
        .toIO
      resultConsumers <- res.consumers.asScala.toList
        .traverse(toConsumerSummary)
      registerResultConsumers <- registerRes
        .map(_.consumer())
        .traverse(toConsumerSummary)
    } yield assert(
      resultConsumers === registerResultConsumers,
      s"$res\n$registerRes"
    )
  }
}