subcoin_network/
sync.rs

1mod block_downloader;
2mod orphan_blocks_pool;
3mod strategy;
4
5use self::strategy::{BlocksFirstStrategy, HeadersFirstStrategy};
6use crate::peer_manager::NewPeer;
7use crate::peer_store::PeerStore;
8use crate::{Error, Latency, MemoryConfig, PeerId, SyncStatus, SyncStrategy};
9use bitcoin::blockdata::block::Header as BitcoinHeader;
10use bitcoin::p2p::message_blockdata::Inventory;
11use bitcoin::{Block as BitcoinBlock, BlockHash};
12use sc_client_api::{AuxStore, HeaderBackend};
13use sc_consensus_nakamoto::{
14    BlockImportQueue, HeaderVerifier, ImportBlocks, ImportManyBlocksResult,
15};
16use serde::{Deserialize, Serialize};
17use sp_consensus::BlockOrigin;
18use sp_runtime::traits::Block as BlockT;
19use std::cmp::Ordering as CmpOrdering;
20use std::collections::HashMap;
21use std::marker::PhantomData;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24use subcoin_primitives::ClientExt;
25
26// Do major sync when the current tip falls behind the network by 144 blocks (roughly one day).
27const MAJOR_SYNC_GAP: u32 = 144;
28
29/// Minimum required improvement ratio in peer score to switch to a better peer.
30/// Since lower scores indicate better peers, we switch when current_score / best_score > threshold.
31/// A value of 1.2 means the current peer must be at least 20% worse to justify switching.
32const PEER_SCORE_IMPROVEMENT_THRESHOLD: f64 = 1.2;
33
34// Define a constant for the low ping latency cutoff, in milliseconds.
35const LOW_LATENCY_CUTOFF: Latency = 20;
36
37/// Maximum number of syncing retries for a deprioritized peer.
38const MAX_STALLS: usize = 5;
39
40/// Weight for latency in peer scoring (lower is better).
41const LATENCY_WEIGHT: f64 = 0.7;
42
43/// Weight for reliability in peer scoring (higher is better).
44const RELIABILITY_WEIGHT: f64 = 0.3;
45
46/// Base reliability score for peers with no stalls.
47const BASE_RELIABILITY_SCORE: f64 = 100.0;
48
49/// The state of syncing between a Peer and ourselves.
50#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
51#[serde(rename_all = "camelCase")]
52pub enum PeerSyncState {
53    /// Available for sync requests.
54    Available,
55    /// The peer has been deprioritized due to past syncing issues (e.g., stalling).
56    Deprioritized {
57        /// Number of times the peer has stalled.
58        stalled_count: usize,
59    },
60    /// Actively downloading new blocks, starting from the given Number.
61    DownloadingNew { start: u32 },
62}
63
64impl PeerSyncState {
65    /// Returns `true` if the peer is available for syncing.
66    pub fn is_available(&self) -> bool {
67        matches!(self, Self::Available)
68    }
69
70    fn stalled_count(&self) -> usize {
71        match self {
72            Self::Deprioritized { stalled_count } => *stalled_count,
73            _ => 0,
74        }
75    }
76
77    /// Determines if the peer is permanently deprioritized based on the stall count.
78    fn is_permanently_deprioritized(&self) -> bool {
79        match self {
80            PeerSyncState::Deprioritized { stalled_count } => *stalled_count > MAX_STALLS,
81            _ => false,
82        }
83    }
84}
85
86/// Contains all the data about a Peer that we are trying to sync with.
87#[derive(Debug, Clone, Serialize, Deserialize)]
88#[serde(rename_all = "camelCase")]
89pub struct PeerSync {
90    /// Peer id of this peer.
91    pub peer_id: PeerId,
92    /// The number of the best block that we've seen for this peer.
93    pub best_number: u32,
94    /// Latency of connection to this peer.
95    pub latency: Latency,
96    /// The state of syncing this peer is in for us, generally categories
97    /// into `Available` or "busy" with something as defined by `PeerSyncState`.
98    pub state: PeerSyncState,
99}
100
101impl PeerSync {
102    /// Calculates a peer score based on latency and reliability.
103    /// Lower scores indicate better peers.
104    pub fn peer_score(&self) -> f64 {
105        // Normalize latency (higher latency = higher score = worse)
106        let latency_score = self.latency as f64;
107
108        // Calculate reliability score (more stalls = lower reliability = higher score = worse)
109        let reliability_penalty = self.state.stalled_count() as f64 * 20.0; // Each stall adds 20 points penalty
110        let reliability_score = BASE_RELIABILITY_SCORE + reliability_penalty;
111
112        // Combine scores with weights
113        (latency_score * LATENCY_WEIGHT) + (reliability_score * RELIABILITY_WEIGHT)
114    }
115}
116
117/// Locator based sync request, for requesting either Headers or Blocks.
118#[derive(Debug, PartialEq, Eq)]
119pub struct LocatorRequest {
120    pub locator_hashes: Vec<BlockHash>,
121    pub stop_hash: BlockHash,
122    pub to: PeerId,
123}
124
125/// Represents different kinds of sync requests.
126#[derive(Debug, PartialEq, Eq)]
127pub enum SyncRequest {
128    /// Request headers via `getheaders`.
129    Header(LocatorRequest),
130    /// Request inventories via `getblocks`.
131    Inventory(LocatorRequest),
132    /// Request blocks via `getdata`.
133    Data(Vec<Inventory>, PeerId),
134}
135
136/// Represents actions that can be taken during the syncing.
137#[derive(Debug)]
138pub enum SyncAction {
139    /// Fetch headers, blocks and data.
140    Request(SyncRequest),
141    /// Transitions to a Blocks-First sync after Headers-First sync
142    /// compltes, to fetch the most recent blocks.
143    SwitchToBlocksFirstSync,
144    /// Disconnect from the peer for the given reason.
145    DisconnectPeer(PeerId, Error),
146    /// Deprioritize the specified peer, restarting the current sync
147    /// with other candidates if available.
148    RestartSyncWithStalledPeer(PeerId),
149    /// Blocks-First sync finished and sets the syncing state to idle.
150    SetIdle,
151    /// No action needed.
152    None,
153}
154
155impl SyncAction {
156    pub(crate) fn get_headers(request: LocatorRequest) -> Self {
157        Self::Request(SyncRequest::Header(request))
158    }
159
160    pub(crate) fn get_inventory(request: LocatorRequest) -> Self {
161        Self::Request(SyncRequest::Inventory(request))
162    }
163
164    pub(crate) fn get_data(inv: Vec<Inventory>, from: PeerId) -> Self {
165        Self::Request(SyncRequest::Data(inv, from))
166    }
167}
168
169#[derive(Debug)]
170pub(crate) enum RestartReason {
171    Stalled,
172    Disconnected,
173}
174
175// This enum encapsulates the various strategies and states a node
176// might be in during the sync process.
177enum Syncing<Block, Client> {
178    /// Blocks-First sync.
179    BlocksFirst(Box<BlocksFirstStrategy<Block, Client>>),
180    /// Headers-First sync.
181    HeadersFirst(Box<HeadersFirstStrategy<Block, Client>>),
182    /// Not syncing.
183    ///
184    /// This could indicate that the node is either fully synced
185    /// or is waiting for more peers to resume syncing.
186    Idle,
187}
188
189impl<Block, Client> Syncing<Block, Client> {
190    fn is_major_syncing(&self) -> bool {
191        matches!(self, Self::BlocksFirst(_) | Self::HeadersFirst(_))
192    }
193}
194
195/// The main data structure which contains all the state for syncing.
196pub(crate) struct ChainSync<Block, Client> {
197    /// Chain client.
198    client: Arc<Client>,
199    /// Block header verifier.
200    header_verifier: HeaderVerifier<Block, Client>,
201    /// The active peers that we are using to sync and their PeerSync status
202    pub(crate) peers: HashMap<PeerId, PeerSync>,
203    /// Current syncing state.
204    syncing: Syncing<Block, Client>,
205    /// Handle of the import queue.
206    pub(crate) import_queue: BlockImportQueue,
207    /// Block syncing strategy.
208    sync_strategy: SyncStrategy,
209    /// Are we in major syncing?
210    is_major_syncing: Arc<AtomicBool>,
211    /// Whether to sync blocks from Bitcoin network.
212    enable_block_sync: bool,
213    /// Handle of peer store.
214    peer_store: Arc<dyn PeerStore>,
215    /// Target block of the syncing process.
216    sync_target: Option<u32>,
217    min_sync_peer_threshold: usize,
218    /// Memory management configuration.
219    memory_config: MemoryConfig,
220    _phantom: PhantomData<Block>,
221}
222
223impl<Block, Client> ChainSync<Block, Client>
224where
225    Block: BlockT,
226    Client: HeaderBackend<Block> + AuxStore,
227{
228    /// Constructs a new instance of [`ChainSync`].
229    #[allow(clippy::too_many_arguments)]
230    pub(super) fn new(
231        client: Arc<Client>,
232        header_verifier: HeaderVerifier<Block, Client>,
233        import_queue: BlockImportQueue,
234        sync_strategy: SyncStrategy,
235        is_major_syncing: Arc<AtomicBool>,
236        enable_block_sync: bool,
237        peer_store: Arc<dyn PeerStore>,
238        sync_target: Option<u32>,
239        min_sync_peer_threshold: usize,
240        memory_config: MemoryConfig,
241    ) -> Self {
242        Self {
243            client,
244            header_verifier,
245            peers: HashMap::new(),
246            syncing: Syncing::Idle,
247            import_queue,
248            sync_strategy,
249            is_major_syncing,
250            enable_block_sync,
251            peer_store,
252            sync_target,
253            min_sync_peer_threshold,
254            memory_config,
255            _phantom: Default::default(),
256        }
257    }
258
259    pub(super) fn sync_status(&self) -> SyncStatus {
260        match &self.syncing {
261            Syncing::Idle => SyncStatus::Idle,
262            Syncing::BlocksFirst(strategy) => strategy.sync_status(),
263            Syncing::HeadersFirst(strategy) => strategy.sync_status(),
264        }
265    }
266
267    pub(super) fn is_idle(&self) -> bool {
268        matches!(self.syncing, Syncing::Idle)
269    }
270
271    pub(super) fn on_tick(&mut self) -> SyncAction {
272        match &mut self.syncing {
273            Syncing::Idle => SyncAction::None,
274            Syncing::BlocksFirst(strategy) => strategy.on_tick(),
275            Syncing::HeadersFirst(strategy) => strategy.on_tick(),
276        }
277    }
278
279    pub(super) async fn wait_for_block_import_results(&mut self) -> ImportManyBlocksResult {
280        self.import_queue.block_import_results().await
281    }
282
283    pub(super) fn unreliable_peers(&self) -> Vec<PeerId> {
284        self.peers
285            .iter()
286            .filter_map(|(peer_id, peer)| {
287                peer.state
288                    .is_permanently_deprioritized()
289                    .then_some(peer_id)
290                    .copied()
291            })
292            .collect()
293    }
294
295    /// Removes the given peer from peers of chain sync.
296    pub(super) fn disconnect(&mut self, peer_id: PeerId) {
297        if let Some(removed_peer) = self.peers.remove(&peer_id) {
298            // We currently support only one syncing peer, this logic needs to be
299            // refactored once multiple syncing peers are supported.
300            if matches!(removed_peer.state, PeerSyncState::DownloadingNew { .. }) {
301                self.restart_sync(removed_peer.peer_id, RestartReason::Disconnected);
302            }
303        }
304    }
305
306    pub(super) fn update_peer_best(&mut self, peer_id: PeerId, peer_best: u32) {
307        let mut peer_best_updated = false;
308
309        self.peers.entry(peer_id).and_modify(|e| {
310            if peer_best > e.best_number {
311                tracing::debug!(
312                    "Tip of {peer_id:?} updated from #{} to #{peer_best}",
313                    e.best_number
314                );
315                e.best_number = peer_best;
316                peer_best_updated = true;
317            }
318        });
319
320        if peer_best_updated {
321            match &mut self.syncing {
322                Syncing::Idle => {}
323                Syncing::BlocksFirst(strategy) => strategy.set_peer_best(peer_id, peer_best),
324                Syncing::HeadersFirst(strategy) => strategy.set_peer_best(peer_id, peer_best),
325            }
326        }
327    }
328
329    /// Attempt to find the best available peer using a composite scoring algorithm
330    /// that considers both latency and reliability.
331    fn select_next_peer_for_sync(
332        &mut self,
333        our_best: u32,
334        excluded_peer: PeerId,
335    ) -> Option<PeerId> {
336        // First try available peers with the new scoring algorithm
337        let best_available = self
338            .peers
339            .values()
340            .filter(|peer| {
341                peer.peer_id != excluded_peer
342                    && peer.best_number > our_best
343                    && peer.state.is_available()
344            })
345            .min_by(|a, b| {
346                let score_a = a.peer_score();
347                let score_b = b.peer_score();
348                score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
349            })
350            .map(|peer| peer.peer_id);
351
352        best_available.or_else(|| {
353            // Fall back to deprioritized peers, also using scoring
354            self.peers
355                .values()
356                .filter(|peer| {
357                    peer.peer_id != excluded_peer
358                        && peer.best_number > our_best
359                        && !peer.state.is_permanently_deprioritized()
360                })
361                .min_by(|a, b| {
362                    let score_a = a.peer_score();
363                    let score_b = b.peer_score();
364                    score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
365                })
366                .map(|peer| peer.peer_id)
367        })
368    }
369
370    /// Attempts to restart the sync based on the reason provided.
371    ///
372    /// Returns `true` if the sync is restarted with a new peer.
373    pub(super) fn restart_sync(&mut self, prior_peer_id: PeerId, reason: RestartReason) {
374        let our_best = self.client.best_number();
375
376        let Some(new_peer_id) = self.select_next_peer_for_sync(our_best, prior_peer_id) else {
377            if let Some(median_seen_block) = self.median_seen()
378                && median_seen_block <= our_best
379            {
380                let best_seen_block = self.peers.values().map(|p| p.best_number).max();
381
382                // We are synced to the median block seen by our peers, but this may
383                // not be the network's tip.
384                //
385                // Transition to idle unless more blocks are announced.
386                tracing::debug!(
387                    best_seen_block,
388                    median_seen_block,
389                    our_best,
390                    "Synced to the majority of peers, switching to Idle"
391                );
392                self.update_syncing_state(Syncing::Idle);
393                return;
394            }
395
396            // No new sync candidate, keep it as is.
397            // TODO: handle this properly.
398            tracing::debug!(
399                ?prior_peer_id,
400                "⚠️ Attempting to restart sync, but no new sync candidate available"
401            );
402
403            return;
404        };
405
406        {
407            let Some(new_peer) = self.peers.get_mut(&new_peer_id) else {
408                tracing::error!("Corrupted state, next peer {new_peer_id} missing from peer list");
409                return;
410            };
411
412            tracing::debug!(?reason, prior_peer = ?prior_peer_id, ?new_peer, "🔄 Sync restarted");
413            new_peer.state = PeerSyncState::DownloadingNew { start: our_best };
414
415            let target_block_number = target_block_number(self.sync_target, new_peer.best_number);
416
417            match &mut self.syncing {
418                Syncing::BlocksFirst(strategy) => {
419                    strategy.restart(new_peer.peer_id, target_block_number);
420                }
421                Syncing::HeadersFirst(strategy) => {
422                    strategy.restart(new_peer.peer_id, target_block_number);
423                }
424                Syncing::Idle => {}
425            }
426        }
427
428        match reason {
429            RestartReason::Stalled => {
430                self.peers.entry(prior_peer_id).and_modify(|p| {
431                    let current_stalled_count = p.state.stalled_count();
432                    p.state = PeerSyncState::Deprioritized {
433                        stalled_count: current_stalled_count + 1,
434                    };
435                });
436            }
437            RestartReason::Disconnected => {
438                // Nothing to be done, peer is already removed from the peer list.
439            }
440        }
441    }
442
443    /// Returns the median block number advertised by our peers.
444    fn median_seen(&self) -> Option<u32> {
445        let mut best_seens = self
446            .peers
447            .values()
448            .map(|p| p.best_number)
449            .collect::<Vec<_>>();
450
451        if best_seens.is_empty() {
452            None
453        } else {
454            let middle = best_seens.len() / 2;
455
456            Some(*best_seens.select_nth_unstable(middle).1)
457        }
458    }
459
460    pub(super) fn update_peer_latency(&mut self, peer_id: PeerId, avg_latency: Latency) {
461        self.peers.entry(peer_id).and_modify(|peer| {
462            peer.latency = avg_latency;
463        });
464        self.update_sync_peer_on_lower_latency();
465    }
466
467    pub(super) fn update_sync_peer_on_lower_latency(&mut self) {
468        let maybe_sync_peer_id = match &self.syncing {
469            Syncing::Idle => return,
470            Syncing::BlocksFirst(strategy) => strategy.can_swap_sync_peer(),
471            Syncing::HeadersFirst(strategy) => strategy.can_swap_sync_peer(),
472        };
473
474        let Some(current_sync_peer_id) = maybe_sync_peer_id else {
475            return;
476        };
477
478        let Some(current_latency) = self
479            .peers
480            .get(&current_sync_peer_id)
481            .map(|peer| peer.latency)
482        else {
483            return;
484        };
485
486        if current_latency <= LOW_LATENCY_CUTOFF {
487            tracing::trace!(
488                peer_id = ?current_sync_peer_id,
489                "Skipping sync peer update as the current latency ({current_latency}ms) is already low enough"
490            );
491        }
492
493        let our_best = self.client.best_number();
494
495        // Find the peer with best score (considering both latency and reliability).
496        let Some(best_sync_peer) = self
497            .peers
498            .values()
499            .filter(|peer| {
500                peer.peer_id != current_sync_peer_id
501                    && peer.best_number > our_best
502                    && peer.state.is_available()
503            })
504            .min_by(|a, b| {
505                let score_a = a.peer_score();
506                let score_b = b.peer_score();
507                score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
508            })
509        else {
510            return;
511        };
512
513        let Some(current_peer) = self.peers.get(&current_sync_peer_id) else {
514            tracing::debug!(
515                peer_id = ?current_sync_peer_id,
516                "Current sync peer not found in peer list, skipping peer update"
517            );
518            return;
519        };
520
521        let current_score = current_peer.peer_score();
522        let best_score = best_sync_peer.peer_score();
523
524        // Update sync peer if the score improvement is significant (lower score is better).
525        // Protect against division by zero and ensure best_score is positive.
526        if best_score > 0.0 && current_score / best_score > PEER_SCORE_IMPROVEMENT_THRESHOLD {
527            let peer_id = best_sync_peer.peer_id;
528            let target_block_number =
529                target_block_number(self.sync_target, best_sync_peer.best_number);
530
531            let sync_peer_updated = match &mut self.syncing {
532                Syncing::BlocksFirst(strategy) => {
533                    strategy.swap_sync_peer(peer_id, target_block_number);
534                    true
535                }
536                Syncing::HeadersFirst(strategy) => {
537                    strategy.swap_sync_peer(peer_id, target_block_number);
538                    true
539                }
540                Syncing::Idle => unreachable!("Must not be Idle as checked; qed"),
541            };
542
543            if sync_peer_updated {
544                tracing::debug!(
545                    old_peer_id = ?current_sync_peer_id,
546                    new_peer_id = ?peer_id,
547                    old_score = %current_score,
548                    new_score = %best_score,
549                    "🔧 Sync peer updated to a peer with better score (latency: {} ms -> {} ms)",
550                    current_peer.latency,
551                    best_sync_peer.latency,
552                );
553                self.peers.entry(current_sync_peer_id).and_modify(|peer| {
554                    peer.state = PeerSyncState::Available;
555                });
556                self.peers.entry(peer_id).and_modify(|peer| {
557                    peer.state = PeerSyncState::DownloadingNew { start: our_best };
558                });
559            }
560        }
561    }
562
563    /// Add new peer to the chain sync component and potentially starts to synchronize the network.
564    pub(super) fn add_new_peer(&mut self, new_peer: NewPeer) -> SyncAction {
565        let NewPeer {
566            peer_id,
567            best_number,
568            latency,
569        } = new_peer;
570
571        let new_peer = PeerSync {
572            peer_id,
573            best_number,
574            latency,
575            state: PeerSyncState::Available,
576        };
577
578        self.peers.insert(peer_id, new_peer);
579
580        if self.enable_block_sync {
581            self.attempt_sync_start()
582        } else {
583            SyncAction::None
584        }
585    }
586
587    pub(super) fn start_block_sync(&mut self) -> SyncAction {
588        self.enable_block_sync = true;
589        self.attempt_sync_start()
590    }
591
592    fn attempt_sync_start(&mut self) -> SyncAction {
593        if self.syncing.is_major_syncing() {
594            return SyncAction::None;
595        }
596
597        if self.peers.len() < self.min_sync_peer_threshold {
598            tracing::debug!(
599                "Waiting for more sync peers, discovered {} peers, require {} peers",
600                self.peers.len(),
601                self.min_sync_peer_threshold
602            );
603            return SyncAction::None;
604        }
605
606        let our_best = self.client.best_number();
607
608        if let Some(sync_target) = self.sync_target
609            && our_best >= sync_target
610        {
611            return SyncAction::None;
612        }
613
614        let find_best_available_peer = || {
615            self.peers
616                .iter()
617                .filter(|(_peer_id, peer)| peer.best_number > our_best && peer.state.is_available())
618                .min_by(|(_, a), (_, b)| {
619                    let score_a = a.peer_score();
620                    let score_b = b.peer_score();
621                    score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
622                })
623                .map(|(peer_id, _peer)| peer_id)
624        };
625
626        let find_best_deprioritized_peer = || {
627            self.peers
628                .iter()
629                .filter(|(_peer_id, peer)| peer.best_number > our_best)
630                .filter_map(|(peer_id, peer)| {
631                    if let PeerSyncState::Deprioritized { stalled_count } = peer.state {
632                        if stalled_count > MAX_STALLS {
633                            None
634                        } else {
635                            Some((peer_id, stalled_count, peer.latency))
636                        }
637                    } else {
638                        None
639                    }
640                })
641                .min_by(
642                    |(_, stalled_count_a, latency_a), (_, stalled_count_b, latency_b)| {
643                        // First, compare stalled_count, then latency if stalled_count is equal
644                        match stalled_count_a.cmp(stalled_count_b) {
645                            CmpOrdering::Equal => latency_a.cmp(latency_b), // compare latency if stalled_count is the same
646                            other => other, // otherwise, return the comparison of stalled_count
647                        }
648                    },
649                )
650                .map(|(peer_id, _stalled_count, _latency)| peer_id)
651        };
652
653        let Some(next_peer_id) = find_best_available_peer()
654            .or_else(find_best_deprioritized_peer)
655            .copied()
656        else {
657            return SyncAction::None;
658        };
659
660        let Some(next_peer) = self.peers.get_mut(&next_peer_id) else {
661            return SyncAction::None;
662        };
663
664        let client = self.client.clone();
665        let peer_store = self.peer_store.clone();
666
667        let peer_best = next_peer.best_number;
668        let require_major_sync = peer_best - our_best > MAJOR_SYNC_GAP;
669
670        let target_block_number = target_block_number(self.sync_target, peer_best);
671
672        // Start major syncing if the gap is significant.
673        let (new_syncing, sync_action) = if require_major_sync {
674            let blocks_first = our_best >= crate::checkpoint::last_checkpoint_height()
675                || matches!(self.sync_strategy, SyncStrategy::BlocksFirst);
676
677            tracing::debug!(
678                latency = ?next_peer.latency,
679                "⏩ Starting major sync ({}) from {next_peer_id:?} at #{our_best}",
680                if blocks_first { "blocks-first" } else { "headers-first" }
681            );
682
683            if blocks_first {
684                let (sync_strategy, sync_action) = BlocksFirstStrategy::new(
685                    client,
686                    next_peer_id,
687                    target_block_number,
688                    peer_store,
689                    self.memory_config.clone(),
690                );
691                (Syncing::BlocksFirst(Box::new(sync_strategy)), sync_action)
692            } else {
693                let (sync_strategy, sync_action) = HeadersFirstStrategy::new(
694                    client,
695                    self.header_verifier.clone(),
696                    next_peer_id,
697                    target_block_number,
698                    peer_store,
699                    self.memory_config.clone(),
700                );
701                (Syncing::HeadersFirst(Box::new(sync_strategy)), sync_action)
702            }
703        } else {
704            let (sync_strategy, sync_action) = BlocksFirstStrategy::new(
705                client,
706                next_peer_id,
707                target_block_number,
708                peer_store,
709                self.memory_config.clone(),
710            );
711            (Syncing::BlocksFirst(Box::new(sync_strategy)), sync_action)
712        };
713
714        next_peer.state = PeerSyncState::DownloadingNew { start: our_best };
715        self.update_syncing_state(new_syncing);
716
717        sync_action
718    }
719
720    pub(super) fn start_blocks_first_sync(&mut self) -> Option<SyncAction> {
721        if matches!(self.syncing, Syncing::BlocksFirst(_)) {
722            return None;
723        }
724
725        // Import the potential remaining blocks downloaded by Headers-First sync.
726        self.import_pending_blocks();
727
728        let our_best = self.client.best_number();
729
730        let Some(best_peer) = self
731            .peers
732            .values()
733            .filter(|peer| peer.best_number > our_best && peer.state.is_available())
734            .min_by(|a, b| {
735                let score_a = a.peer_score();
736                let score_b = b.peer_score();
737                score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
738            })
739        else {
740            self.update_syncing_state(Syncing::Idle);
741            return None;
742        };
743
744        let (blocks_first_strategy, sync_action) = BlocksFirstStrategy::new(
745            self.client.clone(),
746            best_peer.peer_id,
747            best_peer.best_number,
748            self.peer_store.clone(),
749            self.memory_config.clone(),
750        );
751
752        tracing::debug!("Headers-First sync completed, continuing with blocks-first sync");
753        self.update_syncing_state(Syncing::BlocksFirst(Box::new(blocks_first_strategy)));
754
755        Some(sync_action)
756    }
757
758    fn update_syncing_state(&mut self, new: Syncing<Block, Client>) {
759        self.syncing = new;
760        self.is_major_syncing
761            .store(self.syncing.is_major_syncing(), Ordering::Relaxed);
762    }
763
764    pub(super) fn set_idle(&mut self) {
765        self.import_pending_blocks();
766
767        tracing::debug!(
768            best_number = self.client.best_number(),
769            "Blocks-First sync completed, switching to Syncing::Idle"
770        );
771        self.update_syncing_state(Syncing::Idle);
772    }
773
774    // NOTE: `inv` can be received unsolicited as an announcement of a new block,
775    // or in reply to `getblocks`.
776    pub(super) fn on_inv(&mut self, inventories: Vec<Inventory>, from: PeerId) -> SyncAction {
777        match &mut self.syncing {
778            Syncing::Idle => SyncAction::None,
779            Syncing::BlocksFirst(strategy) => strategy.on_inv(inventories, from),
780            Syncing::HeadersFirst(_) => SyncAction::None,
781        }
782    }
783
784    pub(super) fn on_block(&mut self, block: BitcoinBlock, from: PeerId) -> SyncAction {
785        match &mut self.syncing {
786            Syncing::Idle => SyncAction::None,
787            Syncing::BlocksFirst(strategy) => strategy.on_block(block, from),
788            Syncing::HeadersFirst(strategy) => strategy.on_block(block, from),
789        }
790    }
791
792    pub(super) fn on_headers(&mut self, headers: Vec<BitcoinHeader>, from: PeerId) -> SyncAction {
793        match &mut self.syncing {
794            Syncing::HeadersFirst(strategy) => strategy.on_headers(headers, from),
795            Syncing::BlocksFirst(_) | Syncing::Idle => {
796                tracing::debug!(
797                    ?from,
798                    "Ignored headers: {:?}",
799                    headers
800                        .iter()
801                        .map(|header| header.block_hash())
802                        .collect::<Vec<_>>()
803                );
804                SyncAction::None
805            }
806        }
807    }
808
809    pub(super) fn on_blocks_processed(&mut self, results: ImportManyBlocksResult) {
810        let block_downloader = match &mut self.syncing {
811            Syncing::Idle => return,
812            Syncing::BlocksFirst(strategy) => strategy.block_downloader(),
813            Syncing::HeadersFirst(strategy) => strategy.block_downloader(),
814        };
815        block_downloader.handle_processed_blocks(results);
816    }
817
818    pub(super) fn import_pending_blocks(&mut self) {
819        let block_downloader = match &mut self.syncing {
820            Syncing::Idle => return,
821            Syncing::BlocksFirst(strategy) => strategy.block_downloader(),
822            Syncing::HeadersFirst(strategy) => strategy.block_downloader(),
823        };
824
825        if !block_downloader.has_pending_blocks() {
826            return;
827        }
828
829        let (hashes, blocks) = block_downloader.prepare_blocks_for_import(self.sync_target);
830
831        tracing::trace!(
832            blocks = ?hashes,
833            blocks_in_queue = block_downloader.blocks_in_queue_count(),
834            "Scheduling {} blocks for import",
835            blocks.len(),
836        );
837
838        self.import_queue.import_blocks(ImportBlocks {
839            origin: if self.is_major_syncing.load(Ordering::Relaxed) {
840                BlockOrigin::NetworkInitialSync
841            } else {
842                BlockOrigin::NetworkBroadcast
843            },
844            blocks,
845        });
846    }
847}
848
849/// Determines the target block number for syncing based on the provided sync target
850/// and the peer's best block number.
851///
852/// Bitcoin Core only supports snapshots at specific block heights (e.g., 840000 as of writing).
853/// To avoid syncing past a block that may have been pruned or is unavailable in pruning mode
854/// when running a snapshot node, this function ensures that we do not sync beyond a certain
855/// block height, as determined by the `sync_target` or the peer's best block height.
856fn target_block_number(sync_target: Option<u32>, peer_best: u32) -> u32 {
857    match sync_target {
858        Some(target) => peer_best.min(target),
859        None => peer_best,
860    }
861}