Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ cargo run --bin backend -- --server-addr 0.0.0.0:<SERVER_PORT>

Replace `<SERVER_PORT>` with your desired port number (e.g., 8080).

> **_NOTE:_** By default, the backend runs in **restricted** mode, which requires clients to subscribe to the events specified in `backend/restricted_filters.json`. To disable this behavior, set the `ALLOW_UNRESTRICTED_FILTERS` environment variable to a non-empty value before spawning the backend process.

### 2. Configure the Frontend

Edit the `.env` file in the frontend directory to set the WebSocket URL:
Expand Down
166 changes: 166 additions & 0 deletions backend/restricted_filters.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
[
{
"event_name": "TxnHeaderStart"
},
{
"event_name": "BlockStart"
},
{
"event_name": "BlockQC"
},
{
"event_name": "BlockFinalized"
},
{
"event_name": "BlockVerified"
},
{
"event_name": "BlockReject"
},
{
"event_name": "BlockEnd"
},
{
"event_name": "TxnEnd"
},
{
"event_name": "TxnEvmOutput"
},
{
"event_name": "TxnLog",
"field_filters": [
{
"field": "address",
"filter": {
"values": ["0xa68a7f0601effdc65c64d9c47ca1b18d96b4352c"]
}
},
{
"field": "topics",
"filter": {
"values": [
"0x6e4c3aa29fc5ed6dc56aa0a95d8ac6660b6bf4e9c2ab49a0ea79b9cdafbcd7eb"
]
}
}
]
},
{
"event_name": "TxnLog",
"field_filters": [
{
"field": "address",
"filter": {
"values": ["0x188d586ddcf52439676ca21a244753fa19f9ea8e"]
}
},
{
"field": "topics",
"filter": {
"values": [
"0x40e9cecb9f5f1f1c5b9c97dec2917b7ee92e57ba5563708daca94dd84ad7112f",
"0xadaf30776f551bccdfb307c3fd8cdec198ca9a852434c8022ee32d1ccedd8219"
]
}
}
]
},
{
"event_name": "TxnLog",
"field_filters": [
{
"field": "address",
"filter": {
"values": ["0xd5b70d70cbe6c42bcd1aaa662a21673a83f4615b"]
}
},
{
"field": "topics",
"filter": {
"values": [
"0x19b47279256b2a23a1665c810c8d55a1758940ee09377d4f8d26497a3577dc83"
]
}
}
]
},
{
"event_name": "TxnLog",
"field_filters": [
{
"field": "address",
"filter": {
"values": ["0xb3e6778480b2e488385e8205ea05e20060b813cb"]
}
},
{
"field": "topics",
"filter": {
"values": [
"0xc2e9a469a567800a865e66b33df0528af708b4d0764eb83443b96980e99f4c68"
]
}
}
]
},
{
"event_name": "TxnLog",
"field_filters": [
{
"field": "address",
"filter": {
"values": ["0x6131b5fae19ea4f9d964eac0408e4408b66337b5"]
}
},
{
"field": "topics",
"filter": {
"values": [
"0xd6d4f5681c246c9f42c203e287975af1601f8df8035a9251f79aab5c8f09e2f8"
]
}
}
]
},
{
"event_name": "TxnLog",
"field_filters": [
{
"field": "address",
"filter": {
"values": ["0x6352a56caadc4f1e25cd6c75970fa768a3304e64"]
}
},
{
"field": "topics",
"filter": {
"values": [
"0x76af224a143865a50b41496e1a73622698692c565c1214bc862f18e22d829c5e"
]
}
}
]
},
{
"event_name": "NativeTransfer"
},
{
"event_name": "TxnLog",
"field_filters": [
{
"field": "address",
"filter": {
"values": ["0x3bd359c1119da7da1d913d1c4d2b7c461115433a"]
}
},
{
"field": "topics",
"filter": {
"values": [
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
]
}
}
]
}
]
43 changes: 24 additions & 19 deletions backend/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use execution_events_example::{
event_filter::{ClientMessage, EventFilterSpec},
server::ServerMessage,
};
use execution_events_example::event_filter::load_restricted_filters;
use futures_util::{SinkExt, StreamExt};
use std::collections::HashMap;
use tokio_tungstenite::{connect_async, tungstenite::Message};
Expand All @@ -26,6 +27,9 @@ struct Cli {
#[arg(short, long, value_delimiter = ',')]
events: Option<Vec<String>>,

#[arg(long, default_value = "false")]
restricted: bool,

#[arg(long, default_value = "false")]
verbose_events: bool,

Expand Down Expand Up @@ -77,41 +81,42 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let (mut write, mut read) = ws_stream.split();

// Parse event names from strings to EventName enum
let event_strings = cli.events.clone().unwrap_or_default();
let events: Vec<EventName> = if event_strings.is_empty() {
Vec::new()
// Build event filter specs for subscription
let event_filters: Vec<EventFilterSpec> = if cli.restricted {
info!("Loading restricted filters");
let restricted_filters = load_restricted_filters();
restricted_filters.get_filter_specs()
} else {
event_strings
.iter()
.map(|s| {
serde_json::from_value(serde_json::Value::String(s.clone()))
.map_err(|_| format!("Invalid event name: {}", s))
})
.collect::<Result<Vec<_>, _>>()?
};

// Send subscription message
let subscribe_msg = ClientMessage::Subscribe {
event_filters: if events.is_empty() {
let event_strings = cli.events.clone().unwrap_or_default();
if event_strings.is_empty() {
Vec::new()
} else {
let events: Vec<EventName> = event_strings
.iter()
.map(|s| {
serde_json::from_value(serde_json::Value::String(s.clone()))
.map_err(|_| format!("Invalid event name: {}", s))
})
.collect::<Result<Vec<_>, _>>()?;
events
.iter()
.map(|event_name| EventFilterSpec {
event_name: *event_name,
field_filters: Vec::new(),
})
.collect()
},
}
};

// Send subscription message
let subscribe_msg = ClientMessage::Subscribe { event_filters: event_filters.clone() };
let subscribe_json = serde_json::to_string(&subscribe_msg)?;
write.send(Message::Text(subscribe_json)).await?;

if events.is_empty() {
if event_filters.is_empty() {
info!("Subscribed to all events");
} else {
info!("Subscribed to events: {:?}", events);
info!("Subscribed to {} event filters", event_filters.len());
}

// Read messages from the server
Expand Down
42 changes: 42 additions & 0 deletions backend/src/lib/event_filter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use alloy_primitives::{Address, B256, U256};
use serde::{Deserialize, Serialize};
use std::fs::File;
use super::event_listener::EventName;
use super::serializable_event::{SerializableEventData, SerializableExecEvent};

Expand Down Expand Up @@ -183,6 +184,24 @@ pub struct EventFilter {
event_filters: Vec<EventFilterSpec>,
}

impl PartialEq for EventFilter {
fn eq(&self, other: &Self) -> bool {
// Check that all elements in self exist in other
for spec in &self.event_filters {
if !other.event_filters.contains(spec) {
return false;
}
}
// Check that all elements in other exist in self
for spec in &other.event_filters {
if !self.event_filters.contains(spec) {
return false;
}
}
true
}
}

impl EventFilter {
/// Create a filter from event filter specs
pub fn new(event_filters: Vec<EventFilterSpec>) -> Self {
Expand Down Expand Up @@ -217,4 +236,27 @@ impl EventFilter {
pub fn accepts_all(&self) -> bool {
self.event_filters.is_empty()
}

/// Returns a clone of the event filter specs
pub fn get_filter_specs(&self) -> Vec<EventFilterSpec> {
self.event_filters.clone()
}
}

// Load restricted filters from file
// These filters are statically specified to restrict the events that can be subscribed to
pub fn load_restricted_filters() -> EventFilter {
let file = if let Ok(f) = File::open("restricted_filters.json") {
f
} else if let Ok(f) = File::open("backend/restricted_filters.json") {
f
} else {
panic!("restricted_filters.json not found");
};
let filters: Vec<EventFilterSpec> = serde_json::from_reader(file).unwrap();
EventFilter::new(filters)
}

pub fn is_restricted_mode() -> bool {
std::env::var("ALLOW_UNRESTRICTED_FILTERS").is_err()
}
24 changes: 22 additions & 2 deletions backend/src/lib/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
use tracing::{error, info, warn};
use serde::{Deserialize, Serialize};

use crate::event_filter::{is_restricted_mode, load_restricted_filters};
use crate::event_listener::EventName;
use crate::top_k_tracker::{AccessEntry, TopKTracker};

Expand Down Expand Up @@ -71,13 +72,23 @@ impl TPSTracker {
async fn wait_for_subscription(
ws_receiver: &mut SplitStream<WebSocketStream<TcpStream>>,
addr: SocketAddr,
restricted_filters: Option<EventFilter>,
) -> Option<EventFilter> {
let timeout = tokio::time::Duration::from_secs(10);

match tokio::time::timeout(timeout, ws_receiver.next()).await {
Ok(Some(Ok(Message::Text(text)))) => match serde_json::from_str::<ClientMessage>(&text) {
Ok(ClientMessage::Subscribe { event_filters }) => {
let filter = EventFilter::new(event_filters.clone());
if let Some(restricted_filters) = restricted_filters {
if filter != restricted_filters {
warn!(
"Client {} subscription does not match restricted filters, closing connection",
addr
);
return None;
}
}
if filter.accepts_all() {
info!("Client {} subscribed to all events", addr);
} else {
Expand Down Expand Up @@ -341,6 +352,7 @@ async fn handle_connection(
stream: TcpStream,
addr: SocketAddr,
event_broadcast_receiver: broadcast::Receiver<EventDataOrMetrics>,
restricted_filters: Option<EventFilter>,
) {
info!("New WebSocket connection from: {}", addr);

Expand All @@ -355,7 +367,7 @@ async fn handle_connection(
let (ws_sender, mut ws_receiver) = ws_stream.split();

// Wait for subscription message before streaming events
let filter = match wait_for_subscription(&mut ws_receiver, addr).await {
let filter = match wait_for_subscription(&mut ws_receiver, addr, restricted_filters).await {
Some(f) => f,
None => return,
};
Expand Down Expand Up @@ -408,12 +420,20 @@ pub async fn run_websocket_server(
let listener = TcpListener::bind(&server_addr).await?;
info!("WebSocket server listening on: {}", server_addr);

let restricted_filters: Option<EventFilter> = if is_restricted_mode() {
info!("Running in restricted mode");
Some(load_restricted_filters())
} else {
info!("Running in unrestricted mode");
None
};

// Accept incoming connections
loop {
match listener.accept().await {
Ok((stream, client_addr)) => {
let event_broadcast_receiver = event_broadcast_sender.subscribe();
tokio::spawn(handle_connection(stream, client_addr, event_broadcast_receiver));
tokio::spawn(handle_connection(stream, client_addr, event_broadcast_receiver, restricted_filters.clone()));
}
Err(e) => {
error!("Error accepting connection: {}", e);
Expand Down