diff --git a/Cargo.lock b/Cargo.lock index 5ef886c..98253f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -132,17 +132,38 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-stream" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22068c0c19514942eefcfd4daf8976ef1aad84e61539f95cd200c35202f80af5" +dependencies = [ + "async-stream-impl 0.2.1", + "futures-core", +] + [[package]] name = "async-stream" version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" dependencies = [ - "async-stream-impl", + "async-stream-impl 0.3.5", "futures-core", "pin-project-lite", ] +[[package]] +name = "async-stream-impl" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25f9db3b38af870bf7e5cc649167533b493928e50744e2c30ae350230b414670" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "async-stream-impl" version = "0.3.5" @@ -165,6 +186,12 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.1.0" @@ -392,6 +419,7 @@ dependencies = [ "aead", "anyhow", "async-channel", + "async-stream 0.2.1", "axum 0.7.5", "base64 0.21.7", "blake2", @@ -412,6 +440,8 @@ dependencies = [ "nix 0.27.1", "once_cell", "parking_lot", + "prost 0.13.1", + "prost-types 0.13.1", "rand", "rand_core", "reqwest 0.12.5", @@ -421,6 +451,9 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-stream", + "tonic 0.12.1", + "tonic-build", "tracing", "tracing-journald", "tracing-log 0.1.4", @@ -619,9 +652,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787" dependencies = [ "futures-core", - "prost", - "prost-types", - "tonic", + "prost 0.12.3", + "prost-types 0.12.3", + "tonic 0.10.2", "tracing-core", ] @@ -637,13 +670,13 @@ dependencies = [ "futures-task", "hdrhistogram", "humantime", - "prost-types", + "prost-types 0.12.3", "serde", "serde_json", "thread_local", "tokio", "tokio-stream", - "tonic", + "tonic 0.10.2", "tracing", "tracing-core", "tracing-subscriber", @@ -876,6 +909,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27573eac26f4dd11e2b1916c3fe1baa56407c83c71a773a8ba17ec0bca03b6b7" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.0.28" @@ -1057,6 +1096,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.1.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1215,7 +1273,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.24", "http 0.2.11", "http-body 0.4.6", "httparse", @@ -1238,6 +1296,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.0", "httparse", @@ -1279,6 +1338,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-timeout" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +dependencies = [ + "hyper 1.4.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1615,6 +1687,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" + [[package]] name = "native-tls" version = "0.2.11" @@ -1832,6 +1910,16 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.1.0", +] + [[package]] name = "pin-project" version = "1.1.4" @@ -1925,7 +2013,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.12.3", +] + +[[package]] +name = "prost" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc" +dependencies = [ + "bytes", + "prost-derive 0.13.1", +] + +[[package]] +name = "prost-build" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" +dependencies = [ + "bytes", + "heck", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost 0.13.1", + "prost-types 0.13.1", + "regex", + "syn 2.0.48", + "tempfile", ] [[package]] @@ -1941,13 +2060,35 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "prost-derive" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "prost-types" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e" dependencies = [ - "prost", + "prost 0.12.3", +] + +[[package]] +name = "prost-types" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" +dependencies = [ + "prost 0.13.1", ] [[package]] @@ -2100,7 +2241,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.24", "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", @@ -2761,19 +2902,19 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" dependencies = [ - "async-stream", + "async-stream 0.3.5", "async-trait", "axum 0.6.20", "base64 0.21.7", "bytes", - "h2", + "h2 0.3.24", "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", - "hyper-timeout", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project", - "prost", + "prost 0.12.3", "tokio", "tokio-stream", "tower", @@ -2782,6 +2923,49 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401" +dependencies = [ + "async-stream 0.3.5", + "async-trait", + "axum 0.7.5", + "base64 0.22.1", + "bytes", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.4.0", + "hyper-timeout 0.5.1", + "hyper-util", + "percent-encoding", + "pin-project", + "prost 0.13.1", + "socket2", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "568392c5a2bd0020723e3f387891176aabafe36fd9fcd074ad309dfa0c8eb964" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn 2.0.48", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/burrow.db b/burrow.db new file mode 100644 index 0000000..3fd6215 Binary files /dev/null and b/burrow.db differ diff --git a/burrow/Cargo.toml b/burrow/Cargo.toml index 0fb63a5..34ec5c8 100644 --- a/burrow/Cargo.toml +++ b/burrow/Cargo.toml @@ -58,6 +58,11 @@ reqwest = { version = "0.12", default-features = false, features = [ ] } rusqlite = "0.31.0" dotenv = "0.15.0" +tonic = "0.12.0" +prost = "0.13.1" +prost-types = "0.13.1" +tokio-stream = "0.1" +async-stream = "0.2" [target.'cfg(target_os = "linux")'.dependencies] caps = "0.5" @@ -83,3 +88,7 @@ pre_uninstall_script = "../package/rpm/pre_uninstall" [features] tokio-console = ["dep:console-subscriber"] bundled = ["rusqlite/bundled"] + + +[build-dependencies] +tonic-build = "0.12.0" diff --git a/burrow/build.rs b/burrow/build.rs new file mode 100644 index 0000000..8eea5dc --- /dev/null +++ b/burrow/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("../proto/burrow.proto")?; + Ok(()) +} diff --git a/burrow/src/daemon/instance.rs b/burrow/src/daemon/instance.rs index bc506bd..e3202db 100644 --- a/burrow/src/daemon/instance.rs +++ b/burrow/src/daemon/instance.rs @@ -1,13 +1,27 @@ use std::{ + ops::Deref, path::{Path, PathBuf}, sync::Arc, + time::Duration, }; use anyhow::Result; -use tokio::{sync::RwLock, task::JoinHandle}; +use tokio::sync::{mpsc, RwLock}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Request, Response, Status as RspStatus}; use tracing::{debug, info, warn}; -use tun::tokio::TunInterface; +use tun::{tokio::TunInterface, TunOptions}; +use super::rpc::grpc_defs::{ + networks_server::Networks, + tunnel_server::Tunnel, + Empty, + NetworkDeleteRequest, + NetworkListResponse, + NetworkReorderRequest, + TunnelConfigurationResponse, + TunnelStatusResponse, +}; use crate::{ daemon::rpc::{ DaemonCommand, @@ -21,8 +35,9 @@ use crate::{ wireguard::{Config, Interface}, }; +#[derive(Debug, Clone)] enum RunState { - Running(JoinHandle>), + Running, Idle, } @@ -63,7 +78,7 @@ impl DaemonInstance { match command { DaemonCommand::Start(st) => { match self.wg_state { - RunState::Running(_) => { + RunState::Running => { warn!("Got start, but tun interface already up."); } RunState::Idle => { @@ -82,7 +97,7 @@ impl DaemonInstance { let twlock = tmp_wg.read().await; twlock.run().await }); - self.wg_state = RunState::Running(run_task); + self.wg_state = RunState::Running; info!("Daemon started tun interface"); } } @@ -128,3 +143,143 @@ impl DaemonInstance { Ok(()) } } + +#[derive(Clone)] +pub struct DaemonRPCServer { + tun_interface: Arc>>, + wg_interface: Arc>, + config: Arc>, + db_path: Option, + wg_state: Arc>, +} + +impl DaemonRPCServer { + pub fn new( + wg_interface: Arc>, + config: Arc>, + db_path: Option<&Path>, + ) -> Self { + Self { + tun_interface: Arc::new(RwLock::new(None)), + wg_interface, + config, + db_path: db_path.map(|p| p.to_owned()), + wg_state: Arc::new(RwLock::new(RunState::Idle)), + } + } +} + +#[tonic::async_trait] +impl Tunnel for DaemonRPCServer { + type TunnelConfigurationStream = ReceiverStream>; + type TunnelStatusStream = ReceiverStream>; + + async fn tunnel_configuration( + &self, + _request: Request, + ) -> Result, RspStatus> { + let (tx, rx) = mpsc::channel(10); + tokio::spawn(async move { + let serv_config = ServerConfig::default(); + tx.send(Ok(TunnelConfigurationResponse { + mtu: serv_config.mtu.unwrap_or(1000), + addresses: serv_config.address, + })) + .await + }); + Ok(Response::new(ReceiverStream::new(rx))) + } + + async fn tunnel_start(&self, _request: Request) -> Result, RspStatus> { + match self.wg_state.read().await.deref() { + RunState::Idle => { + let tun_if = TunOptions::new().open()?; + debug!("Setting tun on wg_interface"); + self.tun_interface.write().await.replace(tun_if); + self.wg_interface + .write() + .await + .set_tun_ref(self.tun_interface.clone()) + .await; + debug!("tun set on wg_interface"); + + debug!("Setting tun_interface"); + debug!("tun_interface set: {:?}", self.tun_interface); + + debug!("Cloning wg_interface"); + let tmp_wg = self.wg_interface.clone(); + let run_task = tokio::spawn(async move { + let twlock = tmp_wg.read().await; + twlock.run().await + }); + let mut guard = self.wg_state.write().await; + *guard = RunState::Running; + } + + RunState::Running => { + warn!("Got start, but tun interface already up."); + } + } + + return Ok(Response::new(Empty {})); + } + + async fn tunnel_stop(&self, _request: Request) -> Result, RspStatus> { + return Ok(Response::new(Empty {})); + } + + async fn tunnel_status( + &self, + _request: Request, + ) -> Result, RspStatus> { + let (tx, rx) = mpsc::channel(10); + tokio::spawn(async move { + for _ in 0..1000 { + tx.send(Ok(TunnelStatusResponse { ..Default::default() })) + .await; + tokio::time::sleep(Duration::from_secs(100)).await; + } + }); + Ok(Response::new(ReceiverStream::new(rx))) + } +} + +#[tonic::async_trait] +impl Networks for DaemonRPCServer { + type NetworkListStream = ReceiverStream>; + + async fn network_add(&self, _request: Request) -> Result, RspStatus> { + debug!("Mock network_add called"); + Ok(Response::new(Empty {})) + } + + async fn network_list( + &self, + _request: Request, + ) -> Result, RspStatus> { + debug!("Mock network_list called"); + let (tx, rx) = mpsc::channel(10); + tokio::spawn(async move { + tx.send(Ok(NetworkListResponse { ..Default::default() })) + .await + .unwrap(); + }); + Ok(Response::new(ReceiverStream::new(rx))) + } + + async fn network_reorder( + &self, + _request: Request, + ) -> Result, RspStatus> { + debug!("Mock network_reorder called"); + Ok(Response::new(Empty {})) + } + + async fn network_delete( + &self, + _request: Request, + ) -> Result, RspStatus> { + debug!("Mock network_delete called"); + Ok(Response::new(Empty {})) + } +} diff --git a/burrow/src/daemon/mod.rs b/burrow/src/daemon/mod.rs index 4469e90..5896e4b 100644 --- a/burrow/src/daemon/mod.rs +++ b/burrow/src/daemon/mod.rs @@ -5,14 +5,20 @@ mod instance; mod net; pub mod rpc; -use anyhow::Result; -use instance::DaemonInstance; +use anyhow::{Error as AhError, Result}; +use instance::{DaemonInstance, DaemonRPCServer}; pub use net::{DaemonClient, Listener}; pub use rpc::{DaemonCommand, DaemonResponseData, DaemonStartOptions}; -use tokio::sync::{Notify, RwLock}; +use tokio::{ + net::UnixListener, + sync::{Notify, RwLock}, +}; +use tokio_stream::wrappers::UnixListenerStream; +use tonic::transport::Server; use tracing::{error, info}; use crate::{ + daemon::rpc::grpc_defs::{networks_server::NetworksServer, tunnel_server::TunnelServer}, database::{get_connection, load_interface}, wireguard::Interface, }; @@ -45,9 +51,26 @@ pub async fn daemon_main( response_tx, subscribe_tx, Arc::new(RwLock::new(iface)), - Arc::new(RwLock::new(config)), - db_path, + Arc::new(RwLock::new(config.clone())), + db_path.clone(), ); + let dbp = db_path.clone(); + let burrow_server = DaemonRPCServer::new( + Arc::new(RwLock::new(config.clone().try_into()?)), + Arc::new(RwLock::new(config)), + dbp, + ); + let spp = socket_path.clone(); + let uds = UnixListener::bind(spp.unwrap_or(Path::new("burrow_grpc.sock")))?; + let serve_job = tokio::spawn(async move { + let uds_stream = UnixListenerStream::new(uds); + let _srv = Server::builder() + .add_service(TunnelServer::new(burrow_server.clone())) + .add_service(NetworksServer::new(burrow_server)) + .serve_with_incoming(uds_stream) + .await?; + Ok::<(), AhError>(()) + }); info!("Starting daemon..."); @@ -67,7 +90,7 @@ pub async fn daemon_main( result }); - tokio::try_join!(main_job, listener_job) + tokio::try_join!(main_job, listener_job, serve_job) .map(|_| ()) .map_err(|e| e.into()) } diff --git a/burrow/src/daemon/net/unix.rs b/burrow/src/daemon/net/unix.rs index 70c4207..218598f 100644 --- a/burrow/src/daemon/net/unix.rs +++ b/burrow/src/daemon/net/unix.rs @@ -11,11 +11,7 @@ use tokio::{ use tracing::{debug, error, info}; use crate::daemon::rpc::{ - DaemonCommand, - DaemonMessage, - DaemonNotification, - DaemonRequest, - DaemonResponse, + DaemonCommand, DaemonMessage, DaemonNotification, DaemonRequest, DaemonResponse, DaemonResponseData, }; @@ -36,7 +32,7 @@ pub struct Listener { cmd_tx: async_channel::Sender, rsp_rx: async_channel::Receiver, sub_chan: async_channel::Receiver, - inner: UnixListener, + pub inner: UnixListener, } impl Listener { diff --git a/burrow/src/daemon/rpc/grpc_defs.rs b/burrow/src/daemon/rpc/grpc_defs.rs new file mode 100644 index 0000000..f3085ee --- /dev/null +++ b/burrow/src/daemon/rpc/grpc_defs.rs @@ -0,0 +1,5 @@ +pub use burrowgrpc::*; + +mod burrowgrpc { + tonic::include_proto!("burrow"); +} diff --git a/burrow/src/daemon/rpc/mod.rs b/burrow/src/daemon/rpc/mod.rs index 4146e71..47e35bf 100644 --- a/burrow/src/daemon/rpc/mod.rs +++ b/burrow/src/daemon/rpc/mod.rs @@ -1,3 +1,4 @@ +pub mod grpc_defs; pub mod notification; pub mod request; pub mod response; diff --git a/burrow/src/wireguard/iface.rs b/burrow/src/wireguard/iface.rs index 84b5489..321801b 100755 --- a/burrow/src/wireguard/iface.rs +++ b/burrow/src/wireguard/iface.rs @@ -93,6 +93,12 @@ impl Interface { *st = IfaceStatus::Running; } + pub async fn set_tun_ref(&mut self, tun: Arc>>) { + self.tun = tun; + let mut st = self.status.write().await; + *st = IfaceStatus::Running; + } + pub fn get_tun(&self) -> Arc>> { self.tun.clone() } @@ -135,7 +141,7 @@ impl Interface { Some(addr) => addr, None => { debug!("No destination found"); - continue + continue; } }; @@ -154,7 +160,7 @@ impl Interface { } Err(e) => { log::error!("Failed to send packet {}", e); - continue + continue; } }; } @@ -175,7 +181,7 @@ impl Interface { let main_tsk = async move { if let Err(e) = pcb.open_if_closed().await { log::error!("failed to open pcb: {}", e); - return + return; } let r2 = pcb.run(tun).await; if let Err(e) = r2 { @@ -195,7 +201,7 @@ impl Interface { Ok(..) => (), Err(e) => { error!("Failed to update timers: {}", e); - return + return; } } }