diff --git a/Cargo.lock b/Cargo.lock index 58d2bb0..acddbfe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -226,6 +226,26 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bincode" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" +dependencies = [ + "bincode_derive", + "serde", + "unty", +] + +[[package]] +name = "bincode_derive" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" +dependencies = [ + "virtue", +] + [[package]] name = "bindgen" version = "0.72.1" @@ -1923,6 +1943,7 @@ version = "0.1.0" dependencies = [ "anyhow", "arboard", + "bincode", "directories-next", "iris", "pollster", @@ -1932,6 +1953,7 @@ dependencies = [ "tracing", "wgpu", "winit", + "zstd", ] [[package]] @@ -3282,6 +3304,12 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "unty" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" + [[package]] name = "v_frame" version = "0.3.9" @@ -3305,6 +3333,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "virtue" +version = "0.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" + [[package]] name = "walkdir" version = "2.5.0" @@ -4211,6 +4245,34 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] + [[package]] name = "zune-core" version = "0.4.12" diff --git a/Cargo.toml b/Cargo.toml index 6363ba5..003a595 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,5 @@ tracing = "0.1.41" iris = { path = "../iris" } wgpu = "27.0.1" winit = "0.30.12" +bincode = "2.0.1" +zstd = "0.13.3" diff --git a/src/client/ui.rs b/src/client/ui.rs index d92a6ea..65640eb 100644 --- a/src/client/ui.rs +++ b/src/client/ui.rs @@ -3,7 +3,10 @@ use len_fns::*; use crate::{ client::{Client, app::AppHandle}, - net::client::{NetCmd, NetSender, connect}, + net::{ + ClientMsg, + client::{NetSender, connect}, + }, }; #[derive(Eq, PartialEq, Hash, Clone)] @@ -117,7 +120,11 @@ pub fn msg_panel(ui: &mut Ui, network: NetSender) -> impl WidgetFn + use< .clone() .id_on(Submit, move |id, client: &mut Client, _| { let content = client.ui.text(id).take(); - network.send(NetCmd::SendMsg(content.clone())).unwrap(); + network + .send(ClientMsg::SendMsg { + content: content.clone(), + }) + .unwrap(); let msg = msg_widget(content).add(&mut client.ui); client.ui[&msg_area].children.push(msg.any()); }) diff --git a/src/net/client.rs b/src/net/client.rs index ec22c23..067054f 100644 --- a/src/net/client.rs +++ b/src/net/client.rs @@ -1,14 +1,13 @@ use crate::{ client::{AppHandle, ClientEvent}, - net::{SERVER_NAME, no_cert::SkipServerVerification}, + net::{BINCODE_CONFIG, ClientMsg, SERVER_NAME, no_cert::SkipServerVerification}, }; use quinn::{ClientConfig, Connection, Endpoint, crypto::rustls::QuicClientConfig}; use std::{ net::{Ipv6Addr, SocketAddr, SocketAddrV6, ToSocketAddrs}, - str::FromStr, sync::Arc, }; -use tokio::sync::mpsc::UnboundedSender; +use tokio::{io::AsyncWriteExt, sync::mpsc::UnboundedSender}; pub const CLIENT_SOCKET: SocketAddr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0)); @@ -19,11 +18,7 @@ pub fn connect(handle: AppHandle, ip: String) { }); } -pub enum NetCmd { - SendMsg(String), -} - -pub type NetSender = UnboundedSender; +pub type NetSender = UnboundedSender; // async fn connection_cert(addr: SocketAddr) -> anyhow::Result { // let dirs = directories_next::ProjectDirs::from("", "", "openworm").unwrap(); @@ -71,30 +66,27 @@ async fn connection_no_cert(addr: SocketAddr) -> anyhow::Result { #[tokio::main] async fn connect_the(handle: AppHandle, ip: String) -> anyhow::Result<()> { - let (client_send, mut client_recv) = tokio::sync::mpsc::unbounded_channel::(); + let (client_send, mut client_recv) = tokio::sync::mpsc::unbounded_channel::(); handle.send(ClientEvent::Connect(client_send)); let addr = ip.to_socket_addrs().unwrap().next().unwrap(); - // let addr = SocketAddr::from_str(&ip).unwrap(); let conn = connection_no_cert(addr).await?; while let Some(msg) = client_recv.recv().await { - match msg { - NetCmd::SendMsg(content) => { - let (mut send, recv) = conn - .open_bi() - .await - .map_err(|e| anyhow::anyhow!("failed to open stream: {e}"))?; + let bytes = bincode::encode_to_vec(msg, BINCODE_CONFIG).unwrap(); + let (mut send, recv) = conn + .open_bi() + .await + .map_err(|e| anyhow::anyhow!("failed to open stream: {e}"))?; - drop(recv); + drop(recv); - send.write_all(content.as_bytes()) - .await - .expect("failed to send"); - send.finish().unwrap(); - send.stopped().await.unwrap(); - } - } + send.write_u64(bytes.len() as u64) + .await + .expect("failed to send"); + send.write_all(&bytes).await.expect("failed to send"); + send.finish().unwrap(); + send.stopped().await.unwrap(); } Ok(()) diff --git a/src/net/mod.rs b/src/net/mod.rs index be0b30c..8beacd9 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,13 +1,18 @@ +use bincode::config::Configuration; + pub mod client; mod no_cert; pub mod server; pub const SERVER_NAME: &str = "openworm"; +pub const BINCODE_CONFIG: Configuration = bincode::config::standard(); +#[derive(Debug, bincode::Encode, bincode::Decode)] pub enum ClientMsg { SendMsg { content: String }, } +#[derive(Debug, bincode::Encode, bincode::Decode)] pub enum ServerMsg { RecvMsg { content: String }, } diff --git a/src/net/server.rs b/src/net/server.rs index e65805f..80545b8 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -1,17 +1,22 @@ -use crate::net::SERVER_NAME; +use crate::net::{BINCODE_CONFIG, ClientMsg, SERVER_NAME}; use quinn::{ Endpoint, ServerConfig, rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}, }; -use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; use std::{fs, path::Path}; +use std::{ + net::{Ipv6Addr, SocketAddr, SocketAddrV6}, + sync::Arc, +}; +use tokio::io::AsyncReadExt; +use tracing::Instrument; pub const DEFAULT_PORT: u16 = 16839; pub const SERVER_HOST: Ipv6Addr = Ipv6Addr::UNSPECIFIED; pub const SERVER_SOCKET: SocketAddr = SocketAddr::V6(SocketAddrV6::new(SERVER_HOST, DEFAULT_PORT, 0, 0)); -pub fn listen(data_path: &Path) -> Endpoint { +pub fn init_endpoint(data_path: &Path) -> Endpoint { let cert_path = data_path.join("cert.der"); let key_path = data_path.join("key.der"); let (cert, key) = match fs::read(&cert_path).and_then(|x| Ok((x, fs::read(&key_path)?))) { @@ -47,3 +52,83 @@ pub fn listen(data_path: &Path) -> Endpoint { quinn::Endpoint::server(server_config, SERVER_SOCKET).unwrap() } + +pub trait ConHandler: Send + Sync + 'static { + fn on_msg(&self, msg: ClientMsg); +} + +pub async fn listen(data_path: &Path, handler: impl ConHandler) { + let handler = Arc::new(handler); + let endpoint = init_endpoint(data_path); + println!("listening on {}", endpoint.local_addr().unwrap()); + + while let Some(conn) = endpoint.accept().await { + let fut = handle_connection(conn, handler.clone()); + tokio::spawn(async move { + if let Err(e) = fut.await { + eprintln!("connection failed: {reason}", reason = e) + } + }); + } +} + +async fn handle_connection( + conn: quinn::Incoming, + handler: Arc, +) -> std::io::Result<()> { + let connection = conn.await?; + let span = tracing::info_span!( + "connection", + remote = %connection.remote_address(), + protocol = %connection + .handshake_data() + .unwrap() + .downcast::().unwrap() + .protocol + .map_or_else(|| "".into(), |x| String::from_utf8_lossy(&x).into_owned()) + ); + async { + // Each stream initiated by the client constitutes a new request. + loop { + let stream = connection.accept_bi().await; + // let time = Instant::now(); + let stream = match stream { + Err(quinn::ConnectionError::ApplicationClosed { .. }) => { + println!("connection closed"); + return Ok(()); + } + Err(e) => { + return Err(e); + } + Ok(s) => s, + }; + let handler = handler.clone(); + tokio::spawn( + async move { + if let Err(e) = handle_stream(stream, handler).await { + eprintln!("failed: {reason}", reason = e); + } + } + .instrument(tracing::info_span!("request")), + ); + } + } + .instrument(span) + .await?; + Ok(()) +} + +async fn handle_stream( + (send, mut recv): (quinn::SendStream, quinn::RecvStream), + handler: Arc, +) -> Result<(), String> { + drop(send); + let len = recv.read_u64().await.unwrap(); + let bytes = recv + .read_to_end(len as usize) + .await + .map_err(|e| format!("failed reading request: {}", e))?; + let (msg, _) = bincode::decode_from_slice::(&bytes, BINCODE_CONFIG).unwrap(); + handler.on_msg(msg); + Ok(()) +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 36befb9..2767c82 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,74 +1,20 @@ -use crate::net::{ClientMsg, server::listen}; -use tracing::Instrument; +use crate::net::{ + ClientMsg, + server::{ConHandler, listen}, +}; #[tokio::main] pub async fn run_server() { let dirs = directories_next::ProjectDirs::from("", "", "openworm").unwrap(); let path = dirs.data_local_dir(); - let endpoint = listen(path); - println!("listening on {}", endpoint.local_addr().unwrap()); + let handler = ClientHandler {}; + listen(path, handler).await; +} - while let Some(conn) = endpoint.accept().await { - let fut = handle_connection(conn); - tokio::spawn(async move { - if let Err(e) = fut.await { - eprintln!("connection failed: {reason}", reason = e) - } - }); +pub struct ClientHandler {} + +impl ConHandler for ClientHandler { + fn on_msg(&self, msg: ClientMsg) { + println!("received msg: {msg:?}"); } } - -async fn handle_connection(conn: quinn::Incoming) -> std::io::Result<()> { - let connection = conn.await?; - let span = tracing::info_span!( - "connection", - remote = %connection.remote_address(), - protocol = %connection - .handshake_data() - .unwrap() - .downcast::().unwrap() - .protocol - .map_or_else(|| "".into(), |x| String::from_utf8_lossy(&x).into_owned()) - ); - async { - // Each stream initiated by the client constitutes a new request. - loop { - let stream = connection.accept_bi().await; - // let time = Instant::now(); - let stream = match stream { - Err(quinn::ConnectionError::ApplicationClosed { .. }) => { - println!("connection closed"); - return Ok(()); - } - Err(e) => { - return Err(e); - } - Ok(s) => s, - }; - tokio::spawn( - async move { - if let Err(e) = handle_stream(stream).await { - eprintln!("failed: {reason}", reason = e); - } - } - .instrument(tracing::info_span!("request")), - ); - } - } - .instrument(span) - .await?; - Ok(()) -} - -async fn handle_stream( - (send, mut recv): (quinn::SendStream, quinn::RecvStream), -) -> Result<(), String> { - drop(send); - let bytes = recv - .read_to_end(std::mem::size_of::()) - .await - .map_err(|e| format!("failed reading request: {}", e))?; - let msg = String::from_utf8(bytes).unwrap(); - println!("received message: {:?}", msg); - Ok(()) -}