Record end-to-end indexing duration in S3 file notification source #5811

Open · wants to merge 1 commit into main

Conversation

tontinton (Contributor):

No description provided.

@tontinton tontinton marked this pull request as draft June 19, 2025 19:59
@tontinton tontinton force-pushed the queue-source-duration-metric branch from 90dca71 to 70e9318 Compare June 19, 2025 20:02
@tontinton tontinton marked this pull request as ready for review June 19, 2025 20:02
rdettai (Collaborator) left a comment:

Thanks for the contribution. It's very specific, but I think it can be valuable to any user. Could you also try running the coverage tests (which include the SQS tests) on your fork repo? It seems I can't trigger them from this repo.

Comment on lines +447 to +454

```rust
/// This function returns `index_name-source_name` or projects it to `__any__` if per-index metrics are disabled.
pub fn source_label(index_name: &str, source_name: &str) -> String {
    if is_per_index_metrics_enabled() {
        format!("{index_name}-{source_name}")
    } else {
        "__any__".to_string()
    }
}
```
Having the source and the index in separate labels is better in all ways.

Comment on lines +111 to +112

```rust
// 15 seconds up to 3 minutes
linear_buckets(15.0, 15.0, 12).unwrap(),
```
Is 3 min really enough in general? I think you might be interested in seeing much higher values when the system starts to behave badly. But 12 buckets is already a lot, so I would switch to an exponential scale instead.
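For illustration, an exponential ladder with the same 12-bucket budget could span 15 seconds up to roughly an hour. The `prometheus` crate provides an equivalent `exponential_buckets(start, factor, count)` helper; this dependency-free sketch just shows the arithmetic, and the factor 1.65 is an illustrative choice, not a value from the PR:

```rust
/// Sketch of an exponential bucket ladder with the same 12-bucket budget
/// as the linear version above. With start = 15.0 and factor = 1.65, the
/// last bucket boundary lands near 3700s (~1 hour) instead of 180s.
pub fn exponential_buckets(start: f64, factor: f64, count: usize) -> Vec<f64> {
    (0..count).map(|i| start * factor.powi(i as i32)).collect()
}
```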

```rust
queue_source_index_duration_seconds: Lazy::new(|| {
    new_histogram_vec(
        "queue_source_index_duration_seconds",
        "Number of seconds it took since the message was generated until it was sent \
```
Suggested change:

```diff
-        "Number of seconds it took since the message was generated until it was sent \
+        "Duration (seconds) between the queue message event time (parsed from its content if available) and its acknowledgment"
```

```diff
-    /// associated ack_id
-    awaiting_commit: BTreeMap<PartitionId, String>,
+    /// associated ack_id and optional creation timestamp
+    awaiting_commit: BTreeMap<PartitionId, (String, Option<OffsetDateTime>)>,
```
We could refactor `(String, Option<OffsetDateTime>)` to

```rust
pub struct AwaitingCommitMessage {
    ack_id: String,
    source_event_time_opt: Option<OffsetDateTime>,
}
```

It would help readability I think.
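The suggested struct can be sketched end to end with standard-library types only. Here `SystemTime` stands in for `time::OffsetDateTime`, and `PartitionId` is a placeholder alias; both are assumptions of this sketch, not quickwit's actual types:

```rust
use std::collections::BTreeMap;
use std::time::SystemTime;

// Placeholder for quickwit's PartitionId type (assumption of this sketch).
type PartitionId = u64;

/// Named struct replacing the `(String, Option<OffsetDateTime>)` tuple;
/// `SystemTime` stands in for `time::OffsetDateTime` to stay dependency-free.
pub struct AwaitingCommitMessage {
    pub ack_id: String,
    pub source_event_time_opt: Option<SystemTime>,
}

/// Sketch of `mark_completed`: removing a partition yields the whole message,
/// so call sites read named fields instead of tuple positions.
pub fn mark_completed(
    awaiting_commit: &mut BTreeMap<PartitionId, AwaitingCommitMessage>,
    partition_id: PartitionId,
) -> Option<AwaitingCommitMessage> {
    awaiting_commit.remove(&partition_id)
}
```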

@@ -122,6 +128,7 @@ impl PreProcessedPayload {

```rust
pub struct PreProcessedMessage {
    pub metadata: MessageMetadata,
    pub payload: PreProcessedPayload,
    pub timestamp_opt: Option<OffsetDateTime>,
```
Suggested change:

```diff
-    pub timestamp_opt: Option<OffsetDateTime>,
+    pub source_event_time_opt: Option<OffsetDateTime>,
```

```rust
let completed_opt = self.local_state.mark_completed(partition_id);
if let Some((ack_id, timestamp_opt)) = completed_opt {
    if let Some(timestamp) = timestamp_opt {
        let duration = OffsetDateTime::now_utc() - timestamp;
```
This can panic if something is off with the event time. It's unlikely, but it's unfortunate to panic because of a metric. Refactor this as:

```rust
fn record_index_duration_metric(
    index_id: &str,
    source_id: &str,
    source_event_time_opt: Option<OffsetDateTime>,
) {
    let Some(source_event_time) = source_event_time_opt else {
        return;
    };
    let now = OffsetDateTime::now_utc();
    if now < source_event_time {
        error!("source event time is in the future, skipping duration metric");
        return;
    }
    let duration = now - source_event_time;
    let index_label = index_label(index_id);
    let source_label = source_label(source_id);
    crate::metrics::INDEXER_METRICS
        .queue_source_index_duration_seconds
        .with_label_values([index_label, source_label])
        .observe(duration.as_seconds_f64());
}
```
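The same non-panicking pattern can be demonstrated dependency-free with `SystemTime`, whose `duration_since` returns `Err` when the event time is ahead of the clock; this is a sketch with `SystemTime` standing in for `OffsetDateTime`, not the PR's actual code:

```rust
use std::time::SystemTime;

/// Computes the indexing duration in seconds, or `None` when there is no
/// event time or clock skew puts the event time in the future. Returning
/// `None` lets the caller skip the observation instead of panicking.
pub fn index_duration_secs(
    source_event_time_opt: Option<SystemTime>,
    now: SystemTime,
) -> Option<f64> {
    let source_event_time = source_event_time_opt?;
    match now.duration_since(source_event_time) {
        Ok(duration) => Some(duration.as_secs_f64()),
        // Event time is in the future: don't record a bogus metric.
        Err(_) => None,
    }
}
```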

rdettai (Collaborator) commented Jun 23, 2025:

Did you check out the SQS ApproximateAgeOfOldestMessage metric? I'm not sure what you want to monitor exactly, but usually it's what you would use to monitor that queues are processed through in a timely manner.
