kafka replay speed: ingestion metrics #9346

Conversation

dimitarvdimitrov
Contributor

What this PR does

  • cortex_ingest_storage_reader_processing_time_seconds now records the time to process the whole batch of requests; this is useful for measuring the effectiveness of fetching from Kafka vs ingesting
  • cortex_ingester_pusher_ingestion_shard_batch_age_seconds to keep an eye on the performance of the queue
  • cortex_ingest_storage_reader_ingestion_shard_batch_processing_time_seconds to know when popping from the queue slows down (see the sketch after this list)
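
A minimal sketch, assuming standard client_golang histograms, of how metrics like these could be registered and observed. The metric names come from the list above, but the buckets, struct, and helper names are illustrative assumptions rather than the PR's actual code:

```go
package ingest

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// ingestionMetrics groups the three histograms described above. The names are
// taken from the PR description; buckets and helpers are assumptions.
type ingestionMetrics struct {
	processingTime   prometheus.Histogram
	batchAge         prometheus.Histogram
	batchProcessTime prometheus.Histogram
}

func newIngestionMetrics(reg prometheus.Registerer) *ingestionMetrics {
	return &ingestionMetrics{
		processingTime: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
			Name:    "cortex_ingest_storage_reader_processing_time_seconds",
			Help:    "Time taken to process a whole batch of write requests fetched from Kafka.",
			Buckets: prometheus.DefBuckets,
		}),
		batchAge: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
			Name:    "cortex_ingester_pusher_ingestion_shard_batch_age_seconds",
			Help:    "Age of a shard batch when it is popped from the queue.",
			Buckets: prometheus.DefBuckets,
		}),
		batchProcessTime: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
			Name:    "cortex_ingest_storage_reader_ingestion_shard_batch_processing_time_seconds",
			Help:    "Time taken to process a shard batch after it is popped from the queue.",
			Buckets: prometheus.DefBuckets,
		}),
	}
}

// observeBatch records how long processing one batch took.
func (m *ingestionMetrics) observeBatch(start time.Time) {
	m.processingTime.Observe(time.Since(start).Seconds())
}
```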

Which issue(s) this PR fixes or relates to

Fixes #

Checklist

  • Tests updated.
  • Documentation added.
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX].
  • about-versioning.md updated with experimental features.

@dimitarvdimitrov dimitarvdimitrov requested a review from a team as a code owner September 20, 2024 08:52
@dimitarvdimitrov dimitarvdimitrov force-pushed the dimitar/ingester/consume-latency-push-sharding branch from c0d740c to fb30f39 on September 20, 2024 13:12
@dimitarvdimitrov dimitarvdimitrov force-pushed the dimitar/ingester/consume-latency-push-sharding-fixups/add-metrics branch 2 times, most recently from d305599 to ec286e3 on September 20, 2024 19:09
@dimitarvdimitrov
Contributor Author

feedback from @gotjosh:

  1. divide all of these metrics by tenant
  2. add a server error rate counter in the queue; this will duplicate the error metrics from kafka replay speed: move error handling closer to actual ingestion #9349, but that's OK - it will make metric discovery easier
  3. use proper component-based naming for the metrics (e.g. cortex_ingest_storage_reader_batching_queue_processing_time_seconds); sketched below
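
A hedged sketch of the shape this feedback points toward: component-prefixed names, a per-tenant `user` label, and a dedicated server-error counter. The counter's name and the label key are assumptions, not the final implementation:

```go
package ingest

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// newBatchingQueueMetrics sketches component-prefixed, per-tenant metrics.
// The error-counter name and the "user" label key are assumptions.
func newBatchingQueueMetrics(reg prometheus.Registerer) (*prometheus.HistogramVec, *prometheus.CounterVec) {
	processingTime := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Name:    "cortex_ingest_storage_reader_batching_queue_processing_time_seconds",
		Help:    "Time taken to process a batch popped from the batching queue.",
		Buckets: prometheus.DefBuckets,
	}, []string{"user"})

	serverErrors := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_ingest_storage_reader_batching_queue_server_errors_total",
		Help: "Number of server errors encountered while pushing batches from the queue.",
	}, []string{"user"})

	return processingTime, serverErrors
}

// Per-tenant usage would look like:
//
//	processingTime.WithLabelValues(tenantID).Observe(duration.Seconds())
//	serverErrors.WithLabelValues(tenantID).Inc()
```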

@dimitarvdimitrov
Contributor Author

LGTM

Contributor

@gotjosh gotjosh left a comment

LGTM

dimitarvdimitrov and others added 9 commits September 25, 2024 16:09
Signed-off-by: Dimitar Dimitrov <[email protected]>
Signed-off-by: Dimitar Dimitrov <[email protected]>
Signed-off-by: Dimitar Dimitrov <[email protected]>
Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-Authored-By: Dimitar Dimitrov <[email protected]>
Signed-off-by: gotjosh <[email protected]>
@gotjosh gotjosh force-pushed the dimitar/ingester/consume-latency-push-sharding-fixups/add-metrics branch from 47a9cdc to 922dfda on September 25, 2024 15:10
@gotjosh gotjosh merged commit c2ce628 into dimitar/ingester/consume-latency-push-sharding Sep 25, 2024
29 checks passed
@gotjosh gotjosh deleted the dimitar/ingester/consume-latency-push-sharding-fixups/add-metrics branch September 25, 2024 15:31
dimitarvdimitrov added a commit that referenced this pull request Sep 27, 2024
Signed-off-by: Dimitar Dimitrov <[email protected]>

Use labels hasher

Signed-off-by: Dimitar Dimitrov <[email protected]>

Use consistent title name

Signed-off-by: Dimitar Dimitrov <[email protected]>

Use consistent title name

Signed-off-by: Dimitar Dimitrov <[email protected]>

kafka replay speed: adjust batchingQueueCapacity (#9344)

* kafka replay speed: adjust batchingQueueCapacity

I made up the value of 2000 when we were flushing individual series to the channel.
Back then 2000 might have made sense, but now that we flush whole WriteRequests,
a capacity of 1 should be sufficient.
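
For context, a rough sketch of the trade-off described here: the queue hands whole write requests to a consumer over a buffered channel, so once each element is an entire batch, a tiny capacity is enough. The types below are illustrative placeholders, not the actual batchingQueue code:

```go
package ingest

// writeRequest stands in for the real mimirpb.WriteRequest; the queue is a
// rough illustration, not the actual batchingQueue implementation.
type writeRequest struct {
	// series, exemplars, metadata, ...
}

// batchingQueue hands whole write requests to a consumer over a buffered
// channel. Each element is already a full batch, so a large capacity mostly
// hides backpressure rather than improving throughput.
type batchingQueue struct {
	ch chan *writeRequest
}

func newBatchingQueue(capacity int) *batchingQueue {
	return &batchingQueue{ch: make(chan *writeRequest, capacity)}
}

func (q *batchingQueue) push(r *writeRequest) { q.ch <- r }

func (q *batchingQueue) pop() (*writeRequest, bool) {
	r, ok := <-q.ch
	return r, ok
}

func (q *batchingQueue) close() { close(q.ch) }
```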

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Increase errCh capacity

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Explain why +1

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Set capacity to 5

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Update pkg/storage/ingest/pusher.go

Co-authored-by: gotjosh <[email protected]>

* Improve test

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Update pkg/storage/ingest/pusher.go

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: gotjosh <[email protected]>

kafka replay speed: rename CLI flags (#9345)

* kafka replay speed: rename CLI flags

Make them a bit more consistent in what they mean and add better descriptions.

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Clarify metrics

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Rename flags

Co-authored-by: gotjosh <[email protected]>

* Update docs

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: gotjosh <[email protected]>

kafka replay speed: add support for metadata & source (#9287)

* kafka replay speed: add support for metadata & source

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Remove completed TODO

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Use a single map

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Make tests compile again

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>

kafka replay speed: improve fetching tracing (#9361)

* Better span logging

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Better span logging

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Try to have more buffering in ordered batches

maybe waiting to send to ordered batches comes with too much overhead

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Correct local docker-compose config with new flags

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Maybe have more stable events

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Revert "Try to have more buffering in ordered batches"

This reverts commit 886b159.

* Maybe have more stable events

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Maybe have more stable events

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Propagate loggers in spans

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>

continuous-test: Make the User-Agent header for the Mimir client conf… (#9338)

* continuous-test: Make the User-Agent header for the Mimir client configurable

* Update CHANGELOG.md

* Run make reference-help

TestIngester_PushToStorage_CircuitBreaker: increase initial delay (#9351)

* TestIngester_PushToStorage_CircuitBreaker: increase initial delay

Fixes XXX
I believe there's a race between sending the first request and collecting the metrics. It's possible that we collect the metrics more than 200ms after the first request, at which point the circuit breaker has already opened. I could reproduce XXX by reducing the initialDelay to 10ms.

This PR increases the initial delay to 1 hour so that we can be more confident that the delay hasn't expired by the time we collect the metrics.
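
To make the race concrete, here is a toy, self-contained model (deliberately not Mimir's ingester circuit breaker) showing why a short initial delay can race with metric collection while a one-hour delay cannot:

```go
package ingest

import (
	"testing"
	"time"
)

// toyCircuitBreaker is a deliberately tiny stand-in (not Mimir's ingester
// circuit breaker): once the initial delay has expired, a failure opens it.
type toyCircuitBreaker struct {
	activatedAt  time.Time
	initialDelay time.Duration
	open         bool
}

func (cb *toyCircuitBreaker) recordFailure(now time.Time) {
	if now.Sub(cb.activatedAt) >= cb.initialDelay {
		cb.open = true
	}
}

func TestToyCircuitBreaker_LongInitialDelayAvoidsRace(t *testing.T) {
	cb := &toyCircuitBreaker{activatedAt: time.Now(), initialDelay: time.Hour}

	// With a 200ms delay, being descheduled between the first request and the
	// assertion could let the delay expire and flip the breaker open. With one
	// hour, that cannot plausibly happen, so the observed state is deterministic.
	cb.recordFailure(time.Now().Add(500 * time.Millisecond))
	if cb.open {
		t.Fatal("breaker opened before the initial delay expired")
	}
}
```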

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Adjust all tests

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>

Update to latest commit of dskit main (#9356)

Specifically pulls in grafana/dskit#585

Signed-off-by: Nick Pillitteri <[email protected]>

Update mimir-prometheus (#9358)

* Update mimir-prometheus

* Run make generate-otlp

query-tee: add equivalent errors for string expression for range queries (#9366)

* query-tee: add equivalent errors for string expression for range queries

* Add changelog entry

MQE: fix `rate()` over native histograms where first point in range is a counter reset (#9371)

* MQE: fix `rate()` over native histograms where first point is a counter reset

* Add changelog entry

Update module github.com/Azure/azure-sdk-for-go/sdk/storage/azblob to v1.4.1 (#9369)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

Use centralized 'Add to docs project' workflow with GitHub App auth (#9330)

* Use centralized 'Add to docs project' workflow with GitHub App auth

Until this is merged, it is likely that any issues labeled `type/docs` won't be added to the [organization project](https://github.com/orgs/grafana/projects/69).

The underlying action is centralized so that any future changes are made in one place (`grafana/writers-toolkit`). The action is versioned to protect workflows from breaking changes.

The action uses Vault secrets instead of the discouraged organization secrets.

The workflow uses a consistent name so that future changes can be made programmatically.

Relates to https://github.com/orgs/grafana/projects/279/views/9?pane=issue&itemId=44280262

Signed-off-by: Jack Baldry <[email protected]>

* Remove unneeded checkout step

* Remove unneeded checkout step

---------

Signed-off-by: Jack Baldry <[email protected]>

Update grafana/agent Docker tag to v0.43.1 (#9365)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

Update module github.com/hashicorp/vault/api/auth/userpass to v0.8.0 (#9375)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

Update module github.com/hashicorp/vault/api/auth/approle to v0.8.0 (#9374)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

Update module go.opentelemetry.io/collector/pdata to v1.15.0 (#9380)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

Update module github.com/hashicorp/vault/api/auth/kubernetes to v0.8.0 (#9377)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

Update module github.com/twmb/franz-go/plugin/kotel to v1.5.0 (#9379)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

kafka replay speed: ingestion metrics (#9346)

* kafka replay speed: ingestion metrics

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Separate batch processing time by batch contents

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Also set time on metadata

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Add tenant to metrics

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Add metrics for errors

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Rename batching queue metrics

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Pairing to address code review

Co-Authored-By: Dimitar Dimitrov <[email protected]>

* Move the metrics into their own file

Co-Authored-By: Dimitar Dimitrov <[email protected]>

* go mod tidy

Signed-off-by: gotjosh <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
Signed-off-by: gotjosh <[email protected]>
Co-authored-by: gotjosh <[email protected]>

kafka replay speed: move error handling closer to actual ingestion (#9349)

* kafka replay speed: move error handling closer to actual ingestion

Previously, we'd let errors bubble up and only decide whether to abort the request at the very top (`pusherConsumer`). This meant that we'd potentially buffer more requests before detecting an error.

This change extracts the error handling logic into a `Pusher` implementation: `clientErrorFilteringPusher`. This implementation logs client errors and then swallows them. We inject it in front of the ingester, so the parallel storage implementation can abort as soon as possible instead of collecting and bubbling up the errors. A rough sketch of the wrapper follows below.
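
A minimal sketch, assuming a simplified Pusher interface, of what a client-error-filtering wrapper like the one described above could look like; the real implementation differs in types and in how it classifies client errors:

```go
package ingest

import (
	"context"
	"errors"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// Pusher is simplified here; the real interface pushes *mimirpb.WriteRequest.
type Pusher interface {
	PushToStorage(ctx context.Context, req any) error
}

// clientErrorFilteringPusher logs client errors and swallows them, so only
// server errors propagate and can abort the parallel ingestion pipeline.
type clientErrorFilteringPusher struct {
	upstream Pusher
	logger   log.Logger
}

func (p *clientErrorFilteringPusher) PushToStorage(ctx context.Context, req any) error {
	err := p.upstream.PushToStorage(ctx, req)
	switch {
	case err == nil:
		return nil
	case isClientError(err):
		// The write was rejected because of the request's contents (e.g. bad
		// samples); aborting the batch wouldn't help, so log and drop the error.
		level.Warn(p.logger).Log("msg", "detected a client error while ingesting", "err", err)
		return nil
	default:
		return err
	}
}

// errClient and isClientError are stand-ins for Mimir's real classification of
// push errors; they only exist to make this sketch self-contained.
var errClient = errors.New("client error")

func isClientError(err error) bool {
	return errors.Is(err, errClient)
}
```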

Signed-off-by: Dimitar Dimitrov <[email protected]>
Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
Signed-off-by: gotjosh <[email protected]>
Co-authored-by: gotjosh <[email protected]>

kafka replay speed: concurrency fetching improvements (#9389)

* fetched records include timestamps

Signed-off-by: Dimitar Dimitrov <[email protected]>

* try with defaultMinBytesWaitTime=3s

Signed-off-by: Dimitar Dimitrov <[email protected]>

* add fetch_min_bytes_max_wait

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Don't block on sending to the channel

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Remove wait for when we're fetching from the end

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix bug with blocking on fetch

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Slightly easier to follow lifecycle of previousResult

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Correct merging of results

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Avoid double-logging events

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Revert "add fetch_min_bytes_max_wait"

This reverts commit 6197d4b.

* Increase MinBytesWaitTime to 5s

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Add comment about warpstream and MinBytes

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Address review comments

Signed-off-by: gotjosh <[email protected]>

* Add tests for concurrentFetchers

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix bugs in tracking lastReturnedRecord

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Renamed method

Signed-off-by: gotjosh <[email protected]>

* use the older context

Signed-off-by: gotjosh <[email protected]>

* Use the correct variable name

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Reduce MaxWaitTime in PartitionReader tests

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Change test createConcurrentFetchers signature

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Sort imports

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
Signed-off-by: gotjosh <[email protected]>
Co-authored-by: gotjosh <[email protected]>

Make concurrentFetchers change its concurrency dynamically (#9437)

* Make concurrentFetchers change its concurrency dynamically

Signed-off-by: gotjosh <[email protected]>

* address review comments

Signed-off-by: gotjosh <[email protected]>

* `make doc`

Signed-off-by: gotjosh <[email protected]>

* inline the stop method

Signed-off-by: gotjosh <[email protected]>

* Fix panic when creating concurrent fetchers fails

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Disabled by default

Signed-off-by: gotjosh <[email protected]>

* we don't need to handle the context in start

Signed-off-by: gotjosh <[email protected]>

* don't store concurrency or records per fetch

Signed-off-by: gotjosh <[email protected]>

* add validation to the flags

Signed-off-by: gotjosh <[email protected]>

* Ensure we don't leak any goroutines.

Signed-off-by: gotjosh <[email protected]>

* remove concurrent and recordsperfetch from the main struct

Signed-off-by: gotjosh <[email protected]>

---------

Signed-off-by: gotjosh <[email protected]>
Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: Dimitar Dimitrov <[email protected]>

kafka replay speed: fix concurrent fetching concurrency transition (#9447)

* kafka replay speed: fix concurrent fetching concurrency transition

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Update pkg/storage/ingest/reader.go

* Make sure we evaluate r.lastReturnedRecord WHEN we return

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Redistribute wg.Add

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Add tests

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Remove defer causing data race

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Move defer to a different place

Signed-off-by: Dimitar Dimitrov <[email protected]>

* WIP

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Give more time to catch up with target_lag

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Clarify comment

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>