Simplified process startup on macOS and Linux

This commit is contained in:
Conrad Kramer 2024-01-21 16:18:13 -08:00
parent 9e03c9680c
commit 7cc1f3119e
40 changed files with 1343 additions and 1157 deletions

View file

@ -12,46 +12,44 @@ crate-type = ["lib", "staticlib"]
anyhow = "1.0"
tokio = { version = "1.21", features = ["rt", "macros", "sync", "io-util", "rt-multi-thread", "time", "tracing"] }
tun = { version = "0.1", path = "../tun", features = ["serde", "tokio"] }
clap = { version = "4.3.2", features = ["derive"] }
clap = { version = "4.4", features = ["derive"] }
tracing = "0.1"
tracing-log = "0.1"
tracing-journald = "0.3"
tracing-oslog = {git = "https://github.com/Stormshield-robinc/tracing-oslog"}
tracing-subscriber = { version = "0.3" , features = ["std", "env-filter"]}
env_logger = "0.10"
tracing-oslog = { git = "https://github.com/Stormshield-robinc/tracing-oslog" }
tracing-subscriber = { version = "0.3" , features = ["std", "env-filter"] }
log = "0.4"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
blake2 = "0.10.6"
chacha20poly1305 = "0.10.1"
rand = "0.8.5"
rand_core = "0.6.4"
aead = "0.5.2"
x25519-dalek = { version = "2.0.0", features = ["reusable_secrets", "static_secrets"] }
ring = "0.17.7"
parking_lot = "0.12.1"
serde_json = "1.0"
blake2 = "0.10"
chacha20poly1305 = "0.10"
rand = "0.8"
rand_core = "0.6"
aead = "0.5"
x25519-dalek = { version = "2.0", features = ["reusable_secrets", "static_secrets"] }
ring = "0.17"
parking_lot = "0.12"
hmac = "0.12"
ipnet = { version = "2.8.0", features = ["serde"] }
base64 = "0.21.4"
fehler = "1.0.0"
ip_network_table = "0.2.0"
ip_network = "0.4.0"
async-channel = "2.1.1"
base64 = "0.21"
fehler = "1.0"
ip_network_table = "0.2"
ip_network = "0.4"
async-channel = "2.1"
schemars = "0.8"
futures = "0.3.28"
uuid = { version = "1.6.1", features = ["v4"] }
console-subscriber = { version = "0.2.0" , optional = true}
once_cell = "1.19"
console-subscriber = { version = "0.2.0" , optional = true }
console = "0.15.8"
[target.'cfg(target_os = "linux")'.dependencies]
caps = "0.5.5"
libsystemd = "0.6"
caps = "0.5"
libsystemd = "0.7"
tracing-journald = "0.3"
[target.'cfg(target_vendor = "apple")'.dependencies]
nix = { version = "0.26.2" }
nix = { version = "0.27" }
[dev-dependencies]
insta = { version = "1.32.0", features = ["yaml"] }
etherparse = "0.12"
insta = { version = "1.32", features = ["yaml"] }
[package.metadata.generate-rpm]
assets = [
@ -63,4 +61,4 @@ post_install_script = "../package/rpm/post_install"
pre_uninstall_script = "../package/rpm/pre_uninstall"
[features]
tokio-console = ["dep:console-subscriber"]
tokio-console = ["dep:console-subscriber"]

View file

@ -1,13 +0,0 @@
use tracing::debug;
use tracing_oslog::OsLogger;
use tracing_subscriber::layer::SubscriberExt;
pub use crate::daemon::start_srv;
#[no_mangle]
pub extern "C" fn initialize_oslog() {
let collector =
tracing_subscriber::registry().with(OsLogger::new("com.hackclub.burrow", "backend"));
tracing::subscriber::set_global_default(collector).unwrap();
debug!("Initialized oslog tracing in libburrow rust FFI");
}

View file

@ -0,0 +1,55 @@
use std::{
ffi::{c_char, CStr},
path::PathBuf,
sync::Arc,
thread,
};
use once_cell::sync::OnceCell;
use tokio::{
runtime::{Builder, Handle},
sync::Notify,
};
use tracing::error;
use crate::daemon::daemon_main;
static BURROW_NOTIFY: OnceCell<Arc<Notify>> = OnceCell::new();
static BURROW_HANDLE: OnceCell<Handle> = OnceCell::new();
#[no_mangle]
pub unsafe extern "C" fn spawn_in_process(path: *const c_char) {
crate::tracing::initialize();
let notify = BURROW_NOTIFY.get_or_init(|| Arc::new(Notify::new()));
let handle = BURROW_HANDLE.get_or_init(|| {
let path_buf = if path.is_null() {
None
} else {
Some(PathBuf::from(CStr::from_ptr(path).to_str().unwrap()))
};
let sender = notify.clone();
let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
thread::spawn(move || {
let runtime = Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.thread_name("burrow-worker")
.build()
.unwrap();
handle_tx.send(runtime.handle().clone()).unwrap();
runtime.block_on(async {
let result = daemon_main(path_buf.as_deref(), Some(sender.clone())).await;
if let Err(error) = result.as_ref() {
error!("Burrow thread exited: {}", error);
}
result
})
});
handle_rx.blocking_recv().unwrap()
});
let receiver = notify.clone();
handle.block_on(async move { receiver.notified().await });
}

View file

@ -1,5 +1,6 @@
use std::sync::Arc;
use std::{path::Path, sync::Arc};
pub mod apple;
mod command;
mod instance;
mod net;
@ -8,44 +9,52 @@ mod response;
use anyhow::Result;
pub use command::{DaemonCommand, DaemonStartOptions};
use instance::DaemonInstance;
#[cfg(target_vendor = "apple")]
pub use net::start_srv;
pub use net::DaemonClient;
pub use net::{DaemonClient, Listener};
pub use response::{DaemonResponse, DaemonResponseData, ServerInfo};
use tokio::sync::{Notify, RwLock};
use tracing::{error, info};
use crate::{
daemon::net::listen,
wireguard::{Config, Interface},
};
use crate::wireguard::{Config, Interface};
pub async fn daemon_main(notify_ready: Option<Arc<Notify>>) -> Result<()> {
pub async fn daemon_main(path: Option<&Path>, notify_ready: Option<Arc<Notify>>) -> Result<()> {
let (commands_tx, commands_rx) = async_channel::unbounded();
let (response_tx, response_rx) = async_channel::unbounded();
let listener = if let Some(path) = path {
info!("Creating listener... {:?}", path);
Listener::new_with_path(commands_tx, response_rx, path)
} else {
info!("Creating listener...");
Listener::new(commands_tx, response_rx)
};
if let Some(n) = notify_ready {
n.notify_one()
}
let listener = listener?;
let config = Config::default();
let iface: Interface = config.try_into()?;
let mut instance = DaemonInstance::new(commands_rx, response_tx, Arc::new(RwLock::new(iface)));
let mut inst: DaemonInstance =
DaemonInstance::new(commands_rx, response_tx, Arc::new(RwLock::new(iface)));
info!("Starting daemon...");
tracing::info!("Starting daemon jobs...");
let inst_job = tokio::spawn(async move {
let res = inst.run().await;
if let Err(e) = res {
tracing::error!("Error when running instance: {}", e);
let main_job = tokio::spawn(async move {
let result = instance.run().await;
if let Err(e) = result.as_ref() {
error!("Instance exited: {}", e);
}
result
});
let listen_job = tokio::spawn(async move {
let res = listen(commands_tx, response_rx, notify_ready).await;
if let Err(e) = res {
tracing::error!("Error when listening: {}", e);
let listener_job = tokio::spawn(async move {
let result = listener.run().await;
if let Err(e) = result.as_ref() {
error!("Listener exited: {}", e);
}
result
});
tokio::try_join!(inst_job, listen_job)
tokio::try_join!(main_job, listener_job)
.map(|_| ())
.map_err(|e| e.into())
}

View file

@ -1,32 +0,0 @@
use std::sync::Arc;
use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::Notify;
use tracing::{error, info};
use crate::daemon::{daemon_main, DaemonClient};
#[no_mangle]
pub extern "C" fn start_srv() {
info!("Starting server");
let start_notify = Arc::new(Notify::new());
let start_recv = start_notify.clone();
let _handle = thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
if let Err(e) = daemon_main(Some(start_notify.clone())).await {
error!("Error when starting rpc server: {}", e);
}
});
start_notify.notify_one();
});
let rt = Runtime::new().unwrap();
rt.block_on(async {
start_recv.notified().await;
match DaemonClient::new().await {
Ok(..) => info!("Server successfully started"),
Err(e) => error!("Could not connect to server: {}", e)
}
});
}

View file

@ -4,28 +4,18 @@ use super::DaemonCommand;
#[cfg(target_family = "unix")]
mod unix;
#[cfg(all(target_family = "unix", not(target_os = "linux")))]
pub use unix::{listen, DaemonClient};
#[cfg(target_os = "linux")]
mod systemd;
#[cfg(target_os = "linux")]
pub use systemd::{listen, DaemonClient};
#[cfg(target_family = "unix")]
pub use unix::{DaemonClient, Listener};
#[cfg(target_os = "windows")]
mod windows;
#[cfg(target_os = "windows")]
pub use windows::{listen, DaemonClient};
#[cfg(target_vendor = "apple")]
mod apple;
#[cfg(target_vendor = "apple")]
pub use apple::start_srv;
pub use windows::{DaemonClient, Listener};
#[derive(Clone, Serialize, Deserialize)]
pub struct DaemonRequest {
pub id: u32,
pub id: u64,
pub command: DaemonCommand,
}

View file

@ -1,33 +0,0 @@
use std::os::fd::IntoRawFd;
use std::sync::Arc;
use anyhow::Result;
use tokio::sync::Notify;
use super::*;
use crate::daemon::DaemonResponse;
pub async fn listen(
cmd_tx: async_channel::Sender<DaemonCommand>,
rsp_rx: async_channel::Receiver<DaemonResponse>,
notify: Option<Arc<Notify>>
) -> Result<()> {
if !libsystemd::daemon::booted()
|| listen_with_systemd(cmd_tx.clone(), rsp_rx.clone())
.await
.is_err()
{
unix::listen(cmd_tx, rsp_rx, notify).await?;
}
Ok(())
}
async fn listen_with_systemd(
cmd_tx: async_channel::Sender<DaemonCommand>,
rsp_rx: async_channel::Receiver<DaemonResponse>,
) -> Result<()> {
let fds = libsystemd::activation::receive_descriptors(false)?;
super::unix::listen_with_optional_fd(cmd_tx, rsp_rx, Some(fds[0].clone().into_raw_fd()), None).await
}
pub type DaemonClient = unix::DaemonClient;

View file

@ -1,21 +1,15 @@
use std::{
io,
os::{
fd::{FromRawFd, RawFd},
unix::net::UnixListener as StdUnixListener,
},
path::{Path, PathBuf},
};
use std::sync::Arc;
#[cfg(target_os = "linux")]
use std::os::fd::{IntoRawFd, RawFd};
use std::{ffi::OsStr, io, path::Path};
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Error, Result};
use fehler::throws;
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::{UnixListener, UnixStream},
};
use tracing::{debug, info};
use tracing::{debug, error, info};
use tokio::sync::Notify;
use super::*;
use crate::daemon::{DaemonCommand, DaemonResponse, DaemonResponseData};
@ -25,141 +19,178 @@ const UNIX_SOCKET_PATH: &str = "/run/burrow.sock";
#[cfg(target_vendor = "apple")]
const UNIX_SOCKET_PATH: &str = "burrow.sock";
#[cfg(target_os = "macos")]
fn fetch_socket_path() -> Option<PathBuf> {
let tries = vec![
"burrow.sock".to_string(),
format!(
"{}/Library/Containers/com.hackclub.burrow.network/Data/burrow.sock",
std::env::var("HOME").unwrap_or_default()
)
.to_string(),
];
for path in tries {
let path = PathBuf::from(path);
if path.exists() {
return Some(path)
}
}
None
}
#[cfg(not(target_os = "macos"))]
fn fetch_socket_path() -> Option<PathBuf> {
Some(Path::new(UNIX_SOCKET_PATH).to_path_buf())
}
pub async fn listen(
#[derive(Debug)]
pub struct Listener {
cmd_tx: async_channel::Sender<DaemonCommand>,
rsp_rx: async_channel::Receiver<DaemonResponse>,
notify: Option<Arc<Notify>>
) -> Result<()> {
listen_with_optional_fd(cmd_tx, rsp_rx, None, notify).await
inner: UnixListener,
}
pub(crate) async fn listen_with_optional_fd(
cmd_tx: async_channel::Sender<DaemonCommand>,
rsp_rx: async_channel::Receiver<DaemonResponse>,
raw_fd: Option<RawFd>,
notify: Option<Arc<Notify>>
) -> Result<()> {
let path = Path::new(UNIX_SOCKET_PATH);
let listener = if let Some(raw_fd) = raw_fd {
let listener = unsafe { StdUnixListener::from_raw_fd(raw_fd) };
listener.set_nonblocking(true)?;
UnixListener::from_std(listener)
} else {
UnixListener::bind(path)
};
let listener = if let Ok(listener) = listener {
listener
} else {
// Won't help all that much, if we use the async version of fs.
if let Some(par) = path.parent() {
std::fs::create_dir_all(par)?;
}
match std::fs::remove_file(path) {
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
stuff => stuff,
}?;
info!("Relative path: {}", path.to_string_lossy());
UnixListener::bind(path)?
};
if let Some(notify) = notify {
notify.notify_one();
impl Listener {
#[throws]
pub fn new(
cmd_tx: async_channel::Sender<DaemonCommand>,
rsp_rx: async_channel::Receiver<DaemonResponse>,
) -> Self {
let path = Path::new(OsStr::new(UNIX_SOCKET_PATH));
Self::new_with_path(cmd_tx, rsp_rx, path)?
}
loop {
let (stream, _) = listener.accept().await?;
let cmd_tx = cmd_tx.clone();
// I'm pretty sure we won't need to manually join / shut this down,
// `lines` will return Err during dropping, and this task should exit
// gracefully.
let rsp_rxc = rsp_rx.clone();
tokio::task::spawn(async move {
let cmd_tx = cmd_tx;
let mut stream = stream;
let (mut read_stream, mut write_stream) = stream.split();
let buf_reader = BufReader::new(&mut read_stream);
let mut lines = buf_reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
info!("Got line: {}", line);
debug!("Line raw data: {:?}", line.as_bytes());
let mut res: DaemonResponse = DaemonResponseData::None.into();
let req = match serde_json::from_str::<DaemonRequest>(&line) {
Ok(req) => Some(req),
Err(e) => {
res.result = Err(e.to_string());
tracing::error!("Failed to parse request: {}", e);
None
}
};
let mut res = serde_json::to_string(&res).unwrap();
res.push('\n');
#[throws]
#[cfg(target_os = "linux")]
pub fn new_with_path(
cmd_tx: async_channel::Sender<DaemonCommand>,
rsp_rx: async_channel::Receiver<DaemonResponse>,
path: &Path,
) -> Self {
let inner = listener_from_path_or_fd(&path, raw_fd())?;
Self { cmd_tx, rsp_rx, inner }
}
if let Some(req) = req {
cmd_tx.send(req.command).await.unwrap();
let res = rsp_rxc.recv().await.unwrap().with_id(req.id);
let mut retres = serde_json::to_string(&res).unwrap();
retres.push('\n');
info!("Sending response: {}", retres);
write_stream.write_all(retres.as_bytes()).await.unwrap();
} else {
write_stream.write_all(res.as_bytes()).await.unwrap();
#[throws]
#[cfg(not(target_os = "linux"))]
pub fn new_with_path(
cmd_tx: async_channel::Sender<DaemonCommand>,
rsp_rx: async_channel::Receiver<DaemonResponse>,
path: &Path,
) -> Self {
let inner = listener_from_path(path)?;
Self { cmd_tx, rsp_rx, inner }
}
pub async fn run(&self) -> Result<()> {
info!("Waiting for connections...");
loop {
let (stream, _) = self.inner.accept().await?;
let cmd_tx = self.cmd_tx.clone();
let rsp_rxc = self.rsp_rx.clone();
tokio::task::spawn(async move {
info!("Got connection: {:?}", stream);
Self::stream(stream, cmd_tx, rsp_rxc).await;
});
}
}
async fn stream(
stream: UnixStream,
cmd_tx: async_channel::Sender<DaemonCommand>,
rsp_rxc: async_channel::Receiver<DaemonResponse>,
) {
let mut stream = stream;
let (mut read_stream, mut write_stream) = stream.split();
let buf_reader = BufReader::new(&mut read_stream);
let mut lines = buf_reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
info!("Line: {}", line);
let mut res: DaemonResponse = DaemonResponseData::None.into();
let req = match serde_json::from_str::<DaemonRequest>(&line) {
Ok(req) => Some(req),
Err(e) => {
res.result = Err(e.to_string());
error!("Failed to parse request: {}", e);
None
}
};
let mut res = serde_json::to_string(&res).unwrap();
res.push('\n');
if let Some(req) = req {
cmd_tx.send(req.command).await.unwrap();
let res = rsp_rxc.recv().await.unwrap().with_id(req.id);
let mut retres = serde_json::to_string(&res).unwrap();
retres.push('\n');
info!("Sending response: {}", retres);
write_stream.write_all(retres.as_bytes()).await.unwrap();
} else {
write_stream.write_all(res.as_bytes()).await.unwrap();
}
});
}
}
}
#[cfg(target_os = "linux")]
fn raw_fd() -> Option<RawFd> {
if !libsystemd::daemon::booted() {
return None;
}
match libsystemd::activation::receive_descriptors(false) {
Ok(descriptors) => descriptors.into_iter().map(|d| d.into_raw_fd()).next(),
Err(e) => {
tracing::error!("Failed to receive descriptors: {}", e);
None
}
}
}
#[throws]
#[cfg(target_os = "linux")]
fn listener_from_path_or_fd(path: &Path, raw_fd: Option<RawFd>) -> UnixListener {
match raw_fd.map(listener_from_fd) {
Some(Ok(listener)) => listener,
_ => listener_from_path(path)?,
}
}
#[throws]
#[cfg(target_os = "linux")]
fn listener_from_fd(fd: RawFd) -> UnixListener {
use std::os::fd::FromRawFd;
let listener = unsafe { std::os::unix::net::UnixListener::from_raw_fd(fd) };
listener.set_nonblocking(true)?;
UnixListener::from_std(listener)?
}
#[throws]
fn listener_from_path(path: &Path) -> UnixListener {
let error = match UnixListener::bind(path) {
Ok(listener) => return listener,
Err(e) => e,
};
match error.kind() {
io::ErrorKind::NotFound => {
if let Some(parent) = path.parent() {
info!("Creating parent directory {:?}", parent);
std::fs::create_dir_all(parent)?;
}
}
io::ErrorKind::AddrInUse => {
info!("Removing existing file");
match std::fs::remove_file(path) {
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
stuff => stuff,
}?;
}
_ => error!("Failed to bind to {:?}: {}", path, error),
}
UnixListener::bind(path)?
}
#[derive(Debug)]
pub struct DaemonClient {
connection: UnixStream,
stream: UnixStream,
}
impl DaemonClient {
pub async fn new() -> Result<Self> {
let path = fetch_socket_path().ok_or(anyhow!("Failed to find socket path"))?;
// debug!("found path: {:?}", path);
let connection = UnixStream::connect(path).await?;
debug!("connected to socket");
Ok(Self { connection })
let path = Path::new(OsStr::new(UNIX_SOCKET_PATH));
Self::new_with_path(path).await
}
pub async fn new_with_path(path: &str) -> Result<Self> {
let path = Path::new(path);
let connection = UnixStream::connect(path).await?;
Ok(Self { connection })
pub async fn new_with_path(path: &Path) -> Result<Self> {
let stream = UnixStream::connect(path).await?;
Ok(Self { stream })
}
pub async fn send_command(&mut self, command: DaemonCommand) -> Result<DaemonResponse> {
let mut command = serde_json::to_string(&DaemonRequest { id: 0, command })?;
command.push('\n');
self.connection.write_all(command.as_bytes()).await?;
let buf_reader = BufReader::new(&mut self.connection);
self.stream.write_all(command.as_bytes()).await?;
let buf_reader = BufReader::new(&mut self.stream);
let mut lines = buf_reader.lines();
let response = lines
.next_line()

View file

@ -1,23 +1,34 @@
use anyhow::Result;
use fehler::throws;
use super::*;
use super::DaemonCommand;
use crate::daemon::DaemonResponse;
pub async fn listen(
_cmd_tx: async_channel::Sender<DaemonCommand>,
_rsp_rx: async_channel::Receiver<DaemonResponse>,
) -> Result<()> {
unimplemented!("This platform does not currently support daemon mode.")
pub struct Listener;
impl Listener {
pub fn new_with_path(
cmd_tx: async_channel::Sender<DaemonCommand>,
rsp_rx: async_channel::Receiver<DaemonResponse>,
path: &Path,
) -> Self {
Self
}
pub async fn run(&self) -> Result<()> {
Ok(())
}
}
#[derive(Debug)]
pub struct DaemonClient;
impl DaemonClient {
pub async fn new() -> Result<Self> {
unimplemented!("This platform does not currently support daemon mode.")
Ok(Self)
}
pub async fn send_command(&mut self, _: DaemonCommand) -> Result<()> {
pub async fn send_command(&mut self, command: DaemonCommand) -> Result<DaemonResponse> {
unimplemented!("This platform does not currently support daemon mode.")
}
}

View file

@ -6,7 +6,7 @@ use tun::TunInterface;
pub struct DaemonResponse {
// Error types can't be serialized, so this is the second best option.
pub result: Result<DaemonResponseData, String>,
pub id: u32,
pub id: u64,
}
impl DaemonResponse {
@ -25,7 +25,7 @@ impl From<DaemonResponseData> for DaemonResponse {
}
impl DaemonResponse {
pub fn with_id(self, id: u32) -> Self {
pub fn with_id(self, id: u64) -> Self {
Self { id, ..self }
}
}

View file

@ -3,6 +3,10 @@ pub mod wireguard;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod daemon;
pub(crate) mod tracing;
#[cfg(target_vendor = "apple")]
pub use daemon::apple::spawn_in_process;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
pub use daemon::{
DaemonClient,
@ -12,9 +16,3 @@ pub use daemon::{
DaemonStartOptions,
ServerInfo,
};
#[cfg(target_vendor = "apple")]
mod apple;
#[cfg(target_vendor = "apple")]
pub use apple::*;

View file

@ -1,14 +1,9 @@
use anyhow::{Context, Result};
use anyhow::Result;
use clap::{Args, Parser, Subcommand};
use tracing::instrument;
use tracing_log::LogTracer;
use tracing_oslog::OsLogger;
use tracing_subscriber::{prelude::*, EnvFilter, FmtSubscriber};
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
use tun::TunInterface;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod daemon;
pub(crate) mod tracing;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
mod wireguard;
@ -39,8 +34,6 @@ struct Cli {
enum Commands {
/// Start Burrow
Start(StartArgs),
/// Retrieve the file descriptor of the tun interface
Retrieve(RetrieveArgs),
/// Stop Burrow daemon
Stop,
/// Start Burrow daemon
@ -54,9 +47,6 @@ enum Commands {
#[derive(Args)]
struct StartArgs {}
#[derive(Args)]
struct RetrieveArgs {}
#[derive(Args)]
struct DaemonArgs {}
@ -71,57 +61,6 @@ async fn try_start() -> Result<()> {
.map(|_| ())
}
#[cfg(target_vendor = "apple")]
#[instrument]
async fn try_retrieve() -> Result<()> {
LogTracer::init()
.context("Failed to initialize LogTracer")
.unwrap();
if cfg!(target_os = "linux") || cfg!(target_vendor = "apple") {
let maybe_layer = system_log().unwrap();
if let Some(layer) = maybe_layer {
let logger = layer.with_subscriber(FmtSubscriber::new());
tracing::subscriber::set_global_default(logger)
.context("Failed to set the global tracing subscriber")
.unwrap();
}
}
let iface2 = TunInterface::retrieve().ok_or(anyhow::anyhow!("No interface found"))?;
tracing::info!("{:?}", iface2);
Ok(())
}
async fn initialize_tracing() -> Result<()> {
LogTracer::init().context("Failed to initialize LogTracer")?;
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
{
let maybe_layer = system_log()?;
if let Some(layer) = maybe_layer {
let registry = tracing_subscriber::registry()
.with(layer)
.with(tracing_subscriber::fmt::layer()
.with_line_number(true)
.with_filter(EnvFilter::from_default_env())
);
#[cfg(feature = "tokio-console")]
let registry = registry.with(
console_subscriber::spawn()
.with_filter(EnvFilter::from_default_env()
.add_directive("tokio=trace".parse()?)
.add_directive("runtime=trace".parse()?)
)
);
tracing::subscriber::set_global_default(registry).context("Failed to set the global tracing subscriber")?;
}
}
Ok(())
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
async fn try_stop() -> Result<()> {
let mut client = DaemonClient::new().await?;
@ -176,11 +115,6 @@ async fn try_start() -> Result<()> {
Ok(())
}
#[cfg(not(target_vendor = "apple"))]
async fn try_retrieve() -> Result<()> {
Ok(())
}
#[cfg(not(any(target_os = "linux", target_vendor = "apple")))]
async fn try_stop() -> Result<()> {
Ok(())
@ -195,26 +129,17 @@ async fn try_serverinfo() -> Result<()> {
async fn try_serverconfig() -> Result<()> {
Ok(())
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
initialize_tracing().await?;
tracing::info!("Platform: {}", std::env::consts::OS);
tracing::initialize();
let cli = Cli::parse();
match &cli.command {
Commands::Start(..) => {
try_start().await?;
tracing::info!("FINISHED");
}
Commands::Retrieve(..) => {
try_retrieve().await?;
tracing::info!("FINISHED");
}
Commands::Stop => {
try_stop().await?;
}
Commands::Daemon(_) => daemon::daemon_main(None).await?,
Commands::Start(..) => try_start().await?,
Commands::Stop => try_stop().await?,
Commands::Daemon(_) => daemon::daemon_main(None, None).await?,
Commands::ServerInfo => try_serverinfo().await?,
Commands::ServerConfig => try_serverconfig().await?,
}
@ -222,23 +147,6 @@ async fn main() -> Result<()> {
Ok(())
}
#[cfg(target_os = "linux")]
fn system_log() -> Result<Option<tracing_journald::Layer>> {
let maybe_journald = tracing_journald::layer();
match maybe_journald {
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
tracing::trace!("journald not found");
Ok(None)
}
_ => Ok(Some(maybe_journald?)),
}
}
#[cfg(target_vendor = "apple")]
fn system_log() -> Result<Option<OsLogger>> {
Ok(Some(OsLogger::new("com.hackclub.burrow", "burrow-cli")))
}
#[cfg(not(any(target_os = "linux", target_vendor = "apple")))]
pub fn main() {
eprintln!("This platform is not supported currently.")

63
burrow/src/tracing.rs Normal file
View file

@ -0,0 +1,63 @@
use std::sync::Once;
use tracing::{error, info};
use tracing_subscriber::{
layer::{Layer, SubscriberExt},
EnvFilter,
Registry,
};
static TRACING: Once = Once::new();
pub fn initialize() {
TRACING.call_once(|| {
if let Err(e) = tracing_log::LogTracer::init() {
error!("Failed to initialize LogTracer: {}", e);
}
#[cfg(target_os = "windows")]
let system_log = Some(tracing_subscriber::fmt::layer());
#[cfg(target_os = "linux")]
let system_log = match tracing_journald::layer() {
Ok(layer) => Some(layer),
Err(e) => {
if e.kind() != std::io::ErrorKind::NotFound {
error!("Failed to initialize journald: {}", e);
}
None
}
};
#[cfg(target_vendor = "apple")]
let system_log = Some(tracing_oslog::OsLogger::new(
"com.hackclub.burrow",
"tracing",
));
let stderr = (console::user_attended_stderr() || system_log.is_none()).then(|| {
tracing_subscriber::fmt::layer()
.with_level(true)
.with_writer(std::io::stderr)
.compact()
.with_filter(EnvFilter::from_default_env())
});
let subscriber = Registry::default().with(stderr).with(system_log);
#[cfg(feature = "tokio-console")]
let subscriber = subscriber.with(
console_subscriber::spawn().with_filter(
EnvFilter::from_default_env()
.add_directive("tokio=trace".parse().unwrap())
.add_directive("runtime=trace".parse().unwrap()),
),
);
if let Err(e) = tracing::subscriber::set_global_default(subscriber) {
error!("Failed to initialize logging: {}", e);
}
info!("Initialized logging")
});
}

View file

@ -153,7 +153,13 @@ impl Interface {
let mut buf = [0u8; 65535];
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
pcb.update_timers(&mut buf).await;
match pcb.update_timers(&mut buf).await {
Ok(..) => (),
Err(e) => {
error!("Failed to update timers: {}", e);
return
}
}
}
};
@ -167,7 +173,7 @@ impl Interface {
tsks.extend(vec![
tokio::spawn(main_tsk),
tokio::spawn(update_timers_tsk),
tokio::spawn(reset_rate_limiter_tsk)
tokio::spawn(reset_rate_limiter_tsk),
]);
debug!("task made..");
}

View file

@ -8,4 +8,3 @@ pub use config::Config;
pub use iface::Interface;
pub use pcb::PeerPcb;
pub use peer::Peer;
pub use x25519_dalek::{PublicKey, StaticSecret};

View file

@ -44,13 +44,7 @@ const MAX_QUEUE_DEPTH: usize = 256;
const N_SESSIONS: usize = 8;
pub mod x25519 {
pub use x25519_dalek::{
EphemeralSecret,
PublicKey,
ReusableSecret,
SharedSecret,
StaticSecret,
};
pub use x25519_dalek::{PublicKey, ReusableSecret, SharedSecret, StaticSecret};
}
#[derive(Debug)]

View file

@ -1,19 +1,17 @@
use std::{net::SocketAddr, sync::Arc};
use std::time::Duration;
use anyhow::{Error, Result};
use fehler::throws;
use ip_network::IpNetwork;
use rand::random;
use tokio::{net::UdpSocket, sync::RwLock, task::JoinHandle, time::timeout};
use tokio::io::AsyncWrite;
use tokio::{net::UdpSocket, sync::RwLock, task::JoinHandle};
use tun::tokio::TunInterface;
use crate::wireguard::noise::errors::WireGuardError;
use super::{
noise::{TunnResult, Tunnel},
Peer,
};
use crate::wireguard::noise::errors::WireGuardError;
#[derive(Debug)]
pub struct PeerPcb {
@ -95,7 +93,13 @@ impl PeerPcb {
TunnResult::WriteToNetwork(packet) => {
tracing::debug!("WriteToNetwork: {:?}", packet);
self.open_if_closed().await?;
self.socket.read().await.as_ref().unwrap().send(packet).await?;
self.socket
.read()
.await
.as_ref()
.unwrap()
.send(packet)
.await?;
tracing::debug!("WriteToNetwork done");
res_dat = &[];
continue
@ -143,8 +147,7 @@ impl PeerPcb {
pub async fn update_timers(&self, dst: &mut [u8]) -> Result<(), Error> {
match self.tunnel.write().await.update_timers(dst) {
TunnResult::Done => {}
TunnResult::Err(WireGuardError::ConnectionExpired) => {
}
TunnResult::Err(WireGuardError::ConnectionExpired) => {}
TunnResult::Err(e) => {
tracing::error!(message = "Update timers error", error = ?e)
}