subcoin_service/
network_request_handler.rs

1//! Helper for answering the subcoin specific requests from a remote peer via the
2//! request-response protocol.
3
4use codec::{Decode, Encode};
5use futures::channel::oneshot;
6use futures::stream::StreamExt;
7use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
8use sc_network::config::ProtocolId;
9use sc_network::request_responses::{IncomingRequest, OutgoingResponse};
10use sc_network::{MAX_RESPONSE_SIZE, NetworkBackend, PeerId};
11use sp_api::ProvideRuntimeApi;
12use sp_runtime::traits::Block as BlockT;
13use std::marker::PhantomData;
14use std::sync::Arc;
15use std::time::Duration;
16use subcoin_primitives::runtime::SubcoinApi;
17
18const LOG_TARGET: &str = "sync::subcoin";
19
20/// Generates a `RequestResponseProtocolConfig` for the state request protocol, refusing incoming
21/// requests.
22pub fn generate_protocol_config<
23    Hash: AsRef<[u8]>,
24    B: BlockT,
25    N: NetworkBackend<B, <B as BlockT>::Hash>,
26>(
27    protocol_id: &ProtocolId,
28    genesis_hash: Hash,
29    fork_id: Option<&str>,
30    inbound_queue: async_channel::Sender<IncomingRequest>,
31) -> N::RequestResponseProtocolConfig {
32    N::request_response_config(
33        generate_protocol_name(genesis_hash, fork_id).into(),
34        std::iter::once(generate_legacy_protocol_name(protocol_id).into()).collect(),
35        1024 * 1024,
36        MAX_RESPONSE_SIZE,
37        Duration::from_secs(40),
38        Some(inbound_queue),
39    )
40}
41
42/// Generate the state protocol name from the genesis hash and fork id.
43fn generate_protocol_name<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> String {
44    let genesis_hash = genesis_hash.as_ref();
45    let genesis_hash = array_bytes::bytes2hex("", genesis_hash);
46    if let Some(fork_id) = fork_id {
47        format!("/{genesis_hash}/{fork_id}/subcoin/1",)
48    } else {
49        format!("/{genesis_hash}/subcoin/1")
50    }
51}
52
53/// Generate the legacy state protocol name from chain specific protocol identifier.
54fn generate_legacy_protocol_name(protocol_id: &ProtocolId) -> String {
55    format!("/{}/subcoin/1", protocol_id.as_ref())
56}
57
58/// Versioned network requests.
59#[derive(Debug, codec::Encode, codec::Decode)]
60pub enum VersionedNetworkRequest<Block: BlockT> {
61    V1(v1::NetworkRequest<Block>),
62}
63
64/// Versioned network responses.
65#[derive(Debug, codec::Encode, codec::Decode)]
66pub enum VersionedNetworkResponse<Block: BlockT> {
67    V1(Result<v1::NetworkResponse<Block>, String>),
68}
69
70pub mod v1 {
71    use sp_runtime::traits::{Block as BlockT, NumberFor};
72
73    /// Subcoin network specific requests.
74    #[derive(Debug, codec::Encode, codec::Decode)]
75    pub enum NetworkRequest<Block: BlockT> {
76        /// Requests the best block.
77        GetBestBlock,
78        /// Requests the number of total coins at a specified block.
79        GetCoinsCount { block_hash: Block::Hash },
80        /// Request the header of specified block.
81        GetBlockHeader { block_number: NumberFor<Block> },
82    }
83
84    /// Subcoin network specific responses.
85    #[derive(Debug, codec::Encode, codec::Decode)]
86    pub enum NetworkResponse<Block: BlockT> {
87        BestBlock {
88            best_hash: Block::Hash,
89            best_number: NumberFor<Block>,
90        },
91        /// The number of total coins at the specified block.
92        CoinsCount { block_hash: Block::Hash, count: u64 },
93        /// Block header.
94        BlockHeader { block_header: Block::Header },
95    }
96}
97
98/// Handler for incoming block requests from a remote peer.
99pub struct NetworkRequestHandler<Block, Client> {
100    client: Arc<Client>,
101    request_receiver: async_channel::Receiver<IncomingRequest>,
102    _phantom: PhantomData<Block>,
103}
104
105impl<B, Client> NetworkRequestHandler<B, Client>
106where
107    B: BlockT,
108    Client: HeaderBackend<B>
109        + BlockBackend<B>
110        + ProofProvider<B>
111        + ProvideRuntimeApi<B>
112        + Send
113        + Sync
114        + 'static,
115    Client::Api: SubcoinApi<B>,
116{
117    /// Create a new [`NetworkRequestHandler`].
118    pub fn new<N: NetworkBackend<B, <B as BlockT>::Hash>>(
119        protocol_id: &ProtocolId,
120        fork_id: Option<&str>,
121        client: Arc<Client>,
122        num_peer_hint: usize,
123    ) -> (Self, N::RequestResponseProtocolConfig) {
124        // Reserve enough request slots for one request per peer when we are at the maximum
125        // number of peers.
126        let capacity = std::cmp::max(num_peer_hint, 1);
127        let (tx, request_receiver) = async_channel::bounded(capacity);
128
129        let protocol_config = generate_protocol_config::<_, B, N>(
130            protocol_id,
131            client.info().genesis_hash,
132            fork_id,
133            tx,
134        );
135
136        (
137            Self {
138                client,
139                request_receiver,
140                _phantom: Default::default(),
141            },
142            protocol_config,
143        )
144    }
145
146    /// Run [`NetworkRequestHandler`].
147    pub async fn run(mut self) {
148        while let Some(request) = self.request_receiver.next().await {
149            let IncomingRequest {
150                peer,
151                payload,
152                pending_response,
153            } = request;
154
155            match self.handle_request(payload, pending_response, &peer) {
156                Ok(()) => {
157                    tracing::debug!(target: LOG_TARGET, "Handled subcoin request from {peer}")
158                }
159                Err(e) => {
160                    tracing::debug!(target: LOG_TARGET, "Failed to handle subcoin request from {peer}: {e:?}")
161                }
162            }
163        }
164    }
165
166    fn handle_request(
167        &mut self,
168        payload: Vec<u8>,
169        pending_response: oneshot::Sender<OutgoingResponse>,
170        peer: &PeerId,
171    ) -> Result<(), HandleRequestError> {
172        let request = VersionedNetworkRequest::<B>::decode(&mut payload.as_slice())?;
173
174        tracing::debug!(target: LOG_TARGET, "Handling request from {peer:?}: {request:?}");
175
176        let response: VersionedNetworkResponse<B> = match request {
177            VersionedNetworkRequest::V1(request) => VersionedNetworkResponse::V1(
178                self.process_request_v1(request)
179                    .map_err(|err| err.to_string()),
180            ),
181        };
182
183        pending_response
184            .send(OutgoingResponse {
185                result: Ok(response.encode()),
186                reputation_changes: Vec::new(),
187                sent_feedback: None,
188            })
189            .map_err(|_| HandleRequestError::SendResponse)
190    }
191
192    fn process_request_v1(
193        &self,
194        request: v1::NetworkRequest<B>,
195    ) -> Result<v1::NetworkResponse<B>, HandleRequestError> {
196        match request {
197            v1::NetworkRequest::GetBestBlock => {
198                let info = self.client.info();
199                let response = v1::NetworkResponse::<B>::BestBlock {
200                    best_hash: info.best_hash,
201                    best_number: info.best_number,
202                };
203                Ok(response)
204            }
205            v1::NetworkRequest::GetCoinsCount { block_hash } => {
206                let count = self.client.runtime_api().coins_count(block_hash)?;
207                let response = v1::NetworkResponse::<B>::CoinsCount { block_hash, count };
208                Ok(response)
209            }
210            v1::NetworkRequest::GetBlockHeader { block_number } => {
211                let block_hash = self.client.hash(block_number)?.ok_or_else(|| {
212                    sp_blockchain::Error::Backend(format!(
213                        "Hash for block #{block_number} not found"
214                    ))
215                })?;
216
217                Ok(self
218                    .client
219                    .header(block_hash)?
220                    .map(|block_header| v1::NetworkResponse::<B>::BlockHeader { block_header })
221                    .ok_or_else(|| {
222                        sp_blockchain::Error::Backend(format!(
223                            "Header for #{block_number},{block_hash} not found"
224                        ))
225                    })?)
226            }
227        }
228    }
229}
230
231#[derive(Debug, thiserror::Error)]
232enum HandleRequestError {
233    #[error("Failed to decode block hash: {0}.")]
234    InvalidHash(#[from] codec::Error),
235
236    #[error(transparent)]
237    Client(#[from] sp_blockchain::Error),
238
239    #[error(transparent)]
240    RuntimeApi(#[from] sp_api::ApiError),
241
242    #[error("Failed to send response.")]
243    SendResponse,
244}