1mod block_downloader;
2mod orphan_blocks_pool;
3mod strategy;
4
5use self::strategy::{BlocksFirstStrategy, HeadersFirstStrategy};
6use crate::peer_manager::NewPeer;
7use crate::peer_store::PeerStore;
8use crate::{Error, Latency, MemoryConfig, PeerId, SyncStatus, SyncStrategy};
9use bitcoin::blockdata::block::Header as BitcoinHeader;
10use bitcoin::p2p::message_blockdata::Inventory;
11use bitcoin::{Block as BitcoinBlock, BlockHash};
12use sc_client_api::{AuxStore, HeaderBackend};
13use sc_consensus_nakamoto::{
14 BlockImportQueue, HeaderVerifier, ImportBlocks, ImportManyBlocksResult,
15};
16use serde::{Deserialize, Serialize};
17use sp_consensus::BlockOrigin;
18use sp_runtime::traits::Block as BlockT;
19use std::cmp::Ordering as CmpOrdering;
20use std::collections::HashMap;
21use std::marker::PhantomData;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24use subcoin_primitives::ClientExt;
25
/// Gap between a peer's best block number and ours above which a newly started
/// sync is treated as a major sync.
const MAJOR_SYNC_GAP: u32 = 144;

/// Minimum ratio `current_score / candidate_score` that must be exceeded before
/// the sync peer is swapped for a better-scoring candidate.
const PEER_SCORE_IMPROVEMENT_THRESHOLD: f64 = 1.2;

/// Latency at or below which the current sync peer is considered fast enough
/// (log messages report latencies in ms).
const LOW_LATENCY_CUTOFF: Latency = 20;

/// A peer whose stall count exceeds this value is treated as permanently deprioritized.
const MAX_STALLS: usize = 5;

/// Weight of the latency component in the peer score.
const LATENCY_WEIGHT: f64 = 0.7;

/// Weight of the reliability component in the peer score.
const RELIABILITY_WEIGHT: f64 = 0.3;

/// Baseline of the reliability component, before stall penalties are added.
const BASE_RELIABILITY_SCORE: f64 = 100.0;
48
/// Per-peer synchronization state tracked by `ChainSync`.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
#[serde(rename_all = "camelCase")]
pub enum PeerSyncState {
    /// The peer is free and is the first choice when selecting a sync peer.
    Available,
    /// The peer was recorded as stalled after a sync restart; it is only
    /// considered when no available peer qualifies.
    Deprioritized {
        /// Number of stalls recorded for this peer so far.
        stalled_count: usize,
    },
    /// The peer is the current sync peer; `start` is our best block number at
    /// the time the peer was selected.
    DownloadingNew { start: u32 },
}
63
64impl PeerSyncState {
65 pub fn is_available(&self) -> bool {
67 matches!(self, Self::Available)
68 }
69
70 fn stalled_count(&self) -> usize {
71 match self {
72 Self::Deprioritized { stalled_count } => *stalled_count,
73 _ => 0,
74 }
75 }
76
77 fn is_permanently_deprioritized(&self) -> bool {
79 match self {
80 PeerSyncState::Deprioritized { stalled_count } => *stalled_count > MAX_STALLS,
81 _ => false,
82 }
83 }
84}
85
/// Sync-related information kept for each connected peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PeerSync {
    /// Identifier of the peer.
    pub peer_id: PeerId,
    /// Highest block number the peer is known to have.
    pub best_number: u32,
    /// Most recently reported latency of the peer.
    pub latency: Latency,
    /// Current synchronization state of the peer.
    pub state: PeerSyncState,
}
100
101impl PeerSync {
102 pub fn peer_score(&self) -> f64 {
105 let latency_score = self.latency as f64;
107
108 let reliability_penalty = self.state.stalled_count() as f64 * 20.0; let reliability_score = BASE_RELIABILITY_SCORE + reliability_penalty;
111
112 (latency_score * LATENCY_WEIGHT) + (reliability_score * RELIABILITY_WEIGHT)
114 }
115}
116
/// Locator-based request payload shared by header and inventory requests.
#[derive(Debug, PartialEq, Eq)]
pub struct LocatorRequest {
    /// Block hashes forming the locator.
    pub locator_hashes: Vec<BlockHash>,
    /// Hash at which the response should stop.
    pub stop_hash: BlockHash,
    /// Peer the request is addressed to (presumably the recipient — confirm with the sender code).
    pub to: PeerId,
}
124
/// Requests the sync logic can ask the network layer to send.
#[derive(Debug, PartialEq, Eq)]
pub enum SyncRequest {
    /// Header request described by a locator.
    Header(LocatorRequest),
    /// Inventory request described by a locator.
    Inventory(LocatorRequest),
    /// Data request for the given inventory items, paired with a peer id
    /// (built by `SyncAction::get_data(inv, from)`).
    Data(Vec<Inventory>, PeerId),
}
135
/// Outcome of a sync step, returned to the caller for execution.
///
/// NOTE(review): the handlers live outside this file; variant semantics beyond
/// `Request` and `None` are inferred from names and should be confirmed there.
#[derive(Debug)]
pub enum SyncAction {
    /// Send the wrapped request.
    Request(SyncRequest),
    /// Presumably asks the caller to continue with blocks-first sync.
    SwitchToBlocksFirstSync,
    /// Carries a peer and an associated error; presumably a disconnect request.
    DisconnectPeer(PeerId, Error),
    /// Presumably asks the caller to restart the sync away from the given stalled peer.
    RestartSyncWithStalledPeer(PeerId),
    /// Presumably asks the caller to put the sync into the idle state.
    SetIdle,
    /// Nothing to do.
    None,
}
154
155impl SyncAction {
156 pub(crate) fn get_headers(request: LocatorRequest) -> Self {
157 Self::Request(SyncRequest::Header(request))
158 }
159
160 pub(crate) fn get_inventory(request: LocatorRequest) -> Self {
161 Self::Request(SyncRequest::Inventory(request))
162 }
163
164 pub(crate) fn get_data(inv: Vec<Inventory>, from: PeerId) -> Self {
165 Self::Request(SyncRequest::Data(inv, from))
166 }
167}
168
/// Why a sync session is being restarted with a different peer.
#[derive(Debug)]
pub(crate) enum RestartReason {
    /// The previous sync peer stalled; its stall count is incremented on restart.
    Stalled,
    /// The previous sync peer disconnected; no stall is recorded.
    Disconnected,
}
174
/// Current sync mode together with the state of the active strategy.
enum Syncing<Block, Client> {
    /// A blocks-first strategy is running.
    BlocksFirst(Box<BlocksFirstStrategy<Block, Client>>),
    /// A headers-first strategy is running.
    HeadersFirst(Box<HeadersFirstStrategy<Block, Client>>),
    /// No sync session is running.
    Idle,
}
188
189impl<Block, Client> Syncing<Block, Client> {
190 fn is_major_syncing(&self) -> bool {
191 matches!(self, Self::BlocksFirst(_) | Self::HeadersFirst(_))
192 }
193}
194
/// Coordinates chain synchronization: tracks peers, picks the sync peer and
/// drives the active sync strategy.
pub(crate) struct ChainSync<Block, Client> {
    /// Chain client; queried for our best block number.
    client: Arc<Client>,
    /// Header verifier handed to the headers-first strategy.
    header_verifier: HeaderVerifier<Block, Client>,
    /// All tracked peers, keyed by peer id.
    pub(crate) peers: HashMap<PeerId, PeerSync>,
    /// Current sync mode and strategy state.
    syncing: Syncing<Block, Client>,
    /// Queue through which downloaded blocks are submitted for import.
    pub(crate) import_queue: BlockImportQueue,
    /// Configured strategy; `BlocksFirst` forces blocks-first when a major sync starts.
    sync_strategy: SyncStrategy,
    /// Shared flag mirroring whether a sync strategy is currently running.
    is_major_syncing: Arc<AtomicBool>,
    /// When `false`, adding a peer does not attempt to start a sync.
    enable_block_sync: bool,
    /// Peer store handed to the strategies.
    peer_store: Arc<dyn PeerStore>,
    /// Optional upper bound on the block number to sync to.
    sync_target: Option<u32>,
    /// Minimum number of tracked peers required before a sync is started.
    min_sync_peer_threshold: usize,
    /// Memory configuration handed to the strategies.
    memory_config: MemoryConfig,
    /// Marker for the `Block` type parameter.
    _phantom: PhantomData<Block>,
}
222
223impl<Block, Client> ChainSync<Block, Client>
224where
225 Block: BlockT,
226 Client: HeaderBackend<Block> + AuxStore,
227{
228 #[allow(clippy::too_many_arguments)]
230 pub(super) fn new(
231 client: Arc<Client>,
232 header_verifier: HeaderVerifier<Block, Client>,
233 import_queue: BlockImportQueue,
234 sync_strategy: SyncStrategy,
235 is_major_syncing: Arc<AtomicBool>,
236 enable_block_sync: bool,
237 peer_store: Arc<dyn PeerStore>,
238 sync_target: Option<u32>,
239 min_sync_peer_threshold: usize,
240 memory_config: MemoryConfig,
241 ) -> Self {
242 Self {
243 client,
244 header_verifier,
245 peers: HashMap::new(),
246 syncing: Syncing::Idle,
247 import_queue,
248 sync_strategy,
249 is_major_syncing,
250 enable_block_sync,
251 peer_store,
252 sync_target,
253 min_sync_peer_threshold,
254 memory_config,
255 _phantom: Default::default(),
256 }
257 }
258
259 pub(super) fn sync_status(&self) -> SyncStatus {
260 match &self.syncing {
261 Syncing::Idle => SyncStatus::Idle,
262 Syncing::BlocksFirst(strategy) => strategy.sync_status(),
263 Syncing::HeadersFirst(strategy) => strategy.sync_status(),
264 }
265 }
266
267 pub(super) fn is_idle(&self) -> bool {
268 matches!(self.syncing, Syncing::Idle)
269 }
270
271 pub(super) fn is_active_sync_peer(&self, peer_id: PeerId) -> bool {
273 self.peers
274 .get(&peer_id)
275 .map(|peer| matches!(peer.state, PeerSyncState::DownloadingNew { .. }))
276 .unwrap_or(false)
277 }
278
279 pub(super) fn on_tick(&mut self) -> SyncAction {
280 match &mut self.syncing {
281 Syncing::Idle => SyncAction::None,
282 Syncing::BlocksFirst(strategy) => strategy.on_tick(),
283 Syncing::HeadersFirst(strategy) => strategy.on_tick(),
284 }
285 }
286
287 pub(super) async fn wait_for_block_import_results(&mut self) -> ImportManyBlocksResult {
288 self.import_queue.block_import_results().await
289 }
290
291 pub(super) fn unreliable_peers(&self) -> Vec<PeerId> {
292 self.peers
293 .iter()
294 .filter_map(|(peer_id, peer)| {
295 peer.state
296 .is_permanently_deprioritized()
297 .then_some(peer_id)
298 .copied()
299 })
300 .collect()
301 }
302
303 pub(super) fn disconnect(&mut self, peer_id: PeerId) {
305 if let Some(removed_peer) = self.peers.remove(&peer_id) {
306 if matches!(removed_peer.state, PeerSyncState::DownloadingNew { .. }) {
309 self.restart_sync(removed_peer.peer_id, RestartReason::Disconnected);
310 }
311 }
312 }
313
314 pub(super) fn update_peer_best(&mut self, peer_id: PeerId, peer_best: u32) {
315 let mut peer_best_updated = false;
316
317 self.peers.entry(peer_id).and_modify(|e| {
318 if peer_best > e.best_number {
319 tracing::debug!(
320 "Tip of {peer_id:?} updated from #{} to #{peer_best}",
321 e.best_number
322 );
323 e.best_number = peer_best;
324 peer_best_updated = true;
325 }
326 });
327
328 if peer_best_updated {
329 match &mut self.syncing {
330 Syncing::Idle => {}
331 Syncing::BlocksFirst(strategy) => strategy.set_peer_best(peer_id, peer_best),
332 Syncing::HeadersFirst(strategy) => strategy.set_peer_best(peer_id, peer_best),
333 }
334 }
335 }
336
337 fn select_next_peer_for_sync(
340 &mut self,
341 our_best: u32,
342 excluded_peer: PeerId,
343 ) -> Option<PeerId> {
344 let best_available = self
346 .peers
347 .values()
348 .filter(|peer| {
349 peer.peer_id != excluded_peer
350 && peer.best_number > our_best
351 && peer.state.is_available()
352 })
353 .min_by(|a, b| {
354 let score_a = a.peer_score();
355 let score_b = b.peer_score();
356 score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
357 })
358 .map(|peer| peer.peer_id);
359
360 best_available.or_else(|| {
361 self.peers
363 .values()
364 .filter(|peer| {
365 peer.peer_id != excluded_peer
366 && peer.best_number > our_best
367 && !peer.state.is_permanently_deprioritized()
368 })
369 .min_by(|a, b| {
370 let score_a = a.peer_score();
371 let score_b = b.peer_score();
372 score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
373 })
374 .map(|peer| peer.peer_id)
375 })
376 }
377
378 pub(super) fn restart_sync(&mut self, prior_peer_id: PeerId, reason: RestartReason) {
382 let our_best = self.client.best_number();
383
384 let Some(new_peer_id) = self.select_next_peer_for_sync(our_best, prior_peer_id) else {
385 if let Some(median_seen_block) = self.median_seen()
386 && median_seen_block <= our_best
387 {
388 let best_seen_block = self.peers.values().map(|p| p.best_number).max();
389
390 tracing::debug!(
395 best_seen_block,
396 median_seen_block,
397 our_best,
398 "Synced to the majority of peers, switching to Idle"
399 );
400 self.update_syncing_state(Syncing::Idle);
401 return;
402 }
403
404 tracing::debug!(
407 ?prior_peer_id,
408 "⚠️ Attempting to restart sync, but no new sync candidate available"
409 );
410
411 return;
412 };
413
414 {
415 let Some(new_peer) = self.peers.get_mut(&new_peer_id) else {
416 tracing::error!("Corrupted state, next peer {new_peer_id} missing from peer list");
417 return;
418 };
419
420 tracing::debug!(?reason, prior_peer = ?prior_peer_id, ?new_peer, "🔄 Sync restarted");
421 new_peer.state = PeerSyncState::DownloadingNew { start: our_best };
422
423 let target_block_number = target_block_number(self.sync_target, new_peer.best_number);
424
425 match &mut self.syncing {
426 Syncing::BlocksFirst(strategy) => {
427 strategy.restart(new_peer.peer_id, target_block_number);
428 }
429 Syncing::HeadersFirst(strategy) => {
430 strategy.restart(new_peer.peer_id, target_block_number);
431 }
432 Syncing::Idle => {}
433 }
434 }
435
436 match reason {
437 RestartReason::Stalled => {
438 self.peers.entry(prior_peer_id).and_modify(|p| {
439 let current_stalled_count = p.state.stalled_count();
440 p.state = PeerSyncState::Deprioritized {
441 stalled_count: current_stalled_count + 1,
442 };
443 });
444 }
445 RestartReason::Disconnected => {
446 }
448 }
449 }
450
451 fn median_seen(&self) -> Option<u32> {
453 let mut best_seens = self
454 .peers
455 .values()
456 .map(|p| p.best_number)
457 .collect::<Vec<_>>();
458
459 if best_seens.is_empty() {
460 None
461 } else {
462 let middle = best_seens.len() / 2;
463
464 Some(*best_seens.select_nth_unstable(middle).1)
465 }
466 }
467
468 pub(super) fn update_peer_latency(&mut self, peer_id: PeerId, avg_latency: Latency) {
469 self.peers.entry(peer_id).and_modify(|peer| {
470 peer.latency = avg_latency;
471 });
472 self.update_sync_peer_on_lower_latency();
473 }
474
475 pub(super) fn update_sync_peer_on_lower_latency(&mut self) {
476 let maybe_sync_peer_id = match &self.syncing {
477 Syncing::Idle => return,
478 Syncing::BlocksFirst(strategy) => strategy.can_swap_sync_peer(),
479 Syncing::HeadersFirst(strategy) => strategy.can_swap_sync_peer(),
480 };
481
482 let Some(current_sync_peer_id) = maybe_sync_peer_id else {
483 return;
484 };
485
486 let Some(current_latency) = self
487 .peers
488 .get(¤t_sync_peer_id)
489 .map(|peer| peer.latency)
490 else {
491 return;
492 };
493
494 if current_latency <= LOW_LATENCY_CUTOFF {
495 tracing::trace!(
496 peer_id = ?current_sync_peer_id,
497 "Skipping sync peer update as the current latency ({current_latency}ms) is already low enough"
498 );
499 }
500
501 let our_best = self.client.best_number();
502
503 let Some(best_sync_peer) = self
505 .peers
506 .values()
507 .filter(|peer| {
508 peer.peer_id != current_sync_peer_id
509 && peer.best_number > our_best
510 && peer.state.is_available()
511 })
512 .min_by(|a, b| {
513 let score_a = a.peer_score();
514 let score_b = b.peer_score();
515 score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
516 })
517 else {
518 return;
519 };
520
521 let Some(current_peer) = self.peers.get(¤t_sync_peer_id) else {
522 tracing::debug!(
523 peer_id = ?current_sync_peer_id,
524 "Current sync peer not found in peer list, skipping peer update"
525 );
526 return;
527 };
528
529 let current_score = current_peer.peer_score();
530 let best_score = best_sync_peer.peer_score();
531
532 if best_score > 0.0 && current_score / best_score > PEER_SCORE_IMPROVEMENT_THRESHOLD {
535 let peer_id = best_sync_peer.peer_id;
536 let target_block_number =
537 target_block_number(self.sync_target, best_sync_peer.best_number);
538
539 let sync_peer_updated = match &mut self.syncing {
540 Syncing::BlocksFirst(strategy) => {
541 strategy.swap_sync_peer(peer_id, target_block_number);
542 true
543 }
544 Syncing::HeadersFirst(strategy) => {
545 strategy.swap_sync_peer(peer_id, target_block_number);
546 true
547 }
548 Syncing::Idle => unreachable!("Must not be Idle as checked; qed"),
549 };
550
551 if sync_peer_updated {
552 tracing::debug!(
553 old_peer_id = ?current_sync_peer_id,
554 new_peer_id = ?peer_id,
555 old_score = %current_score,
556 new_score = %best_score,
557 "🔧 Sync peer updated to a peer with better score (latency: {} ms -> {} ms)",
558 current_peer.latency,
559 best_sync_peer.latency,
560 );
561 self.peers.entry(current_sync_peer_id).and_modify(|peer| {
562 peer.state = PeerSyncState::Available;
563 });
564 self.peers.entry(peer_id).and_modify(|peer| {
565 peer.state = PeerSyncState::DownloadingNew { start: our_best };
566 });
567 }
568 }
569 }
570
571 pub(super) fn add_new_peer(&mut self, new_peer: NewPeer) -> SyncAction {
573 let NewPeer {
574 peer_id,
575 best_number,
576 latency,
577 } = new_peer;
578
579 let new_peer = PeerSync {
580 peer_id,
581 best_number,
582 latency,
583 state: PeerSyncState::Available,
584 };
585
586 self.peers.insert(peer_id, new_peer);
587
588 if self.enable_block_sync {
589 self.attempt_sync_start()
590 } else {
591 SyncAction::None
592 }
593 }
594
595 pub(super) fn start_block_sync(&mut self) -> SyncAction {
596 self.enable_block_sync = true;
597 self.attempt_sync_start()
598 }
599
    /// Tries to start a sync session and returns the first action of the new strategy.
    ///
    /// Returns `SyncAction::None` without changing state when a strategy is
    /// already running, when fewer than `min_sync_peer_threshold` peers are
    /// tracked, when our best block already reached `sync_target`, or when no
    /// suitable peer is ahead of us.
    ///
    /// Peer choice: the lowest-scoring `Available` peer ahead of us, otherwise a
    /// deprioritized peer that has not exceeded `MAX_STALLS`.
    ///
    /// Strategy choice: if the gap to the chosen peer exceeds `MAJOR_SYNC_GAP`,
    /// headers-first is used unless we are at or past the last checkpoint or the
    /// configured strategy is `BlocksFirst`; smaller gaps always use blocks-first.
    fn attempt_sync_start(&mut self) -> SyncAction {
        // A strategy is already running; do not start another one.
        if self.syncing.is_major_syncing() {
            return SyncAction::None;
        }

        if self.peers.len() < self.min_sync_peer_threshold {
            tracing::debug!(
                "Waiting for more sync peers, discovered {} peers, require {} peers",
                self.peers.len(),
                self.min_sync_peer_threshold
            );
            return SyncAction::None;
        }

        let our_best = self.client.best_number();

        // Nothing to do once the configured target has been reached.
        if let Some(sync_target) = self.sync_target
            && our_best >= sync_target
        {
            return SyncAction::None;
        }

        // First choice: the best-scoring (lowest score) available peer ahead of us.
        let find_best_available_peer = || {
            self.peers
                .iter()
                .filter(|(_peer_id, peer)| peer.best_number > our_best && peer.state.is_available())
                .min_by(|(_, a), (_, b)| {
                    let score_a = a.peer_score();
                    let score_b = b.peer_score();
                    score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
                })
                .map(|(peer_id, _peer)| peer_id)
        };

        // Fallback: a deprioritized peer ahead of us that has not exceeded `MAX_STALLS`.
        let find_best_deprioritized_peer = || {
            self.peers
                .iter()
                .filter(|(_peer_id, peer)| peer.best_number > our_best)
                .filter_map(|(peer_id, peer)| {
                    if let PeerSyncState::Deprioritized { stalled_count } = peer.state {
                        if stalled_count > MAX_STALLS {
                            None
                        } else {
                            Some((peer_id, stalled_count, peer.latency))
                        }
                    } else {
                        None
                    }
                })
                .min_by(
                    |(_, stalled_count_a, latency_a), (_, stalled_count_b, latency_b)| {
                        // Fewest stalls first; latency breaks ties.
                        match stalled_count_a.cmp(stalled_count_b) {
                            CmpOrdering::Equal => latency_a.cmp(latency_b),
                            other => other,
                        }
                    },
                )
                .map(|(peer_id, _stalled_count, _latency)| peer_id)
        };

        let Some(next_peer_id) = find_best_available_peer()
            .or_else(find_best_deprioritized_peer)
            .copied()
        else {
            return SyncAction::None;
        };

        let Some(next_peer) = self.peers.get_mut(&next_peer_id) else {
            return SyncAction::None;
        };

        let client = self.client.clone();
        let peer_store = self.peer_store.clone();

        let peer_best = next_peer.best_number;
        // Cannot underflow: both selectors only return peers with `best_number > our_best`.
        let require_major_sync = peer_best - our_best > MAJOR_SYNC_GAP;

        let target_block_number = target_block_number(self.sync_target, peer_best);

        let (new_syncing, sync_action) = if require_major_sync {
            // Blocks-first is used past the last checkpoint or when explicitly configured.
            let blocks_first = our_best >= crate::checkpoint::last_checkpoint_height()
                || matches!(self.sync_strategy, SyncStrategy::BlocksFirst);

            tracing::debug!(
                latency = ?next_peer.latency,
                "⏩ Starting major sync ({}) from {next_peer_id:?} at #{our_best}",
                if blocks_first { "blocks-first" } else { "headers-first" }
            );

            if blocks_first {
                let (sync_strategy, sync_action) = BlocksFirstStrategy::new(
                    client,
                    next_peer_id,
                    target_block_number,
                    peer_store,
                    self.memory_config.clone(),
                );
                (Syncing::BlocksFirst(Box::new(sync_strategy)), sync_action)
            } else {
                let (sync_strategy, sync_action) = HeadersFirstStrategy::new(
                    client,
                    self.header_verifier.clone(),
                    next_peer_id,
                    target_block_number,
                    peer_store,
                    self.memory_config.clone(),
                );
                (Syncing::HeadersFirst(Box::new(sync_strategy)), sync_action)
            }
        } else {
            // Small gap: always blocks-first.
            let (sync_strategy, sync_action) = BlocksFirstStrategy::new(
                client,
                next_peer_id,
                target_block_number,
                peer_store,
                self.memory_config.clone(),
            );
            (Syncing::BlocksFirst(Box::new(sync_strategy)), sync_action)
        };

        // Mark the chosen peer as the sync peer before publishing the new state.
        next_peer.state = PeerSyncState::DownloadingNew { start: our_best };
        self.update_syncing_state(new_syncing);

        sync_action
    }
727
728 pub(super) fn start_blocks_first_sync(&mut self) -> Option<SyncAction> {
729 if matches!(self.syncing, Syncing::BlocksFirst(_)) {
730 return None;
731 }
732
733 self.import_pending_blocks();
735
736 let our_best = self.client.best_number();
737
738 let Some(best_peer) = self
739 .peers
740 .values()
741 .filter(|peer| peer.best_number > our_best && peer.state.is_available())
742 .min_by(|a, b| {
743 let score_a = a.peer_score();
744 let score_b = b.peer_score();
745 score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
746 })
747 else {
748 self.update_syncing_state(Syncing::Idle);
749 return None;
750 };
751
752 let (blocks_first_strategy, sync_action) = BlocksFirstStrategy::new(
753 self.client.clone(),
754 best_peer.peer_id,
755 best_peer.best_number,
756 self.peer_store.clone(),
757 self.memory_config.clone(),
758 );
759
760 tracing::debug!("Headers-First sync completed, continuing with blocks-first sync");
761 self.update_syncing_state(Syncing::BlocksFirst(Box::new(blocks_first_strategy)));
762
763 Some(sync_action)
764 }
765
766 fn update_syncing_state(&mut self, new: Syncing<Block, Client>) {
767 self.syncing = new;
768 self.is_major_syncing
769 .store(self.syncing.is_major_syncing(), Ordering::Relaxed);
770 }
771
    /// Schedules any pending blocks for import and then switches to the idle state.
    pub(super) fn set_idle(&mut self) {
        // Must run before the state flips: once idle, `import_pending_blocks`
        // returns early and the pending blocks would not be scheduled.
        self.import_pending_blocks();

        tracing::debug!(
            best_number = self.client.best_number(),
            "Blocks-First sync completed, switching to Syncing::Idle"
        );
        self.update_syncing_state(Syncing::Idle);
    }
781
782 pub(super) fn on_inv(&mut self, inventories: Vec<Inventory>, from: PeerId) -> SyncAction {
785 match &mut self.syncing {
786 Syncing::Idle => SyncAction::None,
787 Syncing::BlocksFirst(strategy) => strategy.on_inv(inventories, from),
788 Syncing::HeadersFirst(_) => SyncAction::None,
789 }
790 }
791
792 pub(super) fn on_block(&mut self, block: BitcoinBlock, from: PeerId) -> SyncAction {
793 match &mut self.syncing {
794 Syncing::Idle => SyncAction::None,
795 Syncing::BlocksFirst(strategy) => strategy.on_block(block, from),
796 Syncing::HeadersFirst(strategy) => strategy.on_block(block, from),
797 }
798 }
799
800 pub(super) fn on_headers(&mut self, headers: Vec<BitcoinHeader>, from: PeerId) -> SyncAction {
801 match &mut self.syncing {
802 Syncing::HeadersFirst(strategy) => strategy.on_headers(headers, from),
803 Syncing::BlocksFirst(_) | Syncing::Idle => {
804 tracing::debug!(
805 ?from,
806 "Ignored headers: {:?}",
807 headers
808 .iter()
809 .map(|header| header.block_hash())
810 .collect::<Vec<_>>()
811 );
812 SyncAction::None
813 }
814 }
815 }
816
817 pub(super) fn on_blocks_processed(&mut self, results: ImportManyBlocksResult) {
818 let block_downloader = match &mut self.syncing {
819 Syncing::Idle => return,
820 Syncing::BlocksFirst(strategy) => strategy.block_downloader(),
821 Syncing::HeadersFirst(strategy) => strategy.block_downloader(),
822 };
823
824 block_downloader.handle_processed_blocks(results);
825 }
826
827 pub(super) fn import_pending_blocks(&mut self) {
828 let block_downloader = match &mut self.syncing {
829 Syncing::Idle => return,
830 Syncing::BlocksFirst(strategy) => strategy.block_downloader(),
831 Syncing::HeadersFirst(strategy) => strategy.block_downloader(),
832 };
833
834 if !block_downloader.has_pending_blocks() {
835 return;
836 }
837
838 let (hashes, blocks) = block_downloader.prepare_blocks_for_import(self.sync_target);
839
840 tracing::trace!(
841 blocks = ?hashes,
842 blocks_in_queue = block_downloader.blocks_in_queue_count(),
843 "Scheduling {} blocks for import",
844 blocks.len(),
845 );
846
847 self.import_queue.import_blocks(ImportBlocks {
848 origin: if self.is_major_syncing.load(Ordering::Relaxed) {
849 BlockOrigin::NetworkInitialSync
850 } else {
851 BlockOrigin::NetworkBroadcast
852 },
853 blocks,
854 });
855 }
856}
857
/// Returns the block number to sync to: the peer's best block, capped at
/// `sync_target` when a target is configured.
fn target_block_number(sync_target: Option<u32>, peer_best: u32) -> u32 {
    sync_target.map_or(peer_best, |target| peer_best.min(target))
}