1mod block_downloader;
2mod orphan_blocks_pool;
3mod strategy;
4
5use self::strategy::{BlocksFirstStrategy, HeadersFirstStrategy};
6use crate::peer_manager::NewPeer;
7use crate::peer_store::PeerStore;
8use crate::{Error, Latency, MemoryConfig, PeerId, SyncStatus, SyncStrategy};
9use bitcoin::blockdata::block::Header as BitcoinHeader;
10use bitcoin::p2p::message_blockdata::Inventory;
11use bitcoin::{Block as BitcoinBlock, BlockHash};
12use sc_client_api::{AuxStore, HeaderBackend};
13use sc_consensus_nakamoto::{
14 BlockImportQueue, HeaderVerifier, ImportBlocks, ImportManyBlocksResult,
15};
16use serde::{Deserialize, Serialize};
17use sp_consensus::BlockOrigin;
18use sp_runtime::traits::Block as BlockT;
19use std::cmp::Ordering as CmpOrdering;
20use std::collections::HashMap;
21use std::marker::PhantomData;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24use subcoin_primitives::ClientExt;
25
26const MAJOR_SYNC_GAP: u32 = 144;
28
29const PEER_SCORE_IMPROVEMENT_THRESHOLD: f64 = 1.2;
33
34const LOW_LATENCY_CUTOFF: Latency = 20;
36
37const MAX_STALLS: usize = 5;
39
40const LATENCY_WEIGHT: f64 = 0.7;
42
43const RELIABILITY_WEIGHT: f64 = 0.3;
45
46const BASE_RELIABILITY_SCORE: f64 = 100.0;
48
49#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
51#[serde(rename_all = "camelCase")]
52pub enum PeerSyncState {
53 Available,
55 Deprioritized {
57 stalled_count: usize,
59 },
60 DownloadingNew { start: u32 },
62}
63
64impl PeerSyncState {
65 pub fn is_available(&self) -> bool {
67 matches!(self, Self::Available)
68 }
69
70 fn stalled_count(&self) -> usize {
71 match self {
72 Self::Deprioritized { stalled_count } => *stalled_count,
73 _ => 0,
74 }
75 }
76
77 fn is_permanently_deprioritized(&self) -> bool {
79 match self {
80 PeerSyncState::Deprioritized { stalled_count } => *stalled_count > MAX_STALLS,
81 _ => false,
82 }
83 }
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
88#[serde(rename_all = "camelCase")]
89pub struct PeerSync {
90 pub peer_id: PeerId,
92 pub best_number: u32,
94 pub latency: Latency,
96 pub state: PeerSyncState,
99}
100
101impl PeerSync {
102 pub fn peer_score(&self) -> f64 {
105 let latency_score = self.latency as f64;
107
108 let reliability_penalty = self.state.stalled_count() as f64 * 20.0; let reliability_score = BASE_RELIABILITY_SCORE + reliability_penalty;
111
112 (latency_score * LATENCY_WEIGHT) + (reliability_score * RELIABILITY_WEIGHT)
114 }
115}
116
117#[derive(Debug, PartialEq, Eq)]
119pub struct LocatorRequest {
120 pub locator_hashes: Vec<BlockHash>,
121 pub stop_hash: BlockHash,
122 pub to: PeerId,
123}
124
125#[derive(Debug, PartialEq, Eq)]
127pub enum SyncRequest {
128 Header(LocatorRequest),
130 Inventory(LocatorRequest),
132 Data(Vec<Inventory>, PeerId),
134}
135
136#[derive(Debug)]
138pub enum SyncAction {
139 Request(SyncRequest),
141 SwitchToBlocksFirstSync,
144 DisconnectPeer(PeerId, Error),
146 RestartSyncWithStalledPeer(PeerId),
149 SetIdle,
151 None,
153}
154
155impl SyncAction {
156 pub(crate) fn get_headers(request: LocatorRequest) -> Self {
157 Self::Request(SyncRequest::Header(request))
158 }
159
160 pub(crate) fn get_inventory(request: LocatorRequest) -> Self {
161 Self::Request(SyncRequest::Inventory(request))
162 }
163
164 pub(crate) fn get_data(inv: Vec<Inventory>, from: PeerId) -> Self {
165 Self::Request(SyncRequest::Data(inv, from))
166 }
167}
168
169#[derive(Debug)]
170pub(crate) enum RestartReason {
171 Stalled,
172 Disconnected,
173}
174
175enum Syncing<Block, Client> {
178 BlocksFirst(Box<BlocksFirstStrategy<Block, Client>>),
180 HeadersFirst(Box<HeadersFirstStrategy<Block, Client>>),
182 Idle,
187}
188
189impl<Block, Client> Syncing<Block, Client> {
190 fn is_major_syncing(&self) -> bool {
191 matches!(self, Self::BlocksFirst(_) | Self::HeadersFirst(_))
192 }
193}
194
195pub(crate) struct ChainSync<Block, Client> {
197 client: Arc<Client>,
199 header_verifier: HeaderVerifier<Block, Client>,
201 pub(crate) peers: HashMap<PeerId, PeerSync>,
203 syncing: Syncing<Block, Client>,
205 pub(crate) import_queue: BlockImportQueue,
207 sync_strategy: SyncStrategy,
209 is_major_syncing: Arc<AtomicBool>,
211 enable_block_sync: bool,
213 peer_store: Arc<dyn PeerStore>,
215 sync_target: Option<u32>,
217 min_sync_peer_threshold: usize,
218 memory_config: MemoryConfig,
220 _phantom: PhantomData<Block>,
221}
222
223impl<Block, Client> ChainSync<Block, Client>
224where
225 Block: BlockT,
226 Client: HeaderBackend<Block> + AuxStore,
227{
228 #[allow(clippy::too_many_arguments)]
230 pub(super) fn new(
231 client: Arc<Client>,
232 header_verifier: HeaderVerifier<Block, Client>,
233 import_queue: BlockImportQueue,
234 sync_strategy: SyncStrategy,
235 is_major_syncing: Arc<AtomicBool>,
236 enable_block_sync: bool,
237 peer_store: Arc<dyn PeerStore>,
238 sync_target: Option<u32>,
239 min_sync_peer_threshold: usize,
240 memory_config: MemoryConfig,
241 ) -> Self {
242 Self {
243 client,
244 header_verifier,
245 peers: HashMap::new(),
246 syncing: Syncing::Idle,
247 import_queue,
248 sync_strategy,
249 is_major_syncing,
250 enable_block_sync,
251 peer_store,
252 sync_target,
253 min_sync_peer_threshold,
254 memory_config,
255 _phantom: Default::default(),
256 }
257 }
258
259 pub(super) fn sync_status(&self) -> SyncStatus {
260 match &self.syncing {
261 Syncing::Idle => SyncStatus::Idle,
262 Syncing::BlocksFirst(strategy) => strategy.sync_status(),
263 Syncing::HeadersFirst(strategy) => strategy.sync_status(),
264 }
265 }
266
267 pub(super) fn is_idle(&self) -> bool {
268 matches!(self.syncing, Syncing::Idle)
269 }
270
271 pub(super) fn on_tick(&mut self) -> SyncAction {
272 match &mut self.syncing {
273 Syncing::Idle => SyncAction::None,
274 Syncing::BlocksFirst(strategy) => strategy.on_tick(),
275 Syncing::HeadersFirst(strategy) => strategy.on_tick(),
276 }
277 }
278
279 pub(super) async fn wait_for_block_import_results(&mut self) -> ImportManyBlocksResult {
280 self.import_queue.block_import_results().await
281 }
282
283 pub(super) fn unreliable_peers(&self) -> Vec<PeerId> {
284 self.peers
285 .iter()
286 .filter_map(|(peer_id, peer)| {
287 peer.state
288 .is_permanently_deprioritized()
289 .then_some(peer_id)
290 .copied()
291 })
292 .collect()
293 }
294
295 pub(super) fn disconnect(&mut self, peer_id: PeerId) {
297 if let Some(removed_peer) = self.peers.remove(&peer_id) {
298 if matches!(removed_peer.state, PeerSyncState::DownloadingNew { .. }) {
301 self.restart_sync(removed_peer.peer_id, RestartReason::Disconnected);
302 }
303 }
304 }
305
306 pub(super) fn update_peer_best(&mut self, peer_id: PeerId, peer_best: u32) {
307 let mut peer_best_updated = false;
308
309 self.peers.entry(peer_id).and_modify(|e| {
310 if peer_best > e.best_number {
311 tracing::debug!(
312 "Tip of {peer_id:?} updated from #{} to #{peer_best}",
313 e.best_number
314 );
315 e.best_number = peer_best;
316 peer_best_updated = true;
317 }
318 });
319
320 if peer_best_updated {
321 match &mut self.syncing {
322 Syncing::Idle => {}
323 Syncing::BlocksFirst(strategy) => strategy.set_peer_best(peer_id, peer_best),
324 Syncing::HeadersFirst(strategy) => strategy.set_peer_best(peer_id, peer_best),
325 }
326 }
327 }
328
329 fn select_next_peer_for_sync(
332 &mut self,
333 our_best: u32,
334 excluded_peer: PeerId,
335 ) -> Option<PeerId> {
336 let best_available = self
338 .peers
339 .values()
340 .filter(|peer| {
341 peer.peer_id != excluded_peer
342 && peer.best_number > our_best
343 && peer.state.is_available()
344 })
345 .min_by(|a, b| {
346 let score_a = a.peer_score();
347 let score_b = b.peer_score();
348 score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
349 })
350 .map(|peer| peer.peer_id);
351
352 best_available.or_else(|| {
353 self.peers
355 .values()
356 .filter(|peer| {
357 peer.peer_id != excluded_peer
358 && peer.best_number > our_best
359 && !peer.state.is_permanently_deprioritized()
360 })
361 .min_by(|a, b| {
362 let score_a = a.peer_score();
363 let score_b = b.peer_score();
364 score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
365 })
366 .map(|peer| peer.peer_id)
367 })
368 }
369
370 pub(super) fn restart_sync(&mut self, prior_peer_id: PeerId, reason: RestartReason) {
374 let our_best = self.client.best_number();
375
376 let Some(new_peer_id) = self.select_next_peer_for_sync(our_best, prior_peer_id) else {
377 if let Some(median_seen_block) = self.median_seen()
378 && median_seen_block <= our_best
379 {
380 let best_seen_block = self.peers.values().map(|p| p.best_number).max();
381
382 tracing::debug!(
387 best_seen_block,
388 median_seen_block,
389 our_best,
390 "Synced to the majority of peers, switching to Idle"
391 );
392 self.update_syncing_state(Syncing::Idle);
393 return;
394 }
395
396 tracing::debug!(
399 ?prior_peer_id,
400 "⚠️ Attempting to restart sync, but no new sync candidate available"
401 );
402
403 return;
404 };
405
406 {
407 let Some(new_peer) = self.peers.get_mut(&new_peer_id) else {
408 tracing::error!("Corrupted state, next peer {new_peer_id} missing from peer list");
409 return;
410 };
411
412 tracing::debug!(?reason, prior_peer = ?prior_peer_id, ?new_peer, "🔄 Sync restarted");
413 new_peer.state = PeerSyncState::DownloadingNew { start: our_best };
414
415 let target_block_number = target_block_number(self.sync_target, new_peer.best_number);
416
417 match &mut self.syncing {
418 Syncing::BlocksFirst(strategy) => {
419 strategy.restart(new_peer.peer_id, target_block_number);
420 }
421 Syncing::HeadersFirst(strategy) => {
422 strategy.restart(new_peer.peer_id, target_block_number);
423 }
424 Syncing::Idle => {}
425 }
426 }
427
428 match reason {
429 RestartReason::Stalled => {
430 self.peers.entry(prior_peer_id).and_modify(|p| {
431 let current_stalled_count = p.state.stalled_count();
432 p.state = PeerSyncState::Deprioritized {
433 stalled_count: current_stalled_count + 1,
434 };
435 });
436 }
437 RestartReason::Disconnected => {
438 }
440 }
441 }
442
443 fn median_seen(&self) -> Option<u32> {
445 let mut best_seens = self
446 .peers
447 .values()
448 .map(|p| p.best_number)
449 .collect::<Vec<_>>();
450
451 if best_seens.is_empty() {
452 None
453 } else {
454 let middle = best_seens.len() / 2;
455
456 Some(*best_seens.select_nth_unstable(middle).1)
457 }
458 }
459
460 pub(super) fn update_peer_latency(&mut self, peer_id: PeerId, avg_latency: Latency) {
461 self.peers.entry(peer_id).and_modify(|peer| {
462 peer.latency = avg_latency;
463 });
464 self.update_sync_peer_on_lower_latency();
465 }
466
467 pub(super) fn update_sync_peer_on_lower_latency(&mut self) {
468 let maybe_sync_peer_id = match &self.syncing {
469 Syncing::Idle => return,
470 Syncing::BlocksFirst(strategy) => strategy.can_swap_sync_peer(),
471 Syncing::HeadersFirst(strategy) => strategy.can_swap_sync_peer(),
472 };
473
474 let Some(current_sync_peer_id) = maybe_sync_peer_id else {
475 return;
476 };
477
478 let Some(current_latency) = self
479 .peers
480 .get(¤t_sync_peer_id)
481 .map(|peer| peer.latency)
482 else {
483 return;
484 };
485
486 if current_latency <= LOW_LATENCY_CUTOFF {
487 tracing::trace!(
488 peer_id = ?current_sync_peer_id,
489 "Skipping sync peer update as the current latency ({current_latency}ms) is already low enough"
490 );
491 }
492
493 let our_best = self.client.best_number();
494
495 let Some(best_sync_peer) = self
497 .peers
498 .values()
499 .filter(|peer| {
500 peer.peer_id != current_sync_peer_id
501 && peer.best_number > our_best
502 && peer.state.is_available()
503 })
504 .min_by(|a, b| {
505 let score_a = a.peer_score();
506 let score_b = b.peer_score();
507 score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
508 })
509 else {
510 return;
511 };
512
513 let Some(current_peer) = self.peers.get(¤t_sync_peer_id) else {
514 tracing::debug!(
515 peer_id = ?current_sync_peer_id,
516 "Current sync peer not found in peer list, skipping peer update"
517 );
518 return;
519 };
520
521 let current_score = current_peer.peer_score();
522 let best_score = best_sync_peer.peer_score();
523
524 if best_score > 0.0 && current_score / best_score > PEER_SCORE_IMPROVEMENT_THRESHOLD {
527 let peer_id = best_sync_peer.peer_id;
528 let target_block_number =
529 target_block_number(self.sync_target, best_sync_peer.best_number);
530
531 let sync_peer_updated = match &mut self.syncing {
532 Syncing::BlocksFirst(strategy) => {
533 strategy.swap_sync_peer(peer_id, target_block_number);
534 true
535 }
536 Syncing::HeadersFirst(strategy) => {
537 strategy.swap_sync_peer(peer_id, target_block_number);
538 true
539 }
540 Syncing::Idle => unreachable!("Must not be Idle as checked; qed"),
541 };
542
543 if sync_peer_updated {
544 tracing::debug!(
545 old_peer_id = ?current_sync_peer_id,
546 new_peer_id = ?peer_id,
547 old_score = %current_score,
548 new_score = %best_score,
549 "🔧 Sync peer updated to a peer with better score (latency: {} ms -> {} ms)",
550 current_peer.latency,
551 best_sync_peer.latency,
552 );
553 self.peers.entry(current_sync_peer_id).and_modify(|peer| {
554 peer.state = PeerSyncState::Available;
555 });
556 self.peers.entry(peer_id).and_modify(|peer| {
557 peer.state = PeerSyncState::DownloadingNew { start: our_best };
558 });
559 }
560 }
561 }
562
563 pub(super) fn add_new_peer(&mut self, new_peer: NewPeer) -> SyncAction {
565 let NewPeer {
566 peer_id,
567 best_number,
568 latency,
569 } = new_peer;
570
571 let new_peer = PeerSync {
572 peer_id,
573 best_number,
574 latency,
575 state: PeerSyncState::Available,
576 };
577
578 self.peers.insert(peer_id, new_peer);
579
580 if self.enable_block_sync {
581 self.attempt_sync_start()
582 } else {
583 SyncAction::None
584 }
585 }
586
587 pub(super) fn start_block_sync(&mut self) -> SyncAction {
588 self.enable_block_sync = true;
589 self.attempt_sync_start()
590 }
591
592 fn attempt_sync_start(&mut self) -> SyncAction {
593 if self.syncing.is_major_syncing() {
594 return SyncAction::None;
595 }
596
597 if self.peers.len() < self.min_sync_peer_threshold {
598 tracing::debug!(
599 "Waiting for more sync peers, discovered {} peers, require {} peers",
600 self.peers.len(),
601 self.min_sync_peer_threshold
602 );
603 return SyncAction::None;
604 }
605
606 let our_best = self.client.best_number();
607
608 if let Some(sync_target) = self.sync_target
609 && our_best >= sync_target
610 {
611 return SyncAction::None;
612 }
613
614 let find_best_available_peer = || {
615 self.peers
616 .iter()
617 .filter(|(_peer_id, peer)| peer.best_number > our_best && peer.state.is_available())
618 .min_by(|(_, a), (_, b)| {
619 let score_a = a.peer_score();
620 let score_b = b.peer_score();
621 score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
622 })
623 .map(|(peer_id, _peer)| peer_id)
624 };
625
626 let find_best_deprioritized_peer = || {
627 self.peers
628 .iter()
629 .filter(|(_peer_id, peer)| peer.best_number > our_best)
630 .filter_map(|(peer_id, peer)| {
631 if let PeerSyncState::Deprioritized { stalled_count } = peer.state {
632 if stalled_count > MAX_STALLS {
633 None
634 } else {
635 Some((peer_id, stalled_count, peer.latency))
636 }
637 } else {
638 None
639 }
640 })
641 .min_by(
642 |(_, stalled_count_a, latency_a), (_, stalled_count_b, latency_b)| {
643 match stalled_count_a.cmp(stalled_count_b) {
645 CmpOrdering::Equal => latency_a.cmp(latency_b), other => other, }
648 },
649 )
650 .map(|(peer_id, _stalled_count, _latency)| peer_id)
651 };
652
653 let Some(next_peer_id) = find_best_available_peer()
654 .or_else(find_best_deprioritized_peer)
655 .copied()
656 else {
657 return SyncAction::None;
658 };
659
660 let Some(next_peer) = self.peers.get_mut(&next_peer_id) else {
661 return SyncAction::None;
662 };
663
664 let client = self.client.clone();
665 let peer_store = self.peer_store.clone();
666
667 let peer_best = next_peer.best_number;
668 let require_major_sync = peer_best - our_best > MAJOR_SYNC_GAP;
669
670 let target_block_number = target_block_number(self.sync_target, peer_best);
671
672 let (new_syncing, sync_action) = if require_major_sync {
674 let blocks_first = our_best >= crate::checkpoint::last_checkpoint_height()
675 || matches!(self.sync_strategy, SyncStrategy::BlocksFirst);
676
677 tracing::debug!(
678 latency = ?next_peer.latency,
679 "⏩ Starting major sync ({}) from {next_peer_id:?} at #{our_best}",
680 if blocks_first { "blocks-first" } else { "headers-first" }
681 );
682
683 if blocks_first {
684 let (sync_strategy, sync_action) = BlocksFirstStrategy::new(
685 client,
686 next_peer_id,
687 target_block_number,
688 peer_store,
689 self.memory_config.clone(),
690 );
691 (Syncing::BlocksFirst(Box::new(sync_strategy)), sync_action)
692 } else {
693 let (sync_strategy, sync_action) = HeadersFirstStrategy::new(
694 client,
695 self.header_verifier.clone(),
696 next_peer_id,
697 target_block_number,
698 peer_store,
699 self.memory_config.clone(),
700 );
701 (Syncing::HeadersFirst(Box::new(sync_strategy)), sync_action)
702 }
703 } else {
704 let (sync_strategy, sync_action) = BlocksFirstStrategy::new(
705 client,
706 next_peer_id,
707 target_block_number,
708 peer_store,
709 self.memory_config.clone(),
710 );
711 (Syncing::BlocksFirst(Box::new(sync_strategy)), sync_action)
712 };
713
714 next_peer.state = PeerSyncState::DownloadingNew { start: our_best };
715 self.update_syncing_state(new_syncing);
716
717 sync_action
718 }
719
720 pub(super) fn start_blocks_first_sync(&mut self) -> Option<SyncAction> {
721 if matches!(self.syncing, Syncing::BlocksFirst(_)) {
722 return None;
723 }
724
725 self.import_pending_blocks();
727
728 let our_best = self.client.best_number();
729
730 let Some(best_peer) = self
731 .peers
732 .values()
733 .filter(|peer| peer.best_number > our_best && peer.state.is_available())
734 .min_by(|a, b| {
735 let score_a = a.peer_score();
736 let score_b = b.peer_score();
737 score_a.partial_cmp(&score_b).unwrap_or(CmpOrdering::Equal)
738 })
739 else {
740 self.update_syncing_state(Syncing::Idle);
741 return None;
742 };
743
744 let (blocks_first_strategy, sync_action) = BlocksFirstStrategy::new(
745 self.client.clone(),
746 best_peer.peer_id,
747 best_peer.best_number,
748 self.peer_store.clone(),
749 self.memory_config.clone(),
750 );
751
752 tracing::debug!("Headers-First sync completed, continuing with blocks-first sync");
753 self.update_syncing_state(Syncing::BlocksFirst(Box::new(blocks_first_strategy)));
754
755 Some(sync_action)
756 }
757
758 fn update_syncing_state(&mut self, new: Syncing<Block, Client>) {
759 self.syncing = new;
760 self.is_major_syncing
761 .store(self.syncing.is_major_syncing(), Ordering::Relaxed);
762 }
763
764 pub(super) fn set_idle(&mut self) {
765 self.import_pending_blocks();
766
767 tracing::debug!(
768 best_number = self.client.best_number(),
769 "Blocks-First sync completed, switching to Syncing::Idle"
770 );
771 self.update_syncing_state(Syncing::Idle);
772 }
773
774 pub(super) fn on_inv(&mut self, inventories: Vec<Inventory>, from: PeerId) -> SyncAction {
777 match &mut self.syncing {
778 Syncing::Idle => SyncAction::None,
779 Syncing::BlocksFirst(strategy) => strategy.on_inv(inventories, from),
780 Syncing::HeadersFirst(_) => SyncAction::None,
781 }
782 }
783
784 pub(super) fn on_block(&mut self, block: BitcoinBlock, from: PeerId) -> SyncAction {
785 match &mut self.syncing {
786 Syncing::Idle => SyncAction::None,
787 Syncing::BlocksFirst(strategy) => strategy.on_block(block, from),
788 Syncing::HeadersFirst(strategy) => strategy.on_block(block, from),
789 }
790 }
791
792 pub(super) fn on_headers(&mut self, headers: Vec<BitcoinHeader>, from: PeerId) -> SyncAction {
793 match &mut self.syncing {
794 Syncing::HeadersFirst(strategy) => strategy.on_headers(headers, from),
795 Syncing::BlocksFirst(_) | Syncing::Idle => {
796 tracing::debug!(
797 ?from,
798 "Ignored headers: {:?}",
799 headers
800 .iter()
801 .map(|header| header.block_hash())
802 .collect::<Vec<_>>()
803 );
804 SyncAction::None
805 }
806 }
807 }
808
809 pub(super) fn on_blocks_processed(&mut self, results: ImportManyBlocksResult) {
810 let block_downloader = match &mut self.syncing {
811 Syncing::Idle => return,
812 Syncing::BlocksFirst(strategy) => strategy.block_downloader(),
813 Syncing::HeadersFirst(strategy) => strategy.block_downloader(),
814 };
815 block_downloader.handle_processed_blocks(results);
816 }
817
818 pub(super) fn import_pending_blocks(&mut self) {
819 let block_downloader = match &mut self.syncing {
820 Syncing::Idle => return,
821 Syncing::BlocksFirst(strategy) => strategy.block_downloader(),
822 Syncing::HeadersFirst(strategy) => strategy.block_downloader(),
823 };
824
825 if !block_downloader.has_pending_blocks() {
826 return;
827 }
828
829 let (hashes, blocks) = block_downloader.prepare_blocks_for_import(self.sync_target);
830
831 tracing::trace!(
832 blocks = ?hashes,
833 blocks_in_queue = block_downloader.blocks_in_queue_count(),
834 "Scheduling {} blocks for import",
835 blocks.len(),
836 );
837
838 self.import_queue.import_blocks(ImportBlocks {
839 origin: if self.is_major_syncing.load(Ordering::Relaxed) {
840 BlockOrigin::NetworkInitialSync
841 } else {
842 BlockOrigin::NetworkBroadcast
843 },
844 blocks,
845 });
846 }
847}
848
849fn target_block_number(sync_target: Option<u32>, peer_best: u32) -> u32 {
857 match sync_target {
858 Some(target) => peer_best.min(target),
859 None => peer_best,
860 }
861}