diff --git a/.cursor/rules/protobuff.mdc b/.cursor/rules/protobuff.mdc
new file mode 100644
index 000000000..64586447d
--- /dev/null
+++ b/.cursor/rules/protobuff.mdc
@@ -0,0 +1,6 @@
+---
+description: Best practices to edit and build .proto files
+globs: *.proto
+---
+
+DO NOT MAKE BREAKING CHANGES
\ No newline at end of file
diff --git a/.cursor/rules/rust.mdc b/.cursor/rules/rust.mdc
new file mode 100644
index 000000000..ed1ee36f9
--- /dev/null
+++ b/.cursor/rules/rust.mdc
@@ -0,0 +1,104 @@
+---
+description: Rust language best practices
+globs: *.rs
+---
+# Architectural Rules
+
+## General Principles
+
+- **Keep it Simple (KISS)**: Strive for simplicity in code design.
+- **Don't Repeat Yourself (DRY)**: Eliminate duplication to enhance maintainability.
+- **You Ain't Gonna Need It (YAGNI)**: Avoid adding functionality until it's necessary.
+- **Readability Matters**: Write code as if the next person to read it is a beginner.
+
+## Code Structure
+
+- Use meaningful and descriptive variable, function, and class names.
+- Organize files and directories logically.
+- Follow a consistent indentation and formatting style (use linters and formatters).
+- Separate concerns: keep logic modular and avoid monolithic functions.
+
+## Performance Considerations
+
+- Optimize only when necessary; write clear code first.
+- Use efficient data structures and algorithms.
+- Avoid unnecessary computations and redundant API calls.
+- Be mindful of memory usage and allocation patterns.
+
+## Security Best Practices
+
+- Never hardcode sensitive data (use environment variables or secrets management tools).
+- Sanitize inputs to prevent injection attacks.
+- Follow the principle of least privilege when managing permissions.
+- Keep dependencies updated to mitigate vulnerabilities.
+
+## Error Handling
+1. Error types should be located near their unit of fallibility
+   - No global `Error` type or `errors.rs` module
+   - Each module/component defines its own error types
+   - Error types live close to the functions that generate them
+
+2. Use thiserror for error definitions
+   - Derive Error trait using #[derive(thiserror::Error)]
+   - Use #[error()] attribute for human-readable messages
+   - Use #[from] attribute to implement From trait where appropriate
+   - Mark error types as #[non_exhaustive]
+
+3. Structure errors in layers
+   - Use struct for context (e.g. file paths, line numbers)
+   - Use enum for error variants
+   - Implement source() to chain underlying errors
+   - Example:
+   ```rust
+   #[derive(Debug, thiserror::Error)]
+   #[error("error reading `{path}`")]
+   pub struct FileError {
+       pub path: PathBuf,
+       #[source]
+       pub kind: FileErrorKind,
+   }
+   ```
+
+4. Error matching and handling
+   - Make errors inspectable with specific variants
+   - Provide enough context for actionable error messages
+   - Use meaningful variant names (e.g. ReadFile vs Io)
+   - Document error conditions and handling
+
+5. Error stability and privacy
+   - Consider making error fields private when needed
+   - Don't expose internal error types in public API
+   - Use opaque error types for stable interfaces
+   - Version error types appropriately
+
+6. Do NOT use anyhow::Result
+   - If you see anyhow::Result being used, refactor using thiserror
+
+## Constants
+- Use `const` for all static values in Rust unless interior mutability or runtime evaluation is required.
+- Prefer placing constants in a `constants.rs` file.
+- Place the `constants.rs` file as deep in the module tree as possible while still keeping it above every module that uses those constants.
+- Ensure constants are appropriately scoped to avoid unnecessary exposure to unrelated modules.
+- Use `pub(crate)` or `pub(super)` instead of `pub` when limiting visibility to the necessary scope.
+- Group related constants together for better maintainability and readability.
+- Use descriptive names in uppercase with underscores (e.g., `MAX_RETRY_COUNT`).
+- When working with enums or complex types, prefer `static` with `lazy_static!` or `once_cell::sync::Lazy` for initialization.
+- Avoid redefining constants in multiple places; ensure they are sourced from `constants.rs` where needed.
+
+## Documentation
+1. All public APIs must be documented
+2. Architecture decisions should be documented
+3. Side effects should be clearly documented
+4. Breaking changes must be documented
+
+## Observability
+
+- Implement logging, metrics, and tracing to gain insights into system behavior.
+- Use structured logging for better searchability and debugging.
+- Leverage distributed tracing to diagnose performance bottlenecks in microservices.
+
+## Linters
+* Always run `cargo clippy` to make sure your changes are right. In Clippy we trust.
+* Don't work around the linters by adding exceptions. If Clippy reports dead code, either actually use the variables or delete the code if it is genuinely not useful.
\ No newline at end of file
diff --git a/apps/framework-cli/src/cli/local_webserver.rs b/apps/framework-cli/src/cli/local_webserver.rs
index 987cdfd97..cce53641e 100644
--- a/apps/framework-cli/src/cli/local_webserver.rs
+++ b/apps/framework-cli/src/cli/local_webserver.rs
@@ -2,6 +2,7 @@ use super::display::Message;
 use super::display::MessageType;
 use super::routines::auth::validate_auth_token;
 use super::settings::Settings;
+use crate::infrastructure::redis::redis_client::RedisClient;
 use crate::metrics::MetricEvent;
 
 use crate::cli::display::with_spinner;
@@ -46,6 +47,7 @@
 use serde::Serialize;
 use serde::{Deserialize, Deserializer};
 use serde_json::Deserializer as JsonDeserializer;
 use tokio::spawn;
+use tokio::sync::Mutex;
 
 use crate::framework::data_model::model::DataModel;
 use crate::utilities::validate_passthrough::{DataModelArrayVisitor, DataModelVisitor};
@@ -219,6 +221,8 @@ struct RouteService {
     is_prod: bool,
     metrics: Arc,
     http_client: Arc,
+    project: Arc<Project>,
+    redis_client: Arc<Mutex<RedisClient>>,
 }
 
 #[derive(Clone)]
 struct ManagementService {
@@ -249,6 +253,8 @@
                 req,
                 route_table: self.route_table,
             },
+            self.project.clone(),
+            self.redis_client.clone(),
         ))
     }
 }
@@ -287,7 +293,7 @@
     Ok(response)
 }
 
-fn health_route() -> Result<Response<Full<Bytes>>, hyper::http::Error> {
+async fn health_route() -> Result<Response<Full<Bytes>>, hyper::http::Error> {
     let response = Response::builder()
         .status(StatusCode::OK)
         .body(Full::new(Bytes::from("Success")))
@@ -295,6 +301,80 @@ fn health_route() -> Result<Response<Full<Bytes>>, hyper::http::Error> {
     Ok(response)
 }
 
+async fn admin_reality_check_route(
+    req: Request<Incoming>,
+    admin_api_key: &Option<String>,
+    project: &Project,
+    redis_client: &Arc<Mutex<RedisClient>>,
+) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
+    let auth_header = req.headers().get(hyper::header::AUTHORIZATION);
+    let bearer_token = auth_header
+        .and_then(|header_value| header_value.to_str().ok())
+        .and_then(|header_str| header_str.strip_prefix("Bearer "));
+
+    // Check API key authentication
+    if let Some(key) = admin_api_key.as_ref() {
+        if !validate_token(bearer_token, key).await {
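+            // Bearer token was missing or did not match the configured admin API key.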
return Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Full::new(Bytes::from( + "Unauthorized: Invalid or missing token", + ))); + } + } else { + return Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Full::new(Bytes::from( + "Unauthorized: Admin API key not configured", + ))); + } + + // Create OLAP client and reality checker + let olap_client = + crate::infrastructure::olap::clickhouse::create_client(project.clickhouse_config.clone()); + let reality_checker = + crate::framework::core::infra_reality_checker::InfraRealityChecker::new(olap_client); + + // Load infrastructure map from Redis + let redis_guard = redis_client.lock().await; + let infra_map = match InfrastructureMap::get_from_redis(&redis_guard).await { + Ok(map) => map, + Err(e) => { + return Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Full::new(Bytes::from(format!( + "Failed to get infrastructure map: {}", + e + )))) + } + }; + + // Perform reality check + match reality_checker.check_reality(project, &infra_map).await { + Ok(discrepancies) => { + let response = serde_json::json!({ + "status": "success", + "discrepancies": { + "unmapped_tables": discrepancies.unmapped_tables, + "missing_tables": discrepancies.missing_tables, + "mismatched_tables": discrepancies.mismatched_tables, + } + }); + + Ok(Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Full::new(Bytes::from(response.to_string())))?) + } + Err(e) => Ok(Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Full::new(Bytes::from(format!( + "{{\"status\": \"error\", \"message\": \"{}\"}}", + e + ))))?), + } +} + async fn log_route(req: Request) -> Response> { let body = to_reader(req).await; let parsed: Result = serde_json::from_reader(body); @@ -710,6 +790,8 @@ async fn router( metrics: Arc, http_client: Arc, request: RouterRequest, + project: Arc, + redis_client: Arc>, ) -> Result>, hyper::http::Error> { let now = Instant::now(); @@ -772,8 +854,16 @@ async fn router( } } } - (&hyper::Method::GET, ["health"]) => health_route(), - + (&hyper::Method::GET, ["health"]) => health_route().await, + (&hyper::Method::GET, ["admin", "reality-check"]) => { + admin_reality_check_route( + req, + &project.authentication.admin_api_key, + &project, + &redis_client, + ) + .await + } (&hyper::Method::OPTIONS, _) => options_route(), _ => route_not_found_response(), }; @@ -1043,6 +1133,12 @@ impl Webserver { is_prod: project.is_production, http_client, metrics: metrics.clone(), + project: project.clone(), + redis_client: Arc::new(Mutex::new( + RedisClient::new(project.name(), project.redis_config.clone()) + .await + .unwrap(), + )), }; let management_service = ManagementService { diff --git a/apps/framework-cli/src/framework/consumption/model.rs b/apps/framework-cli/src/framework/consumption/model.rs index 48a3477b2..225412636 100644 --- a/apps/framework-cli/src/framework/consumption/model.rs +++ b/apps/framework-cli/src/framework/consumption/model.rs @@ -23,6 +23,14 @@ impl ConsumptionQueryParam { special_fields: Default::default(), } } + + pub fn from_proto(proto: ProtoConsumptionQueryParam) -> Self { + ConsumptionQueryParam { + name: proto.name, + data_type: ColumnType::from_proto(proto.data_type.unwrap()), + required: proto.required, + } + } } #[derive(Debug, Clone, Default)] diff --git a/apps/framework-cli/src/framework/controller.rs b/apps/framework-cli/src/framework/controller.rs index 917cfe25a..0b8a91dc2 100644 --- 
a/apps/framework-cli/src/framework/controller.rs
+++ b/apps/framework-cli/src/framework/controller.rs
@@ -38,10 +38,15 @@ pub struct RouteMeta {
 pub struct InitialDataLoad {
     pub table: Table,
     pub topic: String,
-
     pub status: InitialDataLoadStatus,
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub enum InitialDataLoadStatus {
+    InProgress(i64),
+    Completed,
+}
+
 impl InitialDataLoad {
     pub(crate) fn expanded_display(&self) -> String {
         format!(
@@ -54,19 +59,25 @@
         ProtoInitialDataLoad {
             table: MessageField::some(self.table.to_proto()),
             topic: self.topic.clone(),
-            progress: match self.status {
-                InitialDataLoadStatus::InProgress(i) => Some(i as u64),
+            progress: match &self.status {
+                InitialDataLoadStatus::InProgress(i) => Some(*i as u64),
                 InitialDataLoadStatus::Completed => None,
             },
             special_fields: Default::default(),
         }
     }
-}
 
-#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
-pub enum InitialDataLoadStatus {
-    InProgress(i64),
-    Completed,
+    pub fn from_proto(proto: ProtoInitialDataLoad) -> Self {
+        InitialDataLoad {
+            table: Table::from_proto(proto.table.unwrap()),
+            topic: proto.topic,
+            // Inverse of to_proto: a recorded progress offset means the load is still in flight.
+            status: match proto.progress {
+                Some(i) => InitialDataLoadStatus::InProgress(i as i64),
+                None => InitialDataLoadStatus::Completed,
+            },
+        }
+    }
 }
 
 pub async fn initial_data_load(
diff --git a/apps/framework-cli/src/framework/core/infra_reality_checker.rs b/apps/framework-cli/src/framework/core/infra_reality_checker.rs
new file mode 100644
index 000000000..c6d5bb1d3
--- /dev/null
+++ b/apps/framework-cli/src/framework/core/infra_reality_checker.rs
@@ -0,0 +1,488 @@
+use crate::{
+    framework::core::infrastructure_map::{InfrastructureMap, OlapChange},
+    infrastructure::olap::{OlapChangesError, OlapOperations},
+    project::Project,
+};
+use log::debug;
+use std::collections::HashMap;
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+#[non_exhaustive]
+pub enum RealityCheckError {
+    #[error("Failed to check OLAP infrastructure: {0}")]
+    OlapCheck(#[from] OlapChangesError),
+    #[error("Failed to load infrastructure map: {0}")]
+    InfraMapLoad(#[from] std::io::Error),
+    #[error("Database error: {0}")]
+    DatabaseError(String),
+}
+
+/// Represents discrepancies found between actual infrastructure and documented map
+#[derive(Debug)]
+pub struct InfraDiscrepancies {
+    /// Tables that exist in reality but are not in the map
+    pub unmapped_tables: Vec<String>,
+    /// Tables that are in the map but don't exist in reality
+    pub missing_tables: Vec<String>,
+    /// Tables that exist in both but have structural differences
+    pub mismatched_tables: Vec<OlapChange>,
+}
+
+impl InfraDiscrepancies {
+    pub fn is_empty(&self) -> bool {
+        self.unmapped_tables.is_empty()
+            && self.missing_tables.is_empty()
+            && self.mismatched_tables.is_empty()
+    }
+}
+
+/// The Infrastructure Reality Checker compares actual infrastructure state with the infrastructure map
+pub struct InfraRealityChecker<T: OlapOperations> {
+    olap_client: T,
+}
+
+impl<T: OlapOperations> InfraRealityChecker<T> {
+    pub fn new(olap_client: T) -> Self {
+        Self { olap_client }
+    }
+
+    /// Checks the actual infrastructure state against the provided infrastructure map
+    ///
+    /// # Arguments
+    ///
+    /// * `project` - The project configuration
+    /// * `infra_map` - The infrastructure map to check against
+    ///
+    /// # Returns
+    ///
+    /// * `Result<InfraDiscrepancies, RealityCheckError>` - The discrepancies found or an error
+    pub async fn check_reality(
+        &self,
+        project: &Project,
+        infra_map: &InfrastructureMap,
+    ) -> Result<InfraDiscrepancies, RealityCheckError> {
+        debug!("Starting infrastructure
reality check"); + debug!("Project version: {}", project.cur_version()); + debug!("Database: {}", project.clickhouse_config.db_name); + + // Get actual tables from OLAP database + debug!("Fetching actual tables from OLAP database"); + let actual_tables = self + .olap_client + .list_tables(&project.clickhouse_config.db_name, project) + .await?; + + debug!("Found {} tables in database", actual_tables.len()); + + // Filter out tables starting with "_moose" (case-insensitive) + let actual_tables: Vec<_> = actual_tables + .into_iter() + .filter(|t| !t.name.to_lowercase().starts_with("_moose")) + .collect(); + debug!( + "{} tables remain after filtering _moose tables", + actual_tables.len() + ); + + // Create maps for easier comparison + let actual_table_map: HashMap<_, _> = actual_tables + .iter() + .map(|t| (t.name.clone(), t.clone())) + .collect(); + debug!("Actual table names: {:?}", actual_table_map.keys()); + + let mapped_table_map: HashMap<_, _> = infra_map + .tables + .values() + .map(|t| (t.name.clone(), t.clone())) + .collect(); + debug!( + "Infrastructure map table names: {:?}", + mapped_table_map.keys() + ); + + // Find unmapped tables (exist in reality but not in map) + let unmapped_tables: Vec = actual_table_map + .keys() + .filter(|name| !mapped_table_map.contains_key(*name)) + .cloned() + .collect(); + debug!( + "Found {} unmapped tables: {:?}", + unmapped_tables.len(), + unmapped_tables + ); + + // Find missing tables (in map but don't exist) + let missing_tables: Vec = mapped_table_map + .keys() + .filter(|name| !actual_table_map.contains_key(*name)) + .cloned() + .collect(); + debug!( + "Found {} missing tables: {:?}", + missing_tables.len(), + missing_tables + ); + + // Find structural differences in tables that exist in both + let mut mismatched_tables = Vec::new(); + for (name, mapped_table) in mapped_table_map { + if let Some(actual_table) = actual_table_map.get(&name) { + debug!("Comparing table structure for: {}", name); + if actual_table != &mapped_table { + debug!("Found structural mismatch in table: {}", name); + debug!("Actual table: {:?}", actual_table); + debug!("Mapped table: {:?}", mapped_table); + + // Use the existing diff_tables function to compute differences + // Note: We flip the order here to make infra_map the reference + let mut changes = Vec::new(); + let mut actual_tables = HashMap::new(); + actual_tables.insert(name.clone(), actual_table.clone()); + let mut mapped_tables = HashMap::new(); + mapped_tables.insert(name.clone(), mapped_table.clone()); + + // Flip the order of arguments to make infra_map the reference + InfrastructureMap::diff_tables(&actual_tables, &mapped_tables, &mut changes); + debug!( + "Found {} changes for table {}: {:?}", + changes.len(), + name, + changes + ); + mismatched_tables.extend(changes); + } else { + debug!("Table {} matches infrastructure map", name); + } + } + } + debug!("Found {} mismatched tables", mismatched_tables.len()); + + let discrepancies = InfraDiscrepancies { + unmapped_tables, + missing_tables, + mismatched_tables, + }; + + debug!( + "Reality check complete. 
Found {} unmapped, {} missing, and {} mismatched tables", + discrepancies.unmapped_tables.len(), + discrepancies.missing_tables.len(), + discrepancies.mismatched_tables.len() + ); + + if discrepancies.is_empty() { + debug!("No discrepancies found between reality and infrastructure map"); + } + + Ok(discrepancies) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::cli::local_webserver::LocalWebserverConfig; + use crate::framework::core::infrastructure::consumption_webserver::ConsumptionApiWebServer; + use crate::framework::core::infrastructure::olap_process::OlapProcess; + use crate::framework::core::infrastructure::table::{Column, ColumnType, Table}; + use crate::framework::core::infrastructure_map::{ + PrimitiveSignature, PrimitiveTypes, TableChange, + }; + use crate::framework::versions::Version; + use async_trait::async_trait; + + // Mock OLAP client for testing + struct MockOlapClient { + tables: Vec, + } + + #[async_trait] + impl OlapOperations for MockOlapClient { + async fn list_tables( + &self, + _db_name: &str, + _project: &Project, + ) -> Result, OlapChangesError> { + Ok(self.tables.clone()) + } + } + + // Helper function to create a test project + fn create_test_project() -> Project { + Project { + language: crate::framework::languages::SupportedLanguages::Typescript, + redpanda_config: crate::infrastructure::stream::redpanda::RedpandaConfig::default(), + clickhouse_config: crate::infrastructure::olap::clickhouse::ClickHouseConfig { + db_name: "test".to_string(), + user: "test".to_string(), + password: "test".to_string(), + use_ssl: false, + host: "localhost".to_string(), + host_port: 18123, + native_port: 9000, + }, + http_server_config: LocalWebserverConfig::default(), + redis_config: crate::infrastructure::redis::redis_client::RedisConfig::default(), + git_config: crate::utilities::git::GitConfig::default(), + temporal_config: + crate::infrastructure::orchestration::temporal::TemporalConfig::default(), + language_project_config: crate::project::LanguageProjectConfig::default(), + project_location: std::path::PathBuf::new(), + is_production: false, + supported_old_versions: std::collections::HashMap::new(), + jwt: None, + authentication: crate::project::AuthenticationConfig::default(), + cron_jobs: Vec::new(), + features: crate::project::ProjectFeatures::default(), + } + } + + fn create_base_table(name: &str) -> Table { + Table { + name: name.to_string(), + columns: vec![Column { + name: "id".to_string(), + data_type: ColumnType::Int, + required: true, + unique: true, + primary_key: true, + default: None, + }], + order_by: vec!["id".to_string()], + deduplicate: false, + version: Version::from_string("1.0.0".to_string()), + source_primitive: PrimitiveSignature { + name: "test".to_string(), + primitive_type: PrimitiveTypes::DataModel, + }, + } + } + + #[tokio::test] + async fn test_reality_checker_basic() { + // Create a mock table + let table = create_base_table("test_table"); + + // Create mock OLAP client with one table + let mock_client = MockOlapClient { + tables: vec![table.clone()], + }; + + // Create empty infrastructure map + let mut infra_map = InfrastructureMap { + topics: HashMap::new(), + api_endpoints: HashMap::new(), + tables: HashMap::new(), + views: HashMap::new(), + topic_to_table_sync_processes: HashMap::new(), + topic_to_topic_sync_processes: HashMap::new(), + function_processes: HashMap::new(), + block_db_processes: OlapProcess {}, + consumption_api_web_server: ConsumptionApiWebServer {}, + initial_data_loads: HashMap::new(), + 
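+            // All collections start empty, so the mock client's table should surface as unmapped.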
orchestration_workers: HashMap::new(), + }; + + // Create reality checker + let checker = InfraRealityChecker::new(mock_client); + + // Create test project + let project = create_test_project(); + + let discrepancies = checker.check_reality(&project, &infra_map).await.unwrap(); + + // Should find one unmapped table + assert_eq!(discrepancies.unmapped_tables.len(), 1); + assert_eq!(discrepancies.unmapped_tables[0], "test_table"); + assert!(discrepancies.missing_tables.is_empty()); + assert!(discrepancies.mismatched_tables.is_empty()); + + // Add table to infrastructure map + infra_map.tables.insert(table.name.clone(), table); + + // Check again + let discrepancies = checker.check_reality(&project, &infra_map).await.unwrap(); + + // Should find no discrepancies + assert!(discrepancies.is_empty()); + } + + #[tokio::test] + async fn test_reality_checker_structural_mismatch() { + let mut actual_table = create_base_table("test_table"); + let infra_table = create_base_table("test_table"); + + // Add an extra column to the actual table that's not in infra map + actual_table.columns.push(Column { + name: "extra_column".to_string(), + data_type: ColumnType::String, + required: false, + unique: false, + primary_key: false, + default: None, + }); + + let mock_client = MockOlapClient { + tables: vec![actual_table], + }; + + let mut infra_map = InfrastructureMap { + topics: HashMap::new(), + api_endpoints: HashMap::new(), + tables: HashMap::new(), + views: HashMap::new(), + topic_to_table_sync_processes: HashMap::new(), + topic_to_topic_sync_processes: HashMap::new(), + function_processes: HashMap::new(), + block_db_processes: OlapProcess {}, + consumption_api_web_server: ConsumptionApiWebServer {}, + initial_data_loads: HashMap::new(), + orchestration_workers: HashMap::new(), + }; + + infra_map + .tables + .insert(infra_table.name.clone(), infra_table); + + let checker = InfraRealityChecker::new(mock_client); + let project = create_test_project(); + + let discrepancies = checker.check_reality(&project, &infra_map).await.unwrap(); + + assert!(discrepancies.unmapped_tables.is_empty()); + assert!(discrepancies.missing_tables.is_empty()); + assert_eq!(discrepancies.mismatched_tables.len(), 1); + + // Verify the change is from reality's perspective - we need to remove the extra column to match infra map + match &discrepancies.mismatched_tables[0] { + OlapChange::Table(TableChange::Updated { column_changes, .. 
}) => { + assert_eq!(column_changes.len(), 1); + assert!(matches!( + &column_changes[0], + crate::framework::core::infrastructure_map::ColumnChange::Removed(_) + )); + } + _ => panic!("Expected TableChange::Updated variant"), + } + } + + #[tokio::test] + async fn test_reality_checker_order_by_mismatch() { + let mut actual_table = create_base_table("test_table"); + let mut infra_table = create_base_table("test_table"); + + // Add timestamp column to both tables + let timestamp_col = Column { + name: "timestamp".to_string(), + data_type: ColumnType::DateTime, + required: true, + unique: false, + primary_key: false, + default: None, + }; + actual_table.columns.push(timestamp_col.clone()); + infra_table.columns.push(timestamp_col); + + // Set different order_by in actual vs infra + actual_table.order_by = vec!["id".to_string(), "timestamp".to_string()]; + infra_table.order_by = vec!["id".to_string()]; + + let mock_client = MockOlapClient { + tables: vec![actual_table], + }; + + let mut infra_map = InfrastructureMap { + topics: HashMap::new(), + api_endpoints: HashMap::new(), + tables: HashMap::new(), + views: HashMap::new(), + topic_to_table_sync_processes: HashMap::new(), + topic_to_topic_sync_processes: HashMap::new(), + function_processes: HashMap::new(), + block_db_processes: OlapProcess {}, + consumption_api_web_server: ConsumptionApiWebServer {}, + initial_data_loads: HashMap::new(), + orchestration_workers: HashMap::new(), + }; + + infra_map + .tables + .insert(infra_table.name.clone(), infra_table); + + let checker = InfraRealityChecker::new(mock_client); + let project = create_test_project(); + + let discrepancies = checker.check_reality(&project, &infra_map).await.unwrap(); + + assert!(discrepancies.unmapped_tables.is_empty()); + assert!(discrepancies.missing_tables.is_empty()); + assert_eq!(discrepancies.mismatched_tables.len(), 1); + + // Verify the change is from reality's perspective - we need to change order_by to match infra map + match &discrepancies.mismatched_tables[0] { + OlapChange::Table(TableChange::Updated { + order_by_change, .. 
+ }) => { + assert_eq!( + order_by_change.before, + vec!["id".to_string(), "timestamp".to_string()] + ); + assert_eq!(order_by_change.after, vec!["id".to_string()]); + } + _ => panic!("Expected TableChange::Updated variant"), + } + } + + #[tokio::test] + async fn test_reality_checker_deduplicate_mismatch() { + let mut actual_table = create_base_table("test_table"); + let mut infra_table = create_base_table("test_table"); + + // Set different deduplicate values + actual_table.deduplicate = true; + infra_table.deduplicate = false; + + let mock_client = MockOlapClient { + tables: vec![actual_table], + }; + + let mut infra_map = InfrastructureMap { + topics: HashMap::new(), + api_endpoints: HashMap::new(), + tables: HashMap::new(), + views: HashMap::new(), + topic_to_table_sync_processes: HashMap::new(), + topic_to_topic_sync_processes: HashMap::new(), + function_processes: HashMap::new(), + block_db_processes: OlapProcess {}, + consumption_api_web_server: ConsumptionApiWebServer {}, + initial_data_loads: HashMap::new(), + orchestration_workers: HashMap::new(), + }; + + infra_map + .tables + .insert(infra_table.name.clone(), infra_table); + + let checker = InfraRealityChecker::new(mock_client); + let project = create_test_project(); + + let discrepancies = checker.check_reality(&project, &infra_map).await.unwrap(); + + assert!(discrepancies.unmapped_tables.is_empty()); + assert!(discrepancies.missing_tables.is_empty()); + assert_eq!(discrepancies.mismatched_tables.len(), 1); + + // Verify the change is from reality's perspective - we need to change deduplicate to match infra map + match &discrepancies.mismatched_tables[0] { + OlapChange::Table(TableChange::Updated { before, after, .. }) => { + assert!(before.deduplicate); + assert!(!after.deduplicate); + } + _ => panic!("Expected TableChange::Updated variant"), + } + } +} diff --git a/apps/framework-cli/src/framework/core/infrastructure/api_endpoint.rs b/apps/framework-cli/src/framework/core/infrastructure/api_endpoint.rs index f0d0d62d7..e17860e3e 100644 --- a/apps/framework-cli/src/framework/core/infrastructure/api_endpoint.rs +++ b/apps/framework-cli/src/framework/core/infrastructure/api_endpoint.rs @@ -124,6 +124,22 @@ impl ApiEndpoint { special_fields: Default::default(), } } + + pub fn from_proto(proto: ProtoApiEndpoint) -> Self { + ApiEndpoint { + name: proto.name, + api_type: APIType::from_proto(proto.api_type.unwrap()), + path: PathBuf::from(proto.path), + method: Method::from_proto( + proto + .method + .enum_value() + .expect("Invalid method enum value"), + ), + version: Version::from_string(proto.version), + source_primitive: PrimitiveSignature::from_proto(proto.source_primitive.unwrap()), + } + } } impl From for ApiEndpoint { @@ -210,6 +226,31 @@ impl APIType { _ => serde_json::from_value(json).map_err(D::Error::custom), } } + + pub fn from_proto(proto: ProtoApiType) -> Self { + match proto { + ProtoApiType::Ingress(details) => APIType::INGRESS { + target_topic: details.target_topic, + data_model: None, + format: match details + .format + .enum_value() + .expect("Invalid format enum value") + { + ProtoIngressFormat::JSON => EndpointIngestionFormat::Json, + ProtoIngressFormat::JSON_ARRAY => EndpointIngestionFormat::JsonArray, + }, + }, + ProtoApiType::Egress(details) => APIType::EGRESS { + query_params: details + .query_params + .into_iter() + .map(ConsumptionQueryParam::from_proto) + .collect(), + output_schema: serde_json::from_str(&details.output_schema).unwrap_or_default(), + }, + } + } } impl Method { @@ -221,4 +262,13 @@ impl 
Method { Method::DELETE => ProtoMethod::DELETE, } } + + pub fn from_proto(proto: ProtoMethod) -> Self { + match proto { + ProtoMethod::GET => Method::GET, + ProtoMethod::POST => Method::POST, + ProtoMethod::PUT => Method::PUT, + ProtoMethod::DELETE => Method::DELETE, + } + } } diff --git a/apps/framework-cli/src/framework/core/infrastructure/function_process.rs b/apps/framework-cli/src/framework/core/infrastructure/function_process.rs index 60797d3aa..8cfb3baa2 100644 --- a/apps/framework-cli/src/framework/core/infrastructure/function_process.rs +++ b/apps/framework-cli/src/framework/core/infrastructure/function_process.rs @@ -163,6 +163,29 @@ impl FunctionProcess { special_fields: Default::default(), } } + + pub fn from_proto(proto: ProtoFunctionProcess) -> Self { + FunctionProcess { + name: proto.name, + source_topic: proto.source_topic, + source_columns: proto + .source_columns + .into_iter() + .map(Column::from_proto) + .collect(), + target_topic: proto.target_topic, + target_topic_config: proto.target_topic_config, + target_columns: proto + .target_columns + .into_iter() + .map(Column::from_proto) + .collect(), + executable: PathBuf::from(proto.executable), + parallel_process_count: proto.parallel_process_count.unwrap_or(1) as usize, + version: proto.version, + source_primitive: PrimitiveSignature::from_proto(proto.source_primitive.unwrap()), + } + } } /** diff --git a/apps/framework-cli/src/framework/core/infrastructure/orchestration_worker.rs b/apps/framework-cli/src/framework/core/infrastructure/orchestration_worker.rs index a514a2e02..887614256 100644 --- a/apps/framework-cli/src/framework/core/infrastructure/orchestration_worker.rs +++ b/apps/framework-cli/src/framework/core/infrastructure/orchestration_worker.rs @@ -23,4 +23,10 @@ impl OrchestrationWorker { special_fields: Default::default(), } } + + pub fn from_proto(proto: ProtoOrchestrationWorker) -> Self { + OrchestrationWorker { + supported_language: SupportedLanguages::from_proto(proto.supported_language), + } + } } diff --git a/apps/framework-cli/src/framework/core/infrastructure/table.rs b/apps/framework-cli/src/framework/core/infrastructure/table.rs index 847a6f17a..e5b2c7e9c 100644 --- a/apps/framework-cli/src/framework/core/infrastructure/table.rs +++ b/apps/framework-cli/src/framework/core/infrastructure/table.rs @@ -59,6 +59,17 @@ impl Table { special_fields: Default::default(), } } + + pub fn from_proto(proto: ProtoTable) -> Self { + Table { + name: proto.name, + columns: proto.columns.into_iter().map(Column::from_proto).collect(), + order_by: proto.order_by, + version: Version::from_string(proto.version), + source_primitive: PrimitiveSignature::from_proto(proto.source_primitive.unwrap()), + deduplicate: false, // TODO: Add to proto + } + } } #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)] @@ -303,7 +314,26 @@ impl Column { special_fields: Default::default(), } } + + pub fn from_proto(proto: crate::proto::infrastructure_map::Column) -> Self { + Column { + name: proto.name, + data_type: ColumnType::from_proto(proto.data_type.unwrap()), + required: proto.required, + unique: proto.unique, + primary_key: proto.primary_key, + default: match proto + .default + .enum_value() + .expect("Invalid default enum value") + { + ProtoColumnDefaults::NONE => None, + default => Some(ColumnDefaults::from_proto(default)), + }, + } + } } + impl ColumnType { pub fn to_proto(&self) -> ProtoColumnType { let t = match self { @@ -332,6 +362,34 @@ impl ColumnType { special_fields: Default::default(), } } + + pub fn 
from_proto(proto: ProtoColumnType) -> Self { + match proto.t.unwrap() { + column_type::T::Simple(simple) => { + match simple.enum_value().expect("Invalid simple type") { + SimpleColumnType::STRING => ColumnType::String, + SimpleColumnType::BOOLEAN => ColumnType::Boolean, + SimpleColumnType::INT => ColumnType::Int, + SimpleColumnType::BIGINT => ColumnType::BigInt, + SimpleColumnType::FLOAT => ColumnType::Float, + SimpleColumnType::DECIMAL => ColumnType::Decimal, + SimpleColumnType::DATETIME => ColumnType::DateTime, + SimpleColumnType::JSON_COLUMN => ColumnType::Json, + SimpleColumnType::BYTES => ColumnType::Bytes, + } + } + column_type::T::Enum(data_enum) => ColumnType::Enum(DataEnum::from_proto(data_enum)), + column_type::T::Array(element_type) => ColumnType::Array { + element_type: Box::new(ColumnType::from_proto(*element_type)), + element_nullable: false, + }, + column_type::T::ArrayOfNullable(element_type) => ColumnType::Array { + element_type: Box::new(ColumnType::from_proto(*element_type)), + element_nullable: true, + }, + column_type::T::Nested(nested) => ColumnType::Nested(Nested::from_proto(nested)), + } + } } impl DataEnum { @@ -342,6 +400,17 @@ impl DataEnum { special_fields: Default::default(), } } + + pub fn from_proto(proto: crate::proto::infrastructure_map::DataEnum) -> Self { + DataEnum { + name: proto.name, + values: proto + .values + .into_iter() + .map(EnumMember::from_proto) + .collect(), + } + } } impl Nested { @@ -353,6 +422,14 @@ impl Nested { special_fields: Default::default(), } } + + pub fn from_proto(proto: crate::proto::infrastructure_map::Nested) -> Self { + Nested { + name: proto.name, + columns: proto.columns.into_iter().map(Column::from_proto).collect(), + jwt: proto.jwt, + } + } } impl EnumMember { @@ -363,6 +440,13 @@ impl EnumMember { special_fields: Default::default(), } } + + pub fn from_proto(proto: crate::proto::infrastructure_map::EnumMember) -> Self { + EnumMember { + name: proto.name, + value: EnumValue::from_proto(proto.value.unwrap()), + } + } } impl EnumValue { @@ -380,6 +464,17 @@ impl EnumValue { special_fields: Default::default(), } } + + pub fn from_proto(proto: crate::proto::infrastructure_map::EnumValue) -> Self { + match proto.value.unwrap() { + crate::proto::infrastructure_map::enum_value::Value::IntValue(i) => { + EnumValue::Int(i as u8) + } + crate::proto::infrastructure_map::enum_value::Value::StringValue(s) => { + EnumValue::String(s) + } + } + } } impl ColumnDefaults { @@ -391,6 +486,16 @@ impl ColumnDefaults { ColumnDefaults::Now => ProtoColumnDefaults::NOW, } } + + pub fn from_proto(proto: ProtoColumnDefaults) -> Self { + match proto { + ProtoColumnDefaults::AUTO_INCREMENT => ColumnDefaults::AutoIncrement, + ProtoColumnDefaults::CUID => ColumnDefaults::CUID, + ProtoColumnDefaults::UUID => ColumnDefaults::UUID, + ProtoColumnDefaults::NOW => ColumnDefaults::Now, + ProtoColumnDefaults::NONE => panic!("NONE should be handled as Option::None"), + } + } } #[cfg(test)] diff --git a/apps/framework-cli/src/framework/core/infrastructure/topic.rs b/apps/framework-cli/src/framework/core/infrastructure/topic.rs index 88ee4526d..c6a789136 100644 --- a/apps/framework-cli/src/framework/core/infrastructure/topic.rs +++ b/apps/framework-cli/src/framework/core/infrastructure/topic.rs @@ -131,4 +131,15 @@ impl Topic { pub fn default_partition_count() -> usize { 1 } + + pub fn from_proto(proto: ProtoTopic) -> Self { + Topic { + version: Version::from_string(proto.version), + name: proto.name, + retention_period: proto.retention_period.unwrap().into(), 
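+            // A missing partition count falls back to 1, matching default_partition_count().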
+ partition_count: proto.partition_count.unwrap_or(1) as usize, + columns: proto.columns.into_iter().map(Column::from_proto).collect(), + source_primitive: PrimitiveSignature::from_proto(proto.source_primitive.unwrap()), + } + } } diff --git a/apps/framework-cli/src/framework/core/infrastructure/topic_sync_process.rs b/apps/framework-cli/src/framework/core/infrastructure/topic_sync_process.rs index 56dd58a18..5b7138210 100644 --- a/apps/framework-cli/src/framework/core/infrastructure/topic_sync_process.rs +++ b/apps/framework-cli/src/framework/core/infrastructure/topic_sync_process.rs @@ -81,6 +81,16 @@ impl TopicToTableSyncProcess { special_fields: Default::default(), } } + + pub fn from_proto(proto: ProtoTopicToTableSyncProcess) -> Self { + TopicToTableSyncProcess { + source_topic_id: proto.source_topic_id, + target_table_id: proto.target_table_id, + columns: proto.columns.into_iter().map(Column::from_proto).collect(), + version: Version::from_string(proto.version), + source_primitive: PrimitiveSignature::from_proto(proto.source_primitive.unwrap()), + } + } } impl TopicToTopicSyncProcess { @@ -123,4 +133,12 @@ impl TopicToTopicSyncProcess { special_fields: Default::default(), } } + + pub fn from_proto(proto: ProtoTopicToTopicSyncProcess) -> Self { + TopicToTopicSyncProcess { + source_topic_id: proto.source_topic_id, + target_topic_id: proto.target_topic_id, + source_primitive: PrimitiveSignature::from_proto(proto.source_primitive.unwrap()), + } + } } diff --git a/apps/framework-cli/src/framework/core/infrastructure/view.rs b/apps/framework-cli/src/framework/core/infrastructure/view.rs index fbde2de64..255f15cee 100644 --- a/apps/framework-cli/src/framework/core/infrastructure/view.rs +++ b/apps/framework-cli/src/framework/core/infrastructure/view.rs @@ -51,6 +51,14 @@ impl View { special_fields: Default::default(), } } + + pub fn from_proto(proto: ProtoView) -> Self { + View { + name: proto.name, + version: Version::from_string(proto.version), + view_type: ViewType::from_proto(proto.view_type.unwrap()), + } + } } impl ViewType { @@ -64,4 +72,12 @@ impl ViewType { } } } + + pub fn from_proto(proto: ProtoViewType) -> Self { + match proto { + ProtoViewType::TableAlias(alias) => ViewType::TableAlias { + source_table_name: alias.source_table_name, + }, + } + } } diff --git a/apps/framework-cli/src/framework/core/infrastructure_map.rs b/apps/framework-cli/src/framework/core/infrastructure_map.rs index 315cb146e..29b69b260 100644 --- a/apps/framework-cli/src/framework/core/infrastructure_map.rs +++ b/apps/framework-cli/src/framework/core/infrastructure_map.rs @@ -14,13 +14,24 @@ use crate::infrastructure::redis::redis_client::RedisClient; use crate::infrastructure::stream::redpanda::RedpandaConfig; use crate::project::Project; use crate::proto::infrastructure_map::InfrastructureMap as ProtoInfrastructureMap; -use anyhow::Result; +use anyhow::{Context, Result}; use protobuf::{EnumOrUnknown, Message as ProtoMessage}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs; use std::path::Path; +#[derive(Debug, thiserror::Error)] +#[error("Failed to convert infrastructure map from proto")] +#[non_exhaustive] +pub enum InfraMapProtoError { + #[error("Failed to parse proto message")] + ProtoParseError(#[from] protobuf::Error), + + #[error("Missing required field: {field_name}")] + MissingField { field_name: String }, +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum PrimitiveTypes { DataModel, @@ -43,6 +54,13 @@ impl PrimitiveSignature { special_fields: 
Default::default(), } } + + pub fn from_proto(proto: crate::proto::infrastructure_map::PrimitiveSignature) -> Self { + PrimitiveSignature { + name: proto.name, + primitive_type: PrimitiveTypes::from_proto(proto.primitive_type.unwrap()), + } + } } impl PrimitiveTypes { @@ -58,22 +76,35 @@ impl PrimitiveTypes { } } } + + pub fn from_proto(proto: crate::proto::infrastructure_map::PrimitiveTypes) -> Self { + match proto { + crate::proto::infrastructure_map::PrimitiveTypes::DATA_MODEL => { + PrimitiveTypes::DataModel + } + crate::proto::infrastructure_map::PrimitiveTypes::FUNCTION => PrimitiveTypes::Function, + crate::proto::infrastructure_map::PrimitiveTypes::DB_BLOCK => PrimitiveTypes::DBBlock, + crate::proto::infrastructure_map::PrimitiveTypes::CONSUMPTION_API => { + PrimitiveTypes::ConsumptionAPI + } + } + } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub enum ColumnChange { Added(Column), Removed(Column), Updated { before: Column, after: Column }, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub struct OrderByChange { pub before: Vec, pub after: Vec, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] #[allow(clippy::large_enum_variant)] pub enum TableChange { Added(Table), @@ -87,15 +118,15 @@ pub enum TableChange { }, } -#[derive(Debug, Clone)] -pub enum Change { +#[derive(Debug, Clone, Serialize)] +pub enum Change { Added(Box), Removed(Box), Updated { before: Box, after: Box }, } #[allow(clippy::large_enum_variant)] -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub enum InfraChange { Olap(OlapChange), Streaming(StreamingChange), @@ -103,24 +134,24 @@ pub enum InfraChange { Process(ProcessChange), } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] #[allow(clippy::large_enum_variant)] pub enum OlapChange { Table(TableChange), View(Change), } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub enum StreamingChange { Topic(Change), } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub enum ApiChange { ApiEndpoint(Change), } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub enum ProcessChange { TopicToTableSyncProcess(Change), TopicToTopicSyncProcess(Change), @@ -415,7 +446,6 @@ impl InfrastructureMap { InitialDataLoad { table: function.source_data_model.to_table(), topic: source_topic.name.clone(), - // it doesn't mean it is completed, it means the desired state is Completed status: InitialDataLoadStatus::Completed, }, ); @@ -702,18 +732,17 @@ impl InfrastructureMap { None => changes .initial_data_loads .push(InitialDataLoadChange::Addition(load.clone())), - Some(existing) => { - match existing.status { - InitialDataLoadStatus::InProgress(resume_from) => changes + Some(existing) => match existing.status { + InitialDataLoadStatus::InProgress(resume_from) => { + changes .initial_data_loads .push(InitialDataLoadChange::Resumption { resume_from, load: load.clone(), - }), - // nothing to do - InitialDataLoadStatus::Completed => {} + }); } - } + InitialDataLoadStatus::Completed => {} + }, } } @@ -897,7 +926,6 @@ impl InfrastructureMap { .values() .map(|load| { InitialDataLoadChange::Addition(InitialDataLoad { - // if existing deployment is empty, there is no initial data to load status: InitialDataLoadStatus::Completed, ..load.clone() }) @@ -917,7 +945,6 @@ impl InfrastructureMap { } pub async fn store_in_redis(&self, redis_client: &RedisClient) -> Result<()> { - use anyhow::Context; let encoded: Vec = self.to_proto().write_to_bytes()?; redis_client 
.set_with_service_prefix("infrastructure_map", &encoded) @@ -927,6 +954,18 @@ impl InfrastructureMap { Ok(()) } + pub async fn get_from_redis(redis_client: &RedisClient) -> Result { + let encoded = redis_client + .get_with_service_prefix("infrastructure_map") + .await + .context("Failed to get InfrastructureMap from Redis")? + .ok_or_else(|| anyhow::anyhow!("InfrastructureMap not found in Redis"))?; + + let decoded = InfrastructureMap::from_proto(encoded) + .map_err(|e| anyhow::anyhow!("Failed to decode InfrastructureMap from proto: {}", e))?; + Ok(decoded) + } + pub fn to_proto(&self) -> ProtoInfrastructureMap { ProtoInfrastructureMap { topics: self @@ -981,6 +1020,60 @@ impl InfrastructureMap { pub fn to_proto_bytes(&self) -> Vec { self.to_proto().write_to_bytes().unwrap() } + + pub fn from_proto(bytes: Vec) -> Result { + let proto = ProtoInfrastructureMap::parse_from_bytes(&bytes)?; + + Ok(InfrastructureMap { + topics: proto + .topics + .into_iter() + .map(|(k, v)| (k, Topic::from_proto(v))) + .collect(), + api_endpoints: proto + .api_endpoints + .into_iter() + .map(|(k, v)| (k, ApiEndpoint::from_proto(v))) + .collect(), + tables: proto + .tables + .into_iter() + .map(|(k, v)| (k, Table::from_proto(v))) + .collect(), + views: proto + .views + .into_iter() + .map(|(k, v)| (k, View::from_proto(v))) + .collect(), + topic_to_table_sync_processes: proto + .topic_to_table_sync_processes + .into_iter() + .map(|(k, v)| (k, TopicToTableSyncProcess::from_proto(v))) + .collect(), + topic_to_topic_sync_processes: proto + .topic_to_topic_sync_processes + .into_iter() + .map(|(k, v)| (k, TopicToTopicSyncProcess::from_proto(v))) + .collect(), + function_processes: proto + .function_processes + .into_iter() + .map(|(k, v)| (k, FunctionProcess::from_proto(v))) + .collect(), + initial_data_loads: proto + .initial_data_loads + .into_iter() + .map(|(k, v)| (k, InitialDataLoad::from_proto(v))) + .collect(), + orchestration_workers: proto + .orchestration_workers + .into_iter() + .map(|(k, v)| (k, OrchestrationWorker::from_proto(v))) + .collect(), + consumption_api_web_server: ConsumptionApiWebServer {}, + block_db_processes: OlapProcess {}, + }) + } } pub fn compute_table_diff(before: &Table, after: &Table) -> Vec { diff --git a/apps/framework-cli/src/framework/core.rs b/apps/framework-cli/src/framework/core/mod.rs similarity index 98% rename from apps/framework-cli/src/framework/core.rs rename to apps/framework-cli/src/framework/core/mod.rs index 90651e42e..85f681947 100644 --- a/apps/framework-cli/src/framework/core.rs +++ b/apps/framework-cli/src/framework/core/mod.rs @@ -23,6 +23,7 @@ pub mod check; pub mod code_loader; pub mod execute; +pub mod infra_reality_checker; pub mod infrastructure; pub mod infrastructure_map; pub mod plan; diff --git a/apps/framework-cli/src/framework/core/plan.rs b/apps/framework-cli/src/framework/core/plan.rs index d7eafc024..427eadd32 100644 --- a/apps/framework-cli/src/framework/core/plan.rs +++ b/apps/framework-cli/src/framework/core/plan.rs @@ -75,7 +75,7 @@ pub async fn plan_changes( for (id, load) in target_infra_map.initial_data_loads.iter() { match existing_data_loads.get(id) { - Some(load) if load.status == InitialDataLoadStatus::Completed => {} + Some(load) if matches!(load.status, InitialDataLoadStatus::Completed) => {} // there might be existing loads that is not written to the DB _ => { match check_topic_fully_populated( @@ -89,7 +89,6 @@ pub async fn plan_changes( { None => { // None means completed - // the load variable is the target state, which is to 
completed existing_data_loads.insert(id.clone(), load.clone()); } Some(progress) => { diff --git a/apps/framework-cli/src/framework/languages.rs b/apps/framework-cli/src/framework/languages.rs index ad2ba1ad0..402175f29 100644 --- a/apps/framework-cli/src/framework/languages.rs +++ b/apps/framework-cli/src/framework/languages.rs @@ -17,6 +17,14 @@ impl SupportedLanguages { SupportedLanguages::Python => constants::PYTHON_FILE_EXTENSION, } } + + pub fn from_proto(language: String) -> Self { + match language.as_str() { + "ts" => SupportedLanguages::Typescript, + "python" => SupportedLanguages::Python, + _ => panic!("Unsupported language: {}", language), + } + } } impl std::fmt::Display for SupportedLanguages { diff --git a/apps/framework-cli/src/infrastructure/olap.rs b/apps/framework-cli/src/infrastructure/olap.rs index 6a3c4b1bf..148100495 100644 --- a/apps/framework-cli/src/infrastructure/olap.rs +++ b/apps/framework-cli/src/infrastructure/olap.rs @@ -1,6 +1,9 @@ use clickhouse::ClickhouseChangesError; -use crate::{framework::core::infrastructure_map::OlapChange, project::Project}; +use crate::{ + framework::core::infrastructure::table::Table, framework::core::infrastructure_map::OlapChange, + project::Project, +}; pub mod clickhouse; pub mod clickhouse_alt_client; @@ -9,6 +12,36 @@ pub mod clickhouse_alt_client; pub enum OlapChangesError { #[error("Failed to execute the changes on Clickhouse")] ClickhouseChanges(#[from] ClickhouseChangesError), + #[error("Database error: {0}")] + DatabaseError(String), +} + +/// Trait defining operations that can be performed on an OLAP database +#[async_trait::async_trait] +pub trait OlapOperations { + /// Retrieves all tables from the database + /// + /// # Arguments + /// + /// * `db_name` - The name of the database to list tables from + /// * `project` - The project configuration containing the current version + /// + /// # Returns + /// + /// * `Result, OlapChangesError>` - A list of Table objects on success, or an error if the operation fails + /// + /// # Errors + /// + /// Returns `OlapChangesError` if: + /// - The database connection fails + /// - The database doesn't exist + /// - The query execution fails + /// - Table metadata cannot be retrieved + async fn list_tables( + &self, + db_name: &str, + project: &Project, + ) -> Result, OlapChangesError>; } /// This method dispatches the execution of the changes to the right olap storage. @@ -18,6 +51,5 @@ pub async fn execute_changes( changes: &[OlapChange], ) -> Result<(), OlapChangesError> { clickhouse::execute_changes(project, changes).await?; - Ok(()) } diff --git a/apps/framework-cli/src/infrastructure/olap/clickhouse.rs b/apps/framework-cli/src/infrastructure/olap/clickhouse.rs index 0c2b1be6f..084eb6b55 100644 --- a/apps/framework-cli/src/infrastructure/olap/clickhouse.rs +++ b/apps/framework-cli/src/infrastructure/olap/clickhouse.rs @@ -1,3 +1,35 @@ +//! # ClickHouse OLAP Implementation +//! +//! This module provides the ClickHouse-specific implementation of OLAP operations. +//! It handles table management, schema changes, and data type conversions between +//! ClickHouse and the framework's type system. +//! +//! ## Features +//! - Table management (create, read, update, delete) +//! - Schema change management +//! - Type system conversion +//! - Version tracking +//! - Table naming conventions +//! +//! ## Dependencies +//! - clickhouse: Client library for ClickHouse database +//! - clickhouse-rs: Alternative ClickHouse client +//! - Framework core types and infrastructure +//! +//! 
## Version Support
+//! Tables follow the naming convention: {name}_{version}
+//! where version is in the format x_y_z (e.g., table_1_0_0)
+//!
+//! ## Authors
+//! - Initial implementation: Framework Team
+//! - Last modified: 2024
+//!
+//! ## Usage Example
+//! ```rust
+//! let client = create_client(config);
+//! let tables = client.list_tables(&config.db_name, &project).await?;
+//! ```
+
 use clickhouse::Client;
 use clickhouse_rs::ClientHandle;
 use crypto_hash::{hex_digest, Algorithm};
@@ -12,11 +44,17 @@ use queries::{
 use serde::{Deserialize, Serialize};
 use std::time::Duration;
 
-use self::config::ClickHouseConfig;
 use self::model::ClickHouseSystemTable;
+use crate::framework::core::infrastructure::table::{
+    Column, ColumnType, DataEnum, EnumMember, EnumValue, Table,
+};
 use crate::framework::core::infrastructure::view::ViewType;
-use crate::framework::core::infrastructure_map::{Change, ColumnChange, OlapChange, TableChange};
+use crate::framework::core::infrastructure_map::{
+    Change, ColumnChange, OlapChange, PrimitiveSignature, PrimitiveTypes, TableChange,
+};
+use crate::framework::versions::Version;
 use crate::infrastructure::olap::clickhouse::model::{ClickHouseSystemTableRow, ClickHouseTable};
+use crate::infrastructure::olap::{OlapChangesError, OlapOperations};
 use crate::project::Project;
 use crate::utilities::retry::retry;
@@ -29,16 +67,23 @@
 pub mod model;
 pub mod queries;
 pub mod version_sync;
 
+pub use config::ClickHouseConfig;
+
+/// Type alias for query strings to improve readability
 pub type QueryString = String;
 
+/// Represents errors that can occur during ClickHouse operations
 #[derive(Debug, thiserror::Error)]
 pub enum ClickhouseChangesError {
+    /// Error when interacting with ClickHouse database
     #[error("Error interacting with Clickhouse")]
     Clickhouse(#[from] ClickhouseError),
 
+    /// Error from the ClickHouse client library
     #[error("Error interacting with Clickhouse")]
     ClickhouseClient(#[from] clickhouse::error::Error),
 
+    /// Error for unsupported operations
     #[error("Not Supported {0}")]
     NotSupported(String),
 }
@@ -64,6 +109,12 @@
 /// - Table engine changes (via deduplication settings)
 ///
 /// Will retry certain operations that return specific ClickHouse error codes indicating retry is possible.
+/// +/// # Example +/// ```rust +/// let changes = vec![OlapChange::Table(TableChange::Added(table))]; +/// execute_changes(&project, &changes).await?; +/// ``` pub async fn execute_changes( project: &Project, changes: &[OlapChange], @@ -182,6 +233,32 @@ pub struct ConfiguredDBClient { pub config: ClickHouseConfig, } +/// Creates a configured ClickHouse client with the provided configuration +/// +/// # Arguments +/// * `clickhouse_config` - Configuration for the ClickHouse connection +/// +/// # Returns +/// * `ConfiguredDBClient` - A configured client ready for database operations +/// +/// # Details +/// Creates a client with: +/// - Proper URL construction (http/https) +/// - Authentication settings +/// - Database selection +/// - Connection options +/// +/// # Example +/// ```rust +/// let client = create_client(ClickHouseConfig { +/// host: "localhost".to_string(), +/// host_port: 8123, +/// user: "default".to_string(), +/// password: "".to_string(), +/// db_name: "mydb".to_string(), +/// use_ssl: false, +/// }); +/// ``` pub fn create_client(clickhouse_config: ClickHouseConfig) -> ConfiguredDBClient { let protocol = if clickhouse_config.use_ssl { "https" @@ -202,7 +279,25 @@ pub fn create_client(clickhouse_config: ClickHouseConfig) -> ConfiguredDBClient } } -// Run an arbitrary clickhouse query +/// Executes a query against the ClickHouse database +/// +/// # Arguments +/// * `query` - The SQL query to execute +/// * `configured_client` - The configured client to use for execution +/// +/// # Returns +/// * `Result<(), clickhouse::error::Error>` - Success or error from query execution +/// +/// # Details +/// - Logs query for debugging purposes +/// - Executes query using configured client +/// - Handles query execution errors +/// +/// # Example +/// ```rust +/// let query = "SELECT 1".to_string(); +/// run_query(&query, &client).await?; +/// ``` pub async fn run_query( query: &String, configured_client: &ConfiguredDBClient, @@ -212,6 +307,26 @@ pub async fn run_query( client.query(query.as_str()).execute().await } +/// Checks if the ClickHouse database is ready for operations +/// +/// # Arguments +/// * `configured_client` - The configured client to check +/// +/// # Returns +/// * `Result<(), clickhouse::error::Error>` - Success if database is ready +/// +/// # Details +/// - Executes a simple version query +/// - Implements retry logic for common connection issues +/// - Handles temporary network failures +/// - Maximum 20 retries with 200ms delay +/// +/// # Retries +/// Retries on the following conditions: +/// - Connection closed before message completed +/// - Connection reset by peer +/// - Connection not ready +/// - Channel closed pub async fn check_ready( configured_client: &ConfiguredDBClient, ) -> Result<(), clickhouse::error::Error> { @@ -241,6 +356,21 @@ pub async fn check_ready( .await } +/// Fetches all tables from the ClickHouse database +/// +/// # Arguments +/// * `configured_client` - The configured client to use +/// +/// # Returns +/// * `Result, clickhouse::error::Error>` - List of system tables +/// +/// # Details +/// - Excludes system and information schema tables +/// - Returns table metadata including UUID, name, engine +/// - Orders results by table name +/// +/// # Notes +/// The order of columns in the query must match the order of fields in ClickHouseSystemTableRow pub async fn fetch_all_tables( configured_client: &ConfiguredDBClient, ) -> Result, clickhouse::error::Error> { @@ -265,6 +395,19 @@ pub async fn fetch_all_tables( Ok(tables) } +/// 
Fetches tables matching a specific version pattern +/// +/// # Arguments +/// * `configured_client` - The configured client to use +/// * `version` - The version pattern to match against table names +/// +/// # Returns +/// * `Result, clickhouse::error::Error>` - List of matching tables +/// +/// # Details +/// - Filters tables by database name and version pattern +/// - Returns full table metadata +/// - Uses parameterized query for safety pub async fn fetch_tables_with_version( configured_client: &ConfiguredDBClient, version: &str, @@ -287,6 +430,24 @@ pub async fn fetch_tables_with_version( Ok(tables) } +/// Deletes a table or view from the ClickHouse database +/// +/// # Arguments +/// * `table_or_view_name` - Name of the table or view to delete +/// * `configured_client` - The configured client to use +/// +/// # Returns +/// * `Result<(), clickhouse::error::Error>` - Success if deletion completed +/// +/// # Details +/// - Properly escapes database and table/view names +/// - Logs deletion operation for debugging +/// - Works for both tables and views +/// +/// # Example +/// ```rust +/// delete_table_or_view("my_table_1_0_0", &client).await?; +/// ``` pub async fn delete_table_or_view( table_or_view_name: &str, configured_client: &ConfiguredDBClient, @@ -305,6 +466,28 @@ pub async fn delete_table_or_view( .await } +/// Retrieves the engine type for a specific table +/// +/// # Arguments +/// * `db_name` - Database name +/// * `name` - Table name +/// * `configured_client` - The configured client to use +/// +/// # Returns +/// * `anyhow::Result>` - The engine type if table exists +/// +/// # Details +/// - Uses parameterized query for safety +/// - Returns None if table doesn't exist +/// - Useful for determining table properties and capabilities +/// +/// # Example +/// ```rust +/// let engine = get_engine("mydb", "mytable", &client).await?; +/// if let Some(engine_type) = engine { +/// println!("Table engine: {}", engine_type); +/// } +/// ``` pub async fn get_engine( db_name: &str, name: &str, @@ -320,6 +503,27 @@ pub async fn get_engine( Ok(cursor.next().await?) } +/// Checks if a table is newly created and empty +/// +/// # Arguments +/// * `table` - The table to check +/// * `configured_client` - The configured client to use +/// +/// # Returns +/// * `Result` - True if table is new and empty +/// +/// # Details +/// - Checks if table exists +/// - Verifies table is not a view +/// - Confirms table has zero rows +/// - Used for initialization logic +/// +/// # Example +/// ```rust +/// if check_is_table_new(&table, &client).await? 
{ +/// // Handle new table initialization +/// } +/// ``` pub async fn check_is_table_new( table: &ClickHouseTable, configured_client: &ConfiguredDBClient, @@ -341,6 +545,26 @@ } } +/// Gets the number of rows in a table +/// +/// # Arguments +/// * `table_name` - Name of the table to check +/// * `config` - ClickHouse configuration +/// * `clickhouse` - Client handle for database operations +/// +/// # Returns +/// * `Result<i64, clickhouse::error::Error>` - Number of rows in the table +/// +/// # Details +/// - Uses COUNT(*) for accurate row count +/// - Properly escapes table and database names +/// - Handles empty tables correctly +/// +/// # Example +/// ```rust +/// let size = check_table_size("users_1_0_0", &config, &mut client).await?; +/// println!("Table has {} rows", size); +/// ``` pub async fn check_table_size( table_name: &str, config: &ClickHouseConfig, @@ -364,6 +588,27 @@ Ok(result as i64) } +/// Retrieves a list of all table names in the database +/// +/// # Arguments +/// * `configured_client` - The configured client to use +/// +/// # Returns +/// * `Result<Vec<String>, clickhouse::error::Error>` - Sorted list of table names +/// +/// # Details +/// - Filters by specified database +/// - Returns sorted list for consistency +/// - Includes both tables and views +/// - Logs operation for debugging +/// +/// # Example +/// ```rust +/// let tables = fetch_table_names(&client).await?; +/// for table in tables { +/// println!("Found table: {}", table); +/// } +/// ``` pub async fn fetch_table_names( configured_client: &ConfiguredDBClient, ) -> Result<Vec<String>, clickhouse::error::Error> { @@ -387,6 +632,28 @@ Ok(tables) } +/// Retrieves the schema (columns and types) for a specific table +/// +/// # Arguments +/// * `configured_client` - The configured client to use +/// * `table_name` - Name of the table to describe +/// +/// # Returns +/// * `Result<Vec<(String, String)>, clickhouse::error::Error>` - List of column name and type pairs +/// +/// # Details +/// - Returns sorted column list for consistency +/// - Includes column names and their ClickHouse types +/// - Uses system.columns for accurate metadata +/// - Logs operation for debugging +/// +/// # Example +/// ```rust +/// let schema = fetch_table_schema(&client, "users_1_0_0").await?; +/// for (column, type_) in schema { +/// println!("Column {} has type {}", column, type_); +/// } +/// ``` pub async fn fetch_table_schema( configured_client: &ConfiguredDBClient, table_name: &str, @@ -412,6 +679,26 @@ Ok(columns) } +/// Generates a hash of the table schema for comparison +/// +/// # Arguments +/// * `columns` - List of column name and type pairs +/// +/// # Returns +/// * `Result` - SHA256 hash of the schema +/// +/// # Details +/// - Concatenates column names and types +/// - Uses SHA256 for consistent hashing +/// - Useful for detecting schema changes +/// - Order-dependent for consistency +/// +/// # Example +/// ```rust +/// let schema = fetch_table_schema(&client, "users_1_0_0").await?; +/// let hash = table_schema_to_hash(schema)?; +/// println!("Schema hash: {}", hash); +/// ``` pub fn table_schema_to_hash( columns: Vec<(String, String)>, ) -> Result { @@ -426,12 +713,46 @@ Ok(hashed) }
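Reviewer note: the hashing scheme described for `table_schema_to_hash` (an order-dependent digest of the concatenated name/type pairs) can be sketched as follows. The use of the `sha2` crate and the exact concatenation format are assumptions, not taken from the patch:

```rust
use sha2::{Digest, Sha256};

// Sketch: order-dependent SHA256 over the column (name, type) pairs.
// The shipped implementation may separate or format the pairs differently.
fn table_schema_to_hash_sketch(columns: &[(String, String)]) -> String {
    let mut hasher = Sha256::new();
    for (name, column_type) in columns {
        hasher.update(name.as_bytes());
        hasher.update(column_type.as_bytes());
    }
    // Hex-encode the finalized digest.
    format!("{:x}", hasher.finalize())
}
```

+/// Represents details about a table in ClickHouse +/// +/// # Fields +/// * `engine` - The table's engine type +/// * `total_rows` - Optional count of rows in the table +/// +/// # Usage +/// Used internally for table metadata operations and 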
checks #[derive(Debug, Clone, Deserialize, Serialize, clickhouse::Row)] struct TableDetail { pub engine: String, pub total_rows: Option<u64>, } +/// Generates SQL statements for column alterations +/// +/// # Arguments +/// * `diff` - List of column changes to apply +/// * `db_name` - Target database name +/// * `table_name` - Target table name +/// +/// # Returns +/// * `Result<Vec<String>, ClickhouseError>` - List of ALTER TABLE statements +/// +/// # Details +/// Handles three types of column changes: +/// - Added: Creates new columns +/// - Removed: Drops existing columns +/// - Updated: Modifies column type/properties +/// +/// # Example +/// ```rust +/// let changes = vec![ColumnChange::Added(Column { +/// name: "new_column".to_string(), +/// data_type: ColumnType::Int, +/// required: true, +/// ..Default::default() +/// })]; +/// let statements = generate_column_alter_statements(&changes, "mydb", "mytable")?; +/// ``` fn generate_column_alter_statements( diff: &[ColumnChange], db_name: &str, @@ -473,6 +794,27 @@ Ok(statements) } +/// Generates an ORDER BY clause alteration statement +/// +/// # Arguments +/// * `order_by` - List of columns for ordering +/// * `db_name` - Target database name +/// * `table_name` - Target table name +/// +/// # Returns +/// * `String` - The complete ALTER TABLE statement +/// +/// # Details +/// - Generates proper SQL syntax for ORDER BY modifications +/// - Handles empty order by clauses +/// - Properly escapes identifiers +/// +/// # Example +/// ```rust +/// let order_by = vec!["id".to_string(), "timestamp".to_string()]; +/// let statement = generate_order_by_alter_statement(&order_by, "mydb", "mytable"); +/// // Result: ALTER TABLE `mydb`.`mytable` MODIFY ORDER BY (`id`, `timestamp`) +/// ``` fn generate_order_by_alter_statement( order_by: &[String], db_name: &str, @@ -486,10 +828,479 @@ ) } +/// Extracts version information from a table name +/// +/// # Arguments +/// * `table_name` - The name of the table to parse +/// * `default_version` - The version to use for tables that don't follow the versioning convention +/// +/// # Returns +/// * `(String, Version)` - A tuple containing the base name and version +/// +/// # Format +/// For tables following the naming convention: {name}_{version} +/// where version is in the format x_y_z (e.g., 1_0_0) +/// For tables not following the convention: returns the full name and default_version +/// +/// # Example +/// ```rust +/// let (base_name, version) = extract_version_from_table_name("users_1_0_0", "0.0.0"); +/// assert_eq!(base_name, "users"); +/// assert_eq!(version.to_string(), "1.0.0"); +/// +/// let (base_name, version) = extract_version_from_table_name("my_table", "1.0.0"); +/// assert_eq!(base_name, "my_table"); +/// assert_eq!(version.to_string(), "1.0.0"); +/// ``` +fn extract_version_from_table_name(table_name: &str, default_version: &str) -> (String, Version) { + debug!("Extracting version from table name: {}", table_name); + debug!("Using default version: {}", default_version); + + // Special case for empty table name + if table_name.is_empty() { + debug!("Empty table name, using default version"); + return ( + table_name.to_string(), + Version::from_string(default_version.to_string()), + ); + } + + // Special case for tables ending in _MV (materialized views) + if table_name.ends_with("_MV") { + debug!("Materialized view detected, using default version"); + return ( + table_name.to_string(), + 
Version::from_string(default_version.to_string()), + ); + } + + let parts: Vec<&str> = table_name.split('_').collect(); + debug!("Split table name into parts: {:?}", parts); + + if parts.len() < 2 { + debug!("Table name has fewer than 2 parts, using default version"); + // If table doesn't follow naming convention, return full name and default version + return ( + table_name.to_string(), + Version::from_string(default_version.to_string()), + ); + } + + // Find the first numeric part - this marks the start of the version + let mut version_start_idx = None; + for (i, part) in parts.iter().enumerate() { + if part.chars().all(|c| c.is_ascii_digit()) { + version_start_idx = Some(i); + debug!("Found version start at index {}: {}", i, part); + break; + } + } + + match version_start_idx { + Some(idx) => { + // Filter out empty parts when joining base name + let base_parts: Vec<&str> = parts[..idx] + .iter() + .filter(|p| !p.is_empty()) + .copied() + .collect(); + let base_name = base_parts.join("_"); + debug!( + "Base parts: {:?}, joined base name: {}", + base_parts, base_name + ); + + // Filter out empty parts when joining version + let version_parts: Vec<&str> = parts[idx..] + .iter() + .filter(|p| !p.is_empty() && p.chars().all(|c| c.is_ascii_digit())) + .copied() + .collect(); + debug!("Version parts: {:?}", version_parts); + + // If we have no valid version parts, return the original name and default version + if version_parts.is_empty() { + debug!("No valid version parts found, using default version"); + return ( + table_name.to_string(), + Version::from_string(default_version.to_string()), + ); + } + + let version_str = version_parts.join("."); + debug!("Created version string: {}", version_str); + + (base_name, Version::from_string(version_str)) + } + None => { + debug!("No version parts found, using default version"); + ( + table_name.to_string(), + Version::from_string(default_version.to_string()), + ) + } + } +} + +#[async_trait::async_trait] +impl OlapOperations for ConfiguredDBClient { + /// Retrieves all tables from the ClickHouse database and converts them to framework Table objects + /// + /// # Arguments + /// * `db_name` - The name of the database to list tables from + /// + /// # Returns + /// * `Result<Vec<Table>, OlapChangesError>` - A list of Table objects on success + /// + /// # Details + /// This implementation: + /// 1. Queries system.tables for basic table information + /// 2. Extracts version information from table names + /// 3. Queries system.columns for column metadata + /// 4. Converts ClickHouse types to framework types + /// 5. 
Creates Table objects with proper versioning and source primitives + /// + /// # Notes + /// - Tables without version information in their names fall back to the project's current version + /// - Column types are converted based on ClickHouse to framework type mapping + /// - Primary key columns are used for order_by clauses + /// - Tables are sorted by name in the final result + async fn list_tables( + &self, + db_name: &str, + project: &Project, + ) -> Result<Vec<Table>, OlapChangesError> { + debug!("Starting list_tables operation for database: {}", db_name); + debug!("Using project version: {}", project.cur_version()); + + // First get basic table information + let query = format!( + r#" + SELECT + name, + engine, + create_table_query + FROM system.tables + WHERE database = '{}' + AND engine != 'View' + AND NOT name LIKE '.%' + ORDER BY name + "#, + db_name + ); + debug!("Executing table query: {}", query); + + let mut cursor = self + .client + .query(&query) + .fetch::<(String, String, String)>() + .map_err(|e| { + debug!("Error fetching tables: {}", e); + OlapChangesError::DatabaseError(e.to_string()) + })?; + + let mut tables = Vec::new(); + + while let Some((table_name, engine, create_query)) = cursor + .next() + .await + .map_err(|e| OlapChangesError::DatabaseError(e.to_string()))? + { + debug!("Processing table: {}", table_name); + debug!("Table engine: {}", engine); + debug!("Create query: {}", create_query); + + // Extract ORDER BY columns from create_query + let order_by_cols = extract_order_by_from_create_query(&create_query); + debug!("Extracted ORDER BY columns: {:?}", order_by_cols); + + // Get column information for each table + let columns_query = format!( + r#" + SELECT + name, + type, + is_in_primary_key, + is_in_sorting_key + FROM system.columns + WHERE database = '{}' + AND table = '{}' + ORDER BY name + "#, + db_name, table_name + ); + debug!( + "Executing columns query for table {}: {}", + table_name, columns_query + ); + + let mut columns_cursor = self + .client + .query(&columns_query) + .fetch::<(String, String, u8, u8)>() + .map_err(|e| { + debug!("Error fetching columns for table {}: {}", table_name, e); + OlapChangesError::DatabaseError(e.to_string()) + })?; + + let mut columns = Vec::new(); + + while let Some((col_name, col_type, is_primary, is_sorting)) = columns_cursor + .next() + .await + .map_err(|e| OlapChangesError::DatabaseError(e.to_string()))? 
+ { + debug!( + "Processing column: {} (type: {}, primary: {}, sorting: {})", + col_name, col_type, is_primary, is_sorting + ); + + // Convert ClickHouse types to framework types + let (data_type, is_nullable) = convert_clickhouse_type_to_column_type(&col_type) + .map_err(OlapChangesError::DatabaseError)?; + debug!( + "Converted column type: {:?}, nullable: {}", + data_type, is_nullable + ); + + let column = Column { + name: col_name.clone(), + data_type, + required: !is_nullable, + unique: false, + primary_key: is_primary == 1, + default: None, + }; + + columns.push(column); + } + + // Sort columns by name for consistent ordering + columns.sort_by(|a, b| a.name.cmp(&b.name)); + + debug!("Found {} columns for table {}", columns.len(), table_name); + + // Extract base name and version for source primitive + let (base_name, version) = + extract_version_from_table_name(&table_name, project.cur_version().as_str()); + + // Create source primitive signature using the base name + let source_primitive = PrimitiveSignature { + name: base_name, + primitive_type: PrimitiveTypes::DataModel, + }; + + // Create the Table object using the original table_name + let table = Table { + name: table_name, // Keep the original table name with version + columns, + order_by: order_by_cols, // Use the extracted ORDER BY columns + deduplicate: engine.contains("ReplacingMergeTree"), + version, // Still store the version for reference + source_primitive, + }; + debug!("Created table object: {:?}", table); + + tables.push(table); + } + + debug!( + "Completed list_tables operation, found {} tables", + tables.len() + ); + Ok(tables) + } +} + +/// Converts a ClickHouse column type string to the framework's ColumnType +/// +/// # Arguments +/// * `ch_type` - The ClickHouse type string to convert +/// +/// # Returns +/// * `Result<(ColumnType, bool), String>` - A tuple containing: +/// - The converted framework type +/// - A boolean indicating if the type is nullable (true = nullable) +/// +/// # Type Mappings +/// - String -> ColumnType::String +/// - UInt8/16/32, Int8/16/32 -> ColumnType::Int +/// - UInt64, Int64 -> ColumnType::BigInt +/// - Float32/64 -> ColumnType::Float +/// - Decimal* -> ColumnType::Decimal +/// - DateTime -> ColumnType::DateTime +/// - DateTime64(precision) -> ColumnType::DateTime +/// - DateTime('timezone') -> ColumnType::DateTime +/// - DateTime64(precision, 'timezone') -> ColumnType::DateTime +/// - Bool/Boolean -> ColumnType::Boolean +/// - Array(*) -> ColumnType::Array +/// - JSON -> ColumnType::Json +/// - Enum8/16 -> ColumnType::Enum +/// - Nullable(*) -> (inner_type, true) +/// +/// # Example +/// ```rust +/// let (framework_type, is_nullable) = convert_clickhouse_type_to_column_type("Nullable(Int32)")?; +/// assert_eq!(framework_type, ColumnType::Int); +/// assert!(is_nullable); +/// ``` +fn convert_clickhouse_type_to_column_type(ch_type: &str) -> Result<(ColumnType, bool), String> { + use regex::Regex; + + // Handle Nullable type wrapper + if ch_type.starts_with("Nullable(") { + let inner_type = ch_type + .strip_prefix("Nullable(") + .and_then(|s| s.strip_suffix(")")) + .ok_or_else(|| format!("Invalid Nullable type format: {}", ch_type))?; + + let (inner_column_type, _) = convert_clickhouse_type_to_column_type(inner_type)?; + return Ok((inner_column_type, true)); + } + + // Handle DateTime types with parameters + if ch_type.starts_with("DateTime") { + // All DateTime variants map to ColumnType::DateTime + // We could store precision and timezone as metadata if needed in the future + return 
Ok((ColumnType::DateTime, false)); + } + + // Handle Enum types first since they contain parentheses which would interfere with the base type extraction + if ch_type.starts_with("Enum8(") || ch_type.starts_with("Enum16(") { + let enum_content = ch_type + .trim_start_matches("Enum8(") + .trim_start_matches("Enum16(") + .trim_end_matches(')'); + + // Return error if enum content is empty + if enum_content.trim().is_empty() { + return Err(format!("Empty enum definition: {}", ch_type)); + } + + // Use regex to match enum values, handling potential commas in the names + let re = Regex::new(r"'([^']*)'\s*=\s*(\d+)").map_err(|e| e.to_string())?; + let values = re + .captures_iter(enum_content) + .map(|cap| { + let name = cap[1].to_string(); + let value = cap[2].parse::().map_err(|e| e.to_string())?; + + Ok(EnumMember { + name: name.clone(), + value: EnumValue::Int(value), + }) + }) + .collect::<Result<Vec<_>, String>>()?; + + // Return error if no valid enum values were found + if values.is_empty() { + return Err(format!("No valid enum values found in: {}", ch_type)); + } + + return Ok(( + ColumnType::Enum(DataEnum { + name: "Unknown".to_string(), // The actual enum name will be set elsewhere + values, + }), + false, + )); + } + + // Remove any parameters from type string for other types + let base_type = ch_type.split('(').next().unwrap_or(ch_type); + + let column_type = match base_type { + "String" => Ok(ColumnType::String), + "UInt8" | "UInt16" | "UInt32" | "Int8" | "Int16" | "Int32" => Ok(ColumnType::Int), + "UInt64" | "Int64" => Ok(ColumnType::BigInt), + "Float32" | "Float64" => Ok(ColumnType::Float), + "Decimal" | "Decimal32" | "Decimal64" | "Decimal128" => Ok(ColumnType::Decimal), + "Bool" | "Boolean" => Ok(ColumnType::Boolean), + "Array" => { + // Extract the inner type from Array(...) 
format + let inner_type = ch_type + .strip_prefix("Array(") + .and_then(|s| s.strip_suffix(")")) + .ok_or_else(|| format!("Invalid Array type format: {}", ch_type))?; + + let (inner_column_type, inner_nullable) = + convert_clickhouse_type_to_column_type(inner_type)?; + Ok(ColumnType::Array { + element_type: Box::new(inner_column_type), + element_nullable: inner_nullable, + }) + } + "JSON" => Ok(ColumnType::Json), + _ => Err(format!("Unsupported ClickHouse type: {}", ch_type)), + }?; + + Ok((column_type, false)) +} + +/// Extracts ORDER BY columns from a CREATE TABLE query +/// +/// # Arguments +/// * `create_query` - The CREATE TABLE query string +/// +/// # Returns +/// * `Vec<String>` - List of column names in the ORDER BY clause, or empty vector if none found +/// +/// # Example +/// ```rust +/// let query = "CREATE TABLE test (id Int64) ENGINE = MergeTree() ORDER BY (id, timestamp)"; +/// let order_by = extract_order_by_from_create_query(query); +/// assert_eq!(order_by, vec!["id".to_string(), "timestamp".to_string()]); +/// ``` +fn extract_order_by_from_create_query(create_query: &str) -> Vec<String> { + debug!("Extracting ORDER BY from query: {}", create_query); + + // Find the ORDER BY clause, being careful not to match PRIMARY KEY + let mut after_order_by = None; + for (idx, _) in create_query.to_uppercase().match_indices("ORDER BY") { + // Check if this is not part of "PRIMARY KEY" by looking at the preceding text + let preceding_text = &create_query[..idx].trim_end().to_uppercase(); + if !preceding_text.ends_with("PRIMARY KEY") { + after_order_by = Some(&create_query[idx..]); + break; + } + } + + if let Some(after_order_by) = after_order_by { + // Find where the ORDER BY clause ends (at SETTINGS or end of string) + let mut end_idx = after_order_by.len(); + if let Some(settings_idx) = after_order_by.to_uppercase().find("SETTINGS") { + end_idx = settings_idx; + } + if let Some(next_order_by) = after_order_by[8..].to_uppercase().find("ORDER BY") { + end_idx = std::cmp::min(end_idx, next_order_by + 8); + } + let order_by_clause = &after_order_by[..end_idx]; + + // Extract the column names + let order_by_content = order_by_clause + .trim_start_matches("ORDER BY") + .trim() + .trim_matches('(') + .trim_matches(')'); + + debug!("Found ORDER BY content: {}", order_by_content); + + // Split by comma and clean up each column name + return order_by_content + .split(',') + .map(|s| s.trim().trim_matches('`').to_string()) + .filter(|s| !s.is_empty()) + .collect(); + } + + debug!("No explicit ORDER BY clause found"); + Vec::new() +} +
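Reviewer note: the PRIMARY KEY handling above is the subtle part of this helper, since an `ORDER BY` token can legitimately follow a `PRIMARY KEY` clause. A usage illustration consistent with the shipped tests below:

```rust
// Illustration (mirrors the tests): a dump containing both PRIMARY KEY
// and a table-level ORDER BY clause still yields the ORDER BY columns.
let ddl = "CREATE TABLE local.Foo_0_0 (`primaryKey` String) \
           ENGINE = MergeTree PRIMARY KEY primaryKey \
           ORDER BY primaryKey SETTINGS index_granularity = 8192";
assert_eq!(
    extract_order_by_from_create_query(ddl),
    vec!["primaryKey".to_string()]
);
```

#[cfg(test)] mod tests { use super::*; - use crate::framework::core::infrastructure::table::{Column, ColumnType}; + use crate::framework::core::infrastructure::table::{Column, ColumnType, EnumValue}; #[test] fn test_generate_column_alter_statements_with_array_float() { @@ -527,7 +1338,7 @@ mod tests { }), ColumnChange::Removed(Column { name: "old_column".to_string(), - data_type: ColumnType::String, // Assuming String type, adjust if needed + data_type: ColumnType::String, required: false, unique: false, primary_key: false, @@ -536,7 +1347,7 @@ ColumnChange::Updated { before: Column { name: "id".to_string(), - data_type: ColumnType::Int, // Assuming it was Int before, adjust if needed + data_type: ColumnType::Int, required: true, unique: true, primary_key: true, @@ -611,4 +1422,427 @@ } "ALTER TABLE `test_db`.`test_table` MODIFY ORDER BY ()" ); } + + #[test] + fn test_datetime_type_conversion() { + // Test basic DateTime + assert_eq!( + 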
convert_clickhouse_type_to_column_type("DateTime"), + Ok((ColumnType::DateTime, false)) + ); + + // Test DateTime64 with precision + assert_eq!( + convert_clickhouse_type_to_column_type("DateTime64(3)"), + Ok((ColumnType::DateTime, false)) + ); + + // Test DateTime with timezone + assert_eq!( + convert_clickhouse_type_to_column_type("DateTime('UTC')"), + Ok((ColumnType::DateTime, false)) + ); + + // Test DateTime64 with precision and timezone + assert_eq!( + convert_clickhouse_type_to_column_type("DateTime64(6, 'America/New_York')"), + Ok((ColumnType::DateTime, false)) + ); + } + + #[test] + fn test_enum_type_conversion() { + // Test Enum8 + let enum8_type = "Enum8('RED' = 1, 'GREEN' = 2, 'BLUE' = 3)"; + let result = convert_clickhouse_type_to_column_type(enum8_type).unwrap(); + + match result { + (ColumnType::Enum(data_enum), false) => { + assert_eq!(data_enum.values.len(), 3); + assert_eq!(data_enum.values[0].name, "RED"); + assert_eq!(data_enum.values[0].value, EnumValue::Int(1)); + assert_eq!(data_enum.values[1].name, "GREEN"); + assert_eq!(data_enum.values[1].value, EnumValue::Int(2)); + assert_eq!(data_enum.values[2].name, "BLUE"); + assert_eq!(data_enum.values[2].value, EnumValue::Int(3)); + } + _ => panic!("Expected Enum type"), + } + + // Test Enum16 + let enum16_type = "Enum16('PENDING' = 0, 'ACTIVE' = 1, 'INACTIVE' = 2)"; + let result = convert_clickhouse_type_to_column_type(enum16_type).unwrap(); + + match result { + (ColumnType::Enum(data_enum), false) => { + assert_eq!(data_enum.values.len(), 3); + assert_eq!(data_enum.values[0].name, "PENDING"); + assert_eq!(data_enum.values[0].value, EnumValue::Int(0)); + assert_eq!(data_enum.values[1].name, "ACTIVE"); + assert_eq!(data_enum.values[1].value, EnumValue::Int(1)); + assert_eq!(data_enum.values[2].name, "INACTIVE"); + assert_eq!(data_enum.values[2].value, EnumValue::Int(2)); + } + _ => panic!("Expected Enum type"), + } + + // Test enum with spaces and special characters in names + let complex_enum = "Enum8('NOT FOUND' = 1, 'BAD_REQUEST' = 2, 'SERVER ERROR!' 
= 3)"; + let result = convert_clickhouse_type_to_column_type(complex_enum).unwrap(); + + match result { + (ColumnType::Enum(data_enum), false) => { + assert_eq!(data_enum.values.len(), 3); + assert_eq!(data_enum.values[0].name, "NOT FOUND"); + assert_eq!(data_enum.values[1].name, "BAD_REQUEST"); + assert_eq!(data_enum.values[2].name, "SERVER ERROR!"); + } + _ => panic!("Expected Enum type"), + } + } + + #[test] + fn test_nullable_type_conversion() { + // Test basic nullable types + assert_eq!( + convert_clickhouse_type_to_column_type("Nullable(Int32)"), + Ok((ColumnType::Int, true)) + ); + assert_eq!( + convert_clickhouse_type_to_column_type("Nullable(String)"), + Ok((ColumnType::String, true)) + ); + assert_eq!( + convert_clickhouse_type_to_column_type("Nullable(Float64)"), + Ok((ColumnType::Float, true)) + ); + + // Test nullable datetime + assert_eq!( + convert_clickhouse_type_to_column_type("Nullable(DateTime)"), + Ok((ColumnType::DateTime, true)) + ); + assert_eq!( + convert_clickhouse_type_to_column_type("Nullable(DateTime64(3))"), + Ok((ColumnType::DateTime, true)) + ); + + // Test nullable array + let (array_type, is_nullable) = + convert_clickhouse_type_to_column_type("Nullable(Array(Int32))").unwrap(); + assert!(is_nullable); + match array_type { + ColumnType::Array { + element_type, + element_nullable, + } => { + assert_eq!(*element_type, ColumnType::Int); + assert!(!element_nullable); + } + _ => panic!("Expected Array type"), + } + + // Test nullable enum + let enum_type = "Nullable(Enum8('RED' = 1, 'GREEN' = 2, 'BLUE' = 3))"; + let (column_type, is_nullable) = convert_clickhouse_type_to_column_type(enum_type).unwrap(); + assert!(is_nullable); + match column_type { + ColumnType::Enum(data_enum) => { + assert_eq!(data_enum.values.len(), 3); + assert_eq!(data_enum.values[0].name, "RED"); + assert_eq!(data_enum.values[0].value, EnumValue::Int(1)); + } + _ => panic!("Expected Enum type"), + } + + // Test array with nullable elements + let array_type = "Array(Nullable(Int32))"; + let (column_type, is_nullable) = + convert_clickhouse_type_to_column_type(array_type).unwrap(); + assert!(!is_nullable); // The array itself is not nullable + match column_type { + ColumnType::Array { + element_type, + element_nullable, + } => { + assert_eq!(*element_type, ColumnType::Int); + assert!(element_nullable); // But its elements are nullable + } + _ => panic!("Expected Array type"), + } + } + + #[test] + fn test_extract_version_from_table_name() { + // Test two-part versions + let (base_name, version) = extract_version_from_table_name("Bar_0_0", "1.0.0"); + assert_eq!(base_name, "Bar"); + assert_eq!(version.to_string(), "0.0"); + + let (base_name, version) = extract_version_from_table_name("Foo_0_0", "1.0.0"); + assert_eq!(base_name, "Foo"); + assert_eq!(version.to_string(), "0.0"); + + // Test three-part versions + let (base_name, version) = extract_version_from_table_name("Bar_0_0_0", "1.0.0"); + assert_eq!(base_name, "Bar"); + assert_eq!(version.to_string(), "0.0.0"); + + let (base_name, version) = extract_version_from_table_name("Foo_1_2_3", "0.0.0"); + assert_eq!(base_name, "Foo"); + assert_eq!(version.to_string(), "1.2.3"); + + // Test table names with underscores + let (base_name, version) = extract_version_from_table_name("My_Table_0_0", "1.0.0"); + assert_eq!(base_name, "My_Table"); + assert_eq!(version.to_string(), "0.0"); + + let (base_name, version) = + extract_version_from_table_name("Complex_Table_Name_1_0_0", "0.0.0"); + assert_eq!(base_name, "Complex_Table_Name"); + 
assert_eq!(version.to_string(), "1.0.0"); + + // Test invalid formats - should use default version + let (base_name, version) = extract_version_from_table_name("TableWithoutVersion", "1.0.0"); + assert_eq!(base_name, "TableWithoutVersion"); + assert_eq!(version.to_string(), "1.0.0"); + + let (base_name, version) = + extract_version_from_table_name("Table_WithoutNumericVersion", "1.0.0"); + assert_eq!(base_name, "Table_WithoutNumericVersion"); + assert_eq!(version.to_string(), "1.0.0"); + + // Test edge cases + let (base_name, version) = extract_version_from_table_name("", "1.0.0"); + assert_eq!(base_name, ""); + assert_eq!(version.to_string(), "1.0.0"); + + let (base_name, version) = extract_version_from_table_name("_0_0", "1.0.0"); + assert_eq!(base_name, ""); + assert_eq!(version.to_string(), "0.0"); + + let (base_name, version) = extract_version_from_table_name("Table_0_0_", "1.0.0"); + assert_eq!(base_name, "Table"); + assert_eq!(version.to_string(), "0.0"); + + // Test mixed numeric and non-numeric parts + let (base_name, version) = extract_version_from_table_name("Table2_0_0", "1.0.0"); + assert_eq!(base_name, "Table2"); + assert_eq!(version.to_string(), "0.0"); + + let (base_name, version) = extract_version_from_table_name("V2_Table_1_0_0", "0.0.0"); + assert_eq!(base_name, "V2_Table"); + assert_eq!(version.to_string(), "1.0.0"); + + // Test materialized views + let (base_name, version) = extract_version_from_table_name("BarAggregated_MV", "1.0.0"); + assert_eq!(base_name, "BarAggregated_MV"); + assert_eq!(version.to_string(), "1.0.0"); + + // Test non-versioned tables + let (base_name, version) = extract_version_from_table_name("Foo", "1.0.0"); + assert_eq!(base_name, "Foo"); + assert_eq!(version.to_string(), "1.0.0"); + + let (base_name, version) = extract_version_from_table_name("Bar", "1.0.0"); + assert_eq!(base_name, "Bar"); + assert_eq!(version.to_string(), "1.0.0"); + } + + #[test] + fn test_extract_order_by_from_create_query() { + // Test with explicit ORDER BY + let query = "CREATE TABLE test (id Int64) ENGINE = MergeTree() ORDER BY (id, timestamp)"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!(order_by, vec!["id".to_string(), "timestamp".to_string()]); + + // Test with PRIMARY KEY and ORDER BY being different + let query = + "CREATE TABLE test (id Int64) ENGINE = MergeTree PRIMARY KEY id ORDER BY (timestamp)"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!(order_by, vec!["timestamp".to_string()]); + + // Test with PRIMARY KEY but no explicit ORDER BY (should return empty) + let query = "CREATE TABLE test (id Int64) ENGINE = MergeTree PRIMARY KEY id"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!(order_by, Vec::<String>::new()); + + // Test with PRIMARY KEY and implicit ORDER BY through PRIMARY KEY + let query = "CREATE TABLE local.Foo_0_0 (`primaryKey` String, `timestamp` Float64, `optionalText` Nullable(String)) ENGINE = MergeTree PRIMARY KEY primaryKey ORDER BY primaryKey SETTINGS index_granularity = 8192"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!(order_by, vec!["primaryKey".to_string()]); + + // Test with SETTINGS clause + let query = "CREATE TABLE test (id Int64) ENGINE = MergeTree() ORDER BY (id, timestamp) SETTINGS index_granularity = 8192"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!(order_by, vec!["id".to_string(), "timestamp".to_string()]); + + // Test with backticks + let query = + "CREATE TABLE test (id Int64) ENGINE = MergeTree() 
ORDER BY (`id`, `timestamp`)"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!(order_by, vec!["id".to_string(), "timestamp".to_string()]); + + // Test without parentheses + let query = "CREATE TABLE test (id Int64) ENGINE = MergeTree() ORDER BY id"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!(order_by, vec!["id".to_string()]); + + // Test with no ORDER BY clause + let query = "CREATE TABLE test (id Int64) ENGINE = MergeTree()"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!(order_by, Vec::<String>::new()); + } + + #[test] + fn test_extract_order_by_from_create_query_edge_cases() { + // Test with multiple ORDER BY clauses (should only use the first one) + let query = + "CREATE TABLE test (id Int64) ENGINE = MergeTree() ORDER BY (id) ORDER BY (timestamp)"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!(order_by, vec!["id".to_string()]); + + // Test with malformed ORDER BY clause (missing closing parenthesis) + let query = "CREATE TABLE test (id Int64) ENGINE = MergeTree() ORDER BY (id, timestamp"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!(order_by, vec!["id".to_string(), "timestamp".to_string()]); + + // Test with empty ORDER BY clause + let query = "CREATE TABLE test (id Int64) ENGINE = MergeTree() ORDER BY ()"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!(order_by, Vec::<String>::new()); + + // Test with ORDER BY clause containing only spaces + let query = "CREATE TABLE test (id Int64) ENGINE = MergeTree() ORDER BY ( )"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!(order_by, Vec::<String>::new()); + + // Test with ORDER BY clause containing empty entries + let query = "CREATE TABLE test (id Int64) ENGINE = MergeTree() ORDER BY (id,,timestamp)"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!(order_by, vec!["id".to_string(), "timestamp".to_string()]); + + // Test with complex expressions in ORDER BY + let query = "CREATE TABLE test (id Int64) ENGINE = MergeTree() ORDER BY (id, cityId, `user.id`, nested.field)"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!( + order_by, + vec![ + "id".to_string(), + "cityId".to_string(), + "user.id".to_string(), + "nested.field".to_string() + ] + ); + + // Test with PRIMARY KEY in column definition and ORDER BY + let query = "CREATE TABLE test (`PRIMARY KEY` Int64) ENGINE = MergeTree() ORDER BY (`id`)"; + let order_by = extract_order_by_from_create_query(query); + assert_eq!(order_by, vec!["id".to_string()]); + } + + #[test] + fn test_convert_clickhouse_type_error_handling() { + // Test invalid Nullable format + assert!(convert_clickhouse_type_to_column_type("Nullable").is_err()); + assert!(convert_clickhouse_type_to_column_type("Nullable()").is_err()); + assert!(convert_clickhouse_type_to_column_type("Nullable(").is_err()); + + // Test invalid Array format + assert!(convert_clickhouse_type_to_column_type("Array").is_err()); + assert!(convert_clickhouse_type_to_column_type("Array()").is_err()); + assert!(convert_clickhouse_type_to_column_type("Array(").is_err()); + + // Test invalid Enum format + assert!(convert_clickhouse_type_to_column_type("Enum8").is_err()); + assert!(convert_clickhouse_type_to_column_type("Enum8()").is_err()); + assert!(convert_clickhouse_type_to_column_type("Enum8(").is_err()); + assert!(convert_clickhouse_type_to_column_type("Enum8('RED' = )").is_err()); + assert!(convert_clickhouse_type_to_column_type("Enum8('RED' = 
x)").is_err()); + + // Test unsupported types + assert!(convert_clickhouse_type_to_column_type("UUID").is_err()); + assert!(convert_clickhouse_type_to_column_type("IPv4").is_err()); + assert!(convert_clickhouse_type_to_column_type("IPv6").is_err()); + + // Test nested type combinations + assert!(convert_clickhouse_type_to_column_type("Array(Nullable())").is_err()); + assert!(convert_clickhouse_type_to_column_type("Nullable(Array())").is_err()); + } + + #[test] + fn test_complex_type_combinations() { + // Test Array of Nullable types + let (array_type, is_nullable) = + convert_clickhouse_type_to_column_type("Array(Nullable(Int32))").unwrap(); + assert!(!is_nullable); // Array itself is not nullable + match array_type { + ColumnType::Array { + element_type, + element_nullable, + } => { + assert_eq!(*element_type, ColumnType::Int); + assert!(element_nullable); // Elements are nullable + } + _ => panic!("Expected Array type"), + } + + // Test Nullable Array of Nullable types + let (array_type, is_nullable) = + convert_clickhouse_type_to_column_type("Nullable(Array(Nullable(Int32)))").unwrap(); + assert!(is_nullable); // Array itself is nullable + match array_type { + ColumnType::Array { + element_type, + element_nullable, + } => { + assert_eq!(*element_type, ColumnType::Int); + assert!(element_nullable); // Elements are nullable + } + _ => panic!("Expected Array type"), + } + + // Test Array of Enums + let (array_type, is_nullable) = + convert_clickhouse_type_to_column_type("Array(Enum8('RED' = 1, 'GREEN' = 2))").unwrap(); + assert!(!is_nullable); + match array_type { + ColumnType::Array { + element_type, + element_nullable, + } => { + match *element_type { + ColumnType::Enum(data_enum) => { + assert_eq!(data_enum.values.len(), 2); + assert_eq!(data_enum.values[0].name, "RED"); + assert_eq!(data_enum.values[0].value, EnumValue::Int(1)); + } + _ => panic!("Expected Enum type"), + } + assert!(!element_nullable); + } + _ => panic!("Expected Array type"), + } + + // Test Nullable Array of Enums + let (array_type, is_nullable) = + convert_clickhouse_type_to_column_type("Nullable(Array(Enum8('RED' = 1)))").unwrap(); + assert!(is_nullable); + match array_type { + ColumnType::Array { + element_type, + element_nullable, + } => { + match *element_type { + ColumnType::Enum(data_enum) => { + assert_eq!(data_enum.values.len(), 1); + assert_eq!(data_enum.values[0].name, "RED"); + } + _ => panic!("Expected Enum type"), + } + assert!(!element_nullable); + } + _ => panic!("Expected Array type"), + } + } } diff --git a/apps/framework-cli/src/infrastructure/redis/redis_client.rs b/apps/framework-cli/src/infrastructure/redis/redis_client.rs index 0502f0657..819045b7c 100644 --- a/apps/framework-cli/src/infrastructure/redis/redis_client.rs +++ b/apps/framework-cli/src/infrastructure/redis/redis_client.rs @@ -8,7 +8,7 @@ use anyhow::{Context, Result}; use log::{error, info, warn}; use redis::aio::Connection as RedisConnection; -use redis::{AsyncCommands, Client, RedisError, Script, ToRedisArgs}; +use redis::{AsyncCommands, Client, FromRedisValue, RedisError, Script, ToRedisArgs}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; @@ -445,6 +445,20 @@ impl RedisClient { Ok(()) } + pub async fn get_with_service_prefix( + &self, + key: &str, + ) -> Result> { + let value: Option = self + .connection + .lock() + .await + .get::<_, V>(self.service_prefix(&[key])) + .await + .ok(); + Ok(value) + } + pub async fn register_message_handler(&self, callback: Arc) { self.message_callbacks 
.lock() diff --git a/apps/framework-cli/src/project.rs b/apps/framework-cli/src/project.rs index 4a4288056..2de71a0bf 100644 --- a/apps/framework-cli/src/project.rs +++ b/apps/framework-cli/src/project.rs @@ -69,72 +69,127 @@ use crate::utilities::git::GitConfig; use crate::utilities::PathExt; use crate::utilities::_true; +/// Represents errors that can occur during project file operations #[derive(Debug, thiserror::Error)] #[error("Failed to create or delete project files")] #[non_exhaustive] pub enum ProjectFileError { + /// Error when creating the internal directory structure InternalDirCreationFailed(std::io::Error), + /// Generic error with custom message #[error("Failed to create project files: {message}")] - Other { - message: String, - }, + Other { message: String }, + /// Standard IO error IO(#[from] std::io::Error), + /// TypeScript project specific error TSProjectFileError(#[from] typescript_project::TSProjectFileError), + /// Python project specific error PythonProjectError(#[from] python_project::PythonProjectError), + /// JSON serialization error JSONSerde(#[from] serde_json::Error), + /// TOML serialization error TOMLSerde(#[from] toml::ser::Error), } -// We have explored using a Generic associated Types as well as -// Dynamic Dispatch to handle the different types of projects -// the approach with enums is the one that is the simplest to put into practice and -// maintain. With Copilot - it also has the advantage that the boiler plate is really fast to write +/// Configuration for JWT authentication +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct JwtConfig { + /// Whether to enforce JWT on all consumption APIs + #[serde(default)] + pub enforce_on_all_consumptions_apis: bool, + /// Whether to enforce JWT on all ingestion APIs + #[serde(default)] + pub enforce_on_all_ingest_apis: bool, + /// Secret key for JWT signing + pub secret: String, + /// JWT issuer + pub issuer: String, + /// JWT audience + pub audience: String, +} + +/// Language-specific project configuration +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum LanguageProjectConfig { + /// TypeScript project configuration + Typescript(TypescriptProject), + /// Python project configuration + Python(PythonProject), +} + +impl Default for LanguageProjectConfig { + fn default() -> Self { + LanguageProjectConfig::Typescript(TypescriptProject::default()) + } +} + +/// Authentication configuration for the project +#[derive(Debug, Serialize, Deserialize, Clone, Default)] +pub struct AuthenticationConfig { + /// Optional admin API key for authentication + #[serde(default)] + pub admin_api_key: Option<String>, +} + +/// Feature flags for the project +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ProjectFeatures { + /// Whether streaming engine is enabled + #[serde(default = "_true")] + pub streaming_engine: bool, + + /// Whether workflows are enabled + #[serde(default)] + pub workflows: bool, +} + +/// Represents a user's Moose project #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Project { + /// Programming language used in the project pub language: SupportedLanguages, + /// RedPanda streaming configuration #[serde(default)] pub redpanda_config: RedpandaConfig, + /// ClickHouse database configuration pub clickhouse_config: ClickHouseConfig, + /// HTTP server configuration for local development pub http_server_config: LocalWebserverConfig, + /// Redis configuration #[serde(default)] pub redis_config: RedisConfig, + /// Git configuration #[serde(default)] pub git_config: GitConfig, - + 
/// Temporal workflow configuration #[serde(default)] pub temporal_config: TemporalConfig, - - // This part of the configuration for the project is dynamic and not saved - // to disk. It is loaded from the language specific configuration file or the currently - // running command + /// Language-specific project configuration (not serialized) #[serde(skip)] pub language_project_config: LanguageProjectConfig, + /// Project root directory location (not serialized) #[serde(skip)] pub project_location: PathBuf, + /// Whether the project is running in production mode #[serde(skip, default = "Project::default_production")] pub is_production: bool, - + /// Map of supported old versions and their locations #[serde(default = "HashMap::new")] pub supported_old_versions: HashMap<String, String>, + /// JWT configuration #[serde(default)] pub jwt: Option<JwtConfig>, - + /// Authentication configuration + #[serde(default)] + pub authentication: AuthenticationConfig, + /// List of configured cron jobs #[serde(default, skip_serializing_if = "Vec::is_empty")] pub cron_jobs: Vec, - + /// Feature flags #[serde(default)] pub features: ProjectFeatures, } -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct ProjectFeatures { - #[serde(default = "_true")] - pub streaming_engine: bool, - - #[serde(default)] - pub workflows: bool, -} - impl Default for ProjectFeatures { fn default() -> Self { ProjectFeatures { @@ -144,36 +199,15 @@ } } -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct JwtConfig { - #[serde(default)] - pub enforce_on_all_consumptions_apis: bool, - #[serde(default)] - pub enforce_on_all_ingest_apis: bool, - pub secret: String, - pub issuer: String, - pub audience: String, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub enum LanguageProjectConfig { - Typescript(TypescriptProject), - Python(PythonProject), -} - -impl Default for LanguageProjectConfig { - fn default() -> Self { - LanguageProjectConfig::Typescript(TypescriptProject::default()) - } -} - static STREAMING_FUNCTION_RENAME_WARNING: Once = Once::new(); impl Project { + /// Returns the default production state (false) pub fn default_production() -> bool { false } + /// Returns the project name based on the language configuration pub fn name(&self) -> String { match &self.language_project_config { LanguageProjectConfig::Typescript(p) => p.name.clone(), @@ -181,6 +215,12 @@ } } + /// Creates a new project with the specified parameters + /// + /// # Arguments + /// * `dir_location` - The directory where the project will be created + /// * `name` - The name of the project + /// * `language` - The programming language to use pub fn new(dir_location: &Path, name: String, language: SupportedLanguages) -> Project { let mut location = dir_location.to_path_buf(); @@ -212,13 +252,16 @@ jwt: None, cron_jobs: Vec::new(), features: Default::default(), + authentication: AuthenticationConfig::default(), } } + /// Sets whether the project is running in production mode pub fn set_is_production_env(&mut self, is_production: bool) { self.is_production = is_production; } + /// Loads a project from the specified directory pub fn load(directory: &PathBuf) -> Result { let mut project_file = directory.clone(); @@ -256,11 +299,13 @@ Ok(project_config) } + /// Loads a project from the current directory pub fn load_from_current_dir() -> Result { let current_dir = std::env::current_dir().expect("Failed to get the current directory"); Project::load(&current_dir) }
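Reviewer note: a sketch of how the new `authentication` section might be consumed by a caller. Only the field names come from the structs above; the surrounding flow is illustrative:

```rust
// Sketch: load the project and inspect the optional admin API key.
// A None here simply means no admin key was set in moose.config.toml.
let project = Project::load_from_current_dir()?;
match &project.authentication.admin_api_key {
    Some(_key) => println!("admin API key configured"),
    None => println!("no admin API key configured"),
}
```

+ /// Writes the project 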
configuration to disk pub fn write_to_disk(&self) -> Result<(), ProjectFileError> { // Write to disk what is common to all project types, the moose.config.toml let project_file = self.project_location.join(PROJECT_CONFIG_FILE); @@ -276,6 +321,7 @@ } } + /// Sets up the application directory structure pub fn setup_app_dir(&self) -> Result<(), ProjectFileError> { let app_dir = self.app_dir(); std::fs::create_dir_all(&app_dir)?; @@ -298,6 +344,7 @@ Ok(()) } + /// Creates base application files with optional sample code pub fn create_base_app_files(&self, no_samples: bool) -> Result<(), std::io::Error> { // Common file paths let readme_file_path = self.project_location.join("README.md"); @@ -386,6 +433,7 @@ Ok(()) } + /// Helper function to write file content fn write_file(&self, path: &PathBuf, content: String) -> Result<(), std::io::Error> { let mut file = std::fs::File::create(path)?; let content = if let Some(without_starting_empty_line) = content.strip_prefix('\n') { @@ -397,6 +445,7 @@ Ok(()) } + /// Creates VSCode configuration files pub fn create_vscode_files(&self) -> Result<(), ProjectFileError> { let vscode_dir = self.vscode_dir(); @@ -422,6 +471,7 @@ Ok(()) } + /// Returns the path to the app directory pub fn app_dir(&self) -> PathBuf { let mut app_dir = self.project_location.clone(); app_dir.push(APP_DIR); @@ -434,6 +484,7 @@ app_dir } + /// Returns the path to the scripts directory pub fn scripts_dir(&self) -> PathBuf { let mut scripts_dir = self.app_dir(); scripts_dir.push(SCRIPTS_DIR); @@ -445,6 +496,7 @@ scripts_dir } + /// Returns the path to the data models directory pub fn data_models_dir(&self) -> PathBuf { let mut schemas_dir = self.app_dir(); schemas_dir.push(SCHEMAS_DIR); @@ -457,7 +509,7 @@ schemas_dir } - // Will start to be more useful when we version more than just the data models + /// Returns the path to the versioned data model directory pub fn versioned_data_model_dir(&self, version: &str) -> Result { if version == self.cur_version().as_str() { Ok(self.data_models_dir()) @@ -466,6 +518,7 @@ } } + /// Returns the path to the streaming functions directory pub fn streaming_func_dir(&self) -> PathBuf { let functions_dir = self.app_dir().join(FUNCTIONS_DIR); @@ -485,6 +538,7 @@ functions_dir } + /// Returns the path to the blocks directory pub fn blocks_dir(&self) -> PathBuf { let blocks_dir = self.app_dir().join(BLOCKS_DIR); @@ -496,6 +550,7 @@ blocks_dir } + /// Returns the path to the consumption directory pub fn consumption_dir(&self) -> PathBuf { let apis_dir = self.app_dir().join(CONSUMPTION_DIR); @@ -507,6 +562,7 @@ apis_dir } + /// Returns the path to the VSCode directory pub fn vscode_dir(&self) -> PathBuf { let mut vscode_dir = self.project_location.clone(); vscode_dir.push(VSCODE_DIR); @@ -519,8 +575,7 @@ vscode_dir } - // This is a Result of io::Error because the caller - // can be returning a Result of io::Error or a Routine Failure + /// Returns the path to the internal directory pub fn internal_dir(&self) -> Result<PathBuf, std::io::Error> { let mut internal_dir = self.project_location.clone(); internal_dir.push(CLI_PROJECT_INTERNAL_DIR); @@ -545,11 +600,13 @@ Ok(internal_dir) } + /// Deletes the internal directory pub fn delete_internal_dir(&self) -> Result<(), ProjectFileError> { let internal_dir = self.internal_dir()?; Ok(std::fs::remove_dir_all(internal_dir)?) 
} + /// Returns the location of an old version pub fn old_version_location(&self, version: &str) -> Result { let mut old_base_path = self.internal_dir()?; old_base_path.push(CLI_INTERNAL_VERSIONS_DIR); @@ -558,6 +615,7 @@ Ok(old_base_path) } + /// Deletes all old versions pub fn delete_old_versions(&self) -> Result<(), ProjectFileError> { let mut old_versions = self.internal_dir()?; old_versions.push(CLI_INTERNAL_VERSIONS_DIR); @@ -569,6 +627,7 @@ Ok(()) } + /// Returns the current version pub fn cur_version(&self) -> &Version { match &self.language_project_config { LanguageProjectConfig::Typescript(package_json) => &package_json.version, @@ -576,6 +635,7 @@ } } + /// Returns sorted list of old versions pub fn old_versions_sorted(&self) -> Vec<String> { self.supported_old_versions .keys() @@ -584,12 +644,14 @@ .collect() } + /// Returns all versions including current pub fn versions(&self) -> Vec<String> { let mut versions = self.old_versions_sorted(); versions.push(self.cur_version().to_string()); versions } + /// Returns a map of functions and their associated models pub fn get_functions(&self) -> HashMap<String, Vec<String>> { let mut functions_map = HashMap::new(); @@ -628,6 +690,7 @@ functions_map } + /// Processes function input for a directory entry fn process_function_input(&self, entry: &std::fs::DirEntry) -> Option<(String, Vec<String>)> { let input_model = entry.file_name().to_string_lossy().into_owned(); let mut output_models = Vec::new(); @@ -647,6 +710,7 @@ } } + /// Processes function output for a directory entry fn process_function_output(&self, entry: &std::fs::DirEntry) -> Option<String> { if let Ok(file_type) = entry.file_type() { if file_type.is_dir() { @@ -659,11 +723,7 @@ None } - /** - * Check if the project is running in a docker constainer we built from the CLI - * The docker container will have a special environment variable that is set by the build - * image that we can check for. DOCKER_IMAGE=true - */ + /// Checks if the project is running in a docker container pub fn is_docker_image(&self) -> bool { std::env::var("DOCKER_IMAGE").unwrap_or("false".to_string()) == "true" } diff --git a/packages/protobuf/infrastructure_map.proto b/packages/protobuf/infrastructure_map.proto index b9194218b..c6298519d 100644 --- a/packages/protobuf/infrastructure_map.proto +++ b/packages/protobuf/infrastructure_map.proto @@ -120,7 +120,6 @@ message FunctionProcess { message InitialDataLoad { Table table = 1; string topic = 2; - // unset means completed optional uint64 progress = 3; }
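Reviewer note: taken together, the new helpers let the CLI reconstruct framework tables from a live ClickHouse instance. A condensed, illustrative walkthrough of the three parsing steps, grounded in the behavior the tests above pin down (reviewer commentary, not part of the patch):

```rust
// 1. Split a versioned table name into its base name and Version.
let (base, version) = extract_version_from_table_name("users_1_2_0", "0.0.0");
assert_eq!(base, "users");
assert_eq!(version.to_string(), "1.2.0");

// 2. Map a ClickHouse column type back to the framework type,
//    tracking nullability separately.
let (ty, nullable) = convert_clickhouse_type_to_column_type("Nullable(String)").unwrap();
assert_eq!(ty, ColumnType::String);
assert!(nullable);

// 3. Recover the ORDER BY columns from the stored CREATE TABLE statement.
let ddl = "CREATE TABLE local.users_1_2_0 (`id` String) ENGINE = MergeTree ORDER BY (`id`)";
assert_eq!(
    extract_order_by_from_create_query(ddl),
    vec!["id".to_string()]
);
```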