Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ members = [
resolver = "2"

[workspace.dependencies]
diesel = { version = "2.2", features = ["postgres", "chrono"] }
diesel = { version = "2.2", features = ["postgres", "chrono", "serde_json"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Expand Down
177 changes: 177 additions & 0 deletions crates/divine-atbridge/src/backfill_planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
use std::sync::{Arc, Mutex};

use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{TimeZone, Utc};
use diesel::PgConnection;
use divine_bridge_db::models::{AccountLinkLifecycleRow, NewPublishJob};
use divine_bridge_db::{
cancel_publish_job, enqueue_publish_job, list_accounts_requiring_backfill,
mark_account_backfill_completed, mark_account_backfill_failed, mark_account_backfill_started,
};
use divine_bridge_types::{PublishJobSource, PublishState};

use crate::config::DEFAULT_BACKFILL_BATCH_SIZE;
use crate::nostr_consumer::{author_history_filter, collect_history_until_eose, RelayConnection};
use crate::pipeline::{BlobFetcher, BlobUploader, BridgePipeline, PdsPublisher, QueueDecision};

/// Factory for the relay connections the backfill planner reads history from.
///
/// The planner calls [`connect`](Self::connect) once per account it processes,
/// so implementations should hand back a fresh, usable connection on each call.
/// Implementors must be `Send + Sync`, as required by the trait bound.
#[async_trait]
pub trait BackfillRelayConnector: Send + Sync {
/// Concrete connection type produced by [`connect`](Self::connect).
type Connection: RelayConnection;

/// Opens a connection to the relay at `relay_url`.
///
/// # Errors
///
/// Returns an error when the connection cannot be established.
async fn connect(&self, relay_url: &str) -> Result<Self::Connection>;
}

/// Replays relay history for accounts that still require a backfill and turns
/// the resulting pipeline decisions into queued publish jobs.
///
/// The type parameters `A`, `R`, `F`, `U` and `P` are forwarded to
/// [`BridgePipeline`]; `C` is the [`BackfillRelayConnector`] used to open relay
/// connections.
pub struct BackfillPlanner<A, R, F, U, P, C> {
/// URL handed to the relay connector each time a connection is opened.
relay_url: String,
/// Shared database connection; locked around each individual database call.
connection: Arc<Mutex<PgConnection>>,
/// Pipeline asked to prepare a publish job decision for every replayed event.
pipeline: Arc<BridgePipeline<A, R, F, U, P>>,
/// Opens the relay connection used to load an account's history.
relay_connector: C,
/// Limit passed to `list_accounts_requiring_backfill` on each run.
batch_size: i64,
}

impl<A, R, F, U, P, C> BackfillPlanner<A, R, F, U, P, C>
where
A: crate::pipeline::AccountStore,
R: crate::pipeline::RecordStore,
F: BlobFetcher,
U: BlobUploader,
P: PdsPublisher,
C: BackfillRelayConnector,
{
pub fn new(
relay_url: String,
connection: Arc<Mutex<PgConnection>>,
pipeline: Arc<BridgePipeline<A, R, F, U, P>>,
relay_connector: C,
batch_size: i64,
) -> Self {
Self {
relay_url,
connection,
pipeline,
relay_connector,
batch_size,
}
}

pub fn with_default_batch_size(
relay_url: String,
connection: Arc<Mutex<PgConnection>>,
pipeline: Arc<BridgePipeline<A, R, F, U, P>>,
relay_connector: C,
) -> Self {
Self::new(
relay_url,
connection,
pipeline,
relay_connector,
DEFAULT_BACKFILL_BATCH_SIZE,
)
}

pub async fn run_once(&self) -> Result<()> {
let accounts = {
let mut connection = self.connection.lock().unwrap();
list_accounts_requiring_backfill(&mut connection, self.batch_size)?
};

for account in accounts {
let relay = self.relay_connector.connect(&self.relay_url).await;
let mut relay = match relay {
Ok(relay) => relay,
Err(error) => {
let mut connection = self.connection.lock().unwrap();
mark_account_backfill_failed(
&mut connection,
&account.nostr_pubkey,
&error.to_string(),
)?;
continue;
}
};

let result = self.replay_account_history(&mut relay, &account).await;
let _ = relay.close().await;
if let Err(error) = result {
let mut connection = self.connection.lock().unwrap();
mark_account_backfill_failed(
&mut connection,
&account.nostr_pubkey,
&error.to_string(),
)?;
}
}

Ok(())
}

async fn replay_account_history<RC>(
&self,
relay: &mut RC,
account: &AccountLinkLifecycleRow,
) -> Result<()>
where
RC: RelayConnection,
{
{
let mut connection = self.connection.lock().unwrap();
mark_account_backfill_started(&mut connection, &account.nostr_pubkey)?;
}

let subscription_id = "sub-1".to_string();
let mut history = collect_history_until_eose(
relay,
&subscription_id,
&author_history_filter(account.nostr_pubkey.clone()),
)
.await
.with_context(|| format!("failed to load relay history for {}", account.nostr_pubkey))?;

history.sort_by(|left, right| {
left.created_at
.cmp(&right.created_at)
.then_with(|| left.id.cmp(&right.id))
});

for event in history {
match self.pipeline.prepare_publish_job(&event).await? {
QueueDecision::Enqueue(job) => {
let queued = new_backfill_job(&job)?;
let mut connection = self.connection.lock().unwrap();
enqueue_publish_job(&mut connection, &queued)?;
}
QueueDecision::Cancel { tombstone_job, .. } => {
let tombstone = new_backfill_job(&tombstone_job)?;
let mut connection = self.connection.lock().unwrap();
cancel_publish_job(
&mut connection,
&tombstone,
Some("historical delete replay"),
)?;
}
QueueDecision::Skip { .. } => {}
}
}

let mut connection = self.connection.lock().unwrap();
mark_account_backfill_completed(&mut connection, &account.nostr_pubkey)?;
Ok(())
}
}

/// Builds the insertable publish-job row for a backfilled event.
///
/// The row borrows the event id and pubkey from `envelope`, clones its
/// payload, and is tagged with the `Backfill` job source and the `Pending`
/// state.
///
/// # Errors
///
/// Fails when `envelope.event_created_at` does not map to a single valid UTC
/// timestamp.
fn new_backfill_job(envelope: &crate::pipeline::PublishJobEnvelope) -> Result<NewPublishJob<'_>> {
    let resolved = Utc.timestamp_opt(envelope.event_created_at, 0).single();
    let event_created_at = resolved.context("queued event timestamp is out of range")?;

    let job = NewPublishJob {
        nostr_event_id: &envelope.nostr_event_id,
        nostr_pubkey: &envelope.nostr_pubkey,
        event_created_at,
        event_payload: envelope.event_payload.clone(),
        job_source: PublishJobSource::Backfill.as_str(),
        state: PublishState::Pending.as_str(),
    };
    Ok(job)
}
2 changes: 2 additions & 0 deletions crates/divine-atbridge/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use anyhow::{Context, Result};

const PRODUCTION_DIVINE_HANDLE_DOMAIN: &str = "divine.video";
const PRODUCTION_DIVINE_PDS_URL: &str = "https://pds.divine.video";
pub const DEFAULT_BACKFILL_BATCH_SIZE: i64 = 25;
pub const DEFAULT_BACKFILL_PLANNER_INTERVAL_SECS: u64 = 30;

/// Configuration for the ATBridge service.
#[derive(Debug, Clone)]
Expand Down
27 changes: 22 additions & 5 deletions crates/divine-atbridge/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Divine ATBridge library surface.

pub mod backfill_planner;
pub mod config;
pub mod deletion;
pub mod health;
Expand All @@ -23,7 +24,7 @@ use nostr_consumer::{
parse_relay_message, NostrConsumer, NostrFilter, RelayConnection, RelayMessage,
};
use pipeline::{
AccountStore, BlobFetcher, BlobUploader, BridgePipeline, PdsPublisher, ProcessResult,
AccountStore, BlobFetcher, BlobUploader, BridgePipeline, PdsPublisher, QueueDecision,
RecordStore,
};

Expand Down Expand Up @@ -57,13 +58,29 @@ where
match parse_relay_message(&raw) {
Ok(RelayMessage::Event { event, .. }) => {
let created_at = event.created_at;
match pipeline.process_event(&event).await {
ProcessResult::Error { message } => {
tracing::error!(error = %message, event_id = %event.id, "bridge pipeline rejected relay event");
let result = match pipeline.prepare_publish_job(&event).await {
Ok(QueueDecision::Enqueue(job)) => {
pipeline.execute_publish_job(&job).await.map(|_| ())
}
_ => {
Ok(QueueDecision::Cancel { tombstone_job, .. }) => pipeline
.execute_publish_job(&tombstone_job)
.await
.map(|_| ()),
Ok(QueueDecision::Skip { .. }) => Ok(()),
Err(error) => Err(error),
};

match result {
Ok(()) => {
consumer.last_seen_timestamp = Some(created_at);
}
Err(error) => {
tracing::error!(
error = %error,
event_id = %event.id,
"bridge pipeline rejected relay event"
);
}
}
}
Ok(RelayMessage::Eose { .. }) => {}
Expand Down
49 changes: 49 additions & 0 deletions crates/divine-atbridge/src/nostr_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ impl NostrFilter {
}
}

/// Builds the filter used to request a single author's full history.
///
/// The filter selects kinds 0, 5, 34235 and 34236 written by `author`, with no
/// `since` lower bound.
pub fn author_history_filter(author: String) -> NostrFilter {
    let authors = Some(vec![author]);
    let kinds = vec![0, 5, 34235, 34236];

    NostrFilter {
        kinds,
        authors,
        since: None,
    }
}

/// Messages the relay can send us.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RelayMessage {
Expand Down Expand Up @@ -103,6 +111,47 @@ pub fn parse_relay_message(raw: &str) -> Result<RelayMessage> {
}
}

/// Sends a `REQ` for `filter` under `subscription_id` and gathers every event
/// delivered for that subscription until its EOSE frame arrives.
///
/// Frames for other subscriptions and unknown frames are ignored; NOTICE
/// frames and unparseable frames are logged as warnings and skipped.
///
/// # Errors
///
/// Fails when the request cannot be sent, when reading a frame fails, or when
/// the connection stops yielding frames before the matching EOSE is seen.
pub async fn collect_history_until_eose<C>(
    conn: &mut C,
    subscription_id: &str,
    filter: &NostrFilter,
) -> Result<Vec<NostrEvent>>
where
    C: RelayConnection + ?Sized,
{
    let request = serde_json::json!(["REQ", subscription_id, filter]);
    let frame =
        serde_json::to_string(&request).expect("history REQ serialization cannot fail");
    conn.send(frame)
        .await
        .context("failed to send history subscription")?;

    let mut collected = Vec::new();
    while let Some(raw) = conn.recv().await.context("failed to read relay frame")? {
        let message = match parse_relay_message(&raw) {
            Ok(message) => message,
            Err(error) => {
                tracing::warn!("failed to parse relay message: {error}");
                continue;
            }
        };

        match message {
            RelayMessage::Event {
                subscription_id: sub_id,
                event,
            } => {
                // Events belonging to other subscriptions are dropped.
                if sub_id == subscription_id {
                    collected.push(event);
                }
            }
            RelayMessage::Eose {
                subscription_id: sub_id,
            } => {
                // Only the EOSE of our own subscription ends the collection.
                if sub_id == subscription_id {
                    return Ok(collected);
                }
            }
            RelayMessage::Notice(message) => {
                tracing::warn!("relay NOTICE: {message}");
            }
            RelayMessage::Unknown(_) => {}
        }
    }

    // The frame stream ended without the matching EOSE.
    Err(anyhow!(
        "relay history subscription {subscription_id} ended before EOSE"
    ))
}

// ---------------------------------------------------------------------------
// WebSocket abstraction
// ---------------------------------------------------------------------------
Expand Down
Loading
Loading