1mod arena;
14mod coins_view;
15mod error;
16mod inner;
17mod options;
18mod policy;
19mod types;
24mod validation;
25
26pub use self::arena::{MemPoolArena, TxMemPoolEntry};
27pub use self::coins_view::CoinsViewCache;
28pub use self::error::MempoolError;
29pub use self::inner::MemPoolInner;
30pub use self::options::MemPoolOptions;
31pub use self::types::{
32 ConflictSet, EntryId, FeeRate, LockPoints, Package, PackageValidationResult, RemovalReason,
33 ValidationResult,
34};
35
36use bitcoin::Transaction;
37use bitcoin::hashes::Hash;
38use sc_client_api::{AuxStore, HeaderBackend};
39use sp_api::ProvideRuntimeApi;
40use sp_runtime::traits::Block as BlockT;
41use std::collections::HashSet;
42use std::marker::PhantomData;
43use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
44use std::sync::{Arc, RwLock};
45use subcoin_primitives::tx_pool::*;
46use subcoin_primitives::{BackendExt, ClientExt};
47
48pub struct MemPool<Block: BlockT, Client> {
57 options: MemPoolOptions,
59
60 inner: RwLock<MemPoolInner>,
62
63 coins_cache: RwLock<CoinsViewCache<Block, Client>>,
65
66 transactions_updated: AtomicU32,
68 sequence_number: AtomicU64,
69
70 client: Arc<Client>,
72
73 _phantom: PhantomData<Block>,
74}
75
76impl<Block, Client> MemPool<Block, Client>
77where
78 Block: BlockT,
79 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore + Send + Sync,
80 Client::Api: subcoin_runtime_primitives::SubcoinApi<Block>,
81{
82 pub fn new(client: Arc<Client>) -> Self {
84 Self::with_options(client, MemPoolOptions::default())
85 }
86
87 pub fn with_options(client: Arc<Client>, options: MemPoolOptions) -> Self {
89 let coins_cache = CoinsViewCache::new(client.clone(), 10_000);
90
91 Self {
92 options,
93 inner: RwLock::new(MemPoolInner::new()),
94 coins_cache: RwLock::new(coins_cache),
95 transactions_updated: AtomicU32::new(0),
96 sequence_number: AtomicU64::new(1),
97 client,
98 _phantom: PhantomData,
99 }
100 }
101
102 pub fn accept_single_transaction(&self, tx: Transaction) -> Result<(), MempoolError> {
106 let mut inner = self.inner.write().expect("MemPool lock poisoned");
108 let mut coins = self.coins_cache.write().expect("CoinsCache lock poisoned");
109
110 let best_block = coins.best_block();
112 let current_height: u32 = self
113 .client
114 .info()
115 .best_number
116 .try_into()
117 .unwrap_or_else(|_| panic!("Block number must fit into u32"));
118 let current_time = std::time::SystemTime::now()
119 .duration_since(std::time::UNIX_EPOCH)
120 .expect("Time went backwards")
121 .as_secs() as i64;
122
123 let current_mtp =
125 if let Some(bitcoin_block_hash) = self.client.bitcoin_block_hash_for(best_block) {
126 self.client
127 .get_block_metadata(bitcoin_block_hash)
128 .map(|metadata| metadata.median_time_past)
129 .unwrap_or(current_time) } else {
131 current_time };
133
134 let tx_arc = Arc::new(tx);
136 let mut ws = validation::ValidationWorkspace::new(tx_arc);
137 validation::pre_checks(
138 &mut ws,
139 &inner,
140 &mut coins,
141 &self.options,
142 current_height,
143 current_mtp,
144 best_block,
145 )?;
146
147 validation::check_package_limits(&ws, &inner, &self.options)?;
149
150 validation::check_inputs(&ws, &mut coins, validation::standard_script_verify_flags())?;
152
153 validation::check_inputs(&ws, &mut coins, validation::mandatory_script_verify_flags())?;
155
156 let sequence = self.sequence_number.fetch_add(1, Ordering::SeqCst);
158 let entry_block_hash = self
160 .client
161 .bitcoin_block_hash_for(best_block)
162 .unwrap_or_else(bitcoin::BlockHash::all_zeros);
163 let _entry_id = validation::finalize_tx(
164 ws,
165 &mut inner,
166 &mut coins,
167 current_height,
168 current_time,
169 current_mtp,
170 entry_block_hash,
171 sequence,
172 )?;
173
174 self.transactions_updated.fetch_add(1, Ordering::SeqCst);
176
177 Ok(())
178 }
179
180 pub fn size(&self) -> usize {
182 self.inner.read().expect("MemPool lock poisoned").size()
183 }
184
185 pub fn total_size(&self) -> u64 {
187 self.inner
188 .read()
189 .expect("MemPool lock poisoned")
190 .total_size()
191 }
192
193 pub fn trim_to_size(&self, max_size: u64) {
195 self.inner
196 .write()
197 .expect("MemPool lock poisoned")
198 .trim_to_size(max_size);
199 }
200
201 pub fn expire(&self, max_age_seconds: i64) {
203 let current_time = std::time::SystemTime::now()
204 .duration_since(std::time::UNIX_EPOCH)
205 .expect("Time went backwards")
206 .as_secs() as i64;
207
208 self.inner
209 .write()
210 .expect("MemPool lock poisoned")
211 .expire(current_time, max_age_seconds);
212 }
213
214 pub fn options(&self) -> &MemPoolOptions {
216 &self.options
217 }
218
219 pub fn remove_for_block(
226 &self,
227 confirmed_txs: &[Transaction],
228 new_best_block: Block::Hash,
229 ) -> Result<(), MempoolError> {
230 let mut inner = self.inner.write().expect("MemPool lock poisoned");
231 let mut coins = self.coins_cache.write().expect("CoinsCache lock poisoned");
232
233 let mut to_remove = HashSet::new();
234
235 for tx in confirmed_txs {
237 let txid = tx.compute_txid();
238
239 if let Some(entry_id) = inner.arena.get_by_txid(&txid) {
241 to_remove.insert(entry_id);
242 }
243
244 let conflicts = Self::collect_conflicts(tx, &inner);
246 to_remove.extend(conflicts);
247 }
248
249 let mut expanded = HashSet::new();
251 for &entry_id in &to_remove {
252 inner.calculate_descendants(entry_id, &mut expanded);
253 }
254
255 let mut removed_txs = Vec::with_capacity(expanded.len());
257 for &entry_id in &expanded {
258 if let Some(entry) = inner.arena.get(entry_id) {
259 removed_txs.push(entry.tx.clone());
260 }
261 }
262
263 inner.remove_staged(&expanded, false, RemovalReason::Block);
265
266 for tx in removed_txs {
268 coins.remove_mempool_tx(&tx);
269 }
270
271 coins.on_block_connected(new_best_block);
273
274 self.transactions_updated
276 .fetch_add(expanded.len() as u32, Ordering::SeqCst);
277
278 Ok(())
279 }
280
281 fn collect_conflicts(tx: &Transaction, inner: &MemPoolInner) -> HashSet<EntryId> {
285 let mut conflicts = HashSet::new();
286
287 for input in &tx.input {
288 if let Some(conflicting_txid) = inner.get_conflict_tx(&input.previous_output) {
289 if let Some(entry_id) = inner.arena.get_by_txid(&conflicting_txid) {
290 conflicts.insert(entry_id);
291 }
292 }
293 }
294
295 conflicts
296 }
297
298 pub fn remove_conflicts(&self, txs: &[Transaction]) -> Result<(), MempoolError> {
303 let mut inner = self.inner.write().expect("MemPool lock poisoned");
304 let mut coins = self.coins_cache.write().expect("CoinsCache lock poisoned");
305
306 let mut to_remove = HashSet::new();
307
308 for tx in txs {
310 let conflicts = Self::collect_conflicts(tx, &inner);
311 to_remove.extend(conflicts);
312 }
313
314 let mut expanded = HashSet::new();
316 for &entry_id in &to_remove {
317 inner.calculate_descendants(entry_id, &mut expanded);
318 }
319
320 let mut removed_txs = Vec::with_capacity(expanded.len());
322 for &entry_id in &expanded {
323 if let Some(entry) = inner.arena.get(entry_id) {
324 removed_txs.push(entry.tx.clone());
325 }
326 }
327
328 inner.remove_staged(&expanded, false, RemovalReason::Conflict);
330
331 for tx in removed_txs {
333 coins.remove_mempool_tx(&tx);
334 }
335
336 self.transactions_updated
338 .fetch_add(expanded.len() as u32, Ordering::SeqCst);
339
340 Ok(())
341 }
342
343 pub fn remove_for_reorg(
353 &self,
354 new_tip_height: u32,
355 new_best_block: Block::Hash,
356 ) -> Result<(), MempoolError> {
357 let mut inner = self.inner.write().expect("MemPool lock poisoned");
358 let mut coins = self.coins_cache.write().expect("CoinsCache lock poisoned");
359
360 let fallback_time = std::time::SystemTime::now()
362 .duration_since(std::time::UNIX_EPOCH)
363 .expect("Time went backwards")
364 .as_secs() as i64;
365
366 let new_tip_mtp =
367 if let Some(bitcoin_block_hash) = self.client.bitcoin_block_hash_for(new_best_block) {
368 self.client
369 .get_block_metadata(bitcoin_block_hash)
370 .map(|metadata| metadata.median_time_past)
371 .unwrap_or(fallback_time)
372 } else {
373 fallback_time
374 };
375
376 let mut to_remove = HashSet::new();
377
378 for (entry_id, entry) in inner.arena.iter_by_entry_time() {
380 let mut invalid = false;
381
382 if entry.lock_points.height > 0 && entry.lock_points.height > new_tip_height as i32 {
384 invalid = true;
385 }
386
387 if entry.lock_points.time > 0 && entry.lock_points.time > new_tip_mtp {
389 invalid = true;
390 }
391
392 if let Some(max_input_block) = entry.lock_points.max_input_block {
396 let is_on_active_chain = self.client.is_block_on_active_chain(max_input_block);
397
398 if !is_on_active_chain {
399 invalid = true;
400 }
401 }
402
403 if entry.spends_coinbase {
405 let min_required_height = entry.entry_height.saturating_add(100);
409 if new_tip_height < min_required_height {
410 invalid = true;
411 }
412 }
413
414 if invalid {
415 to_remove.insert(entry_id);
416 }
417 }
418
419 if to_remove.is_empty() {
420 coins.on_block_connected(new_best_block);
422 return Ok(());
423 }
424
425 let mut expanded = HashSet::new();
427 for &entry_id in &to_remove {
428 inner.calculate_descendants(entry_id, &mut expanded);
429 }
430
431 let mut removed_txs = Vec::with_capacity(expanded.len());
433 for &entry_id in &expanded {
434 if let Some(entry) = inner.arena.get(entry_id) {
435 removed_txs.push(entry.tx.clone());
436 }
437 }
438
439 inner.remove_staged(&expanded, false, RemovalReason::Reorg);
441
442 for tx in removed_txs {
444 coins.remove_mempool_tx(&tx);
445 }
446
447 coins.on_block_connected(new_best_block);
449
450 self.transactions_updated
452 .fetch_add(expanded.len() as u32, Ordering::SeqCst);
453
454 Ok(())
455 }
456
457 pub fn accept_package(
461 &self,
462 transactions: Vec<Transaction>,
463 ) -> Result<PackageValidationResult, MempoolError> {
464 if !self.options.enable_package_relay {
465 return Err(MempoolError::PackageRelayDisabled);
466 }
467
468 let mut inner = self.inner.write().expect("MemPool lock poisoned");
470 let mut coins = self.coins_cache.write().expect("CoinsCache lock poisoned");
471
472 let best_block = coins.best_block();
474 let current_height: u32 = self
475 .client
476 .info()
477 .best_number
478 .try_into()
479 .unwrap_or_else(|_| panic!("Block number must fit into u32"));
480 let current_time = std::time::SystemTime::now()
481 .duration_since(std::time::UNIX_EPOCH)
482 .expect("Time went backwards")
483 .as_secs() as i64;
484
485 let current_mtp =
487 if let Some(bitcoin_block_hash) = self.client.bitcoin_block_hash_for(best_block) {
488 self.client
489 .get_block_metadata(bitcoin_block_hash)
490 .map(|metadata| metadata.median_time_past)
491 .unwrap_or(current_time) } else {
493 current_time };
495
496 let arc_txs: Vec<_> = transactions.into_iter().map(Arc::new).collect();
498
499 let package = Package {
500 transactions: arc_txs,
501 };
502
503 let sequence_start = self.sequence_number.load(Ordering::SeqCst);
505 let result = validation::validate_package(
506 &package,
507 &mut inner,
508 &mut coins,
509 &self.options,
510 current_height,
511 best_block,
512 current_mtp,
513 sequence_start,
514 )?;
515
516 self.sequence_number.store(
518 sequence_start + result.accepted.len() as u64,
519 Ordering::SeqCst,
520 );
521
522 self.transactions_updated
524 .fetch_add(result.accepted.len() as u32, Ordering::SeqCst);
525
526 Ok(result)
527 }
528
529 pub fn contains_txid(&self, txid: &bitcoin::Txid) -> bool {
533 self.inner
534 .read()
535 .expect("MemPool lock poisoned")
536 .arena
537 .get_by_txid(txid)
538 .is_some()
539 }
540
541 pub fn get_transaction(&self, txid: &bitcoin::Txid) -> Option<Arc<Transaction>> {
543 self.inner
544 .read()
545 .expect("MemPool lock poisoned")
546 .arena
547 .get_by_txid(txid)
548 .map(|entry_id| {
549 self.inner
550 .read()
551 .expect("MemPool lock poisoned")
552 .arena
553 .get(entry_id)
554 .expect("Entry ID must be valid")
555 .tx
556 .clone()
557 })
558 }
559
560 pub fn pending_broadcast_txs(&self) -> Vec<(bitcoin::Txid, u64)> {
562 let inner = self.inner.read().expect("MemPool lock poisoned");
563 inner
564 .unbroadcast
565 .iter()
566 .filter_map(|txid| {
567 inner.arena.get_by_txid(txid).and_then(|entry_id| {
568 inner.arena.get(entry_id).map(|entry| {
569 let vsize = entry.tx_weight.to_wu().div_ceil(4); let fee_rate = (entry.fee.to_sat() * 1000) / vsize;
572 (*txid, fee_rate)
573 })
574 })
575 })
576 .collect()
577 }
578
579 pub fn mark_broadcast_txs(&self, txids: &[bitcoin::Txid]) {
581 let mut inner = self.inner.write().expect("MemPool lock poisoned");
582 for txid in txids {
583 inner.unbroadcast.remove(txid);
584 }
585 }
586
587 pub fn iter_txids_by_priority(&self) -> Vec<(bitcoin::Txid, u64)> {
589 let inner = self.inner.read().expect("MemPool lock poisoned");
590 inner
592 .arena
593 .iter_by_ancestor_score()
594 .map(|(_, entry)| {
595 let txid = entry.tx.compute_txid();
596 let vsize = entry.tx_weight.to_wu().div_ceil(4);
598 let fee_rate = (entry.fee.to_sat() * 1000) / vsize;
599 (txid, fee_rate)
600 })
601 .collect()
602 }
603
604 fn to_validation_result(
606 &self,
607 txid: bitcoin::Txid,
608 result: Result<(), MempoolError>,
609 ) -> subcoin_primitives::tx_pool::TxValidationResult {
610 match result {
611 Ok(()) => {
612 let inner = self.inner.read().expect("MemPool lock poisoned");
614 let fee_rate = inner
615 .arena
616 .get_by_txid(&txid)
617 .and_then(|entry_id| {
618 inner.arena.get(entry_id).map(|entry| {
619 let vsize = entry.tx_weight.to_wu().div_ceil(4);
621 (entry.fee.to_sat() * 1000) / vsize
622 })
623 })
624 .unwrap_or(0);
625
626 TxValidationResult::Accepted { txid, fee_rate }
627 }
628 Err(err) => {
629 let reason = match err {
630 MempoolError::AlreadyInMempool => {
632 RejectionReason::Soft(SoftRejection::AlreadyInMempool)
633 }
634 MempoolError::MissingInputs { parents } => {
635 RejectionReason::Soft(SoftRejection::MissingInputs { parents })
636 }
637 MempoolError::FeeTooLow {
638 min_kvb,
639 actual_kvb,
640 } => RejectionReason::Soft(SoftRejection::FeeTooLow {
641 min_kvb,
642 actual_kvb,
643 }),
644 MempoolError::MempoolFull => RejectionReason::Soft(SoftRejection::MempoolFull),
645 MempoolError::TooManyAncestors(count) => {
646 RejectionReason::Soft(SoftRejection::TooManyAncestors(count))
647 }
648 MempoolError::TooManyDescendants(count) => {
649 RejectionReason::Soft(SoftRejection::TooManyDescendants(count))
650 }
651 MempoolError::TxConflict(msg) => {
652 RejectionReason::Soft(SoftRejection::TxConflict(msg))
653 }
654 MempoolError::NoConflictToReplace => {
655 RejectionReason::Soft(SoftRejection::NoConflictToReplace)
656 }
657 MempoolError::TxNotReplaceable => {
658 RejectionReason::Soft(SoftRejection::TxNotReplaceable)
659 }
660 MempoolError::TooManyReplacements(count) => {
661 RejectionReason::Soft(SoftRejection::TooManyReplacements(count))
662 }
663 MempoolError::NewUnconfirmedInput => {
664 RejectionReason::Soft(SoftRejection::NewUnconfirmedInput)
665 }
666 MempoolError::InsufficientFee(msg) => {
667 RejectionReason::Soft(SoftRejection::InsufficientFee(msg))
668 }
669 MempoolError::PackageTooLarge(count, max) => {
670 RejectionReason::Soft(SoftRejection::PackageTooLarge(count, max))
671 }
672 MempoolError::PackageSizeTooLarge(size) => {
673 RejectionReason::Soft(SoftRejection::PackageSizeTooLarge(size))
674 }
675 MempoolError::PackageCyclicDependencies => {
676 RejectionReason::Soft(SoftRejection::PackageCyclicDependencies)
677 }
678 MempoolError::PackageFeeTooLow(msg) => {
679 RejectionReason::Soft(SoftRejection::PackageFeeTooLow(msg))
680 }
681 MempoolError::PackageTxValidationFailed(txid, msg) => {
682 RejectionReason::Soft(SoftRejection::PackageTxValidationFailed(txid, msg))
683 }
684 MempoolError::PackageRelayDisabled => {
685 RejectionReason::Soft(SoftRejection::PackageRelayDisabled)
686 }
687
688 MempoolError::Coinbase => RejectionReason::Hard(HardRejection::Coinbase),
690 MempoolError::NotStandard(msg) => {
691 RejectionReason::Hard(HardRejection::NotStandard(msg))
692 }
693 MempoolError::TxVersionNotStandard => {
694 RejectionReason::Hard(HardRejection::TxVersionNotStandard)
695 }
696 MempoolError::TxSizeTooSmall => {
697 RejectionReason::Hard(HardRejection::TxSizeTooSmall)
698 }
699 MempoolError::NonFinal => RejectionReason::Hard(HardRejection::NonFinal),
700 MempoolError::NonBIP68Final => {
701 RejectionReason::Hard(HardRejection::NonBIP68Final)
702 }
703 MempoolError::TooManySigops(count) => {
704 RejectionReason::Hard(HardRejection::TooManySigops(count))
705 }
706 MempoolError::NegativeFee => RejectionReason::Hard(HardRejection::NegativeFee),
707 MempoolError::FeeOverflow => RejectionReason::Hard(HardRejection::FeeOverflow),
708 MempoolError::InvalidFeeRate(msg) => {
709 RejectionReason::Hard(HardRejection::InvalidFeeRate(msg))
710 }
711 MempoolError::AncestorSizeTooLarge(size) => {
712 RejectionReason::Hard(HardRejection::AncestorSizeTooLarge(size))
713 }
714 MempoolError::DescendantSizeTooLarge(size) => {
715 RejectionReason::Hard(HardRejection::DescendantSizeTooLarge(size))
716 }
717 MempoolError::ScriptValidationFailed(msg) => {
718 RejectionReason::Hard(HardRejection::ScriptValidationFailed(msg))
719 }
720 MempoolError::TxError(err) => {
721 RejectionReason::Hard(HardRejection::TxError(err.to_string()))
722 }
723 MempoolError::RuntimeApi(msg) => {
724 RejectionReason::Hard(HardRejection::RuntimeApi(msg))
725 }
726 MempoolError::MissingConflict => {
727 RejectionReason::Hard(HardRejection::RuntimeApi(
729 "Missing conflict transaction".to_string(),
730 ))
731 }
732 };
733
734 TxValidationResult::Rejected { txid, reason }
735 }
736 }
737 }
738}
739
740impl<Block, Client> subcoin_primitives::tx_pool::TxPool for MemPool<Block, Client>
743where
744 Block: BlockT + 'static,
745 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore + Send + Sync + 'static,
746 Client::Api: subcoin_runtime_primitives::SubcoinApi<Block>,
747{
748 fn validate_transaction(
749 &self,
750 tx: Transaction,
751 ) -> subcoin_primitives::tx_pool::TxValidationResult {
752 let txid = tx.compute_txid();
753 let result = self.accept_single_transaction(tx);
754 self.to_validation_result(txid, result)
755 }
756
757 fn contains(&self, txid: &bitcoin::Txid) -> bool {
758 self.contains_txid(txid)
759 }
760
761 fn get(&self, txid: &bitcoin::Txid) -> Option<Arc<Transaction>> {
762 self.get_transaction(txid)
763 }
764
765 fn pending_broadcast(&self) -> Vec<(bitcoin::Txid, u64)> {
766 self.pending_broadcast_txs()
767 }
768
769 fn mark_broadcast(&self, txids: &[bitcoin::Txid]) {
770 self.mark_broadcast_txs(txids)
771 }
772
773 fn iter_txids(&self) -> Box<dyn Iterator<Item = (bitcoin::Txid, u64)> + Send> {
774 Box::new(self.iter_txids_by_priority().into_iter())
775 }
776
777 fn info(&self) -> subcoin_primitives::tx_pool::TxPoolInfo {
778 let inner = self.inner.read().expect("MemPool lock poisoned");
779 subcoin_primitives::tx_pool::TxPoolInfo {
780 size: inner.size(),
781 bytes: inner.total_size(),
782 usage: inner.total_size(), min_fee_rate: self.options.min_relay_fee_rate().as_sat_per_kvb(),
784 }
785 }
786}