Compare commits

...
Sign in to create a new pull request.

2 commits

Author SHA1 Message Date
Jett Chen
095e2a6894 update 2024-07-20 10:28:34 -07:00
Jett Chen
d3ff21545c spawn GRPC server at daemon start 2024-07-19 17:48:57 -07:00
10 changed files with 418 additions and 35 deletions

212
Cargo.lock generated
View file

@ -132,17 +132,38 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "async-stream"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22068c0c19514942eefcfd4daf8976ef1aad84e61539f95cd200c35202f80af5"
dependencies = [
"async-stream-impl 0.2.1",
"futures-core",
]
[[package]]
name = "async-stream"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
dependencies = [
"async-stream-impl",
"async-stream-impl 0.3.5",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25f9db3b38af870bf7e5cc649167533b493928e50744e2c30ae350230b414670"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "async-stream-impl"
version = "0.3.5"
@ -165,6 +186,12 @@ dependencies = [
"syn 2.0.48",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "autocfg"
version = "1.1.0"
@ -392,6 +419,7 @@ dependencies = [
"aead",
"anyhow",
"async-channel",
"async-stream 0.2.1",
"axum 0.7.5",
"base64 0.21.7",
"blake2",
@ -412,6 +440,8 @@ dependencies = [
"nix 0.27.1",
"once_cell",
"parking_lot",
"prost 0.13.1",
"prost-types 0.13.1",
"rand",
"rand_core",
"reqwest 0.12.5",
@ -421,6 +451,9 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tokio-stream",
"tonic 0.12.1",
"tonic-build",
"tracing",
"tracing-journald",
"tracing-log 0.1.4",
@ -619,9 +652,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787"
dependencies = [
"futures-core",
"prost",
"prost-types",
"tonic",
"prost 0.12.3",
"prost-types 0.12.3",
"tonic 0.10.2",
"tracing-core",
]
@ -637,13 +670,13 @@ dependencies = [
"futures-task",
"hdrhistogram",
"humantime",
"prost-types",
"prost-types 0.12.3",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic",
"tonic 0.10.2",
"tracing",
"tracing-core",
"tracing-subscriber",
@ -876,6 +909,12 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27573eac26f4dd11e2b1916c3fe1baa56407c83c71a773a8ba17ec0bca03b6b7"
[[package]]
name = "fixedbitset"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "flate2"
version = "1.0.28"
@ -1057,6 +1096,25 @@ dependencies = [
"tracing",
]
[[package]]
name = "h2"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab"
dependencies = [
"atomic-waker",
"bytes",
"fnv",
"futures-core",
"futures-sink",
"http 1.1.0",
"indexmap 2.1.0",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
@ -1215,7 +1273,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2",
"h2 0.3.24",
"http 0.2.11",
"http-body 0.4.6",
"httparse",
@ -1238,6 +1296,7 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"h2 0.4.5",
"http 1.1.0",
"http-body 1.0.0",
"httparse",
@ -1279,6 +1338,19 @@ dependencies = [
"tokio-io-timeout",
]
[[package]]
name = "hyper-timeout"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
dependencies = [
"hyper 1.4.0",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
@ -1615,6 +1687,12 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "multimap"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
[[package]]
name = "native-tls"
version = "0.2.11"
@ -1832,6 +1910,16 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "petgraph"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
dependencies = [
"fixedbitset",
"indexmap 2.1.0",
]
[[package]]
name = "pin-project"
version = "1.1.4"
@ -1925,7 +2013,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a"
dependencies = [
"bytes",
"prost-derive",
"prost-derive 0.12.3",
]
[[package]]
name = "prost"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc"
dependencies = [
"bytes",
"prost-derive 0.13.1",
]
[[package]]
name = "prost-build"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1"
dependencies = [
"bytes",
"heck",
"itertools",
"log",
"multimap",
"once_cell",
"petgraph",
"prettyplease",
"prost 0.13.1",
"prost-types 0.13.1",
"regex",
"syn 2.0.48",
"tempfile",
]
[[package]]
@ -1941,13 +2060,35 @@ dependencies = [
"syn 2.0.48",
]
[[package]]
name = "prost-derive"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn 2.0.48",
]
[[package]]
name = "prost-types"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e"
dependencies = [
"prost",
"prost 0.12.3",
]
[[package]]
name = "prost-types"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2"
dependencies = [
"prost 0.13.1",
]
[[package]]
@ -2100,7 +2241,7 @@ dependencies = [
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"h2 0.3.24",
"http 0.2.11",
"http-body 0.4.6",
"hyper 0.14.28",
@ -2761,19 +2902,19 @@ version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e"
dependencies = [
"async-stream",
"async-stream 0.3.5",
"async-trait",
"axum 0.6.20",
"base64 0.21.7",
"bytes",
"h2",
"h2 0.3.24",
"http 0.2.11",
"http-body 0.4.6",
"hyper 0.14.28",
"hyper-timeout",
"hyper-timeout 0.4.1",
"percent-encoding",
"pin-project",
"prost",
"prost 0.12.3",
"tokio",
"tokio-stream",
"tower",
@ -2782,6 +2923,49 @@ dependencies = [
"tracing",
]
[[package]]
name = "tonic"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401"
dependencies = [
"async-stream 0.3.5",
"async-trait",
"axum 0.7.5",
"base64 0.22.1",
"bytes",
"h2 0.4.5",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.0",
"hyper-timeout 0.5.1",
"hyper-util",
"percent-encoding",
"pin-project",
"prost 0.13.1",
"socket2",
"tokio",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic-build"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "568392c5a2bd0020723e3f387891176aabafe36fd9fcd074ad309dfa0c8eb964"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"quote",
"syn 2.0.48",
]
[[package]]
name = "tower"
version = "0.4.13"

BIN
burrow.db Normal file

Binary file not shown.

View file

@ -58,6 +58,11 @@ reqwest = { version = "0.12", default-features = false, features = [
] }
rusqlite = "0.31.0"
dotenv = "0.15.0"
tonic = "0.12.0"
prost = "0.13.1"
prost-types = "0.13.1"
tokio-stream = "0.1"
async-stream = "0.2"
[target.'cfg(target_os = "linux")'.dependencies]
caps = "0.5"
@ -83,3 +88,7 @@ pre_uninstall_script = "../package/rpm/pre_uninstall"
[features]
tokio-console = ["dep:console-subscriber"]
bundled = ["rusqlite/bundled"]
[build-dependencies]
tonic-build = "0.12.0"

4
burrow/build.rs Normal file
View file

@ -0,0 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../proto/burrow.proto")?;
Ok(())
}

View file

@ -1,13 +1,27 @@
use std::{
ops::Deref,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use anyhow::Result;
use tokio::{sync::RwLock, task::JoinHandle};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status as RspStatus};
use tracing::{debug, info, warn};
use tun::tokio::TunInterface;
use tun::{tokio::TunInterface, TunOptions};
use super::rpc::grpc_defs::{
networks_server::Networks,
tunnel_server::Tunnel,
Empty,
NetworkDeleteRequest,
NetworkListResponse,
NetworkReorderRequest,
TunnelConfigurationResponse,
TunnelStatusResponse,
};
use crate::{
daemon::rpc::{
DaemonCommand,
@ -21,8 +35,9 @@ use crate::{
wireguard::{Config, Interface},
};
#[derive(Debug, Clone)]
enum RunState {
Running(JoinHandle<Result<()>>),
Running,
Idle,
}
@ -63,7 +78,7 @@ impl DaemonInstance {
match command {
DaemonCommand::Start(st) => {
match self.wg_state {
RunState::Running(_) => {
RunState::Running => {
warn!("Got start, but tun interface already up.");
}
RunState::Idle => {
@ -82,7 +97,7 @@ impl DaemonInstance {
let twlock = tmp_wg.read().await;
twlock.run().await
});
self.wg_state = RunState::Running(run_task);
self.wg_state = RunState::Running;
info!("Daemon started tun interface");
}
}
@ -128,3 +143,143 @@ impl DaemonInstance {
Ok(())
}
}
#[derive(Clone)]
pub struct DaemonRPCServer {
tun_interface: Arc<RwLock<Option<TunInterface>>>,
wg_interface: Arc<RwLock<Interface>>,
config: Arc<RwLock<Config>>,
db_path: Option<PathBuf>,
wg_state: Arc<RwLock<RunState>>,
}
impl DaemonRPCServer {
pub fn new(
wg_interface: Arc<RwLock<Interface>>,
config: Arc<RwLock<Config>>,
db_path: Option<&Path>,
) -> Self {
Self {
tun_interface: Arc::new(RwLock::new(None)),
wg_interface,
config,
db_path: db_path.map(|p| p.to_owned()),
wg_state: Arc::new(RwLock::new(RunState::Idle)),
}
}
}
#[tonic::async_trait]
impl Tunnel for DaemonRPCServer {
type TunnelConfigurationStream = ReceiverStream<Result<TunnelConfigurationResponse, RspStatus>>;
type TunnelStatusStream = ReceiverStream<Result<TunnelStatusResponse, RspStatus>>;
async fn tunnel_configuration(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::TunnelConfigurationStream>, RspStatus> {
let (tx, rx) = mpsc::channel(10);
tokio::spawn(async move {
let serv_config = ServerConfig::default();
tx.send(Ok(TunnelConfigurationResponse {
mtu: serv_config.mtu.unwrap_or(1000),
addresses: serv_config.address,
}))
.await
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn tunnel_start(&self, _request: Request<Empty>) -> Result<Response<Empty>, RspStatus> {
match self.wg_state.read().await.deref() {
RunState::Idle => {
let tun_if = TunOptions::new().open()?;
debug!("Setting tun on wg_interface");
self.tun_interface.write().await.replace(tun_if);
self.wg_interface
.write()
.await
.set_tun_ref(self.tun_interface.clone())
.await;
debug!("tun set on wg_interface");
debug!("Setting tun_interface");
debug!("tun_interface set: {:?}", self.tun_interface);
debug!("Cloning wg_interface");
let tmp_wg = self.wg_interface.clone();
let run_task = tokio::spawn(async move {
let twlock = tmp_wg.read().await;
twlock.run().await
});
let mut guard = self.wg_state.write().await;
*guard = RunState::Running;
}
RunState::Running => {
warn!("Got start, but tun interface already up.");
}
}
return Ok(Response::new(Empty {}));
}
async fn tunnel_stop(&self, _request: Request<Empty>) -> Result<Response<Empty>, RspStatus> {
return Ok(Response::new(Empty {}));
}
async fn tunnel_status(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::TunnelStatusStream>, RspStatus> {
let (tx, rx) = mpsc::channel(10);
tokio::spawn(async move {
for _ in 0..1000 {
tx.send(Ok(TunnelStatusResponse { ..Default::default() }))
.await;
tokio::time::sleep(Duration::from_secs(100)).await;
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
#[tonic::async_trait]
impl Networks for DaemonRPCServer {
type NetworkListStream = ReceiverStream<Result<NetworkListResponse, RspStatus>>;
async fn network_add(&self, _request: Request<Empty>) -> Result<Response<Empty>, RspStatus> {
debug!("Mock network_add called");
Ok(Response::new(Empty {}))
}
async fn network_list(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::NetworkListStream>, RspStatus> {
debug!("Mock network_list called");
let (tx, rx) = mpsc::channel(10);
tokio::spawn(async move {
tx.send(Ok(NetworkListResponse { ..Default::default() }))
.await
.unwrap();
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn network_reorder(
&self,
_request: Request<NetworkReorderRequest>,
) -> Result<Response<Empty>, RspStatus> {
debug!("Mock network_reorder called");
Ok(Response::new(Empty {}))
}
async fn network_delete(
&self,
_request: Request<NetworkDeleteRequest>,
) -> Result<Response<Empty>, RspStatus> {
debug!("Mock network_delete called");
Ok(Response::new(Empty {}))
}
}

View file

@ -5,14 +5,20 @@ mod instance;
mod net;
pub mod rpc;
use anyhow::Result;
use instance::DaemonInstance;
use anyhow::{Error as AhError, Result};
use instance::{DaemonInstance, DaemonRPCServer};
pub use net::{DaemonClient, Listener};
pub use rpc::{DaemonCommand, DaemonResponseData, DaemonStartOptions};
use tokio::sync::{Notify, RwLock};
use tokio::{
net::UnixListener,
sync::{Notify, RwLock},
};
use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server;
use tracing::{error, info};
use crate::{
daemon::rpc::grpc_defs::{networks_server::NetworksServer, tunnel_server::TunnelServer},
database::{get_connection, load_interface},
wireguard::Interface,
};
@ -45,9 +51,26 @@ pub async fn daemon_main(
response_tx,
subscribe_tx,
Arc::new(RwLock::new(iface)),
Arc::new(RwLock::new(config)),
db_path,
Arc::new(RwLock::new(config.clone())),
db_path.clone(),
);
let dbp = db_path.clone();
let burrow_server = DaemonRPCServer::new(
Arc::new(RwLock::new(config.clone().try_into()?)),
Arc::new(RwLock::new(config)),
dbp,
);
let spp = socket_path.clone();
let uds = UnixListener::bind(spp.unwrap_or(Path::new("burrow_grpc.sock")))?;
let serve_job = tokio::spawn(async move {
let uds_stream = UnixListenerStream::new(uds);
let _srv = Server::builder()
.add_service(TunnelServer::new(burrow_server.clone()))
.add_service(NetworksServer::new(burrow_server))
.serve_with_incoming(uds_stream)
.await?;
Ok::<(), AhError>(())
});
info!("Starting daemon...");
@ -67,7 +90,7 @@ pub async fn daemon_main(
result
});
tokio::try_join!(main_job, listener_job)
tokio::try_join!(main_job, listener_job, serve_job)
.map(|_| ())
.map_err(|e| e.into())
}

View file

@ -11,11 +11,7 @@ use tokio::{
use tracing::{debug, error, info};
use crate::daemon::rpc::{
DaemonCommand,
DaemonMessage,
DaemonNotification,
DaemonRequest,
DaemonResponse,
DaemonCommand, DaemonMessage, DaemonNotification, DaemonRequest, DaemonResponse,
DaemonResponseData,
};
@ -36,7 +32,7 @@ pub struct Listener {
cmd_tx: async_channel::Sender<DaemonCommand>,
rsp_rx: async_channel::Receiver<DaemonResponse>,
sub_chan: async_channel::Receiver<DaemonNotification>,
inner: UnixListener,
pub inner: UnixListener,
}
impl Listener {

View file

@ -0,0 +1,5 @@
pub use burrowgrpc::*;
mod burrowgrpc {
tonic::include_proto!("burrow");
}

View file

@ -1,3 +1,4 @@
pub mod grpc_defs;
pub mod notification;
pub mod request;
pub mod response;

View file

@ -93,6 +93,12 @@ impl Interface {
*st = IfaceStatus::Running;
}
pub async fn set_tun_ref(&mut self, tun: Arc<RwLock<Option<TunInterface>>>) {
self.tun = tun;
let mut st = self.status.write().await;
*st = IfaceStatus::Running;
}
pub fn get_tun(&self) -> Arc<RwLock<Option<TunInterface>>> {
self.tun.clone()
}
@ -135,7 +141,7 @@ impl Interface {
Some(addr) => addr,
None => {
debug!("No destination found");
continue
continue;
}
};
@ -154,7 +160,7 @@ impl Interface {
}
Err(e) => {
log::error!("Failed to send packet {}", e);
continue
continue;
}
};
}
@ -175,7 +181,7 @@ impl Interface {
let main_tsk = async move {
if let Err(e) = pcb.open_if_closed().await {
log::error!("failed to open pcb: {}", e);
return
return;
}
let r2 = pcb.run(tun).await;
if let Err(e) = r2 {
@ -195,7 +201,7 @@ impl Interface {
Ok(..) => (),
Err(e) => {
error!("Failed to update timers: {}", e);
return
return;
}
}
}