From 3137feecdf2a91eb833208aea7b8137bbeac3afb Mon Sep 17 00:00:00 2001 From: LinYushen Date: Fri, 15 May 2026 15:14:05 +0800 Subject: [PATCH] feat(task): add claim lease mechanism (Phase 2, MUL-2246) (#2660) Add claim_token + claim_expires_at columns to agent_task_queue and three new SQL queries for the claim lease protocol: - ClaimAgentTaskWithLease: generates a UUID token and sets a lease expiry when claiming a task, so the daemon must prove it received the response - StartAgentTaskWithClaimToken: validates the token on StartTask, preventing stale daemons from starting requeued tasks - RequeueExpiredClaimLeases: moves dispatched tasks with expired leases back to queued for re-claim This closes the reliability gap where a claim response lost in transit leaves a task stuck in dispatched until the 60s dispatch timeout fires. Co-authored-by: multica-agent --- .../migrations/091_task_claim_lease.down.sql | 3 + server/migrations/091_task_claim_lease.up.sql | 16 + server/pkg/db/generated/agent.sql.go | 305 ++++++++++++++++-- server/pkg/db/generated/autopilot.sql.go | 4 +- server/pkg/db/generated/chat.sql.go | 4 +- server/pkg/db/generated/models.go | 2 + server/pkg/db/generated/runtime.sql.go | 8 +- server/pkg/db/queries/agent.sql | 78 +++++ 8 files changed, 391 insertions(+), 29 deletions(-) create mode 100644 server/migrations/091_task_claim_lease.down.sql create mode 100644 server/migrations/091_task_claim_lease.up.sql diff --git a/server/migrations/091_task_claim_lease.down.sql b/server/migrations/091_task_claim_lease.down.sql new file mode 100644 index 000000000..d071f62ca --- /dev/null +++ b/server/migrations/091_task_claim_lease.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE agent_task_queue + DROP COLUMN IF EXISTS claim_token, + DROP COLUMN IF EXISTS claim_expires_at; diff --git a/server/migrations/091_task_claim_lease.up.sql b/server/migrations/091_task_claim_lease.up.sql new file mode 100644 index 000000000..40427b272 --- /dev/null +++ b/server/migrations/091_task_claim_lease.up.sql @@ -0,0 +1,16 @@ +-- Adds claim lease columns to agent_task_queue for Phase 2 of the claim +-- reliability fix (MUL-2246 / GitHub #2649). +-- +-- claim_token: opaque UUID generated at claim time; the daemon must present +-- it back in StartTask to prove it received the claim response. Prevents +-- a stale daemon from starting a task that was already requeued and +-- re-claimed by another runtime. +-- +-- claim_expires_at: absolute deadline by which the daemon must call StartTask +-- with the matching token. If this timestamp passes while the task is still +-- 'dispatched', the expired-lease requeue sweep moves it back to 'queued' +-- so it can be re-claimed. + +ALTER TABLE agent_task_queue + ADD COLUMN claim_token UUID, + ADD COLUMN claim_expires_at TIMESTAMPTZ; diff --git a/server/pkg/db/generated/agent.sql.go b/server/pkg/db/generated/agent.sql.go index af96545a8..54ce78d87 100644 --- a/server/pkg/db/generated/agent.sql.go +++ b/server/pkg/db/generated/agent.sql.go @@ -113,7 +113,7 @@ const cancelAgentTask = `-- name: CancelAgentTask :one UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() WHERE id = $1 AND status IN ('queued', 'dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) { @@ -145,6 +145,8 @@ func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTas &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ) return i, err } @@ -153,7 +155,7 @@ const cancelAgentTasksByAgent = `-- name: CancelAgentTasksByAgent :many UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() WHERE agent_id = $1 AND status IN ('queued', 'dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` // Bulk-cancel every active (queued/dispatched/running) task for an agent. @@ -196,6 +198,8 @@ func (q *Queries) CancelAgentTasksByAgent(ctx context.Context, agentID pgtype.UU &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -211,7 +215,7 @@ const cancelAgentTasksByChatSession = `-- name: CancelAgentTasksByChatSession :m UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() WHERE chat_session_id = $1 AND status IN ('queued', 'dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` // Cancels active tasks belonging to a chat session. Called from @@ -254,6 +258,8 @@ func (q *Queries) CancelAgentTasksByChatSession(ctx context.Context, chatSession &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -269,7 +275,7 @@ const cancelAgentTasksByIssue = `-- name: CancelAgentTasksByIssue :many UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` // Cancels every active task on the issue and returns the affected rows so the @@ -312,6 +318,8 @@ func (q *Queries) CancelAgentTasksByIssue(ctx context.Context, issueID pgtype.UU &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -327,7 +335,7 @@ const cancelAgentTasksByIssueAndAgent = `-- name: CancelAgentTasksByIssueAndAgen UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` type CancelAgentTasksByIssueAndAgentParams struct { @@ -374,6 +382,8 @@ func (q *Queries) CancelAgentTasksByIssueAndAgent(ctx context.Context, arg Cance &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -389,7 +399,7 @@ const cancelAgentTasksByTriggerComment = `-- name: CancelAgentTasksByTriggerComm UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() WHERE trigger_comment_id = $1 AND status IN ('queued', 'dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` // Cancels active tasks whose trigger is the given comment. Called when a @@ -432,6 +442,8 @@ func (q *Queries) CancelAgentTasksByTriggerComment(ctx context.Context, triggerC &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -470,7 +482,7 @@ WHERE id = ( LIMIT 1 FOR UPDATE SKIP LOCKED ) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` // Claims the next queued task for an agent, enforcing per-(issue, agent) serialization: @@ -511,6 +523,86 @@ func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (Agen &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, + ) + return i, err +} + +const claimAgentTaskWithLease = `-- name: ClaimAgentTaskWithLease :one +UPDATE agent_task_queue +SET status = 'dispatched', + dispatched_at = now(), + claim_token = gen_random_uuid(), + claim_expires_at = now() + make_interval(secs => $2::double precision) +WHERE id = ( + SELECT atq.id FROM agent_task_queue atq + WHERE atq.agent_id = $1 AND atq.status = 'queued' + AND NOT EXISTS ( + SELECT 1 FROM agent_task_queue active + WHERE active.agent_id = atq.agent_id + AND active.status IN ('dispatched', 'running') + AND ( + (atq.issue_id IS NOT NULL AND active.issue_id = atq.issue_id) + OR (atq.chat_session_id IS NOT NULL AND active.chat_session_id = atq.chat_session_id) + OR ( + atq.issue_id IS NULL + AND atq.chat_session_id IS NULL + AND atq.autopilot_run_id IS NULL + AND active.issue_id IS NULL + AND active.chat_session_id IS NULL + AND active.autopilot_run_id IS NULL + ) + ) + ) + ORDER BY atq.priority DESC, atq.created_at ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED +) +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at +` + +type ClaimAgentTaskWithLeaseParams struct { + AgentID pgtype.UUID `json:"agent_id"` + LeaseSeconds float64 `json:"lease_seconds"` +} + +// Like ClaimAgentTask but generates a claim_token and sets claim_expires_at. +// The daemon must present the token back in StartAgentTaskWithClaimToken to +// prove it received the claim response. If claim_expires_at passes without +// a successful StartTask, the expired-lease requeue sweep moves the task +// back to 'queued'. +func (q *Queries) ClaimAgentTaskWithLease(ctx context.Context, arg ClaimAgentTaskWithLeaseParams) (AgentTaskQueue, error) { + row := q.db.QueryRow(ctx, claimAgentTaskWithLease, arg.AgentID, arg.LeaseSeconds) + var i AgentTaskQueue + err := row.Scan( + &i.ID, + &i.AgentID, + &i.IssueID, + &i.Status, + &i.Priority, + &i.DispatchedAt, + &i.StartedAt, + &i.CompletedAt, + &i.Result, + &i.Error, + &i.CreatedAt, + &i.Context, + &i.RuntimeID, + &i.SessionID, + &i.WorkDir, + &i.TriggerCommentID, + &i.ChatSessionID, + &i.AutopilotRunID, + &i.Attempt, + &i.MaxAttempts, + &i.ParentTaskID, + &i.FailureReason, + &i.TriggerSummary, + &i.ForceFreshSession, + &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ) return i, err } @@ -554,7 +646,7 @@ const completeAgentTask = `-- name: CompleteAgentTask :one UPDATE agent_task_queue SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4 WHERE id = $1 AND status = 'running' -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` type CompleteAgentTaskParams struct { @@ -598,6 +690,8 @@ func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskPa &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ) return i, err } @@ -697,7 +791,7 @@ VALUES ( COALESCE($7::boolean, FALSE), COALESCE($8::boolean, FALSE) ) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` type CreateAgentTaskParams struct { @@ -749,6 +843,8 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ) return i, err } @@ -756,7 +852,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams const createQuickCreateTask = `-- name: CreateQuickCreateTask :one INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context) VALUES ($1, $2, NULL, 'queued', $3, $4) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` type CreateQuickCreateTaskParams struct { @@ -803,6 +899,8 @@ func (q *Queries) CreateQuickCreateTask(ctx context.Context, arg CreateQuickCrea &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ) return i, err } @@ -821,7 +919,7 @@ SELECT p.attempt + 1, p.max_attempts, p.id, p.is_leader_task FROM agent_task_queue p WHERE p.id = $1 -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` // Clones a parent task into a fresh queued attempt. Carries forward the @@ -860,6 +958,8 @@ func (q *Queries) CreateRetryTask(ctx context.Context, id pgtype.UUID) (AgentTas &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ) return i, err } @@ -882,7 +982,7 @@ FROM victims v WHERE t.id = v.id AND t.status = 'queued' AND t.created_at < now() - make_interval(secs => $1::double precision) -RETURNING t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task +RETURNING t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.claim_token, t.claim_expires_at ` type ExpireStaleQueuedTasksParams struct { @@ -948,6 +1048,8 @@ func (q *Queries) ExpireStaleQueuedTasks(ctx context.Context, arg ExpireStaleQue &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -968,7 +1070,7 @@ SET status = 'failed', session_id = COALESCE($4, session_id), work_dir = COALESCE($5, work_dir) WHERE id = $1 AND status IN ('dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` type FailAgentTaskParams struct { @@ -1023,6 +1125,8 @@ func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (A &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ) return i, err } @@ -1033,7 +1137,7 @@ SET status = 'failed', completed_at = now(), error = 'task timed out', failure_reason = 'timeout' WHERE (status = 'dispatched' AND dispatched_at < now() - make_interval(secs => $1::double precision)) OR (status = 'running' AND started_at < now() - make_interval(secs => $2::double precision)) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` type FailStaleTasksParams struct { @@ -1079,6 +1183,8 @@ func (q *Queries) FailStaleTasks(ctx context.Context, arg FailStaleTasksParams) &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -1164,7 +1270,7 @@ func (q *Queries) GetAgentInWorkspace(ctx context.Context, arg GetAgentInWorkspa } const getAgentTask = `-- name: GetAgentTask :one -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at FROM agent_task_queue WHERE id = $1 ` @@ -1197,6 +1303,8 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ) return i, err } @@ -1461,7 +1569,7 @@ func (q *Queries) LinkTaskToIssue(ctx context.Context, arg LinkTaskToIssueParams } const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at FROM agent_task_queue WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running') ORDER BY created_at DESC ` @@ -1506,6 +1614,8 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -1518,7 +1628,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI } const listAgentTasks = `-- name: ListAgentTasks :many -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at FROM agent_task_queue WHERE agent_id = $1 ORDER BY created_at DESC ` @@ -1558,6 +1668,8 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -1666,7 +1778,7 @@ func (q *Queries) ListAllAgents(ctx context.Context, workspaceID pgtype.UUID) ([ } const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at FROM agent_task_queue WHERE runtime_id = $1 AND status IN ('queued', 'dispatched') ORDER BY priority DESC, created_at ASC ` @@ -1706,6 +1818,8 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -1718,7 +1832,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp } const listQueuedClaimCandidatesByRuntime = `-- name: ListQueuedClaimCandidatesByRuntime :many -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at FROM agent_task_queue WHERE runtime_id = $1 AND status = 'queued' ORDER BY priority DESC, created_at ASC ` @@ -1766,6 +1880,8 @@ func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtim &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -1778,7 +1894,7 @@ func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtim } const listTasksByIssue = `-- name: ListTasksByIssue :many -SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue +SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at FROM agent_task_queue WHERE issue_id = $1 ORDER BY created_at DESC ` @@ -1818,6 +1934,8 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([] &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -1830,15 +1948,15 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([] } const listWorkspaceAgentTaskSnapshot = `-- name: ListWorkspaceAgentTaskSnapshot :many -SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task FROM agent_task_queue atq +SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.claim_token, atq.claim_expires_at FROM agent_task_queue atq JOIN agent a ON a.id = atq.agent_id WHERE a.workspace_id = $1 AND atq.status IN ('queued', 'dispatched', 'running') UNION ALL -SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task FROM ( - SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task +SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.claim_token, t.claim_expires_at FROM ( + SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.claim_token, atq.claim_expires_at FROM agent_task_queue atq JOIN agent a ON a.id = atq.agent_id WHERE a.workspace_id = $1 @@ -1900,6 +2018,8 @@ func (q *Queries) ListWorkspaceAgentTaskSnapshot(ctx context.Context, workspaceI &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -1918,7 +2038,7 @@ SET status = 'failed', error = 'daemon restarted while task was in flight', failure_reason = 'runtime_recovery' WHERE runtime_id = $1 AND status IN ('dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` // Called by the daemon at startup. Atomically fails any dispatched/running @@ -1960,6 +2080,8 @@ func (q *Queries) RecoverOrphanedTasksForRuntime(ctx context.Context, runtimeID &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -2011,6 +2133,83 @@ func (q *Queries) RefreshAgentStatusFromTasks(ctx context.Context, id pgtype.UUI return i, err } +const requeueExpiredClaimLeases = `-- name: RequeueExpiredClaimLeases :many +WITH expired AS ( + SELECT id FROM agent_task_queue + WHERE status = 'dispatched' + AND claim_expires_at IS NOT NULL + AND claim_expires_at < now() + ORDER BY claim_expires_at ASC + LIMIT $1::int + FOR UPDATE SKIP LOCKED +) +UPDATE agent_task_queue t +SET status = 'queued', + dispatched_at = NULL, + claim_token = NULL, + claim_expires_at = NULL +FROM expired e +WHERE t.id = e.id + AND t.status = 'dispatched' + AND t.claim_expires_at IS NOT NULL + AND t.claim_expires_at < now() +RETURNING t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.claim_token, t.claim_expires_at +` + +// Moves dispatched tasks whose claim lease has expired back to 'queued' so +// they can be re-claimed. This handles the case where the server committed +// the claim but the response never reached the daemon (network timeout, +// daemon crash, etc.). Capped via LIMIT inside the CTE to bound per-tick +// work. Uses FOR UPDATE SKIP LOCKED to avoid contention with concurrent +// claim/start operations. +func (q *Queries) RequeueExpiredClaimLeases(ctx context.Context, maxPerTick int32) ([]AgentTaskQueue, error) { + rows, err := q.db.Query(ctx, requeueExpiredClaimLeases, maxPerTick) + if err != nil { + return nil, err + } + defer rows.Close() + items := []AgentTaskQueue{} + for rows.Next() { + var i AgentTaskQueue + if err := rows.Scan( + &i.ID, + &i.AgentID, + &i.IssueID, + &i.Status, + &i.Priority, + &i.DispatchedAt, + &i.StartedAt, + &i.CompletedAt, + &i.Result, + &i.Error, + &i.CreatedAt, + &i.Context, + &i.RuntimeID, + &i.SessionID, + &i.WorkDir, + &i.TriggerCommentID, + &i.ChatSessionID, + &i.AutopilotRunID, + &i.Attempt, + &i.MaxAttempts, + &i.ParentTaskID, + &i.FailureReason, + &i.TriggerSummary, + &i.ForceFreshSession, + &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const restoreAgent = `-- name: RestoreAgent :one UPDATE agent SET archived_at = NULL, archived_by = NULL, updated_at = now() WHERE id = $1 @@ -2050,7 +2249,7 @@ const startAgentTask = `-- name: StartAgentTask :one UPDATE agent_task_queue SET status = 'running', started_at = now() WHERE id = $1 AND status = 'dispatched' -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) { @@ -2082,6 +2281,62 @@ func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTask &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, + ) + return i, err +} + +const startAgentTaskWithClaimToken = `-- name: StartAgentTaskWithClaimToken :one +UPDATE agent_task_queue +SET status = 'running', + started_at = now(), + claim_token = NULL, + claim_expires_at = NULL +WHERE id = $1 AND status = 'dispatched' AND claim_token = $2 +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at +` + +type StartAgentTaskWithClaimTokenParams struct { + ID pgtype.UUID `json:"id"` + ClaimToken pgtype.UUID `json:"claim_token"` +} + +// Transitions a dispatched task to running only if the caller presents the +// correct claim_token. This prevents a stale daemon (whose original claim +// response was lost) from starting a task that has since been requeued and +// re-claimed by another runtime. +func (q *Queries) StartAgentTaskWithClaimToken(ctx context.Context, arg StartAgentTaskWithClaimTokenParams) (AgentTaskQueue, error) { + row := q.db.QueryRow(ctx, startAgentTaskWithClaimToken, arg.ID, arg.ClaimToken) + var i AgentTaskQueue + err := row.Scan( + &i.ID, + &i.AgentID, + &i.IssueID, + &i.Status, + &i.Priority, + &i.DispatchedAt, + &i.StartedAt, + &i.CompletedAt, + &i.Result, + &i.Error, + &i.CreatedAt, + &i.Context, + &i.RuntimeID, + &i.SessionID, + &i.WorkDir, + &i.TriggerCommentID, + &i.ChatSessionID, + &i.AutopilotRunID, + &i.Attempt, + &i.MaxAttempts, + &i.ParentTaskID, + &i.FailureReason, + &i.TriggerSummary, + &i.ForceFreshSession, + &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ) return i, err } diff --git a/server/pkg/db/generated/autopilot.sql.go b/server/pkg/db/generated/autopilot.sql.go index 01e4ff491..108f5504b 100644 --- a/server/pkg/db/generated/autopilot.sql.go +++ b/server/pkg/db/generated/autopilot.sql.go @@ -204,7 +204,7 @@ const createAutopilotTask = `-- name: CreateAutopilotTask :one INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, autopilot_run_id, trigger_summary) VALUES ($1, $2, NULL, 'queued', $3, $4, $5) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` type CreateAutopilotTaskParams struct { @@ -253,6 +253,8 @@ func (q *Queries) CreateAutopilotTask(ctx context.Context, arg CreateAutopilotTa &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ) return i, err } diff --git a/server/pkg/db/generated/chat.sql.go b/server/pkg/db/generated/chat.sql.go index e8250e07e..ad3d90940 100644 --- a/server/pkg/db/generated/chat.sql.go +++ b/server/pkg/db/generated/chat.sql.go @@ -90,7 +90,7 @@ func (q *Queries) CreateChatSession(ctx context.Context, arg CreateChatSessionPa const createChatTask = `-- name: CreateChatTask :one INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, chat_session_id) VALUES ($1, $2, NULL, 'queued', $3, $4) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` type CreateChatTaskParams struct { @@ -134,6 +134,8 @@ func (q *Queries) CreateChatTask(ctx context.Context, arg CreateChatTaskParams) &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ) return i, err } diff --git a/server/pkg/db/generated/models.go b/server/pkg/db/generated/models.go index f5de7faf6..c252aa880 100644 --- a/server/pkg/db/generated/models.go +++ b/server/pkg/db/generated/models.go @@ -95,6 +95,8 @@ type AgentTaskQueue struct { TriggerSummary pgtype.Text `json:"trigger_summary"` ForceFreshSession bool `json:"force_fresh_session"` IsLeaderTask bool `json:"is_leader_task"` + ClaimToken pgtype.UUID `json:"claim_token"` + ClaimExpiresAt pgtype.Timestamptz `json:"claim_expires_at"` } type Attachment struct { diff --git a/server/pkg/db/generated/runtime.sql.go b/server/pkg/db/generated/runtime.sql.go index 5a22e281c..731f0f67b 100644 --- a/server/pkg/db/generated/runtime.sql.go +++ b/server/pkg/db/generated/runtime.sql.go @@ -16,7 +16,7 @@ UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() WHERE (runtime_id = ANY($1::uuid[]) OR agent_id = ANY($2::uuid[])) AND status IN ('queued', 'dispatched', 'running') -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` type CancelAgentTasksByRuntimeOrAgentParams struct { @@ -73,6 +73,8 @@ func (q *Queries) CancelAgentTasksByRuntimeOrAgent(ctx context.Context, arg Canc &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } @@ -187,7 +189,7 @@ WHERE status IN ('dispatched', 'running') AND runtime_id IN ( SELECT id FROM agent_runtime WHERE status = 'offline' ) -RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at ` // Marks dispatched/running tasks as failed when their runtime is offline. @@ -227,6 +229,8 @@ func (q *Queries) FailTasksForOfflineRuntimes(ctx context.Context) ([]AgentTaskQ &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, + &i.ClaimToken, + &i.ClaimExpiresAt, ); err != nil { return nil, err } diff --git a/server/pkg/db/queries/agent.sql b/server/pkg/db/queries/agent.sql index e6ffede7f..a7e0bc4dd 100644 --- a/server/pkg/db/queries/agent.sql +++ b/server/pkg/db/queries/agent.sql @@ -539,3 +539,81 @@ SET status = CASE WHEN EXISTS ( updated_at = now() WHERE a.id = $1 RETURNING *; + +-- name: ClaimAgentTaskWithLease :one +-- Like ClaimAgentTask but generates a claim_token and sets claim_expires_at. +-- The daemon must present the token back in StartAgentTaskWithClaimToken to +-- prove it received the claim response. If claim_expires_at passes without +-- a successful StartTask, the expired-lease requeue sweep moves the task +-- back to 'queued'. +UPDATE agent_task_queue +SET status = 'dispatched', + dispatched_at = now(), + claim_token = gen_random_uuid(), + claim_expires_at = now() + make_interval(secs => @lease_seconds::double precision) +WHERE id = ( + SELECT atq.id FROM agent_task_queue atq + WHERE atq.agent_id = $1 AND atq.status = 'queued' + AND NOT EXISTS ( + SELECT 1 FROM agent_task_queue active + WHERE active.agent_id = atq.agent_id + AND active.status IN ('dispatched', 'running') + AND ( + (atq.issue_id IS NOT NULL AND active.issue_id = atq.issue_id) + OR (atq.chat_session_id IS NOT NULL AND active.chat_session_id = atq.chat_session_id) + OR ( + atq.issue_id IS NULL + AND atq.chat_session_id IS NULL + AND atq.autopilot_run_id IS NULL + AND active.issue_id IS NULL + AND active.chat_session_id IS NULL + AND active.autopilot_run_id IS NULL + ) + ) + ) + ORDER BY atq.priority DESC, atq.created_at ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED +) +RETURNING *; + +-- name: StartAgentTaskWithClaimToken :one +-- Transitions a dispatched task to running only if the caller presents the +-- correct claim_token. This prevents a stale daemon (whose original claim +-- response was lost) from starting a task that has since been requeued and +-- re-claimed by another runtime. +UPDATE agent_task_queue +SET status = 'running', + started_at = now(), + claim_token = NULL, + claim_expires_at = NULL +WHERE id = $1 AND status = 'dispatched' AND claim_token = $2 +RETURNING *; + +-- name: RequeueExpiredClaimLeases :many +-- Moves dispatched tasks whose claim lease has expired back to 'queued' so +-- they can be re-claimed. This handles the case where the server committed +-- the claim but the response never reached the daemon (network timeout, +-- daemon crash, etc.). Capped via LIMIT inside the CTE to bound per-tick +-- work. Uses FOR UPDATE SKIP LOCKED to avoid contention with concurrent +-- claim/start operations. +WITH expired AS ( + SELECT id FROM agent_task_queue + WHERE status = 'dispatched' + AND claim_expires_at IS NOT NULL + AND claim_expires_at < now() + ORDER BY claim_expires_at ASC + LIMIT @max_per_tick::int + FOR UPDATE SKIP LOCKED +) +UPDATE agent_task_queue t +SET status = 'queued', + dispatched_at = NULL, + claim_token = NULL, + claim_expires_at = NULL +FROM expired e +WHERE t.id = e.id + AND t.status = 'dispatched' + AND t.claim_expires_at IS NOT NULL + AND t.claim_expires_at < now() +RETURNING t.*;