diff --git a/Cargo.lock b/Cargo.lock index 9db4719..839eaf0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -286,6 +286,7 @@ dependencies = [ "libp2p", "listenfd", "local-ip-address", + "moka", "postgres-types", "prost", "prost-types", @@ -446,6 +447,17 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "async-lock" +version = "3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -3482,10 +3494,13 @@ version = "0.12.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8261cd88c312e0004c1d51baad2980c66528dfdb2bee62003e643a4d8f86b077" dependencies = [ + "async-lock", "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", "equivalent", + "event-listener", + "futures-util", "parking_lot 0.12.5", "portable-atomic", "rustc_version", diff --git a/README.md b/README.md index fac5e52..f46f076 100644 --- a/README.md +++ b/README.md @@ -1,104 +1,93 @@ -# Anvil: Open‑Source Object Storage in Rust +# Anvil: An Open-Source Object Store for AI/ML Research -**Anvil** is an open‑source, S3‑compatible object storage server written in Rust. Built by the team behind Worka, Anvil is designed to host large files—such as open‑source model weights—with high performance and reliability. It exposes a familiar S3 HTTP gateway, a high‑performance gRPC API, multi‑tenant isolation, and the ability to scale from a single development node to a multi‑region cluster. 
+[![Build Status](https://github.com/worka-ai/anvil-enterprise/actions/workflows/ci.yml/badge.svg)](https://github.com/worka-ai/anvil-enterprise/actions/workflows/ci.yml) +[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) +[![JOSS Submission](https://joss.theoj.org/papers/10.21105/joss.XXXXX/status.svg)](https://joss.theoj.org/papers/10.21105/joss.XXXXX) ---- - -## 🔥 Why Anvil? - -- **Written in Rust**: Modern, memory-safe, and highly concurrent. -- **S3-Compatible**: Works out of the box with AWS SDKs, CLI, and third-party tools. -- **gRPC API**: For low-latency, high-throughput access. -- **Multi-Tenant**: Serve different model groups or clients in isolation. -- **Clusterable**: Run standalone or as a horizontally-scalable distributed system. -- **Model Hosting Friendly**: Built to serve billions of tokens efficiently. +**Anvil** is a high-performance, open-source distributed object store built in Rust. It is designed to address the data management and storage challenges inherent in modern computational research, particularly for large-scale Artificial Intelligence (AI) and Machine Learning (ML) workloads. By providing an S3-compatible interface, a native high-throughput gRPC API, and first-class support for content-addressing, Anvil serves as a foundational infrastructure layer for reproducible and efficient research. --- -## 🚀 Quick Start (Standalone) +## Key Features -```bash -cargo install anvil -anvil server --root ./data --port 9000 -``` - -Now test it: - -```bash -aws --endpoint-url http://localhost:9000 s3 ls -``` +- **Content-Addressable Storage:** Automatically deduplicates identical data using BLAKE3 hashing, dramatically reducing storage costs for versioned models and datasets. +- **High-Performance gRPC Streaming:** A native gRPC API with bidirectional streaming, ideal for high-throughput ML data loaders that feed GPUs directly from storage. 
+- **S3-Compatible Gateway:** Provides drop-in compatibility with the vast ecosystem of existing research tools and SDKs that support the S3 API (Boto3, MLflow, Rclone, etc.). +- **Built for the ML Ecosystem:** Includes features like the `anvil hf ingest` command to import model repositories directly from the Hugging Face Hub. +- **Modern, Resilient Architecture:** Built in Rust for memory safety and high concurrency, with a SWIM-like gossip protocol over QUIC for clustering and failure detection. +- **Multi-Tenant by Design:** Provides strong logical isolation between different users, teams, or projects. --- -## 🧪 Example: Upload and Fetch via S3 +## 🚀 Quick Start -```bash -# Upload a file -aws --endpoint-url http://localhost:9000 s3 cp weights.gguf s3://mymodels/weights.gguf +The fastest way to get a single-node Anvil instance running is with Docker Compose. -# Fetch the file -curl http://localhost:9000/mymodels/weights.gguf -``` - ---- - -## 🏗️ Building From Source +1. **Save the `docker-compose.yml`:** + Save the example `docker-compose.yml` from the [Getting Started Guide](./docs/01-getting-started.md) to a local file. -Anvil uses [Rust](https://www.rust-lang.org/tools/install) and requires at least version 1.72. +2. **Launch Anvil:** + ```bash + docker-compose up -d + ``` -```bash -git clone https://github.com/worka-ai/anvil -cd anvil -cargo build --release -``` - ---- +3. **Create Your First Tenant and App:** + Use the `admin` tool to create a tenant and an app with API credentials. + ```bash + # Create a region and a tenant + docker compose exec anvil1 admin region create europe-west-1 + docker compose exec anvil1 admin tenant create my-first-tenant -## ⚙️ Running in Cluster Mode + # Create an app and save the credentials + docker compose exec anvil1 admin app create --tenant-name my-first-tenant --app-name my-cli-app + ``` -Start multiple nodes with a shared cluster config (see [docs](https://worka.ai/docs/anvil/operational-guide/scaling)). 
- ---- - -## 📡 gRPC API - -See full [API reference](https://worka.ai/docs/anvil/user-guide/grpc-api). Example client use: - -```bash -anvil grpc-client --list-buckets -``` - ---- - -## 🔐 Authentication - -Supports API key-based tenant isolation. See [Auth docs](https://worka.ai/docs/anvil/user-guide/auth-permissions). +4. **Configure the Anvil CLI:** + Use the credentials from the previous step to configure your local `anvil` CLI. + ```bash + anvil configure --host http://localhost:50051 --client-id YOUR_CLIENT_ID --client-secret YOUR_CLIENT_SECRET + ``` --- ## 📘 Documentation -- [Getting Started](https://worka.ai/docs/anvil/getting-started) -- [Deployment](https://worka.ai/docs/anvil/operational-guide/deployment) -- [S3 Gateway](https://worka.ai/docs/anvil/user-guide/s3-gateway) -- [Cluster Scaling](https://worka.ai/docs/anvil/operational-guide/scaling) -- [Contributing](https://worka.ai/docs/anvil/developer-guide/contributing) +For complete guides on deployment, architecture, and usage, please see the [**Full Documentation**](./docs/index.md). + +- [Getting Started](./docs/01-getting-started.md) +- [Authentication & Permissions](./docs/03-user-guide-authentication.md) +- [Using the S3 Gateway](./docs/04-user-guide-s3-gateway.md) +- [Deployment Guide](./docs/06-operational-guide-deployment.md) --- ## 🤝 Contributing -We welcome PRs! Check out [CONTRIBUTING.md](https://worka.ai/docs/anvil/developer-guide/contributing) and start with [good first issues](https://github.com/worka-ai/anvil/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22). +We welcome contributions of all kinds! Please read our [**Contributing Guide**](./CONTRIBUTING.md) to get started. All participation in the Anvil community is governed by our [**Code of Conduct**](./CODE_OF_CONDUCT.md). 
--- -## 📣 Community - -- [Discord](https://discord.gg/uCWVg5STGh) — Chat with the team -- [Product Hunt](https://www.producthunt.com/products/worka-anvil) +## 📜 Citing Anvil + +If you use Anvil in your research, please cite it. Once published in JOSS, a BibTeX entry will be provided here. + +```bibtex +@article{Anvil2025, + doi = {10.21105/joss.XXXXX}, + url = {https://doi.org/10.21105/joss.XXXXX}, + year = {2025}, + publisher = {The Open Journal}, + volume = {X}, + number = {XX}, + pages = {XXXXX}, + author = {Your Name and Other Authors}, + title = {Anvil: An Open-Source Object Store for AI/ML Research}, + journal = {Journal of Open Source Software} +} +``` --- ## License -Licensed under [Apache 2.0](LICENSE). +Anvil is licensed under the [Apache 2.0 License](./LICENSE). diff --git a/anvil-core/Cargo.toml b/anvil-core/Cargo.toml index e1666ca..9b296bc 100644 --- a/anvil-core/Cargo.toml +++ b/anvil-core/Cargo.toml @@ -107,6 +107,7 @@ aes-gcm = "0.10.3" constant_time_eq = "0.4.2" http-body-util = "0.1.1" subtle = "2.6.1" +moka = { version = "0.12.11", features = ["future"] } [build-dependencies] tonic-prost-build = { version = "0.14.2" } diff --git a/anvil-core/src/cache.rs b/anvil-core/src/cache.rs new file mode 100644 index 0000000..6c786b5 --- /dev/null +++ b/anvil-core/src/cache.rs @@ -0,0 +1,99 @@ +use crate::persistence::{Bucket, Tenant}; +use moka::future::Cache; +use std::time::Duration; + +#[derive(Clone, Debug)] +pub struct MetadataCache { + // (tenant_id, bucket_name) -> Bucket + buckets: Cache<(i64, String), Bucket>, + // bucket_name -> Bucket (for public/S3 lookups without tenant_id context initially) + // This might need to handle conflicts if bucket names aren't globally unique, but + // for S3 compat they should be. Assuming global uniqueness for now. + buckets_by_name: Cache<String, Bucket>, + + // api_key -> Tenant + tenants: Cache<String, Tenant>, + + // (app_id, resource, action) -> bool (authorized) + // Or perhaps cache the list of policies?
+ // Let's cache the policies list for an app as that's what `get_policies_for_app` returns. + // app_id -> Vec<Policy> (policies) + app_policies: Cache<i64, Vec<crate::persistence::Policy>>, +} + +impl MetadataCache { + pub fn new(config: &crate::config::Config) -> Self { + let ttl = Duration::from_secs(config.metadata_cache_ttl_secs); + Self { + buckets: Cache::builder() + .max_capacity(10_000) + .time_to_live(ttl) + .build(), + buckets_by_name: Cache::builder() + .max_capacity(10_000) + .time_to_live(ttl) + .build(), + tenants: Cache::builder() + .max_capacity(5_000) + .time_to_live(ttl * 2) + .build(), + app_policies: Cache::builder() + .max_capacity(5_000) + .time_to_live(ttl) + .build(), + } + } + + pub async fn get_bucket(&self, tenant_id: i64, name: &str) -> Option<Bucket> { + self.buckets.get(&(tenant_id, name.to_string())).await + } + + pub async fn insert_bucket(&self, tenant_id: i64, name: String, bucket: Bucket) { + self.buckets.insert((tenant_id, name.clone()), bucket.clone()).await; + self.buckets_by_name.insert(name, bucket).await; + } + + pub async fn invalidate_bucket(&self, tenant_id: i64, name: &str) { + self.buckets.invalidate(&(tenant_id, name.to_string())).await; + self.buckets_by_name.invalidate(name).await; + } + + // For when we only know the name (e.g. deleting by name, or cross-tenant lookup if allowed) + pub async fn get_bucket_by_name_only(&self, name: &str) -> Option<Bucket> { + self.buckets_by_name.get(name).await + } + + pub async fn invalidate_bucket_by_name(&self, name: &str) { + self.buckets_by_name.invalidate(name).await; + // Note: We can't easily invalidate the (tenant_id, name) key without scanning + // or knowing the tenant_id. This is a trade-off. + // For strict consistency, the caller should provide tenant_id if possible. + // However, P2P events usually contain enough info.
+ } + + pub async fn get_tenant(&self, api_key: &str) -> Option<Tenant> { + self.tenants.get(api_key).await + } + + pub async fn insert_tenant(&self, api_key: String, tenant: Tenant) { + self.tenants.insert(api_key, tenant).await; + } + + pub async fn invalidate_tenant(&self, api_key: &str) { + self.tenants.invalidate(api_key).await; + } + + pub async fn get_app_policies(&self, app_id: i64) -> Option<Vec<crate::persistence::Policy>> { + self.app_policies.get(&app_id).await + } + + pub async fn insert_app_policies(&self, app_id: i64, policies: Vec<crate::persistence::Policy>) { + self.app_policies.insert(app_id, policies).await; + } + + pub async fn invalidate_app_policies(&self, app_id: i64) { + self.app_policies.invalidate(&app_id).await; + } +} + + diff --git a/anvil-core/src/cluster.rs b/anvil-core/src/cluster.rs index 3bd7f9c..3777b86 100644 --- a/anvil-core/src/cluster.rs +++ b/anvil-core/src/cluster.rs @@ -14,7 +14,9 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::RwLock; -use tracing::info; +use tracing::{info, error}; + +use crate::cache::MetadataCache; // Rich information about a peer in the cluster. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -38,6 +40,13 @@ pub struct ClusterMessage { pub signature: Vec<u8>, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum MetadataEvent { + BucketUpdated { tenant_id: i64, name: String }, + TenantUpdated { api_key: String }, + PolicyUpdated { app_id: i64 }, +} + impl ClusterMessage { // Sign the message with the given secret.
pub fn sign(&mut self, secret: &str) -> Result<()> { @@ -174,9 +183,13 @@ pub async fn run_gossip( cluster_state: ClusterState, grpc_addr: String, cluster_secret: Option<String>, + metadata_cache: MetadataCache, + mut outbound_events: tokio::sync::mpsc::Receiver<MetadataEvent>, ) -> Result<()> { - let topic = Topic::new("anvil-cluster"); - swarm.behaviour_mut().gossipsub.subscribe(&topic)?; + let cluster_topic = Topic::new("anvil-cluster"); + let metadata_topic = Topic::new("anvil-metadata"); + swarm.behaviour_mut().gossipsub.subscribe(&cluster_topic)?; + swarm.behaviour_mut().gossipsub.subscribe(&metadata_topic)?; let local_peer_id = *swarm.local_peer_id(); @@ -215,14 +228,24 @@ pub async fn run_gossip( } if let Ok(encoded_message) = serde_json::to_vec(&message) { - if let Err(e) = swarm.behaviour_mut().gossipsub.publish(topic.clone(), encoded_message) { + if let Err(e) = swarm.behaviour_mut().gossipsub.publish(cluster_topic.clone(), encoded_message) { info!("[GOSSIP] Failed to publish gossip message: {:?}", e); } } } + Some(event) = outbound_events.recv() => { + if let Ok(encoded_event) = serde_json::to_vec(&event) { + if let Err(e) = swarm.behaviour_mut().gossipsub.publish(metadata_topic.clone(), encoded_event) { + error!("[GOSSIP] Failed to publish metadata event: {:?}", e); + } else { + info!("[GOSSIP] Published metadata event: {:?}", event); + } + } + } + event = swarm.select_next_some() => { - handle_swarm_event(event, &mut swarm, &cluster_state, &grpc_addr, &cluster_secret).await; + handle_swarm_event(event, &mut swarm, &cluster_state, &grpc_addr, &cluster_secret, &metadata_cache).await; } } } @@ -234,8 +257,12 @@ pub async fn handle_swarm_event( cluster_state: &ClusterState, grpc_addr: &str, cluster_secret: &Option<String>, + metadata_cache: &MetadataCache, ) { let local_peer_id = *swarm.local_peer_id(); + let cluster_topic = Topic::new("anvil-cluster"); + let metadata_topic = Topic::new("anvil-metadata"); + match event { SwarmEvent::NewListenAddr { address, ..
} => { info!("[GOSSIP] Listening on {address}"); @@ -272,42 +299,59 @@ pub async fn handle_swarm_event( message, .. })) => { - if let Ok(cluster_message) = serde_json::from_slice::<ClusterMessage>(&message.data) { - if let Some(secret) = cluster_secret { - if let Err(e) = cluster_message.verify(secret) { - info!( - "[GOSSIP] Invalid signature from peer: {}, error: {:?}", - cluster_message.peer_id, e - ); - return; + if message.topic == cluster_topic.hash() { + if let Ok(cluster_message) = serde_json::from_slice::<ClusterMessage>(&message.data) { + if let Some(secret) = cluster_secret { + if let Err(e) = cluster_message.verify(secret) { + info!( + "[GOSSIP] Invalid signature from peer: {}, error: {:?}", + cluster_message.peer_id, e + ); + return; + } + // Check timestamp to prevent replay attacks + let now = Utc::now().timestamp(); + if (now - cluster_message.timestamp).abs() > 60 { + info!( + "[GOSSIP] Stale message from peer: {}, timestamp: {}", + cluster_message.peer_id, cluster_message.timestamp + ); + return; + } } - // Check timestamp to prevent replay attacks - let now = Utc::now().timestamp(); - if (now - cluster_message.timestamp).abs() > 60 { - info!( - "[GOSSIP] Stale message from peer: {}, timestamp: {}", - cluster_message.peer_id, cluster_message.timestamp - ); - return; - } - } - info!( - "[GOSSIP] Received cluster message from peer: {}", - cluster_message.peer_id - ); - let mut state = cluster_state.write().await; - let info = state - .entry(cluster_message.peer_id) - .or_insert_with(|| PeerInfo { - p2p_addrs: Vec::new(), - grpc_addr: cluster_message.grpc_addr, - }); - for addr in cluster_message.p2p_addrs { - if !info.p2p_addrs.contains(&addr) { - info.p2p_addrs.push(addr); + info!( + "[GOSSIP] Received cluster message from peer: {}", + cluster_message.peer_id + ); + let mut state = cluster_state.write().await; + let info = state + .entry(cluster_message.peer_id) + .or_insert_with(|| PeerInfo { + p2p_addrs: Vec::new(), + grpc_addr: cluster_message.grpc_addr, + }); + for addr in
cluster_message.p2p_addrs { + if !info.p2p_addrs.contains(&addr) { + info.p2p_addrs.push(addr); + } } } + } else if message.topic == metadata_topic.hash() { + if let Ok(event) = serde_json::from_slice::<MetadataEvent>(&message.data) { + info!("[GOSSIP] Received metadata event: {:?}", event); + match event { + MetadataEvent::BucketUpdated { tenant_id, name } => { + metadata_cache.invalidate_bucket(tenant_id, &name).await; + } + MetadataEvent::TenantUpdated { api_key } => { + metadata_cache.invalidate_tenant(&api_key).await; + } + MetadataEvent::PolicyUpdated { app_id } => { + metadata_cache.invalidate_app_policies(app_id).await; + } + } + } } } _ => {} diff --git a/anvil-core/src/config.rs b/anvil-core/src/config.rs index 37aca93..991d5b5 100644 --- a/anvil-core/src/config.rs +++ b/anvil-core/src/config.rs @@ -55,6 +55,10 @@ pub struct Config { /// The shared secret for cluster authentication. #[arg(long, env)] pub cluster_secret: Option<String>, + + /// TTL for metadata cache entries in seconds. + #[arg(long, env, default_value_t = 300)] + pub metadata_cache_ttl_secs: u64, } impl Config { #[allow(unused)] diff --git a/anvil-core/src/lib.rs b/anvil-core/src/lib.rs index 0fa90fa..58f0677 100644 --- a/anvil-core/src/lib.rs +++ b/anvil-core/src/lib.rs @@ -10,6 +10,7 @@ use tokio::sync::RwLock; // The modules we've created pub mod auth; pub mod bucket_manager; +pub mod cache; pub mod cluster; pub mod config; pub mod crypto; @@ -49,12 +50,17 @@ pub struct AppState { } impl AppState { - pub async fn new(global_pool: Pool, regional_pool: Pool, config: Config) -> Result<Self> { + pub async fn new( + global_pool: Pool, + regional_pool: Pool, + config: Config, + event_publisher: Option<tokio::sync::mpsc::Sender<cluster::MetadataEvent>>, + ) -> Result<Self> { let arc_config = Arc::new(config); let jwt_manager = Arc::new(JwtManager::new(arc_config.jwt_secret.clone())); let storage = storage::Storage::new().await?; let cluster_state = Arc::new(RwLock::new(HashMap::new())); - let db = persistence::Persistence::new(global_pool, regional_pool); + let db =
persistence::Persistence::new(global_pool, regional_pool, event_publisher, &arc_config); let sharder = sharding::ShardManager::new(); let placer = placement::PlacementManager::default(); diff --git a/anvil-core/src/object_manager.rs b/anvil-core/src/object_manager.rs index 1ed1813..6b8f428 100644 --- a/anvil-core/src/object_manager.rs +++ b/anvil-core/src/object_manager.rs @@ -209,10 +209,17 @@ impl ObjectManager { let bucket = self .db - .get_bucket_by_name(tenant_id, bucket_name, &self.region) + .get_bucket_by_name(tenant_id, bucket_name) .await .map_err(|e| Status::internal(e.to_string()))? .ok_or_else(|| Status::not_found("Bucket not found"))?; + + if bucket.region != self.region { + return Err(Status::failed_precondition(format!( + "Bucket is in region {}", + bucket.region + ))); + } let shard_map_json = if nodes.len() > 1 { let peer_ids: Vec<String> = nodes.iter().map(|p| p.to_base58()).collect(); Some(serde_json::json!(peer_ids)) } else { @@ -525,11 +532,18 @@ impl ObjectManager { let bucket = self .db - .get_bucket_by_name(tenant_id, bucket_name, &self.region) + .get_bucket_by_name(tenant_id, bucket_name) .await .map_err(|e| Status::internal(e.to_string()))? .ok_or_else(|| Status::not_found("Bucket not found"))?; + if bucket.region != self.region { + return Err(Status::failed_precondition(format!( + "Bucket is in region {}", + bucket.region + ))); + } + let object = self .db .soft_delete_object(bucket.id, object_key) @@ -627,10 +641,10 @@ impl ObjectManager { claims: Option<&auth::Claims>, bucket_name: &str, ) -> Result<Bucket, Status> { - match claims { + let bucket = match claims { Some(c) => self .db - .get_bucket_by_name(c.tenant_id, bucket_name, &self.region) + .get_bucket_by_name(c.tenant_id, bucket_name) .await .map_err(|e| Status::internal(e.to_string()))? .ok_or_else(|| Status::not_found("Bucket not found for this tenant")), None => self .db .get_public_bucket_by_name(bucket_name) .await .map_err(|e| Status::internal(e.to_string()))?
.ok_or_else(|| Status::not_found("Public bucket not found")), + }?; + + if bucket.region != self.region { + return Err(Status::failed_precondition(format!( + "Bucket is in region {}", + bucket.region + ))); } + + Ok(bucket) } } diff --git a/anvil-core/src/persistence.rs b/anvil-core/src/persistence.rs index 9718711..0a86b40 100644 --- a/anvil-core/src/persistence.rs +++ b/anvil-core/src/persistence.rs @@ -4,14 +4,20 @@ use deadpool_postgres::Pool; use serde_json::Value as JsonValue; use tokio_postgres::Row; +use crate::cache::MetadataCache; +use crate::cluster::MetadataEvent; +use tokio::sync::mpsc::Sender; + #[derive(Debug, Clone)] pub struct Persistence { global_pool: Pool, regional_pool: Pool, + cache: MetadataCache, + event_publisher: Option<Sender<MetadataEvent>>, } // Structs that map to our database tables -#[derive(Debug, serde::Serialize)] +#[derive(Debug, Clone, serde::Serialize)] pub struct Tenant { pub id: i64, pub name: String, @@ -24,7 +30,7 @@ pub struct App { pub client_id: String, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Bucket { pub id: i64, pub tenant_id: i64, @@ -140,13 +146,29 @@ impl From<Row> for AppDetails { } impl Persistence { - pub fn new(global_pool: Pool, regional_pool: Pool) -> Self { + pub fn new( + global_pool: Pool, + regional_pool: Pool, + event_publisher: Option<Sender<MetadataEvent>>, + config: &crate::config::Config, + ) -> Self { Self { global_pool, regional_pool, + cache: MetadataCache::new(config), + event_publisher, } } + async fn publish_event(&self, event: MetadataEvent) { + if let Some(publisher) = &self.event_publisher { + if let Err(e) = publisher.send(event).await { + tracing::warn!("Failed to publish metadata event: {}", e); + } + } + } + + pub async fn get_admin_user_by_username(&self, username: &str) -> Result<Option<AdminUser>> { let client = self.global_pool.get().await?; let row = client .query_opt( @@ -188,6 +210,10 @@ &self.global_pool } + pub fn cache(&self) -> &MetadataCache { + &self.cache + } + pub async fn create_admin_user( &self, username: &str, @@
-745,7 +771,13 @@ impl Persistence { match result { Ok(row) => { tracing::debug!("[Persistence] EXITING create_bucket: success"); - Ok(row.into()) + let bucket: Bucket = row.into(); + self.cache.insert_bucket(tenant_id, name.to_string(), bucket.clone()).await; + self.publish_event(MetadataEvent::BucketUpdated { + tenant_id, + name: name.to_string(), + }).await; + Ok(bucket) } Err(e) => { tracing::debug!("[Persistence] EXITING create_bucket: error"); @@ -766,37 +798,80 @@ &self, tenant_id: i64, name: &str, - region: &str, ) -> Result<Option<Bucket>> { + // Check cache first + if let Some(bucket) = self.cache.get_bucket(tenant_id, name).await { + return Ok(Some(bucket)); + } + let client = self.global_pool.get().await?; + // Removed region constraint let row = client .query_opt( - "SELECT id, name, region, created_at, is_public_read, tenant_id FROM buckets WHERE tenant_id = $1 AND name = $2 AND region = $3 AND deleted_at IS NULL", - &[&tenant_id, &name, &region], + "SELECT id, name, region, created_at, is_public_read, tenant_id FROM buckets WHERE tenant_id = $1 AND name = $2 AND deleted_at IS NULL", + &[&tenant_id, &name], ) .await?; - Ok(row.map(Into::into)) + + if let Some(row) = row { + let bucket: Bucket = row.into(); + self.cache.insert_bucket(tenant_id, name.to_string(), bucket.clone()).await; + Ok(Some(bucket)) + } else { + Ok(None) + } } pub async fn get_public_bucket_by_name(&self, name: &str) -> Result<Option<Bucket>> { + if let Some(bucket) = self.cache.get_bucket_by_name_only(name).await { + if bucket.is_public_read { + return Ok(Some(bucket)); + } + // If cached but not public, return None (effectively hiding it) + return Ok(None); + } + let client = self.global_pool.get().await?; let row = client .query_opt( - "SELECT * FROM buckets WHERE name = $1 AND is_public_read = true AND deleted_at IS NULL", + "SELECT * FROM buckets WHERE name = $1 AND deleted_at IS NULL", &[&name], ) .await?; - Ok(row.map(Into::into)) + + if let Some(row) = row { + let bucket: Bucket =
row.into(); + // We cache it regardless of public status so we don't hit DB repeatedly for non-public buckets? + // Or only if public? + // My `buckets_by_name` cache is generic. It's better to cache it. + self.cache.insert_bucket(bucket.tenant_id, name.to_string(), bucket.clone()).await; + + if bucket.is_public_read { + Ok(Some(bucket)) + } else { + Ok(None) + } + } else { + Ok(None) + } } pub async fn set_bucket_public_access(&self, bucket_name: &str, is_public: bool) -> Result<()> { let client = self.global_pool.get().await?; - client - .execute( - "UPDATE buckets SET is_public_read = $1 WHERE name = $2", + let row = client + .query_one( + "UPDATE buckets SET is_public_read = $1 WHERE name = $2 RETURNING tenant_id", &[&is_public, &bucket_name], ) .await?; + + let tenant_id: i64 = row.get("tenant_id"); + self.cache.invalidate_bucket(tenant_id, bucket_name).await; + self.publish_event(MetadataEvent::BucketUpdated { + tenant_id, + name: bucket_name.to_string(), + }).await; + Ok(()) } @@ -808,6 +883,16 @@ impl Persistence { &[&bucket_name], ) .await?; + + if let Some(ref r) = row { + let tenant_id: i64 = r.get("tenant_id"); + self.cache.invalidate_bucket(tenant_id, bucket_name).await; + self.publish_event(MetadataEvent::BucketUpdated { + tenant_id, + name: bucket_name.to_string(), + }).await; + } + Ok(row.map(Into::into)) } diff --git a/anvil-core/src/s3_gateway.rs b/anvil-core/src/s3_gateway.rs index 7174a9a..6f80e07 100644 --- a/anvil-core/src/s3_gateway.rs +++ b/anvil-core/src/s3_gateway.rs @@ -183,7 +183,7 @@ async fn head_bucket( match state .db - .get_bucket_by_name(claims.tenant_id, &bucket_name, &state.region) + .get_bucket_by_name(claims.tenant_id, &bucket_name) .await { Ok(Some(_)) => (axum::http::StatusCode::OK, "").into_response(), diff --git a/anvil-core/src/worker.rs b/anvil-core/src/worker.rs index cc6cf1c..c5e6991 100644 --- a/anvil-core/src/worker.rs +++ b/anvil-core/src/worker.rs @@ -237,7 +237,7 @@ async fn handle_hf_ingestion( debug!(item_id, 
"Item state set to downloading."); if let Ok(bucket_opt) = - persistence.get_bucket_by_name(tenant_id, &target_bucket, &target_region).await + persistence.get_bucket_by_name(tenant_id, &target_bucket).await { if let Some(bucket) = bucket_opt { if let Ok(obj_opt) = persistence.get_object(bucket.id, &path).await { @@ -282,7 +282,7 @@ async fn handle_hf_ingestion( // --- End Blocking --- let _bucket = persistence - .get_bucket_by_name(tenant_id, &target_bucket, &target_region) + .get_bucket_by_name(tenant_id, &target_bucket) .await? .ok_or_else(|| anyhow!("target bucket not found"))?; let full_key = if target_prefix.is_empty() { diff --git a/anvil-test-utils/src/lib.rs b/anvil-test-utils/src/lib.rs index 039450b..954ab31 100644 --- a/anvil-test-utils/src/lib.rs +++ b/anvil-test-utils/src/lib.rs @@ -183,7 +183,8 @@ impl TestCluster { "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(), cluster_listen_addr: "/ip4/127.0.0.1/udp/0/quic-v1".to_string(), public_cluster_addrs: vec![], - public_api_addr: "".to_string(), + metadata_cache_ttl_secs: 1, + public_api_addr: "127.0.0.1:0".to_string(), api_listen_addr: "127.0.0.1:0".to_string(), region: "".to_string(), bootstrap_addrs: vec![], @@ -233,7 +234,8 @@ impl TestCluster { let regional_pool = regional_pools.get(*region_name).unwrap().clone(); let mut node_config = config.deref().clone(); node_config.region = region_name.to_string(); - let state = AppState::new(global_pool.clone(), regional_pool, node_config) + node_config.metadata_cache_ttl_secs = 1; // Short TTL for tests + let state = AppState::new(global_pool.clone(), regional_pool, node_config, None) .await .unwrap(); states.push(state); @@ -307,7 +309,8 @@ impl TestCluster { state.config = Arc::new(cfg); let handle = tokio::spawn(async move { - anvil::start_node(listener, state, swarm).await.unwrap(); + let (_tx, rx) = tokio::sync::mpsc::channel(1); + anvil::start_node(listener, state, swarm, rx).await.unwrap(); }); self.nodes.push(handle); } 
diff --git a/anvil/migrations_global/V1__initial_global_schema.sql b/anvil/migrations_global/V1__initial_global_schema.sql index 36d2939..4e63d62 100644 --- a/anvil/migrations_global/V1__initial_global_schema.sql +++ b/anvil/migrations_global/V1__initial_global_schema.sql @@ -116,3 +116,59 @@ CREATE TABLE hf_ingestion_items ( UNIQUE(ingestion_id, path) ); CREATE INDEX idx_hf_ingestion_items_ingest ON hf_ingestion_items(ingestion_id); + +-- Admin Auth Tables + +CREATE TABLE admin_roles ( + id SERIAL PRIMARY KEY, + name TEXT UNIQUE NOT NULL -- e.g., 'SuperAdmin', 'ReadOnlyViewer' +); + +CREATE TABLE admin_users ( + id BIGSERIAL PRIMARY KEY, + username TEXT UNIQUE NOT NULL, + email TEXT UNIQUE NOT NULL, + password_hash TEXT NOT NULL, + is_active BOOLEAN NOT NULL DEFAULT true, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE TABLE admin_user_roles ( + user_id BIGINT NOT NULL REFERENCES admin_users(id) ON DELETE CASCADE, + role_id INTEGER NOT NULL REFERENCES admin_roles(id) ON DELETE CASCADE, + PRIMARY KEY (user_id, role_id) +); + +CREATE TABLE admin_role_permissions ( + id SERIAL PRIMARY KEY, + role_id INTEGER NOT NULL REFERENCES admin_roles(id) ON DELETE CASCADE, + resource TEXT NOT NULL, -- e.g., 'cluster', 'tenants', 'nodes' + action TEXT NOT NULL, -- e.g., 'read', 'write', 'create', 'delete' + UNIQUE (role_id, resource, action) +); + +-- Seed the initial roles +INSERT INTO admin_roles (name) VALUES ('SuperAdmin'), ('ReadOnlyViewer'); + +-- Grant permissions to ReadOnlyViewer +-- This role can only perform GET requests +INSERT INTO admin_role_permissions (role_id, resource, action) +SELECT id, 'cluster', 'read' FROM admin_roles WHERE name = 'ReadOnlyViewer'; + +INSERT INTO admin_role_permissions (role_id, resource, action) +SELECT id, 'regions', 'read' FROM admin_roles WHERE name = 'ReadOnlyViewer'; + +INSERT INTO admin_role_permissions (role_id, resource, action) +SELECT id, 'tenants', 'read' FROM 
admin_roles WHERE name = 'ReadOnlyViewer'; + +INSERT INTO admin_role_permissions (role_id, resource, action) +SELECT id, 'apps', 'read' FROM admin_roles WHERE name = 'ReadOnlyViewer'; + +INSERT INTO admin_role_permissions (role_id, resource, action) +SELECT id, 'hf', 'read' FROM admin_roles WHERE name = 'ReadOnlyViewer'; + +-- Grant all permissions to SuperAdmin +INSERT INTO admin_role_permissions (role_id, resource, action) +SELECT id, '*', '*' FROM admin_roles WHERE name = 'SuperAdmin'; + diff --git a/anvil/migrations_global/V2__create_admin_auth_tables.sql b/anvil/migrations_global/V2__create_admin_auth_tables.sql deleted file mode 100644 index cea8f44..0000000 --- a/anvil/migrations_global/V2__create_admin_auth_tables.sql +++ /dev/null @@ -1,55 +0,0 @@ --- Admin Auth Tables - -CREATE TABLE admin_roles ( - id SERIAL PRIMARY KEY, - name TEXT UNIQUE NOT NULL -- e.g., 'SuperAdmin', 'ReadOnlyViewer' -); - -CREATE TABLE admin_users ( - id BIGSERIAL PRIMARY KEY, - username TEXT UNIQUE NOT NULL, - email TEXT UNIQUE NOT NULL, - password_hash TEXT NOT NULL, - is_active BOOLEAN NOT NULL DEFAULT true, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now() -); - -CREATE TABLE admin_user_roles ( - user_id BIGINT NOT NULL REFERENCES admin_users(id) ON DELETE CASCADE, - role_id INTEGER NOT NULL REFERENCES admin_roles(id) ON DELETE CASCADE, - PRIMARY KEY (user_id, role_id) -); - -CREATE TABLE admin_role_permissions ( - id SERIAL PRIMARY KEY, - role_id INTEGER NOT NULL REFERENCES admin_roles(id) ON DELETE CASCADE, - resource TEXT NOT NULL, -- e.g., 'cluster', 'tenants', 'nodes' - action TEXT NOT NULL, -- e.g., 'read', 'write', 'create', 'delete' - UNIQUE (role_id, resource, action) -); - --- Seed the initial roles -INSERT INTO admin_roles (name) VALUES ('SuperAdmin'), ('ReadOnlyViewer'); - --- Grant permissions to ReadOnlyViewer --- This role can only perform GET requests -INSERT INTO admin_role_permissions (role_id, resource, action) 
-SELECT id, 'cluster', 'read' FROM admin_roles WHERE name = 'ReadOnlyViewer'; - -INSERT INTO admin_role_permissions (role_id, resource, action) -SELECT id, 'regions', 'read' FROM admin_roles WHERE name = 'ReadOnlyViewer'; - -INSERT INTO admin_role_permissions (role_id, resource, action) -SELECT id, 'tenants', 'read' FROM admin_roles WHERE name = 'ReadOnlyViewer'; - -INSERT INTO admin_role_permissions (role_id, resource, action) -SELECT id, 'apps', 'read' FROM admin_roles WHERE name = 'ReadOnlyViewer'; - -INSERT INTO admin_role_permissions (role_id, resource, action) -SELECT id, 'hf', 'read' FROM admin_roles WHERE name = 'ReadOnlyViewer'; - --- Grant all permissions to SuperAdmin -INSERT INTO admin_role_permissions (role_id, resource, action) -SELECT id, '*', '*' FROM admin_roles WHERE name = 'SuperAdmin'; - diff --git a/anvil/migrations_regional/V1__initial_regional_schema.sql b/anvil/migrations_regional/V1__initial_regional_schema.sql index 3a6a64f..16e3037 100644 --- a/anvil/migrations_regional/V1__initial_regional_schema.sql +++ b/anvil/migrations_regional/V1__initial_regional_schema.sql @@ -101,3 +101,27 @@ CREATE INDEX idx_objects_trgm ON objects USING GIN(key gin_trgm_ops); CREATE INDEX idx_objects_created_at ON objects USING BRIN(created_at); CREATE INDEX idx_objects_not_deleted ON objects (bucket_id, key) WHERE deleted_at IS NULL; + +CREATE TABLE model_artifacts ( + artifact_id TEXT PRIMARY KEY, -- blake3 + bucket_id BIGINT NOT NULL, + key TEXT NOT NULL, + manifest JSONB NOT NULL, + created_at TIMESTAMPTZ DEFAULT now() +); + +CREATE TABLE model_tensors ( + artifact_id TEXT NOT NULL REFERENCES model_artifacts (artifact_id) ON DELETE CASCADE, + tensor_name TEXT NOT NULL, + file_path TEXT NOT NULL, + file_offset BIGINT NOT NULL, + byte_length BIGINT NOT NULL, + dtype TEXT NOT NULL, + shape INTEGER[] NOT NULL, + layout TEXT NOT NULL, + block_bytes INTEGER, + blocks JSONB, + PRIMARY KEY (artifact_id, tensor_name) +); +CREATE INDEX idx_model_tensors_name ON 
model_tensors (artifact_id, tensor_name); +CREATE INDEX idx_model_tensors_file ON model_tensors (artifact_id, file_path, file_offset); diff --git a/anvil/migrations_regional/V2__create_model_tables.sql b/anvil/migrations_regional/V2__create_model_tables.sql deleted file mode 100644 index 27edbcc..0000000 --- a/anvil/migrations_regional/V2__create_model_tables.sql +++ /dev/null @@ -1,23 +0,0 @@ -CREATE TABLE model_artifacts ( - artifact_id TEXT PRIMARY KEY, -- blake3 - bucket_id BIGINT NOT NULL, - key TEXT NOT NULL, - manifest JSONB NOT NULL, - created_at TIMESTAMPTZ DEFAULT now() -); - -CREATE TABLE model_tensors ( - artifact_id TEXT NOT NULL REFERENCES model_artifacts (artifact_id) ON DELETE CASCADE, - tensor_name TEXT NOT NULL, - file_path TEXT NOT NULL, - file_offset BIGINT NOT NULL, - byte_length BIGINT NOT NULL, - dtype TEXT NOT NULL, - shape INTEGER[] NOT NULL, - layout TEXT NOT NULL, - block_bytes INTEGER, - blocks JSONB, - PRIMARY KEY (artifact_id, tensor_name) -); -CREATE INDEX idx_model_tensors_name ON model_tensors (artifact_id, tensor_name); -CREATE INDEX idx_model_tensors_file ON model_tensors (artifact_id, file_path, file_offset); diff --git a/anvil/src/bin/admin.rs b/anvil/src/bin/admin.rs index e10d3a2..2680b8e 100644 --- a/anvil/src/bin/admin.rs +++ b/anvil/src/bin/admin.rs @@ -128,7 +128,15 @@ enum RegionCommands { #[tokio::main] async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); - let config = cli.config; + let shared_config = cli.config; + + let mut config = anvil_core::config::Config::default(); + config.global_database_url = shared_config.global_database_url; + config.anvil_secret_encryption_key = shared_config.anvil_secret_encryption_key; + // Set a dummy region and public_api_addr, as admin CLI doesn't use them, + // but Persistence::new needs a full Config. 
+ config.region = "admin-cli-region".to_string(); + config.public_api_addr = "127.0.0.1:0".to_string(); let global_pool = create_pool(&config.global_database_url)?; // The admin tool only interacts with the global DB, so we can use it as a placeholder for the regional pool. @@ -141,7 +149,7 @@ async fn main() -> anyhow::Result<()> { ) .await?; - let persistence = Persistence::new(global_pool, regional_pool); + let persistence = Persistence::new(global_pool, regional_pool, None, &config); let encryption_key = hex::decode(config.anvil_secret_encryption_key)?; match &cli.command { @@ -157,7 +165,6 @@ async fn main() -> anyhow::Result<()> { app_name, } => { println!("Creating app for tenant: {}", tenant_name); - println!("Admin received tenant_name: {}", tenant_name); let tenant = persistence .get_tenant_by_name(tenant_name) .await? diff --git a/anvil/src/lib.rs b/anvil/src/lib.rs index 9209def..83bdebb 100644 --- a/anvil/src/lib.rs +++ b/anvil/src/lib.rs @@ -46,17 +46,21 @@ pub async fn run( let regional_pool = create_pool(&config.regional_database_url)?; let global_pool = create_pool(&config.global_database_url)?; - let state = AppState::new(global_pool, regional_pool, config).await?; + + let (tx, rx) = tokio::sync::mpsc::channel(100); + + let state = AppState::new(global_pool, regional_pool, config, Some(tx)).await?; let swarm = anvil_core::cluster::create_swarm(state.config.clone()).await?; // Then start the node - start_node(listener, state, swarm).await + start_node(listener, state, swarm, rx).await } pub async fn start_node( listener: tokio::net::TcpListener, state: AppState, mut swarm: libp2p::Swarm, + outbound_events_rx: tokio::sync::mpsc::Receiver, ) -> Result<()> { for addr in &state.config.bootstrap_addrs { let multiaddr: libp2p::Multiaddr = addr.parse()?; @@ -126,6 +130,8 @@ pub async fn start_node( state.cluster.clone(), state.config.public_api_addr.clone(), state.config.cluster_secret.clone(), + state.db.cache().clone(), + outbound_events_rx, )); let 
server_task = tokio::spawn(async move { axum::serve(listener, app.into_make_service()).await }); diff --git a/anvil/src/s3_gateway.rs b/anvil/src/s3_gateway.rs index 7174a9a..27d3393 100644 --- a/anvil/src/s3_gateway.rs +++ b/anvil/src/s3_gateway.rs @@ -11,6 +11,13 @@ use axum::{ }; use futures_util::stream::StreamExt; use std::collections::HashMap; +use serde::Deserialize; + +#[derive(Deserialize)] +struct CreateBucketConfiguration { + #[serde(rename = "LocationConstraint")] + location_constraint: Option<String>, +} fn s3_error(code: &str, message: &str, status: axum::http::StatusCode) -> Response { let body = format!( @@ -128,15 +135,21 @@ async fn create_bucket( ); } }; - //let _ = body.collect().await; - // let body_stream = req.into_body().into_data_stream().map(|r| { - // r.map(|chunk| chunk.to_vec()) - // .map_err(|e| tonic::Status::internal(e.to_string())) - // }).collect::>(); - // println!("{:?}", body_stream); + + let bytes = axum::body::to_bytes(req.into_body(), 1024 * 1024).await.unwrap_or_default(); + let region = if !bytes.is_empty() { + if let Ok(config) = quick_xml::de::from_reader::<_, CreateBucketConfiguration>(&bytes[..]) { + config.location_constraint.unwrap_or(state.region.clone()) + } else { + state.region.clone() + } + } else { + state.region.clone() + }; + match state .bucket_manager - .create_bucket(claims.tenant_id, &bucket, &state.region, &claims.scopes) + .create_bucket(claims.tenant_id, &bucket, &region, &claims.scopes) .await { Ok(_) => (axum::http::StatusCode::OK, "").into_response(), @@ -165,6 +178,19 @@ async fn create_bucket( } } +fn s3_redirect(region: &str) -> Response { + let body = format!( + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<Error>\n <Code>PermanentRedirect</Code>\n <Message>The bucket is in this region: {}.
 Please use this region to retry the request.</Message>\n <Region>{}</Region>\n</Error>\n", + region, region + ); + Response::builder() + .status(axum::http::StatusCode::MOVED_PERMANENTLY) + .header("Content-Type", "application/xml") + .header("x-amz-bucket-region", region) + .body(Body::from(body)) + .unwrap() +} + async fn head_bucket( State(state): State<AppState>, Path(bucket_name): Path<String>, @@ -183,10 +209,15 @@ match state .db - .get_bucket_by_name(claims.tenant_id, &bucket_name, &state.region) + .get_bucket_by_name(claims.tenant_id, &bucket_name) .await { - Ok(Some(_)) => (axum::http::StatusCode::OK, "").into_response(), + Ok(Some(bucket)) => { + if bucket.region != state.region { + return s3_redirect(&bucket.region); + } + (axum::http::StatusCode::OK, "").into_response() + }, Ok(None) => s3_error( "NoSuchBucket", "The specified bucket does not exist", @@ -267,6 +298,17 @@ async fn list_objects( .unwrap() } Err(status) => match status.code() { + tonic::Code::FailedPrecondition => { + if status.message().starts_with("Bucket is in region ") { + let region = status.message().trim_start_matches("Bucket is in region "); + return s3_redirect(region); + } + s3_error( + "PreconditionFailed", + status.message(), + axum::http::StatusCode::PRECONDITION_FAILED, + ) + } tonic::Code::NotFound => { if req.extensions().get::().is_none() { s3_error( @@ -337,6 +379,17 @@ async fn get_object( .unwrap() } Err(status) => match status.code() { + tonic::Code::FailedPrecondition => { + if status.message().starts_with("Bucket is in region ") { + let region = status.message().trim_start_matches("Bucket is in region "); + return s3_redirect(region); + } + s3_error( + "PreconditionFailed", + status.message(), + axum::http::StatusCode::PRECONDITION_FAILED, + ) + } tonic::Code::NotFound => { if req.extensions().get::().is_none() { s3_error( @@ -398,6 +451,17 @@ async fn put_object( .body(Body::empty()) .unwrap(), Err(status) => match status.code() { + tonic::Code::FailedPrecondition => { + if
status.message().starts_with("Bucket is in region ") { + let region = status.message().trim_start_matches("Bucket is in region "); + return s3_redirect(region); + } + s3_error( + "PreconditionFailed", + status.message(), + axum::http::StatusCode::PRECONDITION_FAILED, + ) + } tonic::Code::NotFound => s3_error( "NoSuchBucket", status.message(), @@ -437,6 +501,17 @@ async fn head_object( .body(Body::empty()) .unwrap(), Err(status) => match status.code() { + tonic::Code::FailedPrecondition => { + if status.message().starts_with("Bucket is in region ") { + let region = status.message().trim_start_matches("Bucket is in region "); + return s3_redirect(region); + } + s3_error( + "PreconditionFailed", + status.message(), + axum::http::StatusCode::PRECONDITION_FAILED, + ) + } tonic::Code::NotFound => { if req.extensions().get::().is_none() { s3_error( diff --git a/anvil/tests/auth_tests.rs b/anvil/tests/auth_tests.rs index d40c895..e98c7e3 100644 --- a/anvil/tests/auth_tests.rs +++ b/anvil/tests/auth_tests.rs @@ -457,8 +457,19 @@ async fn test_admin_cli_set_public_access() { .unwrap(); assert!(set_public_status.success()); - // 4. Verify the object IS public now. - let resp_after = http_client.get(&object_url).send().await.unwrap(); + // 4. Verify the object IS public now, with retries for cache consistency. 
+ let mut resp_after = None; + for i in 0..5 { // Retry up to 5 times + let resp = http_client.get(&object_url).send().await.unwrap(); + if resp.status() == 200 { + resp_after = Some(resp); + break; + } + tokio::time::sleep(Duration::from_millis(500)).await; // Wait 500ms before retrying + println!("Retry {} for public access check...", i + 1); + } + let resp_after = resp_after.expect("Object should be public after CLI command, but never became public"); + assert_eq!( resp_after.status(), 200, diff --git a/anvil/tests/cluster.rs b/anvil/tests/cluster.rs index 3db07df..a3e31b7 100644 --- a/anvil/tests/cluster.rs +++ b/anvil/tests/cluster.rs @@ -22,6 +22,7 @@ async fn test_cluster_gossip() { init_cluster: false, enable_mdns: true, // Enable for this specific gossip test cluster_secret: Some("test-secret".to_string()), + metadata_cache_ttl_secs: 1, }); // 1. Create two swarms let mut swarm1 = create_swarm(config.clone()).await.unwrap(); @@ -119,6 +120,7 @@ async fn test_cluster_gossip_invalid_secret() { init_cluster: false, enable_mdns: true, cluster_secret: Some("secret-1".to_string()), + metadata_cache_ttl_secs: 1, }); let config2 = Arc::new(anvil::config::Config { global_database_url: "".to_string(), @@ -135,6 +137,7 @@ async fn test_cluster_gossip_invalid_secret() { init_cluster: false, enable_mdns: true, cluster_secret: Some("secret-2".to_string()), + metadata_cache_ttl_secs: 1, }); let mut swarm1 = create_swarm(config1).await.unwrap();