subcoin_network/
lib.rs

1//! # Bitcoin Network
2//!
3//! This crate facilitates communication with other nodes in the Bitcoin P2P network. It handles
4//! connections, message transmission, and the peer-to-peer protocol.
5//!
6//! ## Initial Block Download
7//!
8//! This crate offers two strategies for the initial block download:
9//!
//! - **Blocks-First**: Downloads the full block data sequentially, in batches, starting from the
//!   last known block until the node is fully synced with the network. This strategy is primarily
//!   intended for testing purposes.
13//!
14//! - **Headers-First**: First downloads the block headers and then proceeds to the full block data
15//!   based on the checkpoints.
16//!
17//! However, due to the nature of Subcoin, building a Bitcoin SPV node solely by syncing Bitcoin headers
18//! from the network is not possible, in that Subcoin requires full block data to derive the corresponding
19//! substrate header properly, including generating the `extrinsics_root` and `state_root`.
20//!
21//! ## Subcoin Bootstrap Node
22//!
//! A Subcoin node runs the Bitcoin networking and the Substrate networking in parallel. The initial
//! block download from the Bitcoin P2P network is time-consuming because every historical block must
//! be downloaded and executed. The advantage of Subcoin is that only the Subcoin bootstrap nodes need
//! to perform the full block sync. Once Subcoin bootstrap nodes are operational in the network, newly
//! joined Subcoin nodes can quickly sync to the Bitcoin chain's tip by leveraging the advanced state
//! sync provided by the Substrate networking stack.
28
29mod address_book;
30mod checkpoint;
31mod metrics;
32mod network_api;
33mod network_processor;
34mod peer_connection;
35mod peer_manager;
36mod peer_store;
37mod sync;
38#[cfg(test)]
39mod tests;
40mod transaction_manager;
41
42use crate::metrics::BandwidthMetrics;
43use crate::network_api::NetworkProcessorMessage;
44use crate::network_processor::NetworkProcessor;
45use crate::peer_connection::ConnectionInitiator;
46use crate::peer_store::{PersistentPeerStore, PersistentPeerStoreHandle};
47use bitcoin::p2p::ServiceFlags;
48use bitcoin::{BlockHash, Network as BitcoinNetwork};
49use chrono::prelude::{DateTime, Local};
50use peer_manager::HandshakeState;
51use sc_client_api::{AuxStore, HeaderBackend};
52use sc_consensus_nakamoto::{BlockImportQueue, ChainParams, HeaderVerifier};
53use sc_network_sync::SyncingService;
54use sc_service::TaskManager;
55use sc_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
56use sp_runtime::traits::Block as BlockT;
57use std::net::{AddrParseError, SocketAddr};
58use std::path::PathBuf;
59use std::sync::Arc;
60use std::sync::atomic::{AtomicBool, AtomicU64};
61use substrate_prometheus_endpoint::Registry;
62use tokio::net::TcpListener;
63use tokio::sync::oneshot;
64
65pub use crate::network_api::{
66    NetworkApi, NetworkHandle, NetworkStatus, NoNetwork, OfflineSync, SendTransactionResult,
67    SyncStatus,
68};
69pub use crate::sync::{PeerSync, PeerSyncState};
70
/// Identifies a peer by its socket address.
pub type PeerId = SocketAddr;

/// Peer latency in milliseconds.
pub type Latency = u128;

/// Timestamp expressed in the local time zone.
pub(crate) type LocalTime = DateTime<Local>;
78
79/// Network error type.
80#[derive(Debug, thiserror::Error)]
81pub enum Error {
82    #[error("Invalid bootnode address: {0}")]
83    InvalidBootnode(String),
84    #[error("Received 0 bytes, peer performed an orderly shutdown")]
85    PeerShutdown,
86    #[error("Cannot communicate with the network event stream")]
87    NetworkEventStreamError,
88    #[error("Peer {0:?} not found")]
89    PeerNotFound(PeerId),
90    #[error("Connection of peer {0:?} not found")]
91    ConnectionNotFound(PeerId),
92    #[error("Connecting to the stream timed out")]
93    ConnectionTimeout,
94    #[error("Unexpected handshake state: {0:?}")]
95    UnexpectedHandshakeState(Box<HandshakeState>),
96    #[error("Handshake timeout")]
97    HandshakeTimeout,
98    #[error("Only IPv4 peers are supported")]
99    Ipv4Only,
100    #[error("Peer is not a full node")]
101    NotFullNode,
102    #[error("Peer is not a segwit node")]
103    NotSegwitNode,
104    #[error("Peer's protocol version is too low")]
105    ProtocolVersionTooLow,
106    #[error("Header contains invalid proof-of-block")]
107    BadProofOfWork(BlockHash),
108    #[error("Too many Inventory::Block items in inv message")]
109    InvHasTooManyBlockItems,
110    #[error("Too many entries (> 2000) in headers message")]
111    TooManyHeaders,
112    #[error("Entries in headers message are not in ascending order")]
113    HeadersNotInAscendingOrder,
114    #[error("Too many inventory items")]
115    TooManyInventoryItems,
116    #[error("Ping timeout")]
117    PingTimeout,
118    #[error("Ping latency ({0}) exceeds the threshold")]
119    PingLatencyTooHigh(Latency),
120    #[error("Peer is deprioritized for syncing and has encountered multiple failures")]
121    UnreliablePeer,
122    #[error("Peer's latency ({0} ms) is too high")]
123    SlowPeer(Latency),
124    #[error("Unexpected pong message")]
125    UnexpectedPong,
126    #[error("Bad nonce in pong, expected: {expected}, got: {got}")]
127    BadPong { expected: u64, got: u64 },
128    #[error("Cannot find the parent of the first header in headers message")]
129    MissingFirstHeaderParent,
130    #[error("Other: {0}")]
131    Other(String),
132    #[error(transparent)]
133    IO(#[from] std::io::Error),
134    #[error(transparent)]
135    InvalidAddress(#[from] AddrParseError),
136    #[error(transparent)]
137    Blockchain(#[from] sp_blockchain::Error),
138    #[error(transparent)]
139    Consensus(#[from] sp_consensus::Error),
140    #[error(transparent)]
141    BitcoinIO(#[from] bitcoin::io::Error),
142    #[error(transparent)]
143    BitcoinEncoding(#[from] bitcoin::consensus::encode::Error),
144}
145
146// Ignore the peer if it is not full with witness enabled as we only want to
147// download from peers that can provide use full witness data for blocks.
148fn validate_outbound_services(services: ServiceFlags) -> Result<(), Error> {
149    if !services.has(ServiceFlags::NETWORK) {
150        return Err(Error::NotFullNode);
151    }
152
153    if !services.has(ServiceFlags::WITNESS) {
154        return Err(Error::NotSegwitNode);
155    }
156
157    Ok(())
158}
159
/// Represents the strategy for block syncing.
///
/// Defaults to [`SyncStrategy::HeadersFirst`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "cli", derive(clap::ValueEnum))]
pub enum SyncStrategy {
    /// Download the headers first, followed by the block bodies.
    #[default]
    HeadersFirst,
    /// Download the full blocks (both headers and bodies) in sequence.
    BlocksFirst,
}
170
171#[derive(Debug, Default)]
172struct Bandwidth {
173    total_bytes_inbound: Arc<AtomicU64>,
174    total_bytes_outbound: Arc<AtomicU64>,
175    metrics: Option<BandwidthMetrics>,
176}
177
178impl Bandwidth {
179    fn new(registry: Option<&Registry>) -> Self {
180        Self {
181            total_bytes_inbound: Arc::new(0.into()),
182            total_bytes_outbound: Arc::new(0.into()),
183            metrics: registry.and_then(|registry| {
184                BandwidthMetrics::register(registry)
185                    .map_err(|err| tracing::error!("Failed to register bandwidth metrics: {err}"))
186                    .ok()
187            }),
188        }
189    }
190
191    /// Report the metrics if needed.
192    ///
193    /// Possible labels: `in` and `out`.
194    fn report(&self, label: &str, value: u64) {
195        if let Some(metrics) = &self.metrics {
196            metrics.bandwidth.with_label_values(&[label]).set(value);
197        }
198    }
199}
200
201impl Clone for Bandwidth {
202    fn clone(&self) -> Self {
203        Self {
204            total_bytes_inbound: self.total_bytes_inbound.clone(),
205            total_bytes_outbound: self.total_bytes_outbound.clone(),
206            metrics: self.metrics.clone(),
207        }
208    }
209}
210
/// Controls the block sync from Bitcoin P2P network.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockSyncOption {
    /// Bitcoin block sync is enabled on startup.
    AlwaysOn,
    /// Bitcoin block sync is fully disabled.
    Off,
    /// Bitcoin block sync is paused until Substrate fast sync completes,
    /// after which it resumes automatically.
    PausedUntilFastSync,
}
221
/// Memory limits applied to the network operations.
#[derive(Debug, Clone)]
pub struct MemoryConfig {
    /// Upper bound, in bytes, on the memory used by downloaded blocks (default: 256 MiB).
    pub max_downloaded_blocks_memory: usize,
    /// Upper bound on the number of blocks held in memory before they are sent to the
    /// import queue (default: 1000).
    pub max_blocks_in_memory: usize,
    /// Upper bound, in bytes, on the memory used by the orphan blocks pool (default: 64 MiB).
    pub max_orphan_blocks_memory: usize,
    /// Whether the batch size adapts to the memory pressure (default: true).
    pub enable_adaptive_batch_sizing: bool,
}

impl Default for MemoryConfig {
    fn default() -> Self {
        // Number of bytes in one mebibyte.
        const MIB: usize = 1024 * 1024;

        let max_downloaded_blocks_memory = 256 * MIB;
        let max_orphan_blocks_memory = 64 * MIB;

        Self {
            max_downloaded_blocks_memory,
            max_blocks_in_memory: 1000,
            max_orphan_blocks_memory,
            enable_adaptive_batch_sizing: true,
        }
    }
}
245
/// Network configuration consumed by `build_network`.
pub struct Config {
    /// Bitcoin network type.
    pub network: BitcoinNetwork,
    /// Node base path, handed to the persistent peer store.
    pub base_path: PathBuf,
    /// Local address to listen on for inbound connections.
    ///
    /// If the address is already in use, `build_network` retries on the same address with
    /// port 0, i.e. any available port.
    pub listen_on: PeerId,
    /// User-provided seednodes, resolved via a host lookup on startup.
    pub seednodes: Vec<String>,
    /// Whether to skip the builtin seednodes and bootstrap from the user-provided
    /// seednodes only. Persistent peers are still contacted either way.
    pub seednode_only: bool,
    /// Whether to accept IPv4 peers only.
    pub ipv4_only: bool,
    /// Target block of the chain sync.
    pub sync_target: Option<u32>,
    /// Maximum number of outbound peer connections.
    pub max_outbound_peers: usize,
    /// Maximum number of inbound peer connections.
    pub max_inbound_peers: usize,
    /// Minimum peer threshold to start the block sync.
    pub min_sync_peer_threshold: usize,
    /// Persistent peer latency threshold in milliseconds (ms).
    pub persistent_peer_latency_threshold: u128,
    /// Major sync strategy.
    pub sync_strategy: SyncStrategy,
    /// Whether to enable the block sync on startup.
    ///
    /// The block sync from Bitcoin P2P network may be disabled temporarily when
    /// performing fast sync from the Subcoin network.
    pub block_sync: BlockSyncOption,
    /// Memory management configuration.
    pub memory_config: MemoryConfig,
}
280
281fn builtin_seednodes(network: BitcoinNetwork) -> &'static [&'static str] {
282    match network {
283        BitcoinNetwork::Bitcoin => {
284            &[
285                "seed.bitcoin.sipa.be:8333",                        // Pieter Wuille
286                "dnsseed.bluematt.me:8333",                         // Matt Corallo
287                "dnsseed.bitcoin.dashjr-list-of-p2p-nodes.us:8333", // Luke Dashjr
288                "seed.bitcoinstats.com:8333",                       // Christian Decker
289                "seed.bitcoin.jonasschnelli.ch:8333",               // Jonas Schnelli
290                "seed.btc.petertodd.net:8333",                      // Peter Todd
291                "seed.bitcoin.sprovoost.nl:8333",                   // Sjors Provoost
292                "dnsseed.emzy.de:8333",                             // Stephan Oeste
293                "seed.bitcoin.wiz.biz:8333",                        // Jason Maurice
294                "seed.mainnet.achownodes.xyz:8333",                 // Ava Chow
295            ]
296        }
297        BitcoinNetwork::Testnet => &[
298            "testnet-seed.bitcoin.jonasschnelli.ch:18333",
299            "seed.tbtc.petertodd.net:18333",
300            "seed.testnet.bitcoin.sprovoost.nl:18333",
301            "testnet-seed.bluematt.me:18333",
302            "testnet-seed.achownodes.xyz:18333",
303        ],
304        _ => &[],
305    }
306}
307
308/// Watch the Substrate sync status and enable the subcoin block sync when the Substate
309/// state sync is finished.
310// TODO: I'm not super happy with pulling in the dep sc-network-sync just for SyncingService.
311async fn watch_substrate_fast_sync<Block>(
312    subcoin_network_handle: NetworkHandle,
313    substate_sync_service: Arc<SyncingService<Block>>,
314) where
315    Block: BlockT,
316{
317    let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
318
319    let mut state_sync_has_started = false;
320
321    loop {
322        interval.tick().await;
323
324        let state_sync_is_active = substate_sync_service
325            .status()
326            .await
327            .map(|status| status.state_sync.is_some())
328            .unwrap_or(false);
329
330        if state_sync_is_active {
331            if !state_sync_has_started {
332                state_sync_has_started = true;
333            }
334        } else if state_sync_has_started {
335            tracing::info!("Detected state sync is complete, starting Subcoin block sync");
336            subcoin_network_handle.start_block_sync();
337            return;
338        }
339    }
340}
341
342async fn listen_for_inbound_connections(
343    listener: TcpListener,
344    max_inbound_peers: usize,
345    connection_initiator: ConnectionInitiator,
346    processor_msg_sender: TracingUnboundedSender<NetworkProcessorMessage>,
347) {
348    let local_addr = listener
349        .local_addr()
350        .expect("Local listen addr must be available; qed");
351
352    tracing::info!("🔊 Listening on {local_addr:?}",);
353
354    while let Ok((socket, peer_addr)) = listener.accept().await {
355        let (sender, receiver) = oneshot::channel();
356
357        if processor_msg_sender
358            .unbounded_send(NetworkProcessorMessage::RequestInboundPeersCount(sender))
359            .is_err()
360        {
361            return;
362        }
363
364        let Ok(inbound_peers_count) = receiver.await else {
365            return;
366        };
367
368        if inbound_peers_count < max_inbound_peers {
369            tracing::debug!(?peer_addr, "New peer accepted");
370
371            if let Err(err) = connection_initiator.initiate_inbound_connection(socket) {
372                tracing::debug!(?err, ?peer_addr, "Failed to initiate inbound connection");
373            }
374        }
375    }
376}
377
378async fn initialize_outbound_connections(
379    network: bitcoin::Network,
380    seednodes: Vec<String>,
381    seednode_only: bool,
382    persistent_peers: Vec<PeerId>,
383    connection_initiator: ConnectionInitiator,
384) {
385    let mut bootnodes = seednodes;
386
387    if !seednode_only {
388        bootnodes.extend(builtin_seednodes(network).iter().map(|s| s.to_string()));
389    }
390
391    bootnodes.extend(persistent_peers.into_iter().map(|s| s.to_string()));
392
393    // Create a vector of futures for DNS lookups
394    let lookup_futures = bootnodes.into_iter().map(|bootnode| async move {
395        let res = tokio::net::lookup_host(&bootnode).await.map(|mut addrs| {
396            addrs
397                .next()
398                .ok_or_else(|| Error::InvalidBootnode(bootnode.to_string()))
399        });
400        (bootnode, res)
401    });
402
403    // Await all futures concurrently
404    let lookup_results = futures::future::join_all(lookup_futures).await;
405
406    for (bootnode, result) in lookup_results {
407        match result {
408            Ok(Ok(addr)) => {
409                connection_initiator.initiate_outbound_connection(addr);
410            }
411            Ok(Err(e)) => {
412                tracing::warn!(%bootnode, "Failed to resolve bootnode address: {e}");
413            }
414            Err(e) => {
415                tracing::warn!(%bootnode, "Failed to perform bootnode DNS lookup: {e}");
416            }
417        }
418    }
419}
420
421/// Creates Subcoin network.
422pub async fn build_network<Block, Client>(
423    client: Arc<Client>,
424    config: Config,
425    import_queue: BlockImportQueue,
426    task_manager: &TaskManager,
427    registry: Option<Registry>,
428    substrate_sync_service: Option<Arc<SyncingService<Block>>>,
429) -> Result<NetworkHandle, Error>
430where
431    Block: BlockT,
432    Client: HeaderBackend<Block> + AuxStore + 'static,
433{
434    let (processor_msg_sender, processor_msg_receiver) =
435        tracing_unbounded("mpsc_subcoin_network_processor", 100);
436
437    let is_major_syncing = Arc::new(AtomicBool::new(false));
438
439    let mut listen_on = config.listen_on;
440    let listener = match TcpListener::bind(&listen_on).await {
441        Ok(listener) => listener,
442        Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
443            tracing::warn!("{listen_on} is occupied, trying any available port.");
444            listen_on.set_port(0);
445            TcpListener::bind(listen_on).await?
446        }
447        Err(err) => return Err(err.into()),
448    };
449
450    let (network_event_sender, network_event_receiver) = tokio::sync::mpsc::unbounded_channel();
451
452    let bandwidth = Bandwidth::new(registry.as_ref());
453
454    let spawn_handle = task_manager.spawn_handle();
455
456    let connection_initiator = ConnectionInitiator::new(
457        config.network,
458        network_event_sender,
459        spawn_handle.clone(),
460        bandwidth.clone(),
461        config.ipv4_only,
462    );
463
464    let (sender, receiver) = tracing_unbounded("mpsc_subcoin_peer_store", 10_000);
465    let (persistent_peer_store, persistent_peers) =
466        PersistentPeerStore::new(&config.base_path, config.max_outbound_peers);
467    let persistent_peer_store_handle =
468        PersistentPeerStoreHandle::new(config.persistent_peer_latency_threshold, sender);
469
470    spawn_handle.spawn(
471        "peer-store",
472        Some("subcoin-networking"),
473        persistent_peer_store.run(receiver),
474    );
475
476    let Config {
477        seednode_only,
478        seednodes,
479        network,
480        max_inbound_peers,
481        max_outbound_peers,
482        min_sync_peer_threshold,
483        sync_strategy,
484        block_sync,
485        sync_target,
486        ..
487    } = config;
488
489    let network_handle = NetworkHandle {
490        processor_msg_sender: processor_msg_sender.clone(),
491        is_major_syncing: is_major_syncing.clone(),
492    };
493
494    let enable_block_sync = matches!(block_sync, BlockSyncOption::AlwaysOn);
495
496    if !enable_block_sync {
497        tracing::info!("Subcoin block sync is disabled on startup");
498    }
499
500    if matches!(block_sync, BlockSyncOption::PausedUntilFastSync) {
501        if let Some(substrate_sync_service) = substrate_sync_service {
502            spawn_handle.spawn(
503                "substrate-fast-sync-watcher",
504                None,
505                watch_substrate_fast_sync(network_handle.clone(), substrate_sync_service),
506            );
507        } else {
508            tracing::warn!(
509                "Block sync from Bitcoin P2P network will not be started automatically on Substrate fast sync completion"
510            );
511        }
512    }
513
514    task_manager.spawn_essential_handle().spawn_blocking(
515        "net-processor",
516        Some("subcoin-networking"),
517        {
518            let is_major_syncing = is_major_syncing.clone();
519            let connection_initiator = connection_initiator.clone();
520            let client = client.clone();
521
522            NetworkProcessor::new(
523                network_processor::Params {
524                    client: client.clone(),
525                    header_verifier: HeaderVerifier::new(client, ChainParams::new(network)),
526                    network_event_receiver,
527                    import_queue,
528                    sync_strategy,
529                    is_major_syncing,
530                    connection_initiator,
531                    max_outbound_peers,
532                    min_sync_peer_threshold,
533                    enable_block_sync,
534                    peer_store: Arc::new(persistent_peer_store_handle),
535                    sync_target,
536                    memory_config: config.memory_config.clone(),
537                },
538                registry.as_ref(),
539            )
540            .run(processor_msg_receiver, bandwidth)
541        },
542    );
543
544    spawn_handle.spawn(
545        "inbound-connection",
546        Some("subcoin-networking"),
547        listen_for_inbound_connections(
548            listener,
549            max_inbound_peers,
550            connection_initiator.clone(),
551            processor_msg_sender,
552        ),
553    );
554
555    spawn_handle.spawn(
556        "init-outbound-connection",
557        Some("subcoin-networking"),
558        initialize_outbound_connections(
559            network,
560            seednodes,
561            seednode_only,
562            persistent_peers,
563            connection_initiator,
564        ),
565    );
566
567    Ok(network_handle)
568}