mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-25 00:19:29 +02:00
Compare commits
2 Commits
agent/niko
...
agent/j/87
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
be2e9e174d | ||
|
|
0b5a210e15 |
@@ -1513,28 +1513,7 @@ func (d *Daemon) handleTask(ctx context.Context, task Task, slot int) {
|
||||
}
|
||||
}
|
||||
|
||||
switch result.Status {
|
||||
case "blocked":
|
||||
// Forward SessionID/WorkDir even on the blocked path: the agent may
|
||||
// have built a real session before getting stuck (rate-limit, tool
|
||||
// error, etc.) and we want the next chat turn to resume there
|
||||
// rather than start over and "forget" the conversation.
|
||||
failureReason := result.FailureReason
|
||||
if failureReason == "" {
|
||||
failureReason = "agent_error"
|
||||
}
|
||||
if err := d.client.FailTask(ctx, task.ID, result.Comment, result.SessionID, result.WorkDir, failureReason); err != nil {
|
||||
taskLog.Error("report blocked task failed", "error", err)
|
||||
}
|
||||
default:
|
||||
taskLog.Info("task completed", "status", result.Status)
|
||||
if err := d.client.CompleteTask(ctx, task.ID, result.Comment, result.BranchName, result.SessionID, result.WorkDir); err != nil {
|
||||
taskLog.Error("complete task failed, falling back to fail", "error", err)
|
||||
if failErr := d.client.FailTask(ctx, task.ID, fmt.Sprintf("complete task failed: %s", err.Error()), result.SessionID, result.WorkDir, "agent_error"); failErr != nil {
|
||||
taskLog.Error("fail task fallback also failed", "error", failErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
d.reportTaskResult(ctx, task.ID, result, taskLog)
|
||||
|
||||
// Write GC metadata after the task finishes so the periodic GC loop
|
||||
// can look up the parent record (issue / chat session / autopilot run /
|
||||
@@ -1549,6 +1528,42 @@ func (d *Daemon) handleTask(ctx context.Context, task Task, slot int) {
|
||||
}
|
||||
}
|
||||
|
||||
// reportTaskResult writes the final task disposition back to the server.
|
||||
//
|
||||
// Fail closed: only an explicit "completed" status is reported as success.
|
||||
// Anything else — "blocked", "cancelled", or any future status we forget to
|
||||
// enumerate — must go through FailTask, so a run that never produced a real
|
||||
// result can never be displayed as "Completed" in the UI (e.g. provider 429 /
|
||||
// out-of-credit / runtime crash). Forward SessionID/WorkDir on every path:
|
||||
// the agent may have built a real session before getting stuck, and we want
|
||||
// the next chat turn to resume there rather than start over and "forget"
|
||||
// the conversation.
|
||||
func (d *Daemon) reportTaskResult(ctx context.Context, taskID string, result TaskResult, taskLog *slog.Logger) {
|
||||
switch result.Status {
|
||||
case "completed":
|
||||
taskLog.Info("task completed", "status", result.Status)
|
||||
if err := d.client.CompleteTask(ctx, taskID, result.Comment, result.BranchName, result.SessionID, result.WorkDir); err != nil {
|
||||
taskLog.Error("complete task failed, falling back to fail", "error", err)
|
||||
if failErr := d.client.FailTask(ctx, taskID, fmt.Sprintf("complete task failed: %s", err.Error()), result.SessionID, result.WorkDir, "agent_error"); failErr != nil {
|
||||
taskLog.Error("fail task fallback also failed", "error", failErr)
|
||||
}
|
||||
}
|
||||
default:
|
||||
failureReason := result.FailureReason
|
||||
if failureReason == "" {
|
||||
if result.Status == "cancelled" {
|
||||
failureReason = "cancelled"
|
||||
} else {
|
||||
failureReason = "agent_error"
|
||||
}
|
||||
}
|
||||
taskLog.Info("task did not complete, reporting failure", "status", result.Status, "failure_reason", failureReason)
|
||||
if err := d.client.FailTask(ctx, taskID, result.Comment, result.SessionID, result.WorkDir, failureReason); err != nil {
|
||||
taskLog.Error("report failed task failed", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// gcMetaForTask classifies a finished task and produces a GCMeta of the right
|
||||
// kind. The discriminator order matters: a task carrying both an issue_id
|
||||
// and a chat_session_id (theoretical, not produced today) should be treated
|
||||
|
||||
@@ -1164,3 +1164,145 @@ func TestDefaultArgsForProvider(t *testing.T) {
|
||||
t.Fatalf("expected nil for unsupported provider, got %#v", got)
|
||||
}
|
||||
}
|
||||
|
||||
// reportTaskResultRecorder captures which terminal endpoint
// (.../complete or .../fail) reportTaskResult hits and the body it
// posts, so the tests can assert the disposition (success vs fail)
// independently of the rest of handleTask.
//
// Only the most recent request is kept: each request served by handler
// overwrites path, method and payload.
type reportTaskResultRecorder struct {
	mu      sync.Mutex     // guards path, method and payload
	path    string         // URL path of the last request
	method  string         // HTTP method of the last request
	payload map[string]any // JSON-decoded body of the last request; nil when the body was empty
}
|
||||
|
||||
func (r *reportTaskResultRecorder) handler(t *testing.T) http.HandlerFunc {
|
||||
t.Helper()
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
body, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
t.Errorf("read body: %v", err)
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
var payload map[string]any
|
||||
if len(body) > 0 {
|
||||
if err := json.Unmarshal(body, &payload); err != nil {
|
||||
t.Errorf("decode body: %v", err)
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
r.mu.Lock()
|
||||
r.path = req.URL.Path
|
||||
r.method = req.Method
|
||||
r.payload = payload
|
||||
r.mu.Unlock()
|
||||
w.WriteHeader(http.StatusOK)
|
||||
})
|
||||
}
|
||||
|
||||
func TestReportTaskResult_CompletedHitsCompleteEndpoint(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
rec := &reportTaskResultRecorder{}
|
||||
srv := httptest.NewServer(rec.handler(t))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
|
||||
d.reportTaskResult(context.Background(), "task-1", TaskResult{
|
||||
Status: "completed",
|
||||
Comment: "all good",
|
||||
BranchName: "agent/foo",
|
||||
SessionID: "ses-1",
|
||||
WorkDir: "/tmp/foo",
|
||||
}, slog.Default())
|
||||
|
||||
rec.mu.Lock()
|
||||
defer rec.mu.Unlock()
|
||||
if rec.path != "/api/daemon/tasks/task-1/complete" {
|
||||
t.Fatalf("expected /complete endpoint, got %s", rec.path)
|
||||
}
|
||||
if rec.payload["output"] != "all good" {
|
||||
t.Errorf("output: got %v", rec.payload["output"])
|
||||
}
|
||||
if rec.payload["branch_name"] != "agent/foo" {
|
||||
t.Errorf("branch_name: got %v", rec.payload["branch_name"])
|
||||
}
|
||||
if rec.payload["session_id"] != "ses-1" {
|
||||
t.Errorf("session_id: got %v", rec.payload["session_id"])
|
||||
}
|
||||
}
|
||||
|
||||
// Pins the GitHub multica#1952 fail-closed behaviour: a task whose
|
||||
// agent run never produced a real result (blocked, cancelled, or any
|
||||
// future status we forget to enumerate) MUST go through FailTask, so
|
||||
// the UI never shows a green "Completed" badge for a run that didn't
|
||||
// actually do anything (e.g. provider 429 / out-of-credit).
|
||||
func TestReportTaskResult_NonCompletedHitsFailEndpoint(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
status string
|
||||
failureReasonIn string
|
||||
wantFailureReason string
|
||||
}{
|
||||
{
|
||||
name: "blocked with explicit reason preserves it",
|
||||
status: "blocked",
|
||||
failureReasonIn: "iteration_limit",
|
||||
wantFailureReason: "iteration_limit",
|
||||
},
|
||||
{
|
||||
name: "blocked without reason defaults to agent_error",
|
||||
status: "blocked",
|
||||
failureReasonIn: "",
|
||||
wantFailureReason: "agent_error",
|
||||
},
|
||||
{
|
||||
name: "cancelled defaults to cancelled reason",
|
||||
status: "cancelled",
|
||||
failureReasonIn: "",
|
||||
wantFailureReason: "cancelled",
|
||||
},
|
||||
{
|
||||
name: "unknown status fails closed",
|
||||
status: "weird_new_status",
|
||||
failureReasonIn: "",
|
||||
wantFailureReason: "agent_error",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
rec := &reportTaskResultRecorder{}
|
||||
srv := httptest.NewServer(rec.handler(t))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
d := &Daemon{client: NewClient(srv.URL), logger: slog.Default()}
|
||||
d.reportTaskResult(context.Background(), "task-x", TaskResult{
|
||||
Status: tc.status,
|
||||
Comment: "rate limit reached",
|
||||
SessionID: "ses-x",
|
||||
WorkDir: "/tmp/x",
|
||||
FailureReason: tc.failureReasonIn,
|
||||
}, slog.Default())
|
||||
|
||||
rec.mu.Lock()
|
||||
defer rec.mu.Unlock()
|
||||
if rec.path != "/api/daemon/tasks/task-x/fail" {
|
||||
t.Fatalf("expected /fail endpoint for status=%q, got %s", tc.status, rec.path)
|
||||
}
|
||||
if rec.payload["error"] != "rate limit reached" {
|
||||
t.Errorf("error body: got %v", rec.payload["error"])
|
||||
}
|
||||
if got := rec.payload["failure_reason"]; got != tc.wantFailureReason {
|
||||
t.Errorf("failure_reason: got %v, want %q", got, tc.wantFailureReason)
|
||||
}
|
||||
if rec.payload["session_id"] != "ses-x" {
|
||||
t.Errorf("session_id should be forwarded on failure paths so chat resume keeps working, got %v", rec.payload["session_id"])
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -312,19 +312,15 @@ func (b *hermesBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
finalOutput := output.String()
|
||||
outputMu.Unlock()
|
||||
|
||||
// If hermes produced no visible output but we sniffed a
|
||||
// provider-level error on stderr (typically HTTP 4xx from
|
||||
// the configured LLM endpoint), promote the status to
|
||||
// failed and surface the real reason. Without this the
|
||||
// daemon reports a cryptic "hermes returned empty output"
|
||||
// and the actionable error (e.g. "model X not supported
|
||||
// with your ChatGPT account") stays buried in daemon logs.
|
||||
if finalStatus == "completed" && finalOutput == "" {
|
||||
if msg := providerErr.message(); msg != "" {
|
||||
finalStatus = "failed"
|
||||
finalError = msg
|
||||
}
|
||||
}
|
||||
// Hermes reports stopReason=end_turn even when the upstream
|
||||
// LLM call ultimately fails (HTTP 429 rate-limit, expired
|
||||
// token, ...). promoteACPResultOnProviderError flips the
|
||||
// status to "failed" when either the stderr sniffer saw a
|
||||
// *terminal* failure marker (not just a transient per-attempt
|
||||
// warning), the agent text stream contains the synthetic
|
||||
// "API call failed after N retries..." turn the adapter
|
||||
// injects on give-up, or there's no output to fall back on.
|
||||
finalStatus, finalError = promoteACPResultOnProviderError(finalStatus, finalError, finalOutput, providerErr)
|
||||
|
||||
// Build usage map.
|
||||
c.usageMu.Lock()
|
||||
@@ -1194,12 +1190,23 @@ func hermesToolNameFromTitle(title string, kind string) string {
|
||||
// Parameterised by provider name so both hermes and kimi can share
|
||||
// the transport: the regexes match format-level signals (HTTP status,
|
||||
// error-kind tags, "API call failed" banner) that both runtimes emit.
|
||||
//
|
||||
// The sniffer distinguishes *transient* per-attempt warnings (e.g.
|
||||
// "API call failed (attempt 1/3): RateLimitError [HTTP 429]" — followed
|
||||
// by a successful retry) from *terminal* exhausted failures (e.g.
|
||||
// "API call failed after 3 retries: ..." or "❌ ... Non-retryable"):
|
||||
// `message()` returns whichever was last seen, while `terminalMessage()`
|
||||
// returns non-empty only when a terminal-failure marker was matched.
|
||||
// Promotion to status="failed" must use `terminalMessage()`, otherwise
|
||||
// a successful retry following an early per-attempt warning would be
|
||||
// wrongly marked as failed.
|
||||
type acpProviderErrorSniffer struct {
	provider string          // provider name; prefixes the summary built by messageLocked
	mu       sync.Mutex      // held by message and terminalMessage while they read the state below
	remains  []byte          // buffer for a partial trailing line across writes
	lines    []string        // captured error lines, bounded
	seen     map[string]bool // lines already handled; Write skips a line it has seen before
	terminal bool            // sticky: at least one line matched acpTerminalErrorRe
}
|
||||
|
||||
// acpErrorHeaderRe matches the first line of an API-error block.
|
||||
@@ -1212,6 +1219,22 @@ var acpErrorHeaderRe = regexp.MustCompile(`(?:⚠️|❌|\[ERROR\]).*(?:BadReque
|
||||
// "Details:" tag actually spells out what happened).
|
||||
var acpErrorDetailRe = regexp.MustCompile(`(?:Error:|detail:|Details:)\s*(.+)`)
|
||||
|
||||
// acpTerminalErrorRe matches markers that only appear when the
// adapter has *given up* on the upstream call — either after
// exhausting retries ("after N retries"), or because the error is
// classified as non-retryable up front (Non-retryable, BadRequest /
// Authentication errors, ❌ / [ERROR] log levels). Per-attempt
// warnings ("(attempt 1/3)") deliberately do NOT match this pattern
// unless the same line also carries one of the markers above.
var acpTerminalErrorRe = regexp.MustCompile(`(?:❌|\[ERROR\]|after \d+ retr|Non-retryable|BadRequestError|AuthenticationError)`)

// acpAgentOutputTerminalRe matches the synthetic agent-text turn that
// hermes-style ACP adapters inject when they exhaust retries against
// the upstream LLM ("API call failed after 3 retries: HTTP 429..."),
// surfaced via session/update agent_message_chunk and ending up in the
// final output buffer. Per-attempt warnings (which only go to stderr
// and use "(attempt N/M)" phrasing) won't match.
var acpAgentOutputTerminalRe = regexp.MustCompile(`API call failed after \d+ retr(?:y|ies)`)

// acpMaxErrorLines is the limit on how many captured error lines a
// sniffer keeps.
const acpMaxErrorLines = 8
|
||||
|
||||
// newACPProviderErrorSniffer returns a sniffer that tags its messages
|
||||
@@ -1247,6 +1270,9 @@ func (s *acpProviderErrorSniffer) Write(p []byte) (int, error) {
|
||||
if !(acpErrorHeaderRe.MatchString(line) || acpErrorDetailRe.MatchString(line)) {
|
||||
continue
|
||||
}
|
||||
if acpTerminalErrorRe.MatchString(line) {
|
||||
s.terminal = true
|
||||
}
|
||||
if s.seen[line] {
|
||||
continue
|
||||
}
|
||||
@@ -1263,10 +1289,37 @@ func (s *acpProviderErrorSniffer) Write(p []byte) (int, error) {
|
||||
// error field. Prefers the most specific "Error:" / "detail:"
|
||||
// fragment; falls back to the first captured header line; empty
|
||||
// when nothing useful was seen.
|
||||
//
|
||||
// NOTE: a non-empty message() can describe a *transient* per-attempt
|
||||
// warning that was followed by a successful retry. Code that flips
|
||||
// task status to "failed" must instead use terminalMessage() — see
|
||||
// the type doc above.
|
||||
func (s *acpProviderErrorSniffer) message() string {
	// messageLocked requires s.mu to be held, so keep the lock for the
	// whole call.
	s.mu.Lock()
	defer s.mu.Unlock()

	return s.messageLocked()
}
|
||||
|
||||
// terminalMessage returns the same single-line summary as message()
|
||||
// but only when the sniffer has seen at least one line matching
|
||||
// acpTerminalErrorRe — i.e. the adapter has given up retrying. This
|
||||
// is the signal callers should use to decide whether to promote a
|
||||
// run from "completed" to "failed". Returns empty if all captured
|
||||
// lines look like transient retry warnings.
|
||||
func (s *acpProviderErrorSniffer) terminalMessage() string {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if !s.terminal {
|
||||
return ""
|
||||
}
|
||||
return s.messageLocked()
|
||||
}
|
||||
|
||||
// messageLocked is the lock-held implementation shared by message()
|
||||
// and terminalMessage(). Caller must hold s.mu.
|
||||
func (s *acpProviderErrorSniffer) messageLocked() string {
|
||||
prefix := s.provider + " provider error: "
|
||||
for _, line := range s.lines {
|
||||
if m := acpErrorDetailRe.FindStringSubmatch(line); m != nil {
|
||||
@@ -1283,3 +1336,39 @@ func (s *acpProviderErrorSniffer) message() string {
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// promoteACPResultOnProviderError flips finalStatus to "failed" if
|
||||
// either (a) the stderr sniffer captured a terminal-failure marker,
|
||||
// (b) the adapter injected a synthetic "API call failed after N
|
||||
// retries..." turn into the agent text stream, or (c) output was
|
||||
// empty AND the sniffer captured anything at all (no real result to
|
||||
// fall back on, even from a transient-only sequence). Returns the
|
||||
// updated (status, error) pair; callers should overwrite their
|
||||
// locals with the result.
|
||||
//
|
||||
// This is the shared post-processing step for hermes/kimi/kiro.
|
||||
// Without it, runs that exhaust retries against the upstream LLM
|
||||
// (HTTP 429, expired token, …) silently report as "completed"
|
||||
// because session/prompt still ends with stopReason=end_turn — see
|
||||
// GitHub multica#1952.
|
||||
func promoteACPResultOnProviderError(finalStatus, finalError, finalOutput string, sniffer *acpProviderErrorSniffer) (string, string) {
|
||||
if finalStatus != "completed" {
|
||||
return finalStatus, finalError
|
||||
}
|
||||
if msg := sniffer.terminalMessage(); msg != "" {
|
||||
return "failed", msg
|
||||
}
|
||||
if acpAgentOutputTerminalRe.MatchString(finalOutput) {
|
||||
msg := sniffer.message()
|
||||
if msg == "" {
|
||||
msg = sniffer.provider + " provider error: " + acpAgentOutputTerminalRe.FindString(finalOutput)
|
||||
}
|
||||
return "failed", msg
|
||||
}
|
||||
if finalOutput == "" {
|
||||
if msg := sniffer.message(); msg != "" {
|
||||
return "failed", msg
|
||||
}
|
||||
}
|
||||
return finalStatus, finalError
|
||||
}
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestNewReturnsHermesBackend(t *testing.T) {
|
||||
@@ -967,3 +970,222 @@ func TestHermesProviderErrorSnifferBoundedBuffer(t *testing.T) {
|
||||
t.Errorf("sniffer kept %d lines, limit is %d", len(s.lines), acpMaxErrorLines)
|
||||
}
|
||||
}
|
||||
|
||||
// fakeHermesACPRateLimitScript returns a /bin/sh script that
// impersonates hermes for the GitHub multica#1952 scenario: the
// upstream LLM returns HTTP 429 (rate limited / no credit), hermes
// retries internally and ultimately emits both a sniffable stderr
// error block AND a synthetic agent text turn ("API call failed after
// 3 retries..."), then completes session/prompt with
// stopReason=end_turn (NOT an RPC error). The daemon must still treat
// this as a failed run, not a successful one — which means the hermes
// backend has to promote the status to "failed" even though `output`
// is non-empty.
//
// The script reads requests line by line from stdin, extracts the
// numeric "id" with sed, and answers initialize, session/new (session
// id "ses_429") and session/prompt; it exits after answering
// session/prompt.
func fakeHermesACPRateLimitScript() string {
	return `#!/bin/sh
while IFS= read -r line; do
id=$(printf '%s' "$line" | sed -n 's/.*"id":\([0-9]*\).*/\1/p')
case "$line" in
*'"method":"initialize"'*)
printf '{"jsonrpc":"2.0","id":%s,"result":{"protocolVersion":1,"agentCapabilities":{}}}\n' "$id"
;;
*'"method":"session/new"'*)
printf '{"jsonrpc":"2.0","id":%s,"result":{"sessionId":"ses_429"}}\n' "$id"
;;
*'"method":"session/prompt"'*)
# Mimic hermes' real-world stderr block on a 429.
printf '%s\n' '⚠️ API call failed (attempt 3/3): RateLimitError [HTTP 429]' >&2
printf '%s\n' ' 📝 Error: HTTP 429: The usage limit has been reached' >&2
# Mimic hermes injecting the failure as a synthetic agent turn so
# the chat shows *something*; this puts text in output and used to
# mask the failure from the daemon.
printf '{"jsonrpc":"2.0","method":"session/update","params":{"sessionId":"ses_429","update":{"sessionUpdate":"agent_message_chunk","content":{"type":"text","text":"API call failed after 3 retries: HTTP 429: The usage limit has been reached"}}}}\n'
printf '{"jsonrpc":"2.0","id":%s,"result":{"stopReason":"end_turn"}}\n' "$id"
exit 0
;;
esac
done
`
}
|
||||
|
||||
// TestHermesProviderErrorSnifferTerminalVsTransient verifies the
|
||||
// sniffer reports terminalMessage()=="" for a per-attempt warning
|
||||
// that did NOT escalate to an exhausted/non-retryable failure, but
|
||||
// still returns the same string from message() so callers wanting
|
||||
// diagnostic text can use it. This is what prevents the
|
||||
// promote-on-any-sniff false positive (a transient `attempt 1/3`
|
||||
// followed by a successful retry must stay "completed").
|
||||
func TestHermesProviderErrorSnifferTerminalVsTransient(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Transient: the sniffer DID see something matching acpErrorHeaderRe
|
||||
// (so `message()` is non-empty for diagnostic purposes), but the
|
||||
// signal is just "attempt 1/3 against a retryable rate limit" — no
|
||||
// terminal markers at all.
|
||||
s := newACPProviderErrorSniffer("hermes")
|
||||
s.Write([]byte("⚠️ API call failed (attempt 1/3): retryable upstream blip\n"))
|
||||
if msg := s.message(); msg == "" {
|
||||
t.Fatalf("sniffer should still capture transient warnings for diagnostics")
|
||||
}
|
||||
if msg := s.terminalMessage(); msg != "" {
|
||||
t.Fatalf("transient attempt should NOT be a terminal failure, got %q", msg)
|
||||
}
|
||||
|
||||
// Now feed a follow-on terminal marker. terminalMessage must turn on.
|
||||
s.Write([]byte("❌ API call failed after 3 retries: usage limit reached\n"))
|
||||
if msg := s.terminalMessage(); msg == "" {
|
||||
t.Fatalf("after-N-retries / ❌ should switch terminalMessage on")
|
||||
}
|
||||
}
|
||||
|
||||
// TestHermesProviderErrorSnifferTerminalNonRetryable verifies that a
|
||||
// non-retryable error (BadRequest / Authentication / Non-retryable)
|
||||
// is treated as terminal even on attempt 1/3 — those errors don't
|
||||
// retry, so the very first failure is the final disposition. Also
|
||||
// covers ❌ / [ERROR] / "after N retries" markers that adapters
|
||||
// emit on give-up.
|
||||
func TestHermesProviderErrorSnifferTerminalNonRetryable(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
for _, line := range []string{
|
||||
`⚠️ API call failed (attempt 1/3): BadRequestError [HTTP 400]`,
|
||||
`⚠️ API call failed (attempt 1/3): AuthenticationError [HTTP 401]`,
|
||||
`⚠️ API call failed (HTTP 400) attempt a: Non-retryable error`,
|
||||
`❌ API call failed after 3 retries: RateLimitError [HTTP 429]`,
|
||||
`[ERROR] API call failed: upstream returned HTTP 500`,
|
||||
} {
|
||||
s := newACPProviderErrorSniffer("hermes")
|
||||
s.Write([]byte(line + "\n"))
|
||||
if msg := s.terminalMessage(); msg == "" {
|
||||
t.Errorf("expected %q to be classified as terminal", line)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestHermesBackendPromotesProviderErrorWithNonEmptyOutput pins the
|
||||
// fix for GitHub multica#1952: a hermes run that hits a 429 (or any
|
||||
// upstream provider error) must surface as Status=failed even though
|
||||
// hermes' synthetic "API call failed..." agent turn means the output
|
||||
// buffer is non-empty. Before the fix the sniffer-promotion was
|
||||
// gated on `finalOutput == ""`, so the run silently completed.
|
||||
func TestHermesBackendPromotesProviderErrorWithNonEmptyOutput(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
fakePath := filepath.Join(t.TempDir(), "hermes")
|
||||
writeTestExecutable(t, fakePath, []byte(fakeHermesACPRateLimitScript()))
|
||||
|
||||
backend, err := New("hermes", Config{ExecutablePath: fakePath, Logger: slog.Default()})
|
||||
if err != nil {
|
||||
t.Fatalf("new hermes backend: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
session, err := backend.Execute(ctx, "prompt-ignored", ExecOptions{
|
||||
Timeout: 5 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("execute: %v", err)
|
||||
}
|
||||
go func() {
|
||||
for range session.Messages {
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case result, ok := <-session.Result:
|
||||
if !ok {
|
||||
t.Fatal("result channel closed without a value")
|
||||
}
|
||||
if result.Status != "failed" {
|
||||
t.Fatalf("expected status=failed (sniffer should promote on 429 even with non-empty output), got %q (error=%q output=%q)", result.Status, result.Error, result.Output)
|
||||
}
|
||||
if !strings.Contains(result.Error, "429") && !strings.Contains(result.Error, "usage limit") {
|
||||
t.Errorf("expected error to surface the 429 / usage-limit message, got %q", result.Error)
|
||||
}
|
||||
if result.SessionID != "ses_429" {
|
||||
t.Errorf("expected session id to be preserved on failure, got %q", result.SessionID)
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("timeout waiting for result")
|
||||
}
|
||||
}
|
||||
|
||||
// fakeHermesACPTransientRetryScript returns a /bin/sh script that
// emits a single retryable per-attempt warning to stderr and then
// completes with a normal agent text turn — the situation where the
// upstream LLM blipped on attempt 1/3 but a subsequent attempt
// succeeded and produced a real answer. The previous (too-broad)
// promotion logic would have flipped this to status=failed; the fix
// must keep it as completed.
//
// The script reads requests line by line from stdin, extracts the
// numeric "id" with sed, and answers initialize, session/new (session
// id "ses_ok") and session/prompt; it exits after answering
// session/prompt.
func fakeHermesACPTransientRetryScript() string {
	return `#!/bin/sh
while IFS= read -r line; do
id=$(printf '%s' "$line" | sed -n 's/.*"id":\([0-9]*\).*/\1/p')
case "$line" in
*'"method":"initialize"'*)
printf '{"jsonrpc":"2.0","id":%s,"result":{"protocolVersion":1,"agentCapabilities":{}}}\n' "$id"
;;
*'"method":"session/new"'*)
printf '{"jsonrpc":"2.0","id":%s,"result":{"sessionId":"ses_ok"}}\n' "$id"
;;
*'"method":"session/prompt"'*)
# Per-attempt rate-limit warning that hermes routinely logs on
# transient blips — the request DOES retry and succeed below.
printf '%s\n' '⚠️ API call failed (attempt 1/3): RateLimitError [HTTP 429]' >&2
# Real agent answer streamed back as a normal text turn.
printf '{"jsonrpc":"2.0","method":"session/update","params":{"sessionId":"ses_ok","update":{"sessionUpdate":"agent_message_chunk","content":{"type":"text","text":"Here is the answer you asked for."}}}}\n'
printf '{"jsonrpc":"2.0","id":%s,"result":{"stopReason":"end_turn"}}\n' "$id"
exit 0
;;
esac
done
`
}
|
||||
|
||||
// TestHermesBackendDoesNotPromoteOnTransientRetry pins the
|
||||
// regression GPT-Boy flagged on the multica#1952 fix: a per-attempt
|
||||
// ⚠️ warning on stderr that does NOT include any terminal marker
|
||||
// ("after N retries", Non-retryable, ❌, [ERROR], BadRequest /
|
||||
// Authentication errors) and is followed by a successful agent
|
||||
// turn must stay status=completed. The previous "any sniffer line
|
||||
// → fail" rule would have wrongly marked this run as failed.
|
||||
func TestHermesBackendDoesNotPromoteOnTransientRetry(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
fakePath := filepath.Join(t.TempDir(), "hermes")
|
||||
writeTestExecutable(t, fakePath, []byte(fakeHermesACPTransientRetryScript()))
|
||||
|
||||
backend, err := New("hermes", Config{ExecutablePath: fakePath, Logger: slog.Default()})
|
||||
if err != nil {
|
||||
t.Fatalf("new hermes backend: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
session, err := backend.Execute(ctx, "prompt-ignored", ExecOptions{
|
||||
Timeout: 5 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("execute: %v", err)
|
||||
}
|
||||
go func() {
|
||||
for range session.Messages {
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case result, ok := <-session.Result:
|
||||
if !ok {
|
||||
t.Fatal("result channel closed without a value")
|
||||
}
|
||||
if result.Status != "completed" {
|
||||
t.Fatalf("transient retry that ultimately succeeded must stay status=completed, got %q (error=%q output=%q)", result.Status, result.Error, result.Output)
|
||||
}
|
||||
if !strings.Contains(result.Output, "Here is the answer") {
|
||||
t.Errorf("expected the successful agent turn to be in output, got %q", result.Output)
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("timeout waiting for result")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -302,19 +302,13 @@ func (b *kimiBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
|
||||
finalOutput := output.String()
|
||||
outputMu.Unlock()
|
||||
|
||||
// If kimi produced no visible output but we sniffed a
|
||||
// provider-level error on stderr (typically HTTP 4xx from
|
||||
// api.kimi.com — token expired, rate-limited, upstream
|
||||
// 5xx, …), promote the status to failed and surface the
|
||||
// real reason. Without this the daemon reports a cryptic
|
||||
// "completed + empty output" and the actionable error
|
||||
// stays buried in daemon logs.
|
||||
if finalStatus == "completed" && finalOutput == "" {
|
||||
if msg := providerErr.message(); msg != "" {
|
||||
finalStatus = "failed"
|
||||
finalError = msg
|
||||
}
|
||||
}
|
||||
// Promote completed→failed when stderr or the agent text
|
||||
// stream show a terminal upstream-LLM failure (HTTP 4xx /
|
||||
// rate-limit / expired token). See the helper docs for the
|
||||
// full signal set; the key safety property is that transient
|
||||
// per-attempt warnings followed by a successful retry stay
|
||||
// "completed".
|
||||
finalStatus, finalError = promoteACPResultOnProviderError(finalStatus, finalError, finalOutput, providerErr)
|
||||
|
||||
c.usageMu.Lock()
|
||||
u := c.usage
|
||||
|
||||
@@ -297,12 +297,13 @@ func (b *kiroBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
|
||||
finalOutput := output.String()
|
||||
outputMu.Unlock()
|
||||
|
||||
if finalStatus == "completed" && finalOutput == "" {
|
||||
if msg := providerErr.message(); msg != "" {
|
||||
finalStatus = "failed"
|
||||
finalError = msg
|
||||
}
|
||||
}
|
||||
// Promote completed→failed when stderr or the agent text
|
||||
// stream show a terminal upstream-LLM failure (HTTP 4xx /
|
||||
// rate-limit / expired token). See the helper docs for the
|
||||
// full signal set; the key safety property is that transient
|
||||
// per-attempt warnings followed by a successful retry stay
|
||||
// "completed".
|
||||
finalStatus, finalError = promoteACPResultOnProviderError(finalStatus, finalError, finalOutput, providerErr)
|
||||
|
||||
c.usageMu.Lock()
|
||||
u := c.usage
|
||||
|
||||
Reference in New Issue
Block a user