subcoin_network/
network_api.rs

//! This module provides the interfaces for external interaction with the Subcoin network.
2
3#[cfg(test)]
4use crate::Error;
5use crate::PeerId;
6#[cfg(test)]
7use crate::peer_connection::Direction;
8use crate::sync::PeerSync;
9#[cfg(test)]
10use crate::sync::SyncAction;
11#[cfg(test)]
12use bitcoin::p2p::message::NetworkMessage;
13use bitcoin::{Transaction, Txid};
14use sc_utils::mpsc::TracingUnboundedSender;
15use serde::{Deserialize, Serialize};
16use std::sync::Arc;
17use std::sync::atomic::AtomicBool;
18use tokio::sync::oneshot;
19
/// Represents the sync status of the node.
///
/// Variant names are serialized in camelCase (see the `serde(rename_all)` attribute).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SyncStatus {
    /// The node is idle and not currently major syncing.
    Idle,
    /// The node is downloading blocks from peers.
    ///
    /// `target` specifies the block number the node aims to reach.
    /// `peers` is a list of peers from which the node is downloading.
    Downloading { target: u32, peers: Vec<PeerId> },
    /// The node is importing downloaded blocks into the local database.
    ///
    /// NOTE(review): `target` and `peers` presumably carry the same meaning as in
    /// `Downloading` — confirm against the sync implementation.
    Importing { target: u32, peers: Vec<PeerId> },
}
34
/// Represents the status of the network.
///
/// Field names are serialized in camelCase (e.g. `numConnectedPeers`), as requested by the
/// `serde(rename_all)` attribute.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NetworkStatus {
    /// The number of peers currently connected to the node.
    pub num_connected_peers: usize,
    /// The total number of bytes received from the network.
    pub total_bytes_inbound: u64,
    /// The total number of bytes sent to the network.
    pub total_bytes_outbound: u64,
    /// Current sync status of the node.
    pub sync_status: SyncStatus,
}
48
/// Represents the result of submitting a transaction to the network.
///
/// Variant names are serialized in camelCase (see the `serde(rename_all)` attribute).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SendTransactionResult {
    /// Transaction was submitted successfully; carries the id of the submitted transaction.
    Success(Txid),
    /// An error occurred during the transaction submission; carries a textual description
    /// of the failure.
    Failure(String),
}
58
/// An incoming transaction from RPC or network.
///
/// Pairs a transaction with its id so the id does not have to be recomputed by the receiver.
#[derive(Debug)]
pub(crate) struct IncomingTransaction {
    /// Id of `transaction`.
    // NOTE(review): the `dead_code` allowance suggests this field is not read anywhere yet —
    // confirm before removing either the field or the attribute.
    #[allow(dead_code)]
    pub(crate) txid: Txid,
    /// The transaction itself.
    pub(crate) transaction: Transaction,
}
66
/// Represents the different messages that can be sent to the network processor.
///
/// Request-style variants carry a `oneshot::Sender` through which the reply is delivered
/// back to the requester.
#[derive(Debug)]
pub(crate) enum NetworkProcessorMessage {
    /// Request the current network status.
    RequestNetworkStatus(oneshot::Sender<NetworkStatus>),
    /// Request the sync peers.
    RequestSyncPeers(oneshot::Sender<Vec<PeerSync>>),
    /// Request the number of inbound connected peers.
    RequestInboundPeersCount(oneshot::Sender<usize>),
    /// Request a specific transaction by its Txid.
    RequestTransaction(Txid, oneshot::Sender<Option<Transaction>>),
    /// Submit a transaction to the transaction manager.
    SendTransaction((IncomingTransaction, oneshot::Sender<SendTransactionResult>)),
    /// Enable the block sync within the chain sync component.
    StartBlockSync,
    /// Request a local addr for the connection to given peer_id if any.
    ///
    /// Only available in test builds.
    #[cfg(test)]
    RequestLocalAddr(PeerId, oneshot::Sender<Option<PeerId>>),
    /// Ask the processor to process `payload` for the peer `from` and report the outcome
    /// through `result_sender`.
    ///
    /// Only available in test builds.
    #[cfg(test)]
    ProcessNetworkMessage {
        from: PeerId,
        direction: Direction,
        payload: NetworkMessage,
        result_sender: oneshot::Sender<Result<SyncAction, Error>>,
    },
    /// Ask the processor to execute the given sync action.
    ///
    /// Only available in test builds.
    // NOTE(review): the `()` reply presumably signals that the action finished — confirm
    // against the processor implementation.
    #[cfg(test)]
    ExecuteSyncAction(SyncAction, oneshot::Sender<()>),
}
95
/// A handle for interacting with the network processor.
///
/// This handle allows sending messages to the network processor and provides a simple
/// way to check if the node is performing a major synchronization.
///
/// The handle is `Clone`; clones share the same `is_major_syncing` flag because it is
/// held behind an `Arc`.
#[derive(Debug, Clone)]
pub struct NetworkHandle {
    /// Channel used to send [`NetworkProcessorMessage`]s to the network processor.
    pub(crate) processor_msg_sender: TracingUnboundedSender<NetworkProcessorMessage>,
    /// A simple flag to know whether the node is doing the major sync.
    pub(crate) is_major_syncing: Arc<AtomicBool>,
}
106
107impl NetworkHandle {
108    /// Provides high-level status information about network.
109    ///
110    /// Returns None if the `NetworkProcessor` is no longer running.
111    pub async fn status(&self) -> Option<NetworkStatus> {
112        let (sender, receiver) = oneshot::channel();
113
114        self.processor_msg_sender
115            .unbounded_send(NetworkProcessorMessage::RequestNetworkStatus(sender))
116            .ok();
117
118        receiver.await.ok()
119    }
120
121    /// Returns the currently syncing peers.
122    pub async fn sync_peers(&self) -> Vec<PeerSync> {
123        let (sender, receiver) = oneshot::channel();
124
125        if self
126            .processor_msg_sender
127            .unbounded_send(NetworkProcessorMessage::RequestSyncPeers(sender))
128            .is_err()
129        {
130            return Vec::new();
131        }
132
133        receiver.await.unwrap_or_default()
134    }
135
136    /// Retrieves a transaction by its Txid.
137    pub async fn get_transaction(&self, txid: Txid) -> Option<Transaction> {
138        let (sender, receiver) = oneshot::channel();
139
140        if self
141            .processor_msg_sender
142            .unbounded_send(NetworkProcessorMessage::RequestTransaction(txid, sender))
143            .is_err()
144        {
145            return None;
146        }
147
148        receiver.await.ok().flatten()
149    }
150
151    /// Sends a transaction to the network.
152    pub async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult {
153        let (sender, receiver) = oneshot::channel();
154
155        let txid = transaction.compute_txid();
156        let incoming_transaction = IncomingTransaction { txid, transaction };
157
158        if self
159            .processor_msg_sender
160            .unbounded_send(NetworkProcessorMessage::SendTransaction((
161                incoming_transaction,
162                sender,
163            )))
164            .is_err()
165        {
166            return SendTransactionResult::Failure(format!(
167                "Failed to send transaction ({txid}) to net processor"
168            ));
169        }
170
171        receiver
172            .await
173            .unwrap_or(SendTransactionResult::Failure("Internal error".to_string()))
174    }
175
176    /// Starts the block sync in chain sync component.
177    pub fn start_block_sync(&self) -> bool {
178        self.processor_msg_sender
179            .unbounded_send(NetworkProcessorMessage::StartBlockSync)
180            .is_ok()
181    }
182
183    /// Returns a flag indicating whether the node is actively performing a major sync.
184    pub fn is_major_syncing(&self) -> Arc<AtomicBool> {
185        self.is_major_syncing.clone()
186    }
187
188    #[cfg(test)]
189    pub async fn local_addr_for(&self, peer_addr: PeerId) -> Option<PeerId> {
190        let (sender, receiver) = oneshot::channel();
191
192        self.processor_msg_sender
193            .unbounded_send(NetworkProcessorMessage::RequestLocalAddr(peer_addr, sender))
194            .expect("Failed to request local addr");
195
196        receiver.await.unwrap_or_default()
197    }
198
199    #[cfg(test)]
200    pub async fn process_network_message(
201        &self,
202        from: PeerId,
203        direction: Direction,
204        msg: NetworkMessage,
205    ) -> Result<SyncAction, Error> {
206        let (sender, receiver) = oneshot::channel();
207
208        self.processor_msg_sender
209            .unbounded_send(NetworkProcessorMessage::ProcessNetworkMessage {
210                from,
211                direction,
212                payload: msg,
213                result_sender: sender,
214            })
215            .expect("Failed to send outbound peer message");
216
217        receiver.await.unwrap()
218    }
219
220    #[cfg(test)]
221    pub async fn execute_sync_action(&self, sync_action: SyncAction) {
222        let (sender, receiver) = oneshot::channel();
223
224        self.processor_msg_sender
225            .unbounded_send(NetworkProcessorMessage::ExecuteSyncAction(
226                sync_action,
227                sender,
228            ))
229            .expect("Failed to execute sync action");
230
231        receiver.await.unwrap();
232    }
233}
234
/// Subcoin network service interface.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait NetworkApi: Send + Sync {
    /// Whether the network instance is running.
    fn enabled(&self) -> bool;

    /// Provides high-level status information about network.
    ///
    /// Returns None if the `NetworkProcessor` is no longer running.
    async fn status(&self) -> Option<NetworkStatus>;

    /// Returns the currently syncing peers.
    async fn sync_peers(&self) -> Vec<PeerSync>;

    /// Retrieves a transaction by its Txid.
    async fn get_transaction(&self, txid: Txid) -> Option<Transaction>;

    /// Sends a transaction to the network.
    async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult;

    /// Starts the block sync in chain sync component.
    ///
    /// Returns whether the request was accepted.
    fn start_block_sync(&self) -> bool;

    /// Whether the node is actively performing a major sync.
    fn is_major_syncing(&self) -> bool;
}
261
262#[async_trait::async_trait]
263impl NetworkApi for NetworkHandle {
264    fn enabled(&self) -> bool {
265        true
266    }
267
268    async fn status(&self) -> Option<NetworkStatus> {
269        Self::status(self).await
270    }
271
272    async fn sync_peers(&self) -> Vec<PeerSync> {
273        Self::sync_peers(self).await
274    }
275
276    async fn get_transaction(&self, txid: Txid) -> Option<Transaction> {
277        Self::get_transaction(self, txid).await
278    }
279
280    async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult {
281        Self::send_transaction(self, transaction).await
282    }
283
284    fn start_block_sync(&self) -> bool {
285        Self::start_block_sync(self)
286    }
287
288    fn is_major_syncing(&self) -> bool {
289        self.is_major_syncing
290            .load(std::sync::atomic::Ordering::Relaxed)
291    }
292}
293
294/// Subcoin network disabled.
295pub struct NoNetwork;
296
297#[async_trait::async_trait]
298impl NetworkApi for NoNetwork {
299    fn enabled(&self) -> bool {
300        false
301    }
302
303    async fn status(&self) -> Option<NetworkStatus> {
304        None
305    }
306
307    async fn sync_peers(&self) -> Vec<PeerSync> {
308        Vec::new()
309    }
310
311    async fn get_transaction(&self, _txid: Txid) -> Option<Transaction> {
312        None
313    }
314
315    async fn send_transaction(&self, _transaction: Transaction) -> SendTransactionResult {
316        SendTransactionResult::Failure("Network service unavailble".to_string())
317    }
318
319    fn start_block_sync(&self) -> bool {
320        false
321    }
322
323    fn is_major_syncing(&self) -> bool {
324        false
325    }
326}
327
328/// Subcoin network is disabled, but chain is syncing from other source, e.g., importing blocks
329/// from the Bitcoind database.
330pub struct OfflineSync;
331
332#[async_trait::async_trait]
333impl NetworkApi for OfflineSync {
334    fn enabled(&self) -> bool {
335        false
336    }
337
338    async fn status(&self) -> Option<NetworkStatus> {
339        None
340    }
341
342    async fn sync_peers(&self) -> Vec<PeerSync> {
343        Vec::new()
344    }
345
346    async fn get_transaction(&self, _txid: Txid) -> Option<Transaction> {
347        None
348    }
349
350    async fn send_transaction(&self, _transaction: Transaction) -> SendTransactionResult {
351        SendTransactionResult::Failure("Network service unavailble".to_string())
352    }
353
354    fn start_block_sync(&self) -> bool {
355        false
356    }
357
358    fn is_major_syncing(&self) -> bool {
359        // Chain is syncing in the offline mode when using import-blocks command.
360        true
361    }
362}