feat: add moose code for capturing runtime telemetry (#1650)
callicles authored Aug 2, 2024
1 parent bd45899 commit c99e320
Showing 15 changed files with 680 additions and 399 deletions.
41 changes: 29 additions & 12 deletions apps/framework-cli/src/cli.rs
@@ -55,6 +55,7 @@ use crate::framework::languages::SupportedLanguages;
use crate::framework::sdk::ingest::generate_sdk;
use crate::framework::versions::parse_version;
use crate::infrastructure::olap::clickhouse::version_sync::version_to_string;
use crate::metrics::TelemetryMetadata;
use crate::project::Project;
use crate::utilities::capture::{wait_for_usage_capture, ActivityType};
use crate::utilities::constants::{CLI_VERSION, PROJECT_NAME_ALLOW_PATTERN};
@@ -149,7 +150,6 @@ fn maybe_create_git_repo(dir_path: &Path, project_arc: Arc<Project>) {
async fn top_command_handler(
settings: Settings,
commands: &Commands,
metrics: Arc<Metrics>,
) -> Result<RoutineSuccess, RoutineFailure> {
match commands {
Commands::Init {
@@ -312,10 +312,22 @@ async fn top_command_handler(
&settings,
);

let (metrics, rx) = Metrics::new(TelemetryMetadata {
anonymous_telemetry_enabled: settings.telemetry.enabled
&& settings.features.telemetry_metrics,
machine_id: settings.telemetry.machine_id.clone(),
is_moose_developer: settings.telemetry.is_moose_developer,
is_production: project_arc.is_production,
project_name: project_arc.name().to_string(),
});

let arc_metrics = Arc::new(metrics);
arc_metrics.start_listening_to_metrics(rx).await;

check_project_name(&project_arc.name())?;
run_local_infrastructure(&project_arc)?.show();

routines::start_development_mode(project_arc, &settings.features, metrics)
routines::start_development_mode(project_arc, &settings.features, arc_metrics)
.await
.map_err(|e| {
RoutineFailure::error(Message {
@@ -452,6 +464,18 @@ async fn top_command_handler(
project.set_is_production_env(true);
let project_arc = Arc::new(project);

let (metrics, rx) = Metrics::new(TelemetryMetadata {
anonymous_telemetry_enabled: settings.telemetry.enabled
&& settings.features.telemetry_metrics,
machine_id: settings.telemetry.machine_id.clone(),
is_moose_developer: settings.telemetry.is_moose_developer,
is_production: project_arc.is_production,
project_name: project_arc.name().to_string(),
});

let arc_metrics = Arc::new(metrics);
arc_metrics.start_listening_to_metrics(rx).await;

let capture_handle = crate::utilities::capture::capture_usage(
ActivityType::ProdCommand,
Some(project_arc.name()),
@@ -460,7 +484,7 @@

check_project_name(&project_arc.name())?;

routines::start_production_mode(project_arc, settings.features, metrics)
routines::start_production_mode(project_arc, settings.features, arc_metrics)
.await
.unwrap();

@@ -862,11 +886,7 @@ pub async fn cli_run() {

let cli = Cli::parse();

let (metrics, rx) = Metrics::new();
let arc_metrics = Arc::new(metrics);
arc_metrics.start_listening_to_metrics(rx).await;

match top_command_handler(config, &cli.command, arc_metrics).await {
match top_command_handler(config, &cli.command).await {
Ok(s) => {
show_message!(s.message_type, s.message);
exit(0);
@@ -914,10 +934,7 @@ mod tests {

let config = read_settings().unwrap();

let (metrics, _rx) = Metrics::new();
let arc_metrics = Arc::new(metrics);

top_command_handler(config, &cli.command, arc_metrics).await
top_command_handler(config, &cli.command).await
}

#[tokio::test]
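Taken together, the cli.rs hunks above move metrics construction out of cli_run and into top_command_handler, so the dev and prod commands each build a Metrics instance seeded with telemetry metadata before handing it to their routines. The snippet below is a consolidated sketch of that wiring rather than a verbatim excerpt; Metrics, TelemetryMetadata, and start_listening_to_metrics are crate-internal APIs shown in this diff, and settings / project_arc come from the surrounding CLI context.

// Sketch of the per-command wiring added above (dev branch shown; prod is analogous).
// `Metrics::new` returns the metrics handle plus the receiving end of its channel.
let (metrics, rx) = Metrics::new(TelemetryMetadata {
    anonymous_telemetry_enabled: settings.telemetry.enabled
        && settings.features.telemetry_metrics,
    machine_id: settings.telemetry.machine_id.clone(),
    is_moose_developer: settings.telemetry.is_moose_developer,
    is_production: project_arc.is_production,
    project_name: project_arc.name().to_string(),
});

// The receiver goes back into the instance, which starts the listener task;
// the Arc is then threaded into the routine instead of being built once in
// `cli_run` as before.
let arc_metrics = Arc::new(metrics);
arc_metrics.start_listening_to_metrics(rx).await;

routines::start_development_mode(project_arc, &settings.features, arc_metrics).await // error mapping as in the hunk above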
148 changes: 67 additions & 81 deletions apps/framework-cli/src/cli/local_webserver.rs
@@ -71,7 +71,7 @@ enum Direction {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FlowMessages {
count: u64,
path: String,
function_name: String,
bytes: u64,
direction: Direction,
}
@@ -175,10 +175,11 @@ async fn create_client(
let res = sender.send_request(req).await?;
let body = res.collect().await.unwrap().to_bytes().to_vec();
metrics
.send_metric(MetricsMessage::PutNumberOfBytesOut(
.send_metric(MetricsMessage::PutConsumedBytesCount {
route,
body.len() as u64,
))
method: "GET".to_string(),
bytes_count: body.len() as u64,
})
.await;

Ok(Response::builder()
@@ -269,38 +270,30 @@ async fn metrics_log_route(req: Request<Incoming>, metrics: Arc<Metrics>) -> Res
let body = to_reader(req).await;
let parsed: Result<FlowMessages, serde_json::Error> = serde_json::from_reader(body);
match parsed {
Ok(cli_message) => {
let message = FlowMessages {
count: cli_message.count,
bytes: cli_message.bytes,
path: cli_message.path,
direction: cli_message.direction,
};
match message.direction {
Direction::In => {
metrics
.send_metric(MetricsMessage::PutStreamingFunctionMessagesIn(
message.path.clone(),
message.count,
))
.await;
metrics
.send_metric(MetricsMessage::PutStreamingFunctionBytes(
message.path,
message.bytes,
))
.await;
}
Direction::Out => {
metrics
.send_metric(MetricsMessage::PutStreamingFunctionMessagesOut(
message.path,
message.count,
))
.await
}
Ok(cli_message) => match cli_message.direction {
Direction::In => {
metrics
.send_metric(MetricsMessage::PutStreamingFunctionMessagesIn {
function_name: cli_message.function_name.clone(),
count: cli_message.count,
})
.await;
metrics
.send_metric(MetricsMessage::PutStreamingFunctionBytes {
function_name: cli_message.function_name.clone(),
bytes_count: cli_message.bytes,
})
.await;
}
}
Direction::Out => {
metrics
.send_metric(MetricsMessage::PutStreamingFunctionMessagesOut {
function_name: cli_message.function_name.clone(),
count: cli_message.count,
})
.await
}
},
Err(e) => println!("Received unknown message: {:?}", e),
}

@@ -370,10 +363,12 @@ async fn send_payload_to_topic(
debug!("Sending payload {:?} to topic: {}", payload, topic_name);

metrics
.send_metric(MetricsMessage::PutNumberOfMessagesIn(
route.to_str().unwrap().to_string(),
topic_name.to_string(),
))
.send_metric(MetricsMessage::PutHTTPToTopicEventCount {
route,
topic_name: topic_name.to_string(),
method: "POST".to_string(),
count: 1,
})
.await;

configured_producer
@@ -406,10 +401,11 @@ async fn handle_json_req(
let parsed: Result<Value, serde_json::Error> = serde_json::from_reader(body);

metrics
.send_metric(MetricsMessage::PutNumberOfBytesIn(
route.clone(),
number_of_bytes,
))
.send_metric(MetricsMessage::PutIngestedBytesCount {
route: route.clone(),
method: "POST".to_string(),
bytes_count: number_of_bytes,
})
.await;
// TODO add check that the payload has the proper schema

@@ -439,22 +435,11 @@ async fn wait_for_batch_complete(
async fn wait_for_batch_complete(
res_arr: &mut Vec<Result<OwnedDeliveryResult, KafkaError>>,
temp_res: Vec<Result<DeliveryFuture, KafkaError>>,
topic_name: &str,
metrics: Arc<Metrics>,
route: PathBuf,
) {
for future_res in temp_res {
match future_res {
Ok(future) => match future.await {
Ok(res) => {
metrics
.send_metric(MetricsMessage::PutNumberOfMessagesIn(
route.to_str().unwrap().to_string(),
topic_name.to_string(),
))
.await;
res_arr.push(Ok(res))
}
Ok(res) => res_arr.push(Ok(res)),
Err(_) => res_arr.push(Err(KafkaError::Canceled)),
},
Err(e) => res_arr.push(Err(e)),
@@ -483,11 +468,13 @@ async fn handle_json_array_body(

debug!("parsed json array for {}", topic_name);
metrics
.send_metric(MetricsMessage::PutNumberOfBytesIn(
route.clone(),
number_of_bytes,
))
.send_metric(MetricsMessage::PutIngestedBytesCount {
route: route.clone(),
method: "POST".to_string(),
bytes_count: number_of_bytes,
})
.await;

if let Err(e) = parsed {
return bad_json_response(e);
}
@@ -502,6 +489,7 @@
let record = FutureRecord::to(topic_name)
.key(topic_name) // This should probably be generated by the client that pushes data to the API
.payload(payload.as_slice());

temp_res.push(
configured_producer
.producer
@@ -511,18 +499,21 @@
// ideally we want to use redpanda::send_with_back_pressure
// but it does not report the error back
if count % 1024 == 1023 {
wait_for_batch_complete(
&mut res_arr,
temp_res,
topic_name,
metrics.clone(),
route.clone(),
)
.await;
wait_for_batch_complete(&mut res_arr, temp_res).await;

temp_res = Vec::new();
}
}
wait_for_batch_complete(&mut res_arr, temp_res, topic_name, metrics, route).await;
wait_for_batch_complete(&mut res_arr, temp_res).await;

metrics
.send_metric(MetricsMessage::PutHTTPToTopicEventCount {
route: route.clone(),
method: "POST".to_string(),
count: res_arr.iter().filter(|res| res.is_ok()).count() as u64,
topic_name: topic_name.to_string(),
})
.await;

if res_arr.iter().any(|res| res.is_err()) {
return internal_server_error_response();
@@ -696,20 +687,15 @@ async fn router(
.body(Full::new(Bytes::from("no match"))),
};

if metrics_path.to_str().unwrap().starts_with("ingest/")
|| metrics_path
.clone()
.into_os_string()
.to_str()
.unwrap()
.starts_with("consumption/")
{
let metrics_path_str = metrics_path.to_str().unwrap();

if metrics_path_str.starts_with("ingest/") || metrics_path_str.starts_with("consumption/") {
metrics
.send_metric(MetricsMessage::HTTPLatency((
metrics_path,
now.elapsed(),
metrics_method,
)))
.send_metric(MetricsMessage::HTTPLatency {
path: metrics_path,
duration: now.elapsed(),
method: metrics_method,
})
.await;
}

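The local_webserver.rs changes above also rework MetricsMessage from positional tuple variants (e.g. PutNumberOfBytesIn, PutNumberOfMessagesIn) into struct variants with named fields, and FlowMessages now carries a function_name instead of a path. The definition below is a hypothetical reconstruction from the call sites in this diff only; the field types are assumptions, and the real enum lives in the crate's metrics module, which is not among the files shown here.

// Hypothetical reconstruction of the variants used above; types are inferred
// from the call sites, not copied from the actual metrics module.
use std::path::PathBuf;
use std::time::Duration;

pub enum MetricsMessage {
    PutIngestedBytesCount { route: PathBuf, method: String, bytes_count: u64 },
    PutConsumedBytesCount { route: PathBuf, method: String, bytes_count: u64 },
    PutHTTPToTopicEventCount { route: PathBuf, topic_name: String, method: String, count: u64 },
    PutStreamingFunctionMessagesIn { function_name: String, count: u64 },
    PutStreamingFunctionMessagesOut { function_name: String, count: u64 },
    PutStreamingFunctionBytes { function_name: String, bytes_count: u64 },
    HTTPLatency { path: PathBuf, duration: Duration, method: String },
}

A streaming-function process reporting into metrics_log_route would accordingly post a JSON body shaped like the updated FlowMessages struct, for example {"count": 42, "function_name": "my_function", "bytes": 2048, "direction": "In"}; the field values and the exact serialization of Direction here are illustrative rather than taken from this diff.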