#[cfg(test)]
use crate::peer_connection::Direction;
use crate::sync::PeerSync;
#[cfg(test)]
use crate::sync::SyncAction;
#[cfg(test)]
use crate::Error;
use crate::PeerId;
#[cfg(test)]
use bitcoin::p2p::message::NetworkMessage;
use bitcoin::{Transaction, Txid};
use sc_utils::mpsc::TracingUnboundedSender;
use serde::{Deserialize, Serialize};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::sync::oneshot;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SyncStatus {
Idle,
Downloading { target: u32, peers: Vec<PeerId> },
Importing { target: u32, peers: Vec<PeerId> },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NetworkStatus {
pub num_connected_peers: usize,
pub total_bytes_inbound: u64,
pub total_bytes_outbound: u64,
pub sync_status: SyncStatus,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SendTransactionResult {
Success(Txid),
Failure(String),
}
#[derive(Debug)]
pub(crate) struct IncomingTransaction {
pub(crate) txid: Txid,
pub(crate) transaction: Transaction,
}
#[derive(Debug)]
pub(crate) enum NetworkProcessorMessage {
RequestNetworkStatus(oneshot::Sender<NetworkStatus>),
RequestSyncPeers(oneshot::Sender<Vec<PeerSync>>),
RequestInboundPeersCount(oneshot::Sender<usize>),
RequestTransaction(Txid, oneshot::Sender<Option<Transaction>>),
SendTransaction((IncomingTransaction, oneshot::Sender<SendTransactionResult>)),
StartBlockSync,
#[cfg(test)]
RequestLocalAddr(PeerId, oneshot::Sender<Option<PeerId>>),
#[cfg(test)]
ProcessNetworkMessage {
from: PeerId,
direction: Direction,
payload: NetworkMessage,
result_sender: oneshot::Sender<Result<SyncAction, Error>>,
},
#[cfg(test)]
ExecuteSyncAction(SyncAction, oneshot::Sender<()>),
}
#[derive(Debug, Clone)]
pub struct NetworkHandle {
pub(crate) processor_msg_sender: TracingUnboundedSender<NetworkProcessorMessage>,
pub(crate) is_major_syncing: Arc<AtomicBool>,
}
impl NetworkHandle {
pub async fn status(&self) -> Option<NetworkStatus> {
let (sender, receiver) = oneshot::channel();
self.processor_msg_sender
.unbounded_send(NetworkProcessorMessage::RequestNetworkStatus(sender))
.ok();
receiver.await.ok()
}
pub async fn sync_peers(&self) -> Vec<PeerSync> {
let (sender, receiver) = oneshot::channel();
if self
.processor_msg_sender
.unbounded_send(NetworkProcessorMessage::RequestSyncPeers(sender))
.is_err()
{
return Vec::new();
}
receiver.await.unwrap_or_default()
}
pub async fn get_transaction(&self, txid: Txid) -> Option<Transaction> {
let (sender, receiver) = oneshot::channel();
if self
.processor_msg_sender
.unbounded_send(NetworkProcessorMessage::RequestTransaction(txid, sender))
.is_err()
{
return None;
}
receiver.await.ok().flatten()
}
pub async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult {
let (sender, receiver) = oneshot::channel();
let txid = transaction.compute_txid();
let incoming_transaction = IncomingTransaction { txid, transaction };
if self
.processor_msg_sender
.unbounded_send(NetworkProcessorMessage::SendTransaction((
incoming_transaction,
sender,
)))
.is_err()
{
return SendTransactionResult::Failure(format!(
"Failed to send transaction ({txid}) to net processor"
));
}
receiver
.await
.unwrap_or(SendTransactionResult::Failure("Internal error".to_string()))
}
pub fn start_block_sync(&self) -> bool {
self.processor_msg_sender
.unbounded_send(NetworkProcessorMessage::StartBlockSync)
.is_ok()
}
pub fn is_major_syncing(&self) -> Arc<AtomicBool> {
self.is_major_syncing.clone()
}
#[cfg(test)]
pub async fn local_addr_for(&self, peer_addr: PeerId) -> Option<PeerId> {
let (sender, receiver) = oneshot::channel();
self.processor_msg_sender
.unbounded_send(NetworkProcessorMessage::RequestLocalAddr(peer_addr, sender))
.expect("Failed to request local addr");
receiver.await.unwrap_or_default()
}
#[cfg(test)]
pub async fn process_network_message(
&self,
from: PeerId,
direction: Direction,
msg: NetworkMessage,
) -> Result<SyncAction, Error> {
let (sender, receiver) = oneshot::channel();
self.processor_msg_sender
.unbounded_send(NetworkProcessorMessage::ProcessNetworkMessage {
from,
direction,
payload: msg,
result_sender: sender,
})
.expect("Failed to send outbound peer message");
receiver.await.unwrap()
}
#[cfg(test)]
pub async fn execute_sync_action(&self, sync_action: SyncAction) {
let (sender, receiver) = oneshot::channel();
self.processor_msg_sender
.unbounded_send(NetworkProcessorMessage::ExecuteSyncAction(
sync_action,
sender,
))
.expect("Failed to execute sync action");
receiver.await.unwrap();
}
}
#[async_trait::async_trait]
pub trait NetworkApi: Send + Sync {
fn enabled(&self) -> bool;
async fn status(&self) -> Option<NetworkStatus>;
async fn sync_peers(&self) -> Vec<PeerSync>;
async fn get_transaction(&self, txid: Txid) -> Option<Transaction>;
async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult;
fn start_block_sync(&self) -> bool;
fn is_major_syncing(&self) -> bool;
}
#[async_trait::async_trait]
impl NetworkApi for NetworkHandle {
fn enabled(&self) -> bool {
true
}
async fn status(&self) -> Option<NetworkStatus> {
Self::status(self).await
}
async fn sync_peers(&self) -> Vec<PeerSync> {
Self::sync_peers(self).await
}
async fn get_transaction(&self, txid: Txid) -> Option<Transaction> {
Self::get_transaction(self, txid).await
}
async fn send_transaction(&self, transaction: Transaction) -> SendTransactionResult {
Self::send_transaction(self, transaction).await
}
fn start_block_sync(&self) -> bool {
Self::start_block_sync(self)
}
fn is_major_syncing(&self) -> bool {
self.is_major_syncing
.load(std::sync::atomic::Ordering::Relaxed)
}
}
pub struct NoNetwork;
#[async_trait::async_trait]
impl NetworkApi for NoNetwork {
fn enabled(&self) -> bool {
false
}
async fn status(&self) -> Option<NetworkStatus> {
None
}
async fn sync_peers(&self) -> Vec<PeerSync> {
Vec::new()
}
async fn get_transaction(&self, _txid: Txid) -> Option<Transaction> {
None
}
async fn send_transaction(&self, _transaction: Transaction) -> SendTransactionResult {
SendTransactionResult::Failure("Network service unavailble".to_string())
}
fn start_block_sync(&self) -> bool {
false
}
fn is_major_syncing(&self) -> bool {
false
}
}
pub struct OfflineSync;
#[async_trait::async_trait]
impl NetworkApi for OfflineSync {
fn enabled(&self) -> bool {
false
}
async fn status(&self) -> Option<NetworkStatus> {
None
}
async fn sync_peers(&self) -> Vec<PeerSync> {
Vec::new()
}
async fn get_transaction(&self, _txid: Txid) -> Option<Transaction> {
None
}
async fn send_transaction(&self, _transaction: Transaction) -> SendTransactionResult {
SendTransactionResult::Failure("Network service unavailble".to_string())
}
fn start_block_sync(&self) -> bool {
false
}
fn is_major_syncing(&self) -> bool {
true
}
}