From 96fff1210cc255637e6608e01400feaa62de1da4 Mon Sep 17 00:00:00 2001 From: George Leung Date: Tue, 18 Feb 2025 14:09:34 -0800 Subject: [PATCH] allows disabling the API endpoint & topic of a data model (#2045) --- apps/framework-cli/src/cli.rs | 8 +- apps/framework-cli/src/cli/display.rs | 2 +- apps/framework-cli/src/cli/routines/dev.rs | 8 +- apps/framework-cli/src/cli/routines/ls.rs | 38 ++++--- .../src/framework/core/infrastructure_map.rs | 107 +++++++++++------- .../src/framework/data_model/config.rs | 4 +- .../orchestration_workers_registry.rs | 2 +- apps/framework-cli/src/project.rs | 23 ++++ apps/framework-cli/src/utilities.rs | 4 + .../src/utilities/docker-compose.yml.hbs | 2 + apps/framework-cli/src/utilities/docker.rs | 3 +- 11 files changed, 131 insertions(+), 70 deletions(-) diff --git a/apps/framework-cli/src/cli.rs b/apps/framework-cli/src/cli.rs index 1a48ebec1..59f80be74 100644 --- a/apps/framework-cli/src/cli.rs +++ b/apps/framework-cli/src/cli.rs @@ -994,15 +994,15 @@ async fn top_command_handler( peek(project_arc, data_model_name, *limit, file.clone(), *topic).await } Commands::Workflow(workflow_args) => { - if !settings.features.scripts { + let project = load_project()?; + + if !(settings.features.scripts || project.features.workflows) { return Err(RoutineFailure::error(Message { action: "Workflow".to_string(), - details: "Feature not enabled, to turn on go to ~/.moose/config.toml and set 'scripts' to true under the 'features' section".to_string(), + details: "Feature not enabled, to turn on go to moose.config.toml and set 'workflows' to true under the 'features' section".to_string(), })); } - let project = load_project()?; - match &workflow_args.command { Some(WorkflowCommands::Init { name, tasks, task }) => { init_workflow(&project, name, tasks.clone(), task.clone()).await diff --git a/apps/framework-cli/src/cli/display.rs b/apps/framework-cli/src/cli/display.rs index df7d80848..3d048a627 100644 --- a/apps/framework-cli/src/cli/display.rs +++ b/apps/framework-cli/src/cli/display.rs @@ -243,7 +243,7 @@ pub fn batch_inserted(count: usize, table_name: &str) { MessageType::Info, Message { action: "[DB]".to_string(), - details: format!("{count} rows successfully written to DB table ({table_name})"), + details: format!("{count} row(s) successfully written to DB table ({table_name})"), } ); } diff --git a/apps/framework-cli/src/cli/routines/dev.rs b/apps/framework-cli/src/cli/routines/dev.rs index 3c8c6f8e5..496aba7c0 100644 --- a/apps/framework-cli/src/cli/routines/dev.rs +++ b/apps/framework-cli/src/cli/routines/dev.rs @@ -30,9 +30,11 @@ pub fn run_local_infrastructure( run_containers(project, docker_client)?.show(); validate_clickhouse_run(project, docker_client)?.show(); - validate_redpanda_run(project, docker_client)?.show(); - validate_redpanda_cluster(project.name(), docker_client)?.show(); - if settings.features.scripts { + if project.features.streaming_engine { + validate_redpanda_run(project, docker_client)?.show(); + validate_redpanda_cluster(project.name(), docker_client)?.show(); + } + if settings.features.scripts || project.features.workflows { validate_temporal_run(project, docker_client)?.show(); } diff --git a/apps/framework-cli/src/cli/routines/ls.rs b/apps/framework-cli/src/cli/routines/ls.rs index 01add6baf..dbf6b940b 100644 --- a/apps/framework-cli/src/cli/routines/ls.rs +++ b/apps/framework-cli/src/cli/routines/ls.rs @@ -162,25 +162,37 @@ async fn add_tables_views( target_version: &str, output_table: &mut HashMap>, ) { - let system_tables = get_system_tables(project, target_version).await; + let mut system_tables = get_system_tables(project, target_version).await; - for (data_model, metadata) in output_table.iter_mut() { - if let Some(system_table) = system_tables.get(data_model) { - match system_table.engine.as_str() { - "MergeTree" => { - if let Some(v) = metadata.get_mut(1) { - v.clone_from(&system_table.name); - } + fn update_metadata(system_table: ClickHouseSystemTable, metadata: &mut [String]) { + match system_table.engine.as_str() { + "MergeTree" => { + if let Some(v) = metadata.get_mut(1) { + *v = system_table.name; } - "View" => { - if let Some(v) = metadata.get_mut(2) { - v.clone_from(&system_table.name); - } + } + "View" => { + if let Some(v) = metadata.get_mut(2) { + *v = system_table.name; } - _ => {} } + _ => {} } } + + for (data_model, metadata) in output_table.iter_mut() { + if let Some(system_table) = system_tables.remove(data_model) { + update_metadata(system_table, metadata); + } + } + + // handle system_tables that are not in output_table (i.e. tables with no ingestion endpoint) + for (data_model, system_table) in system_tables.into_iter() { + let mut metadata = vec!["".to_string(), "".to_string(), "".to_string()]; + update_metadata(system_table, &mut metadata); + + output_table.insert(data_model, metadata); + } } async fn get_topics(project: &Project) -> HashSet { diff --git a/apps/framework-cli/src/framework/core/infrastructure_map.rs b/apps/framework-cli/src/framework/core/infrastructure_map.rs index 562672057..315cb146e 100644 --- a/apps/framework-cli/src/framework/core/infrastructure_map.rs +++ b/apps/framework-cli/src/framework/core/infrastructure_map.rs @@ -8,13 +8,14 @@ use super::infrastructure::topic::Topic; use super::infrastructure::topic_sync_process::{TopicToTableSyncProcess, TopicToTopicSyncProcess}; use super::infrastructure::view::View; use super::primitive_map::PrimitiveMap; +use crate::cli::display::{show_message_wrapper, Message, MessageType}; use crate::framework::controller::{InitialDataLoad, InitialDataLoadStatus}; use crate::infrastructure::redis::redis_client::RedisClient; use crate::infrastructure::stream::redpanda::RedpandaConfig; use crate::project::Project; use crate::proto::infrastructure_map::InfrastructureMap as ProtoInfrastructureMap; use anyhow::Result; -use protobuf::{EnumOrUnknown, Message}; +use protobuf::{EnumOrUnknown, Message as ProtoMessage}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs; @@ -325,8 +326,10 @@ impl InfrastructureMap { ); } - topics.insert(topic.id(), topic); - api_endpoints.insert(api_endpoint.id(), api_endpoint); + if project.features.streaming_engine { + topics.insert(topic.id(), topic); + api_endpoints.insert(api_endpoint.id(), api_endpoint); + } } else { // We wait to have processed all the datamodels to process the ones that don't have changes // That way we can refer to infrastructure that was created by those older versions. @@ -356,7 +359,9 @@ impl InfrastructureMap { views.insert(view.id(), view); } - api_endpoints.insert(api_endpoint.id(), api_endpoint); + if project.features.streaming_engine { + api_endpoints.insert(api_endpoint.id(), api_endpoint); + } } None => { log::error!( @@ -369,47 +374,61 @@ impl InfrastructureMap { } } - for function in primitive_map.functions.iter() { - // Currently we are not creating 1 per function source and target. - // We reuse the topics that were created from the data models. - // Unless for streaming function migrations where we will have to create new topics. - - if function.is_migration() { - let (source_topic, target_topic) = Topic::from_migration_function(function); - - let function_process = FunctionProcess::from_migration_function( - function, - &source_topic, - &target_topic.clone().unwrap(), - ); - - let sync_process = TopicToTableSyncProcess::new( - &target_topic.clone().unwrap(), - &function.target_data_model.as_ref().unwrap().to_table(), - ); - topic_to_table_sync_processes.insert(sync_process.id(), sync_process); - - let topic_sync = TopicToTopicSyncProcess::from_migration_function(function); - topic_to_topic_sync_processes.insert(topic_sync.id(), topic_sync); - - initial_data_loads.insert( - function_process.id(), - InitialDataLoad { - table: function.source_data_model.to_table(), - topic: source_topic.name.clone(), - // it doesn't mean it is completed, it means the desired state is Completed - status: InitialDataLoadStatus::Completed, - }, - ); - topics.insert(source_topic.id(), source_topic); - if let Some(target) = target_topic.clone() { - topics.insert(target.id(), target.clone()); - } + if !project.features.streaming_engine && !primitive_map.functions.is_empty() { + log::error!("Streaming disabled. Functions are disabled."); + show_message_wrapper( + MessageType::Error, + Message { + action: "Disabled".to_string(), + details: format!( + "Streaming is disabled but {} function(s) found.", + primitive_map.functions.len() + ), + }, + ); + } else { + for function in primitive_map.functions.iter() { + // Currently we are not creating 1 per function source and target. + // We reuse the topics that were created from the data models. + // Unless for streaming function migrations where we will have to create new topics. + + if function.is_migration() { + let (source_topic, target_topic) = Topic::from_migration_function(function); + + let function_process = FunctionProcess::from_migration_function( + function, + &source_topic, + &target_topic.clone().unwrap(), + ); - function_processes.insert(function_process.id(), function_process); - } else { - let function_process = FunctionProcess::from_function(function, &topics); - function_processes.insert(function_process.id(), function_process); + let sync_process = TopicToTableSyncProcess::new( + &target_topic.clone().unwrap(), + &function.target_data_model.as_ref().unwrap().to_table(), + ); + topic_to_table_sync_processes.insert(sync_process.id(), sync_process); + + let topic_sync = TopicToTopicSyncProcess::from_migration_function(function); + topic_to_topic_sync_processes.insert(topic_sync.id(), topic_sync); + + initial_data_loads.insert( + function_process.id(), + InitialDataLoad { + table: function.source_data_model.to_table(), + topic: source_topic.name.clone(), + // it doesn't mean it is completed, it means the desired state is Completed + status: InitialDataLoadStatus::Completed, + }, + ); + topics.insert(source_topic.id(), source_topic); + if let Some(target) = target_topic.clone() { + topics.insert(target.id(), target.clone()); + } + + function_processes.insert(function_process.id(), function_process); + } else { + let function_process = FunctionProcess::from_function(function, &topics); + function_processes.insert(function_process.id(), function_process); + } } } diff --git a/apps/framework-cli/src/framework/data_model/config.rs b/apps/framework-cli/src/framework/data_model/config.rs index a0c225fa3..b4ebe90ee 100644 --- a/apps/framework-cli/src/framework/data_model/config.rs +++ b/apps/framework-cli/src/framework/data_model/config.rs @@ -3,6 +3,7 @@ use std::path::Path; use crate::framework::python::datamodel_config::execute_python_model_file_for_config; use crate::framework::typescript::export_collectors::get_data_model_configs; +use crate::utilities::_true; use log::info; use serde::Deserialize; use serde::Serialize; @@ -40,9 +41,6 @@ pub struct StorageConfig { #[serde(default)] pub deduplicate: bool, } -const fn _true() -> bool { - true -} impl Default for StorageConfig { fn default() -> Self { diff --git a/apps/framework-cli/src/infrastructure/processes/orchestration_workers_registry.rs b/apps/framework-cli/src/infrastructure/processes/orchestration_workers_registry.rs index 300157517..25f02bf7f 100644 --- a/apps/framework-cli/src/infrastructure/processes/orchestration_workers_registry.rs +++ b/apps/framework-cli/src/infrastructure/processes/orchestration_workers_registry.rs @@ -54,7 +54,7 @@ impl OrchestrationWorkersRegistry { Self { workers: HashMap::new(), project: project.clone(), - scripts_enabled: settings.features.scripts, + scripts_enabled: settings.features.scripts || project.features.workflows, } } diff --git a/apps/framework-cli/src/project.rs b/apps/framework-cli/src/project.rs index fa5e6c17b..4a4288056 100644 --- a/apps/framework-cli/src/project.rs +++ b/apps/framework-cli/src/project.rs @@ -67,6 +67,7 @@ use crate::utilities::constants::{PYTHON_INIT_FILE, PY_API_FILE, TS_API_FILE}; use crate::utilities::constants::{VSCODE_DIR, VSCODE_EXT_FILE, VSCODE_SETTINGS_FILE}; use crate::utilities::git::GitConfig; use crate::utilities::PathExt; +use crate::utilities::_true; #[derive(Debug, thiserror::Error)] #[error("Failed to create or delete project files")] @@ -120,6 +121,27 @@ pub struct Project { #[serde(default, skip_serializing_if = "Vec::is_empty")] pub cron_jobs: Vec, + + #[serde(default)] + pub features: ProjectFeatures, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ProjectFeatures { + #[serde(default = "_true")] + pub streaming_engine: bool, + + #[serde(default)] + pub workflows: bool, +} + +impl Default for ProjectFeatures { + fn default() -> Self { + ProjectFeatures { + streaming_engine: true, + workflows: false, + } + } } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -189,6 +211,7 @@ impl Project { git_config: GitConfig::default(), jwt: None, cron_jobs: Vec::new(), + features: Default::default(), } } diff --git a/apps/framework-cli/src/utilities.rs b/apps/framework-cli/src/utilities.rs index 371b2f0bb..6d01941d2 100644 --- a/apps/framework-cli/src/utilities.rs +++ b/apps/framework-cli/src/utilities.rs @@ -21,3 +21,7 @@ impl PathExt for Path { }) } } + +pub const fn _true() -> bool { + true +} diff --git a/apps/framework-cli/src/utilities/docker-compose.yml.hbs b/apps/framework-cli/src/utilities/docker-compose.yml.hbs index 88a2c31ec..36d8c2dde 100644 --- a/apps/framework-cli/src/utilities/docker-compose.yml.hbs +++ b/apps/framework-cli/src/utilities/docker-compose.yml.hbs @@ -30,6 +30,7 @@ services: command: ["redis-server", "--appendonly", "yes"] volumes: - redis-data:/data + {{#if streaming_engine}} redpanda: image: docker.redpanda.com/redpandadata/redpanda:latest ports: @@ -56,6 +57,7 @@ services: timeout: 3s retries: 5 start_period: 5s + {{/if}} clickhousedb: image: docker.io/clickhouse/clickhouse-server:${CLICKHOUSE_VERSION:-latest} volumes: diff --git a/apps/framework-cli/src/utilities/docker.rs b/apps/framework-cli/src/utilities/docker.rs index 9abf83323..e79289270 100644 --- a/apps/framework-cli/src/utilities/docker.rs +++ b/apps/framework-cli/src/utilities/docker.rs @@ -374,7 +374,8 @@ impl DockerClient { handlebars.register_escape_fn(handlebars::no_escape); let data = json!({ - "scripts_feature": settings.features.scripts + "scripts_feature": settings.features.scripts || project.features.workflows, + "streaming_engine": project.features.streaming_engine }); let rendered = handlebars