subcoin_network/
sync.rs

1mod block_downloader;
2mod orphan_blocks_pool;
3mod strategy;
4
5use self::strategy::{BlocksFirstStrategy, HeadersFirstStrategy};
6use crate::peer_manager::NewPeer;
7use crate::peer_store::PeerStore;
8use crate::{Error, Latency, MemoryConfig, PeerId, SyncStatus, SyncStrategy};
9use bitcoin::blockdata::block::Header as BitcoinHeader;
10use bitcoin::p2p::message_blockdata::Inventory;
11use bitcoin::{Block as BitcoinBlock, BlockHash};
12use sc_client_api::{AuxStore, HeaderBackend};
13use sc_consensus_nakamoto::{
14    BlockImportQueue, HeaderVerifier, ImportBlocks, ImportManyBlocksResult,
15};
16use serde::{Deserialize, Serialize};
17use sp_consensus::BlockOrigin;
18use sp_runtime::traits::Block as BlockT;
19use std::cmp::Ordering as CmpOrdering;
20use std::collections::HashMap;
21use std::marker::PhantomData;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24use subcoin_primitives::ClientExt;
25
26// Do major sync when the current tip falls behind the network by 144 blocks (roughly one day).
27const MAJOR_SYNC_GAP: u32 = 144;
28
29/// Minimum required improvement ratio in peer score to switch to a better peer.
30/// Since lower scores indicate better peers, we switch when current_score / best_score > threshold.
31/// A value of 1.2 means the current peer must be at least 20% worse to justify switching.
32const PEER_SCORE_IMPROVEMENT_THRESHOLD: f64 = 1.2;
33
34// Define a constant for the low ping latency cutoff, in milliseconds.
35const LOW_LATENCY_CUTOFF: Latency = 20;
36
37/// Maximum number of syncing retries for a deprioritized peer.
38const MAX_STALLS: usize = 5;
39
40/// Weight for latency in peer scoring (lower is better).
41const LATENCY_WEIGHT: f64 = 0.7;
42
43/// Weight for reliability in peer scoring (higher is better).
44const RELIABILITY_WEIGHT: f64 = 0.3;
45
46/// Base reliability score for peers with no stalls.
47const BASE_RELIABILITY_SCORE: f64 = 100.0;
48
49/// The state of syncing between a Peer and ourselves.
50#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
51#[serde(rename_all = "camelCase")]
52pub enum PeerSyncState {
53    /// Available for sync requests.
54    Available,
55    /// The peer has been deprioritized due to past syncing issues (e.g., stalling).
56    Deprioritized {
57        /// Number of times the peer has stalled.
58        stalled_count: usize,
59    },
60    /// Actively downloading new blocks, starting from the given Number.
61    DownloadingNew { start: u32 },
62}
63
64impl PeerSyncState {
65    /// Returns `true` if the peer is available for syncing.
66    pub fn is_available(&self) -> bool {
67        matches!(self, Self::Available)
68    }
69
70    fn stalled_count(&self) -> usize {
71        match self {
72            Self::Deprioritized { stalled_count } => *stalled_count,
73            _ => 0,
74        }
75    }
76
77    /// Determines if the peer is permanently deprioritized based on the stall count.
78    fn is_permanently_deprioritized(&self) -> bool {
79        match self {
80            PeerSyncState::Deprioritized { stalled_count } => *stalled_count > MAX_STALLS,
81            _ => false,
82        }
83    }
84}
85
86/// Contains all the data about a Peer that we are trying to sync with.
87#[derive(Debug, Clone, Serialize, Deserialize)]
88#[serde(rename_all = "camelCase")]
89pub struct PeerSync {
90    /// Peer id of this peer.
91    pub peer_id: PeerId,
92    /// The number of the best block that we've seen for this peer.
93    pub best_number: u32,
94    /// Latency of connection to this peer.
95    pub latency: Latency,
96    /// The state of syncing this peer is in for us, generally categories
97    /// into `Available` or "busy" with something as defined by `PeerSyncState`.
98    pub state: PeerSyncState,
99}
100
101impl PeerSync {
102    /// Calculates a peer score based on latency and reliability.
103    /// Lower scores indicate better peers.
104    pub fn peer_score(&self) -> f64 {
105        // Normalize latency (higher latency = higher score = worse)
106        let latency_score = self.latency as f64;
107
108        // Calculate reliability score (more stalls = lower reliability = higher score = worse)
109        let reliability_penalty = self.state.stalled_count() as f64 * 20.0; // Each stall adds 20 points penalty
110        let reliability_score = BASE_RELIABILITY_SCORE + reliability_penalty;
111
112        // Combine scores with weights
113        (latency_score * LATENCY_WEIGHT) + (reliability_score * RELIABILITY_WEIGHT)
114    }
115}
116
117/// Locator based sync request, for requesting either Headers or Blocks.
118#[derive(Debug, PartialEq, Eq)]
119pub struct LocatorRequest {
120    pub locator_hashes: Vec<BlockHash>,
121    pub stop_hash: BlockHash,
122    pub to: PeerId,
123}
124
125/// Represents different kinds of sync requests.
126#[derive(Debug, PartialEq, Eq)]
127pub enum SyncRequest {
128    /// Request headers via `getheaders`.
129    Header(LocatorRequest),
130    /// Request inventories via `getblocks`.
131    Inventory(LocatorRequest),
132    /// Request blocks via `getdata`.
133    Data(Vec<Inventory>, PeerId),
134}
135
136/// Represents actions that can be taken during the syncing.
137#[derive(Debug)]
138pub enum SyncAction {
139    /// Fetch headers, blocks and data.
140    Request(SyncRequest),
141    /// Transitions to a Blocks-First sync after Headers-First sync
142    /// compltes, to fetch the most recent blocks.
143    SwitchToBlocksFirstSync,
144    /// Disconnect from the peer for the given reason.
145    DisconnectPeer(PeerId, Error),
146    /// Deprioritize the specified peer, restarting the current sync
147    /// with other candidates if available.
148    RestartSyncWithStalledPeer(PeerId),
149    /// Blocks-First sync finished and sets the syncing state to idle.
150    SetIdle,
151    /// No action needed.
152    None,
153}
154
155impl SyncAction {
156    pub(crate) fn get_headers(request: LocatorRequest) -> Self {
157        Self::Request(SyncRequest::Header(request))
158    }
159
160    pub(crate) fn get_inventory(request: LocatorRequest) -> Self {
161        Self::Request(SyncRequest::Inventory(request))
162    }
163
164    pub(crate) fn get_data(inv: Vec<Inventory>, from: PeerId) -> Self {
165        Self::Request(SyncRequest::Data(inv, from))
166    }
167}
168
169#[derive(Debug)]
170pub(crate) enum RestartReason {
171    Stalled,
172    Disconnected,
173}
174
175// This enum encapsulates the various strategies and states a node
176// might be in during the sync process.
177enum Syncing<Block, Client> {
178    /// Blocks-First sync.
179    BlocksFirst(Box<BlocksFirstStrategy<Block, Client>>),
180    /// Headers-First sync.
181    HeadersFirst(Box<HeadersFirstStrategy<Block, Client>>),
182    /// Not syncing.
183    ///
184    /// This could indicate that the node is either fully synced
185    /// or is waiting for more peers to resume syncing.
186    Idle,
187}
188
189impl<Block, Client> Syncing<Block, Client> {
190    fn is_major_syncing(&self) -> bool {
191        matches!(self, Self::BlocksFirst(_) | Self::HeadersFirst(_))
192    }
193}
194
195/// The main data structure which contains all the state for syncing.
196pub(crate) struct ChainSync<Block, Client> {
197    /// Chain client.
198    client: Arc<Client>,
199    /// Block header verifier.
200    header_verifier: HeaderVerifier<Block, Client>,
201    /// The active peers that we are using to sync and their PeerSync status
202    pub(crate) peers: HashMap<PeerId, PeerSync>,
203    /// Current syncing state.
204    syncing: Syncing<Block, Client>,
205    /// Handle of the import queue.
206    pub(crate) import_queue: BlockImportQueue,
207    /// Block syncing strategy.
208    sync_strategy: SyncStrategy,
209    /// Are we in major syncing?
210    is_major_syncing: Arc<AtomicBool>,
211    /// Whether to sync blocks from Bitcoin network.
212    enable_block_sync: bool,
213    /// Handle of peer store.
214    peer_store: Arc<dyn PeerStore>,
215    /// Target block of the syncing process.
216    sync_target: Option<u32>,
217    min_sync_peer_threshold: usize,
218    /// Memory management configuration.
219    memory_config: MemoryConfig,
220    _phantom: PhantomData<Block>,
221}
222
223impl<Block, Client> ChainSync<Block, Client>
224where
225    Block: BlockT,
226    Client: HeaderBackend<Block> + AuxStore,
227{
228    /// Constructs a new instance of [`ChainSync`].
229    #[allow(clippy::too_many_arguments)]
230    pub(super) fn new(
231        client: Arc<Client>,
232        header_verifier: HeaderVerifier<Block, Client>,
233        import_queue: BlockImportQueue,
234        sync_strategy: SyncStrategy,
235        is_major_syncing: Arc<AtomicBool>,
236        enable_block_sync: bool,
237        peer_store: Arc<dyn PeerStore>,
238        sync_target: Option<u32>,
239        min_sync_peer_threshold: usize,
240        memory_config: MemoryConfig,
241    ) -> Self {
242        Self {
243            client,
244            header_verifier,
245            peers: HashMap::new(),
246            syncing: Syncing::Idle,
247            import_queue,
248            sync_strategy,
249            is_major_syncing,
250            enable_block_sync,
251            peer_store,
252            sync_target,
253            min_sync_peer_threshold,
254            memory_config,
255            _phantom: Default::default(),
256        }
257    }
258
259    pub(super) fn sync_status(&self) -> SyncStatus {
260        match &self.syncing {
261            Syncing::Idle => SyncStatus::Idle,
262            Syncing::BlocksFirst(strategy) => strategy.sync_status(),
263            Syncing::HeadersFirst(strategy) => strategy.sync_status(),
264        }
265    }
266
267    pub(super) fn is_idle(&self) -> bool {
268        matches!(self.syncing, Syncing::Idle)
269    }
270
271    /// Check if a peer is currently the active sync peer.
272    pub(super) fn is_active_sync_peer(&self, peer_id: PeerId) -> bool {
273        self.peers
274            .get(&peer_id)
275            .map(|peer| matches!(peer.state, PeerSyncState::DownloadingNew { .. }))
276            .unwrap_or(false)
277    }
278
279    pub(super) fn on_tick(&mut self) -> SyncAction {
280        match &mut self.syncing {
281            Syncing::Idle => SyncAction::None,
282            Syncing::BlocksFirst(strategy) => strategy.on_tick(),
283            Syncing::HeadersFirst(strategy) => strategy.on_tick(),
284        }
285    }
286
287    pub(super) async fn wait_for_block_import_results(&mut self) -> ImportManyBlocksResult {
288        self.import_queue.block_import_results().await
289    }
290
291    pub(super) fn unreliable_peers(&self) -> Vec<PeerId> {
292        self.peers
293            .iter()
294            .filter_map(|(peer_id, peer)| {
295                peer.state
296                    .is_permanently_deprioritized()
297                    .then_some(peer_id)
298                    .copied()
299            })
300            .collect()
301    }
302
303    /// Removes the given peer from peers of chain sync.
304    pub(super) fn disconnect(&mut self, peer_id: PeerId) {
305        if let Some(removed_peer) = self.peers.remove(&peer_id) {
306            // We currently support only one syncing peer, this logic needs to be
307            // refactored once multiple syncing peers are supported.
308            if matches!(removed_peer.state, PeerSyncState::DownloadingNew { .. }) {
309                self.restart_sync(removed_peer.peer_id, RestartReason::Disconnected);
310            }
311        }
312    }
313
314    pub(super) fn update_peer_best(&mut self, peer_id: PeerId, peer_best: u32) {
315        let mut peer_best_updated = false;
316
317        self.peers.entry(peer_id).and_modify(|e| {
318            if peer_best > e.best_number {
319                tracing::debug!(
320                    "Tip of {peer_id:?} updated from #{} to #{peer_best}",
321                    e.best_number
322                );
323                e.best_number = peer_best;
324                peer_best_updated = true;
325            }
326        });
327
328        if peer_best_updated {
329            match &mut self.syncing {
330                Syncing::Idle => {}
331                Syncing::BlocksFirst(strategy) => strategy.set_peer_best(peer_id, peer_best),
332                Syncing::HeadersFirst(strategy) => strategy.set_peer_best(peer_id, peer_best),
333            }
334        }
335    }
336
337    /// Attempt to find the best available peer using a composite scoring algorithm
338    /// that considers both latency and reliability.
339    fn select_next_peer_for_sync(
340        &mut self,
341        our_best: u32,
342        excluded_peer: PeerId,
343    ) -> Option<PeerId> {
344        // First try available peers with the new scoring algorithm
345        let best_available = self
346            .peers
347            .values()
348            .filter(|peer| {
349                peer.peer_id != excluded_peer
350                    && peer.best_number > our_best
351                    && peer.state.is_available()
352            })
353            .min_by(|a, b| {
354                let score_a = a.peer_score();
355                let score_b = b.peer_score();
356                score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
357            })
358            .map(|peer| peer.peer_id);
359
360        best_available.or_else(|| {
361            // Fall back to deprioritized peers, also using scoring
362            self.peers
363                .values()
364                .filter(|peer| {
365                    peer.peer_id != excluded_peer
366                        && peer.best_number > our_best
367                        && !peer.state.is_permanently_deprioritized()
368                })
369                .min_by(|a, b| {
370                    let score_a = a.peer_score();
371                    let score_b = b.peer_score();
372                    score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
373                })
374                .map(|peer| peer.peer_id)
375        })
376    }
377
378    /// Attempts to restart the sync based on the reason provided.
379    ///
380    /// Returns `true` if the sync is restarted with a new peer.
381    pub(super) fn restart_sync(&mut self, prior_peer_id: PeerId, reason: RestartReason) {
382        let our_best = self.client.best_number();
383
384        let Some(new_peer_id) = self.select_next_peer_for_sync(our_best, prior_peer_id) else {
385            if let Some(median_seen_block) = self.median_seen()
386                && median_seen_block <= our_best
387            {
388                let best_seen_block = self.peers.values().map(|p| p.best_number).max();
389
390                // We are synced to the median block seen by our peers, but this may
391                // not be the network's tip.
392                //
393                // Transition to idle unless more blocks are announced.
394                tracing::debug!(
395                    best_seen_block,
396                    median_seen_block,
397                    our_best,
398                    "Synced to the majority of peers, switching to Idle"
399                );
400                self.update_syncing_state(Syncing::Idle);
401                return;
402            }
403
404            // No new sync candidate, keep it as is.
405            // TODO: handle this properly.
406            tracing::debug!(
407                ?prior_peer_id,
408                "⚠️ Attempting to restart sync, but no new sync candidate available"
409            );
410
411            return;
412        };
413
414        {
415            let Some(new_peer) = self.peers.get_mut(&new_peer_id) else {
416                tracing::error!("Corrupted state, next peer {new_peer_id} missing from peer list");
417                return;
418            };
419
420            tracing::debug!(?reason, prior_peer = ?prior_peer_id, ?new_peer, "🔄 Sync restarted");
421            new_peer.state = PeerSyncState::DownloadingNew { start: our_best };
422
423            let target_block_number = target_block_number(self.sync_target, new_peer.best_number);
424
425            match &mut self.syncing {
426                Syncing::BlocksFirst(strategy) => {
427                    strategy.restart(new_peer.peer_id, target_block_number);
428                }
429                Syncing::HeadersFirst(strategy) => {
430                    strategy.restart(new_peer.peer_id, target_block_number);
431                }
432                Syncing::Idle => {}
433            }
434        }
435
436        match reason {
437            RestartReason::Stalled => {
438                self.peers.entry(prior_peer_id).and_modify(|p| {
439                    let current_stalled_count = p.state.stalled_count();
440                    p.state = PeerSyncState::Deprioritized {
441                        stalled_count: current_stalled_count + 1,
442                    };
443                });
444            }
445            RestartReason::Disconnected => {
446                // Nothing to be done, peer is already removed from the peer list.
447            }
448        }
449    }
450
451    /// Returns the median block number advertised by our peers.
452    fn median_seen(&self) -> Option<u32> {
453        let mut best_seens = self
454            .peers
455            .values()
456            .map(|p| p.best_number)
457            .collect::<Vec<_>>();
458
459        if best_seens.is_empty() {
460            None
461        } else {
462            let middle = best_seens.len() / 2;
463
464            Some(*best_seens.select_nth_unstable(middle).1)
465        }
466    }
467
468    pub(super) fn update_peer_latency(&mut self, peer_id: PeerId, avg_latency: Latency) {
469        self.peers.entry(peer_id).and_modify(|peer| {
470            peer.latency = avg_latency;
471        });
472        self.update_sync_peer_on_lower_latency();
473    }
474
475    pub(super) fn update_sync_peer_on_lower_latency(&mut self) {
476        let maybe_sync_peer_id = match &self.syncing {
477            Syncing::Idle => return,
478            Syncing::BlocksFirst(strategy) => strategy.can_swap_sync_peer(),
479            Syncing::HeadersFirst(strategy) => strategy.can_swap_sync_peer(),
480        };
481
482        let Some(current_sync_peer_id) = maybe_sync_peer_id else {
483            return;
484        };
485
486        let Some(current_latency) = self
487            .peers
488            .get(&current_sync_peer_id)
489            .map(|peer| peer.latency)
490        else {
491            return;
492        };
493
494        if current_latency <= LOW_LATENCY_CUTOFF {
495            tracing::trace!(
496                peer_id = ?current_sync_peer_id,
497                "Skipping sync peer update as the current latency ({current_latency}ms) is already low enough"
498            );
499        }
500
501        let our_best = self.client.best_number();
502
503        // Find the peer with best score (considering both latency and reliability).
504        let Some(best_sync_peer) = self
505            .peers
506            .values()
507            .filter(|peer| {
508                peer.peer_id != current_sync_peer_id
509                    && peer.best_number > our_best
510                    && peer.state.is_available()
511            })
512            .min_by(|a, b| {
513                let score_a = a.peer_score();
514                let score_b = b.peer_score();
515                score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
516            })
517        else {
518            return;
519        };
520
521        let Some(current_peer) = self.peers.get(&current_sync_peer_id) else {
522            tracing::debug!(
523                peer_id = ?current_sync_peer_id,
524                "Current sync peer not found in peer list, skipping peer update"
525            );
526            return;
527        };
528
529        let current_score = current_peer.peer_score();
530        let best_score = best_sync_peer.peer_score();
531
532        // Update sync peer if the score improvement is significant (lower score is better).
533        // Protect against division by zero and ensure best_score is positive.
534        if best_score > 0.0 && current_score / best_score > PEER_SCORE_IMPROVEMENT_THRESHOLD {
535            let peer_id = best_sync_peer.peer_id;
536            let target_block_number =
537                target_block_number(self.sync_target, best_sync_peer.best_number);
538
539            let sync_peer_updated = match &mut self.syncing {
540                Syncing::BlocksFirst(strategy) => {
541                    strategy.swap_sync_peer(peer_id, target_block_number);
542                    true
543                }
544                Syncing::HeadersFirst(strategy) => {
545                    strategy.swap_sync_peer(peer_id, target_block_number);
546                    true
547                }
548                Syncing::Idle => unreachable!("Must not be Idle as checked; qed"),
549            };
550
551            if sync_peer_updated {
552                tracing::debug!(
553                    old_peer_id = ?current_sync_peer_id,
554                    new_peer_id = ?peer_id,
555                    old_score = %current_score,
556                    new_score = %best_score,
557                    "🔧 Sync peer updated to a peer with better score (latency: {} ms -> {} ms)",
558                    current_peer.latency,
559                    best_sync_peer.latency,
560                );
561                self.peers.entry(current_sync_peer_id).and_modify(|peer| {
562                    peer.state = PeerSyncState::Available;
563                });
564                self.peers.entry(peer_id).and_modify(|peer| {
565                    peer.state = PeerSyncState::DownloadingNew { start: our_best };
566                });
567            }
568        }
569    }
570
571    /// Add new peer to the chain sync component and potentially starts to synchronize the network.
572    pub(super) fn add_new_peer(&mut self, new_peer: NewPeer) -> SyncAction {
573        let NewPeer {
574            peer_id,
575            best_number,
576            latency,
577        } = new_peer;
578
579        let new_peer = PeerSync {
580            peer_id,
581            best_number,
582            latency,
583            state: PeerSyncState::Available,
584        };
585
586        self.peers.insert(peer_id, new_peer);
587
588        if self.enable_block_sync {
589            self.attempt_sync_start()
590        } else {
591            SyncAction::None
592        }
593    }
594
595    pub(super) fn start_block_sync(&mut self) -> SyncAction {
596        self.enable_block_sync = true;
597        self.attempt_sync_start()
598    }
599
600    fn attempt_sync_start(&mut self) -> SyncAction {
601        if self.syncing.is_major_syncing() {
602            return SyncAction::None;
603        }
604
605        if self.peers.len() < self.min_sync_peer_threshold {
606            tracing::debug!(
607                "Waiting for more sync peers, discovered {} peers, require {} peers",
608                self.peers.len(),
609                self.min_sync_peer_threshold
610            );
611            return SyncAction::None;
612        }
613
614        let our_best = self.client.best_number();
615
616        if let Some(sync_target) = self.sync_target
617            && our_best >= sync_target
618        {
619            return SyncAction::None;
620        }
621
622        let find_best_available_peer = || {
623            self.peers
624                .iter()
625                .filter(|(_peer_id, peer)| peer.best_number > our_best && peer.state.is_available())
626                .min_by(|(_, a), (_, b)| {
627                    let score_a = a.peer_score();
628                    let score_b = b.peer_score();
629                    score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
630                })
631                .map(|(peer_id, _peer)| peer_id)
632        };
633
634        let find_best_deprioritized_peer = || {
635            self.peers
636                .iter()
637                .filter(|(_peer_id, peer)| peer.best_number > our_best)
638                .filter_map(|(peer_id, peer)| {
639                    if let PeerSyncState::Deprioritized { stalled_count } = peer.state {
640                        if stalled_count > MAX_STALLS {
641                            None
642                        } else {
643                            Some((peer_id, stalled_count, peer.latency))
644                        }
645                    } else {
646                        None
647                    }
648                })
649                .min_by(
650                    |(_, stalled_count_a, latency_a), (_, stalled_count_b, latency_b)| {
651                        // First, compare stalled_count, then latency if stalled_count is equal
652                        match stalled_count_a.cmp(stalled_count_b) {
653                            CmpOrdering::Equal => latency_a.cmp(latency_b), // compare latency if stalled_count is the same
654                            other => other, // otherwise, return the comparison of stalled_count
655                        }
656                    },
657                )
658                .map(|(peer_id, _stalled_count, _latency)| peer_id)
659        };
660
661        let Some(next_peer_id) = find_best_available_peer()
662            .or_else(find_best_deprioritized_peer)
663            .copied()
664        else {
665            return SyncAction::None;
666        };
667
668        let Some(next_peer) = self.peers.get_mut(&next_peer_id) else {
669            return SyncAction::None;
670        };
671
672        let client = self.client.clone();
673        let peer_store = self.peer_store.clone();
674
675        let peer_best = next_peer.best_number;
676        let require_major_sync = peer_best - our_best > MAJOR_SYNC_GAP;
677
678        let target_block_number = target_block_number(self.sync_target, peer_best);
679
680        // Start major syncing if the gap is significant.
681        let (new_syncing, sync_action) = if require_major_sync {
682            let blocks_first = our_best >= crate::checkpoint::last_checkpoint_height()
683                || matches!(self.sync_strategy, SyncStrategy::BlocksFirst);
684
685            tracing::debug!(
686                latency = ?next_peer.latency,
687                "⏩ Starting major sync ({}) from {next_peer_id:?} at #{our_best}",
688                if blocks_first { "blocks-first" } else { "headers-first" }
689            );
690
691            if blocks_first {
692                let (sync_strategy, sync_action) = BlocksFirstStrategy::new(
693                    client,
694                    next_peer_id,
695                    target_block_number,
696                    peer_store,
697                    self.memory_config.clone(),
698                );
699                (Syncing::BlocksFirst(Box::new(sync_strategy)), sync_action)
700            } else {
701                let (sync_strategy, sync_action) = HeadersFirstStrategy::new(
702                    client,
703                    self.header_verifier.clone(),
704                    next_peer_id,
705                    target_block_number,
706                    peer_store,
707                    self.memory_config.clone(),
708                );
709                (Syncing::HeadersFirst(Box::new(sync_strategy)), sync_action)
710            }
711        } else {
712            let (sync_strategy, sync_action) = BlocksFirstStrategy::new(
713                client,
714                next_peer_id,
715                target_block_number,
716                peer_store,
717                self.memory_config.clone(),
718            );
719            (Syncing::BlocksFirst(Box::new(sync_strategy)), sync_action)
720        };
721
722        next_peer.state = PeerSyncState::DownloadingNew { start: our_best };
723        self.update_syncing_state(new_syncing);
724
725        sync_action
726    }
727
728    pub(super) fn start_blocks_first_sync(&mut self) -> Option<SyncAction> {
729        if matches!(self.syncing, Syncing::BlocksFirst(_)) {
730            return None;
731        }
732
733        // Import the potential remaining blocks downloaded by Headers-First sync.
734        self.import_pending_blocks();
735
736        let our_best = self.client.best_number();
737
738        let Some(best_peer) = self
739            .peers
740            .values()
741            .filter(|peer| peer.best_number > our_best && peer.state.is_available())
742            .min_by(|a, b| {
743                let score_a = a.peer_score();
744                let score_b = b.peer_score();
745                score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
746            })
747        else {
748            self.update_syncing_state(Syncing::Idle);
749            return None;
750        };
751
752        let (blocks_first_strategy, sync_action) = BlocksFirstStrategy::new(
753            self.client.clone(),
754            best_peer.peer_id,
755            best_peer.best_number,
756            self.peer_store.clone(),
757            self.memory_config.clone(),
758        );
759
760        tracing::debug!("Headers-First sync completed, continuing with blocks-first sync");
761        self.update_syncing_state(Syncing::BlocksFirst(Box::new(blocks_first_strategy)));
762
763        Some(sync_action)
764    }
765
766    fn update_syncing_state(&mut self, new: Syncing<Block, Client>) {
767        self.syncing = new;
768        self.is_major_syncing
769            .store(self.syncing.is_major_syncing(), Ordering::Relaxed);
770    }
771
772    pub(super) fn set_idle(&mut self) {
773        self.import_pending_blocks();
774
775        tracing::debug!(
776            best_number = self.client.best_number(),
777            "Blocks-First sync completed, switching to Syncing::Idle"
778        );
779        self.update_syncing_state(Syncing::Idle);
780    }
781
782    // NOTE: `inv` can be received unsolicited as an announcement of a new block,
783    // or in reply to `getblocks`.
784    pub(super) fn on_inv(&mut self, inventories: Vec<Inventory>, from: PeerId) -> SyncAction {
785        match &mut self.syncing {
786            Syncing::Idle => SyncAction::None,
787            Syncing::BlocksFirst(strategy) => strategy.on_inv(inventories, from),
788            Syncing::HeadersFirst(_) => SyncAction::None,
789        }
790    }
791
792    pub(super) fn on_block(&mut self, block: BitcoinBlock, from: PeerId) -> SyncAction {
793        match &mut self.syncing {
794            Syncing::Idle => SyncAction::None,
795            Syncing::BlocksFirst(strategy) => strategy.on_block(block, from),
796            Syncing::HeadersFirst(strategy) => strategy.on_block(block, from),
797        }
798    }
799
800    pub(super) fn on_headers(&mut self, headers: Vec<BitcoinHeader>, from: PeerId) -> SyncAction {
801        match &mut self.syncing {
802            Syncing::HeadersFirst(strategy) => strategy.on_headers(headers, from),
803            Syncing::BlocksFirst(_) | Syncing::Idle => {
804                tracing::debug!(
805                    ?from,
806                    "Ignored headers: {:?}",
807                    headers
808                        .iter()
809                        .map(|header| header.block_hash())
810                        .collect::<Vec<_>>()
811                );
812                SyncAction::None
813            }
814        }
815    }
816
817    pub(super) fn on_blocks_processed(&mut self, results: ImportManyBlocksResult) {
818        let block_downloader = match &mut self.syncing {
819            Syncing::Idle => return,
820            Syncing::BlocksFirst(strategy) => strategy.block_downloader(),
821            Syncing::HeadersFirst(strategy) => strategy.block_downloader(),
822        };
823
824        block_downloader.handle_processed_blocks(results);
825    }
826
827    pub(super) fn import_pending_blocks(&mut self) {
828        let block_downloader = match &mut self.syncing {
829            Syncing::Idle => return,
830            Syncing::BlocksFirst(strategy) => strategy.block_downloader(),
831            Syncing::HeadersFirst(strategy) => strategy.block_downloader(),
832        };
833
834        if !block_downloader.has_pending_blocks() {
835            return;
836        }
837
838        let (hashes, blocks) = block_downloader.prepare_blocks_for_import(self.sync_target);
839
840        tracing::trace!(
841            blocks = ?hashes,
842            blocks_in_queue = block_downloader.blocks_in_queue_count(),
843            "Scheduling {} blocks for import",
844            blocks.len(),
845        );
846
847        self.import_queue.import_blocks(ImportBlocks {
848            origin: if self.is_major_syncing.load(Ordering::Relaxed) {
849                BlockOrigin::NetworkInitialSync
850            } else {
851                BlockOrigin::NetworkBroadcast
852            },
853            blocks,
854        });
855    }
856}
857
858/// Determines the target block number for syncing based on the provided sync target
859/// and the peer's best block number.
860///
861/// Bitcoin Core only supports snapshots at specific block heights (e.g., 840000 as of writing).
862/// To avoid syncing past a block that may have been pruned or is unavailable in pruning mode
863/// when running a snapshot node, this function ensures that we do not sync beyond a certain
864/// block height, as determined by the `sync_target` or the peer's best block height.
865fn target_block_number(sync_target: Option<u32>, peer_best: u32) -> u32 {
866    match sync_target {
867        Some(target) => peer_best.min(target),
868        None => peer_best,
869    }
870}