Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .evergreen/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ set -o pipefail
source .evergreen/env.sh
source .evergreen/cargo-test.sh

FEATURE_FLAGS+=("tracing-unstable" "cert-key-password")
FEATURE_FLAGS+=("tracing-unstable" "cert-key-password" "opentelemetry")

if [ "$OPENSSL" = true ]; then
FEATURE_FLAGS+=("openssl-tls")
Expand Down
47 changes: 40 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ tracing-unstable = ["dep:tracing", "dep:log", "bson3?/serde_json-1"]
# compatible with the preview version.
text-indexes-unstable = []

# Enable support for opentelemetry instrumentation
opentelemetry = ["dep:opentelemetry"]

[dependencies]
base64 = "0.13.0"
bitflags = "1.1.0"
Expand All @@ -101,6 +104,7 @@ mongodb-internal-macros = { path = "macros", version = "3.3.0" }
num_cpus = { version = "1.13.1", optional = true }
openssl = { version = "0.10.38", optional = true }
openssl-probe = { version = "0.1.5", optional = true }
opentelemetry = { version = "0.31.0", optional = true }
pem = { version = "3.0.4", optional = true }
percent-encoding = "2.0.0"
pkcs8 = { version = "0.10.2", features = ["encryption", "pkcs5"], optional = true }
Expand All @@ -118,7 +122,7 @@ take_mut = "0.2.2"
thiserror = "1.0.24"
tokio-openssl = { version = "0.6.3", optional = true }
tracing = { version = "0.1.36", optional = true }
typed-builder = "0.20.0"
typed-builder = "0.22.0"
webpki-roots = "1"
zstd = { version = "0.11.2", optional = true }
macro_magic = "0.5.1"
Expand Down Expand Up @@ -216,6 +220,7 @@ futures = "0.3"
hex = "0.4"
home = "0.5"
lambda_runtime = "0.6.0"
opentelemetry_sdk = { version = "0.31.0", features = ["testing"] }
pkcs8 = { version = "0.10.2", features = ["3des", "des-insecure", "sha1-insecure"] }
pretty_assertions = "1.3.0"
serde = { version = ">= 0.0.0", features = ["rc"] }
Expand Down
32 changes: 32 additions & 0 deletions src/bson_util.rs
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The utility functions here just got moved out of tracing since they're needed for otel as well.

Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,38 @@ pub(crate) mod option_u64_as_i64 {
}
}

/// Truncates the given string at the closest UTF-8 character boundary >= the provided length.
/// If the new length is >= the current length, does nothing.
#[cfg(any(feature = "tracing-unstable", feature = "opentelemetry"))]
pub(crate) fn truncate_on_char_boundary(s: &mut String, new_len: usize) {
let original_len = s.len();
if original_len > new_len {
// to avoid generating invalid UTF-8, find the first index >= max_length_bytes that is
// the end of a character.
// TODO: RUST-1496 we should use ceil_char_boundary here but it's currently nightly-only.
// see: https://doc.rust-lang.org/std/string/struct.String.html#method.ceil_char_boundary
let mut truncate_index = new_len;
// is_char_boundary returns true when the provided value == the length of the string, so
// if we reach the end of the string this loop will terminate.
while !s.is_char_boundary(truncate_index) {
truncate_index += 1;
}
s.truncate(truncate_index);
// due to the "rounding up" behavior we might not actually end up truncating anything.
// if we did, spec requires we add a trailing "...".
if truncate_index < original_len {
s.push_str("...")
}
}
}

#[cfg(any(feature = "tracing-unstable", feature = "opentelemetry"))]
pub(crate) fn doc_to_json_str(doc: crate::bson::Document, max_length_bytes: usize) -> String {
let mut ext_json = Bson::Document(doc).into_relaxed_extjson().to_string();
truncate_on_char_boundary(&mut ext_json, max_length_bytes);
ext_json
}

#[cfg(test)]
mod test {
use crate::bson_util::num_decimal_digits;
Expand Down
12 changes: 12 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ struct ClientInner {
end_sessions_token: std::sync::Mutex<AsyncDropToken>,
#[cfg(feature = "in-use-encryption")]
csfle: tokio::sync::RwLock<Option<csfle::ClientState>>,
#[cfg(feature = "opentelemetry")]
tracer: opentelemetry::global::BoxedTracer,
#[cfg(test)]
disable_command_events: AtomicBool,
}
Expand Down Expand Up @@ -181,6 +183,9 @@ impl Client {
tx: Some(cleanup_tx),
});

#[cfg(feature = "opentelemetry")]
let tracer = options.tracer();

let inner = TrackingArc::new(ClientInner {
topology: Topology::new(options.clone())?,
session_pool: ServerSessionPool::new(),
Expand All @@ -193,6 +198,8 @@ impl Client {
end_sessions_token,
#[cfg(feature = "in-use-encryption")]
csfle: Default::default(),
#[cfg(feature = "opentelemetry")]
tracer,
#[cfg(test)]
disable_command_events: AtomicBool::new(false),
});
Expand Down Expand Up @@ -668,6 +675,11 @@ impl Client {
.await;
}
}

#[cfg(feature = "opentelemetry")]
pub(crate) fn tracer(&self) -> &opentelemetry::global::BoxedTracer {
&self.inner.tracer
}
}

#[derive(Clone, Debug)]
Expand Down
120 changes: 78 additions & 42 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use crate::bson::RawDocumentBuf;
use crate::bson::{doc, RawBsonRef, RawDocument, Timestamp};
#[cfg(feature = "in-use-encryption")]
use futures_core::future::BoxFuture;
#[cfg(feature = "opentelemetry")]
use opentelemetry::context::FutureExt;
use serde::de::DeserializeOwned;
use std::sync::LazyLock;

Expand All @@ -14,6 +16,8 @@ use std::{
};

use super::{options::ServerAddress, session::TransactionState, Client, ClientSession};
#[cfg(not(feature = "opentelemetry"))]
use crate::otel::OtelFutureStub as _;
use crate::{
bson::Document,
change_stream::{
Expand Down Expand Up @@ -106,55 +110,72 @@ impl Client {
op: &mut T,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<ExecutionDetails<T>> {
// Validate inputs that can be checked before server selection and connection checkout.
if self.inner.shutdown.executed.load(Ordering::SeqCst) {
return Err(ErrorKind::Shutdown.into());
}
// TODO RUST-9: remove this validation
if !op.is_acknowledged() {
return Err(ErrorKind::InvalidArgument {
message: "Unacknowledged write concerns are not supported".to_string(),
}
.into());
}
if let Some(write_concern) = op.write_concern() {
write_concern.validate()?;
}

// Validate the session and update its transaction status if needed.
let mut session = session.into();
if let Some(ref mut session) = session {
if !TrackingArc::ptr_eq(&self.inner, &session.client().inner) {
return Err(Error::invalid_argument(
"the session provided to an operation must be created from the same client as \
the collection/database on which the operation is being performed",
));
let ctx = self.start_operation_span(op, session.as_deref());
let result = (async move || {
// Validate inputs that can be checked before server selection and connection
// checkout.
if self.inner.shutdown.executed.load(Ordering::SeqCst) {
return Err(ErrorKind::Shutdown.into());
}
if op
.selection_criteria()
.and_then(|sc| sc.as_read_pref())
.is_some_and(|rp| rp != &ReadPreference::Primary)
&& session.in_transaction()
{
return Err(ErrorKind::Transaction {
message: "read preference in a transaction must be primary".into(),
// TODO RUST-9: remove this validation
if !op.is_acknowledged() {
return Err(ErrorKind::InvalidArgument {
message: "Unacknowledged write concerns are not supported".to_string(),
}
.into());
}
// If the current transaction has been committed/aborted and it is not being
// re-committed/re-aborted, reset the transaction's state to None.
if matches!(
session.transaction.state,
TransactionState::Committed { .. }
) && op.name() != CommitTransaction::NAME
|| session.transaction.state == TransactionState::Aborted
&& op.name() != AbortTransaction::NAME
{
session.transaction.reset();
if let Some(write_concern) = op.write_concern() {
write_concern.validate()?;
}
}

Box::pin(async { self.execute_operation_with_retry(op, session).await }).await
// Validate the session and update its transaction status if needed.
if let Some(ref mut session) = session {
if !TrackingArc::ptr_eq(&self.inner, &session.client().inner) {
return Err(Error::invalid_argument(
"the session provided to an operation must be created from the same \
client as the collection/database on which the operation is being \
performed",
));
}
if op
.selection_criteria()
.and_then(|sc| sc.as_read_pref())
.is_some_and(|rp| rp != &ReadPreference::Primary)
&& session.in_transaction()
{
return Err(ErrorKind::Transaction {
message: "read preference in a transaction must be primary".into(),
}
.into());
}
// If the current transaction has been committed/aborted and it is not being
// re-committed/re-aborted, reset the transaction's state to None.
if matches!(
session.transaction.state,
TransactionState::Committed { .. }
) && op.name() != CommitTransaction::NAME
|| session.transaction.state == TransactionState::Aborted
&& op.name() != AbortTransaction::NAME
{
session.transaction.reset();
}
}

Box::pin(async {
self.execute_operation_with_retry(op, session)
.with_current_context()
.await
})
.with_current_context()
.await
})()
.with_context(ctx.clone())
.await;
#[cfg(feature = "opentelemetry")]
self.record_error(&ctx, &result);

result
}

/// Execute the given operation, returning the cursor created by the operation.
Expand Down Expand Up @@ -408,6 +429,7 @@ impl Client {
retryability,
effective_criteria,
)
.with_current_context()
.await
{
Ok(output) => ExecutionDetails {
Expand Down Expand Up @@ -496,9 +518,21 @@ impl Client {
let should_redact = cmd.should_redact();
let cmd_name = cmd.name.clone();
let target_db = cmd.target_db.clone();
#[cfg(feature = "opentelemetry")]
let cmd_attrs = crate::otel::CommandAttributes::new(&cmd);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

otel::CommandAttributes exists because the opentelemetry trace needs values from both the Command struct and the Message struct, and constructing the latter consumes the former so they can't just both be passed in directly.


let mut message = Message::try_from(cmd)?;
message.request_id = Some(request_id);

#[cfg(feature = "opentelemetry")]
let ctx = self.start_command_span(
op,
&connection_info,
connection.stream_description()?,
&message,
cmd_attrs,
);

#[cfg(feature = "in-use-encryption")]
{
let guard = self.inner.csfle.read().await;
Expand Down Expand Up @@ -629,6 +663,8 @@ impl Client {
}
}
};
#[cfg(feature = "opentelemetry")]
self.record_command_result::<T>(&ctx, &result);

if result
.as_ref()
Expand Down
Loading