1use crate::db::{IndexerDatabase, Result};
4use crate::queries::IndexerQuery;
5use crate::types::IndexerState;
6use bitcoin::{Block as BitcoinBlock, Network};
7use futures::StreamExt;
8use rayon::prelude::*;
9use sc_client_api::{BlockBackend, BlockchainEvents, HeaderBackend};
10use sp_runtime::traits::{Block as BlockT, Header, SaturatedConversion};
11use std::marker::PhantomData;
12use std::path::Path;
13use std::sync::Arc;
14use subcoin_primitives::BitcoinTransactionAdapter;
15
/// Number of processed blocks between progress checkpoints while indexing a block range.
const PROGRESS_SAVE_INTERVAL: u32 = 1_000;

/// Number of consecutive block heights fetched and written to the database per batch.
const BATCH_SIZE: u32 = 100;
21
22pub struct Indexer<Block, Client, TransactionAdapter> {
24 db: IndexerDatabase,
25 client: Arc<Client>,
26 network: Network,
27 _phantom: PhantomData<(Block, TransactionAdapter)>,
28}
29
30impl<Block, Client, TransactionAdapter> Indexer<Block, Client, TransactionAdapter>
31where
32 Block: BlockT,
33 Client: BlockchainEvents<Block>
34 + HeaderBackend<Block>
35 + BlockBackend<Block>
36 + Send
37 + Sync
38 + 'static,
39 TransactionAdapter: BitcoinTransactionAdapter<Block> + Send + Sync,
40{
41 pub async fn new(db_path: &Path, network: Network, client: Arc<Client>) -> Result<Self> {
46 let db = IndexerDatabase::open(db_path, network).await?;
47
48 let indexer = Self {
49 db,
50 client,
51 network,
52 _phantom: PhantomData,
53 };
54
55 Ok(indexer)
56 }
57
58 pub fn query(&self) -> IndexerQuery {
60 IndexerQuery::new(self.db.clone())
61 }
62
63 async fn handle_index_gap(&self) -> Result<()> {
65 let best_number: u32 = self.client.info().best_number.saturated_into();
66
67 match self.db.load_state().await? {
68 Some(IndexerState::HistoricalIndexing {
69 target_height,
70 current_height,
71 }) => {
72 tracing::info!(
74 current_height,
75 target_height,
76 "Resuming interrupted transaction indexing"
77 );
78 self.index_block_range(current_height, target_height)
79 .await?;
80 }
81 Some(IndexerState::Active { last_indexed }) => {
82 if last_indexed < best_number {
84 tracing::info!(
85 last_indexed,
86 best_number,
87 "Transaction index behind, catching up"
88 );
89 self.index_block_range(last_indexed + 1, best_number + 1)
90 .await?;
91 }
92 }
93 None => {
94 if best_number > 0 {
96 tracing::info!(
97 best_number,
98 "First run with indexer, indexing all {} existing blocks",
99 best_number + 1
100 );
101
102 self.db
104 .save_state(&IndexerState::HistoricalIndexing {
105 target_height: best_number + 1,
106 current_height: 0,
107 })
108 .await?;
109
110 self.index_block_range(0, best_number + 1).await?;
111 } else {
112 self.db
114 .save_state(&IndexerState::Active { last_indexed: 0 })
115 .await?;
116 }
117 }
118 }
119
120 Ok(())
121 }
122
123 async fn index_block_range(&self, start: u32, end: u32) -> Result<()> {
125 let total_blocks = end.saturating_sub(start);
126 let start_time = std::time::Instant::now();
127 let mut processed = 0u32;
128
129 tracing::info!(
130 start,
131 end,
132 total = total_blocks,
133 batch_size = BATCH_SIZE,
134 "Starting historical indexing with batched inserts"
135 );
136
137 let mut height = start;
138 while height < end {
139 let batch_end = (height + BATCH_SIZE).min(end);
140
141 self.index_block_batch(height, batch_end).await?;
143
144 let batch_processed = batch_end - height;
145 processed += batch_processed;
146 height = batch_end;
147
148 let is_last = height >= end;
150 if processed % PROGRESS_SAVE_INTERVAL == 0 || is_last || processed == batch_processed {
151 if !is_last {
152 self.db
153 .save_state(&IndexerState::HistoricalIndexing {
154 target_height: end,
155 current_height: height,
156 })
157 .await?;
158 }
159
160 let elapsed = start_time.elapsed().as_secs_f64();
162 let blocks_per_sec = if elapsed > 0.0 {
163 processed as f64 / elapsed
164 } else {
165 0.0
166 };
167 let remaining = total_blocks.saturating_sub(processed);
168 let eta_secs = if blocks_per_sec > 0.0 {
169 (remaining as f64 / blocks_per_sec) as u64
170 } else {
171 0
172 };
173
174 tracing::info!(
175 processed,
176 total = total_blocks,
177 percent = %format!("{:.1}%", (processed as f64 / total_blocks as f64) * 100.0),
178 blocks_per_sec = %format!("{:.0}", blocks_per_sec),
179 eta_secs,
180 "Indexing progress"
181 );
182 }
183 }
184
185 let last_indexed = end.saturating_sub(1);
187 self.db
188 .save_state(&IndexerState::Active { last_indexed })
189 .await?;
190
191 tracing::info!(
192 blocks = total_blocks,
193 duration_secs = start_time.elapsed().as_secs(),
194 "Historical indexing complete"
195 );
196
197 Ok(())
198 }
199
200 async fn index_block_batch(&self, start: u32, end: u32) -> Result<()> {
202 let heights: Vec<u32> = (start..end).collect();
204 let blocks_result: std::result::Result<Vec<_>, _> = heights
205 .par_iter()
206 .map(|&height| {
207 self.get_bitcoin_block_at_height(height)
208 .map(|block| (height, block))
209 })
210 .collect();
211
212 let mut blocks_data = blocks_result?;
213
214 blocks_data.sort_by_key(|(height, _)| *height);
216
217 self.db
219 .index_blocks_batch(&blocks_data, self.network)
220 .await?;
221
222 Ok(())
223 }
224
225 async fn index_block(&self, block: &BitcoinBlock, height: u32) -> Result<()> {
228 self.db.index_block(height, block, self.network).await
229 }
230
231 fn get_bitcoin_block_at_height(&self, height: u32) -> Result<BitcoinBlock> {
233 let block_hash = self
234 .client
235 .hash(height.into())
236 .map_err(|e| crate::db::Error::Database(sqlx::Error::Protocol(e.to_string())))?
237 .ok_or(crate::db::Error::BlockNotFound(height))?;
238
239 let signed_block = self
240 .client
241 .block(block_hash)
242 .map_err(|e| crate::db::Error::Database(sqlx::Error::Protocol(e.to_string())))?
243 .ok_or(crate::db::Error::BlockNotFound(height))?;
244
245 let bitcoin_block =
246 subcoin_primitives::convert_to_bitcoin_block::<Block, TransactionAdapter>(
247 signed_block.block,
248 )
249 .map_err(|e| crate::db::Error::Database(sqlx::Error::Protocol(format!("{e:?}"))))?;
250
251 Ok(bitcoin_block)
252 }
253
254 pub fn run_on_dedicated_thread(self)
259 where
260 Block: 'static,
261 Client: 'static,
262 TransactionAdapter: 'static,
263 {
264 std::thread::Builder::new()
265 .name("indexer".into())
266 .spawn(move || {
267 let rt = tokio::runtime::Builder::new_current_thread()
268 .enable_all()
269 .build()
270 .expect("Failed to create indexer runtime");
271
272 rt.block_on(self.run());
273 })
274 .expect("Failed to spawn indexer thread");
275 }
276
277 async fn run(self) {
282 loop {
284 let best_before: u32 = self.client.info().best_number.saturated_into();
285
286 if let Err(e) = self.handle_index_gap().await {
288 tracing::error!(?e, "Failed to handle indexing gap, indexer will not run");
289 return;
290 }
291
292 let best_after: u32 = self.client.info().best_number.saturated_into();
294 if best_after == best_before {
295 break;
297 }
298
299 tracing::debug!(
300 best_before,
301 best_after,
302 "Chain progressed during sync, catching up new blocks"
303 );
304 }
305
306 let mut last_indexed = match self.db.load_state().await {
308 Ok(Some(crate::types::IndexerState::Active { last_indexed })) => last_indexed,
309 _ => 0,
310 };
311
312 tracing::info!(
313 last_indexed,
314 "Historical sync complete, switching to live mode"
315 );
316
317 let mut block_import_stream = self.client.every_import_notification_stream();
319
320 while let Some(notification) = block_import_stream.next().await {
321 let block_number: u32 = (*notification.header.number()).saturated_into();
322
323 if let Some(route) = ¬ification.tree_route {
325 if !route.retracted().is_empty() {
327 let revert_to = route
328 .retracted()
329 .first()
330 .map(|b| {
331 let n: u32 = b.number.saturated_into();
332 n.saturating_sub(1)
333 })
334 .unwrap_or(0);
335
336 tracing::info!(
337 revert_to,
338 retracted = route.retracted().len(),
339 "Handling reorg, reverting to height {revert_to}"
340 );
341
342 if let Err(e) = self.db.revert_to_height(revert_to).await {
343 tracing::error!(?e, "Failed to revert blocks during reorg");
344 continue;
345 }
346 last_indexed = revert_to;
347 }
348
349 for hash_and_number in route.enacted() {
351 let height: u32 = hash_and_number.number.saturated_into();
352 match self.get_bitcoin_block_at_height(height) {
353 Ok(block) => {
354 if let Err(e) = self.index_block(&block, height).await {
355 tracing::error!(?e, height, "Failed to index enacted block");
356 } else {
357 last_indexed = height;
358 }
359 }
360 Err(e) => {
361 tracing::error!(?e, height, "Failed to get enacted block");
362 }
363 }
364 }
365 } else {
366 match self.get_bitcoin_block_at_height(block_number) {
368 Ok(block) => {
369 if let Err(e) = self.index_block(&block, block_number).await {
370 tracing::error!(?e, block_number, "Failed to index block");
371 continue;
372 }
373 last_indexed = block_number;
374 }
375 Err(e) => {
376 tracing::error!(?e, block_number, "Failed to get block for indexing");
377 continue;
378 }
379 }
380 }
381
382 if let Err(e) = self
384 .db
385 .save_state(&IndexerState::Active { last_indexed })
386 .await
387 {
388 tracing::error!(?e, "Failed to save indexer state");
389 }
390 }
391 }
392}