subcoin_indexer/
indexer.rs

1//! Block indexer implementation.
2
3use crate::db::{IndexerDatabase, Result};
4use crate::queries::IndexerQuery;
5use crate::types::IndexerState;
6use bitcoin::{Block as BitcoinBlock, Network};
7use futures::StreamExt;
8use rayon::prelude::*;
9use sc_client_api::{BlockBackend, BlockchainEvents, HeaderBackend};
10use sp_runtime::traits::{Block as BlockT, Header, SaturatedConversion};
11use std::marker::PhantomData;
12use std::path::Path;
13use std::sync::Arc;
14use subcoin_primitives::BitcoinTransactionAdapter;
15
/// Number of processed blocks between progress checkpoints during historical indexing.
///
/// Progress advances in whole batches of [`BATCH_SIZE`] blocks and the checkpoint
/// test is `processed % PROGRESS_SAVE_INTERVAL == 0`, so this value should stay a
/// multiple of [`BATCH_SIZE`] for checkpoints to fire at the intended cadence.
const PROGRESS_SAVE_INTERVAL: u32 = 1_000;

/// Number of consecutive blocks loaded and handed to the database in one
/// `index_blocks_batch` call during historical indexing.
const BATCH_SIZE: u32 = 100;
21
22/// The main blockchain indexer.
23pub struct Indexer<Block, Client, TransactionAdapter> {
24    db: IndexerDatabase,
25    client: Arc<Client>,
26    network: Network,
27    _phantom: PhantomData<(Block, TransactionAdapter)>,
28}
29
30impl<Block, Client, TransactionAdapter> Indexer<Block, Client, TransactionAdapter>
31where
32    Block: BlockT,
33    Client: BlockchainEvents<Block>
34        + HeaderBackend<Block>
35        + BlockBackend<Block>
36        + Send
37        + Sync
38        + 'static,
39    TransactionAdapter: BitcoinTransactionAdapter<Block> + Send + Sync,
40{
41    /// Create a new indexer.
42    ///
43    /// This only initializes the database connection. Historical indexing is deferred
44    /// to `run()` to avoid blocking node startup (including the RPC server).
45    pub async fn new(db_path: &Path, network: Network, client: Arc<Client>) -> Result<Self> {
46        let db = IndexerDatabase::open(db_path, network).await?;
47
48        let indexer = Self {
49            db,
50            client,
51            network,
52            _phantom: PhantomData,
53        };
54
55        Ok(indexer)
56    }
57
58    /// Get a query interface for this indexer.
59    pub fn query(&self) -> IndexerQuery {
60        IndexerQuery::new(self.db.clone())
61    }
62
63    /// Check for indexing gaps and fill them.
64    async fn handle_index_gap(&self) -> Result<()> {
65        let best_number: u32 = self.client.info().best_number.saturated_into();
66
67        match self.db.load_state().await? {
68            Some(IndexerState::HistoricalIndexing {
69                target_height,
70                current_height,
71            }) => {
72                // Resume interrupted historical indexing
73                tracing::info!(
74                    current_height,
75                    target_height,
76                    "Resuming interrupted transaction indexing"
77                );
78                self.index_block_range(current_height, target_height)
79                    .await?;
80            }
81            Some(IndexerState::Active { last_indexed }) => {
82                // Check if we fell behind
83                if last_indexed < best_number {
84                    tracing::info!(
85                        last_indexed,
86                        best_number,
87                        "Transaction index behind, catching up"
88                    );
89                    self.index_block_range(last_indexed + 1, best_number + 1)
90                        .await?;
91                }
92            }
93            None => {
94                // Fresh start - index all existing blocks
95                if best_number > 0 {
96                    tracing::info!(
97                        best_number,
98                        "First run with indexer, indexing all {} existing blocks",
99                        best_number + 1
100                    );
101
102                    // Save state before starting
103                    self.db
104                        .save_state(&IndexerState::HistoricalIndexing {
105                            target_height: best_number + 1,
106                            current_height: 0,
107                        })
108                        .await?;
109
110                    self.index_block_range(0, best_number + 1).await?;
111                } else {
112                    // No blocks yet
113                    self.db
114                        .save_state(&IndexerState::Active { last_indexed: 0 })
115                        .await?;
116                }
117            }
118        }
119
120        Ok(())
121    }
122
123    /// Index a range of blocks.
124    async fn index_block_range(&self, start: u32, end: u32) -> Result<()> {
125        let total_blocks = end.saturating_sub(start);
126        let start_time = std::time::Instant::now();
127        let mut processed = 0u32;
128
129        tracing::info!(
130            start,
131            end,
132            total = total_blocks,
133            batch_size = BATCH_SIZE,
134            "Starting historical indexing with batched inserts"
135        );
136
137        let mut height = start;
138        while height < end {
139            let batch_end = (height + BATCH_SIZE).min(end);
140
141            // Process a batch of blocks in a single transaction
142            self.index_block_batch(height, batch_end).await?;
143
144            let batch_processed = batch_end - height;
145            processed += batch_processed;
146            height = batch_end;
147
148            // Save progress periodically
149            let is_last = height >= end;
150            if processed % PROGRESS_SAVE_INTERVAL == 0 || is_last || processed == batch_processed {
151                if !is_last {
152                    self.db
153                        .save_state(&IndexerState::HistoricalIndexing {
154                            target_height: end,
155                            current_height: height,
156                        })
157                        .await?;
158                }
159
160                // Log progress
161                let elapsed = start_time.elapsed().as_secs_f64();
162                let blocks_per_sec = if elapsed > 0.0 {
163                    processed as f64 / elapsed
164                } else {
165                    0.0
166                };
167                let remaining = total_blocks.saturating_sub(processed);
168                let eta_secs = if blocks_per_sec > 0.0 {
169                    (remaining as f64 / blocks_per_sec) as u64
170                } else {
171                    0
172                };
173
174                tracing::info!(
175                    processed,
176                    total = total_blocks,
177                    percent = %format!("{:.1}%", (processed as f64 / total_blocks as f64) * 100.0),
178                    blocks_per_sec = %format!("{:.0}", blocks_per_sec),
179                    eta_secs,
180                    "Indexing progress"
181                );
182            }
183        }
184
185        // Transition to active state
186        let last_indexed = end.saturating_sub(1);
187        self.db
188            .save_state(&IndexerState::Active { last_indexed })
189            .await?;
190
191        tracing::info!(
192            blocks = total_blocks,
193            duration_secs = start_time.elapsed().as_secs(),
194            "Historical indexing complete"
195        );
196
197        Ok(())
198    }
199
200    /// Index a batch of blocks in a single database transaction.
201    async fn index_block_batch(&self, start: u32, end: u32) -> Result<()> {
202        // Read blocks in parallel using rayon
203        let heights: Vec<u32> = (start..end).collect();
204        let blocks_result: std::result::Result<Vec<_>, _> = heights
205            .par_iter()
206            .map(|&height| {
207                self.get_bitcoin_block_at_height(height)
208                    .map(|block| (height, block))
209            })
210            .collect();
211
212        let mut blocks_data = blocks_result?;
213
214        // Sort by height to ensure correct order for DB insertion
215        blocks_data.sort_by_key(|(height, _)| *height);
216
217        // Now index all blocks in a single transaction
218        self.db
219            .index_blocks_batch(&blocks_data, self.network)
220            .await?;
221
222        Ok(())
223    }
224
225    /// Index a single block (used during live sync).
226    /// Wraps all inserts in a single database transaction for atomicity.
227    async fn index_block(&self, block: &BitcoinBlock, height: u32) -> Result<()> {
228        self.db.index_block(height, block, self.network).await
229    }
230
231    /// Get a Bitcoin block at the given height.
232    fn get_bitcoin_block_at_height(&self, height: u32) -> Result<BitcoinBlock> {
233        let block_hash = self
234            .client
235            .hash(height.into())
236            .map_err(|e| crate::db::Error::Database(sqlx::Error::Protocol(e.to_string())))?
237            .ok_or(crate::db::Error::BlockNotFound(height))?;
238
239        let signed_block = self
240            .client
241            .block(block_hash)
242            .map_err(|e| crate::db::Error::Database(sqlx::Error::Protocol(e.to_string())))?
243            .ok_or(crate::db::Error::BlockNotFound(height))?;
244
245        let bitcoin_block =
246            subcoin_primitives::convert_to_bitcoin_block::<Block, TransactionAdapter>(
247                signed_block.block,
248            )
249            .map_err(|e| crate::db::Error::Database(sqlx::Error::Protocol(format!("{e:?}"))))?;
250
251        Ok(bitcoin_block)
252    }
253
254    /// Run the indexer on a dedicated thread with its own tokio runtime.
255    ///
256    /// This isolates the indexer's I/O from the main async runtime, preventing
257    /// SQLite operations from blocking other tasks.
258    pub fn run_on_dedicated_thread(self)
259    where
260        Block: 'static,
261        Client: 'static,
262        TransactionAdapter: 'static,
263    {
264        std::thread::Builder::new()
265            .name("indexer".into())
266            .spawn(move || {
267                let rt = tokio::runtime::Builder::new_current_thread()
268                    .enable_all()
269                    .build()
270                    .expect("Failed to create indexer runtime");
271
272                rt.block_on(self.run());
273            })
274            .expect("Failed to spawn indexer thread");
275    }
276
277    /// Run the indexer, processing new blocks as they arrive.
278    ///
279    /// This first catches up any missing blocks (historical indexing), then
280    /// processes new block notifications for live indexing.
281    async fn run(self) {
282        // Loop until fully caught up - this avoids buffering notifications during sync
283        loop {
284            let best_before: u32 = self.client.info().best_number.saturated_into();
285
286            // Catch up any indexing gap
287            if let Err(e) = self.handle_index_gap().await {
288                tracing::error!(?e, "Failed to handle indexing gap, indexer will not run");
289                return;
290            }
291
292            // Check if chain progressed during sync
293            let best_after: u32 = self.client.info().best_number.saturated_into();
294            if best_after == best_before {
295                // No new blocks arrived during sync - we're fully caught up
296                break;
297            }
298
299            tracing::debug!(
300                best_before,
301                best_after,
302                "Chain progressed during sync, catching up new blocks"
303            );
304        }
305
306        // Get the height we've indexed up to
307        let mut last_indexed = match self.db.load_state().await {
308            Ok(Some(crate::types::IndexerState::Active { last_indexed })) => last_indexed,
309            _ => 0,
310        };
311
312        tracing::info!(
313            last_indexed,
314            "Historical sync complete, switching to live mode"
315        );
316
317        // Now subscribe to stream - we're fully caught up, no buffered notifications
318        let mut block_import_stream = self.client.every_import_notification_stream();
319
320        while let Some(notification) = block_import_stream.next().await {
321            let block_number: u32 = (*notification.header.number()).saturated_into();
322
323            // Handle reorgs
324            if let Some(route) = &notification.tree_route {
325                // Revert retracted blocks
326                if !route.retracted().is_empty() {
327                    let revert_to = route
328                        .retracted()
329                        .first()
330                        .map(|b| {
331                            let n: u32 = b.number.saturated_into();
332                            n.saturating_sub(1)
333                        })
334                        .unwrap_or(0);
335
336                    tracing::info!(
337                        revert_to,
338                        retracted = route.retracted().len(),
339                        "Handling reorg, reverting to height {revert_to}"
340                    );
341
342                    if let Err(e) = self.db.revert_to_height(revert_to).await {
343                        tracing::error!(?e, "Failed to revert blocks during reorg");
344                        continue;
345                    }
346                    last_indexed = revert_to;
347                }
348
349                // Index enacted blocks
350                for hash_and_number in route.enacted() {
351                    let height: u32 = hash_and_number.number.saturated_into();
352                    match self.get_bitcoin_block_at_height(height) {
353                        Ok(block) => {
354                            if let Err(e) = self.index_block(&block, height).await {
355                                tracing::error!(?e, height, "Failed to index enacted block");
356                            } else {
357                                last_indexed = height;
358                            }
359                        }
360                        Err(e) => {
361                            tracing::error!(?e, height, "Failed to get enacted block");
362                        }
363                    }
364                }
365            } else {
366                // Normal block import (no reorg)
367                match self.get_bitcoin_block_at_height(block_number) {
368                    Ok(block) => {
369                        if let Err(e) = self.index_block(&block, block_number).await {
370                            tracing::error!(?e, block_number, "Failed to index block");
371                            continue;
372                        }
373                        last_indexed = block_number;
374                    }
375                    Err(e) => {
376                        tracing::error!(?e, block_number, "Failed to get block for indexing");
377                        continue;
378                    }
379                }
380            }
381
382            // Update state
383            if let Err(e) = self
384                .db
385                .save_state(&IndexerState::Active { last_indexed })
386                .await
387            {
388                tracing::error!(?e, "Failed to save indexer state");
389            }
390        }
391    }
392}