Harden macos runner cleanup
This commit is contained in:
parent
fc79766a31
commit
283209d364
5 changed files with 239 additions and 113 deletions
144
Scripts/forgejo-prune-runners.py
Executable file
144
Scripts/forgejo-prune-runners.py
Executable file
|
|
@ -0,0 +1,144 @@
|
|||
#!/usr/bin/env python3
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import subprocess
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
|
||||
|
||||
def _read_token() -> str:
|
||||
token = os.environ.get("FORGEJO_API_TOKEN", "").strip()
|
||||
token_file = os.environ.get("FORGEJO_API_TOKEN_FILE", "").strip()
|
||||
if not token and token_file:
|
||||
token = pathlib.Path(token_file).read_text().strip()
|
||||
if not token:
|
||||
raise SystemExit("Forgejo API token is missing")
|
||||
if token.startswith("PENDING-"):
|
||||
raise SystemExit("Forgejo API token is pending")
|
||||
return token
|
||||
|
||||
|
||||
def _request(method: str, url: str, token: str) -> tuple[int, str]:
|
||||
headers = {"Authorization": f"token {token}", "Accept": "application/json"}
|
||||
req = urllib.request.Request(url, headers=headers, method=method)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=20) as resp:
|
||||
body = resp.read().decode("utf-8")
|
||||
return resp.getcode(), body
|
||||
except urllib.error.HTTPError as exc:
|
||||
body = exc.read().decode("utf-8")
|
||||
return exc.code, body
|
||||
|
||||
|
||||
def _list_runners(api_url: str, token: str, org: str | None) -> tuple[str, list[dict]]:
|
||||
if org:
|
||||
list_url = f"{api_url}/orgs/{org}/actions/runners"
|
||||
else:
|
||||
list_url = f"{api_url}/actions/runners"
|
||||
status, body = _request("GET", list_url, token)
|
||||
if status == 404:
|
||||
return list_url, []
|
||||
if status >= 400:
|
||||
raise RuntimeError(f"list runners failed ({status}) {body}")
|
||||
try:
|
||||
runners = json.loads(body)
|
||||
except json.JSONDecodeError as exc:
|
||||
raise RuntimeError(f"invalid runner list response: {exc}") from exc
|
||||
if not isinstance(runners, list):
|
||||
raise RuntimeError("runner list response is not a list")
|
||||
return list_url, runners
|
||||
|
||||
|
||||
def _delete_runner(api_url: str, token: str, org: str | None, runner_id: int) -> bool:
|
||||
if org:
|
||||
delete_url = f"{api_url}/orgs/{org}/actions/runners/{runner_id}"
|
||||
else:
|
||||
delete_url = f"{api_url}/actions/runners/{runner_id}"
|
||||
status, body = _request("DELETE", delete_url, token)
|
||||
if status in (200, 204):
|
||||
return True
|
||||
print(f"[forgejo-prune-runners] delete {runner_id} failed: {status} {body}")
|
||||
return False
|
||||
|
||||
|
||||
def _prune_db(ttl_seconds: int) -> int:
|
||||
cutoff = int(time.time()) - ttl_seconds
|
||||
now = int(time.time())
|
||||
sql = (
|
||||
"WITH updated AS ("
|
||||
"UPDATE action_runner "
|
||||
f"SET deleted = {now} "
|
||||
"WHERE (deleted IS NULL OR deleted = 0) "
|
||||
f"AND ((last_online IS NOT NULL AND last_online > 0 AND last_online < {cutoff}) "
|
||||
f"OR (COALESCE(last_online, 0) = 0 AND created < {cutoff})) "
|
||||
"RETURNING 1"
|
||||
") SELECT count(*) FROM updated;"
|
||||
)
|
||||
result = subprocess.run(
|
||||
["psql", "-h", "/run/postgresql", "-U", "forgejo", "forgejo", "-tAc", sql],
|
||||
check=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
output = (result.stdout or "").strip()
|
||||
try:
|
||||
return int(output)
|
||||
except ValueError:
|
||||
return 0
|
||||
|
||||
|
||||
def main() -> None:
|
||||
api_url = os.environ.get("FORGEJO_API_URL", "https://git.burrow.net/api/v1").rstrip("/")
|
||||
org = os.environ.get("FORGEJO_ORG", "hackclub").strip() or None
|
||||
dry_run = os.environ.get("FORGEJO_DRY_RUN", "0") == "1"
|
||||
db_only = os.environ.get("FORGEJO_PRUNE_DB", "0") == "1"
|
||||
ttl_seconds = int(os.environ.get("FORGEJO_RUNNER_TTL_SEC", "3600"))
|
||||
|
||||
if db_only:
|
||||
removed = _prune_db(ttl_seconds)
|
||||
print(f"[forgejo-prune-runners] pruned {removed} runners via DB")
|
||||
return
|
||||
|
||||
token = _read_token()
|
||||
|
||||
try:
|
||||
_, runners = _list_runners(api_url, token, org)
|
||||
except RuntimeError as exc:
|
||||
if org is not None:
|
||||
print(f"[forgejo-prune-runners] org runner list failed ({exc}); retrying instance scope")
|
||||
_, runners = _list_runners(api_url, token, None)
|
||||
org = None
|
||||
else:
|
||||
raise SystemExit(str(exc))
|
||||
|
||||
if not runners:
|
||||
removed = _prune_db(ttl_seconds)
|
||||
print(f"[forgejo-prune-runners] pruned {removed} runners via DB fallback")
|
||||
return
|
||||
|
||||
removed = 0
|
||||
for runner in runners:
|
||||
runner_id = runner.get("id")
|
||||
name = runner.get("name", "unknown")
|
||||
status = (runner.get("status") or "").lower()
|
||||
busy = bool(runner.get("busy"))
|
||||
if status == "online" or busy:
|
||||
continue
|
||||
if runner_id is None:
|
||||
continue
|
||||
if dry_run:
|
||||
print(f"[forgejo-prune-runners] would delete runner {runner_id} ({name}) status={status}")
|
||||
continue
|
||||
if _delete_runner(api_url, token, org, int(runner_id)):
|
||||
removed += 1
|
||||
print(f"[forgejo-prune-runners] deleted runner {runner_id} ({name})")
|
||||
|
||||
print(f"[forgejo-prune-runners] done; removed {removed} runners")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -150,6 +150,38 @@ in {
|
|||
description = "Allow placeholder values (PENDING-) in the autoscaler config.";
|
||||
};
|
||||
};
|
||||
|
||||
pruneRunners = {
|
||||
enable = mkOption {
|
||||
type = types.bool;
|
||||
default = true;
|
||||
description = "Enable periodic pruning of stale Forgejo action runners.";
|
||||
};
|
||||
|
||||
ttlSeconds = mkOption {
|
||||
type = types.ints.positive;
|
||||
default = 3600;
|
||||
description = "Age threshold in seconds before offline runners are marked deleted.";
|
||||
};
|
||||
|
||||
onBootSec = mkOption {
|
||||
type = types.str;
|
||||
default = "15m";
|
||||
description = "How long after boot to wait before the first prune run.";
|
||||
};
|
||||
|
||||
onUnitActiveSec = mkOption {
|
||||
type = types.str;
|
||||
default = "1h";
|
||||
description = "How often to rerun stale runner pruning.";
|
||||
};
|
||||
|
||||
randomizedDelaySec = mkOption {
|
||||
type = types.str;
|
||||
default = "10m";
|
||||
description = "Randomized delay applied to the prune timer.";
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
config = mkIf cfg.enable {
|
||||
|
|
@ -230,5 +262,35 @@ in {
|
|||
tokenSync
|
||||
]);
|
||||
};
|
||||
|
||||
systemd.services.forgejo-prune-runners = mkIf cfg.pruneRunners.enable {
|
||||
description = "Prune offline Forgejo action runners";
|
||||
after = [ "forgejo.service" ];
|
||||
requires = [ "forgejo.service" ];
|
||||
serviceConfig = {
|
||||
Type = "oneshot";
|
||||
User = "forgejo";
|
||||
Group = "forgejo";
|
||||
};
|
||||
environment = {
|
||||
FORGEJO_PRUNE_DB = "1";
|
||||
FORGEJO_RUNNER_TTL_SEC = toString cfg.pruneRunners.ttlSeconds;
|
||||
};
|
||||
path = [ pkgs.python3 pkgs.postgresql ];
|
||||
script = ''
|
||||
${pkgs.python3}/bin/python3 ${self}/Scripts/forgejo-prune-runners.py
|
||||
'';
|
||||
};
|
||||
|
||||
systemd.timers.forgejo-prune-runners = mkIf cfg.pruneRunners.enable {
|
||||
description = "Periodic Forgejo runner cleanup";
|
||||
wantedBy = [ "timers.target" ];
|
||||
timerConfig = {
|
||||
OnBootSec = cfg.pruneRunners.onBootSec;
|
||||
OnUnitActiveSec = cfg.pruneRunners.onUnitActiveSec;
|
||||
RandomizedDelaySec = cfg.pruneRunners.randomizedDelaySec;
|
||||
Unit = "forgejo-prune-runners.service";
|
||||
};
|
||||
};
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,6 +45,9 @@ profile. The important knobs are:
|
|||
- `namespace.machine_type` / `namespace.duration` – shape + TTL for the ephemeral
|
||||
Namespace environment. The dispatcher destroys the instance after a job so the
|
||||
TTL acts as a hard cap, not an idle timeout.
|
||||
- macOS fallback launches still use `nsc create`, but bootstrap runs over the
|
||||
Compute SSH config endpoint instead of `nsc ssh` so the dispatcher can always
|
||||
destroy the instance itself instead of relying on a websocket SSH proxy handoff.
|
||||
- `namespace.linux_cache_*` / `namespace.macos_cache_*` – persistent cache
|
||||
volumes mounted into runners so Linux can keep `/nix` plus shared build
|
||||
caches warm and macOS can reuse Rust toolchains, Xcode package caches, and
|
||||
|
|
@ -176,6 +179,9 @@ Long-lived runtime state is now sourced from age-encrypted files:
|
|||
After refreshing the encrypted secrets, deploy the forge host so
|
||||
`config.age.secrets.*` updates the live paths for `services.burrow.forge`,
|
||||
`services.burrow.forgeRunner`, and `services.burrow.forgejoNsc`.
|
||||
The Nix host module also installs a periodic `forgejo-prune-runners` timer that
|
||||
marks stale offline runners deleted in Forgejo's database so wedged instances do
|
||||
not leave the queue polluted indefinitely.
|
||||
|
||||
Run it next to the dispatcher:
|
||||
|
||||
|
|
|
|||
|
|
@ -64,13 +64,6 @@ func normalizeMacOSNSCMachineType(machineType string) (normalized string, change
|
|||
return normalized, changed, nil
|
||||
}
|
||||
|
||||
type macosNSCSSHOutcome int
|
||||
|
||||
const (
|
||||
macosNSCSSHCompleted macosNSCSSHOutcome = iota
|
||||
macosNSCSSHHandoff
|
||||
)
|
||||
|
||||
func (d *Dispatcher) launchMacOSRunnerViaNSC(ctx context.Context, runnerName string, req LaunchRequest, ttl time.Duration, machineType string) error {
|
||||
if machineType == "" {
|
||||
return errors.New("machine_type is required for macos runners")
|
||||
|
|
@ -223,30 +216,16 @@ func (d *Dispatcher) launchMacOSRunnerViaNSC(ctx context.Context, runnerName str
|
|||
return fmt.Errorf("nsc create failed without producing an instance id\n%s", lastOut)
|
||||
}
|
||||
|
||||
destroyOnReturn := true
|
||||
// Always attempt cleanup on failure; successful handoff is allowed to run out
|
||||
// to its NSC TTL because `nsc ssh` may detach before the Forgejo job exits.
|
||||
defer func() {
|
||||
if destroyOnReturn {
|
||||
d.destroyNSCInstance(context.Background(), runnerName, instanceID)
|
||||
}
|
||||
}()
|
||||
// Always attempt cleanup even if the runner fails.
|
||||
defer d.destroyNSCInstance(context.Background(), runnerName, instanceID)
|
||||
|
||||
script := macosBootstrapWrapperScript(runnerName, req, d.opts.Executor, d.opts.WorkDir)
|
||||
// The CLI fallback is explicitly keychain-backed and does not rely on the
|
||||
// service bearer token, so use `nsc ssh` end-to-end here.
|
||||
outcome, err := d.runMacOSNSCSSHScript(ctx, runnerName, instanceID, script)
|
||||
if err != nil {
|
||||
// Use the Compute SSH config endpoint (direct TCP) instead of `nsc ssh`, which
|
||||
// relies on a websocket-based SSH proxy that is less reliable under the
|
||||
// revokable tenant token flow used by the dispatcher.
|
||||
if err := d.runMacOSComputeSSHScript(ctx, runnerName, instanceID, script); err != nil {
|
||||
return err
|
||||
}
|
||||
if outcome == macosNSCSSHHandoff {
|
||||
destroyOnReturn = false
|
||||
d.log.Info("leaving macos nsc instance running until TTL after runner handoff",
|
||||
"runner", runnerName,
|
||||
"instance", instanceID,
|
||||
"ttl", ttl.String(),
|
||||
)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -366,57 +345,6 @@ func shellSingleQuote(value string) string {
|
|||
return "'" + strings.ReplaceAll(value, "'", `'\"'\"'`) + "'"
|
||||
}
|
||||
|
||||
func (d *Dispatcher) runMacOSNSCSSHScript(ctx context.Context, runnerName, instanceID, script string) (macosNSCSSHOutcome, error) {
|
||||
sshCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
args := []string{"ssh", "--disable-pty", instanceID, "/bin/bash"}
|
||||
args = prependNSCRegionArgs(args, d.opts.ComputeBaseURL)
|
||||
|
||||
cmd := exec.CommandContext(sshCtx, d.opts.BinaryPath, args...)
|
||||
cmd.Env = nscCLIEnv()
|
||||
cmd.Stdin = strings.NewReader(script)
|
||||
|
||||
var buf bytes.Buffer
|
||||
cmd.Stdout = &buf
|
||||
cmd.Stderr = &buf
|
||||
|
||||
if err := cmd.Run(); err != nil {
|
||||
if errors.Is(sshCtx.Err(), context.DeadlineExceeded) {
|
||||
return macosNSCSSHCompleted, fmt.Errorf("nsc ssh timed out after %s\n%s", 5*time.Minute, strings.TrimSpace(buf.String()))
|
||||
}
|
||||
if nscSSHBootstrapLikelySucceeded(err, buf.String()) {
|
||||
d.log.Warn("nsc ssh exited after runner handoff; treating bootstrap as successful",
|
||||
"runner", runnerName,
|
||||
"instance", instanceID,
|
||||
"err", err,
|
||||
)
|
||||
d.log.Info("macos runner bootstrap completed via nsc ssh", "runner", runnerName, "instance", instanceID)
|
||||
return macosNSCSSHHandoff, nil
|
||||
}
|
||||
return macosNSCSSHCompleted, fmt.Errorf("nsc ssh runner bootstrap failed: %w\n%s", err, strings.TrimSpace(buf.String()))
|
||||
}
|
||||
|
||||
d.log.Info("macos runner bootstrap completed via nsc ssh", "runner", runnerName, "instance", instanceID)
|
||||
return macosNSCSSHCompleted, nil
|
||||
}
|
||||
|
||||
func nscSSHBootstrapLikelySucceeded(err error, output string) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
errText := strings.ToLower(err.Error())
|
||||
if !strings.Contains(errText, "remote command exited without exit status or exit signal") {
|
||||
return false
|
||||
}
|
||||
|
||||
output = strings.ToLower(output)
|
||||
return strings.Contains(output, "runner registered successfully") &&
|
||||
strings.Contains(output, "starting job") &&
|
||||
strings.Contains(output, "task ")
|
||||
}
|
||||
|
||||
func prependNSCRegionArgs(args []string, computeBaseURL string) []string {
|
||||
region := strings.TrimSpace(os.Getenv("NSC_REGION"))
|
||||
if region == "" {
|
||||
|
|
|
|||
|
|
@ -1,47 +1,33 @@
|
|||
package nsc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
)
|
||||
import "testing"
|
||||
|
||||
func TestNSCSSHBootstrapLikelySucceeded(t *testing.T) {
|
||||
func TestNormalizeMacOSNSCMachineTypeRoundsUp(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
err := errors.New("wait: remote command exited without exit status or exit signal")
|
||||
output := `
|
||||
level=info msg="Runner registered successfully."
|
||||
time="2026-03-19T11:29:49Z" level=info msg="Starting job"
|
||||
time="2026-03-19T11:29:50Z" level=info msg="task 124 repo is hackclub/burrow"
|
||||
`
|
||||
|
||||
if !nscSSHBootstrapLikelySucceeded(err, output) {
|
||||
t.Fatal("expected handoff success heuristic to match")
|
||||
got, changed, err := normalizeMacOSNSCMachineType("5x10")
|
||||
if err != nil {
|
||||
t.Fatalf("normalizeMacOSNSCMachineType: %v", err)
|
||||
}
|
||||
if !changed {
|
||||
t.Fatal("expected machine type to be normalized")
|
||||
}
|
||||
if got != "6x14" {
|
||||
t.Fatalf("expected 6x14, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNSCSSHBootstrapLikelySucceededRejectsIncompleteOutput(t *testing.T) {
|
||||
func TestNormalizeMacOSNSCMachineTypeKeepsAllowedShape(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
err := errors.New("wait: remote command exited without exit status or exit signal")
|
||||
output := `level=info msg="Runner registered successfully."`
|
||||
|
||||
if nscSSHBootstrapLikelySucceeded(err, output) {
|
||||
t.Fatal("expected incomplete runner output to remain a failure")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNSCSSHBootstrapLikelySucceededRejectsDifferentErrors(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
err := errors.New("exit status 1")
|
||||
output := `
|
||||
level=info msg="Runner registered successfully."
|
||||
time="2026-03-19T11:29:49Z" level=info msg="Starting job"
|
||||
time="2026-03-19T11:29:50Z" level=info msg="task 124 repo is hackclub/burrow"
|
||||
`
|
||||
|
||||
if nscSSHBootstrapLikelySucceeded(err, output) {
|
||||
t.Fatal("expected unrelated nsc ssh errors to remain failures")
|
||||
got, changed, err := normalizeMacOSNSCMachineType("6x14")
|
||||
if err != nil {
|
||||
t.Fatalf("normalizeMacOSNSCMachineType: %v", err)
|
||||
}
|
||||
if changed {
|
||||
t.Fatal("expected allowed machine type to remain unchanged")
|
||||
}
|
||||
if got != "6x14" {
|
||||
t.Fatalf("expected 6x14, got %q", got)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue