subcoin_indexer/
transaction_indexer.rs

1use bitcoin::Txid;
2use bitcoin::hashes::Hash;
3use codec::{Decode, Encode};
4use futures::StreamExt;
5use sc_client_api::backend::AuxStore;
6use sc_client_api::{BlockBackend, BlockchainEvents, HeaderBackend, StorageProvider};
7use sc_service::SpawnTaskHandle;
8use sp_runtime::generic::SignedBlock;
9use sp_runtime::traits::{Block as BlockT, Header, SaturatedConversion};
10use std::marker::PhantomData;
11use std::sync::Arc;
12use subcoin_primitives::{BitcoinTransactionAdapter, TransactionIndex, TxPosition};
13
/// A half-open range of block numbers (`start..end`).
type IndexRange = std::ops::Range<u32>;

/// The range of indexed blocks.
/// - `start`: The first block number indexed.
/// - `end`: One past the last indexed block.
type IndexedBlockRange = IndexRange;

/// Aux-store key under which the not-yet-indexed gap range is persisted.
const TX_INDEX_GAP_KEY: &[u8] = b"tx_index_gap";

/// Aux-store key under which the indexed block range is persisted.
const INDEXED_BLOCK_RANGE_KEY: &[u8] = b"tx_indexed_block_range";
24
/// The operation performed on the transaction index for a given block.
#[derive(Clone, Copy, Debug)]
enum IndexAction {
    /// Insert the index entries of the block's transactions.
    Apply,
    /// Delete the index entries of the block's transactions.
    Revert,
}
31
/// Errors that can occur while indexing transactions.
#[derive(Debug, thiserror::Error)]
pub enum IndexerError {
    /// A block needed for indexing could not be found. The payload is a textual
    /// description of the missing block.
    #[error("Block not found: {0}")]
    BlockNotFound(String),

    /// The persisted indexed range does not line up with the block being processed.
    #[error("Inconsistent block range. Indexed: {indexed:?}, Processed: {processed}")]
    InconsistentBlockRange {
        /// The indexed block range loaded from storage, if any.
        indexed: Option<IndexedBlockRange>,
        /// The number of the block that was being processed.
        processed: u32,
    },

    /// A SCALE codec decoding failure.
    #[error("Failed to decode data: {0}")]
    DecodeError(#[from] codec::Error),

    /// An error reported by the blockchain client/backend.
    #[error(transparent)]
    Blockchain(#[from] sp_blockchain::Error),
}
50
/// Convenience alias for results whose error type is [`IndexerError`].
pub type Result<T> = std::result::Result<T, IndexerError>;
53
/// Provides transaction indexing functionality for Bitcoin transactions in Substrate blocks.
///
/// The indexer maintains a mapping of Bitcoin transaction IDs to their positions within blocks,
/// allowing efficient transaction lookups. It handles both new block imports and chain reorganizations.
#[derive(Debug)]
pub struct TransactionIndexer<Block, Backend, Client, TransactionAdapter> {
    /// Bitcoin network the indexer was created for.
    network: bitcoin::Network,
    /// Shared client used to read blocks, subscribe to import notifications and
    /// persist index data through its auxiliary store.
    client: Arc<Client>,
    /// Ties the type parameters that are not stored in any field to the struct.
    _phantom: PhantomData<(Block, Backend, TransactionAdapter)>,
}
64
65impl<Block, Backend, Client, TransactionAdapter> Clone
66    for TransactionIndexer<Block, Backend, Client, TransactionAdapter>
67{
68    fn clone(&self) -> Self {
69        Self {
70            network: self.network,
71            client: self.client.clone(),
72            _phantom: self._phantom,
73        }
74    }
75}
76
77impl<Block, Backend, Client, TransactionAdapter>
78    TransactionIndexer<Block, Backend, Client, TransactionAdapter>
79where
80    Block: BlockT,
81    Backend: sc_client_api::backend::Backend<Block>,
82    Client: BlockchainEvents<Block>
83        + HeaderBackend<Block>
84        + BlockBackend<Block>
85        + StorageProvider<Block, Backend>
86        + AuxStore
87        + 'static,
88    TransactionAdapter: BitcoinTransactionAdapter<Block>,
89{
90    /// Creates a new instance of [`TransactionIndexer`].
91    pub fn new(
92        network: bitcoin::Network,
93        client: Arc<Client>,
94        spawn_handle: SpawnTaskHandle,
95    ) -> Result<Self> {
96        if let Some(gap) = Self::detect_index_gap(&client)? {
97            let client = client.clone();
98            spawn_handle.spawn_blocking("tx-historical-index", None, async move {
99                if let Err(err) = index_historical_blocks::<_, TransactionAdapter, _>(client, gap) {
100                    tracing::error!(?err, "Failed to index historical blocks");
101                }
102            });
103        }
104
105        Ok(Self {
106            network,
107            client,
108            _phantom: Default::default(),
109        })
110    }
111
112    /// Detects if there are any gaps in the transaction index.
113    fn detect_index_gap(client: &Client) -> Result<Option<IndexRange>> {
114        let gap = if let Some(gap) = load_index_gap(client)? {
115            Some(gap)
116        } else if let Some(ref block_range) = load_indexed_block_range(client)? {
117            let best_number: u32 = client.info().best_number.saturated_into();
118            let last_indexed_block = block_range.end.saturating_sub(1);
119
120            if last_indexed_block < best_number {
121                let new_gap = last_indexed_block + 1..best_number + 1;
122                tracing::debug!(
123                    last_indexed = last_indexed_block,
124                    best_number = best_number,
125                    ?new_gap,
126                    "Detected transaction indexing gap"
127                );
128                Some(new_gap)
129            } else {
130                None
131            }
132        } else {
133            None
134        };
135
136        Ok(gap)
137    }
138
139    pub async fn run(self) {
140        let mut block_import_stream = self.client.every_import_notification_stream();
141
142        while let Some(notification) = block_import_stream.next().await {
143            let Ok(Some(SignedBlock {
144                block,
145                justifications: _,
146            })) = self.client.block(notification.hash)
147            else {
148                tracing::error!("Imported block {} unavailable", notification.hash);
149                continue;
150            };
151
152            let res = if let Some(route) = notification.tree_route {
153                self.handle_reorg(route)
154            } else {
155                self.handle_new_block(block)
156            };
157
158            if let Err(err) = res {
159                panic!("Failed to process block#{}: {err:?}", notification.hash);
160            }
161        }
162    }
163
164    /// Handles retracted and enacted blocks during a re-org.
165    fn handle_reorg(&self, route: Arc<sp_blockchain::TreeRoute<Block>>) -> Result<()> {
166        for hash_and_number in route.retracted() {
167            let block = self.get_block(hash_and_number.hash)?;
168            process_block::<_, TransactionAdapter, _>(&*self.client, block, IndexAction::Revert);
169        }
170
171        for hash_and_number in route.enacted() {
172            let block = self.get_block(hash_and_number.hash)?;
173            process_block::<_, TransactionAdapter, _>(&*self.client, block, IndexAction::Apply);
174        }
175
176        Ok(())
177    }
178
179    /// Handles a new block import (non-reorg).
180    fn handle_new_block(&self, block: Block) -> Result<()> {
181        let block_number: u32 = (*block.header().number()).saturated_into();
182
183        process_block::<_, TransactionAdapter, _>(&*self.client, block, IndexAction::Apply);
184
185        let mut indexed_block_range = load_indexed_block_range(&*self.client)?;
186
187        match indexed_block_range.as_mut() {
188            Some(current_range) => {
189                if current_range.end == block_number {
190                    current_range.end += 1;
191                    write_tx_indexed_range(&*self.client, current_range.encode())?;
192                } else {
193                    return Err(IndexerError::InconsistentBlockRange {
194                        indexed: indexed_block_range,
195                        processed: block_number,
196                    });
197                }
198            }
199            None => {
200                let new_range = block_number..block_number + 1;
201                write_tx_indexed_range(&*self.client, new_range.encode())?;
202            }
203        }
204
205        Ok(())
206    }
207
208    fn get_block(&self, block_hash: Block::Hash) -> Result<Block> {
209        self.client
210            .block(block_hash)?
211            .ok_or_else(|| IndexerError::BlockNotFound(format!("{block_hash:?}")))
212            .map(|signed| signed.block)
213    }
214}
215
216fn index_historical_blocks<Block, TransactionAdapter, Client>(
217    client: Arc<Client>,
218    gap_range: IndexRange,
219) -> sp_blockchain::Result<()>
220where
221    Block: BlockT,
222    TransactionAdapter: BitcoinTransactionAdapter<Block>,
223    Client: BlockBackend<Block> + HeaderBackend<Block> + AuxStore,
224{
225    let mut remaining_gap = gap_range.clone();
226
227    tracing::debug!("Starting to index historical blocks in range {gap_range:?}");
228
229    for block_number in gap_range.clone() {
230        let block_hash = client.hash(block_number.into())?.ok_or_else(|| {
231            sp_blockchain::Error::Backend(format!("Hash for block#{block_number} not found"))
232        })?;
233        let block = client
234            .block(block_hash)?
235            .ok_or_else(|| {
236                sp_blockchain::Error::Backend(format!(
237                    "Missing block#{block_number},{block_hash:?}"
238                ))
239            })?
240            .block;
241
242        process_block::<_, TransactionAdapter, _>(&*client, block, IndexAction::Apply);
243
244        remaining_gap.start += 1;
245        write_index_gap(&*client, remaining_gap.encode())?;
246    }
247
248    tracing::debug!("Finished indexing historical blocks. Final gap status: {remaining_gap:?}");
249
250    delete_index_gap(&*client)?;
251
252    // Extends the existing indexed block range or initializes a new range.
253    match load_indexed_block_range(&*client)? {
254        Some(mut existing_range) => {
255            // Extend the range if there is overlap or new blocks beyond the current range.
256            if gap_range.end > existing_range.end {
257                existing_range.end = gap_range.end;
258                write_tx_indexed_range(&*client, existing_range.encode())?;
259            }
260        }
261        None => {
262            tracing::debug!("No prior range exist; initialize new gap range: {gap_range:?}");
263            write_tx_indexed_range(&*client, gap_range.encode())?;
264        }
265    }
266
267    Ok(())
268}
269
270fn process_block<Block, TransactionAdapter, B>(backend: &B, block: Block, index_action: IndexAction)
271where
272    Block: BlockT,
273    TransactionAdapter: BitcoinTransactionAdapter<Block>,
274    B: AuxStore,
275{
276    let block_number: u32 = (*block.header().number()).saturated_into();
277    let bitcoin_block =
278        subcoin_primitives::convert_to_bitcoin_block::<Block, TransactionAdapter>(block)
279            .expect("Failed to convert Substrate block to Bitcoin block");
280    let changes = bitcoin_block
281        .txdata
282        .iter()
283        .enumerate()
284        .map(|(index, tx)| {
285            (
286                tx.compute_txid(),
287                TxPosition {
288                    block_number,
289                    index: index as u32,
290                },
291            )
292        })
293        .collect::<Vec<_>>();
294    if let Err(err) = write_transaction_index_changes(backend, index_action, changes) {
295        tracing::error!(?err, "Failed to write index changes");
296    }
297}
298
299fn load_decode<B, T>(backend: &B, key: &[u8]) -> sp_blockchain::Result<Option<T>>
300where
301    B: AuxStore,
302    T: Decode,
303{
304    match backend.get_aux(key)? {
305        Some(t) => T::decode(&mut &t[..]).map(Some).map_err(|e: codec::Error| {
306            sp_blockchain::Error::Backend(format!("Subcoin DB is corrupted. Decode error: {e}"))
307        }),
308        None => Ok(None),
309    }
310}
311
312fn load_indexed_block_range<B: AuxStore>(
313    backend: &B,
314) -> sp_blockchain::Result<Option<IndexedBlockRange>> {
315    load_decode(backend, INDEXED_BLOCK_RANGE_KEY)
316}
317
318fn write_tx_indexed_range<B: AuxStore>(
319    backend: &B,
320    encoded_indexed_block_range: Vec<u8>,
321) -> sp_blockchain::Result<()> {
322    backend.insert_aux(
323        &[(
324            INDEXED_BLOCK_RANGE_KEY,
325            encoded_indexed_block_range.as_slice(),
326        )],
327        &[],
328    )
329}
330
331fn load_index_gap<B: AuxStore>(backend: &B) -> sp_blockchain::Result<Option<IndexRange>> {
332    load_decode(backend, TX_INDEX_GAP_KEY)
333}
334
335fn write_index_gap<B: AuxStore>(backend: &B, encoded_gap: Vec<u8>) -> sp_blockchain::Result<()> {
336    backend.insert_aux(&[(TX_INDEX_GAP_KEY, encoded_gap.as_slice())], &[])
337}
338
339fn delete_index_gap<B: AuxStore>(backend: &B) -> sp_blockchain::Result<()> {
340    backend.insert_aux([], &[TX_INDEX_GAP_KEY])
341}
342
343fn write_transaction_index_changes<B: AuxStore>(
344    backend: &B,
345    index_action: IndexAction,
346    changes: Vec<(Txid, TxPosition)>,
347) -> sp_blockchain::Result<()> {
348    match index_action {
349        IndexAction::Apply => {
350            let key_values = changes
351                .iter()
352                .map(|(txid, tx_pos)| (txid_key(*txid), tx_pos.encode()))
353                .collect::<Vec<_>>();
354            backend.insert_aux(
355                key_values
356                    .iter()
357                    .map(|(k, v)| (k.as_slice(), v.as_slice()))
358                    .collect::<Vec<_>>()
359                    .iter(),
360                &[],
361            )
362        }
363        IndexAction::Revert => {
364            let keys = changes
365                .iter()
366                .map(|(txid, _tx_pos)| txid_key(*txid))
367                .collect::<Vec<_>>();
368            backend.insert_aux(
369                &[],
370                keys.iter().map(|k| k.as_slice()).collect::<Vec<_>>().iter(),
371            )
372        }
373    }
374}
375
/// Read-only access to the transaction index stored in a client's aux store.
pub struct TransactionIndexProvider<Client> {
    /// Shared client whose aux store holds the index entries.
    client: Arc<Client>,
}

impl<Client> TransactionIndexProvider<Client> {
    /// Creates a provider backed by the given client.
    pub fn new(client: Arc<Client>) -> Self {
        TransactionIndexProvider { client }
    }
}
385
386impl<Client> TransactionIndex for TransactionIndexProvider<Client>
387where
388    Client: AuxStore,
389{
390    fn tx_index(&self, txid: Txid) -> sp_blockchain::Result<Option<TxPosition>> {
391        load_transaction_index(&*self.client, txid)
392    }
393}
394
395fn txid_key(txid: Txid) -> Vec<u8> {
396    (b"txid", txid.as_byte_array()).encode()
397}
398
399fn load_transaction_index<B: AuxStore>(
400    backend: &B,
401    txid: Txid,
402) -> sp_blockchain::Result<Option<TxPosition>> {
403    load_decode(backend, &txid_key(txid))
404}