From c7c4e5779c653dcb20cb58a61c7321d08df45888 Mon Sep 17 00:00:00 2001 From: Jett Chen Date: Wed, 22 Nov 2023 21:44:46 +0800 Subject: [PATCH] WIP async read write implementation --- Cargo.lock | 10 +++--- burrow/Cargo.toml | 1 + burrow/src/main.rs | 1 + burrow/src/wireguard/iface.rs | 62 ++++++++++++++++++++--------------- burrow/src/wireguard/pcb.rs | 6 +++- 5 files changed, 49 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e1ac6a..0011d90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -260,6 +260,7 @@ dependencies = [ "env_logger", "etherparse", "fehler", + "futures", "hmac", "insta", "ip_network", @@ -330,11 +331,12 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.79" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" dependencies = [ "jobserver", + "libc", ] [[package]] @@ -1109,9 +1111,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.147" +version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" [[package]] name = "libloading" diff --git a/burrow/Cargo.toml b/burrow/Cargo.toml index 3dfbfe8..d168ac3 100644 --- a/burrow/Cargo.toml +++ b/burrow/Cargo.toml @@ -39,6 +39,7 @@ ip_network = "0.4.0" async-trait = "0.1.74" async-channel = "1.9" schemars = "0.8" +futures = "0.3.28" [target.'cfg(target_os = "linux")'.dependencies] caps = "0.5.5" diff --git a/burrow/src/main.rs b/burrow/src/main.rs index 8ecbe97..2e89a48 100644 --- a/burrow/src/main.rs +++ b/burrow/src/main.rs @@ -13,6 +13,7 @@ use tracing_subscriber::{prelude::*, FmtSubscriber, EnvFilter}; #[cfg(any(target_os = "linux", target_vendor = "apple"))] use tun::TunInterface; + mod daemon; mod wireguard; diff --git a/burrow/src/wireguard/iface.rs b/burrow/src/wireguard/iface.rs index a427bf8..2373cc3 100755 --- a/burrow/src/wireguard/iface.rs +++ b/burrow/src/wireguard/iface.rs @@ -1,4 +1,5 @@ use std::{net::IpAddr, rc::Rc}; +use std::sync::Arc; use anyhow::Error; use async_trait::async_trait; @@ -11,6 +12,7 @@ use tokio::{ task::{self, JoinHandle}, }; use tun::tokio::TunInterface; +use futures::future::join_all; use super::{noise::Tunnel, pcb, Peer, PeerPcb}; @@ -32,7 +34,7 @@ impl PacketInterface for tun::tokio::TunInterface { } struct IndexedPcbs { - pcbs: Vec, + pcbs: Vec>>, allowed_ips: IpNetworkTable, } @@ -49,7 +51,7 @@ impl IndexedPcbs { for allowed_ip in pcb.allowed_ips.iter() { self.allowed_ips.insert(allowed_ip.clone(), idx); } - self.pcbs.insert(idx, pcb); + self.pcbs.insert(idx, Arc::new(Mutex::new(pcb))); } pub fn find(&mut self, addr: IpAddr) -> Option { @@ -57,8 +59,8 @@ impl IndexedPcbs { Some(idx) } - pub fn connect(&mut self, idx: usize, handle: JoinHandle<()>) { - self.pcbs[idx].handle = Some(handle); + pub async fn connect(&mut self, idx: usize, handle: JoinHandle<()>) { + self.pcbs[idx].lock().await.handle = Some(handle); } } @@ -72,8 +74,8 @@ impl FromIterator for IndexedPcbs { } pub struct Interface { - tun: Rc>, - pcbs: Rc>, + tun: Arc>, + pcbs: Arc>, } impl Interface { @@ -84,21 +86,23 @@ impl Interface { .map(|peer| PeerPcb::new(peer)) .collect::>()?; - let tun = Rc::new(Mutex::new(tun)); - let pcbs = Rc::new(Mutex::new(pcbs)); + let tun = Arc::new(Mutex::new(tun)); + let pcbs = Arc::new(Mutex::new(pcbs)); Self { tun, pcbs } } pub async fn run(self) { - let pcbs = self.pcbs; - let tun = self.tun; + let pcbs = self.pcbs.clone(); + let tun = self.tun.clone(); log::info!("starting interface"); let outgoing = async move { loop { + log::debug!("starting loop..."); let mut buf = [0u8; 3000]; let mut tun = tun.lock().await; + log::debug!("awaiting read..."); let src = match tun.recv(&mut buf[..]).await { Ok(len) => &buf[..len], Err(e) => { @@ -127,7 +131,7 @@ impl Interface { log::debug!("found peer:{}", idx); - match pcbs.pcbs[idx].send(src).await { + match pcbs.pcbs[idx].lock().await.send(src).await { Ok(..) => { log::debug!("sent packet to peer {}", dst_addr); } @@ -135,26 +139,32 @@ impl Interface { log::error!("failed to send packet {}", e); continue }, - } - - let mut recv_buf = [0;1500]; - match pcbs.pcbs[idx].recv(&mut recv_buf[..]).await { - Ok(siz) => { - log::info!("received {} bytes from peer",siz); - log::debug!("bytes: {:?}", &recv_buf[..siz]); - }, - Err(e) => { - log::error!("failed to receive packet {}", e); - continue - } - } + }; } }; + + task::LocalSet::new() .run_until(async move { - let outgoing = task::spawn_local(outgoing); - join!(outgoing); + let mut tsks = vec![]; + let tun = self.tun.clone(); + let outgoing = tokio::task::spawn(outgoing); + tsks.push(outgoing); + { + let pcbs = self.pcbs.lock().await; + for i in 0..pcbs.pcbs.len(){ + let pcb = pcbs.pcbs[i].clone(); + let tun = tun.clone(); + let tsk = async move{ + pcb.lock().await.run(tun).await.unwrap(); + }; + tsks.push(tokio::task::spawn(tsk)); + } + log::debug!("spawned read tasks"); + } + log::debug!("preparing to join.."); + join_all(tsks).await; }) .await; } diff --git a/burrow/src/wireguard/pcb.rs b/burrow/src/wireguard/pcb.rs index 8d7cf4a..151aaf8 100755 --- a/burrow/src/wireguard/pcb.rs +++ b/burrow/src/wireguard/pcb.rs @@ -1,9 +1,12 @@ use std::net::SocketAddr; +use std::rc::Rc; +use std::sync::Arc; use anyhow::Error; use fehler::throws; use ip_network::IpNetwork; use tokio::{net::UdpSocket, task::JoinHandle}; +use tokio::sync::Mutex; use super::{ iface::PacketInterface, @@ -44,8 +47,9 @@ impl PeerPcb { Ok(()) } - pub async fn run(&self, interface: Box<&dyn PacketInterface>) -> Result<(), Error> { + pub async fn run(&self, interface: Arc>) -> Result<(), Error> { let mut buf = [0u8; 3000]; + log::debug!("starting read loop for pcb..."); loop { let Some(socket) = self.socket.as_ref() else { continue