mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 11:48:42 +02:00
Compare commits
5 Commits
feat/cloud
...
agent/lamb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0a97663acb | ||
|
|
953fdd5003 | ||
|
|
e70f44b92b | ||
|
|
281f1073b5 | ||
|
|
6758feba05 |
@@ -53,6 +53,7 @@ import { ResolvedThreadBar } from "./resolved-thread-bar";
|
||||
import { collectThreadReplies } from "./thread-utils";
|
||||
import { AgentLiveCard } from "./agent-live-card";
|
||||
import { ExecutionLogSection } from "./execution-log-section";
|
||||
import { TerminalPanel } from "./terminal-panel";
|
||||
import { PullRequestList } from "./pull-request-list";
|
||||
import { useQuery } from "@tanstack/react-query";
|
||||
import { useAuthStore } from "@multica/core/auth";
|
||||
@@ -1366,6 +1367,12 @@ export function IssueDetail({ issueId, onDelete, onDone, defaultSidebarOpen = tr
|
||||
when there are no runs to show. */}
|
||||
<ExecutionLogSection issueId={id} />
|
||||
|
||||
{/* Terminal panel — attaches to the PTY running in the daemon's
|
||||
workdir for the latest agent task. Desktop-only (the panel
|
||||
itself renders an explanatory placeholder on web).
|
||||
See MUL-2295. */}
|
||||
<TerminalPanelSection issueId={id} workspaceId={wsId} />
|
||||
|
||||
{/* Token usage */}
|
||||
{usage && usage.task_count > 0 && (
|
||||
<div>
|
||||
@@ -1904,3 +1911,23 @@ export function IssueDetail({ issueId, onDelete, onDone, defaultSidebarOpen = tr
|
||||
</ResizablePanelGroup>
|
||||
);
|
||||
}
|
||||
|
||||
// TerminalPanelSection wraps TerminalPanel in a collapsible header that
|
||||
// matches the existing sidebar sections (Token usage, etc.). Collapsed
|
||||
// by default — opening a PTY is an explicit action, and ResizeObserver +
|
||||
// xterm bootstrap should not run for every issue view.
|
||||
function TerminalPanelSection({ issueId, workspaceId }: { issueId: string; workspaceId: string }) {
|
||||
const [open, setOpen] = useState(false);
|
||||
return (
|
||||
<div className="mt-6">
|
||||
<button
|
||||
className={`flex w-full items-center gap-1 rounded-md px-2 py-1 text-xs font-medium transition-colors mb-2 hover:bg-accent/70 ${open ? "" : "text-muted-foreground hover:text-foreground"}`}
|
||||
onClick={() => setOpen((v) => !v)}
|
||||
>
|
||||
Terminal
|
||||
<ChevronRight className={`!size-3 shrink-0 stroke-[2.5] text-muted-foreground transition-transform ${open ? "rotate-90" : ""}`} />
|
||||
</button>
|
||||
{open && <TerminalPanel issueId={issueId} workspaceId={workspaceId} />}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
342
packages/views/issues/components/terminal-panel.tsx
Normal file
342
packages/views/issues/components/terminal-panel.tsx
Normal file
@@ -0,0 +1,342 @@
|
||||
"use client";
|
||||
|
||||
import { useEffect, useMemo, useRef, useState } from "react";
|
||||
import { Terminal as XTerminal } from "@xterm/xterm";
|
||||
import { FitAddon } from "@xterm/addon-fit";
|
||||
import { getApi } from "@multica/core/api";
|
||||
import { Button } from "@multica/ui/components/ui/button";
|
||||
import "@xterm/xterm/css/xterm.css";
|
||||
|
||||
// Protocol message types — kept in lockstep with
|
||||
// server/pkg/protocol/messages.go. Strings are stable across daemon /
|
||||
// server / browser, so duplicating them client-side is OK; if we ever
|
||||
// regenerate types from Go we can swap these out.
|
||||
const MSG_TERMINAL_DATA = "terminal.data";
|
||||
const MSG_TERMINAL_RESIZE = "terminal.resize";
|
||||
const MSG_TERMINAL_CLOSE = "terminal.close";
|
||||
const MSG_TERMINAL_OPENED = "terminal.opened";
|
||||
const MSG_TERMINAL_EXIT = "terminal.exit";
|
||||
const MSG_TERMINAL_ERROR = "terminal.error";
|
||||
|
||||
interface Envelope {
|
||||
type: string;
|
||||
payload: unknown;
|
||||
}
|
||||
|
||||
interface OpenedPayload {
|
||||
request_id: string;
|
||||
session_id: string;
|
||||
work_dir: string;
|
||||
shell: string;
|
||||
}
|
||||
|
||||
interface DataPayload {
|
||||
session_id: string;
|
||||
data_b64: string;
|
||||
}
|
||||
|
||||
interface ExitPayload {
|
||||
session_id: string;
|
||||
exit_code: number;
|
||||
reason?: string;
|
||||
}
|
||||
|
||||
interface ErrorPayload {
|
||||
request_id?: string;
|
||||
session_id?: string;
|
||||
code: string;
|
||||
message: string;
|
||||
}
|
||||
|
||||
// Detect Electron — server-side render guard plus the desktop preload
|
||||
// surface check. Mirrors the pattern used elsewhere in the desktop app;
|
||||
// the Terminal panel is intentionally desktop-only because the daemon
|
||||
// only runs on a developer machine.
|
||||
function isDesktopRuntime(): boolean {
|
||||
return typeof window !== "undefined" && "desktopAPI" in window;
|
||||
}
|
||||
|
||||
interface TerminalPanelProps {
|
||||
issueId: string;
|
||||
workspaceId: string;
|
||||
}
|
||||
|
||||
export function TerminalPanel({ issueId, workspaceId }: TerminalPanelProps) {
|
||||
const containerRef = useRef<HTMLDivElement | null>(null);
|
||||
const termRef = useRef<XTerminal | null>(null);
|
||||
const fitRef = useRef<FitAddon | null>(null);
|
||||
const wsRef = useRef<WebSocket | null>(null);
|
||||
const sessionIdRef = useRef<string>("");
|
||||
|
||||
const [status, setStatus] = useState<
|
||||
"idle" | "connecting" | "connected" | "closed" | "error"
|
||||
>("idle");
|
||||
const [errorMessage, setErrorMessage] = useState<string>("");
|
||||
const [reconnectKey, setReconnectKey] = useState(0);
|
||||
|
||||
const wsUrl = useMemo(() => deriveTerminalWsUrl(issueId, workspaceId), [
|
||||
issueId,
|
||||
workspaceId,
|
||||
]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!isDesktopRuntime()) return;
|
||||
if (!containerRef.current) return;
|
||||
|
||||
const term = new XTerminal({
|
||||
convertEol: true,
|
||||
cursorBlink: true,
|
||||
fontFamily:
|
||||
"ui-monospace, SFMono-Regular, Menlo, Monaco, 'Cascadia Mono', 'Roboto Mono', 'Courier New', monospace",
|
||||
fontSize: 13,
|
||||
theme: { background: "#0b0b0b", foreground: "#e6e6e6" },
|
||||
// Scrollback large enough to read a verbose `cargo build` or `git
|
||||
// log` without auto-clipping the top.
|
||||
scrollback: 5000,
|
||||
});
|
||||
const fit = new FitAddon();
|
||||
term.loadAddon(fit);
|
||||
term.open(containerRef.current);
|
||||
fit.fit();
|
||||
termRef.current = term;
|
||||
fitRef.current = fit;
|
||||
|
||||
term.writeln("\x1b[90mconnecting to daemon…\x1b[0m");
|
||||
|
||||
setStatus("connecting");
|
||||
const ws = new WebSocket(wsUrl);
|
||||
wsRef.current = ws;
|
||||
|
||||
ws.onopen = () => {
|
||||
// Cookie auth carries the session by default. If we ever flip to
|
||||
// token-mode (no cookie), this is where we'd send an `auth` frame
|
||||
// mirroring realtime/ws-client.ts. Server falls back gracefully.
|
||||
setStatus("connected");
|
||||
};
|
||||
|
||||
ws.onerror = () => {
|
||||
// The browser only surfaces a generic Event; the server sends a
|
||||
// structured terminal.error frame which we already render below.
|
||||
// Keep this minimal so we don't double-up the error UI.
|
||||
setStatus("error");
|
||||
};
|
||||
|
||||
ws.onclose = (ev) => {
|
||||
setStatus("closed");
|
||||
term.writeln(
|
||||
`\r\n\x1b[90mconnection closed (code=${ev.code})${
|
||||
ev.reason ? ` reason=${ev.reason}` : ""
|
||||
}\x1b[0m`,
|
||||
);
|
||||
};
|
||||
|
||||
ws.onmessage = (ev) => {
|
||||
let env: Envelope;
|
||||
try {
|
||||
env = JSON.parse(typeof ev.data === "string" ? ev.data : "");
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
switch (env.type) {
|
||||
case MSG_TERMINAL_OPENED: {
|
||||
const p = env.payload as OpenedPayload;
|
||||
sessionIdRef.current = p.session_id;
|
||||
term.writeln(
|
||||
`\x1b[90mattached to ${p.shell} (cwd: ${p.work_dir})\x1b[0m`,
|
||||
);
|
||||
// Send an initial resize matching the terminal's actual size,
|
||||
// because the server-side open uses default 80x24 until we tell
|
||||
// it otherwise.
|
||||
const cols = term.cols;
|
||||
const rows = term.rows;
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
type: MSG_TERMINAL_RESIZE,
|
||||
payload: {
|
||||
session_id: p.session_id,
|
||||
cols,
|
||||
rows,
|
||||
},
|
||||
}),
|
||||
);
|
||||
break;
|
||||
}
|
||||
case MSG_TERMINAL_DATA: {
|
||||
const p = env.payload as DataPayload;
|
||||
if (typeof p.data_b64 !== "string") break;
|
||||
const decoded = atobToUint8(p.data_b64);
|
||||
// xterm.js accepts Uint8Array; we avoid the latin1 round-trip
|
||||
// that would otherwise mangle UTF-8 PTY output.
|
||||
term.write(decoded);
|
||||
break;
|
||||
}
|
||||
case MSG_TERMINAL_EXIT: {
|
||||
const p = env.payload as ExitPayload;
|
||||
term.writeln(
|
||||
`\r\n\x1b[90mprocess exited (code=${p.exit_code}${
|
||||
p.reason ? `, reason=${p.reason}` : ""
|
||||
})\x1b[0m`,
|
||||
);
|
||||
ws.close();
|
||||
break;
|
||||
}
|
||||
case MSG_TERMINAL_ERROR: {
|
||||
const p = env.payload as ErrorPayload;
|
||||
setErrorMessage(`${p.code}: ${p.message}`);
|
||||
term.writeln(`\r\n\x1b[31m${p.code}: ${p.message}\x1b[0m`);
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Forward keystrokes as terminal.data with base64 of the UTF-8 bytes.
|
||||
const dataSub = term.onData((data) => {
|
||||
if (ws.readyState !== WebSocket.OPEN) return;
|
||||
if (!sessionIdRef.current) return;
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
type: MSG_TERMINAL_DATA,
|
||||
payload: {
|
||||
session_id: sessionIdRef.current,
|
||||
data_b64: utf8ToBase64(data),
|
||||
},
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
const resizeSub = term.onResize(({ cols, rows }) => {
|
||||
if (ws.readyState !== WebSocket.OPEN) return;
|
||||
if (!sessionIdRef.current) return;
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
type: MSG_TERMINAL_RESIZE,
|
||||
payload: {
|
||||
session_id: sessionIdRef.current,
|
||||
cols,
|
||||
rows,
|
||||
},
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
// Observe container size and re-fit so the PTY size tracks the panel
|
||||
// (the right sidebar can be resized at runtime).
|
||||
const ro = new ResizeObserver(() => {
|
||||
try {
|
||||
fit.fit();
|
||||
} catch {
|
||||
// fit() throws when the container has zero height during teardown;
|
||||
// ignore — the next mount will rebind.
|
||||
}
|
||||
});
|
||||
ro.observe(containerRef.current);
|
||||
|
||||
return () => {
|
||||
dataSub.dispose();
|
||||
resizeSub.dispose();
|
||||
ro.disconnect();
|
||||
try {
|
||||
if (sessionIdRef.current && ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
type: MSG_TERMINAL_CLOSE,
|
||||
payload: { session_id: sessionIdRef.current, reason: "panel_unmount" },
|
||||
}),
|
||||
);
|
||||
}
|
||||
} catch {
|
||||
// ws may be already closing; nothing to do.
|
||||
}
|
||||
ws.close();
|
||||
term.dispose();
|
||||
termRef.current = null;
|
||||
fitRef.current = null;
|
||||
wsRef.current = null;
|
||||
sessionIdRef.current = "";
|
||||
};
|
||||
}, [wsUrl, reconnectKey]);
|
||||
|
||||
if (!isDesktopRuntime()) {
|
||||
return (
|
||||
<div className="rounded-md border border-dashed p-4 text-sm text-muted-foreground">
|
||||
The terminal is only available in the Multica Desktop app. It attaches
|
||||
to the PTY hosted by the local daemon that ran the agent task.
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
return (
|
||||
<div className="flex flex-col gap-2">
|
||||
<div className="flex items-center justify-between text-xs text-muted-foreground">
|
||||
<span>
|
||||
Status: <span className="font-medium">{status}</span>
|
||||
{errorMessage ? (
|
||||
<span className="ml-2 text-destructive">— {errorMessage}</span>
|
||||
) : null}
|
||||
</span>
|
||||
<Button
|
||||
variant="ghost"
|
||||
size="sm"
|
||||
onClick={() => {
|
||||
setErrorMessage("");
|
||||
setReconnectKey((n) => n + 1);
|
||||
}}
|
||||
>
|
||||
Reconnect
|
||||
</Button>
|
||||
</div>
|
||||
<div
|
||||
ref={containerRef}
|
||||
className="h-[360px] w-full overflow-hidden rounded-md border bg-black"
|
||||
/>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
function deriveTerminalWsUrl(issueId: string, workspaceId: string): string {
|
||||
// The API client knows the http(s) base URL; flip the scheme to ws(s)
|
||||
// and target the proxy endpoint registered in router.go. Falls back to
|
||||
// the page origin if for some reason the API base is empty (dev
|
||||
// environments where the API lives on the same host).
|
||||
let base = "";
|
||||
try {
|
||||
base = getApi().getBaseUrl();
|
||||
} catch {
|
||||
base = "";
|
||||
}
|
||||
if (!base && typeof window !== "undefined") {
|
||||
base = window.location.origin;
|
||||
}
|
||||
const url = new URL(base);
|
||||
if (url.protocol === "https:") {
|
||||
url.protocol = "wss:";
|
||||
} else if (url.protocol === "http:") {
|
||||
url.protocol = "ws:";
|
||||
}
|
||||
url.pathname = url.pathname.replace(/\/$/, "") +
|
||||
`/ws/issues/${encodeURIComponent(issueId)}/terminal`;
|
||||
url.search = `?workspace_id=${encodeURIComponent(workspaceId)}&cols=120&rows=30`;
|
||||
return url.toString();
|
||||
}
|
||||
|
||||
function utf8ToBase64(s: string): string {
|
||||
if (typeof TextEncoder !== "undefined") {
|
||||
const bytes = new TextEncoder().encode(s);
|
||||
let bin = "";
|
||||
bytes.forEach((b) => {
|
||||
bin += String.fromCharCode(b);
|
||||
});
|
||||
return btoa(bin);
|
||||
}
|
||||
// Fallback for old runtimes: assume latin1.
|
||||
return btoa(s);
|
||||
}
|
||||
|
||||
function atobToUint8(s: string): Uint8Array {
|
||||
const bin = atob(s);
|
||||
const out = new Uint8Array(bin.length);
|
||||
for (let i = 0; i < bin.length; i++) {
|
||||
out[i] = bin.charCodeAt(i);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
@@ -76,6 +76,8 @@
|
||||
"@tiptap/react": "^3.22.1",
|
||||
"@tiptap/starter-kit": "^3.22.1",
|
||||
"@tiptap/suggestion": "^3.22.1",
|
||||
"@xterm/xterm": "catalog:",
|
||||
"@xterm/addon-fit": "catalog:",
|
||||
"cmdk": "^1.1.1",
|
||||
"hast-util-to-html": "^4.0.1",
|
||||
"katex": "catalog:",
|
||||
|
||||
26
pnpm-lock.yaml
generated
26
pnpm-lock.yaml
generated
@@ -33,6 +33,12 @@ catalogs:
|
||||
'@vitejs/plugin-react':
|
||||
specifier: ^6.0.1
|
||||
version: 6.0.1
|
||||
'@xterm/addon-fit':
|
||||
specifier: ^0.10.0
|
||||
version: 0.10.0
|
||||
'@xterm/xterm':
|
||||
specifier: ^5.5.0
|
||||
version: 5.5.0
|
||||
class-variance-authority:
|
||||
specifier: ^0.7.1
|
||||
version: 0.7.1
|
||||
@@ -775,6 +781,12 @@ importers:
|
||||
'@tiptap/suggestion':
|
||||
specifier: ^3.22.1
|
||||
version: 3.22.1(@tiptap/core@3.22.1(@tiptap/pm@3.22.1))(@tiptap/pm@3.22.1)
|
||||
'@xterm/addon-fit':
|
||||
specifier: 'catalog:'
|
||||
version: 0.10.0(@xterm/xterm@5.5.0)
|
||||
'@xterm/xterm':
|
||||
specifier: 'catalog:'
|
||||
version: 5.5.0
|
||||
cmdk:
|
||||
specifier: ^1.1.1
|
||||
version: 1.1.1(@types/react-dom@19.2.3(@types/react@19.2.14))(@types/react@19.2.14)(react-dom@19.2.3(react@19.2.3))(react@19.2.3)
|
||||
@@ -3318,6 +3330,14 @@ packages:
|
||||
resolution: {integrity: sha512-9k/gHF6n/pAi/9tqr3m3aqkuiNosYTurLLUtc7xQ9sxB/wm7WPygCv8GYa6mS0fLJEHhqMC1ATYhz++U/lRHqg==}
|
||||
engines: {node: '>=10.0.0'}
|
||||
|
||||
'@xterm/addon-fit@0.10.0':
|
||||
resolution: {integrity: sha512-UFYkDm4HUahf2lnEyHvio51TNGiLK66mqP2JoATy7hRZeXaGMRDr00JiSF7m63vR5WKATF605yEggJKsw0JpMQ==}
|
||||
peerDependencies:
|
||||
'@xterm/xterm': ^5.0.0
|
||||
|
||||
'@xterm/xterm@5.5.0':
|
||||
resolution: {integrity: sha512-hqJHYaQb5OptNunnyAnkHyM8aCjZ1MEIDTQu1iIbbTD/xops91NB5yq1ZK/dC2JDbVWtF23zUtl9JE2NqwT87A==}
|
||||
|
||||
abbrev@3.0.1:
|
||||
resolution: {integrity: sha512-AO2ac6pjRB3SJmGJo+v5/aK6Omggp6fsLrs6wN9bd35ulu4cCwaAU9+7ZhXjeqHVkaHThLuzH0nZr0YpCDhygg==}
|
||||
engines: {node: ^18.17.0 || >=20.5.0}
|
||||
@@ -10103,6 +10123,12 @@ snapshots:
|
||||
|
||||
'@xmldom/xmldom@0.8.12': {}
|
||||
|
||||
'@xterm/addon-fit@0.10.0(@xterm/xterm@5.5.0)':
|
||||
dependencies:
|
||||
'@xterm/xterm': 5.5.0
|
||||
|
||||
'@xterm/xterm@5.5.0': {}
|
||||
|
||||
abbrev@3.0.1: {}
|
||||
|
||||
accepts@2.0.0:
|
||||
|
||||
@@ -49,6 +49,10 @@ catalog:
|
||||
# Virtualized timeline (issue detail comments)
|
||||
react-virtuoso: "^4.14.0"
|
||||
|
||||
# Interactive PTY rendering (Issue → Terminal panel on Desktop, MUL-2295)
|
||||
"@xterm/xterm": "^5.5.0"
|
||||
"@xterm/addon-fit": "^0.10.0"
|
||||
|
||||
# Product analytics
|
||||
posthog-js: "^1.176.1"
|
||||
|
||||
|
||||
@@ -197,6 +197,13 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
realtime.HandleWebSocket(hub, mc, pr, slugResolver, w, r)
|
||||
})
|
||||
|
||||
// Terminal proxy WebSocket: browser ↔ server ↔ daemonws hub ↔ daemon PTY.
|
||||
// Auth is cookie-or-first-frame (browsers can't set Authorization on a
|
||||
// WS upgrade), matching the /ws pattern above. Workspace + issue
|
||||
// membership is enforced inside the handler before the upgrade so a
|
||||
// 403 surfaces as an HTTP response rather than a silent WS close.
|
||||
r.Get("/ws/issues/{issue_id}/terminal", h.HandleIssueTerminalWS)
|
||||
|
||||
// Local file serving (when using local storage)
|
||||
if local, ok := store.(*storage.LocalStorage); ok {
|
||||
r.Get("/uploads/*", func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
@@ -8,6 +8,7 @@ require (
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.19.13
|
||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.97.3
|
||||
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.5
|
||||
github.com/creack/pty v1.1.21
|
||||
github.com/go-chi/chi/v5 v5.2.5
|
||||
github.com/go-chi/cors v1.2.2
|
||||
github.com/golang-jwt/jwt/v5 v5.3.1
|
||||
|
||||
@@ -48,6 +48,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/creack/pty v1.1.21 h1:1/QdRyBaHHJP61QkWMXlOIBfsgdDeeKfK8SYVUWJKf0=
|
||||
github.com/creack/pty v1.1.21/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/multica-ai/multica/server/internal/cli"
|
||||
"github.com/multica-ai/multica/server/internal/daemon/execenv"
|
||||
"github.com/multica-ai/multica/server/internal/daemon/repocache"
|
||||
"github.com/multica-ai/multica/server/internal/daemon/terminal"
|
||||
"github.com/multica-ai/multica/server/pkg/agent"
|
||||
)
|
||||
|
||||
@@ -138,6 +139,24 @@ type Daemon struct {
|
||||
// deleted bare clone and an unrelated `not empty` cleanup failure.
|
||||
bgSyncs sync.WaitGroup
|
||||
|
||||
// terminalManager owns every live PTY this daemon hosts on behalf of
|
||||
// users opening the Issue → Terminal panel. Constructed in New() so
|
||||
// tests can override the manager config via cfg fields if needed
|
||||
// without forcing the production daemon to wire it lazily.
|
||||
terminalManager *terminal.Manager
|
||||
// terminalBridge mediates between the WS hub (server-side) and the
|
||||
// terminalManager. Only non-nil while a daemonws connection is up; the
|
||||
// wakeup loop swaps in a fresh bridge on every reconnect because
|
||||
// session_ids cannot survive a hub disconnect.
|
||||
terminalBridgeMu sync.RWMutex
|
||||
terminalBridge *terminalBridge
|
||||
// wsWritesMu guards wsWrites. wsWrites is the active daemonws writer
|
||||
// queue (nil while disconnected); the terminalBridge funnels every
|
||||
// outbound terminal.* frame through Daemon.sendWSFrame, which reads
|
||||
// this pointer under the lock so reconnects don't race the bridge.
|
||||
wsWritesMu sync.RWMutex
|
||||
wsWrites chan<- []byte
|
||||
|
||||
runner taskRunner // executes agent tasks; set to d.runTask by New(), overridable in tests
|
||||
cancelPollInterval time.Duration // how often handleTask polls for server-side cancellation; overridable in tests
|
||||
// runUpdateFn executes the brew-or-download upgrade. Set to d.runUpdate by
|
||||
@@ -171,9 +190,80 @@ func New(cfg Config, logger *slog.Logger) *Daemon {
|
||||
}
|
||||
d.runner = taskRunnerFunc(d.runTask)
|
||||
d.runUpdateFn = d.runUpdate
|
||||
// The terminal manager has no TaskLookup wired: every Open call goes
|
||||
// through OpenWithInfo using a TaskInfo that the server resolved from
|
||||
// the DB before forwarding terminal.open over daemonws (the daemon
|
||||
// does not maintain a persistent task cache).
|
||||
d.terminalManager = terminal.NewManager(terminal.ManagerConfig{
|
||||
IdleTimeout: terminal.DefaultIdleTimeout,
|
||||
Logger: logger,
|
||||
}, nil)
|
||||
return d
|
||||
}
|
||||
|
||||
// sendWSFrame pushes a raw frame onto the current daemonws writer queue.
|
||||
// Returns false when no connection is active or the writer queue is
|
||||
// saturated — callers (today: terminalBridge) drop the frame rather than
|
||||
// block, because the next reconnect or queue drain will unstick us.
|
||||
func (d *Daemon) sendWSFrame(frame []byte) bool {
|
||||
d.wsWritesMu.RLock()
|
||||
writes := d.wsWrites
|
||||
d.wsWritesMu.RUnlock()
|
||||
if writes == nil {
|
||||
return false
|
||||
}
|
||||
select {
|
||||
case writes <- frame:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// installWSWrites stores the connection-local writer queue so the bridge
|
||||
// can address it through Daemon.sendWSFrame. A fresh terminalBridge is
|
||||
// installed each call: session_ids minted on a previous WS connection are
|
||||
// not valid on the new one (the server-side proxy registered routing
|
||||
// against the old hub client), so it's cleaner to tear every PTY down
|
||||
// than to half-revive them.
|
||||
func (d *Daemon) installWSWrites(writes chan<- []byte) {
|
||||
d.wsWritesMu.Lock()
|
||||
d.wsWrites = writes
|
||||
d.wsWritesMu.Unlock()
|
||||
|
||||
bridge := newTerminalBridge(d.terminalManager, d.logger, d.sendWSFrame)
|
||||
d.terminalBridgeMu.Lock()
|
||||
prev := d.terminalBridge
|
||||
d.terminalBridge = bridge
|
||||
d.terminalBridgeMu.Unlock()
|
||||
if prev != nil {
|
||||
prev.closeAll("ws_reconnect")
|
||||
}
|
||||
}
|
||||
|
||||
// clearWSWrites removes the writer pointer and tears down every live
|
||||
// terminal session bound to this connection. Called from the wakeup
|
||||
// connection's deferred cleanup.
|
||||
func (d *Daemon) clearWSWrites() {
|
||||
d.wsWritesMu.Lock()
|
||||
d.wsWrites = nil
|
||||
d.wsWritesMu.Unlock()
|
||||
|
||||
d.terminalBridgeMu.Lock()
|
||||
bridge := d.terminalBridge
|
||||
d.terminalBridge = nil
|
||||
d.terminalBridgeMu.Unlock()
|
||||
if bridge != nil {
|
||||
bridge.closeAll("ws_disconnect")
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Daemon) currentTerminalBridge() *terminalBridge {
|
||||
d.terminalBridgeMu.RLock()
|
||||
defer d.terminalBridgeMu.RUnlock()
|
||||
return d.terminalBridge
|
||||
}
|
||||
|
||||
// setAgentVersion records the detected CLI version for an agent provider so
|
||||
// later task-dispatch code (e.g. Codex sandbox policy) can read it.
|
||||
func (d *Daemon) setAgentVersion(provider, version string) {
|
||||
|
||||
14
server/internal/daemon/terminal/doc.go
Normal file
14
server/internal/daemon/terminal/doc.go
Normal file
@@ -0,0 +1,14 @@
|
||||
// Package terminal manages interactive PTY sessions bound to a task's
|
||||
// workdir on the local daemon.
|
||||
//
|
||||
// A Manager owns the lifecycle of all live PtySessions. Callers (the
|
||||
// daemonws bridge today, the CLI socket later) translate WebSocket
|
||||
// terminal.* frames into method calls on Manager, and the Manager
|
||||
// forwards PTY output back through a per-session Output channel.
|
||||
//
|
||||
// Sessions run with the daemon process's identity — there is no
|
||||
// additional sandbox. This is the same trust boundary as agent runs
|
||||
// (which are also daemon-spawned child processes). The Manager only
|
||||
// enforces that the requesting client's workspace matches the task's
|
||||
// workspace; anything beyond that is the OS's responsibility.
|
||||
package terminal
|
||||
14
server/internal/daemon/terminal/errors.go
Normal file
14
server/internal/daemon/terminal/errors.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package terminal
|
||||
|
||||
import "errors"
|
||||
|
||||
// Sentinel errors returned by Manager. Callers map these to the
|
||||
// protocol.TerminalErrorCode* constants when reporting to clients.
|
||||
var (
|
||||
ErrTaskNotFound = errors.New("terminal: task not found")
|
||||
ErrWorkspaceMismatch = errors.New("terminal: task belongs to a different workspace")
|
||||
ErrSessionNotFound = errors.New("terminal: session not found")
|
||||
ErrUnsupportedOS = errors.New("terminal: PTY not supported on this OS")
|
||||
ErrSpawnFailed = errors.New("terminal: failed to spawn shell")
|
||||
ErrManagerClosed = errors.New("terminal: manager is shut down")
|
||||
)
|
||||
347
server/internal/daemon/terminal/manager.go
Normal file
347
server/internal/daemon/terminal/manager.go
Normal file
@@ -0,0 +1,347 @@
|
||||
package terminal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// DefaultIdleTimeout is the recommended IdleTimeout for production
|
||||
// daemon wiring. Callers must set ManagerConfig.IdleTimeout to this
|
||||
// (or any positive duration) explicitly; zero/negative disables the
|
||||
// idle sweep.
|
||||
const DefaultIdleTimeout = 60 * time.Minute
|
||||
|
||||
// TaskInfo is the subset of task state the Manager needs to set up a PTY.
|
||||
// The daemon resolves a TaskID into TaskInfo via TaskLookup at open time.
|
||||
type TaskInfo struct {
|
||||
TaskID string
|
||||
WorkspaceID string
|
||||
IssueID string
|
||||
WorkDir string
|
||||
PriorSessionID string // injected as CLAUDE_SESSION_ID for `claude --resume`
|
||||
}
|
||||
|
||||
// TaskLookup resolves a TaskID into the workdir + workspace required to
|
||||
// open a PTY. Returns ErrTaskNotFound when the task is unknown. Lookups
|
||||
// hit the daemon's local task cache in production; tests supply a stub.
|
||||
type TaskLookup func(ctx context.Context, taskID string) (TaskInfo, error)
|
||||
|
||||
// OpenParams is the input to Manager.Open.
|
||||
type OpenParams struct {
|
||||
// TaskID identifies the workdir the PTY should run in.
|
||||
TaskID string
|
||||
// WorkspaceID is the workspace the caller is acting on behalf of.
|
||||
// Open rejects the request if it does not match the task's workspace
|
||||
// (cross-workspace ACL — clients never see other workspaces' workdirs).
|
||||
WorkspaceID string
|
||||
// UserID is the human user who opened the terminal. Logged in audit
|
||||
// records; the PTY itself runs as the daemon process owner.
|
||||
UserID string
|
||||
// Cols/Rows seed the initial PTY window size. Zero values default to 80x24.
|
||||
Cols uint16
|
||||
Rows uint16
|
||||
}
|
||||
|
||||
// ManagerConfig tunes Manager behaviour. Zero values are sensible defaults.
|
||||
type ManagerConfig struct {
|
||||
// Shell to spawn for each session. Defaults to "bash" with "-l".
|
||||
// Overridable for tests; the production daemon hardcodes bash for now
|
||||
// (RFC open question #4 — shell selection deferred to a later release).
|
||||
ShellPath string
|
||||
ShellArgs []string
|
||||
|
||||
// IdleTimeout closes a session that has had no I/O for this long.
|
||||
// Zero or negative disables the sweep entirely. Production daemon
|
||||
// wiring should pass DefaultIdleTimeout explicitly; we intentionally
|
||||
// don't default here so callers stay in control (the docs page for
|
||||
// this package previously said "0 disables" while NewManager silently
|
||||
// rewrote 0 to 60min — those two have to agree).
|
||||
IdleTimeout time.Duration
|
||||
|
||||
// Spawner overrides PTY spawning. Defaults to ptyStartShell which
|
||||
// shells out to creack/pty. Tests inject a fake to avoid forking.
|
||||
Spawner Spawner
|
||||
|
||||
// Now returns the current time. Defaults to time.Now. Tests inject a
|
||||
// fake clock to drive IdleTimeout deterministically.
|
||||
Now func() time.Time
|
||||
|
||||
// Logger receives operational events. Defaults to slog.Default().
|
||||
Logger *slog.Logger
|
||||
}
|
||||
|
||||
// Manager owns all live PtySessions on this daemon. It is safe for
|
||||
// concurrent use.
|
||||
type Manager struct {
|
||||
cfg ManagerConfig
|
||||
lookup TaskLookup
|
||||
|
||||
mu sync.Mutex
|
||||
sessions map[string]*PtySession
|
||||
closed bool
|
||||
// closeDone is closed by the first Close() caller AFTER finalize
|
||||
// finishes (every session deregistered, Done() closed). Subsequent
|
||||
// concurrent callers wait on it instead of racing past, so all
|
||||
// Close() returns share the same "manager fully drained" guarantee.
|
||||
closeDone chan struct{}
|
||||
}
|
||||
|
||||
// NewManager constructs a Manager. lookup may be nil in tests that only
|
||||
// exercise direct session APIs.
|
||||
func NewManager(cfg ManagerConfig, lookup TaskLookup) *Manager {
|
||||
if cfg.ShellPath == "" {
|
||||
cfg.ShellPath = "bash"
|
||||
cfg.ShellArgs = []string{"-l"}
|
||||
}
|
||||
// IdleTimeout intentionally not defaulted — see ManagerConfig.
|
||||
if cfg.Spawner == nil {
|
||||
cfg.Spawner = realSpawner{}
|
||||
}
|
||||
if cfg.Now == nil {
|
||||
cfg.Now = time.Now
|
||||
}
|
||||
if cfg.Logger == nil {
|
||||
cfg.Logger = slog.Default()
|
||||
}
|
||||
return &Manager{
|
||||
cfg: cfg,
|
||||
lookup: lookup,
|
||||
sessions: make(map[string]*PtySession),
|
||||
closeDone: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Open spawns a new PTY session for the given task. The returned
|
||||
// session is also registered with the manager and retrievable via Get.
|
||||
func (m *Manager) Open(ctx context.Context, p OpenParams) (*PtySession, error) {
|
||||
if m.lookup == nil {
|
||||
return nil, fmt.Errorf("terminal: Manager has no TaskLookup configured")
|
||||
}
|
||||
info, err := m.lookup(ctx, p.TaskID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if info.WorkspaceID != p.WorkspaceID {
|
||||
return nil, ErrWorkspaceMismatch
|
||||
}
|
||||
if info.WorkDir == "" {
|
||||
return nil, ErrTaskNotFound
|
||||
}
|
||||
return m.openWith(info, p)
|
||||
}
|
||||
|
||||
// OpenWithInfo spawns a PTY against a caller-supplied TaskInfo, bypassing
|
||||
// the configured TaskLookup. Production daemon wiring takes this path
|
||||
// because the server resolves task metadata from its DB before forwarding
|
||||
// terminal.open over the daemonws hub — the daemon trusts the server-
|
||||
// authenticated payload and does not have its own task cache. The
|
||||
// workspace mismatch check still runs so a server bug or a relayed frame
|
||||
// cannot cross workspace boundaries.
|
||||
func (m *Manager) OpenWithInfo(_ context.Context, info TaskInfo, p OpenParams) (*PtySession, error) {
|
||||
if info.WorkspaceID != p.WorkspaceID {
|
||||
return nil, ErrWorkspaceMismatch
|
||||
}
|
||||
if info.WorkDir == "" {
|
||||
return nil, ErrTaskNotFound
|
||||
}
|
||||
if info.TaskID == "" {
|
||||
info.TaskID = p.TaskID
|
||||
}
|
||||
return m.openWith(info, p)
|
||||
}
|
||||
|
||||
func (m *Manager) openWith(info TaskInfo, p OpenParams) (*PtySession, error) {
|
||||
cols, rows := normalizeSize(p.Cols, p.Rows)
|
||||
env := buildEnv(info, p.UserID)
|
||||
|
||||
m.mu.Lock()
|
||||
if m.closed {
|
||||
m.mu.Unlock()
|
||||
return nil, ErrManagerClosed
|
||||
}
|
||||
m.mu.Unlock()
|
||||
|
||||
startedAt := m.cfg.Now()
|
||||
pty, err := m.cfg.Spawner.Start(SpawnRequest{
|
||||
Shell: m.cfg.ShellPath,
|
||||
Args: m.cfg.ShellArgs,
|
||||
Cwd: info.WorkDir,
|
||||
Env: env,
|
||||
Cols: cols,
|
||||
Rows: rows,
|
||||
Started: startedAt,
|
||||
})
|
||||
if err != nil {
|
||||
// Double-%w so errors.Is matches both ErrSpawnFailed AND any
|
||||
// sentinel the spawner surfaced (notably ErrUnsupportedOS from
|
||||
// the windows stub — the protocol layer needs to distinguish
|
||||
// "no PTY on this OS" from generic spawn failures).
|
||||
return nil, fmt.Errorf("%w: %w", ErrSpawnFailed, err)
|
||||
}
|
||||
|
||||
sess := &PtySession{
|
||||
id: uuid.NewString(),
|
||||
taskID: info.TaskID,
|
||||
workspaceID: info.WorkspaceID,
|
||||
issueID: info.IssueID,
|
||||
workDir: info.WorkDir,
|
||||
userID: p.UserID,
|
||||
shellPath: m.cfg.ShellPath,
|
||||
cols: cols,
|
||||
rows: rows,
|
||||
pty: pty,
|
||||
output: make(chan []byte, 64),
|
||||
exit: make(chan ExitInfo, 1),
|
||||
done: make(chan struct{}),
|
||||
stop: make(chan struct{}),
|
||||
now: m.cfg.Now,
|
||||
idleTimeout: m.cfg.IdleTimeout,
|
||||
startedAt: startedAt,
|
||||
lastIO: startedAt,
|
||||
logger: m.cfg.Logger.With("session_id_pending", true, "task_id", info.TaskID),
|
||||
onClose: func(id string) { m.deregister(id) },
|
||||
}
|
||||
sess.logger = m.cfg.Logger.With("session_id", sess.id, "task_id", info.TaskID)
|
||||
|
||||
m.mu.Lock()
|
||||
if m.closed {
|
||||
m.mu.Unlock()
|
||||
_ = pty.Close()
|
||||
// We won that race: spawn succeeded but the manager closed before
|
||||
// we could register the session, so waitLoop never runs. Reap the
|
||||
// child synchronously here — pty.Close fires SIGHUP/SIGKILL but
|
||||
// only Wait() collects the exit status, otherwise the unix child
|
||||
// stays around as a zombie until the daemon process dies.
|
||||
_, _ = pty.Wait()
|
||||
return nil, ErrManagerClosed
|
||||
}
|
||||
m.sessions[sess.id] = sess
|
||||
m.mu.Unlock()
|
||||
|
||||
sess.start()
|
||||
return sess, nil
|
||||
}
|
||||
|
||||
// Get returns the session with the given id, or ErrSessionNotFound.
|
||||
func (m *Manager) Get(id string) (*PtySession, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
sess, ok := m.sessions[id]
|
||||
if !ok {
|
||||
return nil, ErrSessionNotFound
|
||||
}
|
||||
return sess, nil
|
||||
}
|
||||
|
||||
// Sessions returns a snapshot of currently registered session IDs.
|
||||
func (m *Manager) Sessions() []string {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
ids := make([]string, 0, len(m.sessions))
|
||||
for id := range m.sessions {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
// Close tears down every live session and refuses subsequent Open calls.
|
||||
// Safe to call concurrently from multiple goroutines: the first caller
|
||||
// runs the actual teardown, the rest block on closeDone until that
|
||||
// teardown is fully observable. Every Close() return — first or Nth —
|
||||
// thus carries the same "manager drained, every session finalized"
|
||||
// guarantee that downstream GC/audit cleanup depends on.
|
||||
func (m *Manager) Close() {
|
||||
m.mu.Lock()
|
||||
if m.closed {
|
||||
done := m.closeDone
|
||||
m.mu.Unlock()
|
||||
<-done
|
||||
return
|
||||
}
|
||||
m.closed = true
|
||||
live := make([]*PtySession, 0, len(m.sessions))
|
||||
for _, s := range m.sessions {
|
||||
live = append(live, s)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
// Parallel: each session.Close blocks for the unix spawner's
|
||||
// SIGHUP→grace→SIGKILL window. Running serially would multiply
|
||||
// shutdown latency by N sessions. We additionally wait on each
|
||||
// session's Done() so Manager.Close returning is a hard guarantee
|
||||
// that every session finalized (output closed, deregistered, done
|
||||
// fired) — downstream GC/audit cleanup relies on this.
|
||||
var wg sync.WaitGroup
|
||||
for _, s := range live {
|
||||
wg.Add(1)
|
||||
go func(s *PtySession) {
|
||||
defer wg.Done()
|
||||
s.Close("manager_shutdown")
|
||||
<-s.Done()
|
||||
}(s)
|
||||
}
|
||||
wg.Wait()
|
||||
close(m.closeDone)
|
||||
}
|
||||
|
||||
// CheckIdle walks every session and closes those whose idle interval
|
||||
// has elapsed. The daemon's existing GC loop calls this periodically;
|
||||
// each session also self-monitors via its own timer for cases where the
|
||||
// outer loop runs at a coarser cadence than IdleTimeout.
|
||||
func (m *Manager) CheckIdle() {
|
||||
if m.cfg.IdleTimeout <= 0 {
|
||||
return
|
||||
}
|
||||
m.mu.Lock()
|
||||
sessions := make([]*PtySession, 0, len(m.sessions))
|
||||
for _, s := range m.sessions {
|
||||
sessions = append(sessions, s)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
|
||||
now := m.cfg.Now()
|
||||
for _, s := range sessions {
|
||||
if now.Sub(s.LastIO()) >= m.cfg.IdleTimeout {
|
||||
s.Close("idle_timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) deregister(id string) {
|
||||
m.mu.Lock()
|
||||
delete(m.sessions, id)
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func normalizeSize(cols, rows uint16) (uint16, uint16) {
|
||||
if cols == 0 {
|
||||
cols = 80
|
||||
}
|
||||
if rows == 0 {
|
||||
rows = 24
|
||||
}
|
||||
return cols, rows
|
||||
}
|
||||
|
||||
func buildEnv(info TaskInfo, userID string) []string {
|
||||
env := []string{
|
||||
"MULTICA_WORKSPACE_ID=" + info.WorkspaceID,
|
||||
"MULTICA_TASK_ID=" + info.TaskID,
|
||||
}
|
||||
if info.IssueID != "" {
|
||||
env = append(env, "MULTICA_ISSUE_ID="+info.IssueID)
|
||||
}
|
||||
if userID != "" {
|
||||
env = append(env, "MULTICA_USER_ID="+userID)
|
||||
}
|
||||
if info.PriorSessionID != "" {
|
||||
// Injected so `claude --resume $CLAUDE_SESSION_ID` continues the
|
||||
// same session that the agent run was using (see RFC §Resume).
|
||||
env = append(env, "CLAUDE_SESSION_ID="+info.PriorSessionID)
|
||||
}
|
||||
return env
|
||||
}
|
||||
898
server/internal/daemon/terminal/manager_test.go
Normal file
898
server/internal/daemon/terminal/manager_test.go
Normal file
@@ -0,0 +1,898 @@
|
||||
package terminal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// fakePTY is a Spawner-served stand-in for a real PTY. Tests push child
|
||||
// output via WriteFromChild and read client input via ReadFromClient.
|
||||
type fakePTY struct {
|
||||
t *testing.T
|
||||
|
||||
// child -> client (output queue, read by readLoop)
|
||||
childToClient chan []byte
|
||||
|
||||
// client -> child (writes captured into a buffer slice under mu)
|
||||
mu sync.Mutex
|
||||
clientWrote [][]byte
|
||||
cols, rows uint16
|
||||
|
||||
// closeOnce coordinates teardown
|
||||
closeOnce sync.Once
|
||||
closeCh chan struct{}
|
||||
|
||||
// waitDone signals Wait can return. Defaults closed by Close.
|
||||
waitOnce sync.Once
|
||||
waitDone chan struct{}
|
||||
// waitDelay (optional) sleeps inside Wait AFTER waitDone fires.
|
||||
// Lets tests prove Manager.Close waits for session finalize rather
|
||||
// than just for s.Close() to return.
|
||||
waitDelay time.Duration
|
||||
exitCode int32
|
||||
resizedCh chan [2]uint16
|
||||
closed atomic.Bool
|
||||
// waitCount tracks how many times Wait() was invoked. Lets tests
|
||||
// assert the cleanup path reaped the child even when no session was
|
||||
// ever registered (Manager.Close racing Open).
|
||||
waitCount atomic.Int32
|
||||
}
|
||||
|
||||
func newFakePTY(t *testing.T, cols, rows uint16) *fakePTY {
|
||||
return &fakePTY{
|
||||
t: t,
|
||||
childToClient: make(chan []byte, 8),
|
||||
cols: cols,
|
||||
rows: rows,
|
||||
closeCh: make(chan struct{}),
|
||||
waitDone: make(chan struct{}),
|
||||
resizedCh: make(chan [2]uint16, 8),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *fakePTY) Read(b []byte) (int, error) {
|
||||
select {
|
||||
case chunk, ok := <-p.childToClient:
|
||||
if !ok {
|
||||
return 0, io.EOF
|
||||
}
|
||||
n := copy(b, chunk)
|
||||
return n, nil
|
||||
case <-p.closeCh:
|
||||
return 0, io.EOF
|
||||
}
|
||||
}
|
||||
|
||||
func (p *fakePTY) Write(b []byte) (int, error) {
|
||||
if p.closed.Load() {
|
||||
return 0, io.ErrClosedPipe
|
||||
}
|
||||
p.mu.Lock()
|
||||
c := make([]byte, len(b))
|
||||
copy(c, b)
|
||||
p.clientWrote = append(p.clientWrote, c)
|
||||
p.mu.Unlock()
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
func (p *fakePTY) Resize(cols, rows uint16) error {
|
||||
if p.closed.Load() {
|
||||
return io.ErrClosedPipe
|
||||
}
|
||||
p.mu.Lock()
|
||||
p.cols, p.rows = cols, rows
|
||||
p.mu.Unlock()
|
||||
select {
|
||||
case p.resizedCh <- [2]uint16{cols, rows}:
|
||||
default:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *fakePTY) Wait() (int, error) {
|
||||
p.waitCount.Add(1)
|
||||
<-p.waitDone
|
||||
if p.waitDelay > 0 {
|
||||
time.Sleep(p.waitDelay)
|
||||
}
|
||||
return int(atomic.LoadInt32(&p.exitCode)), nil
|
||||
}
|
||||
|
||||
func (p *fakePTY) Close() error {
|
||||
p.closeOnce.Do(func() {
|
||||
p.closed.Store(true)
|
||||
close(p.closeCh)
|
||||
close(p.childToClient)
|
||||
p.waitOnce.Do(func() { close(p.waitDone) })
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// pushChildOutput simulates the shell writing bytes to its stdout/stderr.
|
||||
func (p *fakePTY) pushChildOutput(b []byte) {
|
||||
select {
|
||||
case p.childToClient <- b:
|
||||
case <-time.After(time.Second):
|
||||
p.t.Fatalf("childToClient send timed out — readLoop not draining")
|
||||
}
|
||||
}
|
||||
|
||||
func (p *fakePTY) writes() [][]byte {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
out := make([][]byte, len(p.clientWrote))
|
||||
copy(out, p.clientWrote)
|
||||
return out
|
||||
}
|
||||
|
||||
func (p *fakePTY) size() (uint16, uint16) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
return p.cols, p.rows
|
||||
}
|
||||
|
||||
// fakeSpawner records each spawn so tests can inspect injected env / cwd.
|
||||
type fakeSpawner struct {
|
||||
t *testing.T
|
||||
spawnsMu sync.Mutex
|
||||
spawns []SpawnRequest
|
||||
make func(*testing.T, SpawnRequest) (*fakePTY, error)
|
||||
}
|
||||
|
||||
func (s *fakeSpawner) Start(req SpawnRequest) (PTY, error) {
|
||||
s.spawnsMu.Lock()
|
||||
s.spawns = append(s.spawns, req)
|
||||
s.spawnsMu.Unlock()
|
||||
pty, err := s.make(s.t, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pty, nil
|
||||
}
|
||||
|
||||
func (s *fakeSpawner) lastRequest() SpawnRequest {
|
||||
s.spawnsMu.Lock()
|
||||
defer s.spawnsMu.Unlock()
|
||||
if len(s.spawns) == 0 {
|
||||
return SpawnRequest{}
|
||||
}
|
||||
return s.spawns[len(s.spawns)-1]
|
||||
}
|
||||
|
||||
// helper: build a Manager with a default fake spawner and a single task.
|
||||
type fixture struct {
|
||||
mgr *Manager
|
||||
spawner *fakeSpawner
|
||||
tasks map[string]TaskInfo
|
||||
now func() time.Time
|
||||
clockMu sync.Mutex
|
||||
clock time.Time
|
||||
}
|
||||
|
||||
func newFixture(t *testing.T, opts ...func(*ManagerConfig)) *fixture {
|
||||
f := &fixture{
|
||||
tasks: map[string]TaskInfo{
|
||||
"task-1": {
|
||||
TaskID: "task-1",
|
||||
WorkspaceID: "ws-A",
|
||||
IssueID: "issue-1",
|
||||
WorkDir: t.TempDir(),
|
||||
PriorSessionID: "claude-session-xyz",
|
||||
},
|
||||
},
|
||||
clock: time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC),
|
||||
}
|
||||
f.now = func() time.Time {
|
||||
f.clockMu.Lock()
|
||||
defer f.clockMu.Unlock()
|
||||
return f.clock
|
||||
}
|
||||
f.spawner = &fakeSpawner{
|
||||
t: t,
|
||||
make: func(tt *testing.T, req SpawnRequest) (*fakePTY, error) { return newFakePTY(tt, req.Cols, req.Rows), nil },
|
||||
}
|
||||
cfg := ManagerConfig{
|
||||
ShellPath: "/usr/bin/bash",
|
||||
ShellArgs: []string{"-l"},
|
||||
IdleTimeout: 0,
|
||||
Spawner: f.spawner,
|
||||
Now: f.now,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(&cfg)
|
||||
}
|
||||
lookup := func(_ context.Context, id string) (TaskInfo, error) {
|
||||
info, ok := f.tasks[id]
|
||||
if !ok {
|
||||
return TaskInfo{}, ErrTaskNotFound
|
||||
}
|
||||
return info, nil
|
||||
}
|
||||
f.mgr = NewManager(cfg, lookup)
|
||||
return f
|
||||
}
|
||||
|
||||
func (f *fixture) advance(d time.Duration) {
|
||||
f.clockMu.Lock()
|
||||
f.clock = f.clock.Add(d)
|
||||
f.clockMu.Unlock()
|
||||
}
|
||||
|
||||
// drainPTY pulls the *fakePTY back out of the spawner so tests can drive it.
|
||||
func (f *fixture) lastPTY(t *testing.T) *fakePTY {
|
||||
t.Helper()
|
||||
req := f.spawner.lastRequest()
|
||||
if req.Shell == "" {
|
||||
t.Fatal("no spawn recorded")
|
||||
}
|
||||
// The Spawner.make closure always returns a *fakePTY; the manager
|
||||
// wraps it as a PTY interface and we don't retain the concrete in
|
||||
// the manager. Re-acquire via the registry by walking sessions.
|
||||
for _, id := range f.mgr.Sessions() {
|
||||
s, err := f.mgr.Get(id)
|
||||
if err == nil {
|
||||
if fp, ok := s.pty.(*fakePTY); ok {
|
||||
return fp
|
||||
}
|
||||
}
|
||||
}
|
||||
t.Fatal("no fake PTY found in any registered session")
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestManager_OpenSpawnsWithInjectedEnvAndCwd(t *testing.T) {
|
||||
f := newFixture(t)
|
||||
defer f.mgr.Close()
|
||||
|
||||
sess, err := f.mgr.Open(context.Background(), OpenParams{
|
||||
TaskID: "task-1",
|
||||
WorkspaceID: "ws-A",
|
||||
UserID: "user-42",
|
||||
Cols: 120,
|
||||
Rows: 40,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
|
||||
if sess.ID() == "" {
|
||||
t.Fatal("session ID empty")
|
||||
}
|
||||
if got := sess.WorkDir(); got != f.tasks["task-1"].WorkDir {
|
||||
t.Errorf("workdir = %q, want %q", got, f.tasks["task-1"].WorkDir)
|
||||
}
|
||||
|
||||
req := f.spawner.lastRequest()
|
||||
if req.Cwd != f.tasks["task-1"].WorkDir {
|
||||
t.Errorf("spawn cwd = %q, want %q", req.Cwd, f.tasks["task-1"].WorkDir)
|
||||
}
|
||||
if req.Cols != 120 || req.Rows != 40 {
|
||||
t.Errorf("spawn size = %dx%d, want 120x40", req.Cols, req.Rows)
|
||||
}
|
||||
|
||||
wantEnv := map[string]string{
|
||||
"MULTICA_WORKSPACE_ID": "ws-A",
|
||||
"MULTICA_TASK_ID": "task-1",
|
||||
"MULTICA_ISSUE_ID": "issue-1",
|
||||
"MULTICA_USER_ID": "user-42",
|
||||
"CLAUDE_SESSION_ID": "claude-session-xyz",
|
||||
}
|
||||
envMap := map[string]string{}
|
||||
for _, kv := range req.Env {
|
||||
for i := 0; i < len(kv); i++ {
|
||||
if kv[i] == '=' {
|
||||
envMap[kv[:i]] = kv[i+1:]
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
for k, want := range wantEnv {
|
||||
if got := envMap[k]; got != want {
|
||||
t.Errorf("env %s = %q, want %q", k, got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_DefaultSize(t *testing.T) {
|
||||
f := newFixture(t)
|
||||
defer f.mgr.Close()
|
||||
|
||||
_, err := f.mgr.Open(context.Background(), OpenParams{
|
||||
TaskID: "task-1",
|
||||
WorkspaceID: "ws-A",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
req := f.spawner.lastRequest()
|
||||
if req.Cols != 80 || req.Rows != 24 {
|
||||
t.Errorf("default size = %dx%d, want 80x24", req.Cols, req.Rows)
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_RejectsCrossWorkspace(t *testing.T) {
|
||||
f := newFixture(t)
|
||||
defer f.mgr.Close()
|
||||
|
||||
_, err := f.mgr.Open(context.Background(), OpenParams{
|
||||
TaskID: "task-1",
|
||||
WorkspaceID: "ws-B-not-the-tasks-workspace",
|
||||
})
|
||||
if !errors.Is(err, ErrWorkspaceMismatch) {
|
||||
t.Fatalf("Open err = %v, want ErrWorkspaceMismatch", err)
|
||||
}
|
||||
if got := len(f.mgr.Sessions()); got != 0 {
|
||||
t.Errorf("Sessions after rejected open = %d, want 0", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_OpenWithInfoBypassesLookup(t *testing.T) {
|
||||
// Manager is built with a nil lookup; OpenWithInfo must still succeed
|
||||
// when the server-supplied TaskInfo is well-formed. This is the path
|
||||
// daemonws hits in production — the daemon has no task cache, so the
|
||||
// server resolves task metadata and embeds it on the protocol.
|
||||
spawner := &fakeSpawner{
|
||||
t: t,
|
||||
make: func(tt *testing.T, req SpawnRequest) (*fakePTY, error) { return newFakePTY(tt, req.Cols, req.Rows), nil },
|
||||
}
|
||||
mgr := NewManager(ManagerConfig{
|
||||
ShellPath: "/usr/bin/bash",
|
||||
ShellArgs: []string{"-l"},
|
||||
Spawner: spawner,
|
||||
}, nil)
|
||||
defer mgr.Close()
|
||||
|
||||
dir := t.TempDir()
|
||||
info := TaskInfo{
|
||||
TaskID: "task-via-ws",
|
||||
WorkspaceID: "ws-A",
|
||||
IssueID: "issue-1",
|
||||
WorkDir: dir,
|
||||
PriorSessionID: "claude-session-xyz",
|
||||
}
|
||||
sess, err := mgr.OpenWithInfo(context.Background(), info, OpenParams{
|
||||
TaskID: "task-via-ws",
|
||||
WorkspaceID: "ws-A",
|
||||
UserID: "user-1",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("OpenWithInfo: %v", err)
|
||||
}
|
||||
if sess.WorkDir() != dir {
|
||||
t.Errorf("WorkDir = %q, want %q", sess.WorkDir(), dir)
|
||||
}
|
||||
req := spawner.lastRequest()
|
||||
if req.Cwd != dir {
|
||||
t.Errorf("spawn cwd = %q, want %q", req.Cwd, dir)
|
||||
}
|
||||
gotEnv := map[string]string{}
|
||||
for _, kv := range req.Env {
|
||||
eq := -1
|
||||
for i, c := range kv {
|
||||
if c == '=' {
|
||||
eq = i
|
||||
break
|
||||
}
|
||||
}
|
||||
if eq > 0 {
|
||||
gotEnv[kv[:eq]] = kv[eq+1:]
|
||||
}
|
||||
}
|
||||
if gotEnv["MULTICA_WORKSPACE_ID"] != "ws-A" {
|
||||
t.Errorf("MULTICA_WORKSPACE_ID = %q, want ws-A", gotEnv["MULTICA_WORKSPACE_ID"])
|
||||
}
|
||||
if gotEnv["MULTICA_ISSUE_ID"] != "issue-1" {
|
||||
t.Errorf("MULTICA_ISSUE_ID = %q, want issue-1", gotEnv["MULTICA_ISSUE_ID"])
|
||||
}
|
||||
if gotEnv["MULTICA_TASK_ID"] != "task-via-ws" {
|
||||
t.Errorf("MULTICA_TASK_ID = %q, want task-via-ws", gotEnv["MULTICA_TASK_ID"])
|
||||
}
|
||||
if gotEnv["CLAUDE_SESSION_ID"] != "claude-session-xyz" {
|
||||
t.Errorf("CLAUDE_SESSION_ID = %q, want claude-session-xyz", gotEnv["CLAUDE_SESSION_ID"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_OpenWithInfoRejectsCrossWorkspace(t *testing.T) {
|
||||
// OpenWithInfo skips the lookup, but the workspace_id check on the
|
||||
// caller-supplied OpenParams vs. the supplied TaskInfo still has to
|
||||
// run — otherwise a misrouted frame from one workspace could attach
|
||||
// to another workspace's workdir.
|
||||
spawner := &fakeSpawner{
|
||||
t: t,
|
||||
make: func(tt *testing.T, req SpawnRequest) (*fakePTY, error) { return newFakePTY(tt, req.Cols, req.Rows), nil },
|
||||
}
|
||||
mgr := NewManager(ManagerConfig{Spawner: spawner}, nil)
|
||||
defer mgr.Close()
|
||||
|
||||
_, err := mgr.OpenWithInfo(context.Background(), TaskInfo{
|
||||
TaskID: "task-via-ws",
|
||||
WorkspaceID: "ws-A",
|
||||
WorkDir: t.TempDir(),
|
||||
}, OpenParams{
|
||||
TaskID: "task-via-ws",
|
||||
WorkspaceID: "ws-B",
|
||||
})
|
||||
if !errors.Is(err, ErrWorkspaceMismatch) {
|
||||
t.Fatalf("err = %v, want ErrWorkspaceMismatch", err)
|
||||
}
|
||||
if got := len(mgr.Sessions()); got != 0 {
|
||||
t.Errorf("Sessions after rejected OpenWithInfo = %d, want 0", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_RejectsUnknownTask(t *testing.T) {
|
||||
f := newFixture(t)
|
||||
defer f.mgr.Close()
|
||||
|
||||
_, err := f.mgr.Open(context.Background(), OpenParams{
|
||||
TaskID: "does-not-exist",
|
||||
WorkspaceID: "ws-A",
|
||||
})
|
||||
if !errors.Is(err, ErrTaskNotFound) {
|
||||
t.Fatalf("Open err = %v, want ErrTaskNotFound", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSession_DataRoundTrip(t *testing.T) {
|
||||
f := newFixture(t)
|
||||
defer f.mgr.Close()
|
||||
|
||||
sess, err := f.mgr.Open(context.Background(), OpenParams{
|
||||
TaskID: "task-1",
|
||||
WorkspaceID: "ws-A",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
pty := f.lastPTY(t)
|
||||
|
||||
// client → child
|
||||
if _, err := sess.Write([]byte("ls -al\n")); err != nil {
|
||||
t.Fatalf("Write: %v", err)
|
||||
}
|
||||
// child → client
|
||||
pty.pushChildOutput([]byte("total 0\n"))
|
||||
|
||||
select {
|
||||
case got := <-sess.Output():
|
||||
if string(got) != "total 0\n" {
|
||||
t.Errorf("Output chunk = %q, want %q", got, "total 0\n")
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for Output chunk")
|
||||
}
|
||||
|
||||
writes := pty.writes()
|
||||
if len(writes) != 1 || string(writes[0]) != "ls -al\n" {
|
||||
t.Errorf("recorded writes = %#v, want one 'ls -al\\n'", writes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSession_Resize(t *testing.T) {
|
||||
f := newFixture(t)
|
||||
defer f.mgr.Close()
|
||||
|
||||
sess, err := f.mgr.Open(context.Background(), OpenParams{
|
||||
TaskID: "task-1",
|
||||
WorkspaceID: "ws-A",
|
||||
Cols: 80, Rows: 24,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
pty := f.lastPTY(t)
|
||||
|
||||
if err := sess.Resize(132, 50); err != nil {
|
||||
t.Fatalf("Resize: %v", err)
|
||||
}
|
||||
|
||||
c, r := sess.Size()
|
||||
if c != 132 || r != 50 {
|
||||
t.Errorf("Size = %dx%d, want 132x50", c, r)
|
||||
}
|
||||
gc, gr := pty.size()
|
||||
if gc != 132 || gr != 50 {
|
||||
t.Errorf("PTY size = %dx%d, want 132x50", gc, gr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSession_CloseDeregistersAndDelivers(t *testing.T) {
|
||||
f := newFixture(t)
|
||||
defer f.mgr.Close()
|
||||
|
||||
sess, err := f.mgr.Open(context.Background(), OpenParams{
|
||||
TaskID: "task-1",
|
||||
WorkspaceID: "ws-A",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
id := sess.ID()
|
||||
|
||||
sess.Close("user_requested")
|
||||
|
||||
select {
|
||||
case info := <-sess.ExitC():
|
||||
if info.Reason != "user_requested" {
|
||||
t.Errorf("exit reason = %q, want user_requested", info.Reason)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timed out waiting for ExitC")
|
||||
}
|
||||
|
||||
// Output should close once exit fires; verify by ranging.
|
||||
drained := false
|
||||
for range sess.Output() {
|
||||
drained = true
|
||||
}
|
||||
_ = drained
|
||||
|
||||
<-sess.Done()
|
||||
|
||||
// Session must be deregistered.
|
||||
if _, err := f.mgr.Get(id); !errors.Is(err, ErrSessionNotFound) {
|
||||
t.Errorf("Get after Close = %v, want ErrSessionNotFound", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_IdleTimeoutSweep(t *testing.T) {
|
||||
f := newFixture(t, func(c *ManagerConfig) {
|
||||
c.IdleTimeout = 30 * time.Minute
|
||||
})
|
||||
defer f.mgr.Close()
|
||||
|
||||
sess, err := f.mgr.Open(context.Background(), OpenParams{
|
||||
TaskID: "task-1",
|
||||
WorkspaceID: "ws-A",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
|
||||
// 29 minutes — still active.
|
||||
f.advance(29 * time.Minute)
|
||||
f.mgr.CheckIdle()
|
||||
if _, err := f.mgr.Get(sess.ID()); err != nil {
|
||||
t.Fatalf("session evicted before idle timeout: %v", err)
|
||||
}
|
||||
|
||||
// Cross the threshold.
|
||||
f.advance(2 * time.Minute)
|
||||
f.mgr.CheckIdle()
|
||||
|
||||
select {
|
||||
case info := <-sess.ExitC():
|
||||
if info.Reason != "idle_timeout" {
|
||||
t.Errorf("exit reason = %q, want idle_timeout", info.Reason)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timed out waiting for idle close")
|
||||
}
|
||||
|
||||
if _, err := f.mgr.Get(sess.ID()); !errors.Is(err, ErrSessionNotFound) {
|
||||
t.Errorf("session not deregistered after idle sweep")
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_CloseTearsDownAllSessions(t *testing.T) {
|
||||
f := newFixture(t)
|
||||
s1, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
s2, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
|
||||
f.mgr.Close()
|
||||
|
||||
for _, s := range []*PtySession{s1, s2} {
|
||||
select {
|
||||
case <-s.Done():
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("session %s did not tear down", s.ID())
|
||||
}
|
||||
}
|
||||
if got := len(f.mgr.Sessions()); got != 0 {
|
||||
t.Errorf("Sessions after Manager.Close = %d, want 0", got)
|
||||
}
|
||||
|
||||
// Subsequent opens must be rejected.
|
||||
if _, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"}); !errors.Is(err, ErrManagerClosed) {
|
||||
t.Errorf("Open after Close = %v, want ErrManagerClosed", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSession_CloseWithFullOutputBufferDoesNotPanic(t *testing.T) {
|
||||
// Regression: Close used to race with readLoop's "output <- chunk"
|
||||
// when the channel was full. waitLoop closed output unconditionally,
|
||||
// which could panic on send-to-closed-channel. The new lifecycle
|
||||
// has waitLoop wait on a WaitGroup so readLoop's blocked send
|
||||
// unblocks via <-stop before the close runs.
|
||||
f := newFixture(t)
|
||||
defer f.mgr.Close()
|
||||
// Override on the existing spawner so newFixture's wiring (and
|
||||
// f.spawner.lastRequest tracking) still works.
|
||||
f.spawner.make = func(tt *testing.T, req SpawnRequest) (*fakePTY, error) {
|
||||
p := newFakePTY(tt, req.Cols, req.Rows)
|
||||
// Give the child-side queue plenty of room so the test can
|
||||
// saturate the *session* output buffer before childToClient
|
||||
// back-pressures the producer goroutine.
|
||||
p.childToClient = make(chan []byte, 256)
|
||||
return p, nil
|
||||
}
|
||||
|
||||
sess, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
pty := f.lastPTY(t)
|
||||
|
||||
// Pump enough chunks to fill session.output (cap 64) and queue more
|
||||
// on childToClient; readLoop ends up blocked on output <- chunk.
|
||||
// Don't drain sess.Output() — that's the whole point. Producer runs
|
||||
// to completion (and exits) BEFORE Close, otherwise producer's send
|
||||
// races Close's pty.Close which closes childToClient.
|
||||
producerDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(producerDone)
|
||||
for i := 0; i < 200; i++ {
|
||||
select {
|
||||
case pty.childToClient <- []byte("x"):
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
<-producerDone
|
||||
|
||||
// Should not panic, should not hang.
|
||||
sess.Close("user_requested")
|
||||
|
||||
select {
|
||||
case <-sess.Done():
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Done did not converge after Close with saturated output buffer")
|
||||
}
|
||||
|
||||
if _, err := f.mgr.Get(sess.ID()); !errors.Is(err, ErrSessionNotFound) {
|
||||
t.Errorf("session not deregistered after Close")
|
||||
}
|
||||
|
||||
// ExitC must have fired before Done — required by the Output() doc
|
||||
// contract ("channel closes after the child exits and a value has
|
||||
// been delivered on ExitC()").
|
||||
select {
|
||||
case info := <-sess.ExitC():
|
||||
if info.Reason != "user_requested" {
|
||||
t.Errorf("exit reason = %q, want user_requested", info.Reason)
|
||||
}
|
||||
default:
|
||||
t.Error("ExitC was empty after Done — finalize order violated")
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_OpenPropagatesUnsupportedOS(t *testing.T) {
|
||||
// Regression: Manager.Open used fmt.Errorf("%w: %v", ErrSpawnFailed, err)
|
||||
// which swallowed the inner sentinel. The protocol layer needs
|
||||
// errors.Is to match both ErrSpawnFailed and ErrUnsupportedOS so it
|
||||
// can map to terminal.error code "unsupported_os" instead of a
|
||||
// generic "spawn_failed". Switched to double-%w; both must match.
|
||||
f := newFixture(t)
|
||||
defer f.mgr.Close()
|
||||
|
||||
f.spawner.make = func(_ *testing.T, _ SpawnRequest) (*fakePTY, error) {
|
||||
return nil, ErrUnsupportedOS
|
||||
}
|
||||
|
||||
_, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
|
||||
if err == nil {
|
||||
t.Fatal("Open returned nil err with failing spawner")
|
||||
}
|
||||
if !errors.Is(err, ErrUnsupportedOS) {
|
||||
t.Errorf("errors.Is(err, ErrUnsupportedOS) = false; err = %v", err)
|
||||
}
|
||||
if !errors.Is(err, ErrSpawnFailed) {
|
||||
t.Errorf("errors.Is(err, ErrSpawnFailed) = false; err = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSession_WriteUpdatesLastIO(t *testing.T) {
|
||||
f := newFixture(t, func(c *ManagerConfig) {
|
||||
c.IdleTimeout = 30 * time.Minute
|
||||
})
|
||||
defer f.mgr.Close()
|
||||
|
||||
sess, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
|
||||
f.advance(20 * time.Minute)
|
||||
if _, err := sess.Write([]byte("echo hi\n")); err != nil {
|
||||
t.Fatalf("Write: %v", err)
|
||||
}
|
||||
f.advance(20 * time.Minute) // total 40min, but 20 min since last IO
|
||||
f.mgr.CheckIdle()
|
||||
|
||||
if _, err := f.mgr.Get(sess.ID()); err != nil {
|
||||
t.Fatalf("session evicted despite recent write: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSession_DoneFiresAfterDeregister(t *testing.T) {
|
||||
// Locks the finalize-order contract from Round 2 review:
|
||||
// ExitC → close(output) → onClose/deregister → close(done)
|
||||
// External waiters (daemonws bridge, GC hook, audit) use `<-Done()`
|
||||
// as the signal that the session is fully torn down. Any consumer
|
||||
// querying the manager immediately after Done() must observe the
|
||||
// session deregistered.
|
||||
f := newFixture(t)
|
||||
defer f.mgr.Close()
|
||||
|
||||
sess, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
id := sess.ID()
|
||||
|
||||
sess.Close("user_requested")
|
||||
<-sess.Done()
|
||||
|
||||
if _, err := f.mgr.Get(id); !errors.Is(err, ErrSessionNotFound) {
|
||||
t.Fatalf("Get after <-Done() = %v, want ErrSessionNotFound (finalize order violated)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_CloseConcurrentReentryWaitsForFinalize(t *testing.T) {
|
||||
// Regression for Round 3 review: a late Close() caller used to return
|
||||
// immediately when it saw m.closed==true, even though the first caller
|
||||
// was still in the middle of waiting for each session's Done(). That
|
||||
// broke the "Manager.Close returning means everything is drained"
|
||||
// contract for every caller but the first. With closeDone, all callers
|
||||
// now share the same finalize barrier.
|
||||
f := newFixture(t)
|
||||
f.spawner.make = func(tt *testing.T, req SpawnRequest) (*fakePTY, error) {
|
||||
p := newFakePTY(tt, req.Cols, req.Rows)
|
||||
// Long enough that the second goroutine's Close call definitely
|
||||
// observes the first one mid-flight rather than already-finished.
|
||||
p.waitDelay = 200 * time.Millisecond
|
||||
return p, nil
|
||||
}
|
||||
|
||||
s1, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
|
||||
const callers = 4
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(callers)
|
||||
sessionsAfter := make([]int, callers)
|
||||
doneClosed := make([]bool, callers)
|
||||
for i := 0; i < callers; i++ {
|
||||
i := i
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
f.mgr.Close()
|
||||
sessionsAfter[i] = len(f.mgr.Sessions())
|
||||
select {
|
||||
case <-s1.Done():
|
||||
doneClosed[i] = true
|
||||
default:
|
||||
doneClosed[i] = false
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
for i := 0; i < callers; i++ {
|
||||
if sessionsAfter[i] != 0 {
|
||||
t.Errorf("caller %d: Sessions() after Close = %d, want 0", i, sessionsAfter[i])
|
||||
}
|
||||
if !doneClosed[i] {
|
||||
t.Errorf("caller %d: session Done not closed when Close returned", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_OpenAfterCloseReapsSpawnedPTY(t *testing.T) {
|
||||
// Regression for Round 3 review: Manager.Open's cleanup path used to
|
||||
// only call pty.Close() when it lost the race with Manager.Close —
|
||||
// no Wait(), so on a real unix PTY the killed child stayed as a zombie
|
||||
// (waitLoop never ran because sess.start() never ran). The fix calls
|
||||
// pty.Wait() synchronously to reap.
|
||||
f := newFixture(t)
|
||||
|
||||
inSpawn := make(chan struct{})
|
||||
releaseSpawn := make(chan struct{})
|
||||
spawnedPTY := make(chan *fakePTY, 1)
|
||||
f.spawner.make = func(tt *testing.T, req SpawnRequest) (*fakePTY, error) {
|
||||
close(inSpawn)
|
||||
<-releaseSpawn
|
||||
p := newFakePTY(tt, req.Cols, req.Rows)
|
||||
// Allow Wait() to return immediately once Close fires its waitDone.
|
||||
spawnedPTY <- p
|
||||
return p, nil
|
||||
}
|
||||
|
||||
openDone := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
|
||||
openDone <- err
|
||||
}()
|
||||
|
||||
// Wait until Open is parked inside the spawner, then close the manager
|
||||
// with zero registered sessions. Open will lose the race when it
|
||||
// reacquires the mu after the spawn returns.
|
||||
<-inSpawn
|
||||
f.mgr.Close()
|
||||
|
||||
// Let the spawn finish; Open should now hit the closed-manager cleanup
|
||||
// path: pty.Close + pty.Wait + return ErrManagerClosed.
|
||||
close(releaseSpawn)
|
||||
|
||||
select {
|
||||
case err := <-openDone:
|
||||
if !errors.Is(err, ErrManagerClosed) {
|
||||
t.Fatalf("Open after Close = %v, want ErrManagerClosed", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Open did not return after Manager.Close + spawn release")
|
||||
}
|
||||
|
||||
pty := <-spawnedPTY
|
||||
if got := pty.waitCount.Load(); got < 1 {
|
||||
t.Fatalf("pty.Wait() called %d times in cleanup path, want >=1 — child not reaped", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_CloseWaitsForSessionFinalize(t *testing.T) {
|
||||
// Manager.Close used to only wait for s.Close() (which just initiates
|
||||
// teardown — signals stop, closes the PTY). The waitLoop finalizer
|
||||
// could still be running after Manager.Close returned, leaving the
|
||||
// sessions map non-empty briefly. With Round 2 review's fix, each
|
||||
// goroutine in Manager.Close additionally `<-s.Done()` so the manager
|
||||
// is fully drained by the time Close returns. We inject a Wait delay
|
||||
// to make the difference observable: without the fix, the session map
|
||||
// is still populated when Manager.Close returns and `<-s.Done()` would
|
||||
// block.
|
||||
f := newFixture(t)
|
||||
f.spawner.make = func(tt *testing.T, req SpawnRequest) (*fakePTY, error) {
|
||||
p := newFakePTY(tt, req.Cols, req.Rows)
|
||||
p.waitDelay = 150 * time.Millisecond
|
||||
return p, nil
|
||||
}
|
||||
|
||||
s1, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
s2, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
|
||||
if err != nil {
|
||||
t.Fatalf("Open: %v", err)
|
||||
}
|
||||
|
||||
f.mgr.Close()
|
||||
|
||||
// After Close returns: registry empty AND every Done is already closed.
|
||||
if got := len(f.mgr.Sessions()); got != 0 {
|
||||
t.Errorf("Sessions after Manager.Close = %d, want 0", got)
|
||||
}
|
||||
for _, s := range []*PtySession{s1, s2} {
|
||||
select {
|
||||
case <-s.Done():
|
||||
default:
|
||||
t.Errorf("session %s Done not closed when Manager.Close returned", s.ID())
|
||||
}
|
||||
if _, err := f.mgr.Get(s.ID()); !errors.Is(err, ErrSessionNotFound) {
|
||||
t.Errorf("session %s still registered after Manager.Close: %v", s.ID(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
35
server/internal/daemon/terminal/pty.go
Normal file
35
server/internal/daemon/terminal/pty.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package terminal
|
||||
|
||||
import (
|
||||
"io"
|
||||
"time"
|
||||
)
|
||||
|
||||
// PTY abstracts the platform PTY + child process so tests can swap in a
|
||||
// fake without forking a real shell. Read returns child stdout/stderr;
|
||||
// Write delivers stdin; Resize updates the window; Wait blocks for the
|
||||
// child to exit and returns its exit code; Close terminates the child
|
||||
// and releases the master fd.
|
||||
type PTY interface {
|
||||
io.ReadWriter
|
||||
Resize(cols, rows uint16) error
|
||||
Wait() (exitCode int, err error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
// SpawnRequest is the input to Spawner.Start.
|
||||
type SpawnRequest struct {
|
||||
Shell string
|
||||
Args []string
|
||||
Cwd string
|
||||
Env []string
|
||||
Cols uint16
|
||||
Rows uint16
|
||||
Started time.Time
|
||||
}
|
||||
|
||||
// Spawner creates new PTYs. Production uses realSpawner; tests inject a
|
||||
// channel-backed fake (see fakePTY in manager_test.go).
|
||||
type Spawner interface {
|
||||
Start(SpawnRequest) (PTY, error)
|
||||
}
|
||||
275
server/internal/daemon/terminal/session.go
Normal file
275
server/internal/daemon/terminal/session.go
Normal file
@@ -0,0 +1,275 @@
|
||||
package terminal
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ExitInfo describes how a session terminated.
|
||||
type ExitInfo struct {
|
||||
ExitCode int
|
||||
Reason string
|
||||
}
|
||||
|
||||
// PtySession is a single live PTY + child shell. Methods are safe for
|
||||
// concurrent use; readers consume from Output() and ExitC() until
|
||||
// Output() is closed, which always follows an ExitC() send.
|
||||
type PtySession struct {
|
||||
id string
|
||||
taskID string
|
||||
workspaceID string
|
||||
issueID string
|
||||
workDir string
|
||||
userID string
|
||||
shellPath string
|
||||
|
||||
mu sync.Mutex
|
||||
cols, rows uint16
|
||||
pty PTY
|
||||
output chan []byte
|
||||
exit chan ExitInfo
|
||||
done chan struct{}
|
||||
stop chan struct{}
|
||||
stopOnce sync.Once
|
||||
closing bool
|
||||
closeReason string
|
||||
|
||||
// wg tracks readLoop and idleLoop. waitLoop is the finalizer: it
|
||||
// waits on wg before closing output/done so we never close the
|
||||
// output channel while readLoop is mid-send.
|
||||
wg sync.WaitGroup
|
||||
|
||||
now func() time.Time
|
||||
idleTimeout time.Duration
|
||||
startedAt time.Time
|
||||
lastIO time.Time
|
||||
|
||||
logger *slog.Logger
|
||||
onClose func(string)
|
||||
}
|
||||
|
||||
// ID returns the session identifier.
|
||||
func (s *PtySession) ID() string { return s.id }
|
||||
|
||||
// TaskID returns the task this session is bound to.
|
||||
func (s *PtySession) TaskID() string { return s.taskID }
|
||||
|
||||
// WorkspaceID returns the workspace this session belongs to.
|
||||
func (s *PtySession) WorkspaceID() string { return s.workspaceID }
|
||||
|
||||
// IssueID returns the issue this session was opened from, if any.
|
||||
func (s *PtySession) IssueID() string { return s.issueID }
|
||||
|
||||
// WorkDir returns the cwd of the child shell.
|
||||
func (s *PtySession) WorkDir() string { return s.workDir }
|
||||
|
||||
// UserID returns the human user who opened the session.
|
||||
func (s *PtySession) UserID() string { return s.userID }
|
||||
|
||||
// Shell returns the shell binary path that was spawned.
|
||||
func (s *PtySession) Shell() string { return s.shellPath }
|
||||
|
||||
// StartedAt returns the wall-clock time the session was spawned.
|
||||
func (s *PtySession) StartedAt() time.Time { return s.startedAt }
|
||||
|
||||
// LastIO returns the most recent time data flowed in either direction.
|
||||
func (s *PtySession) LastIO() time.Time {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.lastIO
|
||||
}
|
||||
|
||||
// Output yields PTY output chunks as they arrive. The channel closes
|
||||
// after the child exits and a value has been delivered on ExitC().
|
||||
func (s *PtySession) Output() <-chan []byte { return s.output }
|
||||
|
||||
// ExitC fires once when the child exits. After that, Output() closes.
|
||||
func (s *PtySession) ExitC() <-chan ExitInfo { return s.exit }
|
||||
|
||||
// Done returns a channel closed when the session is fully torn down
|
||||
// (all goroutines exited, registry deregistered).
|
||||
func (s *PtySession) Done() <-chan struct{} { return s.done }
|
||||
|
||||
// Write forwards bytes to the PTY stdin. Returns the byte count actually
|
||||
// written. Updates LastIO so idle detection sees the activity.
|
||||
func (s *PtySession) Write(p []byte) (int, error) {
|
||||
s.mu.Lock()
|
||||
if s.closing {
|
||||
s.mu.Unlock()
|
||||
return 0, ErrSessionNotFound
|
||||
}
|
||||
pty := s.pty
|
||||
s.lastIO = s.now()
|
||||
s.mu.Unlock()
|
||||
return pty.Write(p)
|
||||
}
|
||||
|
||||
// Resize updates the PTY window size.
|
||||
func (s *PtySession) Resize(cols, rows uint16) error {
|
||||
cols, rows = normalizeSize(cols, rows)
|
||||
s.mu.Lock()
|
||||
if s.closing {
|
||||
s.mu.Unlock()
|
||||
return ErrSessionNotFound
|
||||
}
|
||||
s.cols = cols
|
||||
s.rows = rows
|
||||
pty := s.pty
|
||||
s.lastIO = s.now()
|
||||
s.mu.Unlock()
|
||||
return pty.Resize(cols, rows)
|
||||
}
|
||||
|
||||
// Size returns the current cols, rows of the PTY.
|
||||
func (s *PtySession) Size() (uint16, uint16) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.cols, s.rows
|
||||
}
|
||||
|
||||
// Close tears down the session. Subsequent calls are no-ops. The
|
||||
// reason is recorded for audit logging and the terminal.exit payload.
|
||||
//
|
||||
// Close only initiates teardown — signals stop, closes the PTY, returns.
|
||||
// waitLoop is the actual finalizer: it waits for readLoop + idleLoop
|
||||
// to exit (via wg) before closing output/done. That ordering is what
|
||||
// makes "Close while output buffer is full" safe — readLoop's blocked
|
||||
// send unblocks on <-stop, and only then does the output channel close.
|
||||
func (s *PtySession) Close(reason string) {
|
||||
s.mu.Lock()
|
||||
if s.closing {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
s.closing = true
|
||||
s.closeReason = reason
|
||||
pty := s.pty
|
||||
s.mu.Unlock()
|
||||
|
||||
s.stopOnce.Do(func() { close(s.stop) })
|
||||
|
||||
if pty != nil {
|
||||
// pty.Close on the unix spawner runs SIGHUP → grace → SIGKILL.
|
||||
// It's idempotent (sync.Once), so the second call from waitLoop's
|
||||
// finalizer is a no-op.
|
||||
_ = pty.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// start kicks off the reader, exit-watch, and (optional) idle
|
||||
// goroutines. Manager.Open is the only caller. wg.Add runs
|
||||
// synchronously before waitLoop is spawned so wg.Wait sees the
|
||||
// correct count even if Close fires immediately.
|
||||
func (s *PtySession) start() {
|
||||
s.wg.Add(1)
|
||||
go s.readLoop()
|
||||
if s.idleTimeout > 0 {
|
||||
s.wg.Add(1)
|
||||
go s.idleLoop()
|
||||
}
|
||||
go s.waitLoop()
|
||||
}
|
||||
|
||||
func (s *PtySession) readLoop() {
|
||||
defer s.wg.Done()
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, err := s.pty.Read(buf)
|
||||
if n > 0 {
|
||||
chunk := make([]byte, n)
|
||||
copy(chunk, buf[:n])
|
||||
s.mu.Lock()
|
||||
s.lastIO = s.now()
|
||||
s.mu.Unlock()
|
||||
select {
|
||||
case s.output <- chunk:
|
||||
case <-s.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, io.EOF) && err != io.ErrClosedPipe {
|
||||
s.logger.Debug("pty read error", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *PtySession) waitLoop() {
|
||||
code, waitErr := s.pty.Wait()
|
||||
|
||||
s.mu.Lock()
|
||||
reason := s.closeReason
|
||||
if reason == "" {
|
||||
if waitErr != nil {
|
||||
reason = "wait_error"
|
||||
} else {
|
||||
reason = "exited"
|
||||
}
|
||||
s.closeReason = reason
|
||||
}
|
||||
s.closing = true
|
||||
s.mu.Unlock()
|
||||
|
||||
// Ensure the PTY fd is closed so readLoop's pty.Read returns EOF.
|
||||
// pty.Close is idempotent (sync.Once on the unix spawner).
|
||||
_ = s.pty.Close()
|
||||
|
||||
// Signal stop so idleLoop and any blocked send in readLoop exit.
|
||||
s.stopOnce.Do(func() { close(s.stop) })
|
||||
|
||||
// Wait for readLoop + idleLoop before closing output/done. This is
|
||||
// the invariant that prevents "send on closed channel" panics when
|
||||
// output is full: readLoop is either past its send or unblocked via
|
||||
// <-stop, but never racing with close(s.output).
|
||||
s.wg.Wait()
|
||||
|
||||
// Finalize order is load-bearing: external waiters use `<-Done()` as
|
||||
// a signal that the session is fully torn down AND deregistered from
|
||||
// the manager. The sequence must be:
|
||||
// ExitC → close(output) → onClose/deregister → close(done)
|
||||
// so that any consumer doing `<-Done(); manager.Get(id)` after a
|
||||
// teardown is guaranteed to observe ErrSessionNotFound.
|
||||
select {
|
||||
case s.exit <- ExitInfo{ExitCode: code, Reason: reason}:
|
||||
default:
|
||||
}
|
||||
close(s.output)
|
||||
if s.onClose != nil {
|
||||
s.onClose(s.id)
|
||||
}
|
||||
close(s.done)
|
||||
}
|
||||
|
||||
func (s *PtySession) idleLoop() {
|
||||
defer s.wg.Done()
|
||||
// Sample at IdleTimeout/4 so reaction time is bounded but ticks
|
||||
// stay cheap with many sessions. Manager.CheckIdle catches anything
|
||||
// this loop misses (e.g. when daemon's outer GC tick is coarser).
|
||||
interval := s.idleTimeout / 4
|
||||
if interval < time.Second {
|
||||
interval = time.Second
|
||||
}
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-s.stop:
|
||||
return
|
||||
case <-t.C:
|
||||
if s.now().Sub(s.LastIO()) >= s.idleTimeout {
|
||||
// Close calls pty.Close + waits for wg in waitLoop. If
|
||||
// we ran it inline, waitLoop's wg.Wait would block on
|
||||
// this goroutine, which can't exit until Close returns
|
||||
// — deadlock. Spawning lets idleLoop return and
|
||||
// decrement wg.
|
||||
go s.Close("idle_timeout")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
102
server/internal/daemon/terminal/spawner_unix.go
Normal file
102
server/internal/daemon/terminal/spawner_unix.go
Normal file
@@ -0,0 +1,102 @@
|
||||
//go:build !windows
|
||||
|
||||
package terminal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/creack/pty"
|
||||
)
|
||||
|
||||
// closeGracePeriod is the window between SIGHUP and SIGKILL during a
|
||||
// Close. Long enough for interactive shells to run trap handlers and
|
||||
// flush state; short enough that closing a tab feels instant.
|
||||
const closeGracePeriod = 250 * time.Millisecond
|
||||
|
||||
// realSpawner forks the shell on a PTY using creack/pty. Linux/macOS
|
||||
// only; Windows reaches the stub in spawner_windows.go and returns
|
||||
// ErrUnsupportedOS.
|
||||
type realSpawner struct{}
|
||||
|
||||
func (realSpawner) Start(req SpawnRequest) (PTY, error) {
|
||||
cmd := exec.Command(req.Shell, req.Args...)
|
||||
cmd.Dir = req.Cwd
|
||||
|
||||
// Inherit the daemon's PATH so users get whatever CLIs are installed
|
||||
// in the daemon's environment (claude, codex, multica, etc.); merge
|
||||
// in the per-session vars built by buildEnv.
|
||||
env := os.Environ()
|
||||
env = append(env, req.Env...)
|
||||
cmd.Env = env
|
||||
|
||||
size := &pty.Winsize{Cols: req.Cols, Rows: req.Rows}
|
||||
f, err := pty.StartWithSize(cmd, size)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pty.StartWithSize: %w", err)
|
||||
}
|
||||
return &unixPTY{cmd: cmd, file: f}, nil
|
||||
}
|
||||
|
||||
type unixPTY struct {
|
||||
cmd *exec.Cmd
|
||||
file *os.File
|
||||
exited atomic.Bool
|
||||
closeOnce sync.Once
|
||||
closeErr error
|
||||
}
|
||||
|
||||
func (p *unixPTY) Read(b []byte) (int, error) { return p.file.Read(b) }
|
||||
func (p *unixPTY) Write(b []byte) (int, error) { return p.file.Write(b) }
|
||||
|
||||
func (p *unixPTY) Resize(cols, rows uint16) error {
|
||||
return pty.Setsize(p.file, &pty.Winsize{Cols: cols, Rows: rows})
|
||||
}
|
||||
|
||||
func (p *unixPTY) Wait() (int, error) {
|
||||
err := p.cmd.Wait()
|
||||
p.exited.Store(true)
|
||||
if p.cmd.ProcessState != nil {
|
||||
return p.cmd.ProcessState.ExitCode(), err
|
||||
}
|
||||
return -1, err
|
||||
}
|
||||
|
||||
// Close terminates the child shell and releases the PTY master fd.
|
||||
// Closing a tab is a hangup, not an interrupt — so the signal path is
|
||||
// SIGHUP → brief grace → SIGKILL → file.Close, in that order:
|
||||
//
|
||||
// - SIGHUP gives interactive shells a chance to run trap handlers,
|
||||
// write history, etc. before the fd disappears.
|
||||
// - The grace window is bounded; anything slower than that is stuck.
|
||||
// - SIGKILL is the cliff for shells that ignore HUP.
|
||||
// - file.Close releases the master fd last so the slave side keeps
|
||||
// working during cleanup.
|
||||
//
|
||||
// Signals are sent to the negated pid so they hit the whole process
|
||||
// group. creack/pty starts the child as a session leader (Setsid), so
|
||||
// pid == pgid and any descendants the user spawned in the shell are
|
||||
// caught by the same kill.
|
||||
//
|
||||
// If the child already exited naturally (Wait returned), all signal
|
||||
// work is skipped — we only close the fd. That avoids a pointless
|
||||
// 250ms sleep in the natural-exit teardown path.
|
||||
func (p *unixPTY) Close() error {
|
||||
p.closeOnce.Do(func() {
|
||||
if p.cmd.Process != nil && !p.exited.Load() {
|
||||
pid := p.cmd.Process.Pid
|
||||
_ = syscall.Kill(-pid, syscall.SIGHUP)
|
||||
time.Sleep(closeGracePeriod)
|
||||
if !p.exited.Load() {
|
||||
_ = syscall.Kill(-pid, syscall.SIGKILL)
|
||||
}
|
||||
}
|
||||
p.closeErr = p.file.Close()
|
||||
})
|
||||
return p.closeErr
|
||||
}
|
||||
9
server/internal/daemon/terminal/spawner_windows.go
Normal file
9
server/internal/daemon/terminal/spawner_windows.go
Normal file
@@ -0,0 +1,9 @@
|
||||
//go:build windows
|
||||
|
||||
package terminal
|
||||
|
||||
// realSpawner on Windows always refuses — ConPty support is RFC P1 and
|
||||
// the Desktop button + CLI both surface a clear error from this layer.
|
||||
type realSpawner struct{}
|
||||
|
||||
func (realSpawner) Start(SpawnRequest) (PTY, error) { return nil, ErrUnsupportedOS }
|
||||
260
server/internal/daemon/terminal_bridge.go
Normal file
260
server/internal/daemon/terminal_bridge.go
Normal file
@@ -0,0 +1,260 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"sync"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/daemon/terminal"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// terminalBridge adapts the daemon-side terminal.Manager to the daemonws
|
||||
// WebSocket transport. Per session it:
|
||||
//
|
||||
// - relays PtySession.Output() → terminal.data frames (daemon→server)
|
||||
// - relays PtySession.ExitC() → terminal.exit frames
|
||||
// - tears the bridge goroutine down when Done() fires
|
||||
//
|
||||
// frameSender is the daemon's currently-active WS writer (see
|
||||
// Daemon.sendWSFrame). It returns false when no connection is active or the
|
||||
// outbound queue is saturated; we drop the frame in that case rather than
|
||||
// stall the reader, because the next reconnect would unstick us anyway.
|
||||
type terminalBridge struct {
|
||||
manager *terminal.Manager
|
||||
logger *slog.Logger
|
||||
send func([]byte) bool
|
||||
|
||||
mu sync.Mutex
|
||||
sessions map[string]*terminalRoute
|
||||
}
|
||||
|
||||
type terminalRoute struct {
|
||||
session *terminal.PtySession
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func newTerminalBridge(mgr *terminal.Manager, logger *slog.Logger, send func([]byte) bool) *terminalBridge {
|
||||
return &terminalBridge{
|
||||
manager: mgr,
|
||||
logger: logger,
|
||||
send: send,
|
||||
sessions: make(map[string]*terminalRoute),
|
||||
}
|
||||
}
|
||||
|
||||
// handleFrame dispatches a single terminal.* envelope from the server. The
|
||||
// caller already decoded protocol.Message; we receive the inner type+payload.
|
||||
func (b *terminalBridge) handleFrame(msgType string, payload json.RawMessage) {
|
||||
switch msgType {
|
||||
case protocol.MessageTypeTerminalOpen:
|
||||
var p protocol.TerminalOpenPayload
|
||||
if err := json.Unmarshal(payload, &p); err != nil {
|
||||
b.logger.Debug("terminal.open invalid payload", "error", err)
|
||||
return
|
||||
}
|
||||
b.handleOpen(p)
|
||||
case protocol.MessageTypeTerminalData:
|
||||
var p protocol.TerminalDataPayload
|
||||
if err := json.Unmarshal(payload, &p); err != nil {
|
||||
b.logger.Debug("terminal.data invalid payload", "error", err)
|
||||
return
|
||||
}
|
||||
b.handleData(p)
|
||||
case protocol.MessageTypeTerminalResize:
|
||||
var p protocol.TerminalResizePayload
|
||||
if err := json.Unmarshal(payload, &p); err != nil {
|
||||
b.logger.Debug("terminal.resize invalid payload", "error", err)
|
||||
return
|
||||
}
|
||||
b.handleResize(p)
|
||||
case protocol.MessageTypeTerminalClose:
|
||||
var p protocol.TerminalClosePayload
|
||||
if err := json.Unmarshal(payload, &p); err != nil {
|
||||
b.logger.Debug("terminal.close invalid payload", "error", err)
|
||||
return
|
||||
}
|
||||
b.handleClose(p)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *terminalBridge) handleOpen(p protocol.TerminalOpenPayload) {
|
||||
info := terminal.TaskInfo{
|
||||
TaskID: p.TaskID,
|
||||
WorkspaceID: p.WorkspaceID,
|
||||
IssueID: p.IssueID,
|
||||
WorkDir: p.WorkDir,
|
||||
PriorSessionID: p.PriorSessionID,
|
||||
}
|
||||
sess, err := b.manager.OpenWithInfo(context.Background(), info, terminal.OpenParams{
|
||||
TaskID: p.TaskID,
|
||||
WorkspaceID: p.WorkspaceID,
|
||||
UserID: p.UserID,
|
||||
Cols: p.Cols,
|
||||
Rows: p.Rows,
|
||||
})
|
||||
if err != nil {
|
||||
b.sendError(p.RequestID, "", mapTerminalError(err), err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
b.mu.Lock()
|
||||
b.sessions[sess.ID()] = &terminalRoute{session: sess, cancel: cancel}
|
||||
b.mu.Unlock()
|
||||
|
||||
b.sendFrame(protocol.MessageTypeTerminalOpened, protocol.TerminalOpenedPayload{
|
||||
RequestID: p.RequestID,
|
||||
SessionID: sess.ID(),
|
||||
WorkDir: sess.WorkDir(),
|
||||
Shell: sess.Shell(),
|
||||
})
|
||||
|
||||
go b.pump(ctx, sess)
|
||||
}
|
||||
|
||||
func (b *terminalBridge) handleData(p protocol.TerminalDataPayload) {
|
||||
sess, err := b.manager.Get(p.SessionID)
|
||||
if err != nil {
|
||||
b.sendError("", p.SessionID, protocol.TerminalErrorCodeSessionNotFound, err.Error())
|
||||
return
|
||||
}
|
||||
data, err := base64.StdEncoding.DecodeString(p.DataB64)
|
||||
if err != nil {
|
||||
b.logger.Debug("terminal.data invalid base64", "error", err, "session_id", p.SessionID)
|
||||
return
|
||||
}
|
||||
if _, err := sess.Write(data); err != nil {
|
||||
b.logger.Debug("terminal.data write failed", "error", err, "session_id", p.SessionID)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *terminalBridge) handleResize(p protocol.TerminalResizePayload) {
|
||||
sess, err := b.manager.Get(p.SessionID)
|
||||
if err != nil {
|
||||
b.sendError("", p.SessionID, protocol.TerminalErrorCodeSessionNotFound, err.Error())
|
||||
return
|
||||
}
|
||||
if err := sess.Resize(p.Cols, p.Rows); err != nil {
|
||||
b.logger.Debug("terminal.resize failed", "error", err, "session_id", p.SessionID)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *terminalBridge) handleClose(p protocol.TerminalClosePayload) {
|
||||
sess, err := b.manager.Get(p.SessionID)
|
||||
if err != nil {
|
||||
// Already gone — nothing to do; the server side has already received
|
||||
// a terminal.exit frame (or will, through the pump goroutine).
|
||||
return
|
||||
}
|
||||
reason := p.Reason
|
||||
if reason == "" {
|
||||
reason = "client_close"
|
||||
}
|
||||
sess.Close(reason)
|
||||
}
|
||||
|
||||
// pump bridges one session's output channel onto the WS as terminal.data
|
||||
// frames, and emits a terminal.exit when the child exits. Returns when
|
||||
// either the session is fully torn down or ctx is cancelled.
|
||||
func (b *terminalBridge) pump(ctx context.Context, sess *terminal.PtySession) {
|
||||
sessionID := sess.ID()
|
||||
defer func() {
|
||||
b.mu.Lock()
|
||||
delete(b.sessions, sessionID)
|
||||
b.mu.Unlock()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chunk, ok := <-sess.Output():
|
||||
if !ok {
|
||||
// Output closed → child exited and waitLoop finalized.
|
||||
// ExitC was already delivered (or about to be); pull it once
|
||||
// non-blocking and forward, then exit the pump.
|
||||
var info terminal.ExitInfo
|
||||
select {
|
||||
case info = <-sess.ExitC():
|
||||
default:
|
||||
}
|
||||
b.sendFrame(protocol.MessageTypeTerminalExit, protocol.TerminalExitPayload{
|
||||
SessionID: sessionID,
|
||||
ExitCode: info.ExitCode,
|
||||
Reason: info.Reason,
|
||||
})
|
||||
<-sess.Done()
|
||||
return
|
||||
}
|
||||
b.sendFrame(protocol.MessageTypeTerminalData, protocol.TerminalDataPayload{
|
||||
SessionID: sessionID,
|
||||
DataB64: base64.StdEncoding.EncodeToString(chunk),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// closeAll tears down every live session. Called when the daemon
|
||||
// disconnects from the server: the browser proxy will fail downstream,
|
||||
// and a reconnect cannot resurrect the pre-existing PTYs because the
|
||||
// session_ids only existed in the prior WS context.
|
||||
func (b *terminalBridge) closeAll(reason string) {
|
||||
b.mu.Lock()
|
||||
routes := make([]*terminalRoute, 0, len(b.sessions))
|
||||
for _, r := range b.sessions {
|
||||
routes = append(routes, r)
|
||||
}
|
||||
b.mu.Unlock()
|
||||
for _, r := range routes {
|
||||
r.cancel()
|
||||
r.session.Close(reason)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *terminalBridge) sendFrame(msgType string, payload any) {
|
||||
raw, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
b.logger.Debug("terminal frame marshal failed", "error", err, "type", msgType)
|
||||
return
|
||||
}
|
||||
frame, err := json.Marshal(protocol.Message{Type: msgType, Payload: raw})
|
||||
if err != nil {
|
||||
b.logger.Debug("terminal envelope marshal failed", "error", err, "type", msgType)
|
||||
return
|
||||
}
|
||||
if !b.send(frame) {
|
||||
b.logger.Debug("terminal frame dropped: ws disconnected or backed up", "type", msgType)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *terminalBridge) sendError(requestID, sessionID, code, message string) {
|
||||
b.sendFrame(protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
|
||||
RequestID: requestID,
|
||||
SessionID: sessionID,
|
||||
Code: code,
|
||||
Message: message,
|
||||
})
|
||||
}
|
||||
|
||||
// mapTerminalError translates the terminal package's sentinel errors into
|
||||
// protocol error codes the browser proxy can render. Anything we don't
|
||||
// recognise falls back to TerminalErrorCodeInternal — drop information
|
||||
// rather than surface internal wrap text to the user.
|
||||
func mapTerminalError(err error) string {
|
||||
switch {
|
||||
case errors.Is(err, terminal.ErrWorkspaceMismatch):
|
||||
return protocol.TerminalErrorCodeWorkspaceMismatch
|
||||
case errors.Is(err, terminal.ErrTaskNotFound):
|
||||
return protocol.TerminalErrorCodeTaskNotFound
|
||||
case errors.Is(err, terminal.ErrSessionNotFound):
|
||||
return protocol.TerminalErrorCodeSessionNotFound
|
||||
case errors.Is(err, terminal.ErrUnsupportedOS):
|
||||
return protocol.TerminalErrorCodeUnsupportedOS
|
||||
case errors.Is(err, terminal.ErrSpawnFailed):
|
||||
return protocol.TerminalErrorCodeSpawnFailed
|
||||
}
|
||||
return protocol.TerminalErrorCodeInternal
|
||||
}
|
||||
313
server/internal/daemon/terminal_bridge_test.go
Normal file
313
server/internal/daemon/terminal_bridge_test.go
Normal file
@@ -0,0 +1,313 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/daemon/terminal"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// captureSender is the test stand-in for the daemon's outbound WS writer.
|
||||
// Frames are kept in order so the test can wait for a specific message type
|
||||
// to appear.
|
||||
type captureSender struct {
|
||||
mu sync.Mutex
|
||||
frames [][]byte
|
||||
}
|
||||
|
||||
func (c *captureSender) send(frame []byte) bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
cp := make([]byte, len(frame))
|
||||
copy(cp, frame)
|
||||
c.frames = append(c.frames, cp)
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *captureSender) waitFor(t *testing.T, msgType string, timeout time.Duration) protocol.Message {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
c.mu.Lock()
|
||||
for _, f := range c.frames {
|
||||
var m protocol.Message
|
||||
if err := json.Unmarshal(f, &m); err == nil && m.Type == msgType {
|
||||
c.mu.Unlock()
|
||||
return m
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("timeout waiting for frame of type %q (saw %d frames)", msgType, len(c.frames))
|
||||
return protocol.Message{}
|
||||
}
|
||||
|
||||
// fakeBridgePTY is a minimal PTY for the bridge integration: it lets the
|
||||
// test push child output and read writes back. Wait blocks until Close.
|
||||
type fakeBridgePTY struct {
|
||||
out chan []byte
|
||||
mu sync.Mutex
|
||||
written []byte
|
||||
cols uint16
|
||||
rows uint16
|
||||
closeCh chan struct{}
|
||||
exit int
|
||||
waitOnce sync.Once
|
||||
waitDone chan struct{}
|
||||
}
|
||||
|
||||
func newFakeBridgePTY(cols, rows uint16) *fakeBridgePTY {
|
||||
return &fakeBridgePTY{
|
||||
out: make(chan []byte, 4),
|
||||
cols: cols,
|
||||
rows: rows,
|
||||
closeCh: make(chan struct{}),
|
||||
waitDone: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *fakeBridgePTY) Read(b []byte) (int, error) {
|
||||
select {
|
||||
case chunk, ok := <-p.out:
|
||||
if !ok {
|
||||
return 0, errEOF
|
||||
}
|
||||
n := copy(b, chunk)
|
||||
return n, nil
|
||||
case <-p.closeCh:
|
||||
return 0, errEOF
|
||||
}
|
||||
}
|
||||
|
||||
func (p *fakeBridgePTY) Write(b []byte) (int, error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.written = append(p.written, b...)
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
func (p *fakeBridgePTY) Resize(cols, rows uint16) error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.cols = cols
|
||||
p.rows = rows
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *fakeBridgePTY) Close() error {
|
||||
p.waitOnce.Do(func() {
|
||||
close(p.closeCh)
|
||||
close(p.waitDone)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *fakeBridgePTY) Wait() (int, error) {
|
||||
<-p.waitDone
|
||||
return p.exit, nil
|
||||
}
|
||||
|
||||
type stringErr string
|
||||
|
||||
func (e stringErr) Error() string { return string(e) }
|
||||
|
||||
const errEOF = stringErr("EOF")
|
||||
|
||||
func TestTerminalBridge_OpenSendsOpenedFrameWithServerSuppliedWorkdir(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
|
||||
pty := newFakeBridgePTY(80, 24)
|
||||
spawner := &stubSpawner{pty: pty}
|
||||
mgr := terminal.NewManager(terminal.ManagerConfig{
|
||||
Spawner: spawner,
|
||||
Logger: slog.Default(),
|
||||
}, nil)
|
||||
defer mgr.Close()
|
||||
|
||||
sender := &captureSender{}
|
||||
bridge := newTerminalBridge(mgr, slog.Default(), sender.send)
|
||||
|
||||
openPayload, err := json.Marshal(protocol.TerminalOpenPayload{
|
||||
RequestID: "req-1",
|
||||
TaskID: "task-via-ws",
|
||||
WorkspaceID: "ws-A",
|
||||
UserID: "user-1",
|
||||
IssueID: "issue-1",
|
||||
WorkDir: tmp,
|
||||
PriorSessionID: "claude-xyz",
|
||||
Cols: 120,
|
||||
Rows: 30,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("marshal open: %v", err)
|
||||
}
|
||||
|
||||
bridge.handleFrame(protocol.MessageTypeTerminalOpen, openPayload)
|
||||
|
||||
openedMsg := sender.waitFor(t, protocol.MessageTypeTerminalOpened, time.Second)
|
||||
var opened protocol.TerminalOpenedPayload
|
||||
if err := json.Unmarshal(openedMsg.Payload, &opened); err != nil {
|
||||
t.Fatalf("opened payload: %v", err)
|
||||
}
|
||||
if opened.RequestID != "req-1" {
|
||||
t.Errorf("opened.request_id = %q, want req-1", opened.RequestID)
|
||||
}
|
||||
if opened.SessionID == "" {
|
||||
t.Errorf("opened.session_id is empty")
|
||||
}
|
||||
if opened.WorkDir != tmp {
|
||||
t.Errorf("opened.work_dir = %q, want %q", opened.WorkDir, tmp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTerminalBridge_OpenWithoutWorkdirEmitsTaskNotFound(t *testing.T) {
|
||||
// The server is supposed to resolve task.work_dir from its DB before
|
||||
// forwarding terminal.open. If it forgets / fails, the daemon must
|
||||
// not silently fall through to spawning bash in CWD — it has to
|
||||
// surface a structured terminal.error and never call the spawner.
|
||||
pty := newFakeBridgePTY(80, 24)
|
||||
spawner := &stubSpawner{pty: pty}
|
||||
mgr := terminal.NewManager(terminal.ManagerConfig{
|
||||
Spawner: spawner,
|
||||
Logger: slog.Default(),
|
||||
}, nil)
|
||||
defer mgr.Close()
|
||||
|
||||
sender := &captureSender{}
|
||||
bridge := newTerminalBridge(mgr, slog.Default(), sender.send)
|
||||
|
||||
openPayload, _ := json.Marshal(protocol.TerminalOpenPayload{
|
||||
RequestID: "req-2",
|
||||
TaskID: "task-evil",
|
||||
WorkspaceID: "ws-B",
|
||||
WorkDir: "", // server failed to resolve
|
||||
Cols: 80,
|
||||
Rows: 24,
|
||||
})
|
||||
|
||||
bridge.handleFrame(protocol.MessageTypeTerminalOpen, openPayload)
|
||||
|
||||
errMsg := sender.waitFor(t, protocol.MessageTypeTerminalError, time.Second)
|
||||
var errPayload protocol.TerminalErrorPayload
|
||||
if err := json.Unmarshal(errMsg.Payload, &errPayload); err != nil {
|
||||
t.Fatalf("error payload: %v", err)
|
||||
}
|
||||
if errPayload.Code != protocol.TerminalErrorCodeTaskNotFound {
|
||||
t.Errorf("error code = %q, want %q", errPayload.Code, protocol.TerminalErrorCodeTaskNotFound)
|
||||
}
|
||||
if errPayload.RequestID != "req-2" {
|
||||
t.Errorf("error request_id = %q, want req-2", errPayload.RequestID)
|
||||
}
|
||||
if spawner.callCount() != 0 {
|
||||
t.Errorf("spawner was invoked %d times despite resolve failure", spawner.callCount())
|
||||
}
|
||||
}
|
||||
|
||||
func TestTerminalBridge_DataAndExitRoundTrip(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
|
||||
pty := newFakeBridgePTY(80, 24)
|
||||
spawner := &stubSpawner{pty: pty}
|
||||
mgr := terminal.NewManager(terminal.ManagerConfig{
|
||||
Spawner: spawner,
|
||||
Logger: slog.Default(),
|
||||
}, nil)
|
||||
defer mgr.Close()
|
||||
|
||||
sender := &captureSender{}
|
||||
bridge := newTerminalBridge(mgr, slog.Default(), sender.send)
|
||||
|
||||
openPayload, _ := json.Marshal(protocol.TerminalOpenPayload{
|
||||
RequestID: "req-3",
|
||||
TaskID: "task-3",
|
||||
WorkspaceID: "ws-A",
|
||||
WorkDir: tmp,
|
||||
Cols: 80,
|
||||
Rows: 24,
|
||||
})
|
||||
bridge.handleFrame(protocol.MessageTypeTerminalOpen, openPayload)
|
||||
openedMsg := sender.waitFor(t, protocol.MessageTypeTerminalOpened, time.Second)
|
||||
var opened protocol.TerminalOpenedPayload
|
||||
_ = json.Unmarshal(openedMsg.Payload, &opened)
|
||||
sessionID := opened.SessionID
|
||||
|
||||
// Push child output → bridge should emit terminal.data on the WS.
|
||||
pty.out <- []byte("hello\n")
|
||||
dataMsg := sender.waitFor(t, protocol.MessageTypeTerminalData, time.Second)
|
||||
var dp protocol.TerminalDataPayload
|
||||
if err := json.Unmarshal(dataMsg.Payload, &dp); err != nil {
|
||||
t.Fatalf("data payload: %v", err)
|
||||
}
|
||||
if dp.SessionID != sessionID {
|
||||
t.Errorf("data session_id = %q, want %q", dp.SessionID, sessionID)
|
||||
}
|
||||
decoded, err := base64.StdEncoding.DecodeString(dp.DataB64)
|
||||
if err != nil {
|
||||
t.Fatalf("decode data: %v", err)
|
||||
}
|
||||
if string(decoded) != "hello\n" {
|
||||
t.Errorf("data bytes = %q, want %q", decoded, "hello\n")
|
||||
}
|
||||
|
||||
// Send a write the other direction. The bridge should base64-decode
|
||||
// and call PTY.Write.
|
||||
inboundData, _ := json.Marshal(protocol.TerminalDataPayload{
|
||||
SessionID: sessionID,
|
||||
DataB64: base64.StdEncoding.EncodeToString([]byte("ls\n")),
|
||||
})
|
||||
bridge.handleFrame(protocol.MessageTypeTerminalData, inboundData)
|
||||
|
||||
// Allow Write to settle.
|
||||
deadline := time.Now().Add(time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
pty.mu.Lock()
|
||||
got := string(pty.written)
|
||||
pty.mu.Unlock()
|
||||
if got == "ls\n" {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
pty.mu.Lock()
|
||||
if string(pty.written) != "ls\n" {
|
||||
t.Errorf("PTY received %q, want %q", pty.written, "ls\n")
|
||||
}
|
||||
pty.mu.Unlock()
|
||||
|
||||
// Close from the client side. The bridge should propagate via
|
||||
// session.Close → waitLoop → terminal.exit.
|
||||
closePayload, _ := json.Marshal(protocol.TerminalClosePayload{
|
||||
SessionID: sessionID,
|
||||
Reason: "test",
|
||||
})
|
||||
bridge.handleFrame(protocol.MessageTypeTerminalClose, closePayload)
|
||||
|
||||
sender.waitFor(t, protocol.MessageTypeTerminalExit, time.Second)
|
||||
}
|
||||
|
||||
// stubSpawner returns a single pre-built PTY on the first Start. callCount
|
||||
// lets tests assert that no spawn happened on a reject path.
|
||||
type stubSpawner struct {
|
||||
pty *fakeBridgePTY
|
||||
mu sync.Mutex
|
||||
calls int
|
||||
}
|
||||
|
||||
func (s *stubSpawner) Start(_ terminal.SpawnRequest) (terminal.PTY, error) {
|
||||
s.mu.Lock()
|
||||
s.calls++
|
||||
s.mu.Unlock()
|
||||
return s.pty, nil
|
||||
}
|
||||
|
||||
func (s *stubSpawner) callCount() int {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.calls
|
||||
}
|
||||
@@ -116,6 +116,13 @@ func (d *Daemon) runTaskWakeupConnection(ctx context.Context, runtimeIDs []strin
|
||||
writerDone := make(chan struct{})
|
||||
go d.runWSWriter(conn, writes, writerDone)
|
||||
|
||||
// Expose this connection's writer to the terminal bridge so it can push
|
||||
// terminal.* frames back to the server. The defer in the parent cleanup
|
||||
// block clears the pointer (and tears any live PTYs down) before the
|
||||
// writes channel is closed.
|
||||
d.installWSWrites(writes)
|
||||
defer d.clearWSWrites()
|
||||
|
||||
heartbeatCtx, cancelHeartbeat := context.WithCancel(ctx)
|
||||
hbDone := make(chan struct{})
|
||||
go func() {
|
||||
@@ -287,6 +294,13 @@ func (d *Daemon) readTaskWakeupMessages(conn *websocket.Conn, taskWakeups chan<-
|
||||
continue
|
||||
}
|
||||
d.handleWSHeartbeatAck(context.Background(), &ack)
|
||||
case protocol.MessageTypeTerminalOpen,
|
||||
protocol.MessageTypeTerminalData,
|
||||
protocol.MessageTypeTerminalResize,
|
||||
protocol.MessageTypeTerminalClose:
|
||||
if bridge := d.currentTerminalBridge(); bridge != nil {
|
||||
bridge.handleFrame(msg.Type, msg.Payload)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,6 +82,9 @@ type Hub struct {
|
||||
|
||||
hbMu sync.RWMutex
|
||||
onHeartbeat HeartbeatHandler
|
||||
|
||||
termMu sync.RWMutex
|
||||
termRouter *TerminalRouter
|
||||
}
|
||||
|
||||
func NewHub() *Hub {
|
||||
@@ -146,7 +149,12 @@ func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request, identity C
|
||||
c := &client{
|
||||
hub: h,
|
||||
conn: conn,
|
||||
send: make(chan []byte, 16),
|
||||
// Buffer sized for PTY traffic: terminal.data frames carry up to a
|
||||
// few KB each and the read pump from xterm.js can fall behind during
|
||||
// large bursts (build output, `cat` of a big file). 256 frames gives
|
||||
// ~1MB of slack before we start dropping; wakeup hints and heartbeat
|
||||
// acks coexist on the same queue but are tiny in comparison.
|
||||
send: make(chan []byte, 256),
|
||||
identity: identity,
|
||||
runtimes: runtimes,
|
||||
}
|
||||
@@ -320,7 +328,12 @@ func (c *client) readPump() {
|
||||
c.conn.Close()
|
||||
}()
|
||||
|
||||
c.conn.SetReadLimit(4096)
|
||||
// Terminal frames embed base64-encoded PTY chunks; readLoop on the
|
||||
// daemon side caps each chunk at 4KB raw, which is ~5.6KB base64 plus
|
||||
// JSON envelope. 64KB leaves headroom for future growth without
|
||||
// disconnecting daemons that briefly exceed the legacy 4KB heartbeat-
|
||||
// only ceiling.
|
||||
c.conn.SetReadLimit(64 * 1024)
|
||||
c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
c.conn.SetPongHandler(func(string) error {
|
||||
c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
@@ -348,6 +361,14 @@ func (c *client) handleFrame(raw []byte) {
|
||||
switch msg.Type {
|
||||
case protocol.EventDaemonHeartbeat:
|
||||
c.handleHeartbeatFrame(msg.Payload)
|
||||
case protocol.MessageTypeTerminalOpened,
|
||||
protocol.MessageTypeTerminalData,
|
||||
protocol.MessageTypeTerminalClose,
|
||||
protocol.MessageTypeTerminalExit,
|
||||
protocol.MessageTypeTerminalError:
|
||||
if router := c.hub.terminalRouter(); router != nil {
|
||||
router.Route(raw, msg.Type, msg.Payload)
|
||||
}
|
||||
default:
|
||||
// Unknown app messages are intentionally ignored for forward
|
||||
// compatibility with future daemon → server message types.
|
||||
|
||||
196
server/internal/daemonws/terminal.go
Normal file
196
server/internal/daemonws/terminal.go
Normal file
@@ -0,0 +1,196 @@
|
||||
package daemonws
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"sync"
|
||||
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// TerminalSink receives terminal.* frames addressed to a single browser
|
||||
// WebSocket connection. The handler implementation owns the frame queue;
|
||||
// implementations must be non-blocking — the hub drops the frame if Deliver
|
||||
// returns false rather than back up the daemon read pump.
|
||||
type TerminalSink interface {
|
||||
Deliver(frame []byte) bool
|
||||
}
|
||||
|
||||
// TerminalRouter is the daemonws-side multiplex for terminal.* frames coming
|
||||
// back from a daemon. Browser proxy connections register under their pending
|
||||
// request_id and, after terminal.opened arrives, re-register under the
|
||||
// session_id the daemon picked.
|
||||
type TerminalRouter struct {
|
||||
mu sync.RWMutex
|
||||
sinks map[string]TerminalSink
|
||||
}
|
||||
|
||||
// NewTerminalRouter constructs an empty router. The Hub owns the only
|
||||
// instance in production; tests can build their own.
|
||||
func NewTerminalRouter() *TerminalRouter {
|
||||
return &TerminalRouter{sinks: make(map[string]TerminalSink)}
|
||||
}
|
||||
|
||||
// Register installs sink under the given key. The key is either a
|
||||
// request_id (before the daemon assigns a session_id) or a session_id
|
||||
// (after the open ack). Re-registering an existing key replaces the sink.
|
||||
func (r *TerminalRouter) Register(key string, sink TerminalSink) {
|
||||
if r == nil || key == "" || sink == nil {
|
||||
return
|
||||
}
|
||||
r.mu.Lock()
|
||||
r.sinks[key] = sink
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
// Unregister removes the sink for key, if any.
|
||||
func (r *TerminalRouter) Unregister(key string) {
|
||||
if r == nil || key == "" {
|
||||
return
|
||||
}
|
||||
r.mu.Lock()
|
||||
delete(r.sinks, key)
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
// Route extracts the routing key (request_id or session_id) from a
|
||||
// terminal.* frame and forwards the raw frame to the registered sink.
|
||||
// Unknown keys are dropped silently — the daemon-side session ultimately
|
||||
// observes the dead client via send-side errors / idle timeout.
|
||||
func (r *TerminalRouter) Route(frame []byte, msgType string, payload json.RawMessage) {
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
key := terminalRouteKey(msgType, payload)
|
||||
if key == "" {
|
||||
return
|
||||
}
|
||||
r.mu.RLock()
|
||||
sink := r.sinks[key]
|
||||
r.mu.RUnlock()
|
||||
if sink == nil {
|
||||
return
|
||||
}
|
||||
if !sink.Deliver(frame) {
|
||||
slog.Debug("daemon ws terminal frame dropped: slow sink", "type", msgType, "key", key)
|
||||
}
|
||||
}
|
||||
|
||||
func terminalRouteKey(msgType string, payload json.RawMessage) string {
|
||||
switch msgType {
|
||||
case protocol.MessageTypeTerminalOpened, protocol.MessageTypeTerminalError:
|
||||
var p struct {
|
||||
RequestID string `json:"request_id,omitempty"`
|
||||
SessionID string `json:"session_id,omitempty"`
|
||||
}
|
||||
if err := json.Unmarshal(payload, &p); err != nil {
|
||||
return ""
|
||||
}
|
||||
// terminal.error may carry either request_id (pre-open failure) or
|
||||
// session_id (post-open failure). Prefer request_id so the proxy
|
||||
// receives the failure on the same key it registered.
|
||||
if p.RequestID != "" {
|
||||
return p.RequestID
|
||||
}
|
||||
return p.SessionID
|
||||
case protocol.MessageTypeTerminalData,
|
||||
protocol.MessageTypeTerminalClose,
|
||||
protocol.MessageTypeTerminalExit:
|
||||
var p struct {
|
||||
SessionID string `json:"session_id"`
|
||||
}
|
||||
if err := json.Unmarshal(payload, &p); err != nil {
|
||||
return ""
|
||||
}
|
||||
return p.SessionID
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// SetTerminalRouter installs an explicit router. Tests use this to inject a
|
||||
// fake; production code can rely on the auto-created router exposed by
|
||||
// TerminalRouter() below.
|
||||
func (h *Hub) SetTerminalRouter(r *TerminalRouter) {
|
||||
if h == nil {
|
||||
return
|
||||
}
|
||||
h.termMu.Lock()
|
||||
h.termRouter = r
|
||||
h.termMu.Unlock()
|
||||
}
|
||||
|
||||
// TerminalRouter returns the hub's router, creating one on first access.
|
||||
// The browser proxy WS handler grabs this to register per-session sinks;
|
||||
// production wiring does not need an explicit SetTerminalRouter call.
|
||||
func (h *Hub) TerminalRouter() *TerminalRouter {
|
||||
if h == nil {
|
||||
return nil
|
||||
}
|
||||
h.termMu.RLock()
|
||||
r := h.termRouter
|
||||
h.termMu.RUnlock()
|
||||
if r != nil {
|
||||
return r
|
||||
}
|
||||
h.termMu.Lock()
|
||||
defer h.termMu.Unlock()
|
||||
if h.termRouter == nil {
|
||||
h.termRouter = NewTerminalRouter()
|
||||
}
|
||||
return h.termRouter
|
||||
}
|
||||
|
||||
// terminalRouter is the internal read-only accessor used by handleFrame.
|
||||
// It returns nil if no router has been configured, which short-circuits
|
||||
// dispatch — the auto-create only happens through the public accessor.
|
||||
func (h *Hub) terminalRouter() *TerminalRouter {
|
||||
h.termMu.RLock()
|
||||
defer h.termMu.RUnlock()
|
||||
return h.termRouter
|
||||
}
|
||||
|
||||
// ErrNoDaemonForRuntime is returned by SendToRuntime when no daemon is
|
||||
// currently connected for the given runtime_id. The browser proxy uses this
|
||||
// to fail the open request with a clear error.
|
||||
var ErrNoDaemonForRuntime = errors.New("daemonws: no daemon connected for runtime")
|
||||
|
||||
// SendToRuntime delivers a raw frame to one daemon connection serving
|
||||
// runtimeID. If multiple daemons are registered (rare — usually one per
|
||||
// runtime), the first one wins. Returns ErrNoDaemonForRuntime when no
|
||||
// connection exists, or a "buffer full" error when the daemon's outbound
|
||||
// queue is saturated — callers should surface that as a transient failure
|
||||
// to the browser rather than retrying tightly.
|
||||
func (h *Hub) SendToRuntime(runtimeID string, frame []byte) error {
|
||||
if h == nil || runtimeID == "" {
|
||||
return ErrNoDaemonForRuntime
|
||||
}
|
||||
h.mu.RLock()
|
||||
var target *client
|
||||
for c := range h.byRuntime[runtimeID] {
|
||||
target = c
|
||||
break
|
||||
}
|
||||
h.mu.RUnlock()
|
||||
if target == nil {
|
||||
return ErrNoDaemonForRuntime
|
||||
}
|
||||
if !target.trySend(frame) {
|
||||
return errors.New("daemonws: daemon send buffer full")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// trySend pushes frame onto the client's outbound queue without blocking.
|
||||
// Returns false if the buffer is saturated. We deliberately do not evict
|
||||
// the connection here — terminal back-pressure should slow the producing
|
||||
// browser/server side, not tear down the entire daemonws connection (which
|
||||
// would also break heartbeat + wakeup delivery for unrelated runtimes).
|
||||
func (c *client) trySend(frame []byte) bool {
|
||||
select {
|
||||
case c.send <- frame:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
95
server/internal/daemonws/terminal_test.go
Normal file
95
server/internal/daemonws/terminal_test.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package daemonws
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
type collectingSink struct {
|
||||
frames [][]byte
|
||||
}
|
||||
|
||||
func (s *collectingSink) Deliver(frame []byte) bool {
|
||||
cp := make([]byte, len(frame))
|
||||
copy(cp, frame)
|
||||
s.frames = append(s.frames, cp)
|
||||
return true
|
||||
}
|
||||
|
||||
func TestTerminalRouter_RoutesByRequestThenSession(t *testing.T) {
|
||||
router := NewTerminalRouter()
|
||||
sink := &collectingSink{}
|
||||
router.Register("req-1", sink)
|
||||
|
||||
// terminal.error before the daemon picked a session_id: routed on
|
||||
// request_id. This is the failure path browsers see when the daemon
|
||||
// can't spawn a PTY (e.g. ErrUnsupportedOS on windows).
|
||||
errFrame := mustEncode(t, protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
|
||||
RequestID: "req-1",
|
||||
Code: protocol.TerminalErrorCodeUnsupportedOS,
|
||||
Message: "no PTY on windows",
|
||||
})
|
||||
router.Route(errFrame, protocol.MessageTypeTerminalError, mustPayload(t, errFrame))
|
||||
|
||||
if got, want := len(sink.frames), 1; got != want {
|
||||
t.Fatalf("delivered = %d, want %d", got, want)
|
||||
}
|
||||
|
||||
// Re-key to session_id after a hypothetical terminal.opened.
|
||||
router.Register("sess-1", sink)
|
||||
router.Unregister("req-1")
|
||||
|
||||
dataFrame := mustEncode(t, protocol.MessageTypeTerminalData, protocol.TerminalDataPayload{
|
||||
SessionID: "sess-1",
|
||||
DataB64: "Zm9vYmFy",
|
||||
})
|
||||
router.Route(dataFrame, protocol.MessageTypeTerminalData, mustPayload(t, dataFrame))
|
||||
if got, want := len(sink.frames), 2; got != want {
|
||||
t.Fatalf("after data delivered = %d, want %d", got, want)
|
||||
}
|
||||
|
||||
// Frames for an unknown session must drop silently — never panic, and
|
||||
// never leak into the wrong sink.
|
||||
strayFrame := mustEncode(t, protocol.MessageTypeTerminalExit, protocol.TerminalExitPayload{
|
||||
SessionID: "sess-2-unknown",
|
||||
ExitCode: 0,
|
||||
})
|
||||
router.Route(strayFrame, protocol.MessageTypeTerminalExit, mustPayload(t, strayFrame))
|
||||
if got, want := len(sink.frames), 2; got != want {
|
||||
t.Fatalf("stray frame delivered to wrong sink: %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTerminalRouter_UnknownSessionDropsSilently(t *testing.T) {
|
||||
router := NewTerminalRouter()
|
||||
frame := mustEncode(t, protocol.MessageTypeTerminalData, protocol.TerminalDataPayload{
|
||||
SessionID: "ghost",
|
||||
DataB64: "Zm9v",
|
||||
})
|
||||
// Should not panic / not deliver anywhere.
|
||||
router.Route(frame, protocol.MessageTypeTerminalData, mustPayload(t, frame))
|
||||
}
|
||||
|
||||
func mustEncode(t *testing.T, msgType string, payload any) []byte {
|
||||
t.Helper()
|
||||
raw, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal payload: %v", err)
|
||||
}
|
||||
frame, err := json.Marshal(protocol.Message{Type: msgType, Payload: raw})
|
||||
if err != nil {
|
||||
t.Fatalf("marshal envelope: %v", err)
|
||||
}
|
||||
return frame
|
||||
}
|
||||
|
||||
func mustPayload(t *testing.T, envelope []byte) json.RawMessage {
|
||||
t.Helper()
|
||||
var m protocol.Message
|
||||
if err := json.Unmarshal(envelope, &m); err != nil {
|
||||
t.Fatalf("unmarshal envelope: %v", err)
|
||||
}
|
||||
return m.Payload
|
||||
}
|
||||
582
server/internal/handler/terminal_ws.go
Normal file
582
server/internal/handler/terminal_ws.go
Normal file
@@ -0,0 +1,582 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/auth"
|
||||
"github.com/multica-ai/multica/server/internal/daemonws"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
|
||||
"github.com/golang-jwt/jwt/v5"
|
||||
)
|
||||
|
||||
// terminalWriteWait caps how long a single WriteMessage may block before
|
||||
// we tear the browser connection down as slow-client. Matches the daemonws
|
||||
// hub's writeWait so back-pressure semantics are consistent end-to-end.
|
||||
const terminalWriteWait = 10 * time.Second
|
||||
|
||||
// terminalOpenTimeout caps how long the proxy waits for the daemon to
|
||||
// respond to a terminal.open request with terminal.opened or terminal.error.
|
||||
// 5s is generous: PTY spawn is local and synchronous on the daemon side.
|
||||
const terminalOpenTimeout = 5 * time.Second
|
||||
|
||||
var terminalUpgrader = websocket.Upgrader{
|
||||
// Browser cookie auth has already happened by the time we get here;
|
||||
// cross-site cookie attacks are mitigated by SameSite=Lax on the auth
|
||||
// cookie itself (see auth.SetAuthCookie). Re-evaluate if we ever
|
||||
// support Authorization-header auth on this endpoint.
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
}
|
||||
|
||||
// HandleIssueTerminalWS proxies a browser WebSocket onto a PTY running on
|
||||
// the daemon hosting the issue's most-recent agent task. The flow per
|
||||
// connection:
|
||||
//
|
||||
// 1. Authenticate the user (cookie JWT preferred; first-message auth as
|
||||
// fallback for clients that cannot set cookies — e.g. some Desktop
|
||||
// dev modes).
|
||||
// 2. Resolve issue → workspace → latest task with a non-empty work_dir +
|
||||
// runtime_id. Fail closed if no such task exists; users see a clear
|
||||
// "no task to attach to" error instead of a silent hang.
|
||||
// 3. Register a sink on the daemonws TerminalRouter under a fresh
|
||||
// request_id, then send terminal.open to the daemon.
|
||||
// 4. On terminal.opened: re-register the sink under the session_id the
|
||||
// daemon picked, drop the request_id route, and start the bidirectional
|
||||
// pump until either side closes.
|
||||
// 5. On disconnect: send terminal.close so the daemon tears the PTY down
|
||||
// promptly rather than waiting for its idle sweep.
|
||||
func (h *Handler) HandleIssueTerminalWS(w http.ResponseWriter, r *http.Request) {
|
||||
if h.DaemonHub == nil {
|
||||
http.Error(w, `{"error":"terminal proxy not configured"}`, http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
workspaceID := r.URL.Query().Get("workspace_id")
|
||||
if workspaceID == "" {
|
||||
http.Error(w, `{"error":"workspace_id required"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
wsUUID, err := util.ParseUUID(workspaceID)
|
||||
if err != nil {
|
||||
http.Error(w, `{"error":"invalid workspace_id"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
issueParam := chi.URLParam(r, "issue_id")
|
||||
if issueParam == "" {
|
||||
http.Error(w, `{"error":"issue_id required"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
userID, errMsg := terminalAuthCookie(r, h)
|
||||
if errMsg != "" && userID == "" {
|
||||
// No cookie or invalid cookie. Defer auth to the first WS frame.
|
||||
}
|
||||
if userID != "" && !h.terminalIsMember(r.Context(), userID, workspaceID) {
|
||||
http.Error(w, `{"error":"not a member of this workspace"}`, http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
conn, err := terminalUpgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
slog.Error("terminal ws upgrade failed", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
if userID == "" {
|
||||
uid, errMsg := terminalFirstFrameAuth(conn, h)
|
||||
if errMsg != "" {
|
||||
_ = conn.WriteMessage(websocket.TextMessage, []byte(errMsg))
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
if !h.terminalIsMember(r.Context(), uid, workspaceID) {
|
||||
_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"error":"not a member of this workspace"}`))
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
userID = uid
|
||||
_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"auth_ack"}`))
|
||||
}
|
||||
|
||||
issue, ok := h.terminalResolveIssue(r.Context(), issueParam, wsUUID)
|
||||
if !ok {
|
||||
sendTerminalErrorAndClose(conn, "", "", protocol.TerminalErrorCodeTaskNotFound, "issue not found")
|
||||
return
|
||||
}
|
||||
|
||||
task, ok := h.terminalLatestAttachableTask(r.Context(), issue.ID)
|
||||
if !ok {
|
||||
sendTerminalErrorAndClose(conn, "", "", protocol.TerminalErrorCodeTaskNotFound, "no agent task has run on this issue yet — trigger a run first")
|
||||
return
|
||||
}
|
||||
|
||||
cols := parseUint16Query(r, "cols", 80)
|
||||
rows := parseUint16Query(r, "rows", 24)
|
||||
|
||||
proxy := newTerminalProxy(conn, h.DaemonHub, userID, util.UUIDToString(task.RuntimeID), util.UUIDToString(task.ID), workspaceID, util.UUIDToString(issue.ID), task.SessionID.String, task.WorkDir.String, cols, rows)
|
||||
proxy.run()
|
||||
}
|
||||
|
||||
// terminalProxy is the per-connection bridge between browser and daemon.
|
||||
// Methods that touch the WS connection do so from one of two goroutines
|
||||
// only: writePump owns conn writes and readPump owns conn reads, matching
|
||||
// gorilla/websocket's single-goroutine-per-direction contract.
|
||||
type terminalProxy struct {
|
||||
conn *websocket.Conn
|
||||
hub *daemonws.Hub
|
||||
userID string
|
||||
runtimeID string
|
||||
taskID string
|
||||
workspaceID string
|
||||
issueID string
|
||||
priorSess string
|
||||
workDir string
|
||||
|
||||
requestID string
|
||||
|
||||
mu sync.Mutex
|
||||
sessionID string
|
||||
closedOnce sync.Once
|
||||
closeCh chan struct{}
|
||||
sendCh chan []byte
|
||||
|
||||
openedCh chan struct{}
|
||||
openErr chan terminalOpenFailure
|
||||
}
|
||||
|
||||
type terminalOpenFailure struct {
|
||||
code string
|
||||
message string
|
||||
}
|
||||
|
||||
func newTerminalProxy(conn *websocket.Conn, hub *daemonws.Hub, userID, runtimeID, taskID, workspaceID, issueID, priorSess, workDir string, cols, rows uint16) *terminalProxy {
|
||||
return &terminalProxy{
|
||||
conn: conn,
|
||||
hub: hub,
|
||||
userID: userID,
|
||||
runtimeID: runtimeID,
|
||||
taskID: taskID,
|
||||
workspaceID: workspaceID,
|
||||
issueID: issueID,
|
||||
priorSess: priorSess,
|
||||
workDir: workDir,
|
||||
requestID: uuid.NewString(),
|
||||
closeCh: make(chan struct{}),
|
||||
sendCh: make(chan []byte, 256),
|
||||
openedCh: make(chan struct{}),
|
||||
openErr: make(chan terminalOpenFailure, 1),
|
||||
}
|
||||
}
|
||||
|
||||
// Deliver implements daemonws.TerminalSink. The daemon hub's read pump
|
||||
// invokes this for every terminal.* frame addressed to our request_id /
|
||||
// session_id. We must stay non-blocking; the hub drops the frame on a
|
||||
// full buffer rather than stalling its own pump.
|
||||
func (p *terminalProxy) Deliver(frame []byte) bool {
|
||||
select {
|
||||
case p.sendCh <- frame:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (p *terminalProxy) run() {
|
||||
defer p.conn.Close()
|
||||
|
||||
router := p.hub.TerminalRouter()
|
||||
router.Register(p.requestID, p)
|
||||
defer router.Unregister(p.requestID)
|
||||
|
||||
go p.writePump()
|
||||
|
||||
if err := p.sendOpenToDaemon(); err != nil {
|
||||
// Couldn't reach the daemon at all — surface a structured error and
|
||||
// bail before we register cleanup paths that assume a live session.
|
||||
sendTerminalErrorOverChannel(p.sendCh, p.requestID, "", protocol.TerminalErrorCodeInternal, err.Error())
|
||||
<-time.After(50 * time.Millisecond) // give writePump a tick to flush
|
||||
p.shutdown()
|
||||
return
|
||||
}
|
||||
|
||||
// Block until the open ack arrives, the open is rejected, or the user
|
||||
// disconnects mid-handshake. After this point sessionID is stable and
|
||||
// we are routing on session_id rather than request_id.
|
||||
select {
|
||||
case <-p.openedCh:
|
||||
// Open succeeded. The router is already re-keyed on session_id by
|
||||
// observeOpen. Fall through to the bidirectional pump.
|
||||
case failure := <-p.openErr:
|
||||
sendTerminalErrorOverChannel(p.sendCh, p.requestID, "", failure.code, failure.message)
|
||||
<-time.After(50 * time.Millisecond)
|
||||
p.shutdown()
|
||||
return
|
||||
case <-p.closeCh:
|
||||
return
|
||||
case <-time.After(terminalOpenTimeout):
|
||||
sendTerminalErrorOverChannel(p.sendCh, p.requestID, "", protocol.TerminalErrorCodeInternal, "daemon did not respond to terminal.open within timeout")
|
||||
<-time.After(50 * time.Millisecond)
|
||||
p.shutdown()
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
sid := p.SessionID()
|
||||
if sid != "" {
|
||||
router.Unregister(sid)
|
||||
// Best-effort teardown on the daemon. If the connection to the
|
||||
// daemon is already gone, the daemon's own clearWSWrites path
|
||||
// will close the session — we just lose an idle slot's worth of
|
||||
// latency before the GC catches it.
|
||||
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalClose, protocol.TerminalClosePayload{
|
||||
SessionID: sid,
|
||||
Reason: "browser_disconnect",
|
||||
})
|
||||
if err == nil {
|
||||
_ = p.hub.SendToRuntime(p.runtimeID, frame)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
p.readPump()
|
||||
}
|
||||
|
||||
func (p *terminalProxy) SessionID() string {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
return p.sessionID
|
||||
}
|
||||
|
||||
func (p *terminalProxy) sendOpenToDaemon() error {
|
||||
payload := protocol.TerminalOpenPayload{
|
||||
RequestID: p.requestID,
|
||||
TaskID: p.taskID,
|
||||
WorkspaceID: p.workspaceID,
|
||||
UserID: p.userID,
|
||||
IssueID: p.issueID,
|
||||
WorkDir: p.workDir,
|
||||
PriorSessionID: p.priorSess,
|
||||
Cols: 80,
|
||||
Rows: 24,
|
||||
}
|
||||
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalOpen, payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := p.hub.SendToRuntime(p.runtimeID, frame); err != nil {
|
||||
if errors.Is(err, daemonws.ErrNoDaemonForRuntime) {
|
||||
return errors.New("daemon offline for this runtime — start the agent's daemon and retry")
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// readPump reads browser frames and forwards terminal.data/resize/close to
|
||||
// the daemon. Unknown frames are dropped silently. Returns when the
|
||||
// connection closes; we then trigger shutdown so writePump exits too.
|
||||
func (p *terminalProxy) readPump() {
|
||||
defer p.shutdown()
|
||||
p.conn.SetReadLimit(64 * 1024)
|
||||
for {
|
||||
_, raw, err := p.conn.ReadMessage()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var env protocol.Message
|
||||
if err := json.Unmarshal(raw, &env); err != nil {
|
||||
continue
|
||||
}
|
||||
sid := p.SessionID()
|
||||
// We force-stamp session_id on outbound frames: the browser may have
|
||||
// sent its own session_id, but only the one the daemon assigned is
|
||||
// trusted. This prevents a misbehaving client from addressing a
|
||||
// session that another user opened against the same daemon.
|
||||
switch env.Type {
|
||||
case protocol.MessageTypeTerminalData:
|
||||
var pl protocol.TerminalDataPayload
|
||||
if err := json.Unmarshal(env.Payload, &pl); err != nil {
|
||||
continue
|
||||
}
|
||||
pl.SessionID = sid
|
||||
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalData, pl)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
_ = p.hub.SendToRuntime(p.runtimeID, frame)
|
||||
case protocol.MessageTypeTerminalResize:
|
||||
var pl protocol.TerminalResizePayload
|
||||
if err := json.Unmarshal(env.Payload, &pl); err != nil {
|
||||
continue
|
||||
}
|
||||
pl.SessionID = sid
|
||||
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalResize, pl)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
_ = p.hub.SendToRuntime(p.runtimeID, frame)
|
||||
case protocol.MessageTypeTerminalClose:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// writePump is the single owner of conn writes. It also watches for the
|
||||
// terminal.opened / terminal.error handshake transition while it pumps,
|
||||
// because separating those into a second goroutine would race over
|
||||
// sendCh (only one reader per channel value).
|
||||
//
|
||||
// Once the session is open, this is just a straight relay. During the
|
||||
// open window (sessionID empty), every frame is forwarded to the browser
|
||||
// AND inspected — opened/error frames feed openedCh / openErr so run()
|
||||
// can unblock or fail the handshake.
|
||||
func (p *terminalProxy) writePump() {
|
||||
router := p.hub.TerminalRouter()
|
||||
for {
|
||||
select {
|
||||
case <-p.closeCh:
|
||||
return
|
||||
case frame, ok := <-p.sendCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
p.forwardToBrowser(frame)
|
||||
if p.SessionID() != "" {
|
||||
continue
|
||||
}
|
||||
var env protocol.Message
|
||||
if err := json.Unmarshal(frame, &env); err != nil {
|
||||
continue
|
||||
}
|
||||
switch env.Type {
|
||||
case protocol.MessageTypeTerminalOpened:
|
||||
var op protocol.TerminalOpenedPayload
|
||||
if err := json.Unmarshal(env.Payload, &op); err == nil && op.SessionID != "" {
|
||||
p.mu.Lock()
|
||||
p.sessionID = op.SessionID
|
||||
p.mu.Unlock()
|
||||
router.Register(op.SessionID, p)
|
||||
router.Unregister(p.requestID)
|
||||
close(p.openedCh)
|
||||
}
|
||||
case protocol.MessageTypeTerminalError:
|
||||
var ep protocol.TerminalErrorPayload
|
||||
if err := json.Unmarshal(env.Payload, &ep); err == nil {
|
||||
select {
|
||||
case p.openErr <- terminalOpenFailure{code: ep.Code, message: ep.Message}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *terminalProxy) forwardToBrowser(frame []byte) {
|
||||
p.conn.SetWriteDeadline(time.Now().Add(terminalWriteWait))
|
||||
if err := p.conn.WriteMessage(websocket.TextMessage, frame); err != nil {
|
||||
slog.Debug("terminal ws write to browser failed", "error", err)
|
||||
p.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *terminalProxy) shutdown() {
|
||||
p.closedOnce.Do(func() {
|
||||
close(p.closeCh)
|
||||
})
|
||||
}
|
||||
|
||||
func sendTerminalErrorOverChannel(ch chan<- []byte, requestID, sessionID, code, message string) {
|
||||
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
|
||||
RequestID: requestID,
|
||||
SessionID: sessionID,
|
||||
Code: code,
|
||||
Message: message,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case ch <- frame:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func sendTerminalErrorAndClose(conn *websocket.Conn, requestID, sessionID, code, message string) {
|
||||
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
|
||||
RequestID: requestID,
|
||||
SessionID: sessionID,
|
||||
Code: code,
|
||||
Message: message,
|
||||
})
|
||||
if err == nil {
|
||||
_ = conn.WriteMessage(websocket.TextMessage, frame)
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
func marshalTerminalFrame(msgType string, payload any) ([]byte, error) {
|
||||
raw, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return json.Marshal(protocol.Message{Type: msgType, Payload: raw})
|
||||
}
|
||||
|
||||
func parseUint16Query(r *http.Request, key string, defaultVal uint16) uint16 {
|
||||
raw := r.URL.Query().Get(key)
|
||||
if raw == "" {
|
||||
return defaultVal
|
||||
}
|
||||
v, err := strconv.ParseUint(raw, 10, 16)
|
||||
if err != nil || v == 0 {
|
||||
return defaultVal
|
||||
}
|
||||
return uint16(v)
|
||||
}
|
||||
|
||||
// --- auth helpers (cookie + first-frame JWT/PAT) ---
|
||||
|
||||
func terminalAuthCookie(r *http.Request, h *Handler) (string, string) {
|
||||
cookie, err := r.Cookie(auth.AuthCookieName)
|
||||
if err != nil || cookie.Value == "" {
|
||||
return "", ""
|
||||
}
|
||||
return terminalAuthToken(cookie.Value, h, r.Context())
|
||||
}
|
||||
|
||||
func terminalFirstFrameAuth(conn *websocket.Conn, h *Handler) (string, string) {
|
||||
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
|
||||
defer conn.SetReadDeadline(time.Time{})
|
||||
_, raw, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
return "", `{"error":"auth timeout or read error"}`
|
||||
}
|
||||
var msg struct {
|
||||
Type string `json:"type"`
|
||||
Payload struct {
|
||||
Token string `json:"token"`
|
||||
} `json:"payload"`
|
||||
}
|
||||
if err := json.Unmarshal(raw, &msg); err != nil || msg.Type != "auth" || msg.Payload.Token == "" {
|
||||
return "", `{"error":"expected auth message as first frame"}`
|
||||
}
|
||||
return terminalAuthToken(msg.Payload.Token, h, context.Background())
|
||||
}
|
||||
|
||||
func terminalAuthToken(tokenStr string, h *Handler, ctx context.Context) (string, string) {
|
||||
if len(tokenStr) > 4 && tokenStr[:4] == "mul_" {
|
||||
if h.PATCache == nil {
|
||||
pat, err := h.Queries.GetPersonalAccessTokenByHash(ctx, auth.HashToken(tokenStr))
|
||||
if err != nil {
|
||||
return "", `{"error":"invalid token"}`
|
||||
}
|
||||
return util.UUIDToString(pat.UserID), ""
|
||||
}
|
||||
hash := auth.HashToken(tokenStr)
|
||||
if uid, ok := h.PATCache.Get(ctx, hash); ok {
|
||||
return uid, ""
|
||||
}
|
||||
pat, err := h.Queries.GetPersonalAccessTokenByHash(ctx, hash)
|
||||
if err != nil {
|
||||
return "", `{"error":"invalid token"}`
|
||||
}
|
||||
uid := util.UUIDToString(pat.UserID)
|
||||
return uid, ""
|
||||
}
|
||||
token, err := jwt.Parse(tokenStr, func(token *jwt.Token) (any, error) {
|
||||
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
|
||||
return nil, jwt.ErrSignatureInvalid
|
||||
}
|
||||
return auth.JWTSecret(), nil
|
||||
})
|
||||
if err != nil || !token.Valid {
|
||||
return "", `{"error":"invalid token"}`
|
||||
}
|
||||
claims, ok := token.Claims.(jwt.MapClaims)
|
||||
if !ok {
|
||||
return "", `{"error":"invalid claims"}`
|
||||
}
|
||||
uid, ok := claims["sub"].(string)
|
||||
if !ok || uid == "" {
|
||||
return "", `{"error":"invalid claims"}`
|
||||
}
|
||||
return uid, ""
|
||||
}
|
||||
|
||||
func (h *Handler) terminalIsMember(ctx context.Context, userID, workspaceID string) bool {
|
||||
userUUID, err := util.ParseUUID(userID)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
wsUUID, err := util.ParseUUID(workspaceID)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
_, err = h.Queries.GetMemberByUserAndWorkspace(ctx, db.GetMemberByUserAndWorkspaceParams{
|
||||
UserID: userUUID,
|
||||
WorkspaceID: wsUUID,
|
||||
})
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (h *Handler) terminalResolveIssue(ctx context.Context, issueID string, wsUUID pgtype.UUID) (db.Issue, bool) {
|
||||
if parts := splitIdentifier(issueID); parts != nil {
|
||||
issue, err := h.Queries.GetIssueByNumber(ctx, db.GetIssueByNumberParams{
|
||||
WorkspaceID: wsUUID,
|
||||
Number: parts.number,
|
||||
})
|
||||
if err == nil {
|
||||
return issue, true
|
||||
}
|
||||
}
|
||||
issueUUID, err := util.ParseUUID(issueID)
|
||||
if err != nil {
|
||||
return db.Issue{}, false
|
||||
}
|
||||
issue, err := h.Queries.GetIssueInWorkspace(ctx, db.GetIssueInWorkspaceParams{
|
||||
ID: issueUUID,
|
||||
WorkspaceID: wsUUID,
|
||||
})
|
||||
if err != nil {
|
||||
return db.Issue{}, false
|
||||
}
|
||||
return issue, true
|
||||
}
|
||||
|
||||
// terminalLatestAttachableTask returns the most recent task on the issue
|
||||
// that the proxy can attach to: must have a known work_dir, a runtime_id,
|
||||
// and a daemon currently connected for that runtime. Falls back through
|
||||
// the task history rather than picking only the absolute latest, because
|
||||
// the most-recent row may be a queued task that never ran (no workdir yet).
|
||||
func (h *Handler) terminalLatestAttachableTask(ctx context.Context, issueID pgtype.UUID) (db.AgentTaskQueue, bool) {
|
||||
tasks, err := h.Queries.ListTasksByIssue(ctx, issueID)
|
||||
if err != nil {
|
||||
return db.AgentTaskQueue{}, false
|
||||
}
|
||||
for _, t := range tasks {
|
||||
if !t.WorkDir.Valid || t.WorkDir.String == "" {
|
||||
continue
|
||||
}
|
||||
if !t.RuntimeID.Valid {
|
||||
continue
|
||||
}
|
||||
return t, true
|
||||
}
|
||||
return db.AgentTaskQueue{}, false
|
||||
}
|
||||
@@ -168,3 +168,99 @@ type DaemonHeartbeatPendingLocalSkillImport struct {
|
||||
ID string `json:"id"`
|
||||
SkillKey string `json:"skill_key"`
|
||||
}
|
||||
|
||||
// Terminal WS message types. These flow over the existing daemonws hub
|
||||
// between client (web/desktop/CLI) and daemon. Bytes payloads are base64
|
||||
// encoded so they can travel as JSON text frames without binary framing.
|
||||
const (
|
||||
// TerminalOpen — client → daemon: request a new PTY session bound to a task workdir.
|
||||
MessageTypeTerminalOpen = "terminal.open"
|
||||
// TerminalOpened — daemon → client: ack carrying the session_id and resolved workdir.
|
||||
MessageTypeTerminalOpened = "terminal.opened"
|
||||
// TerminalData — bidirectional: PTY stdin (client→daemon) / stdout+stderr (daemon→client).
|
||||
MessageTypeTerminalData = "terminal.data"
|
||||
// TerminalResize — client → daemon: window-size change.
|
||||
MessageTypeTerminalResize = "terminal.resize"
|
||||
// TerminalClose — bidirectional: explicit teardown request / ack.
|
||||
MessageTypeTerminalClose = "terminal.close"
|
||||
// TerminalExit — daemon → client: child process exited; carries exit code and optional reason.
|
||||
MessageTypeTerminalExit = "terminal.exit"
|
||||
// TerminalError — daemon → client: open/resize/etc. failed; carries human-readable code+message.
|
||||
MessageTypeTerminalError = "terminal.error"
|
||||
)
|
||||
|
||||
// TerminalOpenPayload requests a PTY session bound to the given task's
|
||||
// workdir. WorkspaceID is the workspace the caller is acting in; the daemon
|
||||
// must reject if it does not match the task's workspace.
|
||||
//
|
||||
// The server resolves WorkDir / IssueID / PriorSessionID from its own DB
|
||||
// (the daemon has no persistent task cache) and embeds them here before
|
||||
// forwarding to the daemon. The daemon trusts these fields because the
|
||||
// daemonws connection is already authenticated and scoped — but it still
|
||||
// rechecks WorkspaceID against the request body to catch a misrouted frame.
|
||||
type TerminalOpenPayload struct {
|
||||
RequestID string `json:"request_id"`
|
||||
TaskID string `json:"task_id"`
|
||||
WorkspaceID string `json:"workspace_id"`
|
||||
UserID string `json:"user_id,omitempty"`
|
||||
IssueID string `json:"issue_id,omitempty"`
|
||||
WorkDir string `json:"work_dir,omitempty"`
|
||||
PriorSessionID string `json:"prior_session_id,omitempty"`
|
||||
Cols uint16 `json:"cols"`
|
||||
Rows uint16 `json:"rows"`
|
||||
}
|
||||
|
||||
// TerminalOpenedPayload echoes the request_id and carries the session_id the
|
||||
// client must include on subsequent data/resize/close frames.
|
||||
type TerminalOpenedPayload struct {
|
||||
RequestID string `json:"request_id"`
|
||||
SessionID string `json:"session_id"`
|
||||
WorkDir string `json:"work_dir"`
|
||||
Shell string `json:"shell"`
|
||||
}
|
||||
|
||||
// TerminalDataPayload carries raw PTY bytes in base64.
|
||||
type TerminalDataPayload struct {
|
||||
SessionID string `json:"session_id"`
|
||||
DataB64 string `json:"data_b64"`
|
||||
}
|
||||
|
||||
// TerminalResizePayload updates the PTY window size.
|
||||
type TerminalResizePayload struct {
|
||||
SessionID string `json:"session_id"`
|
||||
Cols uint16 `json:"cols"`
|
||||
Rows uint16 `json:"rows"`
|
||||
}
|
||||
|
||||
// TerminalClosePayload requests teardown. Reason is informational.
|
||||
type TerminalClosePayload struct {
|
||||
SessionID string `json:"session_id"`
|
||||
Reason string `json:"reason,omitempty"`
|
||||
}
|
||||
|
||||
// TerminalExitPayload signals the child process exited.
|
||||
type TerminalExitPayload struct {
|
||||
SessionID string `json:"session_id"`
|
||||
ExitCode int `json:"exit_code"`
|
||||
Reason string `json:"reason,omitempty"`
|
||||
}
|
||||
|
||||
// Terminal error codes returned in TerminalErrorPayload.Code.
|
||||
const (
|
||||
TerminalErrorCodeWorkspaceMismatch = "workspace_mismatch"
|
||||
TerminalErrorCodeTaskNotFound = "task_not_found"
|
||||
TerminalErrorCodeSessionNotFound = "session_not_found"
|
||||
TerminalErrorCodeUnsupportedOS = "unsupported_os"
|
||||
TerminalErrorCodeSpawnFailed = "spawn_failed"
|
||||
TerminalErrorCodeInternal = "internal"
|
||||
)
|
||||
|
||||
// TerminalErrorPayload reports a failure. RequestID is set when the error
|
||||
// is a response to a specific open request; SessionID is set when it
|
||||
// references an already-established session.
|
||||
type TerminalErrorPayload struct {
|
||||
RequestID string `json:"request_id,omitempty"`
|
||||
SessionID string `json:"session_id,omitempty"`
|
||||
Code string `json:"code"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user