-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathPutRecordTests.scala
82 lines (77 loc) · 2.48 KB
/
PutRecordTests.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package kinesis.mock
import scala.jdk.CollectionConverters._
import cats.effect.IO
import cats.syntax.all._
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.kinesis.model._
import kinesis.mock.instances.arbitrary._
import kinesis.mock.syntax.id._
import kinesis.mock.syntax.javaFuture._
import kinesis.mock.syntax.scalacheck._
class PutRecordTests extends AwsFunctionalTests {
fixture.test("It should put a record") { resources =>
for {
recordRequests <- IO(
putRecordRequestArb.arbitrary
.take(20)
.toVector
.map(
_.copy(streamName = Some(resources.streamName), streamArn = None)
)
.map(x =>
PutRecordRequest
.builder()
.partitionKey(x.partitionKey)
.streamName(resources.streamName.streamName)
.data(SdkBytes.fromByteArray(x.data))
.maybeTransform(x.explicitHashKey)(_.explicitHashKey(_))
.maybeTransform(x.sequenceNumberForOrdering)((req, sequenceNum) =>
req.sequenceNumberForOrdering(sequenceNum.value)
)
.build()
)
)
_ <- recordRequests.parTraverse(x =>
resources.kinesisClient.putRecord(x).toIO
)
shards <- resources.kinesisClient
.listShards(
ListShardsRequest
.builder()
.streamName(resources.streamName.streamName)
.build()
)
.toIO
.map(_.shards().asScala.toVector)
shardIterators <- shards.traverse(shard =>
resources.kinesisClient
.getShardIterator(
GetShardIteratorRequest
.builder()
.shardId(shard.shardId())
.streamName(resources.streamName.streamName)
.shardIteratorType(ShardIteratorType.TRIM_HORIZON)
.build()
)
.toIO
.map(_.shardIterator())
)
gets <- shardIterators.traverse(shardIterator =>
resources.kinesisClient
.getRecords(
GetRecordsRequest.builder().shardIterator(shardIterator).build()
)
.toIO
)
res = gets.flatMap(_.records().asScala.toVector)
} yield assert(
res.length == 20 && res.forall(rec =>
recordRequests.exists(req =>
req.data.asByteArray.sameElements(rec.data.asByteArray)
&& req.partitionKey == rec.partitionKey
)
),
s"$res\n$recordRequests"
)
}
}