Files
multica/server/cmd/server/router.go
J c1b236f9cf feat(lark): serve Feishu and Lark from one deployment, per installation
The Lark integration was locked to a single open-platform host chosen
deployment-wide (MULTICA_LARK_HTTP_BASE_URL / _CALLBACK_BASE_URL,
defaulting to open.feishu.cn), so one deployment could talk to only the
mainland Feishu cloud OR Lark international — never both. Teams on the
other tenant could not use the integration at all.

Make the host per-installation. The device-flow installer already
auto-detects the tenant (Lark emits tenant_brand="lark" mid-poll); we now
persist that as lark_installation.region, carry it on
InstallationCredentials.Region, and resolve the open-platform host per
call (REST + WS bootstrap) from the region. An explicit cfg.BaseURL
(env / httptest) still overrides every region, so existing tests and
staging/proxy setups keep working.

- migration 116: lark_installation.region TEXT NOT NULL DEFAULT 'feishu'
  CHECK (region IN ('feishu','lark')) — existing rows are all mainland.
- lark.Region enum + OpenPlatformBaseURL/RegionOrDefault helpers.
- registration: thread the detected region into finishSuccess so the
  install-time GetBotInfo hits the right cloud AND the row records it.
- every credential-build site (patcher, replier, WS provider, union_id
  backfill) copies region off the installation row.
- region is part of the WS supervisor fingerprint so a re-install that
  switches cloud restarts the connection.
- API: surface region on the installation listing DTO.

MUL-3083

Co-authored-by: multica-agent <github@multica.ai>
2026-06-05 15:26:41 +08:00

1101 lines
45 KiB
Go

package main
import (
"context"
"log/slog"
"net/http"
"net/netip"
"os"
"strings"
"time"
"github.com/go-chi/chi/v5"
chimw "github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/cors"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/redis/go-redis/v9"
"github.com/multica-ai/multica/server/internal/analytics"
"github.com/multica-ai/multica/server/internal/auth"
"github.com/multica-ai/multica/server/internal/cloudruntime"
"github.com/multica-ai/multica/server/internal/daemonws"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/handler"
"github.com/multica-ai/multica/server/internal/integrations/lark"
obsmetrics "github.com/multica-ai/multica/server/internal/metrics"
"github.com/multica-ai/multica/server/internal/middleware"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/internal/service"
"github.com/multica-ai/multica/server/internal/storage"
"github.com/multica-ai/multica/server/internal/util"
"github.com/multica-ai/multica/server/internal/util/secretbox"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// defaultOrigins is the CORS allow-list used when the environment does not
// configure one: the local development front-ends.
var defaultOrigins = []string{
	"http://localhost:3000", // Next.js dev
	"http://localhost:5173", // electron-vite dev
	"http://localhost:5174", // electron-vite dev (fallback port)
}

// allowedOrigins returns the CORS origin allow-list.
//
// The list is read from CORS_ALLOWED_ORIGINS, or from FRONTEND_ORIGIN when
// the former is empty or whitespace-only. The value is a comma-separated
// list; entries are trimmed and blank entries are skipped. When neither
// variable is set, or the chosen value contains no usable entry,
// defaultOrigins is returned.
func allowedOrigins() []string {
	// First non-blank variable wins; later ones are not consulted.
	var configured string
	for _, key := range []string{"CORS_ALLOWED_ORIGINS", "FRONTEND_ORIGIN"} {
		if value := strings.TrimSpace(os.Getenv(key)); value != "" {
			configured = value
			break
		}
	}

	var result []string
	if configured != "" {
		entries := strings.Split(configured, ",")
		result = make([]string, 0, len(entries))
		for _, entry := range entries {
			if trimmed := strings.TrimSpace(entry); trimmed != "" {
				result = append(result, trimmed)
			}
		}
	}

	// A value made only of separators (e.g. " , ") yields nothing usable.
	if len(result) == 0 {
		return defaultOrigins
	}
	return result
}
// parseTrustedProxies turns the comma-separated CIDR list supplied via the
// MULTICA_TRUSTED_PROXIES env var into netip prefixes.
//
// Entries are trimmed and blank entries are skipped. An entry that does not
// parse as a CIDR prefix is logged at warn level (one line per entry) and
// dropped, so a single typo does not take the whole API down. The result is
// nil when raw is empty or contains no valid prefix; the rate limiter treats
// nil as "trust no proxy headers, use RemoteAddr only".
func parseTrustedProxies(raw string) []netip.Prefix {
	var prefixes []netip.Prefix
	// Splitting an empty string yields one blank field, which is skipped
	// below, so no separate empty-input guard is required.
	for _, field := range strings.Split(strings.TrimSpace(raw), ",") {
		candidate := strings.TrimSpace(field)
		if candidate == "" {
			continue
		}
		prefix, err := netip.ParsePrefix(candidate)
		if err == nil {
			prefixes = append(prefixes, prefix)
			continue
		}
		slog.Warn("MULTICA_TRUSTED_PROXIES: ignoring invalid CIDR",
			"value", candidate, "error", err)
	}
	return prefixes
}
// NewRouter returns the fully-configured Chi router, with all middleware and
// routes installed, using default RouterOptions.
//
// rdb is optional. When it is non-nil the runtime local-skill request stores
// are replaced by Redis-backed implementations so that multiple API nodes
// share one pending queue (required for multi-node prod); pass a request-path
// Redis client here, not the realtime relay's blocking read client. When it
// is nil the default in-memory stores are kept, which is fine for
// single-node dev and tests.
func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analyticsClient analytics.Client, rdb *redis.Client) chi.Router {
	// The handler returned alongside the router is only needed by callers
	// that drive background lifecycle, so it is discarded here.
	router, _ := NewRouterWithOptions(pool, hub, bus, analyticsClient, rdb, RouterOptions{})
	return router
}
// RouterOptions carries the optional dependencies accepted by
// NewRouterWithOptions. The zero value is valid: NewRouter passes an empty
// RouterOptions and every field may be left unset.
type RouterOptions struct {
// HTTPMetrics, when non-nil, has its Middleware installed in the router's
// global middleware chain.
HTTPMetrics *obsmetrics.HTTPMetrics
// BusinessMetrics is assigned to the handler and to its task and issue
// services. When non-nil it is also passed to SetRecorder on the cloud
// runtime client, provided that client is a *cloudruntime.Client.
BusinessMetrics *obsmetrics.BusinessMetrics
// DaemonHub is the daemon WebSocket hub to use. When nil, a new hub is
// created with daemonws.NewHub.
DaemonHub *daemonws.Hub
// DaemonWakeup, when non-nil, is installed as the task service's Wakeup
// notifier.
DaemonWakeup service.TaskWakeupNotifier
// HeartbeatScheduler, when non-nil, replaces the default synchronous
// passthrough scheduler on the constructed Handler. main.go injects a
// BatchedHeartbeatScheduler here so the caller can also drive Run/Stop;
// tests leave this nil and get the legacy synchronous behavior.
HeartbeatScheduler handler.HeartbeatScheduler
}
// NewRouterWithOptions builds the fully-configured Chi router and
// returns the *handler.Handler it was constructed from. Callers that
// need to drive background lifecycle on services attached to the
// handler (e.g. starting the Lark inbound Hub under a long-running
// context, calling Wait on shutdown) use the returned handler;
// callers that only need the HTTP handler (tests, the simple
// NewRouter shim) discard the second value.
func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analyticsClient analytics.Client, rdb *redis.Client, opts RouterOptions) (chi.Router, *handler.Handler) {
queries := db.New(pool)
emailSvc := service.NewEmailService()
daemonHub := opts.DaemonHub
if daemonHub == nil {
daemonHub = daemonws.NewHub()
}
// Initialize storage with S3 as primary, fallback to local
var store storage.Storage
s3 := storage.NewS3StorageFromEnv()
if s3 != nil {
store = s3
} else {
local := storage.NewLocalStorageFromEnv()
if local != nil {
store = local
}
}
cfSigner := auth.NewCloudFrontSignerFromEnv()
signupConfig := handler.Config{
AllowSignup: os.Getenv("ALLOW_SIGNUP") != "false",
AllowedEmails: splitAndTrim(os.Getenv("ALLOWED_EMAILS")),
AllowedEmailDomains: splitAndTrim(os.Getenv("ALLOWED_EMAIL_DOMAINS")),
DisableWorkspaceCreation: os.Getenv("DISABLE_WORKSPACE_CREATION") == "true",
PublicURL: strings.TrimRight(strings.TrimSpace(os.Getenv("MULTICA_PUBLIC_URL")), "/"),
TrustedProxies: parseTrustedProxies(os.Getenv("MULTICA_TRUSTED_PROXIES")),
CloudRuntimeFleetURL: cloudRuntimeFleetURLFromEnv(),
CloudRuntimeFleetTimeout: envDuration("MULTICA_CLOUD_FLEET_TIMEOUT", 35*time.Second),
AttachmentDownloadMode: os.Getenv("ATTACHMENT_DOWNLOAD_MODE"),
AttachmentDownloadURLTTL: envDuration("ATTACHMENT_DOWNLOAD_URL_TTL", 30*time.Minute),
}
h := handler.New(queries, pool, hub, bus, emailSvc, store, cfSigner, analyticsClient, signupConfig, daemonHub)
h.Metrics = opts.BusinessMetrics
h.TaskService.Metrics = opts.BusinessMetrics
h.IssueService.Metrics = opts.BusinessMetrics
if opts.BusinessMetrics != nil {
// Wire the BusinessMetrics receiver into the cloud runtime client
// so every outbound Fleet/Gateway request feeds the
// multica_cloudruntime_request_* histograms.
if client, ok := h.CloudRuntime.(*cloudruntime.Client); ok {
client.SetRecorder(opts.BusinessMetrics)
}
}
if opts.DaemonWakeup != nil {
h.TaskService.Wakeup = opts.DaemonWakeup
}
if rdb != nil {
h.UpdateStore = handler.NewRedisUpdateStore(rdb)
h.ModelListStore = handler.NewRedisModelListStore(rdb)
h.LocalSkillListStore = handler.NewRedisLocalSkillListStore(rdb)
h.LocalSkillImportStore = handler.NewRedisLocalSkillImportStore(rdb)
h.LivenessStore = handler.NewRedisLivenessStore(rdb)
h.WebhookRateLimiter = handler.NewRedisWebhookRateLimiter(rdb, handler.DefaultWebhookRateLimit())
h.WebhookIPRateLimiter = handler.NewRedisWebhookIPRateLimiter(rdb, handler.DefaultWebhookIPRateLimit())
}
// Lark integration. Only wired when MULTICA_LARK_SECRET_KEY is set:
// the InstallationService refuses to fall back to plaintext storage
// for app_secret, and the BindingTokenService cannot mint usable
// tokens without it either. When the key is absent the Lark
// handlers return 503 with a clear message; the rest of the server
// continues to start so self-host deployments that have not opted
// in to Lark are unaffected.
if larkKey, err := secretbox.LoadKey("MULTICA_LARK_SECRET_KEY"); err == nil {
box, err := secretbox.New(larkKey)
if err != nil {
slog.Error("lark: secretbox.New failed; lark integration disabled", "error", err)
} else {
installSvc, err := lark.NewInstallationService(queries, box)
if err != nil {
slog.Error("lark: InstallationService init failed; lark integration disabled", "error", err)
} else {
h.LarkInstallations = installSvc
h.LarkBindingTokens = lark.NewBindingTokenService(queries, pool)
slog.Info("lark integration enabled")
// APIClient: wire the real Lark Open Platform HTTP client
// (IM v1 send/patch + binding-prompt + bot info). Setting
// MULTICA_LARK_SECRET_KEY is the operator's opt-in for
// the integration as a whole; we don't expose a separate
// "HTTP enabled" knob because the inbound dispatcher
// without outbound replies is not a useful production
// state, and CI / integration tests that want to avoid
// real Lark traffic can point MULTICA_LARK_HTTP_BASE_URL
// at a mock server.
//
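// MULTICA_LARK_HTTP_BASE_URL, when set, overrides the
// open-platform host for EVERY installation regardless of
// its region — use it for a mock server in integration
// tests or a staging proxy. Leave it unset in production:
// the host is then resolved per installation from its
// region (open.feishu.cn for Feishu, open.larksuite.com
// for the Lark international tenant).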
larkClient := lark.NewHTTPAPIClient(lark.HTTPClientConfig{
BaseURL: strings.TrimSpace(os.Getenv("MULTICA_LARK_HTTP_BASE_URL")),
Logger: slog.Default(),
})
h.LarkAPIClient = larkClient
patcher := lark.NewPatcher(queries, installSvc, larkClient, lark.PatcherConfig{})
patcher.Register(bus)
// Inbound pipeline: lark_inbound_audit logger,
// channel-aware ChatSessionService, and the
// Dispatcher that orders identity / dedup / append /
// /issue / enqueue per §4.3. The Dispatcher depends
// on the same IssueService + TaskService that back
// HTTP, so /issue-created issues share counter, dup
// guard, project boundary, broadcast, analytics and
// agent-enqueue with the rest of the product.
auditLogger := lark.NewAuditLogger(queries)
chatSvc := lark.NewChatSessionService(queries, pool)
dispatcher := &lark.Dispatcher{
Queries: queries,
Chat: chatSvc,
Audit: auditLogger,
IssueService: h.IssueService,
TaskService: h.TaskService,
Logger: slog.Default(),
}
// Debounce the per-session run trigger so a burst of
// messages (e.g. "forward a transcript, then type a note")
// collapses into one agent run instead of one per message.
// MUL-2968.
dispatcher.EnableRunBatching(lark.DefaultChatRunBatchWindow)
// WS Hub: lease + supervisor goroutines per installation.
// The WSLongConnConnector talks Lark's long-conn protocol
// over gorilla/websocket. The connector wraps every read
// with a ctx-cancel watchdog so lease loss / shutdown
// breaks the blocking ReadMessage in bounded time — the
// invariant §4.4 leans on. If the endpoint fetcher fails
// to initialize (bad MULTICA_LARK_CALLBACK_BASE_URL or
// similar config error), buildLarkConnectorFactory logs
// and falls back to the NoopConnector so the lease /
// supervisor lifecycle still runs against real DB rows —
// inbound messages will be silently dropped until the
// config is fixed, with the boot log labelling the mode
// "noop" so operators can spot it.
connectorFactory, connectorLabel := buildLarkConnectorFactory(installSvc, larkClient)
h.LarkHub = lark.NewHub(queries, connectorFactory, dispatcher, lark.HubConfig{})
// OutcomeReplier wires the outbound side of the
// EventEmitter contract: NeedsBinding / AgentOffline /
// AgentArchived translate to a Lark-side reply card.
// Requires the real APIClient (the stub returns
// ErrAPIClientNotConfigured on every send) and the
// binding token service. When either is missing, the
// Hub falls back to the noop replier and the outcomes
// get logged but not delivered — clearly visible in
// boot output so operators understand the gap.
replier := lark.NewLarkOutcomeReplier(lark.OutcomeReplierConfig{
APIClient: larkClient,
BindingSvc: h.LarkBindingTokens,
Credentials: installSvc,
Queries: queries,
PublicURL: signupConfig.PublicURL,
Logger: slog.Default(),
})
h.LarkHub.SetOutcomeReplier(replier)
// The agent-offline / agent-archived notice is now decided
// at debounce-flush time rather than synchronously from
// Handle, so the dispatcher drives that reply itself through
// the same replier. MUL-2968.
dispatcher.FlushReply = replier.Reply
slog.Info("lark inbound pipeline wired", "connector", connectorLabel)
// One-shot union_id backfill for installations created
// before migration 112 added bot_union_id. Runs off the
// hot startup path so a slow Lark round-trip cannot block
// HTTP listener boot. New installs already write
// bot_union_id during the device-flow finalize, so this
// is bridge code — it will simply find no rows to update
// on a fresh deployment and exit. MUL-2671.
go lark.BackfillBotUnionIDs(context.Background(), queries, larkClient, installSvc, slog.Default())
// Device-flow registration service: end-to-end install
// pipeline that talks to accounts.feishu.cn (RFC 8628)
// for the QR-scan handshake and then commits the
// resulting Bot credentials + the installer's
// lark_user_binding in one DB transaction. The optional
// MULTICA_LARK_REGISTRATION_DOMAIN / _LARK_DOMAIN env
// vars override the protocol hosts for staging / dev.
regCfg := lark.RegistrationConfig{
Domain: strings.TrimSpace(os.Getenv("MULTICA_LARK_REGISTRATION_DOMAIN")),
LarkDomain: strings.TrimSpace(os.Getenv("MULTICA_LARK_REGISTRATION_LARK_DOMAIN")),
}
regClient := lark.NewRegistrationClient(regCfg)
regSvc, rerr := lark.NewRegistrationService(
lark.RegistrationServiceConfig{Logger: slog.Default()},
regClient,
larkClient,
queries,
pool,
installSvc,
h.LarkBindingTokens,
)
if rerr != nil {
slog.Error("lark: RegistrationService init failed; install disabled", "error", rerr)
} else {
// Publish lark_installation:created at row-commit time so the
// connection badge refreshes on every workspace client, not just
// the tab that polls the install status to success.
regSvc.SetEventBus(bus)
h.LarkRegistration = regSvc
slog.Info("lark device-flow install enabled")
}
}
}
} else {
slog.Info("lark integration disabled (MULTICA_LARK_SECRET_KEY not set)")
}
if opts.HeartbeatScheduler != nil {
h.HeartbeatScheduler = opts.HeartbeatScheduler
}
// Auth caches: PAT cache is shared between the regular Auth middleware,
// the DaemonAuth fallback (mul_) path, and the revoke handler
// (invalidate). DaemonTokenCache backs the DaemonAuth mdt_ path. Both
// constructors return nil when rdb is nil — every consumer handles that
// as "no cache, always hit DB".
patCache := auth.NewPATCache(rdb)
daemonTokenCache := auth.NewDaemonTokenCache(rdb)
h.PATCache = patCache
h.DaemonTokenCache = daemonTokenCache
h.MembershipCache = auth.NewMembershipCache(rdb)
// Cloud PAT verifier: validates mcn_ tokens against Multica Cloud
// Fleet. Returns nil when no Fleet URL is configured — the Auth /
// DaemonAuth middlewares treat nil as "mcn_ not supported" and
// reject with 401, instead of falling through to mul_/JWT paths.
// Reuses MULTICA_CLOUD_FLEET_URL (the same URL the cloud-runtime
// proxy uses) so a deployment doesn't need a second config knob.
cloudPATVerifier := auth.NewCloudPATVerifier(auth.CloudPATVerifierConfig{
FleetBaseURL: signupConfig.CloudRuntimeFleetURL,
Redis: rdb,
})
// Empty-claim cache: lets the daemon poll path skip a Postgres
// scan when a recent check confirmed the runtime had no queued
// task. Returns nil when rdb is nil — TaskService treats that
// as "no cache, always hit DB" (existing behavior).
h.TaskService.EmptyClaim = service.NewEmptyClaimCache(rdb)
// Wire WS heartbeat after stores are finalized so the WS path uses the
// same (possibly Redis-backed) stores as the HTTP path.
daemonHub.SetHeartbeatHandler(h.HandleDaemonWSHeartbeat)
health := newServerHealth(pool)
r := chi.NewRouter()
// Global middleware
r.Use(chimw.RequestID)
r.Use(middleware.ClientMetadata)
r.Use(middleware.RequestLogger)
if opts.HTTPMetrics != nil {
r.Use(opts.HTTPMetrics.Middleware)
}
r.Use(chimw.Recoverer)
r.Use(middleware.ContentSecurityPolicy)
origins := allowedOrigins()
// Share allowed origins with WebSocket origin checker.
realtime.SetAllowedOrigins(origins)
r.Use(cors.Handler(cors.Options{
AllowedOrigins: origins,
AllowedMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "X-Workspace-ID", "X-Workspace-Slug", "X-Request-ID", "X-Agent-ID", "X-Task-ID", "X-CSRF-Token", "X-Client-Platform", "X-Client-Version", "X-Client-OS"},
AllowCredentials: true,
MaxAge: 300,
}))
// Health / readiness checks
r.Get("/health", health.liveHandler)
r.Get("/readyz", health.readyHandler)
r.Get("/healthz", health.readyHandler)
// Realtime subsystem metrics — connection counts, slow-client evictions,
// and per-event-type send QPS counters. Exposed as JSON so it can be
// scraped by ops or surfaced in the admin UI without adding a Prometheus
// dependency. See MUL-1138 (Phase 0).
//
// Access is restricted (MUL-1342): when REALTIME_METRICS_TOKEN is set,
// callers must present it via Authorization: Bearer <token>. When the
// env var is unset the handler only serves loopback callers so local
// dev keeps working without exposing the metrics on a public listener.
r.Get("/health/realtime", realtimeMetricsHandler(os.Getenv("REALTIME_METRICS_TOKEN")))
// WebSocket
mc := &membershipChecker{queries: queries}
pr := &patResolver{queries: queries, cache: patCache}
slugResolver := realtime.SlugResolver(func(ctx context.Context, slug string) (string, error) {
ws, err := queries.GetWorkspaceBySlug(ctx, slug)
if err != nil {
return "", err
}
return util.UUIDToString(ws.ID), nil
})
r.Get("/ws", func(w http.ResponseWriter, r *http.Request) {
realtime.HandleWebSocket(hub, mc, pr, slugResolver, w, r)
})
// Local file serving (when using local storage)
if local, ok := store.(*storage.LocalStorage); ok {
r.Get("/uploads/*", func(w http.ResponseWriter, r *http.Request) {
file := strings.TrimPrefix(r.URL.Path, "/uploads/")
local.ServeFile(w, r, file)
})
}
// Auth (public) — per-IP rate limiting.
if rdb == nil {
slog.Warn("rate limiting disabled: REDIS_URL not configured")
}
trustedProxies := middleware.ParseTrustedProxies(os.Getenv("RATE_LIMIT_TRUSTED_PROXIES"))
authRL := middleware.RateLimit(rdb, envPositiveInt("RATE_LIMIT_AUTH", 5), time.Minute, trustedProxies)
authVerifyRL := middleware.RateLimit(rdb, envPositiveInt("RATE_LIMIT_AUTH_VERIFY", 20), time.Minute, trustedProxies)
contactSalesRL := middleware.RateLimit(rdb, envPositiveInt("RATE_LIMIT_CONTACT_SALES", 5), time.Hour, trustedProxies)
r.With(authRL).Post("/auth/send-code", h.SendCode)
r.With(authVerifyRL).Post("/auth/verify-code", h.VerifyCode)
r.With(authRL).Post("/auth/google", h.GoogleLogin)
r.Post("/auth/logout", h.Logout)
// Public API
r.Get("/api/config", h.GetConfig)
r.With(contactSalesRL).Post("/api/contact-sales", h.CreateContactSales)
// Webhook ingress for autopilots. Outside the authenticated group on
// purpose: the bearer token in the URL path IS the credential. Workspace
// context is derived from the trigger row, never from request headers.
r.Post("/api/webhooks/autopilots/{token}", h.HandleAutopilotWebhook)
// GitHub App webhook (no Multica auth — requests are authenticated via
// HMAC-SHA256 signature in the handler) and post-install setup callback.
r.Post("/api/webhooks/github", h.HandleGitHubWebhook)
r.Get("/api/github/setup", h.GitHubSetupCallback)
// Stripe webhook (no Multica auth — Stripe signs the raw body
// with a shared secret, the multica-cloud upstream verifies. We
// only forward the bytes + the Stripe-Signature header; see
// HandleCloudBillingStripeWebhook for the rationale).
r.Post("/api/webhooks/stripe", h.HandleCloudBillingStripeWebhook)
// Daemon API routes (require daemon token or valid user token)
r.Route("/api/daemon", func(r chi.Router) {
r.Use(middleware.DaemonAuth(queries, patCache, daemonTokenCache, cloudPATVerifier))
r.Post("/register", h.DaemonRegister)
r.Post("/deregister", h.DaemonDeregister)
r.Post("/heartbeat", h.DaemonHeartbeat)
r.Get("/ws", h.DaemonWebSocket)
r.Get("/workspaces/{workspaceId}/repos", h.GetDaemonWorkspaceRepos)
r.Post("/runtimes/{runtimeId}/tasks/claim", h.ClaimTaskByRuntime)
r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime)
r.Post("/runtimes/{runtimeId}/update/{updateId}/result", h.ReportUpdateResult)
r.Post("/runtimes/{runtimeId}/models/{requestId}/result", h.ReportModelListResult)
r.Post("/runtimes/{runtimeId}/local-skills/{requestId}/result", h.ReportLocalSkillListResult)
r.Post("/runtimes/{runtimeId}/local-skills/import/{requestId}/result", h.ReportLocalSkillImportResult)
r.Get("/tasks/{taskId}/status", h.GetTaskStatus)
r.Post("/tasks/{taskId}/start", h.StartTask)
r.Post("/tasks/{taskId}/wait-local-directory", h.MarkTaskWaitingLocalDirectory)
r.Post("/tasks/{taskId}/progress", h.ReportTaskProgress)
r.Post("/tasks/{taskId}/complete", h.CompleteTask)
r.Post("/tasks/{taskId}/fail", h.FailTask)
r.Post("/tasks/{taskId}/usage", h.ReportTaskUsage)
r.Post("/tasks/{taskId}/messages", h.ReportTaskMessages)
r.Get("/tasks/{taskId}/messages", h.ListTaskMessages)
r.Get("/issues/{issueId}/gc-check", h.GetIssueGCCheck)
r.Get("/chat-sessions/{sessionId}/gc-check", h.GetChatSessionGCCheck)
r.Get("/autopilot-runs/{runId}/gc-check", h.GetAutopilotRunGCCheck)
r.Get("/tasks/{taskId}/gc-check", h.GetTaskGCCheck)
r.Post("/runtimes/{runtimeId}/recover-orphans", h.RecoverOrphanedTasks)
r.Post("/tasks/{taskId}/session", h.PinTaskSession)
})
// Protected API routes
r.Group(func(r chi.Router) {
r.Use(middleware.Auth(queries, patCache, cloudPATVerifier))
r.Use(middleware.RefreshCloudFrontCookies(cfSigner))
// --- User-scoped routes (no workspace context required) ---
r.Get("/api/me", h.GetMe)
r.Patch("/api/me", h.UpdateMe)
r.Patch("/api/me/onboarding", h.PatchOnboarding)
r.Post("/api/me/onboarding/complete", h.CompleteOnboarding)
r.Post("/api/me/onboarding/cloud-waitlist", h.JoinCloudWaitlist)
// DEPRECATED — shim routes for desktop < v3 during the rollout
// window. v3 frontend creates the Helper agent + starter issue
// via generic CreateAgent / CreateIssue and only calls /complete
// here. Remove once X-Client-Version telemetry confirms zero
// pre-v3 desktops are still calling these. Handlers live in
// server/internal/handler/onboarding_shim.go.
r.Post("/api/me/onboarding/runtime-bootstrap", h.BootstrapOnboardingRuntime)
r.Post("/api/me/onboarding/no-runtime-bootstrap", h.BootstrapOnboardingNoRuntime)
r.Post("/api/cli-token", h.IssueCliToken)
r.Post("/api/upload-file", h.UploadFile)
r.Post("/api/feedback", h.CreateFeedback)
r.Route("/api/workspaces", func(r chi.Router) {
r.Get("/", h.ListWorkspaces)
r.Post("/", h.CreateWorkspace)
r.Route("/{id}", func(r chi.Router) {
// Member-level access
r.Group(func(r chi.Router) {
r.Use(middleware.RequireWorkspaceMemberFromURL(queries, "id"))
r.Get("/", h.GetWorkspace)
r.Get("/members", h.ListMembersWithUser)
r.Post("/leave", h.LeaveWorkspace)
r.Get("/invitations", h.ListWorkspaceInvitations)
// Listing GitHub installations is member-visible so the
// integrations tab no longer renders blank for non-admins;
// the handler strips the management handle and adds a
// can_manage hint so the UI can gate connect/disconnect.
r.Get("/github/installations", h.ListGitHubInstallations)
})
// Admin-level access
r.Group(func(r chi.Router) {
r.Use(middleware.RequireWorkspaceRoleFromURL(queries, "id", "owner", "admin"))
r.Put("/", h.UpdateWorkspace)
r.Patch("/", h.UpdateWorkspace)
r.Post("/members", h.CreateInvitation)
r.Route("/members/{memberId}", func(r chi.Router) {
r.Patch("/", h.UpdateMember)
r.Delete("/", h.DeleteMember)
})
r.Delete("/invitations/{invitationId}", h.RevokeInvitation)
})
// Owner-only access
r.With(middleware.RequireWorkspaceRoleFromURL(queries, "id", "owner")).Delete("/", h.DeleteWorkspace)
// GitHub integration — connect / disconnect remain admin-only;
// the read-only list endpoint lives in the member-level group
// above so non-admins can see the workspace's connection state.
r.Group(func(r chi.Router) {
r.Use(middleware.RequireWorkspaceRoleFromURL(queries, "id", "owner", "admin"))
r.Get("/github/connect", h.GitHubConnect)
r.Delete("/github/installations/{installationId}", h.DeleteGitHubInstallation)
})
// Lark integration. Listing is member-visible (same
// rationale as GitHub: the Integrations tab must
// render for non-admins so they see "wired up by whom").
// Install / revoke require admin to prevent a non-admin
// from binding a Bot to a workspace agent or yanking
// an installation out from under one.
r.Group(func(r chi.Router) {
r.Use(middleware.RequireWorkspaceMemberFromURL(queries, "id"))
r.Get("/lark/installations", h.ListLarkInstallations)
})
r.Group(func(r chi.Router) {
r.Use(middleware.RequireWorkspaceRoleFromURL(queries, "id", "owner", "admin"))
r.Delete("/lark/installations/{installationId}", h.RevokeLarkInstallation)
// Device-flow scan-to-install. Begin opens a new
// registration session against Lark and returns
// the QR-code URL; the frontend dialog then polls
// /install/{sessionId}/status until success or
// terminal failure.
r.Post("/lark/install/begin", h.BeginLarkInstall)
r.Get("/lark/install/{sessionId}/status", h.GetLarkInstallStatus)
})
})
})
// Lark binding-token redemption. NOT workspace-scoped because
// the redeemer hits this BEFORE they have any workspace
// context — the redemption itself is what mints their
// lark_user_binding row. Identity comes from the session;
// the token only proves "this open_id requested binding," and
// is combined with the logged-in user to create the mapping.
r.Post("/api/lark/binding/redeem", h.RedeemLarkBindingToken)
// User-scoped invitation routes (no workspace context required)
r.Get("/api/invitations", h.ListMyInvitations)
r.Get("/api/invitations/{id}", h.GetMyInvitation)
r.Post("/api/invitations/{id}/accept", h.AcceptInvitation)
r.Post("/api/invitations/{id}/decline", h.DeclineInvitation)
r.Route("/api/tokens", func(r chi.Router) {
r.Get("/", h.ListPersonalAccessTokens)
r.Post("/", h.CreatePersonalAccessToken)
r.Post("/current/renew", h.RenewCurrentPersonalAccessToken)
r.Delete("/{id}", h.RevokePersonalAccessToken)
})
// Cloud Billing proxy. Same upstream service / port as
// cloud-runtime — multica-cloud's Fleet and Billing share
// :8080 and the same chi router. All routes here forward
// to /api/v1/billing/* with X-User-ID stamped from the
// authenticated context.
//
// User-scoped (account-level), NOT workspace-scoped — sits
// outside the RequireWorkspaceMember group so a user can
// inspect their balance, top up, and open the Billing Portal
// without an active workspace selected. The upstream owner
// model is single-user; X-Workspace-ID would be ignored even
// if we sent it. The Stripe webhook is the public outlier
// and lives outside the entire Auth group (see above).
//
// IMPORTANT — task-token actors are blocked here. The Auth
// middleware happily turns an mat_ task token into a normal
// X-User-ID stamp (so agents can comment, claim issues, etc.
// as their owner), but billing is account-level and a running
// agent reading its owner's balance / opening a checkout
// session is the kind of lateral-movement we're explicitly
// trying to prevent. handler.RequireHumanActor checks the
// authoritative server-set X-Actor-Source header and 403s
// any task-token request. See actor_guards.go for the full
// rationale.
r.Route("/api/cloud-billing", func(r chi.Router) {
r.Use(handler.RequireHumanActor)
r.Get("/balance", h.GetCloudBillingBalance)
r.Get("/transactions", h.ListCloudBillingTransactions)
r.Get("/batches", h.ListCloudBillingBatches)
r.Get("/topups", h.ListCloudBillingTopups)
r.Get("/price-tiers", h.ListCloudBillingPriceTiers)
r.Post("/checkout-sessions", h.CreateCloudBillingCheckoutSession)
r.Get("/checkout-sessions/{sessionId}", h.GetCloudBillingCheckoutSession)
r.Post("/portal-sessions", h.CreateCloudBillingPortalSession)
})
// --- Workspace-scoped routes (all require workspace membership) ---
r.Group(func(r chi.Router) {
r.Use(middleware.RequireWorkspaceMember(queries))
// Assignee frequency
r.Get("/api/assignee-frequency", h.GetAssigneeFrequency)
// Issues
r.Route("/api/issues", func(r chi.Router) {
r.Get("/search", h.SearchIssues)
r.Get("/child-progress", h.ChildIssueProgress)
r.Get("/children", h.ListChildrenByParents)
r.Get("/grouped", h.ListGroupedIssues)
r.Get("/", h.ListIssues)
r.Post("/", h.CreateIssue)
r.Post("/quick-create", h.QuickCreateIssue)
r.Post("/batch-update", h.BatchUpdateIssues)
r.Post("/batch-delete", h.BatchDeleteIssues)
r.Route("/{id}", func(r chi.Router) {
r.Get("/", h.GetIssue)
r.Put("/", h.UpdateIssue)
r.Delete("/", h.DeleteIssue)
r.Post("/comments", h.CreateComment)
r.Get("/comments", h.ListComments)
r.Get("/timeline", h.ListTimeline)
r.Get("/subscribers", h.ListIssueSubscribers)
r.Post("/subscribe", h.SubscribeToIssue)
r.Post("/unsubscribe", h.UnsubscribeFromIssue)
r.Get("/active-task", h.GetActiveTaskForIssue)
r.Post("/tasks/{taskId}/cancel", h.CancelTask)
r.Post("/rerun", h.RerunIssue)
r.Get("/task-runs", h.ListTasksByIssue)
r.Get("/usage", h.GetIssueUsage)
r.Post("/reactions", h.AddIssueReaction)
r.Delete("/reactions", h.RemoveIssueReaction)
r.Get("/attachments", h.ListAttachments)
r.Get("/children", h.ListChildIssues)
r.Get("/labels", h.ListLabelsForIssue)
r.Post("/labels", h.AttachLabel)
r.Delete("/labels/{labelId}", h.DetachLabel)
r.Get("/metadata", h.ListIssueMetadata)
r.Put("/metadata/{key}", h.SetIssueMetadataKey)
r.Delete("/metadata/{key}", h.DeleteIssueMetadataKey)
r.Get("/pull-requests", h.ListPullRequestsForIssue)
})
})
// Task messages (user-facing, not daemon auth)
r.Get("/api/tasks/{taskId}/messages", h.ListTaskMessagesByUser)
// Labels
r.Route("/api/labels", func(r chi.Router) {
r.Get("/", h.ListLabels)
r.Post("/", h.CreateLabel)
r.Route("/{id}", func(r chi.Router) {
r.Get("/", h.GetLabel)
r.Put("/", h.UpdateLabel)
r.Delete("/", h.DeleteLabel)
})
})
// Projects
r.Route("/api/projects", func(r chi.Router) {
r.Get("/search", h.SearchProjects)
r.Get("/", h.ListProjects)
r.Post("/", h.CreateProject)
r.Route("/{id}", func(r chi.Router) {
r.Get("/", h.GetProject)
r.Put("/", h.UpdateProject)
r.Delete("/", h.DeleteProject)
r.Get("/resources", h.ListProjectResources)
r.Post("/resources", h.CreateProjectResource)
r.Put("/resources/{resourceId}", h.UpdateProjectResource)
r.Delete("/resources/{resourceId}", h.DeleteProjectResource)
})
})
// Squads
r.Route("/api/squads", func(r chi.Router) {
r.Get("/", h.ListSquads)
r.Post("/", h.CreateSquad)
r.Route("/{id}", func(r chi.Router) {
r.Get("/", h.GetSquad)
r.Put("/", h.UpdateSquad)
r.Delete("/", h.DeleteSquad)
r.Get("/members", h.ListSquadMembers)
r.Get("/members/status", h.ListSquadMemberStatus)
r.Post("/members", h.AddSquadMember)
r.Delete("/members", h.RemoveSquadMember)
r.Patch("/members/role", h.UpdateSquadMemberRole)
})
})
// Squad leader evaluation (writes to activity_log)
r.Post("/api/issues/{id}/squad-evaluated", h.RecordSquadLeaderEvaluation)
// Autopilots
r.Route("/api/autopilots", func(r chi.Router) {
r.Get("/", h.ListAutopilots)
r.Post("/", h.CreateAutopilot)
r.Route("/{id}", func(r chi.Router) {
r.Get("/", h.GetAutopilot)
r.Patch("/", h.UpdateAutopilot)
r.Delete("/", h.DeleteAutopilot)
r.Post("/trigger", h.TriggerAutopilot)
r.Get("/runs", h.ListAutopilotRuns)
r.Get("/runs/{runId}", h.GetAutopilotRun)
r.Get("/deliveries", h.ListAutopilotDeliveries)
r.Get("/deliveries/{deliveryId}", h.GetAutopilotDelivery)
r.Post("/deliveries/{deliveryId}/replay", h.ReplayAutopilotDelivery)
r.Post("/triggers", h.CreateAutopilotTrigger)
r.Route("/triggers/{triggerId}", func(r chi.Router) {
r.Patch("/", h.UpdateAutopilotTrigger)
r.Delete("/", h.DeleteAutopilotTrigger)
r.Post("/rotate-webhook-token", h.RotateAutopilotTriggerWebhookToken)
r.Put("/signing-secret", h.SetAutopilotTriggerSigningSecret)
})
})
})
// Pins
r.Route("/api/pins", func(r chi.Router) {
r.Get("/", h.ListPins)
r.Post("/", h.CreatePin)
r.Put("/reorder", h.ReorderPins)
r.Delete("/{itemType}/{itemId}", h.DeletePin)
})
// Attachments
r.Get("/api/attachments/{id}", h.GetAttachmentByID)
r.Get("/api/attachments/{id}/download", h.DownloadAttachment)
r.Get("/api/attachments/{id}/content", h.GetAttachmentContent)
r.Delete("/api/attachments/{id}", h.DeleteAttachment)
// Comments
r.Route("/api/comments/{commentId}", func(r chi.Router) {
r.Put("/", h.UpdateComment)
r.Delete("/", h.DeleteComment)
r.Post("/resolve", h.ResolveComment)
r.Delete("/resolve", h.UnresolveComment)
r.Post("/reactions", h.AddReaction)
r.Delete("/reactions", h.RemoveReaction)
})
// Agents
r.Route("/api/agents", func(r chi.Router) {
r.Get("/", h.ListAgents)
r.Post("/", h.CreateAgent)
// Agent templates: pre-configured instructions + skill refs.
// Picking a template imports the referenced skills into the
// workspace (find-or-create by name) and creates the agent
// with the template's instructions in one transaction.
r.Post("/from-template", h.CreateAgentFromTemplate)
r.Route("/{id}", func(r chi.Router) {
r.Get("/", h.GetAgent)
r.Put("/", h.UpdateAgent)
r.Post("/archive", h.ArchiveAgent)
r.Post("/restore", h.RestoreAgent)
r.Post("/cancel-tasks", h.CancelAgentTasks)
r.Get("/tasks", h.ListAgentTasks)
r.Get("/skills", h.ListAgentSkills)
r.Put("/skills", h.SetAgentSkills)
r.Post("/skills/add", h.AddAgentSkills)
// Dedicated env-management endpoint. Owner/admin only;
// agent actors are denied. Every reveal / write is
// audited to activity_log. See MUL-2600 and
// internal/handler/agent_env.go.
r.Get("/env", h.GetAgentEnv)
r.Put("/env", h.UpdateAgentEnv)
})
})
// Agent templates catalog (browse + detail). The Create flow
// lives under /api/agents/from-template above; this route is for
// the picker UI to list available templates.
r.Route("/api/agent-templates", func(r chi.Router) {
r.Get("/", h.ListAgentTemplates)
r.Get("/{slug}", h.GetAgentTemplate)
})
// Skills
r.Route("/api/skills", func(r chi.Router) {
r.Get("/", h.ListSkills)
r.Post("/", h.CreateSkill)
r.Get("/search", h.SearchSkills)
r.Post("/import", h.ImportSkill)
r.Route("/{id}", func(r chi.Router) {
r.Get("/", h.GetSkill)
r.Put("/", h.UpdateSkill)
r.Delete("/", h.DeleteSkill)
r.Get("/files", h.ListSkillFiles)
r.Put("/files", h.UpsertSkillFile)
r.Delete("/files/{fileId}", h.DeleteSkillFile)
})
})
// Dashboard — workspace-wide token + run-time rollups for the
// "/{slug}/dashboard" page. Optional ?project_id filter scopes
// the rollup to a single project.
r.Route("/api/dashboard", func(r chi.Router) {
r.Get("/usage/daily", h.GetDashboardUsageDaily)
r.Get("/usage/by-agent", h.GetDashboardUsageByAgent)
r.Get("/agent-runtime", h.GetDashboardAgentRunTime)
r.Get("/runtime/daily", h.GetDashboardRunTimeDaily)
})
// Runtimes
r.Route("/api/runtimes", func(r chi.Router) {
r.Get("/", h.ListAgentRuntimes)
r.Route("/{runtimeId}", func(r chi.Router) {
r.Patch("/", h.UpdateAgentRuntime)
r.Get("/usage", h.GetRuntimeUsage)
r.Get("/usage/by-agent", h.GetRuntimeUsageByAgent)
r.Get("/usage/by-hour", h.GetRuntimeUsageByHour)
r.Get("/activity", h.GetRuntimeTaskActivity)
r.Post("/update", h.InitiateUpdate)
r.Get("/update/{updateId}", h.GetUpdate)
r.Post("/models", h.InitiateListModels)
r.Get("/models/{requestId}", h.GetModelListRequest)
r.Post("/local-skills", h.InitiateListLocalSkills)
r.Get("/local-skills/{requestId}", h.GetLocalSkillListRequest)
r.Post("/local-skills/import", h.InitiateImportLocalSkill)
r.Get("/local-skills/import/{requestId}", h.GetLocalSkillImportRequest)
r.Delete("/", h.DeleteAgentRuntime)
// Cascade variant of DELETE: archive every active agent
// bound to this runtime, cancel their tasks, then delete
// the runtime — all in one transaction. Used by the
// DeleteRuntimeDialog when the strict DELETE refused with
// `runtime_has_active_agents` and the user confirmed the
// cascade plan.
r.Post("/archive-agents-and-delete", h.ArchiveAgentsAndDeleteRuntime)
})
})
// Cloud Runtime fleet proxy. The remote service URL is configured
// on SaaS API nodes only; self-hosted deployments return 503.
r.Route("/api/cloud-runtime", func(r chi.Router) {
r.Get("/", h.GetCloudRuntimeService)
r.Get("/healthz", h.GetCloudRuntimeHealth)
r.Get("/readyz", h.GetCloudRuntimeReady)
r.Get("/nodes", h.ListCloudRuntimeNodes)
r.Post("/nodes", h.CreateCloudRuntimeNode)
r.Delete("/nodes", h.DeleteCloudRuntimeNode)
r.Post("/nodes/start", h.StartCloudRuntimeNode)
r.Post("/nodes/stop", h.StopCloudRuntimeNode)
r.Post("/nodes/reboot", h.RebootCloudRuntimeNode)
r.Post("/nodes/status", h.GetCloudRuntimeNodeStatus)
r.Post("/nodes/exec", h.ExecCloudRuntimeNode)
})
// Tasks (user-facing, with ownership check)
r.Post("/api/tasks/{taskId}/cancel", h.CancelTaskByUser)
// Workspace-wide agent task snapshot for presence derivation:
// every active task + each agent's most recent terminal task.
r.Get("/api/agent-task-snapshot", h.ListWorkspaceAgentTaskSnapshot)
// Workspace-wide daily agent activity (last 30d, anchored on
// completed_at). Backs the Agents-list sparkline (trailing 7d
// slice) AND the agent detail "Last 30 days" panel.
r.Get("/api/agent-activity-30d", h.GetWorkspaceAgentActivity30d)
// Workspace-wide 30-day run counts per agent for the Agents-list RUNS column.
r.Get("/api/agent-run-counts", h.GetWorkspaceAgentRunCounts)
r.Route("/api/chat/sessions", func(r chi.Router) {
r.Post("/", h.CreateChatSession)
r.Get("/", h.ListChatSessions)
r.Route("/{sessionId}", func(r chi.Router) {
r.Get("/", h.GetChatSession)
r.Patch("/", h.UpdateChatSession)
r.Delete("/", h.DeleteChatSession)
r.Post("/messages", h.SendChatMessage)
r.Get("/messages", h.ListChatMessages)
r.Get("/messages/page", h.ListChatMessagesPage)
r.Get("/pending-task", h.GetPendingChatTask)
r.Post("/read", h.MarkChatSessionRead)
})
})
r.Get("/api/chat/pending-tasks", h.ListPendingChatTasks)
// Inbox
r.Route("/api/inbox", func(r chi.Router) {
r.Get("/", h.ListInbox)
r.Get("/unread-count", h.CountUnreadInbox)
r.Post("/mark-all-read", h.MarkAllInboxRead)
r.Post("/archive-all", h.ArchiveAllInbox)
r.Post("/archive-all-read", h.ArchiveAllReadInbox)
r.Post("/archive-completed", h.ArchiveCompletedInbox)
r.Post("/{id}/read", h.MarkInboxRead)
r.Post("/{id}/archive", h.ArchiveInboxItem)
})
// Notification preferences
r.Route("/api/notification-preferences", func(r chi.Router) {
r.Get("/", h.GetNotificationPreferences)
r.Put("/", h.UpdateNotificationPreferences)
})
})
})
return r, h
}
// buildLarkConnectorFactory wires the real WS long-conn connector
// that talks to /callback/ws/endpoint directly with app_id/app_secret.
// The connector wraps every read with a ctx-cancel watchdog so lease
// loss / shutdown breaks the blocking ReadMessage in bounded time —
// the invariant §4.4 leans on.
//
// If either the endpoint fetcher or the connector itself fails to
// initialize (for the fetcher, typically a malformed
// MULTICA_LARK_CALLBACK_BASE_URL), we log and fall back to the
// NoopConnector so the lease / supervisor lifecycle still exercises
// against real DB rows. Inbound messages are silently dropped until
// the config is fixed; the boot log labels the mode "noop" so the
// degraded state is visible.
//
// Returns the factory plus a short label for the boot log:
// "ws-long-conn" in the healthy case, "noop" in the fallback case.
func buildLarkConnectorFactory(installSvc *lark.InstallationService, apiClient lark.APIClient) (lark.ConnectorFactory, string) {
	// degrade logs why the real connector could not be built and returns
	// the noop factory with its boot-log label. Both init-failure paths
	// share it so the degraded mode is identical regardless of the cause.
	degrade := func(msg string, cause error) (lark.ConnectorFactory, string) {
		slog.Error(msg, "error", cause)
		return lark.NoopConnectorFactory(slog.Default()), "noop"
	}

	endpointFetcher, err := lark.NewHTTPConnectionTokenFetcher(lark.HTTPConnectionTokenConfig{
		BaseURL: strings.TrimSpace(os.Getenv("MULTICA_LARK_CALLBACK_BASE_URL")),
		Logger:  slog.Default(),
	})
	if err != nil {
		return degrade("lark ws: endpoint fetcher init failed; falling back to noop", err)
	}

	decoder := lark.NewLarkJSONFrameDecoder()
	dialer := lark.NewGorillaDialer()

	// Builds per-installation credentials from the installation row:
	// the app secret is decrypted on demand and the region is taken off
	// the row via lark.RegionOrDefault.
	credsProvider := lark.CredentialsProviderFunc(func(ctx context.Context, inst db.LarkInstallation) (lark.InstallationCredentials, error) {
		secret, err := installSvc.DecryptAppSecret(inst)
		if err != nil {
			return lark.InstallationCredentials{}, err
		}
		creds := lark.InstallationCredentials{
			AppID:     inst.AppID,
			AppSecret: secret,
			Region:    lark.RegionOrDefault(inst.Region),
		}
		// TenantKey is nullable on the row; leave it empty when unset.
		if inst.TenantKey.Valid {
			creds.TenantKey = inst.TenantKey.String
		}
		return creds, nil
	})

	// Inbound enricher: expands quoted replies / forwarded bundles into
	// the agent's body via the IM API before dispatch. It shares the
	// connector's resolved credentials and runs under the connector's
	// EnrichTimeout so it cannot overrun the Lark long-conn ACK budget.
	enricher := lark.NewInboundEnricher(apiClient, lark.InboundEnricherConfig{Logger: slog.Default()})

	conn, err := lark.NewWSLongConnConnector(lark.WSConnectorConfig{
		Dialer:              dialer,
		EndpointFetcher:     endpointFetcher,
		FrameDecoder:        decoder,
		Enricher:            enricher,
		CredentialsProvider: credsProvider,
		Logger:              slog.Default(),
	})
	if err != nil {
		return degrade("lark ws: connector init failed; falling back to noop", err)
	}

	// The factory ignores its installation argument and hands out the
	// same connector instance on every call.
	return func(_ db.LarkInstallation) (lark.EventConnector, error) {
		return conn, nil
	}, "ws-long-conn"
}
// membershipChecker implements realtime.MembershipChecker on top of the
// generated database queries.
type membershipChecker struct {
	queries *db.Queries
}

// IsMember reports whether a member row exists for the given user in the
// given workspace. Any lookup error — including a database failure — is
// reported as "not a member".
//
// NOTE(review): both IDs go through parseUUID, which panics on malformed
// input; confirm callers only pass already-validated UUID strings.
func (mc *membershipChecker) IsMember(ctx context.Context, userID, workspaceID string) bool {
	lookup := db.GetMemberByUserAndWorkspaceParams{
		UserID:      parseUUID(userID),
		WorkspaceID: parseUUID(workspaceID),
	}
	if _, err := mc.queries.GetMemberByUserAndWorkspace(ctx, lookup); err != nil {
		return false
	}
	return true
}
// patResolver implements realtime.PATResolver using database queries.
// The cache is the same one used by the Auth and DaemonAuth middlewares,
// so revoking a token through any path invalidates it for all of them.
// A nil cache is supported and degrades to direct DB lookups.
type patResolver struct {
	queries *db.Queries
	cache   *auth.PATCache
}

// ResolveToken maps a raw personal access token to the owning user's ID.
// It returns ("", false) when the token's hash cannot be found. Lookups
// go through the cache first; on a miss the database row is loaded,
// cached with a TTL derived from the token's expiry, and its
// last-used timestamp is refreshed in the background.
func (pr *patResolver) ResolveToken(ctx context.Context, token string) (string, bool) {
	tokenHash := auth.HashToken(token)

	if cachedID, hit := pr.cache.Get(ctx, tokenHash); hit {
		return cachedID, true
	}

	record, err := pr.queries.GetPersonalAccessTokenByHash(ctx, tokenHash)
	if err != nil {
		return "", false
	}

	// A token without an expiry leaves this at the zero time.
	var expiry time.Time
	if record.ExpiresAt.Valid {
		expiry = record.ExpiresAt.Time
	}

	ownerID := util.UUIDToString(record.UserID)
	pr.cache.Set(ctx, tokenHash, ownerID, auth.TTLForExpiry(time.Now(), expiry))

	// Cache miss = first WS auth in this TTL window. Refresh last_used_at;
	// subsequent connects within the window skip the write. The write is
	// fire-and-forget on a background context, so it is not tied to the
	// caller's ctx and its result is discarded.
	go pr.queries.UpdatePersonalAccessTokenLastUsed(context.Background(), record.ID)

	return ownerID, true
}
// parseUUID is a thin alias for util.MustParseUUID. Call sites here are all
// internal round-trips of DB-sourced UUIDs (e.g. issue.ID, e.ActorID), so an
// invalid value indicates a programming error and should panic loudly.
// For IDs that may legitimately be empty, use optionalUUID instead.
func parseUUID(s string) pgtype.UUID {
return util.MustParseUUID(s)
}
// optionalUUID converts s to a pgtype.UUID, mapping the empty string to the
// zero (NULL) value and delegating everything else to util.MustParseUUID,
// exactly as parseUUID does. It exists for actor IDs on events whose
// producer may legitimately be a "system" actor with no member/agent
// attribution (e.g. GitHub webhook auto-status sync) — the activity_log and
// inbox_item tables both allow actor_id to be NULL.
func optionalUUID(s string) pgtype.UUID {
	var id pgtype.UUID
	if s != "" {
		id = util.MustParseUUID(s)
	}
	return id
}
// splitAndTrim breaks a comma-separated string into its elements, trimming
// surrounding whitespace from each one and dropping elements that are empty
// after trimming. An empty input yields nil; a non-empty input that contains
// only separators/whitespace yields an empty, non-nil slice.
func splitAndTrim(s string) []string {
	if len(s) == 0 {
		return nil
	}
	fields := strings.Split(s, ",")
	out := make([]string, 0, len(fields))
	for i := range fields {
		item := strings.TrimSpace(fields[i])
		if item == "" {
			continue
		}
		out = append(out, item)
	}
	return out
}
// cloudRuntimeFleetURLFromEnv returns the cloud-runtime fleet URL from the
// environment. MULTICA_CLOUD_FLEET_URL takes precedence; when it is unset or
// blank, MULTICA_FLEET_URL is used instead. Values are whitespace-trimmed,
// and the result is "" when neither variable holds a non-blank value.
func cloudRuntimeFleetURLFromEnv() string {
	for _, key := range []string{"MULTICA_CLOUD_FLEET_URL", "MULTICA_FLEET_URL"} {
		if value := strings.TrimSpace(os.Getenv(key)); value != "" {
			return value
		}
	}
	return ""
}