Add Burrow forge infrastructure and tailnet control plane

This commit is contained in:
Conrad Kramer 2026-03-31 14:53:48 -07:00
parent d1ed826389
commit de25f240d5
51 changed files with 9058 additions and 0 deletions

View file

@ -0,0 +1,385 @@
package autoscaler
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/go-chi/chi/v5"
"namespacelabs.dev/foundation/std/tasks"
"github.com/burrow/forgejo-nsc/internal/forgejo"
)
// Service hosts the autoscaler's HTTP endpoints and owns one
// InstanceController per configured instance.
type Service struct {
// listen is the address handed to http.Server.Addr in Start.
listen string
// controllers maps an instance name to its controller; handleWebhook looks
// up the target controller here using the {instance} URL parameter.
controllers map[string]*InstanceController
// router serves GET /healthz and POST /webhook/{instance}.
router chi.Router
}
// NewService builds a Service from cfg. For every configured instance it
// resolves the Forgejo scope, creates a Forgejo client and a dispatcher client,
// and registers an InstanceController under the instance name. It also wires
// the HTTP routes (GET /healthz and POST /webhook/{instance}).
//
// An instance-level dispatcher overrides the global one only when it sets a
// URL; if it leaves the timeout at zero, the global timeout is inherited.
// Webhooks are active unless the instance explicitly sets Webhook.Active.
//
// An error is returned when a scope or Forgejo client cannot be created, or
// when two instances share a name (the second would otherwise silently replace
// the first and the /webhook/{instance} route could only ever reach one).
func NewService(cfg Config) (*Service, error) {
	controllers := make(map[string]*InstanceController, len(cfg.Instances))
	for _, inst := range cfg.Instances {
		if _, dup := controllers[inst.Name]; dup {
			return nil, fmt.Errorf("instance %q: duplicate instance name", inst.Name)
		}

		scope, err := inst.Scope.ToScope()
		if err != nil {
			return nil, fmt.Errorf("instance %q: resolving scope: %w", inst.Name, err)
		}
		forgejoClient, err := forgejo.NewClient(inst.Forgejo.BaseURL, inst.Forgejo.Token)
		if err != nil {
			return nil, fmt.Errorf("instance %q: creating forgejo client: %w", inst.Name, err)
		}

		// Start from the global dispatcher settings and let the instance
		// override them only when it names its own dispatcher URL.
		dispCfg := cfg.Dispatcher
		if inst.Dispatcher != nil && inst.Dispatcher.URL != "" {
			dispCfg = *inst.Dispatcher
			if dispCfg.Timeout.Duration == 0 {
				dispCfg.Timeout = cfg.Dispatcher.Timeout
			}
		}
		dClient := newDispatcherClient(dispCfg.URL, dispCfg.Timeout.Duration)

		// A nil Active pointer means "not specified", which defaults to active.
		webhookActive := true
		if inst.Webhook.Active != nil {
			webhookActive = *inst.Webhook.Active
		}

		controllers[inst.Name] = &InstanceController{
			name:       inst.Name,
			cfg:        inst,
			scope:      scope,
			forgejo:    forgejoClient,
			dispatcher: dClient,
			webhook: forgejo.WebhookConfig{
				URL:         inst.Webhook.URL,
				ContentType: inst.Webhook.ContentType,
				Events:      inst.Webhook.Events,
				Active:      webhookActive,
			},
			secret: inst.WebhookSecret,
		}
	}

	router := chi.NewRouter()
	service := &Service{
		listen:      cfg.Listen,
		controllers: controllers,
		router:      router,
	}
	router.Get("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("ok"))
	})
	router.Post("/webhook/{instance}", service.handleWebhook)
	return service, nil
}
// Start ensures every instance's webhook, launches one polling goroutine per
// controller, and serves HTTP on the configured listen address until ctx is
// cancelled or the listener fails.
//
// Start does not return until the HTTP server has finished shutting down and
// every controller goroutine has exited. It returns the first EnsureWebhook
// error, or any ListenAndServe error other than http.ErrServerClosed; a clean
// shutdown returns nil.
func (s *Service) Start(ctx context.Context) error {
	// Bound graceful shutdown so a stuck connection cannot block exit forever.
	const shutdownTimeout = 30 * time.Second

	for _, controller := range s.controllers {
		if err := controller.EnsureWebhook(ctx); err != nil {
			return err
		}
	}

	// runCtx ends when the caller cancels ctx or when the listener fails, so
	// the background goroutines below never outlive Start.
	runCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wg sync.WaitGroup
	for _, controller := range s.controllers {
		wg.Add(1)
		go func(c *InstanceController) {
			defer wg.Done()
			c.Run(runCtx)
		}(controller)
	}

	srv := &http.Server{
		Addr:    s.listen,
		Handler: s.router,
		// Guard against clients that open a connection and never finish
		// sending request headers.
		ReadHeaderTimeout: 10 * time.Second,
	}

	shutdownDone := make(chan struct{})
	go func() {
		defer close(shutdownDone)
		<-runCtx.Done()
		shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), shutdownTimeout)
		defer cancelShutdown()
		// Best effort: a shutdown error only means some connections did not
		// drain within the deadline; the service is exiting either way.
		_ = srv.Shutdown(shutdownCtx)
	}()

	err := srv.ListenAndServe()

	// ListenAndServe returns as soon as Shutdown begins (or immediately if the
	// listener could not be created). Stop the controllers and wait for
	// Shutdown itself to finish before returning.
	cancel()
	<-shutdownDone
	wg.Wait()

	if err != nil && !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}
// handleWebhook serves POST /webhook/{instance}.
//
// It looks up the controller named by the {instance} URL parameter, reads the
// (size-limited) request body, verifies the X-Gitea-Signature header when the
// instance has a webhook secret configured, decodes the workflow-job payload,
// records that a webhook was received, and hands the payload to
// DispatchForJob, which decides whether the action warrants a dispatch.
//
// Responses: 404 unknown instance, 413 body too large, 400 unreadable body or
// bad JSON, 401 missing/invalid signature, 202 otherwise.
func (s *Service) handleWebhook(w http.ResponseWriter, r *http.Request) {
	// Upper bound on accepted webhook bodies so a client cannot make the
	// handler buffer an arbitrarily large request in memory.
	const maxBodyBytes = 25 << 20

	name := chi.URLParam(r, "instance")
	controller, ok := s.controllers[name]
	if !ok {
		http.Error(w, "unknown instance", http.StatusNotFound)
		return
	}

	body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maxBodyBytes))
	if err != nil {
		var tooLarge *http.MaxBytesError
		if errors.As(err, &tooLarge) {
			http.Error(w, "payload too large", http.StatusRequestEntityTooLarge)
			return
		}
		http.Error(w, "invalid body", http.StatusBadRequest)
		return
	}

	// Signature checking is only enforced when a secret is configured.
	if controller.cfg.WebhookSecret != "" {
		signature := r.Header.Get("X-Gitea-Signature")
		if signature == "" {
			http.Error(w, "missing signature", http.StatusUnauthorized)
			return
		}
		if !verifySignature(controller.cfg.WebhookSecret, signature, body) {
			http.Error(w, "invalid signature", http.StatusUnauthorized)
			return
		}
	}

	var payload workflowJobPayload
	if err := json.Unmarshal(body, &payload); err != nil {
		http.Error(w, "bad payload", http.StatusBadRequest)
		return
	}

	controller.MarkWebhookSeen()
	// DispatchForJob performs the action filtering itself (case-insensitive
	// "queued" or "waiting"), so the handler does not pre-filter and cannot
	// drift out of sync with it.
	controller.DispatchForJob(r.Context(), payload)
	w.WriteHeader(http.StatusAccepted)
}
// workflowJobPayload is the subset of the webhook JSON body that the
// autoscaler decodes: the event action and the labels of the workflow job.
type workflowJobPayload struct {
// Action is the event action; DispatchForJob reacts to "queued" and
// "waiting" (compared case-insensitively).
Action string `json:"action"`
WorkflowJob struct {
// Labels are the job's labels, matched against each target's labels.
Labels []string `json:"labels"`
} `json:"workflow_job"`
}
// InstanceController manages runner scaling for a single configured instance:
// it registers the webhook, polls for idle runners and waiting jobs, and asks
// the dispatcher for new runners.
type InstanceController struct {
// name is the instance name; it keys the Service controller map and is
// attached to every task action as the "instance" argument.
name string
// cfg is the instance configuration (targets, polling settings, webhook
// secret).
cfg InstanceConfig
// scope is passed to every Forgejo client call made by this controller.
scope forgejo.Scope
// forgejo is the client used to ensure the webhook and list runners/jobs.
forgejo *forgejo.Client
// dispatcher sends runner dispatch requests.
dispatcher *dispatcherClient
// ready is set to true by MarkWebhookSeen; no reader is visible in this
// file section.
ready atomic.Bool
// webhook is the webhook definition handed to the Forgejo client; an empty
// URL makes EnsureWebhook a no-op.
webhook forgejo.WebhookConfig
// secret is the webhook secret passed to the Forgejo client in
// EnsureWebhook.
secret string
}
// EnsureWebhook asks the Forgejo client to ensure this instance's webhook
// exists, running the call as the "autoscaler.ensure-webhook" task action.
// It is a no-op returning nil when no webhook URL is configured.
func (c *InstanceController) EnsureWebhook(ctx context.Context) error {
	if c.webhook.URL == "" {
		return nil
	}

	ensure := func(ctx context.Context) error {
		return c.forgejo.EnsureWebhook(ctx, c.scope, c.webhook, c.secret)
	}
	action := tasks.Action("autoscaler.ensure-webhook").Arg("instance", c.name)
	return action.Run(ctx, ensure)
}
// Run drives the polling loop for this instance until ctx is cancelled.
//
// When polling is disabled it simply blocks until ctx is done. Otherwise it
// runs reconcile once per poll interval inside the "autoscaler.poll" task
// action. A failed poll does not stop the loop; the next tick retries.
func (c *InstanceController) Run(ctx context.Context) {
	if c.cfg.DisablePolling {
		<-ctx.Done()
		return
	}

	// time.NewTicker panics on a non-positive interval, which would take the
	// whole process down from this goroutine. Fall back to a fixed default
	// when the interval is unset or invalid.
	const defaultPollInterval = 30 * time.Second
	interval := c.cfg.PollInterval.Duration
	if interval <= 0 {
		interval = defaultPollInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// The error is intentionally not acted on here: the poll result
			// is passed through the task action, and the loop must keep
			// running so the next tick can retry.
			_ = tasks.Action("autoscaler.poll").Arg("instance", c.name).Run(ctx, func(ctx context.Context) error {
				return c.reconcile(ctx)
			})
		}
	}
}
// reconcile performs one polling pass over every configured target.
//
// For each target it computes how many runners to request:
//   - enough to bring the number of idle matching runners up to MinIdle, and
//   - at least one when jobs are waiting and no matching runner is idle.
//
// If listing runners fails, the pass still continues so queued jobs can be
// unblocked, but the MinIdle top-up is skipped: the idle count is unknown, and
// treating it as zero would request MinIdle additional runners on every poll
// for as long as the listing keeps failing. The listing error is returned at
// the end of the pass so it is not silently lost.
//
// A failure to list jobs or to dispatch aborts the pass and is returned.
func (c *InstanceController) reconcile(ctx context.Context) error {
	runners, runnersErr := c.forgejo.ListRunners(ctx, c.scope)
	if runnersErr != nil {
		// Keep going: dispatching based on queued jobs is still possible.
		runners = nil
	}

	for _, target := range c.cfg.Targets {
		idle := countIdle(runners, target.Labels)

		need := 0
		// Only top up to MinIdle when the idle count is trustworthy.
		if runnersErr == nil && idle < target.MinIdle {
			need = target.MinIdle - idle
		}

		jobs, err := c.forgejo.ListRunJobs(ctx, c.scope, target.Labels)
		if err != nil {
			return err
		}
		waiting := countWaitingJobs(jobs, target.Labels)

		// Scale-to-zero friendly: if anything is waiting and there are no idle
		// runners for that label set, dispatch exactly one runner to unblock
		// the queue.
		if waiting > 0 && idle == 0 && need < 1 {
			need = 1
		}
		if need <= 0 {
			continue
		}
		if err := c.dispatch(ctx, target, need, "poll"); err != nil {
			return err
		}
	}

	if runnersErr != nil {
		return fmt.Errorf("listing runners: %w", runnersErr)
	}
	return nil
}
// dispatch requests count runners for target from the dispatcher, running the
// call as the "autoscaler.dispatch" task action tagged with the instance name,
// the reason, and the comma-joined target labels. A non-positive count is a
// no-op that returns nil.
func (c *InstanceController) dispatch(ctx context.Context, target TargetConfig, count int, reason string) error {
	if count <= 0 {
		return nil
	}

	// MachineType and Image are copied as-is: an empty string is the zero
	// value either way. TTL and Env are only set when they carry a value.
	req := dispatcherRequest{
		Count:       count,
		Labels:      target.Labels,
		MachineType: target.MachineType,
		Image:       target.Image,
	}
	if ttl := target.TTL.Duration; ttl > 0 {
		req.TTL = ttl.String()
	}
	if len(target.Env) > 0 {
		req.Env = target.Env
	}

	action := tasks.Action("autoscaler.dispatch").
		Arg("instance", c.name).
		Arg("reason", reason).
		Arg("labels", strings.Join(target.Labels, ","))
	return action.Run(ctx, func(ctx context.Context) error {
		return c.dispatcher.Dispatch(ctx, req)
	})
}
// DispatchForJob reacts to a workflow-job webhook. When the action is "queued"
// or "waiting" (case-insensitive), it dispatches a single runner for the first
// configured target whose labels all appear in the job's labels. Other actions
// and jobs that match no target are ignored.
//
// NOTE(review): a target matches when every target label is present on the
// job, not the other way round — confirm this is the intended direction.
func (c *InstanceController) DispatchForJob(ctx context.Context, payload workflowJobPayload) {
	switch strings.ToLower(payload.Action) {
	case "queued", "waiting":
		// Actions that indicate a job is waiting for a runner.
	default:
		return
	}

	for _, target := range c.cfg.Targets {
		if !labelsMatch(payload.WorkflowJob.Labels, target.Labels) {
			continue
		}
		// Only the first matching target is used; the dispatch error is
		// discarded because this method has no return value.
		_ = c.dispatch(ctx, target, 1, "webhook")
		return
	}
}
// MarkWebhookSeen records that a webhook delivery was received for this
// instance by atomically setting the ready flag to true.
func (c *InstanceController) MarkWebhookSeen() {
c.ready.Store(true)
}
// countIdle returns how many runners are online (status compared
// case-insensitively), not busy, and carry every label in labels.
func countIdle(runners []forgejo.Runner, labels []string) int {
	idle := 0
	for i := range runners {
		r := runners[i]
		online := strings.ToLower(r.Status) == "online"
		if !online || r.Busy {
			continue
		}
		if !labelsMatch(extractLabels(r.Labels), labels) {
			continue
		}
		idle++
	}
	return idle
}
// countWaitingJobs returns how many jobs have status "waiting" or "queued"
// (case-insensitive) and whose RunsOn labels include every label in labels.
func countWaitingJobs(jobs []forgejo.RunJob, labels []string) int {
	pending := 0
	for i := range jobs {
		switch strings.ToLower(jobs[i].Status) {
		case "waiting", "queued":
			if labelsMatch(jobs[i].RunsOn, labels) {
				pending++
			}
		}
	}
	return pending
}
// extractLabels returns the Name of every runner label in src, preserving
// order. The result is always non-nil, even for empty input.
func extractLabels(src []forgejo.RunnerLabel) []string {
	names := make([]string, len(src))
	for i := range src {
		names[i] = src[i].Name
	}
	return names
}
// labelsMatch reports whether every label in want is also present in have.
// An empty want always matches.
func labelsMatch(have, want []string) bool {
	if len(want) == 0 {
		return true
	}

	present := make(map[string]bool, len(have))
	for i := range have {
		present[have[i]] = true
	}
	for i := range want {
		if !present[want[i]] {
			return false
		}
	}
	return true
}
// verifySignature reports whether signature is the hex-encoded HMAC-SHA256 of
// body keyed with secret.
//
// An optional "<prefix>=" (for example "sha256=") in front of the digest is
// ignored. The digest is decoded from hex and compared as raw bytes in
// constant time, so upper- and lower-case hex are both accepted; a signature
// that is empty, not valid hex, or of the wrong length never matches.
func verifySignature(secret, signature string, body []byte) bool {
	if _, digest, found := strings.Cut(signature, "="); found {
		signature = digest
	}

	provided, err := hex.DecodeString(signature)
	if err != nil {
		return false
	}

	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	return hmac.Equal(mac.Sum(nil), provided)
}
// dispatcherClient posts runner dispatch requests to a dispatcher service.
type dispatcherClient struct {
// url is the dispatcher base URL; Dispatch appends "/api/v1/dispatch".
url string
// client is the HTTP client used for every dispatch request.
client *http.Client
}
// dispatcherRequest is the JSON body sent to the dispatcher's dispatch
// endpoint. Fields tagged omitempty are left out of the JSON when unset.
type dispatcherRequest struct {
// Count is the number of runners requested.
Count int `json:"count"`
// Labels are the target's labels.
Labels []string `json:"labels"`
// TTL is a time.Duration rendered with String(), set only when the target
// TTL is positive.
TTL string `json:"ttl,omitempty"`
// MachineType is copied from the target configuration when set.
MachineType string `json:"machine_type,omitempty"`
// Image is copied from the target configuration when set.
Image string `json:"image,omitempty"`
// Env is copied from the target configuration when non-empty.
Env map[string]string `json:"env,omitempty"`
}
// newDispatcherClient returns a dispatcherClient for the given base URL whose
// HTTP client uses timeout, or 30 seconds when timeout is zero.
func newDispatcherClient(url string, timeout time.Duration) *dispatcherClient {
	httpClient := &http.Client{Timeout: 30 * time.Second}
	if timeout != 0 {
		httpClient.Timeout = timeout
	}
	return &dispatcherClient{url: url, client: httpClient}
}
// Dispatch POSTs req as JSON to <url>/api/v1/dispatch.
//
// It returns an error when the dispatcher URL is empty, the request cannot be
// encoded, built, or sent, or the dispatcher answers with a status code of 300
// or above. For a failing status the error includes the status line and, when
// present, a bounded excerpt of the response body. Underlying errors are
// wrapped with %w so callers can still inspect them with errors.Is/As.
func (d *dispatcherClient) Dispatch(ctx context.Context, req dispatcherRequest) error {
	if d.url == "" {
		return errors.New("dispatcher URL is not configured")
	}

	body, err := json.Marshal(req)
	if err != nil {
		return fmt.Errorf("encoding dispatch request: %w", err)
	}

	endpoint := strings.TrimSuffix(d.url, "/") + "/api/v1/dispatch"
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("building dispatch request: %w", err)
	}
	httpReq.Header.Set("Content-Type", "application/json")

	resp, err := d.client.Do(httpReq)
	if err != nil {
		return fmt.Errorf("sending dispatch request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 300 {
		// Surface a bounded excerpt of the dispatcher's answer; the read is
		// best effort, so a failure here just yields the bare status.
		excerpt, _ := io.ReadAll(io.LimitReader(resp.Body, 4<<10))
		if msg := strings.TrimSpace(string(excerpt)); msg != "" {
			return fmt.Errorf("dispatcher returned %s: %s", resp.Status, msg)
		}
		return fmt.Errorf("dispatcher returned %s", resp.Status)
	}

	// Drain a bounded amount of the body so the transport can reuse the
	// connection; the content itself is not needed.
	_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 64<<10))
	return nil
}