sc_consensus_nakamoto/
import_queue.rs

1//! This module defines the [`BlockImportQueue`] struct, which separates the block download
2//! and import processes. The queue operates in a distinct blocking task, receiving blocks
3//! downloaded from the Bitcoin P2P network by subcoin-network and importing them into
4//! the database. Import results are then communicated back.
5
6use crate::block_import::{BitcoinBlockImport, ImportStatus};
7use bitcoin::{Block as BitcoinBlock, BlockHash};
8use futures::StreamExt;
9use futures::prelude::*;
10use futures::task::{Context, Poll};
11use sc_consensus::{BlockImportError, BlockImportParams};
12use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
13use sp_consensus::BlockOrigin;
14use sp_core::traits::SpawnEssentialNamed;
15use sp_runtime::traits::Block as BlockT;
16use std::pin::Pin;
17
18/// Represents a batch of Bitcoin blocks that are to be imported.
19#[derive(Debug, Clone)]
20pub struct ImportBlocks {
21    /// The source from which the blocks were obtained.
22    pub origin: BlockOrigin,
23    /// A vector containing the Bitcoin blocks to be imported.
24    pub blocks: Vec<BitcoinBlock>,
25}
26
27/// Subcoin import queue for processing Bitcoin blocks.
28#[derive(Debug)]
29pub struct BlockImportQueue {
30    block_import_sender: TracingUnboundedSender<ImportBlocks>,
31    import_result_receiver: TracingUnboundedReceiver<ImportManyBlocksResult>,
32}
33
34impl BlockImportQueue {
35    /// Sends a batch of blocks to the worker of import queue for processing.
36    pub fn import_blocks(&self, incoming_blocks: ImportBlocks) {
37        let _ = self.block_import_sender.unbounded_send(incoming_blocks);
38    }
39
40    /// Retrieves the results of the block import operations.
41    ///
42    /// This asynchronous function waits for and returns the results of the block import process.
43    /// It consumes the next available result from the import queue.
44    pub async fn block_import_results(&mut self) -> ImportManyBlocksResult {
45        self.import_result_receiver.select_next_some().await
46    }
47}
48
49/// Creates a new import queue.
50pub fn bitcoin_import_queue<BI>(
51    spawner: &impl SpawnEssentialNamed,
52    mut block_import: BI,
53) -> BlockImportQueue
54where
55    BI: BitcoinBlockImport,
56{
57    let (import_result_sender, import_result_receiver) =
58        tracing_unbounded("mpsc_subcoin_import_queue_result", 100_000);
59
60    let (block_import_sender, block_import_receiver) =
61        tracing_unbounded("mpsc_subcoin_import_queue_worker_blocks", 100_000);
62
63    let future = async move {
64        let block_import_process = block_import_process(
65            &mut block_import,
66            import_result_sender.clone(),
67            block_import_receiver,
68        );
69        futures::pin_mut!(block_import_process);
70
71        loop {
72            // If the results sender is closed, that means that the import queue is shutting
73            // down and we should end this future.
74            if import_result_sender.is_closed() {
75                tracing::debug!("Stopping block import because result channel was closed!");
76                return;
77            }
78
79            if let Poll::Ready(()) = futures::poll!(&mut block_import_process) {
80                return;
81            }
82
83            // All futures that we polled are now pending.
84            futures::pending!()
85        }
86    };
87
88    spawner.spawn_essential_blocking(
89        "bitcoin-block-import-worker",
90        Some("bitcoin-block-import"),
91        future.boxed(),
92    );
93
94    BlockImportQueue {
95        block_import_sender,
96        import_result_receiver,
97    }
98}
99
100/// A dummy verifier that verifies nothing against the block.
101pub struct VerifyNothing;
102
103#[async_trait::async_trait]
104impl<Block: BlockT> sc_consensus::Verifier<Block> for VerifyNothing {
105    async fn verify(
106        &self,
107        block: BlockImportParams<Block>,
108    ) -> Result<BlockImportParams<Block>, String> {
109        Ok(BlockImportParams::new(block.origin, block.header))
110    }
111}
112
113/// The process of importing blocks.
114///
115/// This polls the `block_import_receiver` for new blocks to import and than awaits on
116/// importing these blocks. After each block is imported, this async function yields once
117/// to give other futures the possibility to be run.
118///
119/// Returns when `block_import` ended.
120async fn block_import_process(
121    block_import: &mut dyn BitcoinBlockImport,
122    result_sender: TracingUnboundedSender<ImportManyBlocksResult>,
123    mut block_import_receiver: TracingUnboundedReceiver<ImportBlocks>,
124) {
125    loop {
126        let Some(ImportBlocks { origin, blocks }) = block_import_receiver.next().await else {
127            tracing::debug!("Stopping block import because the import channel was closed!",);
128            return;
129        };
130
131        let res = import_many_blocks(block_import, origin, blocks).await;
132
133        let _ = result_sender.unbounded_send(res);
134    }
135}
136
137type BlockImportStatus = sc_consensus::BlockImportStatus<u32>;
138
139/// Result of `import_many_blocks`.
140#[derive(Debug)]
141pub struct ImportManyBlocksResult {
142    /// The number of blocks imported successfully.
143    pub imported: usize,
144    /// The total number of blocks processed.
145    pub block_count: usize,
146    /// The import results for each block.
147    pub results: Vec<(Result<BlockImportStatus, BlockImportError>, BlockHash)>,
148}
149
150/// Import several blocks at once, returning import result for each block.
151///
152/// This will yield after each imported block once, to ensure that other futures can
153/// be called as well.
154async fn import_many_blocks(
155    import_handle: &mut dyn BitcoinBlockImport,
156    origin: BlockOrigin,
157    blocks: Vec<BitcoinBlock>,
158) -> ImportManyBlocksResult {
159    tracing::trace!("[import_many_blocks] importing {} blocks", blocks.len());
160    let count = blocks.len();
161
162    let mut imported = 0;
163    let mut results = vec![];
164    let mut has_error = false;
165
166    // Blocks in the response/drain should be in ascending order.
167    for block in blocks {
168        let block_hash = block.block_hash();
169
170        let block_import_result = if has_error {
171            Err(BlockImportError::Cancelled)
172        } else {
173            // The actual import.
174            let import_result = import_handle.import_block(block, origin).await;
175
176            match import_result {
177                Ok(ImportStatus::AlreadyInChain(number)) => {
178                    tracing::trace!("Block already in chain: #{number},{block_hash:?}");
179                    Ok(BlockImportStatus::ImportedKnown(number, None))
180                }
181                Ok(ImportStatus::Imported {
182                    block_number,
183                    block_hash: _,
184                    aux,
185                }) => Ok(BlockImportStatus::ImportedUnknown(block_number, aux, None)),
186                Ok(ImportStatus::MissingState) => {
187                    tracing::debug!("Parent state is missing for block: {block_hash:?}",);
188                    Err(BlockImportError::MissingState)
189                }
190                Ok(ImportStatus::UnknownParent) => {
191                    tracing::debug!("Block {block_hash:?} has unknown parent");
192                    Err(BlockImportError::UnknownParent)
193                }
194                Ok(ImportStatus::KnownBad) => {
195                    tracing::debug!("Peer gave us a bad block: {block_hash:?}");
196                    Err(BlockImportError::BadBlock(None))
197                }
198                Err(err) => {
199                    tracing::error!(?err, "Error importing block: {block_hash:?}");
200                    Err(BlockImportError::Other(err))
201                }
202            }
203        };
204
205        if let Ok(block_import_status) = &block_import_result {
206            let block_number = block_import_status.number();
207            tracing::debug!("Block imported successfully #{block_number:?},{block_hash}");
208            imported += 1;
209        } else {
210            has_error = true;
211        }
212
213        results.push((block_import_result, block_hash));
214
215        Yield::new().await
216    }
217
218    // No block left to import, success!
219    ImportManyBlocksResult {
220        block_count: count,
221        imported,
222        results,
223    }
224}
225
226/// A future that will always `yield` on the first call of `poll` but schedules the
227/// current task for re-execution.
228///
229/// This is done by getting the waker and calling `wake_by_ref` followed by returning
230/// `Pending`. The next time the `poll` is called, it will return `Ready`.
231struct Yield(bool);
232
233impl Yield {
234    fn new() -> Self {
235        Self(false)
236    }
237}
238
239impl Future for Yield {
240    type Output = ();
241
242    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
243        if !self.0 {
244            self.0 = true;
245            cx.waker().wake_by_ref();
246            Poll::Pending
247        } else {
248            Poll::Ready(())
249        }
250    }
251}