1use crate::db::{IndexerDatabase, Result};
4use crate::types::{
5 AddressBalance, AddressHistory, AddressStats, IndexerState, IndexerStatus, OutputStatus, Utxo,
6};
7use bitcoin::Txid;
8use bitcoin::hashes::Hash;
9use subcoin_primitives::TxPosition;
10
11#[derive(Clone)]
13pub struct IndexerQuery {
14 db: IndexerDatabase,
15}
16
17impl IndexerQuery {
18 pub fn new(db: IndexerDatabase) -> Self {
20 Self { db }
21 }
22
23 pub async fn get_tx_position(&self, txid: Txid) -> Result<Option<TxPosition>> {
25 let result = self.db.get_transaction(&txid).await?;
26 Ok(result.map(|(block_number, index)| TxPosition {
27 block_number,
28 index,
29 }))
30 }
31
32 pub async fn get_address_history(
34 &self,
35 address: &str,
36 limit: u32,
37 offset: u32,
38 ) -> Result<Vec<AddressHistory>> {
39 let rows: Vec<(Vec<u8>, i64, i64)> = sqlx::query_as(
40 r#"
41 SELECT ah.txid, ah.block_height, ah.delta
42 FROM address_history ah
43 WHERE ah.address = ?
44 ORDER BY ah.block_height DESC
45 LIMIT ? OFFSET ?
46 "#,
47 )
48 .bind(address)
49 .bind(limit as i64)
50 .bind(offset as i64)
51 .fetch_all(self.db.pool())
52 .await?;
53
54 let mut history = Vec::with_capacity(rows.len());
55 for (txid_bytes, block_height, delta) in rows {
56 let txid = IndexerDatabase::parse_txid(&txid_bytes)?;
57
58 let timestamp: Option<(i64,)> =
60 sqlx::query_as("SELECT timestamp FROM blocks WHERE height = ?")
61 .bind(block_height)
62 .fetch_optional(self.db.pool())
63 .await?;
64
65 history.push(AddressHistory {
66 txid,
67 block_height: block_height as u32,
68 delta,
69 timestamp: timestamp.map(|(t,)| t as u32).unwrap_or(0),
70 });
71 }
72
73 Ok(history)
74 }
75
76 #[allow(clippy::type_complexity)]
78 pub async fn get_address_utxos(&self, address: &str) -> Result<Vec<Utxo>> {
79 let rows: Vec<(Vec<u8>, i64, i64, i64, Vec<u8>)> = sqlx::query_as(
80 r#"
81 SELECT txid, vout, value, block_height, script_pubkey
82 FROM outputs
83 WHERE address = ? AND spent_txid IS NULL
84 ORDER BY block_height DESC
85 "#,
86 )
87 .bind(address)
88 .fetch_all(self.db.pool())
89 .await?;
90
91 let mut utxos = Vec::with_capacity(rows.len());
92 for (txid_bytes, vout, value, block_height, script_pubkey) in rows {
93 let txid = IndexerDatabase::parse_txid(&txid_bytes)?;
94 utxos.push(Utxo {
95 txid,
96 vout: vout as u32,
97 value: value as u64,
98 block_height: block_height as u32,
99 script_pubkey,
100 });
101 }
102
103 Ok(utxos)
104 }
105
106 pub async fn get_address_balance(&self, address: &str) -> Result<AddressBalance> {
108 let confirmed: Option<(i64,)> = sqlx::query_as(
110 "SELECT COALESCE(SUM(value), 0) FROM outputs WHERE address = ? AND spent_txid IS NULL",
111 )
112 .bind(address)
113 .fetch_optional(self.db.pool())
114 .await?;
115
116 let total_received: Option<(i64,)> =
118 sqlx::query_as("SELECT COALESCE(SUM(value), 0) FROM outputs WHERE address = ?")
119 .bind(address)
120 .fetch_optional(self.db.pool())
121 .await?;
122
123 let total_sent: Option<(i64,)> = sqlx::query_as(
125 "SELECT COALESCE(SUM(value), 0) FROM outputs WHERE address = ? AND spent_txid IS NOT NULL",
126 )
127 .bind(address)
128 .fetch_optional(self.db.pool())
129 .await?;
130
131 let tx_count: Option<(i64,)> =
133 sqlx::query_as("SELECT COUNT(DISTINCT txid) FROM address_history WHERE address = ?")
134 .bind(address)
135 .fetch_optional(self.db.pool())
136 .await?;
137
138 let utxo_count: Option<(i64,)> =
140 sqlx::query_as("SELECT COUNT(*) FROM outputs WHERE address = ? AND spent_txid IS NULL")
141 .bind(address)
142 .fetch_optional(self.db.pool())
143 .await?;
144
145 Ok(AddressBalance {
146 confirmed: confirmed.map(|(v,)| v as u64).unwrap_or(0),
147 total_received: total_received.map(|(v,)| v as u64).unwrap_or(0),
148 total_sent: total_sent.map(|(v,)| v as u64).unwrap_or(0),
149 tx_count: tx_count.map(|(v,)| v as u64).unwrap_or(0),
150 utxo_count: utxo_count.map(|(v,)| v as u64).unwrap_or(0),
151 })
152 }
153
154 pub async fn get_address_tx_count(&self, address: &str) -> Result<u64> {
156 let count: Option<(i64,)> =
157 sqlx::query_as("SELECT COUNT(DISTINCT txid) FROM address_history WHERE address = ?")
158 .bind(address)
159 .fetch_optional(self.db.pool())
160 .await?;
161 Ok(count.map(|(v,)| v as u64).unwrap_or(0))
162 }
163
164 pub async fn get_address_stats(&self, address: &str) -> Result<AddressStats> {
166 let first_seen: Option<(i64, i64)> = sqlx::query_as(
168 r#"
169 SELECT ah.block_height, b.timestamp
170 FROM address_history ah
171 JOIN blocks b ON b.height = ah.block_height
172 WHERE ah.address = ?
173 ORDER BY ah.block_height ASC
174 LIMIT 1
175 "#,
176 )
177 .bind(address)
178 .fetch_optional(self.db.pool())
179 .await?;
180
181 let last_seen: Option<(i64, i64)> = sqlx::query_as(
183 r#"
184 SELECT ah.block_height, b.timestamp
185 FROM address_history ah
186 JOIN blocks b ON b.height = ah.block_height
187 WHERE ah.address = ?
188 ORDER BY ah.block_height DESC
189 LIMIT 1
190 "#,
191 )
192 .bind(address)
193 .fetch_optional(self.db.pool())
194 .await?;
195
196 let largest_receive: Option<(i64,)> = sqlx::query_as(
198 "SELECT COALESCE(MAX(delta), 0) FROM address_history WHERE address = ? AND delta > 0",
199 )
200 .bind(address)
201 .fetch_optional(self.db.pool())
202 .await?;
203
204 let largest_send: Option<(i64,)> = sqlx::query_as(
206 "SELECT COALESCE(MAX(ABS(delta)), 0) FROM address_history WHERE address = ? AND delta < 0",
207 )
208 .bind(address)
209 .fetch_optional(self.db.pool())
210 .await?;
211
212 let receive_count: Option<(i64,)> =
214 sqlx::query_as("SELECT COUNT(*) FROM address_history WHERE address = ? AND delta > 0")
215 .bind(address)
216 .fetch_optional(self.db.pool())
217 .await?;
218
219 let send_count: Option<(i64,)> =
221 sqlx::query_as("SELECT COUNT(*) FROM address_history WHERE address = ? AND delta < 0")
222 .bind(address)
223 .fetch_optional(self.db.pool())
224 .await?;
225
226 Ok(AddressStats {
227 first_seen_height: first_seen.map(|(h, _)| h as u32),
228 first_seen_timestamp: first_seen.map(|(_, t)| t as u32),
229 last_seen_height: last_seen.map(|(h, _)| h as u32),
230 last_seen_timestamp: last_seen.map(|(_, t)| t as u32),
231 largest_receive: largest_receive.map(|(v,)| v as u64).unwrap_or(0),
232 largest_send: largest_send.map(|(v,)| v as u64).unwrap_or(0),
233 receive_count: receive_count.map(|(v,)| v as u64).unwrap_or(0),
234 send_count: send_count.map(|(v,)| v as u64).unwrap_or(0),
235 })
236 }
237
238 #[allow(clippy::type_complexity)]
240 pub async fn get_output_status(&self, txid: &Txid, vout: u32) -> Result<Option<OutputStatus>> {
241 let row: Option<(Option<Vec<u8>>, Option<i64>, Option<i64>)> = sqlx::query_as(
242 "SELECT spent_txid, spent_vout, spent_block_height FROM outputs WHERE txid = ? AND vout = ?",
243 )
244 .bind(txid.as_byte_array().as_slice())
245 .bind(vout as i64)
246 .fetch_optional(self.db.pool())
247 .await?;
248
249 match row {
250 Some((spent_txid_bytes, spent_vout, spent_block_height)) => {
251 let spent = spent_txid_bytes.is_some();
252 let spent_by_txid = match spent_txid_bytes {
253 Some(bytes) => Some(IndexerDatabase::parse_txid(&bytes)?),
254 None => None,
255 };
256 Ok(Some(OutputStatus {
257 spent,
258 spent_by_txid,
259 spent_by_vin: spent_vout.map(|v| v as u32),
260 spent_at_height: spent_block_height.map(|h| h as u32),
261 }))
262 }
263 None => Ok(None),
264 }
265 }
266
267 pub async fn get_status(&self, chain_tip: u32) -> Result<IndexerStatus> {
269 let state = self.db.load_state().await?;
270
271 match state {
272 Some(IndexerState::HistoricalIndexing {
273 target_height,
274 current_height,
275 }) => {
276 let progress = if target_height > 0 {
277 (current_height as f64 / target_height as f64) * 100.0
278 } else {
279 0.0
280 };
281 Ok(IndexerStatus {
282 is_syncing: true,
283 indexed_height: current_height,
284 target_height: Some(target_height),
285 progress_percent: progress,
286 })
287 }
288 Some(IndexerState::Active { last_indexed }) => {
289 let progress = if chain_tip > 0 {
290 (last_indexed as f64 / chain_tip as f64) * 100.0
291 } else {
292 100.0
293 };
294 Ok(IndexerStatus {
295 is_syncing: last_indexed < chain_tip,
296 indexed_height: last_indexed,
297 target_height: if last_indexed < chain_tip {
298 Some(chain_tip)
299 } else {
300 None
301 },
302 progress_percent: progress.min(100.0),
303 })
304 }
305 None => Ok(IndexerStatus {
306 is_syncing: chain_tip > 0,
307 indexed_height: 0,
308 target_height: if chain_tip > 0 { Some(chain_tip) } else { None },
309 progress_percent: 0.0,
310 }),
311 }
312 }
313}
314
315impl subcoin_primitives::TransactionIndex for IndexerQuery {
317 fn tx_index(&self, txid: Txid) -> sp_blockchain::Result<Option<TxPosition>> {
318 let handle = tokio::runtime::Handle::try_current()
321 .map_err(|_| sp_blockchain::Error::Backend("No tokio runtime available".to_string()))?;
322
323 let db = self.db.clone();
324 let result = std::thread::spawn(move || {
325 handle.block_on(async move { db.get_transaction(&txid).await })
326 })
327 .join()
328 .map_err(|_| sp_blockchain::Error::Backend("Failed to query transaction".to_string()))?;
329
330 result
331 .map(|opt| {
332 opt.map(|(block_number, index)| TxPosition {
333 block_number,
334 index,
335 })
336 })
337 .map_err(|e| sp_blockchain::Error::Backend(e.to_string()))
338 }
339}