diff --git a/burrow/src/daemon/command.rs b/burrow/src/daemon/command.rs index a5a1f30..cbe7f15 100644 --- a/burrow/src/daemon/command.rs +++ b/burrow/src/daemon/command.rs @@ -12,7 +12,7 @@ pub enum DaemonCommand { #[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] pub struct DaemonStartOptions { - pub(super) tun: TunOptions, + pub tun: TunOptions, } #[test] diff --git a/burrow/src/daemon/instance.rs b/burrow/src/daemon/instance.rs index bb94897..073bc37 100644 --- a/burrow/src/daemon/instance.rs +++ b/burrow/src/daemon/instance.rs @@ -1,22 +1,35 @@ -use std::ops::Deref; +use tokio::task::JoinHandle; use tracing::{debug, info, warn}; use DaemonResponse; use tun::tokio::TunInterface; use crate::daemon::response::{DaemonResponseData, ServerConfig, ServerInfo}; use super::*; +enum RunState{ + Running(JoinHandle>), + Idle, +} + pub struct DaemonInstance { rx: async_channel::Receiver, sx: async_channel::Sender, tun_interface: Option>>, + wg_interface: Arc>, + wg_state: RunState, } impl DaemonInstance { - pub fn new(rx: async_channel::Receiver, sx: async_channel::Sender) -> Self { + pub fn new( + rx: async_channel::Receiver, + sx: async_channel::Sender, + wg_interface: Arc>, + ) -> Self { Self { rx, sx, + wg_interface, tun_interface: None, + wg_state: RunState::Idle, } } @@ -28,12 +41,19 @@ impl DaemonInstance { info!("Daemon got command: {:?}", command); match command { DaemonCommand::Start(st) => { - if self.tun_interface.is_none() { - debug!("Daemon attempting start tun interface."); - self.tun_interface = Some(Arc::new(RwLock::new(TunInterface::new(st.tun.open()?)?))); - info!("Daemon started tun interface"); - } else { - warn!("Got start, but tun interface already up."); + match self.wg_state { + RunState::Running(_) => {warn!("Got start, but tun interface already up.");} + RunState::Idle => { + let tun_if = Arc::new(RwLock::new(TunInterface::new(st.tun.open()?)?)); + self.tun_interface = Some(tun_if.clone()); + self.wg_interface.write().await.set_tun(tun_if); + let tmp_wg = self.wg_interface.clone(); + let run_task = tokio::spawn(async move { + tmp_wg.read().await.run().await + }); + self.wg_state = RunState::Running(run_task); + info!("Daemon started tun interface"); + } } Ok(DaemonResponseData::None) } diff --git a/burrow/src/daemon/mod.rs b/burrow/src/daemon/mod.rs index b44efc1..8814ce2 100644 --- a/burrow/src/daemon/mod.rs +++ b/burrow/src/daemon/mod.rs @@ -46,22 +46,13 @@ fn parse_public_key(string: &str) -> PublicKey { pub async fn daemon_main() -> Result<()> { let (commands_tx, commands_rx) = async_channel::unbounded(); let (response_tx, response_rx) = async_channel::unbounded(); - let mut inst = DaemonInstance::new(commands_rx, response_tx); - - let mut _tun = tun::TunInterface::new()?; - _tun.set_ipv4_addr(Ipv4Addr::from([10,13,13,2]))?; - _tun.set_nonblocking(true)?; - let tun = tun::tokio::TunInterface::new(_tun)?; - let tun_ref = Arc::new(RwLock::new(tun)); let private_key = parse_secret_key("GNqIAOCRxjl/cicZyvkvpTklgQuUmGUIEkH7IXF/sEE=")?; let public_key = parse_public_key("uy75leriJay0+oHLhRMpV+A5xAQ0hCJ+q7Ww81AOvT4=")?; let preshared_key = Some(parse_key("s7lx/mg+reVEMnGnqeyYOQkzD86n2+gYnx1M9ygi08k=")?); let endpoint = "wg.burrow.rs:51820".to_socket_addrs()?.next().unwrap(); - inst.set_tun_interface(tun_ref.clone()); - - let iface = Interface::new(tun_ref, vec![Peer { + let iface = Interface::new(vec![Peer { endpoint, private_key, public_key, @@ -69,7 +60,9 @@ pub async fn daemon_main() -> Result<()> { allowed_ips: vec![IpNetwork::V4(Ipv4Network::DEFAULT_ROUTE)], }])?; - tokio::try_join!(iface.run(), inst.run(), listen(commands_tx, response_rx)) + let mut inst = DaemonInstance::new(commands_rx, response_tx, Arc::new(RwLock::new(iface))); + + tokio::try_join!(inst.run(), listen(commands_tx, response_rx)) .map(|_| {()}); Ok(()) } diff --git a/burrow/src/main.rs b/burrow/src/main.rs index 2e89a48..125d763 100644 --- a/burrow/src/main.rs +++ b/burrow/src/main.rs @@ -18,6 +18,7 @@ mod daemon; mod wireguard; use daemon::{DaemonClient, DaemonCommand, DaemonStartOptions}; +use tun::TunOptions; use crate::daemon::DaemonResponseData; #[derive(Parser)] @@ -65,7 +66,11 @@ struct DaemonArgs {} async fn try_start() -> Result<()> { let mut client = DaemonClient::new().await?; client - .send_command(DaemonCommand::Start(DaemonStartOptions::default())) + .send_command(DaemonCommand::Start( + DaemonStartOptions{ + tun: TunOptions::new().address("10.13.13.2") + } + )) .await .map(|_| ()) } diff --git a/burrow/src/wireguard/iface.rs b/burrow/src/wireguard/iface.rs index 9f5dae4..4a00cbe 100755 --- a/burrow/src/wireguard/iface.rs +++ b/burrow/src/wireguard/iface.rs @@ -77,25 +77,32 @@ impl FromIterator for IndexedPcbs { } pub struct Interface { - tun: Arc>, + tun: Option>>, pcbs: Arc, } impl Interface { #[throws] - pub fn new>(tun: Arc>, peers: I) -> Self { + pub fn new>(peers: I) -> Self { let pcbs: IndexedPcbs = peers .into_iter() - .map(|peer| PeerPcb::new(peer, tun.clone())) + .map(|peer| PeerPcb::new(peer)) .collect::>()?; let pcbs = Arc::new(pcbs); - Self { tun, pcbs } + Self { + pcbs, + tun: None + } } - pub async fn run(self) -> anyhow::Result<()> { + pub fn set_tun(&mut self, tun: Arc>) { + self.tun = Some(tun); + } + + pub async fn run(&self) -> anyhow::Result<()> { let pcbs = self.pcbs.clone(); - let tun = self.tun.clone(); + let tun = self.tun.clone().ok_or(anyhow::anyhow!("tun interface does not exist"))?; log::info!("starting interface"); let outgoing = async move { @@ -143,24 +150,16 @@ impl Interface { continue }, }; - - // let mut buf = [0u8; 3000]; - // match pcbs.pcbs[idx].read().await.recv(&mut buf).await { - // Ok(len) => log::debug!("received {} bytes from peer {}", len, dst_addr), - // Err(e) => { - // log::error!("failed to receive packet {}", e); - // continue - // }, - // } } }; let mut tsks = vec![]; - let tun = self.tun.clone(); + let tun = self.tun.clone().ok_or(anyhow::anyhow!("tun interface does not exist"))?; + let outgoing = tokio::task::spawn(outgoing); tsks.push(outgoing); { - let pcbs = self.pcbs; + let pcbs = &self.pcbs; for i in 0..pcbs.pcbs.len(){ let mut pcb = pcbs.pcbs[i].clone(); let tun = tun.clone(); @@ -172,7 +171,7 @@ impl Interface { return } } - let r2 = pcb.read().await.run().await; + let r2 = pcb.read().await.run(tun).await; if let Err(e) = r2 { log::error!("failed to run pcb: {}", e); return diff --git a/burrow/src/wireguard/pcb.rs b/burrow/src/wireguard/pcb.rs index f92acdc..6fcaa15 100755 --- a/burrow/src/wireguard/pcb.rs +++ b/burrow/src/wireguard/pcb.rs @@ -28,12 +28,11 @@ pub struct PeerPcb { pub handle: Option>, socket: Option, tunnel: RwLock, - tun_interface: Arc> } impl PeerPcb { #[throws] - pub fn new(peer: Peer, tun_interface: Arc>) -> Self { + pub fn new(peer: Peer) -> Self { let tunnel = RwLock::new(Tunnel::new(peer.private_key, peer.public_key, peer.preshared_key, None, 1, None) .map_err(|s| anyhow::anyhow!("{}", s))?); @@ -42,8 +41,7 @@ impl PeerPcb { allowed_ips: peer.allowed_ips, handle: None, socket: None, - tunnel, - tun_interface + tunnel } } @@ -56,22 +54,22 @@ impl PeerPcb { Ok(()) } - pub async fn run(&self) -> Result<(), Error> { + pub async fn run(&self, tun_interface: Arc>) -> Result<(), Error> { let mut buf = [0u8; 3000]; log::debug!("starting read loop for pcb..."); loop { tracing::debug!("waiting for packet"); - let len = self.recv(&mut buf).await?; + let len = self.recv(&mut buf, tun_interface.clone()).await?; tracing::debug!("received {} bytes", len); } } - pub async fn recv(&self, buf: &mut [u8]) -> Result { + pub async fn recv(&self, buf: &mut [u8], tun_interface: Arc>) -> Result { log::debug!("starting read loop for pcb... for {:?}", &self); let rid: i32 = random(); log::debug!("start read loop {}", rid); loop{ - // log::debug!("{}: waiting for packet", rid); + log::debug!("{}: waiting for packet", rid); let Some(socket) = &self.socket else { continue }; @@ -105,12 +103,12 @@ impl PeerPcb { } TunnResult::WriteToTunnelV4(packet, addr) => { tracing::debug!("WriteToTunnelV4: {:?}, {:?}", packet, addr); - self.tun_interface.read().await.send(packet).await?; + tun_interface.read().await.send(packet).await?; break; } TunnResult::WriteToTunnelV6(packet, addr) => { tracing::debug!("WriteToTunnelV6: {:?}, {:?}", packet, addr); - self.tun_interface.read().await.send(packet).await?; + tun_interface.read().await.send(packet).await?; break; } } diff --git a/tun/src/options.rs b/tun/src/options.rs index 3fe5a13..82cadfd 100644 --- a/tun/src/options.rs +++ b/tun/src/options.rs @@ -13,6 +13,10 @@ pub struct TunOptions { pub(crate) no_pi: Option<()>, /// (Linux) Avoid opening an existing persistant device. pub(crate) tun_excl: Option<()>, + /// (MacOS) Whether to seek the first available utun device. + pub(crate) seek_utun: Option<()>, + /// (Linux) The IP address of the tun interface. + pub(crate) address: Option, } impl TunOptions { @@ -27,6 +31,11 @@ impl TunOptions { pub fn tun_excl(mut self, enable: bool) { self.tun_excl = enable.then_some(()); } + pub fn address(mut self, address: impl ToString) -> Self { + self.address = Some(address.to_string()); + self + } + #[throws] pub fn open(self) -> TunInterface { TunInterface::new_with_options(self)? } } diff --git a/tun/src/tokio/mod.rs b/tun/src/tokio/mod.rs index 599e92c..fb924ff 100644 --- a/tun/src/tokio/mod.rs +++ b/tun/src/tokio/mod.rs @@ -10,7 +10,8 @@ pub struct TunInterface { impl TunInterface { #[instrument] - pub fn new(tun: crate::TunInterface) -> io::Result { + pub fn new(mut tun: crate::TunInterface) -> io::Result { + tun.set_nonblocking(true)?; Ok(Self { inner: AsyncFd::new(tun)?, }) diff --git a/tun/src/unix/apple/mod.rs b/tun/src/unix/apple/mod.rs index 83dbdc1..b419294 100644 --- a/tun/src/unix/apple/mod.rs +++ b/tun/src/unix/apple/mod.rs @@ -33,8 +33,14 @@ impl TunInterface { #[throws] #[instrument] - pub fn new_with_options(_: TunOptions) -> TunInterface { - TunInterface::connect(0)? + pub fn new_with_options(options: TunOptions) -> TunInterface { + let ti = TunInterface::connect(0)?; + if let Some(addr) = options.address{ + if let Ok(addr) = addr.parse() { + ti.set_ipv4_addr(addr)?; + } + } + ti } #[throws]