Skip to content

Commit

Permalink
chore: fix formatting on stream! and try_stream! macros
Browse files Browse the repository at this point in the history
tokio-rs/async-stream#68 pointed out we can
use the `stream!({ ... })` macro syntax to force rustfmt to format the
inner code block.
  • Loading branch information
TroyKomodo committed Jun 14, 2023
1 parent 0284620 commit ac4d79b
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 100 deletions.
24 changes: 16 additions & 8 deletions backend/api/src/api/v1/gql/subscription/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,30 @@ impl ChatSubscription {
.await
.map_err_gql("failed to subscribe to chat messages")?;

Ok(stream! {
Ok(stream!({
yield Ok(welcome_message);
while let Ok(message) = message_stream.recv().await {
let event = pb::scuffle::events::ChatMessage::decode(
message.as_bytes().map_err_gql("invalid redis value type")?
).map_err_gql("failed to decode chat message")?;
message.as_bytes().map_err_gql("invalid redis value type")?,
)
.map_err_gql("failed to decode chat message")?;

yield Ok(ChatMessage {
id: Uuid::parse_str(&event.id).map_err_gql("failed to parse chat message id")?,
author_id: Uuid::parse_str(&event.author_id).map_err_gql("failed to parse chat message author id")?,
channel_id: Uuid::parse_str(&event.channel_id).map_err_gql("failed to parse chat message channel id")?,
id: Uuid::parse_str(&event.id)
.map_err_gql("failed to parse chat message id")?,
author_id: Uuid::parse_str(&event.author_id)
.map_err_gql("failed to parse chat message author id")?,
channel_id: Uuid::parse_str(&event.channel_id)
.map_err_gql("failed to parse chat message channel id")?,
content: event.content,
created_at: Utc.timestamp_opt(event.created_at, 0).single().map_err_gql("failed to parse chat message created at")?.into(),
created_at: Utc
.timestamp_opt(event.created_at, 0)
.single()
.map_err_gql("failed to parse chat message created at")?
.into(),
r#type: MessageType::User,
});
}
})
}))
}
}
9 changes: 5 additions & 4 deletions backend/api/src/api/v1/gql/subscription/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@ impl UserSubscription {
.await
.map_err_gql("failed to subscribe to user display name")?;

Ok(async_stream::stream! {
Ok(async_stream::stream!({
yield Ok(DisplayNameStream {
display_name: user.display_name.clone(),
username: user.username.clone(),
});

while let Ok(message) = subscription.recv().await {
let event = pb::scuffle::events::UserDisplayName::decode(
message.as_bytes().map_err_gql("invalid redis value")?
).map_err_gql("failed to decode user display name")?;
message.as_bytes().map_err_gql("invalid redis value")?,
)
.map_err_gql("failed to decode user display name")?;

if let Some(username) = event.username {
user.username = username;
Expand All @@ -63,6 +64,6 @@ impl UserSubscription {
username: user.username.clone(),
});
}
})
}))
}
}
15 changes: 11 additions & 4 deletions backend/api/src/grpc/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,24 @@ impl health_server::Health for HealthServer {
async fn watch(&self, _: Request<HealthCheckRequest>) -> Result<Response<Self::WatchStream>> {
let global = self.global.clone();

let output = try_stream! {
let output = try_stream!({
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default();
let serving = global
.upgrade()
.map(|g| !g.ctx.is_done())
.unwrap_or_default();

yield HealthCheckResponse {
status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() },
status: if serving {
ServingStatus::Serving.into()
} else {
ServingStatus::NotServing.into()
},
};
}
};
});

Ok(Response::new(Box::pin(output)))
}
Expand Down
28 changes: 14 additions & 14 deletions common/src/rmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,35 +127,35 @@ impl ConnectionPool {
let queue_name = queue_name.to_string();
let connection_name = connection_name.to_string();

stream! {
stream!({
'connection_loop: loop {
let channel = self.aquire().await?;
let mut consumer = channel.basic_consume(&queue_name, &connection_name, options, table.clone()).await?;
let mut consumer = channel
.basic_consume(&queue_name, &connection_name, options, table.clone())
.await?;
loop {
let m = consumer.next().await;
match m {
Some(Ok(m)) => {
yield Ok(m);
},
Some(Err(e)) => {
match e {
lapin::Error::IOError(e) => {
if e.kind() == std::io::ErrorKind::ConnectionReset {
continue 'connection_loop;
}
},
_ => {
yield Err(anyhow!("failed to get message: {}", e));
}
Some(Err(e)) => match e {
lapin::Error::IOError(e) => {
if e.kind() == std::io::ErrorKind::ConnectionReset {
continue 'connection_loop;
}
}
_ => {
yield Err(anyhow!("failed to get message: {}", e));
}
},
None => {
continue 'connection_loop;
},
}
}
}
}
}
})
}

pub async fn aquire(&self) -> Result<Channel> {
Expand Down
15 changes: 11 additions & 4 deletions video/edge/src/grpc/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,24 @@ impl health_server::Health for HealthServer {
async fn watch(&self, _: Request<HealthCheckRequest>) -> Result<Response<Self::WatchStream>> {
let global = self.global.clone();

let output = try_stream! {
let output = try_stream!({
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default();
let serving = global
.upgrade()
.map(|g| !g.ctx.is_done())
.unwrap_or_default();

yield HealthCheckResponse {
status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() },
status: if serving {
ServingStatus::Serving.into()
} else {
ServingStatus::NotServing.into()
},
};
}
};
});

Ok(Response::new(Box::pin(output)))
}
Expand Down
15 changes: 11 additions & 4 deletions video/ingest/src/grpc/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,24 @@ impl health_server::Health for HealthServer {
async fn watch(&self, _: Request<HealthCheckRequest>) -> Result<Response<Self::WatchStream>> {
let global = self.global.clone();

let output = try_stream! {
let output = try_stream!({
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default();
let serving = global
.upgrade()
.map(|g| !g.ctx.is_done())
.unwrap_or_default();

yield HealthCheckResponse {
status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() },
status: if serving {
ServingStatus::Serving.into()
} else {
ServingStatus::NotServing.into()
},
};
}
};
});

Ok(Response::new(Box::pin(output)))
}
Expand Down
48 changes: 23 additions & 25 deletions video/ingest/src/grpc/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,39 +68,37 @@ impl ingest_server::Ingest for IngestServer {
return Err(Status::not_found("Stream not found"));
}

let output = try_stream! {
let output = try_stream!({
while let Some(event) = channel_rx.recv().await {
let event = match event {
WatchStreamEvent::InitSegment(data) => {
WatchStreamResponse {
data: Some(watch_stream_response::Data::InitSegment(data)),
}
WatchStreamEvent::InitSegment(data) => WatchStreamResponse {
data: Some(watch_stream_response::Data::InitSegment(data)),
},
WatchStreamEvent::MediaSegment(ms) => {
WatchStreamResponse {
data: Some(watch_stream_response::Data::MediaSegment(
watch_stream_response::MediaSegment {
data: ms.data,
keyframe: ms.keyframe,
timestamp: ms.timestamp,
data_type: match ms.ty {
transmuxer::MediaType::Audio => watch_stream_response::media_segment::DataType::Audio.into(),
transmuxer::MediaType::Video => watch_stream_response::media_segment::DataType::Video.into(),
WatchStreamEvent::MediaSegment(ms) => WatchStreamResponse {
data: Some(watch_stream_response::Data::MediaSegment(
watch_stream_response::MediaSegment {
data: ms.data,
keyframe: ms.keyframe,
timestamp: ms.timestamp,
data_type: match ms.ty {
transmuxer::MediaType::Audio => {
watch_stream_response::media_segment::DataType::Audio.into()
}
}
)),
}
}
WatchStreamEvent::ShuttingDown(stream_shutdown) => {
WatchStreamResponse {
data: Some(watch_stream_response::Data::ShuttingDown(stream_shutdown)),
}
}
transmuxer::MediaType::Video => {
watch_stream_response::media_segment::DataType::Video.into()
}
},
},
)),
},
WatchStreamEvent::ShuttingDown(stream_shutdown) => WatchStreamResponse {
data: Some(watch_stream_response::Data::ShuttingDown(stream_shutdown)),
},
};

yield event;
}
};
});

Ok(Response::new(Box::pin(output)))
}
Expand Down
11 changes: 8 additions & 3 deletions video/ingest/src/tests/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,13 @@ impl TestState {

let stream = {
let global = global.clone();
stream! {
let mut stream = pin!(global.rmq.basic_consume(global.config.transcoder.events_subject.clone(), "", Default::default(), Default::default()));
stream!({
let mut stream = pin!(global.rmq.basic_consume(
global.config.transcoder.events_subject.clone(),
"",
Default::default(),
Default::default()
));
loop {
select! {
message = stream.next() => {
Expand All @@ -288,7 +293,7 @@ impl TestState {
}
}
}
}
})
};

Self {
Expand Down
15 changes: 11 additions & 4 deletions video/transcoder/src/grpc/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,24 @@ impl health_server::Health for HealthServer {
async fn watch(&self, _: Request<HealthCheckRequest>) -> Result<Response<Self::WatchStream>> {
let global = self.global.clone();

let output = try_stream! {
let output = try_stream!({
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default();
let serving = global
.upgrade()
.map(|g| !g.ctx.is_done())
.unwrap_or_default();

yield HealthCheckResponse {
status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() },
status: if serving {
ServingStatus::Serving.into()
} else {
ServingStatus::NotServing.into()
},
};
}
};
});

Ok(Response::new(Box::pin(output)))
}
Expand Down
4 changes: 2 additions & 2 deletions video/transcoder/src/transcoder/job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ fn report_to_ingest(
mut client: IngestClient<Channel>,
mut channel: mpsc::Receiver<TranscoderEventRequest>,
) -> impl Stream<Item = Result<()>> + Send + 'static {
stream! {
stream!({
loop {
select! {
msg = channel.recv() => {
Expand All @@ -276,7 +276,7 @@ fn report_to_ingest(
}
}
}
}
})
}

impl Job {
Expand Down
Loading

0 comments on commit ac4d79b

Please sign in to comment.