Skip to content

Record end-to-end indexing duration in S3 file notification source #5811

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use std::sync::OnceLock;
use once_cell::sync::Lazy;
use prometheus::{Gauge, HistogramOpts, Opts, TextEncoder};
pub use prometheus::{
Histogram, HistogramTimer, HistogramVec as PrometheusHistogramVec, IntCounter,
IntCounterVec as PrometheusIntCounterVec, IntGauge, IntGaugeVec as PrometheusIntGaugeVec,
exponential_buckets, linear_buckets,
exponential_buckets, linear_buckets, Histogram, HistogramTimer,
HistogramVec as PrometheusHistogramVec, IntCounter, IntCounterVec as PrometheusIntCounterVec,
IntGauge, IntGaugeVec as PrometheusIntGaugeVec,
};

#[derive(Clone)]
Expand Down Expand Up @@ -429,16 +429,28 @@ impl InFlightDataGauges {
}
}

/// Returns whether per-index metric labels are enabled.
///
/// The answer is derived from the `QW_DISABLE_PER_INDEX_METRICS` environment
/// variable, read once and cached for the lifetime of the process.
fn is_per_index_metrics_enabled() -> bool {
    static ENABLED: OnceLock<bool> = OnceLock::new();
    let enabled: &bool = ENABLED.get_or_init(|| {
        let disabled = crate::get_bool_from_env("QW_DISABLE_PER_INDEX_METRICS", false);
        !disabled
    });
    *enabled
}

/// This function returns `index_name` or projects it to `<any>` if per-index metrics are disabled.
pub fn index_label(index_name: &str) -> &str {
static PER_INDEX_METRICS_ENABLED: OnceLock<bool> = OnceLock::new();
let per_index_metrics_enabled: bool = *PER_INDEX_METRICS_ENABLED
.get_or_init(|| !crate::get_bool_from_env("QW_DISABLE_PER_INDEX_METRICS", false));
if per_index_metrics_enabled {
if is_per_index_metrics_enabled() {
index_name
} else {
"__any__"
}
}

/// This function returns `{index_name}-{source_name}`, or projects it to the
/// literal label `__any__` if per-index metrics are disabled.
///
/// Disabling per-index metrics (via `QW_DISABLE_PER_INDEX_METRICS`) bounds the
/// metric cardinality on clusters with many indexes and sources.
pub fn source_label(index_name: &str, source_name: &str) -> String {
    if is_per_index_metrics_enabled() {
        format!("{index_name}-{source_name}")
    } else {
        "__any__".to_string()
    }
}
Comment on lines +447 to +454
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having the source and the index in separate labels is better in all ways.


pub static MEMORY_METRICS: Lazy<MemoryMetrics> = Lazy::new(MemoryMetrics::default);
18 changes: 16 additions & 2 deletions quickwit/quickwit-indexing/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

use once_cell::sync::Lazy;
use quickwit_common::metrics::{
IntCounter, IntCounterVec, IntGauge, IntGaugeVec, new_counter, new_counter_vec, new_gauge,
new_gauge_vec,
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, linear_buckets, new_counter,
new_counter_vec, new_gauge, new_gauge_vec, new_histogram_vec,
};

pub struct IndexerMetrics {
Expand All @@ -30,6 +30,8 @@ pub struct IndexerMetrics {
// We use a lazy counter, as most users do not use Kafka.
#[cfg_attr(not(feature = "kafka"), allow(dead_code))]
pub kafka_rebalance_total: Lazy<IntCounter>,
#[cfg_attr(not(feature = "queue-sources"), allow(dead_code))]
pub queue_source_index_duration_seconds: Lazy<HistogramVec<1>>,
}

impl Default for IndexerMetrics {
Expand Down Expand Up @@ -98,6 +100,18 @@ impl Default for IndexerMetrics {
&[],
)
}),
queue_source_index_duration_seconds: Lazy::new(|| {
new_histogram_vec(
"queue_source_index_duration_seconds",
"Number of seconds it took since the message was generated until it was sent \
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Number of seconds it took since the message was generated until it was sent \
"Duration (seconds) between the queue message event time (parsed from its content if available) and its acknowledgment"

to be acknowledged (deleted).",
"indexing",
&[],
["source"],
// 15 seconds up to 3 minutes
linear_buckets(15.0, 15.0, 12).unwrap(),
Comment on lines +111 to +112
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 3 min really enough in general? I think you might be interested to see much higher value when the system starts to behave badly. But 12 buckets is already a lot, I would then switch to an exponential scale.

)
}),
}
}
}
Expand Down
18 changes: 16 additions & 2 deletions quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::Duration;

use itertools::Itertools;
use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_common::metrics::source_label;
use quickwit_common::rate_limited_error;
use quickwit_config::{FileSourceMessageType, FileSourceSqs};
use quickwit_metastore::checkpoint::SourceCheckpoint;
Expand All @@ -26,6 +27,7 @@ use quickwit_proto::metastore::SourceType;
use quickwit_proto::types::SourceUid;
use quickwit_storage::StorageResolver;
use serde::Serialize;
use time::OffsetDateTime;
use ulid::Ulid;

use super::Queue;
Expand Down Expand Up @@ -302,8 +304,20 @@ impl QueueCoordinator {
.collect::<Vec<_>>();
let mut completed = Vec::new();
for partition_id in committed_partition_ids {
let ack_id_opt = self.local_state.mark_completed(partition_id);
if let Some(ack_id) = ack_id_opt {
let completed_opt = self.local_state.mark_completed(partition_id);
if let Some((ack_id, timestamp_opt)) = completed_opt {
if let Some(timestamp) = timestamp_opt {
let duration = OffsetDateTime::now_utc() - timestamp;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can panic if something is off with the event time. It's unlikely, but it's unfortunate to panic because of a metric. Refactor this as:

fn record_index_duration_metric(index_id: &str, source_id: &str, source_event_time_opt: Option<OffsetDateTime>) {
  let Some(source_event_time) = source_event_time_opt else {
     return
  };
  let now = OffsetDateTime::now_utc();
  if now < source_event_time {
    error!("Event time smaller than current time");
    return;
  }
  let duration = now - source_event_time;
  let index_label = index_label(
      index_id,
  );
  let source_label = source_label(
      source_id,
  );
  crate::metrics::INDEXER_METRICS
      .queue_source_index_duration_seconds
      .with_label_values([index_label, source_label])
      .observe(duration.as_seconds_f64());
}

let label = source_label(
&self.pipeline_id.index_uid.index_id,
&self.pipeline_id.source_id,
);
crate::metrics::INDEXER_METRICS
.queue_source_index_duration_seconds
.with_label_values([&label])
.observe(duration.as_seconds_f64());
}

completed.push(ack_id);
}
}
Expand Down
21 changes: 14 additions & 7 deletions quickwit/quickwit-indexing/src/source/queue_sources/local_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};

use anyhow::bail;
use quickwit_metastore::checkpoint::PartitionId;
use time::OffsetDateTime;

use super::message::{InProgressMessage, ReadyMessage};

Expand All @@ -34,8 +35,8 @@ pub struct QueueLocalState {
/// Message that is currently being read and sent to the `DocProcessor`
read_in_progress: Option<InProgressMessage>,
/// Partitions that were read and are still being indexed, with their
/// associated ack_id
awaiting_commit: BTreeMap<PartitionId, String>,
/// associated ack_id and optional creation timestamp
awaiting_commit: BTreeMap<PartitionId, (String, Option<OffsetDateTime>)>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could refactor (String, Option<OffsetDateTime>) to

pub struct AwaitingCommitMessage {
  ack_id: String,
  source_event_time_opt: Option<OffsetDateTime>
}

It would help readability I think.

/// Partitions that were fully indexed and committed
completed: BTreeSet<PartitionId>,
}
Expand Down Expand Up @@ -94,7 +95,10 @@ impl QueueLocalState {
if let Some(in_progress) = self.read_in_progress.take() {
self.awaiting_commit.insert(
in_progress.partition_id.clone(),
in_progress.visibility_handle.ack_id().to_string(),
(
in_progress.visibility_handle.ack_id().to_string(),
in_progress.timestamp_opt,
),
);
in_progress
.visibility_handle
Expand All @@ -117,10 +121,13 @@ impl QueueLocalState {
Ok(())
}

/// Returns the ack_id if that message was awaiting_commit
pub fn mark_completed(&mut self, partition_id: PartitionId) -> Option<String> {
let ack_id_opt = self.awaiting_commit.remove(&partition_id);
/// Returns the ack_id and creation timestamp if that message was awaiting_commit
pub fn mark_completed(
&mut self,
partition_id: PartitionId,
) -> Option<(String, Option<OffsetDateTime>)> {
let completed_opt = self.awaiting_commit.remove(&partition_id);
self.completed.insert(partition_id);
ack_id_opt
completed_opt
}
}
56 changes: 41 additions & 15 deletions quickwit/quickwit-indexing/src/source/queue_sources/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use quickwit_proto::types::Position;
use quickwit_storage::{OwnedBytes, StorageResolver};
use serde_json::Value;
use thiserror::Error;
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
use tracing::info;

use super::visibility::VisibilityTaskHandle;
Expand Down Expand Up @@ -82,18 +83,23 @@ impl RawMessage {
self,
message_type: MessageType,
) -> Result<PreProcessedMessage, PreProcessingError> {
let payload = match message_type {
MessageType::S3Notification => PreProcessedPayload::ObjectUri(
uri_from_s3_notification(&self.payload, &self.metadata.ack_id)?,
),
let (payload, timestamp_opt) = match message_type {
MessageType::S3Notification => {
let (uri, timestamp) = parse_s3_notification(&self.payload, &self.metadata.ack_id)?;
(PreProcessedPayload::ObjectUri(uri), Some(timestamp))
}
MessageType::RawUri => {
let payload_str = read_to_string(self.payload).context("failed to read payload")?;
PreProcessedPayload::ObjectUri(Uri::from_str(&payload_str)?)
(
PreProcessedPayload::ObjectUri(Uri::from_str(&payload_str)?),
None,
)
}
};
Ok(PreProcessedMessage {
metadata: self.metadata,
payload,
timestamp_opt,
})
}
}
Expand Down Expand Up @@ -122,6 +128,7 @@ impl PreProcessedPayload {
pub struct PreProcessedMessage {
pub metadata: MessageMetadata,
pub payload: PreProcessedPayload,
pub timestamp_opt: Option<OffsetDateTime>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub timestamp_opt: Option<OffsetDateTime>,
pub source_event_time_opt: Option<OffsetDateTime>,

}

impl PreProcessedMessage {
Expand All @@ -130,7 +137,10 @@ impl PreProcessedMessage {
}
}

fn uri_from_s3_notification(message: &[u8], ack_id: &str) -> Result<Uri, PreProcessingError> {
fn parse_s3_notification(
message: &[u8],
ack_id: &str,
) -> Result<(Uri, OffsetDateTime), PreProcessingError> {
let value: Value = serde_json::from_slice(message).context("invalid JSON message")?;
if matches!(value["Event"].as_str(), Some("s3:TestEvent")) {
info!("discarding S3 test event");
Expand All @@ -151,6 +161,13 @@ fn uri_from_s3_notification(message: &[u8], ack_id: &str) -> Result<Uri, PreProc
ack_id: ack_id.to_string(),
});
}

let event_time = value["Records"][0]["eventTime"]
.as_str()
.context("invalid S3 notification: Records[0].eventTime not found")?;
let timestamp = OffsetDateTime::parse(event_time, &Rfc3339)
.context("invalid S3 notification: Records[0].eventTime not in rfc3339")?;

let key = value["Records"][0]["s3"]["object"]["key"]
.as_str()
.context("invalid S3 notification: Records[0].s3.object.key not found")?;
Expand All @@ -160,7 +177,9 @@ fn uri_from_s3_notification(message: &[u8], ack_id: &str) -> Result<Uri, PreProc
let encoded_key = percent_encoding::percent_decode(key.as_bytes())
.decode_utf8()
.context("invalid S3 notification: Records[0].s3.object.key could not be url decoded")?;
Uri::from_str(&format!("s3://{}/{}", bucket, encoded_key)).map_err(|e| e.into())
let uri = Uri::from_str(&format!("s3://{}/{}", bucket, encoded_key))?;

Ok((uri, timestamp))
}

/// A message for which we know as much of the global processing status as
Expand Down Expand Up @@ -193,6 +212,7 @@ impl ReadyMessage {
batch_reader,
partition_id,
visibility_handle: self.visibility_handle,
timestamp_opt: self.content.timestamp_opt,
}))
}
}
Expand All @@ -209,6 +229,7 @@ pub struct InProgressMessage {
pub partition_id: PartitionId,
pub visibility_handle: VisibilityTaskHandle,
pub batch_reader: ObjectUriBatchReader,
pub timestamp_opt: Option<OffsetDateTime>,
}

#[cfg(test)]
Expand Down Expand Up @@ -257,9 +278,13 @@ mod tests {
}
]
}"#;
let actual_uri = uri_from_s3_notification(test_message.as_bytes(), "myackid").unwrap();
let (actual_uri, actual_timestamp) =
parse_s3_notification(test_message.as_bytes(), "myackid").unwrap();
let expected_uri = Uri::from_str("s3://mybucket/logs.json").unwrap();
let expected_timestamp =
OffsetDateTime::parse("2021-05-22T09:22:41.789Z", &Rfc3339).unwrap();
assert_eq!(actual_uri, expected_uri);
assert_eq!(actual_timestamp, expected_timestamp);
}

#[test]
Expand All @@ -275,8 +300,7 @@ mod tests {
}
]
}"#;
let result =
uri_from_s3_notification(&OwnedBytes::new(invalid_message.as_bytes()), "myackid");
let result = parse_s3_notification(&OwnedBytes::new(invalid_message.as_bytes()), "myackid");
assert!(matches!(
result,
Err(PreProcessingError::UnexpectedFormat(_))
Expand Down Expand Up @@ -321,8 +345,7 @@ mod tests {
}
]
}"#;
let result =
uri_from_s3_notification(&OwnedBytes::new(invalid_message.as_bytes()), "myackid");
let result = parse_s3_notification(&OwnedBytes::new(invalid_message.as_bytes()), "myackid");
assert!(matches!(
result,
Err(PreProcessingError::Discardable { .. })
Expand All @@ -339,8 +362,7 @@ mod tests {
"RequestId":"5582815E1AEA5ADF",
"HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
}"#;
let result =
uri_from_s3_notification(&OwnedBytes::new(invalid_message.as_bytes()), "myackid");
let result = parse_s3_notification(&OwnedBytes::new(invalid_message.as_bytes()), "myackid");
if let Err(PreProcessingError::Discardable { ack_id }) = result {
assert_eq!(ack_id, "myackid");
} else {
Expand Down Expand Up @@ -390,8 +412,12 @@ mod tests {
}
]
}"#;
let actual_uri = uri_from_s3_notification(test_message.as_bytes(), "myackid").unwrap();
let (actual_uri, actual_timestamp) =
parse_s3_notification(test_message.as_bytes(), "myackid").unwrap();
let expected_uri = Uri::from_str("s3://mybucket/hello::world::logs.json").unwrap();
let expected_timestamp =
OffsetDateTime::parse("2021-05-22T09:22:41.789Z", &Rfc3339).unwrap();
assert_eq!(actual_uri, expected_uri);
assert_eq!(actual_timestamp, expected_timestamp);
}
}
Loading