subcoin_indexer/
db.rs

//! SQLite database management for the indexer.

use crate::types::IndexerState;
use bitcoin::hashes::{Hash, hash160};
use bitcoin::key::CompressedPublicKey;
use bitcoin::{Address, Network, OutPoint, PubkeyHash, ScriptBuf, Txid};
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePool, SqlitePoolOptions};
use std::collections::{HashMap, HashSet};
use std::path::Path;

/// Indexer error type.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),

    #[error("Invalid txid in database: {0}")]
    InvalidTxid(String),

    #[error("Block not found: {0}")]
    BlockNotFound(u32),
}

pub type Result<T> = std::result::Result<T, Error>;

/// Extract an address from a script_pubkey, including P2PK scripts.
///
/// This handles standard address types (P2PKH, P2SH, P2WPKH, P2WSH, P2TR) via
/// `Address::from_script`, plus P2PK (Pay-to-Public-Key) scripts which are
/// converted to their equivalent P2PKH addresses.
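///
/// A minimal sketch of the compressed-P2PK path. The public key below is the
/// secp256k1 generator point, used purely as illustrative data:
///
/// ```ignore
/// // 0x21 <33-byte pubkey> 0xac (OP_CHECKSIG)
/// let mut bytes = vec![0x21u8];
/// bytes.extend_from_slice(&[
///     0x02, 0x79, 0xbe, 0x66, 0x7e, 0xf9, 0xdc, 0xbb, 0xac, 0x55, 0xa0, 0x62,
///     0x95, 0xce, 0x87, 0x0b, 0x07, 0x02, 0x9b, 0xfc, 0xdb, 0x2d, 0xce, 0x28,
///     0xd9, 0x59, 0xf2, 0x81, 0x5b, 0x16, 0xf8, 0x17, 0x98,
/// ]);
/// bytes.push(0xac);
/// let script = ScriptBuf::from_bytes(bytes);
/// // The script has no standard address form, but it maps to the P2PKH
/// // address derived from HASH160(pubkey).
/// assert!(script_to_address(&script, Network::Bitcoin).is_some());
/// ```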
fn script_to_address(script: &ScriptBuf, network: Network) -> Option<String> {
    // Try standard address types first
    if let Ok(address) = Address::from_script(script.as_script(), network) {
        return Some(address.to_string());
    }

    // Handle P2PK (Pay-to-Public-Key) scripts
    // P2PK format: <pubkey_len> <pubkey> OP_CHECKSIG (0xac)
    let bytes = script.as_bytes();

    // Compressed P2PK: 0x21 (33) + 33-byte pubkey + 0xac (OP_CHECKSIG) = 35 bytes
    if bytes.len() == 35 && bytes[0] == 0x21 && bytes[34] == 0xac {
        let pubkey_bytes = &bytes[1..34];
        if let Ok(pubkey) = CompressedPublicKey::from_slice(pubkey_bytes) {
            let pubkey_hash = PubkeyHash::from(pubkey);
            let address = Address::p2pkh(pubkey_hash, network);
            return Some(address.to_string());
        }
    }

    // Uncompressed P2PK: 0x41 (65) + 65-byte pubkey + 0xac (OP_CHECKSIG) = 67 bytes
    if bytes.len() == 67 && bytes[0] == 0x41 && bytes[66] == 0xac {
        let pubkey_bytes = &bytes[1..66];
        let hash = hash160::Hash::hash(pubkey_bytes);
        let pubkey_hash = PubkeyHash::from_raw_hash(hash);
        let address = Address::p2pkh(pubkey_hash, network);
        return Some(address.to_string());
    }

    None
}

/// SQLite database for the indexer.
#[derive(Clone)]
pub struct IndexerDatabase {
    pool: SqlitePool,
    network: Network,
}

impl IndexerDatabase {
    /// Opens or creates the indexer database at the given path.
    ///
    /// The database file is stored in a network-specific subdirectory to prevent
    /// mixing data from different networks (mainnet, testnet, signet, etc.).
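    ///
    /// A usage sketch (the data directory path is illustrative; on mainnet this
    /// opens or creates `<path>/indexer/main/index.sqlite`):
    ///
    /// ```ignore
    /// let db = IndexerDatabase::open(Path::new("./data"), Network::Bitcoin).await?;
    /// assert_eq!(db.network(), Network::Bitcoin);
    /// ```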
    pub async fn open(path: &Path, network: Network) -> Result<Self> {
        let network_dir = network.to_core_arg();
        let db_path = path.join("indexer").join(network_dir).join("index.sqlite");

        // Ensure parent directory exists
        if let Some(parent) = db_path.parent() {
            std::fs::create_dir_all(parent).ok();
        }

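        // The options below enable WAL journaling with synchronous=NORMAL: an OS
        // crash or power loss can roll back the most recently committed
        // transactions, but it cannot corrupt the database file.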
        let options = SqliteConnectOptions::new()
            .filename(&db_path)
            .create_if_missing(true)
            .journal_mode(SqliteJournalMode::Wal)
            .pragma("cache_size", "-64000") // 64MB cache
            .pragma("synchronous", "NORMAL");

        let pool = SqlitePoolOptions::new()
            .max_connections(4)
            .connect_with(options)
            .await?;

        let db = Self { pool, network };
        db.init_schema().await?;

        Ok(db)
    }

    /// Initialize database schema.
    async fn init_schema(&self) -> Result<()> {
        sqlx::query(
            r#"
            -- Blocks table for reorg tracking
            CREATE TABLE IF NOT EXISTS blocks (
                height INTEGER PRIMARY KEY,
                hash BLOB NOT NULL UNIQUE,
                timestamp INTEGER NOT NULL
            );

            -- Transactions table
            CREATE TABLE IF NOT EXISTS transactions (
                txid BLOB PRIMARY KEY,
                block_height INTEGER NOT NULL,
                tx_index INTEGER NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_tx_block ON transactions(block_height);

            -- Outputs table (for UTXO tracking)
            CREATE TABLE IF NOT EXISTS outputs (
                txid BLOB NOT NULL,
                vout INTEGER NOT NULL,
                address TEXT,
                value INTEGER NOT NULL,
                script_pubkey BLOB NOT NULL,
                block_height INTEGER NOT NULL,
                spent_txid BLOB,
                spent_vout INTEGER,
                spent_block_height INTEGER,
                PRIMARY KEY (txid, vout)
            );
            CREATE INDEX IF NOT EXISTS idx_outputs_address ON outputs(address) WHERE address IS NOT NULL;
            CREATE INDEX IF NOT EXISTS idx_outputs_unspent ON outputs(address) WHERE address IS NOT NULL AND spent_txid IS NULL;
            CREATE INDEX IF NOT EXISTS idx_outputs_block ON outputs(block_height);

            -- Address history (denormalized for fast queries)
            CREATE TABLE IF NOT EXISTS address_history (
                address TEXT NOT NULL,
                txid BLOB NOT NULL,
                block_height INTEGER NOT NULL,
                delta INTEGER NOT NULL,
                PRIMARY KEY (address, block_height, txid)
            );
            CREATE INDEX IF NOT EXISTS idx_addr_hist ON address_history(address, block_height DESC);

            -- Indexer state
            CREATE TABLE IF NOT EXISTS state (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            );
            "#,
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Get the underlying connection pool.
    pub fn pool(&self) -> &SqlitePool {
        &self.pool
    }

    /// Get the Bitcoin network.
    pub fn network(&self) -> Network {
        self.network
    }

    // ========== State Management ==========

    /// Load indexer state from database.
    pub async fn load_state(&self) -> Result<Option<IndexerState>> {
        let row: Option<(String,)> =
            sqlx::query_as("SELECT value FROM state WHERE key = 'indexer_state'")
                .fetch_optional(&self.pool)
                .await?;

        let Some((value,)) = row else {
            return Ok(None);
        };

        // Parse the state from its compact string encoding:
        // `historical:<target_height>:<current_height>` or `active:<last_indexed>`.
        if value.starts_with("historical:") {
            let parts: Vec<&str> = value
                .strip_prefix("historical:")
                .unwrap()
                .split(':')
                .collect();
            if parts.len() == 2 {
                let target_height: u32 = parts[0].parse().unwrap_or(0);
                let current_height: u32 = parts[1].parse().unwrap_or(0);
                return Ok(Some(IndexerState::HistoricalIndexing {
                    target_height,
                    current_height,
                }));
            }
        } else if value.starts_with("active:") {
            let last_indexed: u32 = value.strip_prefix("active:").unwrap().parse().unwrap_or(0);
            return Ok(Some(IndexerState::Active { last_indexed }));
        }

        Ok(None)
    }

    /// Save indexer state to database.
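    ///
    /// The state is persisted as a compact string under the `indexer_state` key,
    /// e.g. `historical:850000:123456` or `active:850123` (heights illustrative).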
    pub async fn save_state(&self, state: &IndexerState) -> Result<()> {
        let value = match state {
            IndexerState::HistoricalIndexing {
                target_height,
                current_height,
            } => format!("historical:{target_height}:{current_height}"),
            IndexerState::Active { last_indexed } => format!("active:{last_indexed}"),
        };

        sqlx::query("INSERT OR REPLACE INTO state (key, value) VALUES ('indexer_state', ?)")
            .bind(&value)
            .execute(&self.pool)
            .await?;

        Ok(())
    }

    // ========== Block Operations ==========

    /// Insert a block record.
    pub async fn insert_block(&self, height: u32, hash: &[u8; 32], timestamp: u32) -> Result<()> {
        sqlx::query("INSERT OR REPLACE INTO blocks (height, hash, timestamp) VALUES (?, ?, ?)")
            .bind(height as i64)
            .bind(hash.as_slice())
            .bind(timestamp as i64)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    /// Get the last indexed block height.
    pub async fn last_block_height(&self) -> Result<Option<u32>> {
        // MAX(height) yields a single row containing NULL when the table is empty,
        // so decode the value as an Option to avoid a decode error on first run.
        let row: Option<(Option<i64>,)> = sqlx::query_as("SELECT MAX(height) FROM blocks")
            .fetch_optional(&self.pool)
            .await?;
        Ok(row.and_then(|(h,)| h).map(|h| h as u32))
    }

    // ========== Transaction Operations ==========

    /// Insert a transaction record.
    pub async fn insert_transaction(
        &self,
        txid: &Txid,
        block_height: u32,
        tx_index: u32,
    ) -> Result<()> {
        sqlx::query(
            "INSERT OR REPLACE INTO transactions (txid, block_height, tx_index) VALUES (?, ?, ?)",
        )
        .bind(txid.as_byte_array().as_slice())
        .bind(block_height as i64)
        .bind(tx_index as i64)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Get transaction position by txid.
    pub async fn get_transaction(&self, txid: &Txid) -> Result<Option<(u32, u32)>> {
        let row: Option<(i64, i64)> =
            sqlx::query_as("SELECT block_height, tx_index FROM transactions WHERE txid = ?")
                .bind(txid.as_byte_array().as_slice())
                .fetch_optional(&self.pool)
                .await?;
        Ok(row.map(|(h, i)| (h as u32, i as u32)))
    }

    // ========== Output Operations ==========

    /// Insert an output record.
    pub async fn insert_output(
        &self,
        txid: &Txid,
        vout: u32,
        script_pubkey: &ScriptBuf,
        value: u64,
        block_height: u32,
    ) -> Result<()> {
        let address = script_to_address(script_pubkey, self.network);

        sqlx::query(
            "INSERT OR REPLACE INTO outputs (txid, vout, address, value, script_pubkey, block_height) VALUES (?, ?, ?, ?, ?, ?)",
        )
        .bind(txid.as_byte_array().as_slice())
        .bind(vout as i64)
        .bind(&address)
        .bind(value as i64)
        .bind(script_pubkey.as_bytes())
        .bind(block_height as i64)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Mark an output as spent.
    pub async fn mark_output_spent(
        &self,
        outpoint: &OutPoint,
        spent_txid: &Txid,
        spent_vout: u32,
        spent_block_height: u32,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE outputs SET spent_txid = ?, spent_vout = ?, spent_block_height = ? WHERE txid = ? AND vout = ?",
        )
        .bind(spent_txid.as_byte_array().as_slice())
        .bind(spent_vout as i64)
        .bind(spent_block_height as i64)
        .bind(outpoint.txid.as_byte_array().as_slice())
        .bind(outpoint.vout as i64)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Get the address for an output (for computing deltas during spending).
    pub async fn get_output_info(&self, outpoint: &OutPoint) -> Result<Option<(String, u64)>> {
        let row: Option<(String, i64)> =
            sqlx::query_as("SELECT address, value FROM outputs WHERE txid = ? AND vout = ? AND address IS NOT NULL")
                .bind(outpoint.txid.as_byte_array().as_slice())
                .bind(outpoint.vout as i64)
                .fetch_optional(&self.pool)
                .await?;
        Ok(row.map(|(addr, val)| (addr, val as u64)))
    }

    // ========== Address History Operations ==========

    /// Insert an address history entry.
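    ///
    /// If the same `(address, block_height, txid)` key is inserted more than once
    /// (e.g. an address appears in both the inputs and the outputs of one
    /// transaction), the deltas are summed: inserting `+50_000` and then `-20_000`
    /// leaves a stored delta of `+30_000` (values illustrative).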
339    pub async fn insert_address_history(
340        &self,
341        address: &str,
342        txid: &Txid,
343        block_height: u32,
344        delta: i64,
345    ) -> Result<()> {
        // Use an upsert (ON CONFLICT ... DO UPDATE) so that when an address appears
        // in both the inputs and the outputs of the same transaction, the deltas are
        // summed rather than overwritten.
        sqlx::query(
            r#"
            INSERT INTO address_history (address, txid, block_height, delta)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(address, block_height, txid) DO UPDATE SET
                delta = address_history.delta + excluded.delta
            "#,
        )
        .bind(address)
        .bind(txid.as_byte_array().as_slice())
        .bind(block_height as i64)
        .bind(delta)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    // ========== Reorg Handling ==========

    /// Revert all data above the given block height (for handling reorgs).
    pub async fn revert_to_height(&self, height: u32) -> Result<()> {
        let mut tx = self.pool.begin().await?;

        // Unspend outputs that were spent in reverted blocks
        sqlx::query(
            "UPDATE outputs SET spent_txid = NULL, spent_vout = NULL, spent_block_height = NULL WHERE spent_block_height > ?",
        )
        .bind(height as i64)
        .execute(&mut *tx)
        .await?;

        // Delete outputs created in reverted blocks
        sqlx::query("DELETE FROM outputs WHERE block_height > ?")
            .bind(height as i64)
            .execute(&mut *tx)
            .await?;

        // Delete transactions in reverted blocks
        sqlx::query("DELETE FROM transactions WHERE block_height > ?")
            .bind(height as i64)
            .execute(&mut *tx)
            .await?;

        // Delete address history in reverted blocks
        sqlx::query("DELETE FROM address_history WHERE block_height > ?")
            .bind(height as i64)
            .execute(&mut *tx)
            .await?;

        // Delete blocks
        sqlx::query("DELETE FROM blocks WHERE height > ?")
            .bind(height as i64)
            .execute(&mut *tx)
            .await?;

        tx.commit().await?;
        Ok(())
    }

    /// Begin a database transaction for batch operations.
    pub async fn begin_transaction(&self) -> Result<sqlx::Transaction<'_, sqlx::Sqlite>> {
        Ok(self.pool.begin().await?)
    }

    /// Parse a txid from database bytes.
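    ///
    /// Round-trip sketch: the bytes come from `Txid::as_byte_array`, i.e. the
    /// internal byte order rather than the reversed display/hex order:
    ///
    /// ```ignore
    /// let original = Txid::from_byte_array([0u8; 32]);
    /// let parsed = IndexerDatabase::parse_txid(original.as_byte_array())?;
    /// assert_eq!(parsed, original);
    /// ```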
    pub fn parse_txid(bytes: &[u8]) -> Result<Txid> {
        let arr: [u8; 32] = bytes
            .try_into()
            .map_err(|_| Error::InvalidTxid("Invalid length".to_string()))?;
        Ok(Txid::from_byte_array(arr))
    }

    /// Index a single block in a database transaction (for live sync).
    pub async fn index_block(
        &self,
        height: u32,
        block: &bitcoin::Block,
        network: Network,
    ) -> Result<()> {
        self.index_blocks_batch(&[(height, block.clone())], network)
            .await
    }

    /// Index multiple blocks in a single database transaction for better performance.
    /// Used for batching during historical sync and, via `index_block`, for single
    /// blocks during live sync.
    ///
    /// Optimized with:
    /// 1. Bulk pre-fetch of outputs (eliminates N+1 queries)
    /// 2. Collect all data in memory first, then bulk insert by table
    /// 3. Pre-aggregate address history deltas (avoids ON CONFLICT overhead)
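    ///
    /// Usage sketch (`fetch_next_chunk` is a hypothetical helper standing in for
    /// the sync loop's block source):
    ///
    /// ```ignore
    /// let blocks: Vec<(u32, bitcoin::Block)> = fetch_next_chunk();
    /// db.index_blocks_batch(&blocks, db.network()).await?;
    /// if let Some((height, _)) = blocks.last() {
    ///     db.save_state(&IndexerState::Active { last_indexed: *height }).await?;
    /// }
    /// ```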
    #[allow(clippy::type_complexity)]
    pub async fn index_blocks_batch(
        &self,
        blocks: &[(u32, bitcoin::Block)],
        network: Network,
    ) -> Result<()> {
        if blocks.is_empty() {
            return Ok(());
        }

        // ========== Phase 1: Collect all outpoints that need lookup ==========
        let mut outpoints_to_fetch: HashSet<OutPoint> = HashSet::new();
        for (_height, block) in blocks {
            for btc_tx in &block.txdata {
                for input in &btc_tx.input {
                    if !input.previous_output.is_null() {
                        outpoints_to_fetch.insert(input.previous_output);
                    }
                }
            }
        }

        // ========== Phase 2: Bulk fetch outputs ==========
        let mut output_cache: HashMap<OutPoint, (String, u64)> = HashMap::new();
        let outpoints_vec: Vec<_> = outpoints_to_fetch.into_iter().collect();

        let mut tx = self.pool.begin().await?;

        const FETCH_BATCH_SIZE: usize = 500;
        for chunk in outpoints_vec.chunks(FETCH_BATCH_SIZE) {
            if chunk.is_empty() {
                continue;
            }

            let placeholders: Vec<String> = chunk
                .iter()
                .map(|_| "(txid = ? AND vout = ?)".to_string())
                .collect();
            let query = format!(
                "SELECT txid, vout, address, value FROM outputs WHERE address IS NOT NULL AND ({})",
                placeholders.join(" OR ")
            );

            let mut query_builder = sqlx::query_as::<_, (Vec<u8>, i64, String, i64)>(&query);
            for outpoint in chunk {
                query_builder = query_builder
                    .bind(outpoint.txid.as_byte_array().as_slice())
                    .bind(outpoint.vout as i64);
            }

            let rows = query_builder.fetch_all(&mut *tx).await?;

            for (txid_bytes, vout, address, value) in rows {
                let txid = Self::parse_txid(&txid_bytes)?;
                let outpoint = OutPoint {
                    txid,
                    vout: vout as u32,
                };
                output_cache.insert(outpoint, (address, value as u64));
            }
        }

        // ========== Phase 3: Collect all data into memory buffers ==========

        // Block records: (height, hash, timestamp)
        let mut blocks_data: Vec<(i64, Vec<u8>, i64)> = Vec::new();

        // Transaction records: (txid, block_height, tx_index)
        let mut transactions_data: Vec<(Vec<u8>, i64, i64)> = Vec::new();

        // Output records: (txid, vout, address, value, script_pubkey, block_height)
        let mut outputs_data: Vec<(Vec<u8>, i64, Option<String>, i64, Vec<u8>, i64)> = Vec::new();

        // Spent output updates: (spent_txid, spent_vout, spent_block_height, orig_txid, orig_vout)
        let mut spent_updates: Vec<(Vec<u8>, i64, i64, Vec<u8>, i64)> = Vec::new();

        // Address history: HashMap<(address, txid_bytes, block_height), delta> - pre-aggregated
        let mut address_history_map: HashMap<(String, Vec<u8>, i64), i64> = HashMap::new();

        // Track outputs created within this batch (for intra-batch spending)
        // Key: OutPoint, Value: (address, value)
        let mut batch_outputs: HashMap<OutPoint, (Option<String>, u64)> = HashMap::new();

        for (height, block) in blocks {
            let block_hash = block.block_hash();
            let timestamp = block.header.time;

            blocks_data.push((
                *height as i64,
                block_hash.as_byte_array().to_vec(),
                timestamp as i64,
            ));

            for (tx_index, btc_tx) in block.txdata.iter().enumerate() {
                let txid = btc_tx.compute_txid();
                let txid_bytes = txid.as_byte_array().to_vec();

                transactions_data.push((txid_bytes.clone(), *height as i64, tx_index as i64));

                // Process inputs - check both database cache and batch-local outputs
                for (vin, input) in btc_tx.input.iter().enumerate() {
                    if input.previous_output.is_null() {
                        continue;
                    }

                    // Try database cache first, then batch-local outputs
                    let output_info: Option<(String, u64)> = output_cache
                        .get(&input.previous_output)
                        .cloned()
                        .or_else(|| {
                            batch_outputs
                                .get(&input.previous_output)
                                .and_then(|(addr, val)| addr.clone().map(|a| (a, *val)))
                        });

                    if let Some((address, value)) = output_info {
                        spent_updates.push((
                            txid_bytes.clone(),
                            // The spending input's index is what ends up in the spent_vout column.
                            vin as i64,
                            *height as i64,
                            input.previous_output.txid.as_byte_array().to_vec(),
                            input.previous_output.vout as i64,
                        ));

                        // Accumulate negative delta
                        let key = (address.clone(), txid_bytes.clone(), *height as i64);
                        *address_history_map.entry(key).or_insert(0) -= value as i64;
                    }
                }

                // Process outputs
                for (vout, output) in btc_tx.output.iter().enumerate() {
                    let script = &output.script_pubkey;
                    let value = output.value.to_sat();

                    let address = script_to_address(script, network);

                    outputs_data.push((
                        txid_bytes.clone(),
                        vout as i64,
                        address.clone(),
                        value as i64,
                        script.as_bytes().to_vec(),
                        *height as i64,
                    ));

                    // Track this output for potential intra-batch spending
                    let outpoint = OutPoint {
                        txid,
                        vout: vout as u32,
                    };
                    batch_outputs.insert(outpoint, (address.clone(), value));

                    // Accumulate positive delta
                    if let Some(addr) = address {
                        let key = (addr, txid_bytes.clone(), *height as i64);
                        *address_history_map.entry(key).or_insert(0) += value as i64;
                    }
                }
            }
        }

        // ========== Phase 4: Bulk insert by table ==========

        // 4a: Insert blocks (multi-row INSERT)
        const INSERT_BATCH_SIZE: usize = 100;

        for chunk in blocks_data.chunks(INSERT_BATCH_SIZE) {
            let placeholders: Vec<&str> = chunk.iter().map(|_| "(?, ?, ?)").collect();
            let query = format!(
                "INSERT OR REPLACE INTO blocks (height, hash, timestamp) VALUES {}",
                placeholders.join(", ")
            );

            let mut query_builder = sqlx::query(&query);
            for (height, hash, timestamp) in chunk {
                query_builder = query_builder.bind(height).bind(hash).bind(timestamp);
            }
            query_builder.execute(&mut *tx).await?;
        }

        // 4b: Insert transactions (multi-row INSERT)
        for chunk in transactions_data.chunks(INSERT_BATCH_SIZE) {
            let placeholders: Vec<&str> = chunk.iter().map(|_| "(?, ?, ?)").collect();
            let query = format!(
                "INSERT OR REPLACE INTO transactions (txid, block_height, tx_index) VALUES {}",
                placeholders.join(", ")
            );

            let mut query_builder = sqlx::query(&query);
            for (txid, height, idx) in chunk {
                query_builder = query_builder.bind(txid).bind(height).bind(idx);
            }
            query_builder.execute(&mut *tx).await?;
        }

        // 4c: Insert outputs (multi-row INSERT)
        for chunk in outputs_data.chunks(INSERT_BATCH_SIZE) {
            let placeholders: Vec<&str> = chunk.iter().map(|_| "(?, ?, ?, ?, ?, ?)").collect();
            let query = format!(
                "INSERT OR REPLACE INTO outputs (txid, vout, address, value, script_pubkey, block_height) VALUES {}",
                placeholders.join(", ")
            );

            let mut query_builder = sqlx::query(&query);
            for (txid, vout, address, value, script, height) in chunk {
                query_builder = query_builder
                    .bind(txid)
                    .bind(vout)
                    .bind(address)
                    .bind(value)
                    .bind(script)
                    .bind(height);
            }
            query_builder.execute(&mut *tx).await?;
        }

        // 4d: Update spent outputs - individual UPDATEs using PK index
        // Direct PK lookups are fast; the temp table approach with correlated subqueries
        // becomes slow as the outputs table grows
        for (spent_txid, spent_vout, spent_height, orig_txid, orig_vout) in &spent_updates {
            sqlx::query(
                "UPDATE outputs SET spent_txid = ?, spent_vout = ?, spent_block_height = ? WHERE txid = ? AND vout = ?",
            )
            .bind(spent_txid)
            .bind(spent_vout)
            .bind(spent_height)
            .bind(orig_txid)
            .bind(orig_vout)
            .execute(&mut *tx)
            .await?;
        }

        // 4e: Insert address history (multi-row INSERT with pre-aggregated deltas)
        let history_entries: Vec<_> = address_history_map.into_iter().collect();
        for chunk in history_entries.chunks(INSERT_BATCH_SIZE) {
            let placeholders: Vec<&str> = chunk.iter().map(|_| "(?, ?, ?, ?)").collect();
            let query = format!(
                "INSERT OR REPLACE INTO address_history (address, txid, block_height, delta) VALUES {}",
                placeholders.join(", ")
            );

            let mut query_builder = sqlx::query(&query);
            for ((address, txid, height), delta) in chunk {
                query_builder = query_builder
                    .bind(address)
                    .bind(txid)
                    .bind(height)
                    .bind(delta);
            }
            query_builder.execute(&mut *tx).await?;
        }

        tx.commit().await?;
        Ok(())
    }
}