subcoin_network/
network_api.rs

1//! This module provides the interfaces for external interaction with Subcoin network.
2
3#[cfg(test)]
4use crate::Error;
5use crate::PeerId;
6#[cfg(test)]
7use crate::peer_connection::Direction;
8use crate::sync::PeerSync;
9#[cfg(test)]
10use crate::sync::SyncAction;
11#[cfg(test)]
12use bitcoin::p2p::message::NetworkMessage;
13use bitcoin::{Transaction, Txid};
14use sc_utils::mpsc::TracingUnboundedSender;
15use serde::{Deserialize, Serialize};
16use std::sync::Arc;
17use std::sync::atomic::AtomicBool;
18use tokio::sync::oneshot;
19
20/// Represents the sync status of node.
21#[derive(Debug, Clone, Serialize, Deserialize)]
22#[serde(rename_all = "camelCase")]
23pub enum SyncStatus {
24    /// The node is idle and not currently major syncing.
25    Idle,
26    /// The node is downloading blocks from peers.
27    ///
28    /// `target` specifies the block number the node aims to reach.
29    /// `peers` is a list of peers from which the node is downloading.
30    Downloading { target: u32, peers: Vec<PeerId> },
31    /// The node is importing downloaded blocks into the local database.
32    Importing { target: u32, peers: Vec<PeerId> },
33}
34
35/// Represents the status of network.
36#[derive(Debug, Clone, Serialize, Deserialize)]
37#[serde(rename_all = "camelCase")]
38pub struct NetworkStatus {
39    /// The number of peers currently connected to the node.
40    pub num_connected_peers: usize,
41    /// The total number of bytes received from the network.
42    pub total_bytes_inbound: u64,
43    /// The total number of bytes sent to the network.
44    pub total_bytes_outbound: u64,
45    /// Current sync status of the node.
46    pub sync_status: SyncStatus,
47}
48
49/// Represents the result of submitting a transaction to the network.
50#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
51#[serde(rename_all = "camelCase")]
52pub enum SendTransactionResult {
53    /// Transaction was submitted successfully.
54    Success(Txid),
55    /// An error occurred during the transaction submission.
56    Failure(String),
57}
58
59/// An incoming transaction from RPC or network.
60#[derive(Debug)]
61pub(crate) struct IncomingTransaction {
62    pub(crate) txid: Txid,
63    pub(crate) transaction: Transaction,
64}
65
66/// Represents the different messages that can be sent to the network processor.
67#[derive(Debug)]
68pub(crate) enum NetworkProcessorMessage {
69    /// Request the current network status.
70    RequestNetworkStatus(oneshot::Sender<NetworkStatus>),
71    /// Request the sync peers.
72    RequestSyncPeers(oneshot::Sender<Vec<PeerSync>>),
73    /// Request the number of inbound connected peers.
74    RequestInboundPeersCount(oneshot::Sender<usize>),
75    /// Request a specific transaction by its Txid.
76    RequestTransaction(Txid, oneshot::Sender<Option<Transaction>>),
77    /// Submit a transaction to the transaction manager.
78    SendTransaction((IncomingTransaction, oneshot::Sender<SendTransactionResult>)),
79    /// Enable the block sync within the chain sync component.
80    StartBlockSync,
81    /// Request a local addr for the connection to given peer_id if any.
82    #[cfg(test)]
83    RequestLocalAddr(PeerId, oneshot::Sender<Option<PeerId>>),
84    #[cfg(test)]
85    ProcessNetworkMessage {
86        from: PeerId,
87        direction: Direction,
88        payload: NetworkMessage,
89        result_sender: oneshot::Sender<Result<SyncAction, Error>>,
90    },
91    #[cfg(test)]
92    ExecuteSyncAction(SyncAction, oneshot::Sender<()>),
93}
94
95/// A handle for interacting with the network processor.
96///
97/// This handle allows sending messages to the network processor and provides a simple
98/// way to check if the node is performing a major synchronization.
99#[derive(Debug, Clone)]
100pub struct NetworkHandle {
101    pub(crate) processor_msg_sender: TracingUnboundedSender<NetworkProcessorMessage>,
102    // A simple flag to know whether the node is doing the major sync.
103    pub(crate) is_major_syncing: Arc<AtomicBool>,
104}
105
106impl NetworkHandle {
107    /// Provides high-level status information about network.
108    ///
109    /// Returns None if the `NetworkProcessor` is no longer running.
110    pub async fn status(&self) -> Option<NetworkStatus> {
111        let (sender, receiver) = oneshot::channel();
112
113        self.processor_msg_sender
114            .unbounded_send(NetworkProcessorMessage::RequestNetworkStatus(sender))
115            .ok();
116
117        receiver.await.ok()
118    }
119
120    /// Returns the currently syncing peers.
121    pub async fn sync_peers(&self) -> Vec<PeerSync> {
122        let (sender, receiver) = oneshot::channel();
123
124        if self
125            .processor_msg_sender
126            .unbounded_send(NetworkProcessorMessage::RequestSyncPeers(sender))
127            .is_err()
128        {
129            return Vec::new();
130        }
131
132        receiver.await.unwrap_or_default()
133    }
134
135    /// Retrieves a transaction by its Txid.
136    pub async fn get_transaction(&self, txid: Txid) -> Option<Transaction> {
137        let (sender, receiver) = oneshot::channel();
138
139        if self
140            .processor_msg_sender
141            .unbounded_send(NetworkProcessorMessage::RequestTransaction(txid, sender))
142            .is_err()
143        {
144            return None;
145        }
146
147        receiver.await.ok().flatten()
148    }
149
150    /// Sends a transaction to the network.
151    pub async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult {
152        let (sender, receiver) = oneshot::channel();
153
154        let txid = transaction.compute_txid();
155        let incoming_transaction = IncomingTransaction { txid, transaction };
156
157        if self
158            .processor_msg_sender
159            .unbounded_send(NetworkProcessorMessage::SendTransaction((
160                incoming_transaction,
161                sender,
162            )))
163            .is_err()
164        {
165            return SendTransactionResult::Failure(format!(
166                "Failed to send transaction ({txid}) to net processor"
167            ));
168        }
169
170        receiver
171            .await
172            .unwrap_or(SendTransactionResult::Failure("Internal error".to_string()))
173    }
174
175    /// Starts the block sync in chain sync component.
176    pub fn start_block_sync(&self) -> bool {
177        self.processor_msg_sender
178            .unbounded_send(NetworkProcessorMessage::StartBlockSync)
179            .is_ok()
180    }
181
182    /// Returns a flag indicating whether the node is actively performing a major sync.
183    pub fn is_major_syncing(&self) -> Arc<AtomicBool> {
184        self.is_major_syncing.clone()
185    }
186
187    #[cfg(test)]
188    pub async fn local_addr_for(&self, peer_addr: PeerId) -> Option<PeerId> {
189        let (sender, receiver) = oneshot::channel();
190
191        self.processor_msg_sender
192            .unbounded_send(NetworkProcessorMessage::RequestLocalAddr(peer_addr, sender))
193            .expect("Failed to request local addr");
194
195        receiver.await.unwrap_or_default()
196    }
197
198    #[cfg(test)]
199    pub async fn process_network_message(
200        &self,
201        from: PeerId,
202        direction: Direction,
203        msg: NetworkMessage,
204    ) -> Result<SyncAction, Error> {
205        let (sender, receiver) = oneshot::channel();
206
207        self.processor_msg_sender
208            .unbounded_send(NetworkProcessorMessage::ProcessNetworkMessage {
209                from,
210                direction,
211                payload: msg,
212                result_sender: sender,
213            })
214            .expect("Failed to send outbound peer message");
215
216        receiver.await.unwrap()
217    }
218
219    #[cfg(test)]
220    pub async fn execute_sync_action(&self, sync_action: SyncAction) {
221        let (sender, receiver) = oneshot::channel();
222
223        self.processor_msg_sender
224            .unbounded_send(NetworkProcessorMessage::ExecuteSyncAction(
225                sync_action,
226                sender,
227            ))
228            .expect("Failed to execute sync action");
229
230        receiver.await.unwrap();
231    }
232}
233
234/// Subcoin network service interface.
235#[async_trait::async_trait]
236pub trait NetworkApi: Send + Sync {
237    /// Whether the network instnace is running.
238    fn enabled(&self) -> bool;
239
240    /// Provides high-level status information about network.
241    ///
242    /// Returns None if the `NetworkProcessor` is no longer running.
243    async fn status(&self) -> Option<NetworkStatus>;
244
245    /// Returns the currently syncing peers.
246    async fn sync_peers(&self) -> Vec<PeerSync>;
247
248    /// Retrieves a transaction by its Txid.
249    async fn get_transaction(&self, txid: Txid) -> Option<Transaction>;
250
251    /// Sends a transaction to the network.
252    async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult;
253
254    /// Starts the block sync in chain sync component.
255    fn start_block_sync(&self) -> bool;
256
257    /// Whether the node is actively performing a major sync.
258    fn is_major_syncing(&self) -> bool;
259}
260
261#[async_trait::async_trait]
262impl NetworkApi for NetworkHandle {
263    fn enabled(&self) -> bool {
264        true
265    }
266
267    async fn status(&self) -> Option<NetworkStatus> {
268        Self::status(self).await
269    }
270
271    async fn sync_peers(&self) -> Vec<PeerSync> {
272        Self::sync_peers(self).await
273    }
274
275    async fn get_transaction(&self, txid: Txid) -> Option<Transaction> {
276        Self::get_transaction(self, txid).await
277    }
278
279    async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult {
280        Self::send_transaction(self, transaction).await
281    }
282
283    fn start_block_sync(&self) -> bool {
284        Self::start_block_sync(self)
285    }
286
287    fn is_major_syncing(&self) -> bool {
288        self.is_major_syncing
289            .load(std::sync::atomic::Ordering::Relaxed)
290    }
291}
292
293/// Subcoin network disabled.
294pub struct NoNetwork;
295
296#[async_trait::async_trait]
297impl NetworkApi for NoNetwork {
298    fn enabled(&self) -> bool {
299        false
300    }
301
302    async fn status(&self) -> Option<NetworkStatus> {
303        None
304    }
305
306    async fn sync_peers(&self) -> Vec<PeerSync> {
307        Vec::new()
308    }
309
310    async fn get_transaction(&self, _txid: Txid) -> Option<Transaction> {
311        None
312    }
313
314    async fn send_transaction(&self, _transaction: Transaction) -> SendTransactionResult {
315        SendTransactionResult::Failure("Network service unavailble".to_string())
316    }
317
318    fn start_block_sync(&self) -> bool {
319        false
320    }
321
322    fn is_major_syncing(&self) -> bool {
323        false
324    }
325}
326
327/// Subcoin network is disabled, but chain is syncing from other source, e.g., importing blocks
328/// from the Bitcoind database.
329pub struct OfflineSync;
330
331#[async_trait::async_trait]
332impl NetworkApi for OfflineSync {
333    fn enabled(&self) -> bool {
334        false
335    }
336
337    async fn status(&self) -> Option<NetworkStatus> {
338        None
339    }
340
341    async fn sync_peers(&self) -> Vec<PeerSync> {
342        Vec::new()
343    }
344
345    async fn get_transaction(&self, _txid: Txid) -> Option<Transaction> {
346        None
347    }
348
349    async fn send_transaction(&self, _transaction: Transaction) -> SendTransactionResult {
350        SendTransactionResult::Failure("Network service unavailble".to_string())
351    }
352
353    fn start_block_sync(&self) -> bool {
354        false
355    }
356
357    fn is_major_syncing(&self) -> bool {
358        // Chain is syncing in the offline mode when using import-blocks command.
359        true
360    }
361}