Skip to content

Refactor Publishing #34

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ maintenance = { status = "actively-developed" }
default = []

# Backends
google = ["ya-gcp", "tracing"]
mock = ["async-channel"]
google = ["ya-gcp", "tracing", "parking_lot"]
mock = ["async-channel", "parking_lot"]

# Validators
json-schema = ["valico", "serde_json", "serde"]
protobuf = ["prost"]

[[example]]
name = "googlepubsub"
required-features = ["google", "json-schema"]
required-features = ["google", "protobuf"]

[dependencies]
async-trait = { version = "0.1" }
Expand All @@ -47,17 +47,21 @@ uuid = { version = "^0.8", features = ["v4"], default-features = false }
async-channel = { version = "1.6", optional = true }
serde = { version = "^1.0", optional = true, default-features = false }
serde_json = { version = "^1", features = ["std"], optional = true, default-features = false }
ya-gcp = { version = "0.6", features = ["pubsub"], optional = true }
parking_lot = { version = "0.11", optional = true }
prost = { version = "0.8", optional = true, features = ["std"], default-features = false }
tracing = { version = "0.1.26", optional = true }
valico = { version = "^3.2", optional = true, default-features = false }
ya-gcp = { version = "0.6.3", features = ["pubsub"], optional = true }

[dev-dependencies]
async-channel = { version = "1.6" }
futures-channel = "0.3.17"
parking_lot = { version = "0.11" }
prost = { version = "0.8", features = ["std", "prost-derive"] }
tokio = { version = "1", features = ["macros", "rt"] }
tonic = "0.5"
serde = { version = "1", features = ["derive"] }
ya-gcp = { version = "0.6", features = ["pubsub", "emulators"] }
ya-gcp = { version = "0.6.3", features = ["pubsub", "emulators"] }
structopt = "0.3"

[package.metadata.docs.rs]
Expand Down
187 changes: 140 additions & 47 deletions examples/googlepubsub.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,41 @@
use futures_util::{SinkExt, StreamExt};
//! An example of ingesting messages from a PubSub subscription, applying a
//! transformation, then submitting those transformations to another PubSub topic.

use futures_util::{SinkExt, StreamExt, TryFutureExt};
use hedwig::{
googlepubsub::{
AuthFlow, ClientBuilder, ClientBuilderConfig, PubSubConfig, ServiceAccountAuth,
StreamSubscriptionConfig, SubscriptionConfig, SubscriptionName, TopicConfig, TopicName,
AcknowledgeToken, AuthFlow, ClientBuilder, ClientBuilderConfig, PubSubConfig,
PubSubMessage, PublishError, ServiceAccountAuth, StreamSubscriptionConfig,
SubscriptionConfig, SubscriptionName, TopicConfig, TopicName,
},
validators, Consumer, DecodableMessage, EncodableMessage, Headers, Publisher,
};
use std::time::SystemTime;
use std::{error::Error as StdError, time::SystemTime};
use structopt::StructOpt;

#[derive(Clone, PartialEq, Eq, prost::Message)]
const USER_CREATED_TOPIC: &str = "user.created";
const USER_UPDATED_TOPIC: &str = "user.updated";

/// The input data, representing some user being created with the given name
#[derive(PartialEq, Eq, prost::Message)]
struct UserCreatedMessage {
#[prost(string, tag = "1")]
payload: String,
name: String,
}

impl<'a> EncodableMessage for UserCreatedMessage {
impl EncodableMessage for UserCreatedMessage {
type Error = validators::ProstValidatorError;
type Validator = validators::ProstValidator;
fn topic(&self) -> hedwig::Topic {
"user.created".into()
USER_CREATED_TOPIC.into()
}
fn encode(self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
fn encode(&self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
Ok(validator.validate(
uuid::Uuid::new_v4(),
SystemTime::now(),
"user.created/1.0",
Headers::new(),
&self,
self,
)?)
}
}
Expand All @@ -42,6 +50,46 @@ impl DecodableMessage for UserCreatedMessage {
}
}

/// The output data, where the given user has now been assigned an ID and some metadata
///
/// Encoded as a protobuf message through the `prost::Message` derive; the field tags below
/// define its wire layout.
#[derive(PartialEq, Eq, prost::Message)]
struct UserUpdatedMessage {
/// The user's name (protobuf field 1)
#[prost(string, tag = "1")]
name: String,

/// The ID assigned to the user (protobuf field 2)
#[prost(int64, tag = "2")]
id: i64,

/// The metadata attached to the user (protobuf field 3)
#[prost(string, tag = "3")]
metadata: String,
}

#[derive(Debug)]
struct TransformedMessage {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like something that might make sense to have provided by the library in a generic way, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's the `AcknowledgeableMessage` type that the user could employ (and a pubsub-specific alias, `PubSubMessage`). Now that I think about it, I should alias the `TransformedMessage` type to one of those.

I've also thought about adding a `map` function to `AcknowledgeableMessage` so that a user could operate on the message and leave the token as-is, but then this goes down the rabbit hole of monad-ish types (implement flatmap? flatten? fallible mapping? async mapping?). Instead, I left the fields public so the user can construct them as they wish.

// keep the input's ack token to ack when the output is successfully published, or nack on
// failure
input_token: AcknowledgeToken,
output: UserUpdatedMessage,
}

/// Publishing support for [`TransformedMessage`].
///
/// Only the `output` payload is validated and encoded; the retained `input_token` is not part
/// of the published message.
impl EncodableMessage for TransformedMessage {
    type Error = validators::ProstValidatorError;
    type Validator = validators::ProstValidator;

    /// Every transformed message is routed to the user-updated topic.
    fn topic(&self) -> hedwig::Topic {
        USER_UPDATED_TOPIC.into()
    }

    /// Validates the output payload against the `user.updated/1.0` schema, stamping it with a
    /// fresh v4 UUID and the current system time.
    fn encode(&self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
        let message_id = uuid::Uuid::new_v4();
        let timestamp = SystemTime::now();

        let validated = validator.validate(
            message_id,
            timestamp,
            "user.updated/1.0",
            Headers::new(),
            &self.output,
        )?;

        Ok(validated)
    }
}

#[derive(Debug, StructOpt)]
struct Args {
/// The name of the pubsub project
Expand All @@ -50,7 +98,7 @@ struct Args {
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
async fn main() -> Result<(), Box<dyn StdError>> {
let args = Args::from_args();

println!("Building PubSub clients");
Expand All @@ -61,49 +109,59 @@ async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
)
.await?;

let topic_name = TopicName::new("pubsub_example_topic");
let subscription_name = SubscriptionName::new("pubsub_example_subscription");
let input_topic_name = TopicName::new(USER_CREATED_TOPIC);
let subscription_name = SubscriptionName::new("user-metadata-updaters");

let output_topic_name = TopicName::new(USER_UPDATED_TOPIC);
const APP_NAME: &str = "user-metadata-updater";

let mut publisher_client = builder
.build_publisher(&args.project_name, "myapp_publisher")
.await?;
let mut consumer_client = builder
.build_consumer(&args.project_name, "myapp_consumer")
.build_publisher(&args.project_name, APP_NAME)
.await?;
let mut consumer_client = builder.build_consumer(&args.project_name, APP_NAME).await?;

println!("Creating topic {:?}", &topic_name);
for topic_name in [&input_topic_name, &output_topic_name] {
println!("Creating topic {:?}", topic_name);

publisher_client
.create_topic(TopicConfig {
name: topic_name.clone(),
..TopicConfig::default()
})
.await?;
publisher_client
.create_topic(TopicConfig {
name: topic_name.clone(),
..TopicConfig::default()
})
.await?;
}

println!("Creating subscription {:?}", &subscription_name);

consumer_client
.create_subscription(SubscriptionConfig {
topic: topic_name.clone(),
topic: input_topic_name.clone(),
name: subscription_name.clone(),
..SubscriptionConfig::default()
})
.await?;

println!("Publishing message to topic");
println!(
"Synthesizing input messages for topic {:?}",
&input_topic_name
);

let validator = hedwig::validators::ProstValidator::new();
let mut publisher = publisher_client.publisher().publish_sink(validator);
{
let validator = validators::ProstValidator::new();
let mut input_sink =
Publisher::<UserCreatedMessage>::publish_sink(publisher_client.publisher(), validator);

for i in 1..=10 {
let message = UserCreatedMessage {
payload: format!("this is message #{}", i),
};
for i in 1..=10 {
let message = UserCreatedMessage {
name: format!("Example Name #{}", i),
};

publisher.send(message).await?;
input_sink.feed(message).await?;
}
input_sink.flush().await?;
}

println!("Reading back published message");
println!("Ingesting input messages, applying transformations, and publishing to destination");

let mut read_stream = consumer_client
.stream_subscription(
Expand All @@ -114,35 +172,70 @@ async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
hedwig::validators::prost::ExactSchemaMatcher::new("user.created/1.0"),
));

let mut output_sink = Publisher::<TransformedMessage, _>::publish_sink_with_responses(
publisher_client.publisher(),
validators::ProstValidator::new(),
futures_util::sink::unfold((), |_, message: TransformedMessage| async move {
// if the output is successfully sent, ack the input to mark it as processed
message.input_token.ack().await
}),
);

for i in 1..=10 {
let message = read_stream
let PubSubMessage { ack_token, message } = read_stream
.next()
.await
.ok_or("unexpected end of stream")??
.ack()
.await?;
.expect("stream should have 10 elements")?;

assert_eq!(
message,
UserCreatedMessage {
payload: format!("this is message #{}", i)
}
);
assert_eq!(&message.name, &format!("Example Name #{}", i));

let transformed = TransformedMessage {
output: UserUpdatedMessage {
name: message.name,
id: random_id(),
metadata: "some metadata".into(),
},
input_token: ack_token,
};

output_sink
.feed(transformed)
.or_else(|publish_error| async move {
// if publishing fails, nack the failed messages to allow later retries
Err(match publish_error {
PublishError::Publish { cause, messages } => {
for failed_transform in messages {
failed_transform.input_token.nack().await?;
}
Box::<dyn StdError>::from(cause)
}
err => Box::<dyn StdError>::from(err),
})
})
.await?
}
output_sink.flush().await?;

println!("All messages matched!");
println!("All messages matched and published successfully!");

println!("Deleting subscription {:?}", &subscription_name);

consumer_client
.delete_subscription(subscription_name)
.await?;

println!("Deleting topic {:?}", &topic_name);
for topic_name in [input_topic_name, output_topic_name] {
println!("Deleting topic {:?}", &topic_name);

publisher_client.delete_topic(topic_name).await?;
publisher_client.delete_topic(topic_name).await?;
}

println!("Done");

Ok(())
}

/// Produces the ID assigned to a transformed user.
///
/// This example does not draw on any real source of randomness: the result is always `4`.
fn random_id() -> i64 {
    // chosen by fair dice roll.
    // guaranteed to be random.
    const FAIR_DICE_ROLL: i64 = 4;
    FAIR_DICE_ROLL
}
5 changes: 3 additions & 2 deletions src/backends/googlepubsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use std::{borrow::Cow, fmt::Display};
pub use ya_gcp::{
grpc::StatusCodeSet,
pubsub::{
AcknowledgeError, BuildError, Error as PubSubError, MakeConnection, ModifyAcknowledgeError,
PubSubConfig, SinkError, StreamSubscriptionConfig, Uri, DEFAULT_RETRY_CODES,
AcknowledgeError, AcknowledgeToken, BuildError, Error as PubSubError, MakeConnection,
ModifyAcknowledgeError, PubSubConfig, SinkError, StreamSubscriptionConfig, Uri,
DEFAULT_RETRY_CODES,
},
retry_policy, AuthFlow, ClientBuilderConfig, Connect, CreateBuilderError, DefaultConnector,
ServiceAccountAuth,
Expand Down
Loading