subcoin_network/
network_api.rs1#[cfg(test)]
4use crate::Error;
5use crate::PeerId;
6#[cfg(test)]
7use crate::peer_connection::Direction;
8use crate::sync::PeerSync;
9#[cfg(test)]
10use crate::sync::SyncAction;
11#[cfg(test)]
12use bitcoin::p2p::message::NetworkMessage;
13use bitcoin::{Transaction, Txid};
14use sc_utils::mpsc::TracingUnboundedSender;
15use serde::{Deserialize, Serialize};
16use std::sync::Arc;
17use std::sync::atomic::AtomicBool;
18use tokio::sync::oneshot;
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22#[serde(rename_all = "camelCase")]
23pub enum SyncStatus {
24 Idle,
26 Downloading { target: u32, peers: Vec<PeerId> },
31 Importing { target: u32, peers: Vec<PeerId> },
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37#[serde(rename_all = "camelCase")]
38pub struct NetworkStatus {
39 pub num_connected_peers: usize,
41 pub total_bytes_inbound: u64,
43 pub total_bytes_outbound: u64,
45 pub sync_status: SyncStatus,
47}
48
49#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
51#[serde(rename_all = "camelCase")]
52pub enum SendTransactionResult {
53 Success(Txid),
55 Failure(String),
57}
58
59#[derive(Debug)]
61pub(crate) struct IncomingTransaction {
62 pub(crate) txid: Txid,
63 pub(crate) transaction: Transaction,
64}
65
66#[derive(Debug)]
68pub(crate) enum NetworkProcessorMessage {
69 RequestNetworkStatus(oneshot::Sender<NetworkStatus>),
71 RequestSyncPeers(oneshot::Sender<Vec<PeerSync>>),
73 RequestInboundPeersCount(oneshot::Sender<usize>),
75 RequestTransaction(Txid, oneshot::Sender<Option<Transaction>>),
77 SendTransaction((IncomingTransaction, oneshot::Sender<SendTransactionResult>)),
79 StartBlockSync,
81 #[cfg(test)]
83 RequestLocalAddr(PeerId, oneshot::Sender<Option<PeerId>>),
84 #[cfg(test)]
85 ProcessNetworkMessage {
86 from: PeerId,
87 direction: Direction,
88 payload: NetworkMessage,
89 result_sender: oneshot::Sender<Result<SyncAction, Error>>,
90 },
91 #[cfg(test)]
92 ExecuteSyncAction(SyncAction, oneshot::Sender<()>),
93}
94
95#[derive(Debug, Clone)]
100pub struct NetworkHandle {
101 pub(crate) processor_msg_sender: TracingUnboundedSender<NetworkProcessorMessage>,
102 pub(crate) is_major_syncing: Arc<AtomicBool>,
104}
105
106impl NetworkHandle {
107 pub async fn status(&self) -> Option<NetworkStatus> {
111 let (sender, receiver) = oneshot::channel();
112
113 self.processor_msg_sender
114 .unbounded_send(NetworkProcessorMessage::RequestNetworkStatus(sender))
115 .ok();
116
117 receiver.await.ok()
118 }
119
120 pub async fn sync_peers(&self) -> Vec<PeerSync> {
122 let (sender, receiver) = oneshot::channel();
123
124 if self
125 .processor_msg_sender
126 .unbounded_send(NetworkProcessorMessage::RequestSyncPeers(sender))
127 .is_err()
128 {
129 return Vec::new();
130 }
131
132 receiver.await.unwrap_or_default()
133 }
134
135 pub async fn get_transaction(&self, txid: Txid) -> Option<Transaction> {
137 let (sender, receiver) = oneshot::channel();
138
139 if self
140 .processor_msg_sender
141 .unbounded_send(NetworkProcessorMessage::RequestTransaction(txid, sender))
142 .is_err()
143 {
144 return None;
145 }
146
147 receiver.await.ok().flatten()
148 }
149
150 pub async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult {
152 let (sender, receiver) = oneshot::channel();
153
154 let txid = transaction.compute_txid();
155 let incoming_transaction = IncomingTransaction { txid, transaction };
156
157 if self
158 .processor_msg_sender
159 .unbounded_send(NetworkProcessorMessage::SendTransaction((
160 incoming_transaction,
161 sender,
162 )))
163 .is_err()
164 {
165 return SendTransactionResult::Failure(format!(
166 "Failed to send transaction ({txid}) to net processor"
167 ));
168 }
169
170 receiver
171 .await
172 .unwrap_or(SendTransactionResult::Failure("Internal error".to_string()))
173 }
174
175 pub fn start_block_sync(&self) -> bool {
177 self.processor_msg_sender
178 .unbounded_send(NetworkProcessorMessage::StartBlockSync)
179 .is_ok()
180 }
181
182 pub fn is_major_syncing(&self) -> Arc<AtomicBool> {
184 self.is_major_syncing.clone()
185 }
186
187 #[cfg(test)]
188 pub async fn local_addr_for(&self, peer_addr: PeerId) -> Option<PeerId> {
189 let (sender, receiver) = oneshot::channel();
190
191 self.processor_msg_sender
192 .unbounded_send(NetworkProcessorMessage::RequestLocalAddr(peer_addr, sender))
193 .expect("Failed to request local addr");
194
195 receiver.await.unwrap_or_default()
196 }
197
198 #[cfg(test)]
199 pub async fn process_network_message(
200 &self,
201 from: PeerId,
202 direction: Direction,
203 msg: NetworkMessage,
204 ) -> Result<SyncAction, Error> {
205 let (sender, receiver) = oneshot::channel();
206
207 self.processor_msg_sender
208 .unbounded_send(NetworkProcessorMessage::ProcessNetworkMessage {
209 from,
210 direction,
211 payload: msg,
212 result_sender: sender,
213 })
214 .expect("Failed to send outbound peer message");
215
216 receiver.await.unwrap()
217 }
218
219 #[cfg(test)]
220 pub async fn execute_sync_action(&self, sync_action: SyncAction) {
221 let (sender, receiver) = oneshot::channel();
222
223 self.processor_msg_sender
224 .unbounded_send(NetworkProcessorMessage::ExecuteSyncAction(
225 sync_action,
226 sender,
227 ))
228 .expect("Failed to execute sync action");
229
230 receiver.await.unwrap();
231 }
232}
233
234#[async_trait::async_trait]
236pub trait NetworkApi: Send + Sync {
237 fn enabled(&self) -> bool;
239
240 async fn status(&self) -> Option<NetworkStatus>;
244
245 async fn sync_peers(&self) -> Vec<PeerSync>;
247
248 async fn get_transaction(&self, txid: Txid) -> Option<Transaction>;
250
251 async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult;
253
254 fn start_block_sync(&self) -> bool;
256
257 fn is_major_syncing(&self) -> bool;
259}
260
261#[async_trait::async_trait]
262impl NetworkApi for NetworkHandle {
263 fn enabled(&self) -> bool {
264 true
265 }
266
267 async fn status(&self) -> Option<NetworkStatus> {
268 Self::status(self).await
269 }
270
271 async fn sync_peers(&self) -> Vec<PeerSync> {
272 Self::sync_peers(self).await
273 }
274
275 async fn get_transaction(&self, txid: Txid) -> Option<Transaction> {
276 Self::get_transaction(self, txid).await
277 }
278
279 async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult {
280 Self::send_transaction(self, transaction).await
281 }
282
283 fn start_block_sync(&self) -> bool {
284 Self::start_block_sync(self)
285 }
286
287 fn is_major_syncing(&self) -> bool {
288 self.is_major_syncing
289 .load(std::sync::atomic::Ordering::Relaxed)
290 }
291}
292
293pub struct NoNetwork;
295
296#[async_trait::async_trait]
297impl NetworkApi for NoNetwork {
298 fn enabled(&self) -> bool {
299 false
300 }
301
302 async fn status(&self) -> Option<NetworkStatus> {
303 None
304 }
305
306 async fn sync_peers(&self) -> Vec<PeerSync> {
307 Vec::new()
308 }
309
310 async fn get_transaction(&self, _txid: Txid) -> Option<Transaction> {
311 None
312 }
313
314 async fn send_transaction(&self, _transaction: Transaction) -> SendTransactionResult {
315 SendTransactionResult::Failure("Network service unavailble".to_string())
316 }
317
318 fn start_block_sync(&self) -> bool {
319 false
320 }
321
322 fn is_major_syncing(&self) -> bool {
323 false
324 }
325}
326
327pub struct OfflineSync;
330
331#[async_trait::async_trait]
332impl NetworkApi for OfflineSync {
333 fn enabled(&self) -> bool {
334 false
335 }
336
337 async fn status(&self) -> Option<NetworkStatus> {
338 None
339 }
340
341 async fn sync_peers(&self) -> Vec<PeerSync> {
342 Vec::new()
343 }
344
345 async fn get_transaction(&self, _txid: Txid) -> Option<Transaction> {
346 None
347 }
348
349 async fn send_transaction(&self, _transaction: Transaction) -> SendTransactionResult {
350 SendTransactionResult::Failure("Network service unavailble".to_string())
351 }
352
353 fn start_block_sync(&self) -> bool {
354 false
355 }
356
357 fn is_major_syncing(&self) -> bool {
358 true
360 }
361}