spawn GRPC server at daemon start

This commit is contained in:
Jett Chen 2024-07-13 17:32:49 -07:00
parent aa634d03e2
commit d3ff21545c
7 changed files with 114 additions and 19 deletions

View file

@ -58,6 +58,11 @@ reqwest = { version = "0.12", default-features = false, features = [
] }
rusqlite = "0.31.0"
dotenv = "0.15.0"
tonic = "0.12.0"
prost = "0.13.1"
prost-types = "0.13.1"
tokio-stream = "0.1"
async-stream = "0.2"
[target.'cfg(target_os = "linux")'.dependencies]
caps = "0.5"
@ -83,3 +88,7 @@ pre_uninstall_script = "../package/rpm/pre_uninstall"
[features]
tokio-console = ["dep:console-subscriber"]
bundled = ["rusqlite/bundled"]
[build-dependencies]
tonic-build = "0.12.0"

4
burrow/build.rs Normal file
View file

@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../proto/burrow.proto")?;
Ok(())
}

View file

@ -10,19 +10,24 @@ use tun::tokio::TunInterface;
use crate::{
daemon::rpc::{
DaemonCommand,
DaemonNotification,
DaemonResponse,
DaemonResponseData,
ServerConfig,
DaemonCommand, DaemonNotification, DaemonResponse, DaemonResponseData, ServerConfig,
ServerInfo,
},
database::{get_connection, load_interface},
wireguard::{Config, Interface},
};
use tonic::{Request, Response, Status as RspStatus};
use super::rpc::grpc_defs::{
tunnel_server::Tunnel, Empty, TunnelConfigurationResponse, TunnelStatusResponse,
WireGuardNetwork, WireGuardPeer,
};
use tokio_stream::{wrappers::ReceiverStream, Stream};
#[derive(Debug, Clone)]
enum RunState {
Running(JoinHandle<Result<()>>),
Running,
Idle,
}
@ -63,7 +68,7 @@ impl DaemonInstance {
match command {
DaemonCommand::Start(st) => {
match self.wg_state {
RunState::Running(_) => {
RunState::Running => {
warn!("Got start, but tun interface already up.");
}
RunState::Idle => {
@ -82,7 +87,7 @@ impl DaemonInstance {
let twlock = tmp_wg.read().await;
twlock.run().await
});
self.wg_state = RunState::Running(run_task);
self.wg_state = RunState::Running;
info!("Daemon started tun interface");
}
}
@ -128,3 +133,55 @@ impl DaemonInstance {
Ok(())
}
}
#[derive(Clone)]
pub struct DaemonRPCServer {
tun_interface: Arc<RwLock<Option<TunInterface>>>,
wg_interface: Arc<RwLock<Interface>>,
config: Arc<RwLock<Config>>,
db_path: Option<PathBuf>,
wg_state: RunState,
}
impl DaemonRPCServer {
pub fn new(
wg_interface: Arc<RwLock<Interface>>,
config: Arc<RwLock<Config>>,
db_path: Option<&Path>,
) -> Self {
Self {
tun_interface: Arc::new(RwLock::new(None)),
wg_interface,
config,
db_path: db_path.map(|p| p.to_owned()),
wg_state: RunState::Idle,
}
}
}
#[tonic::async_trait]
impl Tunnel for DaemonRPCServer {
async fn tunnel_configuration(
&self,
_request: Request<Empty>,
) -> Result<Response<TunnelConfigurationResponse>, RspStatus> {
unimplemented!()
}
async fn tunnel_start(&self, _request: Request<Empty>) -> Result<Response<Empty>, RspStatus> {
return Ok(Response::new(Empty {}));
}
async fn tunnel_stop(&self, _request: Request<Empty>) -> Result<Response<Empty>, RspStatus> {
return Ok(Response::new(Empty {}));
}
type TunnelStatusStream = ReceiverStream<Result<TunnelStatusResponse, RspStatus>>;
async fn tunnel_status(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::TunnelStatusStream>, RspStatus> {
unimplemented!()
}
}

View file

@ -5,11 +5,18 @@ mod instance;
mod net;
pub mod rpc;
use crate::daemon::rpc::grpc_defs::tunnel_server::TunnelServer;
use anyhow::Error as AhError;
use anyhow::Result;
use instance::DaemonInstance;
use instance::{DaemonInstance, DaemonRPCServer};
pub use net::{DaemonClient, Listener};
pub use rpc::{DaemonCommand, DaemonResponseData, DaemonStartOptions};
use tokio::sync::{Notify, RwLock};
use tokio::{
net::UnixListener,
sync::{Notify, RwLock},
};
use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server;
use tracing::{error, info};
use crate::{
@ -45,9 +52,25 @@ pub async fn daemon_main(
response_tx,
subscribe_tx,
Arc::new(RwLock::new(iface)),
Arc::new(RwLock::new(config)),
db_path,
Arc::new(RwLock::new(config.clone())),
db_path.clone(),
);
let dbp = db_path.clone();
let burrow_server = DaemonRPCServer::new(
Arc::new(RwLock::new(config.clone().try_into()?)),
Arc::new(RwLock::new(config)),
dbp,
);
let spp = socket_path.clone();
let uds = UnixListener::bind(spp.unwrap())?;
let serve_job = tokio::spawn(async move {
let uds_stream = UnixListenerStream::new(uds);
let srv = Server::builder()
.add_service(TunnelServer::new(burrow_server))
.serve_with_incoming(uds_stream)
.await?;
Ok::<(), AhError>(())
});
info!("Starting daemon...");
@ -67,7 +90,7 @@ pub async fn daemon_main(
result
});
tokio::try_join!(main_job, listener_job)
tokio::try_join!(main_job, listener_job, serve_job)
.map(|_| ())
.map_err(|e| e.into())
}

View file

@ -11,11 +11,7 @@ use tokio::{
use tracing::{debug, error, info};
use crate::daemon::rpc::{
DaemonCommand,
DaemonMessage,
DaemonNotification,
DaemonRequest,
DaemonResponse,
DaemonCommand, DaemonMessage, DaemonNotification, DaemonRequest, DaemonResponse,
DaemonResponseData,
};
@ -36,7 +32,7 @@ pub struct Listener {
cmd_tx: async_channel::Sender<DaemonCommand>,
rsp_rx: async_channel::Receiver<DaemonResponse>,
sub_chan: async_channel::Receiver<DaemonNotification>,
inner: UnixListener,
pub inner: UnixListener,
}
impl Listener {

View file

@ -0,0 +1,5 @@
pub use burrowgrpc::*;
mod burrowgrpc {
tonic::include_proto!("burrow");
}

View file

@ -1,3 +1,4 @@
pub mod grpc_defs;
pub mod notification;
pub mod request;
pub mod response;