1use bitcoin::Txid;
2use bitcoin::hashes::Hash;
3use codec::{Decode, Encode};
4use futures::StreamExt;
5use sc_client_api::backend::AuxStore;
6use sc_client_api::{BlockBackend, BlockchainEvents, HeaderBackend, StorageProvider};
7use sc_service::SpawnTaskHandle;
8use sp_runtime::generic::SignedBlock;
9use sp_runtime::traits::{Block as BlockT, Header, SaturatedConversion};
10use std::marker::PhantomData;
11use std::sync::Arc;
12use subcoin_primitives::{BitcoinTransactionAdapter, TransactionIndex, TxPosition};
13
/// Aux-store key under which the remaining, not-yet-indexed historical block range
/// (the "gap") is persisted.
///
/// `index_historical_blocks` rewrites this entry after every processed block and deletes
/// it once the whole gap has been processed.
const TX_INDEX_GAP_KEY: &[u8] = b"tx_index_gap";

/// Aux-store key under which the range of blocks already covered by the transaction
/// index is persisted.
const INDEXED_BLOCK_RANGE_KEY: &[u8] = b"tx_indexed_block_range";

/// Half-open range of block numbers (`start..end`, `end` is exclusive).
type IndexRange = std::ops::Range<u32>;

/// Range of block numbers whose transactions have been written to the index.
type IndexedBlockRange = IndexRange;
24
/// Direction in which the transactions of a block are applied to the index.
#[derive(Debug, Clone, Copy)]
enum IndexAction {
    /// Insert one `txid -> position` entry for every transaction in the block.
    Apply,
    /// Delete the index entry of every transaction in the block.
    Revert,
}
31
/// Errors produced by the transaction indexer.
#[derive(Debug, thiserror::Error)]
pub enum IndexerError {
    /// The client has no block body for the requested hash; the payload is the
    /// `Debug` rendering of that hash.
    #[error("Block not found: {0}")]
    BlockNotFound(String),

    /// The block being processed does not directly extend the persisted indexed range.
    #[error("Inconsistent block range. Indexed: {indexed:?}, Processed: {processed}")]
    InconsistentBlockRange {
        /// The indexed range as loaded from the aux store, if any.
        indexed: Option<IndexedBlockRange>,
        /// Number of the block that was being processed.
        processed: u32,
    },

    /// A stored value could not be decoded.
    ///
    /// NOTE(review): not constructed in the visible part of this file; `load_decode`
    /// reports decode failures as `sp_blockchain::Error::Backend` instead.
    #[error("Failed to decode data: {0}")]
    DecodeError(#[from] codec::Error),

    /// Error bubbled up unchanged from the client / aux store.
    #[error(transparent)]
    Blockchain(#[from] sp_blockchain::Error),
}

/// Result type whose error is [`IndexerError`].
pub type Result<T> = std::result::Result<T, IndexerError>;
53
/// Maintains a `txid -> position` index in the client's auxiliary store, updating it as
/// block import notifications arrive.
#[derive(Debug)]
pub struct TransactionIndexer<Block, Backend, Client, TransactionAdapter> {
    /// Bitcoin network the indexer was created for.
    // NOTE(review): not read anywhere in the visible part of this file.
    network: bitcoin::Network,
    /// Client used to subscribe to imports, fetch blocks and access the aux store.
    client: Arc<Client>,
    /// Ties the type parameters that are not stored directly to the struct.
    _phantom: PhantomData<(Block, Backend, TransactionAdapter)>,
}
64
65impl<Block, Backend, Client, TransactionAdapter> Clone
66 for TransactionIndexer<Block, Backend, Client, TransactionAdapter>
67{
68 fn clone(&self) -> Self {
69 Self {
70 network: self.network,
71 client: self.client.clone(),
72 _phantom: self._phantom,
73 }
74 }
75}
76
77impl<Block, Backend, Client, TransactionAdapter>
78 TransactionIndexer<Block, Backend, Client, TransactionAdapter>
79where
80 Block: BlockT,
81 Backend: sc_client_api::backend::Backend<Block>,
82 Client: BlockchainEvents<Block>
83 + HeaderBackend<Block>
84 + BlockBackend<Block>
85 + StorageProvider<Block, Backend>
86 + AuxStore
87 + 'static,
88 TransactionAdapter: BitcoinTransactionAdapter<Block>,
89{
90 pub fn new(
92 network: bitcoin::Network,
93 client: Arc<Client>,
94 spawn_handle: SpawnTaskHandle,
95 ) -> Result<Self> {
96 if let Some(gap) = Self::detect_index_gap(&client)? {
97 let client = client.clone();
98 spawn_handle.spawn_blocking("tx-historical-index", None, async move {
99 if let Err(err) = index_historical_blocks::<_, TransactionAdapter, _>(client, gap) {
100 tracing::error!(?err, "Failed to index historical blocks");
101 }
102 });
103 }
104
105 Ok(Self {
106 network,
107 client,
108 _phantom: Default::default(),
109 })
110 }
111
112 fn detect_index_gap(client: &Client) -> Result<Option<IndexRange>> {
114 let gap = if let Some(gap) = load_index_gap(client)? {
115 Some(gap)
116 } else if let Some(ref block_range) = load_indexed_block_range(client)? {
117 let best_number: u32 = client.info().best_number.saturated_into();
118 let last_indexed_block = block_range.end.saturating_sub(1);
119
120 if last_indexed_block < best_number {
121 let new_gap = last_indexed_block + 1..best_number + 1;
122 tracing::debug!(
123 last_indexed = last_indexed_block,
124 best_number = best_number,
125 ?new_gap,
126 "Detected transaction indexing gap"
127 );
128 Some(new_gap)
129 } else {
130 None
131 }
132 } else {
133 None
134 };
135
136 Ok(gap)
137 }
138
139 pub async fn run(self) {
140 let mut block_import_stream = self.client.every_import_notification_stream();
141
142 while let Some(notification) = block_import_stream.next().await {
143 let Ok(Some(SignedBlock {
144 block,
145 justifications: _,
146 })) = self.client.block(notification.hash)
147 else {
148 tracing::error!("Imported block {} unavailable", notification.hash);
149 continue;
150 };
151
152 let res = if let Some(route) = notification.tree_route {
153 self.handle_reorg(route)
154 } else {
155 self.handle_new_block(block)
156 };
157
158 if let Err(err) = res {
159 panic!("Failed to process block#{}: {err:?}", notification.hash);
160 }
161 }
162 }
163
164 fn handle_reorg(&self, route: Arc<sp_blockchain::TreeRoute<Block>>) -> Result<()> {
166 for hash_and_number in route.retracted() {
167 let block = self.get_block(hash_and_number.hash)?;
168 process_block::<_, TransactionAdapter, _>(&*self.client, block, IndexAction::Revert);
169 }
170
171 for hash_and_number in route.enacted() {
172 let block = self.get_block(hash_and_number.hash)?;
173 process_block::<_, TransactionAdapter, _>(&*self.client, block, IndexAction::Apply);
174 }
175
176 Ok(())
177 }
178
179 fn handle_new_block(&self, block: Block) -> Result<()> {
181 let block_number: u32 = (*block.header().number()).saturated_into();
182
183 process_block::<_, TransactionAdapter, _>(&*self.client, block, IndexAction::Apply);
184
185 let mut indexed_block_range = load_indexed_block_range(&*self.client)?;
186
187 match indexed_block_range.as_mut() {
188 Some(current_range) => {
189 if current_range.end == block_number {
190 current_range.end += 1;
191 write_tx_indexed_range(&*self.client, current_range.encode())?;
192 } else {
193 return Err(IndexerError::InconsistentBlockRange {
194 indexed: indexed_block_range,
195 processed: block_number,
196 });
197 }
198 }
199 None => {
200 let new_range = block_number..block_number + 1;
201 write_tx_indexed_range(&*self.client, new_range.encode())?;
202 }
203 }
204
205 Ok(())
206 }
207
208 fn get_block(&self, block_hash: Block::Hash) -> Result<Block> {
209 self.client
210 .block(block_hash)?
211 .ok_or_else(|| IndexerError::BlockNotFound(format!("{block_hash:?}")))
212 .map(|signed| signed.block)
213 }
214}
215
216fn index_historical_blocks<Block, TransactionAdapter, Client>(
217 client: Arc<Client>,
218 gap_range: IndexRange,
219) -> sp_blockchain::Result<()>
220where
221 Block: BlockT,
222 TransactionAdapter: BitcoinTransactionAdapter<Block>,
223 Client: BlockBackend<Block> + HeaderBackend<Block> + AuxStore,
224{
225 let mut remaining_gap = gap_range.clone();
226
227 tracing::debug!("Starting to index historical blocks in range {gap_range:?}");
228
229 for block_number in gap_range.clone() {
230 let block_hash = client.hash(block_number.into())?.ok_or_else(|| {
231 sp_blockchain::Error::Backend(format!("Hash for block#{block_number} not found"))
232 })?;
233 let block = client
234 .block(block_hash)?
235 .ok_or_else(|| {
236 sp_blockchain::Error::Backend(format!(
237 "Missing block#{block_number},{block_hash:?}"
238 ))
239 })?
240 .block;
241
242 process_block::<_, TransactionAdapter, _>(&*client, block, IndexAction::Apply);
243
244 remaining_gap.start += 1;
245 write_index_gap(&*client, remaining_gap.encode())?;
246 }
247
248 tracing::debug!("Finished indexing historical blocks. Final gap status: {remaining_gap:?}");
249
250 delete_index_gap(&*client)?;
251
252 match load_indexed_block_range(&*client)? {
254 Some(mut existing_range) => {
255 if gap_range.end > existing_range.end {
257 existing_range.end = gap_range.end;
258 write_tx_indexed_range(&*client, existing_range.encode())?;
259 }
260 }
261 None => {
262 tracing::debug!("No prior range exist; initialize new gap range: {gap_range:?}");
263 write_tx_indexed_range(&*client, gap_range.encode())?;
264 }
265 }
266
267 Ok(())
268}
269
270fn process_block<Block, TransactionAdapter, B>(backend: &B, block: Block, index_action: IndexAction)
271where
272 Block: BlockT,
273 TransactionAdapter: BitcoinTransactionAdapter<Block>,
274 B: AuxStore,
275{
276 let block_number: u32 = (*block.header().number()).saturated_into();
277 let bitcoin_block =
278 subcoin_primitives::convert_to_bitcoin_block::<Block, TransactionAdapter>(block)
279 .expect("Failed to convert Substrate block to Bitcoin block");
280 let changes = bitcoin_block
281 .txdata
282 .iter()
283 .enumerate()
284 .map(|(index, tx)| {
285 (
286 tx.compute_txid(),
287 TxPosition {
288 block_number,
289 index: index as u32,
290 },
291 )
292 })
293 .collect::<Vec<_>>();
294 if let Err(err) = write_transaction_index_changes(backend, index_action, changes) {
295 tracing::error!(?err, "Failed to write index changes");
296 }
297}
298
299fn load_decode<B, T>(backend: &B, key: &[u8]) -> sp_blockchain::Result<Option<T>>
300where
301 B: AuxStore,
302 T: Decode,
303{
304 match backend.get_aux(key)? {
305 Some(t) => T::decode(&mut &t[..]).map(Some).map_err(|e: codec::Error| {
306 sp_blockchain::Error::Backend(format!("Subcoin DB is corrupted. Decode error: {e}"))
307 }),
308 None => Ok(None),
309 }
310}
311
312fn load_indexed_block_range<B: AuxStore>(
313 backend: &B,
314) -> sp_blockchain::Result<Option<IndexedBlockRange>> {
315 load_decode(backend, INDEXED_BLOCK_RANGE_KEY)
316}
317
318fn write_tx_indexed_range<B: AuxStore>(
319 backend: &B,
320 encoded_indexed_block_range: Vec<u8>,
321) -> sp_blockchain::Result<()> {
322 backend.insert_aux(
323 &[(
324 INDEXED_BLOCK_RANGE_KEY,
325 encoded_indexed_block_range.as_slice(),
326 )],
327 &[],
328 )
329}
330
331fn load_index_gap<B: AuxStore>(backend: &B) -> sp_blockchain::Result<Option<IndexRange>> {
332 load_decode(backend, TX_INDEX_GAP_KEY)
333}
334
335fn write_index_gap<B: AuxStore>(backend: &B, encoded_gap: Vec<u8>) -> sp_blockchain::Result<()> {
336 backend.insert_aux(&[(TX_INDEX_GAP_KEY, encoded_gap.as_slice())], &[])
337}
338
339fn delete_index_gap<B: AuxStore>(backend: &B) -> sp_blockchain::Result<()> {
340 backend.insert_aux([], &[TX_INDEX_GAP_KEY])
341}
342
343fn write_transaction_index_changes<B: AuxStore>(
344 backend: &B,
345 index_action: IndexAction,
346 changes: Vec<(Txid, TxPosition)>,
347) -> sp_blockchain::Result<()> {
348 match index_action {
349 IndexAction::Apply => {
350 let key_values = changes
351 .iter()
352 .map(|(txid, tx_pos)| (txid_key(*txid), tx_pos.encode()))
353 .collect::<Vec<_>>();
354 backend.insert_aux(
355 key_values
356 .iter()
357 .map(|(k, v)| (k.as_slice(), v.as_slice()))
358 .collect::<Vec<_>>()
359 .iter(),
360 &[],
361 )
362 }
363 IndexAction::Revert => {
364 let keys = changes
365 .iter()
366 .map(|(txid, _tx_pos)| txid_key(*txid))
367 .collect::<Vec<_>>();
368 backend.insert_aux(
369 &[],
370 keys.iter().map(|k| k.as_slice()).collect::<Vec<_>>().iter(),
371 )
372 }
373 }
374}
375
/// Read-only access to the transaction index stored in the client's aux store.
pub struct TransactionIndexProvider<Client> {
    /// Client whose aux store holds the index entries.
    client: Arc<Client>,
}
379
380impl<Client> TransactionIndexProvider<Client> {
381 pub fn new(client: Arc<Client>) -> Self {
382 Self { client }
383 }
384}
385
386impl<Client> TransactionIndex for TransactionIndexProvider<Client>
387where
388 Client: AuxStore,
389{
390 fn tx_index(&self, txid: Txid) -> sp_blockchain::Result<Option<TxPosition>> {
391 load_transaction_index(&*self.client, txid)
392 }
393}
394
395fn txid_key(txid: Txid) -> Vec<u8> {
396 (b"txid", txid.as_byte_array()).encode()
397}
398
399fn load_transaction_index<B: AuxStore>(
400 backend: &B,
401 txid: Txid,
402) -> sp_blockchain::Result<Option<TxPosition>> {
403 load_decode(backend, &txid_key(txid))
404}