concurrent read write loop working

relies on timeouts. Write to Networks doesn't work yet
This commit is contained in:
Jett Chen 2023-11-28 22:05:31 +08:00
parent 73b3136597
commit 4dd31d5f1e
7 changed files with 121 additions and 51 deletions

6
Cargo.lock generated
View file

@ -283,6 +283,7 @@ dependencies = [
"tracing-oslog",
"tracing-subscriber",
"tun",
"uuid",
"x25519-dalek",
]
@ -2288,10 +2289,11 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "1.4.0"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d023da39d1fde5a8a3fe1f3e01ca9632ada0a63e9797de55a879d6e2236277be"
checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560"
dependencies = [
"getrandom",
"serde",
]

View file

@ -10,7 +10,7 @@ crate-type = ["lib", "staticlib"]
[dependencies]
anyhow = "1.0"
tokio = { version = "1.21", features = ["rt", "macros", "sync", "io-util", "rt-multi-thread"] }
tokio = { version = "1.21", features = ["rt", "macros", "sync", "io-util", "rt-multi-thread", "time"] }
tun = { version = "0.1", path = "../tun", features = ["serde", "tokio"] }
clap = { version = "4.3.2", features = ["derive"] }
tracing = "0.1"
@ -40,6 +40,7 @@ async-trait = "0.1.74"
async-channel = "1.9"
schemars = "0.8"
futures = "0.3.28"
uuid = { version = "1.6.1", features = ["v4"] }
[target.'cfg(target_os = "linux")'.dependencies]
caps = "0.5.5"

View file

@ -47,6 +47,7 @@ pub async fn daemon_main() -> Result<()> {
let mut _tun = tun::TunInterface::new()?;
_tun.set_ipv4_addr(Ipv4Addr::from([192, 168, 1, 10]))?;
_tun.set_timeout(Some(std::time::Duration::from_secs(1)))?;
let tun = tun::tokio::TunInterface::new(_tun)?;
let private_key = parse_secret_key("GNqIAOCRxjl/cicZyvkvpTklgQuUmGUIEkH7IXF/sEE=")?;

View file

@ -1,5 +1,6 @@
use std::{net::IpAddr, rc::Rc};
use std::sync::Arc;
use std::time::Duration;
use anyhow::Error;
use async_trait::async_trait;
@ -14,6 +15,7 @@ use tokio::{
use tun::tokio::TunInterface;
use futures::future::join_all;
use futures::FutureExt;
use tokio::time::timeout;
use super::{noise::Tunnel, pcb, Peer, PeerPcb};
@ -104,10 +106,10 @@ impl Interface {
let src = {
log::debug!("awaiting read...");
let src = match tun.write().await.recv(&mut buf[..]).await {
Ok(len) => &buf[..len],
Err(e) => {
log::error!("failed reading from interface: {}", e);
let src = match timeout(Duration::from_secs(2), tun.write().await.recv(&mut buf[..])).await {
Ok(Ok(len)) => &buf[..len],
Ok(Err(e)) => {continue}
Err(_would_block) => {
continue
}
};
@ -116,6 +118,7 @@ impl Interface {
src
};
let dst_addr = match Tunnel::dst_address(src) {
Some(addr) => addr,
None => {
@ -141,13 +144,18 @@ impl Interface {
continue
},
};
// let mut buf = [0u8; 3000];
// match pcbs.pcbs[idx].read().await.recv(&mut buf).await {
// Ok(len) => log::debug!("received {} bytes from peer {}", len, dst_addr),
// Err(e) => {
// log::error!("failed to receive packet {}", e);
// continue
// },
// }
}
};
task::LocalSet::new()
.run_until(async move {
let mut tsks = vec![];
let tun = self.tun.clone();
let outgoing = tokio::task::spawn(outgoing);
@ -158,16 +166,26 @@ impl Interface {
let mut pcb = pcbs.pcbs[i].clone();
let tun = tun.clone();
let tsk = async move {
pcb.write().await.open_if_closed().await;
pcb.read().await.run(tun).await;
{
let r1 = pcb.write().await.open_if_closed().await;
if let Err(e) = r1 {
log::error!("failed to open pcb: {}", e);
return
}
}
let r2 = pcb.read().await.run().await;
if let Err(e) = r2 {
log::error!("failed to run pcb: {}", e);
return
} else {
log::debug!("pcb ran successfully");
}
};
tsks.push(tokio::task::spawn(tsk));
tsks.push(tokio::spawn(tsk));
}
log::debug!("spawned read tasks");
}
log::debug!("preparing to join..");
join_all(tsks).await;
})
.await;
}
}

View file

@ -2,12 +2,17 @@ use std::io;
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Error};
use fehler::throws;
use ip_network::IpNetwork;
use log::log;
use rand::random;
use tokio::{net::UdpSocket, task::JoinHandle};
use tokio::sync::{Mutex, RwLock};
use tokio::time::timeout;
use uuid::uuid;
use super::{
iface::PacketInterface,
@ -48,37 +53,48 @@ impl PeerPcb {
Ok(())
}
pub async fn run(&self, interface: Arc<RwLock<impl PacketInterface>>) -> Result<(), Error> {
pub async fn run(&self) -> Result<(), Error> {
let mut buf = [0u8; 3000];
log::debug!("starting read loop for pcb...");
loop {
tracing::debug!("looping");
let sock = match &self.socket {
None => {continue}
Some(sock) => {sock}
};
let (len, addr) = sock.recv_from(&mut buf).await?;
tracing::debug!("received {} bytes from {}", len, addr);
tracing::debug!("waiting for packet");
let len = self.recv(&mut buf).await?;
tracing::debug!("received {} bytes", len);
}
}
pub async fn recv(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
pub async fn recv(&self, buf: &mut [u8]) -> Result<usize, Error> {
log::debug!("starting read loop for pcb... for {:?}", &self);
let rid: i32 = random();
log::debug!("start read loop {}", rid);
loop{
let Some(socket) = self.socket.as_ref() else {
log::debug!("{}: waiting for packet", rid);
let Some(socket) = &self.socket else {
continue
};
let mut res_buf = [0;1500];
let (len, addr) = socket.recv_from(&mut res_buf).await?;
log::debug!("{} : waiting for readability on {:?}", rid, socket);
match timeout(Duration::from_secs(2), socket.readable()).await {
Err(e) => {
log::debug!("{}: timeout waiting for readability on {:?}", rid, e);
continue
}
Ok(Err(e)) => {
log::debug!("{}: error waiting for readability on {:?}", rid, e);
continue
}
Ok(Ok(_)) => {}
};
log::debug!("{}: readable!", rid);
let Ok(len) = socket.try_recv(&mut res_buf) else {
continue
};
let mut res_dat = &res_buf[..len];
tracing::debug!("Decapsulating {} bytes from {}", len, addr);
tracing::debug!("{}: Decapsulating {} bytes", rid, len);
tracing::debug!("{:?}", &res_dat);
loop {
match self.tunnel.write().await.decapsulate(None, res_dat, &mut buf[..]) {
TunnResult::Done => {
tracing::debug!("Decapsulate done");
break;
}
TunnResult::Err(e) => {
@ -87,6 +103,8 @@ impl PeerPcb {
}
TunnResult::WriteToNetwork(packet) => {
tracing::debug!("WriteToNetwork: {:?}", packet);
socket.send(packet).await?;
tracing::debug!("WriteToNetwork done");
res_dat = &[];
continue;
}

View file

@ -27,14 +27,38 @@ impl TunInterface {
}
}
#[instrument]
// #[instrument]
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
loop {
log::debug!("TunInterface receiving...");
let mut guard = self.inner.readable_mut().await?;
log::debug!("Got! readable_mut");
match guard.try_io(|inner| {
// log::debug!("Got! {:#?}", inner);
let raw_ref = (*inner).get_mut();
// log::debug!("Got mut ref! {:#?}", raw_ref);
let recved = raw_ref.recv(buf);
// log::debug!("Got recved! {:#?}", recved);
recved
}) {
Ok(result) => {
log::debug!("HORRAY");
return result
},
Err(_would_block) => {
log::debug!("WouldBlock");
continue
},
}
}
}
#[instrument]
pub async fn try_recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let mut guard = self.inner.readable_mut().await?;
match guard.try_io(|inner| (*inner).get_mut().recv(buf)) {
Ok(result) => return result,
Err(_would_block) => continue,
}
Ok(result) => Ok(result.unwrap_or_default()),
Err(_would_block) => Err(io::Error::new(io::ErrorKind::WouldBlock, "WouldBlock")),
}
}
}

View file

@ -50,6 +50,12 @@ impl TunInterface {
buf[..len-4].copy_from_slice(&tmp_buf[4..len]);
len-4
}
#[throws]
#[instrument]
pub fn set_timeout(&self, timeout: Option<std::time::Duration>) {
self.socket.set_read_timeout(timeout)?;
}
}
#[instrument]