diff --git a/Cargo.lock b/Cargo.lock index eff1a76..4c383ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1393,7 +1393,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util 0.7.10", + "tokio-util", "tracing", ] @@ -1780,7 +1780,6 @@ dependencies = [ "maplit", "rdkafka", "rusoto_core", - "rusoto_credential", "rusoto_s3", "schema_registry_converter", "sentry", @@ -1792,7 +1791,7 @@ dependencies = [ "thiserror", "time 0.3.31", "tokio", - "tokio-util 0.6.10", + "tokio-util", "url", "utime", "uuid 1.6.1", @@ -2762,7 +2761,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls 0.24.1", - "tokio-util 0.7.10", + "tokio-util", "tower-service", "url", "wasm-bindgen", @@ -3680,20 +3679,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-util" -version = "0.6.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "log", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.10" diff --git a/Cargo.toml b/Cargo.toml index 02e3a7d..d9443e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ serde_json = "1" strum_macros = "0.20" thiserror = "1" tokio = { version = "1", features = ["full"] } -tokio-util = "0.6.3" +tokio-util = "0.7.10" uuid = { version = "1.0", features = ["serde", "v4"] } url = "2.3" @@ -35,8 +35,6 @@ deltalake-azure = { git = "https://github.com/delta-io/delta-rs", branch = "main # s3 feature enabled dynamodb_lock = { version = "0.6.0", optional = true } -rusoto_core = { version = "0.47", default-features = false, features = ["rustls"], optional = true } -rusoto_credential = { version = "0.47", optional = true } # sentry sentry = { version = "0.23.0", optional = true } @@ -68,8 +66,6 @@ azure = [ s3 = [ "deltalake-aws", "dynamodb_lock", - "rusoto_core", - "rusoto_credential", ] [dev-dependencies] @@ -77,6 +73,7 @@ utime = "0.3" serial_test = "*" tempfile = "3" time = "0.3.20" +rusoto_core = { version = "0.47", default-features = false, features = ["rustls"]} rusoto_s3 = { version = "0.47", default-features = false, features = ["rustls"]} [profile.release] diff --git a/src/dead_letters.rs b/src/dead_letters.rs index a4a1c7c..4d25fec 100644 --- a/src/dead_letters.rs +++ b/src/dead_letters.rs @@ -255,7 +255,7 @@ impl DeltaSinkDeadLetterQueue { dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE.to_string() => std::env::var(env_vars::DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE) .unwrap_or_else(|_| "kafka_delta_ingest-dead_letters".to_string()), }; - #[cfg(all(feature = "azure", not(feature="s3")))] + #[cfg(all(feature = "azure", not(feature = "s3")))] let opts = HashMap::default(); let table = crate::delta_helpers::load_table(table_uri, opts.clone()).await?; diff --git a/src/lib.rs b/src/lib.rs index 2af55a6..6c0b099 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ #![deny(warnings)] #![deny(missing_docs)] +#![allow(unused)] #[macro_use] extern crate lazy_static; diff --git a/src/serialization/mod.rs b/src/serialization/mod.rs index 136b80b..171505d 100644 --- a/src/serialization/mod.rs +++ b/src/serialization/mod.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; -use serde_json::Value; use log::*; +use serde_json::Value; use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat}; @@ -24,6 +24,13 @@ pub struct DeserializedMessage { } impl DeserializedMessage { + fn new(message: Value) -> Self { + Self { + message, + ..Default::default() + } + } + pub fn schema(&self) -> &Option { &self.schema } @@ -41,9 +48,7 @@ impl DeserializedMessage { /// Allow for `.into()` on [Value] for ease of use impl From for DeserializedMessage { fn from(message: Value) -> Self { - // XXX: This seems wasteful, this function should go away, and the deserializers should - // infer straight from the buffer stream - let iter = vec![message.clone()].into_iter().map(Ok); + let iter = std::iter::once(&message).map(Ok); let schema = match deltalake_core::arrow::json::reader::infer_json_schema_from_iterator(iter) { Ok(schema) => Some(schema), @@ -169,7 +174,10 @@ impl MessageDeserializer for DefaultDeserializer { } }; - Ok(value.into()) + match self.can_evolve_schema() { + true => Ok(value.into()), + false => Ok(DeserializedMessage::new(value)), + } } } @@ -183,8 +191,20 @@ mod default_tests { } #[tokio::test] - async fn deserialize_with_schema() { + async fn deserializer_default_without_evolution() { let mut deser = DefaultDeserializer::default(); + let dm = deser + .deserialize(r#"{"hello" : "world"}"#.as_bytes()) + .await + .unwrap(); + assert_eq!(true, dm.schema().is_none()); + } + + #[tokio::test] + async fn deserialize_with_schema() { + let mut deser = DefaultDeserializer { + schema_evolution: true, + }; let message = deser .deserialize(r#"{"hello" : "world"}"#.as_bytes()) .await