Add Arti system TCP transport

This commit is contained in:
Conrad Kramer 2026-03-18 02:45:55 -07:00
parent 3fb0269d7c
commit 482fd5d085
15 changed files with 5427 additions and 451 deletions

4292
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -10,7 +10,7 @@ crate-type = ["lib", "staticlib"]
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
tokio = { version = "1.37", features = [ tokio = { version = "1.50.0", features = [
"rt", "rt",
"macros", "macros",
"sync", "sync",
@ -50,22 +50,25 @@ async-channel = "2.1"
schemars = "0.8" schemars = "0.8"
futures = "0.3.28" futures = "0.3.28"
once_cell = "1.19" once_cell = "1.19"
arti-client = "0.40.0"
tokio-util = { version = "0.7.18", features = ["compat"] }
console-subscriber = { version = "0.2.0", optional = true } console-subscriber = { version = "0.2.0", optional = true }
console = "0.15.8" console = "0.15.8"
axum = "0.7.4" axum = "0.8.8"
reqwest = { version = "0.12", default-features = false, features = [ reqwest = { version = "0.13.2", default-features = false, features = [
"json", "json",
"rustls-tls", "rustls",
] } ] }
rusqlite = { version = "0.31.0", features = ["blob"] } rusqlite = { version = "0.38.0", features = ["blob"] }
dotenv = "0.15.0" dotenv = "0.15.0"
tonic = "0.12.0" tonic = "0.14.5"
prost = "0.13.1" tonic-prost = "0.14.5"
prost-types = "0.13.1" prost = "0.14.3"
tokio-stream = "0.1" prost-types = "0.14.3"
tokio-stream = "0.1.18"
async-stream = "0.2" async-stream = "0.2"
tower = "0.4.13" tower = "0.5.3"
hyper-util = "0.1.6" hyper-util = "0.1.20"
toml = "0.8.15" toml = "0.8.15"
rust-ini = "0.21.0" rust-ini = "0.21.0"
@ -73,10 +76,11 @@ rust-ini = "0.21.0"
caps = "0.5" caps = "0.5"
libsystemd = "0.7" libsystemd = "0.7"
tracing-journald = "0.3" tracing-journald = "0.3"
libc = "0.2"
[target.'cfg(target_vendor = "apple")'.dependencies] [target.'cfg(target_vendor = "apple")'.dependencies]
nix = { version = "0.27" } nix = { version = "0.27", features = ["ioctl"] }
rusqlite = { version = "0.31.0", features = ["bundled", "blob"] } rusqlite = { version = "0.38.0", features = ["bundled", "blob"] }
[dev-dependencies] [dev-dependencies]
insta = { version = "1.32", features = ["yaml"] } insta = { version = "1.32", features = ["yaml"] }
@ -96,4 +100,4 @@ bundled = ["rusqlite/bundled"]
[build-dependencies] [build-dependencies]
tonic-build = "0.12.0" tonic-prost-build = "0.14.5"

View file

@ -1,4 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> { fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../proto/burrow.proto")?; tonic_prost_build::compile_protos("../proto/burrow.proto")?;
Ok(()) Ok(())
} }

View file

@ -1,61 +1,186 @@
use std::{ use std::{
ops::Deref,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
time::Duration,
}; };
use anyhow::Result; use anyhow::{anyhow, Context, Result};
use rusqlite::Connection; use rusqlite::Connection;
use tokio::sync::{mpsc, watch, Notify, RwLock}; use tokio::{
sync::{mpsc, watch, RwLock},
task::JoinHandle,
};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status as RspStatus}; use tonic::{Request, Response, Status as RspStatus};
use tracing::{debug, info, warn}; use tracing::warn;
use tun::{tokio::TunInterface, TunOptions}; use tun::{tokio::TunInterface, TunOptions};
use super::rpc::grpc_defs::{ use super::rpc::{
networks_server::Networks, grpc_defs::{
tunnel_server::Tunnel, networks_server::Networks, tunnel_server::Tunnel, Empty, Network, NetworkDeleteRequest,
Empty, NetworkListResponse, NetworkReorderRequest, NetworkType, State as RPCTunnelState,
Network, TunnelConfigurationResponse, TunnelStatusResponse,
NetworkDeleteRequest, },
NetworkListResponse, ServerConfig,
NetworkReorderRequest,
State as RPCTunnelState,
TunnelConfigurationResponse,
TunnelStatusResponse,
}; };
use crate::{ use crate::{
daemon::rpc::{ database::{add_network, delete_network, get_connection, list_networks, reorder_network},
DaemonCommand, tor::{self, Config as TorConfig, TorHandle},
DaemonNotification, wireguard::{Config as WireGuardConfig, Interface as WireGuardInterface},
DaemonResponse,
DaemonResponseData,
ServerConfig,
ServerInfo,
},
database::{
add_network,
delete_network,
get_connection,
list_networks,
load_interface,
reorder_network,
},
wireguard::{Config, Interface},
}; };
#[derive(Debug, Clone)] #[derive(Debug, Clone, PartialEq, Eq)]
enum RunState { enum RunState {
Running, Running,
Idle, Idle,
} }
impl RunState { impl RunState {
pub fn to_rpc(&self) -> RPCTunnelState { fn to_rpc(&self) -> RPCTunnelState {
match self { match self {
RunState::Running => RPCTunnelState::Running, Self::Running => RPCTunnelState::Running,
RunState::Idle => RPCTunnelState::Stopped, Self::Idle => RPCTunnelState::Stopped,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
enum RuntimeIdentity {
DefaultWireGuard,
Network { id: i32, network_type: NetworkType },
}
#[derive(Clone, Debug)]
enum ResolvedTunnel {
WireGuard {
identity: RuntimeIdentity,
config: WireGuardConfig,
},
Tor {
identity: RuntimeIdentity,
config: TorConfig,
},
}
impl ResolvedTunnel {
fn from_networks(networks: &[Network], fallback: &WireGuardConfig) -> Result<Self> {
let Some(network) = networks.first() else {
return Ok(Self::WireGuard {
identity: RuntimeIdentity::DefaultWireGuard,
config: fallback.clone(),
});
};
let identity = RuntimeIdentity::Network {
id: network.id,
network_type: network.r#type(),
};
match network.r#type() {
NetworkType::WireGuard => {
let payload = String::from_utf8(network.payload.clone())
.context("wireguard payload must be valid UTF-8")?;
let config = WireGuardConfig::from_content_fmt(&payload, "ini")?;
Ok(Self::WireGuard { identity, config })
}
NetworkType::Tor => {
let config = TorConfig::from_payload(&network.payload)?;
Ok(Self::Tor { identity, config })
}
NetworkType::HackClub => {
Err(anyhow!("HackClub runtime is not available on this branch"))
}
}
}
fn identity(&self) -> &RuntimeIdentity {
match self {
Self::WireGuard { identity, .. } | Self::Tor { identity, .. } => identity,
}
}
fn server_config(&self) -> Result<ServerConfig> {
match self {
Self::WireGuard { config, .. } => ServerConfig::try_from(config),
Self::Tor { config, .. } => Ok(ServerConfig {
address: config.address.clone(),
name: config.tun_name.clone(),
mtu: config.mtu.map(|mtu| mtu as i32),
}),
}
}
async fn start(self, tun_interface: Arc<RwLock<Option<TunInterface>>>) -> Result<ActiveTunnel> {
match self {
Self::WireGuard { identity, config } => {
let tun = TunOptions::new()
.address(config.interface.address.clone())
.open()?;
tun_interface.write().await.replace(tun);
let mut interface: WireGuardInterface = config.try_into()?;
interface.set_tun_ref(tun_interface.clone()).await;
let interface = Arc::new(RwLock::new(interface));
let run_interface = interface.clone();
let task = tokio::spawn(async move {
let guard = run_interface.read().await;
guard.run().await
});
Ok(ActiveTunnel::WireGuard { identity, interface, task })
}
Self::Tor { identity, config } => {
let mut tun_options = TunOptions::new().address(config.address.clone());
if let Some(name) = config.tun_name.as_deref() {
tun_options = tun_options.name(name);
}
let tun = tun_options.open()?;
tun_interface.write().await.replace(tun);
match tor::spawn(config).await {
Ok(handle) => Ok(ActiveTunnel::Tor { identity, handle }),
Err(err) => {
tun_interface.write().await.take();
Err(err)
}
}
}
}
}
}
enum ActiveTunnel {
WireGuard {
identity: RuntimeIdentity,
interface: Arc<RwLock<WireGuardInterface>>,
task: JoinHandle<Result<()>>,
},
Tor {
identity: RuntimeIdentity,
handle: TorHandle,
},
}
impl ActiveTunnel {
fn identity(&self) -> &RuntimeIdentity {
match self {
Self::WireGuard { identity, .. } | Self::Tor { identity, .. } => identity,
}
}
async fn shutdown(self, tun_interface: &Arc<RwLock<Option<TunInterface>>>) -> Result<()> {
match self {
Self::WireGuard { interface, task, .. } => {
interface.read().await.remove_tun().await;
let task_result = task.await;
tun_interface.write().await.take();
task_result??;
Ok(())
}
Self::Tor { handle, .. } => {
let result = handle.shutdown().await;
tun_interface.write().await.take();
result
}
} }
} }
} }
@ -63,30 +188,26 @@ impl RunState {
#[derive(Clone)] #[derive(Clone)]
pub struct DaemonRPCServer { pub struct DaemonRPCServer {
tun_interface: Arc<RwLock<Option<TunInterface>>>, tun_interface: Arc<RwLock<Option<TunInterface>>>,
wg_interface: Arc<RwLock<Interface>>, default_config: Arc<RwLock<WireGuardConfig>>,
config: Arc<RwLock<Config>>,
db_path: Option<PathBuf>, db_path: Option<PathBuf>,
wg_state_chan: (watch::Sender<RunState>, watch::Receiver<RunState>), wg_state_chan: (watch::Sender<RunState>, watch::Receiver<RunState>),
network_update_chan: (watch::Sender<()>, watch::Receiver<()>), network_update_chan: (watch::Sender<()>, watch::Receiver<()>),
active_tunnel: Arc<RwLock<Option<ActiveTunnel>>>,
} }
impl DaemonRPCServer { impl DaemonRPCServer {
pub fn new( pub fn new(config: Arc<RwLock<WireGuardConfig>>, db_path: Option<&Path>) -> Result<Self> {
wg_interface: Arc<RwLock<Interface>>,
config: Arc<RwLock<Config>>,
db_path: Option<&Path>,
) -> Result<Self> {
Ok(Self { Ok(Self {
tun_interface: Arc::new(RwLock::new(None)), tun_interface: Arc::new(RwLock::new(None)),
wg_interface, default_config: config,
config, db_path: db_path.map(Path::to_owned),
db_path: db_path.map(|p| p.to_owned()),
wg_state_chan: watch::channel(RunState::Idle), wg_state_chan: watch::channel(RunState::Idle),
network_update_chan: watch::channel(()), network_update_chan: watch::channel(()),
active_tunnel: Arc::new(RwLock::new(None)),
}) })
} }
pub fn get_connection(&self) -> Result<Connection, RspStatus> { fn get_connection(&self) -> Result<Connection, RspStatus> {
get_connection(self.db_path.as_deref()).map_err(proc_err) get_connection(self.db_path.as_deref()).map_err(proc_err)
} }
@ -94,13 +215,70 @@ impl DaemonRPCServer {
self.wg_state_chan.0.send(state).map_err(proc_err) self.wg_state_chan.0.send(state).map_err(proc_err)
} }
async fn get_wg_state(&self) -> RunState {
self.wg_state_chan.1.borrow().to_owned()
}
async fn notify_network_update(&self) -> Result<(), RspStatus> { async fn notify_network_update(&self) -> Result<(), RspStatus> {
self.network_update_chan.0.send(()).map_err(proc_err) self.network_update_chan.0.send(()).map_err(proc_err)
} }
async fn resolve_tunnel(&self) -> Result<ResolvedTunnel, RspStatus> {
let conn = self.get_connection()?;
let networks = list_networks(&conn).map_err(proc_err)?;
let fallback = self.default_config.read().await.clone();
ResolvedTunnel::from_networks(&networks, &fallback).map_err(proc_err)
}
async fn current_tunnel_configuration(&self) -> Result<TunnelConfigurationResponse, RspStatus> {
let config = self
.resolve_tunnel()
.await?
.server_config()
.map_err(proc_err)?;
Ok(TunnelConfigurationResponse {
addresses: config.address,
mtu: config.mtu.unwrap_or(1500),
})
}
async fn stop_active_tunnel(&self) -> Result<bool, RspStatus> {
let current = { self.active_tunnel.write().await.take() };
let Some(current) = current else {
return Ok(false);
};
current
.shutdown(&self.tun_interface)
.await
.map_err(proc_err)?;
self.set_wg_state(RunState::Idle).await?;
Ok(true)
}
async fn replace_active_tunnel(&self, desired: ResolvedTunnel) -> Result<(), RspStatus> {
let _ = self.stop_active_tunnel().await?;
let active = desired
.start(self.tun_interface.clone())
.await
.map_err(proc_err)?;
self.active_tunnel.write().await.replace(active);
self.set_wg_state(RunState::Running).await?;
Ok(())
}
async fn reconcile_runtime(&self) -> Result<(), RspStatus> {
let desired = self.resolve_tunnel().await?;
let needs_restart = {
let guard = self.active_tunnel.read().await;
guard
.as_ref()
.map(|active| active.identity() != desired.identity())
.unwrap_or(false)
};
if needs_restart {
self.replace_active_tunnel(desired).await?;
}
Ok(())
}
} }
#[tonic::async_trait] #[tonic::async_trait]
@ -113,55 +291,46 @@ impl Tunnel for DaemonRPCServer {
_request: Request<Empty>, _request: Request<Empty>,
) -> Result<Response<Self::TunnelConfigurationStream>, RspStatus> { ) -> Result<Response<Self::TunnelConfigurationStream>, RspStatus> {
let (tx, rx) = mpsc::channel(10); let (tx, rx) = mpsc::channel(10);
let server = self.clone();
let mut sub = self.network_update_chan.1.clone();
tokio::spawn(async move { tokio::spawn(async move {
let serv_config = ServerConfig::default(); loop {
tx.send(Ok(TunnelConfigurationResponse { let response = server.current_tunnel_configuration().await;
mtu: serv_config.mtu.unwrap_or(1000), if tx.send(response).await.is_err() {
addresses: serv_config.address, break;
})) }
.await if sub.changed().await.is_err() {
break;
}
}
}); });
Ok(Response::new(ReceiverStream::new(rx))) Ok(Response::new(ReceiverStream::new(rx)))
} }
async fn tunnel_start(&self, _request: Request<Empty>) -> Result<Response<Empty>, RspStatus> { async fn tunnel_start(&self, _request: Request<Empty>) -> Result<Response<Empty>, RspStatus> {
let wg_state = self.get_wg_state().await; let desired = self.resolve_tunnel().await?;
match wg_state { let already_running = {
RunState::Idle => { let guard = self.active_tunnel.read().await;
let tun_if = TunOptions::new().open()?; guard
debug!("Setting tun on wg_interface"); .as_ref()
self.tun_interface.write().await.replace(tun_if); .map(|active| active.identity() == desired.identity())
self.wg_interface .unwrap_or(false)
.write() };
.await
.set_tun_ref(self.tun_interface.clone())
.await;
debug!("tun set on wg_interface");
debug!("Setting tun_interface");
debug!("tun_interface set: {:?}", self.tun_interface);
debug!("Cloning wg_interface");
let tmp_wg = self.wg_interface.clone();
let run_task = tokio::spawn(async move {
let twlock = tmp_wg.read().await;
twlock.run().await
});
self.set_wg_state(RunState::Running).await?;
}
RunState::Running => {
warn!("Got start, but tun interface already up.");
}
}
if already_running {
warn!("Got start, but active tunnel already matches desired network.");
return Ok(Response::new(Empty {})); return Ok(Response::new(Empty {}));
} }
self.replace_active_tunnel(desired).await?;
Ok(Response::new(Empty {}))
}
async fn tunnel_stop(&self, _request: Request<Empty>) -> Result<Response<Empty>, RspStatus> { async fn tunnel_stop(&self, _request: Request<Empty>) -> Result<Response<Empty>, RspStatus> {
self.wg_interface.write().await.remove_tun().await; let _ = self.stop_active_tunnel().await?;
self.set_wg_state(RunState::Idle).await?; Ok(Response::new(Empty {}))
return Ok(Response::new(Empty {}));
} }
async fn tunnel_status( async fn tunnel_status(
@ -172,13 +341,16 @@ impl Tunnel for DaemonRPCServer {
let mut state_rx = self.wg_state_chan.1.clone(); let mut state_rx = self.wg_state_chan.1.clone();
tokio::spawn(async move { tokio::spawn(async move {
let cur = state_rx.borrow_and_update().to_owned(); let cur = state_rx.borrow_and_update().to_owned();
tx.send(Ok(status_rsp(cur))).await; if tx.send(Ok(status_rsp(cur))).await.is_err() {
return;
}
loop { loop {
state_rx.changed().await.unwrap(); if state_rx.changed().await.is_err() {
break;
}
let cur = state_rx.borrow().to_owned(); let cur = state_rx.borrow().to_owned();
let res = tx.send(Ok(status_rsp(cur))).await; if tx.send(Ok(status_rsp(cur))).await.is_err() {
if res.is_err() {
eprintln!("Tunnel status channel closed");
break; break;
} }
} }
@ -196,6 +368,7 @@ impl Networks for DaemonRPCServer {
let network = request.into_inner(); let network = request.into_inner();
add_network(&conn, &network).map_err(proc_err)?; add_network(&conn, &network).map_err(proc_err)?;
self.notify_network_update().await?; self.notify_network_update().await?;
self.reconcile_runtime().await?;
Ok(Response::new(Empty {})) Ok(Response::new(Empty {}))
} }
@ -203,7 +376,6 @@ impl Networks for DaemonRPCServer {
&self, &self,
_request: Request<Empty>, _request: Request<Empty>,
) -> Result<Response<Self::NetworkListStream>, RspStatus> { ) -> Result<Response<Self::NetworkListStream>, RspStatus> {
debug!("Mock network_list called");
let (tx, rx) = mpsc::channel(10); let (tx, rx) = mpsc::channel(10);
let conn = self.get_connection()?; let conn = self.get_connection()?;
let mut sub = self.network_update_chan.1.clone(); let mut sub = self.network_update_chan.1.clone();
@ -212,12 +384,12 @@ impl Networks for DaemonRPCServer {
let networks = list_networks(&conn) let networks = list_networks(&conn)
.map(|res| NetworkListResponse { network: res }) .map(|res| NetworkListResponse { network: res })
.map_err(proc_err); .map_err(proc_err);
let res = tx.send(networks).await; if tx.send(networks).await.is_err() {
if res.is_err() { break;
eprintln!("Network list channel closed"); }
if sub.changed().await.is_err() {
break; break;
} }
sub.changed().await.unwrap();
} }
}); });
Ok(Response::new(ReceiverStream::new(rx))) Ok(Response::new(ReceiverStream::new(rx)))
@ -230,6 +402,7 @@ impl Networks for DaemonRPCServer {
let conn = self.get_connection()?; let conn = self.get_connection()?;
reorder_network(&conn, request.into_inner()).map_err(proc_err)?; reorder_network(&conn, request.into_inner()).map_err(proc_err)?;
self.notify_network_update().await?; self.notify_network_update().await?;
self.reconcile_runtime().await?;
Ok(Response::new(Empty {})) Ok(Response::new(Empty {}))
} }
@ -240,6 +413,7 @@ impl Networks for DaemonRPCServer {
let conn = self.get_connection()?; let conn = self.get_connection()?;
delete_network(&conn, request.into_inner()).map_err(proc_err)?; delete_network(&conn, request.into_inner()).map_err(proc_err)?;
self.notify_network_update().await?; self.notify_network_update().await?;
self.reconcile_runtime().await?;
Ok(Response::new(Empty {})) Ok(Response::new(Empty {}))
} }
} }
@ -251,6 +425,6 @@ fn proc_err(err: impl ToString) -> RspStatus {
fn status_rsp(state: RunState) -> TunnelStatusResponse { fn status_rsp(state: RunState) -> TunnelStatusResponse {
TunnelStatusResponse { TunnelStatusResponse {
state: state.to_rpc().into(), state: state.to_rpc().into(),
start: None, // TODO: Add timestamp start: None,
} }
} }

View file

@ -15,12 +15,11 @@ use tokio::{
}; };
use tokio_stream::wrappers::UnixListenerStream; use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server; use tonic::transport::Server;
use tracing::{error, info}; use tracing::info;
use crate::{ use crate::{
daemon::rpc::grpc_defs::{networks_server::NetworksServer, tunnel_server::TunnelServer}, daemon::rpc::grpc_defs::{networks_server::NetworksServer, tunnel_server::TunnelServer},
database::{get_connection, load_interface}, database::{get_connection, load_interface},
wireguard::Interface,
}; };
pub async fn daemon_main( pub async fn daemon_main(
@ -33,11 +32,7 @@ pub async fn daemon_main(
} }
let conn = get_connection(db_path)?; let conn = get_connection(db_path)?;
let config = load_interface(&conn, "1")?; let config = load_interface(&conn, "1")?;
let burrow_server = DaemonRPCServer::new( let burrow_server = DaemonRPCServer::new(Arc::new(RwLock::new(config)), db_path.clone())?;
Arc::new(RwLock::new(config.clone().try_into()?)),
Arc::new(RwLock::new(config)),
db_path.clone(),
)?;
let spp = socket_path.clone(); let spp = socket_path.clone();
let tmp = get_socket_path(); let tmp = get_socket_path();
let sock_path = spp.unwrap_or(Path::new(tmp.as_str())); let sock_path = spp.unwrap_or(Path::new(tmp.as_str()));

View file

@ -56,7 +56,7 @@ END;
pub fn initialize_tables(conn: &Connection) -> Result<()> { pub fn initialize_tables(conn: &Connection) -> Result<()> {
conn.execute(CREATE_WG_INTERFACE_TABLE, [])?; conn.execute(CREATE_WG_INTERFACE_TABLE, [])?;
conn.execute(CREATE_WG_PEER_TABLE, [])?; conn.execute(CREATE_WG_PEER_TABLE, [])?;
conn.execute(CREATE_NETWORK_TABLE, [])?; conn.execute_batch(CREATE_NETWORK_TABLE)?;
Ok(()) Ok(())
} }

View file

@ -1,22 +1,20 @@
#[cfg(any(target_os = "linux", target_vendor = "apple"))] #[cfg(any(target_os = "linux", target_vendor = "apple"))]
pub mod tor;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
pub mod wireguard; pub mod wireguard;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod auth;
#[cfg(any(target_os = "linux", target_vendor = "apple"))] #[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod daemon; mod daemon;
#[cfg(any(target_os = "linux", target_vendor = "apple"))] #[cfg(any(target_os = "linux", target_vendor = "apple"))]
pub mod database; pub mod database;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod auth;
pub(crate) mod tracing; pub(crate) mod tracing;
#[cfg(target_vendor = "apple")] #[cfg(target_vendor = "apple")]
pub use daemon::apple::spawn_in_process; pub use daemon::apple::spawn_in_process;
#[cfg(any(target_os = "linux", target_vendor = "apple"))] #[cfg(any(target_os = "linux", target_vendor = "apple"))]
pub use daemon::{ pub use daemon::{
rpc::DaemonResponse, rpc::DaemonResponse, rpc::ServerInfo, DaemonClient, DaemonCommand, DaemonResponseData,
rpc::ServerInfo,
DaemonClient,
DaemonCommand,
DaemonResponseData,
DaemonStartOptions, DaemonStartOptions,
}; };

View file

@ -3,6 +3,8 @@ use clap::{Args, Parser, Subcommand};
#[cfg(any(target_os = "linux", target_vendor = "apple"))] #[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod daemon; mod daemon;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod tor;
pub(crate) mod tracing; pub(crate) mod tracing;
#[cfg(any(target_os = "linux", target_vendor = "apple"))] #[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod wireguard; mod wireguard;

125
burrow/src/tor/config.rs Normal file
View file

@ -0,0 +1,125 @@
use std::{net::SocketAddr, str};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Config {
    /// Addresses to assign to the TUN interface (e.g. "100.64.0.2/32").
    #[serde(default)]
    pub address: Vec<String>,
    /// DNS servers advertised for this tunnel.
    #[serde(default)]
    pub dns: Vec<String>,
    /// Optional MTU override for the TUN interface.
    #[serde(default)]
    pub mtu: Option<u32>,
    /// Optional explicit TUN device name (otherwise OS-assigned).
    #[serde(default)]
    pub tun_name: Option<String>,
    /// Arti (Tor client) state/cache directories; see [`ArtiConfig`].
    #[serde(default)]
    pub arti: ArtiConfig,
    /// Which TCP stack bridges intercepted flows into Tor.
    #[serde(default)]
    pub tcp_stack: TcpStackConfig,
}
/// Filesystem locations handed to Arti for persistent state and cache.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ArtiConfig {
    /// Directory for Arti's long-lived state (guard selections, keys).
    pub state_dir: String,
    /// Directory for Arti's regenerable cache (directory documents).
    pub cache_dir: String,
}
impl Default for ArtiConfig {
    /// System-wide default directories used when a payload omits them.
    fn default() -> Self {
        Self {
            state_dir: String::from("/var/lib/burrow/arti/state"),
            cache_dir: String::from("/var/cache/burrow/arti"),
        }
    }
}
/// TCP stack used to carry intercepted connections into Tor.
/// Serialized with an internal `kind` tag, e.g. `{"kind":"system",...}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum TcpStackConfig {
    /// Use the host's TCP stack via a transparent-proxy listener.
    System(SystemTcpStackConfig),
}
impl Default for TcpStackConfig {
fn default() -> Self {
Self::System(SystemTcpStackConfig::default())
}
}
/// Settings for the system-TCP-stack transparent proxy.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SystemTcpStackConfig {
    /// Address the transparent listener binds to; defaults to
    /// `127.0.0.1:9040` when omitted from the payload.
    #[serde(default = "default_system_listen")]
    pub listen: String,
}
impl Default for SystemTcpStackConfig {
    /// Mirror the serde default so programmatic and deserialized defaults agree.
    fn default() -> Self {
        let listen = default_system_listen();
        Self { listen }
    }
}
impl Config {
    /// Parse a network payload, trying JSON first and then TOML.
    ///
    /// # Errors
    /// Fails when the payload is not valid UTF-8 (required for the TOML
    /// fallback) or parses as neither format. The JSON parse error is kept in
    /// the context so a near-miss JSON payload does not surface only as a
    /// confusing TOML error.
    pub fn from_payload(payload: &[u8]) -> Result<Self> {
        // Capture the JSON error instead of discarding it with `if let Ok`.
        let json_err = match serde_json::from_slice(payload) {
            Ok(config) => return Ok(config),
            Err(err) => err,
        };
        let payload = str::from_utf8(payload).context("tor payload must be valid UTF-8")?;
        toml::from_str(payload).with_context(|| {
            format!("failed to parse tor payload as JSON ({json_err}) or TOML")
        })
    }

    /// Socket address the system TCP stack listener should bind to.
    ///
    /// # Errors
    /// Fails when the configured `listen` string is not a valid socket address.
    pub fn listen_addr(&self) -> Result<SocketAddr> {
        match &self.tcp_stack {
            TcpStackConfig::System(config) => config
                .listen
                .parse()
                .with_context(|| format!("invalid system tcp listen address '{}'", config.listen)),
        }
    }
}
/// Default transparent-proxy listen address for the system TCP stack.
fn default_system_listen() -> String {
    String::from("127.0.0.1:9040")
}
#[cfg(test)]
mod tests {
    use super::*;

    // JSON payloads (the client-facing wire format) must deserialize into
    // Config, including the tagged tcp_stack variant.
    #[test]
    fn parses_json_payload() {
        let payload = br#"{
        "address":["100.64.0.2/32"],
        "mtu":1400,
        "arti":{"state_dir":"/tmp/state","cache_dir":"/tmp/cache"},
        "tcp_stack":{"kind":"system","listen":"127.0.0.1:9150"}
        }"#;
        let config = Config::from_payload(payload).unwrap();
        assert_eq!(config.address, vec!["100.64.0.2/32"]);
        assert_eq!(config.listen_addr().unwrap().to_string(), "127.0.0.1:9150");
    }

    // TOML payloads are accepted as a fallback when JSON parsing fails.
    #[test]
    fn parses_toml_payload() {
        let payload = r#"
        address = ["100.64.0.3/32"]
        mtu = 1280
        tun_name = "burrow-tor"
        [arti]
        state_dir = "/tmp/state"
        cache_dir = "/tmp/cache"
        [tcp_stack]
        kind = "system"
        listen = "127.0.0.1:9140"
        "#;
        let config = Config::from_payload(payload.as_bytes()).unwrap();
        assert_eq!(config.tun_name.as_deref(), Some("burrow-tor"));
        assert_eq!(config.listen_addr().unwrap().to_string(), "127.0.0.1:9140");
    }
}

6
burrow/src/tor/mod.rs Normal file
View file

@ -0,0 +1,6 @@
mod config;
mod runtime;
mod system;
pub use config::{ArtiConfig, Config, SystemTcpStackConfig, TcpStackConfig};
pub use runtime::{spawn, TorHandle};

116
burrow/src/tor/runtime.rs Normal file
View file

@ -0,0 +1,116 @@
use std::{sync::Arc, time::Duration};
use anyhow::{Context, Result};
use arti_client::{config::TorClientConfigBuilder, TorClient};
use tokio::{
sync::watch,
task::{JoinError, JoinSet},
};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{debug, error, info, warn};
use super::{system::SystemTcpStackRuntime, Config, TcpStackConfig};
/// Handle to a running Tor runtime task.
///
/// Dropping the handle does NOT stop the task; call [`TorHandle::shutdown`]
/// to signal the proxy loop and wait for it to finish.
#[derive(Debug)]
pub struct TorHandle {
    // Watch channel flipped to `true` to request shutdown.
    shutdown: watch::Sender<bool>,
    // The spawned proxy-loop task; awaited during shutdown.
    task: tokio::task::JoinHandle<()>,
}
impl TorHandle {
    /// Signal the runtime task to stop, then wait for it to finish.
    ///
    /// A cancelled task is treated as a clean shutdown; only a panic or other
    /// join failure is surfaced as an error.
    pub async fn shutdown(self) -> Result<()> {
        // The receiver may already be gone if the task exited on its own.
        let _ = self.shutdown.send(true);
        match self.task.await {
            Err(err) if !err.is_cancelled() => Err(join_error(err)),
            _ => Ok(()),
        }
    }
}
/// Bootstrap an Arti Tor client and run the configured TCP stack proxy loop
/// on a background task, returning a [`TorHandle`] used to stop it.
///
/// # Errors
/// Fails if the Arti configuration cannot be built or the client cannot
/// bootstrap onto the Tor network. Listener bind failures happen inside the
/// spawned task and are logged rather than returned.
pub async fn spawn(config: Config) -> Result<TorHandle> {
    let builder =
        TorClientConfigBuilder::from_directories(&config.arti.state_dir, &config.arti.cache_dir);
    let tor_config = builder.build().context("failed to build arti config")?;
    // Bootstrapping contacts the Tor network and can take several seconds;
    // do it before reporting the tunnel as up.
    let tor_client = Arc::new(
        TorClient::create_bootstrapped(tor_config)
            .await
            .context("failed to bootstrap arti client")?,
    );
    let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
    let task = match config.tcp_stack.clone() {
        TcpStackConfig::System(system_config) => tokio::spawn(async move {
            let stack = match SystemTcpStackRuntime::bind(&system_config).await {
                Ok(stack) => stack,
                Err(err) => {
                    // Cannot return an error from the task; log and exit.
                    error!(?err, "failed to bind system tcp stack listener");
                    return;
                }
            };
            info!(
                listen = %stack.local_addr(),
                "system tcp stack listener bound for tor transparent proxy"
            );
            // One task per proxied connection; the JoinSet lets us reap
            // finished tasks and abort the rest on shutdown.
            let mut connections = JoinSet::new();
            loop {
                tokio::select! {
                    changed = shutdown_rx.changed() => {
                        match changed {
                            Ok(()) if *shutdown_rx.borrow() => break,
                            Ok(()) => continue,
                            // Sender dropped: treat as a shutdown request.
                            Err(_) => break,
                        }
                    }
                    // Guarded so select! never polls join_next on an empty set
                    // (which would resolve to None immediately).
                    Some(res) = connections.join_next(), if !connections.is_empty() => {
                        match res {
                            Ok(Ok(())) => {}
                            Ok(Err(err)) => warn!(?err, "transparent proxy task failed"),
                            Err(err) => warn!(?err, "transparent proxy task panicked"),
                        }
                    }
                    accepted = stack.accept() => {
                        let (mut inbound, original_dst) = match accepted {
                            Ok(pair) => pair,
                            Err(err) => {
                                warn!(?err, "failed to accept transparent tcp connection");
                                // Brief backoff so a persistent accept error
                                // cannot spin this loop at full CPU.
                                tokio::time::sleep(Duration::from_millis(50)).await;
                                continue;
                            }
                        };
                        let tor_client = tor_client.clone();
                        connections.spawn(async move {
                            debug!(%original_dst, "accepted transparent tcp connection");
                            let tor_stream = tor_client
                                .connect((original_dst.ip().to_string(), original_dst.port()))
                                .await
                                .with_context(|| format!("failed to connect to {original_dst} over tor"))?;
                            // Arti streams implement futures-io; adapt to tokio's
                            // AsyncRead/AsyncWrite for copy_bidirectional.
                            let mut tor_stream = tor_stream.compat();
                            tokio::io::copy_bidirectional(&mut inbound, &mut tor_stream)
                                .await
                                .with_context(|| format!("failed to bridge tor stream for {original_dst}"))?;
                            Result::<()>::Ok(())
                        });
                    }
                }
            }
            // Drain in-flight connections; failures here are expected noise
            // during shutdown and are logged at debug level only.
            connections.abort_all();
            while let Some(res) = connections.join_next().await {
                match res {
                    Ok(Ok(())) => {}
                    Ok(Err(err)) => debug!(?err, "transparent proxy task failed during shutdown"),
                    Err(err) => debug!(?err, "transparent proxy task exited during shutdown"),
                }
            }
        }),
    };
    Ok(TorHandle { shutdown: shutdown_tx, task })
}
fn join_error(err: JoinError) -> anyhow::Error {
anyhow::anyhow!("tor runtime task failed: {err}")
}

856
burrow/src/tor/system.rs Normal file
View file

@ -0,0 +1,856 @@
use std::net::SocketAddr;
use anyhow::{Context, Result};
use tokio::net::{TcpListener, TcpStream};
use super::SystemTcpStackConfig;
/// Transparent-proxy runtime backed by the host TCP stack.
///
/// Accepts redirected connections on a local listener and resolves each
/// connection's original (pre-redirect) destination.
pub struct SystemTcpStackRuntime {
    listener: TcpListener,
    // On Apple targets original destinations come from PF (/dev/pf) lookups
    // rather than a getsockopt call.
    #[cfg(target_vendor = "apple")]
    flow_tracker: AppleFlowTracker,
}
impl SystemTcpStackRuntime {
    /// Bind the transparent-proxy listener described by `config`.
    ///
    /// On Apple targets this also opens `/dev/pf` so accepted connections can
    /// be mapped back to their original destinations.
    ///
    /// # Errors
    /// Fails when the listen address cannot be bound, its local address cannot
    /// be read, or (Apple) `/dev/pf` cannot be opened.
    pub async fn bind(config: &SystemTcpStackConfig) -> Result<Self> {
        let listener = TcpListener::bind(&config.listen)
            .await
            .with_context(|| format!("failed to bind transparent listener on {}", config.listen))?;
        #[cfg(target_vendor = "apple")]
        let flow_tracker = {
            // Propagate instead of panicking: `bind` is already fallible, so a
            // missing local address should surface as an error, not abort.
            let local_addr = listener
                .local_addr()
                .context("failed to read transparent listener local address")?;
            AppleFlowTracker::new(local_addr)
                .context("failed to open /dev/pf for transparent destination lookups")?
        };
        Ok(Self {
            listener,
            #[cfg(target_vendor = "apple")]
            flow_tracker,
        })
    }

    /// Local address the transparent listener is bound to.
    ///
    /// # Panics
    /// Panics if the OS cannot report the listener's local address, which is
    /// not expected for a successfully bound socket.
    pub fn local_addr(&self) -> SocketAddr {
        self.listener
            .local_addr()
            .expect("listener should always have a local address")
    }

    /// Accept one redirected connection and resolve its original destination.
    ///
    /// # Errors
    /// Fails when accepting the connection or the destination lookup fails.
    pub async fn accept(&self) -> Result<(TcpStream, SocketAddr)> {
        let (stream, _) = self
            .listener
            .accept()
            .await
            .context("failed to accept transparent listener connection")?;
        #[cfg(target_vendor = "apple")]
        let original_dst = self.flow_tracker.resolve(&stream)?;
        #[cfg(not(target_vendor = "apple"))]
        let original_dst = original_destination(&stream)?;
        Ok((stream, original_dst))
    }
}
#[cfg(target_os = "linux")]
/// Look up the original destination of a netfilter-redirected TCP connection.
///
/// # Errors
/// Fails when the `getsockopt` lookup fails (e.g. the connection was not
/// redirected) or the returned sockaddr cannot be decoded.
fn original_destination(stream: &TcpStream) -> Result<SocketAddr> {
    use std::{
        mem::{size_of, MaybeUninit},
        os::fd::AsRawFd,
    };

    // netfilter's SO_ORIGINAL_DST (IPv4, SOL_IP) and IP6T_SO_ORIGINAL_DST
    // (IPv6, SOL_IPV6) share the value 80; libc does not export either
    // constant, so name it here instead of using a bare magic number.
    const SO_ORIGINAL_DST: libc::c_int = 80;

    let level = if stream.local_addr()?.is_ipv6() {
        libc::SOL_IPV6
    } else {
        libc::SOL_IP
    };
    let mut addr = MaybeUninit::<libc::sockaddr_storage>::zeroed();
    let mut len = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
    // SAFETY: `addr` points to writable zeroed storage of `len` bytes and the
    // kernel writes at most `len` bytes back, updating `len` to the actual
    // sockaddr size.
    let rc = unsafe {
        libc::getsockopt(
            stream.as_raw_fd(),
            level,
            SO_ORIGINAL_DST,
            addr.as_mut_ptr().cast(),
            &mut len,
        )
    };
    if rc != 0 {
        return Err(std::io::Error::last_os_error()).context("SO_ORIGINAL_DST lookup failed");
    }
    // SAFETY: getsockopt succeeded, so the kernel initialized the storage.
    socket_addr_from_storage(unsafe { &addr.assume_init() }, len as usize)
}
#[cfg(all(not(target_os = "linux"), not(target_vendor = "apple")))]
/// Stub for targets without a transparent-redirect lookup mechanism.
fn original_destination(_stream: &TcpStream) -> Result<SocketAddr> {
    Err(anyhow::anyhow!(
        "system tcp stack transparent destination lookup is only implemented on linux"
    ))
}
#[cfg(target_vendor = "apple")]
mod apple_pf {
use std::{
collections::HashMap,
fs::File,
io,
mem::zeroed,
io::Read,
net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
os::fd::{AsRawFd, RawFd},
time::{Duration, Instant},
};
use anyhow::{anyhow, bail, Context, Result};
use nix::{ioctl_readwrite, libc};
use parking_lot::Mutex;
use tokio::net::TcpStream;
ioctl_readwrite!(pf_natlook, b'D', 23, PfiocNatlook);
const FLOW_CACHE_LIMIT: usize = 4096;
const FLOW_CACHE_TTL: Duration = Duration::from_secs(30);
const PF_OUT: u8 = 2;
const PFLOG_RULESET_NAME_SIZE: usize = 16;
const PFLOG_DEVICE: &str = "pflog0";
const OBSERVER_WAIT_STEPS: usize = 20;
const OBSERVER_WAIT_INTERVAL: Duration = Duration::from_millis(10);
/// Maps accepted transparent connections back to their original destinations
/// on Apple targets, combining PF NAT lookups with a pflog-based flow cache.
pub(super) struct AppleFlowTracker {
    // Open handle on /dev/pf for DIOCNATLOOK ioctls.
    pf: File,
    listener_addr: SocketAddr,
    // Cache of observed flows plus the optional pflog packet observer.
    state: Mutex<FlowState>,
}
impl AppleFlowTracker {
    /// Open `/dev/pf` for NAT lookups on accepted transparent connections.
    ///
    /// The pflog packet observer is started opportunistically; if it cannot
    /// be created (`.ok()`), the tracker still works via DIOCNATLOOK alone.
    pub(super) fn new(listener_addr: SocketAddr) -> io::Result<Self> {
        Ok(Self {
            pf: File::options().read(true).write(true).open("/dev/pf")?,
            listener_addr,
            state: Mutex::new(FlowState {
                cache: HashMap::new(),
                observer: PacketObserver::new(listener_addr).ok(),
            }),
        })
    }

    /// Resolve the original (pre-redirect) destination for `stream`.
    ///
    /// Attempt order: cached flows (fed by the pflog observer), then a PF
    /// DIOCNATLOOK ioctl, then a short wait for the observer to record the
    /// flow when PF denies (EPERM) or misses (ENOENT) the lookup.
    pub(super) fn resolve(&self, stream: &TcpStream) -> Result<SocketAddr> {
        let key = FlowKey::from_stream(stream)?;
        if let Some(original_dst) = self.cached_destination(key) {
            return Ok(original_dst);
        }
        match lookup_pf_original_destination(self.pf.as_raw_fd(), key.peer, key.local) {
            Ok(original_dst) => {
                self.remember(key, original_dst);
                Ok(original_dst)
            }
            // EPERM: lookup denied on this macOS build; ENOENT: PF had no
            // redirect state. Both may still be recoverable via the observer.
            Err(err)
                if matches!(
                    err.raw_os_error(),
                    Some(code) if code == libc::EPERM || code == libc::ENOENT
                ) =>
            {
                if let Some(original_dst) = self.wait_for_observer(key) {
                    return Ok(original_dst);
                }
                // Map each errno to an actionable message for the operator.
                match err.raw_os_error() {
                    Some(code) if code == libc::EPERM => Err(anyhow!(
                        "PF NAT lookups are denied on this macOS build and no logged redirect flow was observed for {} -> {}",
                        key.peer,
                        key.local
                    )),
                    Some(code) if code == libc::ENOENT => Err(anyhow!(
                        "PF did not have a redirect state for {} -> {} and no logged redirect flow was observed; ensure outbound TCP is redirected and logged before Burrow accepts it",
                        key.peer,
                        key.local
                    )),
                    // Guard above restricts codes to EPERM/ENOENT.
                    _ => unreachable!(),
                }
            }
            Err(err) => Err(err).context("DIOCNATLOOK failed"),
        }
    }

    /// Check the flow cache, pruning expired entries and draining any packets
    /// the observer has captured since the last call.
    fn cached_destination(&self, key: FlowKey) -> Option<SocketAddr> {
        let mut state = self.state.lock();
        state.prune();
        state.drain_observer(self.listener_addr);
        state.cache.get(&key).map(|entry| entry.original_dst)
    }

    /// Record a successful PF lookup in the flow cache.
    fn remember(&self, key: FlowKey, original_dst: SocketAddr) {
        let mut state = self.state.lock();
        state.prune();
        remember_flow(&mut state.cache, key, original_dst, Instant::now());
    }

    /// Poll the cache for up to OBSERVER_WAIT_STEPS * OBSERVER_WAIT_INTERVAL,
    /// giving the pflog observer time to log the redirect for this flow.
    /// NOTE(review): this blocks the calling thread — appears to be invoked
    /// off the async accept path; confirm callers tolerate ~200ms of blocking.
    fn wait_for_observer(&self, key: FlowKey) -> Option<SocketAddr> {
        for _ in 0..OBSERVER_WAIT_STEPS {
            if let Some(original_dst) = self.cached_destination(key) {
                return Some(original_dst);
            }
            std::thread::sleep(OBSERVER_WAIT_INTERVAL);
        }
        None
    }
}
/// State behind the tracker's mutex: the flow cache plus the optional
/// pflog observer (None when observer setup failed, or after the
/// observer is dropped following a read error).
struct FlowState {
    cache: HashMap<FlowKey, FlowEntry>,
    observer: Option<PacketObserver>,
}
impl FlowState {
    /// Drops every cache entry whose TTL has already elapsed.
    fn prune(&mut self) {
        let cutoff = Instant::now();
        self.cache.retain(|_, entry| entry.expires_at > cutoff);
    }
    /// Pulls any pending pflog records into the cache.
    fn drain_observer(&mut self, listener_addr: SocketAddr) {
        if let Some(mut observer) = self.observer.take() {
            match observer.drain(listener_addr, &mut self.cache) {
                // Success: put the observer back for the next drain.
                Ok(()) => self.observer = Some(observer),
                // A read error permanently disables the pflog fallback.
                Err(_) => {}
            }
        }
    }
}
/// Cache key identifying one redirected TCP connection by its
/// (peer, local-listener) address pair.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct FlowKey {
    // Remote endpoint as seen on the accepted socket.
    peer: SocketAddr,
    // Local (listener-side) endpoint of the accepted socket.
    local: SocketAddr,
}
impl FlowKey {
fn from_stream(stream: &TcpStream) -> Result<Self> {
let peer = stream.peer_addr().context("failed to read transparent peer address")?;
let local = stream
.local_addr()
.context("failed to read transparent listener address")?;
match (peer, local) {
(SocketAddr::V4(_), SocketAddr::V4(_)) | (SocketAddr::V6(_), SocketAddr::V6(_)) => {
Ok(Self { peer, local })
}
_ => bail!("transparent socket had mismatched source/destination address families"),
}
}
}
/// Cached result of a destination lookup, valid until `expires_at`.
#[derive(Clone, Copy, Debug)]
struct FlowEntry {
    // Pre-redirect destination the client originally dialed.
    original_dst: SocketAddr,
    // Absolute deadline after which this entry is pruned.
    expires_at: Instant,
}
/// Inserts (or refreshes) a flow mapping: stale entries are expired
/// first, and when the cache is full the soonest-to-expire entry is
/// evicted to make room.
fn remember_flow(
    cache: &mut HashMap<FlowKey, FlowEntry>,
    key: FlowKey,
    original_dst: SocketAddr,
    now: Instant,
) {
    cache.retain(|_, entry| entry.expires_at > now);
    if cache.len() >= FLOW_CACHE_LIMIT {
        let evict = cache
            .iter()
            .min_by_key(|(_, entry)| entry.expires_at)
            .map(|(flow_key, _)| *flow_key);
        if let Some(oldest) = evict {
            cache.remove(&oldest);
        }
    }
    let entry = FlowEntry {
        original_dst,
        expires_at: now + FLOW_CACHE_TTL,
    };
    cache.insert(key, entry);
}
/// Performs the DIOCNATLOOK ioctl on the PF descriptor `fd` and decodes
/// the redirected flow's original destination from the kernel reply.
///
/// Returns `InvalidInput` when the flow cannot be expressed as a PF
/// request and `InvalidData` when the reply cannot be decoded.
fn lookup_pf_original_destination(
    fd: RawFd,
    peer: SocketAddr,
    local: SocketAddr,
) -> io::Result<SocketAddr> {
    let mut request = PfiocNatlook::for_flow(peer, local)
        .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
    // SAFETY: `fd` is an open /dev/pf descriptor and `request` is a fully
    // initialized PfiocNatlook the ioctl may read and write in place.
    let ioctl_result = unsafe { pf_natlook(fd, &mut request) };
    if let Err(errno) = ioctl_result {
        return Err(io::Error::from(errno));
    }
    request
        .original_destination()
        .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
}
/// Non-blocking BPF capture of the pflog interface, used to learn
/// redirected flows that PF logs but DIOCNATLOOK cannot resolve.
struct PacketObserver {
    // Open /dev/bpfN descriptor bound to PFLOG_DEVICE.
    file: File,
    // Read buffer sized from the device's BIOCGBLEN value.
    buffer: Vec<u8>,
}
impl PacketObserver {
    /// Opens a BPF capture on the pflog interface so logged redirect
    /// flows can be recovered even when DIOCNATLOOK is unavailable.
    /// Fails when the listener address is unspecified, since logged
    /// packets could not then be matched back to the listener.
    fn new(listener_addr: SocketAddr) -> io::Result<Self> {
        if listener_addr.ip().is_unspecified() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "packet observer requires an explicit listener address",
            ));
        }
        let file = open_bpf_device()?;
        bind_bpf_to_interface(file.as_raw_fd(), PFLOG_DEVICE)?;
        set_bpf_flag(file.as_raw_fd(), libc::BIOCIMMEDIATE, 1)?;
        set_bpf_flag(file.as_raw_fd(), libc::BIOCSSEESENT, 1)?;
        set_nonblocking(file.as_raw_fd())?;
        // Size the read buffer from the device's configured buffer length.
        let mut buffer_len: libc::c_uint = 0;
        ioctl_value(file.as_raw_fd(), libc::BIOCGBLEN, &mut buffer_len)?;
        Ok(Self {
            file,
            buffer: vec![0; buffer_len as usize],
        })
    }
    /// Reads every pending capture buffer and feeds it to `consume`,
    /// stopping cleanly on EOF or when the non-blocking read would block.
    fn drain(
        &mut self,
        listener_addr: SocketAddr,
        cache: &mut HashMap<FlowKey, FlowEntry>,
    ) -> io::Result<()> {
        loop {
            match self.file.read(&mut self.buffer) {
                Ok(0) => break,
                Ok(read) => self.consume(&self.buffer[..read], listener_addr, cache),
                Err(err) if err.kind() == io::ErrorKind::WouldBlock => break,
                Err(err) => return Err(err),
            }
        }
        Ok(())
    }
    /// Walks the BPF record stream in `buffer`, remembering every logged
    /// outbound SYN so `resolve` can match it to a redirected connection.
    fn consume(
        &self,
        buffer: &[u8],
        listener_addr: SocketAddr,
        cache: &mut HashMap<FlowKey, FlowEntry>,
    ) {
        let mut offset = 0usize;
        let now = Instant::now();
        while offset + std::mem::size_of::<libc::bpf_hdr>() <= buffer.len() {
            // SAFETY: the loop condition guarantees a full bpf_hdr's worth
            // of bytes is available at `offset`; read_unaligned tolerates
            // the record's arbitrary alignment.
            let header = unsafe {
                std::ptr::read_unaligned(buffer[offset..].as_ptr() as *const libc::bpf_hdr)
            };
            let header_len = header.bh_hdrlen as usize;
            let captured_len = header.bh_caplen as usize;
            let packet_start = offset + header_len;
            let packet_end = packet_start + captured_len;
            let next_record = offset + bpf_wordalign(header_len + captured_len);
            if packet_end > buffer.len() || next_record > buffer.len() {
                break;
            }
            // BUGFIX: a corrupt record declaring a zero header length
            // would leave next_record == offset and spin this loop forever
            // (while the caller holds the state mutex); stop instead of
            // looping without progress.
            if next_record <= offset {
                break;
            }
            if let Some((peer, original_dst)) =
                parse_logged_syn(&buffer[packet_start..packet_end], listener_addr)
            {
                remember_flow(
                    cache,
                    FlowKey {
                        peer,
                        local: listener_addr,
                    },
                    original_dst,
                    now,
                );
            }
            offset = next_record;
        }
    }
}
/// Scans /dev/bpf0..=/dev/bpf255 for the first device that is not busy
/// and opens it read-only.
fn open_bpf_device() -> io::Result<File> {
    for index in 0..=255 {
        let path = format!("/dev/bpf{index}");
        let attempt = File::options().read(true).open(path);
        match attempt {
            Ok(file) => return Ok(file),
            // EBUSY means another capture owns this device; try the next.
            Err(err) if err.raw_os_error() == Some(libc::EBUSY) => {}
            Err(err) => return Err(err),
        }
    }
    let exhausted = io::Error::new(
        io::ErrorKind::NotFound,
        "no free /dev/bpf devices were available",
    );
    Err(exhausted)
}
/// Attaches the BPF descriptor to `ifname` via BIOCSETIF, copying at
/// most IFNAMSIZ - 1 bytes so the zeroed tail keeps the name
/// NUL-terminated.
fn bind_bpf_to_interface(fd: RawFd, ifname: &str) -> io::Result<()> {
    // SAFETY: ifreq is a plain-old-data C struct; all-zero is a valid
    // (empty) value that the copy below fills in.
    let mut ifreq = unsafe { zeroed::<libc::ifreq>() };
    let limit = libc::IFNAMSIZ.saturating_sub(1);
    for (slot, byte) in ifreq.ifr_name.iter_mut().zip(ifname.bytes().take(limit)) {
        *slot = byte as libc::c_char;
    }
    ioctl_value(fd, libc::BIOCSETIF, &mut ifreq)
}
/// Sets a single numeric BPF option (e.g. BIOCIMMEDIATE) via ioctl.
fn set_bpf_flag(fd: RawFd, request: libc::c_ulong, value: libc::c_uint) -> io::Result<()> {
    // The ioctl needs a mutable pointer, so copy the value to a local.
    let mut flag = value;
    ioctl_value(fd, request, &mut flag)
}
/// Puts `fd` into non-blocking mode while preserving its other status
/// flags.
fn set_nonblocking(fd: RawFd) -> io::Result<()> {
    // F_GETFL returns the flag word, or -1 on failure.
    let current = unsafe { libc::fcntl(fd, libc::F_GETFL) };
    if current == -1 {
        return Err(io::Error::last_os_error());
    }
    // BUGFIX: POSIX only guarantees that a failed fcntl(F_SETFL) returns
    // -1; a successful call may return a value other than 0, so the old
    // `!= 0` check could misreport success as an error.
    if unsafe { libc::fcntl(fd, libc::F_SETFL, current | libc::O_NONBLOCK) } == -1 {
        return Err(io::Error::last_os_error());
    }
    Ok(())
}
/// Issues `ioctl(fd, request, value)` and maps failure to an `io::Error`.
///
/// BUGFIX: POSIX defines -1 as the only failure return for ioctl; a
/// successful call may return a non-negative non-zero value, so the old
/// `!= 0` check could turn a successful request into a spurious error.
fn ioctl_value<T>(fd: RawFd, request: libc::c_ulong, value: &mut T) -> io::Result<()> {
    // SAFETY: callers pass a valid, exclusively borrowed T matching the
    // request's expected argument layout.
    if unsafe { libc::ioctl(fd, request, value) } == -1 {
        return Err(io::Error::last_os_error());
    }
    Ok(())
}
/// Decodes a single pflog record and, when it carries an outbound TCP
/// SYN, returns the (source, destination) socket addresses it named.
fn parse_logged_syn(
    record: &[u8],
    listener_addr: SocketAddr,
) -> Option<(SocketAddr, SocketAddr)> {
    let header = read_pflog_header(record)?;
    if header.dir != PF_OUT {
        return None;
    }
    let packet = record.get(header.length as usize..)?;
    let family = header.af as i32;
    if family == libc::AF_INET {
        parse_ipv4_syn(packet, listener_addr)
    } else if family == libc::AF_INET6 {
        parse_ipv6_syn(packet, listener_addr)
    } else {
        None
    }
}
/// Extracts the flow endpoints from a raw IPv4 TCP packet, accepting
/// only initial SYNs (SYN set, ACK clear) when the listener is IPv4.
fn parse_ipv4_syn(packet: &[u8], listener_addr: SocketAddr) -> Option<(SocketAddr, SocketAddr)> {
    if !listener_addr.is_ipv4() || packet.len() < 20 || packet[0] >> 4 != 4 {
        return None;
    }
    // IHL is in 32-bit words; require room for it plus a 20-byte TCP header.
    let header_len = usize::from(packet[0] & 0x0f) * 4;
    let long_enough = header_len >= 20 && packet.len() >= header_len + 20;
    if !long_enough || packet[9] != libc::IPPROTO_TCP as u8 {
        return None;
    }
    let tcp = &packet[header_len..];
    let flags = tcp[13];
    if flags & 0x02 == 0 || flags & 0x10 != 0 {
        return None;
    }
    let source = SocketAddrV4::new(
        Ipv4Addr::new(packet[12], packet[13], packet[14], packet[15]),
        u16::from_be_bytes([tcp[0], tcp[1]]),
    );
    let destination = SocketAddrV4::new(
        Ipv4Addr::new(packet[16], packet[17], packet[18], packet[19]),
        u16::from_be_bytes([tcp[2], tcp[3]]),
    );
    Some((SocketAddr::V4(source), SocketAddr::V4(destination)))
}
/// Extracts the flow endpoints from a raw IPv6 TCP packet, accepting
/// only initial SYNs (SYN set, ACK clear) when the listener is IPv6.
/// Packets whose next-header is not TCP (e.g. extension headers) are
/// ignored.
fn parse_ipv6_syn(packet: &[u8], listener_addr: SocketAddr) -> Option<(SocketAddr, SocketAddr)> {
    if !listener_addr.is_ipv6() || packet.len() < 40 || packet[0] >> 4 != 6 {
        return None;
    }
    // Need the fixed 40-byte IPv6 header plus a 20-byte TCP header.
    if packet[6] != libc::IPPROTO_TCP as u8 || packet.len() < 60 {
        return None;
    }
    let tcp = &packet[40..];
    let flags = tcp[13];
    let is_initial_syn = flags & 0x02 != 0 && flags & 0x10 == 0;
    if !is_initial_syn {
        return None;
    }
    let source_ip = Ipv6Addr::from(<[u8; 16]>::try_from(&packet[8..24]).ok()?);
    let dest_ip = Ipv6Addr::from(<[u8; 16]>::try_from(&packet[24..40]).ok()?);
    let source = SocketAddrV6::new(source_ip, u16::from_be_bytes([tcp[0], tcp[1]]), 0, 0);
    let destination = SocketAddrV6::new(dest_ip, u16::from_be_bytes([tcp[2], tcp[3]]), 0, 0);
    Some((SocketAddr::V6(source), SocketAddr::V6(destination)))
}
/// Reads the pflog header from the front of `record`, returning `None`
/// when the record is too short or its declared header length is
/// inconsistent with the buffer.
fn read_pflog_header(record: &[u8]) -> Option<PflogHdr> {
    if record.len() < std::mem::size_of::<PflogHdr>() {
        return None;
    }
    // SAFETY: the length check above guarantees a full PflogHdr's worth
    // of bytes; read_unaligned tolerates the record's alignment.
    let header =
        unsafe { std::ptr::read_unaligned(record.as_ptr() as *const PflogHdr) };
    if header.length as usize > record.len() || (header.length as usize) < PFLOG_REAL_HDRLEN {
        return None;
    }
    Some(header)
}
/// Rounds `len` up to the BPF record alignment boundary (the size of an
/// i32), mirroring the kernel's BPF_WORDALIGN macro.
const fn bpf_wordalign(len: usize) -> usize {
    const MASK: usize = std::mem::size_of::<i32>() - 1;
    (len + MASK) & !MASK
}
/// Request/reply payload for PF's DIOCNATLOOK ioctl.
///
/// NOTE(review): the field order and types are assumed to mirror the
/// kernel's `pfioc_natlook` layout — confirm against the macOS SDK's
/// pfvar.h before changing anything here.
#[repr(C)]
#[derive(Clone, Copy)]
struct PfiocNatlook {
    // Source address of the redirected flow (written by us).
    saddr: PfAddr,
    // Destination address of the redirected flow (written by us).
    daddr: PfAddr,
    // Reply fields filled in by the kernel.
    rsaddr: PfAddr,
    rdaddr: PfAddr,
    // Transport ports, in network byte order.
    sxport: PfStateXport,
    dxport: PfStateXport,
    rsxport: PfStateXport,
    rdxport: PfStateXport,
    af: libc::sa_family_t,
    proto: u8,
    proto_variant: u8,
    direction: u8,
}
impl PfiocNatlook {
    /// Builds a DIOCNATLOOK request for the outbound TCP flow
    /// `peer -> local`, with the reply fields zeroed for the kernel to
    /// fill in.
    fn for_flow(peer: SocketAddr, local: SocketAddr) -> Result<Self> {
        let (saddr, sxport, source_af) = pf_endpoint(peer);
        let (daddr, dxport, destination_af) = pf_endpoint(local);
        if source_af != destination_af {
            bail!("transparent flow key changed address family across redirect");
        }
        Ok(Self {
            saddr,
            daddr,
            rsaddr: PfAddr::default(),
            rdaddr: PfAddr::default(),
            sxport,
            dxport,
            rsxport: PfStateXport::default(),
            rdxport: PfStateXport::default(),
            af: source_af,
            proto: libc::IPPROTO_TCP as u8,
            proto_variant: 0,
            direction: PF_OUT,
        })
    }
    /// Decodes the pre-redirect destination from the kernel's reply
    /// fields (`rdaddr`/`rdxport`).
    fn original_destination(&self) -> Result<SocketAddr> {
        socket_addr_from_pf(self.af, self.rdaddr, self.rdxport)
    }
}
/// Splits a socket address into its PF address, network-order transport
/// port, and address-family constant.
fn pf_endpoint(addr: SocketAddr) -> (PfAddr, PfStateXport, libc::sa_family_t) {
    let xport = PfStateXport {
        port: addr.port().to_be(),
    };
    match addr {
        SocketAddr::V4(v4) => (
            PfAddr::from_ipv4(*v4.ip()),
            xport,
            libc::AF_INET as libc::sa_family_t,
        ),
        SocketAddr::V6(v6) => (
            PfAddr::from_ipv6(*v6.ip()),
            xport,
            libc::AF_INET6 as libc::sa_family_t,
        ),
    }
}
/// Converts a PF reply address/port pair back into a `SocketAddr` for
/// the given address family.
fn socket_addr_from_pf(
    af: libc::sa_family_t,
    addr: PfAddr,
    port: PfStateXport,
) -> Result<SocketAddr> {
    // SAFETY: every PfStateXport variant is a plain integer; for TCP
    // lookups the union holds the network-order port.
    let port = u16::from_be(unsafe { port.port });
    match af as i32 {
        libc::AF_INET => {
            let ip = Ipv4Addr::from(addr.v4_octets());
            Ok(SocketAddr::V4(SocketAddrV4::new(ip, port)))
        }
        libc::AF_INET6 => {
            let ip = Ipv6Addr::from(addr.v6_octets());
            Ok(SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0)))
        }
        family => bail!("unsupported PF address family {family}"),
    }
}
/// Raw 16-byte PF address storage viewable as v4/v6 addresses or as
/// byte/word arrays.
///
/// NOTE(review): assumed to mirror the kernel's `pf_addr` union —
/// confirm against pfvar.h before changing.
#[repr(C)]
#[derive(Clone, Copy)]
union PfAddrRepr {
    v4addr: libc::in_addr,
    v6addr: libc::in6_addr,
    addr8: [u8; 16],
    addr16: [u16; 8],
    addr32: [u32; 4],
}
/// PF address wrapper passed to and returned from DIOCNATLOOK.
#[repr(C)]
#[derive(Clone, Copy)]
struct PfAddr {
    pfa: PfAddrRepr,
}
impl Default for PfAddr {
    // All-zero address, used for the reply fields the kernel fills in.
    fn default() -> Self {
        Self {
            pfa: PfAddrRepr { addr32: [0; 4] },
        }
    }
}
impl PfAddr {
    /// Stores an IPv4 address in the first four bytes, zero-padding the
    /// rest of the 16-byte PF address.
    fn from_ipv4(ip: Ipv4Addr) -> Self {
        let mut bytes = [0u8; 16];
        bytes[..4].copy_from_slice(&ip.octets());
        Self {
            pfa: PfAddrRepr { addr8: bytes },
        }
    }
    /// Stores an IPv6 address as the full 16-byte PF address.
    fn from_ipv6(ip: Ipv6Addr) -> Self {
        Self {
            pfa: PfAddrRepr {
                addr8: ip.octets(),
            },
        }
    }
    /// Reads the first four bytes back as IPv4 octets.
    fn v4_octets(self) -> [u8; 4] {
        // SAFETY: every PfAddrRepr variant is plain bytes, so viewing the
        // storage as addr8 is always valid.
        let bytes = unsafe { self.pfa.addr8 };
        [bytes[0], bytes[1], bytes[2], bytes[3]]
    }
    /// Reads the full 16 bytes back as IPv6 octets.
    fn v6_octets(self) -> [u8; 16] {
        // SAFETY: addr8 covers the whole union, so this read is always valid.
        unsafe { self.pfa.addr8 }
    }
}
/// Transport-layer identifier in a PF state: a TCP/UDP port, a GRE call
/// id, or an IPsec SPI, depending on the protocol being looked up.
#[repr(C)]
#[derive(Clone, Copy)]
union PfStateXport {
    port: u16,
    call_id: u16,
    spi: u32,
}
/// Header prepended to every packet captured from the pflog interface.
///
/// NOTE(review): assumed to mirror the kernel's `pfloghdr` layout from
/// net/if_pflog.h — confirm against the macOS SDK before changing.
#[repr(C)]
#[derive(Clone, Copy)]
struct PflogHdr {
    // Length in bytes of this header within the captured record.
    length: u8,
    // Address family of the logged packet (AF_INET / AF_INET6).
    af: libc::sa_family_t,
    action: u8,
    reason: u8,
    ifname: [libc::c_char; libc::IFNAMSIZ],
    ruleset: [libc::c_char; PFLOG_RULESET_NAME_SIZE],
    rulenr: u32,
    subrulenr: u32,
    uid: libc::uid_t,
    pid: libc::pid_t,
    rule_uid: libc::uid_t,
    rule_pid: libc::pid_t,
    // Packet direction; compared against PF_OUT.
    dir: u8,
    pad: [u8; 3],
}
/// Header size up to (and excluding) the trailing padding — the minimum
/// `length` a well-formed pflog record may declare.
const PFLOG_REAL_HDRLEN: usize = std::mem::offset_of!(PflogHdr, pad);
impl Default for PfStateXport {
    fn default() -> Self {
        // SAFETY: all variants of the union are plain integers, for which
        // the all-zero bit pattern is valid.
        unsafe { zeroed() }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // Request construction: addresses and ports must be stored in network
    // byte order with the right family, protocol, and direction.
    #[test]
    fn builds_natlook_request_from_redirected_flow() {
        let request = PfiocNatlook::for_flow(
            "192.0.2.10:41000".parse().unwrap(),
            "127.0.0.1:9040".parse().unwrap(),
        )
        .unwrap();
        assert_eq!(request.af as i32, libc::AF_INET);
        assert_eq!(request.proto, libc::IPPROTO_TCP as u8);
        assert_eq!(request.direction, PF_OUT);
        assert_eq!(request.saddr.v4_octets(), [192, 0, 2, 10]);
        assert_eq!(request.daddr.v4_octets(), [127, 0, 0, 1]);
        assert_eq!(u16::from_be(unsafe { request.sxport.port }), 41000);
        assert_eq!(u16::from_be(unsafe { request.dxport.port }), 9040);
    }
    // Reply decoding: simulate the kernel filling rdaddr/rdxport and make
    // sure the original IPv6 destination round-trips.
    #[test]
    fn decodes_original_ipv6_destination() {
        let mut request =
            PfiocNatlook::for_flow("[::1]:41000".parse().unwrap(), "[::1]:9040".parse().unwrap())
                .unwrap();
        request.rdaddr = PfAddr::from_ipv6("2001:db8::42".parse().unwrap());
        request.rdxport = PfStateXport {
            port: u16::to_be(443),
        };
        assert_eq!(
            request.original_destination().unwrap(),
            "[2001:db8::42]:443".parse::<SocketAddr>().unwrap()
        );
    }
    // Builds a synthetic pflog record (header + minimal IPv4 TCP SYN) and
    // checks that both flow endpoints are recovered.
    #[test]
    fn parses_logged_ipv4_syn() {
        let mut record = Vec::new();
        // pflog header prefix: length, af, action, reason.
        record.extend_from_slice(&[
            PFLOG_REAL_HDRLEN as u8,
            libc::AF_INET as u8,
            0,
            0,
        ]);
        record.extend_from_slice(&[0; libc::IFNAMSIZ]);
        record.extend_from_slice(&[0; PFLOG_RULESET_NAME_SIZE]);
        // rulenr, subrulenr, uid, pid, rule_uid, rule_pid — all zero.
        record.extend_from_slice(&0u32.to_ne_bytes());
        record.extend_from_slice(&0u32.to_ne_bytes());
        record.extend_from_slice(&0u32.to_ne_bytes());
        record.extend_from_slice(&0u32.to_ne_bytes());
        record.extend_from_slice(&0u32.to_ne_bytes());
        record.extend_from_slice(&0u32.to_ne_bytes());
        record.push(PF_OUT);
        // IPv4 header: 192.0.2.10 -> 198.51.100.42, protocol TCP.
        record.extend_from_slice(&[
            0x45, 0, 0, 40, 0, 0, 0, 0, 64, libc::IPPROTO_TCP as u8, 0, 0, 192, 0, 2, 10,
            198, 51, 100, 42,
        ]);
        // TCP header: ports 0x9c28 (39976) -> 0x01bb (443), SYN flag set.
        record.extend_from_slice(&[
            0x9c, 0x28, 0x01, 0xbb, 0, 0, 0, 0, 0, 0, 0, 0, 0x50, 0x02, 0x20, 0, 0, 0, 0,
            0,
        ]);
        assert_eq!(
            parse_logged_syn(&record, "127.0.0.1:9040".parse().unwrap()),
            Some((
                "192.0.2.10:39976".parse().unwrap(),
                "198.51.100.42:443".parse().unwrap(),
            ))
        );
    }
    // Same as above for IPv6: fixed 40-byte header followed by a SYN.
    #[test]
    fn parses_logged_ipv6_syn() {
        let mut record = Vec::new();
        record.extend_from_slice(&[
            PFLOG_REAL_HDRLEN as u8,
            libc::AF_INET6 as u8,
            0,
            0,
        ]);
        record.extend_from_slice(&[0; libc::IFNAMSIZ]);
        record.extend_from_slice(&[0; PFLOG_RULESET_NAME_SIZE]);
        record.extend_from_slice(&0u32.to_ne_bytes());
        record.extend_from_slice(&0u32.to_ne_bytes());
        record.extend_from_slice(&0u32.to_ne_bytes());
        record.extend_from_slice(&0u32.to_ne_bytes());
        record.extend_from_slice(&0u32.to_ne_bytes());
        record.extend_from_slice(&0u32.to_ne_bytes());
        record.push(PF_OUT);
        let source_ip = Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 0x10).octets();
        let dest_ip = Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 0x42).octets();
        // IPv6 fixed header: payload length 20, next header TCP.
        record.extend_from_slice(&[
            0x60, 0, 0, 0, 0, 20, libc::IPPROTO_TCP as u8, 64,
        ]);
        record.extend_from_slice(&source_ip);
        record.extend_from_slice(&dest_ip);
        record.extend_from_slice(&[
            0x9c, 0x28, 0x01, 0xbb, 0, 0, 0, 0, 0, 0, 0, 0, 0x50, 0x02, 0x20, 0, 0, 0, 0,
            0,
        ]);
        assert_eq!(
            parse_logged_syn(&record, "[::1]:9040".parse().unwrap()),
            Some((
                "[2001:db8::10]:39976".parse().unwrap(),
                "[2001:db8::42]:443".parse().unwrap(),
            ))
        );
    }
}
}
#[cfg(target_vendor = "apple")]
use apple_pf::AppleFlowTracker;
/// Decodes a `sockaddr_storage` holding `len` valid bytes into a
/// `SocketAddr`.
///
/// BUGFIX: `len` was previously validated only against `sa_family_t`
/// before the buffer was reinterpreted as a `sockaddr_in`/`sockaddr_in6`,
/// so a short buffer could have its uninitialized tail read as address
/// and port bytes. Each family now requires its full structure size.
#[cfg(target_os = "linux")]
fn socket_addr_from_storage(addr: &libc::sockaddr_storage, len: usize) -> Result<SocketAddr> {
    use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
    if len < std::mem::size_of::<libc::sa_family_t>() {
        anyhow::bail!("socket address buffer was too short");
    }
    match addr.ss_family as i32 {
        libc::AF_INET => {
            if len < std::mem::size_of::<libc::sockaddr_in>() {
                anyhow::bail!("socket address buffer was too short");
            }
            // SAFETY: ss_family says AF_INET and `len` covers a full
            // sockaddr_in; sockaddr_storage is aligned for all sockaddrs.
            let addr_in = unsafe { *(addr as *const _ as *const libc::sockaddr_in) };
            let ip = Ipv4Addr::from(u32::from_be(addr_in.sin_addr.s_addr));
            let port = u16::from_be(addr_in.sin_port);
            Ok(SocketAddr::V4(SocketAddrV4::new(ip, port)))
        }
        libc::AF_INET6 => {
            if len < std::mem::size_of::<libc::sockaddr_in6>() {
                anyhow::bail!("socket address buffer was too short");
            }
            // SAFETY: ss_family says AF_INET6 and `len` covers a full
            // sockaddr_in6; sockaddr_storage is aligned for all sockaddrs.
            let addr_in = unsafe { *(addr as *const _ as *const libc::sockaddr_in6) };
            let ip = Ipv6Addr::from(addr_in.sin6_addr.s6_addr);
            let port = u16::from_be(addr_in.sin6_port);
            Ok(SocketAddr::V6(SocketAddrV6::new(
                ip,
                port,
                addr_in.sin6_flowinfo,
                addr_in.sin6_scope_id,
            )))
        }
        family => anyhow::bail!("unsupported socket address family {family}"),
    }
}
#[cfg(all(test, target_os = "linux"))]
mod tests {
    use super::*;
    use std::{
        mem::size_of,
        net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
    };
    // Round-trips an AF_INET sockaddr_storage through the decoder.
    #[test]
    fn parses_ipv4_socket_addr() {
        let mut storage = unsafe { std::mem::zeroed::<libc::sockaddr_storage>() };
        let addr_in = unsafe { &mut *(&mut storage as *mut _ as *mut libc::sockaddr_in) };
        addr_in.sin_family = libc::AF_INET as libc::sa_family_t;
        // Port and address are stored in network byte order.
        addr_in.sin_port = u16::to_be(9040);
        addr_in.sin_addr = libc::in_addr {
            s_addr: u32::to_be(u32::from(Ipv4Addr::new(127, 0, 0, 1))),
        };
        let parsed = socket_addr_from_storage(&storage, size_of::<libc::sockaddr_in>()).unwrap();
        assert_eq!(
            parsed,
            SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 9040))
        );
    }
    // Round-trips an AF_INET6 sockaddr_storage through the decoder.
    #[test]
    fn parses_ipv6_socket_addr() {
        let mut storage = unsafe { std::mem::zeroed::<libc::sockaddr_storage>() };
        let addr_in = unsafe { &mut *(&mut storage as *mut _ as *mut libc::sockaddr_in6) };
        addr_in.sin6_family = libc::AF_INET6 as libc::sa_family_t;
        addr_in.sin6_port = u16::to_be(9150);
        addr_in.sin6_addr = libc::in6_addr {
            s6_addr: Ipv6Addr::LOCALHOST.octets(),
        };
        let parsed = socket_addr_from_storage(&storage, size_of::<libc::sockaddr_in6>()).unwrap();
        assert_eq!(
            parsed,
            SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 9150, 0, 0))
        );
    }
}

41
docs/TOR.md Normal file
View file

@ -0,0 +1,41 @@
# Tor Transport
Burrow now has a `Tor` network type that boots an in-process [Arti](https://gitlab.torproject.org/tpo/core/arti) client and exposes a transparent TCP listener for outbound stream forwarding.
The first implementation is intentionally narrow:
- `tcp_stack.kind = "system"` is the only supported TCP stack backend.
- transparent destination recovery uses Linux `SO_ORIGINAL_DST` and macOS PF lookups.
- on macOS, Burrow first tries PF `DIOCNATLOOK`, then falls back to a `pflog0` observer backed by an in-memory flow cache keyed by the redirected socket tuple.
- Burrow does not yet install firewall redirect rules for you.
- traffic reaches Arti only if the host already redirects outbound TCP flows to Burrow's local listener.
- the macOS observer fallback only works when the redirect rule is logged to `pflog0` and Burrow listens on an explicit local address such as `127.0.0.1:9040`.
- destination handling is IP-and-port based, so this does not yet capture DNS or `.onion` names before local resolution.
- Burrow still does not install loop-avoidance rules for Arti's own relay connections, so redirect rules must exempt those flows externally for now.
## Payload format
`Network.payload` can be JSON or TOML.
```json
{
"address": ["100.64.0.2/32"],
"tun_name": "burrow-tor",
"mtu": 1400,
"arti": {
"state_dir": "/var/lib/burrow/arti/state",
"cache_dir": "/var/cache/burrow/arti"
},
"tcp_stack": {
"kind": "system",
"listen": "127.0.0.1:9040"
}
}
```
## Next steps
- teach Burrow to program and tear down redirect rules safely.
- add loop-avoidance for Arti's own relay connections before enabling automatic redirect.
- add DNS capture or hostname-aware forwarding for `.onion` and other unresolved destinations.
- add alternate pure-Rust TCP stack backends behind the same `tcp_stack` enum.

View file

@ -46,6 +46,7 @@ message Network {
enum NetworkType { enum NetworkType {
WireGuard = 0; WireGuard = 0;
HackClub = 1; HackClub = 1;
Tor = 2;
} }
message NetworkListResponse { message NetworkListResponse {

View file

@ -8,7 +8,7 @@ libc = "0.2"
fehler = "1.0" fehler = "1.0"
nix = { version = "0.26", features = ["ioctl"] } nix = { version = "0.26", features = ["ioctl"] }
socket2 = "0.5" socket2 = "0.5"
tokio = { version = "1.37", default-features = false, optional = true } tokio = { version = "1.50.0", default-features = false, optional = true }
byteorder = "1.4" byteorder = "1.4"
tracing = "0.1" tracing = "0.1"
log = "0.4" log = "0.4"
@ -19,7 +19,7 @@ futures = { version = "0.3.28", optional = true }
[features] [features]
serde = ["dep:serde", "dep:schemars"] serde = ["dep:serde", "dep:schemars"]
tokio = ["tokio/net", "dep:tokio", "dep:futures"] tokio = ["tokio/macros", "tokio/net", "tokio/rt", "dep:tokio", "dep:futures"]
[target.'cfg(windows)'.dependencies] [target.'cfg(windows)'.dependencies]
lazy_static = "1.4" lazy_static = "1.4"
@ -34,7 +34,7 @@ windows = { version = "0.48", features = [
[target.'cfg(windows)'.build-dependencies] [target.'cfg(windows)'.build-dependencies]
anyhow = "1.0" anyhow = "1.0"
bindgen = "0.65" bindgen = "0.65"
reqwest = { version = "0.11" } reqwest = { version = "0.13.2" }
ssri = { version = "9.0", default-features = false } ssri = { version = "9.0", default-features = false }
tokio = { version = "1.28", features = ["rt", "macros"] } tokio = { version = "1.50.0", features = ["rt", "macros"] }
zip = { version = "0.6", features = ["deflate"] } zip = { version = "0.6", features = ["deflate"] }