Skip to content

add more logging in app-server startup #443

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ docker compose up -d
This will spin up a lightweight version of the stack with Postgres, ClickHouse, app-server, and frontend. This is good for a quick start
or for lightweight usage. You can access the UI at http://localhost:5667 in your browser.

For production environment, we recommend using our [managed platform](https://www.lmnr.ai/projects) or `docker compose -f docker-compose-full.yml up -d`.
You will also need to properly configure the SDK with `baseUrl` and the correct ports. See https://docs.lmnr.ai/self-hosting/setup

For production environment, we recommend using our [managed platform](https://www.lmnr.ai/projects) or `docker compose -f docker-compose-full.yml up -d`.

`docker-compose-full.yml` is heavy, but it will enable all the features.

Expand Down
62 changes: 49 additions & 13 deletions app-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,28 @@ fn main() -> anyhow::Result<()> {

let mut handles: Vec<JoinHandle<Result<(), Error>>> = vec![];

std::env::set_var("RUST_LOG", "info");
env_logger::init();
if env::var("RUST_LOG").is_ok_and(|s| !s.is_empty()) {
env_logger::init();
} else {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.init();
}

let http_payload_limit: usize = env::var("HTTP_PAYLOAD_LIMIT")
.unwrap_or(String::from("5242880")) // default to 5MB
.parse()
.unwrap();

log::info!("HTTP payload limit: {}", http_payload_limit);

let grpc_payload_limit: usize = env::var("GRPC_PAYLOAD_LIMIT")
.unwrap_or(String::from("26214400")) // default to 25MB
.parse()
.unwrap();

log::info!("GRPC payload limit: {}", grpc_payload_limit);

let port = env::var("PORT")
.unwrap_or(String::from("8000"))
.parse()
Expand All @@ -129,11 +138,13 @@ fn main() -> anyhow::Result<()> {
// == Stuff that is needed both for HTTP and gRPC servers ==
// === 1. Cache ===
let cache = if let Ok(redis_url) = env::var("REDIS_URL") {
log::info!("Using Redis cache");
runtime_handle.block_on(async {
let redis_cache = RedisCache::new(&redis_url).await.unwrap();
Cache::Redis(redis_cache)
})
} else {
log::info!("using in-memory cache");
Cache::InMemory(InMemoryCache::new(None))
};
let cache = Arc::new(cache);
Expand Down Expand Up @@ -203,10 +214,13 @@ fn main() -> anyhow::Result<()> {
.and_then(|v| v.parse().ok())
.unwrap_or(64);

log::info!("RabbitMQ span channels: {}", max_channel_pool_size);

let rabbit_mq = mq::rabbit::RabbitMQ::new(connection.clone(), max_channel_pool_size);
Arc::new(rabbit_mq.into())
})
} else {
log::info!("Using tokio mpsc span queue");
Arc::new(mq::tokio_mpsc::TokioMpscQueue::new().into())
};

Expand Down Expand Up @@ -241,10 +255,16 @@ fn main() -> anyhow::Result<()> {
.and_then(|v| v.parse().ok())
.unwrap_or(64);

log::info!(
"RabbitMQ browser events channels: {}",
max_channel_pool_size
);

let rabbit_mq = mq::rabbit::RabbitMQ::new(connection.clone(), max_channel_pool_size);
Arc::new(rabbit_mq.into())
})
} else {
log::info!("Using tokio mpsc browser events queue");
Arc::new(mq::tokio_mpsc::TokioMpscQueue::new().into())
};

Expand All @@ -271,6 +291,7 @@ fn main() -> anyhow::Result<()> {

// == Storage ==
let storage: Arc<Storage> = if is_feature_enabled(Feature::Storage) {
log::info!("using S3 storage");
let s3_client = aws_sdk_s3::Client::new(&aws_sdk_config);
let s3_storage = storage::s3::S3Storage::new(
s3_client,
Expand All @@ -279,6 +300,7 @@ fn main() -> anyhow::Result<()> {
);
Arc::new(s3_storage.into())
} else {
log::info!("using mock storage");
Arc::new(MockStorage {}.into())
};

Expand Down Expand Up @@ -318,13 +340,15 @@ fn main() -> anyhow::Result<()> {
if is_feature_enabled(Feature::MachineManager) {
let machine_manager_url_grpc = env::var("MACHINE_MANAGER_URL_GRPC")
.expect("MACHINE_MANAGER_URL_GRPC must be set");
log::info!("Machine manager URL: {}", machine_manager_url_grpc);
let machine_manager_client = Arc::new(
MachineManagerServiceClient::connect(machine_manager_url_grpc)
.await
.unwrap(),
);
Arc::new(MachineManagerImpl::new(machine_manager_client).into())
} else {
log::info!("Using mock machine manager");
Arc::new(machine_manager::MockMachineManager {}.into())
};

Expand All @@ -333,13 +357,15 @@ fn main() -> anyhow::Result<()> {
{
let agent_manager_url =
env::var("AGENT_MANAGER_URL").expect("AGENT_MANAGER_URL must be set");
log::info!("Agent manager URL: {}", agent_manager_url);
let agent_manager_client = Arc::new(
AgentManagerServiceClient::connect(agent_manager_url)
.await
.unwrap(),
);
Arc::new(AgentManagerImpl::new(agent_manager_client).into())
} else {
log::info!("Using mock agent manager");
Arc::new(agent_manager::mock::MockAgentManager {}.into())
};

Expand All @@ -353,6 +379,7 @@ fn main() -> anyhow::Result<()> {
// == Semantic search ==
let semantic_search: Arc<SemanticSearch> =
if let Ok(semantic_search_url) = env::var("SEMANTIC_SEARCH_URL") {
log::info!("Semantic search URL: {}", semantic_search_url);
let semantic_search_client = Arc::new(
SemanticSearchClient::connect(semantic_search_url)
.await
Expand All @@ -365,12 +392,14 @@ fn main() -> anyhow::Result<()> {
.into(),
)
} else {
log::info!("Using mock semantic search");
Arc::new(semantic_search::mock::MockSemanticSearch {}.into())
};

// == Python executor ==
let code_executor: Arc<CodeExecutor> =
if let Ok(code_executor_url) = env::var("CODE_EXECUTOR_URL") {
log::info!("Code executor URL: {}", code_executor_url);
let code_executor_client = Arc::new(
CodeExecutorClient::connect(code_executor_url)
.await
Expand All @@ -383,6 +412,7 @@ fn main() -> anyhow::Result<()> {
.into(),
)
} else {
log::info!("Using mock code executor");
Arc::new(code_executor::mock::MockCodeExecutor {}.into())
};

Expand Down Expand Up @@ -440,22 +470,28 @@ fn main() -> anyhow::Result<()> {
cache_for_http.clone(),
));

HttpServer::new(move || {
let auth = HttpAuthentication::bearer(auth::validator);
let project_auth = HttpAuthentication::bearer(auth::project_validator);
let shared_secret_auth =
HttpAuthentication::bearer(auth::shared_secret_validator);
let num_spans_workers_per_thread = env::var("NUM_SPANS_WORKERS_PER_THREAD")
.unwrap_or(String::from("4"))
.parse::<u8>()
.unwrap_or(4);

let num_spans_workers_per_thread = env::var("NUM_SPANS_WORKERS_PER_THREAD")
let num_browser_events_workers_per_thread =
env::var("NUM_BROWSER_EVENTS_WORKERS_PER_THREAD")
.unwrap_or(String::from("4"))
.parse::<u8>()
.unwrap_or(4);

let num_browser_events_workers_per_thread =
env::var("NUM_BROWSER_EVENTS_WORKERS_PER_THREAD")
.unwrap_or(String::from("4"))
.parse::<u8>()
.unwrap_or(4);
log::info!(
"Spans workers per thread: {}, Browser events workers per thread: {}",
num_spans_workers_per_thread,
num_browser_events_workers_per_thread
);

HttpServer::new(move || {
let auth = HttpAuthentication::bearer(auth::validator);
let project_auth = HttpAuthentication::bearer(auth::project_validator);
let shared_secret_auth =
HttpAuthentication::bearer(auth::shared_secret_validator);

for _ in 0..num_spans_workers_per_thread {
tokio::spawn(process_queue_spans(
Expand Down