Skip to content
This repository was archived by the owner on Sep 4, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/bin/create-oblivion-es.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use clap::Parser;
use oddbot::{event_stream::create_nats_client, prelude::*};
use oddbot::{nats::create_nats_client, prelude::*};

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
Expand Down
2 changes: 1 addition & 1 deletion src/bin/oblivion/app_state.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use oddbot::{event_stream::create_nats_client, prelude::*, skeever::squeak::Squeak};
use oddbot::{nats::create_nats_client, prelude::*, skeever::squeak::Squeak};
use std::sync::Arc;
use tokio::sync::broadcast;

Expand Down
21 changes: 19 additions & 2 deletions src/bin/oblivion/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,24 @@ async fn ws_handler(
/// Handles individual websocket connections
async fn handle_socket(socket: ws::WebSocket, state: AppState) {
let mut event_receiver = state.event_sender.subscribe();
let (mut sender, mut receiver) = socket.split();
let (sender, mut receiver) = socket.split();

// Send historical messages first and get the sender back
let mut sender = match websockets::send_historical_messages(
state
.get_event_stream()
.await
.expect("Could not create connection to event stream"),
sender,
)
.await
{
Ok(sender) => sender,
Err(e) => {
tracing::error!("Failed to send historical messages: {:?}", e);
return;
}
};

// Handle incoming messages in a separate task
let receiver_task = tokio::spawn(async move {
Expand Down Expand Up @@ -102,7 +119,7 @@ async fn handle_socket(socket: ws::WebSocket, state: AppState) {
{
break;
}
tracing::debug!("Successfuly squeaked {:?}", &squeak);
tracing::debug!("Successfully squeaked {:?}", &squeak);
}
});

Expand Down
101 changes: 92 additions & 9 deletions src/bin/oblivion/websockets.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
use futures::StreamExt;
use oddbot::{prelude::EventStream, skeever::squeak::Squeak};
use std::sync::Arc;
use async_nats::jetstream;
use axum::extract::ws::{Message, WebSocket};
use futures::{SinkExt, StreamExt, stream::SplitSink};
use oddbot::{error::OddbotError, prelude::EventStream, skeever::squeak::Squeak};
use std::{sync::Arc, time::Duration};
use tokio::sync::broadcast;

pub async fn forward_events_to_websockets(
listener: Arc<EventStream>,
event_sender: broadcast::Sender<Squeak>,
) {
let skeever_subject = Squeak::get_subject();
let consumer_name = format!("oblivion_websocket_consumer_{}", ulid::Ulid::new());

let Ok(consumer) = listener
.create_consumer(Some(consumer_name), skeever_subject)
let consumer = match listener
.create_consumer(
Some("oblivion_websocket_main_consumer".to_string()),
skeever_subject.clone(),
Some(jetstream::consumer::DeliverPolicy::New),
)
.await
else {
tracing::error!("Failed to create consumer");
return;
{
Ok(consumer) => consumer,
Err(e) => {
tracing::error!("Failed to create consumer: {:?}", e);
return;
}
};

loop {
Expand Down Expand Up @@ -48,10 +56,85 @@ pub async fn forward_events_to_websockets(
tracing::error!("Failed to broadcast event: {}", e);
}
},
};

if let Err(e) = message.ack().await {
tracing::error!("Failed to ack message: {}", e);
}
}

// Add a small delay between fetches
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}

#[derive(Debug, Clone)]
pub struct WebSocketConfig {
pub historical_batch_size: usize,
pub historical_max_age: Option<Duration>,
pub historical_batch_delay: Duration,
}

impl Default for WebSocketConfig {
fn default() -> Self {
Self {
historical_batch_size: 100,
historical_max_age: Some(Duration::from_secs(60 * 60 * 24)), // 24 hours
historical_batch_delay: Duration::from_millis(50),
}
}
}

pub async fn send_historical_messages(
listener: Arc<EventStream>,
mut ws_sender: SplitSink<WebSocket, Message>,
) -> Result<SplitSink<WebSocket, Message>, OddbotError> {
let skeever_subject = Squeak::get_subject();
let batch_size = 100;

let temp_consumer = listener
.create_consumer(
None,
skeever_subject,
Some(jetstream::consumer::DeliverPolicy::All),
)
.await?;

loop {
let mut messages = temp_consumer
.fetch()
.max_messages(batch_size)
.messages()
.await?;

let mut batch_count = 0;
while let Some(message) = messages.next().await {
let Ok(message) = message else {
tracing::error!("Failed to fetch message");
continue;
};
let squeak = serde_json::from_slice::<Squeak>(&message.payload)?;

ws_sender
.send(Message::Text(serde_json::to_string(&squeak)?.into()))
.await
.map_err(|e| OddbotError::WebsocketSend(e.to_string()))?;

message
.ack()
.await
.map_err(|e| OddbotError::WebsocketSend(e.to_string()))?; // Convert ack error

batch_count += 1;
}

if batch_count < batch_size {
break; // No more messages
}

// Optional: Add a small delay between batches to prevent overwhelming the client
tokio::time::sleep(Duration::from_millis(50)).await;
}

Ok(ws_sender)
}
12 changes: 8 additions & 4 deletions src/bin/oddbot.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use oddbot::{db, event_stream::create_nats_client, prelude::*};
use oddbot::{db, discord::character::CharacterStore, nats::create_nats_client, prelude::*};
use std::sync::Arc;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -21,8 +21,8 @@ async fn main() -> Result<(), OddbotError> {

let event_stream = match event_stream_name {
Some(name) => {
let nats_client = create_nats_client().await?;
let event_stream = EventStream::connect(name, nats_client).await?;
let event_nats = create_nats_client().await?;
let event_stream = EventStream::connect(name, event_nats).await?;
Some(Arc::new(event_stream))
}
None => {
Expand All @@ -31,8 +31,12 @@ async fn main() -> Result<(), OddbotError> {
}
};

// Connect to our Oblivion character store
let character_nats = create_nats_client().await?;
let character_store = Arc::new(CharacterStore::new(character_nats).await?);

// Initialize our bot
let mut oddbot = DiscordBot::init(pool, event_stream).await?;
let mut oddbot = DiscordBot::init(pool, event_stream, character_store).await?;

// Finally, start a single shard, and start listening to events.
//
Expand Down
12 changes: 9 additions & 3 deletions src/discord/bot.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
use super::{character::CharacterStore, handler::Handler};
use crate::prelude::*;
use serenity::{Client, all::GatewayIntents};
use sqlx::PgPool;
use std::{env, sync::Arc};

use super::handler::Handler;

#[allow(dead_code)] // TODO: Remove after we use db_pool
pub struct DiscordBot {
pub client: Client,
db_pool: Arc<PgPool>,
event_stream: Option<Arc<EventStream>>,
character_store: Arc<CharacterStore>,
}

impl DiscordBot {
pub async fn init(
db_pool: Arc<PgPool>,
event_stream: Option<Arc<EventStream>>,
character_store: Arc<CharacterStore>,
) -> Result<Self, OddbotError> {
// Get our discord token
let discord_token = env::var("DISCORD_TOKEN").map_err(OddbotError::EnvVar)?;

// Create a handler for handling Discord events
let handler = Handler::new(db_pool.clone(), event_stream.clone());
let handler = Handler::new(
db_pool.clone(),
event_stream.clone(),
character_store.clone(),
);

// Declare our intents for events we're going to listen to
let intents = GatewayIntents::GUILDS
Expand All @@ -40,6 +45,7 @@ impl DiscordBot {
client,
db_pool,
event_stream,
character_store,
})
}
}
Loading