Add Forgejo namespace workflow stack
This commit is contained in:
parent
482fd5d085
commit
865b676c99
68 changed files with 9709 additions and 11 deletions
385
services/forgejo-nsc/internal/autoscaler/service.go
Normal file
385
services/forgejo-nsc/internal/autoscaler/service.go
Normal file
|
|
@ -0,0 +1,385 @@
|
|||
package autoscaler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/hmac"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"namespacelabs.dev/foundation/std/tasks"
|
||||
|
||||
"github.com/burrow/forgejo-nsc/internal/forgejo"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
listen string
|
||||
controllers map[string]*InstanceController
|
||||
router chi.Router
|
||||
}
|
||||
|
||||
func NewService(cfg Config) (*Service, error) {
|
||||
controllers := make(map[string]*InstanceController)
|
||||
for _, inst := range cfg.Instances {
|
||||
scope, err := inst.Scope.ToScope()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
forgejoClient, err := forgejo.NewClient(inst.Forgejo.BaseURL, inst.Forgejo.Token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dispCfg := cfg.Dispatcher
|
||||
if inst.Dispatcher != nil && inst.Dispatcher.URL != "" {
|
||||
dispCfg = *inst.Dispatcher
|
||||
if dispCfg.Timeout.Duration == 0 {
|
||||
dispCfg.Timeout = cfg.Dispatcher.Timeout
|
||||
}
|
||||
}
|
||||
dClient := newDispatcherClient(dispCfg.URL, dispCfg.Timeout.Duration)
|
||||
webhookActive := true
|
||||
if inst.Webhook.Active != nil {
|
||||
webhookActive = *inst.Webhook.Active
|
||||
}
|
||||
controller := &InstanceController{
|
||||
name: inst.Name,
|
||||
cfg: inst,
|
||||
scope: scope,
|
||||
forgejo: forgejoClient,
|
||||
dispatcher: dClient,
|
||||
webhook: forgejo.WebhookConfig{
|
||||
URL: inst.Webhook.URL,
|
||||
ContentType: inst.Webhook.ContentType,
|
||||
Events: inst.Webhook.Events,
|
||||
Active: webhookActive,
|
||||
},
|
||||
secret: inst.WebhookSecret,
|
||||
}
|
||||
controllers[inst.Name] = controller
|
||||
}
|
||||
|
||||
router := chi.NewRouter()
|
||||
service := &Service{
|
||||
listen: cfg.Listen,
|
||||
controllers: controllers,
|
||||
router: router,
|
||||
}
|
||||
|
||||
router.Get("/healthz", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte("ok"))
|
||||
})
|
||||
router.Post("/webhook/{instance}", service.handleWebhook)
|
||||
|
||||
return service, nil
|
||||
}
|
||||
|
||||
func (s *Service) Start(ctx context.Context) error {
|
||||
for _, controller := range s.controllers {
|
||||
if err := controller.EnsureWebhook(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, controller := range s.controllers {
|
||||
wg.Add(1)
|
||||
go func(c *InstanceController) {
|
||||
defer wg.Done()
|
||||
c.Run(ctx)
|
||||
}(controller)
|
||||
}
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: s.listen,
|
||||
Handler: s.router,
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
_ = srv.Shutdown(context.Background())
|
||||
}()
|
||||
|
||||
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
return err
|
||||
}
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) handleWebhook(w http.ResponseWriter, r *http.Request) {
|
||||
name := chi.URLParam(r, "instance")
|
||||
controller, ok := s.controllers[name]
|
||||
if !ok {
|
||||
http.Error(w, "unknown instance", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, "invalid body", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if controller.cfg.WebhookSecret != "" {
|
||||
signature := r.Header.Get("X-Gitea-Signature")
|
||||
if signature == "" {
|
||||
http.Error(w, "missing signature", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
if !verifySignature(controller.cfg.WebhookSecret, signature, body) {
|
||||
http.Error(w, "invalid signature", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
var payload workflowJobPayload
|
||||
if err := json.Unmarshal(body, &payload); err != nil {
|
||||
http.Error(w, "bad payload", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
controller.MarkWebhookSeen()
|
||||
if payload.Action == "queued" {
|
||||
controller.DispatchForJob(r.Context(), payload)
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}
|
||||
|
||||
type workflowJobPayload struct {
|
||||
Action string `json:"action"`
|
||||
WorkflowJob struct {
|
||||
Labels []string `json:"labels"`
|
||||
} `json:"workflow_job"`
|
||||
}
|
||||
|
||||
type InstanceController struct {
|
||||
name string
|
||||
cfg InstanceConfig
|
||||
scope forgejo.Scope
|
||||
forgejo *forgejo.Client
|
||||
dispatcher *dispatcherClient
|
||||
ready atomic.Bool
|
||||
webhook forgejo.WebhookConfig
|
||||
secret string
|
||||
}
|
||||
|
||||
func (c *InstanceController) EnsureWebhook(ctx context.Context) error {
|
||||
if c.webhook.URL == "" {
|
||||
return nil
|
||||
}
|
||||
return tasks.Action("autoscaler.ensure-webhook").Arg("instance", c.name).Run(ctx, func(ctx context.Context) error {
|
||||
return c.forgejo.EnsureWebhook(ctx, c.scope, c.webhook, c.secret)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *InstanceController) Run(ctx context.Context) {
|
||||
if c.cfg.DisablePolling {
|
||||
<-ctx.Done()
|
||||
return
|
||||
}
|
||||
ticker := time.NewTicker(c.cfg.PollInterval.Duration)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
_ = tasks.Action("autoscaler.poll").Arg("instance", c.name).Run(ctx, func(ctx context.Context) error {
|
||||
return c.reconcile(ctx)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *InstanceController) reconcile(ctx context.Context) error {
|
||||
runners, err := c.forgejo.ListRunners(ctx, c.scope)
|
||||
if err != nil {
|
||||
// Keep polling even if runner listing fails; we can still dispatch based on queued jobs.
|
||||
runners = nil
|
||||
}
|
||||
|
||||
for _, target := range c.cfg.Targets {
|
||||
idle := countIdle(runners, target.Labels)
|
||||
|
||||
need := 0
|
||||
if idle < target.MinIdle {
|
||||
need = target.MinIdle - idle
|
||||
}
|
||||
|
||||
jobs, jobErr := c.forgejo.ListRunJobs(ctx, c.scope, target.Labels)
|
||||
if jobErr != nil {
|
||||
return jobErr
|
||||
}
|
||||
waiting := countWaitingJobs(jobs, target.Labels)
|
||||
// Scale-to-zero friendly: if anything is waiting and there are no idle runners
|
||||
// for that label set, dispatch exactly one runner to unblock the queue.
|
||||
if waiting > 0 && idle == 0 && need < 1 {
|
||||
need = 1
|
||||
}
|
||||
|
||||
if need <= 0 {
|
||||
continue
|
||||
}
|
||||
if err := c.dispatch(ctx, target, need, "poll"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *InstanceController) dispatch(ctx context.Context, target TargetConfig, count int, reason string) error {
|
||||
if count <= 0 {
|
||||
return nil
|
||||
}
|
||||
req := dispatcherRequest{
|
||||
Count: count,
|
||||
Labels: target.Labels,
|
||||
}
|
||||
if target.TTL.Duration > 0 {
|
||||
req.TTL = target.TTL.Duration.String()
|
||||
}
|
||||
if target.MachineType != "" {
|
||||
req.MachineType = target.MachineType
|
||||
}
|
||||
if target.Image != "" {
|
||||
req.Image = target.Image
|
||||
}
|
||||
if len(target.Env) > 0 {
|
||||
req.Env = target.Env
|
||||
}
|
||||
return tasks.Action("autoscaler.dispatch").Arg("instance", c.name).Arg("reason", reason).Arg("labels", strings.Join(target.Labels, ",")).Run(ctx, func(ctx context.Context) error {
|
||||
return c.dispatcher.Dispatch(ctx, req)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *InstanceController) DispatchForJob(ctx context.Context, payload workflowJobPayload) {
|
||||
action := strings.ToLower(payload.Action)
|
||||
if action != "queued" && action != "waiting" {
|
||||
return
|
||||
}
|
||||
jobLabels := payload.WorkflowJob.Labels
|
||||
for _, target := range c.cfg.Targets {
|
||||
if labelsMatch(jobLabels, target.Labels) {
|
||||
_ = c.dispatch(ctx, target, 1, "webhook")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *InstanceController) MarkWebhookSeen() {
|
||||
c.ready.Store(true)
|
||||
}
|
||||
|
||||
func countIdle(runners []forgejo.Runner, labels []string) int {
|
||||
count := 0
|
||||
for _, runner := range runners {
|
||||
if strings.ToLower(runner.Status) != "online" || runner.Busy {
|
||||
continue
|
||||
}
|
||||
if labelsMatch(extractLabels(runner.Labels), labels) {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
func countWaitingJobs(jobs []forgejo.RunJob, labels []string) int {
|
||||
count := 0
|
||||
for _, job := range jobs {
|
||||
if status := strings.ToLower(job.Status); status != "waiting" && status != "queued" {
|
||||
continue
|
||||
}
|
||||
if labelsMatch(job.RunsOn, labels) {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
func extractLabels(src []forgejo.RunnerLabel) []string {
|
||||
result := make([]string, 0, len(src))
|
||||
for _, lbl := range src {
|
||||
result = append(result, lbl.Name)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func labelsMatch(have, want []string) bool {
|
||||
set := make(map[string]struct{}, len(have))
|
||||
for _, label := range have {
|
||||
set[label] = struct{}{}
|
||||
}
|
||||
for _, label := range want {
|
||||
if _, ok := set[label]; !ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func verifySignature(secret, signature string, body []byte) bool {
|
||||
parts := strings.SplitN(signature, "=", 2)
|
||||
if len(parts) == 2 {
|
||||
signature = parts[1]
|
||||
}
|
||||
mac := hmac.New(sha256.New, []byte(secret))
|
||||
mac.Write(body)
|
||||
expected := hex.EncodeToString(mac.Sum(nil))
|
||||
return hmac.Equal([]byte(expected), []byte(signature))
|
||||
}
|
||||
|
||||
type dispatcherClient struct {
|
||||
url string
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
type dispatcherRequest struct {
|
||||
Count int `json:"count"`
|
||||
Labels []string `json:"labels"`
|
||||
TTL string `json:"ttl,omitempty"`
|
||||
MachineType string `json:"machine_type,omitempty"`
|
||||
Image string `json:"image,omitempty"`
|
||||
Env map[string]string `json:"env,omitempty"`
|
||||
}
|
||||
|
||||
func newDispatcherClient(url string, timeout time.Duration) *dispatcherClient {
|
||||
if timeout == 0 {
|
||||
timeout = 30 * time.Second
|
||||
}
|
||||
return &dispatcherClient{
|
||||
url: url,
|
||||
client: &http.Client{
|
||||
Timeout: timeout,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (d *dispatcherClient) Dispatch(ctx context.Context, req dispatcherRequest) error {
|
||||
body, _ := json.Marshal(req)
|
||||
endpoint := strings.TrimSuffix(d.url, "/") + "/api/v1/dispatch"
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
resp, err := d.client.Do(httpReq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 300 {
|
||||
return fmt.Errorf("dispatcher returned %s", resp.Status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue