Add Linux tor-exec namespace runtime

This commit is contained in:
Conrad Kramer 2026-03-30 20:01:55 -07:00
parent 7ade60646b
commit cdf8d22055
12 changed files with 6514 additions and 158 deletions

5521
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -33,6 +33,7 @@ serde_json = "1.0"
blake2 = "0.10"
chacha20poly1305 = "0.10"
rand = "0.8"
bytes = "1"
rand_core = "0.6"
aead = "0.5"
x25519-dalek = { version = "2.0", features = [
@ -46,40 +47,50 @@ base64 = "0.21"
fehler = "1.0"
ip_network_table = "0.2"
ip_network = "0.4"
ipnetwork = "0.21"
async-channel = "2.1"
schemars = "0.8"
futures = "0.3.28"
once_cell = "1.19"
arti-client = "0.40.0"
hickory-proto = "0.25.2"
tokio-util = { version = "0.7.18", features = ["compat"] }
tor-rtcompat = "0.40.0"
console-subscriber = { version = "0.2.0", optional = true }
console = "0.15.8"
axum = "0.7.4"
argon2 = "0.5"
reqwest = { version = "0.12", default-features = false, features = [
"json",
"rustls-tls",
] }
rusqlite = { version = "0.31.0", features = ["blob"] }
rusqlite = { version = "0.38.0", features = ["blob"] }
iroh = "0.94.0"
dotenv = "0.15.0"
tonic = "0.12.0"
prost = "0.13.1"
prost-types = "0.13.1"
tokio-stream = "0.1"
async-stream = "0.2"
tower = "0.4.13"
tower = { version = "0.4.13", features = ["util"] }
hyper-util = "0.1.6"
toml = "0.8.15"
rust-ini = "0.21.0"
subtle = "2.6"
[target.'cfg(target_os = "linux")'.dependencies]
caps = "0.5"
libc = "0.2"
libsystemd = "0.7"
tracing-journald = "0.3"
[target.'cfg(target_vendor = "apple")'.dependencies]
nix = { version = "0.27" }
rusqlite = { version = "0.31.0", features = ["bundled", "blob"] }
rusqlite = { version = "0.38.0", features = ["bundled", "blob"] }
[dev-dependencies]
insta = { version = "1.32", features = ["yaml"] }
tempfile = "3.13"
[package.metadata.generate-rpm]
assets = [

View file

@ -54,7 +54,7 @@ END;
pub fn initialize_tables(conn: &Connection) -> Result<()> {
conn.execute(CREATE_WG_INTERFACE_TABLE, [])?;
conn.execute(CREATE_WG_PEER_TABLE, [])?;
conn.execute(CREATE_NETWORK_TABLE, [])?;
conn.execute_batch(CREATE_NETWORK_TABLE)?;
Ok(())
}

View file

@ -1,22 +1,25 @@
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
pub mod control;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
pub mod wireguard;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod auth;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod daemon;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
pub mod database;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod auth;
pub mod mesh;
#[cfg(target_os = "linux")]
pub mod tor;
pub(crate) mod tracing;
#[cfg(target_vendor = "apple")]
pub use daemon::apple::spawn_in_process;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
pub use daemon::{
rpc::DaemonResponse,
rpc::ServerInfo,
DaemonClient,
DaemonCommand,
DaemonResponseData,
rpc::DaemonResponse, rpc::ServerInfo, DaemonClient, DaemonCommand, DaemonResponseData,
DaemonStartOptions,
};

View file

@ -1,6 +1,8 @@
use anyhow::Result;
use clap::{Args, Parser, Subcommand};
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod control;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod daemon;
pub(crate) mod tracing;
@ -10,6 +12,11 @@ mod wireguard;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod auth;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod mesh;
#[cfg(target_os = "linux")]
mod tor;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
use daemon::{DaemonClient, DaemonCommand};
@ -66,6 +73,9 @@ enum Commands {
NetworkReorder(NetworkReorderArgs),
/// Delete Network
NetworkDelete(NetworkDeleteArgs),
#[cfg(target_os = "linux")]
/// Run a command in a Linux user namespace with Tor-backed networking
TorExec(TorExecArgs),
}
#[derive(Args)]
@ -98,6 +108,14 @@ struct NetworkDeleteArgs {
id: i32,
}
#[cfg(target_os = "linux")]
#[derive(Args)]
struct TorExecArgs {
payload_path: String,
#[arg(required = true, num_args = 1.., trailing_var_arg = true)]
command: Vec<String>,
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
async fn try_start() -> Result<()> {
let mut client = BurrowClient::from_uds().await?;
@ -209,6 +227,17 @@ async fn try_network_delete(id: i32) -> Result<()> {
Ok(())
}
#[cfg(target_os = "linux")]
async fn try_tor_exec(payload_path: &str, command: Vec<String>) -> Result<()> {
let payload = tokio::fs::read(payload_path).await?;
let config = tor::Config::from_payload(&payload)?;
let exit_code = tor::run_exec(config, command).await?;
if exit_code != 0 {
std::process::exit(exit_code);
}
Ok(())
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
fn handle_unexpected(res: Result<DaemonResponseData, String>) {
match res {
@ -285,6 +314,8 @@ async fn main() -> Result<()> {
Commands::NetworkList => try_network_list().await?,
Commands::NetworkReorder(args) => try_network_reorder(args.id, args.index).await?,
Commands::NetworkDelete(args) => try_network_delete(args.id).await?,
#[cfg(target_os = "linux")]
Commands::TorExec(args) => try_tor_exec(&args.payload_path, args.command.clone()).await?,
}
Ok(())

187
burrow/src/tor/config.rs Normal file
View file

@ -0,0 +1,187 @@
use std::{net::SocketAddr, path::PathBuf, str};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Config {
#[serde(default)]
pub account: Option<String>,
#[serde(default)]
pub identity: Option<String>,
#[serde(default)]
pub address: Vec<String>,
#[serde(default)]
pub dns: Vec<String>,
#[serde(default)]
pub mtu: Option<u32>,
#[serde(default)]
pub tun_name: Option<String>,
#[serde(default)]
pub arti: ArtiConfig,
#[serde(default)]
pub tcp_stack: TcpStackConfig,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ArtiConfig {
pub state_dir: String,
pub cache_dir: String,
}
impl Default for ArtiConfig {
fn default() -> Self {
Self {
state_dir: "/var/lib/burrow/arti/state".to_string(),
cache_dir: "/var/cache/burrow/arti".to_string(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum TcpStackConfig {
System(SystemTcpStackConfig),
}
impl Default for TcpStackConfig {
fn default() -> Self {
Self::System(SystemTcpStackConfig::default())
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SystemTcpStackConfig {
#[serde(default = "default_system_listen")]
pub listen: String,
}
impl Default for SystemTcpStackConfig {
fn default() -> Self {
Self {
listen: default_system_listen(),
}
}
}
impl Config {
pub fn from_payload(payload: &[u8]) -> Result<Self> {
if let Ok(config) = serde_json::from_slice(payload) {
return Ok(config);
}
let payload = str::from_utf8(payload).context("tor payload must be valid UTF-8")?;
toml::from_str(payload).context("failed to parse tor payload as JSON or TOML")
}
pub fn listen_addr(&self) -> Result<SocketAddr> {
match &self.tcp_stack {
TcpStackConfig::System(config) => config
.listen
.parse()
.with_context(|| format!("invalid system tcp listen address '{}'", config.listen)),
}
}
pub fn authority(&self) -> String {
"arti://local".to_owned()
}
pub fn account_name(&self) -> String {
self.account
.clone()
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| "default".to_owned())
}
pub fn identity_name(&self, network_id: i32) -> String {
self.identity
.clone()
.filter(|value| !value.trim().is_empty())
.or_else(|| self.tun_name.clone())
.unwrap_or_else(|| format!("tor-{network_id}"))
}
pub fn runtime_dirs(&self, network_id: i32) -> (String, String) {
let authority = sanitize_path_component(&self.authority());
let account = sanitize_path_component(&self.account_name());
let identity = sanitize_path_component(&self.identity_name(network_id));
(
append_runtime_path(&self.arti.state_dir, &[&authority, &account, &identity]),
append_runtime_path(&self.arti.cache_dir, &[&authority, &account, &identity]),
)
}
}
fn default_system_listen() -> String {
"127.0.0.1:9040".to_string()
}
fn append_runtime_path(base: &str, parts: &[&str]) -> String {
let mut path = PathBuf::from(base);
for part in parts {
path.push(part);
}
path.to_string_lossy().to_string()
}
fn sanitize_path_component(value: &str) -> String {
let sanitized: String = value
.chars()
.map(|ch| {
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
ch
} else {
'_'
}
})
.collect();
if sanitized.is_empty() {
"default".to_owned()
} else {
sanitized
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parses_json_payload() {
let payload = br#"{
"address":["100.64.0.2/32"],
"mtu":1400,
"arti":{"state_dir":"/tmp/state","cache_dir":"/tmp/cache"},
"tcp_stack":{"kind":"system","listen":"127.0.0.1:9150"}
}"#;
let config = Config::from_payload(payload).unwrap();
assert_eq!(config.address, vec!["100.64.0.2/32"]);
assert_eq!(config.listen_addr().unwrap().to_string(), "127.0.0.1:9150");
assert!(config.runtime_dirs(7).0.contains("arti___local"));
}
#[test]
fn parses_toml_payload() {
let payload = r#"
address = ["100.64.0.3/32"]
mtu = 1280
tun_name = "burrow-tor"
[arti]
state_dir = "/tmp/state"
cache_dir = "/tmp/cache"
[tcp_stack]
kind = "system"
listen = "127.0.0.1:9140"
"#;
let config = Config::from_payload(payload.as_bytes()).unwrap();
assert_eq!(config.tun_name.as_deref(), Some("burrow-tor"));
assert_eq!(config.listen_addr().unwrap().to_string(), "127.0.0.1:9140");
assert_eq!(config.identity_name(11), "burrow-tor");
}
}

178
burrow/src/tor/dns.rs Normal file
View file

@ -0,0 +1,178 @@
use std::{
net::{IpAddr, SocketAddr},
sync::Arc,
};
use anyhow::{Context, Result};
use arti_client::TorClient;
use hickory_proto::{
op::{Message, MessageType, ResponseCode},
rr::{rdata::A, rdata::AAAA, RData, Record, RecordType},
};
use tokio::{net::UdpSocket, sync::watch, task::JoinError};
use tor_rtcompat::PreferredRuntime;
use tracing::{debug, warn};
const DNS_TTL_SECS: u32 = 60;
#[derive(Debug)]
pub struct TorDnsHandle {
shutdown: watch::Sender<bool>,
task: tokio::task::JoinHandle<()>,
}
impl TorDnsHandle {
pub async fn shutdown(self) -> Result<()> {
let _ = self.shutdown.send(true);
match self.task.await {
Ok(()) => Ok(()),
Err(err) if err.is_cancelled() => Ok(()),
Err(err) => Err(join_error(err)),
}
}
}
pub async fn spawn(
bind_addr: SocketAddr,
tor_client: Arc<TorClient<PreferredRuntime>>,
) -> Result<TorDnsHandle> {
let socket = UdpSocket::bind(bind_addr)
.await
.with_context(|| format!("failed to bind tor dns proxy on {bind_addr}"))?;
let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
let task = tokio::spawn(async move {
let mut buffer = [0u8; 4096];
loop {
tokio::select! {
changed = shutdown_rx.changed() => {
match changed {
Ok(()) if *shutdown_rx.borrow() => break,
Ok(()) => continue,
Err(_) => break,
}
}
received = socket.recv_from(&mut buffer) => {
let (len, peer_addr) = match received {
Ok(value) => value,
Err(err) => {
warn!(?err, "tor dns proxy recv failed");
continue;
}
};
let response = match build_response(&buffer[..len], tor_client.as_ref()).await {
Ok(message) => message,
Err(err) => {
debug!(?err, "tor dns proxy failed to answer query");
continue;
}
};
if let Err(err) = socket.send_to(&response, peer_addr).await {
warn!(?err, "tor dns proxy send failed");
}
}
}
}
});
Ok(TorDnsHandle {
shutdown: shutdown_tx,
task,
})
}
async fn build_response(
packet: &[u8],
tor_client: &TorClient<PreferredRuntime>,
) -> Result<Vec<u8>> {
let request = Message::from_vec(packet).context("failed to parse dns packet")?;
let mut response = Message::new();
response
.set_id(request.id())
.set_op_code(request.op_code())
.set_message_type(MessageType::Response)
.set_recursion_desired(request.recursion_desired())
.set_recursion_available(true)
.set_response_code(ResponseCode::NoError);
for query in request.queries().iter().cloned() {
response.add_query(query.clone());
match query.query_type() {
RecordType::A | RecordType::AAAA => {
let hostname = query.name().to_utf8();
let hostname = hostname.trim_end_matches('.');
match tor_client.resolve(hostname).await {
Ok(addrs) => {
for addr in addrs {
if let Some(answer) =
record_for_address(query.name().clone(), query.query_type(), addr)
{
response.add_answer(answer);
}
}
}
Err(err) => {
debug!(hostname, ?err, "tor dns lookup failed");
response.set_response_code(ResponseCode::ServFail);
}
}
}
_ => {
response.set_response_code(ResponseCode::NotImp);
}
}
}
response.to_vec().context("failed to encode dns response")
}
fn record_for_address(
name: hickory_proto::rr::Name,
record_type: RecordType,
addr: IpAddr,
) -> Option<Record> {
match (record_type, addr) {
(RecordType::A, IpAddr::V4(ip)) => {
Some(Record::from_rdata(name, DNS_TTL_SECS, RData::A(A::from(ip))))
}
(RecordType::AAAA, IpAddr::V6(ip)) => Some(Record::from_rdata(
name,
DNS_TTL_SECS,
RData::AAAA(AAAA::from(ip)),
)),
_ => None,
}
}
fn join_error(err: JoinError) -> anyhow::Error {
anyhow::anyhow!("tor dns task failed: {err}")
}
#[cfg(test)]
mod tests {
use super::*;
use hickory_proto::rr::Name;
use std::net::{Ipv4Addr, Ipv6Addr};
#[test]
fn builds_a_record_for_ipv4_answer() {
let record = record_for_address(
Name::from_ascii("example.com.").unwrap(),
RecordType::A,
IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)),
)
.unwrap();
assert_eq!(record.record_type(), RecordType::A);
}
#[test]
fn skips_mismatched_record_type() {
let record = record_for_address(
Name::from_ascii("example.com.").unwrap(),
RecordType::A,
IpAddr::V6(Ipv6Addr::LOCALHOST),
);
assert!(record.is_none());
}
}

439
burrow/src/tor/exec.rs Normal file
View file

@ -0,0 +1,439 @@
use std::{
ffi::{OsStr, OsString},
fs,
net::{IpAddr, Ipv4Addr, SocketAddr},
os::unix::process::ExitStatusExt,
path::PathBuf,
process::{Command, ExitStatus, Stdio},
sync::Arc,
time::Duration,
};
use anyhow::{bail, Context, Result};
use tokio::process::Command as TokioCommand;
use tor_rtcompat::PreferredRuntime;
use tracing::{debug, info};
use super::{
bootstrap_client,
dns::{spawn as spawn_dns, TorDnsHandle},
runtime::{spawn_with_client, TorHandle},
Config, SystemTcpStackConfig, TcpStackConfig,
};
const CHILD_PREFIX_LEN: u8 = 30;
const CHILD_DNS_PORT: u16 = 53;
const LISTENER_READY_TIMEOUT: Duration = Duration::from_secs(10);
const LISTENER_READY_POLL: Duration = Duration::from_millis(100);
pub async fn run_exec(mut config: Config, command: Vec<String>) -> Result<i32> {
if command.is_empty() {
bail!("tor-exec requires a command to run");
}
ensure_root()?;
ensure_host_tool("ip")?;
ensure_host_tool("iptables")?;
ensure_host_tool("unshare")?;
let requested_listener = config.listen_addr()?;
if requested_listener.port() == 0 {
bail!("tor-exec requires a fixed listener port");
}
let plan = NamespacePlan::new(requested_listener.port());
let (state_dir, cache_dir) = config.runtime_dirs(std::process::id() as i32);
config.arti.state_dir = state_dir;
config.arti.cache_dir = cache_dir;
config.tcp_stack = TcpStackConfig::System(SystemTcpStackConfig {
listen: format!("{}:{}", plan.host_ip, plan.listener_port),
});
let namespace = NamespaceGuard::create(&plan)?;
let tor_client = bootstrap_client(&config).await?;
let tor_handle = spawn_with_client(config, tor_client.clone()).await?;
wait_for_listener(SocketAddr::new(
IpAddr::V4(plan.host_ip),
plan.listener_port,
))
.await?;
let dns_handle = spawn_dns(
SocketAddr::new(IpAddr::V4(plan.host_ip), CHILD_DNS_PORT),
tor_client,
)
.await?;
let status = namespace.run_child(&command).await;
let dns_shutdown = dns_handle.shutdown().await;
let tor_shutdown = tor_handle.shutdown().await;
let status = status?;
dns_shutdown?;
tor_shutdown?;
child_exit_code(status)
}
fn ensure_root() -> Result<()> {
if unsafe { libc::geteuid() } != 0 {
bail!("tor-exec currently requires root on linux");
}
Ok(())
}
fn ensure_host_tool(tool: &str) -> Result<()> {
let status = Command::new("sh")
.args(["-lc", &format!("command -v {tool} >/dev/null")])
.status()
.with_context(|| format!("failed to probe required tool '{tool}'"))?;
if !status.success() {
bail!("required host tool '{tool}' is not available");
}
Ok(())
}
async fn wait_for_listener(addr: SocketAddr) -> Result<()> {
let deadline = tokio::time::Instant::now() + LISTENER_READY_TIMEOUT;
loop {
match tokio::net::TcpStream::connect(addr).await {
Ok(stream) => {
drop(stream);
return Ok(());
}
Err(err) if tokio::time::Instant::now() < deadline => {
debug!(%addr, ?err, "waiting for tor transparent listener");
tokio::time::sleep(LISTENER_READY_POLL).await;
}
Err(err) => return Err(err).with_context(|| format!("timed out waiting for {addr}")),
}
}
}
fn child_exit_code(status: ExitStatus) -> Result<i32> {
if let Some(code) = status.code() {
return Ok(code);
}
if let Some(signal) = status.signal() {
return Ok(128 + signal);
}
bail!("child process terminated without an exit code");
}
#[derive(Debug, Clone)]
struct NamespacePlan {
netns_name: String,
host_if: String,
child_if: String,
host_ip: Ipv4Addr,
child_ip: Ipv4Addr,
listener_port: u16,
}
impl NamespacePlan {
fn new(listener_port: u16) -> Self {
let token = std::process::id() % 10_000;
let segment = ((std::process::id() % 200) as u8) + 20;
Self {
netns_name: format!("burrow-tor-{token}"),
host_if: format!("bth{token}"),
child_if: format!("btc{token}"),
host_ip: Ipv4Addr::new(100, 90, segment, 1),
child_ip: Ipv4Addr::new(100, 90, segment, 2),
listener_port,
}
}
fn host_cidr(&self) -> String {
format!("{}/{}", self.host_ip, CHILD_PREFIX_LEN)
}
fn child_cidr(&self) -> String {
format!("{}/{}", self.child_ip, CHILD_PREFIX_LEN)
}
}
struct NamespaceGuard {
plan: NamespacePlan,
resolv_conf: PathBuf,
nat_rule_installed: bool,
forward_rule_installed: bool,
netns_created: bool,
host_link_created: bool,
}
impl NamespaceGuard {
fn create(plan: &NamespacePlan) -> Result<Self> {
let mut guard = Self {
plan: plan.clone(),
resolv_conf: write_resolv_conf(plan.host_ip)?,
nat_rule_installed: false,
forward_rule_installed: false,
netns_created: false,
host_link_created: false,
};
let setup = (|| -> Result<()> {
run_host_command(["ip", "netns", "add", &guard.plan.netns_name])?;
guard.netns_created = true;
run_host_command([
"ip",
"link",
"add",
&guard.plan.host_if,
"type",
"veth",
"peer",
"name",
&guard.plan.child_if,
])?;
guard.host_link_created = true;
run_host_command([
"ip",
"addr",
"add",
&guard.plan.host_cidr(),
"dev",
&guard.plan.host_if,
])?;
run_host_command(["ip", "link", "set", &guard.plan.host_if, "up"])?;
run_host_command([
"ip",
"link",
"set",
&guard.plan.child_if,
"netns",
&guard.plan.netns_name,
])?;
run_host_command([
"ip",
"netns",
"exec",
&guard.plan.netns_name,
"ip",
"link",
"set",
"lo",
"up",
])?;
run_host_command([
"ip",
"netns",
"exec",
&guard.plan.netns_name,
"ip",
"addr",
"add",
&guard.plan.child_cidr(),
"dev",
&guard.plan.child_if,
])?;
run_host_command([
"ip",
"netns",
"exec",
&guard.plan.netns_name,
"ip",
"link",
"set",
&guard.plan.child_if,
"up",
])?;
run_host_command([
"ip",
"netns",
"exec",
&guard.plan.netns_name,
"ip",
"route",
"add",
"default",
"via",
&guard.plan.host_ip.to_string(),
"dev",
&guard.plan.child_if,
])?;
run_host_command([
"iptables",
"-t",
"nat",
"-A",
"PREROUTING",
"-i",
&guard.plan.host_if,
"-p",
"tcp",
"-j",
"DNAT",
"--to-destination",
&format!("{}:{}", guard.plan.host_ip, guard.plan.listener_port),
])?;
guard.nat_rule_installed = true;
run_host_command([
"iptables",
"-A",
"FORWARD",
"-i",
&guard.plan.host_if,
"-j",
"REJECT",
])?;
guard.forward_rule_installed = true;
Ok(())
})();
if let Err(err) = setup {
guard.cleanup();
return Err(err);
}
Ok(guard)
}
async fn run_child(&self, command: &[String]) -> Result<ExitStatus> {
let mut args = vec![
OsString::from("netns"),
OsString::from("exec"),
OsString::from(&self.plan.netns_name),
OsString::from("unshare"),
OsString::from("--user"),
OsString::from("--map-root-user"),
OsString::from("--mount"),
OsString::from("--pid"),
OsString::from("--fork"),
OsString::from("--kill-child"),
OsString::from("sh"),
OsString::from("-ceu"),
OsString::from(CHILD_SCRIPT),
OsString::from("sh"),
self.resolv_conf.as_os_str().to_os_string(),
];
args.extend(command.iter().map(OsString::from));
let status = TokioCommand::new("ip")
.args(args)
.stdin(Stdio::inherit())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.status()
.await
.context("failed to execute child in tor namespace")?;
Ok(status)
}
fn cleanup(&mut self) {
if self.forward_rule_installed {
let _ = run_host_command([
"iptables",
"-D",
"FORWARD",
"-i",
&self.plan.host_if,
"-j",
"REJECT",
]);
self.forward_rule_installed = false;
}
if self.nat_rule_installed {
let _ = run_host_command([
"iptables",
"-t",
"nat",
"-D",
"PREROUTING",
"-i",
&self.plan.host_if,
"-p",
"tcp",
"-j",
"DNAT",
"--to-destination",
&format!("{}:{}", self.plan.host_ip, self.plan.listener_port),
]);
self.nat_rule_installed = false;
}
if self.host_link_created {
let _ = run_host_command(["ip", "link", "delete", &self.plan.host_if]);
self.host_link_created = false;
}
if self.netns_created {
let _ = run_host_command(["ip", "netns", "delete", &self.plan.netns_name]);
self.netns_created = false;
}
let _ = fs::remove_file(&self.resolv_conf);
}
}
impl Drop for NamespaceGuard {
fn drop(&mut self) {
self.cleanup();
}
}
fn write_resolv_conf(nameserver: Ipv4Addr) -> Result<PathBuf> {
let path = std::env::temp_dir().join(format!("burrow-tor-resolv-{}.conf", std::process::id()));
fs::write(&path, format!("nameserver {nameserver}\noptions ndots:1\n"))
.with_context(|| format!("failed to write {}", path.display()))?;
Ok(path)
}
fn run_host_command<const N: usize>(args: [&str; N]) -> Result<()> {
let (program, rest) = args
.split_first()
.expect("run_host_command requires a program and arguments");
let status = Command::new(program)
.args(rest)
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::piped())
.status()
.with_context(|| format!("failed to start host command {}", shell_words(&args)))?;
if status.success() {
Ok(())
} else {
bail!("host command failed: {}", shell_words(&args));
}
}
fn shell_words(args: &[&str]) -> String {
args.iter()
.map(|arg| shlex_escape(arg))
.collect::<Vec<_>>()
.join(" ")
}
fn shlex_escape(value: &str) -> String {
if value
.chars()
.all(|ch| ch.is_ascii_alphanumeric() || "-_./:=+".contains(ch))
{
value.to_string()
} else {
format!("'{}'", value.replace('\'', "'\\''"))
}
}
const CHILD_SCRIPT: &str = r#"
mount -t proc proc /proc
mount --bind "$1" /etc/resolv.conf
shift
exec "$@"
"#;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn namespace_plan_uses_short_interface_names() {
let plan = NamespacePlan::new(9040);
assert!(plan.host_if.len() <= 15);
assert!(plan.child_if.len() <= 15);
}
#[test]
fn signal_exit_code_uses_shell_convention() {
let status = ExitStatus::from_raw(libc::SIGTERM);
assert_eq!(child_exit_code(status).unwrap(), 128 + libc::SIGTERM);
}
}

9
burrow/src/tor/mod.rs Normal file
View file

@ -0,0 +1,9 @@
mod config;
mod dns;
mod exec;
mod runtime;
mod system;
pub use config::{ArtiConfig, Config, SystemTcpStackConfig, TcpStackConfig};
pub use exec::run_exec;
pub use runtime::{bootstrap_client, spawn, spawn_with_client, TorHandle};

129
burrow/src/tor/runtime.rs Normal file
View file

@ -0,0 +1,129 @@
use std::{sync::Arc, time::Duration};
use anyhow::{Context, Result};
use arti_client::{config::TorClientConfigBuilder, TorClient};
use tokio::{
sync::watch,
task::{JoinError, JoinSet},
};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tor_rtcompat::PreferredRuntime;
use tracing::{debug, error, info, warn};
use super::{system::SystemTcpStackRuntime, Config, TcpStackConfig};
#[derive(Debug)]
pub struct TorHandle {
shutdown: watch::Sender<bool>,
task: tokio::task::JoinHandle<()>,
}
impl TorHandle {
pub async fn shutdown(self) -> Result<()> {
let _ = self.shutdown.send(true);
match self.task.await {
Ok(()) => Ok(()),
Err(err) if err.is_cancelled() => Ok(()),
Err(err) => Err(join_error(err)),
}
}
}
pub async fn bootstrap_client(config: &Config) -> Result<Arc<TorClient<PreferredRuntime>>> {
let builder =
TorClientConfigBuilder::from_directories(&config.arti.state_dir, &config.arti.cache_dir);
let tor_config = builder.build().context("failed to build arti config")?;
let tor_client = TorClient::create_bootstrapped(tor_config)
.await
.context("failed to bootstrap arti client")?;
Ok(Arc::new(tor_client))
}
pub async fn spawn(config: Config) -> Result<TorHandle> {
let tor_client = bootstrap_client(&config).await?;
spawn_with_client(config, tor_client).await
}
pub async fn spawn_with_client(
config: Config,
tor_client: Arc<TorClient<PreferredRuntime>>,
) -> Result<TorHandle> {
let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
let task = match config.tcp_stack.clone() {
TcpStackConfig::System(system_config) => tokio::spawn(async move {
let stack = match SystemTcpStackRuntime::bind(&system_config).await {
Ok(stack) => stack,
Err(err) => {
error!(?err, "failed to bind system tcp stack listener");
return;
}
};
info!(
listen = %stack.local_addr(),
"system tcp stack listener bound for tor transparent proxy"
);
let mut connections = JoinSet::new();
loop {
tokio::select! {
changed = shutdown_rx.changed() => {
match changed {
Ok(()) if *shutdown_rx.borrow() => break,
Ok(()) => continue,
Err(_) => break,
}
}
Some(res) = connections.join_next(), if !connections.is_empty() => {
match res {
Ok(Ok(())) => {}
Ok(Err(err)) => warn!(?err, "transparent proxy task failed"),
Err(err) => warn!(?err, "transparent proxy task panicked"),
}
}
accepted = stack.accept() => {
let (mut inbound, original_dst) = match accepted {
Ok(pair) => pair,
Err(err) => {
warn!(?err, "failed to accept transparent tcp connection");
tokio::time::sleep(Duration::from_millis(50)).await;
continue;
}
};
let tor_client = tor_client.clone();
connections.spawn(async move {
debug!(%original_dst, "accepted transparent tcp connection");
let tor_stream = tor_client
.connect((original_dst.ip().to_string(), original_dst.port()))
.await
.with_context(|| format!("failed to connect to {original_dst} over tor"))?;
let mut tor_stream = tor_stream.compat();
tokio::io::copy_bidirectional(&mut inbound, &mut tor_stream)
.await
.with_context(|| format!("failed to bridge tor stream for {original_dst}"))?;
Result::<()>::Ok(())
});
}
}
}
connections.abort_all();
while let Some(res) = connections.join_next().await {
match res {
Ok(Ok(())) => {}
Ok(Err(err)) => debug!(?err, "transparent proxy task failed during shutdown"),
Err(err) => debug!(?err, "transparent proxy task exited during shutdown"),
}
}
}),
};
Ok(TorHandle {
shutdown: shutdown_tx,
task,
})
}
fn join_error(err: JoinError) -> anyhow::Error {
anyhow::anyhow!("tor runtime task failed: {err}")
}

140
burrow/src/tor/system.rs Normal file
View file

@ -0,0 +1,140 @@
use std::net::SocketAddr;
use anyhow::{Context, Result};
use tokio::net::{TcpListener, TcpStream};
use super::SystemTcpStackConfig;
pub struct SystemTcpStackRuntime {
listener: TcpListener,
}
impl SystemTcpStackRuntime {
pub async fn bind(config: &SystemTcpStackConfig) -> Result<Self> {
let listener = TcpListener::bind(&config.listen)
.await
.with_context(|| format!("failed to bind transparent listener on {}", config.listen))?;
Ok(Self { listener })
}
pub fn local_addr(&self) -> SocketAddr {
self.listener
.local_addr()
.expect("listener should always have a local address")
}
pub async fn accept(&self) -> Result<(TcpStream, SocketAddr)> {
let (stream, _) = self
.listener
.accept()
.await
.context("failed to accept transparent listener connection")?;
let original_dst = original_destination(&stream)?;
Ok((stream, original_dst))
}
}
#[cfg(target_os = "linux")]
fn original_destination(stream: &TcpStream) -> Result<SocketAddr> {
use std::{
mem::{size_of, MaybeUninit},
os::fd::AsRawFd,
};
let level = if stream.local_addr()?.is_ipv6() {
libc::SOL_IPV6
} else {
libc::SOL_IP
};
let mut addr = MaybeUninit::<libc::sockaddr_storage>::zeroed();
let mut len = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
let rc = unsafe {
libc::getsockopt(
stream.as_raw_fd(),
level,
80,
addr.as_mut_ptr().cast(),
&mut len,
)
};
if rc != 0 {
return Err(std::io::Error::last_os_error()).context("SO_ORIGINAL_DST lookup failed");
}
socket_addr_from_storage(unsafe { &addr.assume_init() }, len as usize)
}
#[cfg(not(target_os = "linux"))]
fn original_destination(_stream: &TcpStream) -> Result<SocketAddr> {
anyhow::bail!("system tcp stack transparent destination lookup is only implemented on linux")
}
fn socket_addr_from_storage(addr: &libc::sockaddr_storage, len: usize) -> Result<SocketAddr> {
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
if len < std::mem::size_of::<libc::sa_family_t>() {
anyhow::bail!("socket address buffer was too short");
}
match addr.ss_family as i32 {
libc::AF_INET => {
let addr_in = unsafe { *(addr as *const _ as *const libc::sockaddr_in) };
let ip = Ipv4Addr::from(u32::from_be(addr_in.sin_addr.s_addr));
let port = u16::from_be(addr_in.sin_port);
Ok(SocketAddr::V4(SocketAddrV4::new(ip, port)))
}
libc::AF_INET6 => {
let addr_in = unsafe { *(addr as *const _ as *const libc::sockaddr_in6) };
let ip = Ipv6Addr::from(addr_in.sin6_addr.s6_addr);
let port = u16::from_be(addr_in.sin6_port);
Ok(SocketAddr::V6(SocketAddrV6::new(
ip,
port,
addr_in.sin6_flowinfo,
addr_in.sin6_scope_id,
)))
}
family => anyhow::bail!("unsupported socket address family {family}"),
}
}
#[cfg(all(test, target_os = "linux"))]
mod tests {
use super::*;
use std::{
mem::size_of,
net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
};
#[test]
fn parses_ipv4_socket_addr() {
let mut storage = unsafe { std::mem::zeroed::<libc::sockaddr_storage>() };
let addr_in = unsafe { &mut *(&mut storage as *mut _ as *mut libc::sockaddr_in) };
addr_in.sin_family = libc::AF_INET as libc::sa_family_t;
addr_in.sin_port = u16::to_be(9040);
addr_in.sin_addr = libc::in_addr {
s_addr: u32::to_be(u32::from(Ipv4Addr::new(127, 0, 0, 1))),
};
let parsed = socket_addr_from_storage(&storage, size_of::<libc::sockaddr_in>()).unwrap();
assert_eq!(parsed, SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9040)));
}
#[test]
fn parses_ipv6_socket_addr() {
let mut storage = unsafe { std::mem::zeroed::<libc::sockaddr_storage>() };
let addr_in = unsafe { &mut *(&mut storage as *mut _ as *mut libc::sockaddr_in6) };
addr_in.sin6_family = libc::AF_INET6 as libc::sa_family_t;
addr_in.sin6_port = u16::to_be(9150);
addr_in.sin6_addr = libc::in6_addr {
s6_addr: Ipv6Addr::LOCALHOST.octets(),
};
let parsed = socket_addr_from_storage(&storage, size_of::<libc::sockaddr_in6>()).unwrap();
assert_eq!(
parsed,
SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 9150, 0, 0))
);
}
}

4
rust-toolchain.toml Normal file
View file

@ -0,0 +1,4 @@
[toolchain]
channel = "1.93.1"
components = ["rustfmt"]
profile = "minimal"