Skip to content

Commit

Permalink
fix(datadog): retry mechanism (#2)
Browse files Browse the repository at this point in the history
Fixed retry mechanism. Fixed tests.
  • Loading branch information
robertohuertasm authored Sep 17, 2024
1 parent a6e622b commit eb24c09
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 24 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ jobs:
- name: Setup Rust
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
override: true
components: rustfmt, clippy
toolchain: ${{ matrix.rust }}
override: true
components: rustfmt, clippy
- uses: davidB/rust-cargo-make@v1
- name: Checkout
uses: actions/checkout@v2
- name: Run cargo fmt
run: cargo make format
- name: Run cargo clippy
run: cargo make clippy
- name: Run cargo test
run: cargo test
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/target
/Cargo.lock
.env
9 changes: 5 additions & 4 deletions dd-tracing-layer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "dd-tracing-layer"
version = "0.3.0"
version = "0.4.0"
authors = ["Roberto Huertas <[email protected]>"]
description = "Send your logs to Datadog"
edition = "2021"
Expand All @@ -15,10 +15,10 @@ maintenance = { status = "actively-developed" }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
log-tracing-layer = { path = "../log-tracing-layer", version = "0.3.0" }
log-tracing-layer = { path = "../log-tracing-layer", version = "0.4.0" }
tracing-subscriber = "0.3"
tracing = "0.1"
reqwest = { version = "0.11", features = ["gzip"] }
reqwest = { version = "0.12", features = ["gzip"] }
tokio = { version = "1", features = ["sync", "rt-multi-thread", "time"] }
serde_json = "1"
chrono = "0.4"
Expand All @@ -27,5 +27,6 @@ async-recursion = "1.0"
libflate = "2.0"

[dev-dependencies]
dotenvy = "0.15.7"
tracing-subscriber = { version = "0.3", features = ["json", "registry"] }
httpmock = "0.6.8"
httpmock = "0.7.0"
19 changes: 13 additions & 6 deletions dd-tracing-layer/src/datadog_ingestor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use async_recursion::async_recursion;
use async_trait::async_trait;
use chrono::{Duration, Utc};
use chrono::Utc;
use log_tracing_layer::{Log, LogEvent, LogIngestor};
use serde_json::json;
use std::{collections::VecDeque, error::Error, io::Write, sync::Arc};
use std::{collections::VecDeque, error::Error, io::Write, sync::Arc, time::Duration};
use tokio::sync::RwLock;

const DD_SOURCE: &str = "dd-tracing-layer";
Expand Down Expand Up @@ -109,9 +109,15 @@ impl DatadogLogIngestor {
async fn send_logs(&self, logs: &[Log], retries: u8) {
if retries > MAX_RETRIES {
eprintln!("Failed to send logs after {} retries", retries);
return;
}

let retry = || self.send_logs(logs, retries + 1);
let retry = || async {
let next = retries + 1;
let next_time = 100 * next as u64;
tokio::time::sleep(Duration::from_millis(next_time)).await;
self.send_logs(logs, next).await;
};

// compress the logs
let compressed_logs = match self.compress(logs) {
Expand All @@ -136,7 +142,7 @@ impl DatadogLogIngestor {
{
Ok(res) => match res.status().as_u16() {
202 => {
//println!("Accepted: the request has been accepted for processing");
// println!("Accepted: the request has been accepted for processing");
}
400 => {
eprintln!("Bad request (likely an issue in the payload formatting)");
Expand Down Expand Up @@ -197,7 +203,8 @@ impl DatadogLogIngestor {
let last_log = queue.back().unwrap();
let now = Utc::now();
let diff = now - last_log.received_at;
if diff < Duration::seconds(MAX_BATCH_DURATION_SECS) && queue.len() < MAX_BATCH_SIZE
if diff < chrono::Duration::seconds(MAX_BATCH_DURATION_SECS)
&& queue.len() < MAX_BATCH_SIZE
{
return;
}
Expand Down Expand Up @@ -246,7 +253,7 @@ impl LogIngestor for DatadogLogIngestor {
// start a timer that will flush the queue every n seconds
let mut this = self.clone();
tokio::spawn(async move {
let period = std::time::Duration::from_secs(MAX_BATCH_DURATION_SECS as u64);
let period = Duration::from_secs(MAX_BATCH_DURATION_SECS as u64);
let mut interval = tokio::time::interval(period);
loop {
interval.tick().await;
Expand Down
63 changes: 53 additions & 10 deletions dd-tracing-layer/tests/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,74 @@ mod tests {
tracing::info!(ip = "127.0.0.1", message = msg2);
}

#[test]
fn datadog_works() {
let server = httpmock::MockServer::start();
let _mock = server.mock(|when, then| {
when.any_request();
fn get_api_key() -> String {
std::env::var("DD_API_KEY").unwrap_or("invented_api_key".to_string())
}

fn setup(server: &httpmock::MockServer) -> (httpmock::Mock, dd_tracing_layer::LogLayer) {
dotenvy::from_filename(".env").ok();
let api_key = get_api_key();

let url = server.base_url().clone();
let mock = server.mock(|when, then| {
when.any_request().header_exists("DD-API-KEY");
then.status(202).json_body(serde_json::json!([]));
});
let options = DatadogOptions::new("dd-tracing-layer", "21695c1b35156511441c0d3ace5943f4")
.with_url(server.base_url())

let options = DatadogOptions::new("dd-tracing-layer", api_key)
.with_url(url)
.with_tags("env:dev");

let dd = dd_tracing_layer::create(options);

(mock, dd)
}

#[test]
fn datadog_works() {
// server where logs will be sent
let server = httpmock::MockServer::start();
let (mock_server, dd) = setup(&server);

// create the subscriber
let subscriber = tracing_subscriber::registry().with(dd);
let _s = subscriber::set_default(subscriber);

// test the logs
log("this is a test message");
// assert the server was hit, and we wait for the logs to be sent
std::thread::sleep(std::time::Duration::from_secs(8));
assert_eq!(mock_server.hits(), 1);
}

/// This test is just to test manually test the feature
/// Comment the ignore attribute to run the test and alter the code
/// as you see fit.
#[test]
#[ignore]
fn manual_tests() {
// server where logs will be sent
let server = httpmock::MockServer::start();
let (_, dd) = setup(&server);

// create the subscriber
let subscriber = tracing_subscriber::registry()
// this shows the traces to the terminal...
.with(tracing_subscriber::fmt::Layer::new().json())
.with(dd);
let _s = subscriber::set_default(subscriber);

// test the logs and check the terminal
log("a");
// proof that logs are not blocking
std::thread::sleep(std::time::Duration::from_secs(2));
// plain log
tracing::info!(
ip = "127.0.0.1",
person = r#"{ "name": "rob", "age": 15 }"#,
message = "Testing Json"
);
std::thread::sleep(std::time::Duration::from_secs(2));
// yet another log
log("3a");
std::thread::sleep(std::time::Duration::from_secs(6));
log("4a");
}
}
2 changes: 1 addition & 1 deletion log-tracing-layer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "log-tracing-layer"
version = "0.3.0"
version = "0.4.0"
authors = ["Roberto Huertas <[email protected]>"]
description = "Build your own custom tracing layer."
edition = "2021"
Expand Down

0 comments on commit eb24c09

Please sign in to comment.