subcoin_network/
network_api.rs1#[cfg(test)]
4use crate::Error;
5use crate::PeerId;
6#[cfg(test)]
7use crate::peer_connection::Direction;
8use crate::sync::PeerSync;
9#[cfg(test)]
10use crate::sync::SyncAction;
11#[cfg(test)]
12use bitcoin::p2p::message::NetworkMessage;
13use bitcoin::{Transaction, Txid};
14use sc_utils::mpsc::TracingUnboundedSender;
15use serde::{Deserialize, Serialize};
16use std::sync::Arc;
17use std::sync::atomic::AtomicBool;
18use tokio::sync::oneshot;
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22#[serde(rename_all = "camelCase")]
23pub enum SyncStatus {
24 Idle,
26 Downloading { target: u32, peers: Vec<PeerId> },
31 Importing { target: u32, peers: Vec<PeerId> },
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37#[serde(rename_all = "camelCase")]
38pub struct NetworkStatus {
39 pub num_connected_peers: usize,
41 pub total_bytes_inbound: u64,
43 pub total_bytes_outbound: u64,
45 pub sync_status: SyncStatus,
47}
48
49#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
51#[serde(rename_all = "camelCase")]
52pub enum SendTransactionResult {
53 Success(Txid),
55 Failure(String),
57}
58
59#[derive(Debug)]
61pub(crate) struct IncomingTransaction {
62 #[allow(dead_code)]
63 pub(crate) txid: Txid,
64 pub(crate) transaction: Transaction,
65}
66
67#[derive(Debug)]
69pub(crate) enum NetworkProcessorMessage {
70 RequestNetworkStatus(oneshot::Sender<NetworkStatus>),
72 RequestSyncPeers(oneshot::Sender<Vec<PeerSync>>),
74 RequestInboundPeersCount(oneshot::Sender<usize>),
76 RequestTransaction(Txid, oneshot::Sender<Option<Transaction>>),
78 SendTransaction((IncomingTransaction, oneshot::Sender<SendTransactionResult>)),
80 StartBlockSync,
82 #[cfg(test)]
84 RequestLocalAddr(PeerId, oneshot::Sender<Option<PeerId>>),
85 #[cfg(test)]
86 ProcessNetworkMessage {
87 from: PeerId,
88 direction: Direction,
89 payload: NetworkMessage,
90 result_sender: oneshot::Sender<Result<SyncAction, Error>>,
91 },
92 #[cfg(test)]
93 ExecuteSyncAction(SyncAction, oneshot::Sender<()>),
94}
95
96#[derive(Debug, Clone)]
101pub struct NetworkHandle {
102 pub(crate) processor_msg_sender: TracingUnboundedSender<NetworkProcessorMessage>,
103 pub(crate) is_major_syncing: Arc<AtomicBool>,
105}
106
107impl NetworkHandle {
108 pub async fn status(&self) -> Option<NetworkStatus> {
112 let (sender, receiver) = oneshot::channel();
113
114 self.processor_msg_sender
115 .unbounded_send(NetworkProcessorMessage::RequestNetworkStatus(sender))
116 .ok();
117
118 receiver.await.ok()
119 }
120
121 pub async fn sync_peers(&self) -> Vec<PeerSync> {
123 let (sender, receiver) = oneshot::channel();
124
125 if self
126 .processor_msg_sender
127 .unbounded_send(NetworkProcessorMessage::RequestSyncPeers(sender))
128 .is_err()
129 {
130 return Vec::new();
131 }
132
133 receiver.await.unwrap_or_default()
134 }
135
136 pub async fn get_transaction(&self, txid: Txid) -> Option<Transaction> {
138 let (sender, receiver) = oneshot::channel();
139
140 if self
141 .processor_msg_sender
142 .unbounded_send(NetworkProcessorMessage::RequestTransaction(txid, sender))
143 .is_err()
144 {
145 return None;
146 }
147
148 receiver.await.ok().flatten()
149 }
150
151 pub async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult {
153 let (sender, receiver) = oneshot::channel();
154
155 let txid = transaction.compute_txid();
156 let incoming_transaction = IncomingTransaction { txid, transaction };
157
158 if self
159 .processor_msg_sender
160 .unbounded_send(NetworkProcessorMessage::SendTransaction((
161 incoming_transaction,
162 sender,
163 )))
164 .is_err()
165 {
166 return SendTransactionResult::Failure(format!(
167 "Failed to send transaction ({txid}) to net processor"
168 ));
169 }
170
171 receiver
172 .await
173 .unwrap_or(SendTransactionResult::Failure("Internal error".to_string()))
174 }
175
176 pub fn start_block_sync(&self) -> bool {
178 self.processor_msg_sender
179 .unbounded_send(NetworkProcessorMessage::StartBlockSync)
180 .is_ok()
181 }
182
183 pub fn is_major_syncing(&self) -> Arc<AtomicBool> {
185 self.is_major_syncing.clone()
186 }
187
188 #[cfg(test)]
189 pub async fn local_addr_for(&self, peer_addr: PeerId) -> Option<PeerId> {
190 let (sender, receiver) = oneshot::channel();
191
192 self.processor_msg_sender
193 .unbounded_send(NetworkProcessorMessage::RequestLocalAddr(peer_addr, sender))
194 .expect("Failed to request local addr");
195
196 receiver.await.unwrap_or_default()
197 }
198
199 #[cfg(test)]
200 pub async fn process_network_message(
201 &self,
202 from: PeerId,
203 direction: Direction,
204 msg: NetworkMessage,
205 ) -> Result<SyncAction, Error> {
206 let (sender, receiver) = oneshot::channel();
207
208 self.processor_msg_sender
209 .unbounded_send(NetworkProcessorMessage::ProcessNetworkMessage {
210 from,
211 direction,
212 payload: msg,
213 result_sender: sender,
214 })
215 .expect("Failed to send outbound peer message");
216
217 receiver.await.unwrap()
218 }
219
220 #[cfg(test)]
221 pub async fn execute_sync_action(&self, sync_action: SyncAction) {
222 let (sender, receiver) = oneshot::channel();
223
224 self.processor_msg_sender
225 .unbounded_send(NetworkProcessorMessage::ExecuteSyncAction(
226 sync_action,
227 sender,
228 ))
229 .expect("Failed to execute sync action");
230
231 receiver.await.unwrap();
232 }
233}
234
235#[async_trait::async_trait]
237pub trait NetworkApi: Send + Sync {
238 fn enabled(&self) -> bool;
240
241 async fn status(&self) -> Option<NetworkStatus>;
245
246 async fn sync_peers(&self) -> Vec<PeerSync>;
248
249 async fn get_transaction(&self, txid: Txid) -> Option<Transaction>;
251
252 async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult;
254
255 fn start_block_sync(&self) -> bool;
257
258 fn is_major_syncing(&self) -> bool;
260}
261
262#[async_trait::async_trait]
263impl NetworkApi for NetworkHandle {
264 fn enabled(&self) -> bool {
265 true
266 }
267
268 async fn status(&self) -> Option<NetworkStatus> {
269 Self::status(self).await
270 }
271
272 async fn sync_peers(&self) -> Vec<PeerSync> {
273 Self::sync_peers(self).await
274 }
275
276 async fn get_transaction(&self, txid: Txid) -> Option<Transaction> {
277 Self::get_transaction(self, txid).await
278 }
279
280 async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult {
281 Self::send_transaction(self, transaction).await
282 }
283
284 fn start_block_sync(&self) -> bool {
285 Self::start_block_sync(self)
286 }
287
288 fn is_major_syncing(&self) -> bool {
289 self.is_major_syncing
290 .load(std::sync::atomic::Ordering::Relaxed)
291 }
292}
293
294pub struct NoNetwork;
296
297#[async_trait::async_trait]
298impl NetworkApi for NoNetwork {
299 fn enabled(&self) -> bool {
300 false
301 }
302
303 async fn status(&self) -> Option<NetworkStatus> {
304 None
305 }
306
307 async fn sync_peers(&self) -> Vec<PeerSync> {
308 Vec::new()
309 }
310
311 async fn get_transaction(&self, _txid: Txid) -> Option<Transaction> {
312 None
313 }
314
315 async fn send_transaction(&self, _transaction: Transaction) -> SendTransactionResult {
316 SendTransactionResult::Failure("Network service unavailble".to_string())
317 }
318
319 fn start_block_sync(&self) -> bool {
320 false
321 }
322
323 fn is_major_syncing(&self) -> bool {
324 false
325 }
326}
327
328pub struct OfflineSync;
331
332#[async_trait::async_trait]
333impl NetworkApi for OfflineSync {
334 fn enabled(&self) -> bool {
335 false
336 }
337
338 async fn status(&self) -> Option<NetworkStatus> {
339 None
340 }
341
342 async fn sync_peers(&self) -> Vec<PeerSync> {
343 Vec::new()
344 }
345
346 async fn get_transaction(&self, _txid: Txid) -> Option<Transaction> {
347 None
348 }
349
350 async fn send_transaction(&self, _transaction: Transaction) -> SendTransactionResult {
351 SendTransactionResult::Failure("Network service unavailble".to_string())
352 }
353
354 fn start_block_sync(&self) -> bool {
355 false
356 }
357
358 fn is_major_syncing(&self) -> bool {
359 true
361 }
362}