From d3ff21545ceb1369bf6ffc61bfae32a9f0a75d3a Mon Sep 17 00:00:00 2001 From: Jett Chen Date: Sat, 13 Jul 2024 17:32:49 -0700 Subject: [PATCH] spawn GRPC server at daemon start --- burrow/Cargo.toml | 9 ++++ burrow/build.rs | 4 ++ burrow/src/daemon/instance.rs | 73 ++++++++++++++++++++++++++---- burrow/src/daemon/mod.rs | 33 ++++++++++++-- burrow/src/daemon/net/unix.rs | 8 +--- burrow/src/daemon/rpc/grpc_defs.rs | 5 ++ burrow/src/daemon/rpc/mod.rs | 1 + 7 files changed, 114 insertions(+), 19 deletions(-) create mode 100644 burrow/build.rs create mode 100644 burrow/src/daemon/rpc/grpc_defs.rs diff --git a/burrow/Cargo.toml b/burrow/Cargo.toml index 0fb63a5..34ec5c8 100644 --- a/burrow/Cargo.toml +++ b/burrow/Cargo.toml @@ -58,6 +58,11 @@ reqwest = { version = "0.12", default-features = false, features = [ ] } rusqlite = "0.31.0" dotenv = "0.15.0" +tonic = "0.12.0" +prost = "0.13.1" +prost-types = "0.13.1" +tokio-stream = "0.1" +async-stream = "0.2" [target.'cfg(target_os = "linux")'.dependencies] caps = "0.5" @@ -83,3 +88,7 @@ pre_uninstall_script = "../package/rpm/pre_uninstall" [features] tokio-console = ["dep:console-subscriber"] bundled = ["rusqlite/bundled"] + + +[build-dependencies] +tonic-build = "0.12.0" diff --git a/burrow/build.rs b/burrow/build.rs new file mode 100644 index 0000000..8eea5dc --- /dev/null +++ b/burrow/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("../proto/burrow.proto")?; + Ok(()) +} diff --git a/burrow/src/daemon/instance.rs b/burrow/src/daemon/instance.rs index bc506bd..62e8e18 100644 --- a/burrow/src/daemon/instance.rs +++ b/burrow/src/daemon/instance.rs @@ -10,19 +10,24 @@ use tun::tokio::TunInterface; use crate::{ daemon::rpc::{ - DaemonCommand, - DaemonNotification, - DaemonResponse, - DaemonResponseData, - ServerConfig, + DaemonCommand, DaemonNotification, DaemonResponse, DaemonResponseData, ServerConfig, ServerInfo, }, database::{get_connection, load_interface}, wireguard::{Config, Interface}, }; +use tonic::{Request, Response, Status as RspStatus}; + +use super::rpc::grpc_defs::{ + tunnel_server::Tunnel, Empty, TunnelConfigurationResponse, TunnelStatusResponse, + WireGuardNetwork, WireGuardPeer, +}; +use tokio_stream::{wrappers::ReceiverStream, Stream}; + +#[derive(Debug, Clone)] enum RunState { - Running(JoinHandle>), + Running, Idle, } @@ -63,7 +68,7 @@ impl DaemonInstance { match command { DaemonCommand::Start(st) => { match self.wg_state { - RunState::Running(_) => { + RunState::Running => { warn!("Got start, but tun interface already up."); } RunState::Idle => { @@ -82,7 +87,7 @@ impl DaemonInstance { let twlock = tmp_wg.read().await; twlock.run().await }); - self.wg_state = RunState::Running(run_task); + self.wg_state = RunState::Running; info!("Daemon started tun interface"); } } @@ -128,3 +133,55 @@ impl DaemonInstance { Ok(()) } } + +#[derive(Clone)] +pub struct DaemonRPCServer { + tun_interface: Arc>>, + wg_interface: Arc>, + config: Arc>, + db_path: Option, + wg_state: RunState, +} + +impl DaemonRPCServer { + pub fn new( + wg_interface: Arc>, + config: Arc>, + db_path: Option<&Path>, + ) -> Self { + Self { + tun_interface: Arc::new(RwLock::new(None)), + wg_interface, + config, + db_path: db_path.map(|p| p.to_owned()), + wg_state: RunState::Idle, + } + } +} + +#[tonic::async_trait] +impl Tunnel for DaemonRPCServer { + async fn tunnel_configuration( + &self, + _request: Request, + ) -> Result, RspStatus> { + unimplemented!() + } + + async fn tunnel_start(&self, _request: Request) -> Result, RspStatus> { + return Ok(Response::new(Empty {})); + } + + async fn tunnel_stop(&self, _request: Request) -> Result, RspStatus> { + return Ok(Response::new(Empty {})); + } + + type TunnelStatusStream = ReceiverStream>; + + async fn tunnel_status( + &self, + _request: Request, + ) -> Result, RspStatus> { + unimplemented!() + } +} diff --git a/burrow/src/daemon/mod.rs b/burrow/src/daemon/mod.rs index 4469e90..fffd379 100644 --- a/burrow/src/daemon/mod.rs +++ b/burrow/src/daemon/mod.rs @@ -5,11 +5,18 @@ mod instance; mod net; pub mod rpc; +use crate::daemon::rpc::grpc_defs::tunnel_server::TunnelServer; +use anyhow::Error as AhError; use anyhow::Result; -use instance::DaemonInstance; +use instance::{DaemonInstance, DaemonRPCServer}; pub use net::{DaemonClient, Listener}; pub use rpc::{DaemonCommand, DaemonResponseData, DaemonStartOptions}; -use tokio::sync::{Notify, RwLock}; +use tokio::{ + net::UnixListener, + sync::{Notify, RwLock}, +}; +use tokio_stream::wrappers::UnixListenerStream; +use tonic::transport::Server; use tracing::{error, info}; use crate::{ @@ -45,9 +52,25 @@ pub async fn daemon_main( response_tx, subscribe_tx, Arc::new(RwLock::new(iface)), - Arc::new(RwLock::new(config)), - db_path, + Arc::new(RwLock::new(config.clone())), + db_path.clone(), ); + let dbp = db_path.clone(); + let burrow_server = DaemonRPCServer::new( + Arc::new(RwLock::new(config.clone().try_into()?)), + Arc::new(RwLock::new(config)), + dbp, + ); + let spp = socket_path.clone(); + let uds = UnixListener::bind(spp.unwrap())?; + let serve_job = tokio::spawn(async move { + let uds_stream = UnixListenerStream::new(uds); + let srv = Server::builder() + .add_service(TunnelServer::new(burrow_server)) + .serve_with_incoming(uds_stream) + .await?; + Ok::<(), AhError>(()) + }); info!("Starting daemon..."); @@ -67,7 +90,7 @@ pub async fn daemon_main( result }); - tokio::try_join!(main_job, listener_job) + tokio::try_join!(main_job, listener_job, serve_job) .map(|_| ()) .map_err(|e| e.into()) } diff --git a/burrow/src/daemon/net/unix.rs b/burrow/src/daemon/net/unix.rs index 70c4207..218598f 100644 --- a/burrow/src/daemon/net/unix.rs +++ b/burrow/src/daemon/net/unix.rs @@ -11,11 +11,7 @@ use tokio::{ use tracing::{debug, error, info}; use crate::daemon::rpc::{ - DaemonCommand, - DaemonMessage, - DaemonNotification, - DaemonRequest, - DaemonResponse, + DaemonCommand, DaemonMessage, DaemonNotification, DaemonRequest, DaemonResponse, DaemonResponseData, }; @@ -36,7 +32,7 @@ pub struct Listener { cmd_tx: async_channel::Sender, rsp_rx: async_channel::Receiver, sub_chan: async_channel::Receiver, - inner: UnixListener, + pub inner: UnixListener, } impl Listener { diff --git a/burrow/src/daemon/rpc/grpc_defs.rs b/burrow/src/daemon/rpc/grpc_defs.rs new file mode 100644 index 0000000..f3085ee --- /dev/null +++ b/burrow/src/daemon/rpc/grpc_defs.rs @@ -0,0 +1,5 @@ +pub use burrowgrpc::*; + +mod burrowgrpc { + tonic::include_proto!("burrow"); +} diff --git a/burrow/src/daemon/rpc/mod.rs b/burrow/src/daemon/rpc/mod.rs index 4146e71..47e35bf 100644 --- a/burrow/src/daemon/rpc/mod.rs +++ b/burrow/src/daemon/rpc/mod.rs @@ -1,3 +1,4 @@ +pub mod grpc_defs; pub mod notification; pub mod request; pub mod response;