From 4dd31d5f1e0838fdd8a55d0d445e120bf8d1943e Mon Sep 17 00:00:00 2001 From: Jett Chen Date: Tue, 28 Nov 2023 22:05:31 +0800 Subject: [PATCH] concurrent read write loop working relies on timeouts. Write to Networks doesn't work yet --- Cargo.lock | 6 ++- burrow/Cargo.toml | 3 +- burrow/src/daemon/mod.rs | 1 + burrow/src/wireguard/iface.rs | 74 ++++++++++++++++++++++------------- burrow/src/wireguard/pcb.rs | 50 +++++++++++++++-------- tun/src/tokio/mod.rs | 32 +++++++++++++-- tun/src/unix/mod.rs | 6 +++ 7 files changed, 121 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0011d90..54a0a53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -283,6 +283,7 @@ dependencies = [ "tracing-oslog", "tracing-subscriber", "tun", + "uuid", "x25519-dalek", ] @@ -2288,10 +2289,11 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.4.0" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d023da39d1fde5a8a3fe1f3e01ca9632ada0a63e9797de55a879d6e2236277be" +checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" dependencies = [ + "getrandom", "serde", ] diff --git a/burrow/Cargo.toml b/burrow/Cargo.toml index d168ac3..684bbb4 100644 --- a/burrow/Cargo.toml +++ b/burrow/Cargo.toml @@ -10,7 +10,7 @@ crate-type = ["lib", "staticlib"] [dependencies] anyhow = "1.0" -tokio = { version = "1.21", features = ["rt", "macros", "sync", "io-util", "rt-multi-thread"] } +tokio = { version = "1.21", features = ["rt", "macros", "sync", "io-util", "rt-multi-thread", "time"] } tun = { version = "0.1", path = "../tun", features = ["serde", "tokio"] } clap = { version = "4.3.2", features = ["derive"] } tracing = "0.1" @@ -40,6 +40,7 @@ async-trait = "0.1.74" async-channel = "1.9" schemars = "0.8" futures = "0.3.28" +uuid = { version = "1.6.1", features = ["v4"] } [target.'cfg(target_os = "linux")'.dependencies] caps = "0.5.5" diff --git a/burrow/src/daemon/mod.rs b/burrow/src/daemon/mod.rs index 9c3bd14..e086452 100644 --- a/burrow/src/daemon/mod.rs +++ b/burrow/src/daemon/mod.rs @@ -47,6 +47,7 @@ pub async fn daemon_main() -> Result<()> { let mut _tun = tun::TunInterface::new()?; _tun.set_ipv4_addr(Ipv4Addr::from([192, 168, 1, 10]))?; + _tun.set_timeout(Some(std::time::Duration::from_secs(1)))?; let tun = tun::tokio::TunInterface::new(_tun)?; let private_key = parse_secret_key("GNqIAOCRxjl/cicZyvkvpTklgQuUmGUIEkH7IXF/sEE=")?; diff --git a/burrow/src/wireguard/iface.rs b/burrow/src/wireguard/iface.rs index 7e260d6..7f6473c 100755 --- a/burrow/src/wireguard/iface.rs +++ b/burrow/src/wireguard/iface.rs @@ -1,5 +1,6 @@ use std::{net::IpAddr, rc::Rc}; use std::sync::Arc; +use std::time::Duration; use anyhow::Error; use async_trait::async_trait; @@ -14,6 +15,7 @@ use tokio::{ use tun::tokio::TunInterface; use futures::future::join_all; use futures::FutureExt; +use tokio::time::timeout; use super::{noise::Tunnel, pcb, Peer, PeerPcb}; @@ -104,10 +106,10 @@ impl Interface { let src = { log::debug!("awaiting read..."); - let src = match tun.write().await.recv(&mut buf[..]).await { - Ok(len) => &buf[..len], - Err(e) => { - log::error!("failed reading from interface: {}", e); + let src = match timeout(Duration::from_secs(2), tun.write().await.recv(&mut buf[..])).await { + Ok(Ok(len)) => &buf[..len], + Ok(Err(e)) => {continue} + Err(_would_block) => { continue } }; @@ -116,6 +118,7 @@ impl Interface { src }; + let dst_addr = match Tunnel::dst_address(src) { Some(addr) => addr, None => { @@ -141,33 +144,48 @@ impl Interface { continue }, }; + + // let mut buf = [0u8; 3000]; + // match pcbs.pcbs[idx].read().await.recv(&mut buf).await { + // Ok(len) => log::debug!("received {} bytes from peer {}", len, dst_addr), + // Err(e) => { + // log::error!("failed to receive packet {}", e); + // continue + // }, + // } } }; - - - task::LocalSet::new() - .run_until(async move { - let mut tsks = vec![]; - let tun = self.tun.clone(); - let outgoing = tokio::task::spawn(outgoing); - tsks.push(outgoing); - { - let pcbs = self.pcbs; - for i in 0..pcbs.pcbs.len(){ - let mut pcb = pcbs.pcbs[i].clone(); - let tun = tun.clone(); - let tsk = async move { - pcb.write().await.open_if_closed().await; - pcb.read().await.run(tun).await; - }; - tsks.push(tokio::task::spawn(tsk)); + let mut tsks = vec![]; + let tun = self.tun.clone(); + let outgoing = tokio::task::spawn(outgoing); + tsks.push(outgoing); + { + let pcbs = self.pcbs; + for i in 0..pcbs.pcbs.len(){ + let mut pcb = pcbs.pcbs[i].clone(); + let tun = tun.clone(); + let tsk = async move { + { + let r1 = pcb.write().await.open_if_closed().await; + if let Err(e) = r1 { + log::error!("failed to open pcb: {}", e); + return + } } - log::debug!("spawned read tasks"); - } - log::debug!("preparing to join.."); - join_all(tsks).await; - }) - .await; + let r2 = pcb.read().await.run().await; + if let Err(e) = r2 { + log::error!("failed to run pcb: {}", e); + return + } else { + log::debug!("pcb ran successfully"); + } + }; + tsks.push(tokio::spawn(tsk)); + } + log::debug!("spawned read tasks"); + } + log::debug!("preparing to join.."); + join_all(tsks).await; } } diff --git a/burrow/src/wireguard/pcb.rs b/burrow/src/wireguard/pcb.rs index 051ca53..4ec63c5 100755 --- a/burrow/src/wireguard/pcb.rs +++ b/burrow/src/wireguard/pcb.rs @@ -2,12 +2,17 @@ use std::io; use std::net::SocketAddr; use std::rc::Rc; use std::sync::Arc; +use std::time::Duration; use anyhow::{anyhow, Error}; use fehler::throws; use ip_network::IpNetwork; +use log::log; +use rand::random; use tokio::{net::UdpSocket, task::JoinHandle}; use tokio::sync::{Mutex, RwLock}; +use tokio::time::timeout; +use uuid::uuid; use super::{ iface::PacketInterface, @@ -48,37 +53,48 @@ impl PeerPcb { Ok(()) } - pub async fn run(&self, interface: Arc>) -> Result<(), Error> { + pub async fn run(&self) -> Result<(), Error> { let mut buf = [0u8; 3000]; log::debug!("starting read loop for pcb..."); loop { - tracing::debug!("looping"); - - let sock = match &self.socket { - None => {continue} - Some(sock) => {sock} - }; - - let (len, addr) = sock.recv_from(&mut buf).await?; - - tracing::debug!("received {} bytes from {}", len, addr); + tracing::debug!("waiting for packet"); + let len = self.recv(&mut buf).await?; + tracing::debug!("received {} bytes", len); } } - pub async fn recv(&mut self, buf: &mut [u8]) -> Result { + pub async fn recv(&self, buf: &mut [u8]) -> Result { + log::debug!("starting read loop for pcb... for {:?}", &self); + let rid: i32 = random(); + log::debug!("start read loop {}", rid); loop{ - let Some(socket) = self.socket.as_ref() else { + log::debug!("{}: waiting for packet", rid); + let Some(socket) = &self.socket else { continue }; let mut res_buf = [0;1500]; - let (len, addr) = socket.recv_from(&mut res_buf).await?; + log::debug!("{} : waiting for readability on {:?}", rid, socket); + match timeout(Duration::from_secs(2), socket.readable()).await { + Err(e) => { + log::debug!("{}: timeout waiting for readability on {:?}", rid, e); + continue + } + Ok(Err(e)) => { + log::debug!("{}: error waiting for readability on {:?}", rid, e); + continue + } + Ok(Ok(_)) => {} + }; + log::debug!("{}: readable!", rid); + let Ok(len) = socket.try_recv(&mut res_buf) else { + continue + }; let mut res_dat = &res_buf[..len]; - tracing::debug!("Decapsulating {} bytes from {}", len, addr); + tracing::debug!("{}: Decapsulating {} bytes", rid, len); tracing::debug!("{:?}", &res_dat); loop { match self.tunnel.write().await.decapsulate(None, res_dat, &mut buf[..]) { TunnResult::Done => { - tracing::debug!("Decapsulate done"); break; } TunnResult::Err(e) => { @@ -87,6 +103,8 @@ impl PeerPcb { } TunnResult::WriteToNetwork(packet) => { tracing::debug!("WriteToNetwork: {:?}", packet); + socket.send(packet).await?; + tracing::debug!("WriteToNetwork done"); res_dat = &[]; continue; } diff --git a/tun/src/tokio/mod.rs b/tun/src/tokio/mod.rs index 8318830..8d23b7b 100644 --- a/tun/src/tokio/mod.rs +++ b/tun/src/tokio/mod.rs @@ -27,14 +27,38 @@ impl TunInterface { } } - #[instrument] + // #[instrument] pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result { loop { + log::debug!("TunInterface receiving..."); let mut guard = self.inner.readable_mut().await?; - match guard.try_io(|inner| (*inner).get_mut().recv(buf)) { - Ok(result) => return result, - Err(_would_block) => continue, + log::debug!("Got! readable_mut"); + match guard.try_io(|inner| { + // log::debug!("Got! {:#?}", inner); + let raw_ref = (*inner).get_mut(); + // log::debug!("Got mut ref! {:#?}", raw_ref); + let recved = raw_ref.recv(buf); + // log::debug!("Got recved! {:#?}", recved); + recved + }) { + Ok(result) => { + log::debug!("HORRAY"); + return result + }, + Err(_would_block) => { + log::debug!("WouldBlock"); + continue + }, } } } + + #[instrument] + pub async fn try_recv(&mut self, buf: &mut [u8]) -> io::Result { + let mut guard = self.inner.readable_mut().await?; + match guard.try_io(|inner| (*inner).get_mut().recv(buf)) { + Ok(result) => Ok(result.unwrap_or_default()), + Err(_would_block) => Err(io::Error::new(io::ErrorKind::WouldBlock, "WouldBlock")), + } + } } diff --git a/tun/src/unix/mod.rs b/tun/src/unix/mod.rs index a3cfeae..407d425 100644 --- a/tun/src/unix/mod.rs +++ b/tun/src/unix/mod.rs @@ -50,6 +50,12 @@ impl TunInterface { buf[..len-4].copy_from_slice(&tmp_buf[4..len]); len-4 } + + #[throws] + #[instrument] + pub fn set_timeout(&self, timeout: Option) { + self.socket.set_read_timeout(timeout)?; + } } #[instrument]