Skip to content

Commit

Permalink
Improve producing logs (#1478)
Browse files Browse the repository at this point in the history
  • Loading branch information
javierarrieta authored Apr 30, 2024
1 parent fd7b6db commit fa4f5db
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions core/src/main/scala/tamer/Tamer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ object Tamer {
.fromQueueWithShutdown(queue)
.mapZIO {
case (TxInfo.Context(transaction), chunk) if chunk.nonEmpty =>
transaction
.produceChunk(chunk.map(_.toKafkaProducerRecord(sinkTopic)), sinkKeySerializer, sinkValueSerializer, None)
.tapError(_ => log.debug(s"failed pushing ${chunk.size} messages to $sinkTopic"))
.retry(retries) // TODO: stop trying if the error is transaction related
.unit <*
log.info(s"pushed ${chunk.size} messages to $sinkTopic")
log.debug(s"pushing ${chunk.size} messages to $sinkTopic") *>
transaction
.produceChunk(chunk.map(_.toKafkaProducerRecord(sinkTopic)), sinkKeySerializer, sinkValueSerializer, None)
.tapError(e => log.info(s"failed pushing ${chunk.size} messages to $sinkTopic, will retry. Caused by: ${e.getMessage}"))
.retry(retries) // TODO: stop trying if the error is transaction related
.tapError(e => log.warn(s"finally failed pushing ${chunk.size} messages to $sinkTopic, will abort. Caused by: ${e.getMessage}", e))
.unit *> log.info(s"successfully pushed ${chunk.size} messages to $sinkTopic")

case (TxInfo.Delimiter(promise), _) =>
promise.succeed(()).unit <*
log.debug(s"user implicitly signalled end of data production")
Expand Down

0 comments on commit fa4f5db

Please sign in to comment.