sc_consensus_nakamoto/
import_queue.rs1use crate::block_import::{BitcoinBlockImport, ImportStatus};
7use bitcoin::{Block as BitcoinBlock, BlockHash};
8use futures::StreamExt;
9use futures::prelude::*;
10use futures::task::{Context, Poll};
11use sc_consensus::{BlockImportError, BlockImportParams};
12use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender, tracing_unbounded};
13use sp_consensus::BlockOrigin;
14use sp_core::traits::SpawnEssentialNamed;
15use sp_runtime::traits::Block as BlockT;
16use std::pin::Pin;
17
18#[derive(Debug, Clone)]
20pub struct ImportBlocks {
21 pub origin: BlockOrigin,
23 pub blocks: Vec<BitcoinBlock>,
25}
26
27#[derive(Debug)]
29pub struct BlockImportQueue {
30 block_import_sender: TracingUnboundedSender<ImportBlocks>,
31 import_result_receiver: TracingUnboundedReceiver<ImportManyBlocksResult>,
32}
33
34impl BlockImportQueue {
35 pub fn import_blocks(&self, incoming_blocks: ImportBlocks) {
37 let _ = self.block_import_sender.unbounded_send(incoming_blocks);
38 }
39
40 pub async fn block_import_results(&mut self) -> ImportManyBlocksResult {
45 self.import_result_receiver.select_next_some().await
46 }
47}
48
49pub fn bitcoin_import_queue<BI>(
51 spawner: &impl SpawnEssentialNamed,
52 mut block_import: BI,
53) -> BlockImportQueue
54where
55 BI: BitcoinBlockImport,
56{
57 let (import_result_sender, import_result_receiver) =
58 tracing_unbounded("mpsc_subcoin_import_queue_result", 100_000);
59
60 let (block_import_sender, block_import_receiver) =
61 tracing_unbounded("mpsc_subcoin_import_queue_worker_blocks", 100_000);
62
63 let future = async move {
64 let block_import_process = block_import_process(
65 &mut block_import,
66 import_result_sender.clone(),
67 block_import_receiver,
68 );
69 futures::pin_mut!(block_import_process);
70
71 loop {
72 if import_result_sender.is_closed() {
75 tracing::debug!("Stopping block import because result channel was closed!");
76 return;
77 }
78
79 if let Poll::Ready(()) = futures::poll!(&mut block_import_process) {
80 return;
81 }
82
83 futures::pending!()
85 }
86 };
87
88 spawner.spawn_essential_blocking(
89 "bitcoin-block-import-worker",
90 Some("bitcoin-block-import"),
91 future.boxed(),
92 );
93
94 BlockImportQueue {
95 block_import_sender,
96 import_result_receiver,
97 }
98}
99
100pub struct VerifyNothing;
102
103#[async_trait::async_trait]
104impl<Block: BlockT> sc_consensus::Verifier<Block> for VerifyNothing {
105 async fn verify(
106 &self,
107 block: BlockImportParams<Block>,
108 ) -> Result<BlockImportParams<Block>, String> {
109 Ok(BlockImportParams::new(block.origin, block.header))
110 }
111}
112
113async fn block_import_process(
121 block_import: &mut dyn BitcoinBlockImport,
122 result_sender: TracingUnboundedSender<ImportManyBlocksResult>,
123 mut block_import_receiver: TracingUnboundedReceiver<ImportBlocks>,
124) {
125 loop {
126 let Some(ImportBlocks { origin, blocks }) = block_import_receiver.next().await else {
127 tracing::debug!("Stopping block import because the import channel was closed!",);
128 return;
129 };
130
131 let res = import_many_blocks(block_import, origin, blocks).await;
132
133 let _ = result_sender.unbounded_send(res);
134 }
135}
136
137type BlockImportStatus = sc_consensus::BlockImportStatus<u32>;
138
139#[derive(Debug)]
141pub struct ImportManyBlocksResult {
142 pub imported: usize,
144 pub block_count: usize,
146 pub results: Vec<(Result<BlockImportStatus, BlockImportError>, BlockHash)>,
148}
149
150async fn import_many_blocks(
155 import_handle: &mut dyn BitcoinBlockImport,
156 origin: BlockOrigin,
157 blocks: Vec<BitcoinBlock>,
158) -> ImportManyBlocksResult {
159 tracing::trace!("[import_many_blocks] importing {} blocks", blocks.len());
160 let count = blocks.len();
161
162 let mut imported = 0;
163 let mut results = vec![];
164 let mut has_error = false;
165
166 for block in blocks {
168 let block_hash = block.block_hash();
169
170 let block_import_result = if has_error {
171 Err(BlockImportError::Cancelled)
172 } else {
173 let import_result = import_handle.import_block(block, origin).await;
175
176 match import_result {
177 Ok(ImportStatus::AlreadyInChain(number)) => {
178 tracing::trace!("Block already in chain: #{number},{block_hash:?}");
179 Ok(BlockImportStatus::ImportedKnown(number, None))
180 }
181 Ok(ImportStatus::Imported {
182 block_number,
183 block_hash: _,
184 aux,
185 }) => Ok(BlockImportStatus::ImportedUnknown(block_number, aux, None)),
186 Ok(ImportStatus::MissingState) => {
187 tracing::debug!("Parent state is missing for block: {block_hash:?}",);
188 Err(BlockImportError::MissingState)
189 }
190 Ok(ImportStatus::UnknownParent) => {
191 tracing::debug!("Block {block_hash:?} has unknown parent");
192 Err(BlockImportError::UnknownParent)
193 }
194 Ok(ImportStatus::KnownBad) => {
195 tracing::debug!("Peer gave us a bad block: {block_hash:?}");
196 Err(BlockImportError::BadBlock(None))
197 }
198 Err(err) => {
199 tracing::error!(?err, "Error importing block: {block_hash:?}");
200 Err(BlockImportError::Other(err))
201 }
202 }
203 };
204
205 if let Ok(block_import_status) = &block_import_result {
206 let block_number = block_import_status.number();
207 tracing::debug!("Block imported successfully #{block_number:?},{block_hash}");
208 imported += 1;
209 } else {
210 has_error = true;
211 }
212
213 results.push((block_import_result, block_hash));
214
215 Yield::new().await
216 }
217
218 ImportManyBlocksResult {
220 block_count: count,
221 imported,
222 results,
223 }
224}
225
226struct Yield(bool);
232
233impl Yield {
234 fn new() -> Self {
235 Self(false)
236 }
237}
238
239impl Future for Yield {
240 type Output = ();
241
242 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
243 if !self.0 {
244 self.0 = true;
245 cx.waker().wake_by_ref();
246 Poll::Pending
247 } else {
248 Poll::Ready(())
249 }
250 }
251}