WORKING BASIC MSG SERVER

This commit is contained in:
2025-11-17 01:44:22 -05:00
parent 8a9e766633
commit 46a566ebd0
8 changed files with 265 additions and 115 deletions

View File

@@ -1,24 +1,42 @@
use crate::{
client::{AppHandle, ClientEvent},
net::{BINCODE_CONFIG, ClientMsg, SERVER_NAME, no_cert::SkipServerVerification},
net::{
ClientMsg, SERVER_NAME, ServerMsg,
no_cert::SkipServerVerification,
transfer::{RecvHandler, recv_uni, send_uni},
},
};
use quinn::{ClientConfig, Connection, Endpoint, crypto::rustls::QuicClientConfig};
use std::{
net::{Ipv6Addr, SocketAddr, SocketAddrV6, ToSocketAddrs},
sync::Arc,
};
use tokio::{io::AsyncWriteExt, sync::mpsc::UnboundedSender};
use tokio::sync::mpsc::UnboundedSender;
pub const CLIENT_SOCKET: SocketAddr =
SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0));
pub fn connect(handle: AppHandle, ip: String) {
pub struct ConnectInfo {
pub ip: String,
pub username: String,
}
pub fn connect(handle: AppHandle, info: ConnectInfo) {
std::thread::spawn(|| {
connect_the(handle, ip).unwrap();
connect_the(handle, info).unwrap();
});
}
pub type NetSender = UnboundedSender<ClientMsg>;
type MsgPayload = ClientMsg;
pub struct NetSender {
send: UnboundedSender<MsgPayload>,
}
impl NetSender {
pub fn send(&self, msg: ClientMsg) {
self.send.send(msg).unwrap();
}
}
// async fn connection_cert(addr: SocketAddr) -> anyhow::Result<Connection> {
// let dirs = directories_next::ProjectDirs::from("", "", "openworm").unwrap();
@@ -65,29 +83,36 @@ async fn connection_no_cert(addr: SocketAddr) -> anyhow::Result<Connection> {
}
#[tokio::main]
async fn connect_the(handle: AppHandle, ip: String) -> anyhow::Result<()> {
let (client_send, mut client_recv) = tokio::sync::mpsc::unbounded_channel::<ClientMsg>();
async fn connect_the(handle: AppHandle, info: ConnectInfo) -> anyhow::Result<()> {
let (send, mut ui_recv) = tokio::sync::mpsc::unbounded_channel::<MsgPayload>();
handle.send(ClientEvent::Connect(client_send));
let addr = ip.to_socket_addrs().unwrap().next().unwrap();
handle.send(ClientEvent::Connect {
username: info.username,
send: NetSender { send },
});
let addr = info.ip.to_socket_addrs().unwrap().next().unwrap();
let conn = connection_no_cert(addr).await?;
let conn_ = conn.clone();
while let Some(msg) = client_recv.recv().await {
let bytes = bincode::encode_to_vec(msg, BINCODE_CONFIG).unwrap();
let (mut send, recv) = conn
.open_bi()
.await
.map_err(|e| anyhow::anyhow!("failed to open stream: {e}"))?;
let recv = ServerRecv { handle };
tokio::spawn(recv_uni(conn_, recv));
drop(recv);
send.write_u64(bytes.len() as u64)
.await
.expect("failed to send");
send.write_all(&bytes).await.expect("failed to send");
send.finish().unwrap();
send.stopped().await.unwrap();
while let Some(msg) = ui_recv.recv().await {
if send_uni(&conn, msg).await.is_err() {
println!("disconnected from server");
break;
}
}
Ok(())
}
struct ServerRecv {
handle: AppHandle,
}
impl RecvHandler<ServerMsg> for ServerRecv {
async fn msg(&self, msg: ServerMsg) {
self.handle.send(ClientEvent::ServerMsg(msg));
}
}

View File

@@ -3,16 +3,27 @@ use bincode::config::Configuration;
pub mod client;
mod no_cert;
pub mod server;
pub mod transfer;
pub const SERVER_NAME: &str = "openworm";
pub const BINCODE_CONFIG: Configuration = bincode::config::standard();
#[derive(Debug, bincode::Encode, bincode::Decode)]
pub enum ClientMsg {
SendMsg { content: String },
SendMsg(Msg),
RequestMsgs,
}
#[derive(Debug, bincode::Encode, bincode::Decode)]
pub enum ServerMsg {
RecvMsg { content: String },
SendMsg(Msg),
LoadMsgs(Vec<Msg>),
}
pub type ServerResp<T> = Result<T, String>;
#[derive(Debug, Clone, bincode::Encode, bincode::Decode)]
pub struct Msg {
pub content: String,
pub user: String,
}

View File

@@ -1,6 +1,9 @@
use crate::net::{BINCODE_CONFIG, ClientMsg, SERVER_NAME};
use crate::net::{
ClientMsg, SERVER_NAME, ServerMsg,
transfer::{RecvHandler, SendResult, recv_uni, send_uni},
};
use quinn::{
Endpoint, ServerConfig,
Connection, Endpoint, ServerConfig,
rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer},
};
use std::{fs, path::Path};
@@ -8,7 +11,6 @@ use std::{
net::{Ipv6Addr, SocketAddr, SocketAddrV6},
sync::Arc,
};
use tokio::io::AsyncReadExt;
use tracing::Instrument;
pub const DEFAULT_PORT: u16 = 16839;
@@ -53,17 +55,31 @@ pub fn init_endpoint(data_path: &Path) -> Endpoint {
quinn::Endpoint::server(server_config, SERVER_SOCKET).unwrap()
}
pub trait ConHandler: Send + Sync + 'static {
fn on_msg(&self, msg: ClientMsg);
#[derive(Clone)]
pub struct ClientSender {
conn: Connection,
}
pub async fn listen(data_path: &Path, handler: impl ConHandler) {
let handler = Arc::new(handler);
impl ClientSender {
pub async fn send(&self, msg: ServerMsg) -> SendResult {
send_uni(&self.conn, msg).await
}
}
pub trait ConAccepter: Send + Sync + 'static {
fn accept(
&self,
send: ClientSender,
) -> impl Future<Output = impl RecvHandler<ClientMsg>> + Send;
}
pub async fn listen(data_path: &Path, accepter: impl ConAccepter) {
let accepter = Arc::new(accepter);
let endpoint = init_endpoint(data_path);
println!("listening on {}", endpoint.local_addr().unwrap());
while let Some(conn) = endpoint.accept().await {
let fut = handle_connection(conn, handler.clone());
let fut = handle_connection(conn, accepter.clone());
tokio::spawn(async move {
if let Err(e) = fut.await {
eprintln!("connection failed: {reason}", reason = e)
@@ -74,13 +90,14 @@ pub async fn listen(data_path: &Path, handler: impl ConHandler) {
async fn handle_connection(
conn: quinn::Incoming,
handler: Arc<impl ConHandler>,
accepter: Arc<impl ConAccepter>,
) -> std::io::Result<()> {
let connection = conn.await?;
let conn = conn.await?;
let handler = accepter.accept(ClientSender { conn: conn.clone() }).await;
let span = tracing::info_span!(
"connection",
remote = %connection.remote_address(),
protocol = %connection
remote = %conn.remote_address(),
protocol = %conn
.handshake_data()
.unwrap()
.downcast::<quinn::crypto::rustls::HandshakeData>().unwrap()
@@ -88,47 +105,9 @@ async fn handle_connection(
.map_or_else(|| "<none>".into(), |x| String::from_utf8_lossy(&x).into_owned())
);
async {
// Each stream initiated by the client constitutes a new request.
loop {
let stream = connection.accept_bi().await;
// let time = Instant::now();
let stream = match stream {
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
println!("connection closed");
return Ok(());
}
Err(e) => {
return Err(e);
}
Ok(s) => s,
};
let handler = handler.clone();
tokio::spawn(
async move {
if let Err(e) = handle_stream(stream, handler).await {
eprintln!("failed: {reason}", reason = e);
}
}
.instrument(tracing::info_span!("request")),
);
}
recv_uni(conn, handler).await;
}
.instrument(span)
.await?;
Ok(())
}
async fn handle_stream(
(send, mut recv): (quinn::SendStream, quinn::RecvStream),
handler: Arc<impl ConHandler>,
) -> Result<(), String> {
drop(send);
let len = recv.read_u64().await.unwrap();
let bytes = recv
.read_to_end(len as usize)
.await
.map_err(|e| format!("failed reading request: {}", e))?;
let (msg, _) = bincode::decode_from_slice::<ClientMsg, _>(&bytes, BINCODE_CONFIG).unwrap();
handler.on_msg(msg);
.await;
Ok(())
}

48
src/net/transfer.rs Normal file
View File

@@ -0,0 +1,48 @@
use std::sync::Arc;
use crate::net::BINCODE_CONFIG;
use quinn::Connection;
use tokio::io::{AsyncReadExt as _, AsyncWriteExt};
use tracing::Instrument as _;
pub trait RecvHandler<M>: Send + Sync + 'static {
fn msg(&self, msg: M) -> impl Future<Output = ()> + Send;
}
pub type SendResult = Result<(), ()>;
pub async fn send_uni<M: bincode::Encode>(conn: &Connection, msg: M) -> SendResult {
let bytes = bincode::encode_to_vec(msg, BINCODE_CONFIG).unwrap();
let mut send = conn.open_uni().await.map_err(|_| ())?;
send.write_u64(bytes.len() as u64).await.map_err(|_| ())?;
send.write_all(&bytes).await.map_err(|_| ())?;
send.finish().map_err(|_| ())?;
send.stopped().await.map_err(|_| ())?;
Ok(())
}
pub async fn recv_uni<M: bincode::Decode<()>>(conn: Connection, handler: impl RecvHandler<M>) {
let handler = Arc::new(handler);
loop {
let mut recv = match conn.accept_uni().await {
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
return;
}
Err(e) => {
eprintln!("connection error: {e}");
return;
}
Ok(s) => s,
};
let handler = handler.clone();
tokio::spawn(
async move {
let len = recv.read_u64().await.unwrap();
let bytes = recv.read_to_end(len as usize).await.unwrap();
let (msg, _) = bincode::decode_from_slice::<M, _>(&bytes, BINCODE_CONFIG).unwrap();
handler.msg(msg).await;
}
.instrument(tracing::info_span!("request")),
);
}
}