Skip to content

Commit

Permalink
feat(rust): optimize node creation:
Browse files Browse the repository at this point in the history
 - create tokio `Runtime` in a predictable way and reuse it
 - rework `Command` usage
 - add `no_logging` flag to the node
 - optimize node startup
 - replace readiness poll with a callback
  • Loading branch information
SanjoDeundiak committed Jan 30, 2025
1 parent d59dbfc commit 7c2bea9
Show file tree
Hide file tree
Showing 204 changed files with 1,351 additions and 2,090 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 15 additions & 15 deletions implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use tokio::sync::broadcast::{channel, Receiver, Sender};

use ockam::SqlxDatabase;
use ockam_core::env::get_env_with_default;
use ockam_node::database::{DatabaseConfiguration, DatabaseType, OCKAM_SQLITE_IN_MEMORY};
use ockam_node::Executor;
use ockam_node::database::{DatabaseConfiguration, DatabaseType};

use crate::cli_state::error::Result;
use crate::cli_state::CliStateError;
Expand Down Expand Up @@ -51,11 +50,6 @@ pub struct CliState {
}

impl CliState {
/// Create a new CliState in a given directory
pub fn new(mode: CliStateMode) -> Result<Self> {
Executor::execute_future(Self::create(mode))?
}

pub fn dir(&self) -> Result<PathBuf> {
match &self.mode {
CliStateMode::Persistent(dir) => Ok(dir.to_path_buf()),
Expand Down Expand Up @@ -127,14 +121,14 @@ impl CliState {
impl CliState {
/// Return a new CliState using a default directory to store its data or
/// using an in-memory storage if the OCKAM_SQLITE_IN_MEMORY environment variable is set to true
pub fn from_env() -> Result<Self> {
let in_memory = get_env_with_default::<bool>(OCKAM_SQLITE_IN_MEMORY, false)?;
pub async fn new(in_memory: bool) -> Result<Self> {
let mode = if in_memory {
CliStateMode::InMemory
} else {
CliStateMode::with_default_dir()?
};
Self::new(mode)

Self::create(mode).await
}

/// Stop nodes and remove all the directories storing state
Expand Down Expand Up @@ -182,7 +176,7 @@ impl CliState {
/// Backup and reset is used to save aside
/// some corrupted local state for later inspection and then reset the state.
/// The database is backed-up only if it is a SQLite database.
pub fn backup_and_reset() -> Result<()> {
pub async fn backup_and_reset() -> Result<()> {
let dir = Self::default_dir()?;

// Reset backup directory
Expand All @@ -202,7 +196,7 @@ impl CliState {

// Reset state
Self::delete_at(&dir)?;
Self::new(CliStateMode::Persistent(dir.clone()))?;
Self::create(CliStateMode::Persistent(dir.clone())).await?;

let backup_dir = CliState::backup_default_dir()?;
eprintln!("The {dir:?} directory has been reset and has been backed up to {backup_dir:?}");
Expand Down Expand Up @@ -234,15 +228,20 @@ impl CliState {
std::fs::create_dir_all(dir.as_path())?;
}
let database = SqlxDatabase::create(&Self::make_database_configuration(&mode)?).await?;
let configuration = Self::make_application_database_configuration(&mode)?;
let application_database =
SqlxDatabase::create_application_database(&configuration).await?;
debug!("Opened the main database with options {:?}", database);

// TODO: This should not be called unless we're running the App
let application_database = SqlxDatabase::create_application_database(
&Self::make_application_database_configuration(&mode)?,
)
.await?;
debug!(
"Opened the application database with options {:?}",
application_database
);

let (notifications, _) = channel::<Notification>(NOTIFICATIONS_CHANNEL_CAPACITY);

let state = Self {
mode,
database,
Expand All @@ -254,6 +253,7 @@ impl CliState {
exporting_enabled: ExportingEnabled::Off,
notifications,
};

Ok(state)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub(crate) const OCKAM_LOG_MAX_FILES: &str = "OCKAM_LOG_MAX_FILES";
/// Log format. Accepted values, see LogFormat. For example: pretty, json, default
pub(crate) const OCKAM_LOG_FORMAT: &str = "OCKAM_LOG_FORMAT";

/// Filter for log messages based on crate names. Accepted values: 'all', 'default', 'comma-separated strings'. For example: ockam_core,ockam_api
/// Filter for log messages based on crate names. Accepted values: 'all' or 'comma-separated strings'. For example: ockam_core,ockam_api
pub(crate) const OCKAM_LOG_CRATES_FILTER: &str = "OCKAM_LOG_CRATES_FILTER";

///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::logs::ExportingEnabled;
use crate::CliState;
use ockam_core::env::{get_env_with_default, FromString};
use ockam_core::errcode::{Kind, Origin};
use ockam_node::Executor;
use std::env::current_exe;
use std::fmt::{Display, Formatter};
use std::net::{SocketAddr, ToSocketAddrs};
Expand Down Expand Up @@ -116,14 +115,15 @@ impl ExportingConfiguration {

/// Create a tracing configuration for a user command running in the foreground.
/// (meaning that the process will shut down once the command has been executed)
pub fn foreground(state: &CliState) -> ockam_core::Result<ExportingConfiguration> {
match opentelemetry_endpoint(state)? {
pub async fn foreground(state: &CliState) -> ockam_core::Result<ExportingConfiguration> {
match opentelemetry_endpoint(state).await? {
None => ExportingConfiguration::off(),
Some(endpoint) => Ok(ExportingConfiguration {
enabled: exporting_enabled(
&endpoint,
opentelemetry_endpoint_foreground_connection_timeout()?,
)?,
)
.await?,
span_export_timeout: span_export_timeout()?,
log_export_timeout: log_export_timeout()?,
span_export_scheduled_delay: foreground_span_export_scheduled_delay()?,
Expand All @@ -139,14 +139,15 @@ impl ExportingConfiguration {
}

/// Create a tracing configuration for a background node
pub fn background(state: &CliState) -> ockam_core::Result<ExportingConfiguration> {
match opentelemetry_endpoint(state)? {
pub async fn background(state: &CliState) -> ockam_core::Result<ExportingConfiguration> {
match opentelemetry_endpoint(state).await? {
None => ExportingConfiguration::off(),
Some(endpoint) => Ok(ExportingConfiguration {
enabled: exporting_enabled(
&endpoint,
opentelemetry_endpoint_background_connection_timeout()?,
)?,
)
.await?,
span_export_timeout: span_export_timeout()?,
log_export_timeout: log_export_timeout()?,
span_export_scheduled_delay: background_span_export_scheduled_delay()?,
Expand Down Expand Up @@ -254,11 +255,11 @@ fn print_debug(message: impl Into<String>) {
/// - Exporting has not been deactivated by the user
/// - The opentelemetry endpoint is accessible
///
fn exporting_enabled(
async fn exporting_enabled(
endpoint: &OpenTelemetryEndpoint,
connection_check_timeout: Duration,
) -> ockam_core::Result<ExportingEnabled> {
if is_endpoint_accessible(&endpoint.url(), connection_check_timeout) {
if is_endpoint_accessible(&endpoint.url(), connection_check_timeout).await {
print_debug("Exporting is enabled");
Ok(ExportingEnabled::On)
} else {
Expand All @@ -275,23 +276,37 @@ fn exporting_enabled(
}

/// Return true if the endpoint can be accessed with a TCP connection
fn is_endpoint_accessible(url: &Url, connection_check_timeout: Duration) -> bool {
async fn is_endpoint_accessible(url: &Url, connection_check_timeout: Duration) -> bool {
match to_socket_addr(url) {
Some(address) => {
let retries = FibonacciBackoff::from_millis(100);
let now = Instant::now();

// TODO: Not sure we need to retry really, also maybe it could happen in the background
// to not slow things down
for timeout_duration in retries {
print_debug(format!(
"trying to connect to {address} in {timeout_duration:?}"
));
if std::net::TcpStream::connect_timeout(&address, timeout_duration).is_ok() {
return true;
} else {
if now.elapsed() >= connection_check_timeout {
return false;
};
std::thread::sleep(timeout_duration);

let res = tokio::time::timeout(
timeout_duration,
tokio::net::TcpStream::connect(&address),
)
.await;

match res {
Ok(res) => {
if res.is_ok() {
return true;
}
}
Err(_) => {
if now.elapsed() >= connection_check_timeout {
return false;
};
tokio::time::sleep(timeout_duration).await;
}
}
}
false
Expand Down Expand Up @@ -324,36 +339,27 @@ fn to_socket_addr(url: &Url) -> Option<SocketAddr> {
/// Return the tracing endpoint, defined by an environment variable
/// If the endpoint can be established with an Ockam portal to the opentelemetry-relay created in the project
/// use that URL, otherwise use the HTTPS endpoint
fn opentelemetry_endpoint(state: &CliState) -> ockam_core::Result<Option<OpenTelemetryEndpoint>> {
if !is_exporting_set()? {
print_debug("Exporting is turned off");
Ok(None)
} else {
let state = state.clone();
match Executor::execute_future(async move {
// if a project is defined try to use the OpenTelemetry portal
// and if we allow traces to be exported via a portal
if state.projects().get_default_project().await.is_ok()
&& is_exporting_via_portal_set()?
{
print_debug("A default project exists. Getting the project export endpoint");
get_project_endpoint_url(&state).await
} else {
print_debug("A default project does not exist. Getting the default HTTPs endpoint");
get_https_endpoint()
}
}) {
Ok(Ok(url)) => Ok(Some(url)),
Ok(Err(e)) => {
print_debug(format!(
"There was an issue when setting up the exporting of traces: {e:?}"
));
Ok(None)
}
Err(e) => {
print_debug(format!("There was an issue when running the code setting up the exporting of traces: {e:?}"));
Ok(None)
}
async fn opentelemetry_endpoint(
state: &CliState,
) -> ockam_core::Result<Option<OpenTelemetryEndpoint>> {
let res = {
// if a project is defined try to use the OpenTelemetry portal
// and if we allow traces to be exported via a portal
if state.projects().get_default_project().await.is_ok() && is_exporting_via_portal_set()? {
print_debug("A default project exists. Getting the project export endpoint");
get_project_endpoint_url(state).await
} else {
print_debug("A default project does not exist. Getting the default HTTPs endpoint");
get_https_endpoint()
}
};
match res {
Ok(url) => Ok(Some(url)),
Err(e) => {
print_debug(format!(
"There was an issue when setting up the exporting of traces: {e:?}"
));
Ok(None)
}
}
}
Expand Down
Loading

0 comments on commit 7c2bea9

Please sign in to comment.