subcoin_indexer/
queries.rs

1//! Query functions for the indexer.
2
3use crate::db::{IndexerDatabase, Result};
4use crate::types::{
5    AddressBalance, AddressHistory, AddressStats, IndexerState, IndexerStatus, OutputStatus, Utxo,
6};
7use bitcoin::Txid;
8use bitcoin::hashes::Hash;
9use subcoin_primitives::TxPosition;
10
/// Query interface for the indexer database.
///
/// Wraps an [`IndexerDatabase`] handle. The query methods either run `SELECT` statements
/// against `db.pool()` or delegate to lookup helpers on the database itself.
#[derive(Clone)]
pub struct IndexerQuery {
    /// Database handle used by every query; cloned whenever the `IndexerQuery` is cloned.
    db: IndexerDatabase,
}
16
17impl IndexerQuery {
18    /// Create a new query interface.
19    pub fn new(db: IndexerDatabase) -> Self {
20        Self { db }
21    }
22
23    /// Get transaction position by txid.
24    pub async fn get_tx_position(&self, txid: Txid) -> Result<Option<TxPosition>> {
25        let result = self.db.get_transaction(&txid).await?;
26        Ok(result.map(|(block_number, index)| TxPosition {
27            block_number,
28            index,
29        }))
30    }
31
32    /// Get address transaction history with pagination.
33    pub async fn get_address_history(
34        &self,
35        address: &str,
36        limit: u32,
37        offset: u32,
38    ) -> Result<Vec<AddressHistory>> {
39        let rows: Vec<(Vec<u8>, i64, i64)> = sqlx::query_as(
40            r#"
41            SELECT ah.txid, ah.block_height, ah.delta
42            FROM address_history ah
43            WHERE ah.address = ?
44            ORDER BY ah.block_height DESC
45            LIMIT ? OFFSET ?
46            "#,
47        )
48        .bind(address)
49        .bind(limit as i64)
50        .bind(offset as i64)
51        .fetch_all(self.db.pool())
52        .await?;
53
54        let mut history = Vec::with_capacity(rows.len());
55        for (txid_bytes, block_height, delta) in rows {
56            let txid = IndexerDatabase::parse_txid(&txid_bytes)?;
57
58            // Get timestamp from blocks table
59            let timestamp: Option<(i64,)> =
60                sqlx::query_as("SELECT timestamp FROM blocks WHERE height = ?")
61                    .bind(block_height)
62                    .fetch_optional(self.db.pool())
63                    .await?;
64
65            history.push(AddressHistory {
66                txid,
67                block_height: block_height as u32,
68                delta,
69                timestamp: timestamp.map(|(t,)| t as u32).unwrap_or(0),
70            });
71        }
72
73        Ok(history)
74    }
75
76    /// Get unspent outputs for an address.
77    #[allow(clippy::type_complexity)]
78    pub async fn get_address_utxos(&self, address: &str) -> Result<Vec<Utxo>> {
79        let rows: Vec<(Vec<u8>, i64, i64, i64, Vec<u8>)> = sqlx::query_as(
80            r#"
81            SELECT txid, vout, value, block_height, script_pubkey
82            FROM outputs
83            WHERE address = ? AND spent_txid IS NULL
84            ORDER BY block_height DESC
85            "#,
86        )
87        .bind(address)
88        .fetch_all(self.db.pool())
89        .await?;
90
91        let mut utxos = Vec::with_capacity(rows.len());
92        for (txid_bytes, vout, value, block_height, script_pubkey) in rows {
93            let txid = IndexerDatabase::parse_txid(&txid_bytes)?;
94            utxos.push(Utxo {
95                txid,
96                vout: vout as u32,
97                value: value as u64,
98                block_height: block_height as u32,
99                script_pubkey,
100            });
101        }
102
103        Ok(utxos)
104    }
105
106    /// Get address balance and statistics.
107    pub async fn get_address_balance(&self, address: &str) -> Result<AddressBalance> {
108        // Get confirmed balance (sum of unspent outputs)
109        let confirmed: Option<(i64,)> = sqlx::query_as(
110            "SELECT COALESCE(SUM(value), 0) FROM outputs WHERE address = ? AND spent_txid IS NULL",
111        )
112        .bind(address)
113        .fetch_optional(self.db.pool())
114        .await?;
115
116        // Get total received
117        let total_received: Option<(i64,)> =
118            sqlx::query_as("SELECT COALESCE(SUM(value), 0) FROM outputs WHERE address = ?")
119                .bind(address)
120                .fetch_optional(self.db.pool())
121                .await?;
122
123        // Get total sent (sum of spent outputs)
124        let total_sent: Option<(i64,)> = sqlx::query_as(
125            "SELECT COALESCE(SUM(value), 0) FROM outputs WHERE address = ? AND spent_txid IS NOT NULL",
126        )
127        .bind(address)
128        .fetch_optional(self.db.pool())
129        .await?;
130
131        // Get transaction count (distinct transactions)
132        let tx_count: Option<(i64,)> =
133            sqlx::query_as("SELECT COUNT(DISTINCT txid) FROM address_history WHERE address = ?")
134                .bind(address)
135                .fetch_optional(self.db.pool())
136                .await?;
137
138        // Get UTXO count
139        let utxo_count: Option<(i64,)> =
140            sqlx::query_as("SELECT COUNT(*) FROM outputs WHERE address = ? AND spent_txid IS NULL")
141                .bind(address)
142                .fetch_optional(self.db.pool())
143                .await?;
144
145        Ok(AddressBalance {
146            confirmed: confirmed.map(|(v,)| v as u64).unwrap_or(0),
147            total_received: total_received.map(|(v,)| v as u64).unwrap_or(0),
148            total_sent: total_sent.map(|(v,)| v as u64).unwrap_or(0),
149            tx_count: tx_count.map(|(v,)| v as u64).unwrap_or(0),
150            utxo_count: utxo_count.map(|(v,)| v as u64).unwrap_or(0),
151        })
152    }
153
154    /// Get the number of transactions for an address.
155    pub async fn get_address_tx_count(&self, address: &str) -> Result<u64> {
156        let count: Option<(i64,)> =
157            sqlx::query_as("SELECT COUNT(DISTINCT txid) FROM address_history WHERE address = ?")
158                .bind(address)
159                .fetch_optional(self.db.pool())
160                .await?;
161        Ok(count.map(|(v,)| v as u64).unwrap_or(0))
162    }
163
164    /// Get address statistics (first/last seen, largest tx, etc).
165    pub async fn get_address_stats(&self, address: &str) -> Result<AddressStats> {
166        // Get first seen (earliest transaction)
167        let first_seen: Option<(i64, i64)> = sqlx::query_as(
168            r#"
169            SELECT ah.block_height, b.timestamp
170            FROM address_history ah
171            JOIN blocks b ON b.height = ah.block_height
172            WHERE ah.address = ?
173            ORDER BY ah.block_height ASC
174            LIMIT 1
175            "#,
176        )
177        .bind(address)
178        .fetch_optional(self.db.pool())
179        .await?;
180
181        // Get last seen (most recent transaction)
182        let last_seen: Option<(i64, i64)> = sqlx::query_as(
183            r#"
184            SELECT ah.block_height, b.timestamp
185            FROM address_history ah
186            JOIN blocks b ON b.height = ah.block_height
187            WHERE ah.address = ?
188            ORDER BY ah.block_height DESC
189            LIMIT 1
190            "#,
191        )
192        .bind(address)
193        .fetch_optional(self.db.pool())
194        .await?;
195
196        // Get largest receive (max positive delta)
197        let largest_receive: Option<(i64,)> = sqlx::query_as(
198            "SELECT COALESCE(MAX(delta), 0) FROM address_history WHERE address = ? AND delta > 0",
199        )
200        .bind(address)
201        .fetch_optional(self.db.pool())
202        .await?;
203
204        // Get largest send (max absolute negative delta)
205        let largest_send: Option<(i64,)> = sqlx::query_as(
206            "SELECT COALESCE(MAX(ABS(delta)), 0) FROM address_history WHERE address = ? AND delta < 0",
207        )
208        .bind(address)
209        .fetch_optional(self.db.pool())
210        .await?;
211
212        // Get receive count (transactions with positive delta)
213        let receive_count: Option<(i64,)> =
214            sqlx::query_as("SELECT COUNT(*) FROM address_history WHERE address = ? AND delta > 0")
215                .bind(address)
216                .fetch_optional(self.db.pool())
217                .await?;
218
219        // Get send count (transactions with negative delta)
220        let send_count: Option<(i64,)> =
221            sqlx::query_as("SELECT COUNT(*) FROM address_history WHERE address = ? AND delta < 0")
222                .bind(address)
223                .fetch_optional(self.db.pool())
224                .await?;
225
226        Ok(AddressStats {
227            first_seen_height: first_seen.map(|(h, _)| h as u32),
228            first_seen_timestamp: first_seen.map(|(_, t)| t as u32),
229            last_seen_height: last_seen.map(|(h, _)| h as u32),
230            last_seen_timestamp: last_seen.map(|(_, t)| t as u32),
231            largest_receive: largest_receive.map(|(v,)| v as u64).unwrap_or(0),
232            largest_send: largest_send.map(|(v,)| v as u64).unwrap_or(0),
233            receive_count: receive_count.map(|(v,)| v as u64).unwrap_or(0),
234            send_count: send_count.map(|(v,)| v as u64).unwrap_or(0),
235        })
236    }
237
238    /// Get output spending status.
239    #[allow(clippy::type_complexity)]
240    pub async fn get_output_status(&self, txid: &Txid, vout: u32) -> Result<Option<OutputStatus>> {
241        let row: Option<(Option<Vec<u8>>, Option<i64>, Option<i64>)> = sqlx::query_as(
242            "SELECT spent_txid, spent_vout, spent_block_height FROM outputs WHERE txid = ? AND vout = ?",
243        )
244        .bind(txid.as_byte_array().as_slice())
245        .bind(vout as i64)
246        .fetch_optional(self.db.pool())
247        .await?;
248
249        match row {
250            Some((spent_txid_bytes, spent_vout, spent_block_height)) => {
251                let spent = spent_txid_bytes.is_some();
252                let spent_by_txid = match spent_txid_bytes {
253                    Some(bytes) => Some(IndexerDatabase::parse_txid(&bytes)?),
254                    None => None,
255                };
256                Ok(Some(OutputStatus {
257                    spent,
258                    spent_by_txid,
259                    spent_by_vin: spent_vout.map(|v| v as u32),
260                    spent_at_height: spent_block_height.map(|h| h as u32),
261                }))
262            }
263            None => Ok(None),
264        }
265    }
266
267    /// Get the current indexer status.
268    pub async fn get_status(&self, chain_tip: u32) -> Result<IndexerStatus> {
269        let state = self.db.load_state().await?;
270
271        match state {
272            Some(IndexerState::HistoricalIndexing {
273                target_height,
274                current_height,
275            }) => {
276                let progress = if target_height > 0 {
277                    (current_height as f64 / target_height as f64) * 100.0
278                } else {
279                    0.0
280                };
281                Ok(IndexerStatus {
282                    is_syncing: true,
283                    indexed_height: current_height,
284                    target_height: Some(target_height),
285                    progress_percent: progress,
286                })
287            }
288            Some(IndexerState::Active { last_indexed }) => {
289                let progress = if chain_tip > 0 {
290                    (last_indexed as f64 / chain_tip as f64) * 100.0
291                } else {
292                    100.0
293                };
294                Ok(IndexerStatus {
295                    is_syncing: last_indexed < chain_tip,
296                    indexed_height: last_indexed,
297                    target_height: if last_indexed < chain_tip {
298                        Some(chain_tip)
299                    } else {
300                        None
301                    },
302                    progress_percent: progress.min(100.0),
303                })
304            }
305            None => Ok(IndexerStatus {
306                is_syncing: chain_tip > 0,
307                indexed_height: 0,
308                target_height: if chain_tip > 0 { Some(chain_tip) } else { None },
309                progress_percent: 0.0,
310            }),
311        }
312    }
313}
314
315/// Implement the TransactionIndex trait from subcoin-primitives.
316impl subcoin_primitives::TransactionIndex for IndexerQuery {
317    fn tx_index(&self, txid: Txid) -> sp_blockchain::Result<Option<TxPosition>> {
318        // We need to block on the async call since the trait is sync
319        // This assumes we're running within a tokio context (which we always are in the node)
320        let handle = tokio::runtime::Handle::try_current()
321            .map_err(|_| sp_blockchain::Error::Backend("No tokio runtime available".to_string()))?;
322
323        let db = self.db.clone();
324        let result = std::thread::spawn(move || {
325            handle.block_on(async move { db.get_transaction(&txid).await })
326        })
327        .join()
328        .map_err(|_| sp_blockchain::Error::Backend("Failed to query transaction".to_string()))?;
329
330        result
331            .map(|opt| {
332                opt.map(|(block_number, index)| TxPosition {
333                    block_number,
334                    index,
335                })
336            })
337            .map_err(|e| sp_blockchain::Error::Backend(e.to_string()))
338    }
339}