subcoin_service/
finalizer.rs

1use futures::StreamExt;
2use parking_lot::Mutex;
3use sc_client_api::{BlockchainEvents, Finalizer, HeaderBackend};
4use sc_network_sync::SyncingService;
5use sc_service::SpawnTaskHandle;
6use sp_consensus::SyncOracle;
7use sp_runtime::traits::{Block as BlockT, CheckedSub, NumberFor};
8use std::marker::PhantomData;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use subcoin_network::NetworkApi;
12
13type BlockInfo<Block> = (NumberFor<Block>, <Block as BlockT>::Hash);
14
15/// This struct is responsible for finalizing blocks with enough confirmations.
16pub struct SubcoinFinalizer<Block: BlockT, Client, Backend> {
17    client: Arc<Client>,
18    spawn_handle: SpawnTaskHandle,
19    confirmation_depth: u32,
20    subcoin_network_api: Arc<dyn NetworkApi>,
21    substrate_sync_service: Option<Arc<SyncingService<Block>>>,
22    _phantom: PhantomData<Backend>,
23}
24
25impl<Block, Client, Backend> SubcoinFinalizer<Block, Client, Backend>
26where
27    Block: BlockT + 'static,
28    Client: HeaderBackend<Block> + Finalizer<Block, Backend> + BlockchainEvents<Block> + 'static,
29    Backend: sc_client_api::backend::Backend<Block> + 'static,
30{
31    /// Constructs a new instance of [`SubcoinFinalizer`].
32    pub fn new(
33        client: Arc<Client>,
34        spawn_handle: SpawnTaskHandle,
35        confirmation_depth: u32,
36        subcoin_network_api: Arc<dyn NetworkApi>,
37        substrate_sync_service: Option<Arc<SyncingService<Block>>>,
38    ) -> Self {
39        Self {
40            client,
41            spawn_handle,
42            confirmation_depth,
43            subcoin_network_api,
44            substrate_sync_service,
45            _phantom: Default::default(),
46        }
47    }
48
49    /// The future needs to be spawned in the background.
50    pub async fn run(self) {
51        let Self {
52            client,
53            spawn_handle,
54            confirmation_depth,
55            subcoin_network_api,
56            substrate_sync_service,
57            _phantom,
58        } = self;
59
60        // Use `every_import_notification_stream()` so that we can receive the notifications even when
61        // the Substrate network is major syncing.
62        let mut block_import_stream = client.every_import_notification_stream();
63
64        let cached_block_to_finalize: Arc<Mutex<Option<BlockInfo<Block>>>> =
65            Arc::new(Mutex::new(None));
66
67        let finalizer_worker_is_busy = Arc::new(AtomicBool::new(false));
68
69        while let Some(notification) = block_import_stream.next().await {
70            let block_number = client
71                .number(notification.hash)
72                .ok()
73                .flatten()
74                .expect("Imported block must be available; qed");
75
76            let Some(confirmed_block_number) = block_number.checked_sub(&confirmation_depth.into())
77            else {
78                continue;
79            };
80
81            let finalized_number = client.info().finalized_number;
82
83            if confirmed_block_number <= finalized_number {
84                continue;
85            }
86
87            let confirmed_block_hash = client
88                .hash(confirmed_block_number)
89                .ok()
90                .flatten()
91                .expect("Confirmed block must be available; qed");
92
93            let try_update_cached_block_to_finalize = || {
94                let mut cached_block_to_finalize = cached_block_to_finalize.lock();
95
96                let should_update = cached_block_to_finalize
97                    .map(|(cached_block_number, _)| confirmed_block_number > cached_block_number)
98                    .unwrap_or(true);
99
100                if should_update {
101                    cached_block_to_finalize
102                        .replace((confirmed_block_number, confirmed_block_hash));
103                }
104
105                drop(cached_block_to_finalize);
106            };
107
108            if let Some(sync_service) = substrate_sync_service.as_ref() {
109                // State sync relies on the finalized block notification to progress
110                // Substrate chain sync component relies on the finalized block notification to
111                // initiate the state sync, do not attempt to finalize the block when the queued blocks
112                // are not empty, so that the state sync can be started when the last finalized block
113                // notification is sent.
114                if sync_service.is_major_syncing()
115                    && sync_service
116                        .status()
117                        .await
118                        .map(|status| status.queued_blocks)
119                        .unwrap_or(0)
120                        > 0
121                {
122                    try_update_cached_block_to_finalize();
123                    continue;
124                }
125            }
126
127            if finalizer_worker_is_busy.load(Ordering::SeqCst) {
128                try_update_cached_block_to_finalize();
129                continue;
130            }
131
132            let client = client.clone();
133            let subcoin_network_api = subcoin_network_api.clone();
134            let substrate_sync_service = substrate_sync_service.clone();
135            let finalizer_worker_is_busy = finalizer_worker_is_busy.clone();
136            let cached_block_to_finalize = cached_block_to_finalize.clone();
137
138            finalizer_worker_is_busy.store(true, Ordering::SeqCst);
139
140            spawn_handle.spawn(
141                "finalize-block",
142                None,
143                Box::pin(async move {
144                    do_finalize_block(
145                        &client,
146                        confirmed_block_number,
147                        confirmed_block_hash,
148                        &subcoin_network_api,
149                        substrate_sync_service.as_ref(),
150                    );
151
152                    let mut cached_block_to_finalize = cached_block_to_finalize.lock();
153                    let maybe_cached_block_to_finalize = cached_block_to_finalize.take();
154                    drop(cached_block_to_finalize);
155
156                    if let Some((cached_block_number, cached_block_hash)) =
157                        maybe_cached_block_to_finalize
158                    {
159                        do_finalize_block(
160                            &client,
161                            cached_block_number,
162                            cached_block_hash,
163                            &subcoin_network_api,
164                            substrate_sync_service.as_ref(),
165                        );
166                    }
167
168                    finalizer_worker_is_busy.store(false, Ordering::SeqCst);
169                }),
170            );
171        }
172    }
173}
174
175fn do_finalize_block<Block, Client, Backend>(
176    client: &Arc<Client>,
177    confirmed_block_number: NumberFor<Block>,
178    confirmed_block_hash: Block::Hash,
179    subcoin_network_api: &Arc<dyn NetworkApi>,
180    substrate_sync_service: Option<&Arc<SyncingService<Block>>>,
181) where
182    Block: BlockT,
183    Client: HeaderBackend<Block> + Finalizer<Block, Backend>,
184    Backend: sc_client_api::backend::Backend<Block>,
185{
186    let finalized_number = client.info().finalized_number;
187
188    if confirmed_block_number <= finalized_number {
189        return;
190    }
191
192    match client.finalize_block(confirmed_block_hash, None, true) {
193        Ok(()) => {
194            let is_major_syncing = subcoin_network_api.is_major_syncing()
195                || substrate_sync_service
196                    .map(|sync_service| sync_service.is_major_syncing())
197                    .unwrap_or(false);
198
199            // Only print the log when not major syncing to not clutter the logs.
200            if !is_major_syncing {
201                tracing::info!(
202                    "✅ Successfully finalized block #{confirmed_block_number},{confirmed_block_hash}"
203                );
204            }
205        }
206        Err(err) => {
207            tracing::warn!(
208                ?err,
209                ?finalized_number,
210                "Failed to finalize block #{confirmed_block_number},{confirmed_block_hash}",
211            );
212        }
213    }
214}