Skip to content
This repository was archived by the owner on Aug 11, 2023. It is now read-only.

Commit 0bd1d93

Browse files
committed
overhaul
1 parent d94bf12 commit 0bd1d93

File tree

15 files changed

+797
-94
lines changed

15 files changed

+797
-94
lines changed

Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,11 @@ ethers = { version = "0.17.0", features = [ "ws", "rustls" ] }
2222
serde_json = { version = "1.0.64", default-features = false, features = ["raw_value"] }
2323
serde = "1.0.144"
2424
actix-rt = "2.7.0"
25+
soketto = "0.7.1"
26+
tokio = { version = "1", features = ["full"] }
27+
tokio-util = { version = "0.6", features = ["compat"] }
28+
tokio-stream = { version = "0.1", features = ["net"] }
29+
futures = { default-features = false, features = ["bilock", "std", "unstable"], version = "0.3.1" }
30+
tracing = "0.1.36"
31+
tracing-subscriber = "0.3.15"
32+
uuid = { version = "1.1.2", features = [ "serde" ] }

README.md

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,77 @@ alchemy_rs = "0.1.0"
1717

1818
## Usage
1919

20-
// TODO
20+
[alchemy-rs](https://github.com/abigger87/alchemy-rs) is a minimal ethers-rs wrapper for the Alchemy API built in pure rust.
21+
22+
The [AlchemyManager](src/manager.rs) is the main entry point for interacting with the Alchemy API. It is initializable with an Alchemy API key and a [Chain](https://docs.rs/ethers/latest/ethers/types/enum.Chain.html). Alchemy supports the following chains: ...
23+
2124

2225

2326
## Examples
2427

25-
```rust
28+
Listening to pending transactions using alchemy's `alchemy_pendingTransactions` method is demonstrated below.
2629

30+
```rust
31+
use std::str::FromStr;
32+
use std::env;
33+
34+
use alchemy_rs::prelude::*;
35+
36+
async {
37+
// Read an alchemy websocket api key from the `ALCHEMY_API_KEY` environment variable
38+
let api_key = env::var("ALCHEMY_API_KEY").expect("ALCHEMY_API_KEY must be set");
39+
40+
// Create the AlchemyManager
41+
let mut manager = AlchemyManager::new(&format!("wss://eth-mainnet.g.alchemy.com/v2/{}", api_key), None);
42+
43+
// Connect to the websocket
44+
let _ = manager.connect().await.unwrap();
45+
46+
// Listen to _pending_ transactions to the USDT address on mainnet
47+
// (there should be a lot of these!)
48+
let usdt_address = Address::from_str("dac17f958d2ee523a2206206994597c13d831ec7").unwrap();
49+
50+
// Try to subscribe to pending transactions
51+
let sub_id = match manager.subscribe(Some(usdt_address), None).await {
52+
Ok(id) => id,
53+
Err(e) => {
54+
println!("Error subscribing to pending transactions: {:?}", e);
55+
return;
56+
}
57+
};
58+
59+
// Now we can grab items from the stream
60+
let item: AlchemySocketMessageResponse;
61+
loop {
62+
match manager.receive(sub_id).await {
63+
Ok(i) => {
64+
item = i;
65+
break;
66+
},
67+
Err(e) => {
68+
println!("Error receiving item: {:?}", e);
69+
return;
70+
}
71+
}
72+
}
73+
74+
// Print the next item
75+
println!("Received pending transaction from the stream: {:?}", item);
76+
};
2777
```
2878

2979

80+
## Safety
81+
82+
> **Warning**
83+
>
84+
> This is **experimental software** and is provided on an "as is" and "as available" basis.
85+
> Expect rapid iteration and **use at your own risk**.
86+
87+
3088
## License
3189

32-
[AGPL-3.0-only](https://github.com/abigger87/alchemy-rs/blob/master/LICENSE)
90+
[MIT](https://github.com/abigger87/alchemy-rs/blob/master/LICENSE), but go crazy :P
3391

3492

3593
## Acknowledgements

src/connectors/errors.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use ethers::providers::ProviderError;
2+
use soketto::handshake;
3+
4+
/// An Alchemy Websocket Connection Error
5+
#[derive(Debug)]
6+
pub enum AlchemyConnectionError {
7+
/// A Provider Connection Error
8+
ProviderError(ProviderError),
9+
/// Raw Websocket TcpStream Error
10+
RawStreamError(std::io::Error),
11+
/// A Raw Websocket Connection Error
12+
RawSocketError(soketto::handshake::Error),
13+
/// A Raw Websocket Handshake Error
14+
RawHandshakeError(handshake::Error),
15+
/// Deserialization Error
16+
Deserialization(serde_json::Error),
17+
/// Received an unexpected response type
18+
UnexpectedResponseType,
19+
/// Missing the websocket channel sender
20+
MissingSender,
21+
/// Missing the websocket channel receiver
22+
MissingReceiver,
23+
/// Connection Closed
24+
Closed,
25+
/// No websocket connection established yet
26+
MissingConnection,
27+
/// Sending a message to the websocket channel failed
28+
SendFailed(soketto::connection::Error),
29+
/// Flushing the websocket channel failed
30+
FlushFailed(soketto::connection::Error),
31+
32+
/// Some soketto websocket error
33+
SomeError(soketto::connection::Error),
34+
/// The method is unimplemented
35+
Unimplemented,
36+
/// The text response could not be parsed as a string
37+
InvalidTextString,
38+
}

src/connectors/mod.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
//! Alchemy Connectors
2+
3+
/// An ethers provider connector
4+
pub mod provider;
5+
6+
/// A raw websocket connector
7+
pub mod raw;
8+
9+
/// Common Errors
10+
pub mod errors;
11+
12+
/// Re-export a prelude
13+
pub mod prelude {
14+
pub use super::{errors::*, provider::*, raw::*, *};
15+
}
16+
17+
/// An alchemy api connection manager
18+
#[derive(Debug)]
19+
pub enum AlchemyConnector {
20+
/// An ethers-rs websocket [Provider](ethers::providers::Provider) for alchemy
21+
Provider(Option<provider::EthersWsProvider>),
22+
/// A Raw, Persistent Websocket Connection to the Alchemy API using [soketto](https://docs.rs/soketto/latest/soketto/)
23+
Raw(Option<raw::RawAlchemyConnection>),
24+
}
25+
26+
/// The type of alchemy api websocket connection
27+
#[derive(Debug, Clone, PartialEq, Eq)]
28+
pub enum AlchemyConnectorType {
29+
/// An ethers-rs websocket [Provider](ethers::providers::Provider) for alchemy
30+
Provider,
31+
/// A Raw, Persistent Websocket Connection to the Alchemy API using [soketto](https://docs.rs/soketto/latest/soketto/)
32+
Raw,
33+
}
34+
35+
impl Default for AlchemyConnectorType {
36+
fn default() -> Self {
37+
AlchemyConnectorType::Raw
38+
}
39+
}
40+
41+
impl From<AlchemyConnectorType> for AlchemyConnector {
42+
fn from(t: AlchemyConnectorType) -> Self {
43+
match t {
44+
AlchemyConnectorType::Provider => AlchemyConnector::Provider(None),
45+
AlchemyConnectorType::Raw => AlchemyConnector::Raw(None),
46+
}
47+
}
48+
}

src/connectors/provider.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use ethers::prelude::*;
2+
3+
use super::errors::AlchemyConnectionError;
4+
5+
/// An ethers-rs websocket [Provider](ethers::providers::Provider) for alchemy
6+
#[derive(Debug, Default, Clone)]
7+
pub struct EthersWsProvider {
8+
/// The ethers-rs websocket [Provider](ethers::providers::Provider)
9+
pub provider: Option<Provider<Ws>>,
10+
}
11+
12+
impl EthersWsProvider {
13+
/// Create a new ethers-rs websocket [Provider](ethers::providers::Provider) for alchemy
14+
pub fn new() -> Self {
15+
Self { provider: None }
16+
}
17+
18+
/// Connect to the websocket provider
19+
pub async fn connect(&mut self, url: &str) -> Result<(), AlchemyConnectionError> {
20+
match Provider::connect(String::from(url)).await {
21+
Ok(p) => {
22+
self.provider = Some(p);
23+
Ok(())
24+
}
25+
Err(e) => Err(AlchemyConnectionError::ProviderError(e)),
26+
}
27+
}
28+
}
29+
30+
impl From<Provider<Ws>> for EthersWsProvider {
31+
fn from(provider: Provider<Ws>) -> Self {
32+
Self {
33+
provider: Some(provider),
34+
}
35+
}
36+
}

src/connectors/raw.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
use futures::io::{BufReader, BufWriter};
2+
use soketto::handshake;
3+
use tokio::net::TcpStream;
4+
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
5+
6+
use super::errors::AlchemyConnectionError;
7+
8+
/// A Raw, Persistent Websocket Connection to the Alchemy API using [soketto](https://docs.rs/soketto/latest/soketto/)
9+
///
10+
/// ## Alchemy
11+
///
12+
/// The Alchemy Websocket API allows you to interactively demo the endpoints.
13+
///
14+
/// Simply install a websocket shell command and connect to the demo endpoint:
15+
///
16+
/// ```sh
17+
/// $ wscat -c wss://eth-mainnet.ws.alchemyapi.io/ws/demo
18+
///
19+
/// // create subscription
20+
/// > {"id": 1, "method": "eth_subscribe", "params": ["newHeads"]}
21+
/// < {"jsonrpc":"2.0","id":1,"result":"0xcd0c3e8af590364c09d0fa6a1210faf5"}
22+
///
23+
/// // incoming notifications
24+
/// < {"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0xcd0c3e8af590364c09d0fa6a1210faf5","result":{"difficulty":"0xd9263f42a87",<...>, "uncles":[]}}}
25+
/// < {"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0xcd0c3e8af590364c09d0fa6a1210faf5","result":{"difficulty":"0xd90b1a7ad02", <...>, "uncles":["0x80aacd1ea4c9da32efd8c2cc9ab38f8f70578fcd46a1a4ed73f82f3e0957f936"]}}}
26+
///
27+
/// // cancel subscription
28+
/// > {"id": 1, "method": "eth_unsubscribe", "params": ["0xcd0c3e8af590364c09d0fa6a1210faf5"]}
29+
/// < {"jsonrpc":"2.0","id":1,"result":true}
30+
/// ```
31+
///
32+
#[derive(Debug, Default)]
33+
pub struct RawAlchemyConnection {
34+
// / The websocket connection
35+
// pub connection: Option<soketto::handshake::Client<'a, BufReader<BufWriter<Compat<TcpStream>>>>>,
36+
/// The websocket client sender after building
37+
pub sender: Option<soketto::Sender<BufReader<BufWriter<Compat<TcpStream>>>>>,
38+
/// The websocket client receiver after building
39+
pub receiver: Option<soketto::Receiver<BufReader<BufWriter<Compat<TcpStream>>>>>,
40+
}
41+
42+
impl RawAlchemyConnection {
43+
/// Create a new RawAlchemyConnection
44+
pub fn new() -> Self {
45+
Self {
46+
sender: None,
47+
receiver: None,
48+
}
49+
}
50+
51+
/// Connect to the sokettot websocket
52+
pub async fn connect(&mut self, url: &'_ str) -> Result<(), AlchemyConnectionError> {
53+
// Create the socket connection
54+
let socket = match tokio::net::TcpStream::connect(url).await {
55+
Ok(s) => s,
56+
Err(e) => return Err(AlchemyConnectionError::RawStreamError(e)),
57+
};
58+
59+
// Create the client connection
60+
let compatible_socket = BufReader::new(BufWriter::new(socket.compat()));
61+
let mut client = handshake::Client::new(compatible_socket, url, "");
62+
63+
// Handshake the connection
64+
match client.handshake().await {
65+
Ok(sr) => {
66+
tracing::info!("Got handshake response: {:?}", sr);
67+
tracing::debug!(
68+
"Expecting server response: {:?}",
69+
handshake::ServerResponse::Accepted { protocol: None }
70+
);
71+
}
72+
Err(e) => return Err(AlchemyConnectionError::RawHandshakeError(e)),
73+
}
74+
75+
let (sender, receiver) = client.into_builder().finish();
76+
self.sender = Some(sender);
77+
self.receiver = Some(receiver);
78+
79+
Ok(())
80+
}
81+
}

src/lib.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,14 @@
33
#![forbid(unsafe_code)]
44
#![forbid(where_clauses_object_safety)]
55
#![deny(rustdoc::broken_intra_doc_links)]
6-
76
#![doc=include_str!("../README.md")]
87

8+
/// Refactored Websocket Connectors for the Alchemy Manager
9+
pub mod connectors;
10+
11+
/// Refactored Alchemy Websocket Messages
12+
pub mod messages;
13+
914
/// Alchemy Manager
1015
pub mod manager;
1116

@@ -17,5 +22,8 @@ pub mod types;
1722

1823
/// A prelude of commonly used alchemy-rs items
1924
pub mod prelude {
20-
pub use super::{manager::*, types::*, wrapper::*};
25+
pub use super::{manager::*, messages::prelude::*, types::*, wrapper::*};
26+
27+
// Re-export ethers-rs prelude
28+
pub use ethers::prelude::*;
2129
}

0 commit comments

Comments
 (0)