allows disabling the API endpoint & topic of a data model (#2045)
phiSgr authored Feb 18, 2025
1 parent 873de0c commit 96fff12
Showing 11 changed files with 131 additions and 70 deletions.
8 changes: 4 additions & 4 deletions apps/framework-cli/src/cli.rs
@@ -994,15 +994,15 @@ async fn top_command_handler(
peek(project_arc, data_model_name, *limit, file.clone(), *topic).await
}
Commands::Workflow(workflow_args) => {
if !settings.features.scripts {
let project = load_project()?;

if !(settings.features.scripts || project.features.workflows) {
return Err(RoutineFailure::error(Message {
action: "Workflow".to_string(),
details: "Feature not enabled, to turn on go to ~/.moose/config.toml and set 'scripts' to true under the 'features' section".to_string(),
details: "Feature not enabled, to turn on go to moose.config.toml and set 'workflows' to true under the 'features' section".to_string(),
}));
}

let project = load_project()?;

match &workflow_args.command {
Some(WorkflowCommands::Init { name, tasks, task }) => {
init_workflow(&project, name, tasks.clone(), task.clone()).await
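The gate above now accepts either opt-in: the legacy global `scripts` flag in `~/.moose/config.toml` or the new per-project `workflows` flag in `moose.config.toml` (the error message was updated to point at the latter). A tiny standalone sketch of the resolution rule, using hypothetical simplified types rather than the commit's own:

```rust
// Hypothetical stand-ins for the two config sources (not the commit's types).
struct FeatureSettings {
    scripts: bool, // global ~/.moose/config.toml
}
struct ProjectFeatures {
    workflows: bool, // per-project moose.config.toml
}

// Workflows run if either the legacy global flag or the project flag is set.
fn workflows_enabled(settings: &FeatureSettings, project: &ProjectFeatures) -> bool {
    settings.scripts || project.workflows
}

fn main() {
    let global = FeatureSettings { scripts: false };
    let project = ProjectFeatures { workflows: true };
    assert!(workflows_enabled(&global, &project));
}
```

The same `scripts || workflows` disjunction reappears in `dev.rs`, the orchestration workers registry, and `docker.rs` below.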
2 changes: 1 addition & 1 deletion apps/framework-cli/src/cli/display.rs
@@ -243,7 +243,7 @@ pub fn batch_inserted(count: usize, table_name: &str) {
MessageType::Info,
Message {
action: "[DB]".to_string(),
details: format!("{count} rows successfully written to DB table ({table_name})"),
details: format!("{count} row(s) successfully written to DB table ({table_name})"),
}
);
}
8 changes: 5 additions & 3 deletions apps/framework-cli/src/cli/routines/dev.rs
@@ -30,9 +30,11 @@ pub fn run_local_infrastructure(
run_containers(project, docker_client)?.show();

validate_clickhouse_run(project, docker_client)?.show();
validate_redpanda_run(project, docker_client)?.show();
validate_redpanda_cluster(project.name(), docker_client)?.show();
if settings.features.scripts {
if project.features.streaming_engine {
validate_redpanda_run(project, docker_client)?.show();
validate_redpanda_cluster(project.name(), docker_client)?.show();
}
if settings.features.scripts || project.features.workflows {
validate_temporal_run(project, docker_client)?.show();
}

38 changes: 25 additions & 13 deletions apps/framework-cli/src/cli/routines/ls.rs
@@ -162,25 +162,37 @@ async fn add_tables_views(
target_version: &str,
output_table: &mut HashMap<String, Vec<String>>,
) {
let system_tables = get_system_tables(project, target_version).await;
let mut system_tables = get_system_tables(project, target_version).await;

for (data_model, metadata) in output_table.iter_mut() {
if let Some(system_table) = system_tables.get(data_model) {
match system_table.engine.as_str() {
"MergeTree" => {
if let Some(v) = metadata.get_mut(1) {
v.clone_from(&system_table.name);
}
fn update_metadata(system_table: ClickHouseSystemTable, metadata: &mut [String]) {
match system_table.engine.as_str() {
"MergeTree" => {
if let Some(v) = metadata.get_mut(1) {
*v = system_table.name;
}
"View" => {
if let Some(v) = metadata.get_mut(2) {
v.clone_from(&system_table.name);
}
}
"View" => {
if let Some(v) = metadata.get_mut(2) {
*v = system_table.name;
}
_ => {}
}
_ => {}
}
}

for (data_model, metadata) in output_table.iter_mut() {
if let Some(system_table) = system_tables.remove(data_model) {
update_metadata(system_table, metadata);
}
}

// handle system_tables that are not in output_table (i.e. tables with no ingestion endpoint)
for (data_model, system_table) in system_tables.into_iter() {
let mut metadata = vec!["".to_string(), "".to_string(), "".to_string()];
update_metadata(system_table, &mut metadata);

output_table.insert(data_model, metadata);
}
}

async fn get_topics(project: &Project) -> HashSet<String> {
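The rewrite above also changes `get` to `remove`: the first loop consumes every `system_tables` entry that matches an existing row, so whatever remains afterwards is exactly the set of tables with no ingestion endpoint, which the second loop inserts as fresh rows. A minimal standalone sketch of that consume-then-sweep pattern (illustrative names only, not the commit's code):

```rust
use std::collections::HashMap;

fn main() {
    // Engines keyed by data model name, as if returned by get_system_tables.
    let mut system_tables: HashMap<String, String> = HashMap::from([
        ("users".to_string(), "MergeTree".to_string()),
        ("orphan".to_string(), "View".to_string()),
    ]);
    // Pre-populated output rows; "orphan" has no row yet.
    let mut output_table: HashMap<String, Vec<String>> =
        HashMap::from([("users".to_string(), vec![String::new()])]);

    // First pass: consume every entry that matches an existing row.
    for (data_model, metadata) in output_table.iter_mut() {
        if let Some(engine) = system_tables.remove(data_model) {
            metadata[0] = engine;
        }
    }
    // Leftovers had no counterpart: add fresh rows for them.
    for (data_model, engine) in system_tables {
        output_table.insert(data_model, vec![engine]);
    }
    // {"users": ["MergeTree"], "orphan": ["View"]} (iteration order may vary)
    println!("{output_table:?}");
}
```

Moving values out with `remove` also lets the new code take ownership of `system_table.name`, replacing the `clone_from` calls the old version needed.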
107 changes: 63 additions & 44 deletions apps/framework-cli/src/framework/core/infrastructure_map.rs
@@ -8,13 +8,14 @@ use super::infrastructure::topic::Topic;
use super::infrastructure::topic_sync_process::{TopicToTableSyncProcess, TopicToTopicSyncProcess};
use super::infrastructure::view::View;
use super::primitive_map::PrimitiveMap;
use crate::cli::display::{show_message_wrapper, Message, MessageType};
use crate::framework::controller::{InitialDataLoad, InitialDataLoadStatus};
use crate::infrastructure::redis::redis_client::RedisClient;
use crate::infrastructure::stream::redpanda::RedpandaConfig;
use crate::project::Project;
use crate::proto::infrastructure_map::InfrastructureMap as ProtoInfrastructureMap;
use anyhow::Result;
use protobuf::{EnumOrUnknown, Message};
use protobuf::{EnumOrUnknown, Message as ProtoMessage};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
@@ -325,8 +326,10 @@ impl InfrastructureMap {
);
}

topics.insert(topic.id(), topic);
api_endpoints.insert(api_endpoint.id(), api_endpoint);
if project.features.streaming_engine {
topics.insert(topic.id(), topic);
api_endpoints.insert(api_endpoint.id(), api_endpoint);
}
} else {
// We wait to have processed all the datamodels to process the ones that don't have changes
// That way we can refer to infrastructure that was created by those older versions.
@@ -356,7 +359,9 @@ impl InfrastructureMap {
views.insert(view.id(), view);
}

api_endpoints.insert(api_endpoint.id(), api_endpoint);
if project.features.streaming_engine {
api_endpoints.insert(api_endpoint.id(), api_endpoint);
}
}
None => {
log::error!(
@@ -369,47 +374,61 @@
}
}

for function in primitive_map.functions.iter() {
// Currently we are not creating 1 per function source and target.
// We reuse the topics that were created from the data models.
// Unless for streaming function migrations where we will have to create new topics.

if function.is_migration() {
let (source_topic, target_topic) = Topic::from_migration_function(function);

let function_process = FunctionProcess::from_migration_function(
function,
&source_topic,
&target_topic.clone().unwrap(),
);

let sync_process = TopicToTableSyncProcess::new(
&target_topic.clone().unwrap(),
&function.target_data_model.as_ref().unwrap().to_table(),
);
topic_to_table_sync_processes.insert(sync_process.id(), sync_process);

let topic_sync = TopicToTopicSyncProcess::from_migration_function(function);
topic_to_topic_sync_processes.insert(topic_sync.id(), topic_sync);

initial_data_loads.insert(
function_process.id(),
InitialDataLoad {
table: function.source_data_model.to_table(),
topic: source_topic.name.clone(),
// it doesn't mean it is completed, it means the desired state is Completed
status: InitialDataLoadStatus::Completed,
},
);
topics.insert(source_topic.id(), source_topic);
if let Some(target) = target_topic.clone() {
topics.insert(target.id(), target.clone());
}
if !project.features.streaming_engine && !primitive_map.functions.is_empty() {
log::error!("Streaming disabled. Functions are disabled.");
show_message_wrapper(
MessageType::Error,
Message {
action: "Disabled".to_string(),
details: format!(
"Streaming is disabled but {} function(s) found.",
primitive_map.functions.len()
),
},
);
} else {
for function in primitive_map.functions.iter() {
// Currently we are not creating 1 per function source and target.
// We reuse the topics that were created from the data models.
// Unless for streaming function migrations where we will have to create new topics.

if function.is_migration() {
let (source_topic, target_topic) = Topic::from_migration_function(function);

let function_process = FunctionProcess::from_migration_function(
function,
&source_topic,
&target_topic.clone().unwrap(),
);

function_processes.insert(function_process.id(), function_process);
} else {
let function_process = FunctionProcess::from_function(function, &topics);
function_processes.insert(function_process.id(), function_process);
let sync_process = TopicToTableSyncProcess::new(
&target_topic.clone().unwrap(),
&function.target_data_model.as_ref().unwrap().to_table(),
);
topic_to_table_sync_processes.insert(sync_process.id(), sync_process);

let topic_sync = TopicToTopicSyncProcess::from_migration_function(function);
topic_to_topic_sync_processes.insert(topic_sync.id(), topic_sync);

initial_data_loads.insert(
function_process.id(),
InitialDataLoad {
table: function.source_data_model.to_table(),
topic: source_topic.name.clone(),
// it doesn't mean it is completed, it means the desired state is Completed
status: InitialDataLoadStatus::Completed,
},
);
topics.insert(source_topic.id(), source_topic);
if let Some(target) = target_topic.clone() {
topics.insert(target.id(), target.clone());
}

function_processes.insert(function_process.id(), function_process);
} else {
let function_process = FunctionProcess::from_function(function, &topics);
function_processes.insert(function_process.id(), function_process);
}
}
}

4 changes: 1 addition & 3 deletions apps/framework-cli/src/framework/data_model/config.rs
@@ -3,6 +3,7 @@ use std::path::Path;

use crate::framework::python::datamodel_config::execute_python_model_file_for_config;
use crate::framework::typescript::export_collectors::get_data_model_configs;
use crate::utilities::_true;
use log::info;
use serde::Deserialize;
use serde::Serialize;
@@ -40,9 +41,6 @@ pub struct StorageConfig {
#[serde(default)]
pub deduplicate: bool,
}
const fn _true() -> bool {
true
}

impl Default for StorageConfig {
fn default() -> Self {
@@ -54,7 +54,7 @@ impl OrchestrationWorkersRegistry {
Self {
workers: HashMap::new(),
project: project.clone(),
scripts_enabled: settings.features.scripts,
scripts_enabled: settings.features.scripts || project.features.workflows,
}
}

23 changes: 23 additions & 0 deletions apps/framework-cli/src/project.rs
@@ -67,6 +67,7 @@ use crate::utilities::constants::{PYTHON_INIT_FILE, PY_API_FILE, TS_API_FILE};
use crate::utilities::constants::{VSCODE_DIR, VSCODE_EXT_FILE, VSCODE_SETTINGS_FILE};
use crate::utilities::git::GitConfig;
use crate::utilities::PathExt;
use crate::utilities::_true;

#[derive(Debug, thiserror::Error)]
#[error("Failed to create or delete project files")]
@@ -120,6 +121,27 @@ pub struct Project {

#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub cron_jobs: Vec<CronJob>,

#[serde(default)]
pub features: ProjectFeatures,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ProjectFeatures {
#[serde(default = "_true")]
pub streaming_engine: bool,

#[serde(default)]
pub workflows: bool,
}

impl Default for ProjectFeatures {
fn default() -> Self {
ProjectFeatures {
streaming_engine: true,
workflows: false,
}
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -189,6 +211,7 @@ impl Project {
git_config: GitConfig::default(),
jwt: None,
cron_jobs: Vec::new(),
features: Default::default(),
}
}

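The new `[features]` table in `moose.config.toml` defaults `streaming_engine` to on (via the shared `_true` helper added in `utilities.rs` below) and `workflows` to off. A minimal sketch of how those serde defaults behave, as a standalone approximation assuming the `serde` (with derive) and `toml` crates rather than the commit's own code:

```rust
use serde::Deserialize;

// Mirrors the helper added in utilities.rs below.
const fn _true() -> bool {
    true
}

#[derive(Debug, Deserialize)]
struct ProjectFeatures {
    #[serde(default = "_true")]
    streaming_engine: bool,
    #[serde(default)] // bool's Default is false
    workflows: bool,
}

fn main() {
    // Explicitly opting out of streaming and in to workflows:
    let features: ProjectFeatures =
        toml::from_str("streaming_engine = false\nworkflows = true").unwrap();
    assert!(!features.streaming_engine);
    assert!(features.workflows);

    // An empty config keeps the defaults: streaming on, workflows off.
    let defaults: ProjectFeatures = toml::from_str("").unwrap();
    assert!(defaults.streaming_engine);
    assert!(!defaults.workflows);
}
```

In the real `Project` struct the whole table is likewise optional (`#[serde(default)] pub features: ProjectFeatures`), so existing configs without a `[features]` section keep the current behavior.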
4 changes: 4 additions & 0 deletions apps/framework-cli/src/utilities.rs
@@ -21,3 +21,7 @@ impl PathExt for Path {
})
}
}

pub const fn _true() -> bool {
true
}
2 changes: 2 additions & 0 deletions apps/framework-cli/src/utilities/docker-compose.yml.hbs
@@ -30,6 +30,7 @@ services:
command: ["redis-server", "--appendonly", "yes"]
volumes:
- redis-data:/data
{{#if streaming_engine}}
redpanda:
image: docker.redpanda.com/redpandadata/redpanda:latest
ports:
@@ -56,6 +57,7 @@
timeout: 3s
retries: 5
start_period: 5s
{{/if}}
clickhousedb:
image: docker.io/clickhouse/clickhouse-server:${CLICKHOUSE_VERSION:-latest}
volumes:
3 changes: 2 additions & 1 deletion apps/framework-cli/src/utilities/docker.rs
@@ -374,7 +374,8 @@ impl DockerClient {
handlebars.register_escape_fn(handlebars::no_escape);

let data = json!({
"scripts_feature": settings.features.scripts
"scripts_feature": settings.features.scripts || project.features.workflows,
"streaming_engine": project.features.streaming_engine
});

let rendered = handlebars
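With `streaming_engine` now passed into the template data, the `{{#if streaming_engine}}` block added to `docker-compose.yml.hbs` above drops the entire redpanda service when streaming is disabled. A rough standalone illustration using the `handlebars` and `serde_json` crates, with a heavily trimmed template (not the commit's own code):

```rust
use handlebars::Handlebars;
use serde_json::json;

fn main() {
    let mut handlebars = Handlebars::new();
    // Same setting as docker.rs above: leave the YAML untouched.
    handlebars.register_escape_fn(handlebars::no_escape);

    let template = "\
services:
  redis:
    image: redis
{{#if streaming_engine}}
  redpanda:
    image: docker.redpanda.com/redpandadata/redpanda:latest
{{/if}}
  clickhousedb:
    image: clickhouse/clickhouse-server
";

    let data = json!({
        "scripts_feature": false,
        // Streaming disabled: the redpanda block is omitted entirely.
        "streaming_engine": false,
    });

    let rendered = handlebars.render_template(template, &data).unwrap();
    assert!(!rendered.contains("redpanda"));
    println!("{rendered}");
}
```

The existing `scripts_feature` key presumably gates temporal in the same template, as the matching `dev.rs` change above suggests.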
