diff --git a/docs/reference/metrics.md b/docs/reference/metrics.md index 9df72335a05..316244dc296 100644 --- a/docs/reference/metrics.md +++ b/docs/reference/metrics.md @@ -5,78 +5,195 @@ sidebar_position: 70 Quickwit exposes key metrics in the [Prometheus](https://prometheus.io/) format on the `/metrics` endpoint. You can use any front-end that supports Prometheus to examine the behavior of Quickwit visually. +:::tip + +Workloads with a large number of indexes generate high cardinality metrics for the label `index`. Set the environment variable `QW_DISABLE_PER_INDEX_METRICS=true` to disable that label if this is problematic for your metrics database. + +::: + ## Cache Metrics -Currently Quickwit exposes metrics for three caches: `fastfields`, `shortlived`, `splitfooter`. These metrics share the same structure. +Quickwit exposes several metrics every caches. The cache type is defined in the `component_name` label. Values are `fastfields`, `shortlived`, `splitfooter`, `fd`, `partial_request`, and `searcher_split`. -| Namespace | Metric Name | Description | Type | -| --------- | ----------- | ----------- | ---- | -| `quickwit_cache_{cache_name}` | `in_cache_count` | Count of {cache_name} in cache | `gauge` | -| `quickwit_cache_{cache_name}` | `in_cache_num_bytes` | Number of {cache_name} bytes in cache | `gauge` | -| `quickwit_cache_{cache_name}` | `cache_hit_total` | Number of {cache_name} cache hits | `counter` | -| `quickwit_cache_{cache_name}` | `cache_hits_bytes` | Number of {cache_name} cache hits in bytes | `counter` | -| `quickwit_cache_{cache_name}` | `cache_miss_total` | Number of {cache_name} cache hits | `counter` | +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_cache` | `in_cache_count` | Count of entries in cache by component | [`component_name`] | `gauge` | +| `quickwit_cache` | `in_cache_num_bytes` | Number of bytes in cache by component | [`component_name`] | `gauge` | +| `quickwit_cache` | `cache_hits_total` | Number of cache hits by component | [`component_name`] | `counter` | +| `quickwit_cache` | `cache_hits_bytes` | Number of cache hits in bytes by component | [`component_name`] | `counter` | +| `quickwit_cache` | `cache_misses_total` | Number of cache misses by component | [`component_name`] | `counter` | +| `quickwit_cache` | `cache_evict_total` | Number of cache entries evicted by component | [`component_name`] | `counter` | +| `quickwit_cache` | `cache_evict_bytes` | Number of cache entries evicted in bytes by component | [`component_name`] | `counter` | -## CLI Metrics +## Cluster Metrics + +Cluster metrics help track the behavior of the Chitchat protocol. + +Note: the cluster protocol uses GRPC to catch up large deltas in its state. Those calls are monitored as [GRPC metrics](#grpc-metrics). | Namespace | Metric Name | Description | Type | | --------- | ----------- | ----------- | ---- | -| `quickwit` | `allocated_num_bytes` | Number of bytes allocated memory, as reported by jemalloc. | `gauge` | +| `quickwit_cluster` | `live_nodes` | The number of live nodes observed locally | `gauge` | +| `quickwit_cluster` | `ready_nodes` | The number of ready nodes observed locally | `gauge` | +| `quickwit_cluster` | `zombie_nodes` | The number of zombie nodes observed locally | `gauge` | +| `quickwit_cluster` | `dead_nodes` | The number of dead nodes observed locally | `gauge` | +| `quickwit_cluster` | `cluster_state_size_bytes` | The size of the cluster state in bytes | `gauge` | +| `quickwit_cluster` | `node_state_size_bytes` | The size of the node state in bytes | `gauge` | +| `quickwit_cluster` | `node_state_keys` | The number of keys in the node state | `gauge` | +| `quickwit_cluster` | `gossip_recv_messages_total` | Total number of gossip messages received | `counter` | +| `quickwit_cluster` | `gossip_recv_bytes_total` | Total amount of gossip data received in bytes | `counter` | +| `quickwit_cluster` | `gossip_sent_messages_total` | Total number of gossip messages sent | `counter` | +| `quickwit_cluster` | `gossip_sent_bytes_total` | Total amount of gossip data sent in bytes | `counter` | +| `quickwit_cluster` | `grpc_gossip_rounds_total` | Total number of gRPC gossip rounds performed with peer nodes | `counter` | + +## Control Plane Metrics -## Common Metrics +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_control_plane` | `indexes_total` | Number of indexes | | `gauge` | +| `quickwit_control_plane` | `restart_total` | Number of control plane restarts | | `counter` | +| `quickwit_control_plane` | `schedule_total` | Number of control plane schedule operations | | `counter` | +| `quickwit_control_plane` | `apply_total` | Number of control plane apply plan operations | | `counter` | +| `quickwit_control_plane` | `metastore_error_aborted` | Number of aborted metastore transactions (do not trigger a control plane restart) | | `counter` | +| `quickwit_control_plane` | `metastore_error_maybe_executed` | Number of metastore transactions with an uncertain outcome (do trigger a control plane restart) | | `counter` | +| `quickwit_control_plane` | `open_shards_total` | Number of open shards per source | [`index_id`] | `gauge` | +| `quickwit_control_plane` | `shards` | Number of (remote/local) shards in the indexing plan | [`locality`] | `gauge` | + +## GRPC Metrics + +The following subsystems expose gRPC metrics: `cluster`, `control_plane`, `indexing`, `ingest`, `metastore`. | Namespace | Metric Name | Description | Labels | Type | | --------- | ----------- | ----------- | ------ | ---- | -| `quickwit` | `write_bytes`| Number of bytes written by a given component in [`indexer`, `merger`, `deleter`, `split_downloader_{merge,delete}`] | [`index`, `component`] | `counter` | +| `quickwit_{subsystem}` | `grpc_requests_total` | Total number of gRPC requests processed | [`kind`, `rpc`, `status`] | `counter` | +| `quickwit_{subsystem}` | `grpc_requests_in_flight` | Number of gRPC requests in-flight | [`kind`, `rpc`] | `gauge` | +| `quickwit_{subsystem}` | `grpc_request_duration_seconds` | Duration of request in seconds | [`kind`, `rpc`, `status`] | `histogram` | +| `quickwit_grpc` | `circuit_break_total` | Circuit breaker counter | | `counter` | ## Indexing Metrics | Namespace | Metric Name | Description | Labels | Type | | --------- | ----------- | ----------- | ------ | ---- | -| `quickwit_indexing` | `processed_docs_total`| Number of processed docs by index, source and processed status in [`valid`, `schema_error`, `parse_error`, `transform_error`] | [`index`, `source`, `docs_processed_status`] | `counter` | -| `quickwit_indexing` | `processed_bytes`| Number of processed bytes by index, source and processed status in [`valid`, `schema_error`, `parse_error`, `transform_error`] | [`index`, `source`, `docs_processed_status`] | `counter` | -| `quickwit_indexing` | `available_concurrent_upload_permits`| Number of available concurrent upload permits by component in [`merger`, `indexer`] | [`component`] | `gauge` | -| `quickwit_indexing` | `ongoing_merge_operations`| Number of available concurrent upload permits by component in [`merger`, `indexer`]. | [`index`, `source`] | `gauge` | +| `quickwit_indexing` | `processed_docs_total` | Number of processed docs by index and processed status | [`index`, `docs_processed_status`] | `counter` | +| `quickwit_indexing` | `processed_bytes` | Number of bytes of processed documents by index and processed status | [`index`, `docs_processed_status`] | `counter` | +| `quickwit_indexing` | `backpressure_micros` | Amount of time spent in backpressure (in micros) | [`actor_name`] | `counter` | +| `quickwit_indexing` | `concurrent_upload_available_permits_num` | Number of available concurrent upload permits by component | [`component`] | `gauge` | +| `quickwit_indexing` | `split_builders` | Number of existing index writer instances | | `gauge` | +| `quickwit_indexing` | `ongoing_merge_operations` | Number of ongoing merge operations | | `gauge` | +| `quickwit_indexing` | `pending_merge_operations` | Number of pending merge operations | | `gauge` | +| `quickwit_indexing` | `pending_merge_bytes` | Number of pending merge bytes | | `gauge` | +| `quickwit_indexing` | `kafka_rebalance_total` | Number of kafka rebalances | | `counter` | ## Ingest Metrics -| Namespace | Metric Name | Description | Type | -| --------- | ----------- | ----------- | ---- | -| `quickwit_ingest` | `ingested_num_bytes` | Total size of the docs ingested in bytes | `counter` | -| `quickwit_ingest` | `ingested_num_docs` | Number of docs received to be ingested | `counter` | -| `quickwit_ingest` | `queue_count` | Number of queues currently active | `counter` | +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_ingest` | `docs_total` | Total number of docs ingested, measured in ingester's leader | [`validity`] | `counter` | +| `quickwit_ingest` | `docs_bytes_total` | Total size of the docs ingested in bytes, measured in ingester's leader | [`validity`] | `counter` | +| `quickwit_ingest` | `ingest_result_total` | Number of ingest requests by result | [`result`] | `counter` | +| `quickwit_ingest` | `reset_shards_operations_total` | Total number of reset shards operations performed | [`status`] | `counter` | +| `quickwit_ingest` | `shards` | Number of shards hosted by the ingester | [`state`] | `gauge` | +| `quickwit_ingest` | `shard_lt_throughput_mib` | Shard long term throughput as reported through chitchat | | `histogram` | +| `quickwit_ingest` | `shard_st_throughput_mib` | Shard short term throughput as reported through chitchat | | `histogram` | +| `quickwit_ingest` | `wal_acquire_lock_requests_in_flight` | Number of acquire lock requests in-flight | [`operation`, `type`] | `gauge` | +| `quickwit_ingest` | `wal_acquire_lock_request_duration_secs` | Duration of acquire lock requests in seconds | [`operation`, `type`] | `histogram` | +| `quickwit_ingest` | `wal_disk_used_bytes` | WAL disk space used in bytes | | `gauge` | +| `quickwit_ingest` | `wal_memory_used_bytes` | WAL memory used in bytes | | `gauge` | + + +Note that the legacy ingest (V1) only records the `docs_total` and `docs_bytes_total` metrics. The `validity` label is always set to `valid` because it doesn't parse the documents at ingest time. Invalid documents are discarded asynchronously in the indexing pipeline's doc processor. + +## Janitor Metrics -## Metastore Metrics +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_janitor` | `ongoing_num_delete_operations_total` | Number of ongoing delete operations per index | [`index`] | `gauge` | +| `quickwit_janitor` | `gc_deleted_splits_total` | Total number of splits deleted by the garbage collector | [`result`] | `counter` | +| `quickwit_janitor` | `gc_deleted_bytes_total` | Total number of bytes deleted by the garbage collector | | `counter` | +| `quickwit_janitor` | `gc_runs_total` | Total number of garbage collector executions | [`result`] | `counter` | +| `quickwit_janitor` | `gc_seconds_total` | Total time spent running the garbage collector | | `counter` | + +## Jaeger Metrics + +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_jaeger` | `requests_total` | Number of requests | [`operation`, `index`] | `counter` | +| `quickwit_jaeger` | `request_errors_total` | Number of failed requests | [`operation`, `index`] | `counter` | +| `quickwit_jaeger` | `request_duration_seconds` | Duration of requests | [`operation`, `index`, `error`] | `histogram` | +| `quickwit_jaeger` | `fetched_traces_total` | Number of traces retrieved from storage | [`operation`, `index`] | `counter` | +| `quickwit_jaeger` | `fetched_spans_total` | Number of spans retrieved from storage | [`operation`, `index`] | `counter` | +| `quickwit_jaeger` | `transferred_bytes_total` | Number of bytes transferred | [`operation`, `index`] | `counter` | -All metastore methods are monitored by the 3 metrics: +## Memory Metrics | Namespace | Metric Name | Description | Labels | Type | | --------- | ----------- | ----------- | ------ | ---- | -| `quickwit_metastore` | `requests_total` | Number of requests | [`operation`, `index`] | `counter` | -| `quickwit_metastore` | `request_errors_total` | Number of failed requests | [`operation`, `index`] | `counter` | -| `quickwit_metastore` | `request_duration_seconds` | Duration of requests | [`operation`, `index`, `error`] | `histogram` | +| `quickwit_memory` | `active_bytes` | Total number of bytes in active pages allocated by the application, as reported by jemalloc `stats.active` | | `gauge` | +| `quickwit_memory` | `allocated_bytes` | Total number of bytes allocated by the application, as reported by jemalloc `stats.allocated` | | `gauge` | +| `quickwit_memory` | `resident_bytes` | Total number of bytes in physically resident data pages mapped by the allocator, as reported by jemalloc `stats.resident` | | `gauge` | +| `quickwit_memory` | `in_flight_data_bytes` | Amount of data in-flight in various buffers in bytes | [`component`] | `gauge` | -Examples of operation names: `create_index`, `index_metadata`, `delete_index`, `stage_splits`, `publish_splits`, `list_splits`, `add_source`, ... +## Metastore Metrics -## Rest API Metrics +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_metastore` | `acquire_connections` | Number of connections being acquired (PostgreSQL only) | | `gauge` | +| `quickwit_metastore` | `active_connections` | Number of active (used + idle) connections (PostgreSQL only) | | `gauge` | +| `quickwit_metastore` | `idle_connections` | Number of idle connections (PostgreSQL only) | | `gauge` | -| Namespace | Metric Name | Description | Type | -| --------- | ----------- | ----------- | ---- | -| `quickwit` | `http_requests_total` | Total number of HTTP requests received | `counter` | +## OTLP Metrics + +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_otlp` | `requests_total` | Number of requests | [`service`, `index`, `transport`, `format`] | `counter` | +| `quickwit_otlp` | `request_errors_total` | Number of failed requests | [`service`, `index`, `transport`, `format`] | `counter` | +| `quickwit_otlp` | `request_duration_seconds` | Duration of requests | [`service`, `index`, `transport`, `format`, `error`] | `histogram` | +| `quickwit_otlp` | `ingested_log_records_total` | Number of log records ingested | [`service`, `index`, `transport`, `format`] | `counter` | +| `quickwit_otlp` | `ingested_spans_total` | Number of spans ingested | [`service`, `index`, `transport`, `format`] | `counter` | +| `quickwit_otlp` | `ingested_bytes_total` | Number of bytes ingested | [`service`, `index`, `transport`, `format`] | `counter` | + +## REST API Metrics + +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit` | `http_requests_total` | Total number of HTTP requests processed | [`method`, `status_code`] | `counter` | +| `quickwit` | `request_duration_secs` | Response time in seconds | [`method`, `status_code`] | `histogram` | +| `quickwit` | `ongoing_requests` | Number of ongoing requests on specific endpoint groups | [`endpoint_group`] | `gauge` | +| `quickwit` | `pending_requests` | Number of pending requests on specific endpoint groups | [`endpoint_group`] | `gauge` | ## Search Metrics -| Namespace | Metric Name | Description | Type | -| --------- | ----------- | ----------- | ---- | -| `quickwit_search` | `leaf_searches_splits_total` | Number of leaf searches (count of splits) started | `counter` | -| `quickwit_search` | `leaf_search_split_duration_secs` | Number of seconds required to run a leaf search over a single split. The timer starts after the semaphore is obtained | `histogram` | -| `quickwit_search` | `active_search_threads_count` | Number of threads in use in the CPU thread pool | `gauge` | +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_search` | `root_search_requests_total` | Total number of root search gRPC requests processed | [`status`] | `counter` | +| `quickwit_search` | `root_search_request_duration_seconds` | Duration of root search gRPC requests in seconds | [`status`] | `histogram` | +| `quickwit_search` | `root_search_targeted_splits` | Number of splits targeted per root search gRPC request | [`status`] | `histogram` | +| `quickwit_search` | `leaf_search_requests_total` | Total number of leaf search gRPC requests processed | [`status`] | `counter` | +| `quickwit_search` | `leaf_search_request_duration_seconds` | Duration of leaf search gRPC requests in seconds | [`status`] | `histogram` | +| `quickwit_search` | `leaf_search_targeted_splits` | Number of splits targeted per leaf search gRPC request | [`status`] | `histogram` | +| `quickwit_search` | `leaf_searches_splits_total` | Number of leaf searches (count of splits) started | | `counter` | +| `quickwit_search` | `leaf_search_split_duration_secs` | Number of seconds required to run a leaf search over a single split. The timer starts after the semaphore is obtained | | `histogram` | +| `quickwit_search` | `leaf_search_single_split_tasks` | Number of single split search tasks pending or ongoing | [`status`] | `gauge` | +| `quickwit_search` | `leaf_search_single_split_warmup_num_bytes` | Size of the short lived cache for a single split once the warmup is done | | `histogram` | +| `quickwit_search` | `job_assigned_total` | Number of jobs assigned from this searcher (root) to other searchers (leafs), per affinity rank | [`affinity`] | `counter` | +| `quickwit_search` | `searcher_local_kv_store_size_bytes` | Size of the searcher kv store in bytes. This store is used to cache scroll contexts | | `gauge` | ## Storage Metrics +| Namespace | Metric Name | Description | Labels | Type | +| --------- | ----------- | ----------- | ------ | ---- | +| `quickwit_storage` | `get_slice_timeout_outcome` | Outcome of get_slice operations. success_after_1_timeout means the operation succeeded after a retry caused by a timeout | [`outcome`] | `counter` | +| `quickwit_storage` | `object_storage_requests_total` | Number of requests to the object store, by action and status. Requests are recorded when the response headers are returned | [`action`, `status`] | `counter` | +| `quickwit_storage` | `object_storage_request_duration` | Durations until the response headers are returned from the object store, by action and status | [`action`, `status`] | `histogram` | +| `quickwit_storage` | `object_storage_download_num_bytes` | Amount of data downloaded from object storage | [`status`] | `counter` | +| `quickwit_storage` | `object_storage_download_errors` | Number of download requests that received successful response headers but failed during download | [`status`] | `counter` | +| `quickwit_storage` | `object_storage_upload_num_bytes` | Amount of data uploaded to object storage. The value recorded for failed and aborted uploads is the full payload size | [`status`] | `counter` | + +## CLI Metrics + | Namespace | Metric Name | Description | Type | | --------- | ----------- | ----------- | ---- | -| `quickwit_storage` | `object_storage_gets_total` | Number of objects fetched | `counter` | -| `quickwit_storage` | `object_storage_puts_total` | Number of objects uploaded. May differ from object_storage_requests_parts due to multipart upload | `counter` | -| `quickwit_storage` | `object_storage_puts_parts` | Number of object parts uploaded | `counter` | -| `quickwit_storage` | `object_storage_download_num_bytes` | Amount of data downloaded from an object storage | `counter` | +| `quickwit_cli` | `thread_unpark_duration_microseconds` | Duration for which a thread of the main tokio runtime is unparked | `histogram` | diff --git a/monitoring/grafana/dashboards/indexers.json b/monitoring/grafana/dashboards/indexers.json index 18d2d94aa7a..feaf655574d 100644 --- a/monitoring/grafana/dashboards/indexers.json +++ b/monitoring/grafana/dashboards/indexers.json @@ -25,6 +25,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, + "id": 1, "links": [], "liveNow": false, "panels": [ @@ -745,7 +746,7 @@ "uid": "${datasource}" }, "editorMode": "builder", - "expr": "sum by(pod) (rate(quickwit_storage_object_storage_upload_num_bytes{namespace=\"$namespace\", pod=~\"$pod\", instance=~\"$instance\"}[$__rate_interval]))", + "expr": "sum by(pod) (rate(quickwit_storage_object_storage_upload_num_bytes{instance=~\"$instance\"}[$__rate_interval]))", "hide": false, "legendFormat": "Upload bytes / sec - {{pod}}", "range": true, @@ -806,8 +807,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -910,8 +910,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -965,23 +964,12 @@ "type": "prometheus", "uid": "${datasource}" }, - "editorMode": "builder", - "expr": "sum(rate(quickwit_storage_object_storage_gets_total{instance=~\"$instance\"}[$__rate_interval]))", - "legendFormat": "GET req/sec", - "range": true, - "refId": "Download" - }, - { - "datasource": { - "type": "prometheus", - "uid": "${datasource}" - }, - "editorMode": "builder", - "expr": "sum(rate(quickwit_storage_object_storage_puts_total{namespace=\"$namespace\", pod=~\"$pod\", instance=~\"$instance\"}[$__rate_interval]))", + "editorMode": "code", + "expr": "sum(rate(quickwit_storage_object_storage_requests_total{instance=~\"$instance\"}[$__rate_interval])) by (action)", "hide": false, - "legendFormat": "PUT req/sec", + "legendFormat": "{{action}} req/sec", "range": true, - "refId": "Upload" + "refId": "Requests" } ], "title": "Requests on object storage", @@ -1034,8 +1022,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -1130,8 +1117,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -1191,7 +1177,7 @@ "list": [ { "current": { - "selected": true, + "selected": false, "text": "Prometheus", "value": "PBFA97CFB590B2093" }, @@ -1245,6 +1231,6 @@ "timezone": "", "title": "Quickwit Indexers", "uid": "quickwit-indexers", - "version": 2, + "version": 6, "weekStart": "" } diff --git a/monitoring/grafana/dashboards/searchers.json b/monitoring/grafana/dashboards/searchers.json index b6f7ade938b..fdb88dcfb73 100644 --- a/monitoring/grafana/dashboards/searchers.json +++ b/monitoring/grafana/dashboards/searchers.json @@ -18,7 +18,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, - "id": 2, + "id": 4, "links": [], "liveNow": false, "panels": [ @@ -307,18 +307,18 @@ "uid": "${datasource}" }, "disableTextWrap": false, - "editorMode": "builder", - "expr": "rate(quickwit_storage_object_storage_gets_total{instance=~\"$instance\"}[$__rate_interval])", + "editorMode": "code", + "expr": "sum(rate(quickwit_storage_object_storage_requests_total{instance=~\"$instance\"}[$__rate_interval])) by (action)", "fullMetaSearch": false, "includeNullMetadata": false, "instant": false, - "legendFormat": "Total", + "legendFormat": "{{action}} req/sec", "range": true, "refId": "A", "useBackend": false } ], - "title": "Number of GET requests", + "title": "Object store requests", "type": "timeseries" }, { @@ -407,18 +407,18 @@ "uid": "${datasource}" }, "disableTextWrap": false, - "editorMode": "builder", - "expr": "quickwit_storage_object_storage_download_num_bytes{instance=~\"$instance\"}", + "editorMode": "code", + "expr": "rate(quickwit_storage_object_storage_download_num_bytes{instance=~\"$instance\"}[$__rate_interval])", "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "Downloaded bytes", + "legendFormat": "Download bytes / sec ", "range": true, "refId": "A", "useBackend": false } ], - "title": "Size of GET requests (bytes)", + "title": "Object store download rate", "type": "timeseries" }, { @@ -506,8 +506,8 @@ "uid": "${datasource}" }, "disableTextWrap": false, - "editorMode": "builder", - "expr": "rate(quickwit_cache_cache_hits_total{instance=~\"$instance\"}[$__rate_interval])", + "editorMode": "code", + "expr": "sum(rate(quickwit_cache_cache_hits_total{instance=~\"$instance\"}[$__rate_interval])) by (component_name)", "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, @@ -710,7 +710,7 @@ "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "Split footer", + "legendFormat": "{{component_name}}", "range": true, "refId": "A", "useBackend": false @@ -810,7 +810,7 @@ "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "Split footer", + "legendFormat": "{{component_name}}", "range": true, "refId": "A", "useBackend": false @@ -874,7 +874,6 @@ "sort": 0, "type": "query" } - ] }, "time": { @@ -885,6 +884,6 @@ "timezone": "", "title": "Quickwit Searchers", "uid": "quickwit-searchers", - "version": 1, + "version": 4, "weekStart": "" } diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index c59bf953937..4212760e0b7 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -342,6 +342,7 @@ pub struct InFlightDataGauges { pub doc_processor_mailbox: IntGauge, pub indexer_mailbox: IntGauge, pub index_writer: IntGauge, + pub get_object: IntGauge, in_flight_gauge_vec: IntGaugeVec<1>, } @@ -365,6 +366,7 @@ impl Default for InFlightDataGauges { doc_processor_mailbox: in_flight_gauge_vec.with_label_values(["doc_processor_mailbox"]), indexer_mailbox: in_flight_gauge_vec.with_label_values(["indexer_mailbox"]), index_writer: in_flight_gauge_vec.with_label_values(["index_writer"]), + get_object: in_flight_gauge_vec.with_label_values(["get_object"]), in_flight_gauge_vec: in_flight_gauge_vec.clone(), } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs index 1fb32c0b2fd..1b27617c624 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs @@ -16,7 +16,8 @@ use mrecordlog::ResourceUsage; use once_cell::sync::Lazy; use quickwit_common::metrics::{ Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, exponential_buckets, - linear_buckets, new_counter_vec, new_gauge, new_gauge_vec, new_histogram, new_histogram_vec, + linear_buckets, new_counter, new_counter_vec, new_gauge, new_gauge_vec, new_histogram, + new_histogram_vec, }; // Counter vec counting the different outcomes of ingest requests as @@ -82,6 +83,8 @@ pub(super) struct IngestV2Metrics { pub wal_disk_used_bytes: IntGauge, pub wal_memory_used_bytes: IntGauge, pub ingest_results: IngestResultMetrics, + pub replicated_num_bytes_total: IntCounter, + pub replicated_num_docs_total: IntCounter, } impl Default for IngestV2Metrics { @@ -146,6 +149,18 @@ impl Default for IngestV2Metrics { "ingest", &[], ), + replicated_num_bytes_total: new_counter( + "replicated_num_bytes_total", + "Total size in bytes of the replicated docs.", + "ingest", + &[], + ), + replicated_num_docs_total: new_counter( + "replicated_num_docs_total", + "Total number of docs replicated.", + "ingest", + &[], + ), } } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index 5e286ec5b84..5c54593fe69 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -37,8 +37,8 @@ use super::metrics::report_wal_usage; use super::models::IngesterShard; use super::mrecordlog_utils::check_enough_capacity; use super::state::IngesterState; +use crate::ingest_v2::metrics::INGEST_V2_METRICS; use crate::ingest_v2::mrecordlog_utils::{AppendDocBatchError, append_non_empty_doc_batch}; -use crate::metrics::INGEST_METRICS; use crate::{estimate_size, with_lock_metrics}; pub(super) const SYN_REPLICATION_STREAM_CAPACITY: usize = 5; @@ -667,10 +667,10 @@ impl ReplicationTask { .expect("replica shard should be initialized") .set_replication_position_inclusive(current_position_inclusive.clone(), now); - INGEST_METRICS + INGEST_V2_METRICS .replicated_num_bytes_total .inc_by(batch_num_bytes); - INGEST_METRICS + INGEST_V2_METRICS .replicated_num_docs_total .inc_by(batch_num_docs); diff --git a/quickwit/quickwit-ingest/src/metrics.rs b/quickwit/quickwit-ingest/src/metrics.rs index 15eb5d661de..4601c6e7498 100644 --- a/quickwit/quickwit-ingest/src/metrics.rs +++ b/quickwit/quickwit-ingest/src/metrics.rs @@ -13,18 +13,14 @@ // limitations under the License. use once_cell::sync::Lazy; -use quickwit_common::metrics::{IntCounter, IntGauge, new_counter, new_counter_vec, new_gauge}; +use quickwit_common::metrics::{IntCounter, new_counter_vec}; pub struct IngestMetrics { + // With ingest V1 all ingested documents are considered valid pub ingested_docs_bytes_valid: IntCounter, + pub ingested_docs_valid: IntCounter, pub ingested_docs_bytes_invalid: IntCounter, pub ingested_docs_invalid: IntCounter, - pub ingested_docs_valid: IntCounter, - - pub replicated_num_bytes_total: IntCounter, - pub replicated_num_docs_total: IntCounter, - #[allow(dead_code)] // this really shouldn't be dead, it needs to be used somewhere - pub queue_count: IntGauge, } impl Default for IngestMetrics { @@ -56,24 +52,6 @@ impl Default for IngestMetrics { ingested_docs_bytes_invalid, ingested_docs_valid, ingested_docs_invalid, - replicated_num_bytes_total: new_counter( - "replicated_num_bytes_total", - "Total size in bytes of the replicated docs.", - "ingest", - &[], - ), - replicated_num_docs_total: new_counter( - "replicated_num_docs_total", - "Total number of docs replicated.", - "ingest", - &[], - ), - queue_count: new_gauge( - "queue_count", - "Number of queues currently active", - "ingest", - &[], - ), } } } diff --git a/quickwit/quickwit-serve/src/metrics.rs b/quickwit/quickwit-serve/src/metrics.rs index f88896ea68a..bda75da7718 100644 --- a/quickwit/quickwit-serve/src/metrics.rs +++ b/quickwit/quickwit-serve/src/metrics.rs @@ -52,14 +52,14 @@ impl Default for ServeMetrics { ), ongoing_requests: new_gauge_vec( "ongoing_requests", - "Number of ongoing requests.", + "Number of ongoing requests on specific endpoint groups", "", &[], ["endpoint_group"], ), pending_requests: new_gauge_vec( "pending_requests", - "Number of pending requests.", + "Number of pending requests on specific endpoint groups", "", &[], ["endpoint_group"], diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 43ef588e192..61342c593b6 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -16,8 +16,8 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - GaugeGuard, Histogram, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec, - new_gauge, new_histogram_vec, + GaugeGuard, HistogramVec, IntCounter, IntCounterVec, IntGauge, MEMORY_METRICS, new_counter, + new_counter_vec, new_gauge, new_histogram_vec, }; /// Counters associated to storage operations. @@ -30,19 +30,11 @@ pub struct StorageMetrics { pub searcher_split_cache: CacheMetrics, pub get_slice_timeout_successes: [IntCounter; 3], pub get_slice_timeout_all_timeouts: IntCounter, - pub object_storage_get_total: IntCounter, - pub object_storage_get_errors_total: IntCounterVec<1>, - pub object_storage_get_slice_in_flight_count: IntGauge, - pub object_storage_get_slice_in_flight_num_bytes: IntGauge, - pub object_storage_put_total: IntCounter, - pub object_storage_put_parts: IntCounter, - pub object_storage_download_num_bytes: IntCounter, - pub object_storage_upload_num_bytes: IntCounter, - - pub object_storage_delete_requests_total: IntCounter, - pub object_storage_bulk_delete_requests_total: IntCounter, - pub object_storage_delete_request_duration: Histogram, - pub object_storage_bulk_delete_request_duration: Histogram, + pub object_storage_requests_total: IntCounterVec<2>, + pub object_storage_request_duration: HistogramVec<2>, + pub object_storage_download_num_bytes: IntCounterVec<1>, + pub object_storage_download_errors: IntCounterVec<1>, + pub object_storage_upload_num_bytes: IntCounterVec<1>, } impl Default for StorageMetrics { @@ -63,31 +55,6 @@ impl Default for StorageMetrics { let get_slice_timeout_all_timeouts = get_slice_timeout_outcome_total_vec.with_label_values(["all_timeouts"]); - let object_storage_requests_total = new_counter_vec( - "object_storage_requests_total", - "Total number of object storage requests performed.", - "storage", - &[], - ["action"], - ); - let object_storage_delete_requests_total = - object_storage_requests_total.with_label_values(["delete_object"]); - let object_storage_bulk_delete_requests_total = - object_storage_requests_total.with_label_values(["delete_objects"]); - - let object_storage_request_duration = new_histogram_vec( - "object_storage_request_duration_seconds", - "Duration of object storage requests in seconds.", - "storage", - &[], - ["action"], - vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0], - ); - let object_storage_delete_request_duration = - object_storage_request_duration.with_label_values(["delete_object"]); - let object_storage_bulk_delete_request_duration = - object_storage_request_duration.with_label_values(["delete_objects"]); - StorageMetrics { fast_field_cache: CacheMetrics::for_component("fastfields"), fd_cache_metrics: CacheMetrics::for_component("fd"), @@ -97,62 +64,50 @@ impl Default for StorageMetrics { split_footer_cache: CacheMetrics::for_component("splitfooter"), get_slice_timeout_successes, get_slice_timeout_all_timeouts, - object_storage_get_total: new_counter( - "object_storage_gets_total", - "Number of objects fetched. Might be lower than get_slice_timeout_outcome if \ - queries are debounced.", + object_storage_requests_total: new_counter_vec( + "object_storage_requests_total", + "Number of requests to the object store, by action and status. Requests are \ + recorded when the response headers are returned, download failures will not \ + appear as errors.", "storage", &[], + ["action", "status"], ), - object_storage_get_errors_total: new_counter_vec::<1>( - "object_storage_get_errors_total", - "Number of GetObject errors.", + object_storage_request_duration: new_histogram_vec( + "object_storage_request_duration", + "Durations until the response headers are returned from the object store, by \ + action and status. This does not measure the download time for the body content.", "storage", &[], - ["code"], + ["action", "status"], + vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0], ), - object_storage_get_slice_in_flight_count: new_gauge( - "object_storage_get_slice_in_flight_count", - "Number of GetObject for which the memory was allocated but the download is still \ - in progress.", - "storage", - &[], - ), - object_storage_get_slice_in_flight_num_bytes: new_gauge( - "object_storage_get_slice_in_flight_num_bytes", - "Memory allocated for GetObject requests that are still in progress.", - "storage", - &[], - ), - object_storage_put_total: new_counter( - "object_storage_puts_total", - "Number of objects uploaded. May differ from object_storage_requests_parts due to \ - multipart upload.", + object_storage_download_num_bytes: new_counter_vec( + "object_storage_download_num_bytes", + "Amount of data downloaded from object storage.", "storage", &[], + ["status"], ), - object_storage_put_parts: new_counter( - "object_storage_puts_parts", - "Number of object parts uploaded.", - "", - &[], - ), - object_storage_download_num_bytes: new_counter( - "object_storage_download_num_bytes", - "Amount of data downloaded from an object storage.", + object_storage_download_errors: new_counter_vec( + "object_storage_download_errors", + // Download errors are recorded separately because the associated + // get_object requests were already recorded as successful in + // object_storage_requests_total + "Number of download requests that received successful response headers but failed \ + during download.", "storage", &[], + ["status"], ), - object_storage_upload_num_bytes: new_counter( + object_storage_upload_num_bytes: new_counter_vec( "object_storage_upload_num_bytes", - "Amount of data uploaded to an object storage.", + "Amount of data uploaded to object storage. The value recorded for failed and \ + aborted uploads is the full payload size.", "storage", &[], + ["status"], ), - object_storage_delete_requests_total, - object_storage_bulk_delete_requests_total, - object_storage_delete_request_duration, - object_storage_bulk_delete_request_duration, } } } @@ -229,15 +184,11 @@ pub static STORAGE_METRICS: Lazy = Lazy::new(StorageMetrics::def pub static CACHE_METRICS_FOR_TESTS: Lazy = Lazy::new(|| CacheMetrics::for_component("fortest")); -pub fn object_storage_get_slice_in_flight_guards( - get_request_size: usize, -) -> (GaugeGuard<'static>, GaugeGuard<'static>) { - let mut bytes_guard = GaugeGuard::from_gauge( - &crate::STORAGE_METRICS.object_storage_get_slice_in_flight_num_bytes, - ); +/// Helps tracking pre-allocated memory for downloads that are still in progress. +/// +/// This is actually recorded as a memory metric and not a storage metric. +pub fn object_storage_get_slice_in_flight_guards(get_request_size: usize) -> GaugeGuard<'static> { + let mut bytes_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.get_object); bytes_guard.add(get_request_size as i64); - let mut count_guard = - GaugeGuard::from_gauge(&crate::STORAGE_METRICS.object_storage_get_slice_in_flight_count); - count_guard.add(1); - (bytes_guard, count_guard) + bytes_guard } diff --git a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs index b21776fa69f..3bb6d9711dd 100644 --- a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs @@ -45,10 +45,13 @@ use tracing::{instrument, warn}; use crate::debouncer::DebouncedStorage; use crate::metrics::object_storage_get_slice_in_flight_guards; +use crate::object_storage::metrics_wrappers::{ + ActionLabel, RequestMetricsWrapperExt, copy_with_download_metrics, +}; use crate::storage::SendableAsync; use crate::{ - BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, STORAGE_METRICS, Storage, - StorageError, StorageErrorKind, StorageFactory, StorageResolverError, StorageResult, + BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, Storage, StorageError, + StorageErrorKind, StorageFactory, StorageResolverError, StorageResult, }; /// Azure object storage resolver. @@ -225,10 +228,6 @@ impl AzureBlobStorage { name: &'a str, payload: Box, ) -> StorageResult<()> { - crate::STORAGE_METRICS.object_storage_put_parts.inc(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(payload.len()); retry(&self.retry_params, || async { let data = Bytes::from(payload.read_all().await?.to_vec()); let hash = azure_storage_blobs::prelude::Hash::from(md5::compute(&data[..]).0); @@ -237,6 +236,7 @@ impl AzureBlobStorage { .put_block_blob(data) .hash(hash) .into_future() + .with_count_and_upload_metrics(ActionLabel::PutObject, payload.len()) .await?; Result::<(), AzureErrorWrapper>::Ok(()) }) @@ -261,10 +261,6 @@ impl AzureBlobStorage { .map(|(num, range)| { let moved_blob_client = blob_client.clone(); let moved_payload = payload.clone(); - crate::STORAGE_METRICS.object_storage_put_parts.inc(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(range.end - range.start); async move { retry(&self.retry_params, || async { let block_id = format!("block:{num}"); @@ -276,6 +272,10 @@ impl AzureBlobStorage { .put_block(block_id.clone(), data) .hash(hash) .into_future() + .with_count_and_upload_metrics( + ActionLabel::UploadPart, + range.end - range.start, + ) .await?; Result::<_, AzureErrorWrapper>::Ok(block_id) }) @@ -299,6 +299,7 @@ impl AzureBlobStorage { blob_client .put_block_list(block_list) .into_future() + .with_count_metric(ActionLabel::CompleteMultipartUpload) .await .map_err(AzureErrorWrapper::from)?; @@ -315,6 +316,7 @@ impl Storage for AzureBlobStorage { .max_results(NonZeroU32::new(1u32).expect("1 is always non-zero.")) .into_stream() .next() + .with_count_metric(ActionLabel::ListObjects) .await { let _ = first_blob_result?; @@ -327,7 +329,6 @@ impl Storage for AzureBlobStorage { path: &Path, payload: Box, ) -> crate::StorageResult<()> { - crate::STORAGE_METRICS.object_storage_put_total.inc(); let name = self.blob_name(path); let total_len = payload.len(); let part_num_bytes = self.multipart_policy.part_num_bytes(total_len); @@ -345,7 +346,11 @@ impl Storage for AzureBlobStorage { let name = self.blob_name(path); let mut output_stream = self.container_client.blob_client(name).get().into_stream(); - while let Some(chunk_result) = output_stream.next().await { + while let Some(chunk_result) = output_stream + .next() + .with_count_metric(ActionLabel::GetObject) + .await + { let chunk_response = chunk_result.map_err(AzureErrorWrapper::from)?; let chunk_response_body_stream = chunk_response .data @@ -353,10 +358,7 @@ impl Storage for AzureBlobStorage { .into_async_read() .compat(); let mut body_stream_reader = BufReader::new(chunk_response_body_stream); - let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?; - STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_stream_reader, output).await?; } output.flush().await?; Ok(()) @@ -369,6 +371,7 @@ impl Storage for AzureBlobStorage { .blob_client(blob_name) .delete() .into_future() + .with_count_metric(ActionLabel::DeleteObject) .await .map_err(|err| AzureErrorWrapper::from(err).into()); ignore_error_kind!(StorageErrorKind::NotFound, delete_res)?; @@ -491,6 +494,7 @@ impl Storage for AzureBlobStorage { .blob_client(name) .get_properties() .into_future() + .with_count_metric(ActionLabel::HeadObject) .await; match properties_result { Ok(response) => Ok(response.blob.properties.content_length), @@ -513,7 +517,7 @@ async fn extract_range_data_and_hash( .await? .into_async_read(); let mut buf: Vec = Vec::with_capacity(range.count()); - tokio::io::copy(&mut reader, &mut buf).await?; + tokio::io::copy_buf(&mut reader, &mut buf).await?; let data = Bytes::from(buf); let hash = md5::compute(&data[..]); Ok((data, hash)) @@ -544,7 +548,11 @@ async fn download_all( output: &mut Vec, ) -> Result<(), AzureErrorWrapper> { output.clear(); - while let Some(chunk_result) = chunk_stream.next().await { + while let Some(chunk_result) = chunk_stream + .next() + .with_count_metric(ActionLabel::GetObject) + .await + { let chunk_response = chunk_result?; let chunk_response_body_stream = chunk_response .data @@ -552,10 +560,7 @@ async fn download_all( .into_async_read() .compat(); let mut body_stream_reader = BufReader::new(chunk_response_body_stream); - let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?; - crate::STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_stream_reader, output).await?; } // When calling `get_all`, the Vec capacity is not properly set. output.shrink_to_fit(); diff --git a/quickwit/quickwit-storage/src/object_storage/error.rs b/quickwit/quickwit-storage/src/object_storage/error.rs index 5f60fe1f944..8a7efc13332 100644 --- a/quickwit/quickwit-storage/src/object_storage/error.rs +++ b/quickwit/quickwit-storage/src/object_storage/error.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use aws_sdk_s3::error::{DisplayErrorContext, ProvideErrorMetadata, SdkError}; +use aws_sdk_s3::error::{DisplayErrorContext, SdkError}; use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError; use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError; use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError; @@ -62,11 +62,6 @@ pub trait ToStorageErrorKind { impl ToStorageErrorKind for GetObjectError { fn to_storage_error_kind(&self) -> StorageErrorKind { - let error_code = self.code().unwrap_or("unknown"); - crate::STORAGE_METRICS - .object_storage_get_errors_total - .with_label_values([error_code]) - .inc(); match self { GetObjectError::InvalidObjectState(_) => StorageErrorKind::Service, GetObjectError::NoSuchKey(_) => StorageErrorKind::NotFound, diff --git a/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs new file mode 100644 index 00000000000..f2d92991984 --- /dev/null +++ b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs @@ -0,0 +1,426 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll, ready}; +use std::time::Instant; + +use pin_project::{pin_project, pinned_drop}; +use tokio::io::{AsyncBufRead, AsyncWrite}; + +use crate::STORAGE_METRICS; + +pub enum ActionLabel { + AbortMultipartUpload, + CompleteMultipartUpload, + CreateMultipartUpload, + DeleteObject, + DeleteObjects, + GetObject, + HeadObject, + ListObjects, + PutObject, + UploadPart, +} + +impl ActionLabel { + fn as_str(&self) -> &'static str { + match self { + ActionLabel::AbortMultipartUpload => "abort_multipart_upload", + ActionLabel::CompleteMultipartUpload => "complete_multipart_upload", + ActionLabel::CreateMultipartUpload => "create_multipart_upload", + ActionLabel::DeleteObject => "delete_object", + ActionLabel::DeleteObjects => "delete_objects", + ActionLabel::GetObject => "get_object", + ActionLabel::HeadObject => "head_object", + ActionLabel::ListObjects => "list_objects", + ActionLabel::PutObject => "put_object", + ActionLabel::UploadPart => "upload_part", + } + } +} + +pub enum RequestStatus { + Pending, + // only useful on feature="azure" + #[allow(dead_code)] + Done, + Ready(String), +} + +/// Converts an object store client SDK Result<> to the [Status] that should be +/// recorded in the metrics. +/// +/// The `Marker` type is necessary to avoid conflicting implementations of the +/// trait. +pub trait AsRequestStatus { + fn as_status(&self) -> RequestStatus; +} + +/// Wrapper around object store requests to record metrics, including cancellation. +#[pin_project(PinnedDrop)] +pub struct RequestMetricsWrapper +where + F: Future, + F::Output: AsRequestStatus, +{ + #[pin] + tracked: F, + action: ActionLabel, + start: Option, + uploaded_bytes: Option, + status: RequestStatus, + _marker: PhantomData, +} + +#[pinned_drop] +impl PinnedDrop for RequestMetricsWrapper +where + F: Future, + F::Output: AsRequestStatus, +{ + fn drop(self: Pin<&mut Self>) { + let status = match &self.status { + RequestStatus::Pending => "cancelled", + RequestStatus::Done => return, + RequestStatus::Ready(s) => s.as_str(), + }; + let label_values = [self.action.as_str(), status]; + STORAGE_METRICS + .object_storage_requests_total + .with_label_values(label_values) + .inc(); + if let Some(start) = self.start { + STORAGE_METRICS + .object_storage_request_duration + .with_label_values(label_values) + .observe(start.elapsed().as_secs_f64()); + } + if let Some(bytes) = self.uploaded_bytes { + STORAGE_METRICS + .object_storage_upload_num_bytes + .with_label_values([status]) + .inc_by(bytes); + } + } +} + +impl Future for RequestMetricsWrapper +where + F: Future, + F::Output: AsRequestStatus, +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let response = ready!(this.tracked.poll(cx)); + *this.status = response.as_status(); + + Poll::Ready(response) + } +} + +pub trait RequestMetricsWrapperExt +where + F: Future, + F::Output: AsRequestStatus, +{ + fn with_count_metric(self, action: ActionLabel) -> RequestMetricsWrapper; + + fn with_count_and_duration_metrics( + self, + action: ActionLabel, + ) -> RequestMetricsWrapper; + + fn with_count_and_upload_metrics( + self, + action: ActionLabel, + bytes: u64, + ) -> RequestMetricsWrapper; +} + +impl RequestMetricsWrapperExt for F +where + F: Future, + F::Output: AsRequestStatus, +{ + fn with_count_metric(self, action: ActionLabel) -> RequestMetricsWrapper { + RequestMetricsWrapper { + tracked: self, + action, + status: RequestStatus::Pending, + start: None, + uploaded_bytes: None, + _marker: PhantomData, + } + } + + fn with_count_and_duration_metrics( + self, + action: ActionLabel, + ) -> RequestMetricsWrapper { + RequestMetricsWrapper { + tracked: self, + action, + status: RequestStatus::Pending, + start: Some(Instant::now()), + uploaded_bytes: None, + _marker: PhantomData, + } + } + + fn with_count_and_upload_metrics( + self, + action: ActionLabel, + bytes: u64, + ) -> RequestMetricsWrapper { + RequestMetricsWrapper { + tracked: self, + action, + status: RequestStatus::Pending, + start: None, + uploaded_bytes: Some(bytes), + _marker: PhantomData, + } + } +} + +pub struct S3Marker; + +impl AsRequestStatus for Result +where E: aws_sdk_s3::error::ProvideErrorMetadata +{ + fn as_status(&self) -> RequestStatus { + let status_str = match self { + Ok(_) => "success".to_string(), + Err(e) => e.meta().code().unwrap_or("unknown").to_string(), + }; + RequestStatus::Ready(status_str) + } +} + +#[cfg(feature = "azure")] +pub struct AzureMarker; + +#[cfg(feature = "azure")] +impl AsRequestStatus for Result { + fn as_status(&self) -> RequestStatus { + let Err(err) = self else { + return RequestStatus::Ready("success".to_string()); + }; + let err_status_str = match err.kind() { + azure_storage::ErrorKind::HttpResponse { status, .. } => status.to_string(), + azure_storage::ErrorKind::Credential => "credential".to_string(), + azure_storage::ErrorKind::Io => "io".to_string(), + azure_storage::ErrorKind::DataConversion => "data_conversion".to_string(), + _ => "unknown".to_string(), + }; + RequestStatus::Ready(err_status_str) + } +} + +// The Azure SDK get_blob request returns Option because it chunks +// the download into a stream of get requests. +#[cfg(feature = "azure")] +impl AsRequestStatus for Option> { + fn as_status(&self) -> RequestStatus { + match self { + None => RequestStatus::Done, + Some(res) => res.as_status(), + } + } +} + +pub enum DownloadStatus { + InProgress, + Done, + Failed(&'static str), +} + +/// Track io errors during downloads. +/// +/// Downloads are a bit different from other requests because the request might +/// fail while getting the bytes from the response body, long after getting a +/// successful response header. +#[pin_project(PinnedDrop)] +struct DownloadMetricsWrapper<'a, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + #[pin] + tracked: copy_buf::CopyBuf<'a, R, W>, + status: DownloadStatus, +} + +#[pinned_drop] +impl<'a, R, W> PinnedDrop for DownloadMetricsWrapper<'a, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + fn drop(self: Pin<&mut Self>) { + let error_opt = match &self.status { + DownloadStatus::InProgress => Some("cancelled"), + DownloadStatus::Failed(e) => Some(*e), + DownloadStatus::Done => None, + }; + + STORAGE_METRICS + .object_storage_download_num_bytes + .with_label_values([error_opt.unwrap_or("success")]) + .inc_by(self.tracked.amt); + + if let Some(error) = error_opt { + STORAGE_METRICS + .object_storage_download_errors + .with_label_values([error]) + .inc(); + } + } +} + +impl<'a, R, W> Future for DownloadMetricsWrapper<'a, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let response = ready!(this.tracked.poll(cx)); + *this.status = match &response { + Ok(_) => DownloadStatus::Done, + Err(e) => DownloadStatus::Failed(io_error_as_label(e.kind())), + }; + Poll::Ready(response) + } +} + +pub async fn copy_with_download_metrics<'a, R, W>( + reader: &'a mut R, + writer: &'a mut W, +) -> io::Result +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + DownloadMetricsWrapper { + tracked: copy_buf::CopyBuf { + reader, + writer, + amt: 0, + }, + status: DownloadStatus::InProgress, + } + .await +} + +/// This is a fork of `tokio::io::copy_buf` that enables tracking the number of +/// bytes transferred. This estimate should be accurate as long as the network +/// is the bottleneck. +mod copy_buf { + + use std::future::Future; + use std::io; + use std::pin::Pin; + use std::task::{Context, Poll, ready}; + + use tokio::io::{AsyncBufRead, AsyncWrite}; + + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct CopyBuf<'a, R: ?Sized, W: ?Sized> { + pub reader: &'a mut R, + pub writer: &'a mut W, + pub amt: u64, + } + + impl Future for CopyBuf<'_, R, W> + where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, + { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + let me = &mut *self; + let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?; + if buffer.is_empty() { + ready!(Pin::new(&mut self.writer).poll_flush(cx))?; + return Poll::Ready(Ok(self.amt)); + } + + let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into())); + } + self.amt += i as u64; + Pin::new(&mut *self.reader).consume(i); + } + } + } +} + +fn io_error_as_label(error: io::ErrorKind) -> &'static str { + use io::ErrorKind::*; + // most of these variants are not expected to happen + match error { + AddrInUse => "addr_in_use", + AddrNotAvailable => "addr_not_available", + AlreadyExists => "already_exists", + ArgumentListTooLong => "argument_list_too_long", + BrokenPipe => "broken_pipe", + ConnectionAborted => "connection_aborted", + ConnectionRefused => "connection_refused", + ConnectionReset => "connection_reset", + CrossesDevices => "crosses_devices", + Deadlock => "deadlock", + DirectoryNotEmpty => "directory_not_empty", + ExecutableFileBusy => "executable_file_busy", + FileTooLarge => "file_too_large", + HostUnreachable => "host_unreachable", + Interrupted => "interrupted", + InvalidData => "invalid_data", + InvalidFilename => "invalid_filename", + InvalidInput => "invalid_input", + IsADirectory => "is_a_directory", + NetworkDown => "network_down", + NetworkUnreachable => "network_unreachable", + NotADirectory => "not_a_directory", + NotConnected => "not_connected", + NotFound => "not_found", + NotSeekable => "not_seekable", + Other => "other", + OutOfMemory => "out_of_memory", + PermissionDenied => "permission_denied", + QuotaExceeded => "quota_exceeded", + ReadOnlyFilesystem => "read_only_filesystem", + ResourceBusy => "resource_busy", + StaleNetworkFileHandle => "stale_network_file_handle", + StorageFull => "storage_full", + TimedOut => "timed_out", + TooManyLinks => "too_many_links", + UnexpectedEof => "unexpected_eof", + Unsupported => "unsupported", + WouldBlock => "would_block", + WriteZero => "write_zero", + _ => "uncategorized", + } +} diff --git a/quickwit/quickwit-storage/src/object_storage/mod.rs b/quickwit/quickwit-storage/src/object_storage/mod.rs index e914c107291..cee3bacd338 100644 --- a/quickwit/quickwit-storage/src/object_storage/mod.rs +++ b/quickwit/quickwit-storage/src/object_storage/mod.rs @@ -14,6 +14,8 @@ mod error; +mod metrics_wrappers; + mod s3_compatible_storage; pub use self::s3_compatible_storage::S3CompatibleObjectStorage; pub use self::s3_compatible_storage_resolver::S3CompatibleObjectStorageFactory; diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 6a7105fb8f1..9d6d376205e 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -46,10 +46,13 @@ use tracing::{info, instrument, warn}; use crate::metrics::object_storage_get_slice_in_flight_guards; use crate::object_storage::MultiPartPolicy; +use crate::object_storage::metrics_wrappers::{ + ActionLabel, RequestMetricsWrapperExt, copy_with_download_metrics, +}; use crate::storage::SendableAsync; use crate::{ - BulkDeleteError, DeleteFailure, OwnedBytes, STORAGE_METRICS, Storage, StorageError, - StorageErrorKind, StorageResolverError, StorageResult, + BulkDeleteError, DeleteFailure, OwnedBytes, Storage, StorageError, StorageErrorKind, + StorageResolverError, StorageResult, }; /// Semaphore to limit the number of concurrent requests to the object store. Some object stores @@ -286,11 +289,6 @@ impl S3CompatibleObjectStorage { .await .map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?; - crate::STORAGE_METRICS.object_storage_put_parts.inc(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(len); - self.s3_client .put_object() .bucket(bucket) @@ -298,6 +296,7 @@ impl S3CompatibleObjectStorage { .body(body) .content_length(len as i64) .send() + .with_count_and_upload_metrics(ActionLabel::PutObject, len) .await .map_err(|sdk_error| { if sdk_error.is_retryable() { @@ -332,6 +331,7 @@ impl S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .key(key) .send() + .with_count_metric(ActionLabel::CreateMultipartUpload) .await }) .await? @@ -421,11 +421,6 @@ impl S3CompatibleObjectStorage { .map_err(Retry::Permanent)?; let md5 = BASE64_STANDARD.encode(part.md5.0); - crate::STORAGE_METRICS.object_storage_put_parts.inc(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(part.len()); - let upload_part_output = self .s3_client .upload_part() @@ -437,6 +432,7 @@ impl S3CompatibleObjectStorage { .part_number(part.part_number as i32) .upload_id(upload_id.0) .send() + .with_count_and_upload_metrics(ActionLabel::UploadPart, part.len()) .await .map_err(|s3_err| { if s3_err.is_retryable() { @@ -516,6 +512,7 @@ impl S3CompatibleObjectStorage { .multipart_upload(completed_upload.clone()) .upload_id(upload_id) .send() + .with_count_metric(ActionLabel::CompleteMultipartUpload) .await }) .await?; @@ -530,6 +527,7 @@ impl S3CompatibleObjectStorage { .key(key) .upload_id(upload_id) .send() + .with_count_metric(ActionLabel::AbortMultipartUpload) .await }) .await?; @@ -544,8 +542,6 @@ impl S3CompatibleObjectStorage { let key = self.key(path); let range_str = range_opt.map(|range| format!("bytes={}-{}", range.start, range.end - 1)); - crate::STORAGE_METRICS.object_storage_get_total.inc(); - let get_object_output = self .s3_client .get_object() @@ -553,6 +549,7 @@ impl S3CompatibleObjectStorage { .key(key) .set_range(range_str) .send() + .with_count_metric(ActionLabel::GetObject) .await?; Ok(get_object_output) } @@ -640,17 +637,12 @@ impl S3CompatibleObjectStorage { for (path_chunk, delete) in &mut delete_requests_it { let delete_objects_res: StorageResult = aws_retry(&self.retry_params, || async { - crate::STORAGE_METRICS - .object_storage_bulk_delete_requests_total - .inc(); - let _timer = crate::STORAGE_METRICS - .object_storage_bulk_delete_request_duration - .start_timer(); self.s3_client .delete_objects() .bucket(self.bucket.clone()) .delete(delete.clone()) .send() + .with_count_and_duration_metrics(ActionLabel::DeleteObjects) .await }) .await @@ -716,10 +708,7 @@ impl S3CompatibleObjectStorage { async fn download_all(byte_stream: ByteStream, output: &mut Vec) -> io::Result<()> { output.clear(); let mut body_stream_reader = BufReader::new(byte_stream.into_async_read()); - let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?; - STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_stream_reader, output).await?; // When calling `get_all`, the Vec capacity is not properly set. output.shrink_to_fit(); Ok(()) @@ -735,6 +724,7 @@ impl Storage for S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .max_keys(1) .send() + .with_count_metric(ActionLabel::ListObjects) .await?; Ok(()) } @@ -744,7 +734,6 @@ impl Storage for S3CompatibleObjectStorage { path: &Path, payload: Box, ) -> crate::StorageResult<()> { - crate::STORAGE_METRICS.object_storage_put_total.inc(); let _permit = REQUEST_SEMAPHORE.acquire().await; let key = self.key(path); let total_len = payload.len(); @@ -763,10 +752,7 @@ impl Storage for S3CompatibleObjectStorage { let get_object_output = aws_retry(&self.retry_params, || self.get_object(path, None)).await?; let mut body_read = BufReader::new(get_object_output.body.into_async_read()); - let num_bytes_copied = tokio::io::copy_buf(&mut body_read, output).await?; - STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_read, output).await?; output.flush().await?; Ok(()) } @@ -776,17 +762,12 @@ impl Storage for S3CompatibleObjectStorage { let bucket = self.bucket.clone(); let key = self.key(path); let delete_res = aws_retry(&self.retry_params, || async { - crate::STORAGE_METRICS - .object_storage_delete_requests_total - .inc(); - let _timer = crate::STORAGE_METRICS - .object_storage_delete_request_duration - .start_timer(); self.s3_client .delete_object() .bucket(&bucket) .key(&key) .send() + .with_count_and_duration_metrics(ActionLabel::DeleteObject) .await }) .await; @@ -867,6 +848,7 @@ impl Storage for S3CompatibleObjectStorage { .bucket(&bucket) .key(&key) .send() + .with_count_metric(ActionLabel::HeadObject) .await }) .await?;