concurrent read write loop working
relies on timeouts. Write to Networks doesn't work yet
This commit is contained in:
parent
6c1c806401
commit
4038d125db
7 changed files with 121 additions and 51 deletions
6
Cargo.lock
generated
6
Cargo.lock
generated
|
|
@ -283,6 +283,7 @@ dependencies = [
|
||||||
"tracing-oslog",
|
"tracing-oslog",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
"tun",
|
"tun",
|
||||||
|
"uuid",
|
||||||
"x25519-dalek",
|
"x25519-dalek",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
@ -2288,10 +2289,11 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "uuid"
|
name = "uuid"
|
||||||
version = "1.4.0"
|
version = "1.6.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d023da39d1fde5a8a3fe1f3e01ca9632ada0a63e9797de55a879d6e2236277be"
|
checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"getrandom",
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ crate-type = ["lib", "staticlib"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
tokio = { version = "1.21", features = ["rt", "macros", "sync", "io-util", "rt-multi-thread"] }
|
tokio = { version = "1.21", features = ["rt", "macros", "sync", "io-util", "rt-multi-thread", "time"] }
|
||||||
tun = { version = "0.1", path = "../tun", features = ["serde", "tokio"] }
|
tun = { version = "0.1", path = "../tun", features = ["serde", "tokio"] }
|
||||||
clap = { version = "4.3.2", features = ["derive"] }
|
clap = { version = "4.3.2", features = ["derive"] }
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
|
|
@ -40,6 +40,7 @@ async-trait = "0.1.74"
|
||||||
async-channel = "1.9"
|
async-channel = "1.9"
|
||||||
schemars = "0.8"
|
schemars = "0.8"
|
||||||
futures = "0.3.28"
|
futures = "0.3.28"
|
||||||
|
uuid = { version = "1.6.1", features = ["v4"] }
|
||||||
|
|
||||||
[target.'cfg(target_os = "linux")'.dependencies]
|
[target.'cfg(target_os = "linux")'.dependencies]
|
||||||
caps = "0.5.5"
|
caps = "0.5.5"
|
||||||
|
|
|
||||||
|
|
@ -47,6 +47,7 @@ pub async fn daemon_main() -> Result<()> {
|
||||||
|
|
||||||
let mut _tun = tun::TunInterface::new()?;
|
let mut _tun = tun::TunInterface::new()?;
|
||||||
_tun.set_ipv4_addr(Ipv4Addr::from([192, 168, 1, 10]))?;
|
_tun.set_ipv4_addr(Ipv4Addr::from([192, 168, 1, 10]))?;
|
||||||
|
_tun.set_timeout(Some(std::time::Duration::from_secs(1)))?;
|
||||||
let tun = tun::tokio::TunInterface::new(_tun)?;
|
let tun = tun::tokio::TunInterface::new(_tun)?;
|
||||||
|
|
||||||
let private_key = parse_secret_key("GNqIAOCRxjl/cicZyvkvpTklgQuUmGUIEkH7IXF/sEE=")?;
|
let private_key = parse_secret_key("GNqIAOCRxjl/cicZyvkvpTklgQuUmGUIEkH7IXF/sEE=")?;
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
use std::{net::IpAddr, rc::Rc};
|
use std::{net::IpAddr, rc::Rc};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::Error;
|
use anyhow::Error;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
@ -14,6 +15,7 @@ use tokio::{
|
||||||
use tun::tokio::TunInterface;
|
use tun::tokio::TunInterface;
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
|
use tokio::time::timeout;
|
||||||
|
|
||||||
use super::{noise::Tunnel, pcb, Peer, PeerPcb};
|
use super::{noise::Tunnel, pcb, Peer, PeerPcb};
|
||||||
|
|
||||||
|
|
@ -104,10 +106,10 @@ impl Interface {
|
||||||
|
|
||||||
let src = {
|
let src = {
|
||||||
log::debug!("awaiting read...");
|
log::debug!("awaiting read...");
|
||||||
let src = match tun.write().await.recv(&mut buf[..]).await {
|
let src = match timeout(Duration::from_secs(2), tun.write().await.recv(&mut buf[..])).await {
|
||||||
Ok(len) => &buf[..len],
|
Ok(Ok(len)) => &buf[..len],
|
||||||
Err(e) => {
|
Ok(Err(e)) => {continue}
|
||||||
log::error!("failed reading from interface: {}", e);
|
Err(_would_block) => {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -116,6 +118,7 @@ impl Interface {
|
||||||
src
|
src
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
let dst_addr = match Tunnel::dst_address(src) {
|
let dst_addr = match Tunnel::dst_address(src) {
|
||||||
Some(addr) => addr,
|
Some(addr) => addr,
|
||||||
None => {
|
None => {
|
||||||
|
|
@ -141,33 +144,48 @@ impl Interface {
|
||||||
continue
|
continue
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// let mut buf = [0u8; 3000];
|
||||||
|
// match pcbs.pcbs[idx].read().await.recv(&mut buf).await {
|
||||||
|
// Ok(len) => log::debug!("received {} bytes from peer {}", len, dst_addr),
|
||||||
|
// Err(e) => {
|
||||||
|
// log::error!("failed to receive packet {}", e);
|
||||||
|
// continue
|
||||||
|
// },
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let mut tsks = vec![];
|
||||||
|
let tun = self.tun.clone();
|
||||||
task::LocalSet::new()
|
let outgoing = tokio::task::spawn(outgoing);
|
||||||
.run_until(async move {
|
tsks.push(outgoing);
|
||||||
let mut tsks = vec![];
|
{
|
||||||
let tun = self.tun.clone();
|
let pcbs = self.pcbs;
|
||||||
let outgoing = tokio::task::spawn(outgoing);
|
for i in 0..pcbs.pcbs.len(){
|
||||||
tsks.push(outgoing);
|
let mut pcb = pcbs.pcbs[i].clone();
|
||||||
{
|
let tun = tun.clone();
|
||||||
let pcbs = self.pcbs;
|
let tsk = async move {
|
||||||
for i in 0..pcbs.pcbs.len(){
|
{
|
||||||
let mut pcb = pcbs.pcbs[i].clone();
|
let r1 = pcb.write().await.open_if_closed().await;
|
||||||
let tun = tun.clone();
|
if let Err(e) = r1 {
|
||||||
let tsk = async move {
|
log::error!("failed to open pcb: {}", e);
|
||||||
pcb.write().await.open_if_closed().await;
|
return
|
||||||
pcb.read().await.run(tun).await;
|
}
|
||||||
};
|
|
||||||
tsks.push(tokio::task::spawn(tsk));
|
|
||||||
}
|
}
|
||||||
log::debug!("spawned read tasks");
|
let r2 = pcb.read().await.run().await;
|
||||||
}
|
if let Err(e) = r2 {
|
||||||
log::debug!("preparing to join..");
|
log::error!("failed to run pcb: {}", e);
|
||||||
join_all(tsks).await;
|
return
|
||||||
})
|
} else {
|
||||||
.await;
|
log::debug!("pcb ran successfully");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
tsks.push(tokio::spawn(tsk));
|
||||||
|
}
|
||||||
|
log::debug!("spawned read tasks");
|
||||||
|
}
|
||||||
|
log::debug!("preparing to join..");
|
||||||
|
join_all(tsks).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,12 +2,17 @@ use std::io;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::{anyhow, Error};
|
use anyhow::{anyhow, Error};
|
||||||
use fehler::throws;
|
use fehler::throws;
|
||||||
use ip_network::IpNetwork;
|
use ip_network::IpNetwork;
|
||||||
|
use log::log;
|
||||||
|
use rand::random;
|
||||||
use tokio::{net::UdpSocket, task::JoinHandle};
|
use tokio::{net::UdpSocket, task::JoinHandle};
|
||||||
use tokio::sync::{Mutex, RwLock};
|
use tokio::sync::{Mutex, RwLock};
|
||||||
|
use tokio::time::timeout;
|
||||||
|
use uuid::uuid;
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
iface::PacketInterface,
|
iface::PacketInterface,
|
||||||
|
|
@ -48,37 +53,48 @@ impl PeerPcb {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(&self, interface: Arc<RwLock<impl PacketInterface>>) -> Result<(), Error> {
|
pub async fn run(&self) -> Result<(), Error> {
|
||||||
let mut buf = [0u8; 3000];
|
let mut buf = [0u8; 3000];
|
||||||
log::debug!("starting read loop for pcb...");
|
log::debug!("starting read loop for pcb...");
|
||||||
loop {
|
loop {
|
||||||
tracing::debug!("looping");
|
tracing::debug!("waiting for packet");
|
||||||
|
let len = self.recv(&mut buf).await?;
|
||||||
let sock = match &self.socket {
|
tracing::debug!("received {} bytes", len);
|
||||||
None => {continue}
|
|
||||||
Some(sock) => {sock}
|
|
||||||
};
|
|
||||||
|
|
||||||
let (len, addr) = sock.recv_from(&mut buf).await?;
|
|
||||||
|
|
||||||
tracing::debug!("received {} bytes from {}", len, addr);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn recv(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
|
pub async fn recv(&self, buf: &mut [u8]) -> Result<usize, Error> {
|
||||||
|
log::debug!("starting read loop for pcb... for {:?}", &self);
|
||||||
|
let rid: i32 = random();
|
||||||
|
log::debug!("start read loop {}", rid);
|
||||||
loop{
|
loop{
|
||||||
let Some(socket) = self.socket.as_ref() else {
|
log::debug!("{}: waiting for packet", rid);
|
||||||
|
let Some(socket) = &self.socket else {
|
||||||
continue
|
continue
|
||||||
};
|
};
|
||||||
let mut res_buf = [0;1500];
|
let mut res_buf = [0;1500];
|
||||||
let (len, addr) = socket.recv_from(&mut res_buf).await?;
|
log::debug!("{} : waiting for readability on {:?}", rid, socket);
|
||||||
|
match timeout(Duration::from_secs(2), socket.readable()).await {
|
||||||
|
Err(e) => {
|
||||||
|
log::debug!("{}: timeout waiting for readability on {:?}", rid, e);
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
log::debug!("{}: error waiting for readability on {:?}", rid, e);
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
Ok(Ok(_)) => {}
|
||||||
|
};
|
||||||
|
log::debug!("{}: readable!", rid);
|
||||||
|
let Ok(len) = socket.try_recv(&mut res_buf) else {
|
||||||
|
continue
|
||||||
|
};
|
||||||
let mut res_dat = &res_buf[..len];
|
let mut res_dat = &res_buf[..len];
|
||||||
tracing::debug!("Decapsulating {} bytes from {}", len, addr);
|
tracing::debug!("{}: Decapsulating {} bytes", rid, len);
|
||||||
tracing::debug!("{:?}", &res_dat);
|
tracing::debug!("{:?}", &res_dat);
|
||||||
loop {
|
loop {
|
||||||
match self.tunnel.write().await.decapsulate(None, res_dat, &mut buf[..]) {
|
match self.tunnel.write().await.decapsulate(None, res_dat, &mut buf[..]) {
|
||||||
TunnResult::Done => {
|
TunnResult::Done => {
|
||||||
tracing::debug!("Decapsulate done");
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
TunnResult::Err(e) => {
|
TunnResult::Err(e) => {
|
||||||
|
|
@ -87,6 +103,8 @@ impl PeerPcb {
|
||||||
}
|
}
|
||||||
TunnResult::WriteToNetwork(packet) => {
|
TunnResult::WriteToNetwork(packet) => {
|
||||||
tracing::debug!("WriteToNetwork: {:?}", packet);
|
tracing::debug!("WriteToNetwork: {:?}", packet);
|
||||||
|
socket.send(packet).await?;
|
||||||
|
tracing::debug!("WriteToNetwork done");
|
||||||
res_dat = &[];
|
res_dat = &[];
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,14 +27,38 @@ impl TunInterface {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument]
|
// #[instrument]
|
||||||
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
loop {
|
loop {
|
||||||
|
log::debug!("TunInterface receiving...");
|
||||||
let mut guard = self.inner.readable_mut().await?;
|
let mut guard = self.inner.readable_mut().await?;
|
||||||
match guard.try_io(|inner| (*inner).get_mut().recv(buf)) {
|
log::debug!("Got! readable_mut");
|
||||||
Ok(result) => return result,
|
match guard.try_io(|inner| {
|
||||||
Err(_would_block) => continue,
|
// log::debug!("Got! {:#?}", inner);
|
||||||
|
let raw_ref = (*inner).get_mut();
|
||||||
|
// log::debug!("Got mut ref! {:#?}", raw_ref);
|
||||||
|
let recved = raw_ref.recv(buf);
|
||||||
|
// log::debug!("Got recved! {:#?}", recved);
|
||||||
|
recved
|
||||||
|
}) {
|
||||||
|
Ok(result) => {
|
||||||
|
log::debug!("HORRAY");
|
||||||
|
return result
|
||||||
|
},
|
||||||
|
Err(_would_block) => {
|
||||||
|
log::debug!("WouldBlock");
|
||||||
|
continue
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument]
|
||||||
|
pub async fn try_recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
|
let mut guard = self.inner.readable_mut().await?;
|
||||||
|
match guard.try_io(|inner| (*inner).get_mut().recv(buf)) {
|
||||||
|
Ok(result) => Ok(result.unwrap_or_default()),
|
||||||
|
Err(_would_block) => Err(io::Error::new(io::ErrorKind::WouldBlock, "WouldBlock")),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,12 @@ impl TunInterface {
|
||||||
buf[..len-4].copy_from_slice(&tmp_buf[4..len]);
|
buf[..len-4].copy_from_slice(&tmp_buf[4..len]);
|
||||||
len-4
|
len-4
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[throws]
|
||||||
|
#[instrument]
|
||||||
|
pub fn set_timeout(&self, timeout: Option<std::time::Duration>) {
|
||||||
|
self.socket.set_read_timeout(timeout)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue