mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
* feat(analytics): add PostHog client with async batch shipping

  Introduces server/internal/analytics, the shipping layer for the product funnel defined in docs/analytics.md. Capture is non-blocking: events are enqueued into a bounded channel and a background worker batches them to PostHog's /batch/ endpoint. A broken backend drops events rather than blocking request handlers. Local dev and self-hosted instances run a noop client until the operator sets POSTHOG_API_KEY.

  This is PR 1 of MUL-1122; signup and workspace_created emission land in the follow-up commit so this change is independently reviewable.

* feat(server): emit signup and workspace_created analytics events

  Wires analytics.Client through handler.New and main, then emits the first two funnel events:

  - signup fires from findOrCreateUser (which now reports isNew), covering both the verification-code and Google OAuth entry points. A single emission site guarantees Google signups aren't missed.
  - workspace_created fires after the CreateWorkspace transaction commits, with is_first_workspace computed from a post-commit ListWorkspaces count so we can distinguish fresh-user activation from returning-user expansion.

  Tests use analytics.NoopClient so nothing ships from test runs. PR 1 of MUL-1122; runtime_registered and issue_executed follow in later PRs per the plan.

* refactor(analytics): drop is_first_workspace from workspace_created

  Stamping "is this the user's first workspace?" at emit time races under concurrent CreateWorkspace requests: two transactions committing close together can both read a post-commit count greater than one and both emit false. Fixing it at the SQL layer requires a schema change we don't want in PR 1.

  PostHog answers the same question exactly from the event stream (funnel on "first time user does X" / cohort on $initial_event), so removing the property loses no information and makes the emit side race-free.

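The non-blocking capture described in the first commit above (bounded channel, drop on overflow) could look roughly like the following. This is a minimal sketch, not the code in server/internal/analytics: the `Event` type, `queueClient`, `newQueueClient`, and `drain` are hypothetical names chosen for illustration, and the real worker also handles HTTP shipping and flush timing, which are left out here.

```go
package main

import "fmt"

// Event is a hypothetical analytics event; the real type in
// server/internal/analytics may look different.
type Event struct {
	Name       string
	DistinctID string
}

// queueClient is an illustrative stand-in for the PostHog client.
type queueClient struct {
	queue chan Event
}

func newQueueClient(size int) *queueClient {
	return &queueClient{queue: make(chan Event, size)}
}

// Capture enqueues without blocking. It reports false when the buffer is
// full, in which case the event is dropped instead of stalling the caller.
func (c *queueClient) Capture(e Event) bool {
	select {
	case c.queue <- e:
		return true
	default:
		return false
	}
}

// drain pulls up to max queued events without blocking, standing in for one
// batch that a background worker would send to the /batch/ endpoint.
func (c *queueClient) drain(max int) []Event {
	batch := make([]Event, 0, max)
	for len(batch) < max {
		select {
		case e := <-c.queue:
			batch = append(batch, e)
		default:
			return batch
		}
	}
	return batch
}

func main() {
	c := newQueueClient(2)
	fmt.Println(c.Capture(Event{Name: "signup", DistinctID: "u1"}))            // true
	fmt.Println(c.Capture(Event{Name: "workspace_created", DistinctID: "u1"})) // true
	fmt.Println(c.Capture(Event{Name: "signup", DistinctID: "u2"}))            // false: buffer full, dropped
	fmt.Println(len(c.drain(10)))                                              // 2
}
```

The `select` with a `default` branch is what keeps request handlers from blocking: when the worker falls behind, the cost is a lost event rather than a slow request.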
* docs(analytics): document self-host safety defaults

  Spell out why self-hosted instances never ship events upstream by default (empty POSTHOG_API_KEY → noop client) and explain how operators can point at their own PostHog project without any code change.

* feat(analytics): emit runtime_registered, issue_executed, team_invite_*

  Three server-side funnel events, all gated on first-time state transitions so retries and re-runs don't inflate the WAW buckets:

  - runtime_registered fires from DaemonRegister when UpsertAgentRuntime reports (xmax = 0), i.e. the row was inserted, not updated. Heartbeats and re-registrations stay silent.
  - issue_executed fires from CompleteTask after an atomic UPDATE issue SET first_executed_at = now() WHERE id = $1 AND first_executed_at IS NULL flips the column for the first time. Retries, re-assignments, and comment-triggered follow-up tasks hit the WHERE clause and no-op. Carries nth_issue_for_workspace so the ≥1/≥2/≥5/≥10 buckets filter without extra queries.
  - team_invite_sent fires from CreateInvitation and team_invite_accepted from AcceptInvitation, closing the expansion funnel.

  Adds a 050 migration for issue.first_executed_at plus a partial index so the workspace-scoped executed-count query doesn't scan the never-executed tail.

* feat(config): surface PostHog key via /api/config

  Extends AppConfig with posthog_key / posthog_host sourced from env on every request (so operators can rotate the key via secret refresh without a restart). Reading the key off the server, rather than baking it into the frontend bundle via NEXT_PUBLIC_*, means self-hosted instances inherit the blank key automatically and never ship events upstream.

* feat(analytics): wire posthog-js identify + UTM capture on the client

  Adds @multica/core/analytics, a thin wrapper around posthog-js that owns attribution capture and identity merge.

  Posthog-js config comes from /api/config (not NEXT_PUBLIC_*), so self-hosted instances whose server returns an empty key automatically run the SDK inert.

  captureSignupSource stamps a multica_signup_source cookie with UTM params and the referrer's origin (never the full referrer, which can leak OAuth code/state in the callback URL). The backend signup event reads this cookie on new-user creation.

  Identity flows:

  - auth-initializer fires identify() right after getMe() resolves, on both cookie and token paths. A getConfig/getMe race is handled by buffering a pending identify inside the analytics module and flushing it once initAnalytics finishes.
  - auth store calls identify() on verifyCode / loginWithGoogle / loginWithToken and resetAnalytics() on logout so the next login merges cleanly without bleeding events.

* docs(analytics): describe runtime_registered, issue_executed, invite events

  Fills in the schema for the remaining funnel events. Captures the design commentary that belongs next to the contract rather than in a PR description, in particular why issue_executed uses the atomic first_executed_at flip instead of counting task-terminal events, and why runtime_registered relies on xmax = 0 rather than a query-then-write.

* fix(analytics): drop non-atomic nth_issue_for_workspace from issue_executed

  Computing the workspace's Nth-issue ordinal at emit time is not atomic under concurrent first-completions: two transactions can both run MarkIssueFirstExecuted, then both run CountExecutedIssuesInWorkspace, and both observe count=1 before either has committed, so both events go out stamped as n=1. Serialising it would mean a per-workspace advisory lock or a SERIALIZABLE-isolated tx; PostHog answers the same question exactly at query time via row_number() partitioned by workspace_id, so the emit-time property adds risk without adding information.

  Removes the property from analytics.IssueExecuted, deletes the unused CountExecutedIssuesInWorkspace query, and regenerates sqlc. The partial index stays: any future workspace-scoped executed-issue query will want it.

* fix(analytics): wire $pageview and harden signup_source cookie payload

  Two frontend fixes from the PR review:

  - PageviewTracker, mounted under WebProviders, fires capturePageview on every Next.js App Router path / query-string change. Without this the capturePageview helper in @multica/core/analytics was never called and the acquisition funnel's / → signup step was empty.
  - captureSignupSource now caps each UTM / referrer value at 96 chars *before* JSON.stringify, and drops the whole cookie when the serialised payload still exceeds 512 chars. Previously the overall slice(0, 256) could leave a half-JSON string on the wire that neither the backend nor PostHog could parse.

  Both capturePageview and identify now buffer a single pending call when fired before initAnalytics resolves; otherwise the initial "/" pageview and same-turn login identify race the /api/config fetch and get dropped. resetAnalytics clears both buffers so a logout→login cycle stays clean.

* fix(analytics): URL-decode signup_source cookie on read

  Go does not URL-decode Cookie.Value automatically, so the frontend's JSON-then-encodeURIComponent payload was landing in PostHog as percent-encoded garbage (%7B%22utm_source...). Unescape on read so the backend receives the original JSON string the frontend intended, and drop values that fail to decode or exceed the server-side cap: sending truncated garbage is worse than sending nothing. Oversized-cookie guard matches the frontend's SIGNUP_SOURCE_MAX_LEN.

* docs(analytics): reflect nth-issue drop, $pageview wiring, cookie encoding

  Pulls the schema doc back in line with the code: issue_executed no longer advertises nth_issue_for_workspace (with a note about why PostHog derives it at query time instead), the frontend $pageview section names the actual PageviewTracker component that fires it, and the signup_source section documents the per-value cap / overall drop rule and the encode-on-write / decode-on-read contract.

---------

Co-authored-by: Jiang Bohan <bhjiang@outlook.com>
425 lines
13 KiB
Go
package handler

import (
	"context"
	"encoding/json"
	"errors"
	"log/slog"
	"net/http"

	"github.com/go-chi/chi/v5"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
	"github.com/jackc/pgx/v5/pgtype"
	"github.com/multica-ai/multica/server/internal/analytics"
	"github.com/multica-ai/multica/server/internal/auth"
	"github.com/multica-ai/multica/server/internal/events"
	"github.com/multica-ai/multica/server/internal/middleware"
	"github.com/multica-ai/multica/server/internal/realtime"
	"github.com/multica-ai/multica/server/internal/service"
	"github.com/multica-ai/multica/server/internal/storage"
	"github.com/multica-ai/multica/server/internal/util"
	db "github.com/multica-ai/multica/server/pkg/db/generated"
)

type txStarter interface {
	Begin(ctx context.Context) (pgx.Tx, error)
}

type dbExecutor interface {
	Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}

type Config struct {
	AllowSignup         bool
	AllowedEmails       []string
	AllowedEmailDomains []string
}

type Handler struct {
	Queries          *db.Queries
	DB               dbExecutor
	TxStarter        txStarter
	Hub              *realtime.Hub
	Bus              *events.Bus
	TaskService      *service.TaskService
	AutopilotService *service.AutopilotService
	EmailService     *service.EmailService
	PingStore        *PingStore
	UpdateStore      *UpdateStore
	ModelListStore   *ModelListStore
	Storage          storage.Storage
	CFSigner         *auth.CloudFrontSigner
	Analytics        analytics.Client
	cfg              Config
}

func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *events.Bus, emailService *service.EmailService, store storage.Storage, cfSigner *auth.CloudFrontSigner, analyticsClient analytics.Client, cfg Config) *Handler {
	var executor dbExecutor
	if candidate, ok := txStarter.(dbExecutor); ok {
		executor = candidate
	}

	if analyticsClient == nil {
		analyticsClient = analytics.NoopClient{}
	}

	taskSvc := service.NewTaskService(queries, txStarter, hub, bus)
	return &Handler{
		Queries:          queries,
		DB:               executor,
		TxStarter:        txStarter,
		Hub:              hub,
		Bus:              bus,
		TaskService:      taskSvc,
		AutopilotService: service.NewAutopilotService(queries, txStarter, bus, taskSvc),
		EmailService:     emailService,
		PingStore:        NewPingStore(),
		UpdateStore:      NewUpdateStore(),
		ModelListStore:   NewModelListStore(),
		Storage:          store,
		CFSigner:         cfSigner,
		Analytics:        analyticsClient,
		cfg:              cfg,
	}
}

func writeJSON(w http.ResponseWriter, status int, v any) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	json.NewEncoder(w).Encode(v)
}

func writeError(w http.ResponseWriter, status int, msg string) {
	writeJSON(w, status, map[string]string{"error": msg})
}

// Thin wrappers around util functions (preserve existing handler code unchanged).
func parseUUID(s string) pgtype.UUID                { return util.ParseUUID(s) }
func uuidToString(u pgtype.UUID) string             { return util.UUIDToString(u) }
func textToPtr(t pgtype.Text) *string               { return util.TextToPtr(t) }
func ptrToText(s *string) pgtype.Text               { return util.PtrToText(s) }
func strToText(s string) pgtype.Text                { return util.StrToText(s) }
func timestampToString(t pgtype.Timestamptz) string { return util.TimestampToString(t) }
func timestampToPtr(t pgtype.Timestamptz) *string   { return util.TimestampToPtr(t) }
func uuidToPtr(u pgtype.UUID) *string               { return util.UUIDToPtr(u) }

// publish sends a domain event through the event bus.
func (h *Handler) publish(eventType, workspaceID, actorType, actorID string, payload any) {
	h.Bus.Publish(events.Event{
		Type:        eventType,
		WorkspaceID: workspaceID,
		ActorType:   actorType,
		ActorID:     actorID,
		Payload:     payload,
	})
}

func isNotFound(err error) bool {
	return errors.Is(err, pgx.ErrNoRows)
}

func isUniqueViolation(err error) bool {
	var pgErr *pgconn.PgError
	return errors.As(err, &pgErr) && pgErr.Code == "23505"
}

func requestUserID(r *http.Request) string {
	return r.Header.Get("X-User-ID")
}

// resolveActor determines whether the request is from an agent or a human member.
// If X-Agent-ID and X-Task-ID headers are both set, validates that the task
// belongs to the claimed agent (defense-in-depth against manual header spoofing).
// If only X-Agent-ID is set, validates that the agent belongs to the workspace.
// Returns ("agent", agentID) on success, ("member", userID) otherwise.
func (h *Handler) resolveActor(r *http.Request, userID, workspaceID string) (actorType, actorID string) {
	agentID := r.Header.Get("X-Agent-ID")
	if agentID == "" {
		return "member", userID
	}

	// Validate the agent exists in the target workspace.
	agent, err := h.Queries.GetAgent(r.Context(), parseUUID(agentID))
	if err != nil || uuidToString(agent.WorkspaceID) != workspaceID {
		slog.Debug("resolveActor: X-Agent-ID rejected, agent not found or workspace mismatch", "agent_id", agentID, "workspace_id", workspaceID)
		return "member", userID
	}

	// When X-Task-ID is provided, cross-check that the task belongs to this agent.
	if taskID := r.Header.Get("X-Task-ID"); taskID != "" {
		task, err := h.Queries.GetAgentTask(r.Context(), parseUUID(taskID))
		if err != nil || uuidToString(task.AgentID) != agentID {
			slog.Debug("resolveActor: X-Task-ID rejected, task not found or agent mismatch", "agent_id", agentID, "task_id", taskID)
			return "member", userID
		}
	}

	return "agent", agentID
}

func requireUserID(w http.ResponseWriter, r *http.Request) (string, bool) {
	userID := requestUserID(r)
	if userID == "" {
		writeError(w, http.StatusUnauthorized, "user not authenticated")
		return "", false
	}
	return userID, true
}

// resolveWorkspaceID returns the workspace UUID for this request. Delegates
// to middleware.ResolveWorkspaceIDFromRequest so middleware-protected routes
// and middleware-less routes (e.g. /api/upload-file) share identical
// resolution behavior, including slug → UUID translation via the DB.
//
// Returns "" when no workspace identifier was provided or a slug was provided
// but doesn't match any workspace.
func (h *Handler) resolveWorkspaceID(r *http.Request) string {
	return middleware.ResolveWorkspaceIDFromRequest(r, h.Queries)
}

// ctxMember returns the workspace member from context (set by workspace middleware).
func ctxMember(ctx context.Context) (db.Member, bool) {
	return middleware.MemberFromContext(ctx)
}

// ctxWorkspaceID returns the workspace ID from context (set by workspace middleware).
func ctxWorkspaceID(ctx context.Context) string {
	return middleware.WorkspaceIDFromContext(ctx)
}

// workspaceIDFromURL returns the workspace ID from context (preferred) or chi URL param (fallback).
func workspaceIDFromURL(r *http.Request, param string) string {
	if id := middleware.WorkspaceIDFromContext(r.Context()); id != "" {
		return id
	}
	return chi.URLParam(r, param)
}

// workspaceMember returns the member from middleware context, or falls back to a DB
// lookup when the handler is called directly (e.g. in tests).
func (h *Handler) workspaceMember(w http.ResponseWriter, r *http.Request, workspaceID string) (db.Member, bool) {
	if m, ok := ctxMember(r.Context()); ok {
		return m, true
	}
	return h.requireWorkspaceMember(w, r, workspaceID, "workspace not found")
}

func roleAllowed(role string, roles ...string) bool {
	for _, candidate := range roles {
		if role == candidate {
			return true
		}
	}
	return false
}

func countOwners(members []db.Member) int {
	owners := 0
	for _, member := range members {
		if member.Role == "owner" {
			owners++
		}
	}
	return owners
}

func (h *Handler) getWorkspaceMember(ctx context.Context, userID, workspaceID string) (db.Member, error) {
	return h.Queries.GetMemberByUserAndWorkspace(ctx, db.GetMemberByUserAndWorkspaceParams{
		UserID:      parseUUID(userID),
		WorkspaceID: parseUUID(workspaceID),
	})
}

func (h *Handler) requireWorkspaceMember(w http.ResponseWriter, r *http.Request, workspaceID, notFoundMsg string) (db.Member, bool) {
	if workspaceID == "" {
		writeError(w, http.StatusBadRequest, "workspace_id is required")
		return db.Member{}, false
	}

	userID, ok := requireUserID(w, r)
	if !ok {
		return db.Member{}, false
	}

	member, err := h.getWorkspaceMember(r.Context(), userID, workspaceID)
	if err != nil {
		writeError(w, http.StatusNotFound, notFoundMsg)
		return db.Member{}, false
	}

	return member, true
}

func (h *Handler) requireWorkspaceRole(w http.ResponseWriter, r *http.Request, workspaceID, notFoundMsg string, roles ...string) (db.Member, bool) {
	member, ok := h.requireWorkspaceMember(w, r, workspaceID, notFoundMsg)
	if !ok {
		return db.Member{}, false
	}
	if !roleAllowed(member.Role, roles...) {
		writeError(w, http.StatusForbidden, "insufficient permissions")
		return db.Member{}, false
	}
	return member, true
}

// isWorkspaceEntity checks whether a user_id belongs to the given workspace,
// as either a member or an agent depending on userType.
func (h *Handler) isWorkspaceEntity(ctx context.Context, userType, userID, workspaceID string) bool {
	switch userType {
	case "member":
		_, err := h.getWorkspaceMember(ctx, userID, workspaceID)
		return err == nil
	case "agent":
		_, err := h.Queries.GetAgentInWorkspace(ctx, db.GetAgentInWorkspaceParams{
			ID:          parseUUID(userID),
			WorkspaceID: parseUUID(workspaceID),
		})
		return err == nil
	default:
		return false
	}
}

func (h *Handler) loadIssueForUser(w http.ResponseWriter, r *http.Request, issueID string) (db.Issue, bool) {
	if _, ok := requireUserID(w, r); !ok {
		return db.Issue{}, false
	}

	workspaceID := h.resolveWorkspaceID(r)
	if workspaceID == "" {
		writeError(w, http.StatusBadRequest, "workspace_id is required")
		return db.Issue{}, false
	}

	// Try identifier format first (e.g., "JIA-42").
	if issue, ok := h.resolveIssueByIdentifier(r.Context(), issueID, workspaceID); ok {
		return issue, true
	}

	issue, err := h.Queries.GetIssueInWorkspace(r.Context(), db.GetIssueInWorkspaceParams{
		ID:          parseUUID(issueID),
		WorkspaceID: parseUUID(workspaceID),
	})
	if err != nil {
		writeError(w, http.StatusNotFound, "issue not found")
		return db.Issue{}, false
	}
	return issue, true
}

// resolveIssueByIdentifier tries to look up an issue by "PREFIX-NUMBER" format.
func (h *Handler) resolveIssueByIdentifier(ctx context.Context, id, workspaceID string) (db.Issue, bool) {
	parts := splitIdentifier(id)
	if parts == nil {
		return db.Issue{}, false
	}
	if workspaceID == "" {
		return db.Issue{}, false
	}
	issue, err := h.Queries.GetIssueByNumber(ctx, db.GetIssueByNumberParams{
		WorkspaceID: parseUUID(workspaceID),
		Number:      parts.number,
	})
	if err != nil {
		return db.Issue{}, false
	}
	return issue, true
}

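type identifierParts struct {
	prefix string
	number int32
}

// splitIdentifier parses "PREFIX-NUMBER" (e.g. "JIA-42"), splitting on the
// last '-'. It returns nil when the prefix is empty, the suffix is not a
// positive decimal number, or the number does not fit in an int32.
func splitIdentifier(id string) *identifierParts {
	idx := -1
	for i := len(id) - 1; i >= 0; i-- {
		if id[i] == '-' {
			idx = i
			break
		}
	}
	if idx <= 0 || idx >= len(id)-1 {
		return nil
	}
	const maxNumber = 1<<31 - 1 // largest value that fits in int32
	numStr := id[idx+1:]
	var num int64
	for _, c := range numStr {
		if c < '0' || c > '9' {
			return nil
		}
		num = num*10 + int64(c-'0')
		// Reject out-of-range values early so the int32 conversion below
		// cannot silently wrap to a different issue number.
		if num > maxNumber {
			return nil
		}
	}
	if num <= 0 {
		return nil
	}
	return &identifierParts{prefix: id[:idx], number: int32(num)}
}
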
// getIssuePrefix fetches the issue_prefix for a workspace.
// Falls back to generating a prefix from the workspace name if the stored
// prefix is empty (e.g. workspaces created before the prefix was introduced).
func (h *Handler) getIssuePrefix(ctx context.Context, workspaceID pgtype.UUID) string {
	ws, err := h.Queries.GetWorkspace(ctx, workspaceID)
	if err != nil {
		return ""
	}
	if ws.IssuePrefix != "" {
		return ws.IssuePrefix
	}
	return generateIssuePrefix(ws.Name)
}

func (h *Handler) loadAgentForUser(w http.ResponseWriter, r *http.Request, agentID string) (db.Agent, bool) {
	if _, ok := requireUserID(w, r); !ok {
		return db.Agent{}, false
	}

	workspaceID := h.resolveWorkspaceID(r)
	if workspaceID == "" {
		writeError(w, http.StatusBadRequest, "workspace_id is required")
		return db.Agent{}, false
	}

	agent, err := h.Queries.GetAgentInWorkspace(r.Context(), db.GetAgentInWorkspaceParams{
		ID:          parseUUID(agentID),
		WorkspaceID: parseUUID(workspaceID),
	})
	if err != nil {
		writeError(w, http.StatusNotFound, "agent not found")
		return db.Agent{}, false
	}
	return agent, true
}

func (h *Handler) loadInboxItemForUser(w http.ResponseWriter, r *http.Request, itemID string) (db.InboxItem, bool) {
	userID, ok := requireUserID(w, r)
	if !ok {
		return db.InboxItem{}, false
	}

	workspaceID := h.resolveWorkspaceID(r)
	if workspaceID == "" {
		writeError(w, http.StatusBadRequest, "workspace_id is required")
		return db.InboxItem{}, false
	}

	item, err := h.Queries.GetInboxItemInWorkspace(r.Context(), db.GetInboxItemInWorkspaceParams{
		ID:          parseUUID(itemID),
		WorkspaceID: parseUUID(workspaceID),
	})
	if err != nil {
		writeError(w, http.StatusNotFound, "inbox item not found")
		return db.InboxItem{}, false
	}

	if item.RecipientType != "member" || uuidToString(item.RecipientID) != userID {
		writeError(w, http.StatusNotFound, "inbox item not found")
		return db.InboxItem{}, false
	}
	return item, true
}