use crate::block_import::{BitcoinBlockImport, ImportStatus};
use bitcoin::{Block as BitcoinBlock, BlockHash};
use futures::prelude::*;
use futures::task::{Context, Poll};
use futures::StreamExt;
use sc_consensus::{BlockImportError, BlockImportParams};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_consensus::BlockOrigin;
use sp_core::traits::SpawnEssentialNamed;
use sp_runtime::traits::Block as BlockT;
use std::pin::Pin;
/// A batch of Bitcoin blocks submitted to the import queue.
#[derive(Debug, Clone)]
pub struct ImportBlocks {
/// Origin handed to the block importer for every block of this batch.
pub origin: BlockOrigin,
/// Blocks to import; the worker processes them one by one in the given order.
pub blocks: Vec<BitcoinBlock>,
}
/// Handle used to talk to the background block import worker.
///
/// It owns the sending half of the channel that carries batches to the worker and
/// the receiving half of the channel on which the worker reports batch results.
#[derive(Debug)]
pub struct BlockImportQueue {
// Sends batches of blocks to the worker.
block_import_sender: TracingUnboundedSender<ImportBlocks>,
// Receives one `ImportManyBlocksResult` per batch the worker has processed.
import_result_receiver: TracingUnboundedReceiver<ImportManyBlocksResult>,
}
impl BlockImportQueue {
pub fn import_blocks(&self, incoming_blocks: ImportBlocks) {
let _ = self.block_import_sender.unbounded_send(incoming_blocks);
}
pub async fn block_import_results(&mut self) -> ImportManyBlocksResult {
self.import_result_receiver.select_next_some().await
}
}
/// Creates the Bitcoin block import queue and spawns its background worker.
///
/// Two unbounded channels are created: one carries [`ImportBlocks`] batches to the
/// worker, the other carries [`ImportManyBlocksResult`]s back. The worker is spawned
/// through `spawner` as an essential blocking task and drives `block_import`.
///
/// The worker stops when the result channel is observed to be closed or when the
/// import loop finishes because the batch channel was closed.
///
/// Returns the [`BlockImportQueue`] handle owning the caller's ends of both channels.
pub fn bitcoin_import_queue<BI>(
    spawner: &impl SpawnEssentialNamed,
    mut block_import: BI,
) -> BlockImportQueue
where
    BI: BitcoinBlockImport,
{
    let (results_tx, import_result_receiver) =
        tracing_unbounded("mpsc_subcoin_import_queue_result", 100_000);
    let (block_import_sender, blocks_rx) =
        tracing_unbounded("mpsc_subcoin_import_queue_worker_blocks", 100_000);

    let worker = async move {
        // The import loop gets its own sender clone; the original stays here so the
        // closed state of the result channel can be checked on every wake-up.
        let importing = block_import_process(&mut block_import, results_tx.clone(), blocks_rx);
        futures::pin_mut!(importing);

        loop {
            if results_tx.is_closed() {
                tracing::debug!("Stopping block import because result channel was closed!");
                break;
            }

            match futures::poll!(importing.as_mut()) {
                // The import loop ended on its own, nothing left to drive.
                Poll::Ready(()) => break,
                // Suspend once; the poll above registered the waker, so the next
                // wake-up re-enters the loop and re-checks the result channel.
                Poll::Pending => futures::pending!(),
            }
        }
    };

    spawner.spawn_essential_blocking(
        "bitcoin-block-import-worker",
        Some("bitcoin-block-import"),
        worker.boxed(),
    );

    BlockImportQueue {
        block_import_sender,
        import_result_receiver,
    }
}
/// A verifier that performs no verification at all.
///
/// `verify` always succeeds and returns fresh import params built only from the
/// origin and header of the input; all other fields of the input are discarded.
pub struct VerifyNothing;

#[async_trait::async_trait]
impl<Block: BlockT> sc_consensus::Verifier<Block> for VerifyNothing {
    async fn verify(
        &self,
        block: BlockImportParams<Block>,
    ) -> Result<BlockImportParams<Block>, String> {
        // Only these two fields survive; the rest of `block` is dropped.
        let origin = block.origin;
        let header = block.header;
        Ok(BlockImportParams::new(origin, header))
    }
}
/// Worker loop of the import queue.
///
/// Receives batches from `block_import_receiver`, imports each one with
/// `import_many_blocks` and sends the batch result through `result_sender`.
/// The loop ends once the batch channel yields no more items.
async fn block_import_process(
    block_import: &mut dyn BitcoinBlockImport,
    result_sender: TracingUnboundedSender<ImportManyBlocksResult>,
    mut block_import_receiver: TracingUnboundedReceiver<ImportBlocks>,
) {
    while let Some(batch) = block_import_receiver.next().await {
        let ImportBlocks { origin, blocks } = batch;
        let outcome = import_many_blocks(block_import, origin, blocks).await;
        // A failed send is deliberately ignored here: `bitcoin_import_queue` checks
        // the closed state of the result channel itself and stops the worker.
        let _ = result_sender.unbounded_send(outcome);
    }

    tracing::debug!("Stopping block import because the import channel was closed!");
}
/// Per-block import status with `u32` as the block number type.
type BlockImportStatus = sc_consensus::BlockImportStatus<u32>;
/// Outcome of importing one batch of blocks.
#[derive(Debug)]
pub struct ImportManyBlocksResult {
/// Number of blocks whose import result was `Ok`; this includes blocks that were
/// already in the chain.
pub imported: usize,
/// Total number of blocks in the batch.
pub block_count: usize,
/// Import result of every block paired with its hash, in batch order.
pub results: Vec<(Result<BlockImportStatus, BlockImportError>, BlockHash)>,
}
/// Imports `blocks` sequentially through `import_handle`, in the given order.
///
/// Every block gets an entry in the returned results. Once one block fails, no further
/// import is attempted and each remaining block is reported as
/// `BlockImportError::Cancelled`. After every block the task yields once to the executor.
///
/// Returns the number of `Ok` results, the batch size and the per-block results.
async fn import_many_blocks(
    import_handle: &mut dyn BitcoinBlockImport,
    origin: BlockOrigin,
    blocks: Vec<BitcoinBlock>,
) -> ImportManyBlocksResult {
    let block_count = blocks.len();
    tracing::trace!("[import_many_blocks] importing {} blocks", block_count);

    let mut results = Vec::with_capacity(block_count);
    let mut imported = 0;
    // Set after the first failure; all later blocks are cancelled without an import attempt.
    let mut aborted = false;

    for block in blocks {
        let block_hash = block.block_hash();

        let outcome = if aborted {
            Err(BlockImportError::Cancelled)
        } else {
            // Translate the importer's status into the queue's result type.
            match import_handle.import_block(block, origin).await {
                Ok(ImportStatus::Imported {
                    block_number, aux, ..
                }) => Ok(BlockImportStatus::ImportedUnknown(block_number, aux, None)),
                Ok(ImportStatus::AlreadyInChain(number)) => {
                    tracing::trace!("Block already in chain: #{number},{block_hash:?}");
                    Ok(BlockImportStatus::ImportedKnown(number, None))
                }
                Ok(ImportStatus::UnknownParent) => {
                    tracing::debug!("Block {block_hash:?} has unknown parent");
                    Err(BlockImportError::UnknownParent)
                }
                Ok(ImportStatus::MissingState) => {
                    tracing::debug!("Parent state is missing for block: {block_hash:?}",);
                    Err(BlockImportError::MissingState)
                }
                Ok(ImportStatus::KnownBad) => {
                    tracing::debug!("Peer gave us a bad block: {block_hash:?}");
                    Err(BlockImportError::BadBlock(None))
                }
                Err(err) => {
                    tracing::error!(?err, "Error importing block: {block_hash:?}");
                    Err(BlockImportError::Other(err))
                }
            }
        };

        match &outcome {
            Ok(status) => {
                let block_number = status.number();
                tracing::debug!("Block imported successfully #{block_number:?},{block_hash}");
                imported += 1;
            }
            Err(_) => aborted = true,
        }

        results.push((outcome, block_hash));

        // Give other tasks on the executor a chance to run between blocks.
        Yield::new().await;
    }

    ImportManyBlocksResult {
        imported,
        block_count,
        results,
    }
}
/// A future that hands control back to the executor exactly once.
///
/// The first poll wakes the task right away and returns `Poll::Pending`; every later
/// poll returns `Poll::Ready(())`. The flag records whether the first poll has happened.
struct Yield(bool);

impl Yield {
    /// Creates a future that has not yielded yet.
    fn new() -> Self {
        Yield(false)
    }
}

impl Future for Yield {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Yield` only holds a `bool` and is therefore `Unpin`, so `get_mut` is available.
        let this = self.get_mut();

        // Mark the future as yielded and inspect the previous state in one step.
        let already_yielded = std::mem::replace(&mut this.0, true);
        if already_yielded {
            return Poll::Ready(());
        }

        // Ask for an immediate re-poll so the task is rescheduled instead of parked.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}