Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: build telemetry client in background #7627

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions crates/turborepo-api-client/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use reqwest::Method;
use tokio::sync::mpsc;
use turborepo_vercel_api::telemetry::TelemetryEvent;

use crate::{retry, AnonAPIClient, Error};
Expand Down Expand Up @@ -41,3 +42,15 @@
Ok(())
}
}

pub struct BackgroundTelemetryClient {
sender: mpsc::Sender<Request>,

Check warning on line 47 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / JS Native Package Tests (ubuntu, self-hosted, linux, x64, metal)

field `sender` is never read

Check warning on line 47 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / JS Native Package Tests (ubuntu, self-hosted, linux, x64, metal)

field `sender` is never read

Check warning on line 47 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / JS Native Package Tests (macos, macos-latest)

field `sender` is never read

Check warning on line 47 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / JS Native Package Tests (macos, macos-latest)

field `sender` is never read

Check warning on line 47 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (ubuntu, self-hosted, linux, x64, metal)

field `sender` is never read

Check warning on line 47 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (macos, macos-latest)

field `sender` is never read

Check warning on line 47 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (windows, windows-latest)

field `sender` is never read

Check warning on line 47 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (ubuntu-latest)

field `sender` is never read

Check warning on line 47 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (macos-latest)

field `sender` is never read

Check warning on line 47 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (windows-latest)

field `sender` is never read
}

enum Request {
Record {

Check warning on line 51 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / JS Native Package Tests (ubuntu, self-hosted, linux, x64, metal)

variant `Record` is never constructed

Check warning on line 51 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / JS Native Package Tests (ubuntu, self-hosted, linux, x64, metal)

variant `Record` is never constructed

Check warning on line 51 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / JS Native Package Tests (macos, macos-latest)

variant `Record` is never constructed

Check warning on line 51 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / JS Native Package Tests (macos, macos-latest)

variant `Record` is never constructed

Check warning on line 51 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (ubuntu, self-hosted, linux, x64, metal)

variant `Record` is never constructed

Check warning on line 51 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (macos, macos-latest)

variant `Record` is never constructed

Check warning on line 51 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (windows, windows-latest)

variant `Record` is never constructed

Check warning on line 51 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (ubuntu-latest)

variant `Record` is never constructed

Check warning on line 51 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (macos-latest)

variant `Record` is never constructed

Check warning on line 51 in crates/turborepo-api-client/src/telemetry.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (windows-latest)

variant `Record` is never constructed
events: Vec<TelemetryEvent>,
telemetry_id: String,
session_id: String,
},
}
20 changes: 9 additions & 11 deletions crates/turborepo-lib/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -937,18 +937,16 @@ pub async fn run(
let mut telemetry_handle: Option<TelemetryHandle> = None;

// initialize telemetry
match AnonAPIClient::new("https://telemetry.vercel.com", 250, version) {
Ok(anonymous_api_client) => {
let handle = init_telemetry(anonymous_api_client, ui);
match handle {
Ok(h) => telemetry_handle = Some(h),
Err(error) => {
debug!("failed to start telemetry: {:?}", error)
}
}
}
let client_version = version.to_string();
let client_builder = move || {
AnonAPIClient::new("https://telemetry.vercel.com", 250, &client_version)
.inspect_err(|e| debug!("failed setting up client: {e}"))
.ok()
};
match init_telemetry(client_builder, ui) {
Ok(h) => telemetry_handle = Some(h),
Err(error) => {
debug!("Failed to create AnonAPIClient: {:?}", error);
debug!("failed to start telemetry: {:?}", error)
}
}

Expand Down
68 changes: 46 additions & 22 deletions crates/turborepo-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,14 @@ pub fn telem(event: events::TelemetryEvent) {
}
}

fn init(
fn init<C>(
mut config: TelemetryConfig,
client: impl telemetry::TelemetryClient + Clone + Send + Sync + 'static,
client: impl Send + FnOnce() -> Option<C> + 'static,
ui: UI,
) -> Result<(TelemetryHandle, TelemetrySender), Box<dyn std::error::Error>> {
) -> Result<(TelemetryHandle, TelemetrySender), Box<dyn std::error::Error>>
where
C: telemetry::TelemetryClient + Clone + Send + Sync + 'static,
{
let (tx, rx) = mpsc::unbounded_channel();
let (cancel_tx, cancel_rx) = oneshot::channel();
config.show_alert(ui);
Expand All @@ -90,13 +93,12 @@ fn init(
buffer: Vec::new(),
senders: FuturesUnordered::new(),
exit_ch: cancel_tx,
client,
session_id: session_id.to_string(),
telemetry_id: config.get_id().to_string(),
enabled: config.is_enabled(),
ui,
};
let handle = worker.start();
let handle = worker.start(client);

let telemetry_handle = TelemetryHandle {
exit_ch: cancel_rx,
Expand All @@ -113,10 +115,13 @@ fn init(
/// We have two different types because the TelemetrySender should be shared
/// across threads (i.e. Clone + Send), while the TelemetryHandle cannot be
/// shared since it contains the structs necessary to shut down the worker.
pub fn init_telemetry(
client: impl telemetry::TelemetryClient + Clone + Send + Sync + 'static,
pub fn init_telemetry<C>(
client: impl Send + FnOnce() -> Option<C> + 'static,
ui: UI,
) -> Result<TelemetryHandle, Box<dyn std::error::Error>> {
) -> Result<TelemetryHandle, Box<dyn std::error::Error>>
where
C: telemetry::TelemetryClient + Clone + Send + Sync + 'static,
{
// make sure we're not already initialized
if SENDER_INSTANCE.get().is_some() {
debug!("telemetry already initialized");
Expand Down Expand Up @@ -147,22 +152,29 @@ impl TelemetryHandle {
}
}

struct Worker<C> {
struct Worker {
rx: mpsc::UnboundedReceiver<TelemetryEvent>,
buffer: Vec<TelemetryEvent>,
senders: FuturesUnordered<JoinHandle<()>>,
// Used to cancel the worker
exit_ch: oneshot::Sender<()>,
client: C,
telemetry_id: String,
session_id: String,
enabled: bool,
ui: UI,
}

impl<C: telemetry::TelemetryClient + Clone + Send + Sync + 'static> Worker<C> {
pub fn start(mut self) -> JoinHandle<()> {
impl Worker {
pub fn start<C>(mut self, client: impl FnOnce() -> Option<C> + Send + 'static) -> JoinHandle<()>
where
C: telemetry::TelemetryClient + Clone + Send + Sync + 'static,
{
tokio::spawn(async move {
// Constructing a HTTPS client is almost always a blocking operation
let Ok(Some(client)) = tokio::task::spawn_blocking(client).await else {
// If constructing telemetry client panics, shut down
return;
};
let mut timeout = tokio::time::sleep(NO_TIMEOUT);
loop {
select! {
Expand All @@ -176,22 +188,22 @@ impl<C: telemetry::TelemetryClient + Clone + Send + Sync + 'static> Worker<C> {
break;
}
if self.buffer.len() == BUFFER_THRESHOLD {
self.flush_events();
self.flush_events(&client);
timeout = tokio::time::sleep(NO_TIMEOUT);
} else {
timeout = tokio::time::sleep(EVENT_TIMEOUT);
}
}
_ = timeout => {
self.flush_events();
self.flush_events(&client);
timeout = tokio::time::sleep(NO_TIMEOUT);
}
_ = self.exit_ch.closed() => {
break;
}
}
}
self.flush_events();
self.flush_events(&client);
while let Some(result) = self.senders.next().await {
if let Err(err) = result {
debug!("failed to send telemetry event. error: {}", err)
Expand All @@ -200,22 +212,28 @@ impl<C: telemetry::TelemetryClient + Clone + Send + Sync + 'static> Worker<C> {
})
}

pub fn flush_events(&mut self) {
pub fn flush_events<C>(&mut self, client: &C)
where
C: telemetry::TelemetryClient + Clone + Send + Sync + 'static,
{
if !self.buffer.is_empty() {
let events = std::mem::take(&mut self.buffer);
debug!(
"Starting telemetry event queue flush (num_events={:?})",
events.len()
);
let handle = self.send_events(events);
let handle = self.send_events(client, events);
if let Some(handle) = handle {
self.senders.push(handle);
}
debug!("Done telemetry event queue flush");
}
}

fn send_events(&self, events: Vec<TelemetryEvent>) -> Option<JoinHandle<()>> {
fn send_events<C>(&self, client: &C, events: Vec<TelemetryEvent>) -> Option<JoinHandle<()>>
where
C: telemetry::TelemetryClient + Clone + Send + Sync + 'static,
{
if !self.enabled {
return None;
}
Expand All @@ -232,7 +250,7 @@ impl<C: telemetry::TelemetryClient + Clone + Send + Sync + 'static> Worker<C> {
}
}

let client = self.client.clone();
let client = client.clone();
let session_id = self.session_id.clone();
let telemetry_id = self.telemetry_id.clone();
Some(tokio::spawn(async move {
Expand Down Expand Up @@ -343,8 +361,10 @@ mod tests {
events: Default::default(),
tx,
};
let client_copy = client.clone();
let client_builder = move || Some(client_copy);

let result = init(config, client.clone(), UI::new(false));
let result = init(config, client_builder, UI::new(false));

let (telemetry_handle, telemetry_sender) = result.unwrap();

Expand Down Expand Up @@ -383,8 +403,10 @@ mod tests {
events: Default::default(),
tx,
};
let client_copy = client.clone();
let client_builder = move || Some(client_copy);

let result = init(config, client.clone(), UI::new(false));
let result = init(config, client_builder, UI::new(false));

let (telemetry_handle, telemetry_sender) = result.unwrap();

Expand Down Expand Up @@ -429,8 +451,10 @@ mod tests {
events: Default::default(),
tx,
};
let client_copy = client.clone();
let client_builder = move || Some(client_copy);

let result = init(config, client.clone(), UI::new(false));
let result = init(config, client_builder, UI::new(false));

let (telemetry_handle, telemetry_sender) = result.unwrap();

Expand Down
Loading