1use crate::types::IndexerState;
4use bitcoin::hashes::{Hash, hash160};
5use bitcoin::key::CompressedPublicKey;
6use bitcoin::{Address, Network, OutPoint, PubkeyHash, ScriptBuf, Txid};
7use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePool, SqlitePoolOptions};
8use std::collections::{HashMap, HashSet};
9use std::path::Path;
10
11#[derive(Debug, thiserror::Error)]
13pub enum Error {
14 #[error("Database error: {0}")]
15 Database(#[from] sqlx::Error),
16
17 #[error("Invalid txid in database: {0}")]
18 InvalidTxid(String),
19
20 #[error("Block not found: {0}")]
21 BlockNotFound(u32),
22}
23
24pub type Result<T> = std::result::Result<T, Error>;
25
26fn script_to_address(script: &ScriptBuf, network: Network) -> Option<String> {
32 if let Ok(address) = Address::from_script(script.as_script(), network) {
34 return Some(address.to_string());
35 }
36
37 let bytes = script.as_bytes();
40
41 if bytes.len() == 35 && bytes[0] == 0x21 && bytes[34] == 0xac {
43 let pubkey_bytes = &bytes[1..34];
44 if let Ok(pubkey) = CompressedPublicKey::from_slice(pubkey_bytes) {
45 let pubkey_hash = PubkeyHash::from(pubkey);
46 let address = Address::p2pkh(pubkey_hash, network);
47 return Some(address.to_string());
48 }
49 }
50
51 if bytes.len() == 67 && bytes[0] == 0x41 && bytes[66] == 0xac {
53 let pubkey_bytes = &bytes[1..66];
54 let hash = hash160::Hash::hash(pubkey_bytes);
55 let pubkey_hash = PubkeyHash::from_raw_hash(hash);
56 let address = Address::p2pkh(pubkey_hash, network);
57 return Some(address.to_string());
58 }
59
60 None
61}
62
63#[derive(Clone)]
65pub struct IndexerDatabase {
66 pool: SqlitePool,
67 network: Network,
68}
69
70impl IndexerDatabase {
71 pub async fn open(path: &Path, network: Network) -> Result<Self> {
76 let network_dir = network.to_core_arg();
77 let db_path = path.join("indexer").join(network_dir).join("index.sqlite");
78
79 if let Some(parent) = db_path.parent() {
81 std::fs::create_dir_all(parent).ok();
82 }
83
84 let options = SqliteConnectOptions::new()
85 .filename(&db_path)
86 .create_if_missing(true)
87 .journal_mode(SqliteJournalMode::Wal)
88 .pragma("cache_size", "-64000") .pragma("synchronous", "NORMAL");
90
91 let pool = SqlitePoolOptions::new()
92 .max_connections(4)
93 .connect_with(options)
94 .await?;
95
96 let db = Self { pool, network };
97 db.init_schema().await?;
98
99 Ok(db)
100 }
101
102 async fn init_schema(&self) -> Result<()> {
104 sqlx::query(
105 r#"
106 -- Blocks table for reorg tracking
107 CREATE TABLE IF NOT EXISTS blocks (
108 height INTEGER PRIMARY KEY,
109 hash BLOB NOT NULL UNIQUE,
110 timestamp INTEGER NOT NULL
111 );
112
113 -- Transactions table
114 CREATE TABLE IF NOT EXISTS transactions (
115 txid BLOB PRIMARY KEY,
116 block_height INTEGER NOT NULL,
117 tx_index INTEGER NOT NULL
118 );
119 CREATE INDEX IF NOT EXISTS idx_tx_block ON transactions(block_height);
120
121 -- Outputs table (for UTXO tracking)
122 CREATE TABLE IF NOT EXISTS outputs (
123 txid BLOB NOT NULL,
124 vout INTEGER NOT NULL,
125 address TEXT,
126 value INTEGER NOT NULL,
127 script_pubkey BLOB NOT NULL,
128 block_height INTEGER NOT NULL,
129 spent_txid BLOB,
130 spent_vout INTEGER,
131 spent_block_height INTEGER,
132 PRIMARY KEY (txid, vout)
133 );
134 CREATE INDEX IF NOT EXISTS idx_outputs_address ON outputs(address) WHERE address IS NOT NULL;
135 CREATE INDEX IF NOT EXISTS idx_outputs_unspent ON outputs(address) WHERE address IS NOT NULL AND spent_txid IS NULL;
136 CREATE INDEX IF NOT EXISTS idx_outputs_block ON outputs(block_height);
137
138 -- Address history (denormalized for fast queries)
139 CREATE TABLE IF NOT EXISTS address_history (
140 address TEXT NOT NULL,
141 txid BLOB NOT NULL,
142 block_height INTEGER NOT NULL,
143 delta INTEGER NOT NULL,
144 PRIMARY KEY (address, block_height, txid)
145 );
146 CREATE INDEX IF NOT EXISTS idx_addr_hist ON address_history(address, block_height DESC);
147
148 -- Indexer state
149 CREATE TABLE IF NOT EXISTS state (
150 key TEXT PRIMARY KEY,
151 value TEXT NOT NULL
152 );
153 "#,
154 )
155 .execute(&self.pool)
156 .await?;
157
158 Ok(())
159 }
160
161 pub fn pool(&self) -> &SqlitePool {
163 &self.pool
164 }
165
166 pub fn network(&self) -> Network {
168 self.network
169 }
170
171 pub async fn load_state(&self) -> Result<Option<IndexerState>> {
175 let row: Option<(String,)> =
176 sqlx::query_as("SELECT value FROM state WHERE key = 'indexer_state'")
177 .fetch_optional(&self.pool)
178 .await?;
179
180 let Some((value,)) = row else {
181 return Ok(None);
182 };
183
184 if value.starts_with("historical:") {
186 let parts: Vec<&str> = value
187 .strip_prefix("historical:")
188 .unwrap()
189 .split(':')
190 .collect();
191 if parts.len() == 2 {
192 let target_height: u32 = parts[0].parse().unwrap_or(0);
193 let current_height: u32 = parts[1].parse().unwrap_or(0);
194 return Ok(Some(IndexerState::HistoricalIndexing {
195 target_height,
196 current_height,
197 }));
198 }
199 } else if value.starts_with("active:") {
200 let last_indexed: u32 = value.strip_prefix("active:").unwrap().parse().unwrap_or(0);
201 return Ok(Some(IndexerState::Active { last_indexed }));
202 }
203
204 Ok(None)
205 }
206
207 pub async fn save_state(&self, state: &IndexerState) -> Result<()> {
209 let value = match state {
210 IndexerState::HistoricalIndexing {
211 target_height,
212 current_height,
213 } => format!("historical:{target_height}:{current_height}"),
214 IndexerState::Active { last_indexed } => format!("active:{last_indexed}"),
215 };
216
217 sqlx::query("INSERT OR REPLACE INTO state (key, value) VALUES ('indexer_state', ?)")
218 .bind(&value)
219 .execute(&self.pool)
220 .await?;
221
222 Ok(())
223 }
224
225 pub async fn insert_block(&self, height: u32, hash: &[u8; 32], timestamp: u32) -> Result<()> {
229 sqlx::query("INSERT OR REPLACE INTO blocks (height, hash, timestamp) VALUES (?, ?, ?)")
230 .bind(height as i64)
231 .bind(hash.as_slice())
232 .bind(timestamp as i64)
233 .execute(&self.pool)
234 .await?;
235 Ok(())
236 }
237
238 pub async fn last_block_height(&self) -> Result<Option<u32>> {
240 let row: Option<(i64,)> = sqlx::query_as("SELECT MAX(height) FROM blocks")
241 .fetch_optional(&self.pool)
242 .await?;
243 Ok(row.and_then(|(h,)| if h >= 0 { Some(h as u32) } else { None }))
244 }
245
246 pub async fn insert_transaction(
250 &self,
251 txid: &Txid,
252 block_height: u32,
253 tx_index: u32,
254 ) -> Result<()> {
255 sqlx::query(
256 "INSERT OR REPLACE INTO transactions (txid, block_height, tx_index) VALUES (?, ?, ?)",
257 )
258 .bind(txid.as_byte_array().as_slice())
259 .bind(block_height as i64)
260 .bind(tx_index as i64)
261 .execute(&self.pool)
262 .await?;
263 Ok(())
264 }
265
266 pub async fn get_transaction(&self, txid: &Txid) -> Result<Option<(u32, u32)>> {
268 let row: Option<(i64, i64)> =
269 sqlx::query_as("SELECT block_height, tx_index FROM transactions WHERE txid = ?")
270 .bind(txid.as_byte_array().as_slice())
271 .fetch_optional(&self.pool)
272 .await?;
273 Ok(row.map(|(h, i)| (h as u32, i as u32)))
274 }
275
276 pub async fn insert_output(
280 &self,
281 txid: &Txid,
282 vout: u32,
283 script_pubkey: &ScriptBuf,
284 value: u64,
285 block_height: u32,
286 ) -> Result<()> {
287 let address = script_to_address(script_pubkey, self.network);
288
289 sqlx::query(
290 "INSERT OR REPLACE INTO outputs (txid, vout, address, value, script_pubkey, block_height) VALUES (?, ?, ?, ?, ?, ?)",
291 )
292 .bind(txid.as_byte_array().as_slice())
293 .bind(vout as i64)
294 .bind(&address)
295 .bind(value as i64)
296 .bind(script_pubkey.as_bytes())
297 .bind(block_height as i64)
298 .execute(&self.pool)
299 .await?;
300
301 Ok(())
302 }
303
304 pub async fn mark_output_spent(
306 &self,
307 outpoint: &OutPoint,
308 spent_txid: &Txid,
309 spent_vout: u32,
310 spent_block_height: u32,
311 ) -> Result<()> {
312 sqlx::query(
313 "UPDATE outputs SET spent_txid = ?, spent_vout = ?, spent_block_height = ? WHERE txid = ? AND vout = ?",
314 )
315 .bind(spent_txid.as_byte_array().as_slice())
316 .bind(spent_vout as i64)
317 .bind(spent_block_height as i64)
318 .bind(outpoint.txid.as_byte_array().as_slice())
319 .bind(outpoint.vout as i64)
320 .execute(&self.pool)
321 .await?;
322 Ok(())
323 }
324
325 pub async fn get_output_info(&self, outpoint: &OutPoint) -> Result<Option<(String, u64)>> {
327 let row: Option<(String, i64)> =
328 sqlx::query_as("SELECT address, value FROM outputs WHERE txid = ? AND vout = ? AND address IS NOT NULL")
329 .bind(outpoint.txid.as_byte_array().as_slice())
330 .bind(outpoint.vout as i64)
331 .fetch_optional(&self.pool)
332 .await?;
333 Ok(row.map(|(addr, val)| (addr, val as u64)))
334 }
335
336 pub async fn insert_address_history(
340 &self,
341 address: &str,
342 txid: &Txid,
343 block_height: u32,
344 delta: i64,
345 ) -> Result<()> {
346 sqlx::query(
349 r#"
350 INSERT INTO address_history (address, txid, block_height, delta)
351 VALUES (?, ?, ?, ?)
352 ON CONFLICT(address, block_height, txid) DO UPDATE SET
353 delta = address_history.delta + excluded.delta
354 "#,
355 )
356 .bind(address)
357 .bind(txid.as_byte_array().as_slice())
358 .bind(block_height as i64)
359 .bind(delta)
360 .execute(&self.pool)
361 .await?;
362 Ok(())
363 }
364
365 pub async fn revert_to_height(&self, height: u32) -> Result<()> {
369 let mut tx = self.pool.begin().await?;
370
371 sqlx::query(
373 "UPDATE outputs SET spent_txid = NULL, spent_vout = NULL, spent_block_height = NULL WHERE spent_block_height > ?",
374 )
375 .bind(height as i64)
376 .execute(&mut *tx)
377 .await?;
378
379 sqlx::query("DELETE FROM outputs WHERE block_height > ?")
381 .bind(height as i64)
382 .execute(&mut *tx)
383 .await?;
384
385 sqlx::query("DELETE FROM transactions WHERE block_height > ?")
387 .bind(height as i64)
388 .execute(&mut *tx)
389 .await?;
390
391 sqlx::query("DELETE FROM address_history WHERE block_height > ?")
393 .bind(height as i64)
394 .execute(&mut *tx)
395 .await?;
396
397 sqlx::query("DELETE FROM blocks WHERE height > ?")
399 .bind(height as i64)
400 .execute(&mut *tx)
401 .await?;
402
403 tx.commit().await?;
404 Ok(())
405 }
406
407 pub async fn begin_transaction(&self) -> Result<sqlx::Transaction<'_, sqlx::Sqlite>> {
409 Ok(self.pool.begin().await?)
410 }
411
412 pub fn parse_txid(bytes: &[u8]) -> Result<Txid> {
414 let arr: [u8; 32] = bytes
415 .try_into()
416 .map_err(|_| Error::InvalidTxid("Invalid length".to_string()))?;
417 Ok(Txid::from_byte_array(arr))
418 }
419
420 pub async fn index_block(
422 &self,
423 height: u32,
424 block: &bitcoin::Block,
425 network: Network,
426 ) -> Result<()> {
427 self.index_blocks_batch(&[(height, block.clone())], network)
428 .await
429 }
430
431 #[allow(clippy::type_complexity)]
439 pub async fn index_blocks_batch(
440 &self,
441 blocks: &[(u32, bitcoin::Block)],
442 network: Network,
443 ) -> Result<()> {
444 if blocks.is_empty() {
445 return Ok(());
446 }
447
448 let mut outpoints_to_fetch: HashSet<OutPoint> = HashSet::new();
450 for (_height, block) in blocks {
451 for btc_tx in &block.txdata {
452 for input in &btc_tx.input {
453 if !input.previous_output.is_null() {
454 outpoints_to_fetch.insert(input.previous_output);
455 }
456 }
457 }
458 }
459
460 let mut output_cache: HashMap<OutPoint, (String, u64)> = HashMap::new();
462 let outpoints_vec: Vec<_> = outpoints_to_fetch.into_iter().collect();
463
464 let mut tx = self.pool.begin().await?;
465
466 const FETCH_BATCH_SIZE: usize = 500;
467 for chunk in outpoints_vec.chunks(FETCH_BATCH_SIZE) {
468 if chunk.is_empty() {
469 continue;
470 }
471
472 let placeholders: Vec<String> = chunk
473 .iter()
474 .map(|_| "(txid = ? AND vout = ?)".to_string())
475 .collect();
476 let query = format!(
477 "SELECT txid, vout, address, value FROM outputs WHERE address IS NOT NULL AND ({})",
478 placeholders.join(" OR ")
479 );
480
481 let mut query_builder = sqlx::query_as::<_, (Vec<u8>, i64, String, i64)>(&query);
482 for outpoint in chunk {
483 query_builder = query_builder
484 .bind(outpoint.txid.as_byte_array().as_slice())
485 .bind(outpoint.vout as i64);
486 }
487
488 let rows = query_builder.fetch_all(&mut *tx).await?;
489
490 for (txid_bytes, vout, address, value) in rows {
491 let txid = Self::parse_txid(&txid_bytes)?;
492 let outpoint = OutPoint {
493 txid,
494 vout: vout as u32,
495 };
496 output_cache.insert(outpoint, (address, value as u64));
497 }
498 }
499
500 let mut blocks_data: Vec<(i64, Vec<u8>, i64)> = Vec::new();
504
505 let mut transactions_data: Vec<(Vec<u8>, i64, i64)> = Vec::new();
507
508 let mut outputs_data: Vec<(Vec<u8>, i64, Option<String>, i64, Vec<u8>, i64)> = Vec::new();
510
511 let mut spent_updates: Vec<(Vec<u8>, i64, i64, Vec<u8>, i64)> = Vec::new();
513
514 let mut address_history_map: HashMap<(String, Vec<u8>, i64), i64> = HashMap::new();
516
517 let mut batch_outputs: HashMap<OutPoint, (Option<String>, u64)> = HashMap::new();
520
521 for (height, block) in blocks {
522 let block_hash = block.block_hash();
523 let timestamp = block.header.time;
524
525 blocks_data.push((
526 *height as i64,
527 block_hash.as_byte_array().to_vec(),
528 timestamp as i64,
529 ));
530
531 for (tx_index, btc_tx) in block.txdata.iter().enumerate() {
532 let txid = btc_tx.compute_txid();
533 let txid_bytes = txid.as_byte_array().to_vec();
534
535 transactions_data.push((txid_bytes.clone(), *height as i64, tx_index as i64));
536
537 for (vin, input) in btc_tx.input.iter().enumerate() {
539 if input.previous_output.is_null() {
540 continue;
541 }
542
543 let output_info: Option<(String, u64)> = output_cache
545 .get(&input.previous_output)
546 .cloned()
547 .or_else(|| {
548 batch_outputs
549 .get(&input.previous_output)
550 .and_then(|(addr, val)| addr.clone().map(|a| (a, *val)))
551 });
552
553 if let Some((address, value)) = output_info {
554 spent_updates.push((
555 txid_bytes.clone(),
556 vin as i64,
557 *height as i64,
558 input.previous_output.txid.as_byte_array().to_vec(),
559 input.previous_output.vout as i64,
560 ));
561
562 let key = (address.clone(), txid_bytes.clone(), *height as i64);
564 *address_history_map.entry(key).or_insert(0) -= value as i64;
565 }
566 }
567
568 for (vout, output) in btc_tx.output.iter().enumerate() {
570 let script = &output.script_pubkey;
571 let value = output.value.to_sat();
572
573 let address = script_to_address(script, network);
574
575 outputs_data.push((
576 txid_bytes.clone(),
577 vout as i64,
578 address.clone(),
579 value as i64,
580 script.as_bytes().to_vec(),
581 *height as i64,
582 ));
583
584 let outpoint = OutPoint {
586 txid,
587 vout: vout as u32,
588 };
589 batch_outputs.insert(outpoint, (address.clone(), value));
590
591 if let Some(addr) = address {
593 let key = (addr, txid_bytes.clone(), *height as i64);
594 *address_history_map.entry(key).or_insert(0) += value as i64;
595 }
596 }
597 }
598 }
599
600 const INSERT_BATCH_SIZE: usize = 100;
604
605 for chunk in blocks_data.chunks(INSERT_BATCH_SIZE) {
606 let placeholders: Vec<&str> = chunk.iter().map(|_| "(?, ?, ?)").collect();
607 let query = format!(
608 "INSERT OR REPLACE INTO blocks (height, hash, timestamp) VALUES {}",
609 placeholders.join(", ")
610 );
611
612 let mut query_builder = sqlx::query(&query);
613 for (height, hash, timestamp) in chunk {
614 query_builder = query_builder.bind(height).bind(hash).bind(timestamp);
615 }
616 query_builder.execute(&mut *tx).await?;
617 }
618
619 for chunk in transactions_data.chunks(INSERT_BATCH_SIZE) {
621 let placeholders: Vec<&str> = chunk.iter().map(|_| "(?, ?, ?)").collect();
622 let query = format!(
623 "INSERT OR REPLACE INTO transactions (txid, block_height, tx_index) VALUES {}",
624 placeholders.join(", ")
625 );
626
627 let mut query_builder = sqlx::query(&query);
628 for (txid, height, idx) in chunk {
629 query_builder = query_builder.bind(txid).bind(height).bind(idx);
630 }
631 query_builder.execute(&mut *tx).await?;
632 }
633
634 for chunk in outputs_data.chunks(INSERT_BATCH_SIZE) {
636 let placeholders: Vec<&str> = chunk.iter().map(|_| "(?, ?, ?, ?, ?, ?)").collect();
637 let query = format!(
638 "INSERT OR REPLACE INTO outputs (txid, vout, address, value, script_pubkey, block_height) VALUES {}",
639 placeholders.join(", ")
640 );
641
642 let mut query_builder = sqlx::query(&query);
643 for (txid, vout, address, value, script, height) in chunk {
644 query_builder = query_builder
645 .bind(txid)
646 .bind(vout)
647 .bind(address)
648 .bind(value)
649 .bind(script)
650 .bind(height);
651 }
652 query_builder.execute(&mut *tx).await?;
653 }
654
655 for (spent_txid, spent_vout, spent_height, orig_txid, orig_vout) in &spent_updates {
659 sqlx::query(
660 "UPDATE outputs SET spent_txid = ?, spent_vout = ?, spent_block_height = ? WHERE txid = ? AND vout = ?",
661 )
662 .bind(spent_txid)
663 .bind(spent_vout)
664 .bind(spent_height)
665 .bind(orig_txid)
666 .bind(orig_vout)
667 .execute(&mut *tx)
668 .await?;
669 }
670
671 let history_entries: Vec<_> = address_history_map.into_iter().collect();
673 for chunk in history_entries.chunks(INSERT_BATCH_SIZE) {
674 let placeholders: Vec<&str> = chunk.iter().map(|_| "(?, ?, ?, ?)").collect();
675 let query = format!(
676 "INSERT OR REPLACE INTO address_history (address, txid, block_height, delta) VALUES {}",
677 placeholders.join(", ")
678 );
679
680 let mut query_builder = sqlx::query(&query);
681 for ((address, txid, height), delta) in chunk {
682 query_builder = query_builder
683 .bind(address)
684 .bind(txid)
685 .bind(height)
686 .bind(delta);
687 }
688 query_builder.execute(&mut *tx).await?;
689 }
690
691 tx.commit().await?;
692 Ok(())
693 }
694}