Compare commits

...

1 Commits

Author SHA1 Message Date
Jiang Bohan
6f83593fff feat: support multiple agents running concurrently on the same issue
- Relax ClaimAgentTask SQL constraint from per-issue to per-(issue, agent)
  serialization, allowing different agents to run in parallel on the same issue
- Update GetActiveTaskForIssue API to return all active tasks (array)
- Refactor AgentLiveCard to support multi-agent display: first agent sticky,
  expand button to reveal additional agents
- Fix shouldEnqueueOnComment to use per-agent dedup so a mentioned agent's
  pending task doesn't block the assigned agent's on_comment trigger

Closes MUL-160
2026-04-08 17:18:46 +08:00
8 changed files with 251 additions and 171 deletions

View File

@@ -182,7 +182,7 @@ vi.mock("@/shared/api", () => ({
listIssueSubscribers: vi.fn().mockResolvedValue([]),
subscribeToIssue: vi.fn().mockResolvedValue(undefined),
unsubscribeFromIssue: vi.fn().mockResolvedValue(undefined),
getActiveTaskForIssue: vi.fn().mockResolvedValue({ task: null }),
getActiveTasksForIssue: vi.fn().mockResolvedValue({ tasks: [] }),
listTasksByIssue: vi.fn().mockResolvedValue([]),
listTaskMessages: vi.fn().mockResolvedValue([]),
},

View File

@@ -1,13 +1,14 @@
"use client";
import { useState, useEffect, useCallback, useRef } from "react";
import { Bot, ChevronRight, Loader2, ArrowDown, Brain, AlertCircle, Clock, CheckCircle2, XCircle, Square } from "lucide-react";
import { Bot, ChevronRight, ChevronDown, Loader2, ArrowDown, Brain, AlertCircle, Clock, CheckCircle2, XCircle, Square } from "lucide-react";
import { api } from "@/shared/api";
import { useWSEvent } from "@/features/realtime";
import type { TaskMessagePayload, TaskCompletedPayload, TaskFailedPayload, TaskCancelledPayload } from "@/shared/types/events";
import type { AgentTask } from "@/shared/types/agent";
import { cn } from "@/lib/utils";
import { toast } from "sonner";
import { ActorAvatar } from "@/components/common/actor-avatar";
import { Collapsible, CollapsibleContent, CollapsibleTrigger } from "@/components/ui/collapsible";
import { useActorName } from "@/features/workspace";
import { redactSecrets } from "../utils/redact";
@@ -52,26 +53,20 @@ function getToolSummary(item: TimelineItem): string {
if (!item.input) return "";
const inp = item.input as Record<string, string>;
// WebSearch / web search
if (inp.query) return inp.query;
// File operations
if (inp.file_path) return shortenPath(inp.file_path);
if (inp.path) return shortenPath(inp.path);
if (inp.pattern) return inp.pattern;
// Bash
if (inp.description) return String(inp.description);
if (inp.command) {
const cmd = String(inp.command);
return cmd.length > 100 ? cmd.slice(0, 100) + "..." : cmd;
}
// Agent
if (inp.prompt) {
const p = String(inp.prompt);
return p.length > 100 ? p.slice(0, 100) + "..." : p;
}
// Skill
if (inp.skill) return String(inp.skill);
// Fallback: show first string value
for (const v of Object.values(inp)) {
if (typeof v === "string" && v.length > 0 && v.length < 120) return v;
}
@@ -94,45 +89,50 @@ function buildTimeline(msgs: TaskMessagePayload[]): TimelineItem[] {
return items.sort((a, b) => a.seq - b.seq);
}
// ─── AgentLiveCard (real-time view) ────────────────────────────────────────
// ─── Per-task state ─────────────────────────────────────────────────────────
interface TaskState {
task: AgentTask;
items: TimelineItem[];
}
// ─── AgentLiveCard (multi-agent real-time view) ───────────────────────────
interface AgentLiveCardProps {
issueId: string;
agentName?: string;
}
export function AgentLiveCard({ issueId, agentName }: AgentLiveCardProps) {
export function AgentLiveCard({ issueId }: AgentLiveCardProps) {
const { getActorName } = useActorName();
const [activeTask, setActiveTask] = useState<AgentTask | null>(null);
const [items, setItems] = useState<TimelineItem[]>([]);
const [elapsed, setElapsed] = useState("");
const [autoScroll, setAutoScroll] = useState(true);
const [cancelling, setCancelling] = useState(false);
const scrollRef = useRef<HTMLDivElement>(null);
const [taskStates, setTaskStates] = useState<Map<string, TaskState>>(new Map());
const [expanded, setExpanded] = useState(false);
const seenSeqs = useRef(new Set<string>());
// Check for active task on mount
// Fetch active tasks on mount
useEffect(() => {
let cancelled = false;
api.getActiveTaskForIssue(issueId).then(({ task }) => {
if (!cancelled) {
setActiveTask(task);
if (task) {
api.listTaskMessages(task.id).then((msgs) => {
if (!cancelled) {
const timeline = buildTimeline(msgs);
setItems(timeline);
for (const m of msgs) seenSeqs.current.add(`${m.task_id}:${m.seq}`);
}
}).catch(console.error);
api.getActiveTasksForIssue(issueId).then(({ tasks }) => {
if (cancelled || tasks.length === 0) return;
const newStates = new Map<string, TaskState>();
const loadPromises = tasks.map(async (task) => {
try {
const msgs = await api.listTaskMessages(task.id);
const timeline = buildTimeline(msgs);
for (const m of msgs) seenSeqs.current.add(`${m.task_id}:${m.seq}`);
newStates.set(task.id, { task, items: timeline });
} catch {
newStates.set(task.id, { task, items: [] });
}
}
});
Promise.all(loadPromises).then(() => {
if (!cancelled) setTaskStates(newStates);
});
}).catch(console.error);
return () => { cancelled = true; };
}, [issueId]);
// Handle real-time task messages
// Handle real-time task messages — route by task_id
useWSEvent(
"task:message",
useCallback((payload: unknown) => {
@@ -142,86 +142,139 @@ export function AgentLiveCard({ issueId, agentName }: AgentLiveCardProps) {
if (seenSeqs.current.has(key)) return;
seenSeqs.current.add(key);
setItems((prev) => {
const item: TimelineItem = {
seq: msg.seq,
type: msg.type,
tool: msg.tool,
content: msg.content,
input: msg.input,
output: msg.output,
};
const next = [...prev, item];
next.sort((a, b) => a.seq - b.seq);
const item: TimelineItem = {
seq: msg.seq,
type: msg.type,
tool: msg.tool,
content: msg.content,
input: msg.input,
output: msg.output,
};
setTaskStates((prev) => {
const next = new Map(prev);
const existing = next.get(msg.task_id);
if (existing) {
const items = [...existing.items, item].sort((a, b) => a.seq - b.seq);
next.set(msg.task_id, { ...existing, items });
}
return next;
});
}, [issueId]),
);
// Handle task completion/failure
useWSEvent(
"task:completed",
useCallback((payload: unknown) => {
const p = payload as TaskCompletedPayload;
if (p.issue_id !== issueId) return;
setActiveTask(null);
setItems([]);
seenSeqs.current.clear();
setCancelling(false);
}, [issueId]),
);
// Handle task end events — remove only the specific task
const handleTaskEnd = useCallback((payload: unknown) => {
const p = payload as { task_id: string; issue_id: string };
if (p.issue_id !== issueId) return;
setTaskStates((prev) => {
const next = new Map(prev);
next.delete(p.task_id);
return next;
});
}, [issueId]);
useWSEvent(
"task:failed",
useCallback((payload: unknown) => {
const p = payload as TaskFailedPayload;
if (p.issue_id !== issueId) return;
setActiveTask(null);
setItems([]);
seenSeqs.current.clear();
setCancelling(false);
}, [issueId]),
);
useWSEvent("task:completed", handleTaskEnd);
useWSEvent("task:failed", handleTaskEnd);
useWSEvent("task:cancelled", handleTaskEnd);
useWSEvent(
"task:cancelled",
useCallback((payload: unknown) => {
const p = payload as TaskCancelledPayload;
if (p.issue_id !== issueId) return;
setActiveTask(null);
setItems([]);
seenSeqs.current.clear();
setCancelling(false);
}, [issueId]),
);
// Pick up new tasks — skip if we're already showing an active task to avoid
// replacing its timeline mid-execution (per-issue serialization in the
// backend prevents this race, but this is a defensive safeguard).
// Pick up newly dispatched tasks
useWSEvent(
"task:dispatch",
useCallback(() => {
if (activeTask) return;
api.getActiveTaskForIssue(issueId).then(({ task }) => {
if (task) {
setActiveTask(task);
setItems([]);
seenSeqs.current.clear();
}
api.getActiveTasksForIssue(issueId).then(({ tasks }) => {
setTaskStates((prev) => {
const next = new Map(prev);
for (const task of tasks) {
if (!next.has(task.id)) {
next.set(task.id, { task, items: [] });
}
}
return next;
});
}).catch(console.error);
}, [issueId, activeTask]),
}, [issueId]),
);
if (taskStates.size === 0) return null;
const entries = Array.from(taskStates.values());
if (entries.length === 0) return null;
const firstEntry = entries[0]!;
const restEntries = entries.slice(1);
return (
<div className="sticky top-4 z-10 space-y-1.5">
{/* Primary agent card — always visible */}
<SingleAgentLiveCard
task={firstEntry.task}
items={firstEntry.items}
issueId={issueId}
agentName={firstEntry.task.agent_id ? getActorName("agent", firstEntry.task.agent_id) : "Agent"}
/>
{/* Expand button for additional agents */}
{restEntries.length > 0 && !expanded && (
<button
onClick={() => setExpanded(true)}
className="flex w-full items-center justify-center gap-1.5 rounded-lg border border-info/20 bg-info/5 px-3 py-1.5 text-xs text-muted-foreground hover:text-foreground hover:bg-info/10 transition-colors"
>
<ChevronDown className="h-3 w-3" />
<span>{restEntries.length} more agent{restEntries.length > 1 ? "s" : ""} working</span>
</button>
)}
{/* Additional agent cards — shown when expanded */}
{restEntries.length > 0 && expanded && (
<>
{restEntries.map(({ task, items }) => (
<SingleAgentLiveCard
key={task.id}
task={task}
items={items}
issueId={issueId}
agentName={task.agent_id ? getActorName("agent", task.agent_id) : "Agent"}
/>
))}
<button
onClick={() => setExpanded(false)}
className="flex w-full items-center justify-center gap-1.5 text-xs text-muted-foreground hover:text-foreground transition-colors py-0.5"
>
<ChevronDown className="h-3 w-3 rotate-180" />
<span>Collapse</span>
</button>
</>
)}
</div>
);
}
// ─── SingleAgentLiveCard (one card per running task) ──────────────────────
interface SingleAgentLiveCardProps {
task: AgentTask;
items: TimelineItem[];
issueId: string;
agentName: string;
}
function SingleAgentLiveCard({ task, items, issueId, agentName }: SingleAgentLiveCardProps) {
const [elapsed, setElapsed] = useState("");
const [open, setOpen] = useState(false);
const [autoScroll, setAutoScroll] = useState(true);
const [cancelling, setCancelling] = useState(false);
const scrollRef = useRef<HTMLDivElement>(null);
// Elapsed time
useEffect(() => {
if (!activeTask?.started_at && !activeTask?.dispatched_at) return;
const ref = activeTask.started_at ?? activeTask.dispatched_at!;
setElapsed(formatElapsed(ref));
const interval = setInterval(() => setElapsed(formatElapsed(ref)), 1000);
if (!task.started_at && !task.dispatched_at) return;
const startRef = task.started_at ?? task.dispatched_at!;
setElapsed(formatElapsed(startRef));
const interval = setInterval(() => setElapsed(formatElapsed(startRef)), 1000);
return () => clearInterval(interval);
}, [activeTask?.started_at, activeTask?.dispatched_at]);
}, [task.started_at, task.dispatched_at]);
// Auto-scroll
// Auto-scroll timeline to bottom
useEffect(() => {
if (autoScroll && scrollRef.current) {
scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
@@ -235,79 +288,100 @@ export function AgentLiveCard({ issueId, agentName }: AgentLiveCardProps) {
}, []);
const handleCancel = useCallback(async () => {
if (!activeTask || cancelling) return;
if (cancelling) return;
setCancelling(true);
try {
await api.cancelTask(issueId, activeTask.id);
await api.cancelTask(issueId, task.id);
} catch (e) {
toast.error(e instanceof Error ? e.message : "Failed to cancel task");
setCancelling(false);
}
}, [activeTask, issueId, cancelling]);
if (!activeTask) return null;
}, [task.id, issueId, cancelling]);
const toolCount = items.filter((i) => i.type === "tool_use").length;
return (
<div className="rounded-lg border border-info/20 bg-info/5">
{/* Header */}
<div className="flex items-center gap-2 px-3 py-2">
<div className="flex items-center justify-center h-5 w-5 rounded-full bg-info/10 text-info shrink-0">
<Bot className="h-3 w-3" />
</div>
<div className="flex items-center gap-1.5 text-xs font-medium min-w-0">
<Loader2 className="h-3 w-3 animate-spin text-info shrink-0" />
<span className="truncate">{(activeTask?.agent_id ? getActorName("agent", activeTask.agent_id) : agentName) ?? "Agent"} is working</span>
</div>
<span className="ml-auto text-xs text-muted-foreground tabular-nums shrink-0">{elapsed}</span>
{toolCount > 0 && (
<span className="text-xs text-muted-foreground shrink-0">
{toolCount} tool {toolCount === 1 ? "call" : "calls"}
</span>
<div className="rounded-lg border border-info/20 bg-info/5 backdrop-blur-sm">
{/* Header — click to toggle timeline */}
<div
className="group flex items-center gap-2 px-3 py-2 cursor-pointer select-none text-muted-foreground hover:text-foreground transition-colors"
role="button"
tabIndex={0}
aria-expanded={open}
onClick={() => setOpen(!open)}
onKeyDown={(e) => {
if (e.key === "Enter" || e.key === " ") {
e.preventDefault();
setOpen(!open);
}
}}
>
{task.agent_id ? (
<ActorAvatar actorType="agent" actorId={task.agent_id} size={20} />
) : (
<div className="flex items-center justify-center h-5 w-5 rounded-full shrink-0 bg-info/10 text-info">
<Bot className="h-3 w-3" />
</div>
)}
<button
onClick={handleCancel}
disabled={cancelling}
className="flex items-center gap-1 rounded px-1.5 py-0.5 text-xs text-muted-foreground hover:text-destructive hover:bg-destructive/10 transition-colors disabled:opacity-50 shrink-0"
title="Stop agent"
>
{cancelling ? (
<Loader2 className="h-3 w-3 animate-spin" />
) : (
<Square className="h-3 w-3" />
<div className="flex items-center gap-1.5 text-xs min-w-0">
<Loader2 className="h-3 w-3 animate-spin text-info shrink-0" />
<span className="font-medium text-foreground truncate">{agentName} is working</span>
<span className="text-muted-foreground tabular-nums shrink-0">{elapsed}</span>
{toolCount > 0 && (
<span className="text-muted-foreground shrink-0">{toolCount} tools</span>
)}
<span>Stop</span>
</button>
</div>
<div className="ml-auto flex items-center gap-1 shrink-0">
<button
onClick={(e) => { e.stopPropagation(); handleCancel(); }}
disabled={cancelling}
className="flex items-center gap-1 rounded px-1.5 py-0.5 text-xs text-muted-foreground hover:text-destructive hover:bg-destructive/10 transition-colors disabled:opacity-50"
title="Stop agent"
>
{cancelling ? <Loader2 className="h-3 w-3 animate-spin" /> : <Square className="h-3 w-3" />}
<span>Stop</span>
</button>
<ChevronDown className={cn("h-3.5 w-3.5 transition-transform", open && "rotate-180")} />
</div>
</div>
{/* Timeline content */}
{items.length > 0 && (
<div
ref={scrollRef}
onScroll={handleScroll}
className="relative max-h-80 overflow-y-auto border-t border-info/10 px-3 py-2 space-y-0.5"
>
{items.map((item, idx) => (
<TimelineRow key={`${item.seq}-${idx}`} item={item} />
))}
{!autoScroll && (
<button
onClick={() => {
if (scrollRef.current) {
scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
setAutoScroll(true);
}
}}
className="sticky bottom-0 left-1/2 -translate-x-1/2 flex items-center gap-1 rounded-full bg-background border px-2 py-0.5 text-xs text-muted-foreground hover:text-foreground shadow-sm"
{/* Timeline — grid-rows animation for smooth collapse/expand */}
<div
className={cn(
"grid transition-[grid-template-rows] duration-200 ease-out",
open ? "grid-rows-[1fr]" : "grid-rows-[0fr]",
)}
>
<div className="overflow-hidden">
{items.length > 0 && (
<div
ref={scrollRef}
onScroll={handleScroll}
className="relative max-h-80 overflow-y-auto overscroll-y-contain border-t border-info/10 px-3 py-2 space-y-0.5"
>
<ArrowDown className="h-3 w-3" />
Latest
</button>
{items.map((item, idx) => (
<TimelineRow key={`${item.seq}-${idx}`} item={item} />
))}
{!autoScroll && (
<button
onClick={(e) => {
e.stopPropagation();
if (scrollRef.current) {
scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
setAutoScroll(true);
}
}}
className="sticky bottom-0 left-1/2 -translate-x-1/2 flex items-center gap-1 rounded-full bg-background border px-2 py-0.5 text-xs text-muted-foreground hover:text-foreground shadow-sm"
>
<ArrowDown className="h-3 w-3" />
Latest
</button>
)}
</div>
)}
</div>
)}
</div>
</div>
);
}
@@ -326,7 +400,6 @@ export function TaskRunHistory({ issueId }: TaskRunHistoryProps) {
api.listTasksByIssue(issueId).then(setTasks).catch(console.error);
}, [issueId]);
// Refresh when a task completes
useWSEvent(
"task:completed",
useCallback((payload: unknown) => {
@@ -345,7 +418,6 @@ export function TaskRunHistory({ issueId }: TaskRunHistoryProps) {
}, [issueId]),
);
// Refresh when a task is cancelled
useWSEvent(
"task:cancelled",
useCallback((payload: unknown) => {
@@ -381,7 +453,7 @@ function TaskRunEntry({ task }: { task: AgentTask }) {
const [items, setItems] = useState<TimelineItem[] | null>(null);
const loadMessages = useCallback(() => {
if (items !== null) return; // already loaded
if (items !== null) return;
api.listTaskMessages(task.id).then((msgs) => {
setItems(buildTimeline(msgs));
}).catch((e) => {

View File

@@ -774,7 +774,6 @@ export function IssueDetail({ issueId, onDelete, defaultSidebarOpen = true, layo
<div className="mt-4">
<AgentLiveCard
issueId={id}
agentName={issue.assignee_type === "agent" && issue.assignee_id ? getActorName("agent", issue.assignee_id) : undefined}
/>
</div>

View File

@@ -371,7 +371,7 @@ export class ApiClient {
return this.fetch(`/api/agents/${agentId}/tasks`);
}
async getActiveTaskForIssue(issueId: string): Promise<{ task: AgentTask | null }> {
async getActiveTasksForIssue(issueId: string): Promise<{ tasks: AgentTask[] }> {
return this.fetch(`/api/issues/${issueId}/active-task`);
}

View File

@@ -536,17 +536,21 @@ func (h *Handler) ListTaskMessages(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, resp)
}
// GetActiveTaskForIssue returns the currently running task for an issue, if any.
// GetActiveTaskForIssue returns all currently active tasks for an issue.
func (h *Handler) GetActiveTaskForIssue(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "id")
tasks, err := h.Queries.ListActiveTasksByIssue(r.Context(), parseUUID(issueID))
if err != nil || len(tasks) == 0 {
writeJSON(w, http.StatusOK, map[string]any{"task": nil})
return
if err != nil {
tasks = nil
}
writeJSON(w, http.StatusOK, map[string]any{"task": taskToResponse(tasks[0])})
resp := make([]AgentTaskResponse, len(tasks))
for i, t := range tasks {
resp[i] = taskToResponse(t)
}
writeJSON(w, http.StatusOK, map[string]any{"tasks": resp})
}
// CancelTask cancels a running or queued task by ID.

View File

@@ -506,9 +506,12 @@ func (h *Handler) shouldEnqueueOnComment(ctx context.Context, issue db.Issue) bo
return false
}
// Coalescing queue: allow enqueue when a task is running (so the agent
// picks up new comments on the next cycle) but skip if a pending task
// already exists (natural dedup for rapid-fire comments).
hasPending, err := h.Queries.HasPendingTaskForIssue(ctx, issue.ID)
// picks up new comments on the next cycle) but skip if this agent already
// has a pending task (natural dedup for rapid-fire comments).
hasPending, err := h.Queries.HasPendingTaskForIssueAndAgent(ctx, db.HasPendingTaskForIssueAndAgentParams{
IssueID: issue.ID,
AgentID: issue.AssigneeID,
})
if err != nil || hasPending {
return false
}

View File

@@ -111,6 +111,7 @@ WHERE id = (
AND NOT EXISTS (
SELECT 1 FROM agent_task_queue active
WHERE active.issue_id = atq.issue_id
AND active.agent_id = atq.agent_id
AND active.status IN ('dispatched', 'running')
)
ORDER BY atq.priority DESC, atq.created_at ASC
@@ -120,10 +121,10 @@ WHERE id = (
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
`
// Claims the next queued task for an agent, enforcing per-issue serialization:
// a task is only claimable when no other task for the same issue is already
// dispatched or running. This guarantees serial execution within an issue
// while allowing parallel execution across different issues.
// Claims the next queued task for an agent, enforcing per-(issue, agent) serialization:
// a task is only claimable when no other task for the same issue AND same agent is
// already dispatched or running. This allows different agents to work on the same
// issue in parallel while preventing a single agent from running duplicate tasks.
func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, claimAgentTask, agentID)
var i AgentTaskQueue

View File

@@ -77,10 +77,10 @@ SELECT * FROM agent_task_queue
WHERE id = $1;
-- name: ClaimAgentTask :one
-- Claims the next queued task for an agent, enforcing per-issue serialization:
-- a task is only claimable when no other task for the same issue is already
-- dispatched or running. This guarantees serial execution within an issue
-- while allowing parallel execution across different issues.
-- Claims the next queued task for an agent, enforcing per-(issue, agent) serialization:
-- a task is only claimable when no other task for the same issue AND same agent is
-- already dispatched or running. This allows different agents to work on the same
-- issue in parallel while preventing a single agent from running duplicate tasks.
UPDATE agent_task_queue
SET status = 'dispatched', dispatched_at = now()
WHERE id = (
@@ -89,6 +89,7 @@ WHERE id = (
AND NOT EXISTS (
SELECT 1 FROM agent_task_queue active
WHERE active.issue_id = atq.issue_id
AND active.agent_id = atq.agent_id
AND active.status IN ('dispatched', 'running')
)
ORDER BY atq.priority DESC, atq.created_at ASC