From 4f2337ef316c841bc16b173081fa3a9f38217625 Mon Sep 17 00:00:00 2001 From: dav Date: Sat, 13 Jul 2024 16:54:57 -0700 Subject: [PATCH] Integrate tunnel status streaming --- burrow-gtk/src/components/app.rs | 20 +----- burrow-gtk/src/components/mod.rs | 1 + burrow-gtk/src/components/switch_screen.rs | 78 ++++++++++++++++++---- burrow-gtk/src/daemon.rs | 17 +++++ burrow-gtk/src/main.rs | 1 + 5 files changed, 85 insertions(+), 32 deletions(-) create mode 100644 burrow-gtk/src/daemon.rs diff --git a/burrow-gtk/src/components/app.rs b/burrow-gtk/src/components/app.rs index 7bc9084..6d64fa0 100644 --- a/burrow-gtk/src/components/app.rs +++ b/burrow-gtk/src/components/app.rs @@ -1,13 +1,7 @@ use super::*; use anyhow::Context; -use anyhow::Result; -use hyper_util::rt::TokioIo; use std::time::Duration; -use tokio::net::UnixStream; -use tonic::transport::{Channel, Endpoint, Uri}; -use tower::service_fn; -const BURROW_RPC_SOCKET_PATH: &str = "/run/burrow.sock"; const RECONNECT_POLL_TIME: Duration = Duration::from_secs(5); pub struct App { @@ -65,7 +59,7 @@ impl AsyncComponent for App { sender: AsyncComponentSender, ) -> AsyncComponentParts { // TODO: RPC REFACTOR (Handle Error) - let daemon_client = Arc::new(Mutex::new(Some(daemon_connect().await.unwrap()))); + let daemon_client = Arc::new(Mutex::new(Some(daemon::daemon_connect().await.unwrap()))); let switch_screen = switch_screen::SwitchScreen::builder() .launch(switch_screen::SwitchScreenInit { @@ -146,7 +140,7 @@ impl AsyncComponent for App { } if disconnected_daemon_client || daemon_client.is_none() { - match daemon_connect().await { + match daemon::daemon_connect().await { Ok(new_daemon_client) => { *daemon_client = Some(new_daemon_client); self.switch_screen @@ -163,13 +157,3 @@ impl AsyncComponent for App { } } } - -pub async fn daemon_connect() -> Result { - Ok(Endpoint::try_from("http://[::]:50051")? - .connect_with_connector(service_fn(|_: Uri| async { - Ok::<_, std::io::Error>(TokioIo::new( - UnixStream::connect(BURROW_RPC_SOCKET_PATH).await?, - )) - })) - .await?) -} diff --git a/burrow-gtk/src/components/mod.rs b/burrow-gtk/src/components/mod.rs index 21c51e5..7490b82 100644 --- a/burrow-gtk/src/components/mod.rs +++ b/burrow-gtk/src/components/mod.rs @@ -3,6 +3,7 @@ use adw::prelude::*; use gtk::Align; use relm4::{ component::{ + worker::{Worker, WorkerController}, AsyncComponent, AsyncComponentController, AsyncComponentParts, AsyncComponentSender, AsyncController, }, diff --git a/burrow-gtk/src/components/switch_screen.rs b/burrow-gtk/src/components/switch_screen.rs index 7155d9f..7258854 100644 --- a/burrow-gtk/src/components/switch_screen.rs +++ b/burrow-gtk/src/components/switch_screen.rs @@ -1,10 +1,15 @@ use super::*; +use std::time::Duration; + +const RECONNECT_POLL_TIME: Duration = Duration::from_secs(3); pub struct SwitchScreen { daemon_client: Arc>>, switch: gtk::Switch, switch_screen: gtk::Box, disconnected_banner: adw::Banner, + + _tunnel_state_worker: WorkerController, } pub struct SwitchScreenInit { @@ -13,10 +18,13 @@ pub struct SwitchScreenInit { #[derive(Debug, PartialEq, Eq)] pub enum SwitchScreenMsg { + None, DaemonReconnect, DaemonDisconnect, Start, Stop, + SwitchSetStart, + SwitchSetStop, } #[relm4::component(pub, async)] @@ -24,7 +32,7 @@ impl AsyncComponent for SwitchScreen { type Init = SwitchScreenInit; type Input = SwitchScreenMsg; type Output = (); - type CommandOutput = (); + type CommandOutput = SwitchScreenMsg; view! { gtk::Box { @@ -61,7 +69,7 @@ impl AsyncComponent for SwitchScreen { set_hexpand: false, set_vexpand: false, connect_active_notify => move |switch| - sender.input(if switch.is_active() { SwitchScreenMsg::Start } else { SwitchScreenMsg::Stop }) + switch_sender.input(if switch.is_active() { SwitchScreenMsg::Start } else { SwitchScreenMsg::Stop }) }, } } @@ -72,29 +80,25 @@ impl AsyncComponent for SwitchScreen { root: Self::Root, sender: AsyncComponentSender, ) -> AsyncComponentParts { - let mut initial_switch_status = false; let mut initial_daemon_server_down = false; if let Some(daemon_client) = init.daemon_client.lock().await.as_mut() { let mut client = tunnel_client::TunnelClient::new(daemon_client); - if let Ok(res) = client.tunnel_status(burrow_rpc::Empty {}).await.as_mut() { - // TODO: RPC REFACTOR (Handle Error) - let res = res.get_mut().message().await.unwrap().unwrap(); - initial_switch_status = match res.state() { - burrow_rpc::State::Running => true, - burrow_rpc::State::Stopped => false, - }; - } else { + if client + .tunnel_status(burrow_rpc::Empty {}) + .await + .as_mut() + .is_err() + { initial_daemon_server_down = true; } } else { initial_daemon_server_down = true; } + let switch_sender = sender.clone(); let widgets = view_output!(); - widgets.switch.set_active(initial_switch_status); - if initial_daemon_server_down { *init.daemon_client.lock().await = None; widgets.switch.set_active(false); @@ -107,8 +111,13 @@ impl AsyncComponent for SwitchScreen { switch: widgets.switch.clone(), switch_screen: widgets.switch_screen.clone(), disconnected_banner: widgets.setup_banner.clone(), + _tunnel_state_worker: AsyncTunnelStateHandler::builder() + .detach_worker(()) + .forward(sender.input_sender(), |_| SwitchScreenMsg::None), }; + widgets.switch.set_active(false); + AsyncComponentParts { model, widgets } } @@ -133,6 +142,12 @@ impl AsyncComponent for SwitchScreen { disconnected_daemon_client = true; } } + Self::Input::SwitchSetStart => { + self.switch.set_active(true); + } + Self::Input::SwitchSetStop => { + self.switch.set_active(false); + } _ => {} } } else { @@ -146,9 +161,44 @@ impl AsyncComponent for SwitchScreen { if disconnected_daemon_client || msg == Self::Input::DaemonDisconnect { *self.daemon_client.lock().await = None; - self.switch.set_active(false); self.switch_screen.set_sensitive(false); self.disconnected_banner.set_revealed(true); } } } +struct AsyncTunnelStateHandler; + +impl Worker for AsyncTunnelStateHandler { + type Init = (); + type Input = (); + type Output = SwitchScreenMsg; + + fn init(_: Self::Init, _sender: ComponentSender) -> Self { + Self + } + + fn update(&mut self, _: (), sender: ComponentSender) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let task = rt.spawn(async move { + loop { + let conn = daemon::daemon_connect().await; + if let Ok(conn) = conn { + let mut client = tunnel_client::TunnelClient::new(conn); + if let Ok(mut res) = client.tunnel_status(burrow_rpc::Empty {}).await { + let stream = res.get_mut(); + while let Ok(Some(msg)) = stream.message().await { + sender + .output(match msg.state() { + burrow_rpc::State::Running => SwitchScreenMsg::SwitchSetStart, + burrow_rpc::State::Stopped => SwitchScreenMsg::SwitchSetStop, + }) + .unwrap(); + } + } + } + tokio::time::sleep(RECONNECT_POLL_TIME).await; + } + }); + rt.block_on(task).unwrap(); + } +} diff --git a/burrow-gtk/src/daemon.rs b/burrow-gtk/src/daemon.rs new file mode 100644 index 0000000..dc47f24 --- /dev/null +++ b/burrow-gtk/src/daemon.rs @@ -0,0 +1,17 @@ +use anyhow::Result; +use hyper_util::rt::TokioIo; +use tokio::net::UnixStream; +use tonic::transport::{Channel, Endpoint, Uri}; +use tower::service_fn; + +const BURROW_RPC_SOCKET_PATH: &str = "/run/burrow.sock"; + +pub async fn daemon_connect() -> Result { + Ok(Endpoint::try_from("http://[::]:50051")? + .connect_with_connector(service_fn(|_: Uri| async { + Ok::<_, std::io::Error>(TokioIo::new( + UnixStream::connect(BURROW_RPC_SOCKET_PATH).await?, + )) + })) + .await?) +} diff --git a/burrow-gtk/src/main.rs b/burrow-gtk/src/main.rs index 6f91e2a..66e6b36 100644 --- a/burrow-gtk/src/main.rs +++ b/burrow-gtk/src/main.rs @@ -1,6 +1,7 @@ use anyhow::Result; pub mod components; +mod daemon; mod diag; // Generated using meson