Skip to content

Commit

Permalink
Add tcp.ping message type, make error wrapper work with request/respo…
Browse files Browse the repository at this point in the history
…nse services instead of message
  • Loading branch information
hacker-volodya committed Apr 15, 2024
1 parent f5f34cf commit 452ac44
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 46 deletions.
2 changes: 1 addition & 1 deletion liteapi/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use adnl::{AdnlPeer, AdnlRawPublicKey};
use tokio_tower::multiplex;
use ton_liteapi::layers::WrapMessagesLayer;
use ton_liteapi::peer::LitePeer;
use ton_liteapi::tl::request::{LiteQuery, Request, WrappedRequest};
use ton_liteapi::tl::request::{Request, WrappedRequest};
use tower::{Service, ServiceBuilder, ServiceExt};

#[tokio::main]
Expand Down
21 changes: 10 additions & 11 deletions liteapi/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,22 @@ use std::env;
use std::error::Error;

use adnl::{AdnlPrivateKey, AdnlPublicKey};
use ton_liteapi::layers::WrapErrorLayer;
use ton_liteapi::layers::{UnwrapMessagesLayer, WrapErrorLayer};
use ton_liteapi::server::serve;
use ton_liteapi::tl::request::{Request, WrappedRequest};
use ton_liteapi::types::LiteError;
use ton_liteapi::tl::response::CurrentTime;
use ton_liteapi::tl::adnl::Message;
use ton_liteapi::tl::response::Response;
use tower::{make::Shared, ServiceBuilder};
use x25519_dalek::StaticSecret;

async fn handler(req: Message) -> Result<Message, LiteError> {
let (query_id, req) = match req {
Message::Query { query_id, query } => (query_id, query),
_ => return Err(LiteError::UnexpectedMessage)
};
println!("Received frame: {:?}, tag = {}", &req, query_id);

let response = Message::Answer { query_id, answer: Response::CurrentTime(CurrentTime { now: 1234 }) };
Ok(response)
async fn handler(req: WrappedRequest) -> Result<Response, LiteError> {
println!("Received frame: {:?}", &req);
if let Request::GetTime = req.request {
Ok(Response::CurrentTime(CurrentTime { now: 1234 }))
} else {
Err(LiteError::UnexpectedMessage)
}
}

#[tokio::main]
Expand All @@ -35,6 +33,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

let service = ServiceBuilder::new()
.buffer(100)
.layer(UnwrapMessagesLayer)
.layer(WrapErrorLayer)
.service_fn(handler);

Expand Down
37 changes: 15 additions & 22 deletions liteapi/src/layers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::task::{Context, Poll};

use futures::future::{self, BoxFuture};
use futures::FutureExt;
use tower::{Layer, Service};

use crate::tl::common::Int256;
Expand Down Expand Up @@ -90,14 +89,15 @@ where
}

fn call(&mut self, request: Message) -> Self::Future {
let request = match request {
Message::Query { query: LiteQuery { wrapped_request }, .. } => wrapped_request,
let (query_id, request) = match request {
Message::Query { query_id, query: LiteQuery { wrapped_request } } => (query_id, wrapped_request),
Message::Ping { random_id } => return Box::pin(future::ok(Message::Pong { random_id })),
_ => return Box::pin(future::err(LiteError::UnexpectedMessage))
};
let fut = self.service.call(request);
Box::pin(async move {
let response = fut.await.map_err(Into::into)?.into();
Ok(Message::Answer { query_id: Int256::default(), answer: response })
let answer = fut.await.map_err(Into::into)?.into();
Ok(Message::Answer { query_id, answer })
})
}
}
Expand All @@ -118,38 +118,31 @@ pub struct WrapErrorService<S> {
service: S,
}

impl<S> Service<Message> for WrapErrorService<S>
impl<S> Service<WrappedRequest> for WrapErrorService<S>
where
S: Service<Message>,
S: Service<WrappedRequest>,
S::Error: Into<LiteError>,
S::Response: Into<Message>,
S::Response: Into<Response>,
S::Future: Send + 'static,
{
type Response = Message;
type Response = Response;
type Error = LiteError;
type Future = BoxFuture<'static, Result<Message, LiteError>>;
type Future = BoxFuture<'static, Result<Response, LiteError>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, request: Message) -> Self::Future {
let query_id = match &request {
Message::Query { query_id, .. } => query_id,
Message::Answer { query_id, .. } => query_id,
}.clone();
fn call(&mut self, request: WrappedRequest) -> Self::Future {
let fut = self.service.call(request);
Box::pin(async move {
let response = fut.await;
match response {
Ok(x) => Ok(x.into()),
Err(e) => Ok(Message::Answer {
query_id,
answer: Response::Error(Error {
code: 500,
message: format!("{:?}", e.into()).as_str().into(),
})
})
Err(e) => Ok(Response::Error(Error {
code: 500,
message: format!("{:?}", e.into()).as_str().into(),
}))
}
})
}
Expand Down
21 changes: 16 additions & 5 deletions liteapi/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::task::Poll;
use adnl::AdnlError;
use futures::{Sink, Stream};
use pin_project::pin_project;
use rand::random;
use tokio_tower::multiplex::TagStore;
use tokio_util::bytes::Bytes;

Expand Down Expand Up @@ -60,20 +61,30 @@ impl<T> Stream for LitePeer<T> where T: Stream<Item = Result<Bytes, AdnlError>>
}
}

#[derive(PartialEq, Eq)]
pub enum LiteTag {
Int256(Int256),
Long(u64),
}

impl<T> TagStore<Message, Message> for LitePeer<T> {
type Tag = Int256;
type Tag = LiteTag;

fn assign_tag(self: std::pin::Pin<&mut Self>, r: &mut Message) -> Self::Tag {
match r {
Message::Answer { query_id, .. } => { *query_id = Int256::random(); query_id.clone() },
Message::Query { query_id, .. } => { *query_id = Int256::random(); query_id.clone() },
Message::Answer { query_id, .. } => { *query_id = Int256::random(); LiteTag::Int256(query_id.clone()) },
Message::Query { query_id, .. } => { *query_id = Int256::random(); LiteTag::Int256(query_id.clone()) },
Message::Ping { random_id } => { *random_id = random(); LiteTag::Long(random_id.clone()) },

Check failure on line 77 in liteapi/src/peer.rs

View workflow job for this annotation

GitHub Actions / Clippy

using `clone` on type `u64` which implements the `Copy` trait
Message::Pong { random_id } => { *random_id = random(); LiteTag::Long(random_id.clone()) },

Check failure on line 78 in liteapi/src/peer.rs

View workflow job for this annotation

GitHub Actions / Clippy

using `clone` on type `u64` which implements the `Copy` trait
}
}

fn finish_tag(self: std::pin::Pin<&mut Self>, r: &Message) -> Self::Tag {
match r {
Message::Answer { query_id, .. } => query_id.clone(),
Message::Query { query_id, .. } => query_id.clone(),
Message::Answer { query_id, .. } => LiteTag::Int256(query_id.clone()),
Message::Query { query_id, .. } => LiteTag::Int256(query_id.clone()),
Message::Ping { random_id } => LiteTag::Long(random_id.clone()),
Message::Pong { random_id } => LiteTag::Long(random_id.clone()),
}
}
}
18 changes: 11 additions & 7 deletions liteapi/src/tl/adnl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@ use super::utils::*;

#[derive(TlRead, TlWrite, Derivative)]
#[derivative(Debug, Clone, PartialEq)]
#[tl(
boxed,
scheme_inline = r##"adnl.message.query query_id:int256 query:bytes = adnl.Message;
adnl.message.answer query_id:int256 answer:bytes = adnl.Message;"##
)]
#[tl(boxed)]
pub enum Message {
/// adnl.message.query query_id:int256 query:bytes = adnl.Message;
#[tl(id = "adnl.message.query")]
#[tl(id = 0xb48bf97a)]
Query { query_id: Int256, #[tl(with = "struct_as_bytes")] query: LiteQuery },

/// adnl.message.answer query_id:int256 answer:bytes = adnl.Message;
#[tl(id = "adnl.message.answer")]
#[tl(id = 0x0fac8416)]
Answer { query_id: Int256, #[tl(with = "struct_as_bytes")] answer: Response },

/// tcp.ping random_id:long = tcp.Pong;
#[tl(id = 0x4d082b9a)]
Ping { random_id: u64 },

/// tcp.pong random_id:long = tcp.Pong;
#[tl(id = 0xdc69fb03)]
Pong { random_id: u64 },
}

0 comments on commit 452ac44

Please sign in to comment.