subcoin_service/
finalizer.rs1use futures::StreamExt;
2use parking_lot::Mutex;
3use sc_client_api::{BlockchainEvents, Finalizer, HeaderBackend};
4use sc_network_sync::SyncingService;
5use sc_service::SpawnTaskHandle;
6use sp_consensus::SyncOracle;
7use sp_runtime::traits::{Block as BlockT, CheckedSub, NumberFor};
8use std::marker::PhantomData;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use subcoin_network::NetworkApi;
12
13type BlockInfo<Block> = (NumberFor<Block>, <Block as BlockT>::Hash);
14
15pub struct SubcoinFinalizer<Block: BlockT, Client, Backend> {
17 client: Arc<Client>,
18 spawn_handle: SpawnTaskHandle,
19 confirmation_depth: u32,
20 subcoin_network_api: Arc<dyn NetworkApi>,
21 substrate_sync_service: Option<Arc<SyncingService<Block>>>,
22 _phantom: PhantomData<Backend>,
23}
24
25impl<Block, Client, Backend> SubcoinFinalizer<Block, Client, Backend>
26where
27 Block: BlockT + 'static,
28 Client: HeaderBackend<Block> + Finalizer<Block, Backend> + BlockchainEvents<Block> + 'static,
29 Backend: sc_client_api::backend::Backend<Block> + 'static,
30{
31 pub fn new(
33 client: Arc<Client>,
34 spawn_handle: SpawnTaskHandle,
35 confirmation_depth: u32,
36 subcoin_network_api: Arc<dyn NetworkApi>,
37 substrate_sync_service: Option<Arc<SyncingService<Block>>>,
38 ) -> Self {
39 Self {
40 client,
41 spawn_handle,
42 confirmation_depth,
43 subcoin_network_api,
44 substrate_sync_service,
45 _phantom: Default::default(),
46 }
47 }
48
49 pub async fn run(self) {
51 let Self {
52 client,
53 spawn_handle,
54 confirmation_depth,
55 subcoin_network_api,
56 substrate_sync_service,
57 _phantom,
58 } = self;
59
60 let mut block_import_stream = client.every_import_notification_stream();
63
64 let cached_block_to_finalize: Arc<Mutex<Option<BlockInfo<Block>>>> =
65 Arc::new(Mutex::new(None));
66
67 let finalizer_worker_is_busy = Arc::new(AtomicBool::new(false));
68
69 while let Some(notification) = block_import_stream.next().await {
70 let block_number = client
71 .number(notification.hash)
72 .ok()
73 .flatten()
74 .expect("Imported block must be available; qed");
75
76 let Some(confirmed_block_number) = block_number.checked_sub(&confirmation_depth.into())
77 else {
78 continue;
79 };
80
81 let finalized_number = client.info().finalized_number;
82
83 if confirmed_block_number <= finalized_number {
84 continue;
85 }
86
87 let confirmed_block_hash = client
88 .hash(confirmed_block_number)
89 .ok()
90 .flatten()
91 .expect("Confirmed block must be available; qed");
92
93 let try_update_cached_block_to_finalize = || {
94 let mut cached_block_to_finalize = cached_block_to_finalize.lock();
95
96 let should_update = cached_block_to_finalize
97 .map(|(cached_block_number, _)| confirmed_block_number > cached_block_number)
98 .unwrap_or(true);
99
100 if should_update {
101 cached_block_to_finalize
102 .replace((confirmed_block_number, confirmed_block_hash));
103 }
104
105 drop(cached_block_to_finalize);
106 };
107
108 if let Some(sync_service) = substrate_sync_service.as_ref() {
109 if sync_service.is_major_syncing()
115 && sync_service
116 .status()
117 .await
118 .map(|status| status.queued_blocks)
119 .unwrap_or(0)
120 > 0
121 {
122 try_update_cached_block_to_finalize();
123 continue;
124 }
125 }
126
127 if finalizer_worker_is_busy.load(Ordering::SeqCst) {
128 try_update_cached_block_to_finalize();
129 continue;
130 }
131
132 let client = client.clone();
133 let subcoin_network_api = subcoin_network_api.clone();
134 let substrate_sync_service = substrate_sync_service.clone();
135 let finalizer_worker_is_busy = finalizer_worker_is_busy.clone();
136 let cached_block_to_finalize = cached_block_to_finalize.clone();
137
138 finalizer_worker_is_busy.store(true, Ordering::SeqCst);
139
140 spawn_handle.spawn(
141 "finalize-block",
142 None,
143 Box::pin(async move {
144 do_finalize_block(
145 &client,
146 confirmed_block_number,
147 confirmed_block_hash,
148 &subcoin_network_api,
149 substrate_sync_service.as_ref(),
150 );
151
152 let mut cached_block_to_finalize = cached_block_to_finalize.lock();
153 let maybe_cached_block_to_finalize = cached_block_to_finalize.take();
154 drop(cached_block_to_finalize);
155
156 if let Some((cached_block_number, cached_block_hash)) =
157 maybe_cached_block_to_finalize
158 {
159 do_finalize_block(
160 &client,
161 cached_block_number,
162 cached_block_hash,
163 &subcoin_network_api,
164 substrate_sync_service.as_ref(),
165 );
166 }
167
168 finalizer_worker_is_busy.store(false, Ordering::SeqCst);
169 }),
170 );
171 }
172 }
173}
174
175fn do_finalize_block<Block, Client, Backend>(
176 client: &Arc<Client>,
177 confirmed_block_number: NumberFor<Block>,
178 confirmed_block_hash: Block::Hash,
179 subcoin_network_api: &Arc<dyn NetworkApi>,
180 substrate_sync_service: Option<&Arc<SyncingService<Block>>>,
181) where
182 Block: BlockT,
183 Client: HeaderBackend<Block> + Finalizer<Block, Backend>,
184 Backend: sc_client_api::backend::Backend<Block>,
185{
186 let finalized_number = client.info().finalized_number;
187
188 if confirmed_block_number <= finalized_number {
189 return;
190 }
191
192 match client.finalize_block(confirmed_block_hash, None, true) {
193 Ok(()) => {
194 let is_major_syncing = subcoin_network_api.is_major_syncing()
195 || substrate_sync_service
196 .map(|sync_service| sync_service.is_major_syncing())
197 .unwrap_or(false);
198
199 if !is_major_syncing {
201 tracing::info!(
202 "✅ Successfully finalized block #{confirmed_block_number},{confirmed_block_hash}"
203 );
204 }
205 }
206 Err(err) => {
207 tracing::warn!(
208 ?err,
209 ?finalized_number,
210 "Failed to finalize block #{confirmed_block_number},{confirmed_block_hash}",
211 );
212 }
213 }
214}