Add implementation for stop command

This adds implementation for stopping the tunnel
via the `Stop` command.
This commit is contained in:
Jett Chen 2024-02-11 03:17:14 +08:00
parent 29d2bfae3f
commit c4c342dc8b
5 changed files with 81 additions and 38 deletions

View file

@ -44,6 +44,17 @@ class PacketTunnelProvider: NEPacketTunnelProvider {
} }
} }
override func stopTunnel(with reason: NEProviderStopReason) async {
do {
let client = try Client()
let command = BurrowRequest(id: 0, command: "Stop")
let data = try await client.request(command, type: Response<BurrowResult<String>>.self)
self.logger.log("Stopped client.")
} catch {
self.logger.error("Failed to stop tunnel: \(error)")
}
}
private func generateTunSettings(from: ServerConfigData) -> NETunnelNetworkSettings? { private func generateTunSettings(from: ServerConfigData) -> NETunnelNetworkSettings? {
let cfig = from.ServerConfig let cfig = from.ServerConfig
let nst = NEPacketTunnelNetworkSettings(tunnelRemoteAddress: "1.1.1.1") let nst = NEPacketTunnelNetworkSettings(tunnelRemoteAddress: "1.1.1.1")

View file

@ -17,6 +17,9 @@ daemon:
start: start:
@$(cargo_norm) start @$(cargo_norm) start
stop:
@$(cargo_norm) stop
test-dns: test-dns:
@sudo route delete 8.8.8.8 @sudo route delete 8.8.8.8
@sudo route add 8.8.8.8 -interface $(tun) @sudo route add 8.8.8.8 -interface $(tun)

View file

@ -21,7 +21,7 @@ enum RunState {
pub struct DaemonInstance { pub struct DaemonInstance {
rx: async_channel::Receiver<DaemonCommand>, rx: async_channel::Receiver<DaemonCommand>,
sx: async_channel::Sender<DaemonResponse>, sx: async_channel::Sender<DaemonResponse>,
tun_interface: Option<Arc<RwLock<TunInterface>>>, tun_interface: Arc<RwLock<Option<TunInterface>>>,
wg_interface: Arc<RwLock<Interface>>, wg_interface: Arc<RwLock<Interface>>,
wg_state: RunState, wg_state: RunState,
} }
@ -36,7 +36,7 @@ impl DaemonInstance {
rx, rx,
sx, sx,
wg_interface, wg_interface,
tun_interface: None, tun_interface: Arc::new(RwLock::new(None)),
wg_state: RunState::Idle, wg_state: RunState::Idle,
} }
} }
@ -50,15 +50,15 @@ impl DaemonInstance {
warn!("Got start, but tun interface already up."); warn!("Got start, but tun interface already up.");
} }
RunState::Idle => { RunState::Idle => {
let tun_if = Arc::new(RwLock::new(st.tun.open()?)); let tun_if = st.tun.open()?;
debug!("Setting tun on wg_interface");
self.wg_interface.read().await.set_tun(tun_if).await;
debug!("tun set on wg_interface");
debug!("Setting tun_interface"); debug!("Setting tun_interface");
self.tun_interface = Some(tun_if.clone()); self.tun_interface = self.wg_interface.read().await.get_tun();
debug!("tun_interface set: {:?}", self.tun_interface); debug!("tun_interface set: {:?}", self.tun_interface);
debug!("Setting tun on wg_interface");
self.wg_interface.write().await.set_tun(tun_if);
debug!("tun set on wg_interface");
debug!("Cloning wg_interface"); debug!("Cloning wg_interface");
let tmp_wg = self.wg_interface.clone(); let tmp_wg = self.wg_interface.clone();
@ -82,22 +82,18 @@ impl DaemonInstance {
} }
Ok(DaemonResponseData::None) Ok(DaemonResponseData::None)
} }
DaemonCommand::ServerInfo => match &self.tun_interface { DaemonCommand::ServerInfo => match &self.tun_interface.read().await.as_ref() {
None => Ok(DaemonResponseData::None), None => Ok(DaemonResponseData::None),
Some(ti) => { Some(ti) => {
info!("{:?}", ti); info!("{:?}", ti);
Ok(DaemonResponseData::ServerInfo(ServerInfo::try_from( Ok(DaemonResponseData::ServerInfo(ServerInfo::try_from(
ti.read().await.inner.get_ref(), ti.inner.get_ref(),
)?)) )?))
} }
}, },
DaemonCommand::Stop => { DaemonCommand::Stop => {
if self.tun_interface.is_some() { self.wg_interface.read().await.remove_tun().await;
self.tun_interface = None; self.wg_state = RunState::Idle;
info!("Daemon stopping tun interface.");
} else {
warn!("Got stop, but tun interface is not up.")
}
Ok(DaemonResponseData::None) Ok(DaemonResponseData::None)
} }
DaemonCommand::ServerConfig => { DaemonCommand::ServerConfig => {

View file

@ -1,10 +1,11 @@
use std::{net::IpAddr, sync::Arc}; use std::{net::IpAddr, sync::Arc};
use std::ops::Deref;
use anyhow::Error; use anyhow::Error;
use fehler::throws; use fehler::throws;
use futures::future::join_all; use futures::future::join_all;
use ip_network_table::IpNetworkTable; use ip_network_table::IpNetworkTable;
use tokio::sync::RwLock; use tokio::sync::{RwLock, Notify};
use tracing::{debug, error}; use tracing::{debug, error};
use tun::tokio::TunInterface; use tun::tokio::TunInterface;
@ -46,9 +47,21 @@ impl FromIterator<PeerPcb> for IndexedPcbs {
} }
} }
enum IfaceStatus {
Running,
Idle
}
pub struct Interface { pub struct Interface {
tun: Option<Arc<RwLock<TunInterface>>>, tun: Arc<RwLock<Option<TunInterface>>>,
pcbs: Arc<IndexedPcbs>, pcbs: Arc<IndexedPcbs>,
status: Arc<RwLock<IfaceStatus>>,
stop_notifier: Arc<Notify>,
}
async fn is_running(status: Arc<RwLock<IfaceStatus>>) -> bool {
let st = status.read().await;
matches!(st.deref(), IfaceStatus::Running)
} }
impl Interface { impl Interface {
@ -60,35 +73,54 @@ impl Interface {
.collect::<Result<_, _>>()?; .collect::<Result<_, _>>()?;
let pcbs = Arc::new(pcbs); let pcbs = Arc::new(pcbs);
Self { pcbs, tun: None } Self { pcbs, tun: Arc::new(RwLock::new(None)), status: Arc::new(RwLock::new(IfaceStatus::Idle)), stop_notifier: Arc::new(Notify::new()) }
} }
pub fn set_tun(&mut self, tun: Arc<RwLock<TunInterface>>) { pub async fn set_tun(&self, tun: TunInterface) {
self.tun = Some(tun); debug!("Setting tun interface");
self.tun.write().await.replace(tun);
let mut st = self.status.write().await;
*st = IfaceStatus::Running;
}
pub fn get_tun(&self) -> Arc<RwLock<Option<TunInterface>>> {
self.tun.clone()
}
pub async fn remove_tun(&self){
let mut st = self.status.write().await;
self.stop_notifier.notify_waiters();
*st = IfaceStatus::Idle;
} }
pub async fn run(&self) -> anyhow::Result<()> { pub async fn run(&self) -> anyhow::Result<()> {
let pcbs = self.pcbs.clone(); let pcbs = self.pcbs.clone();
let tun = self let tun = self
.tun .tun
.clone() .clone();
.ok_or(anyhow::anyhow!("tun interface does not exist"))?; let status = self.status.clone();
let stop_notifier = self.stop_notifier.clone();
log::info!("Starting interface"); log::info!("Starting interface");
let outgoing = async move { let outgoing = async move {
loop { while is_running(status.clone()).await {
let mut buf = [0u8; 3000]; let mut buf = [0u8; 3000];
let src = { let src = {
let src = match tun.read().await.recv(&mut buf[..]).await { let t = tun.read().await;
Ok(len) => &buf[..len], let Some(_tun) = t.as_ref() else {
Err(e) => { continue;
error!("Failed to read from interface: {}", e);
continue
}
}; };
debug!("Read {} bytes from interface", src.len()); tokio::select! {
src _ = stop_notifier.notified() => continue,
pkg = _tun.recv(&mut buf[..]) => match pkg {
Ok(len) => &buf[..len],
Err(e) => {
error!("Failed to read from interface: {}", e);
continue
}
},
}
}; };
let dst_addr = match Tunnel::dst_address(src) { let dst_addr = match Tunnel::dst_address(src) {
@ -123,8 +155,7 @@ impl Interface {
let mut tsks = vec![]; let mut tsks = vec![];
let tun = self let tun = self
.tun .tun
.clone() .clone();
.ok_or(anyhow::anyhow!("tun interface does not exist"))?;
let outgoing = tokio::task::spawn(outgoing); let outgoing = tokio::task::spawn(outgoing);
tsks.push(outgoing); tsks.push(outgoing);
debug!("preparing to spawn read tasks"); debug!("preparing to spawn read tasks");
@ -149,9 +180,10 @@ impl Interface {
}; };
let pcb = pcbs.pcbs[i].clone(); let pcb = pcbs.pcbs[i].clone();
let status = self.status.clone();
let update_timers_tsk = async move { let update_timers_tsk = async move {
let mut buf = [0u8; 65535]; let mut buf = [0u8; 65535];
loop { while is_running(status.clone()).await {
tokio::time::sleep(tokio::time::Duration::from_millis(250)).await; tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
match pcb.update_timers(&mut buf).await { match pcb.update_timers(&mut buf).await {
Ok(..) => (), Ok(..) => (),
@ -164,8 +196,9 @@ impl Interface {
}; };
let pcb = pcbs.pcbs[i].clone(); let pcb = pcbs.pcbs[i].clone();
let status = self.status.clone();
let reset_rate_limiter_tsk = async move { let reset_rate_limiter_tsk = async move {
loop { while is_running(status.clone()).await {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
pcb.reset_rate_limiter().await; pcb.reset_rate_limiter().await;
} }

View file

@ -54,7 +54,7 @@ impl PeerPcb {
Ok(()) Ok(())
} }
pub async fn run(&self, tun_interface: Arc<RwLock<TunInterface>>) -> Result<(), Error> { pub async fn run(&self, tun_interface: Arc<RwLock<Option<TunInterface>>>) -> Result<(), Error> {
tracing::debug!("starting read loop for pcb... for {:?}", &self); tracing::debug!("starting read loop for pcb... for {:?}", &self);
let rid: i32 = random(); let rid: i32 = random();
let mut buf: [u8; 3000] = [0u8; 3000]; let mut buf: [u8; 3000] = [0u8; 3000];
@ -106,12 +106,12 @@ impl PeerPcb {
} }
TunnResult::WriteToTunnelV4(packet, addr) => { TunnResult::WriteToTunnelV4(packet, addr) => {
tracing::debug!("WriteToTunnelV4: {:?}, {:?}", packet, addr); tracing::debug!("WriteToTunnelV4: {:?}, {:?}", packet, addr);
tun_interface.read().await.send(packet).await?; tun_interface.read().await.as_ref().ok_or(anyhow::anyhow!("tun interface does not exist"))?.send(packet).await?;
break break
} }
TunnResult::WriteToTunnelV6(packet, addr) => { TunnResult::WriteToTunnelV6(packet, addr) => {
tracing::debug!("WriteToTunnelV6: {:?}, {:?}", packet, addr); tracing::debug!("WriteToTunnelV6: {:?}, {:?}", packet, addr);
tun_interface.read().await.send(packet).await?; tun_interface.read().await.as_ref().ok_or(anyhow::anyhow!("tun interface does not exist"))?.send(packet).await?;
break break
} }
} }