binary refactor
This commit is contained in:
96
src/bin/server/main.rs
Normal file
96
src/bin/server/main.rs
Normal file
@@ -0,0 +1,96 @@
|
||||
mod net;
|
||||
|
||||
use net::{ClientSender, ConAccepter, listen};
|
||||
use openworm::{
|
||||
net::{ClientMsg, DisconnectReason, Msg, RecvHandler, ServerMsg, install_crypto_provider},
|
||||
rsc::DataDir,
|
||||
};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
},
|
||||
};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
fn main() {
|
||||
install_crypto_provider();
|
||||
run_server();
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn run_server() {
|
||||
let dir = DataDir::default();
|
||||
let path = dir.get();
|
||||
let handler = ServerListener {
|
||||
msgs: Default::default(),
|
||||
senders: Default::default(),
|
||||
count: 0.into(),
|
||||
};
|
||||
listen(path, handler).await;
|
||||
}
|
||||
|
||||
type ClientId = u64;
|
||||
|
||||
struct ServerListener {
|
||||
msgs: Arc<RwLock<Vec<Msg>>>,
|
||||
senders: Arc<RwLock<HashMap<ClientId, ClientSender>>>,
|
||||
count: AtomicU64,
|
||||
}
|
||||
|
||||
impl ConAccepter for ServerListener {
|
||||
async fn accept(&self, send: ClientSender) -> impl RecvHandler<ClientMsg> {
|
||||
let id = self.count.fetch_add(1, Ordering::Release);
|
||||
self.senders.write().await.insert(id, send.clone());
|
||||
ClientHandler {
|
||||
msgs: self.msgs.clone(),
|
||||
senders: self.senders.clone(),
|
||||
send,
|
||||
id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ClientHandler {
|
||||
msgs: Arc<RwLock<Vec<Msg>>>,
|
||||
send: ClientSender,
|
||||
senders: Arc<RwLock<HashMap<ClientId, ClientSender>>>,
|
||||
id: ClientId,
|
||||
}
|
||||
|
||||
impl RecvHandler<ClientMsg> for ClientHandler {
|
||||
async fn msg(&self, msg: ClientMsg) {
|
||||
match msg {
|
||||
ClientMsg::SendMsg(msg) => {
|
||||
self.msgs.write().await.push(msg.clone());
|
||||
let mut handles = Vec::new();
|
||||
for (&id, send) in self.senders.read().await.iter() {
|
||||
if id == self.id {
|
||||
continue;
|
||||
}
|
||||
let send = send.clone();
|
||||
let msg = msg.clone();
|
||||
let fut = async move {
|
||||
let _ = send.send(ServerMsg::SendMsg(msg)).await;
|
||||
};
|
||||
handles.push(tokio::spawn(fut));
|
||||
}
|
||||
for h in handles {
|
||||
h.await.unwrap();
|
||||
}
|
||||
}
|
||||
ClientMsg::RequestMsgs => {
|
||||
let msgs = self.msgs.read().await.clone();
|
||||
let _ = self.send.send(ServerMsg::LoadMsgs(msgs)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn disconnect(&self, reason: DisconnectReason) -> () {
|
||||
match reason {
|
||||
DisconnectReason::Closed | DisconnectReason::Timeout => (),
|
||||
DisconnectReason::Other(e) => println!("connection issue: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
113
src/bin/server/net.rs
Normal file
113
src/bin/server/net.rs
Normal file
@@ -0,0 +1,113 @@
|
||||
use openworm::net::{
|
||||
ClientMsg, RecvHandler, SERVER_NAME, SendResult, ServerMsg, recv_uni, send_uni,
|
||||
};
|
||||
use quinn::{
|
||||
Connection, Endpoint, ServerConfig,
|
||||
rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer},
|
||||
};
|
||||
use std::{fs, path::Path};
|
||||
use std::{
|
||||
net::{Ipv6Addr, SocketAddr, SocketAddrV6},
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::Instrument;
|
||||
|
||||
pub const DEFAULT_PORT: u16 = 16839;
|
||||
pub const SERVER_HOST: Ipv6Addr = Ipv6Addr::UNSPECIFIED;
|
||||
pub const SERVER_SOCKET: SocketAddr =
|
||||
SocketAddr::V6(SocketAddrV6::new(SERVER_HOST, DEFAULT_PORT, 0, 0));
|
||||
|
||||
pub fn init_endpoint(data_path: &Path) -> Endpoint {
|
||||
let cert_path = data_path.join("cert.der");
|
||||
let key_path = data_path.join("key.der");
|
||||
let (cert, key) = match fs::read(&cert_path).and_then(|x| Ok((x, fs::read(&key_path)?))) {
|
||||
Ok((cert, key)) => (
|
||||
CertificateDer::from(cert),
|
||||
PrivateKeyDer::try_from(key).unwrap(),
|
||||
),
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
let cert = rcgen::generate_simple_self_signed([SERVER_NAME.into()]).unwrap();
|
||||
let key = PrivatePkcs8KeyDer::from(cert.signing_key.serialize_der());
|
||||
let cert = cert.cert.into();
|
||||
fs::create_dir_all(data_path).expect("failed to create certificate directory");
|
||||
fs::write(&cert_path, &cert).expect("failed to write certificate");
|
||||
fs::write(&key_path, key.secret_pkcs8_der()).expect("failed to write private key");
|
||||
(cert, key.into())
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("failed to read certificate: {}", e);
|
||||
}
|
||||
};
|
||||
// let server_crypto = quinn::rustls::ServerConfig::builder()
|
||||
// .with_no_client_auth()
|
||||
// .with_single_cert(vec![cert], key)
|
||||
// .unwrap();
|
||||
//
|
||||
// let server_config = quinn::ServerConfig::with_crypto(Arc::new(
|
||||
// QuicServerConfig::try_from(server_crypto).unwrap(),
|
||||
// ));
|
||||
|
||||
let server_config = ServerConfig::with_single_cert(vec![cert], key).unwrap();
|
||||
// let transport_config = Arc::get_mut(&mut server_config.transport).unwrap();
|
||||
// transport_config.max_concurrent_uni_streams(0_u8.into());
|
||||
|
||||
quinn::Endpoint::server(server_config, SERVER_SOCKET).unwrap()
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ClientSender {
|
||||
conn: Connection,
|
||||
}
|
||||
|
||||
impl ClientSender {
|
||||
pub async fn send(&self, msg: ServerMsg) -> SendResult {
|
||||
send_uni(&self.conn, msg).await
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ConAccepter: Send + Sync + 'static {
|
||||
fn accept(
|
||||
&self,
|
||||
send: ClientSender,
|
||||
) -> impl Future<Output = impl RecvHandler<ClientMsg>> + Send;
|
||||
}
|
||||
|
||||
pub async fn listen(data_path: &Path, accepter: impl ConAccepter) {
|
||||
let accepter = Arc::new(accepter);
|
||||
let endpoint = init_endpoint(data_path);
|
||||
println!("listening on {}", endpoint.local_addr().unwrap());
|
||||
|
||||
while let Some(conn) = endpoint.accept().await {
|
||||
let fut = handle_connection(conn, accepter.clone());
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = fut.await {
|
||||
eprintln!("connection failed: {reason}", reason = e)
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_connection(
|
||||
conn: quinn::Incoming,
|
||||
accepter: Arc<impl ConAccepter>,
|
||||
) -> std::io::Result<()> {
|
||||
let conn = conn.await?;
|
||||
let handler = Arc::new(accepter.accept(ClientSender { conn: conn.clone() }).await);
|
||||
let span = tracing::info_span!(
|
||||
"connection",
|
||||
remote = %conn.remote_address(),
|
||||
protocol = %conn
|
||||
.handshake_data()
|
||||
.unwrap()
|
||||
.downcast::<quinn::crypto::rustls::HandshakeData>().unwrap()
|
||||
.protocol
|
||||
.map_or_else(|| "<none>".into(), |x| String::from_utf8_lossy(&x).into_owned())
|
||||
);
|
||||
async {
|
||||
let res = recv_uni(conn, handler.clone()).await;
|
||||
handler.disconnect(res).await;
|
||||
}
|
||||
.instrument(span)
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user