Start Tun Interface at Daemon Command

This commit is contained in:
Jett Chen 2023-12-09 20:13:49 +08:00
parent 2cb9dd75ca
commit 17610ff90d
9 changed files with 83 additions and 52 deletions

View file

@ -12,7 +12,7 @@ pub enum DaemonCommand {
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] #[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
pub struct DaemonStartOptions { pub struct DaemonStartOptions {
pub(super) tun: TunOptions, pub tun: TunOptions,
} }
#[test] #[test]

View file

@ -1,22 +1,35 @@
use std::ops::Deref; use tokio::task::JoinHandle;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use DaemonResponse; use DaemonResponse;
use tun::tokio::TunInterface; use tun::tokio::TunInterface;
use crate::daemon::response::{DaemonResponseData, ServerConfig, ServerInfo}; use crate::daemon::response::{DaemonResponseData, ServerConfig, ServerInfo};
use super::*; use super::*;
enum RunState{
Running(JoinHandle<Result<()>>),
Idle,
}
pub struct DaemonInstance { pub struct DaemonInstance {
rx: async_channel::Receiver<DaemonCommand>, rx: async_channel::Receiver<DaemonCommand>,
sx: async_channel::Sender<DaemonResponse>, sx: async_channel::Sender<DaemonResponse>,
tun_interface: Option<Arc<RwLock<TunInterface>>>, tun_interface: Option<Arc<RwLock<TunInterface>>>,
wg_interface: Arc<RwLock<Interface>>,
wg_state: RunState,
} }
impl DaemonInstance { impl DaemonInstance {
pub fn new(rx: async_channel::Receiver<DaemonCommand>, sx: async_channel::Sender<DaemonResponse>) -> Self { pub fn new(
rx: async_channel::Receiver<DaemonCommand>,
sx: async_channel::Sender<DaemonResponse>,
wg_interface: Arc<RwLock<Interface>>,
) -> Self {
Self { Self {
rx, rx,
sx, sx,
wg_interface,
tun_interface: None, tun_interface: None,
wg_state: RunState::Idle,
} }
} }
@ -28,12 +41,19 @@ impl DaemonInstance {
info!("Daemon got command: {:?}", command); info!("Daemon got command: {:?}", command);
match command { match command {
DaemonCommand::Start(st) => { DaemonCommand::Start(st) => {
if self.tun_interface.is_none() { match self.wg_state {
debug!("Daemon attempting start tun interface."); RunState::Running(_) => {warn!("Got start, but tun interface already up.");}
self.tun_interface = Some(Arc::new(RwLock::new(TunInterface::new(st.tun.open()?)?))); RunState::Idle => {
info!("Daemon started tun interface"); let tun_if = Arc::new(RwLock::new(TunInterface::new(st.tun.open()?)?));
} else { self.tun_interface = Some(tun_if.clone());
warn!("Got start, but tun interface already up."); self.wg_interface.write().await.set_tun(tun_if);
let tmp_wg = self.wg_interface.clone();
let run_task = tokio::spawn(async move {
tmp_wg.read().await.run().await
});
self.wg_state = RunState::Running(run_task);
info!("Daemon started tun interface");
}
} }
Ok(DaemonResponseData::None) Ok(DaemonResponseData::None)
} }

View file

@ -46,22 +46,13 @@ fn parse_public_key(string: &str) -> PublicKey {
pub async fn daemon_main() -> Result<()> { pub async fn daemon_main() -> Result<()> {
let (commands_tx, commands_rx) = async_channel::unbounded(); let (commands_tx, commands_rx) = async_channel::unbounded();
let (response_tx, response_rx) = async_channel::unbounded(); let (response_tx, response_rx) = async_channel::unbounded();
let mut inst = DaemonInstance::new(commands_rx, response_tx);
let mut _tun = tun::TunInterface::new()?;
_tun.set_ipv4_addr(Ipv4Addr::from([10,13,13,2]))?;
_tun.set_nonblocking(true)?;
let tun = tun::tokio::TunInterface::new(_tun)?;
let tun_ref = Arc::new(RwLock::new(tun));
let private_key = parse_secret_key("GNqIAOCRxjl/cicZyvkvpTklgQuUmGUIEkH7IXF/sEE=")?; let private_key = parse_secret_key("GNqIAOCRxjl/cicZyvkvpTklgQuUmGUIEkH7IXF/sEE=")?;
let public_key = parse_public_key("uy75leriJay0+oHLhRMpV+A5xAQ0hCJ+q7Ww81AOvT4=")?; let public_key = parse_public_key("uy75leriJay0+oHLhRMpV+A5xAQ0hCJ+q7Ww81AOvT4=")?;
let preshared_key = Some(parse_key("s7lx/mg+reVEMnGnqeyYOQkzD86n2+gYnx1M9ygi08k=")?); let preshared_key = Some(parse_key("s7lx/mg+reVEMnGnqeyYOQkzD86n2+gYnx1M9ygi08k=")?);
let endpoint = "wg.burrow.rs:51820".to_socket_addrs()?.next().unwrap(); let endpoint = "wg.burrow.rs:51820".to_socket_addrs()?.next().unwrap();
inst.set_tun_interface(tun_ref.clone()); let iface = Interface::new(vec![Peer {
let iface = Interface::new(tun_ref, vec![Peer {
endpoint, endpoint,
private_key, private_key,
public_key, public_key,
@ -69,7 +60,9 @@ pub async fn daemon_main() -> Result<()> {
allowed_ips: vec![IpNetwork::V4(Ipv4Network::DEFAULT_ROUTE)], allowed_ips: vec![IpNetwork::V4(Ipv4Network::DEFAULT_ROUTE)],
}])?; }])?;
tokio::try_join!(iface.run(), inst.run(), listen(commands_tx, response_rx)) let mut inst = DaemonInstance::new(commands_rx, response_tx, Arc::new(RwLock::new(iface)));
tokio::try_join!(inst.run(), listen(commands_tx, response_rx))
.map(|_| {()}); .map(|_| {()});
Ok(()) Ok(())
} }

View file

@ -18,6 +18,7 @@ mod daemon;
mod wireguard; mod wireguard;
use daemon::{DaemonClient, DaemonCommand, DaemonStartOptions}; use daemon::{DaemonClient, DaemonCommand, DaemonStartOptions};
use tun::TunOptions;
use crate::daemon::DaemonResponseData; use crate::daemon::DaemonResponseData;
#[derive(Parser)] #[derive(Parser)]
@ -65,7 +66,11 @@ struct DaemonArgs {}
async fn try_start() -> Result<()> { async fn try_start() -> Result<()> {
let mut client = DaemonClient::new().await?; let mut client = DaemonClient::new().await?;
client client
.send_command(DaemonCommand::Start(DaemonStartOptions::default())) .send_command(DaemonCommand::Start(
DaemonStartOptions{
tun: TunOptions::new().address("10.13.13.2")
}
))
.await .await
.map(|_| ()) .map(|_| ())
} }

View file

@ -77,25 +77,32 @@ impl FromIterator<PeerPcb> for IndexedPcbs {
} }
pub struct Interface { pub struct Interface {
tun: Arc<RwLock<TunInterface>>, tun: Option<Arc<RwLock<TunInterface>>>,
pcbs: Arc<IndexedPcbs>, pcbs: Arc<IndexedPcbs>,
} }
impl Interface { impl Interface {
#[throws] #[throws]
pub fn new<I: IntoIterator<Item = Peer>>(tun: Arc<RwLock<TunInterface>>, peers: I) -> Self { pub fn new<I: IntoIterator<Item = Peer>>(peers: I) -> Self {
let pcbs: IndexedPcbs = peers let pcbs: IndexedPcbs = peers
.into_iter() .into_iter()
.map(|peer| PeerPcb::new(peer, tun.clone())) .map(|peer| PeerPcb::new(peer))
.collect::<Result<_, _>>()?; .collect::<Result<_, _>>()?;
let pcbs = Arc::new(pcbs); let pcbs = Arc::new(pcbs);
Self { tun, pcbs } Self {
pcbs,
tun: None
}
} }
pub async fn run(self) -> anyhow::Result<()> { pub fn set_tun(&mut self, tun: Arc<RwLock<TunInterface>>) {
self.tun = Some(tun);
}
pub async fn run(&self) -> anyhow::Result<()> {
let pcbs = self.pcbs.clone(); let pcbs = self.pcbs.clone();
let tun = self.tun.clone(); let tun = self.tun.clone().ok_or(anyhow::anyhow!("tun interface does not exist"))?;
log::info!("starting interface"); log::info!("starting interface");
let outgoing = async move { let outgoing = async move {
@ -143,24 +150,16 @@ impl Interface {
continue continue
}, },
}; };
// let mut buf = [0u8; 3000];
// match pcbs.pcbs[idx].read().await.recv(&mut buf).await {
// Ok(len) => log::debug!("received {} bytes from peer {}", len, dst_addr),
// Err(e) => {
// log::error!("failed to receive packet {}", e);
// continue
// },
// }
} }
}; };
let mut tsks = vec![]; let mut tsks = vec![];
let tun = self.tun.clone(); let tun = self.tun.clone().ok_or(anyhow::anyhow!("tun interface does not exist"))?;
let outgoing = tokio::task::spawn(outgoing); let outgoing = tokio::task::spawn(outgoing);
tsks.push(outgoing); tsks.push(outgoing);
{ {
let pcbs = self.pcbs; let pcbs = &self.pcbs;
for i in 0..pcbs.pcbs.len(){ for i in 0..pcbs.pcbs.len(){
let mut pcb = pcbs.pcbs[i].clone(); let mut pcb = pcbs.pcbs[i].clone();
let tun = tun.clone(); let tun = tun.clone();
@ -172,7 +171,7 @@ impl Interface {
return return
} }
} }
let r2 = pcb.read().await.run().await; let r2 = pcb.read().await.run(tun).await;
if let Err(e) = r2 { if let Err(e) = r2 {
log::error!("failed to run pcb: {}", e); log::error!("failed to run pcb: {}", e);
return return

View file

@ -28,12 +28,11 @@ pub struct PeerPcb {
pub handle: Option<JoinHandle<()>>, pub handle: Option<JoinHandle<()>>,
socket: Option<UdpSocket>, socket: Option<UdpSocket>,
tunnel: RwLock<Tunnel>, tunnel: RwLock<Tunnel>,
tun_interface: Arc<RwLock<TunInterface>>
} }
impl PeerPcb { impl PeerPcb {
#[throws] #[throws]
pub fn new(peer: Peer, tun_interface: Arc<RwLock<TunInterface>>) -> Self { pub fn new(peer: Peer) -> Self {
let tunnel = RwLock::new(Tunnel::new(peer.private_key, peer.public_key, peer.preshared_key, None, 1, None) let tunnel = RwLock::new(Tunnel::new(peer.private_key, peer.public_key, peer.preshared_key, None, 1, None)
.map_err(|s| anyhow::anyhow!("{}", s))?); .map_err(|s| anyhow::anyhow!("{}", s))?);
@ -42,8 +41,7 @@ impl PeerPcb {
allowed_ips: peer.allowed_ips, allowed_ips: peer.allowed_ips,
handle: None, handle: None,
socket: None, socket: None,
tunnel, tunnel
tun_interface
} }
} }
@ -56,22 +54,22 @@ impl PeerPcb {
Ok(()) Ok(())
} }
pub async fn run(&self) -> Result<(), Error> { pub async fn run(&self, tun_interface: Arc<RwLock<TunInterface>>) -> Result<(), Error> {
let mut buf = [0u8; 3000]; let mut buf = [0u8; 3000];
log::debug!("starting read loop for pcb..."); log::debug!("starting read loop for pcb...");
loop { loop {
tracing::debug!("waiting for packet"); tracing::debug!("waiting for packet");
let len = self.recv(&mut buf).await?; let len = self.recv(&mut buf, tun_interface.clone()).await?;
tracing::debug!("received {} bytes", len); tracing::debug!("received {} bytes", len);
} }
} }
pub async fn recv(&self, buf: &mut [u8]) -> Result<usize, Error> { pub async fn recv(&self, buf: &mut [u8], tun_interface: Arc<RwLock<TunInterface>>) -> Result<usize, Error> {
log::debug!("starting read loop for pcb... for {:?}", &self); log::debug!("starting read loop for pcb... for {:?}", &self);
let rid: i32 = random(); let rid: i32 = random();
log::debug!("start read loop {}", rid); log::debug!("start read loop {}", rid);
loop{ loop{
// log::debug!("{}: waiting for packet", rid); log::debug!("{}: waiting for packet", rid);
let Some(socket) = &self.socket else { let Some(socket) = &self.socket else {
continue continue
}; };
@ -105,12 +103,12 @@ impl PeerPcb {
} }
TunnResult::WriteToTunnelV4(packet, addr) => { TunnResult::WriteToTunnelV4(packet, addr) => {
tracing::debug!("WriteToTunnelV4: {:?}, {:?}", packet, addr); tracing::debug!("WriteToTunnelV4: {:?}, {:?}", packet, addr);
self.tun_interface.read().await.send(packet).await?; tun_interface.read().await.send(packet).await?;
break; break;
} }
TunnResult::WriteToTunnelV6(packet, addr) => { TunnResult::WriteToTunnelV6(packet, addr) => {
tracing::debug!("WriteToTunnelV6: {:?}, {:?}", packet, addr); tracing::debug!("WriteToTunnelV6: {:?}, {:?}", packet, addr);
self.tun_interface.read().await.send(packet).await?; tun_interface.read().await.send(packet).await?;
break; break;
} }
} }

View file

@ -13,6 +13,10 @@ pub struct TunOptions {
pub(crate) no_pi: Option<()>, pub(crate) no_pi: Option<()>,
/// (Linux) Avoid opening an existing persistant device. /// (Linux) Avoid opening an existing persistant device.
pub(crate) tun_excl: Option<()>, pub(crate) tun_excl: Option<()>,
/// (MacOS) Whether to seek the first available utun device.
pub(crate) seek_utun: Option<()>,
/// (Linux) The IP address of the tun interface.
pub(crate) address: Option<String>,
} }
impl TunOptions { impl TunOptions {
@ -27,6 +31,11 @@ impl TunOptions {
pub fn tun_excl(mut self, enable: bool) { self.tun_excl = enable.then_some(()); } pub fn tun_excl(mut self, enable: bool) { self.tun_excl = enable.then_some(()); }
pub fn address(mut self, address: impl ToString) -> Self {
self.address = Some(address.to_string());
self
}
#[throws] #[throws]
pub fn open(self) -> TunInterface { TunInterface::new_with_options(self)? } pub fn open(self) -> TunInterface { TunInterface::new_with_options(self)? }
} }

View file

@ -10,7 +10,8 @@ pub struct TunInterface {
impl TunInterface { impl TunInterface {
#[instrument] #[instrument]
pub fn new(tun: crate::TunInterface) -> io::Result<Self> { pub fn new(mut tun: crate::TunInterface) -> io::Result<Self> {
tun.set_nonblocking(true)?;
Ok(Self { Ok(Self {
inner: AsyncFd::new(tun)?, inner: AsyncFd::new(tun)?,
}) })

View file

@ -33,8 +33,14 @@ impl TunInterface {
#[throws] #[throws]
#[instrument] #[instrument]
pub fn new_with_options(_: TunOptions) -> TunInterface { pub fn new_with_options(options: TunOptions) -> TunInterface {
TunInterface::connect(0)? let ti = TunInterface::connect(0)?;
if let Some(addr) = options.address{
if let Ok(addr) = addr.parse() {
ti.set_ipv4_addr(addr)?;
}
}
ti
} }
#[throws] #[throws]