Integrate tunnel status streaming

This commit is contained in:
dav 2024-07-13 16:54:57 -07:00
parent 827f0e65dc
commit 4f2337ef31
5 changed files with 85 additions and 32 deletions

View file

@ -1,13 +1,7 @@
use super::*; use super::*;
use anyhow::Context; use anyhow::Context;
use anyhow::Result;
use hyper_util::rt::TokioIo;
use std::time::Duration; use std::time::Duration;
use tokio::net::UnixStream;
use tonic::transport::{Channel, Endpoint, Uri};
use tower::service_fn;
const BURROW_RPC_SOCKET_PATH: &str = "/run/burrow.sock";
const RECONNECT_POLL_TIME: Duration = Duration::from_secs(5); const RECONNECT_POLL_TIME: Duration = Duration::from_secs(5);
pub struct App { pub struct App {
@ -65,7 +59,7 @@ impl AsyncComponent for App {
sender: AsyncComponentSender<Self>, sender: AsyncComponentSender<Self>,
) -> AsyncComponentParts<Self> { ) -> AsyncComponentParts<Self> {
// TODO: RPC REFACTOR (Handle Error) // TODO: RPC REFACTOR (Handle Error)
let daemon_client = Arc::new(Mutex::new(Some(daemon_connect().await.unwrap()))); let daemon_client = Arc::new(Mutex::new(Some(daemon::daemon_connect().await.unwrap())));
let switch_screen = switch_screen::SwitchScreen::builder() let switch_screen = switch_screen::SwitchScreen::builder()
.launch(switch_screen::SwitchScreenInit { .launch(switch_screen::SwitchScreenInit {
@ -146,7 +140,7 @@ impl AsyncComponent for App {
} }
if disconnected_daemon_client || daemon_client.is_none() { if disconnected_daemon_client || daemon_client.is_none() {
match daemon_connect().await { match daemon::daemon_connect().await {
Ok(new_daemon_client) => { Ok(new_daemon_client) => {
*daemon_client = Some(new_daemon_client); *daemon_client = Some(new_daemon_client);
self.switch_screen self.switch_screen
@ -163,13 +157,3 @@ impl AsyncComponent for App {
} }
} }
} }
pub async fn daemon_connect() -> Result<Channel> {
Ok(Endpoint::try_from("http://[::]:50051")?
.connect_with_connector(service_fn(|_: Uri| async {
Ok::<_, std::io::Error>(TokioIo::new(
UnixStream::connect(BURROW_RPC_SOCKET_PATH).await?,
))
}))
.await?)
}

View file

@ -3,6 +3,7 @@ use adw::prelude::*;
use gtk::Align; use gtk::Align;
use relm4::{ use relm4::{
component::{ component::{
worker::{Worker, WorkerController},
AsyncComponent, AsyncComponentController, AsyncComponentParts, AsyncComponentSender, AsyncComponent, AsyncComponentController, AsyncComponentParts, AsyncComponentSender,
AsyncController, AsyncController,
}, },

View file

@ -1,10 +1,15 @@
use super::*; use super::*;
use std::time::Duration;
const RECONNECT_POLL_TIME: Duration = Duration::from_secs(3);
pub struct SwitchScreen { pub struct SwitchScreen {
daemon_client: Arc<Mutex<Option<Channel>>>, daemon_client: Arc<Mutex<Option<Channel>>>,
switch: gtk::Switch, switch: gtk::Switch,
switch_screen: gtk::Box, switch_screen: gtk::Box,
disconnected_banner: adw::Banner, disconnected_banner: adw::Banner,
_tunnel_state_worker: WorkerController<AsyncTunnelStateHandler>,
} }
pub struct SwitchScreenInit { pub struct SwitchScreenInit {
@ -13,10 +18,13 @@ pub struct SwitchScreenInit {
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum SwitchScreenMsg { pub enum SwitchScreenMsg {
None,
DaemonReconnect, DaemonReconnect,
DaemonDisconnect, DaemonDisconnect,
Start, Start,
Stop, Stop,
SwitchSetStart,
SwitchSetStop,
} }
#[relm4::component(pub, async)] #[relm4::component(pub, async)]
@ -24,7 +32,7 @@ impl AsyncComponent for SwitchScreen {
type Init = SwitchScreenInit; type Init = SwitchScreenInit;
type Input = SwitchScreenMsg; type Input = SwitchScreenMsg;
type Output = (); type Output = ();
type CommandOutput = (); type CommandOutput = SwitchScreenMsg;
view! { view! {
gtk::Box { gtk::Box {
@ -61,7 +69,7 @@ impl AsyncComponent for SwitchScreen {
set_hexpand: false, set_hexpand: false,
set_vexpand: false, set_vexpand: false,
connect_active_notify => move |switch| connect_active_notify => move |switch|
sender.input(if switch.is_active() { SwitchScreenMsg::Start } else { SwitchScreenMsg::Stop }) switch_sender.input(if switch.is_active() { SwitchScreenMsg::Start } else { SwitchScreenMsg::Stop })
}, },
} }
} }
@ -72,29 +80,25 @@ impl AsyncComponent for SwitchScreen {
root: Self::Root, root: Self::Root,
sender: AsyncComponentSender<Self>, sender: AsyncComponentSender<Self>,
) -> AsyncComponentParts<Self> { ) -> AsyncComponentParts<Self> {
let mut initial_switch_status = false;
let mut initial_daemon_server_down = false; let mut initial_daemon_server_down = false;
if let Some(daemon_client) = init.daemon_client.lock().await.as_mut() { if let Some(daemon_client) = init.daemon_client.lock().await.as_mut() {
let mut client = tunnel_client::TunnelClient::new(daemon_client); let mut client = tunnel_client::TunnelClient::new(daemon_client);
if let Ok(res) = client.tunnel_status(burrow_rpc::Empty {}).await.as_mut() { if client
// TODO: RPC REFACTOR (Handle Error) .tunnel_status(burrow_rpc::Empty {})
let res = res.get_mut().message().await.unwrap().unwrap(); .await
initial_switch_status = match res.state() { .as_mut()
burrow_rpc::State::Running => true, .is_err()
burrow_rpc::State::Stopped => false, {
};
} else {
initial_daemon_server_down = true; initial_daemon_server_down = true;
} }
} else { } else {
initial_daemon_server_down = true; initial_daemon_server_down = true;
} }
let switch_sender = sender.clone();
let widgets = view_output!(); let widgets = view_output!();
widgets.switch.set_active(initial_switch_status);
if initial_daemon_server_down { if initial_daemon_server_down {
*init.daemon_client.lock().await = None; *init.daemon_client.lock().await = None;
widgets.switch.set_active(false); widgets.switch.set_active(false);
@ -107,8 +111,13 @@ impl AsyncComponent for SwitchScreen {
switch: widgets.switch.clone(), switch: widgets.switch.clone(),
switch_screen: widgets.switch_screen.clone(), switch_screen: widgets.switch_screen.clone(),
disconnected_banner: widgets.setup_banner.clone(), disconnected_banner: widgets.setup_banner.clone(),
_tunnel_state_worker: AsyncTunnelStateHandler::builder()
.detach_worker(())
.forward(sender.input_sender(), |_| SwitchScreenMsg::None),
}; };
widgets.switch.set_active(false);
AsyncComponentParts { model, widgets } AsyncComponentParts { model, widgets }
} }
@ -133,6 +142,12 @@ impl AsyncComponent for SwitchScreen {
disconnected_daemon_client = true; disconnected_daemon_client = true;
} }
} }
Self::Input::SwitchSetStart => {
self.switch.set_active(true);
}
Self::Input::SwitchSetStop => {
self.switch.set_active(false);
}
_ => {} _ => {}
} }
} else { } else {
@ -146,9 +161,44 @@ impl AsyncComponent for SwitchScreen {
if disconnected_daemon_client || msg == Self::Input::DaemonDisconnect { if disconnected_daemon_client || msg == Self::Input::DaemonDisconnect {
*self.daemon_client.lock().await = None; *self.daemon_client.lock().await = None;
self.switch.set_active(false);
self.switch_screen.set_sensitive(false); self.switch_screen.set_sensitive(false);
self.disconnected_banner.set_revealed(true); self.disconnected_banner.set_revealed(true);
} }
} }
} }
struct AsyncTunnelStateHandler;
impl Worker for AsyncTunnelStateHandler {
type Init = ();
type Input = ();
type Output = SwitchScreenMsg;
fn init(_: Self::Init, _sender: ComponentSender<Self>) -> Self {
Self
}
fn update(&mut self, _: (), sender: ComponentSender<Self>) {
let rt = tokio::runtime::Runtime::new().unwrap();
let task = rt.spawn(async move {
loop {
let conn = daemon::daemon_connect().await;
if let Ok(conn) = conn {
let mut client = tunnel_client::TunnelClient::new(conn);
if let Ok(mut res) = client.tunnel_status(burrow_rpc::Empty {}).await {
let stream = res.get_mut();
while let Ok(Some(msg)) = stream.message().await {
sender
.output(match msg.state() {
burrow_rpc::State::Running => SwitchScreenMsg::SwitchSetStart,
burrow_rpc::State::Stopped => SwitchScreenMsg::SwitchSetStop,
})
.unwrap();
}
}
}
tokio::time::sleep(RECONNECT_POLL_TIME).await;
}
});
rt.block_on(task).unwrap();
}
}

17
burrow-gtk/src/daemon.rs Normal file
View file

@ -0,0 +1,17 @@
use anyhow::Result;
use hyper_util::rt::TokioIo;
use tokio::net::UnixStream;
use tonic::transport::{Channel, Endpoint, Uri};
use tower::service_fn;
const BURROW_RPC_SOCKET_PATH: &str = "/run/burrow.sock";
pub async fn daemon_connect() -> Result<Channel> {
Ok(Endpoint::try_from("http://[::]:50051")?
.connect_with_connector(service_fn(|_: Uri| async {
Ok::<_, std::io::Error>(TokioIo::new(
UnixStream::connect(BURROW_RPC_SOCKET_PATH).await?,
))
}))
.await?)
}

View file

@ -1,6 +1,7 @@
use anyhow::Result; use anyhow::Result;
pub mod components; pub mod components;
mod daemon;
mod diag; mod diag;
// Generated using meson // Generated using meson