mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 19:59:20 +02:00
Compare commits
9 Commits
fix/cloud-
...
agent/lamb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
92de9bea78 | ||
|
|
0529d28133 | ||
|
|
cd414a52ea | ||
|
|
f675f03fbb | ||
|
|
0a97663acb | ||
|
|
953fdd5003 | ||
|
|
e70f44b92b | ||
|
|
281f1073b5 | ||
|
|
6758feba05 |
@@ -53,6 +53,7 @@ import { ResolvedThreadBar } from "./resolved-thread-bar";
|
||||
import { collectThreadReplies } from "./thread-utils";
|
||||
import { AgentLiveCard } from "./agent-live-card";
|
||||
import { ExecutionLogSection } from "./execution-log-section";
|
||||
import { TerminalPanel } from "./terminal-panel";
|
||||
import { PullRequestList } from "./pull-request-list";
|
||||
import { useQuery } from "@tanstack/react-query";
|
||||
import { useAuthStore } from "@multica/core/auth";
|
||||
@@ -1366,6 +1367,12 @@ export function IssueDetail({ issueId, onDelete, onDone, defaultSidebarOpen = tr
|
||||
when there are no runs to show. */}
|
||||
<ExecutionLogSection issueId={id} />
|
||||
|
||||
{/* Terminal panel — attaches to the PTY running in the daemon's
|
||||
workdir for the latest agent task. Desktop-only (the panel
|
||||
itself renders an explanatory placeholder on web).
|
||||
See MUL-2295. */}
|
||||
<TerminalPanelSection issueId={id} workspaceId={wsId} />
|
||||
|
||||
{/* Token usage */}
|
||||
{usage && usage.task_count > 0 && (
|
||||
<div>
|
||||
@@ -1904,3 +1911,23 @@ export function IssueDetail({ issueId, onDelete, onDone, defaultSidebarOpen = tr
|
||||
</ResizablePanelGroup>
|
||||
);
|
||||
}
|
||||
|
||||
/**
 * Collapsible "Terminal" sidebar section that wraps {@link TerminalPanel}.
 *
 * The section starts collapsed and only mounts the panel while open, so the
 * xterm bootstrap, ResizeObserver and PTY connection are not created for
 * every issue view — opening a terminal is an explicit user action.
 *
 * @param issueId - Issue forwarded to the terminal panel.
 * @param workspaceId - Workspace forwarded to the terminal panel.
 */
function TerminalPanelSection({ issueId, workspaceId }: { issueId: string; workspaceId: string }) {
  const [open, setOpen] = useState(false);
  return (
    <div className="mt-6">
      <button
        // Explicit type: a bare <button> defaults to "submit" and would submit
        // any form this section ever ends up nested in.
        type="button"
        // Expose the disclosure state to assistive technology.
        aria-expanded={open}
        className={`flex w-full items-center gap-1 rounded-md px-2 py-1 text-xs font-medium transition-colors mb-2 hover:bg-accent/70 ${open ? "" : "text-muted-foreground hover:text-foreground"}`}
        onClick={() => setOpen((v) => !v)}
      >
        Terminal
        <ChevronRight className={`!size-3 shrink-0 stroke-[2.5] text-muted-foreground transition-transform ${open ? "rotate-90" : ""}`} />
      </button>
      {open && <TerminalPanel issueId={issueId} workspaceId={workspaceId} />}
    </div>
  );
}
|
||||
|
||||
342
packages/views/issues/components/terminal-panel.tsx
Normal file
342
packages/views/issues/components/terminal-panel.tsx
Normal file
@@ -0,0 +1,342 @@
|
||||
"use client";
|
||||
|
||||
import { useEffect, useMemo, useRef, useState } from "react";
|
||||
import { Terminal as XTerminal } from "@xterm/xterm";
|
||||
import { FitAddon } from "@xterm/addon-fit";
|
||||
import { getApi } from "@multica/core/api";
|
||||
import { Button } from "@multica/ui/components/ui/button";
|
||||
import "@xterm/xterm/css/xterm.css";
|
||||
|
||||
// Wire-level message type strings. They are duplicated here from
// server/pkg/protocol/messages.go and must stay in lockstep with it; the
// strings are stable across daemon / server / browser, so a hand-kept copy is
// acceptable until types are generated from the Go definitions.

/** Terminal bytes, in either direction (base64 payload). */
const MSG_TERMINAL_DATA = "terminal.data";
/** Client → server: the terminal grid changed size. */
const MSG_TERMINAL_RESIZE = "terminal.resize";
/** Client → server: the client is done with the session. */
const MSG_TERMINAL_CLOSE = "terminal.close";
/** Server → client: the session is attached. */
const MSG_TERMINAL_OPENED = "terminal.opened";
/** Server → client: the process behind the session exited. */
const MSG_TERMINAL_EXIT = "terminal.exit";
/** Server → client: structured error report. */
const MSG_TERMINAL_ERROR = "terminal.error";

/** Outer frame of every message: a type tag plus a type-specific payload. */
interface Envelope {
  type: string;
  /** Left as `unknown`; narrowed per `type` by the message handler. */
  payload: unknown;
}

/** Payload of a `terminal.opened` frame. */
interface OpenedPayload {
  request_id: string;
  /** Identifier echoed back on every subsequent frame for this session. */
  session_id: string;
  /** Working directory reported for the attached shell. */
  work_dir: string;
  /** Shell reported for the attached session. */
  shell: string;
}

/** Payload of a `terminal.data` frame. */
interface DataPayload {
  session_id: string;
  /** Base64-encoded raw bytes. */
  data_b64: string;
}

/** Payload of a `terminal.exit` frame. */
interface ExitPayload {
  session_id: string;
  exit_code: number;
  /** Optional human-readable explanation. */
  reason?: string;
}

/** Payload of a `terminal.error` frame. */
interface ErrorPayload {
  request_id?: string;
  session_id?: string;
  /** Machine-readable error code. */
  code: string;
  /** Human-readable error text. */
  message: string;
}
|
||||
|
||||
/**
 * Reports whether the code is running inside the desktop app.
 *
 * Two conditions must hold: a `window` global exists (so this is not a
 * server-side render) and the desktop preload has exposed `desktopAPI` on it.
 * The terminal panel is intentionally desktop-only because the daemon only
 * runs on a developer machine.
 *
 * @returns `true` only when `window.desktopAPI` is present.
 */
function isDesktopRuntime(): boolean {
  if (typeof window === "undefined") {
    return false;
  }
  return "desktopAPI" in window;
}
|
||||
|
||||
/** Props accepted by {@link TerminalPanel}. */
interface TerminalPanelProps {
  /** Issue whose terminal endpoint is opened. */
  issueId: string;
  /** Workspace sent to the endpoint as `workspace_id`. */
  workspaceId: string;
}

/** Connection lifecycle rendered in the panel header. */
type TerminalStatus = "idle" | "connecting" | "connected" | "closed" | "error";

/**
 * Interactive terminal bound to an issue's PTY over a WebSocket.
 *
 * On desktop the component mounts an xterm.js instance, opens a WebSocket to
 * the terminal endpoint, renders `terminal.data` frames, and forwards
 * keystrokes and resize events once the server has announced a session via
 * `terminal.opened`. Outside the desktop runtime it renders an explanatory
 * placeholder and opens nothing.
 *
 * The connection is rebuilt whenever the derived URL changes or the user
 * presses "Reconnect". Each connection owns a `disposed` flag: events from a
 * socket that has already been torn down are ignored, so a late `close` or
 * message from the previous socket can neither write to a disposed terminal
 * nor overwrite the status / session id of the connection that replaced it.
 */
export function TerminalPanel({ issueId, workspaceId }: TerminalPanelProps) {
  const containerRef = useRef<HTMLDivElement | null>(null);
  const termRef = useRef<XTerminal | null>(null);
  const fitRef = useRef<FitAddon | null>(null);
  const wsRef = useRef<WebSocket | null>(null);
  const sessionIdRef = useRef<string>("");

  const [status, setStatus] = useState<TerminalStatus>("idle");
  const [errorMessage, setErrorMessage] = useState<string>("");
  const [reconnectKey, setReconnectKey] = useState(0);

  const wsUrl = useMemo(
    () => deriveTerminalWsUrl(issueId, workspaceId),
    [issueId, workspaceId],
  );

  useEffect(() => {
    if (!isDesktopRuntime()) return;
    const container = containerRef.current;
    if (!container) return;

    // Set by cleanup. WebSocket `close` is delivered asynchronously, i.e.
    // after cleanup has run and possibly after the next connection started.
    let disposed = false;

    const term = new XTerminal({
      convertEol: true,
      cursorBlink: true,
      fontFamily:
        "ui-monospace, SFMono-Regular, Menlo, Monaco, 'Cascadia Mono', 'Roboto Mono', 'Courier New', monospace",
      fontSize: 13,
      theme: { background: "#0b0b0b", foreground: "#e6e6e6" },
      // Scrollback large enough to read a verbose `cargo build` or `git
      // log` without auto-clipping the top.
      scrollback: 5000,
    });
    const fit = new FitAddon();
    term.loadAddon(fit);
    term.open(container);
    fit.fit();
    termRef.current = term;
    fitRef.current = fit;

    term.writeln("\x1b[90mconnecting to daemon…\x1b[0m");

    // A new connection starts with a clean slate, including when the URL
    // changed without the Reconnect button being pressed.
    setErrorMessage("");
    setStatus("connecting");

    let ws: WebSocket;
    try {
      ws = new WebSocket(wsUrl);
    } catch (e: unknown) {
      // The constructor throws synchronously on an unusable URL. Surface it
      // in the header and release the terminal instead of leaking it.
      setStatus("error");
      setErrorMessage(e instanceof Error ? e.message : "invalid terminal URL");
      term.dispose();
      termRef.current = null;
      fitRef.current = null;
      return;
    }
    wsRef.current = ws;

    /** Sends one protocol frame; silently drops it unless the socket is open. */
    const sendFrame = (type: string, payload: Record<string, unknown>): void => {
      if (ws.readyState !== WebSocket.OPEN) return;
      ws.send(JSON.stringify({ type, payload }));
    };

    ws.onopen = () => {
      if (disposed) return;
      // Cookie auth carries the session by default. If we ever flip to
      // token-mode (no cookie), this is where we'd send an `auth` frame
      // mirroring realtime/ws-client.ts. Server falls back gracefully.
      setStatus("connected");
    };

    ws.onerror = () => {
      if (disposed) return;
      // The browser only surfaces a generic Event; the server sends a
      // structured terminal.error frame which is rendered by onmessage.
      // Keep this minimal so we don't double-up the error UI.
      setStatus("error");
    };

    ws.onclose = (ev) => {
      if (disposed) return;
      // `close` always follows `error`; keep "error" visible instead of
      // immediately replacing it with "closed".
      setStatus((prev) => (prev === "error" ? prev : "closed"));
      term.writeln(
        `\r\n\x1b[90mconnection closed (code=${ev.code})${
          ev.reason ? ` reason=${ev.reason}` : ""
        }\x1b[0m`,
      );
    };

    ws.onmessage = (ev) => {
      if (disposed) return;
      let parsed: unknown;
      try {
        parsed = JSON.parse(typeof ev.data === "string" ? ev.data : "");
      } catch {
        return; // not JSON (or a binary frame): ignore
      }
      // JSON.parse can legally yield null / primitives / payload-less
      // objects; none of those is a usable envelope.
      if (typeof parsed !== "object" || parsed === null) return;
      const env = parsed as Partial<Envelope>;
      if (typeof env.type !== "string") return;
      if (typeof env.payload !== "object" || env.payload === null) return;

      switch (env.type) {
        case MSG_TERMINAL_OPENED: {
          const p = env.payload as OpenedPayload;
          sessionIdRef.current = p.session_id;
          term.writeln(
            `\x1b[90mattached to ${p.shell} (cwd: ${p.work_dir})\x1b[0m`,
          );
          // Send an initial resize matching the terminal's actual size,
          // because the server-side open uses default 80x24 until we tell
          // it otherwise.
          sendFrame(MSG_TERMINAL_RESIZE, {
            session_id: p.session_id,
            cols: term.cols,
            rows: term.rows,
          });
          break;
        }
        case MSG_TERMINAL_DATA: {
          const p = env.payload as DataPayload;
          if (typeof p.data_b64 !== "string") break;
          // xterm.js accepts Uint8Array; we avoid the latin1 round-trip
          // that would otherwise mangle UTF-8 PTY output.
          term.write(atobToUint8(p.data_b64));
          break;
        }
        case MSG_TERMINAL_EXIT: {
          const p = env.payload as ExitPayload;
          term.writeln(
            `\r\n\x1b[90mprocess exited (code=${p.exit_code}${
              p.reason ? `, reason=${p.reason}` : ""
            })\x1b[0m`,
          );
          ws.close();
          break;
        }
        case MSG_TERMINAL_ERROR: {
          const p = env.payload as ErrorPayload;
          setErrorMessage(`${p.code}: ${p.message}`);
          term.writeln(`\r\n\x1b[31m${p.code}: ${p.message}\x1b[0m`);
          break;
        }
      }
    };

    // Forward keystrokes as terminal.data with base64 of the UTF-8 bytes.
    // Nothing is sent before the server has announced a session id.
    const dataSub = term.onData((data) => {
      if (!sessionIdRef.current) return;
      sendFrame(MSG_TERMINAL_DATA, {
        session_id: sessionIdRef.current,
        data_b64: utf8ToBase64(data),
      });
    });

    const resizeSub = term.onResize(({ cols, rows }) => {
      if (!sessionIdRef.current) return;
      sendFrame(MSG_TERMINAL_RESIZE, {
        session_id: sessionIdRef.current,
        cols,
        rows,
      });
    });

    // Observe container size and re-fit so the PTY size tracks the panel
    // (the right sidebar can be resized at runtime).
    const ro = new ResizeObserver(() => {
      try {
        fit.fit();
      } catch {
        // fit() throws when the container has zero height during teardown;
        // ignore — the next mount will rebind.
      }
    });
    ro.observe(container);

    return () => {
      disposed = true;
      dataSub.dispose();
      resizeSub.dispose();
      ro.disconnect();
      try {
        if (sessionIdRef.current) {
          sendFrame(MSG_TERMINAL_CLOSE, {
            session_id: sessionIdRef.current,
            reason: "panel_unmount",
          });
        }
      } catch {
        // ws may be already closing; nothing to do.
      }
      ws.close();
      term.dispose();
      termRef.current = null;
      fitRef.current = null;
      wsRef.current = null;
      sessionIdRef.current = "";
    };
  }, [wsUrl, reconnectKey]);

  if (!isDesktopRuntime()) {
    return (
      <div className="rounded-md border border-dashed p-4 text-sm text-muted-foreground">
        The terminal is only available in the Multica Desktop app. It attaches
        to the PTY hosted by the local daemon that ran the agent task.
      </div>
    );
  }

  return (
    <div className="flex flex-col gap-2">
      <div className="flex items-center justify-between text-xs text-muted-foreground">
        <span>
          Status: <span className="font-medium">{status}</span>
          {errorMessage ? (
            <span className="ml-2 text-destructive">— {errorMessage}</span>
          ) : null}
        </span>
        <Button
          variant="ghost"
          size="sm"
          onClick={() => {
            setErrorMessage("");
            // Bumping the key re-runs the effect: tear down, then reconnect.
            setReconnectKey((n) => n + 1);
          }}
        >
          Reconnect
        </Button>
      </div>
      <div
        ref={containerRef}
        className="h-[360px] w-full overflow-hidden rounded-md border bg-black"
      />
    </div>
  );
}
|
||||
|
||||
/**
 * Builds the WebSocket URL of the terminal endpoint for an issue.
 *
 * The API client knows the http(s) base URL; the scheme is flipped to ws(s)
 * and the path targets the proxy endpoint registered in router.go. When the
 * API base is empty (dev environments where the API lives on the same host)
 * the page origin is used instead, and a relative API base such as `/api` is
 * resolved against the page origin.
 *
 * This function is evaluated during render, so it must never throw: when no
 * absolute URL can be formed (for example an empty base with no `window`
 * during server-side rendering) it returns an empty string.
 *
 * @param issueId - Issue identifier, percent-encoded into the path.
 * @param workspaceId - Workspace identifier, sent as `workspace_id`.
 * @returns The ws(s) URL, or `""` when no absolute base URL is available.
 */
function deriveTerminalWsUrl(issueId: string, workspaceId: string): string {
  let base = "";
  try {
    base = getApi().getBaseUrl();
  } catch {
    base = "";
  }
  const origin =
    typeof window !== "undefined" ? window.location.origin : undefined;

  /** `new URL` that reports failure as `null` instead of throwing. */
  const parse = (input: string, relativeTo?: string): URL | null => {
    try {
      return new URL(input, relativeTo);
    } catch {
      return null;
    }
  };

  // Try the candidate as an absolute URL first: passing an unparsable origin
  // (e.g. the literal "null" origin of file:// pages) as the second argument
  // would make even an absolute API base fail.
  const candidate = base || origin || "";
  const url = parse(candidate) ?? parse(candidate, origin);
  if (!url) return "";

  if (url.protocol === "https:") {
    url.protocol = "wss:";
  } else if (url.protocol === "http:") {
    url.protocol = "ws:";
  }
  // Drop a single trailing slash so the joined path has no "//".
  url.pathname =
    url.pathname.replace(/\/$/, "") +
    `/ws/issues/${encodeURIComponent(issueId)}/terminal`;
  url.search = `?workspace_id=${encodeURIComponent(workspaceId)}&cols=120&rows=30`;
  return url.toString();
}
|
||||
|
||||
/**
 * Encodes a string as base64 of its UTF-8 bytes.
 *
 * `btoa` only accepts "binary strings" (one char per byte), so the UTF-8
 * bytes are first mapped to such a string. Runtimes without `TextEncoder`
 * fall back to passing the input to `btoa` directly, i.e. treating it as
 * latin1.
 *
 * @param s - Text to encode.
 * @returns Base64 text.
 */
function utf8ToBase64(s: string): string {
  if (typeof TextEncoder === "undefined") {
    // Fallback for old runtimes: assume latin1.
    return btoa(s);
  }
  const binary = Array.from(new TextEncoder().encode(s), (byte) =>
    String.fromCharCode(byte),
  ).join("");
  return btoa(binary);
}
|
||||
|
||||
/**
 * Decodes base64 text into raw bytes.
 *
 * `atob` yields a binary string whose char codes are the byte values; each
 * one is copied into the resulting array unchanged.
 *
 * @param s - Base64 text.
 * @returns The decoded bytes.
 */
function atobToUint8(s: string): Uint8Array {
  return Uint8Array.from(atob(s), (ch) => ch.charCodeAt(0));
}
|
||||
@@ -76,6 +76,8 @@
|
||||
"@tiptap/react": "^3.22.1",
|
||||
"@tiptap/starter-kit": "^3.22.1",
|
||||
"@tiptap/suggestion": "^3.22.1",
|
||||
"@xterm/xterm": "catalog:",
|
||||
"@xterm/addon-fit": "catalog:",
|
||||
"cmdk": "^1.1.1",
|
||||
"hast-util-to-html": "^4.0.1",
|
||||
"katex": "catalog:",
|
||||
|
||||
26
pnpm-lock.yaml
generated
26
pnpm-lock.yaml
generated
@@ -33,6 +33,12 @@ catalogs:
|
||||
'@vitejs/plugin-react':
|
||||
specifier: ^6.0.1
|
||||
version: 6.0.1
|
||||
'@xterm/addon-fit':
|
||||
specifier: ^0.10.0
|
||||
version: 0.10.0
|
||||
'@xterm/xterm':
|
||||
specifier: ^5.5.0
|
||||
version: 5.5.0
|
||||
class-variance-authority:
|
||||
specifier: ^0.7.1
|
||||
version: 0.7.1
|
||||
@@ -775,6 +781,12 @@ importers:
|
||||
'@tiptap/suggestion':
|
||||
specifier: ^3.22.1
|
||||
version: 3.22.1(@tiptap/core@3.22.1(@tiptap/pm@3.22.1))(@tiptap/pm@3.22.1)
|
||||
'@xterm/addon-fit':
|
||||
specifier: 'catalog:'
|
||||
version: 0.10.0(@xterm/xterm@5.5.0)
|
||||
'@xterm/xterm':
|
||||
specifier: 'catalog:'
|
||||
version: 5.5.0
|
||||
cmdk:
|
||||
specifier: ^1.1.1
|
||||
version: 1.1.1(@types/react-dom@19.2.3(@types/react@19.2.14))(@types/react@19.2.14)(react-dom@19.2.3(react@19.2.3))(react@19.2.3)
|
||||
@@ -3318,6 +3330,14 @@ packages:
|
||||
resolution: {integrity: sha512-9k/gHF6n/pAi/9tqr3m3aqkuiNosYTurLLUtc7xQ9sxB/wm7WPygCv8GYa6mS0fLJEHhqMC1ATYhz++U/lRHqg==}
|
||||
engines: {node: '>=10.0.0'}
|
||||
|
||||
'@xterm/addon-fit@0.10.0':
|
||||
resolution: {integrity: sha512-UFYkDm4HUahf2lnEyHvio51TNGiLK66mqP2JoATy7hRZeXaGMRDr00JiSF7m63vR5WKATF605yEggJKsw0JpMQ==}
|
||||
peerDependencies:
|
||||
'@xterm/xterm': ^5.0.0
|
||||
|
||||
'@xterm/xterm@5.5.0':
|
||||
resolution: {integrity: sha512-hqJHYaQb5OptNunnyAnkHyM8aCjZ1MEIDTQu1iIbbTD/xops91NB5yq1ZK/dC2JDbVWtF23zUtl9JE2NqwT87A==}
|
||||
|
||||
abbrev@3.0.1:
|
||||
resolution: {integrity: sha512-AO2ac6pjRB3SJmGJo+v5/aK6Omggp6fsLrs6wN9bd35ulu4cCwaAU9+7ZhXjeqHVkaHThLuzH0nZr0YpCDhygg==}
|
||||
engines: {node: ^18.17.0 || >=20.5.0}
|
||||
@@ -10103,6 +10123,12 @@ snapshots:
|
||||
|
||||
'@xmldom/xmldom@0.8.12': {}
|
||||
|
||||
'@xterm/addon-fit@0.10.0(@xterm/xterm@5.5.0)':
|
||||
dependencies:
|
||||
'@xterm/xterm': 5.5.0
|
||||
|
||||
'@xterm/xterm@5.5.0': {}
|
||||
|
||||
abbrev@3.0.1: {}
|
||||
|
||||
accepts@2.0.0:
|
||||
|
||||
@@ -49,6 +49,10 @@ catalog:
|
||||
# Virtualized timeline (issue detail comments)
|
||||
react-virtuoso: "^4.14.0"
|
||||
|
||||
# Interactive PTY rendering (Issue → Terminal panel on Desktop, MUL-2295)
|
||||
"@xterm/xterm": "^5.5.0"
|
||||
"@xterm/addon-fit": "^0.10.0"
|
||||
|
||||
# Product analytics
|
||||
posthog-js: "^1.176.1"
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
@@ -1066,6 +1067,30 @@ func runIssueRuns(cmd *cobra.Command, args []string) error {
|
||||
return fmt.Errorf("list runs: %w", err)
|
||||
}
|
||||
|
||||
// Merge in interactive terminal sessions so `issue runs` is the single
|
||||
// surface for "what has happened in this workdir" — agent runs and the
|
||||
// `type=terminal` audit rows side by side, per RFC §"与现有 agent run
|
||||
// sandbox 的关系". The endpoint is new in MUL-2295 Phase 4; older
|
||||
// servers return 404, which we treat as "no terminal feature" rather
|
||||
// than a hard error so the CLI keeps working against pre-feature
|
||||
// servers. Any non-404 failure (500, auth, network) is surfaced to
|
||||
// stderr so a real regression doesn't silently downgrade to "no
|
||||
// terminal rows" — see Phase 4 review feedback.
|
||||
var terminals []map[string]any
|
||||
if err := client.GetJSON(ctx, "/api/issues/"+issueRef.ID+"/terminal-sessions", &terminals); err != nil {
|
||||
var httpErr *cli.HTTPError
|
||||
if !errors.As(err, &httpErr) || httpErr.StatusCode != http.StatusNotFound {
|
||||
fmt.Fprintf(os.Stderr, "warning: list terminal sessions: %v\n", err)
|
||||
}
|
||||
} else {
|
||||
runs = append(runs, terminals...)
|
||||
}
|
||||
// Sort merged list newest-first by started_at so the table renders in
|
||||
// the same order whether or not terminal rows are present.
|
||||
sort.SliceStable(runs, func(i, j int) bool {
|
||||
return strVal(runs[i], "started_at") > strVal(runs[j], "started_at")
|
||||
})
|
||||
|
||||
output, _ := cmd.Flags().GetString("output")
|
||||
if output == "json" {
|
||||
return cli.PrintJSON(os.Stdout, runs)
|
||||
@@ -1081,6 +1106,9 @@ func runIssueRuns(cmd *cobra.Command, args []string) error {
|
||||
started = started[:16]
|
||||
}
|
||||
completed := strVal(r, "completed_at")
|
||||
if completed == "" {
|
||||
completed = strVal(r, "ended_at")
|
||||
}
|
||||
if len(completed) >= 16 {
|
||||
completed = completed[:16]
|
||||
}
|
||||
@@ -1089,9 +1117,20 @@ func runIssueRuns(cmd *cobra.Command, args []string) error {
|
||||
runes := []rune(errMsg)
|
||||
errMsg = string(runes[:47]) + "..."
|
||||
}
|
||||
// Terminal rows have kind="terminal" and no agent_id. Surface them
|
||||
// with a synthetic agent column ("terminal") so the user can tell
|
||||
// the two row kinds apart at a glance without us adding a new
|
||||
// column (keeps the table width stable on narrow terminals).
|
||||
agent := actors.agent(strVal(r, "agent_id"))
|
||||
if strVal(r, "kind") == "terminal" {
|
||||
agent = "terminal"
|
||||
if errMsg == "" {
|
||||
errMsg = strVal(r, "close_reason")
|
||||
}
|
||||
}
|
||||
rows = append(rows, []string{
|
||||
displayID(strVal(r, "id"), fullID),
|
||||
actors.agent(strVal(r, "agent_id")),
|
||||
agent,
|
||||
strVal(r, "status"),
|
||||
started,
|
||||
completed,
|
||||
|
||||
563
server/cmd/multica/cmd_issue_terminal.go
Normal file
563
server/cmd/multica/cmd_issue_terminal.go
Normal file
@@ -0,0 +1,563 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/spf13/cobra"
|
||||
"golang.org/x/term"
|
||||
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
var issueTerminalCmd = &cobra.Command{
|
||||
Use: "terminal <issue-id>",
|
||||
Short: "Attach to the issue's most recent agent task PTY",
|
||||
Long: "Open an interactive shell inside the workdir of the issue's most recent agent task. " +
|
||||
"Reuses the daemon-side PTY manager added in MUL-2295 — the daemon spawns a bash login " +
|
||||
"shell with CLAUDE_SESSION_ID + MULTICA_{WORKSPACE,ISSUE,TASK,USER}_ID injected so you " +
|
||||
"can immediately `claude --resume $CLAUDE_SESSION_ID`.\n\n" +
|
||||
"Detach without closing your shell: type `<enter>~.` (escape sequence). The daemon-side " +
|
||||
"session is currently torn down on disconnect — see RFC follow-up for `--attach`.",
|
||||
Args: exactArgs(1),
|
||||
RunE: runIssueTerminal,
|
||||
}
|
||||
|
||||
const (
|
||||
terminalDefaultCols = 80
|
||||
terminalDefaultRows = 24
|
||||
terminalAuthAckTimeout = 10 * time.Second
|
||||
terminalOpenAckTimeout = 15 * time.Second
|
||||
terminalServerWriteWait = 10 * time.Second
|
||||
terminalServerReadLimit = 1 << 20 // 1 MiB per frame; matches realistic xterm bursts
|
||||
terminalDetachExitMessage = "[multica] detached — daemon session was torn down"
|
||||
)
|
||||
|
||||
func init() {
|
||||
issueCmd.AddCommand(issueTerminalCmd)
|
||||
issueTerminalCmd.Flags().Uint16("cols", 0, "Initial terminal columns (defaults to detected size, or 80 if stdout is not a TTY)")
|
||||
issueTerminalCmd.Flags().Uint16("rows", 0, "Initial terminal rows (defaults to detected size, or 24 if stdout is not a TTY)")
|
||||
issueTerminalCmd.Flags().String("escape-char", "~", "Escape character for detach sequence (`<enter><esc>.` to detach). Empty disables escape detection.")
|
||||
issueTerminalCmd.Flags().Bool("no-raw", false, "Don't put the local TTY into raw mode (mostly for testing / piped input)")
|
||||
}
|
||||
|
||||
func runIssueTerminal(cmd *cobra.Command, args []string) error {
|
||||
client, err := newAPIClient(cmd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := requireWorkspaceID(cmd); err != nil {
|
||||
return err
|
||||
}
|
||||
token := resolveToken(cmd)
|
||||
if token == "" {
|
||||
return fmt.Errorf("not authenticated: run 'multica login'")
|
||||
}
|
||||
|
||||
resolveCtx, cancelResolve := context.WithTimeout(cmd.Context(), 15*time.Second)
|
||||
defer cancelResolve()
|
||||
issueRef, err := resolveIssueRef(resolveCtx, client, args[0])
|
||||
if err != nil {
|
||||
return fmt.Errorf("resolve issue: %w", err)
|
||||
}
|
||||
|
||||
// Detect terminal size from stdout (the surface the user actually sees);
|
||||
// fall back to defaults if stdout is piped. Flag overrides win.
|
||||
cols, rows := detectInitialSize(cmd)
|
||||
|
||||
pathAndQuery := buildTerminalPathAndQuery(issueRef.ID, client.WorkspaceID, cols, rows)
|
||||
|
||||
// Use a long-lived context for the WS connection; cancellation is driven
|
||||
// by the proxy goroutines + signals rather than a timeout.
|
||||
conn, _, err := client.DialWebSocket(cmd.Context(), pathAndQuery)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dial terminal websocket: %w", err)
|
||||
}
|
||||
|
||||
proxy := newCLITerminalProxy(conn, os.Stdin, os.Stdout, os.Stderr, token, cmd)
|
||||
return proxy.run(cmd.Context(), cols, rows)
|
||||
}
|
||||
|
||||
func detectInitialSize(cmd *cobra.Command) (uint16, uint16) {
|
||||
cols, _ := cmd.Flags().GetUint16("cols")
|
||||
rows, _ := cmd.Flags().GetUint16("rows")
|
||||
if cols > 0 && rows > 0 {
|
||||
return cols, rows
|
||||
}
|
||||
if c, r, err := term.GetSize(int(os.Stdout.Fd())); err == nil && c > 0 && r > 0 {
|
||||
if cols == 0 {
|
||||
cols = uint16(c)
|
||||
}
|
||||
if rows == 0 {
|
||||
rows = uint16(r)
|
||||
}
|
||||
}
|
||||
if cols == 0 {
|
||||
cols = terminalDefaultCols
|
||||
}
|
||||
if rows == 0 {
|
||||
rows = terminalDefaultRows
|
||||
}
|
||||
return cols, rows
|
||||
}
|
||||
|
||||
// buildTerminalPathAndQuery returns the path plus query string of the
// terminal WebSocket endpoint for an issue. The issue ID is path-escaped and
// the query carries workspace_id, cols and rows (emitted in sorted key
// order by url.Values.Encode).
func buildTerminalPathAndQuery(issueID, workspaceID string, cols, rows uint16) string {
	params := url.Values{
		"workspace_id": {workspaceID},
		"cols":         {strconv.Itoa(int(cols))},
		"rows":         {strconv.Itoa(int(rows))},
	}
	return fmt.Sprintf("/ws/issues/%s/terminal?%s", url.PathEscape(issueID), params.Encode())
}
|
||||
|
||||
// cliTerminalProxy mirrors the server-side terminalProxy: one goroutine
|
||||
// owns conn writes, one owns conn reads, plus a stdin reader and resize
|
||||
// watcher. The struct is the only owner of the websocket.Conn; all writes
|
||||
// go through writeFrame() to keep a single point that holds writeMu.
|
||||
type cliTerminalProxy struct {
|
||||
conn *websocket.Conn
|
||||
stdin io.Reader
|
||||
stdout io.Writer
|
||||
stderr io.Writer
|
||||
token string
|
||||
cmd *cobra.Command
|
||||
|
||||
writeMu sync.Mutex
|
||||
|
||||
sessionMu sync.RWMutex
|
||||
sessionID string
|
||||
|
||||
closeOnce sync.Once
|
||||
doneCh chan struct{}
|
||||
|
||||
// exit reporting from the read pump back to the orchestrator.
|
||||
exitCode atomic.Int32 // 0 = unset, see exitCodeUnset / >=1
|
||||
exitMsg atomic.Pointer[string]
|
||||
|
||||
escapeChar byte
|
||||
noRaw bool
|
||||
}
|
||||
|
||||
const exitCodeUnset int32 = -1
|
||||
|
||||
func newCLITerminalProxy(conn *websocket.Conn, stdin io.Reader, stdout, stderr io.Writer, token string, cmd *cobra.Command) *cliTerminalProxy {
|
||||
escape, _ := cmd.Flags().GetString("escape-char")
|
||||
noRaw, _ := cmd.Flags().GetBool("no-raw")
|
||||
var ec byte
|
||||
if len(escape) >= 1 {
|
||||
ec = escape[0]
|
||||
}
|
||||
p := &cliTerminalProxy{
|
||||
conn: conn,
|
||||
stdin: stdin,
|
||||
stdout: stdout,
|
||||
stderr: stderr,
|
||||
token: token,
|
||||
cmd: cmd,
|
||||
doneCh: make(chan struct{}),
|
||||
escapeChar: ec,
|
||||
noRaw: noRaw,
|
||||
}
|
||||
p.exitCode.Store(exitCodeUnset)
|
||||
conn.SetReadLimit(terminalServerReadLimit)
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *cliTerminalProxy) run(ctx context.Context, cols, rows uint16) error {
|
||||
defer p.conn.Close()
|
||||
|
||||
if err := p.handshake(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Push our local size right after open in case the server's hardcoded
|
||||
// initial 80x24 didn't match. (Phase 2 server stamps 80x24 on the
|
||||
// daemon-bound terminal.open frame regardless of query string; sending
|
||||
// resize immediately makes the PTY render correctly.)
|
||||
if err := p.sendResize(cols, rows); err != nil {
|
||||
// non-fatal — daemon will just keep the original size
|
||||
fmt.Fprintf(p.stderr, "[multica] warning: initial resize failed: %v\n", err)
|
||||
}
|
||||
|
||||
rawTTY := !p.noRaw && term.IsTerminal(int(os.Stdin.Fd()))
|
||||
var restore func() error
|
||||
if rawTTY {
|
||||
oldState, err := term.MakeRaw(int(os.Stdin.Fd()))
|
||||
if err != nil {
|
||||
return fmt.Errorf("enter raw mode: %w", err)
|
||||
}
|
||||
fd := int(os.Stdin.Fd())
|
||||
restore = func() error { return term.Restore(fd, oldState) }
|
||||
defer restore()
|
||||
}
|
||||
|
||||
stopResize := startResizeWatcher(p)
|
||||
defer stopResize()
|
||||
|
||||
go p.readPump()
|
||||
go p.stdinPump(rawTTY)
|
||||
|
||||
select {
|
||||
case <-p.doneCh:
|
||||
case <-ctx.Done():
|
||||
p.shutdown()
|
||||
}
|
||||
|
||||
if restore != nil {
|
||||
_ = restore()
|
||||
}
|
||||
|
||||
if msgPtr := p.exitMsg.Load(); msgPtr != nil && *msgPtr != "" {
|
||||
fmt.Fprintln(p.stderr, *msgPtr)
|
||||
}
|
||||
if code := p.exitCode.Load(); code > 0 {
|
||||
os.Exit(int(code))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// handshake performs first-frame auth and waits for terminal.opened.
|
||||
func (p *cliTerminalProxy) handshake() error {
|
||||
authFrame, err := json.Marshal(struct {
|
||||
Type string `json:"type"`
|
||||
Payload map[string]any `json:"payload"`
|
||||
}{
|
||||
Type: "auth",
|
||||
Payload: map[string]any{"token": p.token},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal auth frame: %w", err)
|
||||
}
|
||||
if err := p.writeRawFrame(authFrame); err != nil {
|
||||
return fmt.Errorf("send auth frame: %w", err)
|
||||
}
|
||||
|
||||
deadline := time.Now().Add(terminalAuthAckTimeout)
|
||||
if err := p.conn.SetReadDeadline(deadline); err != nil {
|
||||
return fmt.Errorf("set auth read deadline: %w", err)
|
||||
}
|
||||
for {
|
||||
_, raw, err := p.conn.ReadMessage()
|
||||
if err != nil {
|
||||
return fmt.Errorf("read auth response: %w", err)
|
||||
}
|
||||
var preview struct {
|
||||
Type string `json:"type"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
if err := json.Unmarshal(raw, &preview); err == nil {
|
||||
if preview.Error != "" {
|
||||
return fmt.Errorf("auth rejected: %s", preview.Error)
|
||||
}
|
||||
if preview.Type == "auth_ack" {
|
||||
break
|
||||
}
|
||||
}
|
||||
// Tolerate stray frames during handshake (none expected in current
|
||||
// server implementation, but don't lock up if that changes).
|
||||
}
|
||||
|
||||
// After auth_ack the server proxies a terminal.open to the daemon and
|
||||
// waits for terminal.opened or terminal.error. Block until we see one.
|
||||
openDeadline := time.Now().Add(terminalOpenAckTimeout)
|
||||
if err := p.conn.SetReadDeadline(openDeadline); err != nil {
|
||||
return fmt.Errorf("set open read deadline: %w", err)
|
||||
}
|
||||
for {
|
||||
_, raw, err := p.conn.ReadMessage()
|
||||
if err != nil {
|
||||
return fmt.Errorf("waiting for terminal.opened: %w", err)
|
||||
}
|
||||
var env protocol.Message
|
||||
if err := json.Unmarshal(raw, &env); err != nil {
|
||||
continue
|
||||
}
|
||||
switch env.Type {
|
||||
case protocol.MessageTypeTerminalOpened:
|
||||
var op protocol.TerminalOpenedPayload
|
||||
if err := json.Unmarshal(env.Payload, &op); err != nil {
|
||||
return fmt.Errorf("decode terminal.opened: %w", err)
|
||||
}
|
||||
if op.SessionID == "" {
|
||||
return fmt.Errorf("daemon returned empty session_id in terminal.opened")
|
||||
}
|
||||
p.setSessionID(op.SessionID)
|
||||
workDir := op.WorkDir
|
||||
if workDir == "" {
|
||||
workDir = "(unknown)"
|
||||
}
|
||||
fmt.Fprintf(p.stderr, "[multica] attached to %s — escape: %s.\r\n", workDir, escapeHelpString(p.escapeChar))
|
||||
// Restore non-blocking reads for the pumps.
|
||||
if err := p.conn.SetReadDeadline(time.Time{}); err != nil {
|
||||
return fmt.Errorf("clear read deadline: %w", err)
|
||||
}
|
||||
return nil
|
||||
case protocol.MessageTypeTerminalError:
|
||||
var ep protocol.TerminalErrorPayload
|
||||
if err := json.Unmarshal(env.Payload, &ep); err != nil {
|
||||
return fmt.Errorf("daemon returned terminal.error (undecodable)")
|
||||
}
|
||||
return fmt.Errorf("daemon rejected terminal.open: %s (%s)", ep.Message, ep.Code)
|
||||
default:
|
||||
// keep waiting
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// escapeHelpString renders the detach sequence for display: "<enter>",
// the escape character, then ".". A zero byte means escape detection is
// off and yields "(disabled)".
func escapeHelpString(b byte) string {
	if b != 0 {
		return "<enter>" + string(rune(b)) + "."
	}
	return "(disabled)"
}
|
||||
|
||||
func (p *cliTerminalProxy) readPump() {
|
||||
defer p.shutdown()
|
||||
for {
|
||||
_, raw, err := p.conn.ReadMessage()
|
||||
if err != nil {
|
||||
if !isClosedConnError(err) {
|
||||
msg := fmt.Sprintf("[multica] websocket closed: %v", err)
|
||||
p.exitMsg.CompareAndSwap(nil, &msg)
|
||||
}
|
||||
return
|
||||
}
|
||||
var env protocol.Message
|
||||
if err := json.Unmarshal(raw, &env); err != nil {
|
||||
continue
|
||||
}
|
||||
switch env.Type {
|
||||
case protocol.MessageTypeTerminalData:
|
||||
var pl protocol.TerminalDataPayload
|
||||
if err := json.Unmarshal(env.Payload, &pl); err != nil {
|
||||
continue
|
||||
}
|
||||
data, err := base64.StdEncoding.DecodeString(pl.DataB64)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
_, _ = p.stdout.Write(data)
|
||||
case protocol.MessageTypeTerminalExit:
|
||||
var pl protocol.TerminalExitPayload
|
||||
if err := json.Unmarshal(env.Payload, &pl); err != nil {
|
||||
continue
|
||||
}
|
||||
reason := pl.Reason
|
||||
if reason == "" {
|
||||
reason = "child exited"
|
||||
}
|
||||
msg := fmt.Sprintf("\r\n[multica] %s (exit code %d)", reason, pl.ExitCode)
|
||||
p.exitMsg.CompareAndSwap(nil, &msg)
|
||||
if pl.ExitCode > 0 {
|
||||
p.exitCode.Store(int32(pl.ExitCode))
|
||||
}
|
||||
return
|
||||
case protocol.MessageTypeTerminalError:
|
||||
var pl protocol.TerminalErrorPayload
|
||||
if err := json.Unmarshal(env.Payload, &pl); err != nil {
|
||||
continue
|
||||
}
|
||||
msg := fmt.Sprintf("\r\n[multica] error: %s (%s)", pl.Message, pl.Code)
|
||||
p.exitMsg.CompareAndSwap(nil, &msg)
|
||||
p.exitCode.Store(1)
|
||||
return
|
||||
case protocol.MessageTypeTerminalClose:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// stdinPump reads stdin, runs it through the escape-sequence state machine,
|
||||
// and forwards bytes as terminal.data frames. Detach (~.) closes the WS
|
||||
// without sending the bytes.
|
||||
func (p *cliTerminalProxy) stdinPump(rawTTY bool) {
|
||||
defer p.shutdown()
|
||||
|
||||
buf := make([]byte, 4096)
|
||||
// Start in newline state so the very first character can trigger an
|
||||
// escape sequence; mirrors ssh's behavior.
|
||||
state := newlineState{atNewline: true}
|
||||
for {
|
||||
n, err := p.stdin.Read(buf)
|
||||
if n > 0 {
|
||||
toSend, detach := state.process(buf[:n], p.escapeChar)
|
||||
if len(toSend) > 0 {
|
||||
if err := p.sendData(toSend); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if detach {
|
||||
msg := terminalDetachExitMessage
|
||||
p.exitMsg.CompareAndSwap(nil, &msg)
|
||||
_ = p.sendCloseBestEffort("client_detach")
|
||||
return
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, io.EOF) {
|
||||
msg := fmt.Sprintf("[multica] stdin error: %v", err)
|
||||
p.exitMsg.CompareAndSwap(nil, &msg)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *cliTerminalProxy) sendData(data []byte) error {
|
||||
sid := p.SessionID()
|
||||
if sid == "" {
|
||||
return errors.New("session_id not set")
|
||||
}
|
||||
frame, err := marshalCLITerminalFrame(protocol.MessageTypeTerminalData, protocol.TerminalDataPayload{
|
||||
SessionID: sid,
|
||||
DataB64: base64.StdEncoding.EncodeToString(data),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.writeRawFrame(frame)
|
||||
}
|
||||
|
||||
func (p *cliTerminalProxy) sendResize(cols, rows uint16) error {
|
||||
sid := p.SessionID()
|
||||
if sid == "" {
|
||||
// Pre-handshake resize is sent later by run() once session is known.
|
||||
return nil
|
||||
}
|
||||
frame, err := marshalCLITerminalFrame(protocol.MessageTypeTerminalResize, protocol.TerminalResizePayload{
|
||||
SessionID: sid,
|
||||
Cols: cols,
|
||||
Rows: rows,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.writeRawFrame(frame)
|
||||
}
|
||||
|
||||
func (p *cliTerminalProxy) sendCloseBestEffort(reason string) error {
|
||||
sid := p.SessionID()
|
||||
if sid == "" {
|
||||
return nil
|
||||
}
|
||||
frame, err := marshalCLITerminalFrame(protocol.MessageTypeTerminalClose, protocol.TerminalClosePayload{
|
||||
SessionID: sid,
|
||||
Reason: reason,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.writeRawFrame(frame)
|
||||
}
|
||||
|
||||
func (p *cliTerminalProxy) writeRawFrame(frame []byte) error {
|
||||
p.writeMu.Lock()
|
||||
defer p.writeMu.Unlock()
|
||||
if err := p.conn.SetWriteDeadline(time.Now().Add(terminalServerWriteWait)); err != nil {
|
||||
return err
|
||||
}
|
||||
return p.conn.WriteMessage(websocket.TextMessage, frame)
|
||||
}
|
||||
|
||||
func (p *cliTerminalProxy) SessionID() string {
|
||||
p.sessionMu.RLock()
|
||||
defer p.sessionMu.RUnlock()
|
||||
return p.sessionID
|
||||
}
|
||||
|
||||
func (p *cliTerminalProxy) setSessionID(sid string) {
|
||||
p.sessionMu.Lock()
|
||||
defer p.sessionMu.Unlock()
|
||||
p.sessionID = sid
|
||||
}
|
||||
|
||||
func (p *cliTerminalProxy) shutdown() {
|
||||
p.closeOnce.Do(func() {
|
||||
close(p.doneCh)
|
||||
_ = p.conn.Close()
|
||||
})
|
||||
}
|
||||
|
||||
func marshalCLITerminalFrame(msgType string, payload any) ([]byte, error) {
|
||||
raw, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return json.Marshal(protocol.Message{Type: msgType, Payload: raw})
|
||||
}
|
||||
|
||||
func isClosedConnError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
if errors.Is(err, io.EOF) {
|
||||
return true
|
||||
}
|
||||
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// --- escape sequence state machine -----------------------------------------
//
// Modeled on ssh(1)'s `~.` detach. The escape character is only special
// directly after a newline (CR or LF). Following it:
//   - `.`                 requests detach;
//   - the escape again    forwards one literal escape character;
//   - `?`                 forwards a single CR instead of the two bytes;
//   - anything else       forwards the escape character and that byte.

// newlineState carries the escape parser's position between stdin reads.
type newlineState struct {
	// atNewline is true when the previously forwarded byte was CR or LF
	// (or at session start), i.e. an escape character would be recognized.
	atNewline bool
	// gotEscape is true when an escape character has been swallowed and the
	// next byte decides what it meant.
	gotEscape bool
}

// process filters one chunk of stdin. It returns the bytes that should be
// forwarded to the daemon and whether a detach was requested; on detach,
// bytes preceding the sequence are still returned and the rest of the chunk
// is dropped. State persists on the receiver, so a sequence split across two
// reads is still recognized. An escape value of 0 disables detection: the
// input slice is returned unchanged and the state is left untouched.
func (s *newlineState) process(in []byte, escape byte) (out []byte, detach bool) {
	if escape == 0 {
		return in, false
	}

	endsLine := func(c byte) bool { return c == '\r' || c == '\n' }

	out = make([]byte, 0, len(in))
	for i := 0; i < len(in); i++ {
		c := in[i]

		if !s.gotEscape {
			if s.atNewline && c == escape {
				// Hold the escape back until the next byte arrives.
				s.gotEscape = true
				continue
			}
			out = append(out, c)
			s.atNewline = endsLine(c)
			continue
		}

		// Resolve the pending escape with this byte.
		s.gotEscape = false
		if c == '.' {
			return out, true
		}
		if c == escape {
			out = append(out, escape)
			s.atNewline = false
			continue
		}
		if c == '?' {
			// No help text is produced here; a CR is forwarded and the
			// parser stays in the at-newline state.
			out = append(out, '\r')
			s.atNewline = true
			continue
		}
		// Unrecognized sequence: pass both bytes through untouched.
		out = append(out, escape, c)
		s.atNewline = endsLine(c)
	}
	return out, false
}
|
||||
|
||||
526
server/cmd/multica/cmd_issue_terminal_test.go
Normal file
526
server/cmd/multica/cmd_issue_terminal_test.go
Normal file
@@ -0,0 +1,526 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
func TestEscapeState_DetachOnFreshLine(t *testing.T) {
|
||||
s := &newlineState{atNewline: true}
|
||||
out, detach := s.process([]byte("~."), '~')
|
||||
if !detach {
|
||||
t.Fatalf("expected detach")
|
||||
}
|
||||
if len(out) != 0 {
|
||||
t.Fatalf("expected no bytes forwarded, got %q", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEscapeState_TildeNotAfterNewlineIsLiteral(t *testing.T) {
|
||||
s := &newlineState{atNewline: false}
|
||||
out, detach := s.process([]byte("foo~.bar"), '~')
|
||||
if detach {
|
||||
t.Fatalf("must not detach when ~ is mid-line")
|
||||
}
|
||||
if string(out) != "foo~.bar" {
|
||||
t.Fatalf("got %q", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEscapeState_DoubleTildeEmitsLiteral(t *testing.T) {
|
||||
s := &newlineState{atNewline: true}
|
||||
out, detach := s.process([]byte("~~"), '~')
|
||||
if detach {
|
||||
t.Fatalf("~~ must not detach")
|
||||
}
|
||||
if string(out) != "~" {
|
||||
t.Fatalf("got %q want ~", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEscapeState_StraddledChunks(t *testing.T) {
|
||||
// User pastes/types ~ and . in two separate stdin reads — escape
|
||||
// detection still works because state is preserved across calls.
|
||||
s := &newlineState{atNewline: true}
|
||||
out1, detach1 := s.process([]byte("~"), '~')
|
||||
if detach1 || len(out1) != 0 {
|
||||
t.Fatalf("first chunk: detach=%v out=%q", detach1, out1)
|
||||
}
|
||||
out2, detach2 := s.process([]byte("."), '~')
|
||||
if !detach2 {
|
||||
t.Fatalf("expected detach on second chunk")
|
||||
}
|
||||
if len(out2) != 0 {
|
||||
t.Fatalf("second chunk should forward nothing, got %q", out2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEscapeState_DisabledWhenEscapeIsZero(t *testing.T) {
|
||||
s := &newlineState{atNewline: true}
|
||||
out, detach := s.process([]byte("~."), 0)
|
||||
if detach {
|
||||
t.Fatalf("disabled escape must not detach")
|
||||
}
|
||||
if string(out) != "~." {
|
||||
t.Fatalf("got %q want ~.", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEscapeState_UnknownEscapeForwardsBoth(t *testing.T) {
|
||||
s := &newlineState{atNewline: true}
|
||||
out, _ := s.process([]byte("~x"), '~')
|
||||
if string(out) != "~x" {
|
||||
t.Fatalf("got %q want ~x", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildTerminalPathAndQuery(t *testing.T) {
|
||||
got := buildTerminalPathAndQuery("MUL-2295", "ws-uuid", 120, 40)
|
||||
u, err := url.Parse("http://x" + got)
|
||||
if err != nil {
|
||||
t.Fatalf("parse: %v", err)
|
||||
}
|
||||
if u.Path != "/ws/issues/MUL-2295/terminal" {
|
||||
t.Errorf("path = %q", u.Path)
|
||||
}
|
||||
q := u.Query()
|
||||
if q.Get("workspace_id") != "ws-uuid" {
|
||||
t.Errorf("workspace_id = %q", q.Get("workspace_id"))
|
||||
}
|
||||
if q.Get("cols") != "120" {
|
||||
t.Errorf("cols = %q", q.Get("cols"))
|
||||
}
|
||||
if q.Get("rows") != "40" {
|
||||
t.Errorf("rows = %q", q.Get("rows"))
|
||||
}
|
||||
}
|
||||
|
||||
// fakeServer simulates the Phase 2 /ws/issues/{id}/terminal handshake plus
|
||||
// a tiny echo loop, so we can drive the CLI proxy through its full lifecycle
|
||||
// in-process without spinning up the real daemon.
|
||||
type fakeServer struct {
|
||||
t *testing.T
|
||||
upgrader websocket.Upgrader
|
||||
gotAuth chan string
|
||||
gotData chan []byte
|
||||
gotClose chan string
|
||||
sessionID string
|
||||
server *httptest.Server
|
||||
connMu sync.Mutex
|
||||
conn *websocket.Conn
|
||||
sendOpenErr *protocol.TerminalErrorPayload // if set, send terminal.error instead of terminal.opened
|
||||
}
|
||||
|
||||
// writeFrame serializes writes from the handler goroutine and any test
|
||||
// goroutine that wants to push a frame to the connected client. Required
|
||||
// because gorilla/websocket allows concurrent read+write but NOT concurrent
|
||||
// writes from different goroutines.
|
||||
func (fs *fakeServer) writeFrame(frame []byte) error {
|
||||
fs.connMu.Lock()
|
||||
defer fs.connMu.Unlock()
|
||||
if fs.conn == nil {
|
||||
return fmt.Errorf("no client")
|
||||
}
|
||||
return fs.conn.WriteMessage(websocket.TextMessage, frame)
|
||||
}
|
||||
|
||||
func newFakeServer(t *testing.T) *fakeServer {
|
||||
fs := &fakeServer{
|
||||
t: t,
|
||||
upgrader: websocket.Upgrader{},
|
||||
gotAuth: make(chan string, 1),
|
||||
gotData: make(chan []byte, 32),
|
||||
gotClose: make(chan string, 1),
|
||||
sessionID: "session-xyz",
|
||||
}
|
||||
fs.server = httptest.NewServer(http.HandlerFunc(fs.handle))
|
||||
return fs
|
||||
}
|
||||
|
||||
func (fs *fakeServer) close() {
|
||||
fs.connMu.Lock()
|
||||
c := fs.conn
|
||||
fs.connMu.Unlock()
|
||||
if c != nil {
|
||||
c.Close()
|
||||
}
|
||||
fs.server.Close()
|
||||
}
|
||||
|
||||
// baseURL returns the URL of the underlying httptest server.
func (fs *fakeServer) baseURL() string {
	return fs.server.URL
}
|
||||
|
||||
func (fs *fakeServer) handle(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := fs.upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
fs.t.Errorf("upgrade: %v", err)
|
||||
return
|
||||
}
|
||||
fs.connMu.Lock()
|
||||
fs.conn = conn
|
||||
fs.connMu.Unlock()
|
||||
|
||||
// 1. Auth.
|
||||
_, raw, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var auth struct {
|
||||
Type string `json:"type"`
|
||||
Payload map[string]any `json:"payload"`
|
||||
}
|
||||
if err := json.Unmarshal(raw, &auth); err != nil || auth.Type != "auth" {
|
||||
_ = fs.writeFrame([]byte(`{"error":"bad auth"}`))
|
||||
return
|
||||
}
|
||||
tok, _ := auth.Payload["token"].(string)
|
||||
fs.gotAuth <- tok
|
||||
_ = fs.writeFrame([]byte(`{"type":"auth_ack"}`))
|
||||
|
||||
// 2. Open ack.
|
||||
if fs.sendOpenErr != nil {
|
||||
ep := *fs.sendOpenErr
|
||||
frame, _ := marshalCLITerminalFrame(protocol.MessageTypeTerminalError, ep)
|
||||
_ = fs.writeFrame(frame)
|
||||
return
|
||||
}
|
||||
openedFrame, _ := marshalCLITerminalFrame(protocol.MessageTypeTerminalOpened, protocol.TerminalOpenedPayload{
|
||||
SessionID: fs.sessionID,
|
||||
WorkDir: "/tmp/work",
|
||||
Shell: "/bin/bash",
|
||||
})
|
||||
_ = fs.writeFrame(openedFrame)
|
||||
|
||||
// 3. Pump.
|
||||
for {
|
||||
_, raw, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var env protocol.Message
|
||||
if err := json.Unmarshal(raw, &env); err != nil {
|
||||
continue
|
||||
}
|
||||
switch env.Type {
|
||||
case protocol.MessageTypeTerminalData:
|
||||
var pl protocol.TerminalDataPayload
|
||||
if err := json.Unmarshal(env.Payload, &pl); err != nil {
|
||||
continue
|
||||
}
|
||||
data, _ := base64.StdEncoding.DecodeString(pl.DataB64)
|
||||
fs.gotData <- data
|
||||
// Echo back so the CLI's stdout pump has something to do.
|
||||
echo, _ := marshalCLITerminalFrame(protocol.MessageTypeTerminalData, protocol.TerminalDataPayload{
|
||||
SessionID: fs.sessionID,
|
||||
DataB64: pl.DataB64,
|
||||
})
|
||||
_ = fs.writeFrame(echo)
|
||||
case protocol.MessageTypeTerminalClose:
|
||||
var pl protocol.TerminalClosePayload
|
||||
_ = json.Unmarshal(env.Payload, &pl)
|
||||
fs.gotClose <- pl.Reason
|
||||
return
|
||||
case protocol.MessageTypeTerminalResize:
|
||||
// observed but unused in this fake
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newTestCmd() *cobra.Command {
|
||||
c := &cobra.Command{}
|
||||
c.Flags().String("escape-char", "~", "")
|
||||
c.Flags().Bool("no-raw", true, "")
|
||||
return c
|
||||
}
|
||||
|
||||
func TestCLITerminalProxy_HandshakeAndEcho(t *testing.T) {
|
||||
fs := newFakeServer(t)
|
||||
defer fs.close()
|
||||
|
||||
wsURL := strings.Replace(fs.baseURL(), "http://", "ws://", 1) + "/"
|
||||
dialer := *websocket.DefaultDialer
|
||||
conn, _, err := dialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial: %v", err)
|
||||
}
|
||||
|
||||
stdinR, stdinW := io.Pipe()
|
||||
stdout := newSafeBuffer()
|
||||
stderr := newSafeBuffer()
|
||||
|
||||
cmd := newTestCmd()
|
||||
p := newCLITerminalProxy(conn, stdinR, stdout, stderr, "mul_test", cmd)
|
||||
|
||||
// Drive handshake explicitly so we can also assert the auth token reached
|
||||
// the fake server.
|
||||
if err := p.handshake(); err != nil {
|
||||
t.Fatalf("handshake: %v", err)
|
||||
}
|
||||
select {
|
||||
case got := <-fs.gotAuth:
|
||||
if got != "mul_test" {
|
||||
t.Errorf("auth token = %q, want mul_test", got)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("server did not receive auth frame")
|
||||
}
|
||||
if p.SessionID() != fs.sessionID {
|
||||
t.Fatalf("session_id = %q, want %q", p.SessionID(), fs.sessionID)
|
||||
}
|
||||
|
||||
// Now run the pumps in a goroutine.
|
||||
pumpsDone := make(chan struct{})
|
||||
go func() {
|
||||
go p.readPump()
|
||||
p.stdinPump(false)
|
||||
close(pumpsDone)
|
||||
}()
|
||||
|
||||
// Send "hello" through stdin; expect server to receive it and echo it
|
||||
// back into stdout.
|
||||
if _, err := stdinW.Write([]byte("hello")); err != nil {
|
||||
t.Fatalf("stdin write: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case got := <-fs.gotData:
|
||||
if string(got) != "hello" {
|
||||
t.Fatalf("server got %q, want hello", got)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("server did not receive data")
|
||||
}
|
||||
|
||||
// Wait for the echo to land in stdout.
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
if strings.Contains(stdout.String(), "hello") {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
if !strings.Contains(stdout.String(), "hello") {
|
||||
t.Fatalf("stdout missing echo, got %q", stdout.String())
|
||||
}
|
||||
|
||||
// Trigger detach: send "\n~." after a newline. Because stdinPump starts
|
||||
// the state machine at atNewline=true on the very first byte, we need
|
||||
// to walk through a real newline first to make the test realistic.
|
||||
if _, err := stdinW.Write([]byte("\n~.")); err != nil {
|
||||
t.Fatalf("stdin write detach: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-pumpsDone:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("stdin pump did not exit after detach")
|
||||
}
|
||||
|
||||
select {
|
||||
case reason := <-fs.gotClose:
|
||||
if reason != "client_detach" {
|
||||
t.Errorf("close reason = %q, want client_detach", reason)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("server did not receive terminal.close on detach")
|
||||
}
|
||||
|
||||
// run() prints the exit message to stderr; in this lower-level test we
|
||||
// drive the pumps directly, so check the captured exit message.
|
||||
msgPtr := p.exitMsg.Load()
|
||||
if msgPtr == nil || !strings.Contains(*msgPtr, "detached") {
|
||||
got := ""
|
||||
if msgPtr != nil {
|
||||
got = *msgPtr
|
||||
}
|
||||
t.Errorf("exit msg = %q, want detach text", got)
|
||||
}
|
||||
}
|
||||
|
||||
// safeBuffer guards a bytes.Buffer with a mutex so that one goroutine can
// append while another reads the contents, keeping tests clean under the
// race detector.
type safeBuffer struct {
	mu  sync.Mutex
	buf bytes.Buffer
}

// newSafeBuffer returns an empty safeBuffer.
func newSafeBuffer() *safeBuffer {
	return new(safeBuffer)
}

// Write appends p to the buffer while holding the lock.
func (b *safeBuffer) Write(p []byte) (int, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	written, err := b.buf.Write(p)
	return written, err
}

// String returns the buffered contents as a string, read under the lock.
func (b *safeBuffer) String() string {
	b.mu.Lock()
	defer b.mu.Unlock()
	contents := b.buf.String()
	return contents
}
|
||||
|
||||
func TestCLITerminalProxy_HandshakeRejectedOnTerminalError(t *testing.T) {
|
||||
fs := newFakeServer(t)
|
||||
fs.sendOpenErr = &protocol.TerminalErrorPayload{
|
||||
Code: protocol.TerminalErrorCodeTaskNotFound,
|
||||
Message: "no agent task on this issue",
|
||||
}
|
||||
defer fs.close()
|
||||
|
||||
wsURL := strings.Replace(fs.baseURL(), "http://", "ws://", 1) + "/"
|
||||
dialer := *websocket.DefaultDialer
|
||||
conn, _, err := dialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial: %v", err)
|
||||
}
|
||||
|
||||
cmd := newTestCmd()
|
||||
p := newCLITerminalProxy(conn, strings.NewReader(""), io.Discard, io.Discard, "mul_test", cmd)
|
||||
err = p.handshake()
|
||||
if err == nil {
|
||||
t.Fatal("expected handshake error, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), protocol.TerminalErrorCodeTaskNotFound) {
|
||||
t.Errorf("error %q does not mention error code", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCLITerminalProxy_AuthRejected(t *testing.T) {
|
||||
upgrader := websocket.Upgrader{}
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
// Read auth frame, reply with error.
|
||||
_, _, _ = conn.ReadMessage()
|
||||
_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"error":"invalid token"}`))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
wsURL := strings.Replace(server.URL, "http://", "ws://", 1) + "/"
|
||||
dialer := *websocket.DefaultDialer
|
||||
conn, _, err := dialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial: %v", err)
|
||||
}
|
||||
|
||||
cmd := newTestCmd()
|
||||
p := newCLITerminalProxy(conn, strings.NewReader(""), io.Discard, io.Discard, "mul_test", cmd)
|
||||
err = p.handshake()
|
||||
if err == nil {
|
||||
t.Fatal("expected handshake error, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "invalid token") {
|
||||
t.Errorf("error %q does not surface server reason", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCLITerminalProxy_TerminalExitDeliversCode(t *testing.T) {
|
||||
// Driver: open server, advance through handshake, then push a
|
||||
// terminal.exit frame and verify the proxy's exit code state.
|
||||
fs := newFakeServer(t)
|
||||
defer fs.close()
|
||||
|
||||
wsURL := strings.Replace(fs.baseURL(), "http://", "ws://", 1) + "/"
|
||||
dialer := *websocket.DefaultDialer
|
||||
conn, _, err := dialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial: %v", err)
|
||||
}
|
||||
|
||||
cmd := newTestCmd()
|
||||
p := newCLITerminalProxy(conn, strings.NewReader(""), io.Discard, io.Discard, "mul_test", cmd)
|
||||
if err := p.handshake(); err != nil {
|
||||
t.Fatalf("handshake: %v", err)
|
||||
}
|
||||
|
||||
exitFrame, _ := marshalCLITerminalFrame(protocol.MessageTypeTerminalExit, protocol.TerminalExitPayload{
|
||||
SessionID: fs.sessionID,
|
||||
ExitCode: 42,
|
||||
Reason: "child exited",
|
||||
})
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
p.readPump()
|
||||
}()
|
||||
if err := fs.writeFrame(exitFrame); err != nil {
|
||||
t.Fatalf("server write exit: %v", err)
|
||||
}
|
||||
|
||||
doneAt := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(doneAt) {
|
||||
if p.exitCode.Load() == 42 {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
wg.Wait()
|
||||
if got := p.exitCode.Load(); got != 42 {
|
||||
t.Fatalf("exit code = %d, want 42", got)
|
||||
}
|
||||
msgPtr := p.exitMsg.Load()
|
||||
if msgPtr == nil || !strings.Contains(*msgPtr, "exit code 42") {
|
||||
got := ""
|
||||
if msgPtr != nil {
|
||||
got = *msgPtr
|
||||
}
|
||||
t.Errorf("exit msg = %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
// Compile-time check: ensure the marshaled frame round-trips through the
|
||||
// real protocol.Message envelope. Catches any drift if the protocol pkg
|
||||
// renames a field.
|
||||
func TestMarshalCLITerminalFrame_EnvelopeShape(t *testing.T) {
|
||||
frame, err := marshalCLITerminalFrame(protocol.MessageTypeTerminalResize, protocol.TerminalResizePayload{
|
||||
SessionID: "sid",
|
||||
Cols: 100,
|
||||
Rows: 30,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var env protocol.Message
|
||||
if err := json.Unmarshal(frame, &env); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if env.Type != protocol.MessageTypeTerminalResize {
|
||||
t.Fatalf("type = %q", env.Type)
|
||||
}
|
||||
var pl protocol.TerminalResizePayload
|
||||
if err := json.Unmarshal(env.Payload, &pl); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if pl.Cols != 100 || pl.Rows != 30 || pl.SessionID != "sid" {
|
||||
t.Fatalf("payload = %+v", pl)
|
||||
}
|
||||
}
|
||||
|
||||
// Sanity check the help string does not crash on a zero escape byte.
|
||||
func TestEscapeHelpString(t *testing.T) {
|
||||
if got := escapeHelpString(0); got != "(disabled)" {
|
||||
t.Errorf("escape disabled hint = %q", got)
|
||||
}
|
||||
if got := escapeHelpString('~'); !strings.Contains(got, "~") {
|
||||
t.Errorf("escape help = %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
40
server/cmd/multica/cmd_issue_terminal_unix.go
Normal file
40
server/cmd/multica/cmd_issue_terminal_unix.go
Normal file
@@ -0,0 +1,40 @@
|
||||
//go:build !windows
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"golang.org/x/term"
|
||||
)
|
||||
|
||||
// startResizeWatcher installs a SIGWINCH handler that pushes the new local
|
||||
// terminal size to the daemon every time the user resizes their window.
|
||||
// Returns a stop function that uninstalls the handler and exits the
|
||||
// goroutine. On platforms without SIGWINCH (Windows) the windows-tagged
|
||||
// implementation polls term.GetSize on a timer instead.
|
||||
func startResizeWatcher(p *cliTerminalProxy) func() {
|
||||
ch := make(chan os.Signal, 1)
|
||||
signal.Notify(ch, syscall.SIGWINCH)
|
||||
stop := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
if c, r, err := term.GetSize(int(os.Stdout.Fd())); err == nil && c > 0 && r > 0 {
|
||||
_ = p.sendResize(uint16(c), uint16(r))
|
||||
}
|
||||
case <-stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return func() {
|
||||
signal.Stop(ch)
|
||||
close(stop)
|
||||
}
|
||||
}
|
||||
39
server/cmd/multica/cmd_issue_terminal_windows.go
Normal file
39
server/cmd/multica/cmd_issue_terminal_windows.go
Normal file
@@ -0,0 +1,39 @@
|
||||
//go:build windows
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"golang.org/x/term"
|
||||
)
|
||||
|
||||
// startResizeWatcher polls the local terminal size on a timer, since
|
||||
// Windows has no SIGWINCH equivalent that is reliable for console resize
|
||||
// events. 500ms is a compromise between responsiveness and CPU cost.
|
||||
func startResizeWatcher(p *cliTerminalProxy) func() {
|
||||
stop := make(chan struct{})
|
||||
go func() {
|
||||
var lastC, lastR int
|
||||
t := time.NewTicker(500 * time.Millisecond)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case <-t.C:
|
||||
c, r, err := term.GetSize(int(os.Stdout.Fd()))
|
||||
if err != nil || c <= 0 || r <= 0 {
|
||||
continue
|
||||
}
|
||||
if c == lastC && r == lastR {
|
||||
continue
|
||||
}
|
||||
lastC, lastR = c, r
|
||||
_ = p.sendResize(uint16(c), uint16(r))
|
||||
}
|
||||
}
|
||||
}()
|
||||
return func() { close(stop) }
|
||||
}
|
||||
@@ -197,6 +197,13 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
realtime.HandleWebSocket(hub, mc, pr, slugResolver, w, r)
|
||||
})
|
||||
|
||||
// Terminal proxy WebSocket: browser ↔ server ↔ daemonws hub ↔ daemon PTY.
|
||||
// Auth is cookie-or-first-frame (browsers can't set Authorization on a
|
||||
// WS upgrade), matching the /ws pattern above. Workspace + issue
|
||||
// membership is enforced inside the handler before the upgrade so a
|
||||
// 403 surfaces as an HTTP response rather than a silent WS close.
|
||||
r.Get("/ws/issues/{issue_id}/terminal", h.HandleIssueTerminalWS)
|
||||
|
||||
// Local file serving (when using local storage)
|
||||
if local, ok := store.(*storage.LocalStorage); ok {
|
||||
r.Get("/uploads/*", func(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -352,6 +359,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
r.Post("/tasks/{taskId}/cancel", h.CancelTask)
|
||||
r.Post("/rerun", h.RerunIssue)
|
||||
r.Get("/task-runs", h.ListTasksByIssue)
|
||||
r.Get("/terminal-sessions", h.ListTerminalSessionsByIssue)
|
||||
r.Get("/usage", h.GetIssueUsage)
|
||||
r.Post("/reactions", h.AddIssueReaction)
|
||||
r.Delete("/reactions", h.RemoveIssueReaction)
|
||||
|
||||
@@ -8,6 +8,7 @@ require (
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.19.13
|
||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.97.3
|
||||
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.5
|
||||
github.com/creack/pty v1.1.21
|
||||
github.com/go-chi/chi/v5 v5.2.5
|
||||
github.com/go-chi/cors v1.2.2
|
||||
github.com/golang-jwt/jwt/v5 v5.3.1
|
||||
@@ -23,6 +24,7 @@ require (
|
||||
github.com/resend/resend-go/v2 v2.28.0
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
github.com/spf13/cobra v1.10.2
|
||||
golang.org/x/term v0.43.0
|
||||
)
|
||||
|
||||
require (
|
||||
@@ -57,7 +59,7 @@ require (
|
||||
go.uber.org/atomic v1.11.0 // indirect
|
||||
go.yaml.in/yaml/v2 v2.4.2 // indirect
|
||||
golang.org/x/sync v0.20.0 // indirect
|
||||
golang.org/x/sys v0.35.0 // indirect
|
||||
golang.org/x/sys v0.44.0 // indirect
|
||||
golang.org/x/text v0.35.0 // indirect
|
||||
google.golang.org/protobuf v1.36.8 // indirect
|
||||
)
|
||||
|
||||
@@ -48,6 +48,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/creack/pty v1.1.21 h1:1/QdRyBaHHJP61QkWMXlOIBfsgdDeeKfK8SYVUWJKf0=
|
||||
github.com/creack/pty v1.1.21/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
@@ -135,8 +137,10 @@ go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
|
||||
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
|
||||
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
|
||||
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
|
||||
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
|
||||
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
|
||||
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/term v0.43.0 h1:S4RLU2sB31O/NCl+zFN9Aru9A/Cq2aqKpTZJ6B+DwT4=
|
||||
golang.org/x/term v0.43.0/go.mod h1:lrhlHNdQJHO+1qVYiHfFKVuVioJIheAc3fBSMFYEIsk=
|
||||
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
|
||||
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
|
||||
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
|
||||
|
||||
@@ -133,7 +133,12 @@ func (c *APIClient) GetJSON(ctx context.Context, path string, out any) error {
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
||||
return fmt.Errorf("GET %s returned %d: %s", path, resp.StatusCode, strings.TrimSpace(string(data)))
|
||||
return &HTTPError{
|
||||
Method: http.MethodGet,
|
||||
Path: path,
|
||||
StatusCode: resp.StatusCode,
|
||||
Body: strings.TrimSpace(string(data)),
|
||||
}
|
||||
}
|
||||
if out == nil {
|
||||
return nil
|
||||
@@ -159,7 +164,12 @@ func (c *APIClient) GetJSONWithHeaders(ctx context.Context, path string, out any
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
||||
return nil, fmt.Errorf("GET %s returned %d: %s", path, resp.StatusCode, strings.TrimSpace(string(data)))
|
||||
return nil, &HTTPError{
|
||||
Method: http.MethodGet,
|
||||
Path: path,
|
||||
StatusCode: resp.StatusCode,
|
||||
Body: strings.TrimSpace(string(data)),
|
||||
}
|
||||
}
|
||||
if out != nil {
|
||||
if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
|
||||
|
||||
97
server/internal/cli/ws.go
Normal file
97
server/internal/cli/ws.go
Normal file
@@ -0,0 +1,97 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
// DialWebSocket opens a WebSocket connection to the server at the given path
|
||||
// + query string. The path must start with "/". Auth is intentionally NOT
|
||||
// sent as a header here: the server's terminal endpoint runs WS upgrade
|
||||
// before applying header-based auth middleware (browsers cannot set
|
||||
// Authorization on a WS upgrade), so the caller authenticates via the
|
||||
// first-frame `auth` message instead. The standard X-Workspace-ID /
|
||||
// X-Client-* identity headers are still attached so dashboards can attribute
|
||||
// the connection to the right CLI build.
|
||||
func (c *APIClient) DialWebSocket(ctx context.Context, pathAndQuery string) (*websocket.Conn, *http.Response, error) {
|
||||
if c.BaseURL == "" {
|
||||
return nil, nil, fmt.Errorf("APIClient has no BaseURL")
|
||||
}
|
||||
wsURL, err := httpToWSURL(c.BaseURL, pathAndQuery)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
header := http.Header{}
|
||||
c.setWSHeaders(header)
|
||||
|
||||
dialer := *websocket.DefaultDialer
|
||||
conn, resp, err := dialer.DialContext(ctx, wsURL, header)
|
||||
if err != nil {
|
||||
return nil, resp, err
|
||||
}
|
||||
return conn, resp, nil
|
||||
}
|
||||
|
||||
// setWSHeaders attaches identity headers but deliberately omits the
|
||||
// Authorization header. Auth happens in-band via the first frame so this
|
||||
// stays consistent with cookie-based browser clients.
|
||||
func (c *APIClient) setWSHeaders(h http.Header) {
|
||||
if c.WorkspaceID != "" {
|
||||
h.Set("X-Workspace-ID", c.WorkspaceID)
|
||||
}
|
||||
platform := c.Platform
|
||||
if platform == "" {
|
||||
platform = ClientPlatform
|
||||
}
|
||||
if platform != "" {
|
||||
h.Set("X-Client-Platform", platform)
|
||||
}
|
||||
version := c.Version
|
||||
if version == "" {
|
||||
version = ClientVersion
|
||||
}
|
||||
if version != "" {
|
||||
h.Set("X-Client-Version", version)
|
||||
}
|
||||
osName := c.OS
|
||||
if osName == "" {
|
||||
osName = ClientOS
|
||||
}
|
||||
if osName != "" {
|
||||
h.Set("X-Client-OS", osName)
|
||||
}
|
||||
}
|
||||
|
||||
// httpToWSURL resolves pathAndQuery against baseURL and returns the
// corresponding WebSocket URL.
//
//   - http/https schemes are rewritten to ws/wss; ws/wss are kept as-is;
//     any other scheme is an error.
//   - pathAndQuery must start with "/". Its path is appended to the base
//     path (with the base's trailing slashes removed) and its query string
//     replaces any query on the base. Fragments are dropped.
//   - Percent-encoded path segments (for example "%2F" inside an ID) are
//     preserved in the result instead of being silently decoded.
func httpToWSURL(baseURL, pathAndQuery string) (string, error) {
	u, err := url.Parse(baseURL)
	if err != nil {
		return "", fmt.Errorf("parse base URL: %w", err)
	}
	switch strings.ToLower(u.Scheme) {
	case "http":
		u.Scheme = "ws"
	case "https":
		u.Scheme = "wss"
	case "ws", "wss":
		// already WS
	default:
		return "", fmt.Errorf("unsupported base URL scheme %q", u.Scheme)
	}
	if !strings.HasPrefix(pathAndQuery, "/") {
		return "", fmt.Errorf("path must start with /, got %q", pathAndQuery)
	}
	suffix, err := url.Parse(pathAndQuery)
	if err != nil {
		return "", fmt.Errorf("parse path/query: %w", err)
	}

	// Capture the escaped form BEFORE touching u.Path: url.URL only honours
	// RawPath when it decodes to exactly Path, so both must be rebuilt
	// together. If they ever disagree, URL.String falls back to the default
	// encoding of Path, which is the previous behaviour.
	escaped := strings.TrimRight(u.EscapedPath(), "/") + suffix.EscapedPath()
	u.Path = strings.TrimRight(u.Path, "/") + suffix.Path
	u.RawPath = escaped
	u.RawQuery = suffix.RawQuery
	u.Fragment = ""
	return u.String(), nil
}
|
||||
125
server/internal/cli/ws_test.go
Normal file
125
server/internal/cli/ws_test.go
Normal file
@@ -0,0 +1,125 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
func TestHTTPToWSURL(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
base string
|
||||
path string
|
||||
want string
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "https → wss",
|
||||
base: "https://api.example.com",
|
||||
path: "/ws/issues/abc/terminal?workspace_id=ws1&cols=80",
|
||||
want: "wss://api.example.com/ws/issues/abc/terminal?workspace_id=ws1&cols=80",
|
||||
},
|
||||
{
|
||||
name: "http → ws",
|
||||
base: "http://localhost:8080",
|
||||
path: "/ws/issues/x/terminal",
|
||||
want: "ws://localhost:8080/ws/issues/x/terminal",
|
||||
},
|
||||
{
|
||||
name: "wss left alone",
|
||||
base: "wss://api.example.com",
|
||||
path: "/ws",
|
||||
want: "wss://api.example.com/ws",
|
||||
},
|
||||
{
|
||||
name: "trailing slash on base preserved correctly",
|
||||
base: "https://api.example.com/",
|
||||
path: "/ws/x",
|
||||
want: "wss://api.example.com/ws/x",
|
||||
},
|
||||
{
|
||||
name: "missing leading slash on path",
|
||||
base: "https://api.example.com",
|
||||
path: "ws/x",
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "unsupported scheme",
|
||||
base: "ftp://example.com",
|
||||
path: "/ws",
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got, err := httpToWSURL(tc.base, tc.path)
|
||||
if tc.wantErr {
|
||||
if err == nil {
|
||||
t.Fatalf("expected error, got %q", got)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if got != tc.want {
|
||||
t.Fatalf("got %q want %q", got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDialWebSocketAttachesIdentityHeaders(t *testing.T) {
|
||||
upgrader := websocket.Upgrader{}
|
||||
gotHeaders := make(chan http.Header, 1)
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
gotHeaders <- r.Header.Clone()
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
conn.Close()
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewAPIClient(server.URL, "ws-uuid", "mul_test_token")
|
||||
client.Platform = "cli"
|
||||
client.Version = "1.2.3"
|
||||
client.OS = "macos"
|
||||
|
||||
conn, _, err := client.DialWebSocket(context.Background(), "/ws")
|
||||
if err != nil {
|
||||
t.Fatalf("dial: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
headers := <-gotHeaders
|
||||
if got := headers.Get("X-Workspace-ID"); got != "ws-uuid" {
|
||||
t.Errorf("X-Workspace-ID = %q, want ws-uuid", got)
|
||||
}
|
||||
if got := headers.Get("X-Client-Platform"); got != "cli" {
|
||||
t.Errorf("X-Client-Platform = %q, want cli", got)
|
||||
}
|
||||
if got := headers.Get("X-Client-Version"); got != "1.2.3" {
|
||||
t.Errorf("X-Client-Version = %q, want 1.2.3", got)
|
||||
}
|
||||
if got := headers.Get("X-Client-OS"); got != "macos" {
|
||||
t.Errorf("X-Client-OS = %q, want macos", got)
|
||||
}
|
||||
if got := headers.Get("Authorization"); got != "" {
|
||||
// The server's terminal endpoint runs WS upgrade before any header
|
||||
// auth middleware, so the CLI must authenticate via the first frame
|
||||
// to match cookie-based browser clients. Sending a Bearer header
|
||||
// here would silently work in some setups and silently fail in
|
||||
// others — keep it consistent and absent.
|
||||
t.Errorf("Authorization header should NOT be set on WS dial, got %q", got)
|
||||
}
|
||||
if got := headers.Get("Sec-WebSocket-Key"); !strings.HasPrefix(strings.TrimSpace(got), "") || got == "" {
|
||||
t.Errorf("Sec-WebSocket-Key missing")
|
||||
}
|
||||
}
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/multica-ai/multica/server/internal/cli"
|
||||
"github.com/multica-ai/multica/server/internal/daemon/execenv"
|
||||
"github.com/multica-ai/multica/server/internal/daemon/repocache"
|
||||
"github.com/multica-ai/multica/server/internal/daemon/terminal"
|
||||
"github.com/multica-ai/multica/server/pkg/agent"
|
||||
)
|
||||
|
||||
@@ -138,6 +139,24 @@ type Daemon struct {
|
||||
// deleted bare clone and an unrelated `not empty` cleanup failure.
|
||||
bgSyncs sync.WaitGroup
|
||||
|
||||
// terminalManager owns every live PTY this daemon hosts on behalf of
|
||||
// users opening the Issue → Terminal panel. Constructed in New() so
|
||||
// tests can override the manager config via cfg fields if needed
|
||||
// without forcing the production daemon to wire it lazily.
|
||||
terminalManager *terminal.Manager
|
||||
// terminalBridge mediates between the WS hub (server-side) and the
|
||||
// terminalManager. Only non-nil while a daemonws connection is up; the
|
||||
// wakeup loop swaps in a fresh bridge on every reconnect because
|
||||
// session_ids cannot survive a hub disconnect.
|
||||
terminalBridgeMu sync.RWMutex
|
||||
terminalBridge *terminalBridge
|
||||
// wsWritesMu guards wsWrites. wsWrites is the active daemonws writer
|
||||
// queue (nil while disconnected); the terminalBridge funnels every
|
||||
// outbound terminal.* frame through Daemon.sendWSFrame, which reads
|
||||
// this pointer under the lock so reconnects don't race the bridge.
|
||||
wsWritesMu sync.RWMutex
|
||||
wsWrites chan<- []byte
|
||||
|
||||
runner taskRunner // executes agent tasks; set to d.runTask by New(), overridable in tests
|
||||
cancelPollInterval time.Duration // how often handleTask polls for server-side cancellation; overridable in tests
|
||||
// runUpdateFn executes the brew-or-download upgrade. Set to d.runUpdate by
|
||||
@@ -171,9 +190,113 @@ func New(cfg Config, logger *slog.Logger) *Daemon {
|
||||
}
|
||||
d.runner = taskRunnerFunc(d.runTask)
|
||||
d.runUpdateFn = d.runUpdate
|
||||
// The terminal manager has no TaskLookup wired: every Open call goes
|
||||
// through OpenWithInfo using a TaskInfo that the server resolved from
|
||||
// the DB before forwarding terminal.open over daemonws (the daemon
|
||||
// does not maintain a persistent task cache).
|
||||
d.terminalManager = terminal.NewManager(terminal.ManagerConfig{
|
||||
IdleTimeout: terminal.DefaultIdleTimeout,
|
||||
Logger: logger,
|
||||
OnSessionStart: d.onTerminalSessionStart,
|
||||
OnSessionStop: d.onTerminalSessionStop,
|
||||
}, nil)
|
||||
return d
|
||||
}
|
||||
|
||||
// sendWSFrame pushes a raw frame onto the current daemonws writer queue
|
||||
// without blocking. Returns false when no connection is active or the
|
||||
// writer queue is saturated. Used for *droppable* traffic (currently the
|
||||
// heartbeat sender's fallback path): bytes lost here are recoverable on
|
||||
// the next tick. PTY output must NOT use this path — it goes through
|
||||
// sendWSFrameCtx so a saturated writer back-pressures the producer
|
||||
// instead of corrupting the terminal stream.
|
||||
func (d *Daemon) sendWSFrame(frame []byte) bool {
|
||||
d.wsWritesMu.RLock()
|
||||
writes := d.wsWrites
|
||||
d.wsWritesMu.RUnlock()
|
||||
if writes == nil {
|
||||
return false
|
||||
}
|
||||
select {
|
||||
case writes <- frame:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// sendWSFrameCtx pushes a frame onto the daemonws writer queue and BLOCKS
|
||||
// until either the queue accepts it or ctx is canceled. Returns true when
|
||||
// the frame was queued, false when no writer is active or ctx fired first.
|
||||
//
|
||||
// This is the real-backpressure path for the terminal bridge: when the
|
||||
// hub is saturated, slowing the PTY reader down (which slows the child
|
||||
// process via its own stdout buffer) is the only way to preserve the byte
|
||||
// stream. Dropping silently would print partial output to the user.
|
||||
//
|
||||
// Safety: the caller's ctx is the terminal pump's ctx, which the bridge
|
||||
// teardown (clearWSWrites → bridge.closeAll) cancels and *waits on* before
|
||||
// close(writes) runs in the wakeup loop. That ordering means we never send
|
||||
// on a closed channel even though we don't hold any lock around the send.
|
||||
func (d *Daemon) sendWSFrameCtx(ctx context.Context, frame []byte) bool {
|
||||
d.wsWritesMu.RLock()
|
||||
writes := d.wsWrites
|
||||
d.wsWritesMu.RUnlock()
|
||||
if writes == nil {
|
||||
return false
|
||||
}
|
||||
select {
|
||||
case writes <- frame:
|
||||
return true
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// installWSWrites stores the connection-local writer queue so the bridge
|
||||
// can address it through Daemon.sendWSFrame. A fresh terminalBridge is
|
||||
// installed each call: session_ids minted on a previous WS connection are
|
||||
// not valid on the new one (the server-side proxy registered routing
|
||||
// against the old hub client), so it's cleaner to tear every PTY down
|
||||
// than to half-revive them.
|
||||
func (d *Daemon) installWSWrites(writes chan<- []byte) {
|
||||
d.wsWritesMu.Lock()
|
||||
d.wsWrites = writes
|
||||
d.wsWritesMu.Unlock()
|
||||
|
||||
bridge := newTerminalBridge(d.terminalManager, d.logger, d.sendWSFrame, d.sendWSFrameCtx)
|
||||
d.terminalBridgeMu.Lock()
|
||||
prev := d.terminalBridge
|
||||
d.terminalBridge = bridge
|
||||
d.terminalBridgeMu.Unlock()
|
||||
if prev != nil {
|
||||
prev.closeAll("ws_reconnect")
|
||||
}
|
||||
}
|
||||
|
||||
// clearWSWrites removes the writer pointer and tears down every live
|
||||
// terminal session bound to this connection. Called from the wakeup
|
||||
// connection's deferred cleanup.
|
||||
func (d *Daemon) clearWSWrites() {
|
||||
d.wsWritesMu.Lock()
|
||||
d.wsWrites = nil
|
||||
d.wsWritesMu.Unlock()
|
||||
|
||||
d.terminalBridgeMu.Lock()
|
||||
bridge := d.terminalBridge
|
||||
d.terminalBridge = nil
|
||||
d.terminalBridgeMu.Unlock()
|
||||
if bridge != nil {
|
||||
bridge.closeAll("ws_disconnect")
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Daemon) currentTerminalBridge() *terminalBridge {
|
||||
d.terminalBridgeMu.RLock()
|
||||
defer d.terminalBridgeMu.RUnlock()
|
||||
return d.terminalBridge
|
||||
}
|
||||
|
||||
// setAgentVersion records the detected CLI version for an agent provider so
|
||||
// later task-dispatch code (e.g. Codex sandbox policy) can read it.
|
||||
func (d *Daemon) setAgentVersion(provider, version string) {
|
||||
@@ -2987,6 +3110,46 @@ func (d *Daemon) isActiveEnvRoot(envRoot string) bool {
|
||||
return d.activeEnvRoots[envRoot] > 0
|
||||
}
|
||||
|
||||
// onTerminalSessionStart is the terminal.Manager OnSessionStart hook. It
|
||||
// reference-counts the session's env root into activeEnvRoots so the GC
|
||||
// loop's isActiveEnvRoot check protects the workdir while a terminal is
|
||||
// attached. Without this, an idle terminal on a done/cancelled issue
|
||||
// would have its workdir reclaimed out from under the user on the next
|
||||
// GC cycle (the issue's TTL alone doesn't notice live terminal activity).
|
||||
//
|
||||
// Audit log is emitted here as a structured slog record so operators can
|
||||
// reconstruct who attached to which workdir when, without surfacing
|
||||
// keystrokes — see RFC §Auth.
|
||||
func (d *Daemon) onTerminalSessionStart(s *terminal.PtySession) {
|
||||
envRoot := filepath.Dir(s.WorkDir())
|
||||
d.markActiveEnvRoot(envRoot)
|
||||
d.logger.Info("terminal: session opened",
|
||||
"session_id", s.ID(),
|
||||
"task_id", s.TaskID(),
|
||||
"workspace_id", s.WorkspaceID(),
|
||||
"issue_id", s.IssueID(),
|
||||
"user_id", s.UserID(),
|
||||
"work_dir", s.WorkDir(),
|
||||
"shell", s.Shell(),
|
||||
)
|
||||
}
|
||||
|
||||
// onTerminalSessionStop is the terminal.Manager OnSessionStop hook. Pairs
|
||||
// with onTerminalSessionStart's mark and emits the close audit record
|
||||
// (with duration) for operator visibility.
|
||||
func (d *Daemon) onTerminalSessionStop(s *terminal.PtySession) {
|
||||
envRoot := filepath.Dir(s.WorkDir())
|
||||
d.unmarkActiveEnvRoot(envRoot)
|
||||
d.logger.Info("terminal: session closed",
|
||||
"session_id", s.ID(),
|
||||
"task_id", s.TaskID(),
|
||||
"workspace_id", s.WorkspaceID(),
|
||||
"issue_id", s.IssueID(),
|
||||
"user_id", s.UserID(),
|
||||
"duration", time.Since(s.StartedAt()).Round(time.Second).String(),
|
||||
)
|
||||
}
|
||||
|
||||
// shortID returns the first 8 characters of an ID for readable logs.
|
||||
func shortID(id string) string {
|
||||
if len(id) <= 8 {
|
||||
|
||||
14
server/internal/daemon/terminal/doc.go
Normal file
14
server/internal/daemon/terminal/doc.go
Normal file
@@ -0,0 +1,14 @@
|
||||
// Package terminal manages interactive PTY sessions bound to a task's
|
||||
// workdir on the local daemon.
|
||||
//
|
||||
// A Manager owns the lifecycle of all live PtySessions. Callers (the
|
||||
// daemonws bridge today, the CLI socket later) translate WebSocket
|
||||
// terminal.* frames into method calls on Manager, and the Manager
|
||||
// forwards PTY output back through a per-session Output channel.
|
||||
//
|
||||
// Sessions run with the daemon process's identity — there is no
|
||||
// additional sandbox. This is the same trust boundary as agent runs
|
||||
// (which are also daemon-spawned child processes). The Manager only
|
||||
// enforces that the requesting client's workspace matches the task's
|
||||
// workspace; anything beyond that is the OS's responsibility.
|
||||
package terminal
|
||||
14
server/internal/daemon/terminal/errors.go
Normal file
14
server/internal/daemon/terminal/errors.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package terminal
|
||||
|
||||
import "errors"
|
||||
|
||||
// Sentinel errors surfaced by Manager. Match them with errors.Is; the
// protocol layer translates them into protocol.TerminalErrorCode* values
// when reporting failures to clients.
var (
	// ErrTaskNotFound: the task is unknown, or it has no workdir to attach to.
	ErrTaskNotFound = errors.New("terminal: task not found")
	// ErrWorkspaceMismatch: the caller's workspace differs from the task's.
	ErrWorkspaceMismatch = errors.New("terminal: task belongs to a different workspace")
	// ErrSessionNotFound: no registered session has the requested id.
	ErrSessionNotFound = errors.New("terminal: session not found")
	// ErrUnsupportedOS: the platform spawner cannot provide a PTY.
	ErrUnsupportedOS = errors.New("terminal: PTY not supported on this OS")
	// ErrSpawnFailed: the spawner returned an error while starting the shell.
	ErrSpawnFailed = errors.New("terminal: failed to spawn shell")
	// ErrManagerClosed: the Manager has been closed and accepts no new sessions.
	ErrManagerClosed = errors.New("terminal: manager is shut down")
)
|
||||
371
server/internal/daemon/terminal/manager.go
Normal file
371
server/internal/daemon/terminal/manager.go
Normal file
@@ -0,0 +1,371 @@
|
||||
package terminal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// DefaultIdleTimeout is the idle timeout recommended for production
// daemon wiring. NewManager does not apply it automatically: callers opt
// in by assigning it (or another positive duration) to
// ManagerConfig.IdleTimeout. A zero or negative IdleTimeout turns the idle
// sweep off.
const DefaultIdleTimeout = 60 * time.Minute
|
||||
|
||||
// TaskInfo carries the task metadata the Manager needs to start a PTY.
// It is either produced by the configured TaskLookup (Manager.Open) or
// supplied directly by the caller (Manager.OpenWithInfo).
type TaskInfo struct {
	// TaskID identifies the task; exported to the shell as MULTICA_TASK_ID.
	TaskID string
	// WorkspaceID owns the task; compared against OpenParams.WorkspaceID.
	WorkspaceID string
	// IssueID is optional; exported as MULTICA_ISSUE_ID when non-empty.
	IssueID string
	// WorkDir is the shell's working directory. Empty means "no workdir",
	// which Open/OpenWithInfo report as ErrTaskNotFound.
	WorkDir string
	// PriorSessionID is optional; exported as CLAUDE_SESSION_ID so that
	// `claude --resume` can pick up the earlier session.
	PriorSessionID string
}
|
||||
|
||||
// TaskLookup resolves a TaskID into the workdir + workspace required to
|
||||
// open a PTY. Returns ErrTaskNotFound when the task is unknown. Lookups
|
||||
// hit the daemon's local task cache in production; tests supply a stub.
|
||||
type TaskLookup func(ctx context.Context, taskID string) (TaskInfo, error)
|
||||
|
||||
// OpenParams describes a request to open a terminal session; it is the
// input to Manager.Open and Manager.OpenWithInfo.
type OpenParams struct {
	// TaskID names the task whose workdir the PTY should run in.
	TaskID string

	// WorkspaceID is the workspace the caller acts for. The request is
	// rejected with ErrWorkspaceMismatch unless it equals the task's
	// workspace, so one workspace can never reach another's workdir.
	WorkspaceID string

	// UserID is the human who opened the terminal. It is recorded for
	// auditing and exported as MULTICA_USER_ID; it does not change which
	// OS user the shell runs as.
	UserID string

	// Cols and Rows give the initial PTY window size. A zero value falls
	// back to 80 columns / 24 rows respectively.
	Cols, Rows uint16
}
|
||||
|
||||
// ManagerConfig tunes Manager behaviour. Zero values are sensible defaults.
|
||||
type ManagerConfig struct {
|
||||
// Shell to spawn for each session. Defaults to "bash" with "-l".
|
||||
// Overridable for tests; the production daemon hardcodes bash for now
|
||||
// (RFC open question #4 — shell selection deferred to a later release).
|
||||
ShellPath string
|
||||
ShellArgs []string
|
||||
|
||||
// IdleTimeout closes a session that has had no I/O for this long.
|
||||
// Zero or negative disables the sweep entirely. Production daemon
|
||||
// wiring should pass DefaultIdleTimeout explicitly; we intentionally
|
||||
// don't default here so callers stay in control (the docs page for
|
||||
// this package previously said "0 disables" while NewManager silently
|
||||
// rewrote 0 to 60min — those two have to agree).
|
||||
IdleTimeout time.Duration
|
||||
|
||||
// Spawner overrides PTY spawning. Defaults to ptyStartShell which
|
||||
// shells out to creack/pty. Tests inject a fake to avoid forking.
|
||||
Spawner Spawner
|
||||
|
||||
// Now returns the current time. Defaults to time.Now. Tests inject a
|
||||
// fake clock to drive IdleTimeout deterministically.
|
||||
Now func() time.Time
|
||||
|
||||
// Logger receives operational events. Defaults to slog.Default().
|
||||
Logger *slog.Logger
|
||||
|
||||
// OnSessionStart fires synchronously after a PTY has spawned and the
|
||||
// session is registered. Wired by the daemon to mark the env root as
|
||||
// active so the GC loop's isActiveEnvRoot check protects the workdir
|
||||
// for as long as a terminal is attached — without this, a long-idle
|
||||
// terminal on a done/cancelled issue would have its workdir reclaimed
|
||||
// out from under the user. Called from Open's caller goroutine.
|
||||
OnSessionStart func(s *PtySession)
|
||||
|
||||
// OnSessionStop fires from waitLoop after the session has been fully
|
||||
// finalized (output closed, deregistered, Done closed). Daemons that
|
||||
// reference-counted in OnSessionStart unmark here. Called exactly once
|
||||
// per session, regardless of close reason.
|
||||
OnSessionStop func(s *PtySession)
|
||||
}
|
||||
|
||||
// Manager owns all live PtySessions on this daemon. It is safe for
|
||||
// concurrent use.
|
||||
type Manager struct {
|
||||
cfg ManagerConfig
|
||||
lookup TaskLookup
|
||||
|
||||
mu sync.Mutex
|
||||
sessions map[string]*PtySession
|
||||
closed bool
|
||||
// closeDone is closed by the first Close() caller AFTER finalize
|
||||
// finishes (every session deregistered, Done() closed). Subsequent
|
||||
// concurrent callers wait on it instead of racing past, so all
|
||||
// Close() returns share the same "manager fully drained" guarantee.
|
||||
closeDone chan struct{}
|
||||
}
|
||||
|
||||
// NewManager constructs a Manager. lookup may be nil in tests that only
|
||||
// exercise direct session APIs.
|
||||
func NewManager(cfg ManagerConfig, lookup TaskLookup) *Manager {
|
||||
if cfg.ShellPath == "" {
|
||||
cfg.ShellPath = "bash"
|
||||
cfg.ShellArgs = []string{"-l"}
|
||||
}
|
||||
// IdleTimeout intentionally not defaulted — see ManagerConfig.
|
||||
if cfg.Spawner == nil {
|
||||
cfg.Spawner = realSpawner{}
|
||||
}
|
||||
if cfg.Now == nil {
|
||||
cfg.Now = time.Now
|
||||
}
|
||||
if cfg.Logger == nil {
|
||||
cfg.Logger = slog.Default()
|
||||
}
|
||||
return &Manager{
|
||||
cfg: cfg,
|
||||
lookup: lookup,
|
||||
sessions: make(map[string]*PtySession),
|
||||
closeDone: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Open spawns a new PTY session for the given task. The returned
|
||||
// session is also registered with the manager and retrievable via Get.
|
||||
func (m *Manager) Open(ctx context.Context, p OpenParams) (*PtySession, error) {
|
||||
if m.lookup == nil {
|
||||
return nil, fmt.Errorf("terminal: Manager has no TaskLookup configured")
|
||||
}
|
||||
info, err := m.lookup(ctx, p.TaskID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if info.WorkspaceID != p.WorkspaceID {
|
||||
return nil, ErrWorkspaceMismatch
|
||||
}
|
||||
if info.WorkDir == "" {
|
||||
return nil, ErrTaskNotFound
|
||||
}
|
||||
return m.openWith(info, p)
|
||||
}
|
||||
|
||||
// OpenWithInfo spawns a PTY against a caller-supplied TaskInfo, bypassing
|
||||
// the configured TaskLookup. Production daemon wiring takes this path
|
||||
// because the server resolves task metadata from its DB before forwarding
|
||||
// terminal.open over the daemonws hub — the daemon trusts the server-
|
||||
// authenticated payload and does not have its own task cache. The
|
||||
// workspace mismatch check still runs so a server bug or a relayed frame
|
||||
// cannot cross workspace boundaries.
|
||||
func (m *Manager) OpenWithInfo(_ context.Context, info TaskInfo, p OpenParams) (*PtySession, error) {
|
||||
if info.WorkspaceID != p.WorkspaceID {
|
||||
return nil, ErrWorkspaceMismatch
|
||||
}
|
||||
if info.WorkDir == "" {
|
||||
return nil, ErrTaskNotFound
|
||||
}
|
||||
if info.TaskID == "" {
|
||||
info.TaskID = p.TaskID
|
||||
}
|
||||
return m.openWith(info, p)
|
||||
}
|
||||
|
||||
func (m *Manager) openWith(info TaskInfo, p OpenParams) (*PtySession, error) {
|
||||
cols, rows := normalizeSize(p.Cols, p.Rows)
|
||||
env := buildEnv(info, p.UserID)
|
||||
|
||||
m.mu.Lock()
|
||||
if m.closed {
|
||||
m.mu.Unlock()
|
||||
return nil, ErrManagerClosed
|
||||
}
|
||||
m.mu.Unlock()
|
||||
|
||||
startedAt := m.cfg.Now()
|
||||
pty, err := m.cfg.Spawner.Start(SpawnRequest{
|
||||
Shell: m.cfg.ShellPath,
|
||||
Args: m.cfg.ShellArgs,
|
||||
Cwd: info.WorkDir,
|
||||
Env: env,
|
||||
Cols: cols,
|
||||
Rows: rows,
|
||||
Started: startedAt,
|
||||
})
|
||||
if err != nil {
|
||||
// Double-%w so errors.Is matches both ErrSpawnFailed AND any
|
||||
// sentinel the spawner surfaced (notably ErrUnsupportedOS from
|
||||
// the windows stub — the protocol layer needs to distinguish
|
||||
// "no PTY on this OS" from generic spawn failures).
|
||||
return nil, fmt.Errorf("%w: %w", ErrSpawnFailed, err)
|
||||
}
|
||||
|
||||
sess := &PtySession{
|
||||
id: uuid.NewString(),
|
||||
taskID: info.TaskID,
|
||||
workspaceID: info.WorkspaceID,
|
||||
issueID: info.IssueID,
|
||||
workDir: info.WorkDir,
|
||||
userID: p.UserID,
|
||||
shellPath: m.cfg.ShellPath,
|
||||
cols: cols,
|
||||
rows: rows,
|
||||
pty: pty,
|
||||
output: make(chan []byte, 64),
|
||||
exit: make(chan ExitInfo, 1),
|
||||
done: make(chan struct{}),
|
||||
stop: make(chan struct{}),
|
||||
now: m.cfg.Now,
|
||||
idleTimeout: m.cfg.IdleTimeout,
|
||||
startedAt: startedAt,
|
||||
lastIO: startedAt,
|
||||
logger: m.cfg.Logger.With("session_id_pending", true, "task_id", info.TaskID),
|
||||
onClose: func(id string) { m.deregister(id) },
|
||||
}
|
||||
sess.logger = m.cfg.Logger.With("session_id", sess.id, "task_id", info.TaskID)
|
||||
|
||||
m.mu.Lock()
|
||||
if m.closed {
|
||||
m.mu.Unlock()
|
||||
_ = pty.Close()
|
||||
// We won that race: spawn succeeded but the manager closed before
|
||||
// we could register the session, so waitLoop never runs. Reap the
|
||||
// child synchronously here — pty.Close fires SIGHUP/SIGKILL but
|
||||
// only Wait() collects the exit status, otherwise the unix child
|
||||
// stays around as a zombie until the daemon process dies.
|
||||
_, _ = pty.Wait()
|
||||
return nil, ErrManagerClosed
|
||||
}
|
||||
m.sessions[sess.id] = sess
|
||||
m.mu.Unlock()
|
||||
|
||||
sess.onStop = m.cfg.OnSessionStop
|
||||
// OnSessionStart MUST fire before sess.start(). waitLoop calls
|
||||
// OnSessionStop when the child exits — if the child exits before
|
||||
// OnSessionStart runs, daemon would unmark an env root it never
|
||||
// marked, then mark it forever after OnSessionStart races in. The
|
||||
// "start hook happens-before stop hook" contract is what makes the
|
||||
// daemon's markActiveEnvRoot / unmark pair balanced.
|
||||
if m.cfg.OnSessionStart != nil {
|
||||
m.cfg.OnSessionStart(sess)
|
||||
}
|
||||
sess.start()
|
||||
return sess, nil
|
||||
}
|
||||
|
||||
// Get returns the session with the given id, or ErrSessionNotFound.
|
||||
func (m *Manager) Get(id string) (*PtySession, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
sess, ok := m.sessions[id]
|
||||
if !ok {
|
||||
return nil, ErrSessionNotFound
|
||||
}
|
||||
return sess, nil
|
||||
}
|
||||
|
||||
// Sessions returns a snapshot of currently registered session IDs.
|
||||
func (m *Manager) Sessions() []string {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
ids := make([]string, 0, len(m.sessions))
|
||||
for id := range m.sessions {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
// Close tears down every live session and refuses subsequent Open calls.
|
||||
// Safe to call concurrently from multiple goroutines: the first caller
|
||||
// runs the actual teardown, the rest block on closeDone until that
|
||||
// teardown is fully observable. Every Close() return — first or Nth —
|
||||
// thus carries the same "manager drained, every session finalized"
|
||||
// guarantee that downstream GC/audit cleanup depends on.
|
||||
func (m *Manager) Close() {
|
||||
m.mu.Lock()
|
||||
if m.closed {
|
||||
done := m.closeDone
|
||||
m.mu.Unlock()
|
||||
<-done
|
||||
return
|
||||
}
|
||||
m.closed = true
|
||||
live := make([]*PtySession, 0, len(m.sessions))
|
||||
for _, s := range m.sessions {
|
||||
live = append(live, s)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
// Parallel: each session.Close blocks for the unix spawner's
|
||||
// SIGHUP→grace→SIGKILL window. Running serially would multiply
|
||||
// shutdown latency by N sessions. We additionally wait on each
|
||||
// session's Done() so Manager.Close returning is a hard guarantee
|
||||
// that every session finalized (output closed, deregistered, done
|
||||
// fired) — downstream GC/audit cleanup relies on this.
|
||||
var wg sync.WaitGroup
|
||||
for _, s := range live {
|
||||
wg.Add(1)
|
||||
go func(s *PtySession) {
|
||||
defer wg.Done()
|
||||
s.Close("manager_shutdown")
|
||||
<-s.Done()
|
||||
}(s)
|
||||
}
|
||||
wg.Wait()
|
||||
close(m.closeDone)
|
||||
}
|
||||
|
||||
// CheckIdle walks every session and closes those whose idle interval
|
||||
// has elapsed. The daemon's existing GC loop calls this periodically;
|
||||
// each session also self-monitors via its own timer for cases where the
|
||||
// outer loop runs at a coarser cadence than IdleTimeout.
|
||||
func (m *Manager) CheckIdle() {
|
||||
if m.cfg.IdleTimeout <= 0 {
|
||||
return
|
||||
}
|
||||
m.mu.Lock()
|
||||
sessions := make([]*PtySession, 0, len(m.sessions))
|
||||
for _, s := range m.sessions {
|
||||
sessions = append(sessions, s)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
|
||||
now := m.cfg.Now()
|
||||
for _, s := range sessions {
|
||||
if now.Sub(s.LastIO()) >= m.cfg.IdleTimeout {
|
||||
s.Close("idle_timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) deregister(id string) {
|
||||
m.mu.Lock()
|
||||
delete(m.sessions, id)
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// normalizeSize substitutes the conventional 80x24 terminal dimensions
// for any dimension given as zero; non-zero values pass through as-is.
// Each dimension is defaulted independently.
func normalizeSize(cols, rows uint16) (uint16, uint16) {
	const (
		fallbackCols uint16 = 80
		fallbackRows uint16 = 24
	)
	width, height := cols, rows
	if width == 0 {
		width = fallbackCols
	}
	if height == 0 {
		height = fallbackRows
	}
	return width, height
}
|
||||
|
||||
func buildEnv(info TaskInfo, userID string) []string {
|
||||
env := []string{
|
||||
"MULTICA_WORKSPACE_ID=" + info.WorkspaceID,
|
||||
"MULTICA_TASK_ID=" + info.TaskID,
|
||||
}
|
||||
if info.IssueID != "" {
|
||||
env = append(env, "MULTICA_ISSUE_ID="+info.IssueID)
|
||||
}
|
||||
if userID != "" {
|
||||
env = append(env, "MULTICA_USER_ID="+userID)
|
||||
}
|
||||
if info.PriorSessionID != "" {
|
||||
// Injected so `claude --resume $CLAUDE_SESSION_ID` continues the
|
||||
// same session that the agent run was using (see RFC §Resume).
|
||||
env = append(env, "CLAUDE_SESSION_ID="+info.PriorSessionID)
|
||||
}
|
||||
return env
|
||||
}
|
||||
1027
server/internal/daemon/terminal/manager_test.go
Normal file
1027
server/internal/daemon/terminal/manager_test.go
Normal file
File diff suppressed because it is too large
Load Diff
35
server/internal/daemon/terminal/pty.go
Normal file
35
server/internal/daemon/terminal/pty.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package terminal
|
||||
|
||||
import (
|
||||
"io"
|
||||
"time"
|
||||
)
|
||||
|
||||
// PTY is the seam between a session and the platform pseudo-terminal plus
// its child process, so tests can substitute a fake instead of forking a
// real shell.
type PTY interface {
	// Read returns output produced by the child (stdout/stderr).
	io.Reader
	// Write delivers bytes to the child's stdin.
	io.Writer

	// Resize updates the terminal window dimensions.
	Resize(cols, rows uint16) error
	// Wait blocks until the child exits and reports its exit code.
	Wait() (exitCode int, err error)
	// Close terminates the child and releases the master fd.
	Close() error
}
|
||||
|
||||
// SpawnRequest carries everything Spawner.Start needs to launch a shell
// on a new PTY.
type SpawnRequest struct {
	Shell string   // program to execute
	Args  []string // arguments passed to Shell
	Cwd   string   // working directory for the child
	Env   []string // extra "KEY=value" entries for the child

	// Initial terminal window size.
	Cols uint16
	Rows uint16

	Started time.Time
}
|
||||
|
||||
// Spawner creates new PTYs. Production uses realSpawner; tests inject a
|
||||
// channel-backed fake (see fakePTY in manager_test.go).
|
||||
type Spawner interface {
|
||||
Start(SpawnRequest) (PTY, error)
|
||||
}
|
||||
283
server/internal/daemon/terminal/session.go
Normal file
283
server/internal/daemon/terminal/session.go
Normal file
@@ -0,0 +1,283 @@
|
||||
package terminal
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ExitInfo is the termination record a session publishes on ExitC().
type ExitInfo struct {
	// ExitCode is the code reported by the PTY's Wait.
	ExitCode int
	// Reason is the recorded close reason (for example the string given
	// to Close, or "exited" / "wait_error" when the child ended on its own).
	Reason string
}
|
||||
|
||||
// PtySession is a single live PTY + child shell. Methods are safe for
|
||||
// concurrent use; readers consume from Output() and ExitC() until
|
||||
// Output() is closed, which always follows an ExitC() send.
|
||||
type PtySession struct {
|
||||
id string
|
||||
taskID string
|
||||
workspaceID string
|
||||
issueID string
|
||||
workDir string
|
||||
userID string
|
||||
shellPath string
|
||||
|
||||
mu sync.Mutex
|
||||
cols, rows uint16
|
||||
pty PTY
|
||||
output chan []byte
|
||||
exit chan ExitInfo
|
||||
done chan struct{}
|
||||
stop chan struct{}
|
||||
stopOnce sync.Once
|
||||
closing bool
|
||||
closeReason string
|
||||
|
||||
// wg tracks readLoop and idleLoop. waitLoop is the finalizer: it
|
||||
// waits on wg before closing output/done so we never close the
|
||||
// output channel while readLoop is mid-send.
|
||||
wg sync.WaitGroup
|
||||
|
||||
now func() time.Time
|
||||
idleTimeout time.Duration
|
||||
startedAt time.Time
|
||||
lastIO time.Time
|
||||
|
||||
logger *slog.Logger
|
||||
onClose func(string)
|
||||
onStop func(*PtySession)
|
||||
}
|
||||
|
||||
// ID returns the session identifier.
|
||||
func (s *PtySession) ID() string { return s.id }
|
||||
|
||||
// TaskID returns the task this session is bound to.
|
||||
func (s *PtySession) TaskID() string { return s.taskID }
|
||||
|
||||
// WorkspaceID returns the workspace this session belongs to.
|
||||
func (s *PtySession) WorkspaceID() string { return s.workspaceID }
|
||||
|
||||
// IssueID returns the issue this session was opened from, if any.
|
||||
func (s *PtySession) IssueID() string { return s.issueID }
|
||||
|
||||
// WorkDir returns the cwd of the child shell.
|
||||
func (s *PtySession) WorkDir() string { return s.workDir }
|
||||
|
||||
// UserID returns the human user who opened the session.
|
||||
func (s *PtySession) UserID() string { return s.userID }
|
||||
|
||||
// Shell returns the shell binary path that was spawned.
|
||||
func (s *PtySession) Shell() string { return s.shellPath }
|
||||
|
||||
// StartedAt returns the wall-clock time the session was spawned.
|
||||
func (s *PtySession) StartedAt() time.Time { return s.startedAt }
|
||||
|
||||
// LastIO returns the most recent time data flowed in either direction.
|
||||
func (s *PtySession) LastIO() time.Time {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.lastIO
|
||||
}
|
||||
|
||||
// Output yields PTY output chunks as they arrive. The channel closes
|
||||
// after the child exits and a value has been delivered on ExitC().
|
||||
func (s *PtySession) Output() <-chan []byte { return s.output }
|
||||
|
||||
// ExitC fires once when the child exits. After that, Output() closes.
|
||||
func (s *PtySession) ExitC() <-chan ExitInfo { return s.exit }
|
||||
|
||||
// Done returns a channel closed when the session is fully torn down
|
||||
// (all goroutines exited, registry deregistered).
|
||||
func (s *PtySession) Done() <-chan struct{} { return s.done }
|
||||
|
||||
// Write forwards bytes to the PTY stdin. Returns the byte count actually
|
||||
// written. Updates LastIO so idle detection sees the activity.
|
||||
func (s *PtySession) Write(p []byte) (int, error) {
|
||||
s.mu.Lock()
|
||||
if s.closing {
|
||||
s.mu.Unlock()
|
||||
return 0, ErrSessionNotFound
|
||||
}
|
||||
pty := s.pty
|
||||
s.lastIO = s.now()
|
||||
s.mu.Unlock()
|
||||
return pty.Write(p)
|
||||
}
|
||||
|
||||
// Resize updates the PTY window size.
|
||||
func (s *PtySession) Resize(cols, rows uint16) error {
|
||||
cols, rows = normalizeSize(cols, rows)
|
||||
s.mu.Lock()
|
||||
if s.closing {
|
||||
s.mu.Unlock()
|
||||
return ErrSessionNotFound
|
||||
}
|
||||
s.cols = cols
|
||||
s.rows = rows
|
||||
pty := s.pty
|
||||
s.lastIO = s.now()
|
||||
s.mu.Unlock()
|
||||
return pty.Resize(cols, rows)
|
||||
}
|
||||
|
||||
// Size returns the current cols, rows of the PTY.
|
||||
func (s *PtySession) Size() (uint16, uint16) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.cols, s.rows
|
||||
}
|
||||
|
||||
// Close tears down the session. Subsequent calls are no-ops. The
|
||||
// reason is recorded for audit logging and the terminal.exit payload.
|
||||
//
|
||||
// Close only initiates teardown — signals stop, closes the PTY, returns.
|
||||
// waitLoop is the actual finalizer: it waits for readLoop + idleLoop
|
||||
// to exit (via wg) before closing output/done. That ordering is what
|
||||
// makes "Close while output buffer is full" safe — readLoop's blocked
|
||||
// send unblocks on <-stop, and only then does the output channel close.
|
||||
func (s *PtySession) Close(reason string) {
|
||||
s.mu.Lock()
|
||||
if s.closing {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
s.closing = true
|
||||
s.closeReason = reason
|
||||
pty := s.pty
|
||||
s.mu.Unlock()
|
||||
|
||||
s.stopOnce.Do(func() { close(s.stop) })
|
||||
|
||||
if pty != nil {
|
||||
// pty.Close on the unix spawner runs SIGHUP → grace → SIGKILL.
|
||||
// It's idempotent (sync.Once), so the second call from waitLoop's
|
||||
// finalizer is a no-op.
|
||||
_ = pty.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// start kicks off the reader, exit-watch, and (optional) idle
|
||||
// goroutines. Manager.Open is the only caller. wg.Add runs
|
||||
// synchronously before waitLoop is spawned so wg.Wait sees the
|
||||
// correct count even if Close fires immediately.
|
||||
func (s *PtySession) start() {
|
||||
s.wg.Add(1)
|
||||
go s.readLoop()
|
||||
if s.idleTimeout > 0 {
|
||||
s.wg.Add(1)
|
||||
go s.idleLoop()
|
||||
}
|
||||
go s.waitLoop()
|
||||
}
|
||||
|
||||
func (s *PtySession) readLoop() {
|
||||
defer s.wg.Done()
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, err := s.pty.Read(buf)
|
||||
if n > 0 {
|
||||
chunk := make([]byte, n)
|
||||
copy(chunk, buf[:n])
|
||||
s.mu.Lock()
|
||||
s.lastIO = s.now()
|
||||
s.mu.Unlock()
|
||||
select {
|
||||
case s.output <- chunk:
|
||||
case <-s.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, io.EOF) && err != io.ErrClosedPipe {
|
||||
s.logger.Debug("pty read error", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *PtySession) waitLoop() {
|
||||
code, waitErr := s.pty.Wait()
|
||||
|
||||
s.mu.Lock()
|
||||
reason := s.closeReason
|
||||
if reason == "" {
|
||||
if waitErr != nil {
|
||||
reason = "wait_error"
|
||||
} else {
|
||||
reason = "exited"
|
||||
}
|
||||
s.closeReason = reason
|
||||
}
|
||||
s.closing = true
|
||||
s.mu.Unlock()
|
||||
|
||||
// Ensure the PTY fd is closed so readLoop's pty.Read returns EOF.
|
||||
// pty.Close is idempotent (sync.Once on the unix spawner).
|
||||
_ = s.pty.Close()
|
||||
|
||||
// Signal stop so idleLoop and any blocked send in readLoop exit.
|
||||
s.stopOnce.Do(func() { close(s.stop) })
|
||||
|
||||
// Wait for readLoop + idleLoop before closing output/done. This is
|
||||
// the invariant that prevents "send on closed channel" panics when
|
||||
// output is full: readLoop is either past its send or unblocked via
|
||||
// <-stop, but never racing with close(s.output).
|
||||
s.wg.Wait()
|
||||
|
||||
// Finalize order is load-bearing: external waiters use `<-Done()` as
|
||||
// a signal that the session is fully torn down AND deregistered from
|
||||
// the manager. The sequence must be:
|
||||
// ExitC → close(output) → onClose/deregister → close(done)
|
||||
// so that any consumer doing `<-Done(); manager.Get(id)` after a
|
||||
// teardown is guaranteed to observe ErrSessionNotFound.
|
||||
select {
|
||||
case s.exit <- ExitInfo{ExitCode: code, Reason: reason}:
|
||||
default:
|
||||
}
|
||||
close(s.output)
|
||||
if s.onClose != nil {
|
||||
s.onClose(s.id)
|
||||
}
|
||||
if s.onStop != nil {
|
||||
// Fires after deregister so a Manager.OnSessionStop callback sees a
|
||||
// session that no longer appears in Sessions()/Get(); fires before
|
||||
// close(done) so `<-Done()` implies the daemon's GC-unmark hook
|
||||
// has already run.
|
||||
s.onStop(s)
|
||||
}
|
||||
close(s.done)
|
||||
}
|
||||
|
||||
func (s *PtySession) idleLoop() {
|
||||
defer s.wg.Done()
|
||||
// Sample at IdleTimeout/4 so reaction time is bounded but ticks
|
||||
// stay cheap with many sessions. Manager.CheckIdle catches anything
|
||||
// this loop misses (e.g. when daemon's outer GC tick is coarser).
|
||||
interval := s.idleTimeout / 4
|
||||
if interval < time.Second {
|
||||
interval = time.Second
|
||||
}
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-s.stop:
|
||||
return
|
||||
case <-t.C:
|
||||
if s.now().Sub(s.LastIO()) >= s.idleTimeout {
|
||||
// Close calls pty.Close + waits for wg in waitLoop. If
|
||||
// we ran it inline, waitLoop's wg.Wait would block on
|
||||
// this goroutine, which can't exit until Close returns
|
||||
// — deadlock. Spawning lets idleLoop return and
|
||||
// decrement wg.
|
||||
go s.Close("idle_timeout")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
102
server/internal/daemon/terminal/spawner_unix.go
Normal file
102
server/internal/daemon/terminal/spawner_unix.go
Normal file
@@ -0,0 +1,102 @@
|
||||
//go:build !windows
|
||||
|
||||
package terminal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/creack/pty"
|
||||
)
|
||||
|
||||
// closeGracePeriod is the window between SIGHUP and SIGKILL during a
|
||||
// Close. Long enough for interactive shells to run trap handlers and
|
||||
// flush state; short enough that closing a tab feels instant.
|
||||
const closeGracePeriod = 250 * time.Millisecond
|
||||
|
||||
// realSpawner is the production Spawner for Linux/macOS: it forks the
// shell on a PTY through creack/pty. Windows builds use the stub in
// spawner_windows.go, which returns ErrUnsupportedOS.
type realSpawner struct{}
|
||||
|
||||
func (realSpawner) Start(req SpawnRequest) (PTY, error) {
|
||||
cmd := exec.Command(req.Shell, req.Args...)
|
||||
cmd.Dir = req.Cwd
|
||||
|
||||
// Inherit the daemon's PATH so users get whatever CLIs are installed
|
||||
// in the daemon's environment (claude, codex, multica, etc.); merge
|
||||
// in the per-session vars built by buildEnv.
|
||||
env := os.Environ()
|
||||
env = append(env, req.Env...)
|
||||
cmd.Env = env
|
||||
|
||||
size := &pty.Winsize{Cols: req.Cols, Rows: req.Rows}
|
||||
f, err := pty.StartWithSize(cmd, size)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pty.StartWithSize: %w", err)
|
||||
}
|
||||
return &unixPTY{cmd: cmd, file: f}, nil
|
||||
}
|
||||
|
||||
// unixPTY is the PTY implementation returned by realSpawner: a child
// process plus the master side of its pseudo-terminal.
type unixPTY struct {
	cmd  *exec.Cmd // the spawned child
	file *os.File  // PTY master; used by Read, Write and Resize

	// exited is set once Wait has returned; Close uses it to skip
	// signalling a child that is already gone.
	exited atomic.Bool

	// closeOnce makes Close idempotent; closeErr keeps the result of the
	// first (only) file close so later calls return the same error.
	closeOnce sync.Once
	closeErr  error
}
|
||||
|
||||
// Read reads child output from the PTY master.
func (p *unixPTY) Read(b []byte) (int, error) {
	return p.file.Read(b)
}
|
||||
// Write writes input for the child to the PTY master.
func (p *unixPTY) Write(b []byte) (int, error) {
	return p.file.Write(b)
}
|
||||
|
||||
func (p *unixPTY) Resize(cols, rows uint16) error {
|
||||
return pty.Setsize(p.file, &pty.Winsize{Cols: cols, Rows: rows})
|
||||
}
|
||||
|
||||
func (p *unixPTY) Wait() (int, error) {
|
||||
err := p.cmd.Wait()
|
||||
p.exited.Store(true)
|
||||
if p.cmd.ProcessState != nil {
|
||||
return p.cmd.ProcessState.ExitCode(), err
|
||||
}
|
||||
return -1, err
|
||||
}
|
||||
|
||||
// Close terminates the child shell and releases the PTY master fd.
|
||||
// Closing a tab is a hangup, not an interrupt — so the signal path is
|
||||
// SIGHUP → brief grace → SIGKILL → file.Close, in that order:
|
||||
//
|
||||
// - SIGHUP gives interactive shells a chance to run trap handlers,
|
||||
// write history, etc. before the fd disappears.
|
||||
// - The grace window is bounded; anything slower than that is stuck.
|
||||
// - SIGKILL is the cliff for shells that ignore HUP.
|
||||
// - file.Close releases the master fd last so the slave side keeps
|
||||
// working during cleanup.
|
||||
//
|
||||
// Signals are sent to the negated pid so they hit the whole process
|
||||
// group. creack/pty starts the child as a session leader (Setsid), so
|
||||
// pid == pgid and any descendants the user spawned in the shell are
|
||||
// caught by the same kill.
|
||||
//
|
||||
// If the child already exited naturally (Wait returned), all signal
|
||||
// work is skipped — we only close the fd. That avoids a pointless
|
||||
// 250ms sleep in the natural-exit teardown path.
|
||||
func (p *unixPTY) Close() error {
|
||||
p.closeOnce.Do(func() {
|
||||
if p.cmd.Process != nil && !p.exited.Load() {
|
||||
pid := p.cmd.Process.Pid
|
||||
_ = syscall.Kill(-pid, syscall.SIGHUP)
|
||||
time.Sleep(closeGracePeriod)
|
||||
if !p.exited.Load() {
|
||||
_ = syscall.Kill(-pid, syscall.SIGKILL)
|
||||
}
|
||||
}
|
||||
p.closeErr = p.file.Close()
|
||||
})
|
||||
return p.closeErr
|
||||
}
|
||||
9
server/internal/daemon/terminal/spawner_windows.go
Normal file
9
server/internal/daemon/terminal/spawner_windows.go
Normal file
@@ -0,0 +1,9 @@
|
||||
//go:build windows
|
||||
|
||||
package terminal
|
||||
|
||||
// realSpawner is the Windows stub: it never spawns anything. ConPty
// support is RFC P1; until then the Desktop button and the CLI both
// surface the clear error returned from this layer.
type realSpawner struct{}
|
||||
|
||||
// Start ignores the request and always fails with ErrUnsupportedOS.
func (realSpawner) Start(_ SpawnRequest) (PTY, error) {
	return nil, ErrUnsupportedOS
}
|
||||
316
server/internal/daemon/terminal_bridge.go
Normal file
316
server/internal/daemon/terminal_bridge.go
Normal file
@@ -0,0 +1,316 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"sync"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/daemon/terminal"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// terminalBridge adapts the daemon-side terminal.Manager to the daemonws
|
||||
// WebSocket transport. Per session it:
|
||||
//
|
||||
// - relays PtySession.Output() → terminal.data frames (daemon→server)
|
||||
// - relays PtySession.ExitC() → terminal.exit frames
|
||||
// - tears the bridge goroutine down when Done() fires
|
||||
//
|
||||
// Two send paths are wired:
|
||||
//
|
||||
// - send (non-blocking): used for control / handshake frames that
|
||||
// are safe to drop on backlog (terminal.opened, terminal.exit,
|
||||
// terminal.error). Maps to Daemon.sendWSFrame.
|
||||
// - sendCtx (blocking with ctx escape): used for PTY data frames so a
|
||||
// saturated hub writer back-pressures the producer instead
|
||||
// of corrupting the terminal byte stream. Maps to
|
||||
// Daemon.sendWSFrameCtx.
|
||||
type terminalBridge struct {
|
||||
manager *terminal.Manager
|
||||
logger *slog.Logger
|
||||
send func([]byte) bool
|
||||
sendCtx func(context.Context, []byte) bool
|
||||
|
||||
mu sync.Mutex
|
||||
sessions map[string]*terminalRoute
|
||||
}
|
||||
|
||||
type terminalRoute struct {
|
||||
session *terminal.PtySession
|
||||
cancel context.CancelFunc
|
||||
pumpDone chan struct{}
|
||||
}
|
||||
|
||||
func newTerminalBridge(mgr *terminal.Manager, logger *slog.Logger, send func([]byte) bool, sendCtx func(context.Context, []byte) bool) *terminalBridge {
|
||||
return &terminalBridge{
|
||||
manager: mgr,
|
||||
logger: logger,
|
||||
send: send,
|
||||
sendCtx: sendCtx,
|
||||
sessions: make(map[string]*terminalRoute),
|
||||
}
|
||||
}
|
||||
|
||||
// handleFrame dispatches a single terminal.* envelope from the server. The
|
||||
// caller already decoded protocol.Message; we receive the inner type+payload.
|
||||
func (b *terminalBridge) handleFrame(msgType string, payload json.RawMessage) {
|
||||
switch msgType {
|
||||
case protocol.MessageTypeTerminalOpen:
|
||||
var p protocol.TerminalOpenPayload
|
||||
if err := json.Unmarshal(payload, &p); err != nil {
|
||||
b.logger.Debug("terminal.open invalid payload", "error", err)
|
||||
return
|
||||
}
|
||||
b.handleOpen(p)
|
||||
case protocol.MessageTypeTerminalData:
|
||||
var p protocol.TerminalDataPayload
|
||||
if err := json.Unmarshal(payload, &p); err != nil {
|
||||
b.logger.Debug("terminal.data invalid payload", "error", err)
|
||||
return
|
||||
}
|
||||
b.handleData(p)
|
||||
case protocol.MessageTypeTerminalResize:
|
||||
var p protocol.TerminalResizePayload
|
||||
if err := json.Unmarshal(payload, &p); err != nil {
|
||||
b.logger.Debug("terminal.resize invalid payload", "error", err)
|
||||
return
|
||||
}
|
||||
b.handleResize(p)
|
||||
case protocol.MessageTypeTerminalClose:
|
||||
var p protocol.TerminalClosePayload
|
||||
if err := json.Unmarshal(payload, &p); err != nil {
|
||||
b.logger.Debug("terminal.close invalid payload", "error", err)
|
||||
return
|
||||
}
|
||||
b.handleClose(p)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *terminalBridge) handleOpen(p protocol.TerminalOpenPayload) {
|
||||
info := terminal.TaskInfo{
|
||||
TaskID: p.TaskID,
|
||||
WorkspaceID: p.WorkspaceID,
|
||||
IssueID: p.IssueID,
|
||||
WorkDir: p.WorkDir,
|
||||
PriorSessionID: p.PriorSessionID,
|
||||
}
|
||||
sess, err := b.manager.OpenWithInfo(context.Background(), info, terminal.OpenParams{
|
||||
TaskID: p.TaskID,
|
||||
WorkspaceID: p.WorkspaceID,
|
||||
UserID: p.UserID,
|
||||
Cols: p.Cols,
|
||||
Rows: p.Rows,
|
||||
})
|
||||
if err != nil {
|
||||
b.sendError(p.RequestID, "", mapTerminalError(err), err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
pumpDone := make(chan struct{})
|
||||
b.mu.Lock()
|
||||
b.sessions[sess.ID()] = &terminalRoute{session: sess, cancel: cancel, pumpDone: pumpDone}
|
||||
b.mu.Unlock()
|
||||
|
||||
b.sendFrame(protocol.MessageTypeTerminalOpened, protocol.TerminalOpenedPayload{
|
||||
RequestID: p.RequestID,
|
||||
SessionID: sess.ID(),
|
||||
WorkDir: sess.WorkDir(),
|
||||
Shell: sess.Shell(),
|
||||
})
|
||||
|
||||
go func() {
|
||||
defer close(pumpDone)
|
||||
b.pump(ctx, sess)
|
||||
}()
|
||||
}
|
||||
|
||||
func (b *terminalBridge) handleData(p protocol.TerminalDataPayload) {
|
||||
sess, err := b.manager.Get(p.SessionID)
|
||||
if err != nil {
|
||||
b.sendError("", p.SessionID, protocol.TerminalErrorCodeSessionNotFound, err.Error())
|
||||
return
|
||||
}
|
||||
data, err := base64.StdEncoding.DecodeString(p.DataB64)
|
||||
if err != nil {
|
||||
b.logger.Debug("terminal.data invalid base64", "error", err, "session_id", p.SessionID)
|
||||
return
|
||||
}
|
||||
if _, err := sess.Write(data); err != nil {
|
||||
b.logger.Debug("terminal.data write failed", "error", err, "session_id", p.SessionID)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *terminalBridge) handleResize(p protocol.TerminalResizePayload) {
|
||||
sess, err := b.manager.Get(p.SessionID)
|
||||
if err != nil {
|
||||
b.sendError("", p.SessionID, protocol.TerminalErrorCodeSessionNotFound, err.Error())
|
||||
return
|
||||
}
|
||||
if err := sess.Resize(p.Cols, p.Rows); err != nil {
|
||||
b.logger.Debug("terminal.resize failed", "error", err, "session_id", p.SessionID)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *terminalBridge) handleClose(p protocol.TerminalClosePayload) {
|
||||
sess, err := b.manager.Get(p.SessionID)
|
||||
if err != nil {
|
||||
// Already gone — nothing to do; the server side has already received
|
||||
// a terminal.exit frame (or will, through the pump goroutine).
|
||||
return
|
||||
}
|
||||
reason := p.Reason
|
||||
if reason == "" {
|
||||
reason = "client_close"
|
||||
}
|
||||
sess.Close(reason)
|
||||
}
|
||||
|
||||
// pump bridges one session's output channel onto the WS as terminal.data
|
||||
// frames, and emits a terminal.exit when the child exits. Returns when
|
||||
// either the session is fully torn down or ctx is cancelled.
|
||||
//
|
||||
// terminal.data is delivered with REAL backpressure (sendDataFrame blocks
|
||||
// on a full hub writer). That is intentional: a saturated writer must
|
||||
// slow the PTY reader down, not drop bytes — half-streams break shells
|
||||
// far worse than a momentary lag. Heartbeat / control frames still go
|
||||
// through the droppable send path because they are recoverable.
|
||||
func (b *terminalBridge) pump(ctx context.Context, sess *terminal.PtySession) {
|
||||
sessionID := sess.ID()
|
||||
defer func() {
|
||||
b.mu.Lock()
|
||||
delete(b.sessions, sessionID)
|
||||
b.mu.Unlock()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chunk, ok := <-sess.Output():
|
||||
if !ok {
|
||||
// Output closed → child exited and waitLoop finalized.
|
||||
// ExitC was already delivered (or about to be); pull it once
|
||||
// non-blocking and forward, then exit the pump.
|
||||
var info terminal.ExitInfo
|
||||
select {
|
||||
case info = <-sess.ExitC():
|
||||
default:
|
||||
}
|
||||
b.sendFrame(protocol.MessageTypeTerminalExit, protocol.TerminalExitPayload{
|
||||
SessionID: sessionID,
|
||||
ExitCode: info.ExitCode,
|
||||
Reason: info.Reason,
|
||||
})
|
||||
<-sess.Done()
|
||||
return
|
||||
}
|
||||
if !b.sendDataFrame(ctx, sessionID, chunk) {
|
||||
// ctx canceled (bridge being torn down) — bail. We don't
|
||||
// emit a terminal.exit here: the teardown path on the
|
||||
// caller side already accounts for the session going away.
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sendDataFrame is the backpressure-aware variant of sendFrame, used only
|
||||
// for terminal.data. Returns false iff ctx was canceled mid-send (i.e.,
|
||||
// the bridge is being torn down).
|
||||
func (b *terminalBridge) sendDataFrame(ctx context.Context, sessionID string, chunk []byte) bool {
|
||||
raw, err := json.Marshal(protocol.TerminalDataPayload{
|
||||
SessionID: sessionID,
|
||||
DataB64: base64.StdEncoding.EncodeToString(chunk),
|
||||
})
|
||||
if err != nil {
|
||||
b.logger.Debug("terminal data payload marshal failed", "error", err, "session_id", sessionID)
|
||||
return true
|
||||
}
|
||||
frame, err := json.Marshal(protocol.Message{Type: protocol.MessageTypeTerminalData, Payload: raw})
|
||||
if err != nil {
|
||||
b.logger.Debug("terminal data envelope marshal failed", "error", err, "session_id", sessionID)
|
||||
return true
|
||||
}
|
||||
if b.sendCtx == nil {
|
||||
// Defensive: pre-test bridges may not have plumbed sendCtx. Fall
|
||||
// back to the non-blocking sender so existing tests still run.
|
||||
_ = b.send(frame)
|
||||
return true
|
||||
}
|
||||
return b.sendCtx(ctx, frame)
|
||||
}
|
||||
|
||||
// closeAll tears down every live session. Called when the daemon
|
||||
// disconnects from the server: the browser proxy will fail downstream,
|
||||
// and a reconnect cannot resurrect the pre-existing PTYs because the
|
||||
// session_ids only existed in the prior WS context.
|
||||
//
|
||||
// closeAll BLOCKS until every pump goroutine has actually exited. The
|
||||
// wakeup loop relies on this guarantee: after closeAll returns, no pump
|
||||
// goroutine can still be calling sendWSFrameCtx, so the wakeup loop can
|
||||
// safely close the writes channel without racing producers.
|
||||
func (b *terminalBridge) closeAll(reason string) {
|
||||
b.mu.Lock()
|
||||
routes := make([]*terminalRoute, 0, len(b.sessions))
|
||||
for _, r := range b.sessions {
|
||||
routes = append(routes, r)
|
||||
}
|
||||
b.mu.Unlock()
|
||||
for _, r := range routes {
|
||||
r.cancel()
|
||||
r.session.Close(reason)
|
||||
}
|
||||
for _, r := range routes {
|
||||
if r.pumpDone != nil {
|
||||
<-r.pumpDone
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *terminalBridge) sendFrame(msgType string, payload any) {
|
||||
raw, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
b.logger.Debug("terminal frame marshal failed", "error", err, "type", msgType)
|
||||
return
|
||||
}
|
||||
frame, err := json.Marshal(protocol.Message{Type: msgType, Payload: raw})
|
||||
if err != nil {
|
||||
b.logger.Debug("terminal envelope marshal failed", "error", err, "type", msgType)
|
||||
return
|
||||
}
|
||||
if !b.send(frame) {
|
||||
b.logger.Debug("terminal frame dropped: ws disconnected or backed up", "type", msgType)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *terminalBridge) sendError(requestID, sessionID, code, message string) {
|
||||
b.sendFrame(protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
|
||||
RequestID: requestID,
|
||||
SessionID: sessionID,
|
||||
Code: code,
|
||||
Message: message,
|
||||
})
|
||||
}
|
||||
|
||||
// mapTerminalError translates the terminal package's sentinel errors into
|
||||
// protocol error codes the browser proxy can render. Anything we don't
|
||||
// recognise falls back to TerminalErrorCodeInternal — drop information
|
||||
// rather than surface internal wrap text to the user.
|
||||
func mapTerminalError(err error) string {
|
||||
switch {
|
||||
case errors.Is(err, terminal.ErrWorkspaceMismatch):
|
||||
return protocol.TerminalErrorCodeWorkspaceMismatch
|
||||
case errors.Is(err, terminal.ErrTaskNotFound):
|
||||
return protocol.TerminalErrorCodeTaskNotFound
|
||||
case errors.Is(err, terminal.ErrSessionNotFound):
|
||||
return protocol.TerminalErrorCodeSessionNotFound
|
||||
case errors.Is(err, terminal.ErrUnsupportedOS):
|
||||
return protocol.TerminalErrorCodeUnsupportedOS
|
||||
case errors.Is(err, terminal.ErrSpawnFailed):
|
||||
return protocol.TerminalErrorCodeSpawnFailed
|
||||
}
|
||||
return protocol.TerminalErrorCodeInternal
|
||||
}
|
||||
204
server/internal/daemon/terminal_bridge_lifecycle_test.go
Normal file
204
server/internal/daemon/terminal_bridge_lifecycle_test.go
Normal file
@@ -0,0 +1,204 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/daemon/terminal"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// openBridgeSession is the shared "open one terminal session through the
|
||||
// bridge" helper for lifecycle tests: spawns a fake PTY, opens it via the
|
||||
// bridge, waits for terminal.opened to come back, and returns the session
|
||||
// id plus the fake PTY (so the test can push child output later).
|
||||
func openBridgeSession(t *testing.T, bridge *terminalBridge, sender *captureSender, pty *fakeBridgePTY) string {
|
||||
t.Helper()
|
||||
openPayload, err := json.Marshal(protocol.TerminalOpenPayload{
|
||||
RequestID: "req-bp",
|
||||
TaskID: "task-bp",
|
||||
WorkspaceID: "ws-bp",
|
||||
WorkDir: t.TempDir(),
|
||||
Cols: 80,
|
||||
Rows: 24,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("marshal open: %v", err)
|
||||
}
|
||||
bridge.handleFrame(protocol.MessageTypeTerminalOpen, openPayload)
|
||||
openedMsg := sender.waitFor(t, protocol.MessageTypeTerminalOpened, time.Second)
|
||||
var opened protocol.TerminalOpenedPayload
|
||||
if err := json.Unmarshal(openedMsg.Payload, &opened); err != nil {
|
||||
t.Fatalf("opened payload: %v", err)
|
||||
}
|
||||
if opened.SessionID == "" {
|
||||
t.Fatalf("expected non-empty session id")
|
||||
}
|
||||
return opened.SessionID
|
||||
}
|
||||
|
||||
// TestTerminalBridge_DataBackpressureNoSilentDrop pins Phase 2 review
|
||||
// blocker 2: terminal.data must NOT be silently dropped when the daemon's
|
||||
// outbound WS queue is saturated. Instead, the pump back-pressures the
|
||||
// PTY reader via a blocking send (with ctx escape), so the eventual
|
||||
// reader still sees every byte.
|
||||
//
|
||||
// The shape of the test:
|
||||
//
|
||||
// - We use a writes channel of size 1 to mimic a hot, saturated hub.
|
||||
// - sendCtx blocks on this channel (the real backpressure path).
|
||||
// - The test pushes 4 PTY chunks into the session while the consumer
|
||||
// is asleep — the pump cannot drop them.
|
||||
// - The consumer then drains all 4 frames in order.
|
||||
//
|
||||
// If the bridge regresses to the old `default: drop` behavior, fewer
|
||||
// than 4 chunks will be observed and the assertion fails.
|
||||
func TestTerminalBridge_DataBackpressureNoSilentDrop(t *testing.T) {
|
||||
writes := make(chan []byte, 1)
|
||||
sendCtx := func(ctx context.Context, frame []byte) bool {
|
||||
select {
|
||||
case writes <- frame:
|
||||
return true
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
pty := newFakeBridgePTY(80, 24)
|
||||
spawner := &stubSpawner{pty: pty}
|
||||
mgr := terminal.NewManager(terminal.ManagerConfig{
|
||||
Spawner: spawner,
|
||||
Logger: slog.Default(),
|
||||
}, nil)
|
||||
defer mgr.Close()
|
||||
|
||||
sender := &captureSender{}
|
||||
bridge := newTerminalBridge(mgr, slog.Default(), sender.send, sendCtx)
|
||||
|
||||
sessionID := openBridgeSession(t, bridge, sender, pty)
|
||||
|
||||
// Push 4 chunks. The pump can only buffer 1 in the writes channel; the
|
||||
// remainder must back-pressure via the PTY's bounded output channel
|
||||
// (cap 4 in fakeBridgePTY). If any chunk were dropped instead of
|
||||
// pressed back, the count below would fall short.
|
||||
chunks := []string{"chunk-1\n", "chunk-2\n", "chunk-3\n", "chunk-4\n"}
|
||||
for _, c := range chunks {
|
||||
pty.out <- []byte(c)
|
||||
}
|
||||
|
||||
// Drain writes; reassemble the data frames the pump emitted.
|
||||
got := make([]string, 0, len(chunks))
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for len(got) < len(chunks) {
|
||||
select {
|
||||
case frame := <-writes:
|
||||
var env protocol.Message
|
||||
if err := json.Unmarshal(frame, &env); err != nil {
|
||||
t.Fatalf("envelope: %v", err)
|
||||
}
|
||||
if env.Type != protocol.MessageTypeTerminalData {
|
||||
continue
|
||||
}
|
||||
var dp protocol.TerminalDataPayload
|
||||
if err := json.Unmarshal(env.Payload, &dp); err != nil {
|
||||
t.Fatalf("data payload: %v", err)
|
||||
}
|
||||
if dp.SessionID != sessionID {
|
||||
t.Fatalf("session_id mismatch: got %q want %q", dp.SessionID, sessionID)
|
||||
}
|
||||
decoded, err := base64.StdEncoding.DecodeString(dp.DataB64)
|
||||
if err != nil {
|
||||
t.Fatalf("decode: %v", err)
|
||||
}
|
||||
got = append(got, string(decoded))
|
||||
case <-time.After(time.Until(deadline)):
|
||||
t.Fatalf("only saw %d/%d chunks before timeout — backpressure regressed to silent drop", len(got), len(chunks))
|
||||
}
|
||||
}
|
||||
|
||||
for i, c := range chunks {
|
||||
if got[i] != c {
|
||||
t.Errorf("chunk %d: got %q, want %q", i, got[i], c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestTerminalBridge_TeardownDoesNotPanicOnInFlightSend pins Phase 2
|
||||
// review blocker 1: when the daemonws connection drops while a terminal
|
||||
// pump is mid-send, the teardown must NOT cause `send on closed channel`.
|
||||
// The required invariant is that bridge.closeAll cancels and *waits for*
|
||||
// every pump goroutine before the wakeup loop closes the writes channel.
|
||||
//
|
||||
// This test models the wakeup loop's teardown sequence directly:
|
||||
//
|
||||
// 1. Wire a writes channel and a backpressure sendCtx, same as production.
|
||||
// 2. Open a session and stall the pump on a full writes channel.
|
||||
// 3. Run closeAll → close(writes) in the same goroutine, exactly the
|
||||
// order wakeup.go now uses.
|
||||
// 4. Assert: no panic, teardown completes within a tight deadline.
|
||||
//
|
||||
// Before the fix, closeAll returned while the pump was still inside its
|
||||
// blocking send, and the subsequent close(writes) would panic the pump
|
||||
// the moment select picked the closed channel.
|
||||
func TestTerminalBridge_TeardownDoesNotPanicOnInFlightSend(t *testing.T) {
|
||||
writes := make(chan []byte, 1)
|
||||
sendCtx := func(ctx context.Context, frame []byte) bool {
|
||||
select {
|
||||
case writes <- frame:
|
||||
return true
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
pty := newFakeBridgePTY(80, 24)
|
||||
spawner := &stubSpawner{pty: pty}
|
||||
mgr := terminal.NewManager(terminal.ManagerConfig{
|
||||
Spawner: spawner,
|
||||
Logger: slog.Default(),
|
||||
}, nil)
|
||||
defer mgr.Close()
|
||||
|
||||
sender := &captureSender{}
|
||||
bridge := newTerminalBridge(mgr, slog.Default(), sender.send, sendCtx)
|
||||
|
||||
_ = openBridgeSession(t, bridge, sender, pty)
|
||||
|
||||
// Push two chunks so the pump has one in writes (queued) and one
|
||||
// blocked on the next select. That blocked send is the exact race
|
||||
// window the old defer order tripped over.
|
||||
pty.out <- []byte("first\n")
|
||||
pty.out <- []byte("second\n")
|
||||
|
||||
// Give the pump a moment to actually park on the blocking send.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
t.Errorf("teardown panicked: %v", r)
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
// Mirror wakeup.go's folded cleanup defer:
|
||||
// clearWSWrites equivalent → bridge.closeAll → close(writes)
|
||||
bridge.closeAll("ws_disconnect")
|
||||
close(writes)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("teardown did not finish — closeAll likely did not wait for pump exit")
|
||||
}
|
||||
|
||||
// Drain any residual frames so the writes channel close is observable
|
||||
// and the test doesn't leak goroutines.
|
||||
for range writes {
|
||||
}
|
||||
}
|
||||
324
server/internal/daemon/terminal_bridge_test.go
Normal file
324
server/internal/daemon/terminal_bridge_test.go
Normal file
@@ -0,0 +1,324 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/daemon/terminal"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// captureSender is the test stand-in for the daemon's outbound WS writer.
|
||||
// Frames are kept in order so the test can wait for a specific message type
|
||||
// to appear.
|
||||
type captureSender struct {
|
||||
mu sync.Mutex
|
||||
frames [][]byte
|
||||
}
|
||||
|
||||
func (c *captureSender) send(frame []byte) bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
cp := make([]byte, len(frame))
|
||||
copy(cp, frame)
|
||||
c.frames = append(c.frames, cp)
|
||||
return true
|
||||
}
|
||||
|
||||
// sendCtx is the backpressure-aware delivery path the terminal bridge uses
|
||||
// for terminal.data. Tests that don't care about backpressure can use this
|
||||
// default — it accepts every frame the same way send() does.
|
||||
func (c *captureSender) sendCtx(ctx context.Context, frame []byte) bool {
|
||||
if ctx.Err() != nil {
|
||||
return false
|
||||
}
|
||||
return c.send(frame)
|
||||
}
|
||||
|
||||
func (c *captureSender) waitFor(t *testing.T, msgType string, timeout time.Duration) protocol.Message {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
c.mu.Lock()
|
||||
for _, f := range c.frames {
|
||||
var m protocol.Message
|
||||
if err := json.Unmarshal(f, &m); err == nil && m.Type == msgType {
|
||||
c.mu.Unlock()
|
||||
return m
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("timeout waiting for frame of type %q (saw %d frames)", msgType, len(c.frames))
|
||||
return protocol.Message{}
|
||||
}
|
||||
|
||||
// fakeBridgePTY is an in-memory PTY used by the bridge tests. Tests feed
// "child output" by sending on out, and inspect what was written to the
// child through written (guarded by mu). Wait blocks until Close is called.
type fakeBridgePTY struct {
	// out carries chunks that Read hands back to the caller.
	out chan []byte

	// mu guards written, cols and rows.
	mu         sync.Mutex
	written    []byte
	cols, rows uint16

	// closeCh unblocks Read; waitDone unblocks Wait. Both are closed
	// together, exactly once, by Close.
	closeCh  chan struct{}
	exit     int
	waitOnce sync.Once
	waitDone chan struct{}
}

// newFakeBridgePTY builds a fake with the given initial size and an output
// channel that buffers up to 4 chunks.
func newFakeBridgePTY(cols, rows uint16) *fakeBridgePTY {
	p := &fakeBridgePTY{cols: cols, rows: rows}
	p.out = make(chan []byte, 4)
	p.closeCh = make(chan struct{})
	p.waitDone = make(chan struct{})
	return p
}

// Read blocks until a chunk is available on out or the PTY is closed.
// It returns errEOF when either out or closeCh has been closed. A chunk
// longer than b is truncated to len(b); the remainder is discarded.
func (p *fakeBridgePTY) Read(b []byte) (int, error) {
	select {
	case <-p.closeCh:
		return 0, errEOF
	case chunk, open := <-p.out:
		if open {
			return copy(b, chunk), nil
		}
		return 0, errEOF
	}
}

// Write appends b to the recorded input and reports it fully written.
func (p *fakeBridgePTY) Write(b []byte) (int, error) {
	p.mu.Lock()
	p.written = append(p.written, b...)
	p.mu.Unlock()
	return len(b), nil
}

// Resize records the new dimensions; it never fails.
func (p *fakeBridgePTY) Resize(cols, rows uint16) error {
	p.mu.Lock()
	p.cols, p.rows = cols, rows
	p.mu.Unlock()
	return nil
}

// Close releases any blocked Read and Wait. It is safe to call repeatedly;
// only the first call has an effect.
func (p *fakeBridgePTY) Close() error {
	p.waitOnce.Do(func() {
		close(p.closeCh)
		close(p.waitDone)
	})
	return nil
}

// Wait blocks until Close has been called, then returns the configured
// exit code with a nil error.
func (p *fakeBridgePTY) Wait() (int, error) {
	<-p.waitDone
	return p.exit, nil
}

// stringErr is a string-backed error type, which lets errEOF be declared
// as a constant.
type stringErr string

// Error returns the underlying string.
func (e stringErr) Error() string { return string(e) }

// errEOF is what Read returns once the fake PTY has no more output.
const errEOF = stringErr("EOF")
|
||||
|
||||
func TestTerminalBridge_OpenSendsOpenedFrameWithServerSuppliedWorkdir(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
|
||||
pty := newFakeBridgePTY(80, 24)
|
||||
spawner := &stubSpawner{pty: pty}
|
||||
mgr := terminal.NewManager(terminal.ManagerConfig{
|
||||
Spawner: spawner,
|
||||
Logger: slog.Default(),
|
||||
}, nil)
|
||||
defer mgr.Close()
|
||||
|
||||
sender := &captureSender{}
|
||||
bridge := newTerminalBridge(mgr, slog.Default(), sender.send, sender.sendCtx)
|
||||
|
||||
openPayload, err := json.Marshal(protocol.TerminalOpenPayload{
|
||||
RequestID: "req-1",
|
||||
TaskID: "task-via-ws",
|
||||
WorkspaceID: "ws-A",
|
||||
UserID: "user-1",
|
||||
IssueID: "issue-1",
|
||||
WorkDir: tmp,
|
||||
PriorSessionID: "claude-xyz",
|
||||
Cols: 120,
|
||||
Rows: 30,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("marshal open: %v", err)
|
||||
}
|
||||
|
||||
bridge.handleFrame(protocol.MessageTypeTerminalOpen, openPayload)
|
||||
|
||||
openedMsg := sender.waitFor(t, protocol.MessageTypeTerminalOpened, time.Second)
|
||||
var opened protocol.TerminalOpenedPayload
|
||||
if err := json.Unmarshal(openedMsg.Payload, &opened); err != nil {
|
||||
t.Fatalf("opened payload: %v", err)
|
||||
}
|
||||
if opened.RequestID != "req-1" {
|
||||
t.Errorf("opened.request_id = %q, want req-1", opened.RequestID)
|
||||
}
|
||||
if opened.SessionID == "" {
|
||||
t.Errorf("opened.session_id is empty")
|
||||
}
|
||||
if opened.WorkDir != tmp {
|
||||
t.Errorf("opened.work_dir = %q, want %q", opened.WorkDir, tmp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTerminalBridge_OpenWithoutWorkdirEmitsTaskNotFound(t *testing.T) {
|
||||
// The server is supposed to resolve task.work_dir from its DB before
|
||||
// forwarding terminal.open. If it forgets / fails, the daemon must
|
||||
// not silently fall through to spawning bash in CWD — it has to
|
||||
// surface a structured terminal.error and never call the spawner.
|
||||
pty := newFakeBridgePTY(80, 24)
|
||||
spawner := &stubSpawner{pty: pty}
|
||||
mgr := terminal.NewManager(terminal.ManagerConfig{
|
||||
Spawner: spawner,
|
||||
Logger: slog.Default(),
|
||||
}, nil)
|
||||
defer mgr.Close()
|
||||
|
||||
sender := &captureSender{}
|
||||
bridge := newTerminalBridge(mgr, slog.Default(), sender.send, sender.sendCtx)
|
||||
|
||||
openPayload, _ := json.Marshal(protocol.TerminalOpenPayload{
|
||||
RequestID: "req-2",
|
||||
TaskID: "task-evil",
|
||||
WorkspaceID: "ws-B",
|
||||
WorkDir: "", // server failed to resolve
|
||||
Cols: 80,
|
||||
Rows: 24,
|
||||
})
|
||||
|
||||
bridge.handleFrame(protocol.MessageTypeTerminalOpen, openPayload)
|
||||
|
||||
errMsg := sender.waitFor(t, protocol.MessageTypeTerminalError, time.Second)
|
||||
var errPayload protocol.TerminalErrorPayload
|
||||
if err := json.Unmarshal(errMsg.Payload, &errPayload); err != nil {
|
||||
t.Fatalf("error payload: %v", err)
|
||||
}
|
||||
if errPayload.Code != protocol.TerminalErrorCodeTaskNotFound {
|
||||
t.Errorf("error code = %q, want %q", errPayload.Code, protocol.TerminalErrorCodeTaskNotFound)
|
||||
}
|
||||
if errPayload.RequestID != "req-2" {
|
||||
t.Errorf("error request_id = %q, want req-2", errPayload.RequestID)
|
||||
}
|
||||
if spawner.callCount() != 0 {
|
||||
t.Errorf("spawner was invoked %d times despite resolve failure", spawner.callCount())
|
||||
}
|
||||
}
|
||||
|
||||
func TestTerminalBridge_DataAndExitRoundTrip(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
|
||||
pty := newFakeBridgePTY(80, 24)
|
||||
spawner := &stubSpawner{pty: pty}
|
||||
mgr := terminal.NewManager(terminal.ManagerConfig{
|
||||
Spawner: spawner,
|
||||
Logger: slog.Default(),
|
||||
}, nil)
|
||||
defer mgr.Close()
|
||||
|
||||
sender := &captureSender{}
|
||||
bridge := newTerminalBridge(mgr, slog.Default(), sender.send, sender.sendCtx)
|
||||
|
||||
openPayload, _ := json.Marshal(protocol.TerminalOpenPayload{
|
||||
RequestID: "req-3",
|
||||
TaskID: "task-3",
|
||||
WorkspaceID: "ws-A",
|
||||
WorkDir: tmp,
|
||||
Cols: 80,
|
||||
Rows: 24,
|
||||
})
|
||||
bridge.handleFrame(protocol.MessageTypeTerminalOpen, openPayload)
|
||||
openedMsg := sender.waitFor(t, protocol.MessageTypeTerminalOpened, time.Second)
|
||||
var opened protocol.TerminalOpenedPayload
|
||||
_ = json.Unmarshal(openedMsg.Payload, &opened)
|
||||
sessionID := opened.SessionID
|
||||
|
||||
// Push child output → bridge should emit terminal.data on the WS.
|
||||
pty.out <- []byte("hello\n")
|
||||
dataMsg := sender.waitFor(t, protocol.MessageTypeTerminalData, time.Second)
|
||||
var dp protocol.TerminalDataPayload
|
||||
if err := json.Unmarshal(dataMsg.Payload, &dp); err != nil {
|
||||
t.Fatalf("data payload: %v", err)
|
||||
}
|
||||
if dp.SessionID != sessionID {
|
||||
t.Errorf("data session_id = %q, want %q", dp.SessionID, sessionID)
|
||||
}
|
||||
decoded, err := base64.StdEncoding.DecodeString(dp.DataB64)
|
||||
if err != nil {
|
||||
t.Fatalf("decode data: %v", err)
|
||||
}
|
||||
if string(decoded) != "hello\n" {
|
||||
t.Errorf("data bytes = %q, want %q", decoded, "hello\n")
|
||||
}
|
||||
|
||||
// Send a write the other direction. The bridge should base64-decode
|
||||
// and call PTY.Write.
|
||||
inboundData, _ := json.Marshal(protocol.TerminalDataPayload{
|
||||
SessionID: sessionID,
|
||||
DataB64: base64.StdEncoding.EncodeToString([]byte("ls\n")),
|
||||
})
|
||||
bridge.handleFrame(protocol.MessageTypeTerminalData, inboundData)
|
||||
|
||||
// Allow Write to settle.
|
||||
deadline := time.Now().Add(time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
pty.mu.Lock()
|
||||
got := string(pty.written)
|
||||
pty.mu.Unlock()
|
||||
if got == "ls\n" {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
pty.mu.Lock()
|
||||
if string(pty.written) != "ls\n" {
|
||||
t.Errorf("PTY received %q, want %q", pty.written, "ls\n")
|
||||
}
|
||||
pty.mu.Unlock()
|
||||
|
||||
// Close from the client side. The bridge should propagate via
|
||||
// session.Close → waitLoop → terminal.exit.
|
||||
closePayload, _ := json.Marshal(protocol.TerminalClosePayload{
|
||||
SessionID: sessionID,
|
||||
Reason: "test",
|
||||
})
|
||||
bridge.handleFrame(protocol.MessageTypeTerminalClose, closePayload)
|
||||
|
||||
sender.waitFor(t, protocol.MessageTypeTerminalExit, time.Second)
|
||||
}
|
||||
|
||||
// stubSpawner returns a single pre-built PTY on the first Start. callCount
|
||||
// lets tests assert that no spawn happened on a reject path.
|
||||
type stubSpawner struct {
|
||||
pty *fakeBridgePTY
|
||||
mu sync.Mutex
|
||||
calls int
|
||||
}
|
||||
|
||||
func (s *stubSpawner) Start(_ terminal.SpawnRequest) (terminal.PTY, error) {
|
||||
s.mu.Lock()
|
||||
s.calls++
|
||||
s.mu.Unlock()
|
||||
return s.pty, nil
|
||||
}
|
||||
|
||||
func (s *stubSpawner) callCount() int {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.calls
|
||||
}
|
||||
@@ -116,6 +116,12 @@ func (d *Daemon) runTaskWakeupConnection(ctx context.Context, runtimeIDs []strin
|
||||
writerDone := make(chan struct{})
|
||||
go d.runWSWriter(conn, writes, writerDone)
|
||||
|
||||
// Expose this connection's writer to the terminal bridge so it can push
|
||||
// terminal.* frames back to the server. The single cleanup defer below
|
||||
// clears the pointer (and tears any live PTYs down) BEFORE the writes
|
||||
// channel is closed, so terminal pumps cannot panic on a closed send.
|
||||
d.installWSWrites(writes)
|
||||
|
||||
heartbeatCtx, cancelHeartbeat := context.WithCancel(ctx)
|
||||
hbDone := make(chan struct{})
|
||||
go func() {
|
||||
@@ -128,19 +134,26 @@ func (d *Daemon) runTaskWakeupConnection(ctx context.Context, runtimeIDs []strin
|
||||
errCh <- d.readTaskWakeupMessages(conn, taskWakeups)
|
||||
}()
|
||||
|
||||
// Defer cleanup must shut goroutines down in this order:
|
||||
// 1. cancel the heartbeat sender's ctx
|
||||
// 2. wait for the sender to actually return — only then is it safe
|
||||
// to close the writes channel without a "send on closed channel"
|
||||
// panic from sendWSHeartbeats
|
||||
// 3. close writes; the writer drains and exits
|
||||
// 4. wait for the writer to finish so it doesn't outlive the conn
|
||||
// Teardown ordering is load-bearing — every step here has a producer
|
||||
// that would panic on a closed `writes` channel if reordered:
|
||||
//
|
||||
// LIFO defer order would close writes before the sender stops, so the
|
||||
// teardown is folded into a single deferred function instead.
|
||||
// 1. cancel the heartbeat sender's ctx and wait for it to return.
|
||||
// Heartbeats are the only droppable producer; once the sender
|
||||
// has exited it can no longer reach the writes channel.
|
||||
// 2. clearWSWrites: nil the pointer (new sendWSFrame calls bounce)
|
||||
// AND tear down the terminal bridge. closeAll blocks until every
|
||||
// pump goroutine has exited — that's the barrier the terminal
|
||||
// pumps need before we close the channel they share.
|
||||
// 3. close(writes): only safe now that no producer remains.
|
||||
// 4. wait for the writer goroutine to drain & exit so it doesn't
|
||||
// outlive the conn.
|
||||
//
|
||||
// Two separate defers would run LIFO and reverse 2↔3, which is the
|
||||
// exact race Phase 2 review caught.
|
||||
defer func() {
|
||||
cancelHeartbeat()
|
||||
<-hbDone
|
||||
d.clearWSWrites()
|
||||
close(writes)
|
||||
<-writerDone
|
||||
}()
|
||||
@@ -287,6 +300,13 @@ func (d *Daemon) readTaskWakeupMessages(conn *websocket.Conn, taskWakeups chan<-
|
||||
continue
|
||||
}
|
||||
d.handleWSHeartbeatAck(context.Background(), &ack)
|
||||
case protocol.MessageTypeTerminalOpen,
|
||||
protocol.MessageTypeTerminalData,
|
||||
protocol.MessageTypeTerminalResize,
|
||||
protocol.MessageTypeTerminalClose:
|
||||
if bridge := d.currentTerminalBridge(); bridge != nil {
|
||||
bridge.handleFrame(msg.Type, msg.Payload)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,6 +82,9 @@ type Hub struct {
|
||||
|
||||
hbMu sync.RWMutex
|
||||
onHeartbeat HeartbeatHandler
|
||||
|
||||
termMu sync.RWMutex
|
||||
termRouter *TerminalRouter
|
||||
}
|
||||
|
||||
func NewHub() *Hub {
|
||||
@@ -146,7 +149,12 @@ func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request, identity C
|
||||
c := &client{
|
||||
hub: h,
|
||||
conn: conn,
|
||||
send: make(chan []byte, 16),
|
||||
// Buffer sized for PTY traffic: terminal.data frames carry up to a
|
||||
// few KB each and the read pump from xterm.js can fall behind during
|
||||
// large bursts (build output, `cat` of a big file). 256 frames gives
|
||||
// ~1MB of slack before we start dropping; wakeup hints and heartbeat
|
||||
// acks coexist on the same queue but are tiny in comparison.
|
||||
send: make(chan []byte, 256),
|
||||
identity: identity,
|
||||
runtimes: runtimes,
|
||||
}
|
||||
@@ -320,7 +328,12 @@ func (c *client) readPump() {
|
||||
c.conn.Close()
|
||||
}()
|
||||
|
||||
c.conn.SetReadLimit(4096)
|
||||
// Terminal frames embed base64-encoded PTY chunks; readLoop on the
|
||||
// daemon side caps each chunk at 4KB raw, which is ~5.6KB base64 plus
|
||||
// JSON envelope. 64KB leaves headroom for future growth without
|
||||
// disconnecting daemons that briefly exceed the legacy 4KB heartbeat-
|
||||
// only ceiling.
|
||||
c.conn.SetReadLimit(64 * 1024)
|
||||
c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
c.conn.SetPongHandler(func(string) error {
|
||||
c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
@@ -348,6 +361,14 @@ func (c *client) handleFrame(raw []byte) {
|
||||
switch msg.Type {
|
||||
case protocol.EventDaemonHeartbeat:
|
||||
c.handleHeartbeatFrame(msg.Payload)
|
||||
case protocol.MessageTypeTerminalOpened,
|
||||
protocol.MessageTypeTerminalData,
|
||||
protocol.MessageTypeTerminalClose,
|
||||
protocol.MessageTypeTerminalExit,
|
||||
protocol.MessageTypeTerminalError:
|
||||
if router := c.hub.terminalRouter(); router != nil {
|
||||
router.Route(raw, msg.Type, msg.Payload)
|
||||
}
|
||||
default:
|
||||
// Unknown app messages are intentionally ignored for forward
|
||||
// compatibility with future daemon → server message types.
|
||||
|
||||
196
server/internal/daemonws/terminal.go
Normal file
196
server/internal/daemonws/terminal.go
Normal file
@@ -0,0 +1,196 @@
|
||||
package daemonws
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"sync"
|
||||
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// TerminalSink is the destination for terminal.* frames addressed to one
// browser WebSocket connection. The implementation owns its frame queue
// and must not block: Deliver returns false when it cannot accept the
// frame, and the router then drops it rather than stall the daemon read
// pump.
type TerminalSink interface {
	Deliver(frame []byte) bool
}

// TerminalRouter multiplexes terminal.* frames coming back from a daemon
// onto browser proxy connections. A proxy first registers under its pending
// request_id and, once terminal.opened arrives, registers again under the
// session_id the daemon picked.
type TerminalRouter struct {
	mu    sync.RWMutex
	sinks map[string]TerminalSink
}

// NewTerminalRouter returns a router with no sinks registered. In
// production the Hub owns the only instance; tests can build their own.
func NewTerminalRouter() *TerminalRouter {
	router := new(TerminalRouter)
	router.sinks = map[string]TerminalSink{}
	return router
}

// Register binds sink to key, replacing any sink already bound to it. The
// key is a request_id before the daemon has assigned a session, or a
// session_id after the open ack. Calls with a nil router, an empty key or
// a nil sink are ignored.
func (r *TerminalRouter) Register(key string, sink TerminalSink) {
	if r != nil && key != "" && sink != nil {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.sinks[key] = sink
	}
}

// Unregister drops whatever sink is bound to key. Unknown keys, an empty
// key and a nil router are all no-ops.
func (r *TerminalRouter) Unregister(key string) {
	if r != nil && key != "" {
		r.mu.Lock()
		defer r.mu.Unlock()
		delete(r.sinks, key)
	}
}
|
||||
|
||||
// Route extracts the routing key (request_id or session_id) from a
|
||||
// terminal.* frame and forwards the raw frame to the registered sink.
|
||||
// Unknown keys are dropped silently — the daemon-side session ultimately
|
||||
// observes the dead client via send-side errors / idle timeout.
|
||||
func (r *TerminalRouter) Route(frame []byte, msgType string, payload json.RawMessage) {
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
key := terminalRouteKey(msgType, payload)
|
||||
if key == "" {
|
||||
return
|
||||
}
|
||||
r.mu.RLock()
|
||||
sink := r.sinks[key]
|
||||
r.mu.RUnlock()
|
||||
if sink == nil {
|
||||
return
|
||||
}
|
||||
if !sink.Deliver(frame) {
|
||||
slog.Debug("daemon ws terminal frame dropped: slow sink", "type", msgType, "key", key)
|
||||
}
|
||||
}
|
||||
|
||||
func terminalRouteKey(msgType string, payload json.RawMessage) string {
|
||||
switch msgType {
|
||||
case protocol.MessageTypeTerminalOpened, protocol.MessageTypeTerminalError:
|
||||
var p struct {
|
||||
RequestID string `json:"request_id,omitempty"`
|
||||
SessionID string `json:"session_id,omitempty"`
|
||||
}
|
||||
if err := json.Unmarshal(payload, &p); err != nil {
|
||||
return ""
|
||||
}
|
||||
// terminal.error may carry either request_id (pre-open failure) or
|
||||
// session_id (post-open failure). Prefer request_id so the proxy
|
||||
// receives the failure on the same key it registered.
|
||||
if p.RequestID != "" {
|
||||
return p.RequestID
|
||||
}
|
||||
return p.SessionID
|
||||
case protocol.MessageTypeTerminalData,
|
||||
protocol.MessageTypeTerminalClose,
|
||||
protocol.MessageTypeTerminalExit:
|
||||
var p struct {
|
||||
SessionID string `json:"session_id"`
|
||||
}
|
||||
if err := json.Unmarshal(payload, &p); err != nil {
|
||||
return ""
|
||||
}
|
||||
return p.SessionID
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// SetTerminalRouter installs an explicit router. Tests use this to inject a
|
||||
// fake; production code can rely on the auto-created router exposed by
|
||||
// TerminalRouter() below.
|
||||
func (h *Hub) SetTerminalRouter(r *TerminalRouter) {
|
||||
if h == nil {
|
||||
return
|
||||
}
|
||||
h.termMu.Lock()
|
||||
h.termRouter = r
|
||||
h.termMu.Unlock()
|
||||
}
|
||||
|
||||
// TerminalRouter returns the hub's router, creating one on first access.
|
||||
// The browser proxy WS handler grabs this to register per-session sinks;
|
||||
// production wiring does not need an explicit SetTerminalRouter call.
|
||||
func (h *Hub) TerminalRouter() *TerminalRouter {
|
||||
if h == nil {
|
||||
return nil
|
||||
}
|
||||
h.termMu.RLock()
|
||||
r := h.termRouter
|
||||
h.termMu.RUnlock()
|
||||
if r != nil {
|
||||
return r
|
||||
}
|
||||
h.termMu.Lock()
|
||||
defer h.termMu.Unlock()
|
||||
if h.termRouter == nil {
|
||||
h.termRouter = NewTerminalRouter()
|
||||
}
|
||||
return h.termRouter
|
||||
}
|
||||
|
||||
// terminalRouter is the internal read-only accessor used by handleFrame.
|
||||
// It returns nil if no router has been configured, which short-circuits
|
||||
// dispatch — the auto-create only happens through the public accessor.
|
||||
func (h *Hub) terminalRouter() *TerminalRouter {
|
||||
h.termMu.RLock()
|
||||
defer h.termMu.RUnlock()
|
||||
return h.termRouter
|
||||
}
|
||||
|
||||
// ErrNoDaemonForRuntime is returned by SendToRuntime when no daemon is
|
||||
// currently connected for the given runtime_id. The browser proxy uses this
|
||||
// to fail the open request with a clear error.
|
||||
var ErrNoDaemonForRuntime = errors.New("daemonws: no daemon connected for runtime")
|
||||
|
||||
// SendToRuntime delivers a raw frame to one daemon connection serving
|
||||
// runtimeID. If multiple daemons are registered (rare — usually one per
|
||||
// runtime), the first one wins. Returns ErrNoDaemonForRuntime when no
|
||||
// connection exists, or a "buffer full" error when the daemon's outbound
|
||||
// queue is saturated — callers should surface that as a transient failure
|
||||
// to the browser rather than retrying tightly.
|
||||
func (h *Hub) SendToRuntime(runtimeID string, frame []byte) error {
|
||||
if h == nil || runtimeID == "" {
|
||||
return ErrNoDaemonForRuntime
|
||||
}
|
||||
h.mu.RLock()
|
||||
var target *client
|
||||
for c := range h.byRuntime[runtimeID] {
|
||||
target = c
|
||||
break
|
||||
}
|
||||
h.mu.RUnlock()
|
||||
if target == nil {
|
||||
return ErrNoDaemonForRuntime
|
||||
}
|
||||
if !target.trySend(frame) {
|
||||
return errors.New("daemonws: daemon send buffer full")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// trySend pushes frame onto the client's outbound queue without blocking.
|
||||
// Returns false if the buffer is saturated. We deliberately do not evict
|
||||
// the connection here — terminal back-pressure should slow the producing
|
||||
// browser/server side, not tear down the entire daemonws connection (which
|
||||
// would also break heartbeat + wakeup delivery for unrelated runtimes).
|
||||
func (c *client) trySend(frame []byte) bool {
|
||||
select {
|
||||
case c.send <- frame:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
95
server/internal/daemonws/terminal_test.go
Normal file
95
server/internal/daemonws/terminal_test.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package daemonws
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// collectingSink is a TerminalSink fake that keeps a copy of every frame
// it is handed, in delivery order. It does no locking of its own.
type collectingSink struct {
	frames [][]byte
}

// Deliver stores a private copy of frame and always accepts it.
func (s *collectingSink) Deliver(frame []byte) bool {
	snapshot := append(make([]byte, 0, len(frame)), frame...)
	s.frames = append(s.frames, snapshot)
	return true
}
|
||||
|
||||
func TestTerminalRouter_RoutesByRequestThenSession(t *testing.T) {
|
||||
router := NewTerminalRouter()
|
||||
sink := &collectingSink{}
|
||||
router.Register("req-1", sink)
|
||||
|
||||
// terminal.error before the daemon picked a session_id: routed on
|
||||
// request_id. This is the failure path browsers see when the daemon
|
||||
// can't spawn a PTY (e.g. ErrUnsupportedOS on windows).
|
||||
errFrame := mustEncode(t, protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
|
||||
RequestID: "req-1",
|
||||
Code: protocol.TerminalErrorCodeUnsupportedOS,
|
||||
Message: "no PTY on windows",
|
||||
})
|
||||
router.Route(errFrame, protocol.MessageTypeTerminalError, mustPayload(t, errFrame))
|
||||
|
||||
if got, want := len(sink.frames), 1; got != want {
|
||||
t.Fatalf("delivered = %d, want %d", got, want)
|
||||
}
|
||||
|
||||
// Re-key to session_id after a hypothetical terminal.opened.
|
||||
router.Register("sess-1", sink)
|
||||
router.Unregister("req-1")
|
||||
|
||||
dataFrame := mustEncode(t, protocol.MessageTypeTerminalData, protocol.TerminalDataPayload{
|
||||
SessionID: "sess-1",
|
||||
DataB64: "Zm9vYmFy",
|
||||
})
|
||||
router.Route(dataFrame, protocol.MessageTypeTerminalData, mustPayload(t, dataFrame))
|
||||
if got, want := len(sink.frames), 2; got != want {
|
||||
t.Fatalf("after data delivered = %d, want %d", got, want)
|
||||
}
|
||||
|
||||
// Frames for an unknown session must drop silently — never panic, and
|
||||
// never leak into the wrong sink.
|
||||
strayFrame := mustEncode(t, protocol.MessageTypeTerminalExit, protocol.TerminalExitPayload{
|
||||
SessionID: "sess-2-unknown",
|
||||
ExitCode: 0,
|
||||
})
|
||||
router.Route(strayFrame, protocol.MessageTypeTerminalExit, mustPayload(t, strayFrame))
|
||||
if got, want := len(sink.frames), 2; got != want {
|
||||
t.Fatalf("stray frame delivered to wrong sink: %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTerminalRouter_UnknownSessionDropsSilently(t *testing.T) {
|
||||
router := NewTerminalRouter()
|
||||
frame := mustEncode(t, protocol.MessageTypeTerminalData, protocol.TerminalDataPayload{
|
||||
SessionID: "ghost",
|
||||
DataB64: "Zm9v",
|
||||
})
|
||||
// Should not panic / not deliver anywhere.
|
||||
router.Route(frame, protocol.MessageTypeTerminalData, mustPayload(t, frame))
|
||||
}
|
||||
|
||||
func mustEncode(t *testing.T, msgType string, payload any) []byte {
|
||||
t.Helper()
|
||||
raw, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal payload: %v", err)
|
||||
}
|
||||
frame, err := json.Marshal(protocol.Message{Type: msgType, Payload: raw})
|
||||
if err != nil {
|
||||
t.Fatalf("marshal envelope: %v", err)
|
||||
}
|
||||
return frame
|
||||
}
|
||||
|
||||
func mustPayload(t *testing.T, envelope []byte) json.RawMessage {
|
||||
t.Helper()
|
||||
var m protocol.Message
|
||||
if err := json.Unmarshal(envelope, &m); err != nil {
|
||||
t.Fatalf("unmarshal envelope: %v", err)
|
||||
}
|
||||
return m.Payload
|
||||
}
|
||||
@@ -1927,6 +1927,93 @@ func (h *Handler) ListTasksByIssue(w http.ResponseWriter, r *http.Request) {
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
// TerminalSessionResponse is the JSON shape behind GET
|
||||
// /api/issues/{id}/terminal-sessions. Mirrors terminal_sessions row fields
|
||||
// with the timestamp formatting the rest of the API uses (RFC3339 strings
|
||||
// for clients that parse loosely; pointers for nullable columns).
|
||||
type TerminalSessionResponse struct {
|
||||
ID string `json:"id"`
|
||||
WorkspaceID string `json:"workspace_id"`
|
||||
IssueID string `json:"issue_id"`
|
||||
TaskID string `json:"task_id"`
|
||||
RuntimeID string `json:"runtime_id,omitempty"`
|
||||
UserID string `json:"user_id"`
|
||||
WorkDir string `json:"work_dir"`
|
||||
Shell string `json:"shell,omitempty"`
|
||||
StartedAt string `json:"started_at"`
|
||||
EndedAt *string `json:"ended_at"`
|
||||
ExitCode *int32 `json:"exit_code"`
|
||||
CloseReason string `json:"close_reason,omitempty"`
|
||||
// Status is derived: "active" while the row's ended_at is NULL,
|
||||
// "closed" otherwise. We expose it explicitly so the CLI's `issue
|
||||
// runs` table — which shares its STATUS column with agent task rows —
|
||||
// can render terminal entries without bespoke logic.
|
||||
Status string `json:"status"`
|
||||
// Kind discriminates these rows from agent task rows in clients that
|
||||
// merge both feeds (notably `multica issue runs`).
|
||||
Kind string `json:"kind"`
|
||||
}
|
||||
|
||||
// ListTerminalSessionsByIssue returns the audit log of interactive PTYs
|
||||
// opened against the issue's tasks. The CLI's `issue runs` view fetches
|
||||
// this alongside /task-runs and merges them by timestamp so a `multica
|
||||
// issue runs` listing shows both agent runs and `type=terminal` rows.
|
||||
func (h *Handler) ListTerminalSessionsByIssue(w http.ResponseWriter, r *http.Request) {
|
||||
issueID := chi.URLParam(r, "id")
|
||||
issue, ok := h.loadIssueForUser(w, r, issueID)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
sessions, err := h.Queries.ListTerminalSessionsByIssue(r.Context(), issue.ID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to list terminal sessions")
|
||||
return
|
||||
}
|
||||
|
||||
resp := make([]TerminalSessionResponse, len(sessions))
|
||||
for i, s := range sessions {
|
||||
resp[i] = terminalSessionToResponse(s)
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func terminalSessionToResponse(s db.TerminalSession) TerminalSessionResponse {
|
||||
status := "active"
|
||||
var endedAt *string
|
||||
var exitCode *int32
|
||||
if s.EndedAt.Valid {
|
||||
status = "closed"
|
||||
stamp := s.EndedAt.Time.UTC().Format(time.RFC3339)
|
||||
endedAt = &stamp
|
||||
}
|
||||
if s.ExitCode.Valid {
|
||||
v := s.ExitCode.Int32
|
||||
exitCode = &v
|
||||
}
|
||||
runtimeID := ""
|
||||
if s.RuntimeID.Valid {
|
||||
runtimeID = uuidToString(s.RuntimeID)
|
||||
}
|
||||
return TerminalSessionResponse{
|
||||
ID: uuidToString(s.ID),
|
||||
WorkspaceID: uuidToString(s.WorkspaceID),
|
||||
IssueID: uuidToString(s.IssueID),
|
||||
TaskID: uuidToString(s.TaskID),
|
||||
RuntimeID: runtimeID,
|
||||
UserID: uuidToString(s.UserID),
|
||||
WorkDir: s.WorkDir,
|
||||
Shell: s.Shell,
|
||||
StartedAt: s.StartedAt.Time.UTC().Format(time.RFC3339),
|
||||
EndedAt: endedAt,
|
||||
ExitCode: exitCode,
|
||||
CloseReason: s.CloseReason,
|
||||
Status: status,
|
||||
Kind: "terminal",
|
||||
}
|
||||
}
|
||||
|
||||
// ListTaskMessagesByUser returns task messages for a task.
|
||||
// Used by the frontend under regular user auth (not daemon auth).
|
||||
// Verifies the task belongs to the caller's workspace.
|
||||
|
||||
84
server/internal/handler/terminal_session_response_test.go
Normal file
84
server/internal/handler/terminal_session_response_test.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
)
|
||||
|
||||
// TestTerminalSessionToResponse_Active validates the shape returned for a
|
||||
// session whose ended_at is still NULL — the CLI's `issue runs` merge
|
||||
// expects status="active" and a nil ended_at pointer (not the zero
|
||||
// timestamp string, which would render as a real completion in the
|
||||
// table).
|
||||
func TestTerminalSessionToResponse_Active(t *testing.T) {
|
||||
started := time.Date(2026, 5, 16, 10, 30, 0, 0, time.UTC)
|
||||
s := db.TerminalSession{
|
||||
ID: util.MustParseUUID("11111111-1111-1111-1111-111111111111"),
|
||||
WorkspaceID: util.MustParseUUID("22222222-2222-2222-2222-222222222222"),
|
||||
IssueID: util.MustParseUUID("33333333-3333-3333-3333-333333333333"),
|
||||
TaskID: util.MustParseUUID("44444444-4444-4444-4444-444444444444"),
|
||||
UserID: util.MustParseUUID("55555555-5555-5555-5555-555555555555"),
|
||||
WorkDir: "/tmp/ws/task/workdir",
|
||||
Shell: "/bin/bash",
|
||||
StartedAt: pgtype.Timestamptz{Time: started, Valid: true},
|
||||
}
|
||||
|
||||
resp := terminalSessionToResponse(s)
|
||||
|
||||
if resp.Status != "active" {
|
||||
t.Errorf("Status = %q, want active", resp.Status)
|
||||
}
|
||||
if resp.Kind != "terminal" {
|
||||
t.Errorf("Kind = %q, want terminal", resp.Kind)
|
||||
}
|
||||
if resp.EndedAt != nil {
|
||||
t.Errorf("EndedAt = %v, want nil", *resp.EndedAt)
|
||||
}
|
||||
if resp.ExitCode != nil {
|
||||
t.Errorf("ExitCode = %v, want nil", *resp.ExitCode)
|
||||
}
|
||||
if resp.StartedAt != started.Format(time.RFC3339) {
|
||||
t.Errorf("StartedAt = %q, want %q", resp.StartedAt, started.Format(time.RFC3339))
|
||||
}
|
||||
}
|
||||
|
||||
// TestTerminalSessionToResponse_Closed validates the closed-session shape:
|
||||
// status flips to "closed", ended_at is a populated pointer, exit_code
|
||||
// surfaces the signed int. close_reason rides through verbatim so the CLI
|
||||
// can display it in the ERROR column for terminal rows.
|
||||
func TestTerminalSessionToResponse_Closed(t *testing.T) {
|
||||
started := time.Date(2026, 5, 16, 10, 30, 0, 0, time.UTC)
|
||||
ended := started.Add(15 * time.Minute)
|
||||
s := db.TerminalSession{
|
||||
ID: util.MustParseUUID("11111111-1111-1111-1111-111111111111"),
|
||||
WorkspaceID: util.MustParseUUID("22222222-2222-2222-2222-222222222222"),
|
||||
IssueID: util.MustParseUUID("33333333-3333-3333-3333-333333333333"),
|
||||
TaskID: util.MustParseUUID("44444444-4444-4444-4444-444444444444"),
|
||||
UserID: util.MustParseUUID("55555555-5555-5555-5555-555555555555"),
|
||||
WorkDir: "/tmp/ws/task/workdir",
|
||||
StartedAt: pgtype.Timestamptz{Time: started, Valid: true},
|
||||
EndedAt: pgtype.Timestamptz{Time: ended, Valid: true},
|
||||
ExitCode: pgtype.Int4{Int32: 130, Valid: true},
|
||||
CloseReason: "idle_timeout",
|
||||
}
|
||||
|
||||
resp := terminalSessionToResponse(s)
|
||||
|
||||
if resp.Status != "closed" {
|
||||
t.Errorf("Status = %q, want closed", resp.Status)
|
||||
}
|
||||
if resp.EndedAt == nil || *resp.EndedAt != ended.Format(time.RFC3339) {
|
||||
t.Errorf("EndedAt = %v, want %q", resp.EndedAt, ended.Format(time.RFC3339))
|
||||
}
|
||||
if resp.ExitCode == nil || *resp.ExitCode != 130 {
|
||||
t.Errorf("ExitCode = %v, want 130", resp.ExitCode)
|
||||
}
|
||||
if resp.CloseReason != "idle_timeout" {
|
||||
t.Errorf("CloseReason = %q, want idle_timeout", resp.CloseReason)
|
||||
}
|
||||
}
|
||||
758
server/internal/handler/terminal_ws.go
Normal file
758
server/internal/handler/terminal_ws.go
Normal file
@@ -0,0 +1,758 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/auth"
|
||||
"github.com/multica-ai/multica/server/internal/daemonws"
|
||||
"github.com/multica-ai/multica/server/internal/realtime"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
|
||||
"github.com/golang-jwt/jwt/v5"
|
||||
)
|
||||
|
||||
// terminalWriteWait caps how long a single WriteMessage may block before
|
||||
// we tear the browser connection down as slow-client. Matches the daemonws
|
||||
// hub's writeWait so back-pressure semantics are consistent end-to-end.
|
||||
const terminalWriteWait = 10 * time.Second
|
||||
|
||||
// terminalOpenTimeout caps how long the proxy waits for the daemon to
|
||||
// respond to a terminal.open request with terminal.opened or terminal.error.
|
||||
// 5s is generous: PTY spawn is local and synchronous on the daemon side.
|
||||
const terminalOpenTimeout = 5 * time.Second
|
||||
|
||||
// terminalUpgrader reuses the realtime hub's origin allowlist. The terminal
|
||||
// endpoint executes a shell on the daemon, so it must be at least as strict
|
||||
// about cross-origin connections as the read-only realtime WS — using the
|
||||
// shared CheckOrigin keeps the policy in one place and prevents an
|
||||
// accidentally permissive `CheckOrigin: true` from sneaking past review.
|
||||
var terminalUpgrader = websocket.Upgrader{
|
||||
CheckOrigin: realtime.CheckOrigin,
|
||||
}
|
||||
|
||||
// HandleIssueTerminalWS proxies a browser WebSocket onto a PTY running on
|
||||
// the daemon hosting the issue's most-recent agent task. The flow per
|
||||
// connection:
|
||||
//
|
||||
// 1. Authenticate the user (cookie JWT preferred; first-message auth as
|
||||
// fallback for clients that cannot set cookies — e.g. some Desktop
|
||||
// dev modes).
|
||||
// 2. Resolve issue → workspace → latest task with a non-empty work_dir +
|
||||
// runtime_id. Fail closed if no such task exists; users see a clear
|
||||
// "no task to attach to" error instead of a silent hang.
|
||||
// 3. Register a sink on the daemonws TerminalRouter under a fresh
|
||||
// request_id, then send terminal.open to the daemon.
|
||||
// 4. On terminal.opened: re-register the sink under the session_id the
|
||||
// daemon picked, drop the request_id route, and start the bidirectional
|
||||
// pump until either side closes.
|
||||
// 5. On disconnect: send terminal.close so the daemon tears the PTY down
|
||||
// promptly rather than waiting for its idle sweep.
|
||||
func (h *Handler) HandleIssueTerminalWS(w http.ResponseWriter, r *http.Request) {
|
||||
if h.DaemonHub == nil {
|
||||
http.Error(w, `{"error":"terminal proxy not configured"}`, http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
workspaceID := r.URL.Query().Get("workspace_id")
|
||||
if workspaceID == "" {
|
||||
http.Error(w, `{"error":"workspace_id required"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
wsUUID, err := util.ParseUUID(workspaceID)
|
||||
if err != nil {
|
||||
http.Error(w, `{"error":"invalid workspace_id"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
issueParam := chi.URLParam(r, "issue_id")
|
||||
if issueParam == "" {
|
||||
http.Error(w, `{"error":"issue_id required"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
userID, errMsg := terminalAuthCookie(r, h)
|
||||
if errMsg != "" && userID == "" {
|
||||
// No cookie or invalid cookie. Defer auth to the first WS frame.
|
||||
}
|
||||
if userID != "" && !h.terminalIsMember(r.Context(), userID, workspaceID) {
|
||||
http.Error(w, `{"error":"not a member of this workspace"}`, http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
conn, err := terminalUpgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
slog.Error("terminal ws upgrade failed", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
if userID == "" {
|
||||
uid, errMsg := terminalFirstFrameAuth(conn, h)
|
||||
if errMsg != "" {
|
||||
_ = conn.WriteMessage(websocket.TextMessage, []byte(errMsg))
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
if !h.terminalIsMember(r.Context(), uid, workspaceID) {
|
||||
_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"error":"not a member of this workspace"}`))
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
userID = uid
|
||||
_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"auth_ack"}`))
|
||||
}
|
||||
|
||||
issue, ok := h.terminalResolveIssue(r.Context(), issueParam, wsUUID)
|
||||
if !ok {
|
||||
sendTerminalErrorAndClose(conn, "", "", protocol.TerminalErrorCodeTaskNotFound, "issue not found")
|
||||
return
|
||||
}
|
||||
|
||||
task, ok := h.terminalLatestAttachableTask(r.Context(), issue.ID)
|
||||
if !ok {
|
||||
sendTerminalErrorAndClose(conn, "", "", protocol.TerminalErrorCodeTaskNotFound, "no agent task has run on this issue yet — trigger a run first")
|
||||
return
|
||||
}
|
||||
|
||||
cols := parseUint16Query(r, "cols", 80)
|
||||
rows := parseUint16Query(r, "rows", 24)
|
||||
|
||||
proxy := newTerminalProxy(conn, h.DaemonHub, userID, util.UUIDToString(task.RuntimeID), util.UUIDToString(task.ID), workspaceID, util.UUIDToString(issue.ID), task.SessionID.String, task.WorkDir.String, cols, rows)
|
||||
proxy.audit = newTerminalAuditRecorder(h, util.UUIDToString(issue.ID), util.UUIDToString(task.ID), util.UUIDToString(task.RuntimeID), workspaceID, userID, task.WorkDir.String)
|
||||
proxy.run()
|
||||
}
|
||||
|
||||
// terminalAuditRecorder writes terminal_sessions rows for the audit log
|
||||
// (RFC §Auth) and as the data source behind `multica issue runs`'
|
||||
// `type=terminal` entries. Persisting happens out-of-band on a background
|
||||
// context so a slow DB write can't stall the WS handshake — the trade-off
|
||||
// is that an audit row may briefly lag the actual session open by a few
|
||||
// milliseconds, which is acceptable for an audit surface.
|
||||
type terminalAuditRecorder struct {
|
||||
h *Handler
|
||||
issueID string
|
||||
taskID string
|
||||
runtimeID string
|
||||
workspaceID string
|
||||
userID string
|
||||
workDir string
|
||||
}
|
||||
|
||||
func newTerminalAuditRecorder(h *Handler, issueID, taskID, runtimeID, workspaceID, userID, workDir string) *terminalAuditRecorder {
|
||||
if h == nil || h.Queries == nil {
|
||||
// Tests that build a handler without DB queries (terminal protocol
|
||||
// tests, etc.) skip recording — keep that path nil-safe so the
|
||||
// audit hook never panics in a unit environment.
|
||||
return nil
|
||||
}
|
||||
return &terminalAuditRecorder{
|
||||
h: h,
|
||||
issueID: issueID,
|
||||
taskID: taskID,
|
||||
runtimeID: runtimeID,
|
||||
workspaceID: workspaceID,
|
||||
userID: userID,
|
||||
workDir: workDir,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *terminalAuditRecorder) RecordOpen(sessionID, shell string) {
|
||||
if a == nil {
|
||||
return
|
||||
}
|
||||
sessUUID, err := util.ParseUUID(sessionID)
|
||||
if err != nil {
|
||||
slog.Debug("terminal audit: invalid session id", "session_id", sessionID, "error", err)
|
||||
return
|
||||
}
|
||||
issueUUID, err := util.ParseUUID(a.issueID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
taskUUID, err := util.ParseUUID(a.taskID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
wsUUID, err := util.ParseUUID(a.workspaceID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
userUUID, err := util.ParseUUID(a.userID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var runtimeUUID pgtype.UUID
|
||||
if a.runtimeID != "" {
|
||||
if parsed, perr := util.ParseUUID(a.runtimeID); perr == nil {
|
||||
runtimeUUID = parsed
|
||||
}
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if _, err := a.h.Queries.CreateTerminalSession(ctx, db.CreateTerminalSessionParams{
|
||||
ID: sessUUID,
|
||||
WorkspaceID: wsUUID,
|
||||
IssueID: issueUUID,
|
||||
TaskID: taskUUID,
|
||||
RuntimeID: runtimeUUID,
|
||||
UserID: userUUID,
|
||||
WorkDir: a.workDir,
|
||||
Shell: shell,
|
||||
StartedAt: pgtype.Timestamptz{Time: time.Now().UTC(), Valid: true},
|
||||
}); err != nil {
|
||||
slog.Warn("terminal audit: record open failed", "session_id", sessionID, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *terminalAuditRecorder) RecordClose(sessionID string, exitCode int32, hasExit bool, reason string) {
|
||||
if a == nil || sessionID == "" {
|
||||
return
|
||||
}
|
||||
sessUUID, err := util.ParseUUID(sessionID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var code pgtype.Int4
|
||||
if hasExit {
|
||||
code = pgtype.Int4{Int32: exitCode, Valid: true}
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := a.h.Queries.CloseTerminalSession(ctx, db.CloseTerminalSessionParams{
|
||||
ID: sessUUID,
|
||||
EndedAt: pgtype.Timestamptz{Time: time.Now().UTC(), Valid: true},
|
||||
ExitCode: code,
|
||||
CloseReason: reason,
|
||||
}); err != nil {
|
||||
slog.Warn("terminal audit: record close failed", "session_id", sessionID, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// terminalProxy is the per-connection bridge between browser and daemon.
|
||||
// Methods that touch the WS connection do so from one of two goroutines
|
||||
// only: writePump owns conn writes and readPump owns conn reads, matching
|
||||
// gorilla/websocket's single-goroutine-per-direction contract.
|
||||
type terminalProxy struct {
|
||||
conn *websocket.Conn
|
||||
hub *daemonws.Hub
|
||||
userID string
|
||||
runtimeID string
|
||||
taskID string
|
||||
workspaceID string
|
||||
issueID string
|
||||
priorSess string
|
||||
workDir string
|
||||
cols uint16
|
||||
rows uint16
|
||||
|
||||
requestID string
|
||||
|
||||
mu sync.Mutex
|
||||
sessionID string
|
||||
closedOnce sync.Once
|
||||
closeCh chan struct{}
|
||||
sendCh chan []byte
|
||||
|
||||
openedCh chan struct{}
|
||||
openErr chan terminalOpenFailure
|
||||
|
||||
// audit is the persistence hook for terminal_sessions rows (RFC §Auth).
|
||||
// nil in tests that build a proxy without a Handler — every call site
|
||||
// is nil-safe through the recorder methods.
|
||||
audit *terminalAuditRecorder
|
||||
|
||||
// exitMu guards exitCode/hasExit, written from writePump when the
|
||||
// daemon sends terminal.exit and read from the run() defer that
|
||||
// finalizes the audit row.
|
||||
exitMu sync.Mutex
|
||||
exitCode int32
|
||||
hasExit bool
|
||||
exitMsg string
|
||||
}
|
||||
|
||||
// terminalOpenFailure carries the code and message of a terminal.error
// frame observed during the open handshake from writePump to run().
type terminalOpenFailure struct {
	code, message string
}
|
||||
|
||||
func newTerminalProxy(conn *websocket.Conn, hub *daemonws.Hub, userID, runtimeID, taskID, workspaceID, issueID, priorSess, workDir string, cols, rows uint16) *terminalProxy {
|
||||
return &terminalProxy{
|
||||
conn: conn,
|
||||
hub: hub,
|
||||
userID: userID,
|
||||
runtimeID: runtimeID,
|
||||
taskID: taskID,
|
||||
workspaceID: workspaceID,
|
||||
issueID: issueID,
|
||||
priorSess: priorSess,
|
||||
workDir: workDir,
|
||||
cols: cols,
|
||||
rows: rows,
|
||||
requestID: uuid.NewString(),
|
||||
closeCh: make(chan struct{}),
|
||||
sendCh: make(chan []byte, 256),
|
||||
openedCh: make(chan struct{}),
|
||||
openErr: make(chan terminalOpenFailure, 1),
|
||||
}
|
||||
}
|
||||
|
||||
// Deliver implements daemonws.TerminalSink. The daemon hub's read pump
|
||||
// invokes this for every terminal.* frame addressed to our request_id /
|
||||
// session_id. We must stay non-blocking; the hub drops the frame on a
|
||||
// full buffer rather than stalling its own pump.
|
||||
func (p *terminalProxy) Deliver(frame []byte) bool {
|
||||
select {
|
||||
case p.sendCh <- frame:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (p *terminalProxy) run() {
|
||||
defer p.conn.Close()
|
||||
|
||||
router := p.hub.TerminalRouter()
|
||||
router.Register(p.requestID, p)
|
||||
defer router.Unregister(p.requestID)
|
||||
|
||||
go p.writePump()
|
||||
|
||||
if err := p.sendOpenToDaemon(); err != nil {
|
||||
// Couldn't reach the daemon at all — surface a structured error and
|
||||
// bail before we register cleanup paths that assume a live session.
|
||||
sendTerminalErrorOverChannel(p.sendCh, p.requestID, "", protocol.TerminalErrorCodeInternal, err.Error())
|
||||
<-time.After(50 * time.Millisecond) // give writePump a tick to flush
|
||||
p.shutdown()
|
||||
return
|
||||
}
|
||||
|
||||
// Block until the open ack arrives, the open is rejected, or the user
|
||||
// disconnects mid-handshake. After this point sessionID is stable and
|
||||
// we are routing on session_id rather than request_id.
|
||||
select {
|
||||
case <-p.openedCh:
|
||||
// Open succeeded. The router is already re-keyed on session_id by
|
||||
// observeOpen. Fall through to the bidirectional pump.
|
||||
case failure := <-p.openErr:
|
||||
sendTerminalErrorOverChannel(p.sendCh, p.requestID, "", failure.code, failure.message)
|
||||
<-time.After(50 * time.Millisecond)
|
||||
p.shutdown()
|
||||
return
|
||||
case <-p.closeCh:
|
||||
return
|
||||
case <-time.After(terminalOpenTimeout):
|
||||
sendTerminalErrorOverChannel(p.sendCh, p.requestID, "", protocol.TerminalErrorCodeInternal, "daemon did not respond to terminal.open within timeout")
|
||||
<-time.After(50 * time.Millisecond)
|
||||
p.shutdown()
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
sid := p.SessionID()
|
||||
if sid != "" {
|
||||
router.Unregister(sid)
|
||||
// Best-effort teardown on the daemon. If the connection to the
|
||||
// daemon is already gone, the daemon's own clearWSWrites path
|
||||
// will close the session — we just lose an idle slot's worth of
|
||||
// latency before the GC catches it.
|
||||
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalClose, protocol.TerminalClosePayload{
|
||||
SessionID: sid,
|
||||
Reason: "browser_disconnect",
|
||||
})
|
||||
if err == nil {
|
||||
_ = p.hub.SendToRuntime(p.runtimeID, frame)
|
||||
}
|
||||
// Stamp the audit row. If the daemon sent terminal.exit before
|
||||
// we got here, use its exit code + reason; otherwise this is a
|
||||
// browser-initiated disconnect.
|
||||
p.exitMu.Lock()
|
||||
code, has, reason := p.exitCode, p.hasExit, p.exitMsg
|
||||
p.exitMu.Unlock()
|
||||
if reason == "" {
|
||||
reason = "browser_disconnect"
|
||||
}
|
||||
p.audit.RecordClose(sid, code, has, reason)
|
||||
}
|
||||
}()
|
||||
|
||||
p.readPump()
|
||||
}
|
||||
|
||||
func (p *terminalProxy) SessionID() string {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
return p.sessionID
|
||||
}
|
||||
|
||||
func (p *terminalProxy) sendOpenToDaemon() error {
|
||||
payload := protocol.TerminalOpenPayload{
|
||||
RequestID: p.requestID,
|
||||
TaskID: p.taskID,
|
||||
WorkspaceID: p.workspaceID,
|
||||
UserID: p.userID,
|
||||
IssueID: p.issueID,
|
||||
WorkDir: p.workDir,
|
||||
PriorSessionID: p.priorSess,
|
||||
Cols: p.cols,
|
||||
Rows: p.rows,
|
||||
}
|
||||
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalOpen, payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := p.hub.SendToRuntime(p.runtimeID, frame); err != nil {
|
||||
if errors.Is(err, daemonws.ErrNoDaemonForRuntime) {
|
||||
return errors.New("daemon offline for this runtime — start the agent's daemon and retry")
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// readPump reads browser frames and forwards terminal.data/resize/close to
|
||||
// the daemon. Unknown frames are dropped silently. Returns when the
|
||||
// connection closes; we then trigger shutdown so writePump exits too.
|
||||
func (p *terminalProxy) readPump() {
|
||||
defer p.shutdown()
|
||||
p.conn.SetReadLimit(64 * 1024)
|
||||
for {
|
||||
_, raw, err := p.conn.ReadMessage()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var env protocol.Message
|
||||
if err := json.Unmarshal(raw, &env); err != nil {
|
||||
continue
|
||||
}
|
||||
sid := p.SessionID()
|
||||
// We force-stamp session_id on outbound frames: the browser may have
|
||||
// sent its own session_id, but only the one the daemon assigned is
|
||||
// trusted. This prevents a misbehaving client from addressing a
|
||||
// session that another user opened against the same daemon.
|
||||
switch env.Type {
|
||||
case protocol.MessageTypeTerminalData:
|
||||
var pl protocol.TerminalDataPayload
|
||||
if err := json.Unmarshal(env.Payload, &pl); err != nil {
|
||||
continue
|
||||
}
|
||||
pl.SessionID = sid
|
||||
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalData, pl)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
_ = p.hub.SendToRuntime(p.runtimeID, frame)
|
||||
case protocol.MessageTypeTerminalResize:
|
||||
var pl protocol.TerminalResizePayload
|
||||
if err := json.Unmarshal(env.Payload, &pl); err != nil {
|
||||
continue
|
||||
}
|
||||
pl.SessionID = sid
|
||||
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalResize, pl)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
_ = p.hub.SendToRuntime(p.runtimeID, frame)
|
||||
case protocol.MessageTypeTerminalClose:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// writePump is the single owner of conn writes. It also watches for the
|
||||
// terminal.opened / terminal.error handshake transition while it pumps,
|
||||
// because separating those into a second goroutine would race over
|
||||
// sendCh (only one reader per channel value).
|
||||
//
|
||||
// Once the session is open, this is just a straight relay. During the
|
||||
// open window (sessionID empty), every frame is forwarded to the browser
|
||||
// AND inspected — opened/error frames feed openedCh / openErr so run()
|
||||
// can unblock or fail the handshake.
|
||||
func (p *terminalProxy) writePump() {
|
||||
router := p.hub.TerminalRouter()
|
||||
for {
|
||||
select {
|
||||
case <-p.closeCh:
|
||||
return
|
||||
case frame, ok := <-p.sendCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
p.forwardToBrowser(frame)
|
||||
// terminal.exit can arrive at any point in the session lifecycle
|
||||
// (idle timeout, child crash, manager shutdown), so we always
|
||||
// peek the envelope to capture the exit code for the audit row.
|
||||
// terminal.opened / terminal.error are only meaningful during
|
||||
// the open handshake window — once sessionID is set they are
|
||||
// just relayed without re-inspection.
|
||||
var env protocol.Message
|
||||
if err := json.Unmarshal(frame, &env); err != nil {
|
||||
continue
|
||||
}
|
||||
if env.Type == protocol.MessageTypeTerminalExit {
|
||||
var ep protocol.TerminalExitPayload
|
||||
if err := json.Unmarshal(env.Payload, &ep); err == nil {
|
||||
p.exitMu.Lock()
|
||||
p.exitCode = int32(ep.ExitCode)
|
||||
p.hasExit = true
|
||||
if ep.Reason != "" {
|
||||
p.exitMsg = ep.Reason
|
||||
}
|
||||
p.exitMu.Unlock()
|
||||
// Finalize the audit row as soon as the daemon reports
|
||||
// exit, not when the client disconnects. CloseTerminalSession
|
||||
// is idempotent (WHERE ended_at IS NULL) so the browser_disconnect
|
||||
// fallback in run()'s defer becomes a no-op if it fires later.
|
||||
// Without this, a client that keeps the WS open after exit
|
||||
// would leave terminal_sessions.ended_at NULL forever and
|
||||
// `multica issue runs` would render an already-exited
|
||||
// terminal as active.
|
||||
sid := ep.SessionID
|
||||
if sid == "" {
|
||||
sid = p.SessionID()
|
||||
}
|
||||
reason := ep.Reason
|
||||
if reason == "" {
|
||||
reason = "exited"
|
||||
}
|
||||
p.audit.RecordClose(sid, int32(ep.ExitCode), true, reason)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if p.SessionID() != "" {
|
||||
continue
|
||||
}
|
||||
switch env.Type {
|
||||
case protocol.MessageTypeTerminalOpened:
|
||||
var op protocol.TerminalOpenedPayload
|
||||
if err := json.Unmarshal(env.Payload, &op); err == nil && op.SessionID != "" {
|
||||
p.mu.Lock()
|
||||
p.sessionID = op.SessionID
|
||||
p.mu.Unlock()
|
||||
router.Register(op.SessionID, p)
|
||||
router.Unregister(p.requestID)
|
||||
// Persist the open audit row before unblocking run(). If
|
||||
// the DB write fails the slog records the error; the
|
||||
// session itself still runs (audit is best-effort).
|
||||
p.audit.RecordOpen(op.SessionID, op.Shell)
|
||||
close(p.openedCh)
|
||||
}
|
||||
case protocol.MessageTypeTerminalError:
|
||||
var ep protocol.TerminalErrorPayload
|
||||
if err := json.Unmarshal(env.Payload, &ep); err == nil {
|
||||
select {
|
||||
case p.openErr <- terminalOpenFailure{code: ep.Code, message: ep.Message}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *terminalProxy) forwardToBrowser(frame []byte) {
|
||||
p.conn.SetWriteDeadline(time.Now().Add(terminalWriteWait))
|
||||
if err := p.conn.WriteMessage(websocket.TextMessage, frame); err != nil {
|
||||
slog.Debug("terminal ws write to browser failed", "error", err)
|
||||
p.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *terminalProxy) shutdown() {
|
||||
p.closedOnce.Do(func() {
|
||||
close(p.closeCh)
|
||||
})
|
||||
}
|
||||
|
||||
func sendTerminalErrorOverChannel(ch chan<- []byte, requestID, sessionID, code, message string) {
|
||||
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
|
||||
RequestID: requestID,
|
||||
SessionID: sessionID,
|
||||
Code: code,
|
||||
Message: message,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case ch <- frame:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func sendTerminalErrorAndClose(conn *websocket.Conn, requestID, sessionID, code, message string) {
|
||||
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
|
||||
RequestID: requestID,
|
||||
SessionID: sessionID,
|
||||
Code: code,
|
||||
Message: message,
|
||||
})
|
||||
if err == nil {
|
||||
_ = conn.WriteMessage(websocket.TextMessage, frame)
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
func marshalTerminalFrame(msgType string, payload any) ([]byte, error) {
|
||||
raw, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return json.Marshal(protocol.Message{Type: msgType, Payload: raw})
|
||||
}
|
||||
|
||||
func parseUint16Query(r *http.Request, key string, defaultVal uint16) uint16 {
|
||||
raw := r.URL.Query().Get(key)
|
||||
if raw == "" {
|
||||
return defaultVal
|
||||
}
|
||||
v, err := strconv.ParseUint(raw, 10, 16)
|
||||
if err != nil || v == 0 {
|
||||
return defaultVal
|
||||
}
|
||||
return uint16(v)
|
||||
}
|
||||
|
||||
// --- auth helpers (cookie + first-frame JWT/PAT) ---
|
||||
|
||||
func terminalAuthCookie(r *http.Request, h *Handler) (string, string) {
|
||||
cookie, err := r.Cookie(auth.AuthCookieName)
|
||||
if err != nil || cookie.Value == "" {
|
||||
return "", ""
|
||||
}
|
||||
return terminalAuthToken(cookie.Value, h, r.Context())
|
||||
}
|
||||
|
||||
func terminalFirstFrameAuth(conn *websocket.Conn, h *Handler) (string, string) {
|
||||
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
|
||||
defer conn.SetReadDeadline(time.Time{})
|
||||
_, raw, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
return "", `{"error":"auth timeout or read error"}`
|
||||
}
|
||||
var msg struct {
|
||||
Type string `json:"type"`
|
||||
Payload struct {
|
||||
Token string `json:"token"`
|
||||
} `json:"payload"`
|
||||
}
|
||||
if err := json.Unmarshal(raw, &msg); err != nil || msg.Type != "auth" || msg.Payload.Token == "" {
|
||||
return "", `{"error":"expected auth message as first frame"}`
|
||||
}
|
||||
return terminalAuthToken(msg.Payload.Token, h, context.Background())
|
||||
}
|
||||
|
||||
func terminalAuthToken(tokenStr string, h *Handler, ctx context.Context) (string, string) {
|
||||
if len(tokenStr) > 4 && tokenStr[:4] == "mul_" {
|
||||
if h.PATCache == nil {
|
||||
pat, err := h.Queries.GetPersonalAccessTokenByHash(ctx, auth.HashToken(tokenStr))
|
||||
if err != nil {
|
||||
return "", `{"error":"invalid token"}`
|
||||
}
|
||||
return util.UUIDToString(pat.UserID), ""
|
||||
}
|
||||
hash := auth.HashToken(tokenStr)
|
||||
if uid, ok := h.PATCache.Get(ctx, hash); ok {
|
||||
return uid, ""
|
||||
}
|
||||
pat, err := h.Queries.GetPersonalAccessTokenByHash(ctx, hash)
|
||||
if err != nil {
|
||||
return "", `{"error":"invalid token"}`
|
||||
}
|
||||
uid := util.UUIDToString(pat.UserID)
|
||||
return uid, ""
|
||||
}
|
||||
token, err := jwt.Parse(tokenStr, func(token *jwt.Token) (any, error) {
|
||||
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
|
||||
return nil, jwt.ErrSignatureInvalid
|
||||
}
|
||||
return auth.JWTSecret(), nil
|
||||
})
|
||||
if err != nil || !token.Valid {
|
||||
return "", `{"error":"invalid token"}`
|
||||
}
|
||||
claims, ok := token.Claims.(jwt.MapClaims)
|
||||
if !ok {
|
||||
return "", `{"error":"invalid claims"}`
|
||||
}
|
||||
uid, ok := claims["sub"].(string)
|
||||
if !ok || uid == "" {
|
||||
return "", `{"error":"invalid claims"}`
|
||||
}
|
||||
return uid, ""
|
||||
}
|
||||
|
||||
func (h *Handler) terminalIsMember(ctx context.Context, userID, workspaceID string) bool {
|
||||
userUUID, err := util.ParseUUID(userID)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
wsUUID, err := util.ParseUUID(workspaceID)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
_, err = h.Queries.GetMemberByUserAndWorkspace(ctx, db.GetMemberByUserAndWorkspaceParams{
|
||||
UserID: userUUID,
|
||||
WorkspaceID: wsUUID,
|
||||
})
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (h *Handler) terminalResolveIssue(ctx context.Context, issueID string, wsUUID pgtype.UUID) (db.Issue, bool) {
|
||||
if parts := splitIdentifier(issueID); parts != nil {
|
||||
issue, err := h.Queries.GetIssueByNumber(ctx, db.GetIssueByNumberParams{
|
||||
WorkspaceID: wsUUID,
|
||||
Number: parts.number,
|
||||
})
|
||||
if err == nil {
|
||||
return issue, true
|
||||
}
|
||||
}
|
||||
issueUUID, err := util.ParseUUID(issueID)
|
||||
if err != nil {
|
||||
return db.Issue{}, false
|
||||
}
|
||||
issue, err := h.Queries.GetIssueInWorkspace(ctx, db.GetIssueInWorkspaceParams{
|
||||
ID: issueUUID,
|
||||
WorkspaceID: wsUUID,
|
||||
})
|
||||
if err != nil {
|
||||
return db.Issue{}, false
|
||||
}
|
||||
return issue, true
|
||||
}
|
||||
|
||||
// terminalLatestAttachableTask returns the most recent task on the issue
|
||||
// that the proxy can attach to: must have a known work_dir, a runtime_id,
|
||||
// and a daemon currently connected for that runtime. Falls back through
|
||||
// the task history rather than picking only the absolute latest, because
|
||||
// the most-recent row may be a queued task that never ran (no workdir yet).
|
||||
func (h *Handler) terminalLatestAttachableTask(ctx context.Context, issueID pgtype.UUID) (db.AgentTaskQueue, bool) {
|
||||
tasks, err := h.Queries.ListTasksByIssue(ctx, issueID)
|
||||
if err != nil {
|
||||
return db.AgentTaskQueue{}, false
|
||||
}
|
||||
for _, t := range tasks {
|
||||
if !t.WorkDir.Valid || t.WorkDir.String == "" {
|
||||
continue
|
||||
}
|
||||
if !t.RuntimeID.Valid {
|
||||
continue
|
||||
}
|
||||
return t, true
|
||||
}
|
||||
return db.AgentTaskQueue{}, false
|
||||
}
|
||||
@@ -77,6 +77,13 @@ func SetAllowedOrigins(origins []string) {
|
||||
allowedWSOrigins.Store(origins)
|
||||
}
|
||||
|
||||
// CheckOrigin is the WebSocket origin check shared by every authenticated
|
||||
// realtime endpoint (including the issue terminal proxy). Browser endpoints
|
||||
// must NOT install their own `CheckOrigin: true` upgrader — that bypasses
|
||||
// CSWSH defense; route through this function instead so the allowlist stays
|
||||
// the single source of truth.
|
||||
func CheckOrigin(r *http.Request) bool { return checkOrigin(r) }
|
||||
|
||||
func checkOrigin(r *http.Request) bool {
|
||||
origin := r.Header.Get("Origin")
|
||||
if origin == "" {
|
||||
|
||||
1
server/migrations/091_terminal_sessions.down.sql
Normal file
1
server/migrations/091_terminal_sessions.down.sql
Normal file
@@ -0,0 +1 @@
|
||||
-- Down migration: removes the terminal_sessions table. IF EXISTS keeps the
-- statement safe to run when the table is already absent.
DROP TABLE IF EXISTS terminal_sessions;
|
||||
36
server/migrations/091_terminal_sessions.up.sql
Normal file
36
server/migrations/091_terminal_sessions.up.sql
Normal file
@@ -0,0 +1,36 @@
|
||||
-- terminal_sessions holds one row per interactive PTY that a user opens
-- against an agent task workdir, whether from the Issue → Terminal panel or
-- via `multica issue terminal`. Each row is the audit log entry (RFC §Auth)
-- and also backs the `type=terminal` rows shown by `multica issue runs`.
--
-- The table is deliberately lightweight: keystrokes are NEVER recorded
-- (privacy + volume); only the open/close envelope is stored. close_reason
-- is the string the daemon's terminal.Manager attaches to the teardown
-- (browser_disconnect, idle_timeout, manager_shutdown, ws_disconnect,
-- exited, …), so operators can tell why a session ended without grepping
-- the daemon logs.
CREATE TABLE terminal_sessions (
    id           UUID PRIMARY KEY,
    workspace_id UUID NOT NULL,
    issue_id     UUID NOT NULL,
    task_id      UUID NOT NULL,
    runtime_id   UUID,
    user_id      UUID NOT NULL,
    work_dir     TEXT NOT NULL,
    shell        TEXT NOT NULL DEFAULT '',
    started_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    ended_at     TIMESTAMPTZ,
    exit_code    INTEGER,
    close_reason TEXT NOT NULL DEFAULT ''
);

-- The issue runs view always lists sessions for a single issue, newest
-- first; this is the dominant access path.
CREATE INDEX terminal_sessions_issue_started_idx
    ON terminal_sessions (issue_id, started_at DESC);

-- Per-workspace audits (e.g. "show me every terminal session in this
-- workspace") and the cross-workspace ACL check both filter by
-- workspace_id first, so a composite index led by workspace_id keeps that
-- path cheap.
CREATE INDEX terminal_sessions_workspace_started_idx
    ON terminal_sessions (workspace_id, started_at DESC);
|
||||
@@ -563,6 +563,21 @@ type TaskUsageRollupState struct {
|
||||
LastError pgtype.Text `json:"last_error"`
|
||||
}
|
||||
|
||||
type TerminalSession struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
IssueID pgtype.UUID `json:"issue_id"`
|
||||
TaskID pgtype.UUID `json:"task_id"`
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
UserID pgtype.UUID `json:"user_id"`
|
||||
WorkDir string `json:"work_dir"`
|
||||
Shell string `json:"shell"`
|
||||
StartedAt pgtype.Timestamptz `json:"started_at"`
|
||||
EndedAt pgtype.Timestamptz `json:"ended_at"`
|
||||
ExitCode pgtype.Int4 `json:"exit_code"`
|
||||
CloseReason string `json:"close_reason"`
|
||||
}
|
||||
|
||||
type User struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
Name string `json:"name"`
|
||||
|
||||
129
server/pkg/db/generated/terminal_session.sql.go
Normal file
129
server/pkg/db/generated/terminal_session.sql.go
Normal file
@@ -0,0 +1,129 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.30.0
|
||||
// source: terminal_session.sql
|
||||
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
// closeTerminalSession stamps the end-of-session fields on a row that has
// not been closed yet.
const closeTerminalSession = `-- name: CloseTerminalSession :exec
UPDATE terminal_sessions
SET ended_at = $2,
    exit_code = $3,
    close_reason = $4
WHERE id = $1 AND ended_at IS NULL
`
|
||||
|
||||
type CloseTerminalSessionParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
EndedAt pgtype.Timestamptz `json:"ended_at"`
|
||||
ExitCode pgtype.Int4 `json:"exit_code"`
|
||||
CloseReason string `json:"close_reason"`
|
||||
}
|
||||
|
||||
// Idempotent: ended_at IS NULL guards against double-close (e.g. the
|
||||
// daemon emitting terminal.exit and the user closing the tab racing).
|
||||
// A second call after the first has stamped ended_at is a no-op.
|
||||
func (q *Queries) CloseTerminalSession(ctx context.Context, arg CloseTerminalSessionParams) error {
|
||||
_, err := q.db.Exec(ctx, closeTerminalSession,
|
||||
arg.ID,
|
||||
arg.EndedAt,
|
||||
arg.ExitCode,
|
||||
arg.CloseReason,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// createTerminalSession inserts a new session row and returns every column
// of the stored row.
const createTerminalSession = `-- name: CreateTerminalSession :one
INSERT INTO terminal_sessions (
    id, workspace_id, issue_id, task_id, runtime_id, user_id, work_dir, shell, started_at
) VALUES (
    $1, $2, $3, $4, $5, $6, $7, $8, $9
) RETURNING id, workspace_id, issue_id, task_id, runtime_id, user_id, work_dir, shell, started_at, ended_at, exit_code, close_reason
`
|
||||
|
||||
type CreateTerminalSessionParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
IssueID pgtype.UUID `json:"issue_id"`
|
||||
TaskID pgtype.UUID `json:"task_id"`
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
UserID pgtype.UUID `json:"user_id"`
|
||||
WorkDir string `json:"work_dir"`
|
||||
Shell string `json:"shell"`
|
||||
StartedAt pgtype.Timestamptz `json:"started_at"`
|
||||
}
|
||||
|
||||
func (q *Queries) CreateTerminalSession(ctx context.Context, arg CreateTerminalSessionParams) (TerminalSession, error) {
|
||||
row := q.db.QueryRow(ctx, createTerminalSession,
|
||||
arg.ID,
|
||||
arg.WorkspaceID,
|
||||
arg.IssueID,
|
||||
arg.TaskID,
|
||||
arg.RuntimeID,
|
||||
arg.UserID,
|
||||
arg.WorkDir,
|
||||
arg.Shell,
|
||||
arg.StartedAt,
|
||||
)
|
||||
var i TerminalSession
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.IssueID,
|
||||
&i.TaskID,
|
||||
&i.RuntimeID,
|
||||
&i.UserID,
|
||||
&i.WorkDir,
|
||||
&i.Shell,
|
||||
&i.StartedAt,
|
||||
&i.EndedAt,
|
||||
&i.ExitCode,
|
||||
&i.CloseReason,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
// listTerminalSessionsByIssue selects every session of one issue, most
// recently started first.
const listTerminalSessionsByIssue = `-- name: ListTerminalSessionsByIssue :many
SELECT id, workspace_id, issue_id, task_id, runtime_id, user_id, work_dir, shell, started_at, ended_at, exit_code, close_reason FROM terminal_sessions
WHERE issue_id = $1
ORDER BY started_at DESC
`
|
||||
|
||||
func (q *Queries) ListTerminalSessionsByIssue(ctx context.Context, issueID pgtype.UUID) ([]TerminalSession, error) {
|
||||
rows, err := q.db.Query(ctx, listTerminalSessionsByIssue, issueID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []TerminalSession{}
|
||||
for rows.Next() {
|
||||
var i TerminalSession
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.IssueID,
|
||||
&i.TaskID,
|
||||
&i.RuntimeID,
|
||||
&i.UserID,
|
||||
&i.WorkDir,
|
||||
&i.Shell,
|
||||
&i.StartedAt,
|
||||
&i.EndedAt,
|
||||
&i.ExitCode,
|
||||
&i.CloseReason,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
21
server/pkg/db/queries/terminal_session.sql
Normal file
21
server/pkg/db/queries/terminal_session.sql
Normal file
@@ -0,0 +1,21 @@
|
||||
-- name: CreateTerminalSession :one
INSERT INTO terminal_sessions (
    id, workspace_id, issue_id, task_id, runtime_id, user_id, work_dir, shell, started_at
) VALUES (
    $1, $2, $3, $4, $5, $6, $7, $8, $9
) RETURNING *;
|
||||
|
||||
-- name: CloseTerminalSession :exec
-- Idempotent: ended_at IS NULL guards against double-close (e.g. the
-- daemon emitting terminal.exit and the user closing the tab racing).
-- A second call after the first has stamped ended_at is a no-op.
UPDATE terminal_sessions
SET ended_at = $2,
    exit_code = $3,
    close_reason = $4
WHERE id = $1 AND ended_at IS NULL;
|
||||
|
||||
-- name: ListTerminalSessionsByIssue :many
SELECT * FROM terminal_sessions
WHERE issue_id = $1
ORDER BY started_at DESC;
|
||||
@@ -168,3 +168,99 @@ type DaemonHeartbeatPendingLocalSkillImport struct {
|
||||
ID string `json:"id"`
|
||||
SkillKey string `json:"skill_key"`
|
||||
}
|
||||
|
||||
// Terminal WebSocket message types. They travel over the existing daemonws
// hub between a client (web/desktop/CLI) and the daemon. Byte payloads are
// base64-encoded so they fit in JSON text frames with no binary framing.
const (
	// MessageTypeTerminalOpen (client → daemon) asks for a new PTY session
	// bound to a task workdir.
	MessageTypeTerminalOpen = "terminal.open"

	// MessageTypeTerminalOpened (daemon → client) acknowledges an open and
	// carries the session_id and the resolved workdir.
	MessageTypeTerminalOpened = "terminal.opened"

	// MessageTypeTerminalData (both directions) carries PTY stdin from the
	// client and stdout+stderr from the daemon.
	MessageTypeTerminalData = "terminal.data"

	// MessageTypeTerminalResize (client → daemon) reports a window-size
	// change.
	MessageTypeTerminalResize = "terminal.resize"

	// MessageTypeTerminalClose (both directions) is an explicit teardown
	// request or its acknowledgement.
	MessageTypeTerminalClose = "terminal.close"

	// MessageTypeTerminalExit (daemon → client) reports that the child
	// process exited, with its exit code and an optional reason.
	MessageTypeTerminalExit = "terminal.exit"

	// MessageTypeTerminalError (daemon → client) reports that an open,
	// resize, etc. failed, with a code and a human-readable message.
	MessageTypeTerminalError = "terminal.error"
)
|
||||
|
||||
// TerminalOpenPayload is the body of a terminal.open message: a request for
// a PTY session bound to the workdir of the given task. WorkspaceID names
// the workspace the caller is acting in; the daemon must reject the request
// when it does not match the task's workspace.
//
// WorkDir, IssueID and PriorSessionID are resolved by the server from its
// own DB (the daemon keeps no persistent task cache) and embedded here
// before the message is forwarded. The daemon trusts them because the
// daemonws connection is already authenticated and scoped, yet it still
// rechecks WorkspaceID against the request body to catch a misrouted frame.
type TerminalOpenPayload struct {
	RequestID      string `json:"request_id"`
	TaskID         string `json:"task_id"`
	WorkspaceID    string `json:"workspace_id"`
	UserID         string `json:"user_id,omitempty"`
	IssueID        string `json:"issue_id,omitempty"`         // server-resolved
	WorkDir        string `json:"work_dir,omitempty"`         // server-resolved
	PriorSessionID string `json:"prior_session_id,omitempty"` // server-resolved
	Cols           uint16 `json:"cols"`
	Rows           uint16 `json:"rows"`
}
|
||||
|
||||
// TerminalOpenedPayload is the body of a terminal.opened message. It echoes
// the request_id of the open request and supplies the session_id that the
// client must include on every later data, resize, and close frame.
type TerminalOpenedPayload struct {
	RequestID string `json:"request_id"`
	SessionID string `json:"session_id"`
	WorkDir   string `json:"work_dir"`
	Shell     string `json:"shell"`
}
|
||||
|
||||
// TerminalDataPayload is the body of a terminal.data message: raw PTY bytes
// for one session, base64-encoded in DataB64.
type TerminalDataPayload struct {
	SessionID string `json:"session_id"`
	DataB64   string `json:"data_b64"`
}
|
||||
|
||||
// TerminalResizePayload is the body of a terminal.resize message: the new
// window size, in columns and rows, for the PTY of one session.
type TerminalResizePayload struct {
	SessionID string `json:"session_id"`
	Cols      uint16 `json:"cols"`
	Rows      uint16 `json:"rows"`
}
|
||||
|
||||
// TerminalClosePayload is the body of a terminal.close message and asks for
// the session to be torn down. Reason is informational only.
type TerminalClosePayload struct {
	SessionID string `json:"session_id"`
	Reason    string `json:"reason,omitempty"`
}
|
||||
|
||||
// TerminalExitPayload is the body of a terminal.exit message: the session's
// child process has exited with ExitCode, plus an optional Reason.
type TerminalExitPayload struct {
	SessionID string `json:"session_id"`
	ExitCode  int    `json:"exit_code"`
	Reason    string `json:"reason,omitempty"`
}
|
||||
|
||||
// Values that may appear in TerminalErrorPayload.Code.
const (
	TerminalErrorCodeWorkspaceMismatch = "workspace_mismatch"
	TerminalErrorCodeTaskNotFound      = "task_not_found"
	TerminalErrorCodeSessionNotFound   = "session_not_found"
	TerminalErrorCodeUnsupportedOS     = "unsupported_os"
	TerminalErrorCodeSpawnFailed       = "spawn_failed"
	TerminalErrorCodeInternal          = "internal"
)
|
||||
|
||||
// TerminalErrorPayload is the body of a terminal.error message. RequestID is
// populated when the failure answers a specific open request; SessionID is
// populated when it concerns a session that was already established.
type TerminalErrorPayload struct {
	RequestID string `json:"request_id,omitempty"`
	SessionID string `json:"session_id,omitempty"`
	Code      string `json:"code"`
	Message   string `json:"message"`
}
|
||||
|
||||
Reference in New Issue
Block a user