Add tailnet connectivity smoke path

This commit is contained in:
Conrad Kramer 2026-04-03 17:49:11 -07:00
parent 5079786515
commit 3d80e772c8
3 changed files with 1019 additions and 10 deletions

View file

@ -0,0 +1,186 @@
#!/usr/bin/env bash
set -euo pipefail
repo_root="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
bundle_id="${BURROW_UI_TEST_APP_BUNDLE_ID:-com.hackclub.burrow}"
smoke_root="${BURROW_TAILNET_SMOKE_ROOT:-/tmp/burrow-tailnet-connectivity}"
socket_path="${smoke_root}/burrow.sock"
db_path="${smoke_root}/burrow.db"
daemon_log="${BURROW_TAILNET_SMOKE_DAEMON_LOG:-${smoke_root}/daemon.log}"
payload_path="${smoke_root}/tailnet.json"
authority="${BURROW_TAILNET_SMOKE_AUTHORITY:-https://ts.burrow.net}"
account_name="${BURROW_TAILNET_SMOKE_ACCOUNT:-ui-test}"
identity_name="${BURROW_TAILNET_SMOKE_IDENTITY:-apple}"
hostname="${BURROW_TAILNET_SMOKE_HOSTNAME:-burrow-apple}"
message="${BURROW_TAILNET_SMOKE_MESSAGE:-burrow-tailnet-smoke}"
timeout_ms="${BURROW_TAILNET_SMOKE_TIMEOUT_MS:-8000}"
remote_ip="${BURROW_TAILNET_SMOKE_REMOTE_IP:-}"
remote_port="${BURROW_TAILNET_SMOKE_REMOTE_PORT:-18081}"
remote_hostname="${BURROW_TAILNET_SMOKE_REMOTE_HOSTNAME:-burrow-echo}"
remote_authkey="${BURROW_TAILNET_SMOKE_REMOTE_AUTHKEY:-}"
helper_bin="${BURROW_TAILNET_SMOKE_HELPER_BIN:-${smoke_root}/tailscale-login-bridge}"
remote_state_root="${BURROW_TAILNET_SMOKE_REMOTE_STATE_ROOT:-${smoke_root}/remote-state}"
remote_stdout="${smoke_root}/remote-helper.stdout"
remote_stderr="${BURROW_TAILNET_SMOKE_REMOTE_LOG:-${smoke_root}/remote-helper.log}"
if [[ -n "${TS_AUTHKEY:-}" ]]; then
default_tailnet_state_root="${smoke_root}/local-state"
else
default_tailnet_state_root="/tmp/${bundle_id}/SimulatorTailnetState"
fi
tailnet_state_root="${BURROW_TAILNET_STATE_ROOT:-${default_tailnet_state_root}}"
need_login=0
if [[ -z "${TS_AUTHKEY:-}" ]] && { [[ ! -d "$tailnet_state_root" ]] || [[ -z "$(find "$tailnet_state_root" -mindepth 1 -maxdepth 2 -print -quit 2>/dev/null)" ]]; }; then
need_login=1
fi
if [[ "$need_login" -eq 1 ]]; then
echo "Tailnet state root is empty; running iOS login bootstrap first..."
"${repo_root}/Scripts/run-ios-tailnet-ui-tests.sh"
fi
rm -rf "$smoke_root"
mkdir -p "$smoke_root"
cleanup() {
rm -f "$payload_path"
if [[ -n "${daemon_pid:-}" ]]; then
kill "$daemon_pid" >/dev/null 2>&1 || true
wait "$daemon_pid" >/dev/null 2>&1 || true
fi
if [[ -n "${remote_pid:-}" ]]; then
kill "$remote_pid" >/dev/null 2>&1 || true
wait "$remote_pid" >/dev/null 2>&1 || true
fi
}
trap cleanup EXIT
wait_for_helper_listen() {
python3 - <<'PY' "$1"
import json
import pathlib
import sys
import time
path = pathlib.Path(sys.argv[1])
deadline = time.time() + 20
while time.time() < deadline:
if path.exists():
with path.open("r", encoding="utf-8") as handle:
line = handle.readline().strip()
if line:
hello = json.loads(line)
print(hello["listen_addr"])
raise SystemExit(0)
time.sleep(0.1)
raise SystemExit("timed out waiting for helper startup line")
PY
}
wait_for_helper_ip() {
python3 - <<'PY' "$1"
import json
import sys
import time
import urllib.request
url = sys.argv[1]
deadline = time.time() + 30
while time.time() < deadline:
with urllib.request.urlopen(url, timeout=5) as response:
status = json.load(response)
if status.get("running") and status.get("tailscale_ips"):
print(status["tailscale_ips"][0])
raise SystemExit(0)
time.sleep(0.25)
raise SystemExit("timed out waiting for helper to become ready")
PY
}
python3 - <<'PY' "$payload_path" "$authority" "$account_name" "$identity_name" "$hostname"
import json
import pathlib
import sys
path = pathlib.Path(sys.argv[1])
payload = {
"authority": sys.argv[2],
"account": sys.argv[3],
"identity": sys.argv[4],
"hostname": sys.argv[5],
}
path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
PY
cargo build -p burrow --bin burrow
(
cd "${repo_root}/Tools/tailscale-login-bridge"
GOWORK=off go build -o "$helper_bin" .
)
if [[ -z "$remote_ip" ]]; then
if [[ -z "$remote_authkey" ]] && { [[ ! -d "$remote_state_root" ]] || [[ -z "$(find "$remote_state_root" -mindepth 1 -maxdepth 1 -print -quit 2>/dev/null)" ]]; }; then
echo "error: set BURROW_TAILNET_SMOKE_REMOTE_IP, BURROW_TAILNET_SMOKE_REMOTE_AUTHKEY, or BURROW_TAILNET_SMOKE_REMOTE_STATE_ROOT to an existing logged-in helper state" >&2
exit 1
fi
if [[ -n "$remote_authkey" ]]; then
rm -rf "$remote_state_root"
mkdir -p "$remote_state_root"
fi
(
cd "$repo_root"
if [[ -n "$remote_authkey" ]]; then
export TS_AUTHKEY="$remote_authkey"
fi
"$helper_bin" \
--listen 127.0.0.1:0 \
--state-dir "$remote_state_root" \
--hostname "$remote_hostname" \
--control-url "$authority" \
--udp-echo-port "$remote_port" \
>"$remote_stdout" 2>"$remote_stderr"
) &
remote_pid=$!
remote_listen_addr="$(wait_for_helper_listen "$remote_stdout")"
remote_ip="$(wait_for_helper_ip "http://${remote_listen_addr}/status")"
fi
(
cd "$smoke_root"
RUST_LOG="${BURROW_TAILNET_SMOKE_RUST_LOG:-info,burrow=debug}" \
BURROW_SOCKET_PATH="$socket_path" \
BURROW_TAILSCALE_STATE_ROOT="$tailnet_state_root" \
"${repo_root}/target/debug/burrow" daemon >"$daemon_log" 2>&1
) &
daemon_pid=$!
for _ in $(seq 1 50); do
[[ -S "$socket_path" ]] && break
sleep 0.2
done
if [[ ! -S "$socket_path" ]]; then
echo "error: Burrow daemon did not create ${socket_path}" >&2
[[ -f "$daemon_log" ]] && cat "$daemon_log" >&2
exit 1
fi
run_burrow() {
BURROW_SOCKET_PATH="$socket_path" \
BURROW_TAILSCALE_STATE_ROOT="$tailnet_state_root" \
"${repo_root}/target/debug/burrow" "$@"
}
run_burrow network-add 1 1 "$payload_path"
run_burrow start
run_burrow tunnel-config
run_burrow tailnet-udp-echo "${remote_ip}:${remote_port}" --message "$message" --timeout-ms "$timeout_ms"
echo
echo "Tailnet connectivity smoke passed."
echo "State root: $tailnet_state_root"
echo "Remote: ${remote_ip}:${remote_port}"

View file

@ -2,17 +2,26 @@ package main
import ( import (
"context" "context"
"encoding/binary"
"encoding/json" "encoding/json"
"errors"
"flag" "flag"
"fmt" "fmt"
"io"
"log" "log"
"net" "net"
"net/netip"
"net/http" "net/http"
"os" "os"
"strconv"
"sync"
"time" "time"
"github.com/tailscale/wireguard-go/tun"
"tailscale.com/client/local" "tailscale.com/client/local"
"tailscale.com/ipn" "tailscale.com/ipn"
"tailscale.com/ipn/ipnstate"
"tailscale.com/tailcfg"
"tailscale.com/tsnet" "tailscale.com/tsnet"
) )
@ -26,13 +35,123 @@ type statusResponse struct {
SelfDNSName string `json:"self_dns_name,omitempty"` SelfDNSName string `json:"self_dns_name,omitempty"`
TailscaleIPs []string `json:"tailscale_ips,omitempty"` TailscaleIPs []string `json:"tailscale_ips,omitempty"`
Health []string `json:"health,omitempty"` Health []string `json:"health,omitempty"`
Peers []peerSummary `json:"peers,omitempty"`
} }
type peerSummary struct {
Name string `json:"name,omitempty"`
DNSName string `json:"dns_name,omitempty"`
TailscaleIPs []string `json:"tailscale_ips,omitempty"`
Online bool `json:"online"`
Active bool `json:"active"`
Relay string `json:"relay,omitempty"`
CurAddr string `json:"cur_addr,omitempty"`
LastSeenUnix int64 `json:"last_seen_unix,omitempty"`
}
type pingResponse struct {
Result *ipnstate.PingResult `json:"result,omitempty"`
}
type helperHello struct {
ListenAddr string `json:"listen_addr"`
PacketSocket string `json:"packet_socket,omitempty"`
}
type helperState struct {
mu sync.RWMutex
authURL string
}
func (s *helperState) authURLSnapshot() string {
s.mu.RLock()
defer s.mu.RUnlock()
return s.authURL
}
func (s *helperState) setAuthURL(url string) {
s.mu.Lock()
defer s.mu.Unlock()
s.authURL = url
}
func (s *helperState) clearAuthURL() {
s.setAuthURL("")
}
// chanTUN is a tun.Device backed by channels so another process can feed and
// consume raw IP packets while tsnet handles the Tailnet control/data plane.
type chanTUN struct {
Inbound chan []byte
Outbound chan []byte
closed chan struct{}
events chan tun.Event
}
func newChanTUN() *chanTUN {
t := &chanTUN{
Inbound: make(chan []byte, 1024),
Outbound: make(chan []byte, 1024),
closed: make(chan struct{}),
events: make(chan tun.Event, 1),
}
t.events <- tun.EventUp
return t
}
func (t *chanTUN) File() *os.File { return nil }
func (t *chanTUN) Close() error {
select {
case <-t.closed:
default:
close(t.closed)
close(t.Inbound)
}
return nil
}
func (t *chanTUN) Read(bufs [][]byte, sizes []int, offset int) (int, error) {
select {
case <-t.closed:
return 0, io.EOF
case pkt, ok := <-t.Outbound:
if !ok {
return 0, io.EOF
}
sizes[0] = copy(bufs[0][offset:], pkt)
return 1, nil
}
}
func (t *chanTUN) Write(bufs [][]byte, offset int) (int, error) {
for _, buf := range bufs {
pkt := buf[offset:]
if len(pkt) == 0 {
continue
}
select {
case <-t.closed:
return 0, errors.New("closed")
case t.Inbound <- append([]byte(nil), pkt...):
default:
}
}
return len(bufs), nil
}
func (t *chanTUN) MTU() (int, error) { return 1280, nil }
func (t *chanTUN) Name() (string, error) { return "burrow-tailnet", nil }
func (t *chanTUN) Events() <-chan tun.Event { return t.events }
func (t *chanTUN) BatchSize() int { return 1 }
func main() { func main() {
listen := flag.String("listen", "127.0.0.1:0", "local listen address") listen := flag.String("listen", "127.0.0.1:0", "local listen address")
stateDir := flag.String("state-dir", "", "persistent state directory") stateDir := flag.String("state-dir", "", "persistent state directory")
hostname := flag.String("hostname", "burrow-apple", "tailnet hostname") hostname := flag.String("hostname", "burrow-apple", "tailnet hostname")
controlURL := flag.String("control-url", "", "optional control URL") controlURL := flag.String("control-url", "", "optional control URL")
packetSocket := flag.String("packet-socket", "", "optional unix socket path for raw packet bridging")
udpEchoPort := flag.Int("udp-echo-port", 0, "optional tailnet UDP echo port")
flag.Parse() flag.Parse()
if *stateDir == "" { if *stateDir == "" {
@ -48,6 +167,24 @@ func main() {
Hostname: *hostname, Hostname: *hostname,
UserLogf: log.Printf, UserLogf: log.Printf,
} }
var tunDevice *chanTUN
var packetListener net.Listener
if *packetSocket != "" {
_ = os.Remove(*packetSocket)
ln, err := net.Listen("unix", *packetSocket)
if err != nil {
log.Fatalf("packet listen: %v", err)
}
packetListener = ln
defer func() {
packetListener.Close()
_ = os.Remove(*packetSocket)
}()
tunDevice = newChanTUN()
server.Tun = tunDevice
}
if *controlURL != "" { if *controlURL != "" {
server.ControlURL = *controlURL server.ControlURL = *controlURL
} }
@ -61,6 +198,7 @@ func main() {
if err != nil { if err != nil {
log.Fatalf("local client: %v", err) log.Fatalf("local client: %v", err)
} }
state := &helperState{}
ln, err := net.Listen("tcp", *listen) ln, err := net.Listen("tcp", *listen)
if err != nil { if err != nil {
@ -68,12 +206,27 @@ func main() {
} }
defer ln.Close() defer ln.Close()
fmt.Printf("{\"listen_addr\":%q}\n", ln.Addr().String()) if packetListener != nil {
go servePacketBridge(packetListener, tunDevice)
}
if *udpEchoPort > 0 {
go serveUDPEcho(context.Background(), server, localClient, *udpEchoPort)
}
hello := helperHello{
ListenAddr: ln.Addr().String(),
}
if *packetSocket != "" {
hello.PacketSocket = *packetSocket
}
if err := json.NewEncoder(os.Stdout).Encode(hello); err != nil {
log.Fatalf("write hello: %v", err)
}
_ = os.Stdout.Sync() _ = os.Stdout.Sync()
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
status, err := snapshot(r.Context(), localClient) status, err := snapshot(r.Context(), localClient, state)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway) http.Error(w, err.Error(), http.StatusBadGateway)
return return
@ -81,6 +234,40 @@ func main() {
w.Header().Set("content-type", "application/json") w.Header().Set("content-type", "application/json")
_ = json.NewEncoder(w).Encode(status) _ = json.NewEncoder(w).Encode(status)
}) })
mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
ip := r.URL.Query().Get("ip")
if ip == "" {
http.Error(w, "missing ip", http.StatusBadRequest)
return
}
target, err := netip.ParseAddr(ip)
if err != nil {
http.Error(w, fmt.Sprintf("invalid ip: %v", err), http.StatusBadRequest)
return
}
pingType := tailcfg.PingTSMP
switch r.URL.Query().Get("type") {
case "", "tsmp", "TSMP":
pingType = tailcfg.PingTSMP
case "icmp", "ICMP":
pingType = tailcfg.PingICMP
case "peerapi":
pingType = tailcfg.PingPeerAPI
default:
http.Error(w, "unsupported ping type", http.StatusBadRequest)
return
}
result, err := localClient.Ping(r.Context(), target, pingType)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
w.Header().Set("content-type", "application/json")
_ = json.NewEncoder(w).Encode(&pingResponse{Result: result})
})
mux.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
go func() { go func() {
@ -96,16 +283,110 @@ func main() {
log.Fatal(httpServer.Serve(ln)) log.Fatal(httpServer.Serve(ln))
} }
func snapshot(ctx context.Context, localClient *local.Client) (*statusResponse, error) { func servePacketBridge(listener net.Listener, device *chanTUN) {
status, err := localClient.StatusWithoutPeers(ctx) for {
conn, err := listener.Accept()
if err != nil {
if errors.Is(err, net.ErrClosed) {
return
}
log.Printf("packet accept: %v", err)
continue
}
log.Printf("packet bridge connected")
if err := bridgePacketConn(conn, device); err != nil && !errors.Is(err, io.EOF) {
log.Printf("packet bridge error: %v", err)
}
_ = conn.Close()
log.Printf("packet bridge disconnected")
}
}
func bridgePacketConn(conn net.Conn, device *chanTUN) error {
errCh := make(chan error, 2)
go func() {
for {
pkt, err := readFrame(conn)
if err != nil {
errCh <- err
return
}
select {
case <-device.closed:
errCh <- io.EOF
return
case device.Outbound <- pkt:
}
}
}()
go func() {
for {
select {
case <-device.closed:
errCh <- io.EOF
return
case pkt, ok := <-device.Inbound:
if !ok {
errCh <- io.EOF
return
}
if err := writeFrame(conn, pkt); err != nil {
errCh <- err
return
}
}
}
}()
return <-errCh
}
func readFrame(r io.Reader) ([]byte, error) {
var size [4]byte
if _, err := io.ReadFull(r, size[:]); err != nil {
return nil, err
}
length := binary.BigEndian.Uint32(size[:])
if length == 0 {
return []byte{}, nil
}
packet := make([]byte, length)
if _, err := io.ReadFull(r, packet); err != nil {
return nil, err
}
return packet, nil
}
func writeFrame(w io.Writer, packet []byte) error {
var size [4]byte
binary.BigEndian.PutUint32(size[:], uint32(len(packet)))
if _, err := w.Write(size[:]); err != nil {
return err
}
if len(packet) == 0 {
return nil
}
_, err := w.Write(packet)
return err
}
func snapshot(ctx context.Context, localClient *local.Client, state *helperState) (*statusResponse, error) {
status, err := localClient.Status(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if (status.BackendState == ipn.NeedsLogin.String() || status.BackendState == ipn.NoState.String()) && status.AuthURL == "" {
if err := localClient.StartLoginInteractive(ctx); err != nil { authURL := status.AuthURL
return nil, err if authURL == "" {
} authURL = state.authURLSnapshot()
status, err = localClient.StatusWithoutPeers(ctx) }
if status.BackendState == ipn.Running.String() {
state.clearAuthURL()
authURL = ""
} else if (status.BackendState == ipn.NeedsLogin.String() || status.BackendState == ipn.NoState.String()) && authURL == "" {
authURL, err = awaitAuthURL(ctx, localClient, state)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -113,7 +394,7 @@ func snapshot(ctx context.Context, localClient *local.Client) (*statusResponse,
response := &statusResponse{ response := &statusResponse{
BackendState: status.BackendState, BackendState: status.BackendState,
AuthURL: status.AuthURL, AuthURL: authURL,
Running: status.BackendState == ipn.Running.String(), Running: status.BackendState == ipn.Running.String(),
NeedsLogin: status.BackendState == ipn.NeedsLogin.String(), NeedsLogin: status.BackendState == ipn.NeedsLogin.String(),
Health: append([]string(nil), status.Health...), Health: append([]string(nil), status.Health...),
@ -129,5 +410,114 @@ func snapshot(ctx context.Context, localClient *local.Client) (*statusResponse,
for _, ip := range status.TailscaleIPs { for _, ip := range status.TailscaleIPs {
response.TailscaleIPs = append(response.TailscaleIPs, ip.String()) response.TailscaleIPs = append(response.TailscaleIPs, ip.String())
} }
for _, key := range status.Peers() {
peer := status.Peer[key]
if peer == nil {
continue
}
summary := peerSummary{
Name: peer.HostName,
DNSName: peer.DNSName,
Online: peer.Online,
Active: peer.Active,
Relay: peer.Relay,
CurAddr: peer.CurAddr,
LastSeenUnix: peer.LastSeen.Unix(),
}
for _, ip := range peer.TailscaleIPs {
summary.TailscaleIPs = append(summary.TailscaleIPs, ip.String())
}
response.Peers = append(response.Peers, summary)
}
return response, nil return response, nil
} }
func serveUDPEcho(ctx context.Context, server *tsnet.Server, localClient *local.Client, port int) {
ip, err := awaitTailscaleIP(ctx, localClient)
if err != nil {
log.Printf("udp echo setup failed: %v", err)
return
}
listenAddr := net.JoinHostPort(ip.String(), strconv.Itoa(port))
pc, err := server.ListenPacket("udp", listenAddr)
if err != nil {
log.Printf("udp echo listen failed on %s: %v", listenAddr, err)
return
}
defer pc.Close()
log.Printf("udp echo listening on %s", pc.LocalAddr())
buf := make([]byte, 64<<10)
for {
n, addr, err := pc.ReadFrom(buf)
if err != nil {
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
return
}
log.Printf("udp echo read failed: %v", err)
return
}
if _, err := pc.WriteTo(buf[:n], addr); err != nil {
log.Printf("udp echo write failed: %v", err)
return
}
}
}
func awaitTailscaleIP(ctx context.Context, localClient *local.Client) (netip.Addr, error) {
for range 60 {
status, err := localClient.StatusWithoutPeers(ctx)
if err == nil {
for _, ip := range status.TailscaleIPs {
if ip.Is4() {
return ip, nil
}
}
for _, ip := range status.TailscaleIPs {
if ip.Is6() {
return ip, nil
}
}
}
select {
case <-ctx.Done():
return netip.Addr{}, ctx.Err()
case <-time.After(250 * time.Millisecond):
}
}
return netip.Addr{}, errors.New("timed out waiting for tailscale IP")
}
func awaitAuthURL(ctx context.Context, localClient *local.Client, state *helperState) (string, error) {
watchCtx, cancel := context.WithTimeout(ctx, 8*time.Second)
defer cancel()
watcher, err := localClient.WatchIPNBus(watchCtx, ipn.NotifyInitialState)
if err != nil {
return "", err
}
defer watcher.Close()
if err := localClient.StartLoginInteractive(ctx); err != nil {
return "", err
}
for {
notify, err := watcher.Next()
if err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return state.authURLSnapshot(), nil
}
return "", err
}
if notify.BrowseToURL != nil && *notify.BrowseToURL != "" {
state.setAuthURL(*notify.BrowseToURL)
return *notify.BrowseToURL, nil
}
if notify.State != nil && *notify.State == ipn.Running {
state.clearAuthURL()
return "", nil
}
}
}

View file

@ -72,6 +72,14 @@ enum Commands {
NetworkReorder(NetworkReorderArgs), NetworkReorder(NetworkReorderArgs),
/// Delete Network /// Delete Network
NetworkDelete(NetworkDeleteArgs), NetworkDelete(NetworkDeleteArgs),
/// Discover a Tailnet authority through the daemon
TailnetDiscover(TailnetDiscoverArgs),
/// Probe a Tailnet authority through the daemon
TailnetProbe(TailnetProbeArgs),
/// Send an ICMP echo probe through the active Tailnet tunnel over daemon packet streaming
TailnetPing(TailnetPingArgs),
/// Send a UDP echo probe through the active Tailnet tunnel over daemon packet streaming
TailnetUdpEcho(TailnetUdpEchoArgs),
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
/// Run a command in an unshared Linux namespace using a Burrow backend /// Run a command in an unshared Linux namespace using a Burrow backend
Exec(ExecArgs), Exec(ExecArgs),
@ -110,6 +118,36 @@ struct NetworkDeleteArgs {
id: i32, id: i32,
} }
#[derive(Args)]
struct TailnetDiscoverArgs {
email: String,
}
#[derive(Args)]
struct TailnetProbeArgs {
authority: String,
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
#[derive(Args)]
struct TailnetPingArgs {
remote: String,
#[arg(long, default_value = "burrow-tailnet-smoke")]
payload: String,
#[arg(long, default_value_t = 5000)]
timeout_ms: u64,
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
#[derive(Args)]
struct TailnetUdpEchoArgs {
remote: String,
#[arg(long, default_value = "burrow-tailnet-smoke")]
message: String,
#[arg(long, default_value_t = 5000)]
timeout_ms: u64,
}
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
#[derive(Args)] #[derive(Args)]
struct TorExecArgs { struct TorExecArgs {
@ -240,6 +278,393 @@ async fn try_network_delete(id: i32) -> Result<()> {
Ok(()) Ok(())
} }
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
async fn try_tailnet_discover(email: &str) -> Result<()> {
let mut client = BurrowClient::from_uds().await?;
let response = client
.tailnet_client
.discover(crate::daemon::rpc::grpc_defs::TailnetDiscoverRequest {
email: email.to_owned(),
})
.await?
.into_inner();
println!("Tailnet Discover Response: {:?}", response);
Ok(())
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
async fn try_tailnet_probe(authority: &str) -> Result<()> {
let mut client = BurrowClient::from_uds().await?;
let response = client
.tailnet_client
.probe(crate::daemon::rpc::grpc_defs::TailnetProbeRequest {
authority: authority.to_owned(),
})
.await?
.into_inner();
println!("Tailnet Probe Response: {:?}", response);
Ok(())
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
async fn try_tailnet_ping(remote: &str, payload: &str, timeout_ms: u64) -> Result<()> {
use std::net::IpAddr;
use anyhow::Context;
use rand::Rng;
use tokio::{
sync::mpsc,
time::{timeout, Duration},
};
use tokio_stream::wrappers::ReceiverStream;
use crate::daemon::rpc::grpc_defs::{Empty, TunnelPacket};
let remote_ip: IpAddr = remote
.parse()
.with_context(|| format!("invalid remote IP address {remote}"))?;
let message = payload.as_bytes().to_vec();
let mut client = BurrowClient::from_uds().await?;
client.tunnel_client.tunnel_start(Empty {}).await?;
let mut config_stream = client
.tunnel_client
.tunnel_configuration(Empty {})
.await?
.into_inner();
let config = config_stream
.message()
.await?
.context("tunnel configuration stream ended before yielding a config")?;
let local_ip = select_tailnet_local_ip(&config.addresses, remote_ip)?;
let identifier = rand::thread_rng().gen::<u16>();
let sequence = 1_u16;
let packet = build_icmp_echo_request(local_ip, remote_ip, identifier, sequence, &message)?;
let (outbound_tx, outbound_rx) = mpsc::channel::<TunnelPacket>(128);
let mut tunnel_packets = client
.tunnel_client
.tunnel_packets(ReceiverStream::new(outbound_rx))
.await?
.into_inner();
outbound_tx
.send(TunnelPacket { payload: packet })
.await
.context("failed to send ICMP echo probe into daemon packet stream")?;
log::debug!(
"tailnet ping probe queued from {local_ip} to {remote_ip} identifier={identifier} sequence={sequence}"
);
drop(outbound_tx);
let reply = timeout(Duration::from_millis(timeout_ms), async {
loop {
let packet = tunnel_packets
.message()
.await
.context("failed to read packet from daemon packet stream")?
.context("daemon packet stream ended before returning a reply")?;
log::debug!(
"tailnet ping received {} bytes from daemon packet stream",
packet.payload.len()
);
if let Some(reply) = parse_icmp_echo_reply(
&packet.payload,
local_ip,
remote_ip,
identifier,
sequence,
)? {
break Ok::<_, anyhow::Error>(reply);
}
}
})
.await
.with_context(|| format!("timed out waiting for ICMP echo reply from {remote_ip}"))??;
println!("Tailnet Ping Source: {}", reply.source);
println!("Tailnet Ping Destination: {}", reply.destination);
println!(
"Tailnet Ping Payload: {}",
String::from_utf8_lossy(&reply.payload)
);
Ok(())
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
async fn try_tailnet_udp_echo(remote: &str, message: &str, timeout_ms: u64) -> Result<()> {
use std::net::SocketAddr;
use anyhow::{bail, Context};
use futures::{SinkExt, StreamExt};
use netstack_smoltcp::StackBuilder;
use tokio::{
sync::mpsc,
time::{timeout, Duration},
};
use tokio_stream::wrappers::ReceiverStream;
use crate::daemon::rpc::grpc_defs::{Empty, TunnelPacket};
let remote_addr: SocketAddr = remote
.parse()
.with_context(|| format!("invalid remote socket address {remote}"))?;
let mut client = BurrowClient::from_uds().await?;
client.tunnel_client.tunnel_start(Empty {}).await?;
let mut config_stream = client
.tunnel_client
.tunnel_configuration(Empty {})
.await?
.into_inner();
let config = config_stream
.message()
.await?
.context("tunnel configuration stream ended before yielding a config")?;
let local_addr = select_tailnet_local_socket(&config.addresses, remote_addr.ip())?;
let (stack, runner, udp_socket, _) = StackBuilder::default()
.enable_udp(true)
.enable_tcp(true)
.build()
.context("failed to build userspace UDP stack")?;
let runner = runner.context("userspace UDP stack runner unavailable")?;
let udp_socket = udp_socket.context("userspace UDP stack socket unavailable")?;
let (mut stack_sink, mut stack_stream) = stack.split();
let (mut udp_reader, mut udp_writer) = udp_socket.split();
let (outbound_tx, outbound_rx) = mpsc::channel::<TunnelPacket>(128);
let mut tunnel_packets = client
.tunnel_client
.tunnel_packets(ReceiverStream::new(outbound_rx))
.await?
.into_inner();
let ingress_task = tokio::spawn(async move {
loop {
match tunnel_packets.message().await? {
Some(packet) => {
log::debug!(
"tailnet udp echo received {} bytes from daemon packet stream",
packet.payload.len()
);
stack_sink
.send(packet.payload)
.await
.context("failed to feed inbound tailnet packet into userspace stack")?;
}
None => break,
}
}
Result::<()>::Ok(())
});
let egress_task = tokio::spawn(async move {
while let Some(packet) = stack_stream.next().await {
let payload =
packet.context("failed to read outbound packet from userspace stack")?;
log::debug!(
"tailnet udp echo sending {} bytes into daemon packet stream",
payload.len()
);
outbound_tx
.send(TunnelPacket { payload })
.await
.context("failed to forward outbound tailnet packet to daemon")?;
}
Result::<()>::Ok(())
});
let runner_task = tokio::spawn(async move { runner.await.map_err(anyhow::Error::from) });
udp_writer
.send((message.as_bytes().to_vec(), local_addr, remote_addr))
.await
.context("failed to send UDP echo probe into userspace stack")?;
log::debug!(
"tailnet udp echo probe queued from {local_addr} to {remote_addr}"
);
let response = timeout(Duration::from_millis(timeout_ms), udp_reader.next())
.await
.with_context(|| format!("timed out waiting for UDP echo from {remote_addr}"))?
.context("userspace UDP stack ended before returning a reply")?;
let (payload, reply_source, reply_destination) = response;
let response_text = String::from_utf8_lossy(&payload);
ingress_task.abort();
egress_task.abort();
runner_task.abort();
if reply_source != remote_addr {
bail!("received UDP reply from unexpected source {reply_source}");
}
if reply_destination != local_addr {
bail!("received UDP reply for unexpected local socket {reply_destination}");
}
if payload != message.as_bytes() {
bail!("UDP echo payload mismatch");
}
println!("Tailnet UDP Echo Source: {reply_source}");
println!("Tailnet UDP Echo Destination: {reply_destination}");
println!("Tailnet UDP Echo Payload: {response_text}");
Ok(())
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
fn select_tailnet_local_ip(addresses: &[String], remote_ip: std::net::IpAddr) -> Result<std::net::IpAddr> {
use anyhow::Context;
let family_is_v4 = remote_ip.is_ipv4();
addresses
.iter()
.filter_map(|cidr| cidr.split('/').next())
.filter_map(|ip| ip.parse::<std::net::IpAddr>().ok())
.find(|ip| ip.is_ipv4() == family_is_v4)
.with_context(|| {
format!(
"no local {} tailnet address found in daemon config {:?}",
if family_is_v4 { "IPv4" } else { "IPv6" },
addresses
)
})
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
fn select_tailnet_local_socket(
addresses: &[String],
remote_ip: std::net::IpAddr,
) -> Result<std::net::SocketAddr> {
use rand::Rng;
let local_ip = select_tailnet_local_ip(addresses, remote_ip)?;
let port = rand::thread_rng().gen_range(40000..50000);
Ok(std::net::SocketAddr::new(local_ip, port))
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
struct IcmpEchoReply {
source: std::net::IpAddr,
destination: std::net::IpAddr,
payload: Vec<u8>,
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
fn build_icmp_echo_request(
source: std::net::IpAddr,
destination: std::net::IpAddr,
identifier: u16,
sequence: u16,
payload: &[u8],
) -> Result<Vec<u8>> {
use anyhow::bail;
let (source, destination) = match (source, destination) {
(std::net::IpAddr::V4(source), std::net::IpAddr::V4(destination)) => (source, destination),
_ => bail!("tailnet ping currently supports IPv4 only"),
};
let mut icmp = Vec::with_capacity(8 + payload.len());
icmp.push(8);
icmp.push(0);
icmp.extend_from_slice(&[0, 0]);
icmp.extend_from_slice(&identifier.to_be_bytes());
icmp.extend_from_slice(&sequence.to_be_bytes());
icmp.extend_from_slice(payload);
let icmp_checksum = internet_checksum(&icmp);
icmp[2..4].copy_from_slice(&icmp_checksum.to_be_bytes());
let total_len = 20 + icmp.len();
let mut packet = Vec::with_capacity(total_len);
packet.push(0x45);
packet.push(0);
packet.extend_from_slice(&(total_len as u16).to_be_bytes());
packet.extend_from_slice(&0u16.to_be_bytes());
packet.extend_from_slice(&0u16.to_be_bytes());
packet.push(64);
packet.push(1);
packet.extend_from_slice(&[0, 0]);
packet.extend_from_slice(&source.octets());
packet.extend_from_slice(&destination.octets());
let header_checksum = internet_checksum(&packet);
packet[10..12].copy_from_slice(&header_checksum.to_be_bytes());
packet.extend_from_slice(&icmp);
Ok(packet)
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
fn parse_icmp_echo_reply(
packet: &[u8],
local_ip: std::net::IpAddr,
remote_ip: std::net::IpAddr,
identifier: u16,
sequence: u16,
) -> Result<Option<IcmpEchoReply>> {
use anyhow::bail;
let (local_ip, remote_ip) = match (local_ip, remote_ip) {
(std::net::IpAddr::V4(local_ip), std::net::IpAddr::V4(remote_ip)) => (local_ip, remote_ip),
_ => bail!("tailnet ping currently supports IPv4 only"),
};
if packet.len() < 20 {
return Ok(None);
}
let version = packet[0] >> 4;
if version != 4 {
return Ok(None);
}
let ihl = (packet[0] & 0x0f) as usize * 4;
if packet.len() < ihl + 8 {
return Ok(None);
}
if packet[9] != 1 {
return Ok(None);
}
let source = std::net::Ipv4Addr::new(packet[12], packet[13], packet[14], packet[15]);
let destination = std::net::Ipv4Addr::new(packet[16], packet[17], packet[18], packet[19]);
if source != remote_ip || destination != local_ip {
return Ok(None);
}
let icmp = &packet[ihl..];
if icmp[0] != 0 || icmp[1] != 0 {
return Ok(None);
}
let reply_identifier = u16::from_be_bytes([icmp[4], icmp[5]]);
let reply_sequence = u16::from_be_bytes([icmp[6], icmp[7]]);
if reply_identifier != identifier || reply_sequence != sequence {
return Ok(None);
}
Ok(Some(IcmpEchoReply {
source: std::net::IpAddr::V4(source),
destination: std::net::IpAddr::V4(destination),
payload: icmp[8..].to_vec(),
}))
}
#[cfg(any(target_os = "linux", target_vendor = "apple"))]
fn internet_checksum(bytes: &[u8]) -> u16 {
let mut sum = 0u32;
let mut chunks = bytes.chunks_exact(2);
for chunk in &mut chunks {
sum += u16::from_be_bytes([chunk[0], chunk[1]]) as u32;
}
if let Some(&last) = chunks.remainder().first() {
sum += (last as u32) << 8;
}
while (sum >> 16) != 0 {
sum = (sum & 0xffff) + (sum >> 16);
}
!(sum as u16)
}
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
async fn try_tor_exec(payload_path: &str, command: Vec<String>) -> Result<()> { async fn try_tor_exec(payload_path: &str, command: Vec<String>) -> Result<()> {
let exit_code = usernet::run_exec(usernet::ExecInvocation { let exit_code = usernet::run_exec(usernet::ExecInvocation {
@ -348,6 +773,14 @@ async fn main() -> Result<()> {
Commands::NetworkList => try_network_list().await?, Commands::NetworkList => try_network_list().await?,
Commands::NetworkReorder(args) => try_network_reorder(args.id, args.index).await?, Commands::NetworkReorder(args) => try_network_reorder(args.id, args.index).await?,
Commands::NetworkDelete(args) => try_network_delete(args.id).await?, Commands::NetworkDelete(args) => try_network_delete(args.id).await?,
Commands::TailnetDiscover(args) => try_tailnet_discover(&args.email).await?,
Commands::TailnetProbe(args) => try_tailnet_probe(&args.authority).await?,
Commands::TailnetPing(args) => {
try_tailnet_ping(&args.remote, &args.payload, args.timeout_ms).await?
}
Commands::TailnetUdpEcho(args) => {
try_tailnet_udp_echo(&args.remote, &args.message, args.timeout_ms).await?
}
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
Commands::Exec(args) => { Commands::Exec(args) => {
try_exec( try_exec(