WIP async read write implementation

This commit is contained in:
Jett Chen 2023-11-22 21:44:46 +08:00
parent 5174fdd238
commit c7c4e5779c
5 changed files with 49 additions and 31 deletions

10
Cargo.lock generated
View file

@ -260,6 +260,7 @@ dependencies = [
"env_logger", "env_logger",
"etherparse", "etherparse",
"fehler", "fehler",
"futures",
"hmac", "hmac",
"insta", "insta",
"ip_network", "ip_network",
@ -330,11 +331,12 @@ dependencies = [
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.79" version = "1.0.83"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0"
dependencies = [ dependencies = [
"jobserver", "jobserver",
"libc",
] ]
[[package]] [[package]]
@ -1109,9 +1111,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.147" version = "0.2.150"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c"
[[package]] [[package]]
name = "libloading" name = "libloading"

View file

@ -39,6 +39,7 @@ ip_network = "0.4.0"
async-trait = "0.1.74" async-trait = "0.1.74"
async-channel = "1.9" async-channel = "1.9"
schemars = "0.8" schemars = "0.8"
futures = "0.3.28"
[target.'cfg(target_os = "linux")'.dependencies] [target.'cfg(target_os = "linux")'.dependencies]
caps = "0.5.5" caps = "0.5.5"

View file

@ -13,6 +13,7 @@ use tracing_subscriber::{prelude::*, FmtSubscriber, EnvFilter};
#[cfg(any(target_os = "linux", target_vendor = "apple"))] #[cfg(any(target_os = "linux", target_vendor = "apple"))]
use tun::TunInterface; use tun::TunInterface;
mod daemon; mod daemon;
mod wireguard; mod wireguard;

View file

@ -1,4 +1,5 @@
use std::{net::IpAddr, rc::Rc}; use std::{net::IpAddr, rc::Rc};
use std::sync::Arc;
use anyhow::Error; use anyhow::Error;
use async_trait::async_trait; use async_trait::async_trait;
@ -11,6 +12,7 @@ use tokio::{
task::{self, JoinHandle}, task::{self, JoinHandle},
}; };
use tun::tokio::TunInterface; use tun::tokio::TunInterface;
use futures::future::join_all;
use super::{noise::Tunnel, pcb, Peer, PeerPcb}; use super::{noise::Tunnel, pcb, Peer, PeerPcb};
@ -32,7 +34,7 @@ impl PacketInterface for tun::tokio::TunInterface {
} }
struct IndexedPcbs { struct IndexedPcbs {
pcbs: Vec<PeerPcb>, pcbs: Vec<Arc<Mutex<PeerPcb>>>,
allowed_ips: IpNetworkTable<usize>, allowed_ips: IpNetworkTable<usize>,
} }
@ -49,7 +51,7 @@ impl IndexedPcbs {
for allowed_ip in pcb.allowed_ips.iter() { for allowed_ip in pcb.allowed_ips.iter() {
self.allowed_ips.insert(allowed_ip.clone(), idx); self.allowed_ips.insert(allowed_ip.clone(), idx);
} }
self.pcbs.insert(idx, pcb); self.pcbs.insert(idx, Arc::new(Mutex::new(pcb)));
} }
pub fn find(&mut self, addr: IpAddr) -> Option<usize> { pub fn find(&mut self, addr: IpAddr) -> Option<usize> {
@ -57,8 +59,8 @@ impl IndexedPcbs {
Some(idx) Some(idx)
} }
pub fn connect(&mut self, idx: usize, handle: JoinHandle<()>) { pub async fn connect(&mut self, idx: usize, handle: JoinHandle<()>) {
self.pcbs[idx].handle = Some(handle); self.pcbs[idx].lock().await.handle = Some(handle);
} }
} }
@ -72,8 +74,8 @@ impl FromIterator<PeerPcb> for IndexedPcbs {
} }
pub struct Interface { pub struct Interface {
tun: Rc<Mutex<TunInterface>>, tun: Arc<Mutex<TunInterface>>,
pcbs: Rc<Mutex<IndexedPcbs>>, pcbs: Arc<Mutex<IndexedPcbs>>,
} }
impl Interface { impl Interface {
@ -84,21 +86,23 @@ impl Interface {
.map(|peer| PeerPcb::new(peer)) .map(|peer| PeerPcb::new(peer))
.collect::<Result<_, _>>()?; .collect::<Result<_, _>>()?;
let tun = Rc::new(Mutex::new(tun)); let tun = Arc::new(Mutex::new(tun));
let pcbs = Rc::new(Mutex::new(pcbs)); let pcbs = Arc::new(Mutex::new(pcbs));
Self { tun, pcbs } Self { tun, pcbs }
} }
pub async fn run(self) { pub async fn run(self) {
let pcbs = self.pcbs; let pcbs = self.pcbs.clone();
let tun = self.tun; let tun = self.tun.clone();
log::info!("starting interface"); log::info!("starting interface");
let outgoing = async move { let outgoing = async move {
loop { loop {
log::debug!("starting loop...");
let mut buf = [0u8; 3000]; let mut buf = [0u8; 3000];
let mut tun = tun.lock().await; let mut tun = tun.lock().await;
log::debug!("awaiting read...");
let src = match tun.recv(&mut buf[..]).await { let src = match tun.recv(&mut buf[..]).await {
Ok(len) => &buf[..len], Ok(len) => &buf[..len],
Err(e) => { Err(e) => {
@ -127,7 +131,7 @@ impl Interface {
log::debug!("found peer:{}", idx); log::debug!("found peer:{}", idx);
match pcbs.pcbs[idx].send(src).await { match pcbs.pcbs[idx].lock().await.send(src).await {
Ok(..) => { Ok(..) => {
log::debug!("sent packet to peer {}", dst_addr); log::debug!("sent packet to peer {}", dst_addr);
} }
@ -135,26 +139,32 @@ impl Interface {
log::error!("failed to send packet {}", e); log::error!("failed to send packet {}", e);
continue continue
}, },
} };
let mut recv_buf = [0;1500];
match pcbs.pcbs[idx].recv(&mut recv_buf[..]).await {
Ok(siz) => {
log::info!("received {} bytes from peer",siz);
log::debug!("bytes: {:?}", &recv_buf[..siz]);
},
Err(e) => {
log::error!("failed to receive packet {}", e);
continue
}
}
} }
}; };
task::LocalSet::new() task::LocalSet::new()
.run_until(async move { .run_until(async move {
let outgoing = task::spawn_local(outgoing); let mut tsks = vec![];
join!(outgoing); let tun = self.tun.clone();
let outgoing = tokio::task::spawn(outgoing);
tsks.push(outgoing);
{
let pcbs = self.pcbs.lock().await;
for i in 0..pcbs.pcbs.len(){
let pcb = pcbs.pcbs[i].clone();
let tun = tun.clone();
let tsk = async move{
pcb.lock().await.run(tun).await.unwrap();
};
tsks.push(tokio::task::spawn(tsk));
}
log::debug!("spawned read tasks");
}
log::debug!("preparing to join..");
join_all(tsks).await;
}) })
.await; .await;
} }

View file

@ -1,9 +1,12 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::Arc;
use anyhow::Error; use anyhow::Error;
use fehler::throws; use fehler::throws;
use ip_network::IpNetwork; use ip_network::IpNetwork;
use tokio::{net::UdpSocket, task::JoinHandle}; use tokio::{net::UdpSocket, task::JoinHandle};
use tokio::sync::Mutex;
use super::{ use super::{
iface::PacketInterface, iface::PacketInterface,
@ -44,8 +47,9 @@ impl PeerPcb {
Ok(()) Ok(())
} }
pub async fn run(&self, interface: Box<&dyn PacketInterface>) -> Result<(), Error> { pub async fn run(&self, interface: Arc<Mutex<impl PacketInterface>>) -> Result<(), Error> {
let mut buf = [0u8; 3000]; let mut buf = [0u8; 3000];
log::debug!("starting read loop for pcb...");
loop { loop {
let Some(socket) = self.socket.as_ref() else { let Some(socket) = self.socket.as_ref() else {
continue continue