diff --git a/Scripts/run-tailnet-connectivity-smoke.sh b/Scripts/run-tailnet-connectivity-smoke.sh new file mode 100755 index 0000000..f3053d3 --- /dev/null +++ b/Scripts/run-tailnet-connectivity-smoke.sh @@ -0,0 +1,186 @@ +#!/usr/bin/env bash +set -euo pipefail + +repo_root="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +bundle_id="${BURROW_UI_TEST_APP_BUNDLE_ID:-com.hackclub.burrow}" +smoke_root="${BURROW_TAILNET_SMOKE_ROOT:-/tmp/burrow-tailnet-connectivity}" +socket_path="${smoke_root}/burrow.sock" +db_path="${smoke_root}/burrow.db" +daemon_log="${BURROW_TAILNET_SMOKE_DAEMON_LOG:-${smoke_root}/daemon.log}" +payload_path="${smoke_root}/tailnet.json" +authority="${BURROW_TAILNET_SMOKE_AUTHORITY:-https://ts.burrow.net}" +account_name="${BURROW_TAILNET_SMOKE_ACCOUNT:-ui-test}" +identity_name="${BURROW_TAILNET_SMOKE_IDENTITY:-apple}" +hostname="${BURROW_TAILNET_SMOKE_HOSTNAME:-burrow-apple}" +message="${BURROW_TAILNET_SMOKE_MESSAGE:-burrow-tailnet-smoke}" +timeout_ms="${BURROW_TAILNET_SMOKE_TIMEOUT_MS:-8000}" +remote_ip="${BURROW_TAILNET_SMOKE_REMOTE_IP:-}" +remote_port="${BURROW_TAILNET_SMOKE_REMOTE_PORT:-18081}" +remote_hostname="${BURROW_TAILNET_SMOKE_REMOTE_HOSTNAME:-burrow-echo}" +remote_authkey="${BURROW_TAILNET_SMOKE_REMOTE_AUTHKEY:-}" +helper_bin="${BURROW_TAILNET_SMOKE_HELPER_BIN:-${smoke_root}/tailscale-login-bridge}" +remote_state_root="${BURROW_TAILNET_SMOKE_REMOTE_STATE_ROOT:-${smoke_root}/remote-state}" +remote_stdout="${smoke_root}/remote-helper.stdout" +remote_stderr="${BURROW_TAILNET_SMOKE_REMOTE_LOG:-${smoke_root}/remote-helper.log}" + +if [[ -n "${TS_AUTHKEY:-}" ]]; then + default_tailnet_state_root="${smoke_root}/local-state" +else + default_tailnet_state_root="/tmp/${bundle_id}/SimulatorTailnetState" +fi +tailnet_state_root="${BURROW_TAILNET_STATE_ROOT:-${default_tailnet_state_root}}" + +need_login=0 +if [[ -z "${TS_AUTHKEY:-}" ]] && { [[ ! -d "$tailnet_state_root" ]] || [[ -z "$(find "$tailnet_state_root" -mindepth 1 -maxdepth 2 -print -quit 2>/dev/null)" ]]; }; then + need_login=1 +fi + +if [[ "$need_login" -eq 1 ]]; then + echo "Tailnet state root is empty; running iOS login bootstrap first..." + "${repo_root}/Scripts/run-ios-tailnet-ui-tests.sh" +fi + +rm -rf "$smoke_root" +mkdir -p "$smoke_root" + +cleanup() { + rm -f "$payload_path" + if [[ -n "${daemon_pid:-}" ]]; then + kill "$daemon_pid" >/dev/null 2>&1 || true + wait "$daemon_pid" >/dev/null 2>&1 || true + fi + if [[ -n "${remote_pid:-}" ]]; then + kill "$remote_pid" >/dev/null 2>&1 || true + wait "$remote_pid" >/dev/null 2>&1 || true + fi +} +trap cleanup EXIT + +wait_for_helper_listen() { + python3 - <<'PY' "$1" +import json +import pathlib +import sys +import time + +path = pathlib.Path(sys.argv[1]) +deadline = time.time() + 20 +while time.time() < deadline: + if path.exists(): + with path.open("r", encoding="utf-8") as handle: + line = handle.readline().strip() + if line: + hello = json.loads(line) + print(hello["listen_addr"]) + raise SystemExit(0) + time.sleep(0.1) +raise SystemExit("timed out waiting for helper startup line") +PY +} + +wait_for_helper_ip() { + python3 - <<'PY' "$1" +import json +import sys +import time +import urllib.request + +url = sys.argv[1] +deadline = time.time() + 30 +while time.time() < deadline: + with urllib.request.urlopen(url, timeout=5) as response: + status = json.load(response) + if status.get("running") and status.get("tailscale_ips"): + print(status["tailscale_ips"][0]) + raise SystemExit(0) + time.sleep(0.25) +raise SystemExit("timed out waiting for helper to become ready") +PY +} + +python3 - <<'PY' "$payload_path" "$authority" "$account_name" "$identity_name" "$hostname" +import json +import pathlib +import sys + +path = pathlib.Path(sys.argv[1]) +payload = { + "authority": sys.argv[2], + "account": sys.argv[3], + "identity": sys.argv[4], + "hostname": sys.argv[5], +} +path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") +PY + +cargo build -p burrow --bin burrow +( + cd "${repo_root}/Tools/tailscale-login-bridge" + GOWORK=off go build -o "$helper_bin" . +) + +if [[ -z "$remote_ip" ]]; then + if [[ -z "$remote_authkey" ]] && { [[ ! -d "$remote_state_root" ]] || [[ -z "$(find "$remote_state_root" -mindepth 1 -maxdepth 1 -print -quit 2>/dev/null)" ]]; }; then + echo "error: set BURROW_TAILNET_SMOKE_REMOTE_IP, BURROW_TAILNET_SMOKE_REMOTE_AUTHKEY, or BURROW_TAILNET_SMOKE_REMOTE_STATE_ROOT to an existing logged-in helper state" >&2 + exit 1 + fi + + if [[ -n "$remote_authkey" ]]; then + rm -rf "$remote_state_root" + mkdir -p "$remote_state_root" + fi + + ( + cd "$repo_root" + if [[ -n "$remote_authkey" ]]; then + export TS_AUTHKEY="$remote_authkey" + fi + "$helper_bin" \ + --listen 127.0.0.1:0 \ + --state-dir "$remote_state_root" \ + --hostname "$remote_hostname" \ + --control-url "$authority" \ + --udp-echo-port "$remote_port" \ + >"$remote_stdout" 2>"$remote_stderr" + ) & + remote_pid=$! + + remote_listen_addr="$(wait_for_helper_listen "$remote_stdout")" + remote_ip="$(wait_for_helper_ip "http://${remote_listen_addr}/status")" +fi + +( + cd "$smoke_root" + RUST_LOG="${BURROW_TAILNET_SMOKE_RUST_LOG:-info,burrow=debug}" \ + BURROW_SOCKET_PATH="$socket_path" \ + BURROW_TAILSCALE_STATE_ROOT="$tailnet_state_root" \ + "${repo_root}/target/debug/burrow" daemon >"$daemon_log" 2>&1 +) & +daemon_pid=$! + +for _ in $(seq 1 50); do + [[ -S "$socket_path" ]] && break + sleep 0.2 +done + +if [[ ! -S "$socket_path" ]]; then + echo "error: Burrow daemon did not create ${socket_path}" >&2 + [[ -f "$daemon_log" ]] && cat "$daemon_log" >&2 + exit 1 +fi + +run_burrow() { + BURROW_SOCKET_PATH="$socket_path" \ + BURROW_TAILSCALE_STATE_ROOT="$tailnet_state_root" \ + "${repo_root}/target/debug/burrow" "$@" +} + +run_burrow network-add 1 1 "$payload_path" +run_burrow start +run_burrow tunnel-config +run_burrow tailnet-udp-echo "${remote_ip}:${remote_port}" --message "$message" --timeout-ms "$timeout_ms" + +echo +echo "Tailnet connectivity smoke passed." +echo "State root: $tailnet_state_root" +echo "Remote: ${remote_ip}:${remote_port}" diff --git a/Tools/tailscale-login-bridge/main.go b/Tools/tailscale-login-bridge/main.go index 82ca9b0..877d0e4 100644 --- a/Tools/tailscale-login-bridge/main.go +++ b/Tools/tailscale-login-bridge/main.go @@ -2,17 +2,26 @@ package main import ( "context" + "encoding/binary" "encoding/json" + "errors" "flag" "fmt" + "io" "log" "net" + "net/netip" "net/http" "os" + "strconv" + "sync" "time" + "github.com/tailscale/wireguard-go/tun" "tailscale.com/client/local" "tailscale.com/ipn" + "tailscale.com/ipn/ipnstate" + "tailscale.com/tailcfg" "tailscale.com/tsnet" ) @@ -26,13 +35,123 @@ type statusResponse struct { SelfDNSName string `json:"self_dns_name,omitempty"` TailscaleIPs []string `json:"tailscale_ips,omitempty"` Health []string `json:"health,omitempty"` + Peers []peerSummary `json:"peers,omitempty"` } +type peerSummary struct { + Name string `json:"name,omitempty"` + DNSName string `json:"dns_name,omitempty"` + TailscaleIPs []string `json:"tailscale_ips,omitempty"` + Online bool `json:"online"` + Active bool `json:"active"` + Relay string `json:"relay,omitempty"` + CurAddr string `json:"cur_addr,omitempty"` + LastSeenUnix int64 `json:"last_seen_unix,omitempty"` +} + +type pingResponse struct { + Result *ipnstate.PingResult `json:"result,omitempty"` +} + +type helperHello struct { + ListenAddr string `json:"listen_addr"` + PacketSocket string `json:"packet_socket,omitempty"` +} + +type helperState struct { + mu sync.RWMutex + authURL string +} + +func (s *helperState) authURLSnapshot() string { + s.mu.RLock() + defer s.mu.RUnlock() + return s.authURL +} + +func (s *helperState) setAuthURL(url string) { + s.mu.Lock() + defer s.mu.Unlock() + s.authURL = url +} + +func (s *helperState) clearAuthURL() { + s.setAuthURL("") +} + +// chanTUN is a tun.Device backed by channels so another process can feed and +// consume raw IP packets while tsnet handles the Tailnet control/data plane. +type chanTUN struct { + Inbound chan []byte + Outbound chan []byte + closed chan struct{} + events chan tun.Event +} + +func newChanTUN() *chanTUN { + t := &chanTUN{ + Inbound: make(chan []byte, 1024), + Outbound: make(chan []byte, 1024), + closed: make(chan struct{}), + events: make(chan tun.Event, 1), + } + t.events <- tun.EventUp + return t +} + +func (t *chanTUN) File() *os.File { return nil } + +func (t *chanTUN) Close() error { + select { + case <-t.closed: + default: + close(t.closed) + close(t.Inbound) + } + return nil +} + +func (t *chanTUN) Read(bufs [][]byte, sizes []int, offset int) (int, error) { + select { + case <-t.closed: + return 0, io.EOF + case pkt, ok := <-t.Outbound: + if !ok { + return 0, io.EOF + } + sizes[0] = copy(bufs[0][offset:], pkt) + return 1, nil + } +} + +func (t *chanTUN) Write(bufs [][]byte, offset int) (int, error) { + for _, buf := range bufs { + pkt := buf[offset:] + if len(pkt) == 0 { + continue + } + select { + case <-t.closed: + return 0, errors.New("closed") + case t.Inbound <- append([]byte(nil), pkt...): + default: + } + } + return len(bufs), nil +} + +func (t *chanTUN) MTU() (int, error) { return 1280, nil } +func (t *chanTUN) Name() (string, error) { return "burrow-tailnet", nil } +func (t *chanTUN) Events() <-chan tun.Event { return t.events } +func (t *chanTUN) BatchSize() int { return 1 } + func main() { listen := flag.String("listen", "127.0.0.1:0", "local listen address") stateDir := flag.String("state-dir", "", "persistent state directory") hostname := flag.String("hostname", "burrow-apple", "tailnet hostname") controlURL := flag.String("control-url", "", "optional control URL") + packetSocket := flag.String("packet-socket", "", "optional unix socket path for raw packet bridging") + udpEchoPort := flag.Int("udp-echo-port", 0, "optional tailnet UDP echo port") flag.Parse() if *stateDir == "" { @@ -48,6 +167,24 @@ func main() { Hostname: *hostname, UserLogf: log.Printf, } + + var tunDevice *chanTUN + var packetListener net.Listener + if *packetSocket != "" { + _ = os.Remove(*packetSocket) + ln, err := net.Listen("unix", *packetSocket) + if err != nil { + log.Fatalf("packet listen: %v", err) + } + packetListener = ln + defer func() { + packetListener.Close() + _ = os.Remove(*packetSocket) + }() + + tunDevice = newChanTUN() + server.Tun = tunDevice + } if *controlURL != "" { server.ControlURL = *controlURL } @@ -61,6 +198,7 @@ func main() { if err != nil { log.Fatalf("local client: %v", err) } + state := &helperState{} ln, err := net.Listen("tcp", *listen) if err != nil { @@ -68,12 +206,27 @@ func main() { } defer ln.Close() - fmt.Printf("{\"listen_addr\":%q}\n", ln.Addr().String()) + if packetListener != nil { + go servePacketBridge(packetListener, tunDevice) + } + if *udpEchoPort > 0 { + go serveUDPEcho(context.Background(), server, localClient, *udpEchoPort) + } + + hello := helperHello{ + ListenAddr: ln.Addr().String(), + } + if *packetSocket != "" { + hello.PacketSocket = *packetSocket + } + if err := json.NewEncoder(os.Stdout).Encode(hello); err != nil { + log.Fatalf("write hello: %v", err) + } _ = os.Stdout.Sync() mux := http.NewServeMux() mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { - status, err := snapshot(r.Context(), localClient) + status, err := snapshot(r.Context(), localClient, state) if err != nil { http.Error(w, err.Error(), http.StatusBadGateway) return @@ -81,6 +234,40 @@ func main() { w.Header().Set("content-type", "application/json") _ = json.NewEncoder(w).Encode(status) }) + mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) { + ip := r.URL.Query().Get("ip") + if ip == "" { + http.Error(w, "missing ip", http.StatusBadRequest) + return + } + target, err := netip.ParseAddr(ip) + if err != nil { + http.Error(w, fmt.Sprintf("invalid ip: %v", err), http.StatusBadRequest) + return + } + + pingType := tailcfg.PingTSMP + switch r.URL.Query().Get("type") { + case "", "tsmp", "TSMP": + pingType = tailcfg.PingTSMP + case "icmp", "ICMP": + pingType = tailcfg.PingICMP + case "peerapi": + pingType = tailcfg.PingPeerAPI + default: + http.Error(w, "unsupported ping type", http.StatusBadRequest) + return + } + + result, err := localClient.Ping(r.Context(), target, pingType) + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + + w.Header().Set("content-type", "application/json") + _ = json.NewEncoder(w).Encode(&pingResponse{Result: result}) + }) mux.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) go func() { @@ -96,16 +283,110 @@ func main() { log.Fatal(httpServer.Serve(ln)) } -func snapshot(ctx context.Context, localClient *local.Client) (*statusResponse, error) { - status, err := localClient.StatusWithoutPeers(ctx) +func servePacketBridge(listener net.Listener, device *chanTUN) { + for { + conn, err := listener.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + return + } + log.Printf("packet accept: %v", err) + continue + } + log.Printf("packet bridge connected") + if err := bridgePacketConn(conn, device); err != nil && !errors.Is(err, io.EOF) { + log.Printf("packet bridge error: %v", err) + } + _ = conn.Close() + log.Printf("packet bridge disconnected") + } +} + +func bridgePacketConn(conn net.Conn, device *chanTUN) error { + errCh := make(chan error, 2) + + go func() { + for { + pkt, err := readFrame(conn) + if err != nil { + errCh <- err + return + } + select { + case <-device.closed: + errCh <- io.EOF + return + case device.Outbound <- pkt: + } + } + }() + + go func() { + for { + select { + case <-device.closed: + errCh <- io.EOF + return + case pkt, ok := <-device.Inbound: + if !ok { + errCh <- io.EOF + return + } + if err := writeFrame(conn, pkt); err != nil { + errCh <- err + return + } + } + } + }() + + return <-errCh +} + +func readFrame(r io.Reader) ([]byte, error) { + var size [4]byte + if _, err := io.ReadFull(r, size[:]); err != nil { + return nil, err + } + length := binary.BigEndian.Uint32(size[:]) + if length == 0 { + return []byte{}, nil + } + packet := make([]byte, length) + if _, err := io.ReadFull(r, packet); err != nil { + return nil, err + } + return packet, nil +} + +func writeFrame(w io.Writer, packet []byte) error { + var size [4]byte + binary.BigEndian.PutUint32(size[:], uint32(len(packet))) + if _, err := w.Write(size[:]); err != nil { + return err + } + if len(packet) == 0 { + return nil + } + _, err := w.Write(packet) + return err +} + +func snapshot(ctx context.Context, localClient *local.Client, state *helperState) (*statusResponse, error) { + status, err := localClient.Status(ctx) if err != nil { return nil, err } - if (status.BackendState == ipn.NeedsLogin.String() || status.BackendState == ipn.NoState.String()) && status.AuthURL == "" { - if err := localClient.StartLoginInteractive(ctx); err != nil { - return nil, err - } - status, err = localClient.StatusWithoutPeers(ctx) + + authURL := status.AuthURL + if authURL == "" { + authURL = state.authURLSnapshot() + } + if status.BackendState == ipn.Running.String() { + state.clearAuthURL() + authURL = "" + } else if (status.BackendState == ipn.NeedsLogin.String() || status.BackendState == ipn.NoState.String()) && authURL == "" { + authURL, err = awaitAuthURL(ctx, localClient, state) if err != nil { return nil, err } @@ -113,7 +394,7 @@ func snapshot(ctx context.Context, localClient *local.Client) (*statusResponse, response := &statusResponse{ BackendState: status.BackendState, - AuthURL: status.AuthURL, + AuthURL: authURL, Running: status.BackendState == ipn.Running.String(), NeedsLogin: status.BackendState == ipn.NeedsLogin.String(), Health: append([]string(nil), status.Health...), @@ -129,5 +410,114 @@ func snapshot(ctx context.Context, localClient *local.Client) (*statusResponse, for _, ip := range status.TailscaleIPs { response.TailscaleIPs = append(response.TailscaleIPs, ip.String()) } + for _, key := range status.Peers() { + peer := status.Peer[key] + if peer == nil { + continue + } + summary := peerSummary{ + Name: peer.HostName, + DNSName: peer.DNSName, + Online: peer.Online, + Active: peer.Active, + Relay: peer.Relay, + CurAddr: peer.CurAddr, + LastSeenUnix: peer.LastSeen.Unix(), + } + for _, ip := range peer.TailscaleIPs { + summary.TailscaleIPs = append(summary.TailscaleIPs, ip.String()) + } + response.Peers = append(response.Peers, summary) + } return response, nil } + +func serveUDPEcho(ctx context.Context, server *tsnet.Server, localClient *local.Client, port int) { + ip, err := awaitTailscaleIP(ctx, localClient) + if err != nil { + log.Printf("udp echo setup failed: %v", err) + return + } + + listenAddr := net.JoinHostPort(ip.String(), strconv.Itoa(port)) + pc, err := server.ListenPacket("udp", listenAddr) + if err != nil { + log.Printf("udp echo listen failed on %s: %v", listenAddr, err) + return + } + defer pc.Close() + + log.Printf("udp echo listening on %s", pc.LocalAddr()) + buf := make([]byte, 64<<10) + for { + n, addr, err := pc.ReadFrom(buf) + if err != nil { + if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) { + return + } + log.Printf("udp echo read failed: %v", err) + return + } + if _, err := pc.WriteTo(buf[:n], addr); err != nil { + log.Printf("udp echo write failed: %v", err) + return + } + } +} + +func awaitTailscaleIP(ctx context.Context, localClient *local.Client) (netip.Addr, error) { + for range 60 { + status, err := localClient.StatusWithoutPeers(ctx) + if err == nil { + for _, ip := range status.TailscaleIPs { + if ip.Is4() { + return ip, nil + } + } + for _, ip := range status.TailscaleIPs { + if ip.Is6() { + return ip, nil + } + } + } + select { + case <-ctx.Done(): + return netip.Addr{}, ctx.Err() + case <-time.After(250 * time.Millisecond): + } + } + return netip.Addr{}, errors.New("timed out waiting for tailscale IP") +} + +func awaitAuthURL(ctx context.Context, localClient *local.Client, state *helperState) (string, error) { + watchCtx, cancel := context.WithTimeout(ctx, 8*time.Second) + defer cancel() + + watcher, err := localClient.WatchIPNBus(watchCtx, ipn.NotifyInitialState) + if err != nil { + return "", err + } + defer watcher.Close() + + if err := localClient.StartLoginInteractive(ctx); err != nil { + return "", err + } + + for { + notify, err := watcher.Next() + if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return state.authURLSnapshot(), nil + } + return "", err + } + if notify.BrowseToURL != nil && *notify.BrowseToURL != "" { + state.setAuthURL(*notify.BrowseToURL) + return *notify.BrowseToURL, nil + } + if notify.State != nil && *notify.State == ipn.Running { + state.clearAuthURL() + return "", nil + } + } +} diff --git a/burrow/src/main.rs b/burrow/src/main.rs index c91f36f..4ab7700 100644 --- a/burrow/src/main.rs +++ b/burrow/src/main.rs @@ -72,6 +72,14 @@ enum Commands { NetworkReorder(NetworkReorderArgs), /// Delete Network NetworkDelete(NetworkDeleteArgs), + /// Discover a Tailnet authority through the daemon + TailnetDiscover(TailnetDiscoverArgs), + /// Probe a Tailnet authority through the daemon + TailnetProbe(TailnetProbeArgs), + /// Send an ICMP echo probe through the active Tailnet tunnel over daemon packet streaming + TailnetPing(TailnetPingArgs), + /// Send a UDP echo probe through the active Tailnet tunnel over daemon packet streaming + TailnetUdpEcho(TailnetUdpEchoArgs), #[cfg(target_os = "linux")] /// Run a command in an unshared Linux namespace using a Burrow backend Exec(ExecArgs), @@ -110,6 +118,36 @@ struct NetworkDeleteArgs { id: i32, } +#[derive(Args)] +struct TailnetDiscoverArgs { + email: String, +} + +#[derive(Args)] +struct TailnetProbeArgs { + authority: String, +} + +#[cfg(any(target_os = "linux", target_vendor = "apple"))] +#[derive(Args)] +struct TailnetPingArgs { + remote: String, + #[arg(long, default_value = "burrow-tailnet-smoke")] + payload: String, + #[arg(long, default_value_t = 5000)] + timeout_ms: u64, +} + +#[cfg(any(target_os = "linux", target_vendor = "apple"))] +#[derive(Args)] +struct TailnetUdpEchoArgs { + remote: String, + #[arg(long, default_value = "burrow-tailnet-smoke")] + message: String, + #[arg(long, default_value_t = 5000)] + timeout_ms: u64, +} + #[cfg(target_os = "linux")] #[derive(Args)] struct TorExecArgs { @@ -240,6 +278,393 @@ async fn try_network_delete(id: i32) -> Result<()> { Ok(()) } +#[cfg(any(target_os = "linux", target_vendor = "apple"))] +async fn try_tailnet_discover(email: &str) -> Result<()> { + let mut client = BurrowClient::from_uds().await?; + let response = client + .tailnet_client + .discover(crate::daemon::rpc::grpc_defs::TailnetDiscoverRequest { + email: email.to_owned(), + }) + .await? + .into_inner(); + println!("Tailnet Discover Response: {:?}", response); + Ok(()) +} + +#[cfg(any(target_os = "linux", target_vendor = "apple"))] +async fn try_tailnet_probe(authority: &str) -> Result<()> { + let mut client = BurrowClient::from_uds().await?; + let response = client + .tailnet_client + .probe(crate::daemon::rpc::grpc_defs::TailnetProbeRequest { + authority: authority.to_owned(), + }) + .await? + .into_inner(); + println!("Tailnet Probe Response: {:?}", response); + Ok(()) +} + +#[cfg(any(target_os = "linux", target_vendor = "apple"))] +async fn try_tailnet_ping(remote: &str, payload: &str, timeout_ms: u64) -> Result<()> { + use std::net::IpAddr; + + use anyhow::Context; + use rand::Rng; + use tokio::{ + sync::mpsc, + time::{timeout, Duration}, + }; + use tokio_stream::wrappers::ReceiverStream; + + use crate::daemon::rpc::grpc_defs::{Empty, TunnelPacket}; + + let remote_ip: IpAddr = remote + .parse() + .with_context(|| format!("invalid remote IP address {remote}"))?; + let message = payload.as_bytes().to_vec(); + + let mut client = BurrowClient::from_uds().await?; + client.tunnel_client.tunnel_start(Empty {}).await?; + + let mut config_stream = client + .tunnel_client + .tunnel_configuration(Empty {}) + .await? + .into_inner(); + let config = config_stream + .message() + .await? + .context("tunnel configuration stream ended before yielding a config")?; + let local_ip = select_tailnet_local_ip(&config.addresses, remote_ip)?; + + let identifier = rand::thread_rng().gen::(); + let sequence = 1_u16; + let packet = build_icmp_echo_request(local_ip, remote_ip, identifier, sequence, &message)?; + + let (outbound_tx, outbound_rx) = mpsc::channel::(128); + let mut tunnel_packets = client + .tunnel_client + .tunnel_packets(ReceiverStream::new(outbound_rx)) + .await? + .into_inner(); + + outbound_tx + .send(TunnelPacket { payload: packet }) + .await + .context("failed to send ICMP echo probe into daemon packet stream")?; + log::debug!( + "tailnet ping probe queued from {local_ip} to {remote_ip} identifier={identifier} sequence={sequence}" + ); + drop(outbound_tx); + + let reply = timeout(Duration::from_millis(timeout_ms), async { + loop { + let packet = tunnel_packets + .message() + .await + .context("failed to read packet from daemon packet stream")? + .context("daemon packet stream ended before returning a reply")?; + log::debug!( + "tailnet ping received {} bytes from daemon packet stream", + packet.payload.len() + ); + if let Some(reply) = parse_icmp_echo_reply( + &packet.payload, + local_ip, + remote_ip, + identifier, + sequence, + )? { + break Ok::<_, anyhow::Error>(reply); + } + } + }) + .await + .with_context(|| format!("timed out waiting for ICMP echo reply from {remote_ip}"))??; + + println!("Tailnet Ping Source: {}", reply.source); + println!("Tailnet Ping Destination: {}", reply.destination); + println!( + "Tailnet Ping Payload: {}", + String::from_utf8_lossy(&reply.payload) + ); + Ok(()) +} + +#[cfg(any(target_os = "linux", target_vendor = "apple"))] +async fn try_tailnet_udp_echo(remote: &str, message: &str, timeout_ms: u64) -> Result<()> { + use std::net::SocketAddr; + + use anyhow::{bail, Context}; + use futures::{SinkExt, StreamExt}; + use netstack_smoltcp::StackBuilder; + use tokio::{ + sync::mpsc, + time::{timeout, Duration}, + }; + use tokio_stream::wrappers::ReceiverStream; + + use crate::daemon::rpc::grpc_defs::{Empty, TunnelPacket}; + + let remote_addr: SocketAddr = remote + .parse() + .with_context(|| format!("invalid remote socket address {remote}"))?; + + let mut client = BurrowClient::from_uds().await?; + client.tunnel_client.tunnel_start(Empty {}).await?; + + let mut config_stream = client + .tunnel_client + .tunnel_configuration(Empty {}) + .await? + .into_inner(); + let config = config_stream + .message() + .await? + .context("tunnel configuration stream ended before yielding a config")?; + let local_addr = select_tailnet_local_socket(&config.addresses, remote_addr.ip())?; + + let (stack, runner, udp_socket, _) = StackBuilder::default() + .enable_udp(true) + .enable_tcp(true) + .build() + .context("failed to build userspace UDP stack")?; + let runner = runner.context("userspace UDP stack runner unavailable")?; + let udp_socket = udp_socket.context("userspace UDP stack socket unavailable")?; + let (mut stack_sink, mut stack_stream) = stack.split(); + let (mut udp_reader, mut udp_writer) = udp_socket.split(); + + let (outbound_tx, outbound_rx) = mpsc::channel::(128); + let mut tunnel_packets = client + .tunnel_client + .tunnel_packets(ReceiverStream::new(outbound_rx)) + .await? + .into_inner(); + + let ingress_task = tokio::spawn(async move { + loop { + match tunnel_packets.message().await? { + Some(packet) => { + log::debug!( + "tailnet udp echo received {} bytes from daemon packet stream", + packet.payload.len() + ); + stack_sink + .send(packet.payload) + .await + .context("failed to feed inbound tailnet packet into userspace stack")?; + } + None => break, + } + } + Result::<()>::Ok(()) + }); + + let egress_task = tokio::spawn(async move { + while let Some(packet) = stack_stream.next().await { + let payload = + packet.context("failed to read outbound packet from userspace stack")?; + log::debug!( + "tailnet udp echo sending {} bytes into daemon packet stream", + payload.len() + ); + outbound_tx + .send(TunnelPacket { payload }) + .await + .context("failed to forward outbound tailnet packet to daemon")?; + } + Result::<()>::Ok(()) + }); + + let runner_task = tokio::spawn(async move { runner.await.map_err(anyhow::Error::from) }); + + udp_writer + .send((message.as_bytes().to_vec(), local_addr, remote_addr)) + .await + .context("failed to send UDP echo probe into userspace stack")?; + log::debug!( + "tailnet udp echo probe queued from {local_addr} to {remote_addr}" + ); + + let response = timeout(Duration::from_millis(timeout_ms), udp_reader.next()) + .await + .with_context(|| format!("timed out waiting for UDP echo from {remote_addr}"))? + .context("userspace UDP stack ended before returning a reply")?; + let (payload, reply_source, reply_destination) = response; + let response_text = String::from_utf8_lossy(&payload); + + ingress_task.abort(); + egress_task.abort(); + runner_task.abort(); + + if reply_source != remote_addr { + bail!("received UDP reply from unexpected source {reply_source}"); + } + if reply_destination != local_addr { + bail!("received UDP reply for unexpected local socket {reply_destination}"); + } + if payload != message.as_bytes() { + bail!("UDP echo payload mismatch"); + } + + println!("Tailnet UDP Echo Source: {reply_source}"); + println!("Tailnet UDP Echo Destination: {reply_destination}"); + println!("Tailnet UDP Echo Payload: {response_text}"); + Ok(()) +} + +#[cfg(any(target_os = "linux", target_vendor = "apple"))] +fn select_tailnet_local_ip(addresses: &[String], remote_ip: std::net::IpAddr) -> Result { + use anyhow::Context; + + let family_is_v4 = remote_ip.is_ipv4(); + addresses + .iter() + .filter_map(|cidr| cidr.split('/').next()) + .filter_map(|ip| ip.parse::().ok()) + .find(|ip| ip.is_ipv4() == family_is_v4) + .with_context(|| { + format!( + "no local {} tailnet address found in daemon config {:?}", + if family_is_v4 { "IPv4" } else { "IPv6" }, + addresses + ) + }) +} + +#[cfg(any(target_os = "linux", target_vendor = "apple"))] +fn select_tailnet_local_socket( + addresses: &[String], + remote_ip: std::net::IpAddr, +) -> Result { + use rand::Rng; + + let local_ip = select_tailnet_local_ip(addresses, remote_ip)?; + let port = rand::thread_rng().gen_range(40000..50000); + Ok(std::net::SocketAddr::new(local_ip, port)) +} + +#[cfg(any(target_os = "linux", target_vendor = "apple"))] +struct IcmpEchoReply { + source: std::net::IpAddr, + destination: std::net::IpAddr, + payload: Vec, +} + +#[cfg(any(target_os = "linux", target_vendor = "apple"))] +fn build_icmp_echo_request( + source: std::net::IpAddr, + destination: std::net::IpAddr, + identifier: u16, + sequence: u16, + payload: &[u8], +) -> Result> { + use anyhow::bail; + + let (source, destination) = match (source, destination) { + (std::net::IpAddr::V4(source), std::net::IpAddr::V4(destination)) => (source, destination), + _ => bail!("tailnet ping currently supports IPv4 only"), + }; + + let mut icmp = Vec::with_capacity(8 + payload.len()); + icmp.push(8); + icmp.push(0); + icmp.extend_from_slice(&[0, 0]); + icmp.extend_from_slice(&identifier.to_be_bytes()); + icmp.extend_from_slice(&sequence.to_be_bytes()); + icmp.extend_from_slice(payload); + let icmp_checksum = internet_checksum(&icmp); + icmp[2..4].copy_from_slice(&icmp_checksum.to_be_bytes()); + + let total_len = 20 + icmp.len(); + let mut packet = Vec::with_capacity(total_len); + packet.push(0x45); + packet.push(0); + packet.extend_from_slice(&(total_len as u16).to_be_bytes()); + packet.extend_from_slice(&0u16.to_be_bytes()); + packet.extend_from_slice(&0u16.to_be_bytes()); + packet.push(64); + packet.push(1); + packet.extend_from_slice(&[0, 0]); + packet.extend_from_slice(&source.octets()); + packet.extend_from_slice(&destination.octets()); + let header_checksum = internet_checksum(&packet); + packet[10..12].copy_from_slice(&header_checksum.to_be_bytes()); + packet.extend_from_slice(&icmp); + Ok(packet) +} + +#[cfg(any(target_os = "linux", target_vendor = "apple"))] +fn parse_icmp_echo_reply( + packet: &[u8], + local_ip: std::net::IpAddr, + remote_ip: std::net::IpAddr, + identifier: u16, + sequence: u16, +) -> Result> { + use anyhow::bail; + + let (local_ip, remote_ip) = match (local_ip, remote_ip) { + (std::net::IpAddr::V4(local_ip), std::net::IpAddr::V4(remote_ip)) => (local_ip, remote_ip), + _ => bail!("tailnet ping currently supports IPv4 only"), + }; + + if packet.len() < 20 { + return Ok(None); + } + let version = packet[0] >> 4; + if version != 4 { + return Ok(None); + } + let ihl = (packet[0] & 0x0f) as usize * 4; + if packet.len() < ihl + 8 { + return Ok(None); + } + if packet[9] != 1 { + return Ok(None); + } + + let source = std::net::Ipv4Addr::new(packet[12], packet[13], packet[14], packet[15]); + let destination = std::net::Ipv4Addr::new(packet[16], packet[17], packet[18], packet[19]); + if source != remote_ip || destination != local_ip { + return Ok(None); + } + + let icmp = &packet[ihl..]; + if icmp[0] != 0 || icmp[1] != 0 { + return Ok(None); + } + let reply_identifier = u16::from_be_bytes([icmp[4], icmp[5]]); + let reply_sequence = u16::from_be_bytes([icmp[6], icmp[7]]); + if reply_identifier != identifier || reply_sequence != sequence { + return Ok(None); + } + + Ok(Some(IcmpEchoReply { + source: std::net::IpAddr::V4(source), + destination: std::net::IpAddr::V4(destination), + payload: icmp[8..].to_vec(), + })) +} + +#[cfg(any(target_os = "linux", target_vendor = "apple"))] +fn internet_checksum(bytes: &[u8]) -> u16 { + let mut sum = 0u32; + let mut chunks = bytes.chunks_exact(2); + for chunk in &mut chunks { + sum += u16::from_be_bytes([chunk[0], chunk[1]]) as u32; + } + if let Some(&last) = chunks.remainder().first() { + sum += (last as u32) << 8; + } + while (sum >> 16) != 0 { + sum = (sum & 0xffff) + (sum >> 16); + } + !(sum as u16) +} + #[cfg(target_os = "linux")] async fn try_tor_exec(payload_path: &str, command: Vec) -> Result<()> { let exit_code = usernet::run_exec(usernet::ExecInvocation { @@ -348,6 +773,14 @@ async fn main() -> Result<()> { Commands::NetworkList => try_network_list().await?, Commands::NetworkReorder(args) => try_network_reorder(args.id, args.index).await?, Commands::NetworkDelete(args) => try_network_delete(args.id).await?, + Commands::TailnetDiscover(args) => try_tailnet_discover(&args.email).await?, + Commands::TailnetProbe(args) => try_tailnet_probe(&args.authority).await?, + Commands::TailnetPing(args) => { + try_tailnet_ping(&args.remote, &args.payload, args.timeout_ms).await? + } + Commands::TailnetUdpEcho(args) => { + try_tailnet_udp_echo(&args.remote, &args.message, args.timeout_ms).await? + } #[cfg(target_os = "linux")] Commands::Exec(args) => { try_exec(