diff --git a/server/internal/service/task.go b/server/internal/service/task.go index 9ae6b6ed9..4ae3f2d58 100644 --- a/server/internal/service/task.go +++ b/server/internal/service/task.go @@ -1833,15 +1833,23 @@ func (s *TaskService) HandleFailedTasks(ctx context.Context, tasks []db.AgentTas "error", checkErr, ) } else if !hasActive { - if _, updateErr := s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{ + updatedIssue, updateErr := s.Queries.UpdateIssueStatus(ctx, db.UpdateIssueStatusParams{ ID: t.IssueID, Status: "todo", WorkspaceID: issue.WorkspaceID, - }); updateErr != nil { + }) + if updateErr != nil { slog.Warn("handle failed tasks: reset stuck issue failed", "issue_id", issueKey, "error", updateErr, ) + } else { + // This direct reset bypasses the HTTP UpdateIssue + // handler that normally emits issue:updated, so emit + // it here too. Without it the board / status-filter + // caches keep showing the issue as in_progress until + // the next write touches it (#4648 / MUL-3782). + s.broadcastIssueUpdated(updatedIssue, issue.Status) } } } @@ -2261,14 +2269,32 @@ func (s *TaskService) broadcastChatDone(ctx context.Context, task db.AgentTaskQu }) } -func (s *TaskService) broadcastIssueUpdated(issue db.Issue) { +// broadcastIssueUpdated publishes the issue:updated event the frontend's +// realtime reconcile (onIssueUpdated) relies on to move an issue between status +// columns / status filters and reconcile their bucket counts. prevStatus is the +// issue's status before the write so the client can gate that reconcile on +// status_changed. +// +// The `issue` payload is a map (issueToMap), which the workspace WS fanout +// (listeners.go SubscribeAll) marshals and broadcasts as-is — that is what +// drives the UI reconcile. Note this does NOT cover the full HTTP UpdateIssue +// side effects: the activity-log and inbox listeners type-assert `issue` to a +// handler.IssueResponse and skip a map, so a background status reset does not +// emit status-change activity / notifications. That is intentional for the +// realtime-staleness fix (#4648 / MUL-3782); folding those side effects in +// would mean unifying the payload type and is left as a follow-up. +func (s *TaskService) broadcastIssueUpdated(issue db.Issue, prevStatus string) { prefix := s.getIssuePrefix(issue.WorkspaceID) s.Bus.Publish(events.Event{ Type: protocol.EventIssueUpdated, WorkspaceID: util.UUIDToString(issue.WorkspaceID), ActorType: "system", ActorID: "", - Payload: map[string]any{"issue": issueToMap(issue, prefix)}, + Payload: map[string]any{ + "issue": issueToMap(issue, prefix), + "status_changed": prevStatus != issue.Status, + "prev_status": prevStatus, + }, }) } diff --git a/server/internal/service/task_issue_broadcast_test.go b/server/internal/service/task_issue_broadcast_test.go new file mode 100644 index 000000000..e74a834c2 --- /dev/null +++ b/server/internal/service/task_issue_broadcast_test.go @@ -0,0 +1,119 @@ +package service + +import ( + "context" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/multica-ai/multica/server/internal/events" + "github.com/multica-ai/multica/server/internal/util" + db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" +) + +// noRowsDBTX makes every read return pgx.ErrNoRows so getIssuePrefix's +// GetWorkspace lookup falls back to an empty prefix without needing a DB. The +// helper under test still publishes regardless of the prefix result. +type noRowsDBTX struct{} + +func (noRowsDBTX) Exec(context.Context, string, ...any) (pgconn.CommandTag, error) { + return pgconn.NewCommandTag(""), nil +} +func (noRowsDBTX) Query(context.Context, string, ...any) (pgx.Rows, error) { + return nil, pgx.ErrNoRows +} +func (noRowsDBTX) QueryRow(context.Context, string, ...any) pgx.Row { return noRow{} } + +type noRow struct{} + +func (noRow) Scan(...any) error { return pgx.ErrNoRows } + +// TestBroadcastIssueUpdated_EmitsStatusChange pins the realtime contract behind +// #4648 / MUL-3782: when a background path resets an issue's status (e.g. the +// failed-task handler flipping a stuck in_progress issue back to todo), it must +// publish issue:updated with status_changed=true and the new status so the +// frontend's onIssueUpdated reconcile moves the card between status columns / +// filters instead of leaving it stale until the next unrelated write. +func TestBroadcastIssueUpdated_EmitsStatusChange(t *testing.T) { + bus := events.New() + var got []events.Event + bus.SubscribeAll(func(e events.Event) { got = append(got, e) }) + + svc := &TaskService{ + Queries: db.New(noRowsDBTX{}), + Bus: bus, + } + + issue := db.Issue{ + ID: testUUID(1), + WorkspaceID: testUUID(2), + Number: 7, + Status: "todo", + } + svc.broadcastIssueUpdated(issue, "in_progress") + + if len(got) != 1 { + t.Fatalf("expected exactly 1 published event, got %d", len(got)) + } + e := got[0] + if e.Type != protocol.EventIssueUpdated { + t.Fatalf("expected event type %q, got %q", protocol.EventIssueUpdated, e.Type) + } + if e.WorkspaceID != util.UUIDToString(issue.WorkspaceID) { + t.Fatalf("workspace mismatch: got %q want %q", e.WorkspaceID, util.UUIDToString(issue.WorkspaceID)) + } + + payload, ok := e.Payload.(map[string]any) + if !ok { + t.Fatalf("payload is not map[string]any: %T", e.Payload) + } + if payload["status_changed"] != true { + t.Errorf("expected status_changed=true, got %v", payload["status_changed"]) + } + if payload["prev_status"] != "in_progress" { + t.Errorf("expected prev_status=in_progress, got %v", payload["prev_status"]) + } + issueMap, ok := payload["issue"].(map[string]any) + if !ok { + t.Fatalf("issue payload is not map[string]any: %T", payload["issue"]) + } + if issueMap["status"] != "todo" { + t.Errorf("expected issue.status=todo, got %v", issueMap["status"]) + } + if issueMap["id"] != util.UUIDToString(issue.ID) { + t.Errorf("issue.id mismatch: got %v want %q", issueMap["id"], util.UUIDToString(issue.ID)) + } +} + +// TestBroadcastIssueUpdated_NoStatusChange guards the gate: a same-status +// broadcast reports status_changed=false so the client skips the status-bucket +// reconcile for non-status field updates. +func TestBroadcastIssueUpdated_NoStatusChange(t *testing.T) { + bus := events.New() + var got []events.Event + bus.SubscribeAll(func(e events.Event) { got = append(got, e) }) + + svc := &TaskService{ + Queries: db.New(noRowsDBTX{}), + Bus: bus, + } + + issue := db.Issue{ + ID: testUUID(1), + WorkspaceID: testUUID(2), + Status: "todo", + } + svc.broadcastIssueUpdated(issue, "todo") + + if len(got) != 1 { + t.Fatalf("expected exactly 1 published event, got %d", len(got)) + } + payload, ok := got[0].Payload.(map[string]any) + if !ok { + t.Fatalf("payload is not map[string]any: %T", got[0].Payload) + } + if payload["status_changed"] != false { + t.Errorf("expected status_changed=false, got %v", payload["status_changed"]) + } +}