Compare commits

...

1 Commits

Author SHA1 Message Date
yushen
cdfdf8b4a6 fix(sweeper): reduce dispatch timeout from 300s to 60s (MUL-2246)
Shrinks the 'black hole' window when a daemon claim response is lost and
the task sits unstarted in dispatched state. Previously a lost claim would
leave the task stuck for 5 minutes before the sweeper failed it; now it
takes ~60-90s (60s threshold + up to 30s sweeper tick).

Phase 1 of the claim-lease fix: server-only, no protocol changes.

- Change dispatchTimeoutSeconds constant from 300.0 to 60.0
- Update related comments in daemon.go
- Add TestDispatchTimeoutIs60Seconds regression test validating the
  constant value and boundary behavior (90s stale task swept, 30s fresh
  task preserved)

Co-authored-by: multica-agent <github@multica.ai>
2026-05-15 12:38:11 +08:00
3 changed files with 126 additions and 4 deletions

View File

@@ -32,9 +32,11 @@ const (
// after this duration. 7 days gives users plenty of time to restart daemons.
offlineRuntimeTTLSeconds = 7 * 24 * 3600.0
// dispatchTimeoutSeconds fails tasks stuck in 'dispatched' beyond this.
// The dispatched→running transition should be near-instant, so 5 minutes
// The dispatched→running transition should be near-instant, so 60 seconds
// means something went wrong (e.g. StartTask API call failed silently).
dispatchTimeoutSeconds = 300.0
// Reduced from 300s to 60s (MUL-2246) to shrink the "black hole" window
// when a daemon claim response is lost and the task sits unstarted.
dispatchTimeoutSeconds = 60.0
// runningTimeoutSeconds fails tasks stuck in 'running' beyond this.
// The default agent timeout is 2h, so 2.5h gives a generous buffer.
runningTimeoutSeconds = 9000.0

View File

@@ -689,6 +689,126 @@ func TestExpireStaleQueuedTasksRespectsBatchLimit(t *testing.T) {
}
}
// TestDispatchTimeoutIs60Seconds is a regression test for MUL-2246: the dispatch
// timeout was reduced from 300s to 60s to shrink the "black hole" window when a
// daemon claim response is lost and the task sits unstarted. This test verifies:
//  1. The constant is 60s (not the old 300s).
//  2. A task dispatched 90s ago (> 60s) is swept as stale.
//  3. A task dispatched 30s ago (< 60s) is NOT swept.
//
// The constant check always runs; the database-backed checks are skipped when
// no test database connection is available.
func TestDispatchTimeoutIs60Seconds(t *testing.T) {
	// Verify the constant value directly.
	if dispatchTimeoutSeconds != 60.0 {
		t.Fatalf("expected dispatchTimeoutSeconds=60, got %v", dispatchTimeoutSeconds)
	}
	if testPool == nil {
		t.Skip("no database connection")
	}

	ctx := context.Background()

	var agentID, runtimeID string
	err := testPool.QueryRow(ctx, `
		SELECT a.id, a.runtime_id FROM agent a
		JOIN member m ON m.workspace_id = a.workspace_id
		JOIN "user" u ON u.id = m.user_id
		WHERE u.email = $1
		LIMIT 1
	`, integrationTestEmail).Scan(&agentID, &runtimeID)
	if err != nil {
		t.Fatalf("failed to find test agent: %v", err)
	}

	// mkIssue creates one issue assigned to the test agent and returns its id.
	// Two issues are needed because a unique constraint requires each task to
	// reference a distinct issue. Cleanup is registered per issue, immediately
	// after the insert succeeds, so a failure while creating a later fixture
	// does not leak the rows created so far.
	mkIssue := func(label string) string {
		t.Helper()
		var id string
		if err := testPool.QueryRow(ctx, `
			WITH bumped AS (
				UPDATE workspace SET issue_counter = issue_counter + 1
				WHERE id = $1 RETURNING issue_counter
			)
			INSERT INTO issue (workspace_id, title, status, priority, creator_type, creator_id, assignee_type, assignee_id, number)
			SELECT $1, $3, 'todo', 'none', 'member', m.user_id, 'agent', $2, (SELECT issue_counter FROM bumped)
			FROM member m WHERE m.workspace_id = $1 LIMIT 1
			RETURNING id
		`, testWorkspaceID, agentID, label).Scan(&id); err != nil {
			t.Fatalf("failed to create issue %s: %v", label, err)
		}
		t.Cleanup(func() {
			// Best-effort teardown: report failures instead of hiding them, but
			// never fail the test because of them. Tasks are removed before the
			// issue they reference.
			if _, err := testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE issue_id = $1`, id); err != nil {
				t.Logf("cleanup: failed to delete tasks for issue %s: %v", id, err)
			}
			if _, err := testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, id); err != nil {
				t.Logf("cleanup: failed to delete issue %s: %v", id, err)
			}
		})
		return id
	}

	staleIssueID := mkIssue("MUL-2246 dispatch timeout stale")
	freshIssueID := mkIssue("MUL-2246 dispatch timeout fresh")

	// Stale task: dispatched 90s ago (beyond 60s threshold).
	var staleTaskID string
	err = testPool.QueryRow(ctx, `
		INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, dispatched_at)
		VALUES ($1, $2, $3, 'dispatched', 0, now() - interval '90 seconds')
		RETURNING id
	`, agentID, runtimeID, staleIssueID).Scan(&staleTaskID)
	if err != nil {
		t.Fatalf("failed to create stale task: %v", err)
	}

	// Fresh task: dispatched 30s ago (within 60s threshold).
	var freshTaskID string
	err = testPool.QueryRow(ctx, `
		INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, dispatched_at)
		VALUES ($1, $2, $3, 'dispatched', 0, now() - interval '30 seconds')
		RETURNING id
	`, agentID, runtimeID, freshIssueID).Scan(&freshTaskID)
	if err != nil {
		t.Fatalf("failed to create fresh task: %v", err)
	}

	queries := db.New(testPool)

	// Use the actual constant value — this is the key assertion.
	failedTasks, err := queries.FailStaleTasks(ctx, db.FailStaleTasksParams{
		DispatchTimeoutSecs: dispatchTimeoutSeconds,
		RunningTimeoutSecs:  runningTimeoutSeconds,
	})
	if err != nil {
		t.Fatalf("FailStaleTasks failed: %v", err)
	}

	// Parse both IDs once; they do not change while scanning the result set.
	staleKey := parseUUIDBytes(staleTaskID)
	freshKey := parseUUIDBytes(freshTaskID)

	staleFound := false
	for _, ft := range failedTasks {
		switch ft.ID.Bytes {
		case staleKey:
			// Stale task (90s old) must be failed.
			staleFound = true
		case freshKey:
			// Fresh task (30s old) must NOT be failed.
			t.Fatal("fresh task (30s old) was incorrectly swept — dispatch timeout too aggressive")
		}
	}
	if !staleFound {
		t.Fatal("stale task (90s old) was NOT swept — dispatch timeout not working at 60s")
	}

	// statusOf reads the persisted status of a task; label only shapes the
	// failure message.
	statusOf := func(label, taskID string) string {
		t.Helper()
		var status string
		if err := testPool.QueryRow(ctx, `SELECT status FROM agent_task_queue WHERE id = $1`, taskID).Scan(&status); err != nil {
			t.Fatalf("failed to query %s task: %v", label, err)
		}
		return status
	}

	// Verify DB states.
	if got := statusOf("stale", staleTaskID); got != "failed" {
		t.Fatalf("stale task: expected status=failed, got %q", got)
	}
	if got := statusOf("fresh", freshTaskID); got != "dispatched" {
		t.Fatalf("fresh task: expected status=dispatched (not yet timed out), got %q", got)
	}
}
// parseUUIDBytes converts a UUID string to the 16-byte array used by pgtype.UUID.
func parseUUIDBytes(s string) [16]byte {
s = strings.ReplaceAll(s, "-", "")

View File

@@ -1661,13 +1661,13 @@ func (d *Daemon) pollLoop(ctx context.Context, taskWakeups <-chan struct{}) erro
// claiming first and then waiting for a slot — would let claimed tasks pile
// up in the server-side `dispatched` state without a corresponding
// StartTask, and the server's sweeper would fail them as `failed/timeout`
// after dispatchTimeoutSeconds=300s (runtime_sweeper.go:25). That is the
// after dispatchTimeoutSeconds=60s (see runtime_sweeper.go). That is the
// exact user-visible failure this issue is fixing, so we cannot risk
// recreating it under load.
//
// Slot-before-claim does mean a slow claim holds a slot during its HTTP
// roundtrip; the upper bound is `client.Timeout = 30s` (client.go:59), well
// below the 300s dispatch timeout, so other runtimes' tasks stay in
// below the 60s dispatch timeout, so other runtimes' tasks stay in
// server-side `queued` state (which has no timeout) rather than entering
// `dispatched` and racing the sweeper.
//