1mod address_book;
30mod checkpoint;
31mod metrics;
32mod network_api;
33mod network_processor;
34mod peer_connection;
35mod peer_manager;
36mod peer_store;
37mod sync;
38#[cfg(test)]
39mod tests;
40mod transaction_manager;
41
42use crate::metrics::BandwidthMetrics;
43use crate::network_api::NetworkProcessorMessage;
44use crate::network_processor::NetworkProcessor;
45use crate::peer_connection::ConnectionInitiator;
46use crate::peer_store::{PersistentPeerStore, PersistentPeerStoreHandle};
47use bitcoin::p2p::ServiceFlags;
48use bitcoin::{BlockHash, Network as BitcoinNetwork};
49use chrono::prelude::{DateTime, Local};
50use peer_manager::HandshakeState;
51use sc_client_api::{AuxStore, HeaderBackend};
52use sc_consensus_nakamoto::{BlockImportQueue, ChainParams, HeaderVerifier};
53use sc_network_sync::SyncingService;
54use sc_service::TaskManager;
55use sc_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
56use sp_runtime::traits::Block as BlockT;
57use std::net::{AddrParseError, SocketAddr};
58use std::path::PathBuf;
59use std::sync::Arc;
60use std::sync::atomic::{AtomicBool, AtomicU64};
61use substrate_prometheus_endpoint::Registry;
62use tokio::net::TcpListener;
63use tokio::sync::oneshot;
64
65pub use crate::network_api::{
66 NetworkApi, NetworkHandle, NetworkStatus, NoNetwork, OfflineSync, SendTransactionResult,
67 SyncStatus,
68};
69pub use crate::sync::{PeerSync, PeerSyncState};
70
71pub type PeerId = SocketAddr;
73
74pub type Latency = u128;
76
77pub(crate) type LocalTime = DateTime<Local>;
78
79#[derive(Debug, thiserror::Error)]
81pub enum Error {
82 #[error("Invalid bootnode address: {0}")]
83 InvalidBootnode(String),
84 #[error("Received 0 bytes, peer performed an orderly shutdown")]
85 PeerShutdown,
86 #[error("Cannot communicate with the network event stream")]
87 NetworkEventStreamError,
88 #[error("Peer {0:?} not found")]
89 PeerNotFound(PeerId),
90 #[error("Connection of peer {0:?} not found")]
91 ConnectionNotFound(PeerId),
92 #[error("Connecting to the stream timed out")]
93 ConnectionTimeout,
94 #[error("Unexpected handshake state: {0:?}")]
95 UnexpectedHandshakeState(Box<HandshakeState>),
96 #[error("Handshake timeout")]
97 HandshakeTimeout,
98 #[error("Only IPv4 peers are supported")]
99 Ipv4Only,
100 #[error("Peer is not a full node")]
101 NotFullNode,
102 #[error("Peer is not a segwit node")]
103 NotSegwitNode,
104 #[error("Peer's protocol version is too low")]
105 ProtocolVersionTooLow,
106 #[error("Header contains invalid proof-of-block")]
107 BadProofOfWork(BlockHash),
108 #[error("Too many Inventory::Block items in inv message")]
109 InvHasTooManyBlockItems,
110 #[error("Too many entries (> 2000) in headers message")]
111 TooManyHeaders,
112 #[error("Entries in headers message are not in ascending order")]
113 HeadersNotInAscendingOrder,
114 #[error("Too many inventory items")]
115 TooManyInventoryItems,
116 #[error("Ping timeout")]
117 PingTimeout,
118 #[error("Ping latency ({0}) exceeds the threshold")]
119 PingLatencyTooHigh(Latency),
120 #[error("Peer is deprioritized for syncing and has encountered multiple failures")]
121 UnreliablePeer,
122 #[error("Peer's latency ({0} ms) is too high")]
123 SlowPeer(Latency),
124 #[error("Unexpected pong message")]
125 UnexpectedPong,
126 #[error("Bad nonce in pong, expected: {expected}, got: {got}")]
127 BadPong { expected: u64, got: u64 },
128 #[error("Cannot find the parent of the first header in headers message")]
129 MissingFirstHeaderParent,
130 #[error("Other: {0}")]
131 Other(String),
132 #[error(transparent)]
133 IO(#[from] std::io::Error),
134 #[error(transparent)]
135 InvalidAddress(#[from] AddrParseError),
136 #[error(transparent)]
137 Blockchain(#[from] sp_blockchain::Error),
138 #[error(transparent)]
139 Consensus(#[from] sp_consensus::Error),
140 #[error(transparent)]
141 BitcoinIO(#[from] bitcoin::io::Error),
142 #[error(transparent)]
143 BitcoinEncoding(#[from] bitcoin::consensus::encode::Error),
144}
145
146fn validate_outbound_services(services: ServiceFlags) -> Result<(), Error> {
149 if !services.has(ServiceFlags::NETWORK) {
150 return Err(Error::NotFullNode);
151 }
152
153 if !services.has(ServiceFlags::WITNESS) {
154 return Err(Error::NotSegwitNode);
155 }
156
157 Ok(())
158}
159
160#[derive(Debug, Clone, Copy, Default)]
162#[cfg_attr(feature = "cli", derive(clap::ValueEnum))]
163pub enum SyncStrategy {
164 #[default]
166 HeadersFirst,
167 BlocksFirst,
169}
170
171#[derive(Debug, Default)]
172struct Bandwidth {
173 total_bytes_inbound: Arc<AtomicU64>,
174 total_bytes_outbound: Arc<AtomicU64>,
175 metrics: Option<BandwidthMetrics>,
176}
177
178impl Bandwidth {
179 fn new(registry: Option<&Registry>) -> Self {
180 Self {
181 total_bytes_inbound: Arc::new(0.into()),
182 total_bytes_outbound: Arc::new(0.into()),
183 metrics: registry.and_then(|registry| {
184 BandwidthMetrics::register(registry)
185 .map_err(|err| tracing::error!("Failed to register bandwidth metrics: {err}"))
186 .ok()
187 }),
188 }
189 }
190
191 fn report(&self, label: &str, value: u64) {
195 if let Some(metrics) = &self.metrics {
196 metrics.bandwidth.with_label_values(&[label]).set(value);
197 }
198 }
199}
200
201impl Clone for Bandwidth {
202 fn clone(&self) -> Self {
203 Self {
204 total_bytes_inbound: self.total_bytes_inbound.clone(),
205 total_bytes_outbound: self.total_bytes_outbound.clone(),
206 metrics: self.metrics.clone(),
207 }
208 }
209}
210
211pub enum BlockSyncOption {
213 AlwaysOn,
215 Off,
217 PausedUntilFastSync,
220}
221
222#[derive(Debug, Clone)]
224pub struct MemoryConfig {
225 pub max_downloaded_blocks_memory: usize,
227 pub max_blocks_in_memory: usize,
229 pub max_orphan_blocks_memory: usize,
231 pub enable_adaptive_batch_sizing: bool,
233}
234
235impl Default for MemoryConfig {
236 fn default() -> Self {
237 Self {
238 max_downloaded_blocks_memory: 256 * 1024 * 1024, max_blocks_in_memory: 1000,
240 max_orphan_blocks_memory: 64 * 1024 * 1024, enable_adaptive_batch_sizing: true,
242 }
243 }
244}
245
246pub struct Config {
248 pub network: BitcoinNetwork,
250 pub base_path: PathBuf,
252 pub listen_on: PeerId,
254 pub seednodes: Vec<String>,
256 pub seednode_only: bool,
258 pub ipv4_only: bool,
260 pub sync_target: Option<u32>,
262 pub max_outbound_peers: usize,
264 pub max_inbound_peers: usize,
266 pub min_sync_peer_threshold: usize,
268 pub persistent_peer_latency_threshold: u128,
270 pub sync_strategy: SyncStrategy,
272 pub block_sync: BlockSyncOption,
277 pub memory_config: MemoryConfig,
279}
280
281fn builtin_seednodes(network: BitcoinNetwork) -> &'static [&'static str] {
282 match network {
283 BitcoinNetwork::Bitcoin => {
284 &[
285 "seed.bitcoin.sipa.be:8333", "dnsseed.bluematt.me:8333", "dnsseed.bitcoin.dashjr-list-of-p2p-nodes.us:8333", "seed.bitcoinstats.com:8333", "seed.bitcoin.jonasschnelli.ch:8333", "seed.btc.petertodd.net:8333", "seed.bitcoin.sprovoost.nl:8333", "dnsseed.emzy.de:8333", "seed.bitcoin.wiz.biz:8333", "seed.mainnet.achownodes.xyz:8333", ]
296 }
297 BitcoinNetwork::Testnet => &[
298 "testnet-seed.bitcoin.jonasschnelli.ch:18333",
299 "seed.tbtc.petertodd.net:18333",
300 "seed.testnet.bitcoin.sprovoost.nl:18333",
301 "testnet-seed.bluematt.me:18333",
302 "testnet-seed.achownodes.xyz:18333",
303 ],
304 _ => &[],
305 }
306}
307
308async fn watch_substrate_fast_sync<Block>(
312 subcoin_network_handle: NetworkHandle,
313 substate_sync_service: Arc<SyncingService<Block>>,
314) where
315 Block: BlockT,
316{
317 let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
318
319 let mut state_sync_has_started = false;
320
321 loop {
322 interval.tick().await;
323
324 let state_sync_is_active = substate_sync_service
325 .status()
326 .await
327 .map(|status| status.state_sync.is_some())
328 .unwrap_or(false);
329
330 if state_sync_is_active {
331 if !state_sync_has_started {
332 state_sync_has_started = true;
333 }
334 } else if state_sync_has_started {
335 tracing::info!("Detected state sync is complete, starting Subcoin block sync");
336 subcoin_network_handle.start_block_sync();
337 return;
338 }
339 }
340}
341
342async fn listen_for_inbound_connections(
343 listener: TcpListener,
344 max_inbound_peers: usize,
345 connection_initiator: ConnectionInitiator,
346 processor_msg_sender: TracingUnboundedSender<NetworkProcessorMessage>,
347) {
348 let local_addr = listener
349 .local_addr()
350 .expect("Local listen addr must be available; qed");
351
352 tracing::info!("🔊 Listening on {local_addr:?}",);
353
354 while let Ok((socket, peer_addr)) = listener.accept().await {
355 let (sender, receiver) = oneshot::channel();
356
357 if processor_msg_sender
358 .unbounded_send(NetworkProcessorMessage::RequestInboundPeersCount(sender))
359 .is_err()
360 {
361 return;
362 }
363
364 let Ok(inbound_peers_count) = receiver.await else {
365 return;
366 };
367
368 if inbound_peers_count < max_inbound_peers {
369 tracing::debug!(?peer_addr, "New peer accepted");
370
371 if let Err(err) = connection_initiator.initiate_inbound_connection(socket) {
372 tracing::debug!(?err, ?peer_addr, "Failed to initiate inbound connection");
373 }
374 }
375 }
376}
377
378async fn initialize_outbound_connections(
379 network: bitcoin::Network,
380 seednodes: Vec<String>,
381 seednode_only: bool,
382 persistent_peers: Vec<PeerId>,
383 connection_initiator: ConnectionInitiator,
384) {
385 let mut bootnodes = seednodes;
386
387 if !seednode_only {
388 bootnodes.extend(builtin_seednodes(network).iter().map(|s| s.to_string()));
389 }
390
391 bootnodes.extend(persistent_peers.into_iter().map(|s| s.to_string()));
392
393 let lookup_futures = bootnodes.into_iter().map(|bootnode| async move {
395 let res = tokio::net::lookup_host(&bootnode).await.map(|mut addrs| {
396 addrs
397 .next()
398 .ok_or_else(|| Error::InvalidBootnode(bootnode.to_string()))
399 });
400 (bootnode, res)
401 });
402
403 let lookup_results = futures::future::join_all(lookup_futures).await;
405
406 for (bootnode, result) in lookup_results {
407 match result {
408 Ok(Ok(addr)) => {
409 connection_initiator.initiate_outbound_connection(addr);
410 }
411 Ok(Err(e)) => {
412 tracing::warn!(%bootnode, "Failed to resolve bootnode address: {e}");
413 }
414 Err(e) => {
415 tracing::warn!(%bootnode, "Failed to perform bootnode DNS lookup: {e}");
416 }
417 }
418 }
419}
420
421pub async fn build_network<Block, Client>(
423 client: Arc<Client>,
424 config: Config,
425 import_queue: BlockImportQueue,
426 task_manager: &TaskManager,
427 registry: Option<Registry>,
428 substrate_sync_service: Option<Arc<SyncingService<Block>>>,
429) -> Result<NetworkHandle, Error>
430where
431 Block: BlockT,
432 Client: HeaderBackend<Block> + AuxStore + 'static,
433{
434 let (processor_msg_sender, processor_msg_receiver) =
435 tracing_unbounded("mpsc_subcoin_network_processor", 100);
436
437 let is_major_syncing = Arc::new(AtomicBool::new(false));
438
439 let mut listen_on = config.listen_on;
440 let listener = match TcpListener::bind(&listen_on).await {
441 Ok(listener) => listener,
442 Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
443 tracing::warn!("{listen_on} is occupied, trying any available port.");
444 listen_on.set_port(0);
445 TcpListener::bind(listen_on).await?
446 }
447 Err(err) => return Err(err.into()),
448 };
449
450 let (network_event_sender, network_event_receiver) = tokio::sync::mpsc::unbounded_channel();
451
452 let bandwidth = Bandwidth::new(registry.as_ref());
453
454 let spawn_handle = task_manager.spawn_handle();
455
456 let connection_initiator = ConnectionInitiator::new(
457 config.network,
458 network_event_sender,
459 spawn_handle.clone(),
460 bandwidth.clone(),
461 config.ipv4_only,
462 );
463
464 let (sender, receiver) = tracing_unbounded("mpsc_subcoin_peer_store", 10_000);
465 let (persistent_peer_store, persistent_peers) =
466 PersistentPeerStore::new(&config.base_path, config.max_outbound_peers);
467 let persistent_peer_store_handle =
468 PersistentPeerStoreHandle::new(config.persistent_peer_latency_threshold, sender);
469
470 spawn_handle.spawn(
471 "peer-store",
472 Some("subcoin-networking"),
473 persistent_peer_store.run(receiver),
474 );
475
476 let Config {
477 seednode_only,
478 seednodes,
479 network,
480 max_inbound_peers,
481 max_outbound_peers,
482 min_sync_peer_threshold,
483 sync_strategy,
484 block_sync,
485 sync_target,
486 ..
487 } = config;
488
489 let network_handle = NetworkHandle {
490 processor_msg_sender: processor_msg_sender.clone(),
491 is_major_syncing: is_major_syncing.clone(),
492 };
493
494 let enable_block_sync = matches!(block_sync, BlockSyncOption::AlwaysOn);
495
496 if !enable_block_sync {
497 tracing::info!("Subcoin block sync is disabled on startup");
498 }
499
500 if matches!(block_sync, BlockSyncOption::PausedUntilFastSync) {
501 if let Some(substrate_sync_service) = substrate_sync_service {
502 spawn_handle.spawn(
503 "substrate-fast-sync-watcher",
504 None,
505 watch_substrate_fast_sync(network_handle.clone(), substrate_sync_service),
506 );
507 } else {
508 tracing::warn!(
509 "Block sync from Bitcoin P2P network will not be started automatically on Substrate fast sync completion"
510 );
511 }
512 }
513
514 task_manager.spawn_essential_handle().spawn_blocking(
515 "net-processor",
516 Some("subcoin-networking"),
517 {
518 let is_major_syncing = is_major_syncing.clone();
519 let connection_initiator = connection_initiator.clone();
520 let client = client.clone();
521
522 NetworkProcessor::new(
523 network_processor::Params {
524 client: client.clone(),
525 header_verifier: HeaderVerifier::new(client, ChainParams::new(network)),
526 network_event_receiver,
527 import_queue,
528 sync_strategy,
529 is_major_syncing,
530 connection_initiator,
531 max_outbound_peers,
532 min_sync_peer_threshold,
533 enable_block_sync,
534 peer_store: Arc::new(persistent_peer_store_handle),
535 sync_target,
536 memory_config: config.memory_config.clone(),
537 },
538 registry.as_ref(),
539 )
540 .run(processor_msg_receiver, bandwidth)
541 },
542 );
543
544 spawn_handle.spawn(
545 "inbound-connection",
546 Some("subcoin-networking"),
547 listen_for_inbound_connections(
548 listener,
549 max_inbound_peers,
550 connection_initiator.clone(),
551 processor_msg_sender,
552 ),
553 );
554
555 spawn_handle.spawn(
556 "init-outbound-connection",
557 Some("subcoin-networking"),
558 initialize_outbound_connections(
559 network,
560 seednodes,
561 seednode_only,
562 persistent_peers,
563 connection_initiator,
564 ),
565 );
566
567 Ok(network_handle)
568}