Compare commits

...

1 Commits

Author SHA1 Message Date
yushen
12d69672d1 feat(task): add claim lease mechanism (Phase 2, MUL-2246)
Add claim_token + claim_expires_at columns to agent_task_queue and three
new SQL queries for the claim lease protocol:

- ClaimAgentTaskWithLease: generates a UUID token and sets a lease expiry
  when claiming a task, so the daemon must prove it received the response
- StartAgentTaskWithClaimToken: validates the token on StartTask, preventing
  stale daemons from starting requeued tasks
- RequeueExpiredClaimLeases: moves dispatched tasks with expired leases back
  to queued for re-claim

This closes the reliability gap where a claim response lost in transit
leaves a task stuck in dispatched until the 60s dispatch timeout fires.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-15 12:51:56 +08:00
8 changed files with 391 additions and 29 deletions

View File

@@ -0,0 +1,3 @@
-- Down migration for the claim lease columns: removes claim_token and
-- claim_expires_at from agent_task_queue. IF EXISTS lets each DROP succeed
-- even when the column is already absent.
ALTER TABLE agent_task_queue
DROP COLUMN IF EXISTS claim_token,
DROP COLUMN IF EXISTS claim_expires_at;

View File

@@ -0,0 +1,16 @@
-- Adds claim lease columns to agent_task_queue for Phase 2 of the claim
-- reliability fix (MUL-2246 / GitHub #2649).
--
-- claim_token: opaque UUID generated at claim time; the daemon must present
-- it back in StartTask to prove it received the claim response. Prevents
-- a stale daemon from starting a task that was already requeued and
-- re-claimed by another runtime.
--
-- claim_expires_at: absolute deadline by which the daemon must call StartTask
-- with the matching token. If this timestamp passes while the task is still
-- 'dispatched', the expired-lease requeue sweep moves it back to 'queued'
-- so it can be re-claimed.
--
-- Both columns are added without NOT NULL or DEFAULT, so rows that already
-- exist (and rows that have never been claimed with a lease) hold NULL in
-- both.
ALTER TABLE agent_task_queue
ADD COLUMN claim_token UUID,
ADD COLUMN claim_expires_at TIMESTAMPTZ;

View File

@@ -113,7 +113,7 @@ const cancelAgentTask = `-- name: CancelAgentTask :one
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
@@ -145,6 +145,8 @@ func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTas
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
)
return i, err
}
@@ -153,7 +155,7 @@ const cancelAgentTasksByAgent = `-- name: CancelAgentTasksByAgent :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE agent_id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
// Bulk-cancel every active (queued/dispatched/running) task for an agent.
@@ -196,6 +198,8 @@ func (q *Queries) CancelAgentTasksByAgent(ctx context.Context, agentID pgtype.UU
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -211,7 +215,7 @@ const cancelAgentTasksByChatSession = `-- name: CancelAgentTasksByChatSession :m
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE chat_session_id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
// Cancels active tasks belonging to a chat session. Called from
@@ -254,6 +258,8 @@ func (q *Queries) CancelAgentTasksByChatSession(ctx context.Context, chatSession
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -269,7 +275,7 @@ const cancelAgentTasksByIssue = `-- name: CancelAgentTasksByIssue :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
// Cancels every active task on the issue and returns the affected rows so the
@@ -312,6 +318,8 @@ func (q *Queries) CancelAgentTasksByIssue(ctx context.Context, issueID pgtype.UU
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -327,7 +335,7 @@ const cancelAgentTasksByIssueAndAgent = `-- name: CancelAgentTasksByIssueAndAgen
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
type CancelAgentTasksByIssueAndAgentParams struct {
@@ -374,6 +382,8 @@ func (q *Queries) CancelAgentTasksByIssueAndAgent(ctx context.Context, arg Cance
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -389,7 +399,7 @@ const cancelAgentTasksByTriggerComment = `-- name: CancelAgentTasksByTriggerComm
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE trigger_comment_id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
// Cancels active tasks whose trigger is the given comment. Called when a
@@ -432,6 +442,8 @@ func (q *Queries) CancelAgentTasksByTriggerComment(ctx context.Context, triggerC
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -470,7 +482,7 @@ WHERE id = (
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
// Claims the next queued task for an agent, enforcing per-(issue, agent) serialization:
@@ -511,6 +523,86 @@ func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (Agen
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
)
return i, err
}
// claimAgentTaskWithLease is the SQL text executed by ClaimAgentTaskWithLease.
//
// The inner SELECT picks one 'queued' row for agent $1, highest priority
// first and oldest created_at first, locking it and skipping rows that are
// already locked. A candidate is passed over while the same agent has a
// 'dispatched' or 'running' task that shares its issue_id or
// chat_session_id, or, for a candidate with no issue, chat session or
// autopilot run, another active task that also has none of the three.
//
// The UPDATE marks the chosen row 'dispatched', stamps dispatched_at, stores a
// fresh gen_random_uuid() in claim_token and sets claim_expires_at to now()
// plus $2 seconds, then returns the full row.
const claimAgentTaskWithLease = `-- name: ClaimAgentTaskWithLease :one
UPDATE agent_task_queue
SET status = 'dispatched',
dispatched_at = now(),
claim_token = gen_random_uuid(),
claim_expires_at = now() + make_interval(secs => $2::double precision)
WHERE id = (
SELECT atq.id FROM agent_task_queue atq
WHERE atq.agent_id = $1 AND atq.status = 'queued'
AND NOT EXISTS (
SELECT 1 FROM agent_task_queue active
WHERE active.agent_id = atq.agent_id
AND active.status IN ('dispatched', 'running')
AND (
(atq.issue_id IS NOT NULL AND active.issue_id = atq.issue_id)
OR (atq.chat_session_id IS NOT NULL AND active.chat_session_id = atq.chat_session_id)
OR (
atq.issue_id IS NULL
AND atq.chat_session_id IS NULL
AND atq.autopilot_run_id IS NULL
AND active.issue_id IS NULL
AND active.chat_session_id IS NULL
AND active.autopilot_run_id IS NULL
)
)
)
ORDER BY atq.priority DESC, atq.created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
// ClaimAgentTaskWithLeaseParams carries the two bind arguments for
// ClaimAgentTaskWithLease.
type ClaimAgentTaskWithLeaseParams struct {
// AgentID is bound to $1: only queued tasks of this agent are candidates.
AgentID pgtype.UUID `json:"agent_id"`
// LeaseSeconds is bound to $2: the number of seconds added to now() to
// produce claim_expires_at.
LeaseSeconds float64 `json:"lease_seconds"`
}
// ClaimAgentTaskWithLease runs the claimAgentTaskWithLease query with
// arg.AgentID and arg.LeaseSeconds and scans the single returned row into an
// AgentTaskQueue.
//
// Compared with ClaimAgentTask, the claimed row additionally carries a freshly
// generated claim_token and a claim_expires_at deadline. The daemon is
// expected to hand the token back through StartAgentTaskWithClaimToken to show
// that it actually received the claim response; a task whose deadline passes
// without a successful StartTask is moved back to 'queued' by the
// expired-lease requeue sweep.
//
// The error from Scan is returned unchanged together with the task value.
// NOTE(review): presumably "no claimable task" surfaces as the driver's
// no-rows error — confirm against the q.db implementation.
func (q *Queries) ClaimAgentTaskWithLease(ctx context.Context, arg ClaimAgentTaskWithLeaseParams) (AgentTaskQueue, error) {
	var task AgentTaskQueue
	// Destinations are listed in the exact column order of the RETURNING clause.
	err := q.db.QueryRow(ctx, claimAgentTaskWithLease, arg.AgentID, arg.LeaseSeconds).Scan(
		&task.ID,
		&task.AgentID,
		&task.IssueID,
		&task.Status,
		&task.Priority,
		&task.DispatchedAt,
		&task.StartedAt,
		&task.CompletedAt,
		&task.Result,
		&task.Error,
		&task.CreatedAt,
		&task.Context,
		&task.RuntimeID,
		&task.SessionID,
		&task.WorkDir,
		&task.TriggerCommentID,
		&task.ChatSessionID,
		&task.AutopilotRunID,
		&task.Attempt,
		&task.MaxAttempts,
		&task.ParentTaskID,
		&task.FailureReason,
		&task.TriggerSummary,
		&task.ForceFreshSession,
		&task.IsLeaderTask,
		&task.ClaimToken,
		&task.ClaimExpiresAt,
	)
	return task, err
}
@@ -554,7 +646,7 @@ const completeAgentTask = `-- name: CompleteAgentTask :one
UPDATE agent_task_queue
SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4
WHERE id = $1 AND status = 'running'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
type CompleteAgentTaskParams struct {
@@ -598,6 +690,8 @@ func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskPa
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
)
return i, err
}
@@ -697,7 +791,7 @@ VALUES (
COALESCE($7::boolean, FALSE),
COALESCE($8::boolean, FALSE)
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
type CreateAgentTaskParams struct {
@@ -749,6 +843,8 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
)
return i, err
}
@@ -756,7 +852,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
const createQuickCreateTask = `-- name: CreateQuickCreateTask :one
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context)
VALUES ($1, $2, NULL, 'queued', $3, $4)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
type CreateQuickCreateTaskParams struct {
@@ -803,6 +899,8 @@ func (q *Queries) CreateQuickCreateTask(ctx context.Context, arg CreateQuickCrea
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
)
return i, err
}
@@ -821,7 +919,7 @@ SELECT
p.attempt + 1, p.max_attempts, p.id, p.is_leader_task
FROM agent_task_queue p
WHERE p.id = $1
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
// Clones a parent task into a fresh queued attempt. Carries forward the
@@ -860,6 +958,8 @@ func (q *Queries) CreateRetryTask(ctx context.Context, id pgtype.UUID) (AgentTas
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
)
return i, err
}
@@ -882,7 +982,7 @@ FROM victims v
WHERE t.id = v.id
AND t.status = 'queued'
AND t.created_at < now() - make_interval(secs => $1::double precision)
RETURNING t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task
RETURNING t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.claim_token, t.claim_expires_at
`
type ExpireStaleQueuedTasksParams struct {
@@ -948,6 +1048,8 @@ func (q *Queries) ExpireStaleQueuedTasks(ctx context.Context, arg ExpireStaleQue
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -968,7 +1070,7 @@ SET status = 'failed',
session_id = COALESCE($4, session_id),
work_dir = COALESCE($5, work_dir)
WHERE id = $1 AND status IN ('dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
type FailAgentTaskParams struct {
@@ -1023,6 +1125,8 @@ func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (A
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
)
return i, err
}
@@ -1033,7 +1137,7 @@ SET status = 'failed', completed_at = now(), error = 'task timed out',
failure_reason = 'timeout'
WHERE (status = 'dispatched' AND dispatched_at < now() - make_interval(secs => $1::double precision))
OR (status = 'running' AND started_at < now() - make_interval(secs => $2::double precision))
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
type FailStaleTasksParams struct {
@@ -1079,6 +1183,8 @@ func (q *Queries) FailStaleTasks(ctx context.Context, arg FailStaleTasksParams)
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -1164,7 +1270,7 @@ func (q *Queries) GetAgentInWorkspace(ctx context.Context, arg GetAgentInWorkspa
}
const getAgentTask = `-- name: GetAgentTask :one
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at FROM agent_task_queue
WHERE id = $1
`
@@ -1197,6 +1303,8 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
)
return i, err
}
@@ -1461,7 +1569,7 @@ func (q *Queries) LinkTaskToIssue(ctx context.Context, arg LinkTaskToIssueParams
}
const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at FROM agent_task_queue
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running')
ORDER BY created_at DESC
`
@@ -1506,6 +1614,8 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -1518,7 +1628,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI
}
const listAgentTasks = `-- name: ListAgentTasks :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at FROM agent_task_queue
WHERE agent_id = $1
ORDER BY created_at DESC
`
@@ -1558,6 +1668,8 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -1666,7 +1778,7 @@ func (q *Queries) ListAllAgents(ctx context.Context, workspaceID pgtype.UUID) ([
}
const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at FROM agent_task_queue
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
ORDER BY priority DESC, created_at ASC
`
@@ -1706,6 +1818,8 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -1718,7 +1832,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
}
const listQueuedClaimCandidatesByRuntime = `-- name: ListQueuedClaimCandidatesByRuntime :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at FROM agent_task_queue
WHERE runtime_id = $1 AND status = 'queued'
ORDER BY priority DESC, created_at ASC
`
@@ -1766,6 +1880,8 @@ func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtim
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -1778,7 +1894,7 @@ func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtim
}
const listTasksByIssue = `-- name: ListTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at FROM agent_task_queue
WHERE issue_id = $1
ORDER BY created_at DESC
`
@@ -1818,6 +1934,8 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -1830,15 +1948,15 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]
}
const listWorkspaceAgentTaskSnapshot = `-- name: ListWorkspaceAgentTaskSnapshot :many
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task FROM agent_task_queue atq
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.claim_token, atq.claim_expires_at FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
AND atq.status IN ('queued', 'dispatched', 'running')
UNION ALL
SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task FROM (
SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task
SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.claim_token, t.claim_expires_at FROM (
SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.claim_token, atq.claim_expires_at
FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
@@ -1900,6 +2018,8 @@ func (q *Queries) ListWorkspaceAgentTaskSnapshot(ctx context.Context, workspaceI
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -1918,7 +2038,7 @@ SET status = 'failed',
error = 'daemon restarted while task was in flight',
failure_reason = 'runtime_recovery'
WHERE runtime_id = $1 AND status IN ('dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
// Called by the daemon at startup. Atomically fails any dispatched/running
@@ -1960,6 +2080,8 @@ func (q *Queries) RecoverOrphanedTasksForRuntime(ctx context.Context, runtimeID
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -2011,6 +2133,83 @@ func (q *Queries) RefreshAgentStatusFromTasks(ctx context.Context, id pgtype.UUI
return i, err
}
// requeueExpiredClaimLeases is the SQL text executed by
// (*Queries).RequeueExpiredClaimLeases.
//
// The "expired" CTE row-locks at most $1 tasks that are still 'dispatched'
// and whose claim_expires_at is set and earlier than now(), oldest expiry
// first, skipping rows that another transaction already holds locked. The
// UPDATE re-checks the same conditions on each selected row, resets it to
// 'queued', clears dispatched_at, claim_token and claim_expires_at, and
// returns the updated rows.
const requeueExpiredClaimLeases = `-- name: RequeueExpiredClaimLeases :many
WITH expired AS (
SELECT id FROM agent_task_queue
WHERE status = 'dispatched'
AND claim_expires_at IS NOT NULL
AND claim_expires_at < now()
ORDER BY claim_expires_at ASC
LIMIT $1::int
FOR UPDATE SKIP LOCKED
)
UPDATE agent_task_queue t
SET status = 'queued',
dispatched_at = NULL,
claim_token = NULL,
claim_expires_at = NULL
FROM expired e
WHERE t.id = e.id
AND t.status = 'dispatched'
AND t.claim_expires_at IS NOT NULL
AND t.claim_expires_at < now()
RETURNING t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.claim_token, t.claim_expires_at
`
// RequeueExpiredClaimLeases returns dispatched tasks to the 'queued' state
// once their claim lease has run out, making them claimable again. It covers
// claims that were committed on the server but whose response never reached
// the daemon (network timeout, daemon crash, etc.).
//
// maxPerTick is bound to the LIMIT inside the query's CTE and caps how many
// tasks a single call may requeue. Candidate rows are selected with
// FOR UPDATE SKIP LOCKED, so rows locked by concurrent claim/start
// operations are skipped rather than waited on.
//
// On success the result holds the updated rows and is a non-nil, possibly
// empty slice. Any query, scan, or iteration error is returned together
// with a nil slice.
func (q *Queries) RequeueExpiredClaimLeases(ctx context.Context, maxPerTick int32) ([]AgentTaskQueue, error) {
	rows, queryErr := q.db.Query(ctx, requeueExpiredClaimLeases, maxPerTick)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	// Start from an empty, non-nil slice so "nothing expired" is
	// distinguishable from the nil returned on error.
	requeued := make([]AgentTaskQueue, 0)
	for rows.Next() {
		var task AgentTaskQueue
		// Destination order must mirror the RETURNING column list.
		scanErr := rows.Scan(
			&task.ID,
			&task.AgentID,
			&task.IssueID,
			&task.Status,
			&task.Priority,
			&task.DispatchedAt,
			&task.StartedAt,
			&task.CompletedAt,
			&task.Result,
			&task.Error,
			&task.CreatedAt,
			&task.Context,
			&task.RuntimeID,
			&task.SessionID,
			&task.WorkDir,
			&task.TriggerCommentID,
			&task.ChatSessionID,
			&task.AutopilotRunID,
			&task.Attempt,
			&task.MaxAttempts,
			&task.ParentTaskID,
			&task.FailureReason,
			&task.TriggerSummary,
			&task.ForceFreshSession,
			&task.IsLeaderTask,
			&task.ClaimToken,
			&task.ClaimExpiresAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		requeued = append(requeued, task)
	}
	// rows.Next() returning false can also mean iteration failed part-way.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return requeued, nil
}
const restoreAgent = `-- name: RestoreAgent :one
UPDATE agent SET archived_at = NULL, archived_by = NULL, updated_at = now()
WHERE id = $1
@@ -2050,7 +2249,7 @@ const startAgentTask = `-- name: StartAgentTask :one
UPDATE agent_task_queue
SET status = 'running', started_at = now()
WHERE id = $1 AND status = 'dispatched'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
@@ -2082,6 +2281,62 @@ func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTask
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
)
return i, err
}
// startAgentTaskWithClaimToken is the SQL text executed by
// (*Queries).StartAgentTaskWithClaimToken.
//
// It updates the task with id $1 only while it is still 'dispatched' and its
// stored claim_token equals $2; the matching row becomes 'running', gets
// started_at = now(), has claim_token and claim_expires_at cleared, and is
// returned. The WHERE clause does not look at claim_expires_at.
const startAgentTaskWithClaimToken = `-- name: StartAgentTaskWithClaimToken :one
UPDATE agent_task_queue
SET status = 'running',
started_at = now(),
claim_token = NULL,
claim_expires_at = NULL
WHERE id = $1 AND status = 'dispatched' AND claim_token = $2
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
// StartAgentTaskWithClaimTokenParams carries the arguments for
// (*Queries).StartAgentTaskWithClaimToken.
type StartAgentTaskWithClaimTokenParams struct {
// ID is bound to $1 and compared against agent_task_queue.id.
ID pgtype.UUID `json:"id"`
// ClaimToken is bound to $2 and must equal the row's stored claim_token
// for the update to apply.
ClaimToken pgtype.UUID `json:"claim_token"`
}
// StartAgentTaskWithClaimToken moves a dispatched task to 'running', but
// only when arg.ClaimToken equals the claim_token stored on the row. A stale
// daemon (one whose original claim response was lost) therefore cannot start
// a task that has since been requeued and re-claimed by another runtime.
//
// On a match the query also clears claim_token and claim_expires_at and the
// updated row is returned. The error from Scan is passed through unchanged
// alongside whatever was scanned; presumably that is the driver's no-rows
// error when the id, status, or token does not match — confirm against the
// database driver in use.
func (q *Queries) StartAgentTaskWithClaimToken(ctx context.Context, arg StartAgentTaskWithClaimTokenParams) (AgentTaskQueue, error) {
	var task AgentTaskQueue
	// Destination order must mirror the RETURNING column list.
	err := q.db.QueryRow(ctx, startAgentTaskWithClaimToken, arg.ID, arg.ClaimToken).Scan(
		&task.ID,
		&task.AgentID,
		&task.IssueID,
		&task.Status,
		&task.Priority,
		&task.DispatchedAt,
		&task.StartedAt,
		&task.CompletedAt,
		&task.Result,
		&task.Error,
		&task.CreatedAt,
		&task.Context,
		&task.RuntimeID,
		&task.SessionID,
		&task.WorkDir,
		&task.TriggerCommentID,
		&task.ChatSessionID,
		&task.AutopilotRunID,
		&task.Attempt,
		&task.MaxAttempts,
		&task.ParentTaskID,
		&task.FailureReason,
		&task.TriggerSummary,
		&task.ForceFreshSession,
		&task.IsLeaderTask,
		&task.ClaimToken,
		&task.ClaimExpiresAt,
	)
	return task, err
}

View File

@@ -204,7 +204,7 @@ const createAutopilotTask = `-- name: CreateAutopilotTask :one
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, autopilot_run_id, trigger_summary)
VALUES ($1, $2, NULL, 'queued', $3, $4, $5)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
type CreateAutopilotTaskParams struct {
@@ -253,6 +253,8 @@ func (q *Queries) CreateAutopilotTask(ctx context.Context, arg CreateAutopilotTa
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
)
return i, err
}

View File

@@ -90,7 +90,7 @@ func (q *Queries) CreateChatSession(ctx context.Context, arg CreateChatSessionPa
const createChatTask = `-- name: CreateChatTask :one
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, chat_session_id)
VALUES ($1, $2, NULL, 'queued', $3, $4)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
type CreateChatTaskParams struct {
@@ -134,6 +134,8 @@ func (q *Queries) CreateChatTask(ctx context.Context, arg CreateChatTaskParams)
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
)
return i, err
}

View File

@@ -95,6 +95,8 @@ type AgentTaskQueue struct {
TriggerSummary pgtype.Text `json:"trigger_summary"`
ForceFreshSession bool `json:"force_fresh_session"`
IsLeaderTask bool `json:"is_leader_task"`
ClaimToken pgtype.UUID `json:"claim_token"`
ClaimExpiresAt pgtype.Timestamptz `json:"claim_expires_at"`
}
type Attachment struct {

View File

@@ -16,7 +16,7 @@ UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE (runtime_id = ANY($1::uuid[]) OR agent_id = ANY($2::uuid[]))
AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
type CancelAgentTasksByRuntimeOrAgentParams struct {
@@ -73,6 +73,8 @@ func (q *Queries) CancelAgentTasksByRuntimeOrAgent(ctx context.Context, arg Canc
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}
@@ -187,7 +189,7 @@ WHERE status IN ('dispatched', 'running')
AND runtime_id IN (
SELECT id FROM agent_runtime WHERE status = 'offline'
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, claim_token, claim_expires_at
`
// Marks dispatched/running tasks as failed when their runtime is offline.
@@ -227,6 +229,8 @@ func (q *Queries) FailTasksForOfflineRuntimes(ctx context.Context) ([]AgentTaskQ
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.ClaimToken,
&i.ClaimExpiresAt,
); err != nil {
return nil, err
}

View File

@@ -539,3 +539,81 @@ SET status = CASE WHEN EXISTS (
updated_at = now()
WHERE a.id = $1
RETURNING *;
-- name: ClaimAgentTaskWithLease :one
-- Like ClaimAgentTask but generates a claim_token and sets claim_expires_at.
-- The daemon must present the token back in StartAgentTaskWithClaimToken to
-- prove it received the claim response. If claim_expires_at passes without
-- a successful StartTask, the expired-lease requeue sweep moves the task
-- back to 'queued'.
--
-- Parameters: $1 is the agent id; @lease_seconds is the lease length in
-- seconds (cast to double precision) added to now() for claim_expires_at.
--
-- Selection: picks at most one 'queued' task of the agent, highest priority
-- first and oldest created_at first within a priority, locking it with
-- FOR UPDATE SKIP LOCKED so concurrent claimers skip it instead of waiting.
-- When no candidate qualifies the subquery is empty and no row is updated.
--
-- Exclusion (NOT EXISTS): a candidate is skipped while the same agent
-- already has a 'dispatched' or 'running' task that
--   * targets the same issue (candidate has an issue_id), or
--   * belongs to the same chat session (candidate has a chat_session_id), or
--   * like the candidate, has no issue_id, chat_session_id or
--     autopilot_run_id at all.
-- A candidate that only carries an autopilot_run_id matches none of these
-- branches and is therefore never held back by this check.
UPDATE agent_task_queue
SET status = 'dispatched',
dispatched_at = now(),
claim_token = gen_random_uuid(),
claim_expires_at = now() + make_interval(secs => @lease_seconds::double precision)
WHERE id = (
SELECT atq.id FROM agent_task_queue atq
WHERE atq.agent_id = $1 AND atq.status = 'queued'
AND NOT EXISTS (
SELECT 1 FROM agent_task_queue active
WHERE active.agent_id = atq.agent_id
AND active.status IN ('dispatched', 'running')
AND (
(atq.issue_id IS NOT NULL AND active.issue_id = atq.issue_id)
OR (atq.chat_session_id IS NOT NULL AND active.chat_session_id = atq.chat_session_id)
OR (
atq.issue_id IS NULL
AND atq.chat_session_id IS NULL
AND atq.autopilot_run_id IS NULL
AND active.issue_id IS NULL
AND active.chat_session_id IS NULL
AND active.autopilot_run_id IS NULL
)
)
)
ORDER BY atq.priority DESC, atq.created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING *;
-- name: StartAgentTaskWithClaimToken :one
-- Transitions a dispatched task to running only if the caller presents the
-- correct claim_token. This prevents a stale daemon (whose original claim
-- response was lost) from starting a task that has since been requeued and
-- re-claimed by another runtime.
--
-- Parameters: $1 is the task id, $2 is the claim token to verify.
-- A NULL or non-matching token matches no row, because the comparison
-- claim_token = $2 is then not true. On success the lease columns are
-- cleared and the updated row is returned.
-- Note: claim_expires_at is not part of the WHERE clause, so a lease that
-- has expired but has not yet been requeued can still be started with its
-- token.
UPDATE agent_task_queue
SET status = 'running',
started_at = now(),
claim_token = NULL,
claim_expires_at = NULL
WHERE id = $1 AND status = 'dispatched' AND claim_token = $2
RETURNING *;
-- name: RequeueExpiredClaimLeases :many
-- Moves dispatched tasks whose claim lease has expired back to 'queued' so
-- they can be re-claimed. This handles the case where the server committed
-- the claim but the response never reached the daemon (network timeout,
-- daemon crash, etc.). Capped via LIMIT inside the CTE to bound per-tick
-- work. Uses FOR UPDATE SKIP LOCKED to avoid contention with concurrent
-- claim/start operations.
--
-- Parameter: @max_per_tick is the maximum number of tasks requeued per call.
-- Tasks with the oldest claim_expires_at are taken first. Rows with a NULL
-- claim_expires_at are never selected by this sweep. The outer UPDATE
-- repeats the status/expiry conditions on each selected row before changing
-- it. Returned rows reflect the post-update state: status 'queued' with
-- dispatched_at, claim_token and claim_expires_at all NULL.
WITH expired AS (
SELECT id FROM agent_task_queue
WHERE status = 'dispatched'
AND claim_expires_at IS NOT NULL
AND claim_expires_at < now()
ORDER BY claim_expires_at ASC
LIMIT @max_per_tick::int
FOR UPDATE SKIP LOCKED
)
UPDATE agent_task_queue t
SET status = 'queued',
dispatched_at = NULL,
claim_token = NULL,
claim_expires_at = NULL
FROM expired e
WHERE t.id = e.id
AND t.status = 'dispatched'
AND t.claim_expires_at IS NOT NULL
AND t.claim_expires_at < now()
RETURNING t.*;