Files
multica/server/internal/handler/daemon_ws.go
devv-eve 9db91e89f5 feat: add daemon websocket task wakeups (#1772)
* feat: add daemon websocket task wakeups

* feat: fan out daemon wakeups across nodes

* fix: dedupe daemon wakeup loopback events

* fix: lengthen daemon polling fallback interval

---------

Co-authored-by: Eve <eve@multica.ai>
2026-04-28 16:07:24 +08:00

67 lines
1.6 KiB
Go

package handler
import (
"net/http"
"strings"
"github.com/multica-ai/multica/server/internal/daemonws"
"github.com/multica-ai/multica/server/internal/middleware"
)
func (h *Handler) DaemonWebSocket(w http.ResponseWriter, r *http.Request) {
if h.DaemonHub == nil {
writeError(w, http.StatusServiceUnavailable, "daemon websocket unavailable")
return
}
runtimeIDs := parseRuntimeIDs(r)
if len(runtimeIDs) == 0 {
writeError(w, http.StatusBadRequest, "runtime_ids required")
return
}
for _, runtimeID := range runtimeIDs {
rt, ok := h.requireDaemonRuntimeAccess(w, r, runtimeID)
if !ok {
return
}
if daemonID := middleware.DaemonIDFromContext(r.Context()); daemonID != "" && rt.DaemonID.Valid && rt.DaemonID.String != daemonID {
writeError(w, http.StatusNotFound, "runtime not found")
return
}
}
h.DaemonHub.HandleWebSocket(w, r, daemonws.ClientIdentity{
DaemonID: middleware.DaemonIDFromContext(r.Context()),
UserID: requestUserID(r),
WorkspaceID: middleware.DaemonWorkspaceIDFromContext(r.Context()),
RuntimeIDs: runtimeIDs,
ClientVersion: r.Header.Get("X-Client-Version"),
})
}
func parseRuntimeIDs(r *http.Request) []string {
seen := map[string]struct{}{}
var out []string
add := func(raw string) {
for _, part := range strings.Split(raw, ",") {
id := strings.TrimSpace(part)
if id == "" {
continue
}
if _, ok := seen[id]; ok {
continue
}
seen[id] = struct{}{}
out = append(out, id)
}
}
for _, raw := range r.URL.Query()["runtime_id"] {
add(raw)
}
for _, raw := range r.URL.Query()["runtime_ids"] {
add(raw)
}
return out
}