Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ default = []
#lightning-liquidity = { version = "0.2.0", features = ["std"] }
#lightning-macros = { version = "0.2.0" }

lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] }
lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" }
lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] }
lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" }
lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["tokio"] }
lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" }
lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" }
lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["rest-client", "rpc-client", "tokio"] }
lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["esplora-async-https", "time", "electrum-rustls-ring"] }
lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] }
lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" }
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] }
lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" }
lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] }
lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" }
lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["tokio"] }
lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" }
lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" }
lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["rest-client", "rpc-client", "tokio"] }
lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["esplora-async-https", "time", "electrum-rustls-ring"] }
lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] }
lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" }

bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] }
bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]}
Expand Down Expand Up @@ -78,13 +78,13 @@ log = { version = "0.4.22", default-features = false, features = ["std"]}
vss-client = { package = "vss-client-ng", version = "0.4" }
prost = { version = "0.11.6", default-features = false}
#bitcoin-payment-instructions = { version = "0.6" }
bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "fdca6c62f2fe2c53427d3e51e322a49aa7323ee2" }
bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "ce9ff5281ae9bb05526981f6f9df8f8d929c7c44" }

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["winbase"] }

[dev-dependencies]
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std", "_test_utils"] }
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std", "_test_utils"] }
proptest = "1.0.0"
regex = "1.5.6"
criterion = { version = "0.7.0", features = ["async_tokio"] }
Expand Down
153 changes: 96 additions & 57 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use lightning::routing::scoring::{
};
use lightning::sign::{EntropySource, NodeSigner};
use lightning::util::persist::{
KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::ser::ReadableArgs;
Expand Down Expand Up @@ -69,11 +69,12 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
use crate::message_handler::NodeCustomMessageHandler;
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
use crate::peer_store::PeerStore;
use crate::runtime::Runtime;
use crate::runtime::{Runtime, RuntimeSpawner};
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager,
MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore,
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph,
KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister,
SyncAndAsyncKVStore,
};
use crate::wallet::persist::KVStoreWalletPersister;
use crate::wallet::Wallet;
Expand Down Expand Up @@ -1051,10 +1052,20 @@ fn build_with_store_internal(
}
}

let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
let fee_estimator = Arc::new(OnchainFeeEstimator::new());

let kv_store_ref = Arc::clone(&kv_store);
let logger_ref = Arc::clone(&logger);
let (payment_store_res, node_metris_res) = runtime.block_on(async move {
tokio::join!(
read_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
)
});

// Initialize the status fields.
let node_metrics = match runtime
.block_on(async { read_node_metrics(&*kv_store, Arc::clone(&logger)).await })
{
let node_metrics = match node_metris_res {
Ok(metrics) => Arc::new(RwLock::new(metrics)),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -1065,23 +1076,20 @@ fn build_with_store_internal(
}
},
};
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
let fee_estimator = Arc::new(OnchainFeeEstimator::new());

let payment_store =
match runtime.block_on(async { read_payments(&*kv_store, Arc::clone(&logger)).await }) {
Ok(payments) => Arc::new(PaymentStore::new(
payments,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
Arc::clone(&kv_store),
Arc::clone(&logger),
)),
Err(e) => {
log_error!(logger, "Failed to read payment data from store: {}", e);
return Err(BuildError::ReadFailed);
},
};
let payment_store = match payment_store_res {
Ok(payments) => Arc::new(PaymentStore::new(
payments,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
Arc::clone(&kv_store),
Arc::clone(&logger),
)),
Err(e) => {
log_error!(logger, "Failed to read payment data from store: {}", e);
return Err(BuildError::ReadFailed);
},
};

let (chain_source, chain_tip_opt) = match chain_data_source_config {
Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => {
Expand Down Expand Up @@ -1261,8 +1269,9 @@ fn build_with_store_internal(
));

let peer_storage_key = keys_manager.get_peer_storage_key();
let persister = Arc::new(Persister::new(
let monitor_reader = Arc::new(AsyncPersister::new(
Arc::clone(&kv_store),
RuntimeSpawner::new(Arc::clone(&runtime)),
Arc::clone(&logger),
PERSISTER_MAX_PENDING_UPDATES,
Arc::clone(&keys_manager),
Expand All @@ -1271,8 +1280,18 @@ fn build_with_store_internal(
Arc::clone(&fee_estimator),
));

// Read ChannelMonitors and the NetworkGraph
let kv_store_ref = Arc::clone(&kv_store);
let logger_ref = Arc::clone(&logger);
let (monitor_read_res, network_graph_res) = runtime.block_on(async move {
tokio::join!(
monitor_reader.read_all_channel_monitors_with_updates_parallel(),
read_network_graph(&*kv_store_ref, logger_ref),
)
});

// Read ChannelMonitor state from store
let channel_monitors = match persister.read_all_channel_monitors_with_updates() {
let channel_monitors = match monitor_read_res {
Ok(monitors) => monitors,
Err(e) => {
if e.kind() == lightning::io::ErrorKind::NotFound {
Expand All @@ -1284,6 +1303,16 @@ fn build_with_store_internal(
},
};

let persister = Arc::new(Persister::new(
Arc::clone(&kv_store),
Arc::clone(&logger),
PERSISTER_MAX_PENDING_UPDATES,
Arc::clone(&keys_manager),
Arc::clone(&keys_manager),
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
));

// Initialize the ChainMonitor
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
Some(Arc::clone(&chain_source)),
Expand All @@ -1296,9 +1325,7 @@ fn build_with_store_internal(
));

// Initialize the network graph, scorer, and router
let network_graph = match runtime
.block_on(async { read_network_graph(&*kv_store, Arc::clone(&logger)).await })
{
let network_graph = match network_graph_res {
Ok(graph) => Arc::new(graph),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -1310,9 +1337,42 @@ fn build_with_store_internal(
},
};

let local_scorer = match runtime.block_on(async {
read_scorer(&*kv_store, Arc::clone(&network_graph), Arc::clone(&logger)).await
}) {
// Read various smaller LDK and ldk-node objects from the store
let kv_store_ref = Arc::clone(&kv_store);
let logger_ref = Arc::clone(&logger);
let network_graph_ref = Arc::clone(&network_graph);
let output_sweeper_future = read_output_sweeper(
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Arc::clone(&chain_source),
Arc::clone(&keys_manager),
Arc::clone(&kv_store_ref),
Arc::clone(&logger_ref),
);
let (
scorer_res,
external_scores_res,
channel_manager_bytes_res,
sweeper_bytes_res,
event_queue_res,
peer_info_res,
) = runtime.block_on(async move {
tokio::join!(
read_scorer(&*kv_store_ref, network_graph_ref, Arc::clone(&logger_ref)),
read_external_pathfinding_scores_from_cache(&*kv_store_ref, Arc::clone(&logger_ref)),
KVStore::read(
&*kv_store_ref,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
),
output_sweeper_future,
read_event_queue(Arc::clone(&kv_store_ref), Arc::clone(&logger_ref)),
read_peer_info(Arc::clone(&kv_store_ref), Arc::clone(&logger_ref)),
)
});

let local_scorer = match scorer_res {
Ok(scorer) => scorer,
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -1328,9 +1388,7 @@ fn build_with_store_internal(
let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));

// Restore external pathfinding scores from cache if possible.
match runtime.block_on(async {
read_external_pathfinding_scores_from_cache(&*kv_store, Arc::clone(&logger)).await
}) {
match external_scores_res {
Ok(external_scores) => {
scorer.lock().unwrap().merge(external_scores, cur_time);
log_trace!(logger, "External scores from cache merged successfully");
Expand Down Expand Up @@ -1383,12 +1441,7 @@ fn build_with_store_internal(

// Initialize the ChannelManager
let channel_manager = {
if let Ok(reader) = KVStoreSync::read(
&*kv_store,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
) {
if let Ok(reader) = channel_manager_bytes_res {
let channel_monitor_references =
channel_monitors.iter().map(|(_, chanmon)| chanmon).collect();
let read_args = ChannelManagerReadArgs::new(
Expand Down Expand Up @@ -1613,17 +1666,7 @@ fn build_with_store_internal(
let connection_manager =
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));

let output_sweeper = match runtime.block_on(async {
read_output_sweeper(
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Arc::clone(&chain_source),
Arc::clone(&keys_manager),
Arc::clone(&kv_store),
Arc::clone(&logger),
)
.await
}) {
let output_sweeper = match sweeper_bytes_res {
Ok(output_sweeper) => Arc::new(output_sweeper),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -1644,9 +1687,7 @@ fn build_with_store_internal(
},
};

let event_queue = match runtime
.block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await })
{
let event_queue = match event_queue_res {
Ok(event_queue) => Arc::new(event_queue),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -1658,9 +1699,7 @@ fn build_with_store_internal(
},
};

let peer_store = match runtime
.block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await })
{
let peer_store = match peer_info_res {
Ok(peer_store) => Arc::new(peer_store),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Expand Down
20 changes: 1 addition & 19 deletions src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

use std::future::Future;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;

use lightning::util::native_async::FutureSpawner;
use lightning_block_sync::gossip::GossipVerifier;

use crate::chain::ChainSource;
use crate::config::RGS_SYNC_TIMEOUT_SECS;
use crate::logger::{log_trace, LdkLogger, Logger};
use crate::runtime::Runtime;
use crate::runtime::{Runtime, RuntimeSpawner};
use crate::types::{GossipSync, Graph, P2PGossipSync, RapidGossipSync};
use crate::Error;

Expand Down Expand Up @@ -114,19 +112,3 @@ impl GossipSource {
}
}
}

pub(crate) struct RuntimeSpawner {
runtime: Arc<Runtime>,
}

impl RuntimeSpawner {
pub(crate) fn new(runtime: Arc<Runtime>) -> Self {
Self { runtime }
}
}

impl FutureSpawner for RuntimeSpawner {
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
self.runtime.spawn_cancellable_background_task(future);
}
}
28 changes: 28 additions & 0 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use std::future::Future;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use lightning::util::native_async::FutureSpawner;

use tokio::task::{JoinHandle, JoinSet};

use crate::config::{
Expand Down Expand Up @@ -219,3 +221,29 @@ enum RuntimeMode {
Owned(tokio::runtime::Runtime),
Handle(tokio::runtime::Handle),
}

pub(crate) struct RuntimeSpawner {
runtime: Arc<Runtime>,
}

impl RuntimeSpawner {
pub(crate) fn new(runtime: Arc<Runtime>) -> Self {
Self { runtime }
}
}

impl FutureSpawner for RuntimeSpawner {
type E = tokio::sync::oneshot::error::RecvError;
type SpawnedFutureResult<O> = tokio::sync::oneshot::Receiver<O>;
fn spawn<O: Send + 'static, F: Future<Output = O> + Send + 'static>(
&self, future: F,
) -> Self::SpawnedFutureResult<O> {
let (result, output) = tokio::sync::oneshot::channel();
self.runtime.spawn_cancellable_background_task(async move {
// We don't care if the send works or not, if the receiver is dropped its not our
// problem.
let _ = result.send(future.await);
});
output
}
}
Loading
Loading