Compare commits

9 Commits

Author SHA1 Message Date
Jiayuan Zhang
92de9bea78 fix(terminal): Phase 4 review round 1 — start-hook ordering, immediate close, GetJSON HTTPError (MUL-2295)
- Manager.openWith now fires OnSessionStart before sess.start(); if the
  child exits before the hook ran, OnSessionStop would otherwise race
  ahead and unmark an env root that was never marked, then OnSessionStart
  would mark it forever. Added a regression test that uses a pre-closed
  waitDone to force immediate exit; with a 20ms delay inside the start
  hook it deterministically fails on the old ordering.
- terminalProxy.writePump now calls audit.RecordClose immediately on
  terminal.exit, not just from run()'s defer. CloseTerminalSession is
  idempotent (WHERE ended_at IS NULL) so the browser_disconnect fallback
  becomes a no-op when fired later; without this, a client that keeps
  the WS open after exit would leave ended_at NULL and `multica issue
  runs` would render exited terminals as active.
- cli.APIClient.GetJSON / GetJSONWithHeaders now return *HTTPError
  (matching PostJSON), so callers can errors.As to distinguish 404 from
  other failures. runIssueRuns only swallows 404 (old server, no
  terminal-sessions endpoint); 500 / auth / network failures now print
  a warning to stderr instead of silently downgrading to "no terminal
  rows".

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 19:11:38 +08:00
Jiayuan Zhang
0529d28133 feat(terminal): Phase 4 — GC hook, audit log, issue runs entry (MUL-2295)
- terminal.Manager: OnSessionStart/OnSessionStop hooks fire around the
  manager's register/deregister so callers can safely mark/unmark
  external state (GC protection, audit rows) without racing Get/Done.
- daemon: wires the hooks to markActiveEnvRoot(filepath.Dir(workDir))
  so an idle terminal on a done/cancelled issue can't have its workdir
  reclaimed mid-session, and emits structured slog audit records
  (user_id/task_id/duration; no keystrokes — RFC §Auth).
- server: persists every PTY open/close to a new terminal_sessions
  table (migration 091) as the source behind the audit log and the
  new `type=terminal` rows in `multica issue runs`. New endpoint
  GET /api/issues/{id}/terminal-sessions.
- CLI: `multica issue runs` merges the agent task runs feed with the
  terminal sessions feed, sorted by started_at; terminal rows render
  with agent="terminal" and close_reason in the ERROR column. Old
  servers without the endpoint degrade silently.
- server/handler/terminal_ws.go: pass parsed Cols/Rows query values
  to sendOpenToDaemon so the first PTY frame matches the client's
  viewport; post-open resize is now just a defensive patch (addresses
  Emacs's Phase 3 non-blocking nit).
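The hook contract in the first two bullets can be sketched roughly as follows. This is an illustration under assumptions, not the daemon's code: `rootGuard`, `runSession` and the reference-counting scheme are hypothetical; only the idea of marking `filepath.Dir(workDir)` on start and unmarking on stop comes from the commit message.

```go
package main

import (
	"fmt"
	"path/filepath"
	"sync"
)

// rootGuard is a hypothetical reference-counted set of env roots that a
// GC pass must skip while at least one terminal session is live.
type rootGuard struct {
	mu     sync.Mutex
	active map[string]int
}

func newRootGuard() *rootGuard { return &rootGuard{active: make(map[string]int)} }

func (g *rootGuard) mark(root string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.active[root]++
}

func (g *rootGuard) unmark(root string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.active[root] > 0 {
		g.active[root]--
	}
	if g.active[root] == 0 {
		delete(g.active, root)
	}
}

func (g *rootGuard) protected(root string) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.active[root] > 0
}

// runSession fires onStart before the session body and onStop after it
// returns, so a stop is never observed before its matching start.
func runSession(onStart, onStop func(), body func()) {
	onStart()
	defer onStop()
	body()
}

func main() {
	guard := newRootGuard()
	root := filepath.Dir("/tmp/env-root/workdir") // "/tmp/env-root"

	runSession(
		func() { guard.mark(root) },
		func() { guard.unmark(root) },
		func() { fmt.Println("protected during session:", guard.protected(root)) },
	)
	fmt.Println("protected after session:", guard.protected(root))
}
```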

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 18:55:20 +08:00
Jiayuan Zhang
cd414a52ea feat(cli): multica issue terminal — attach via Phase 2 WS endpoint (MUL-2295)
Phase 3 of MUL-2295. Adds `multica issue terminal <issue-id>` which dials
the Phase 2 /ws/issues/{id}/terminal endpoint, performs first-frame auth
with the existing PAT/JWT, and runs an interactive PTY through the
daemon-side terminal manager from Phase 1. SIGWINCH on unix /
poll on windows pushes resize frames; ssh-style `<enter>~.` detaches.
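For readers unfamiliar with the ssh-style escape, here is one possible shape for the `<enter>~.` detector. It is a sketch under assumptions: `escapeScanner` and `feed` are hypothetical names, and the handling of a non-detach byte after the escape character (both bytes are forwarded) is a simplification that may differ from the CLI.

```go
package main

import "fmt"

// escapeScanner is a hypothetical ssh-style detach detector: the escape
// character only counts when it is the first byte after a line break.
type escapeScanner struct {
	escape      byte // 0 disables detection
	atLineStart bool
	pending     bool // escape seen at line start, waiting for the next byte
}

func newEscapeScanner(escape byte) *escapeScanner {
	return &escapeScanner{escape: escape, atLineStart: true}
}

// feed returns the bytes that should be forwarded to the PTY and whether
// the detach sequence was completed inside this chunk.
func (s *escapeScanner) feed(in []byte) (out []byte, detach bool) {
	for _, b := range in {
		if s.pending {
			s.pending = false
			if b == '.' {
				return out, true
			}
			// Not a detach: release the held escape character first.
			out = append(out, s.escape)
		}
		if s.escape != 0 && s.atLineStart && b == s.escape {
			s.pending = true
			s.atLineStart = false
			continue
		}
		out = append(out, b)
		s.atLineStart = b == '\r' || b == '\n'
	}
	return out, false
}

func main() {
	s := newEscapeScanner('~')
	out, detach := s.feed([]byte("ls\r~."))
	fmt.Printf("%q %v\n", out, detach)

	s = newEscapeScanner('~')
	out, detach = s.feed([]byte("echo a~."))
	fmt.Printf("%q %v\n", out, detach)
}
```

Keeping the state in the scanner rather than in the loop lets the sequence span two reads from stdin, which matters in raw mode where each keystroke usually arrives on its own.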

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 18:32:00 +08:00
Jiayuan Zhang
f675f03fbb fix(daemon/terminal): Phase 2 review — cleanup order, backpressure, origin (MUL-2295)
Address Phase 2 Round 1 review blockers + security item:

1. Fold daemonws teardown into a single cleanup defer so the order is:
   cancel heartbeat → wait hbDone → clearWSWrites → close(writes) →
   wait writer. The previous LIFO defer ordering let close(writes) run
   before the terminal bridge tore down, so an in-flight terminal pump
   could panic on send-to-closed-channel.

2. Replace silent drop of terminal.data on a saturated daemonws writer
   with real backpressure: pump uses sendWSFrameCtx (blocking with ctx
   escape) and bridge.closeAll now waits for every pump goroutine to
   exit before returning, giving the wakeup loop a hard barrier before
   close(writes).

3. terminalUpgrader now reuses realtime.CheckOrigin instead of
   CheckOrigin: true. The terminal endpoint executes shells; it must be
   at least as strict as the read-only realtime WS.

Tests:
- TestTerminalBridge_DataBackpressureNoSilentDrop pins that a full
  writes channel never loses bytes.
- TestTerminalBridge_TeardownDoesNotPanicOnInFlightSend mirrors the
  wakeup defer sequence (closeAll → close(writes)) and asserts no panic.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 18:05:40 +08:00
Jiayuan Zhang
0a97663acb feat(terminal): Phase 2 — daemonws routing + server proxy + Desktop xterm.js (MUL-2295)
Wires the Phase 1 terminal.Manager into the live daemonws transport so a
browser tab on the Multica Desktop app can attach to a PTY running in
the daemon's task workdir. End-to-end frame path:

  Desktop (xterm.js)  → /ws/issues/{id}/terminal (server proxy, cookie
                       or first-frame JWT auth, workspace membership
                       enforced before upgrade)
  → daemonws hub      (new SendToRuntime + TerminalRouter routing
                       terminal.* frames back to the proxy by
                       request_id, then re-keyed on session_id)
  → daemon            (new terminal_bridge.go owns one Manager per WS
                       connection, drains PtySession.Output() into
                       terminal.data frames, surfaces ExitC as
                       terminal.exit; closeAll on WS disconnect)
  → terminal.Manager.OpenWithInfo (new method) — server resolves
    task.work_dir/issue_id/prior_session_id from its DB and embeds
    them on the protocol; daemon trusts the server payload, no
    daemon-local task cache needed.
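The request_id to session_id re-keying in the hub step can be pictured with a small routing table. This is a sketch under assumptions: the `router` type, its method names and the string frames are hypothetical and stand in for the real `TerminalRouter`.

```go
package main

import (
	"fmt"
	"sync"
)

// router is a hypothetical routing table: a proxy is first known by the
// request_id it sent, then by the session_id the daemon minted.
type router struct {
	mu        sync.Mutex
	byRequest map[string]chan<- string
	bySession map[string]chan<- string
}

func newRouter() *router {
	return &router{
		byRequest: make(map[string]chan<- string),
		bySession: make(map[string]chan<- string),
	}
}

// register parks a proxy inbox under the request_id of its open request.
func (r *router) register(requestID string, inbox chan<- string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.byRequest[requestID] = inbox
}

// opened moves the inbox from the request_id key to the session_id key.
func (r *router) opened(requestID, sessionID string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	inbox, ok := r.byRequest[requestID]
	if !ok {
		return false
	}
	delete(r.byRequest, requestID)
	r.bySession[sessionID] = inbox
	return true
}

// route delivers a frame by session_id; unknown sessions are dropped.
func (r *router) route(sessionID, frame string) bool {
	r.mu.Lock()
	inbox, ok := r.bySession[sessionID]
	r.mu.Unlock()
	if !ok {
		return false
	}
	inbox <- frame
	return true
}

func main() {
	r := newRouter()
	inbox := make(chan string, 1)

	r.register("req-1", inbox)
	fmt.Println(r.opened("req-1", "sess-1"))  // re-keyed
	fmt.Println(r.route("sess-1", "hello"))   // delivered
	fmt.Println(<-inbox)
	fmt.Println(r.route("sess-unknown", "x")) // dropped
}
```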

Auth + ACL:
- Browser proxy enforces workspace membership before the upgrade.
- TerminalOpenPayload carries the resolved task info; cross-workspace
  is structurally impossible at the bridge layer because both
  OpenParams.WorkspaceID and TaskInfo.WorkspaceID come from the same
  server-resolved field.
- terminal_bridge maps terminal.Manager.OpenWithInfo errors to the
  protocol error codes (workspace_mismatch / task_not_found /
  unsupported_os / spawn_failed / internal).

Resume:
- Server passes prior_session_id; daemon injects CLAUDE_SESSION_ID +
  MULTICA_{WORKSPACE,ISSUE,TASK,USER}_ID into the PTY env per the RFC.
  `claude --resume $CLAUDE_SESSION_ID` continues the agent's session.

Lifecycle:
- Daemon installs a fresh terminalBridge per daemonws connection and
  tears every PtySession down on disconnect; session_ids minted on one
  WS cannot be reused on a reconnect because the server-side routing
  registration is also gone.
- daemonws client.send buffer raised from 16 to 256 and read limit
  from 4KB to 64KB so PTY traffic fits without evicting connections
  used for heartbeat / wakeup hints.

Desktop UI:
- packages/views/issues/components/terminal-panel.tsx renders an
  xterm.js console with FitAddon, base64 wire encoding, ResizeObserver
  → terminal.resize, reconnect button, and a clear web-only placeholder
  when window.desktopAPI is absent.
- TerminalPanelSection wrapper hangs collapsed in the issue-detail
  sidebar next to the execution log so bootstrap doesn't run for
  every issue view.

Tests:
- terminal/manager_test.go: OpenWithInfo happy path + cross-workspace
  reject (16 tests total, all -race clean).
- daemonws/terminal_test.go: TerminalRouter request_id → session_id
  re-keying and unknown-session drop.
- daemon/terminal_bridge_test.go: server-supplied workdir round-trips
  through OpenWithInfo, missing-workdir surfaces task_not_found and
  never spawns, data+exit round trip.
- GOOS=windows go test -c clean for both daemon and daemon/terminal.

Phase 1 / 2 / 3 / 4 mapping unchanged: Phase 3 (CLI) and Phase 4
(execenv GC hook + issue runs entry + audit log) are still untouched.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 17:45:53 +08:00
Jiayuan Zhang
953fdd5003 fix(daemon/terminal): close re-entry barrier + reap orphan PTY (MUL-2295)
- Manager.Close concurrent re-entry now blocks late callers on closeDone
  so every Close() return shares the "manager drained" guarantee.
- Open cleanup path on lost race with Close calls pty.Wait() to reap the
  child synchronously (waitLoop never runs there).
- Tests: concurrent Close callers all observe drained state; Open cleanup
  invokes pty.Wait at least once.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 17:10:22 +08:00
Jiayuan Zhang
e70f44b92b fix(daemon/terminal): lock Done()/Manager.Close finalize order (MUL-2295)
Round 2 review fixes:

1. PtySession finalize sequence is now
   ExitC -> close(output) -> onClose/deregister -> close(done)
   so external waiters (bridge / GC hook / audit) can `<-Done()` and
   immediately query the manager without a race window.

2. Manager.Close now waits for each session's Done() (not just Close())
   so by the time it returns the registry is empty and every session
   is fully finalized.

Adds TestSession_DoneFiresAfterDeregister (locks the ordering contract)
and TestManager_CloseWaitsForSessionFinalize (fakePTY.Wait delay proves
Manager.Close blocks through finalize).
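The ordering contract in item 1 can be illustrated with a toy session. Everything below is a hypothetical sketch (the `registry` and `session` types, channel shapes and method names are assumptions); only the four-step order is taken from the commit message.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the manager's session table.
type registry struct {
	mu       sync.Mutex
	sessions map[string]*session
}

func (r *registry) add(s *session) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.sessions[s.id] = s
}

func (r *registry) remove(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.sessions, id)
}

func (r *registry) has(id string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.sessions[id]
	return ok
}

type session struct {
	id     string
	exitC  chan int // buffered so finalize never blocks on a slow reader
	output chan []byte
	done   chan struct{}
}

func newSession(id string) *session {
	return &session{
		id:     id,
		exitC:  make(chan int, 1),
		output: make(chan []byte),
		done:   make(chan struct{}),
	}
}

// finalize follows ExitC -> close(output) -> deregister -> close(done).
func (s *session) finalize(r *registry, code int) {
	s.exitC <- code // 1. publish the exit code
	close(s.output) // 2. signal end of output
	r.remove(s.id)  // 3. deregister from the manager
	close(s.done)   // 4. only now release Done() waiters
}

func main() {
	r := &registry{sessions: make(map[string]*session)}
	s := newSession("s1")
	r.add(s)

	go s.finalize(r, 0)
	<-s.done
	fmt.Println("still registered after Done:", r.has("s1"))
}
```

Closing `done` last is what lets a waiter query the registry immediately after `<-done` without a race window.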

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 17:00:02 +08:00
Jiayuan Zhang
281f1073b5 fix(daemon/terminal): address Phase 1 review feedback (MUL-2295)
Wires in the four fixes Emacs flagged on the Phase 1 review:

1. Lifecycle: split stop/done with a WaitGroup. readLoop and idleLoop
   exit via <-stop; waitLoop is the finalizer that waits on the WG
   before closing output/done. Eliminates the "send on closed channel"
   race when the output buffer is saturated. Adds a regression test
   that fills output, calls Close, and verifies Done converges + ExitC
   fires before output closes (the doc contract).

2. Errors: Manager.Open wraps spawner errors with double-%w so
   errors.Is matches both ErrSpawnFailed and ErrUnsupportedOS. Adds a
   test with a fake spawner that returns ErrUnsupportedOS.

3. Close path on unix: SIGHUP to the process group, 250ms grace,
   SIGKILL, then close fd — comment now matches behavior. Skips the
   signal+sleep work entirely when the child already exited naturally.
   Manager.Close fans out per-session Close in parallel so the grace
   period doesn't multiply by session count.

4. IdleTimeout semantics: removes the NewManager default that
   silently rewrote 0 to 60min. Zero/negative now disables, per the
   doc comment. Added DefaultIdleTimeout for daemon wiring to opt in
   explicitly.

Verified: go test, go test -race, GOOS=windows go test -c.
Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 16:48:11 +08:00
Jiayuan Zhang
6758feba05 feat(daemon): add terminal Manager + PTY session (Phase 1, MUL-2295)
Daemon-side foundation for the Issue → Terminal feature. Manager owns
the lifecycle of all live PtySessions; sessions spawn a shell on a real
PTY via creack/pty (unix-only — Windows returns ErrUnsupportedOS until
ConPty support lands).

Open enforces the cross-workspace ACL — a client acting in workspace A
cannot attach to a task that belongs to workspace B. Each session
injects CLAUDE_SESSION_ID + MULTICA_{WORKSPACE,ISSUE,TASK,USER}_ID into
the child env so `claude --resume $CLAUDE_SESSION_ID` continues the
same session the agent run was using.

Adds the terminal.* WebSocket message types to server/pkg/protocol so
Phase 2 (daemonws routing) and Phase 3 (CLI) can land without touching
the manager.

Tests cover open, data round-trip, resize, explicit close, idle timeout
sweep, manager shutdown, cross-workspace rejection, and unknown task.
A fake Spawner backed by channels lets tests exercise lifecycle without
forking a real shell.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 16:32:39 +08:00
42 changed files with 6299 additions and 17 deletions

@@ -53,6 +53,7 @@ import { ResolvedThreadBar } from "./resolved-thread-bar";
import { collectThreadReplies } from "./thread-utils";
import { AgentLiveCard } from "./agent-live-card";
import { ExecutionLogSection } from "./execution-log-section";
import { TerminalPanel } from "./terminal-panel";
import { PullRequestList } from "./pull-request-list";
import { useQuery } from "@tanstack/react-query";
import { useAuthStore } from "@multica/core/auth";
@@ -1366,6 +1367,12 @@ export function IssueDetail({ issueId, onDelete, onDone, defaultSidebarOpen = tr
when there are no runs to show. */}
<ExecutionLogSection issueId={id} />
{/* Terminal panel — attaches to the PTY running in the daemon's
workdir for the latest agent task. Desktop-only (the panel
itself renders an explanatory placeholder on web).
See MUL-2295. */}
<TerminalPanelSection issueId={id} workspaceId={wsId} />
{/* Token usage */}
{usage && usage.task_count > 0 && (
<div>
@@ -1904,3 +1911,23 @@ export function IssueDetail({ issueId, onDelete, onDone, defaultSidebarOpen = tr
</ResizablePanelGroup>
);
}
// TerminalPanelSection wraps TerminalPanel in a collapsible header that
// matches the existing sidebar sections (Token usage, etc.). Collapsed
// by default — opening a PTY is an explicit action, and ResizeObserver +
// xterm bootstrap should not run for every issue view.
function TerminalPanelSection({ issueId, workspaceId }: { issueId: string; workspaceId: string }) {
const [open, setOpen] = useState(false);
return (
<div className="mt-6">
<button
className={`flex w-full items-center gap-1 rounded-md px-2 py-1 text-xs font-medium transition-colors mb-2 hover:bg-accent/70 ${open ? "" : "text-muted-foreground hover:text-foreground"}`}
onClick={() => setOpen((v) => !v)}
>
Terminal
<ChevronRight className={`!size-3 shrink-0 stroke-[2.5] text-muted-foreground transition-transform ${open ? "rotate-90" : ""}`} />
</button>
{open && <TerminalPanel issueId={issueId} workspaceId={workspaceId} />}
</div>
);
}

@@ -0,0 +1,342 @@
"use client";
import { useEffect, useMemo, useRef, useState } from "react";
import { Terminal as XTerminal } from "@xterm/xterm";
import { FitAddon } from "@xterm/addon-fit";
import { getApi } from "@multica/core/api";
import { Button } from "@multica/ui/components/ui/button";
import "@xterm/xterm/css/xterm.css";
// Protocol message types — kept in lockstep with
// server/pkg/protocol/messages.go. Strings are stable across daemon /
// server / browser, so duplicating them client-side is OK; if we ever
// regenerate types from Go we can swap these out.
const MSG_TERMINAL_DATA = "terminal.data";
const MSG_TERMINAL_RESIZE = "terminal.resize";
const MSG_TERMINAL_CLOSE = "terminal.close";
const MSG_TERMINAL_OPENED = "terminal.opened";
const MSG_TERMINAL_EXIT = "terminal.exit";
const MSG_TERMINAL_ERROR = "terminal.error";
interface Envelope {
type: string;
payload: unknown;
}
interface OpenedPayload {
request_id: string;
session_id: string;
work_dir: string;
shell: string;
}
interface DataPayload {
session_id: string;
data_b64: string;
}
interface ExitPayload {
session_id: string;
exit_code: number;
reason?: string;
}
interface ErrorPayload {
request_id?: string;
session_id?: string;
code: string;
message: string;
}
// Detect Electron — server-side render guard plus the desktop preload
// surface check. Mirrors the pattern used elsewhere in the desktop app;
// the Terminal panel is intentionally desktop-only because the daemon
// only runs on a developer machine.
function isDesktopRuntime(): boolean {
return typeof window !== "undefined" && "desktopAPI" in window;
}
interface TerminalPanelProps {
issueId: string;
workspaceId: string;
}
export function TerminalPanel({ issueId, workspaceId }: TerminalPanelProps) {
const containerRef = useRef<HTMLDivElement | null>(null);
const termRef = useRef<XTerminal | null>(null);
const fitRef = useRef<FitAddon | null>(null);
const wsRef = useRef<WebSocket | null>(null);
const sessionIdRef = useRef<string>("");
const [status, setStatus] = useState<
"idle" | "connecting" | "connected" | "closed" | "error"
>("idle");
const [errorMessage, setErrorMessage] = useState<string>("");
const [reconnectKey, setReconnectKey] = useState(0);
const wsUrl = useMemo(() => deriveTerminalWsUrl(issueId, workspaceId), [
issueId,
workspaceId,
]);
useEffect(() => {
if (!isDesktopRuntime()) return;
if (!containerRef.current) return;
const term = new XTerminal({
convertEol: true,
cursorBlink: true,
fontFamily:
"ui-monospace, SFMono-Regular, Menlo, Monaco, 'Cascadia Mono', 'Roboto Mono', 'Courier New', monospace",
fontSize: 13,
theme: { background: "#0b0b0b", foreground: "#e6e6e6" },
// Scrollback large enough to read a verbose `cargo build` or `git
// log` without auto-clipping the top.
scrollback: 5000,
});
const fit = new FitAddon();
term.loadAddon(fit);
term.open(containerRef.current);
fit.fit();
termRef.current = term;
fitRef.current = fit;
term.writeln("\x1b[90mconnecting to daemon…\x1b[0m");
setStatus("connecting");
const ws = new WebSocket(wsUrl);
wsRef.current = ws;
ws.onopen = () => {
// Cookie auth carries the session by default. If we ever flip to
// token-mode (no cookie), this is where we'd send an `auth` frame
// mirroring realtime/ws-client.ts. Server falls back gracefully.
setStatus("connected");
};
ws.onerror = () => {
// The browser only surfaces a generic Event; the server sends a
// structured terminal.error frame which we already render below.
// Keep this minimal so we don't double-up the error UI.
setStatus("error");
};
ws.onclose = (ev) => {
setStatus("closed");
term.writeln(
`\r\n\x1b[90mconnection closed (code=${ev.code})${
ev.reason ? ` reason=${ev.reason}` : ""
}\x1b[0m`,
);
};
ws.onmessage = (ev) => {
let env: Envelope;
try {
env = JSON.parse(typeof ev.data === "string" ? ev.data : "");
} catch {
return;
}
switch (env.type) {
case MSG_TERMINAL_OPENED: {
const p = env.payload as OpenedPayload;
sessionIdRef.current = p.session_id;
term.writeln(
`\x1b[90mattached to ${p.shell} (cwd: ${p.work_dir})\x1b[0m`,
);
// Send an initial resize matching the terminal's actual size: the
// WebSocket URL requests a fixed 120x30, which rarely matches the
// fitted panel.
const cols = term.cols;
const rows = term.rows;
ws.send(
JSON.stringify({
type: MSG_TERMINAL_RESIZE,
payload: {
session_id: p.session_id,
cols,
rows,
},
}),
);
break;
}
case MSG_TERMINAL_DATA: {
const p = env.payload as DataPayload;
if (typeof p.data_b64 !== "string") break;
const decoded = atobToUint8(p.data_b64);
// xterm.js accepts Uint8Array; we avoid the latin1 round-trip
// that would otherwise mangle UTF-8 PTY output.
term.write(decoded);
break;
}
case MSG_TERMINAL_EXIT: {
const p = env.payload as ExitPayload;
term.writeln(
`\r\n\x1b[90mprocess exited (code=${p.exit_code}${
p.reason ? `, reason=${p.reason}` : ""
})\x1b[0m`,
);
ws.close();
break;
}
case MSG_TERMINAL_ERROR: {
const p = env.payload as ErrorPayload;
setErrorMessage(`${p.code}: ${p.message}`);
term.writeln(`\r\n\x1b[31m${p.code}: ${p.message}\x1b[0m`);
break;
}
}
};
// Forward keystrokes as terminal.data with base64 of the UTF-8 bytes.
const dataSub = term.onData((data) => {
if (ws.readyState !== WebSocket.OPEN) return;
if (!sessionIdRef.current) return;
ws.send(
JSON.stringify({
type: MSG_TERMINAL_DATA,
payload: {
session_id: sessionIdRef.current,
data_b64: utf8ToBase64(data),
},
}),
);
});
const resizeSub = term.onResize(({ cols, rows }) => {
if (ws.readyState !== WebSocket.OPEN) return;
if (!sessionIdRef.current) return;
ws.send(
JSON.stringify({
type: MSG_TERMINAL_RESIZE,
payload: {
session_id: sessionIdRef.current,
cols,
rows,
},
}),
);
});
// Observe container size and re-fit so the PTY size tracks the panel
// (the right sidebar can be resized at runtime).
const ro = new ResizeObserver(() => {
try {
fit.fit();
} catch {
// fit() throws when the container has zero height during teardown;
// ignore — the next mount will rebind.
}
});
ro.observe(containerRef.current);
return () => {
dataSub.dispose();
resizeSub.dispose();
ro.disconnect();
try {
if (sessionIdRef.current && ws.readyState === WebSocket.OPEN) {
ws.send(
JSON.stringify({
type: MSG_TERMINAL_CLOSE,
payload: { session_id: sessionIdRef.current, reason: "panel_unmount" },
}),
);
}
} catch {
// ws may be already closing; nothing to do.
}
ws.close();
term.dispose();
termRef.current = null;
fitRef.current = null;
wsRef.current = null;
sessionIdRef.current = "";
};
}, [wsUrl, reconnectKey]);
if (!isDesktopRuntime()) {
return (
<div className="rounded-md border border-dashed p-4 text-sm text-muted-foreground">
The terminal is only available in the Multica Desktop app. It attaches
to the PTY hosted by the local daemon that ran the agent task.
</div>
);
}
return (
<div className="flex flex-col gap-2">
<div className="flex items-center justify-between text-xs text-muted-foreground">
<span>
Status: <span className="font-medium">{status}</span>
{errorMessage ? (
<span className="ml-2 text-destructive"> {errorMessage}</span>
) : null}
</span>
<Button
variant="ghost"
size="sm"
onClick={() => {
setErrorMessage("");
setReconnectKey((n) => n + 1);
}}
>
Reconnect
</Button>
</div>
<div
ref={containerRef}
className="h-[360px] w-full overflow-hidden rounded-md border bg-black"
/>
</div>
);
}
function deriveTerminalWsUrl(issueId: string, workspaceId: string): string {
// The API client knows the http(s) base URL; flip the scheme to ws(s)
// and target the proxy endpoint registered in router.go. Falls back to
// the page origin if for some reason the API base is empty (dev
// environments where the API lives on the same host).
let base = "";
try {
base = getApi().getBaseUrl();
} catch {
base = "";
}
if (!base && typeof window !== "undefined") {
base = window.location.origin;
}
const url = new URL(base);
if (url.protocol === "https:") {
url.protocol = "wss:";
} else if (url.protocol === "http:") {
url.protocol = "ws:";
}
url.pathname = url.pathname.replace(/\/$/, "") +
`/ws/issues/${encodeURIComponent(issueId)}/terminal`;
url.search = `?workspace_id=${encodeURIComponent(workspaceId)}&cols=120&rows=30`;
return url.toString();
}
function utf8ToBase64(s: string): string {
if (typeof TextEncoder !== "undefined") {
const bytes = new TextEncoder().encode(s);
let bin = "";
bytes.forEach((b) => {
bin += String.fromCharCode(b);
});
return btoa(bin);
}
// Fallback for old runtimes: assume latin1.
return btoa(s);
}
function atobToUint8(s: string): Uint8Array {
const bin = atob(s);
const out = new Uint8Array(bin.length);
for (let i = 0; i < bin.length; i++) {
out[i] = bin.charCodeAt(i);
}
return out;
}

@@ -76,6 +76,8 @@
"@tiptap/react": "^3.22.1",
"@tiptap/starter-kit": "^3.22.1",
"@tiptap/suggestion": "^3.22.1",
"@xterm/xterm": "catalog:",
"@xterm/addon-fit": "catalog:",
"cmdk": "^1.1.1",
"hast-util-to-html": "^4.0.1",
"katex": "catalog:",

26
pnpm-lock.yaml generated
@@ -33,6 +33,12 @@ catalogs:
'@vitejs/plugin-react':
specifier: ^6.0.1
version: 6.0.1
'@xterm/addon-fit':
specifier: ^0.10.0
version: 0.10.0
'@xterm/xterm':
specifier: ^5.5.0
version: 5.5.0
class-variance-authority:
specifier: ^0.7.1
version: 0.7.1
@@ -775,6 +781,12 @@ importers:
'@tiptap/suggestion':
specifier: ^3.22.1
version: 3.22.1(@tiptap/core@3.22.1(@tiptap/pm@3.22.1))(@tiptap/pm@3.22.1)
'@xterm/addon-fit':
specifier: 'catalog:'
version: 0.10.0(@xterm/xterm@5.5.0)
'@xterm/xterm':
specifier: 'catalog:'
version: 5.5.0
cmdk:
specifier: ^1.1.1
version: 1.1.1(@types/react-dom@19.2.3(@types/react@19.2.14))(@types/react@19.2.14)(react-dom@19.2.3(react@19.2.3))(react@19.2.3)
@@ -3318,6 +3330,14 @@ packages:
resolution: {integrity: sha512-9k/gHF6n/pAi/9tqr3m3aqkuiNosYTurLLUtc7xQ9sxB/wm7WPygCv8GYa6mS0fLJEHhqMC1ATYhz++U/lRHqg==}
engines: {node: '>=10.0.0'}
'@xterm/addon-fit@0.10.0':
resolution: {integrity: sha512-UFYkDm4HUahf2lnEyHvio51TNGiLK66mqP2JoATy7hRZeXaGMRDr00JiSF7m63vR5WKATF605yEggJKsw0JpMQ==}
peerDependencies:
'@xterm/xterm': ^5.0.0
'@xterm/xterm@5.5.0':
resolution: {integrity: sha512-hqJHYaQb5OptNunnyAnkHyM8aCjZ1MEIDTQu1iIbbTD/xops91NB5yq1ZK/dC2JDbVWtF23zUtl9JE2NqwT87A==}
abbrev@3.0.1:
resolution: {integrity: sha512-AO2ac6pjRB3SJmGJo+v5/aK6Omggp6fsLrs6wN9bd35ulu4cCwaAU9+7ZhXjeqHVkaHThLuzH0nZr0YpCDhygg==}
engines: {node: ^18.17.0 || >=20.5.0}
@@ -10103,6 +10123,12 @@ snapshots:
'@xmldom/xmldom@0.8.12': {}
'@xterm/addon-fit@0.10.0(@xterm/xterm@5.5.0)':
dependencies:
'@xterm/xterm': 5.5.0
'@xterm/xterm@5.5.0': {}
abbrev@3.0.1: {}
accepts@2.0.0:

@@ -49,6 +49,10 @@ catalog:
# Virtualized timeline (issue detail comments)
react-virtuoso: "^4.14.0"
# Interactive PTY rendering (Issue → Terminal panel on Desktop, MUL-2295)
"@xterm/xterm": "^5.5.0"
"@xterm/addon-fit": "^0.10.0"
# Product analytics
posthog-js: "^1.176.1"

@@ -9,6 +9,7 @@ import (
"net/http"
"net/url"
"os"
"sort"
"strings"
"time"
"unicode/utf8"
@@ -1066,6 +1067,30 @@ func runIssueRuns(cmd *cobra.Command, args []string) error {
return fmt.Errorf("list runs: %w", err)
}
// Merge in interactive terminal sessions so `issue runs` is the single
// surface for "what has happened in this workdir" — agent runs and the
// `type=terminal` audit rows side by side, per RFC §"Relationship to the
// existing agent run sandbox". The endpoint is new in MUL-2295 Phase 4; older
// servers return 404, which we treat as "no terminal feature" rather
// than a hard error so the CLI keeps working against pre-feature
// servers. Any non-404 failure (500, auth, network) is surfaced to
// stderr so a real regression doesn't silently downgrade to "no
// terminal rows" — see Phase 4 review feedback.
var terminals []map[string]any
if err := client.GetJSON(ctx, "/api/issues/"+issueRef.ID+"/terminal-sessions", &terminals); err != nil {
var httpErr *cli.HTTPError
if !errors.As(err, &httpErr) || httpErr.StatusCode != http.StatusNotFound {
fmt.Fprintf(os.Stderr, "warning: list terminal sessions: %v\n", err)
}
} else {
runs = append(runs, terminals...)
}
// Sort merged list newest-first by started_at so the table renders in
// the same order whether or not terminal rows are present.
sort.SliceStable(runs, func(i, j int) bool {
return strVal(runs[i], "started_at") > strVal(runs[j], "started_at")
})
output, _ := cmd.Flags().GetString("output")
if output == "json" {
return cli.PrintJSON(os.Stdout, runs)
@@ -1081,6 +1106,9 @@ func runIssueRuns(cmd *cobra.Command, args []string) error {
started = started[:16]
}
completed := strVal(r, "completed_at")
if completed == "" {
completed = strVal(r, "ended_at")
}
if len(completed) >= 16 {
completed = completed[:16]
}
@@ -1089,9 +1117,20 @@ func runIssueRuns(cmd *cobra.Command, args []string) error {
runes := []rune(errMsg)
errMsg = string(runes[:47]) + "..."
}
// Terminal rows have kind="terminal" and no agent_id. Surface them
// with a synthetic agent column ("terminal") so the user can tell
// the two row kinds apart at a glance without us adding a new
// column (keeps the table width stable on narrow terminals).
agent := actors.agent(strVal(r, "agent_id"))
if strVal(r, "kind") == "terminal" {
agent = "terminal"
if errMsg == "" {
errMsg = strVal(r, "close_reason")
}
}
rows = append(rows, []string{
displayID(strVal(r, "id"), fullID),
actors.agent(strVal(r, "agent_id")),
agent,
strVal(r, "status"),
started,
completed,

@@ -0,0 +1,563 @@
package main
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/url"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"github.com/spf13/cobra"
"golang.org/x/term"
"github.com/multica-ai/multica/server/pkg/protocol"
)
var issueTerminalCmd = &cobra.Command{
Use: "terminal <issue-id>",
Short: "Attach to the issue's most recent agent task PTY",
Long: "Open an interactive shell inside the workdir of the issue's most recent agent task. " +
"Reuses the daemon-side PTY manager added in MUL-2295 — the daemon spawns a bash login " +
"shell with CLAUDE_SESSION_ID + MULTICA_{WORKSPACE,ISSUE,TASK,USER}_ID injected so you " +
"can immediately `claude --resume $CLAUDE_SESSION_ID`.\n\n" +
"Detach without closing your shell: type `<enter>~.` (escape sequence). The daemon-side " +
"session is currently torn down on disconnect — see RFC follow-up for `--attach`.",
Args: exactArgs(1),
RunE: runIssueTerminal,
}
const (
terminalDefaultCols = 80
terminalDefaultRows = 24
terminalAuthAckTimeout = 10 * time.Second
terminalOpenAckTimeout = 15 * time.Second
terminalServerWriteWait = 10 * time.Second
terminalServerReadLimit = 1 << 20 // 1 MiB per frame; matches realistic xterm bursts
terminalDetachExitMessage = "[multica] detached — daemon session was torn down"
)
func init() {
issueCmd.AddCommand(issueTerminalCmd)
issueTerminalCmd.Flags().Uint16("cols", 0, "Initial terminal columns (defaults to detected size, or 80 if stdout is not a TTY)")
issueTerminalCmd.Flags().Uint16("rows", 0, "Initial terminal rows (defaults to detected size, or 24 if stdout is not a TTY)")
issueTerminalCmd.Flags().String("escape-char", "~", "Escape character for the detach sequence ('<enter><escape-char>.' detaches). Empty disables escape detection.")
issueTerminalCmd.Flags().Bool("no-raw", false, "Don't put the local TTY into raw mode (mostly for testing / piped input)")
}
func runIssueTerminal(cmd *cobra.Command, args []string) error {
client, err := newAPIClient(cmd)
if err != nil {
return err
}
if _, err := requireWorkspaceID(cmd); err != nil {
return err
}
token := resolveToken(cmd)
if token == "" {
return fmt.Errorf("not authenticated: run 'multica login'")
}
resolveCtx, cancelResolve := context.WithTimeout(cmd.Context(), 15*time.Second)
defer cancelResolve()
issueRef, err := resolveIssueRef(resolveCtx, client, args[0])
if err != nil {
return fmt.Errorf("resolve issue: %w", err)
}
// Detect terminal size from stdout (the surface the user actually sees);
// fall back to defaults if stdout is piped. Flag overrides win.
cols, rows := detectInitialSize(cmd)
pathAndQuery := buildTerminalPathAndQuery(issueRef.ID, client.WorkspaceID, cols, rows)
// Use a long-lived context for the WS connection; cancellation is driven
// by the proxy goroutines + signals rather than a timeout.
conn, _, err := client.DialWebSocket(cmd.Context(), pathAndQuery)
if err != nil {
return fmt.Errorf("dial terminal websocket: %w", err)
}
proxy := newCLITerminalProxy(conn, os.Stdin, os.Stdout, os.Stderr, token, cmd)
return proxy.run(cmd.Context(), cols, rows)
}
func detectInitialSize(cmd *cobra.Command) (uint16, uint16) {
cols, _ := cmd.Flags().GetUint16("cols")
rows, _ := cmd.Flags().GetUint16("rows")
if cols > 0 && rows > 0 {
return cols, rows
}
if c, r, err := term.GetSize(int(os.Stdout.Fd())); err == nil && c > 0 && r > 0 {
if cols == 0 {
cols = uint16(c)
}
if rows == 0 {
rows = uint16(r)
}
}
if cols == 0 {
cols = terminalDefaultCols
}
if rows == 0 {
rows = terminalDefaultRows
}
return cols, rows
}
func buildTerminalPathAndQuery(issueID, workspaceID string, cols, rows uint16) string {
q := url.Values{}
q.Set("workspace_id", workspaceID)
q.Set("cols", strconv.FormatUint(uint64(cols), 10))
q.Set("rows", strconv.FormatUint(uint64(rows), 10))
return "/ws/issues/" + url.PathEscape(issueID) + "/terminal?" + q.Encode()
}
// cliTerminalProxy mirrors the server-side terminalProxy: one goroutine
// owns conn writes, one owns conn reads, plus a stdin reader and resize
// watcher. The struct is the only owner of the websocket.Conn; all writes
// go through writeFrame() to keep a single point that holds writeMu.
type cliTerminalProxy struct {
conn *websocket.Conn
stdin io.Reader
stdout io.Writer
stderr io.Writer
token string
cmd *cobra.Command
writeMu sync.Mutex
sessionMu sync.RWMutex
sessionID string
closeOnce sync.Once
doneCh chan struct{}
// exit reporting from the read pump back to the orchestrator.
exitCode atomic.Int32 // exitCodeUnset (-1) until the read pump records one; run() exits the process with it when > 0
exitMsg atomic.Pointer[string]
escapeChar byte
noRaw bool
}
const exitCodeUnset int32 = -1
func newCLITerminalProxy(conn *websocket.Conn, stdin io.Reader, stdout, stderr io.Writer, token string, cmd *cobra.Command) *cliTerminalProxy {
escape, _ := cmd.Flags().GetString("escape-char")
noRaw, _ := cmd.Flags().GetBool("no-raw")
var ec byte
if len(escape) >= 1 {
ec = escape[0]
}
p := &cliTerminalProxy{
conn: conn,
stdin: stdin,
stdout: stdout,
stderr: stderr,
token: token,
cmd: cmd,
doneCh: make(chan struct{}),
escapeChar: ec,
noRaw: noRaw,
}
p.exitCode.Store(exitCodeUnset)
conn.SetReadLimit(terminalServerReadLimit)
return p
}
func (p *cliTerminalProxy) run(ctx context.Context, cols, rows uint16) error {
defer p.conn.Close()
if err := p.handshake(); err != nil {
return err
}
// Push our local size right after open in case the server's hardcoded
// initial 80x24 didn't match. (Phase 2 server stamps 80x24 on the
// daemon-bound terminal.open frame regardless of query string; sending
// resize immediately makes the PTY render correctly.)
if err := p.sendResize(cols, rows); err != nil {
// non-fatal — daemon will just keep the original size
fmt.Fprintf(p.stderr, "[multica] warning: initial resize failed: %v\n", err)
}
rawTTY := !p.noRaw && term.IsTerminal(int(os.Stdin.Fd()))
var restore func() error
if rawTTY {
oldState, err := term.MakeRaw(int(os.Stdin.Fd()))
if err != nil {
return fmt.Errorf("enter raw mode: %w", err)
}
fd := int(os.Stdin.Fd())
restore = func() error { return term.Restore(fd, oldState) }
defer restore()
}
stopResize := startResizeWatcher(p)
defer stopResize()
go p.readPump()
go p.stdinPump(rawTTY)
select {
case <-p.doneCh:
case <-ctx.Done():
p.shutdown()
}
if restore != nil {
_ = restore()
}
if msgPtr := p.exitMsg.Load(); msgPtr != nil && *msgPtr != "" {
fmt.Fprintln(p.stderr, *msgPtr)
}
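if code := p.exitCode.Load(); code > 0 {
// os.Exit skips deferred calls, which is why the terminal state was
// restored explicitly above before propagating the child's exit code.
os.Exit(int(code))
}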
return nil
}
// handshake performs first-frame auth and waits for terminal.opened.
func (p *cliTerminalProxy) handshake() error {
authFrame, err := json.Marshal(struct {
Type string `json:"type"`
Payload map[string]any `json:"payload"`
}{
Type: "auth",
Payload: map[string]any{"token": p.token},
})
if err != nil {
return fmt.Errorf("marshal auth frame: %w", err)
}
if err := p.writeRawFrame(authFrame); err != nil {
return fmt.Errorf("send auth frame: %w", err)
}
deadline := time.Now().Add(terminalAuthAckTimeout)
if err := p.conn.SetReadDeadline(deadline); err != nil {
return fmt.Errorf("set auth read deadline: %w", err)
}
for {
_, raw, err := p.conn.ReadMessage()
if err != nil {
return fmt.Errorf("read auth response: %w", err)
}
var preview struct {
Type string `json:"type"`
Error string `json:"error"`
}
if err := json.Unmarshal(raw, &preview); err == nil {
if preview.Error != "" {
return fmt.Errorf("auth rejected: %s", preview.Error)
}
if preview.Type == "auth_ack" {
break
}
}
// Tolerate stray frames during handshake (none expected in current
// server implementation, but don't lock up if that changes).
}
// After auth_ack the server proxies a terminal.open to the daemon and
// waits for terminal.opened or terminal.error. Block until we see one.
openDeadline := time.Now().Add(terminalOpenAckTimeout)
if err := p.conn.SetReadDeadline(openDeadline); err != nil {
return fmt.Errorf("set open read deadline: %w", err)
}
for {
_, raw, err := p.conn.ReadMessage()
if err != nil {
return fmt.Errorf("waiting for terminal.opened: %w", err)
}
var env protocol.Message
if err := json.Unmarshal(raw, &env); err != nil {
continue
}
switch env.Type {
case protocol.MessageTypeTerminalOpened:
var op protocol.TerminalOpenedPayload
if err := json.Unmarshal(env.Payload, &op); err != nil {
return fmt.Errorf("decode terminal.opened: %w", err)
}
if op.SessionID == "" {
return fmt.Errorf("daemon returned empty session_id in terminal.opened")
}
p.setSessionID(op.SessionID)
workDir := op.WorkDir
if workDir == "" {
workDir = "(unknown)"
}
fmt.Fprintf(p.stderr, "[multica] attached to %s — escape: %s.\r\n", workDir, escapeHelpString(p.escapeChar))
// Clear the handshake deadline so the pumps' reads block until data arrives.
if err := p.conn.SetReadDeadline(time.Time{}); err != nil {
return fmt.Errorf("clear read deadline: %w", err)
}
return nil
case protocol.MessageTypeTerminalError:
var ep protocol.TerminalErrorPayload
if err := json.Unmarshal(env.Payload, &ep); err != nil {
return fmt.Errorf("daemon returned undecodable terminal.error: %w", err)
}
return fmt.Errorf("daemon rejected terminal.open: %s (%s)", ep.Message, ep.Code)
default:
// keep waiting
}
}
}
func escapeHelpString(b byte) string {
if b == 0 {
return "(disabled)"
}
return "<enter>" + string(b) + "."
}
func (p *cliTerminalProxy) readPump() {
defer p.shutdown()
for {
_, raw, err := p.conn.ReadMessage()
if err != nil {
if !isClosedConnError(err) {
msg := fmt.Sprintf("[multica] websocket closed: %v", err)
p.exitMsg.CompareAndSwap(nil, &msg)
}
return
}
var env protocol.Message
if err := json.Unmarshal(raw, &env); err != nil {
continue
}
switch env.Type {
case protocol.MessageTypeTerminalData:
var pl protocol.TerminalDataPayload
if err := json.Unmarshal(env.Payload, &pl); err != nil {
continue
}
data, err := base64.StdEncoding.DecodeString(pl.DataB64)
if err != nil {
continue
}
_, _ = p.stdout.Write(data)
case protocol.MessageTypeTerminalExit:
var pl protocol.TerminalExitPayload
if err := json.Unmarshal(env.Payload, &pl); err != nil {
continue
}
reason := pl.Reason
if reason == "" {
reason = "child exited"
}
msg := fmt.Sprintf("\r\n[multica] %s (exit code %d)", reason, pl.ExitCode)
p.exitMsg.CompareAndSwap(nil, &msg)
if pl.ExitCode > 0 {
p.exitCode.Store(int32(pl.ExitCode))
}
return
case protocol.MessageTypeTerminalError:
var pl protocol.TerminalErrorPayload
if err := json.Unmarshal(env.Payload, &pl); err != nil {
continue
}
msg := fmt.Sprintf("\r\n[multica] error: %s (%s)", pl.Message, pl.Code)
p.exitMsg.CompareAndSwap(nil, &msg)
p.exitCode.Store(1)
return
case protocol.MessageTypeTerminalClose:
return
}
}
}
// stdinPump reads stdin, runs it through the escape-sequence state machine,
// and forwards bytes as terminal.data frames. Detach (~.) closes the WS
// without sending the bytes.
func (p *cliTerminalProxy) stdinPump(rawTTY bool) {
defer p.shutdown()
buf := make([]byte, 4096)
// Start in newline state so the very first character can trigger an
// escape sequence; mirrors ssh's behavior.
state := newlineState{atNewline: true}
for {
n, err := p.stdin.Read(buf)
if n > 0 {
toSend, detach := state.process(buf[:n], p.escapeChar)
if len(toSend) > 0 {
if err := p.sendData(toSend); err != nil {
return
}
}
if detach {
msg := terminalDetachExitMessage
p.exitMsg.CompareAndSwap(nil, &msg)
_ = p.sendCloseBestEffort("client_detach")
return
}
}
if err != nil {
if !errors.Is(err, io.EOF) {
msg := fmt.Sprintf("[multica] stdin error: %v", err)
p.exitMsg.CompareAndSwap(nil, &msg)
}
return
}
}
}
func (p *cliTerminalProxy) sendData(data []byte) error {
sid := p.SessionID()
if sid == "" {
return errors.New("session_id not set")
}
frame, err := marshalCLITerminalFrame(protocol.MessageTypeTerminalData, protocol.TerminalDataPayload{
SessionID: sid,
DataB64: base64.StdEncoding.EncodeToString(data),
})
if err != nil {
return err
}
return p.writeRawFrame(frame)
}
func (p *cliTerminalProxy) sendResize(cols, rows uint16) error {
sid := p.SessionID()
if sid == "" {
// Pre-handshake resize is sent later by run() once session is known.
return nil
}
frame, err := marshalCLITerminalFrame(protocol.MessageTypeTerminalResize, protocol.TerminalResizePayload{
SessionID: sid,
Cols: cols,
Rows: rows,
})
if err != nil {
return err
}
return p.writeRawFrame(frame)
}
func (p *cliTerminalProxy) sendCloseBestEffort(reason string) error {
sid := p.SessionID()
if sid == "" {
return nil
}
frame, err := marshalCLITerminalFrame(protocol.MessageTypeTerminalClose, protocol.TerminalClosePayload{
SessionID: sid,
Reason: reason,
})
if err != nil {
return err
}
return p.writeRawFrame(frame)
}
func (p *cliTerminalProxy) writeRawFrame(frame []byte) error {
p.writeMu.Lock()
defer p.writeMu.Unlock()
if err := p.conn.SetWriteDeadline(time.Now().Add(terminalServerWriteWait)); err != nil {
return err
}
return p.conn.WriteMessage(websocket.TextMessage, frame)
}
func (p *cliTerminalProxy) SessionID() string {
p.sessionMu.RLock()
defer p.sessionMu.RUnlock()
return p.sessionID
}
func (p *cliTerminalProxy) setSessionID(sid string) {
p.sessionMu.Lock()
defer p.sessionMu.Unlock()
p.sessionID = sid
}
func (p *cliTerminalProxy) shutdown() {
p.closeOnce.Do(func() {
close(p.doneCh)
_ = p.conn.Close()
})
}
func marshalCLITerminalFrame(msgType string, payload any) ([]byte, error) {
raw, err := json.Marshal(payload)
if err != nil {
return nil, err
}
return json.Marshal(protocol.Message{Type: msgType, Payload: raw})
}
func isClosedConnError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, io.EOF) {
return true
}
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
return true
}
return false
}
// --- escape sequence state machine -----------------------------------------
//
// Mirrors ssh(1)'s `~.` detach: after a newline, the escape character
// followed by `.` detaches; `~~` forwards a single literal escape character;
// `~?` is reserved for help and currently only forwards a CR so the prompt
// redraws; any other byte aborts the escape and forwards both bytes.
type newlineState struct {
atNewline bool
gotEscape bool
}
// process consumes a chunk of stdin bytes and returns the bytes to forward
// to the daemon plus whether the user requested detach. State is kept on the
// receiver across calls, so an escape sequence split across two stdin reads
// is still recognized.
func (s *newlineState) process(in []byte, escape byte) (out []byte, detach bool) {
if escape == 0 {
// Escape detection disabled — pass through.
return in, false
}
out = make([]byte, 0, len(in))
for _, b := range in {
switch {
case s.gotEscape:
s.gotEscape = false
switch b {
case '.':
return out, true
case escape:
out = append(out, escape)
s.atNewline = false
case '?':
// Help is not implemented yet: neither typed byte reaches the PTY.
// Forward a bare CR instead so the remote prompt redraws.
out = append(out, '\r')
s.atNewline = true
default:
// Not a recognized escape: forward the escape character, then this byte.
out = append(out, escape, b)
s.atNewline = b == '\r' || b == '\n'
}
case s.atNewline && b == escape:
s.gotEscape = true
default:
out = append(out, b)
s.atNewline = b == '\r' || b == '\n'
}
}
return out, false
}
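
A standalone sketch of how the escape rules above play out across several stdin reads. It assumes a simplified copy of the state machine: the names `escState` and `feed` are illustrative and not part of this change, and the `~?` branch is left out for brevity.

```go
package main

import "fmt"

// escState is an illustrative, simplified copy of newlineState.
type escState struct {
	atNewline bool // the previous forwarded byte was CR or LF (or nothing was typed yet)
	gotEscape bool // the previous byte was the escape character at line start
}

// feed applies the ssh-style rules to one chunk and reports the bytes to
// forward plus whether a detach was requested. The `~?` case is omitted.
func (s *escState) feed(in []byte, escape byte) (out []byte, detach bool) {
	for _, b := range in {
		switch {
		case s.gotEscape:
			s.gotEscape = false
			switch b {
			case '.':
				// Escape followed by '.': detach, forward neither byte.
				return out, true
			case escape:
				// Doubled escape: forward one literal escape character.
				out = append(out, escape)
				s.atNewline = false
			default:
				// Unknown sequence: forward both bytes unchanged.
				out = append(out, escape, b)
				s.atNewline = b == '\r' || b == '\n'
			}
		case s.atNewline && b == escape:
			// Hold the escape character back until the next byte arrives.
			s.gotEscape = true
		default:
			out = append(out, b)
			s.atNewline = b == '\r' || b == '\n'
		}
	}
	return out, false
}

func main() {
	s := &escState{atNewline: true}
	// The mid-line '~' in the first chunk is forwarded untouched; the
	// escape sequence itself arrives split across the last two chunks.
	for _, chunk := range []string{"ls ~/src\r", "~", "."} {
		out, detach := s.feed([]byte(chunk), '~')
		fmt.Printf("chunk=%q forwarded=%q detach=%v\n", chunk, out, detach)
	}
}
```

Keeping both flags on the receiver is what lets the escape character and the following `.` arrive in separate reads, which is the case `TestEscapeState_StraddledChunks` covers.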

@@ -0,0 +1,526 @@
package main
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync"
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/spf13/cobra"
"github.com/multica-ai/multica/server/pkg/protocol"
)
func TestEscapeState_DetachOnFreshLine(t *testing.T) {
s := &newlineState{atNewline: true}
out, detach := s.process([]byte("~."), '~')
if !detach {
t.Fatalf("expected detach")
}
if len(out) != 0 {
t.Fatalf("expected no bytes forwarded, got %q", out)
}
}
func TestEscapeState_TildeNotAfterNewlineIsLiteral(t *testing.T) {
s := &newlineState{atNewline: false}
out, detach := s.process([]byte("foo~.bar"), '~')
if detach {
t.Fatalf("must not detach when ~ is mid-line")
}
if string(out) != "foo~.bar" {
t.Fatalf("got %q", out)
}
}
func TestEscapeState_DoubleTildeEmitsLiteral(t *testing.T) {
s := &newlineState{atNewline: true}
out, detach := s.process([]byte("~~"), '~')
if detach {
t.Fatalf("~~ must not detach")
}
if string(out) != "~" {
t.Fatalf("got %q want ~", out)
}
}
func TestEscapeState_StraddledChunks(t *testing.T) {
// User pastes/types ~ and . in two separate stdin reads — escape
// detection still works because state is preserved across calls.
s := &newlineState{atNewline: true}
out1, detach1 := s.process([]byte("~"), '~')
if detach1 || len(out1) != 0 {
t.Fatalf("first chunk: detach=%v out=%q", detach1, out1)
}
out2, detach2 := s.process([]byte("."), '~')
if !detach2 {
t.Fatalf("expected detach on second chunk")
}
if len(out2) != 0 {
t.Fatalf("second chunk should forward nothing, got %q", out2)
}
}
func TestEscapeState_DisabledWhenEscapeIsZero(t *testing.T) {
s := &newlineState{atNewline: true}
out, detach := s.process([]byte("~."), 0)
if detach {
t.Fatalf("disabled escape must not detach")
}
if string(out) != "~." {
t.Fatalf("got %q want ~.", out)
}
}
func TestEscapeState_UnknownEscapeForwardsBoth(t *testing.T) {
s := &newlineState{atNewline: true}
out, _ := s.process([]byte("~x"), '~')
if string(out) != "~x" {
t.Fatalf("got %q want ~x", out)
}
}
func TestBuildTerminalPathAndQuery(t *testing.T) {
got := buildTerminalPathAndQuery("MUL-2295", "ws-uuid", 120, 40)
u, err := url.Parse("http://x" + got)
if err != nil {
t.Fatalf("parse: %v", err)
}
if u.Path != "/ws/issues/MUL-2295/terminal" {
t.Errorf("path = %q", u.Path)
}
q := u.Query()
if q.Get("workspace_id") != "ws-uuid" {
t.Errorf("workspace_id = %q", q.Get("workspace_id"))
}
if q.Get("cols") != "120" {
t.Errorf("cols = %q", q.Get("cols"))
}
if q.Get("rows") != "40" {
t.Errorf("rows = %q", q.Get("rows"))
}
}
// fakeServer simulates the Phase 2 /ws/issues/{id}/terminal handshake plus
// a tiny echo loop, so we can drive the CLI proxy through its full lifecycle
// in-process without spinning up the real daemon.
type fakeServer struct {
t *testing.T
upgrader websocket.Upgrader
gotAuth chan string
gotData chan []byte
gotClose chan string
sessionID string
server *httptest.Server
connMu sync.Mutex
conn *websocket.Conn
sendOpenErr *protocol.TerminalErrorPayload // if set, send terminal.error instead of terminal.opened
}
// writeFrame serializes writes from the handler goroutine and any test
// goroutine that wants to push a frame to the connected client. Required
// because gorilla/websocket allows concurrent read+write but NOT concurrent
// writes from different goroutines.
func (fs *fakeServer) writeFrame(frame []byte) error {
fs.connMu.Lock()
defer fs.connMu.Unlock()
if fs.conn == nil {
return fmt.Errorf("no client")
}
return fs.conn.WriteMessage(websocket.TextMessage, frame)
}
func newFakeServer(t *testing.T) *fakeServer {
fs := &fakeServer{
t: t,
upgrader: websocket.Upgrader{},
gotAuth: make(chan string, 1),
gotData: make(chan []byte, 32),
gotClose: make(chan string, 1),
sessionID: "session-xyz",
}
fs.server = httptest.NewServer(http.HandlerFunc(fs.handle))
return fs
}
func (fs *fakeServer) close() {
fs.connMu.Lock()
c := fs.conn
fs.connMu.Unlock()
if c != nil {
c.Close()
}
fs.server.Close()
}
func (fs *fakeServer) baseURL() string { return fs.server.URL }
func (fs *fakeServer) handle(w http.ResponseWriter, r *http.Request) {
conn, err := fs.upgrader.Upgrade(w, r, nil)
if err != nil {
fs.t.Errorf("upgrade: %v", err)
return
}
fs.connMu.Lock()
fs.conn = conn
fs.connMu.Unlock()
// 1. Auth.
_, raw, err := conn.ReadMessage()
if err != nil {
return
}
var auth struct {
Type string `json:"type"`
Payload map[string]any `json:"payload"`
}
if err := json.Unmarshal(raw, &auth); err != nil || auth.Type != "auth" {
_ = fs.writeFrame([]byte(`{"error":"bad auth"}`))
return
}
tok, _ := auth.Payload["token"].(string)
fs.gotAuth <- tok
_ = fs.writeFrame([]byte(`{"type":"auth_ack"}`))
// 2. Open ack.
if fs.sendOpenErr != nil {
ep := *fs.sendOpenErr
frame, _ := marshalCLITerminalFrame(protocol.MessageTypeTerminalError, ep)
_ = fs.writeFrame(frame)
return
}
openedFrame, _ := marshalCLITerminalFrame(protocol.MessageTypeTerminalOpened, protocol.TerminalOpenedPayload{
SessionID: fs.sessionID,
WorkDir: "/tmp/work",
Shell: "/bin/bash",
})
_ = fs.writeFrame(openedFrame)
// 3. Pump.
for {
_, raw, err := conn.ReadMessage()
if err != nil {
return
}
var env protocol.Message
if err := json.Unmarshal(raw, &env); err != nil {
continue
}
switch env.Type {
case protocol.MessageTypeTerminalData:
var pl protocol.TerminalDataPayload
if err := json.Unmarshal(env.Payload, &pl); err != nil {
continue
}
data, _ := base64.StdEncoding.DecodeString(pl.DataB64)
fs.gotData <- data
// Echo back so the CLI's stdout pump has something to do.
echo, _ := marshalCLITerminalFrame(protocol.MessageTypeTerminalData, protocol.TerminalDataPayload{
SessionID: fs.sessionID,
DataB64: pl.DataB64,
})
_ = fs.writeFrame(echo)
case protocol.MessageTypeTerminalClose:
var pl protocol.TerminalClosePayload
_ = json.Unmarshal(env.Payload, &pl)
fs.gotClose <- pl.Reason
return
case protocol.MessageTypeTerminalResize:
// observed but unused in this fake
}
}
}
func newTestCmd() *cobra.Command {
c := &cobra.Command{}
c.Flags().String("escape-char", "~", "")
c.Flags().Bool("no-raw", true, "")
return c
}
func TestCLITerminalProxy_HandshakeAndEcho(t *testing.T) {
fs := newFakeServer(t)
defer fs.close()
wsURL := strings.Replace(fs.baseURL(), "http://", "ws://", 1) + "/"
dialer := *websocket.DefaultDialer
conn, _, err := dialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial: %v", err)
}
stdinR, stdinW := io.Pipe()
stdout := newSafeBuffer()
stderr := newSafeBuffer()
cmd := newTestCmd()
p := newCLITerminalProxy(conn, stdinR, stdout, stderr, "mul_test", cmd)
// Drive handshake explicitly so we can also assert the auth token reached
// the fake server.
if err := p.handshake(); err != nil {
t.Fatalf("handshake: %v", err)
}
select {
case got := <-fs.gotAuth:
if got != "mul_test" {
t.Errorf("auth token = %q, want mul_test", got)
}
case <-time.After(2 * time.Second):
t.Fatal("server did not receive auth frame")
}
if p.SessionID() != fs.sessionID {
t.Fatalf("session_id = %q, want %q", p.SessionID(), fs.sessionID)
}
// Now run the pumps in a goroutine.
pumpsDone := make(chan struct{})
go func() {
go p.readPump()
p.stdinPump(false)
close(pumpsDone)
}()
// Send "hello" through stdin; expect server to receive it and echo it
// back into stdout.
if _, err := stdinW.Write([]byte("hello")); err != nil {
t.Fatalf("stdin write: %v", err)
}
select {
case got := <-fs.gotData:
if string(got) != "hello" {
t.Fatalf("server got %q, want hello", got)
}
case <-time.After(2 * time.Second):
t.Fatal("server did not receive data")
}
// Wait for the echo to land in stdout.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if strings.Contains(stdout.String(), "hello") {
break
}
time.Sleep(10 * time.Millisecond)
}
if !strings.Contains(stdout.String(), "hello") {
t.Fatalf("stdout missing echo, got %q", stdout.String())
}
// Trigger detach with "\n~.". stdinPump starts at atNewline=true, but
// the "hello" sent above cleared that flag, so the escape is only
// recognized after a fresh newline.
if _, err := stdinW.Write([]byte("\n~.")); err != nil {
t.Fatalf("stdin write detach: %v", err)
}
select {
case <-pumpsDone:
case <-time.After(3 * time.Second):
t.Fatal("stdin pump did not exit after detach")
}
select {
case reason := <-fs.gotClose:
if reason != "client_detach" {
t.Errorf("close reason = %q, want client_detach", reason)
}
case <-time.After(2 * time.Second):
t.Fatal("server did not receive terminal.close on detach")
}
// run() prints the exit message to stderr; in this lower-level test we
// drive the pumps directly, so check the captured exit message.
msgPtr := p.exitMsg.Load()
if msgPtr == nil || !strings.Contains(*msgPtr, "detached") {
got := ""
if msgPtr != nil {
got = *msgPtr
}
t.Errorf("exit msg = %q, want detach text", got)
}
}
// safeBuffer is a tiny mutex-wrapped bytes.Buffer for tests that read from
// the buffer in one goroutine while another writes (race-detector-clean).
type safeBuffer struct {
mu sync.Mutex
buf bytes.Buffer
}
func newSafeBuffer() *safeBuffer { return &safeBuffer{} }
func (b *safeBuffer) Write(p []byte) (int, error) {
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.Write(p)
}
func (b *safeBuffer) String() string {
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.String()
}
func TestCLITerminalProxy_HandshakeRejectedOnTerminalError(t *testing.T) {
fs := newFakeServer(t)
fs.sendOpenErr = &protocol.TerminalErrorPayload{
Code: protocol.TerminalErrorCodeTaskNotFound,
Message: "no agent task on this issue",
}
defer fs.close()
wsURL := strings.Replace(fs.baseURL(), "http://", "ws://", 1) + "/"
dialer := *websocket.DefaultDialer
conn, _, err := dialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial: %v", err)
}
cmd := newTestCmd()
p := newCLITerminalProxy(conn, strings.NewReader(""), io.Discard, io.Discard, "mul_test", cmd)
err = p.handshake()
if err == nil {
t.Fatal("expected handshake error, got nil")
}
if !strings.Contains(err.Error(), protocol.TerminalErrorCodeTaskNotFound) {
t.Errorf("error %q does not mention error code", err)
}
}
func TestCLITerminalProxy_AuthRejected(t *testing.T) {
upgrader := websocket.Upgrader{}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
// Read auth frame, reply with error.
_, _, _ = conn.ReadMessage()
_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"error":"invalid token"}`))
}))
defer server.Close()
wsURL := strings.Replace(server.URL, "http://", "ws://", 1) + "/"
dialer := *websocket.DefaultDialer
conn, _, err := dialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial: %v", err)
}
cmd := newTestCmd()
p := newCLITerminalProxy(conn, strings.NewReader(""), io.Discard, io.Discard, "mul_test", cmd)
err = p.handshake()
if err == nil {
t.Fatal("expected handshake error, got nil")
}
if !strings.Contains(err.Error(), "invalid token") {
t.Errorf("error %q does not surface server reason", err)
}
}
func TestCLITerminalProxy_TerminalExitDeliversCode(t *testing.T) {
// Driver: open server, advance through handshake, then push a
// terminal.exit frame and verify the proxy's exit code state.
fs := newFakeServer(t)
defer fs.close()
wsURL := strings.Replace(fs.baseURL(), "http://", "ws://", 1) + "/"
dialer := *websocket.DefaultDialer
conn, _, err := dialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial: %v", err)
}
cmd := newTestCmd()
p := newCLITerminalProxy(conn, strings.NewReader(""), io.Discard, io.Discard, "mul_test", cmd)
if err := p.handshake(); err != nil {
t.Fatalf("handshake: %v", err)
}
exitFrame, _ := marshalCLITerminalFrame(protocol.MessageTypeTerminalExit, protocol.TerminalExitPayload{
SessionID: fs.sessionID,
ExitCode: 42,
Reason: "child exited",
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
p.readPump()
}()
if err := fs.writeFrame(exitFrame); err != nil {
t.Fatalf("server write exit: %v", err)
}
doneAt := time.Now().Add(2 * time.Second)
for time.Now().Before(doneAt) {
if p.exitCode.Load() == 42 {
break
}
time.Sleep(10 * time.Millisecond)
}
wg.Wait()
if got := p.exitCode.Load(); got != 42 {
t.Fatalf("exit code = %d, want 42", got)
}
msgPtr := p.exitMsg.Load()
if msgPtr == nil || !strings.Contains(*msgPtr, "exit code 42") {
got := ""
if msgPtr != nil {
got = *msgPtr
}
t.Errorf("exit msg = %q", got)
}
}
// Round-trip check: the marshaled frame must decode through the real
// protocol.Message envelope. Guards against drift if the protocol package
// changes the envelope or payload types.
func TestMarshalCLITerminalFrame_EnvelopeShape(t *testing.T) {
frame, err := marshalCLITerminalFrame(protocol.MessageTypeTerminalResize, protocol.TerminalResizePayload{
SessionID: "sid",
Cols: 100,
Rows: 30,
})
if err != nil {
t.Fatal(err)
}
var env protocol.Message
if err := json.Unmarshal(frame, &env); err != nil {
t.Fatal(err)
}
if env.Type != protocol.MessageTypeTerminalResize {
t.Fatalf("type = %q", env.Type)
}
var pl protocol.TerminalResizePayload
if err := json.Unmarshal(env.Payload, &pl); err != nil {
t.Fatal(err)
}
if pl.Cols != 100 || pl.Rows != 30 || pl.SessionID != "sid" {
t.Fatalf("payload = %+v", pl)
}
}
// Sanity check the help string does not crash on a zero escape byte.
func TestEscapeHelpString(t *testing.T) {
if got := escapeHelpString(0); got != "(disabled)" {
t.Errorf("escape disabled hint = %q", got)
}
if got := escapeHelpString('~'); !strings.Contains(got, "~") {
t.Errorf("escape help = %q", got)
}
}
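
For readers without the protocol package at hand, the envelope round trip exercised by these tests can be sketched standalone. Everything below is an assumption made for illustration: `envelope` and `dataPayload` are local stand-ins for `protocol.Message` and `protocol.TerminalDataPayload`, and the `session_id` / `data_b64` JSON tags and the `"terminal.data"` type string are guesses, not confirmed by this diff.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// envelope is a local stand-in for protocol.Message (assumed shape).
type envelope struct {
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload"`
}

// dataPayload is a local stand-in for protocol.TerminalDataPayload.
// The JSON tags are hypothetical.
type dataPayload struct {
	SessionID string `json:"session_id"`
	DataB64   string `json:"data_b64"`
}

// encodeData wraps raw terminal bytes in a base64 payload and then in the
// typed envelope, using the same two-step marshal as marshalCLITerminalFrame.
func encodeData(sessionID string, data []byte) ([]byte, error) {
	raw, err := json.Marshal(dataPayload{
		SessionID: sessionID,
		DataB64:   base64.StdEncoding.EncodeToString(data),
	})
	if err != nil {
		return nil, err
	}
	return json.Marshal(envelope{Type: "terminal.data", Payload: raw})
}

// decodeData reverses encodeData: envelope first, then payload, then base64.
func decodeData(frame []byte) (string, []byte, error) {
	var env envelope
	if err := json.Unmarshal(frame, &env); err != nil {
		return "", nil, err
	}
	var pl dataPayload
	if err := json.Unmarshal(env.Payload, &pl); err != nil {
		return "", nil, err
	}
	data, err := base64.StdEncoding.DecodeString(pl.DataB64)
	return pl.SessionID, data, err
}

func main() {
	frame, err := encodeData("session-xyz", []byte("ls\r"))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(frame))

	sid, data, err := decodeData(frame)
	if err != nil {
		panic(err)
	}
	fmt.Printf("session=%s data=%q\n", sid, data)
}
```

Carrying the payload as `json.RawMessage` lets a reader switch on the envelope type before committing to a payload struct, which is how `readPump` and the fake server dispatch frames.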

@@ -0,0 +1,40 @@
//go:build !windows
package main
import (
"os"
"os/signal"
"syscall"
"golang.org/x/term"
)
// startResizeWatcher installs a SIGWINCH handler that pushes the new local
// terminal size to the daemon every time the user resizes their window.
// Returns a stop function that uninstalls the handler and exits the
// goroutine. On platforms without SIGWINCH (Windows) the windows-tagged
// implementation polls term.GetSize on a timer instead.
func startResizeWatcher(p *cliTerminalProxy) func() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGWINCH)
stop := make(chan struct{})
go func() {
for {
select {
case <-ch:
if c, r, err := term.GetSize(int(os.Stdout.Fd())); err == nil && c > 0 && r > 0 {
_ = p.sendResize(uint16(c), uint16(r))
}
case <-stop:
return
}
}
}()
return func() {
signal.Stop(ch)
close(stop)
}
}

@@ -0,0 +1,39 @@
//go:build windows
package main
import (
"os"
"time"
"golang.org/x/term"
)
// startResizeWatcher polls the local terminal size on a timer, since
// Windows has no SIGWINCH equivalent that is reliable for console resize
// events. 500ms is a compromise between responsiveness and CPU cost.
func startResizeWatcher(p *cliTerminalProxy) func() {
stop := make(chan struct{})
go func() {
var lastC, lastR int
t := time.NewTicker(500 * time.Millisecond)
defer t.Stop()
for {
select {
case <-stop:
return
case <-t.C:
c, r, err := term.GetSize(int(os.Stdout.Fd()))
if err != nil || c <= 0 || r <= 0 {
continue
}
if c == lastC && r == lastR {
continue
}
lastC, lastR = c, r
_ = p.sendResize(uint16(c), uint16(r))
}
}
}()
return func() { close(stop) }
}

@@ -197,6 +197,13 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
realtime.HandleWebSocket(hub, mc, pr, slugResolver, w, r)
})
// Terminal proxy WebSocket: browser ↔ server ↔ daemonws hub ↔ daemon PTY.
// Auth is cookie-or-first-frame (browsers can't set Authorization on a
// WS upgrade), matching the /ws pattern above. Workspace + issue
// membership is enforced inside the handler before the upgrade so a
// 403 surfaces as an HTTP response rather than a silent WS close.
r.Get("/ws/issues/{issue_id}/terminal", h.HandleIssueTerminalWS)
// Local file serving (when using local storage)
if local, ok := store.(*storage.LocalStorage); ok {
r.Get("/uploads/*", func(w http.ResponseWriter, r *http.Request) {
@@ -352,6 +359,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
r.Post("/tasks/{taskId}/cancel", h.CancelTask)
r.Post("/rerun", h.RerunIssue)
r.Get("/task-runs", h.ListTasksByIssue)
r.Get("/terminal-sessions", h.ListTerminalSessionsByIssue)
r.Get("/usage", h.GetIssueUsage)
r.Post("/reactions", h.AddIssueReaction)
r.Delete("/reactions", h.RemoveIssueReaction)

@@ -8,6 +8,7 @@ require (
github.com/aws/aws-sdk-go-v2/credentials v1.19.13
github.com/aws/aws-sdk-go-v2/service/s3 v1.97.3
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.5
github.com/creack/pty v1.1.21
github.com/go-chi/chi/v5 v5.2.5
github.com/go-chi/cors v1.2.2
github.com/golang-jwt/jwt/v5 v5.3.1
@@ -23,6 +24,7 @@ require (
github.com/resend/resend-go/v2 v2.28.0
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cobra v1.10.2
golang.org/x/term v0.43.0
)
require (
@@ -57,7 +59,7 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/sync v0.20.0 // indirect
-golang.org/x/sys v0.35.0 // indirect
+golang.org/x/sys v0.44.0 // indirect
golang.org/x/text v0.35.0 // indirect
google.golang.org/protobuf v1.36.8 // indirect
)

@@ -48,6 +48,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.21 h1:1/QdRyBaHHJP61QkWMXlOIBfsgdDeeKfK8SYVUWJKf0=
github.com/creack/pty v1.1.21/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -135,8 +137,10 @@ go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
-golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
-golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
+golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.43.0 h1:S4RLU2sB31O/NCl+zFN9Aru9A/Cq2aqKpTZJ6B+DwT4=
golang.org/x/term v0.43.0/go.mod h1:lrhlHNdQJHO+1qVYiHfFKVuVioJIheAc3fBSMFYEIsk=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=

@@ -133,7 +133,12 @@ func (c *APIClient) GetJSON(ctx context.Context, path string, out any) error {
if resp.StatusCode >= 400 {
data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
-return fmt.Errorf("GET %s returned %d: %s", path, resp.StatusCode, strings.TrimSpace(string(data)))
+return &HTTPError{
+Method: http.MethodGet,
+Path: path,
+StatusCode: resp.StatusCode,
+Body: strings.TrimSpace(string(data)),
+}
}
if out == nil {
return nil
@@ -159,7 +164,12 @@ func (c *APIClient) GetJSONWithHeaders(ctx context.Context, path string, out any
if resp.StatusCode >= 400 {
data, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
-return nil, fmt.Errorf("GET %s returned %d: %s", path, resp.StatusCode, strings.TrimSpace(string(data)))
+return nil, &HTTPError{
+Method: http.MethodGet,
+Path: path,
+StatusCode: resp.StatusCode,
+Body: strings.TrimSpace(string(data)),
+}
}
if out != nil {
if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
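
Returning a typed error from the GET helpers lets callers branch on the status code with `errors.As`. The sketch below shows one way a caller might treat 404 differently from other failures. The `HTTPError` type here is a local stand-in with the four fields visible in this hunk; its `Error()` format, the `isNotFound` helper, and the request path are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// HTTPError is a local stand-in carrying the fields set in the hunk above.
// The message format of Error() is an assumption.
type HTTPError struct {
	Method     string
	Path       string
	StatusCode int
	Body       string
}

func (e *HTTPError) Error() string {
	return fmt.Sprintf("%s %s returned %d: %s", e.Method, e.Path, e.StatusCode, e.Body)
}

// isNotFound (hypothetical helper) reports whether err is, or wraps, an
// *HTTPError with status 404.
func isNotFound(err error) bool {
	var he *HTTPError
	return errors.As(err, &he) && he.StatusCode == http.StatusNotFound
}

func main() {
	// A 404 wrapped with %w is still found by errors.As.
	missing := fmt.Errorf("list terminal sessions: %w", &HTTPError{
		Method:     http.MethodGet,
		Path:       "/example/terminal-sessions", // hypothetical path
		StatusCode: http.StatusNotFound,
		Body:       "not found",
	})
	broken := &HTTPError{Method: http.MethodGet, Path: "/example", StatusCode: http.StatusInternalServerError}

	for _, err := range []error{missing, broken} {
		if isNotFound(err) {
			fmt.Println("endpoint missing, skipping terminal rows")
			continue
		}
		fmt.Println("warning:", err)
	}
}
```

Only the 404 case is treated as "feature not available"; every other status still reaches the user as a warning instead of being silently dropped.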

server/internal/cli/ws.go

@@ -0,0 +1,97 @@
package cli
import (
"context"
"fmt"
"net/http"
"net/url"
"strings"
"github.com/gorilla/websocket"
)
// DialWebSocket opens a WebSocket connection to the server at the given path
// + query string. The path must start with "/". Auth is intentionally NOT
// sent as a header here: the server's terminal endpoint runs WS upgrade
// before applying header-based auth middleware (browsers cannot set
// Authorization on a WS upgrade), so the caller authenticates via the
// first-frame `auth` message instead. The standard X-Workspace-ID /
// X-Client-* identity headers are still attached so dashboards can attribute
// the connection to the right CLI build.
func (c *APIClient) DialWebSocket(ctx context.Context, pathAndQuery string) (*websocket.Conn, *http.Response, error) {
if c.BaseURL == "" {
return nil, nil, fmt.Errorf("APIClient has no BaseURL")
}
wsURL, err := httpToWSURL(c.BaseURL, pathAndQuery)
if err != nil {
return nil, nil, err
}
header := http.Header{}
c.setWSHeaders(header)
dialer := *websocket.DefaultDialer
conn, resp, err := dialer.DialContext(ctx, wsURL, header)
if err != nil {
return nil, resp, err
}
return conn, resp, nil
}
// setWSHeaders attaches identity headers but deliberately omits the
// Authorization header. Auth happens in-band via the first frame so this
// stays consistent with cookie-based browser clients.
func (c *APIClient) setWSHeaders(h http.Header) {
if c.WorkspaceID != "" {
h.Set("X-Workspace-ID", c.WorkspaceID)
}
platform := c.Platform
if platform == "" {
platform = ClientPlatform
}
if platform != "" {
h.Set("X-Client-Platform", platform)
}
version := c.Version
if version == "" {
version = ClientVersion
}
if version != "" {
h.Set("X-Client-Version", version)
}
osName := c.OS
if osName == "" {
osName = ClientOS
}
if osName != "" {
h.Set("X-Client-OS", osName)
}
}
func httpToWSURL(baseURL, pathAndQuery string) (string, error) {
u, err := url.Parse(baseURL)
if err != nil {
return "", fmt.Errorf("parse base URL: %w", err)
}
switch strings.ToLower(u.Scheme) {
case "http":
u.Scheme = "ws"
case "https":
u.Scheme = "wss"
case "ws", "wss":
// already WS
default:
return "", fmt.Errorf("unsupported base URL scheme %q", u.Scheme)
}
if !strings.HasPrefix(pathAndQuery, "/") {
return "", fmt.Errorf("path must start with /, got %q", pathAndQuery)
}
suffix, err := url.Parse(pathAndQuery)
if err != nil {
return "", fmt.Errorf("parse path/query: %w", err)
}
u.Path = strings.TrimRight(u.Path, "/") + suffix.Path
u.RawQuery = suffix.RawQuery
u.Fragment = ""
return u.String(), nil
}
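
Note on the path-joining rule in httpToWSURL: it is easiest to see with a base URL that carries its own path prefix, a case the table test for this function does not cover. The following is a minimal sketch using only the standard library. `toWSURL` is a condensed, hypothetical stand-in for `httpToWSURL` (it drops the ws/wss passthrough and the leading-slash check), and the host name and `/base/` prefix are placeholders.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// toWSURL is a condensed stand-in for httpToWSURL: swap the scheme, then
// append the request path to whatever path prefix the base URL carries.
func toWSURL(baseURL, pathAndQuery string) (string, error) {
	u, err := url.Parse(baseURL)
	if err != nil {
		return "", fmt.Errorf("parse base URL: %w", err)
	}
	switch strings.ToLower(u.Scheme) {
	case "http":
		u.Scheme = "ws"
	case "https":
		u.Scheme = "wss"
	default:
		return "", fmt.Errorf("unsupported base URL scheme %q", u.Scheme)
	}
	suffix, err := url.Parse(pathAndQuery)
	if err != nil {
		return "", fmt.Errorf("parse path/query: %w", err)
	}
	// Trim the base's trailing slash so "/base/" + "/ws/x" yields "/base/ws/x".
	u.Path = strings.TrimRight(u.Path, "/") + suffix.Path
	u.RawQuery = suffix.RawQuery
	u.Fragment = ""
	return u.String(), nil
}

func main() {
	got, err := toWSURL("https://api.example.com/base/", "/ws/x?cols=80")
	if err != nil {
		panic(err)
	}
	fmt.Println(got) // wss://api.example.com/base/ws/x?cols=80
}
```

The query string always comes from the request path, so any query on the base URL is discarded by design in this sketch, as it is in the function above.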


@@ -0,0 +1,125 @@
package cli
import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/gorilla/websocket"
)
func TestHTTPToWSURL(t *testing.T) {
cases := []struct {
name string
base string
path string
want string
wantErr bool
}{
{
name: "https → wss",
base: "https://api.example.com",
path: "/ws/issues/abc/terminal?workspace_id=ws1&cols=80",
want: "wss://api.example.com/ws/issues/abc/terminal?workspace_id=ws1&cols=80",
},
{
name: "http → ws",
base: "http://localhost:8080",
path: "/ws/issues/x/terminal",
want: "ws://localhost:8080/ws/issues/x/terminal",
},
{
name: "wss left alone",
base: "wss://api.example.com",
path: "/ws",
want: "wss://api.example.com/ws",
},
{
name: "trailing slash on base does not double up",
base: "https://api.example.com/",
path: "/ws/x",
want: "wss://api.example.com/ws/x",
},
{
name: "missing leading slash on path",
base: "https://api.example.com",
path: "ws/x",
wantErr: true,
},
{
name: "unsupported scheme",
base: "ftp://example.com",
path: "/ws",
wantErr: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got, err := httpToWSURL(tc.base, tc.path)
if tc.wantErr {
if err == nil {
t.Fatalf("expected error, got %q", got)
}
return
}
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got != tc.want {
t.Fatalf("got %q want %q", got, tc.want)
}
})
}
}
func TestDialWebSocketAttachesIdentityHeaders(t *testing.T) {
upgrader := websocket.Upgrader{}
gotHeaders := make(chan http.Header, 1)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotHeaders <- r.Header.Clone()
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
conn.Close()
}))
defer server.Close()
client := NewAPIClient(server.URL, "ws-uuid", "mul_test_token")
client.Platform = "cli"
client.Version = "1.2.3"
client.OS = "macos"
conn, _, err := client.DialWebSocket(context.Background(), "/ws")
if err != nil {
t.Fatalf("dial: %v", err)
}
defer conn.Close()
headers := <-gotHeaders
if got := headers.Get("X-Workspace-ID"); got != "ws-uuid" {
t.Errorf("X-Workspace-ID = %q, want ws-uuid", got)
}
if got := headers.Get("X-Client-Platform"); got != "cli" {
t.Errorf("X-Client-Platform = %q, want cli", got)
}
if got := headers.Get("X-Client-Version"); got != "1.2.3" {
t.Errorf("X-Client-Version = %q, want 1.2.3", got)
}
if got := headers.Get("X-Client-OS"); got != "macos" {
t.Errorf("X-Client-OS = %q, want macos", got)
}
if got := headers.Get("Authorization"); got != "" {
// The server's terminal endpoint runs WS upgrade before any header
// auth middleware, so the CLI must authenticate via the first frame
// to match cookie-based browser clients. Sending a Bearer header
// here would silently work in some setups and silently fail in
// others — keep it consistent and absent.
t.Errorf("Authorization header should NOT be set on WS dial, got %q", got)
}
if got := headers.Get("Sec-WebSocket-Key"); strings.TrimSpace(got) == "" {
t.Errorf("Sec-WebSocket-Key missing")
}
}


@@ -18,6 +18,7 @@ import (
"github.com/multica-ai/multica/server/internal/cli"
"github.com/multica-ai/multica/server/internal/daemon/execenv"
"github.com/multica-ai/multica/server/internal/daemon/repocache"
"github.com/multica-ai/multica/server/internal/daemon/terminal"
"github.com/multica-ai/multica/server/pkg/agent"
)
@@ -138,6 +139,24 @@ type Daemon struct {
// deleted bare clone and an unrelated `not empty` cleanup failure.
bgSyncs sync.WaitGroup
// terminalManager owns every live PTY this daemon hosts on behalf of
// users opening the Issue → Terminal panel. Constructed in New() so
// tests can override the manager config via cfg fields if needed
// without forcing the production daemon to wire it lazily.
terminalManager *terminal.Manager
// terminalBridge mediates between the WS hub (server-side) and the
// terminalManager. Only non-nil while a daemonws connection is up; the
// wakeup loop swaps in a fresh bridge on every reconnect because
// session_ids cannot survive a hub disconnect.
terminalBridgeMu sync.RWMutex
terminalBridge *terminalBridge
// wsWritesMu guards wsWrites. wsWrites is the active daemonws writer
// queue (nil while disconnected); the terminalBridge funnels every
// outbound terminal.* frame through Daemon.sendWSFrame, which reads
// this pointer under the lock so reconnects don't race the bridge.
wsWritesMu sync.RWMutex
wsWrites chan<- []byte
runner taskRunner // executes agent tasks; set to d.runTask by New(), overridable in tests
cancelPollInterval time.Duration // how often handleTask polls for server-side cancellation; overridable in tests
// runUpdateFn executes the brew-or-download upgrade. Set to d.runUpdate by
@@ -171,9 +190,113 @@ func New(cfg Config, logger *slog.Logger) *Daemon {
}
d.runner = taskRunnerFunc(d.runTask)
d.runUpdateFn = d.runUpdate
// The terminal manager has no TaskLookup wired: every Open call goes
// through OpenWithInfo using a TaskInfo that the server resolved from
// the DB before forwarding terminal.open over daemonws (the daemon
// does not maintain a persistent task cache).
d.terminalManager = terminal.NewManager(terminal.ManagerConfig{
IdleTimeout: terminal.DefaultIdleTimeout,
Logger: logger,
OnSessionStart: d.onTerminalSessionStart,
OnSessionStop: d.onTerminalSessionStop,
}, nil)
return d
}
// sendWSFrame pushes a raw frame onto the current daemonws writer queue
// without blocking. Returns false when no connection is active or the
// writer queue is saturated. Used for *droppable* traffic (currently the
// heartbeat sender's fallback path): bytes lost here are recoverable on
// the next tick. PTY output must NOT use this path — it goes through
// sendWSFrameCtx so a saturated writer back-pressures the producer
// instead of corrupting the terminal stream.
func (d *Daemon) sendWSFrame(frame []byte) bool {
d.wsWritesMu.RLock()
writes := d.wsWrites
d.wsWritesMu.RUnlock()
if writes == nil {
return false
}
select {
case writes <- frame:
return true
default:
return false
}
}
// sendWSFrameCtx pushes a frame onto the daemonws writer queue and BLOCKS
// until either the queue accepts it or ctx is canceled. Returns true when
// the frame was queued, false when no writer is active or ctx fired first.
//
// This is the real-backpressure path for the terminal bridge: when the
// hub is saturated, slowing the PTY reader down (which slows the child
// process via its own stdout buffer) is the only way to preserve the byte
// stream. Dropping silently would print partial output to the user.
//
// Safety: the caller's ctx is the terminal pump's ctx, which the bridge
// teardown (clearWSWrites → bridge.closeAll) cancels and *waits on* before
// close(writes) runs in the wakeup loop. That ordering means we never send
// on a closed channel even though we don't hold any lock around the send.
func (d *Daemon) sendWSFrameCtx(ctx context.Context, frame []byte) bool {
d.wsWritesMu.RLock()
writes := d.wsWrites
d.wsWritesMu.RUnlock()
if writes == nil {
return false
}
select {
case writes <- frame:
return true
case <-ctx.Done():
return false
}
}
// installWSWrites stores the connection-local writer queue so the bridge
// can address it through Daemon.sendWSFrame. A fresh terminalBridge is
// installed each call: session_ids minted on a previous WS connection are
// not valid on the new one (the server-side proxy registered routing
// against the old hub client), so it's cleaner to tear every PTY down
// than to half-revive them.
func (d *Daemon) installWSWrites(writes chan<- []byte) {
d.wsWritesMu.Lock()
d.wsWrites = writes
d.wsWritesMu.Unlock()
bridge := newTerminalBridge(d.terminalManager, d.logger, d.sendWSFrame, d.sendWSFrameCtx)
d.terminalBridgeMu.Lock()
prev := d.terminalBridge
d.terminalBridge = bridge
d.terminalBridgeMu.Unlock()
if prev != nil {
prev.closeAll("ws_reconnect")
}
}
// clearWSWrites removes the writer pointer and tears down every live
// terminal session bound to this connection. Called from the wakeup
// connection's deferred cleanup.
func (d *Daemon) clearWSWrites() {
d.wsWritesMu.Lock()
d.wsWrites = nil
d.wsWritesMu.Unlock()
d.terminalBridgeMu.Lock()
bridge := d.terminalBridge
d.terminalBridge = nil
d.terminalBridgeMu.Unlock()
if bridge != nil {
bridge.closeAll("ws_disconnect")
}
}
func (d *Daemon) currentTerminalBridge() *terminalBridge {
d.terminalBridgeMu.RLock()
defer d.terminalBridgeMu.RUnlock()
return d.terminalBridge
}
// setAgentVersion records the detected CLI version for an agent provider so
// later task-dispatch code (e.g. Codex sandbox policy) can read it.
func (d *Daemon) setAgentVersion(provider, version string) {
@@ -2987,6 +3110,46 @@ func (d *Daemon) isActiveEnvRoot(envRoot string) bool {
return d.activeEnvRoots[envRoot] > 0
}
// onTerminalSessionStart is the terminal.Manager OnSessionStart hook. It
// reference-counts the session's env root into activeEnvRoots so the GC
// loop's isActiveEnvRoot check protects the workdir while a terminal is
// attached. Without this, an idle terminal on a done/cancelled issue
// would have its workdir reclaimed out from under the user on the next
// GC cycle (the issue's TTL alone doesn't notice live terminal activity).
//
// Audit log is emitted here as a structured slog record so operators can
// reconstruct who attached to which workdir when, without surfacing
// keystrokes — see RFC §Auth.
func (d *Daemon) onTerminalSessionStart(s *terminal.PtySession) {
envRoot := filepath.Dir(s.WorkDir())
d.markActiveEnvRoot(envRoot)
d.logger.Info("terminal: session opened",
"session_id", s.ID(),
"task_id", s.TaskID(),
"workspace_id", s.WorkspaceID(),
"issue_id", s.IssueID(),
"user_id", s.UserID(),
"work_dir", s.WorkDir(),
"shell", s.Shell(),
)
}
// onTerminalSessionStop is the terminal.Manager OnSessionStop hook. Pairs
// with onTerminalSessionStart's mark and emits the close audit record
// (with duration) for operator visibility.
func (d *Daemon) onTerminalSessionStop(s *terminal.PtySession) {
envRoot := filepath.Dir(s.WorkDir())
d.unmarkActiveEnvRoot(envRoot)
d.logger.Info("terminal: session closed",
"session_id", s.ID(),
"task_id", s.TaskID(),
"workspace_id", s.WorkspaceID(),
"issue_id", s.IssueID(),
"user_id", s.UserID(),
"duration", time.Since(s.StartedAt()).Round(time.Second).String(),
)
}
// shortID returns the first 8 characters of an ID for readable logs.
func shortID(id string) string {
if len(id) <= 8 {


@@ -0,0 +1,14 @@
// Package terminal manages interactive PTY sessions bound to a task's
// workdir on the local daemon.
//
// A Manager owns the lifecycle of all live PtySessions. Callers (the
// daemonws bridge today, the CLI socket later) translate WebSocket
// terminal.* frames into method calls on Manager, and the Manager
// forwards PTY output back through a per-session Output channel.
//
// Sessions run with the daemon process's identity — there is no
// additional sandbox. This is the same trust boundary as agent runs
// (which are also daemon-spawned child processes). The Manager only
// enforces that the requesting client's workspace matches the task's
// workspace; anything beyond that is the OS's responsibility.
package terminal


@@ -0,0 +1,14 @@
package terminal
import "errors"
// Sentinel errors returned by Manager. Callers map these to the
// protocol.TerminalErrorCode* constants when reporting to clients.
var (
ErrTaskNotFound = errors.New("terminal: task not found")
ErrWorkspaceMismatch = errors.New("terminal: task belongs to a different workspace")
ErrSessionNotFound = errors.New("terminal: session not found")
ErrUnsupportedOS = errors.New("terminal: PTY not supported on this OS")
ErrSpawnFailed = errors.New("terminal: failed to spawn shell")
ErrManagerClosed = errors.New("terminal: manager is shut down")
)


@@ -0,0 +1,371 @@
package terminal
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"github.com/google/uuid"
)
// DefaultIdleTimeout is the recommended IdleTimeout for production
// daemon wiring. Callers must set ManagerConfig.IdleTimeout to this
// (or any positive duration) explicitly; zero/negative disables the
// idle sweep.
const DefaultIdleTimeout = 60 * time.Minute
// TaskInfo is the subset of task state the Manager needs to set up a PTY.
// Open resolves a TaskID into TaskInfo via TaskLookup; OpenWithInfo takes
// a TaskInfo the caller has already resolved.
type TaskInfo struct {
TaskID string
WorkspaceID string
IssueID string
WorkDir string
PriorSessionID string // injected as CLAUDE_SESSION_ID for `claude --resume`
}
// TaskLookup resolves a TaskID into the workdir + workspace required to
// open a PTY. Returns ErrTaskNotFound when the task is unknown. The
// production daemon wires no lookup (it opens sessions via OpenWithInfo
// with server-resolved TaskInfo); tests supply a stub.
type TaskLookup func(ctx context.Context, taskID string) (TaskInfo, error)
// OpenParams is the input to Manager.Open.
type OpenParams struct {
// TaskID identifies the workdir the PTY should run in.
TaskID string
// WorkspaceID is the workspace the caller is acting on behalf of.
// Open rejects the request if it does not match the task's workspace
// (cross-workspace ACL — clients never see other workspaces' workdirs).
WorkspaceID string
// UserID is the human user who opened the terminal. Logged in audit
// records; the PTY itself runs as the daemon process owner.
UserID string
// Cols/Rows seed the initial PTY window size. Zero values default to 80x24.
Cols uint16
Rows uint16
}
// ManagerConfig tunes Manager behaviour. Zero values are sensible defaults.
type ManagerConfig struct {
// Shell to spawn for each session. Defaults to "bash" with "-l".
// Overridable for tests; the production daemon hardcodes bash for now
// (RFC open question #4 — shell selection deferred to a later release).
ShellPath string
ShellArgs []string
// IdleTimeout closes a session that has had no I/O for this long.
// Zero or negative disables the sweep entirely. Production daemon
// wiring should pass DefaultIdleTimeout explicitly; we intentionally
// don't default here so callers stay in control (the docs page for
// this package previously said "0 disables" while NewManager silently
// rewrote 0 to 60min — those two have to agree).
IdleTimeout time.Duration
// Spawner overrides PTY spawning. Defaults to ptyStartShell which
// shells out to creack/pty. Tests inject a fake to avoid forking.
Spawner Spawner
// Now returns the current time. Defaults to time.Now. Tests inject a
// fake clock to drive IdleTimeout deterministically.
Now func() time.Time
// Logger receives operational events. Defaults to slog.Default().
Logger *slog.Logger
// OnSessionStart fires synchronously after a PTY has spawned and the
// session is registered, but before the session's goroutines start, so
// it always runs before OnSessionStop. Wired by the daemon to mark the
// env root as active so the GC loop's isActiveEnvRoot check protects
// the workdir for as long as a terminal is attached. Without this, a
// long-idle terminal on a done/cancelled issue would have its workdir
// reclaimed out from under the user. Called on the goroutine that
// called Open or OpenWithInfo.
OnSessionStart func(s *PtySession)
// OnSessionStop fires from waitLoop once the session's output is closed
// and the session is deregistered, immediately before Done is closed.
// Daemons that reference-counted in OnSessionStart unmark here. Called
// exactly once per started session, regardless of close reason.
OnSessionStop func(s *PtySession)
}
// Manager owns all live PtySessions on this daemon. It is safe for
// concurrent use.
type Manager struct {
cfg ManagerConfig
lookup TaskLookup
mu sync.Mutex
sessions map[string]*PtySession
closed bool
// closeDone is closed by the first Close() caller AFTER finalize
// finishes (every session deregistered, Done() closed). Subsequent
// concurrent callers wait on it instead of racing past, so all
// Close() returns share the same "manager fully drained" guarantee.
closeDone chan struct{}
}
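// NewManager constructs a Manager. lookup may be nil when every session
// is opened through OpenWithInfo (the production daemon wiring) or in
// tests that only exercise direct session APIs; Open then returns an error.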
func NewManager(cfg ManagerConfig, lookup TaskLookup) *Manager {
if cfg.ShellPath == "" {
cfg.ShellPath = "bash"
cfg.ShellArgs = []string{"-l"}
}
// IdleTimeout intentionally not defaulted — see ManagerConfig.
if cfg.Spawner == nil {
cfg.Spawner = realSpawner{}
}
if cfg.Now == nil {
cfg.Now = time.Now
}
if cfg.Logger == nil {
cfg.Logger = slog.Default()
}
return &Manager{
cfg: cfg,
lookup: lookup,
sessions: make(map[string]*PtySession),
closeDone: make(chan struct{}),
}
}
// Open spawns a new PTY session for the given task. The returned
// session is also registered with the manager and retrievable via Get.
func (m *Manager) Open(ctx context.Context, p OpenParams) (*PtySession, error) {
if m.lookup == nil {
return nil, fmt.Errorf("terminal: Manager has no TaskLookup configured")
}
info, err := m.lookup(ctx, p.TaskID)
if err != nil {
return nil, err
}
if info.WorkspaceID != p.WorkspaceID {
return nil, ErrWorkspaceMismatch
}
if info.WorkDir == "" {
return nil, ErrTaskNotFound
}
return m.openWith(info, p)
}
// OpenWithInfo spawns a PTY against a caller-supplied TaskInfo, bypassing
// the configured TaskLookup. Production daemon wiring takes this path
// because the server resolves task metadata from its DB before forwarding
// terminal.open over the daemonws hub — the daemon trusts the server-
// authenticated payload and does not have its own task cache. The
// workspace mismatch check still runs so a server bug or a relayed frame
// cannot cross workspace boundaries.
func (m *Manager) OpenWithInfo(_ context.Context, info TaskInfo, p OpenParams) (*PtySession, error) {
if info.WorkspaceID != p.WorkspaceID {
return nil, ErrWorkspaceMismatch
}
if info.WorkDir == "" {
return nil, ErrTaskNotFound
}
if info.TaskID == "" {
info.TaskID = p.TaskID
}
return m.openWith(info, p)
}
func (m *Manager) openWith(info TaskInfo, p OpenParams) (*PtySession, error) {
cols, rows := normalizeSize(p.Cols, p.Rows)
env := buildEnv(info, p.UserID)
m.mu.Lock()
if m.closed {
m.mu.Unlock()
return nil, ErrManagerClosed
}
m.mu.Unlock()
startedAt := m.cfg.Now()
pty, err := m.cfg.Spawner.Start(SpawnRequest{
Shell: m.cfg.ShellPath,
Args: m.cfg.ShellArgs,
Cwd: info.WorkDir,
Env: env,
Cols: cols,
Rows: rows,
Started: startedAt,
})
if err != nil {
// Double-%w so errors.Is matches both ErrSpawnFailed AND any
// sentinel the spawner surfaced (notably ErrUnsupportedOS from
// the windows stub — the protocol layer needs to distinguish
// "no PTY on this OS" from generic spawn failures).
return nil, fmt.Errorf("%w: %w", ErrSpawnFailed, err)
}
sess := &PtySession{
id: uuid.NewString(),
taskID: info.TaskID,
workspaceID: info.WorkspaceID,
issueID: info.IssueID,
workDir: info.WorkDir,
userID: p.UserID,
shellPath: m.cfg.ShellPath,
cols: cols,
rows: rows,
pty: pty,
output: make(chan []byte, 64),
exit: make(chan ExitInfo, 1),
done: make(chan struct{}),
stop: make(chan struct{}),
now: m.cfg.Now,
idleTimeout: m.cfg.IdleTimeout,
startedAt: startedAt,
lastIO: startedAt,
logger: m.cfg.Logger.With("session_id_pending", true, "task_id", info.TaskID),
onClose: func(id string) { m.deregister(id) },
}
sess.logger = m.cfg.Logger.With("session_id", sess.id, "task_id", info.TaskID)
m.mu.Lock()
if m.closed {
m.mu.Unlock()
_ = pty.Close()
// We lost that race: spawn succeeded but the manager closed before
// we could register the session, so waitLoop never runs. Reap the
// child synchronously here — pty.Close fires SIGHUP/SIGKILL but
// only Wait() collects the exit status, otherwise the unix child
// stays around as a zombie until the daemon process dies.
_, _ = pty.Wait()
return nil, ErrManagerClosed
}
m.sessions[sess.id] = sess
m.mu.Unlock()
sess.onStop = m.cfg.OnSessionStop
// OnSessionStart MUST fire before sess.start(). waitLoop calls
// OnSessionStop when the child exits — if the child exits before
// OnSessionStart runs, daemon would unmark an env root it never
// marked, then mark it forever after OnSessionStart races in. The
// "start hook happens-before stop hook" contract is what makes the
// daemon's markActiveEnvRoot / unmark pair balanced.
if m.cfg.OnSessionStart != nil {
m.cfg.OnSessionStart(sess)
}
sess.start()
return sess, nil
}
// Get returns the session with the given id, or ErrSessionNotFound.
func (m *Manager) Get(id string) (*PtySession, error) {
m.mu.Lock()
defer m.mu.Unlock()
sess, ok := m.sessions[id]
if !ok {
return nil, ErrSessionNotFound
}
return sess, nil
}
// Sessions returns a snapshot of currently registered session IDs.
func (m *Manager) Sessions() []string {
m.mu.Lock()
defer m.mu.Unlock()
ids := make([]string, 0, len(m.sessions))
for id := range m.sessions {
ids = append(ids, id)
}
return ids
}
// Close tears down every live session and refuses subsequent Open calls.
// Safe to call concurrently from multiple goroutines: the first caller
// runs the actual teardown, the rest block on closeDone until that
// teardown is fully observable. Every Close() return — first or Nth —
// thus carries the same "manager drained, every session finalized"
// guarantee that downstream GC/audit cleanup depends on.
func (m *Manager) Close() {
m.mu.Lock()
if m.closed {
done := m.closeDone
m.mu.Unlock()
<-done
return
}
m.closed = true
live := make([]*PtySession, 0, len(m.sessions))
for _, s := range m.sessions {
live = append(live, s)
}
m.mu.Unlock()
// Parallel: each session.Close blocks for the unix spawner's
// SIGHUP→grace→SIGKILL window. Running serially would multiply
// shutdown latency by N sessions. We additionally wait on each
// session's Done() so Manager.Close returning is a hard guarantee
// that every session finalized (output closed, deregistered, done
// fired) — downstream GC/audit cleanup relies on this.
var wg sync.WaitGroup
for _, s := range live {
wg.Add(1)
go func(s *PtySession) {
defer wg.Done()
s.Close("manager_shutdown")
<-s.Done()
}(s)
}
wg.Wait()
close(m.closeDone)
}
// CheckIdle walks every session and closes those whose idle interval
// has elapsed. The daemon's existing GC loop calls this periodically;
// each session also self-monitors via its own timer for cases where the
// outer loop runs at a coarser cadence than IdleTimeout.
func (m *Manager) CheckIdle() {
if m.cfg.IdleTimeout <= 0 {
return
}
m.mu.Lock()
sessions := make([]*PtySession, 0, len(m.sessions))
for _, s := range m.sessions {
sessions = append(sessions, s)
}
m.mu.Unlock()
now := m.cfg.Now()
for _, s := range sessions {
if now.Sub(s.LastIO()) >= m.cfg.IdleTimeout {
s.Close("idle_timeout")
}
}
}
func (m *Manager) deregister(id string) {
m.mu.Lock()
delete(m.sessions, id)
m.mu.Unlock()
}
func normalizeSize(cols, rows uint16) (uint16, uint16) {
if cols == 0 {
cols = 80
}
if rows == 0 {
rows = 24
}
return cols, rows
}
func buildEnv(info TaskInfo, userID string) []string {
env := []string{
"MULTICA_WORKSPACE_ID=" + info.WorkspaceID,
"MULTICA_TASK_ID=" + info.TaskID,
}
if info.IssueID != "" {
env = append(env, "MULTICA_ISSUE_ID="+info.IssueID)
}
if userID != "" {
env = append(env, "MULTICA_USER_ID="+userID)
}
if info.PriorSessionID != "" {
// Injected so `claude --resume $CLAUDE_SESSION_ID` continues the
// same session that the agent run was using (see RFC §Resume).
env = append(env, "CLAUDE_SESSION_ID="+info.PriorSessionID)
}
return env
}
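
Note on why the start hook must run before the stop hook: the daemon pairs the two hooks as a mark/unmark on a reference count, and a count that clamps at zero cannot recover from an unmark that arrives first. The sketch below shows both the balanced case and the leak. `envRoots` is a hypothetical counter written for illustration; the daemon's real markActiveEnvRoot / unmarkActiveEnvRoot are not shown in this diff, and the clamp-at-zero behaviour is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// envRoots is a hypothetical reference counter keyed by env root path.
type envRoots struct {
	mu     sync.Mutex
	counts map[string]int
}

func newEnvRoots() *envRoots { return &envRoots{counts: make(map[string]int)} }

// mark is what an OnSessionStart hook would call.
func (e *envRoots) mark(root string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.counts[root]++
}

// unmark is what an OnSessionStop hook would call. It never lets the
// count drop below zero (an assumption), which is why a stop that runs
// before its start would leave the root marked forever.
func (e *envRoots) unmark(root string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.counts[root] <= 1 {
		delete(e.counts, root)
		return
	}
	e.counts[root]--
}

// active reports whether at least one session still holds the root.
func (e *envRoots) active(root string) bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.counts[root] > 0
}

func main() {
	roots := newEnvRoots()
	const root = "/tmp/env/task-1" // placeholder path

	// Two terminals on the same workdir: the root stays protected
	// until the second one closes.
	roots.mark(root)
	roots.mark(root)
	roots.unmark(root)
	fmt.Println(roots.active(root)) // true
	roots.unmark(root)
	fmt.Println(roots.active(root)) // false

	// Wrong order (stop before start) leaks a mark.
	roots.unmark(root)
	roots.mark(root)
	fmt.Println(roots.active(root)) // true, and nothing will ever clear it
}
```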

File diff suppressed because it is too large


@@ -0,0 +1,35 @@
package terminal
import (
"io"
"time"
)
// PTY abstracts the platform PTY + child process so tests can swap in a
// fake without forking a real shell. Read returns child stdout/stderr;
// Write delivers stdin; Resize updates the window; Wait blocks for the
// child to exit and returns its exit code; Close terminates the child
// and releases the master fd.
type PTY interface {
io.ReadWriter
Resize(cols, rows uint16) error
Wait() (exitCode int, err error)
Close() error
}
// SpawnRequest is the input to Spawner.Start.
type SpawnRequest struct {
Shell string
Args []string
Cwd string
Env []string
Cols uint16
Rows uint16
Started time.Time
}
// Spawner creates new PTYs. Production uses realSpawner; tests inject a
// channel-backed fake (see fakePTY in manager_test.go).
type Spawner interface {
Start(SpawnRequest) (PTY, error)
}


@@ -0,0 +1,283 @@
package terminal
import (
"errors"
"io"
"log/slog"
"sync"
"time"
)
// ExitInfo describes how a session terminated.
type ExitInfo struct {
ExitCode int
Reason string
}
// PtySession is a single live PTY + child shell. Methods are safe for
// concurrent use; readers consume from Output() and ExitC() until
// Output() is closed, which always follows an ExitC() send.
type PtySession struct {
id string
taskID string
workspaceID string
issueID string
workDir string
userID string
shellPath string
mu sync.Mutex
cols, rows uint16
pty PTY
output chan []byte
exit chan ExitInfo
done chan struct{}
stop chan struct{}
stopOnce sync.Once
closing bool
closeReason string
// wg tracks readLoop and idleLoop. waitLoop is the finalizer: it
// waits on wg before closing output/done so we never close the
// output channel while readLoop is mid-send.
wg sync.WaitGroup
now func() time.Time
idleTimeout time.Duration
startedAt time.Time
lastIO time.Time
logger *slog.Logger
onClose func(string)
onStop func(*PtySession)
}
// ID returns the session identifier.
func (s *PtySession) ID() string { return s.id }
// TaskID returns the task this session is bound to.
func (s *PtySession) TaskID() string { return s.taskID }
// WorkspaceID returns the workspace this session belongs to.
func (s *PtySession) WorkspaceID() string { return s.workspaceID }
// IssueID returns the issue this session was opened from, if any.
func (s *PtySession) IssueID() string { return s.issueID }
// WorkDir returns the cwd of the child shell.
func (s *PtySession) WorkDir() string { return s.workDir }
// UserID returns the human user who opened the session.
func (s *PtySession) UserID() string { return s.userID }
// Shell returns the shell binary path that was spawned.
func (s *PtySession) Shell() string { return s.shellPath }
// StartedAt returns the wall-clock time the session was spawned.
func (s *PtySession) StartedAt() time.Time { return s.startedAt }
// LastIO returns the most recent time data flowed in either direction.
func (s *PtySession) LastIO() time.Time {
s.mu.Lock()
defer s.mu.Unlock()
return s.lastIO
}
// Output yields PTY output chunks as they arrive. The channel closes
// after the child exits and a value has been delivered on ExitC().
func (s *PtySession) Output() <-chan []byte { return s.output }
// ExitC fires once when the child exits. After that, Output() closes.
func (s *PtySession) ExitC() <-chan ExitInfo { return s.exit }
// Done returns a channel closed when the session is fully torn down
// (all goroutines exited, registry deregistered).
func (s *PtySession) Done() <-chan struct{} { return s.done }
// Write forwards bytes to the PTY stdin. Returns the byte count actually
// written. Updates LastIO so idle detection sees the activity.
func (s *PtySession) Write(p []byte) (int, error) {
s.mu.Lock()
if s.closing {
s.mu.Unlock()
return 0, ErrSessionNotFound
}
pty := s.pty
s.lastIO = s.now()
s.mu.Unlock()
return pty.Write(p)
}
// Resize updates the PTY window size.
func (s *PtySession) Resize(cols, rows uint16) error {
cols, rows = normalizeSize(cols, rows)
s.mu.Lock()
if s.closing {
s.mu.Unlock()
return ErrSessionNotFound
}
s.cols = cols
s.rows = rows
pty := s.pty
s.lastIO = s.now()
s.mu.Unlock()
return pty.Resize(cols, rows)
}
// Size returns the current cols, rows of the PTY.
func (s *PtySession) Size() (uint16, uint16) {
s.mu.Lock()
defer s.mu.Unlock()
return s.cols, s.rows
}
// Close tears down the session. Subsequent calls are no-ops. The
// reason is recorded for audit logging and the terminal.exit payload.
//
// Close only initiates teardown — signals stop, closes the PTY, returns.
// waitLoop is the actual finalizer: it waits for readLoop + idleLoop
// to exit (via wg) before closing output/done. That ordering is what
// makes "Close while output buffer is full" safe — readLoop's blocked
// send unblocks on <-stop, and only then does the output channel close.
func (s *PtySession) Close(reason string) {
s.mu.Lock()
if s.closing {
s.mu.Unlock()
return
}
s.closing = true
s.closeReason = reason
pty := s.pty
s.mu.Unlock()
s.stopOnce.Do(func() { close(s.stop) })
if pty != nil {
// pty.Close on the unix spawner runs SIGHUP → grace → SIGKILL.
// It's idempotent (sync.Once), so the second call from waitLoop's
// finalizer is a no-op.
_ = pty.Close()
}
}
// start kicks off the reader, exit-watch, and (optional) idle
// goroutines. Manager.openWith is the only caller. wg.Add runs
// synchronously before waitLoop is spawned so wg.Wait sees the
// correct count even if Close fires immediately.
func (s *PtySession) start() {
s.wg.Add(1)
go s.readLoop()
if s.idleTimeout > 0 {
s.wg.Add(1)
go s.idleLoop()
}
go s.waitLoop()
}
func (s *PtySession) readLoop() {
defer s.wg.Done()
buf := make([]byte, 4096)
for {
n, err := s.pty.Read(buf)
if n > 0 {
chunk := make([]byte, n)
copy(chunk, buf[:n])
s.mu.Lock()
s.lastIO = s.now()
s.mu.Unlock()
select {
case s.output <- chunk:
case <-s.stop:
return
}
}
if err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrClosedPipe) {
s.logger.Debug("pty read error", "err", err)
}
return
}
}
}
func (s *PtySession) waitLoop() {
code, waitErr := s.pty.Wait()
s.mu.Lock()
reason := s.closeReason
if reason == "" {
if waitErr != nil {
reason = "wait_error"
} else {
reason = "exited"
}
s.closeReason = reason
}
s.closing = true
s.mu.Unlock()
// Ensure the PTY fd is closed so readLoop's pty.Read returns EOF.
// pty.Close is idempotent (sync.Once on the unix spawner).
_ = s.pty.Close()
// Signal stop so idleLoop and any blocked send in readLoop exit.
s.stopOnce.Do(func() { close(s.stop) })
// Wait for readLoop + idleLoop before closing output/done. This is
// the invariant that prevents "send on closed channel" panics when
// output is full: readLoop is either past its send or unblocked via
// <-stop, but never racing with close(s.output).
s.wg.Wait()
// Finalize order is load-bearing: external waiters use `<-Done()` as
// a signal that the session is fully torn down AND deregistered from
// the manager. The sequence must be:
// ExitC → close(output) → onClose/deregister → close(done)
// so that any consumer doing `<-Done(); manager.Get(id)` after a
// teardown is guaranteed to observe ErrSessionNotFound.
select {
case s.exit <- ExitInfo{ExitCode: code, Reason: reason}:
default:
}
close(s.output)
if s.onClose != nil {
s.onClose(s.id)
}
if s.onStop != nil {
// Fires after deregister so a Manager.OnSessionStop callback sees a
// session that no longer appears in Sessions()/Get(); fires before
// close(done) so `<-Done()` implies the daemon's GC-unmark hook
// has already run.
s.onStop(s)
}
close(s.done)
}
func (s *PtySession) idleLoop() {
defer s.wg.Done()
// Sample at IdleTimeout/4 so reaction time is bounded but ticks
// stay cheap with many sessions. Manager.CheckIdle catches anything
// this loop misses (e.g. when daemon's outer GC tick is coarser).
interval := s.idleTimeout / 4
if interval < time.Second {
interval = time.Second
}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-s.stop:
return
case <-t.C:
if s.now().Sub(s.LastIO()) >= s.idleTimeout {
// Close calls pty.Close + waits for wg in waitLoop. If
// we ran it inline, waitLoop's wg.Wait would block on
// this goroutine, which can't exit until Close returns
// — deadlock. Spawning lets idleLoop return and
// decrement wg.
go s.Close("idle_timeout")
return
}
}
}
}
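
The deadlock that idleLoop avoids can be reproduced in a few lines. The sketch below is an assumption-laden reduction (`watcher`, `idleLoop`, and `closesWithoutDeadlock` are illustrative names, and a one-shot timer replaces the ticker): Close waits on a WaitGroup that counts the loop itself, so the loop hands Close to a new goroutine and returns.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type watcher struct {
	wg        sync.WaitGroup
	stop      chan struct{}
	stopOnce  sync.Once
	closed    chan struct{}
	closeOnce sync.Once
}

func newWatcher() *watcher {
	return &watcher{stop: make(chan struct{}), closed: make(chan struct{})}
}

// Close signals stop, waits for every loop goroutine, then marks the
// watcher closed. It must not run on a goroutine counted in wg.
func (w *watcher) Close() {
	w.stopOnce.Do(func() { close(w.stop) })
	w.wg.Wait()
	w.closeOnce.Do(func() { close(w.closed) })
}

// idleLoop fires once after tick and asks for a close. Close runs on a new
// goroutine so this loop can return and release its wg slot first.
func (w *watcher) idleLoop(tick time.Duration) {
	defer w.wg.Done()
	t := time.NewTimer(tick)
	defer t.Stop()
	select {
	case <-w.stop:
	case <-t.C:
		go w.Close()
	}
}

// closesWithoutDeadlock reports whether the self-triggered close finished
// within a generous timeout.
func closesWithoutDeadlock() bool {
	w := newWatcher()
	w.wg.Add(1)
	go w.idleLoop(time.Millisecond)
	select {
	case <-w.closed:
		return true
	case <-time.After(2 * time.Second):
		return false
	}
}

func main() {
	fmt.Println("closed cleanly:", closesWithoutDeadlock())
}
```

Calling `w.Close()` inline in the timer branch would block forever, because `wg.Wait` cannot return until the same goroutine reaches its deferred `wg.Done`.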


@@ -0,0 +1,102 @@
//go:build !windows
package terminal
import (
"fmt"
"os"
"os/exec"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/creack/pty"
)
// closeGracePeriod is the window between SIGHUP and SIGKILL during a
// Close. Long enough for interactive shells to run trap handlers and
// flush state; short enough that closing a tab feels instant.
const closeGracePeriod = 250 * time.Millisecond
// realSpawner forks the shell on a PTY using creack/pty. Linux/macOS
// only; Windows reaches the stub in spawner_windows.go and returns
// ErrUnsupportedOS.
type realSpawner struct{}
func (realSpawner) Start(req SpawnRequest) (PTY, error) {
cmd := exec.Command(req.Shell, req.Args...)
cmd.Dir = req.Cwd
// Inherit the daemon's PATH so users get whatever CLIs are installed
// in the daemon's environment (claude, codex, multica, etc.); merge
// in the per-session vars built by buildEnv.
env := os.Environ()
env = append(env, req.Env...)
cmd.Env = env
size := &pty.Winsize{Cols: req.Cols, Rows: req.Rows}
f, err := pty.StartWithSize(cmd, size)
if err != nil {
return nil, fmt.Errorf("pty.StartWithSize: %w", err)
}
return &unixPTY{cmd: cmd, file: f}, nil
}
type unixPTY struct {
cmd *exec.Cmd
file *os.File
exited atomic.Bool
closeOnce sync.Once
closeErr error
}
func (p *unixPTY) Read(b []byte) (int, error) { return p.file.Read(b) }
func (p *unixPTY) Write(b []byte) (int, error) { return p.file.Write(b) }
func (p *unixPTY) Resize(cols, rows uint16) error {
return pty.Setsize(p.file, &pty.Winsize{Cols: cols, Rows: rows})
}
func (p *unixPTY) Wait() (int, error) {
err := p.cmd.Wait()
p.exited.Store(true)
if p.cmd.ProcessState != nil {
return p.cmd.ProcessState.ExitCode(), err
}
return -1, err
}
// Close terminates the child shell and releases the PTY master fd.
// Closing a tab is a hangup, not an interrupt — so the signal path is
// SIGHUP → brief grace → SIGKILL → file.Close, in that order:
//
// - SIGHUP gives interactive shells a chance to run trap handlers,
// write history, etc. before the fd disappears.
// - The grace window is bounded; anything slower than that is stuck.
// - SIGKILL is the cliff for shells that ignore HUP.
// - file.Close releases the master fd last so the slave side keeps
// working during cleanup.
//
// Signals are sent to the negated pid so they hit the whole process
// group. creack/pty starts the child as a session leader (Setsid), so
// pid == pgid and any descendants the user spawned in the shell are
// caught by the same kill.
//
// If the child already exited naturally (Wait returned), all signal
// work is skipped — we only close the fd. That avoids a pointless
// 250ms sleep in the natural-exit teardown path.
func (p *unixPTY) Close() error {
p.closeOnce.Do(func() {
if p.cmd.Process != nil && !p.exited.Load() {
pid := p.cmd.Process.Pid
_ = syscall.Kill(-pid, syscall.SIGHUP)
time.Sleep(closeGracePeriod)
if !p.exited.Load() {
_ = syscall.Kill(-pid, syscall.SIGKILL)
}
}
p.closeErr = p.file.Close()
})
return p.closeErr
}
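
The escalation order in Close (SIGHUP, bounded grace, then SIGKILL only if the child is still alive, always to the negated pid) can be checked without spawning a process. The sketch below is a hypothetical test-friendly variant: `terminator`, its injected `kill` and `exited` functions, and `signalsSent` are assumptions for illustration, not part of the diff.

```go
package main

import (
	"fmt"
	"syscall"
	"time"
)

// terminator carries the same inputs Close uses, with the signal sender and
// the exit check injected so the order can be observed.
type terminator struct {
	pid    int
	grace  time.Duration
	kill   func(pid int, sig syscall.Signal) error
	exited func() bool
}

// terminate sends SIGHUP to the process group, waits the grace period, and
// escalates to SIGKILL only if the child has not exited by then.
func (t terminator) terminate() {
	if t.exited() {
		return
	}
	_ = t.kill(-t.pid, syscall.SIGHUP)
	time.Sleep(t.grace)
	if !t.exited() {
		_ = t.kill(-t.pid, syscall.SIGKILL)
	}
}

// signalsSent runs terminate against a fake child. When exitsOnHUP is true
// the fake child counts as exited as soon as it receives SIGHUP.
func signalsSent(exitsOnHUP bool) []string {
	var sent []string
	gone := false
	t := terminator{
		pid:   1234,
		grace: time.Millisecond,
		kill: func(pid int, sig syscall.Signal) error {
			name := "KILL"
			if sig == syscall.SIGHUP {
				name = "HUP"
				gone = exitsOnHUP
			}
			sent = append(sent, fmt.Sprintf("%s %d", name, pid))
			return nil
		},
		exited: func() bool { return gone },
	}
	t.terminate()
	return sent
}

func main() {
	fmt.Println(signalsSent(true))  // [HUP -1234]
	fmt.Println(signalsSent(false)) // [HUP -1234 KILL -1234]
}
```

The negative pid in the recorded output is the process-group form of the target, matching the comment's note that the child is a session leader.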


@@ -0,0 +1,9 @@
//go:build windows
package terminal
// realSpawner on Windows always refuses — ConPty support is RFC P1 and
// the Desktop button + CLI both surface a clear error from this layer.
type realSpawner struct{}
func (realSpawner) Start(SpawnRequest) (PTY, error) { return nil, ErrUnsupportedOS }


@@ -0,0 +1,316 @@
package daemon
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"log/slog"
"sync"
"github.com/multica-ai/multica/server/internal/daemon/terminal"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// terminalBridge adapts the daemon-side terminal.Manager to the daemonws
// WebSocket transport. Per session it:
//
// - relays PtySession.Output() → terminal.data frames (daemon→server)
// - relays PtySession.ExitC() → terminal.exit frames
// - tears the bridge goroutine down when Done() fires
//
// Two send paths are wired:
//
// - send (non-blocking): used for control / handshake frames that
// are safe to drop on backlog (terminal.opened, terminal.exit,
// terminal.error). Maps to Daemon.sendWSFrame.
// - sendCtx (blocking with ctx escape): used for PTY data frames so a
// saturated hub writer back-pressures the producer instead
// of corrupting the terminal byte stream. Maps to
// Daemon.sendWSFrameCtx.
type terminalBridge struct {
manager *terminal.Manager
logger *slog.Logger
send func([]byte) bool
sendCtx func(context.Context, []byte) bool
mu sync.Mutex
sessions map[string]*terminalRoute
}
type terminalRoute struct {
session *terminal.PtySession
cancel context.CancelFunc
pumpDone chan struct{}
}
func newTerminalBridge(mgr *terminal.Manager, logger *slog.Logger, send func([]byte) bool, sendCtx func(context.Context, []byte) bool) *terminalBridge {
return &terminalBridge{
manager: mgr,
logger: logger,
send: send,
sendCtx: sendCtx,
sessions: make(map[string]*terminalRoute),
}
}
// handleFrame dispatches a single terminal.* envelope from the server. The
// caller already decoded protocol.Message; we receive the inner type+payload.
func (b *terminalBridge) handleFrame(msgType string, payload json.RawMessage) {
switch msgType {
case protocol.MessageTypeTerminalOpen:
var p protocol.TerminalOpenPayload
if err := json.Unmarshal(payload, &p); err != nil {
b.logger.Debug("terminal.open invalid payload", "error", err)
return
}
b.handleOpen(p)
case protocol.MessageTypeTerminalData:
var p protocol.TerminalDataPayload
if err := json.Unmarshal(payload, &p); err != nil {
b.logger.Debug("terminal.data invalid payload", "error", err)
return
}
b.handleData(p)
case protocol.MessageTypeTerminalResize:
var p protocol.TerminalResizePayload
if err := json.Unmarshal(payload, &p); err != nil {
b.logger.Debug("terminal.resize invalid payload", "error", err)
return
}
b.handleResize(p)
case protocol.MessageTypeTerminalClose:
var p protocol.TerminalClosePayload
if err := json.Unmarshal(payload, &p); err != nil {
b.logger.Debug("terminal.close invalid payload", "error", err)
return
}
b.handleClose(p)
}
}
func (b *terminalBridge) handleOpen(p protocol.TerminalOpenPayload) {
info := terminal.TaskInfo{
TaskID: p.TaskID,
WorkspaceID: p.WorkspaceID,
IssueID: p.IssueID,
WorkDir: p.WorkDir,
PriorSessionID: p.PriorSessionID,
}
sess, err := b.manager.OpenWithInfo(context.Background(), info, terminal.OpenParams{
TaskID: p.TaskID,
WorkspaceID: p.WorkspaceID,
UserID: p.UserID,
Cols: p.Cols,
Rows: p.Rows,
})
if err != nil {
b.sendError(p.RequestID, "", mapTerminalError(err), err.Error())
return
}
ctx, cancel := context.WithCancel(context.Background())
pumpDone := make(chan struct{})
b.mu.Lock()
b.sessions[sess.ID()] = &terminalRoute{session: sess, cancel: cancel, pumpDone: pumpDone}
b.mu.Unlock()
b.sendFrame(protocol.MessageTypeTerminalOpened, protocol.TerminalOpenedPayload{
RequestID: p.RequestID,
SessionID: sess.ID(),
WorkDir: sess.WorkDir(),
Shell: sess.Shell(),
})
go func() {
defer close(pumpDone)
b.pump(ctx, sess)
}()
}
func (b *terminalBridge) handleData(p protocol.TerminalDataPayload) {
sess, err := b.manager.Get(p.SessionID)
if err != nil {
b.sendError("", p.SessionID, protocol.TerminalErrorCodeSessionNotFound, err.Error())
return
}
data, err := base64.StdEncoding.DecodeString(p.DataB64)
if err != nil {
b.logger.Debug("terminal.data invalid base64", "error", err, "session_id", p.SessionID)
return
}
if _, err := sess.Write(data); err != nil {
b.logger.Debug("terminal.data write failed", "error", err, "session_id", p.SessionID)
}
}
func (b *terminalBridge) handleResize(p protocol.TerminalResizePayload) {
sess, err := b.manager.Get(p.SessionID)
if err != nil {
b.sendError("", p.SessionID, protocol.TerminalErrorCodeSessionNotFound, err.Error())
return
}
if err := sess.Resize(p.Cols, p.Rows); err != nil {
b.logger.Debug("terminal.resize failed", "error", err, "session_id", p.SessionID)
}
}
func (b *terminalBridge) handleClose(p protocol.TerminalClosePayload) {
sess, err := b.manager.Get(p.SessionID)
if err != nil {
// Already gone — nothing to do; the server side has already received
// a terminal.exit frame (or will, through the pump goroutine).
return
}
reason := p.Reason
if reason == "" {
reason = "client_close"
}
sess.Close(reason)
}
// pump bridges one session's output channel onto the WS as terminal.data
// frames, and emits a terminal.exit when the child exits. Returns when
// either the session is fully torn down or ctx is canceled.
//
// terminal.data is delivered with REAL backpressure (sendDataFrame blocks
// on a full hub writer). That is intentional: a saturated writer must
// slow the PTY reader down, not drop bytes — half-streams break shells
// far worse than a momentary lag. Heartbeat / control frames still go
// through the droppable send path because they are recoverable.
func (b *terminalBridge) pump(ctx context.Context, sess *terminal.PtySession) {
sessionID := sess.ID()
defer func() {
b.mu.Lock()
delete(b.sessions, sessionID)
b.mu.Unlock()
}()
for {
select {
case <-ctx.Done():
return
case chunk, ok := <-sess.Output():
if !ok {
// Output closed → child exited and waitLoop finalized.
// ExitC was already delivered (or about to be); pull it once
// non-blocking and forward, then exit the pump.
var info terminal.ExitInfo
select {
case info = <-sess.ExitC():
default:
}
b.sendFrame(protocol.MessageTypeTerminalExit, protocol.TerminalExitPayload{
SessionID: sessionID,
ExitCode: info.ExitCode,
Reason: info.Reason,
})
<-sess.Done()
return
}
if !b.sendDataFrame(ctx, sessionID, chunk) {
// ctx canceled (bridge being torn down) — bail. We don't
// emit a terminal.exit here: the teardown path on the
// caller side already accounts for the session going away.
return
}
}
}
}
// sendDataFrame is the backpressure-aware variant of sendFrame, used only
// for terminal.data. Returns false iff ctx was canceled mid-send (i.e.,
// the bridge is being torn down).
func (b *terminalBridge) sendDataFrame(ctx context.Context, sessionID string, chunk []byte) bool {
raw, err := json.Marshal(protocol.TerminalDataPayload{
SessionID: sessionID,
DataB64: base64.StdEncoding.EncodeToString(chunk),
})
if err != nil {
b.logger.Debug("terminal data payload marshal failed", "error", err, "session_id", sessionID)
return true
}
frame, err := json.Marshal(protocol.Message{Type: protocol.MessageTypeTerminalData, Payload: raw})
if err != nil {
b.logger.Debug("terminal data envelope marshal failed", "error", err, "session_id", sessionID)
return true
}
if b.sendCtx == nil {
// Defensive: bridges constructed by older tests may not have plumbed
// sendCtx. Fall back to the non-blocking sender so those tests still run.
_ = b.send(frame)
return true
}
return b.sendCtx(ctx, frame)
}
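
The difference between the two send paths comes down to which second case the `select` carries. The sketch below assumes a plain buffered channel as the writer queue; `trySend`, `sendCtx`, and `demoSendPaths` are illustrative names and do not reproduce Daemon.sendWSFrame or Daemon.sendWSFrameCtx.

```go
package main

import (
	"context"
	"fmt"
)

// trySend is the droppable path: it gives up immediately on a full queue.
func trySend(ch chan<- []byte, frame []byte) bool {
	select {
	case ch <- frame:
		return true
	default:
		return false
	}
}

// sendCtx is the backpressure path: it blocks until there is room or the
// context is canceled.
func sendCtx(ctx context.Context, ch chan<- []byte, frame []byte) bool {
	select {
	case ch <- frame:
		return true
	case <-ctx.Done():
		return false
	}
}

// demoSendPaths exercises both paths against a queue of capacity 1.
func demoSendPaths() (dropped, delivered, canceled bool) {
	writes := make(chan []byte, 1)
	writes <- []byte("already queued") // the queue is now full

	// A control frame is dropped rather than waiting.
	dropped = !trySend(writes, []byte("terminal.opened"))

	// A data frame waits; it goes through once the consumer makes room.
	result := make(chan bool, 1)
	go func() {
		result <- sendCtx(context.Background(), writes, []byte("terminal.data"))
	}()
	<-writes
	delivered = <-result

	// With the queue full again, a canceled context is the only way out.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	canceled = !sendCtx(ctx, writes, []byte("terminal.data"))
	return dropped, delivered, canceled
}

func main() {
	fmt.Println(demoSendPaths())
}
```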
// closeAll tears down every live session. Called when the daemon
// disconnects from the server: the browser proxy will fail downstream,
// and a reconnect cannot resurrect the pre-existing PTYs because the
// session_ids only existed in the prior WS context.
//
// closeAll BLOCKS until every pump goroutine has actually exited. The
// wakeup loop relies on this guarantee: after closeAll returns, no pump
// goroutine can still be calling sendWSFrameCtx, so the wakeup loop can
// safely close the writes channel without racing producers.
func (b *terminalBridge) closeAll(reason string) {
b.mu.Lock()
routes := make([]*terminalRoute, 0, len(b.sessions))
for _, r := range b.sessions {
routes = append(routes, r)
}
b.mu.Unlock()
for _, r := range routes {
r.cancel()
r.session.Close(reason)
}
for _, r := range routes {
if r.pumpDone != nil {
<-r.pumpDone
}
}
}
func (b *terminalBridge) sendFrame(msgType string, payload any) {
raw, err := json.Marshal(payload)
if err != nil {
b.logger.Debug("terminal frame marshal failed", "error", err, "type", msgType)
return
}
frame, err := json.Marshal(protocol.Message{Type: msgType, Payload: raw})
if err != nil {
b.logger.Debug("terminal envelope marshal failed", "error", err, "type", msgType)
return
}
if !b.send(frame) {
b.logger.Debug("terminal frame dropped: ws disconnected or backed up", "type", msgType)
}
}
func (b *terminalBridge) sendError(requestID, sessionID, code, message string) {
b.sendFrame(protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
RequestID: requestID,
SessionID: sessionID,
Code: code,
Message: message,
})
}
// mapTerminalError translates the terminal package's sentinel errors into
// protocol error codes the browser proxy can render. Anything we don't
// recognise falls back to TerminalErrorCodeInternal — drop information
// rather than surface internal wrap text to the user.
func mapTerminalError(err error) string {
switch {
case errors.Is(err, terminal.ErrWorkspaceMismatch):
return protocol.TerminalErrorCodeWorkspaceMismatch
case errors.Is(err, terminal.ErrTaskNotFound):
return protocol.TerminalErrorCodeTaskNotFound
case errors.Is(err, terminal.ErrSessionNotFound):
return protocol.TerminalErrorCodeSessionNotFound
case errors.Is(err, terminal.ErrUnsupportedOS):
return protocol.TerminalErrorCodeUnsupportedOS
case errors.Is(err, terminal.ErrSpawnFailed):
return protocol.TerminalErrorCodeSpawnFailed
}
return protocol.TerminalErrorCodeInternal
}
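
mapTerminalError relies on `errors.Is` seeing through `%w` wrapping, so a sentinel still maps to its code after several layers of context have been added. A minimal sketch, with hypothetical sentinels and code strings standing in for the terminal and protocol packages:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinels; the real ones live in the terminal package.
var (
	errTaskNotFound = errors.New("task not found")
	errSpawnFailed  = errors.New("spawn failed")
)

// codeFor maps a possibly wrapped error to a stable code. The code strings
// here are placeholders, not the protocol package's values.
func codeFor(err error) string {
	switch {
	case errors.Is(err, errTaskNotFound):
		return "task_not_found"
	case errors.Is(err, errSpawnFailed):
		return "spawn_failed"
	}
	return "internal"
}

// demoCodes maps a singly wrapped, a doubly wrapped, and an unknown error.
func demoCodes() [3]string {
	wrapped := fmt.Errorf("open task-1: %w", errTaskNotFound)
	double := fmt.Errorf("manager: %w", fmt.Errorf("pty.StartWithSize: %w", errSpawnFailed))
	unknown := errors.New("unexpected failure")
	return [3]string{codeFor(wrapped), codeFor(double), codeFor(unknown)}
}

func main() {
	fmt.Println(demoCodes()) // [task_not_found spawn_failed internal]
}
```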


@@ -0,0 +1,204 @@
package daemon
import (
"context"
"encoding/base64"
"encoding/json"
"log/slog"
"testing"
"time"
"github.com/multica-ai/multica/server/internal/daemon/terminal"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// openBridgeSession is the shared "open one terminal session through the
// bridge" helper for lifecycle tests: spawns a fake PTY, opens it via the
// bridge, waits for terminal.opened to come back, and returns the session
// id plus the fake PTY (so the test can push child output later).
func openBridgeSession(t *testing.T, bridge *terminalBridge, sender *captureSender, pty *fakeBridgePTY) string {
t.Helper()
openPayload, err := json.Marshal(protocol.TerminalOpenPayload{
RequestID: "req-bp",
TaskID: "task-bp",
WorkspaceID: "ws-bp",
WorkDir: t.TempDir(),
Cols: 80,
Rows: 24,
})
if err != nil {
t.Fatalf("marshal open: %v", err)
}
bridge.handleFrame(protocol.MessageTypeTerminalOpen, openPayload)
openedMsg := sender.waitFor(t, protocol.MessageTypeTerminalOpened, time.Second)
var opened protocol.TerminalOpenedPayload
if err := json.Unmarshal(openedMsg.Payload, &opened); err != nil {
t.Fatalf("opened payload: %v", err)
}
if opened.SessionID == "" {
t.Fatalf("expected non-empty session id")
}
return opened.SessionID
}
// TestTerminalBridge_DataBackpressureNoSilentDrop pins Phase 2 review
// blocker 2: terminal.data must NOT be silently dropped when the daemon's
// outbound WS queue is saturated. Instead, the pump back-pressures the
// PTY reader via a blocking send (with ctx escape), so the eventual
// reader still sees every byte.
//
// The shape of the test:
//
// - We use a writes channel of size 1 to mimic a hot, saturated hub.
// - sendCtx blocks on this channel (the real backpressure path).
// - The test pushes 4 PTY chunks into the session while the consumer
// is asleep — the pump cannot drop them.
// - The consumer then drains all 4 frames in order.
//
// If the bridge regresses to the old `default: drop` behavior, fewer
// than 4 chunks will be observed and the assertion fails.
func TestTerminalBridge_DataBackpressureNoSilentDrop(t *testing.T) {
writes := make(chan []byte, 1)
sendCtx := func(ctx context.Context, frame []byte) bool {
select {
case writes <- frame:
return true
case <-ctx.Done():
return false
}
}
pty := newFakeBridgePTY(80, 24)
spawner := &stubSpawner{pty: pty}
mgr := terminal.NewManager(terminal.ManagerConfig{
Spawner: spawner,
Logger: slog.Default(),
}, nil)
defer mgr.Close()
sender := &captureSender{}
bridge := newTerminalBridge(mgr, slog.Default(), sender.send, sendCtx)
sessionID := openBridgeSession(t, bridge, sender, pty)
// Push 4 chunks. The pump can only buffer 1 in the writes channel; the
// remainder must back-pressure via the PTY's bounded output channel
// (cap 4 in fakeBridgePTY). If any chunk were dropped instead of
// back-pressured, the count below would fall short.
chunks := []string{"chunk-1\n", "chunk-2\n", "chunk-3\n", "chunk-4\n"}
for _, c := range chunks {
pty.out <- []byte(c)
}
// Drain writes; reassemble the data frames the pump emitted.
got := make([]string, 0, len(chunks))
deadline := time.Now().Add(2 * time.Second)
for len(got) < len(chunks) {
select {
case frame := <-writes:
var env protocol.Message
if err := json.Unmarshal(frame, &env); err != nil {
t.Fatalf("envelope: %v", err)
}
if env.Type != protocol.MessageTypeTerminalData {
continue
}
var dp protocol.TerminalDataPayload
if err := json.Unmarshal(env.Payload, &dp); err != nil {
t.Fatalf("data payload: %v", err)
}
if dp.SessionID != sessionID {
t.Fatalf("session_id mismatch: got %q want %q", dp.SessionID, sessionID)
}
decoded, err := base64.StdEncoding.DecodeString(dp.DataB64)
if err != nil {
t.Fatalf("decode: %v", err)
}
got = append(got, string(decoded))
case <-time.After(time.Until(deadline)):
t.Fatalf("only saw %d/%d chunks before timeout — backpressure regressed to silent drop", len(got), len(chunks))
}
}
for i, c := range chunks {
if got[i] != c {
t.Errorf("chunk %d: got %q, want %q", i, got[i], c)
}
}
}
// TestTerminalBridge_TeardownDoesNotPanicOnInFlightSend pins Phase 2
// review blocker 1: when the daemonws connection drops while a terminal
// pump is mid-send, the teardown must NOT cause `send on closed channel`.
// The required invariant is that bridge.closeAll cancels and *waits for*
// every pump goroutine before the wakeup loop closes the writes channel.
//
// This test models the wakeup loop's teardown sequence directly:
//
// 1. Wire a writes channel and a backpressure sendCtx, same as production.
// 2. Open a session and stall the pump on a full writes channel.
// 3. Run closeAll → close(writes) in the same goroutine, exactly the
// order wakeup.go now uses.
// 4. Assert: no panic, teardown completes within a tight deadline.
//
// Before the fix, closeAll returned while the pump was still inside its
// blocking send, and the subsequent close(writes) would panic the pump
// the moment select picked the closed channel.
func TestTerminalBridge_TeardownDoesNotPanicOnInFlightSend(t *testing.T) {
writes := make(chan []byte, 1)
sendCtx := func(ctx context.Context, frame []byte) bool {
select {
case writes <- frame:
return true
case <-ctx.Done():
return false
}
}
pty := newFakeBridgePTY(80, 24)
spawner := &stubSpawner{pty: pty}
mgr := terminal.NewManager(terminal.ManagerConfig{
Spawner: spawner,
Logger: slog.Default(),
}, nil)
defer mgr.Close()
sender := &captureSender{}
bridge := newTerminalBridge(mgr, slog.Default(), sender.send, sendCtx)
_ = openBridgeSession(t, bridge, sender, pty)
// Push two chunks so the pump has one in writes (queued) and one
// blocked on the next select. That blocked send is the exact race
// window the old defer order tripped over.
pty.out <- []byte("first\n")
pty.out <- []byte("second\n")
// Give the pump a moment to actually park on the blocking send.
time.Sleep(50 * time.Millisecond)
done := make(chan struct{})
go func() {
defer func() {
if r := recover(); r != nil {
t.Errorf("teardown panicked: %v", r)
}
close(done)
}()
// Mirror wakeup.go's folded cleanup defer:
// clearWSWrites equivalent → bridge.closeAll → close(writes)
bridge.closeAll("ws_disconnect")
close(writes)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("teardown did not finish — closeAll likely did not wait for pump exit")
}
// Drain any residual frames so the writes channel close is observable
// and the test doesn't leak goroutines.
for range writes {
}
}
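
The barrier this test pins (cancel the producers, wait for them, and only then close the shared channel) can be reduced to a WaitGroup. The sketch below is an illustration under assumptions: `teardownWithBarrier` is a hypothetical name and the goroutines stand in for pump goroutines parked on a full writes channel.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// teardownWithBarrier parks `pumps` producers on a full channel, then tears
// down in the safe order and returns how many frames were left in the queue.
func teardownWithBarrier(pumps int) int {
	writes := make(chan []byte, 1)
	writes <- []byte("queued") // saturate so every producer blocks on send

	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	for i := 0; i < pumps; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			select {
			case writes <- []byte("data"):
			case <-ctx.Done():
			}
		}()
	}

	cancel()      // 1. tell producers to stop
	wg.Wait()     // 2. barrier: no producer can touch writes after this
	close(writes) // 3. safe only because step 2 completed

	left := 0
	for range writes {
		left++
	}
	return left
}

func main() {
	fmt.Println("frames left after teardown:", teardownWithBarrier(4))
}
```

Skipping the `wg.Wait()` step would let `close(writes)` race with a producer still inside its `select`, which is the "send on closed channel" panic described in the test comment.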


@@ -0,0 +1,324 @@
package daemon
import (
"context"
"encoding/base64"
"encoding/json"
"log/slog"
"sync"
"testing"
"time"
"github.com/multica-ai/multica/server/internal/daemon/terminal"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// captureSender is the test stand-in for the daemon's outbound WS writer.
// Frames are kept in order so the test can wait for a specific message type
// to appear.
type captureSender struct {
mu sync.Mutex
frames [][]byte
}
func (c *captureSender) send(frame []byte) bool {
c.mu.Lock()
defer c.mu.Unlock()
cp := make([]byte, len(frame))
copy(cp, frame)
c.frames = append(c.frames, cp)
return true
}
// sendCtx is the backpressure-aware delivery path the terminal bridge uses
// for terminal.data. Tests that don't care about backpressure can use this
// default — it accepts every frame the same way send() does.
func (c *captureSender) sendCtx(ctx context.Context, frame []byte) bool {
if ctx.Err() != nil {
return false
}
return c.send(frame)
}
func (c *captureSender) waitFor(t *testing.T, msgType string, timeout time.Duration) protocol.Message {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
c.mu.Lock()
for _, f := range c.frames {
var m protocol.Message
if err := json.Unmarshal(f, &m); err == nil && m.Type == msgType {
c.mu.Unlock()
return m
}
}
c.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
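// Read the frame count under the lock; send() may still be appending.
c.mu.Lock()
seen := len(c.frames)
c.mu.Unlock()
t.Fatalf("timeout waiting for frame of type %q (saw %d frames)", msgType, seen)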
return protocol.Message{}
}
// fakeBridgePTY is a minimal PTY for the bridge integration: it lets the
// test push child output and read writes back. Wait blocks until Close.
type fakeBridgePTY struct {
out chan []byte
mu sync.Mutex
written []byte
cols uint16
rows uint16
closeCh chan struct{}
exit int
waitOnce sync.Once
waitDone chan struct{}
}
func newFakeBridgePTY(cols, rows uint16) *fakeBridgePTY {
return &fakeBridgePTY{
out: make(chan []byte, 4),
cols: cols,
rows: rows,
closeCh: make(chan struct{}),
waitDone: make(chan struct{}),
}
}
func (p *fakeBridgePTY) Read(b []byte) (int, error) {
select {
case chunk, ok := <-p.out:
if !ok {
return 0, errEOF
}
n := copy(b, chunk)
return n, nil
case <-p.closeCh:
return 0, errEOF
}
}
func (p *fakeBridgePTY) Write(b []byte) (int, error) {
p.mu.Lock()
defer p.mu.Unlock()
p.written = append(p.written, b...)
return len(b), nil
}
func (p *fakeBridgePTY) Resize(cols, rows uint16) error {
p.mu.Lock()
defer p.mu.Unlock()
p.cols = cols
p.rows = rows
return nil
}
func (p *fakeBridgePTY) Close() error {
p.waitOnce.Do(func() {
close(p.closeCh)
close(p.waitDone)
})
return nil
}
func (p *fakeBridgePTY) Wait() (int, error) {
<-p.waitDone
return p.exit, nil
}
type stringErr string
func (e stringErr) Error() string { return string(e) }
const errEOF = stringErr("EOF")
func TestTerminalBridge_OpenSendsOpenedFrameWithServerSuppliedWorkdir(t *testing.T) {
tmp := t.TempDir()
pty := newFakeBridgePTY(80, 24)
spawner := &stubSpawner{pty: pty}
mgr := terminal.NewManager(terminal.ManagerConfig{
Spawner: spawner,
Logger: slog.Default(),
}, nil)
defer mgr.Close()
sender := &captureSender{}
bridge := newTerminalBridge(mgr, slog.Default(), sender.send, sender.sendCtx)
openPayload, err := json.Marshal(protocol.TerminalOpenPayload{
RequestID: "req-1",
TaskID: "task-via-ws",
WorkspaceID: "ws-A",
UserID: "user-1",
IssueID: "issue-1",
WorkDir: tmp,
PriorSessionID: "claude-xyz",
Cols: 120,
Rows: 30,
})
if err != nil {
t.Fatalf("marshal open: %v", err)
}
bridge.handleFrame(protocol.MessageTypeTerminalOpen, openPayload)
openedMsg := sender.waitFor(t, protocol.MessageTypeTerminalOpened, time.Second)
var opened protocol.TerminalOpenedPayload
if err := json.Unmarshal(openedMsg.Payload, &opened); err != nil {
t.Fatalf("opened payload: %v", err)
}
if opened.RequestID != "req-1" {
t.Errorf("opened.request_id = %q, want req-1", opened.RequestID)
}
if opened.SessionID == "" {
t.Errorf("opened.session_id is empty")
}
if opened.WorkDir != tmp {
t.Errorf("opened.work_dir = %q, want %q", opened.WorkDir, tmp)
}
}
func TestTerminalBridge_OpenWithoutWorkdirEmitsTaskNotFound(t *testing.T) {
// The server is supposed to resolve task.work_dir from its DB before
// forwarding terminal.open. If it forgets / fails, the daemon must
// not silently fall through to spawning bash in CWD — it has to
// surface a structured terminal.error and never call the spawner.
pty := newFakeBridgePTY(80, 24)
spawner := &stubSpawner{pty: pty}
mgr := terminal.NewManager(terminal.ManagerConfig{
Spawner: spawner,
Logger: slog.Default(),
}, nil)
defer mgr.Close()
sender := &captureSender{}
bridge := newTerminalBridge(mgr, slog.Default(), sender.send, sender.sendCtx)
openPayload, _ := json.Marshal(protocol.TerminalOpenPayload{
RequestID: "req-2",
TaskID: "task-evil",
WorkspaceID: "ws-B",
WorkDir: "", // server failed to resolve
Cols: 80,
Rows: 24,
})
bridge.handleFrame(protocol.MessageTypeTerminalOpen, openPayload)
errMsg := sender.waitFor(t, protocol.MessageTypeTerminalError, time.Second)
var errPayload protocol.TerminalErrorPayload
if err := json.Unmarshal(errMsg.Payload, &errPayload); err != nil {
t.Fatalf("error payload: %v", err)
}
if errPayload.Code != protocol.TerminalErrorCodeTaskNotFound {
t.Errorf("error code = %q, want %q", errPayload.Code, protocol.TerminalErrorCodeTaskNotFound)
}
if errPayload.RequestID != "req-2" {
t.Errorf("error request_id = %q, want req-2", errPayload.RequestID)
}
if spawner.callCount() != 0 {
t.Errorf("spawner was invoked %d times despite resolve failure", spawner.callCount())
}
}
func TestTerminalBridge_DataAndExitRoundTrip(t *testing.T) {
tmp := t.TempDir()
pty := newFakeBridgePTY(80, 24)
spawner := &stubSpawner{pty: pty}
mgr := terminal.NewManager(terminal.ManagerConfig{
Spawner: spawner,
Logger: slog.Default(),
}, nil)
defer mgr.Close()
sender := &captureSender{}
bridge := newTerminalBridge(mgr, slog.Default(), sender.send, sender.sendCtx)
openPayload, _ := json.Marshal(protocol.TerminalOpenPayload{
RequestID: "req-3",
TaskID: "task-3",
WorkspaceID: "ws-A",
WorkDir: tmp,
Cols: 80,
Rows: 24,
})
bridge.handleFrame(protocol.MessageTypeTerminalOpen, openPayload)
openedMsg := sender.waitFor(t, protocol.MessageTypeTerminalOpened, time.Second)
var opened protocol.TerminalOpenedPayload
_ = json.Unmarshal(openedMsg.Payload, &opened)
sessionID := opened.SessionID
// Push child output → bridge should emit terminal.data on the WS.
pty.out <- []byte("hello\n")
dataMsg := sender.waitFor(t, protocol.MessageTypeTerminalData, time.Second)
var dp protocol.TerminalDataPayload
if err := json.Unmarshal(dataMsg.Payload, &dp); err != nil {
t.Fatalf("data payload: %v", err)
}
if dp.SessionID != sessionID {
t.Errorf("data session_id = %q, want %q", dp.SessionID, sessionID)
}
decoded, err := base64.StdEncoding.DecodeString(dp.DataB64)
if err != nil {
t.Fatalf("decode data: %v", err)
}
if string(decoded) != "hello\n" {
t.Errorf("data bytes = %q, want %q", decoded, "hello\n")
}
// Send a write the other direction. The bridge should base64-decode
// and call PTY.Write.
inboundData, _ := json.Marshal(protocol.TerminalDataPayload{
SessionID: sessionID,
DataB64: base64.StdEncoding.EncodeToString([]byte("ls\n")),
})
bridge.handleFrame(protocol.MessageTypeTerminalData, inboundData)
// Allow Write to settle.
deadline := time.Now().Add(time.Second)
for time.Now().Before(deadline) {
pty.mu.Lock()
got := string(pty.written)
pty.mu.Unlock()
if got == "ls\n" {
break
}
time.Sleep(10 * time.Millisecond)
}
pty.mu.Lock()
if string(pty.written) != "ls\n" {
t.Errorf("PTY received %q, want %q", pty.written, "ls\n")
}
pty.mu.Unlock()
// Close from the client side. The bridge should propagate via
// session.Close → waitLoop → terminal.exit.
closePayload, _ := json.Marshal(protocol.TerminalClosePayload{
SessionID: sessionID,
Reason: "test",
})
bridge.handleFrame(protocol.MessageTypeTerminalClose, closePayload)
sender.waitFor(t, protocol.MessageTypeTerminalExit, time.Second)
}
// stubSpawner returns a single pre-built PTY on the first Start. callCount
// lets tests assert that no spawn happened on a reject path.
type stubSpawner struct {
pty *fakeBridgePTY
mu sync.Mutex
calls int
}
func (s *stubSpawner) Start(_ terminal.SpawnRequest) (terminal.PTY, error) {
s.mu.Lock()
s.calls++
s.mu.Unlock()
return s.pty, nil
}
func (s *stubSpawner) callCount() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.calls
}


@@ -116,6 +116,12 @@ func (d *Daemon) runTaskWakeupConnection(ctx context.Context, runtimeIDs []strin
writerDone := make(chan struct{})
go d.runWSWriter(conn, writes, writerDone)
// Expose this connection's writer to the terminal bridge so it can push
// terminal.* frames back to the server. The single cleanup defer below
// clears the pointer (and tears any live PTYs down) BEFORE the writes
// channel is closed, so terminal pumps cannot panic on a closed send.
d.installWSWrites(writes)
heartbeatCtx, cancelHeartbeat := context.WithCancel(ctx)
hbDone := make(chan struct{})
go func() {
@@ -128,19 +134,26 @@ func (d *Daemon) runTaskWakeupConnection(ctx context.Context, runtimeIDs []strin
errCh <- d.readTaskWakeupMessages(conn, taskWakeups)
}()
// Defer cleanup must shut goroutines down in this order:
// 1. cancel the heartbeat sender's ctx
// 2. wait for the sender to actually return — only then is it safe
// to close the writes channel without a "send on closed channel"
// panic from sendWSHeartbeats
// 3. close writes; the writer drains and exits
// 4. wait for the writer to finish so it doesn't outlive the conn
// Teardown ordering is load-bearing — every step here has a producer
// that would panic on a closed `writes` channel if reordered:
//
// LIFO defer order would close writes before the sender stops, so the
// teardown is folded into a single deferred function instead.
// 1. cancel the heartbeat sender's ctx and wait for it to return.
// Heartbeats are the only droppable producer; once the sender
// has exited it can no longer reach the writes channel.
// 2. clearWSWrites: nil the pointer (new sendWSFrame calls bounce)
// AND tear down the terminal bridge. closeAll blocks until every
// pump goroutine has exited — that's the barrier the terminal
// pumps need before we close the channel they share.
// 3. close(writes): only safe now that no producer remains.
// 4. wait for the writer goroutine to drain & exit so it doesn't
// outlive the conn.
//
// Two separate defers would run LIFO and reverse 2↔3, which is the
// exact race Phase 2 review caught.
defer func() {
cancelHeartbeat()
<-hbDone
d.clearWSWrites()
close(writes)
<-writerDone
}()
@@ -287,6 +300,13 @@ func (d *Daemon) readTaskWakeupMessages(conn *websocket.Conn, taskWakeups chan<-
continue
}
d.handleWSHeartbeatAck(context.Background(), &ack)
case protocol.MessageTypeTerminalOpen,
protocol.MessageTypeTerminalData,
protocol.MessageTypeTerminalResize,
protocol.MessageTypeTerminalClose:
if bridge := d.currentTerminalBridge(); bridge != nil {
bridge.handleFrame(msg.Type, msg.Payload)
}
}
}
}
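
The remark about LIFO defers in the teardown comment can be verified directly. The sketch below records the order in which cleanup steps run; the step labels are placeholders for the real calls (`clearWSWrites`, `close(writes)`), and both function names are hypothetical.

```go
package main

import "fmt"

// separateDefers registers the two steps as independent defers in reading
// order. Go runs deferred calls last-in, first-out, so they execute reversed.
func separateDefers() []string {
	var order []string
	func() {
		defer func() { order = append(order, "stop producers") }()
		defer func() { order = append(order, "close(writes)") }()
	}()
	return order
}

// foldedDefer puts both steps in one deferred function, so they run in the
// order they are written.
func foldedDefer() []string {
	var order []string
	func() {
		defer func() {
			order = append(order, "stop producers")
			order = append(order, "close(writes)")
		}()
	}()
	return order
}

func main() {
	fmt.Println(separateDefers()) // [close(writes) stop producers]
	fmt.Println(foldedDefer())    // [stop producers close(writes)]
}
```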


@@ -82,6 +82,9 @@ type Hub struct {
hbMu sync.RWMutex
onHeartbeat HeartbeatHandler
termMu sync.RWMutex
termRouter *TerminalRouter
}
func NewHub() *Hub {
@@ -146,7 +149,12 @@ func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request, identity C
c := &client{
hub: h,
conn: conn,
send: make(chan []byte, 16),
// Buffer sized for PTY traffic: terminal.data frames carry up to a
// few KB each and the read pump from xterm.js can fall behind during
// large bursts (build output, `cat` of a big file). 256 frames gives
// ~1MB of slack before we start dropping; wakeup hints and heartbeat
// acks coexist on the same queue but are tiny in comparison.
send: make(chan []byte, 256),
identity: identity,
runtimes: runtimes,
}
@@ -320,7 +328,12 @@ func (c *client) readPump() {
c.conn.Close()
}()
c.conn.SetReadLimit(4096)
// Terminal frames embed base64-encoded PTY chunks; readLoop on the
// daemon side caps each chunk at 4KB raw, which is ~5.3KB base64 plus
// JSON envelope. 64KB leaves headroom for future growth without
// disconnecting daemons that briefly exceed the legacy 4KB heartbeat-
// only ceiling.
c.conn.SetReadLimit(64 * 1024)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
@@ -348,6 +361,14 @@ func (c *client) handleFrame(raw []byte) {
switch msg.Type {
case protocol.EventDaemonHeartbeat:
c.handleHeartbeatFrame(msg.Payload)
case protocol.MessageTypeTerminalOpened,
protocol.MessageTypeTerminalData,
protocol.MessageTypeTerminalClose,
protocol.MessageTypeTerminalExit,
protocol.MessageTypeTerminalError:
if router := c.hub.terminalRouter(); router != nil {
router.Route(raw, msg.Type, msg.Payload)
}
default:
// Unknown app messages are intentionally ignored for forward
// compatibility with future daemon → server message types.
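
As a rough check on the read-limit comment in this hunk, the sketch below computes the padded base64 size of a 4KB raw chunk and the headroom left under a 64KB limit. The constant names and `encodedChunkLen` are hypothetical, and the JSON envelope overhead is deliberately not modelled.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// Assumed values, taken from the comments in this hunk: a 4KB raw PTY chunk
// and a 64KB WebSocket read limit.
const (
	rawChunkBytes  = 4 * 1024
	readLimitBytes = 64 * 1024
)

// encodedChunkLen returns the padded standard-base64 length of a raw chunk.
func encodedChunkLen(raw int) int {
	return base64.StdEncoding.EncodedLen(raw)
}

func main() {
	n := encodedChunkLen(rawChunkBytes)
	fmt.Printf("base64 length: %d bytes\n", n)
	fmt.Printf("headroom under read limit: %d bytes\n", readLimitBytes-n)
}
```

Whatever the envelope adds on top, a single frame stays far below the new limit, which is the point the comment is making.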

@@ -0,0 +1,196 @@
package daemonws
import (
"encoding/json"
"errors"
"log/slog"
"sync"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// TerminalSink receives terminal.* frames addressed to a single browser
// WebSocket connection. The handler implementation owns the frame queue;
// implementations must be non-blocking — the hub drops the frame if Deliver
// returns false rather than back up the daemon read pump.
type TerminalSink interface {
Deliver(frame []byte) bool
}
// TerminalRouter is the daemonws-side multiplex for terminal.* frames coming
// back from a daemon. Browser proxy connections register under their pending
// request_id and, after terminal.opened arrives, re-register under the
// session_id the daemon picked.
type TerminalRouter struct {
mu sync.RWMutex
sinks map[string]TerminalSink
}
// NewTerminalRouter constructs an empty router. The Hub owns the only
// instance in production; tests can build their own.
func NewTerminalRouter() *TerminalRouter {
return &TerminalRouter{sinks: make(map[string]TerminalSink)}
}
// Register installs sink under the given key. The key is either a
// request_id (before the daemon assigns a session_id) or a session_id
// (after the open ack). Re-registering an existing key replaces the sink.
func (r *TerminalRouter) Register(key string, sink TerminalSink) {
if r == nil || key == "" || sink == nil {
return
}
r.mu.Lock()
r.sinks[key] = sink
r.mu.Unlock()
}
// Unregister removes the sink for key, if any.
func (r *TerminalRouter) Unregister(key string) {
if r == nil || key == "" {
return
}
r.mu.Lock()
delete(r.sinks, key)
r.mu.Unlock()
}
// Route extracts the routing key (request_id or session_id) from a
// terminal.* frame and forwards the raw frame to the registered sink.
// Unknown keys are dropped silently — the daemon-side session ultimately
// observes the dead client via send-side errors / idle timeout.
func (r *TerminalRouter) Route(frame []byte, msgType string, payload json.RawMessage) {
if r == nil {
return
}
key := terminalRouteKey(msgType, payload)
if key == "" {
return
}
r.mu.RLock()
sink := r.sinks[key]
r.mu.RUnlock()
if sink == nil {
return
}
if !sink.Deliver(frame) {
slog.Debug("daemon ws terminal frame dropped: slow sink", "type", msgType, "key", key)
}
}
func terminalRouteKey(msgType string, payload json.RawMessage) string {
switch msgType {
case protocol.MessageTypeTerminalOpened, protocol.MessageTypeTerminalError:
var p struct {
RequestID string `json:"request_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
}
if err := json.Unmarshal(payload, &p); err != nil {
return ""
}
// terminal.error may carry either request_id (pre-open failure) or
// session_id (post-open failure). Prefer request_id so the proxy
// receives the failure on the same key it registered.
if p.RequestID != "" {
return p.RequestID
}
return p.SessionID
case protocol.MessageTypeTerminalData,
protocol.MessageTypeTerminalClose,
protocol.MessageTypeTerminalExit:
var p struct {
SessionID string `json:"session_id"`
}
if err := json.Unmarshal(payload, &p); err != nil {
return ""
}
return p.SessionID
}
return ""
}
// SetTerminalRouter installs an explicit router. Tests use this to inject a
// fake; production code can rely on the auto-created router exposed by
// TerminalRouter() below.
func (h *Hub) SetTerminalRouter(r *TerminalRouter) {
if h == nil {
return
}
h.termMu.Lock()
h.termRouter = r
h.termMu.Unlock()
}
// TerminalRouter returns the hub's router, creating one on first access.
// The browser proxy WS handler grabs this to register per-session sinks;
// production wiring does not need an explicit SetTerminalRouter call.
func (h *Hub) TerminalRouter() *TerminalRouter {
if h == nil {
return nil
}
h.termMu.RLock()
r := h.termRouter
h.termMu.RUnlock()
if r != nil {
return r
}
h.termMu.Lock()
defer h.termMu.Unlock()
if h.termRouter == nil {
h.termRouter = NewTerminalRouter()
}
return h.termRouter
}
// terminalRouter is the internal read-only accessor used by handleFrame.
// It returns nil if no router has been configured, which short-circuits
// dispatch — the auto-create only happens through the public accessor.
func (h *Hub) terminalRouter() *TerminalRouter {
h.termMu.RLock()
defer h.termMu.RUnlock()
return h.termRouter
}
// ErrNoDaemonForRuntime is returned by SendToRuntime when no daemon is
// currently connected for the given runtime_id. The browser proxy uses this
// to fail the open request with a clear error.
var ErrNoDaemonForRuntime = errors.New("daemonws: no daemon connected for runtime")
// SendToRuntime delivers a raw frame to one daemon connection serving
// runtimeID. If multiple daemons are registered (rare; usually one per
// runtime), an arbitrary one is picked, because Go map iteration order is
// unspecified. Returns ErrNoDaemonForRuntime when no connection exists, or
// a "buffer full" error when the daemon's outbound queue is saturated;
// callers should surface that as a transient failure to the browser rather
// than retrying tightly.
func (h *Hub) SendToRuntime(runtimeID string, frame []byte) error {
if h == nil || runtimeID == "" {
return ErrNoDaemonForRuntime
}
h.mu.RLock()
var target *client
for c := range h.byRuntime[runtimeID] {
target = c
break
}
h.mu.RUnlock()
if target == nil {
return ErrNoDaemonForRuntime
}
if !target.trySend(frame) {
return errors.New("daemonws: daemon send buffer full")
}
return nil
}
// trySend pushes frame onto the client's outbound queue without blocking.
// Returns false if the buffer is saturated. We deliberately do not evict
// the connection here — terminal back-pressure should slow the producing
// browser/server side, not tear down the entire daemonws connection (which
// would also break heartbeat + wakeup delivery for unrelated runtimes).
func (c *client) trySend(frame []byte) bool {
select {
case c.send <- frame:
return true
default:
return false
}
}
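
The non-blocking contract that TerminalSink and trySend rely on can be sketched in isolation. `chanSink` and `newChanSink` below are hypothetical names, not part of this change; the sketch only assumes that a sink is backed by a buffered channel and must report a full buffer instead of waiting.

```go
package main

import "fmt"

// chanSink is a hypothetical sink backed by a buffered channel.
type chanSink struct {
	queue chan []byte
}

func newChanSink(capacity int) *chanSink {
	return &chanSink{queue: make(chan []byte, capacity)}
}

// Deliver enqueues frame without blocking and reports whether it was
// accepted. A false result means the caller should drop the frame.
func (s *chanSink) Deliver(frame []byte) bool {
	select {
	case s.queue <- frame:
		return true
	default:
		return false
	}
}

func main() {
	sink := newChanSink(2)
	for i := 0; i < 3; i++ {
		ok := sink.Deliver([]byte(fmt.Sprintf("frame-%d", i)))
		fmt.Printf("frame %d accepted: %v\n", i, ok)
	}
}
```

With a capacity of two and no reader, the third frame is rejected immediately, so a slow consumer costs dropped frames rather than a stalled read pump.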

@@ -0,0 +1,95 @@
package daemonws
import (
"encoding/json"
"testing"
"github.com/multica-ai/multica/server/pkg/protocol"
)
type collectingSink struct {
frames [][]byte
}
func (s *collectingSink) Deliver(frame []byte) bool {
cp := make([]byte, len(frame))
copy(cp, frame)
s.frames = append(s.frames, cp)
return true
}
func TestTerminalRouter_RoutesByRequestThenSession(t *testing.T) {
router := NewTerminalRouter()
sink := &collectingSink{}
router.Register("req-1", sink)
// terminal.error before the daemon picked a session_id: routed on
// request_id. This is the failure path browsers see when the daemon
// can't spawn a PTY (e.g. ErrUnsupportedOS on Windows).
errFrame := mustEncode(t, protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
RequestID: "req-1",
Code: protocol.TerminalErrorCodeUnsupportedOS,
Message: "no PTY on windows",
})
router.Route(errFrame, protocol.MessageTypeTerminalError, mustPayload(t, errFrame))
if got, want := len(sink.frames), 1; got != want {
t.Fatalf("delivered = %d, want %d", got, want)
}
// Re-key to session_id after a hypothetical terminal.opened.
router.Register("sess-1", sink)
router.Unregister("req-1")
dataFrame := mustEncode(t, protocol.MessageTypeTerminalData, protocol.TerminalDataPayload{
SessionID: "sess-1",
DataB64: "Zm9vYmFy",
})
router.Route(dataFrame, protocol.MessageTypeTerminalData, mustPayload(t, dataFrame))
if got, want := len(sink.frames), 2; got != want {
t.Fatalf("after data delivered = %d, want %d", got, want)
}
// Frames for an unknown session must drop silently — never panic, and
// never leak into the wrong sink.
strayFrame := mustEncode(t, protocol.MessageTypeTerminalExit, protocol.TerminalExitPayload{
SessionID: "sess-2-unknown",
ExitCode: 0,
})
router.Route(strayFrame, protocol.MessageTypeTerminalExit, mustPayload(t, strayFrame))
if got, want := len(sink.frames), 2; got != want {
t.Fatalf("stray frame delivered to wrong sink: got %d frames, want %d", got, want)
}
}
func TestTerminalRouter_UnknownSessionDropsSilently(t *testing.T) {
router := NewTerminalRouter()
frame := mustEncode(t, protocol.MessageTypeTerminalData, protocol.TerminalDataPayload{
SessionID: "ghost",
DataB64: "Zm9v",
})
// Should not panic / not deliver anywhere.
router.Route(frame, protocol.MessageTypeTerminalData, mustPayload(t, frame))
}
func mustEncode(t *testing.T, msgType string, payload any) []byte {
t.Helper()
raw, err := json.Marshal(payload)
if err != nil {
t.Fatalf("marshal payload: %v", err)
}
frame, err := json.Marshal(protocol.Message{Type: msgType, Payload: raw})
if err != nil {
t.Fatalf("marshal envelope: %v", err)
}
return frame
}
func mustPayload(t *testing.T, envelope []byte) json.RawMessage {
t.Helper()
var m protocol.Message
if err := json.Unmarshal(envelope, &m); err != nil {
t.Fatalf("unmarshal envelope: %v", err)
}
return m.Payload
}

@@ -1927,6 +1927,93 @@ func (h *Handler) ListTasksByIssue(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, resp)
}
// TerminalSessionResponse is the JSON shape behind GET
// /api/issues/{id}/terminal-sessions. Mirrors terminal_sessions row fields
// with the timestamp formatting the rest of the API uses (RFC3339 strings
// for clients that parse loosely; pointers for nullable columns).
type TerminalSessionResponse struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
IssueID string `json:"issue_id"`
TaskID string `json:"task_id"`
RuntimeID string `json:"runtime_id,omitempty"`
UserID string `json:"user_id"`
WorkDir string `json:"work_dir"`
Shell string `json:"shell,omitempty"`
StartedAt string `json:"started_at"`
EndedAt *string `json:"ended_at"`
ExitCode *int32 `json:"exit_code"`
CloseReason string `json:"close_reason,omitempty"`
// Status is derived: "active" while the row's ended_at is NULL,
// "closed" otherwise. We expose it explicitly so the CLI's `issue
// runs` table — which shares its STATUS column with agent task rows —
// can render terminal entries without bespoke logic.
Status string `json:"status"`
// Kind discriminates these rows from agent task rows in clients that
// merge both feeds (notably `multica issue runs`).
Kind string `json:"kind"`
}
// ListTerminalSessionsByIssue returns the audit log of interactive PTYs
// opened against the issue's tasks. The CLI's `issue runs` view fetches
// this alongside /task-runs and merges them by timestamp so a `multica
// issue runs` listing shows both agent runs and `type=terminal` rows.
func (h *Handler) ListTerminalSessionsByIssue(w http.ResponseWriter, r *http.Request) {
issueID := chi.URLParam(r, "id")
issue, ok := h.loadIssueForUser(w, r, issueID)
if !ok {
return
}
sessions, err := h.Queries.ListTerminalSessionsByIssue(r.Context(), issue.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list terminal sessions")
return
}
resp := make([]TerminalSessionResponse, len(sessions))
for i, s := range sessions {
resp[i] = terminalSessionToResponse(s)
}
writeJSON(w, http.StatusOK, resp)
}
func terminalSessionToResponse(s db.TerminalSession) TerminalSessionResponse {
status := "active"
var endedAt *string
var exitCode *int32
if s.EndedAt.Valid {
status = "closed"
stamp := s.EndedAt.Time.UTC().Format(time.RFC3339)
endedAt = &stamp
}
if s.ExitCode.Valid {
v := s.ExitCode.Int32
exitCode = &v
}
runtimeID := ""
if s.RuntimeID.Valid {
runtimeID = uuidToString(s.RuntimeID)
}
return TerminalSessionResponse{
ID: uuidToString(s.ID),
WorkspaceID: uuidToString(s.WorkspaceID),
IssueID: uuidToString(s.IssueID),
TaskID: uuidToString(s.TaskID),
RuntimeID: runtimeID,
UserID: uuidToString(s.UserID),
WorkDir: s.WorkDir,
Shell: s.Shell,
StartedAt: s.StartedAt.Time.UTC().Format(time.RFC3339),
EndedAt: endedAt,
ExitCode: exitCode,
CloseReason: s.CloseReason,
Status: status,
Kind: "terminal",
}
}
// ListTaskMessagesByUser returns task messages for a task.
// Used by the frontend under regular user auth (not daemon auth).
// Verifies the task belongs to the caller's workspace.

@@ -0,0 +1,84 @@
package handler
import (
"testing"
"time"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// TestTerminalSessionToResponse_Active validates the shape returned for a
// session whose ended_at is still NULL — the CLI's `issue runs` merge
// expects status="active" and a nil ended_at pointer (not the zero
// timestamp string, which would render as a real completion in the
// table).
func TestTerminalSessionToResponse_Active(t *testing.T) {
started := time.Date(2026, 5, 16, 10, 30, 0, 0, time.UTC)
s := db.TerminalSession{
ID: util.MustParseUUID("11111111-1111-1111-1111-111111111111"),
WorkspaceID: util.MustParseUUID("22222222-2222-2222-2222-222222222222"),
IssueID: util.MustParseUUID("33333333-3333-3333-3333-333333333333"),
TaskID: util.MustParseUUID("44444444-4444-4444-4444-444444444444"),
UserID: util.MustParseUUID("55555555-5555-5555-5555-555555555555"),
WorkDir: "/tmp/ws/task/workdir",
Shell: "/bin/bash",
StartedAt: pgtype.Timestamptz{Time: started, Valid: true},
}
resp := terminalSessionToResponse(s)
if resp.Status != "active" {
t.Errorf("Status = %q, want active", resp.Status)
}
if resp.Kind != "terminal" {
t.Errorf("Kind = %q, want terminal", resp.Kind)
}
if resp.EndedAt != nil {
t.Errorf("EndedAt = %v, want nil", *resp.EndedAt)
}
if resp.ExitCode != nil {
t.Errorf("ExitCode = %v, want nil", *resp.ExitCode)
}
if resp.StartedAt != started.Format(time.RFC3339) {
t.Errorf("StartedAt = %q, want %q", resp.StartedAt, started.Format(time.RFC3339))
}
}
// TestTerminalSessionToResponse_Closed validates the closed-session shape:
// status flips to "closed", ended_at is a populated pointer, exit_code
// surfaces the signed int. close_reason rides through verbatim so the CLI
// can display it in the ERROR column for terminal rows.
func TestTerminalSessionToResponse_Closed(t *testing.T) {
started := time.Date(2026, 5, 16, 10, 30, 0, 0, time.UTC)
ended := started.Add(15 * time.Minute)
s := db.TerminalSession{
ID: util.MustParseUUID("11111111-1111-1111-1111-111111111111"),
WorkspaceID: util.MustParseUUID("22222222-2222-2222-2222-222222222222"),
IssueID: util.MustParseUUID("33333333-3333-3333-3333-333333333333"),
TaskID: util.MustParseUUID("44444444-4444-4444-4444-444444444444"),
UserID: util.MustParseUUID("55555555-5555-5555-5555-555555555555"),
WorkDir: "/tmp/ws/task/workdir",
StartedAt: pgtype.Timestamptz{Time: started, Valid: true},
EndedAt: pgtype.Timestamptz{Time: ended, Valid: true},
ExitCode: pgtype.Int4{Int32: 130, Valid: true},
CloseReason: "idle_timeout",
}
resp := terminalSessionToResponse(s)
if resp.Status != "closed" {
t.Errorf("Status = %q, want closed", resp.Status)
}
if resp.EndedAt == nil || *resp.EndedAt != ended.Format(time.RFC3339) {
t.Errorf("EndedAt = %v, want %q", resp.EndedAt, ended.Format(time.RFC3339))
}
if resp.ExitCode == nil || *resp.ExitCode != 130 {
t.Errorf("ExitCode = %v, want 130", resp.ExitCode)
}
if resp.CloseReason != "idle_timeout" {
t.Errorf("CloseReason = %q, want idle_timeout", resp.CloseReason)
}
}

@@ -0,0 +1,758 @@
package handler
import (
"context"
"encoding/json"
"errors"
"log/slog"
"net/http"
"strconv"
"sync"
"time"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/auth"
"github.com/multica-ai/multica/server/internal/daemonws"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
"github.com/golang-jwt/jwt/v5"
)
// terminalWriteWait caps how long a single WriteMessage may block before
// we tear the browser connection down as a slow client. Matches the daemonws
// hub's writeWait so back-pressure semantics are consistent end-to-end.
const terminalWriteWait = 10 * time.Second
// terminalOpenTimeout caps how long the proxy waits for the daemon to
// respond to a terminal.open request with terminal.opened or terminal.error.
// 5s is generous: PTY spawn is local and synchronous on the daemon side.
const terminalOpenTimeout = 5 * time.Second
// terminalUpgrader reuses the realtime hub's origin allowlist. The terminal
// endpoint executes a shell on the daemon, so it must be at least as strict
// about cross-origin connections as the read-only realtime WS — using the
// shared CheckOrigin keeps the policy in one place and prevents an
// accidentally permissive `CheckOrigin: true` from sneaking past review.
var terminalUpgrader = websocket.Upgrader{
CheckOrigin: realtime.CheckOrigin,
}
// HandleIssueTerminalWS proxies a browser WebSocket onto a PTY running on
// the daemon hosting the issue's most-recent agent task. The flow per
// connection:
//
// 1. Authenticate the user (cookie JWT preferred; first-message auth as
// fallback for clients that cannot set cookies — e.g. some Desktop
// dev modes).
// 2. Resolve issue → workspace → latest task with a non-empty work_dir +
// runtime_id. Fail closed if no such task exists; users see a clear
// "no task to attach to" error instead of a silent hang.
// 3. Register a sink on the daemonws TerminalRouter under a fresh
// request_id, then send terminal.open to the daemon.
// 4. On terminal.opened: re-register the sink under the session_id the
// daemon picked, drop the request_id route, and start the bidirectional
// pump until either side closes.
// 5. On disconnect: send terminal.close so the daemon tears the PTY down
// promptly rather than waiting for its idle sweep.
func (h *Handler) HandleIssueTerminalWS(w http.ResponseWriter, r *http.Request) {
if h.DaemonHub == nil {
http.Error(w, `{"error":"terminal proxy not configured"}`, http.StatusServiceUnavailable)
return
}
workspaceID := r.URL.Query().Get("workspace_id")
if workspaceID == "" {
http.Error(w, `{"error":"workspace_id required"}`, http.StatusBadRequest)
return
}
wsUUID, err := util.ParseUUID(workspaceID)
if err != nil {
http.Error(w, `{"error":"invalid workspace_id"}`, http.StatusBadRequest)
return
}
issueParam := chi.URLParam(r, "issue_id")
if issueParam == "" {
http.Error(w, `{"error":"issue_id required"}`, http.StatusBadRequest)
return
}
// A missing or invalid cookie is not fatal here: auth is deferred to the
// first WS frame after the upgrade, so the error message is ignored.
userID, _ := terminalAuthCookie(r, h)
if userID != "" && !h.terminalIsMember(r.Context(), userID, workspaceID) {
http.Error(w, `{"error":"not a member of this workspace"}`, http.StatusForbidden)
return
}
conn, err := terminalUpgrader.Upgrade(w, r, nil)
if err != nil {
slog.Error("terminal ws upgrade failed", "error", err)
return
}
if userID == "" {
uid, errMsg := terminalFirstFrameAuth(conn, h)
if errMsg != "" {
_ = conn.WriteMessage(websocket.TextMessage, []byte(errMsg))
conn.Close()
return
}
if !h.terminalIsMember(r.Context(), uid, workspaceID) {
_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"error":"not a member of this workspace"}`))
conn.Close()
return
}
userID = uid
_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"auth_ack"}`))
}
issue, ok := h.terminalResolveIssue(r.Context(), issueParam, wsUUID)
if !ok {
sendTerminalErrorAndClose(conn, "", "", protocol.TerminalErrorCodeTaskNotFound, "issue not found")
return
}
task, ok := h.terminalLatestAttachableTask(r.Context(), issue.ID)
if !ok {
sendTerminalErrorAndClose(conn, "", "", protocol.TerminalErrorCodeTaskNotFound, "no agent task has run on this issue yet — trigger a run first")
return
}
cols := parseUint16Query(r, "cols", 80)
rows := parseUint16Query(r, "rows", 24)
proxy := newTerminalProxy(conn, h.DaemonHub, userID, util.UUIDToString(task.RuntimeID), util.UUIDToString(task.ID), workspaceID, util.UUIDToString(issue.ID), task.SessionID.String, task.WorkDir.String, cols, rows)
proxy.audit = newTerminalAuditRecorder(h, util.UUIDToString(issue.ID), util.UUIDToString(task.ID), util.UUIDToString(task.RuntimeID), workspaceID, userID, task.WorkDir.String)
proxy.run()
}
// terminalAuditRecorder writes terminal_sessions rows for the audit log
// (RFC §Auth) and as the data source behind the `type=terminal` entries in
// `multica issue runs`. Writes run on a background context with their own
// 5s timeout, independent of the request context. The calls themselves are
// synchronous on the caller's goroutine: RecordOpen runs in writePump
// before run() is unblocked, so the audit row lags the actual session open
// by the duration of the insert, and a slow DB write delays frame relay by
// up to that timeout. Failures are logged and never abort the session.
type terminalAuditRecorder struct {
h *Handler
issueID string
taskID string
runtimeID string
workspaceID string
userID string
workDir string
}
func newTerminalAuditRecorder(h *Handler, issueID, taskID, runtimeID, workspaceID, userID, workDir string) *terminalAuditRecorder {
if h == nil || h.Queries == nil {
// Tests that build a handler without DB queries (terminal protocol
// tests, etc.) skip recording — keep that path nil-safe so the
// audit hook never panics in a unit environment.
return nil
}
return &terminalAuditRecorder{
h: h,
issueID: issueID,
taskID: taskID,
runtimeID: runtimeID,
workspaceID: workspaceID,
userID: userID,
workDir: workDir,
}
}
func (a *terminalAuditRecorder) RecordOpen(sessionID, shell string) {
if a == nil {
return
}
sessUUID, err := util.ParseUUID(sessionID)
if err != nil {
slog.Debug("terminal audit: invalid session id", "session_id", sessionID, "error", err)
return
}
issueUUID, err := util.ParseUUID(a.issueID)
if err != nil {
return
}
taskUUID, err := util.ParseUUID(a.taskID)
if err != nil {
return
}
wsUUID, err := util.ParseUUID(a.workspaceID)
if err != nil {
return
}
userUUID, err := util.ParseUUID(a.userID)
if err != nil {
return
}
var runtimeUUID pgtype.UUID
if a.runtimeID != "" {
if parsed, perr := util.ParseUUID(a.runtimeID); perr == nil {
runtimeUUID = parsed
}
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err := a.h.Queries.CreateTerminalSession(ctx, db.CreateTerminalSessionParams{
ID: sessUUID,
WorkspaceID: wsUUID,
IssueID: issueUUID,
TaskID: taskUUID,
RuntimeID: runtimeUUID,
UserID: userUUID,
WorkDir: a.workDir,
Shell: shell,
StartedAt: pgtype.Timestamptz{Time: time.Now().UTC(), Valid: true},
}); err != nil {
slog.Warn("terminal audit: record open failed", "session_id", sessionID, "error", err)
}
}
func (a *terminalAuditRecorder) RecordClose(sessionID string, exitCode int32, hasExit bool, reason string) {
if a == nil || sessionID == "" {
return
}
sessUUID, err := util.ParseUUID(sessionID)
if err != nil {
return
}
var code pgtype.Int4
if hasExit {
code = pgtype.Int4{Int32: exitCode, Valid: true}
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := a.h.Queries.CloseTerminalSession(ctx, db.CloseTerminalSessionParams{
ID: sessUUID,
EndedAt: pgtype.Timestamptz{Time: time.Now().UTC(), Valid: true},
ExitCode: code,
CloseReason: reason,
}); err != nil {
slog.Warn("terminal audit: record close failed", "session_id", sessionID, "error", err)
}
}
// terminalProxy is the per-connection bridge between browser and daemon.
// Methods that touch the WS connection do so from one of two goroutines
// only: writePump owns conn writes and readPump owns conn reads, matching
// gorilla/websocket's single-goroutine-per-direction contract.
type terminalProxy struct {
conn *websocket.Conn
hub *daemonws.Hub
userID string
runtimeID string
taskID string
workspaceID string
issueID string
priorSess string
workDir string
cols uint16
rows uint16
requestID string
mu sync.Mutex
sessionID string
closedOnce sync.Once
closeCh chan struct{}
sendCh chan []byte
openedCh chan struct{}
openErr chan terminalOpenFailure
// audit is the persistence hook for terminal_sessions rows (RFC §Auth).
// nil in tests that build a proxy without a Handler — every call site
// is nil-safe through the recorder methods.
audit *terminalAuditRecorder
// exitMu guards exitCode/hasExit/exitMsg, written from writePump when the
// daemon sends terminal.exit and read from the run() defer that
// finalizes the audit row.
exitMu sync.Mutex
exitCode int32
hasExit bool
exitMsg string
}
type terminalOpenFailure struct {
code string
message string
}
func newTerminalProxy(conn *websocket.Conn, hub *daemonws.Hub, userID, runtimeID, taskID, workspaceID, issueID, priorSess, workDir string, cols, rows uint16) *terminalProxy {
return &terminalProxy{
conn: conn,
hub: hub,
userID: userID,
runtimeID: runtimeID,
taskID: taskID,
workspaceID: workspaceID,
issueID: issueID,
priorSess: priorSess,
workDir: workDir,
cols: cols,
rows: rows,
requestID: uuid.NewString(),
closeCh: make(chan struct{}),
sendCh: make(chan []byte, 256),
openedCh: make(chan struct{}),
openErr: make(chan terminalOpenFailure, 1),
}
}
// Deliver implements daemonws.TerminalSink. The daemon hub's read pump
// invokes this for every terminal.* frame addressed to our request_id /
// session_id. We must stay non-blocking; the hub drops the frame on a
// full buffer rather than stalling its own pump.
func (p *terminalProxy) Deliver(frame []byte) bool {
select {
case p.sendCh <- frame:
return true
default:
return false
}
}
func (p *terminalProxy) run() {
defer p.conn.Close()
router := p.hub.TerminalRouter()
router.Register(p.requestID, p)
defer router.Unregister(p.requestID)
go p.writePump()
if err := p.sendOpenToDaemon(); err != nil {
// Couldn't reach the daemon at all — surface a structured error and
// bail before we register cleanup paths that assume a live session.
sendTerminalErrorOverChannel(p.sendCh, p.requestID, "", protocol.TerminalErrorCodeInternal, err.Error())
time.Sleep(50 * time.Millisecond) // give writePump a tick to flush
p.shutdown()
return
}
// Block until the open ack arrives, the open is rejected, or the user
// disconnects mid-handshake. After this point sessionID is stable and
// we are routing on session_id rather than request_id.
select {
case <-p.openedCh:
// Open succeeded. The router is already re-keyed on session_id by
// observeOpen. Fall through to the bidirectional pump.
case failure := <-p.openErr:
sendTerminalErrorOverChannel(p.sendCh, p.requestID, "", failure.code, failure.message)
time.Sleep(50 * time.Millisecond)
p.shutdown()
return
case <-p.closeCh:
return
case <-time.After(terminalOpenTimeout):
sendTerminalErrorOverChannel(p.sendCh, p.requestID, "", protocol.TerminalErrorCodeInternal, "daemon did not respond to terminal.open within timeout")
time.Sleep(50 * time.Millisecond)
p.shutdown()
return
}
defer func() {
sid := p.SessionID()
if sid != "" {
router.Unregister(sid)
// Best-effort teardown on the daemon. If the connection to the
// daemon is already gone, the daemon's own clearWSWrites path
// will close the session — we just lose an idle slot's worth of
// latency before the GC catches it.
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalClose, protocol.TerminalClosePayload{
SessionID: sid,
Reason: "browser_disconnect",
})
if err == nil {
_ = p.hub.SendToRuntime(p.runtimeID, frame)
}
// Stamp the audit row. If the daemon sent terminal.exit before
// we got here, use its exit code + reason; otherwise this is a
// browser-initiated disconnect.
p.exitMu.Lock()
code, has, reason := p.exitCode, p.hasExit, p.exitMsg
p.exitMu.Unlock()
if reason == "" {
reason = "browser_disconnect"
}
p.audit.RecordClose(sid, code, has, reason)
}
}()
p.readPump()
}
func (p *terminalProxy) SessionID() string {
p.mu.Lock()
defer p.mu.Unlock()
return p.sessionID
}
func (p *terminalProxy) sendOpenToDaemon() error {
payload := protocol.TerminalOpenPayload{
RequestID: p.requestID,
TaskID: p.taskID,
WorkspaceID: p.workspaceID,
UserID: p.userID,
IssueID: p.issueID,
WorkDir: p.workDir,
PriorSessionID: p.priorSess,
Cols: p.cols,
Rows: p.rows,
}
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalOpen, payload)
if err != nil {
return err
}
if err := p.hub.SendToRuntime(p.runtimeID, frame); err != nil {
if errors.Is(err, daemonws.ErrNoDaemonForRuntime) {
return errors.New("daemon offline for this runtime — start the agent's daemon and retry")
}
return err
}
return nil
}
// readPump reads browser frames and forwards terminal.data/resize/close to
// the daemon. Unknown frames are dropped silently. Returns when the
// connection closes; we then trigger shutdown so writePump exits too.
func (p *terminalProxy) readPump() {
defer p.shutdown()
p.conn.SetReadLimit(64 * 1024)
for {
_, raw, err := p.conn.ReadMessage()
if err != nil {
return
}
var env protocol.Message
if err := json.Unmarshal(raw, &env); err != nil {
continue
}
sid := p.SessionID()
// We force-stamp session_id on outbound frames: the browser may have
// sent its own session_id, but only the one the daemon assigned is
// trusted. This prevents a misbehaving client from addressing a
// session that another user opened against the same daemon.
switch env.Type {
case protocol.MessageTypeTerminalData:
var pl protocol.TerminalDataPayload
if err := json.Unmarshal(env.Payload, &pl); err != nil {
continue
}
pl.SessionID = sid
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalData, pl)
if err != nil {
continue
}
_ = p.hub.SendToRuntime(p.runtimeID, frame)
case protocol.MessageTypeTerminalResize:
var pl protocol.TerminalResizePayload
if err := json.Unmarshal(env.Payload, &pl); err != nil {
continue
}
pl.SessionID = sid
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalResize, pl)
if err != nil {
continue
}
_ = p.hub.SendToRuntime(p.runtimeID, frame)
case protocol.MessageTypeTerminalClose:
return
}
}
}
// writePump is the single owner of conn writes. It also watches for the
// terminal.opened / terminal.error handshake transition while it pumps,
// because a second goroutine reading sendCh would race with it: each
// frame sent on the channel is received by exactly one reader.
//
// Once the session is open, this is just a straight relay. During the
// open window (sessionID empty), every frame is forwarded to the browser
// AND inspected — opened/error frames feed openedCh / openErr so run()
// can unblock or fail the handshake.
func (p *terminalProxy) writePump() {
router := p.hub.TerminalRouter()
for {
select {
case <-p.closeCh:
return
case frame, ok := <-p.sendCh:
if !ok {
return
}
p.forwardToBrowser(frame)
// terminal.exit can arrive at any point in the session lifecycle
// (idle timeout, child crash, manager shutdown), so we always
// peek the envelope to capture the exit code for the audit row.
// terminal.opened / terminal.error are only meaningful during
// the open handshake window — once sessionID is set they are
// just relayed without re-inspection.
var env protocol.Message
if err := json.Unmarshal(frame, &env); err != nil {
continue
}
if env.Type == protocol.MessageTypeTerminalExit {
var ep protocol.TerminalExitPayload
if err := json.Unmarshal(env.Payload, &ep); err == nil {
p.exitMu.Lock()
p.exitCode = int32(ep.ExitCode)
p.hasExit = true
if ep.Reason != "" {
p.exitMsg = ep.Reason
}
p.exitMu.Unlock()
// Finalize the audit row as soon as the daemon reports
// exit, not when the client disconnects. CloseTerminalSession
// is idempotent (WHERE ended_at IS NULL) so the browser_disconnect
// fallback in run()'s defer becomes a no-op if it fires later.
// Without this, a client that keeps the WS open after exit
// would leave terminal_sessions.ended_at NULL forever and
// `multica issue runs` would render an already-exited
// terminal as active.
sid := ep.SessionID
if sid == "" {
sid = p.SessionID()
}
reason := ep.Reason
if reason == "" {
reason = "exited"
}
p.audit.RecordClose(sid, int32(ep.ExitCode), true, reason)
}
continue
}
if p.SessionID() != "" {
continue
}
switch env.Type {
case protocol.MessageTypeTerminalOpened:
var op protocol.TerminalOpenedPayload
if err := json.Unmarshal(env.Payload, &op); err == nil && op.SessionID != "" {
p.mu.Lock()
p.sessionID = op.SessionID
p.mu.Unlock()
router.Register(op.SessionID, p)
router.Unregister(p.requestID)
// Persist the open audit row before unblocking run(). If
// the DB write fails, the error is logged via slog; the
// session itself still runs (audit is best-effort).
p.audit.RecordOpen(op.SessionID, op.Shell)
close(p.openedCh)
}
case protocol.MessageTypeTerminalError:
var ep protocol.TerminalErrorPayload
if err := json.Unmarshal(env.Payload, &ep); err == nil {
select {
case p.openErr <- terminalOpenFailure{code: ep.Code, message: ep.Message}:
default:
}
}
}
}
}
}
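
The envelope peek in writePump is a two-stage decode: first the outer message to read its type, then the payload only when the type is terminal.exit. Below is a minimal standalone sketch of that pattern. The local `message` and `exitPayload` types are stand-ins for `protocol.Message` and `protocol.TerminalExitPayload`, and treating the payload as `json.RawMessage` is an assumption, since the definition of `protocol.Message` is not part of this diff.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a hypothetical stand-in for protocol.Message. Using
// json.RawMessage for Payload is an assumption; it defers decoding
// the payload until the type is known.
type message struct {
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload"`
}

// exitPayload mirrors the fields of protocol.TerminalExitPayload.
type exitPayload struct {
	SessionID string `json:"session_id"`
	ExitCode  int    `json:"exit_code"`
	Reason    string `json:"reason,omitempty"`
}

// peekExit decodes the envelope and, only for terminal.exit frames,
// decodes the payload. Any other frame or malformed JSON yields false.
func peekExit(frame []byte) (exitPayload, bool) {
	var env message
	if err := json.Unmarshal(frame, &env); err != nil {
		return exitPayload{}, false
	}
	if env.Type != "terminal.exit" {
		return exitPayload{}, false
	}
	var ep exitPayload
	if err := json.Unmarshal(env.Payload, &ep); err != nil {
		return exitPayload{}, false
	}
	return ep, true
}

func main() {
	frame := []byte(`{"type":"terminal.exit","payload":{"session_id":"s1","exit_code":130,"reason":"idle_timeout"}}`)
	if ep, ok := peekExit(frame); ok {
		fmt.Println(ep.SessionID, ep.ExitCode, ep.Reason)
	}
}
```

The frame is still forwarded to the browser unchanged in the real code; the peek only reads it.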
func (p *terminalProxy) forwardToBrowser(frame []byte) {
p.conn.SetWriteDeadline(time.Now().Add(terminalWriteWait))
if err := p.conn.WriteMessage(websocket.TextMessage, frame); err != nil {
slog.Debug("terminal ws write to browser failed", "error", err)
p.shutdown()
}
}
func (p *terminalProxy) shutdown() {
p.closedOnce.Do(func() {
close(p.closeCh)
})
}
func sendTerminalErrorOverChannel(ch chan<- []byte, requestID, sessionID, code, message string) {
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
RequestID: requestID,
SessionID: sessionID,
Code: code,
Message: message,
})
if err != nil {
return
}
select {
case ch <- frame:
default:
}
}
func sendTerminalErrorAndClose(conn *websocket.Conn, requestID, sessionID, code, message string) {
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
RequestID: requestID,
SessionID: sessionID,
Code: code,
Message: message,
})
if err == nil {
_ = conn.WriteMessage(websocket.TextMessage, frame)
}
conn.Close()
}
func marshalTerminalFrame(msgType string, payload any) ([]byte, error) {
raw, err := json.Marshal(payload)
if err != nil {
return nil, err
}
return json.Marshal(protocol.Message{Type: msgType, Payload: raw})
}
func parseUint16Query(r *http.Request, key string, defaultVal uint16) uint16 {
raw := r.URL.Query().Get(key)
if raw == "" {
return defaultVal
}
v, err := strconv.ParseUint(raw, 10, 16)
if err != nil || v == 0 {
return defaultVal
}
return uint16(v)
}
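
To make the fallback rules of parseUint16Query concrete, here is a small sketch that applies the same logic to `url.Values` directly so it runs without an HTTP request. The helper name `parseUint16` and the defaults 80 and 24 are illustrative assumptions, not values taken from this change.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// parseUint16 applies the same rules as parseUint16Query, but reads from
// url.Values: empty, zero, non-numeric, and out-of-range values all fall
// back to defaultVal.
func parseUint16(q url.Values, key string, defaultVal uint16) uint16 {
	raw := q.Get(key)
	if raw == "" {
		return defaultVal
	}
	// bitSize 16 makes ParseUint return a range error above 65535.
	v, err := strconv.ParseUint(raw, 10, 16)
	if err != nil || v == 0 {
		return defaultVal
	}
	return uint16(v)
}

func main() {
	q, _ := url.ParseQuery("cols=120&rows=0&big=70000")
	fmt.Println(parseUint16(q, "cols", 80))    // valid value is used
	fmt.Println(parseUint16(q, "rows", 24))    // zero falls back
	fmt.Println(parseUint16(q, "big", 80))     // out of range falls back
	fmt.Println(parseUint16(q, "missing", 24)) // absent key falls back
}
```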
// --- auth helpers (cookie + first-frame JWT/PAT) ---
func terminalAuthCookie(r *http.Request, h *Handler) (string, string) {
cookie, err := r.Cookie(auth.AuthCookieName)
if err != nil || cookie.Value == "" {
return "", ""
}
return terminalAuthToken(cookie.Value, h, r.Context())
}
func terminalFirstFrameAuth(conn *websocket.Conn, h *Handler) (string, string) {
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
defer conn.SetReadDeadline(time.Time{})
_, raw, err := conn.ReadMessage()
if err != nil {
return "", `{"error":"auth timeout or read error"}`
}
var msg struct {
Type string `json:"type"`
Payload struct {
Token string `json:"token"`
} `json:"payload"`
}
if err := json.Unmarshal(raw, &msg); err != nil || msg.Type != "auth" || msg.Payload.Token == "" {
return "", `{"error":"expected auth message as first frame"}`
}
return terminalAuthToken(msg.Payload.Token, h, context.Background())
}
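
For clients that cannot rely on the auth cookie, terminalFirstFrameAuth expects the first WebSocket frame to be an auth message. The sketch below builds such a frame and applies the same shape checks the server performs. The function names and the example token are hypothetical; only the JSON shape (`type`, `payload.token`) comes from the code above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// authFrame matches the anonymous struct decoded by terminalFirstFrameAuth.
type authFrame struct {
	Type    string `json:"type"`
	Payload struct {
		Token string `json:"token"`
	} `json:"payload"`
}

// buildAuthFrame returns the JSON a client would send as its first frame.
func buildAuthFrame(token string) ([]byte, error) {
	var f authFrame
	f.Type = "auth"
	f.Payload.Token = token
	return json.Marshal(f)
}

// acceptsAuthFrame applies the same shape checks as the server: valid JSON,
// type "auth", and a non-empty token. It does not validate the token itself.
func acceptsAuthFrame(raw []byte) bool {
	var msg authFrame
	if err := json.Unmarshal(raw, &msg); err != nil {
		return false
	}
	return msg.Type == "auth" && msg.Payload.Token != ""
}

func main() {
	frame, err := buildAuthFrame("mul_example") // placeholder token
	if err != nil {
		panic(err)
	}
	fmt.Println(string(frame))
	fmt.Println(acceptsAuthFrame(frame))
}
```

The server gives the client 10 seconds to send this frame before the read deadline expires.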
func terminalAuthToken(tokenStr string, h *Handler, ctx context.Context) (string, string) {
if len(tokenStr) > 4 && tokenStr[:4] == "mul_" {
if h.PATCache == nil {
pat, err := h.Queries.GetPersonalAccessTokenByHash(ctx, auth.HashToken(tokenStr))
if err != nil {
return "", `{"error":"invalid token"}`
}
return util.UUIDToString(pat.UserID), ""
}
hash := auth.HashToken(tokenStr)
if uid, ok := h.PATCache.Get(ctx, hash); ok {
return uid, ""
}
pat, err := h.Queries.GetPersonalAccessTokenByHash(ctx, hash)
if err != nil {
return "", `{"error":"invalid token"}`
}
uid := util.UUIDToString(pat.UserID)
return uid, ""
}
token, err := jwt.Parse(tokenStr, func(token *jwt.Token) (any, error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, jwt.ErrSignatureInvalid
}
return auth.JWTSecret(), nil
})
if err != nil || !token.Valid {
return "", `{"error":"invalid token"}`
}
claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
return "", `{"error":"invalid claims"}`
}
uid, ok := claims["sub"].(string)
if !ok || uid == "" {
return "", `{"error":"invalid claims"}`
}
return uid, ""
}
func (h *Handler) terminalIsMember(ctx context.Context, userID, workspaceID string) bool {
userUUID, err := util.ParseUUID(userID)
if err != nil {
return false
}
wsUUID, err := util.ParseUUID(workspaceID)
if err != nil {
return false
}
_, err = h.Queries.GetMemberByUserAndWorkspace(ctx, db.GetMemberByUserAndWorkspaceParams{
UserID: userUUID,
WorkspaceID: wsUUID,
})
return err == nil
}
func (h *Handler) terminalResolveIssue(ctx context.Context, issueID string, wsUUID pgtype.UUID) (db.Issue, bool) {
if parts := splitIdentifier(issueID); parts != nil {
issue, err := h.Queries.GetIssueByNumber(ctx, db.GetIssueByNumberParams{
WorkspaceID: wsUUID,
Number: parts.number,
})
if err == nil {
return issue, true
}
}
issueUUID, err := util.ParseUUID(issueID)
if err != nil {
return db.Issue{}, false
}
issue, err := h.Queries.GetIssueInWorkspace(ctx, db.GetIssueInWorkspaceParams{
ID: issueUUID,
WorkspaceID: wsUUID,
})
if err != nil {
return db.Issue{}, false
}
return issue, true
}
// terminalLatestAttachableTask returns the most recent task on the issue
// that the proxy can attach to: it must have a known work_dir and a
// runtime_id. Whether a daemon is currently connected for that runtime is
// not checked here. Falls back through the task history rather than picking
// only the absolute latest, because the most-recent row may be a queued task
// that never ran (no workdir yet).
func (h *Handler) terminalLatestAttachableTask(ctx context.Context, issueID pgtype.UUID) (db.AgentTaskQueue, bool) {
tasks, err := h.Queries.ListTasksByIssue(ctx, issueID)
if err != nil {
return db.AgentTaskQueue{}, false
}
for _, t := range tasks {
if !t.WorkDir.Valid || t.WorkDir.String == "" {
continue
}
if !t.RuntimeID.Valid {
continue
}
return t, true
}
return db.AgentTaskQueue{}, false
}
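
The fallback through task history can be illustrated with an in-memory list. This is a sketch under two assumptions: the `task` type is a hypothetical stand-in for `db.AgentTaskQueue`, and the list is ordered most-recent-first (the ordering of `ListTasksByIssue` is not visible in this diff).

```go
package main

import "fmt"

// task is a hypothetical stand-in for db.AgentTaskQueue with only the
// fields the selection looks at.
type task struct {
	ID         string
	WorkDir    string // empty models an invalid or unset work_dir
	HasRuntime bool   // models RuntimeID.Valid
}

// latestAttachable returns the first task that has both a workdir and a
// runtime. It assumes tasks are ordered most-recent-first.
func latestAttachable(tasks []task) (task, bool) {
	for _, t := range tasks {
		if t.WorkDir == "" || !t.HasRuntime {
			continue
		}
		return t, true
	}
	return task{}, false
}

func main() {
	tasks := []task{
		{ID: "t3"}, // queued, never ran: no workdir yet
		{ID: "t2", WorkDir: "/work/t2", HasRuntime: true},
		{ID: "t1", WorkDir: "/work/t1", HasRuntime: true},
	}
	if t, ok := latestAttachable(tasks); ok {
		fmt.Println(t.ID) // prints "t2"
	}
}
```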


@@ -77,6 +77,13 @@ func SetAllowedOrigins(origins []string) {
allowedWSOrigins.Store(origins)
}
// CheckOrigin is the WebSocket origin check shared by every authenticated
// realtime endpoint (including the issue terminal proxy). Browser endpoints
// must NOT install their own `CheckOrigin: true` upgrader — that bypasses
// CSWSH defense; route through this function instead so the allowlist stays
// the single source of truth.
func CheckOrigin(r *http.Request) bool { return checkOrigin(r) }
func checkOrigin(r *http.Request) bool {
origin := r.Header.Get("Origin")
if origin == "" {


@@ -0,0 +1 @@
DROP TABLE IF EXISTS terminal_sessions;


@@ -0,0 +1,36 @@
-- terminal_sessions records every interactive PTY a user opens against an
-- agent task workdir via the Issue → Terminal panel or `multica issue
-- terminal`. The row is the audit log entry (RFC §Auth) and the source
-- behind the `type=terminal` rows that surface in `multica issue runs`.
--
-- We keep this lightweight on purpose: keystrokes are NEVER recorded
-- (privacy + volume), only the open/close envelope. close_reason is the
-- string the daemon's terminal.Manager attaches to the teardown
-- (browser_disconnect, idle_timeout, manager_shutdown, ws_disconnect,
-- exited, …) so operators can tell why a session ended without grepping
-- the daemon logs.
CREATE TABLE terminal_sessions (
id UUID PRIMARY KEY,
workspace_id UUID NOT NULL,
issue_id UUID NOT NULL,
task_id UUID NOT NULL,
runtime_id UUID,
user_id UUID NOT NULL,
work_dir TEXT NOT NULL,
shell TEXT NOT NULL DEFAULT '',
started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
ended_at TIMESTAMPTZ,
exit_code INTEGER,
close_reason TEXT NOT NULL DEFAULT ''
);
-- Listing in the issue runs view is always scoped to a single issue and
-- ordered by most-recent-first; this is the dominant access path.
CREATE INDEX terminal_sessions_issue_started_idx
ON terminal_sessions (issue_id, started_at DESC);
-- Per-workspace audits (e.g. "show me every terminal session in this
-- workspace") and the cross-workspace ACL check both filter by
-- workspace_id first, so a composite index keeps that path cheap.
CREATE INDEX terminal_sessions_workspace_started_idx
ON terminal_sessions (workspace_id, started_at DESC);


@@ -563,6 +563,21 @@ type TaskUsageRollupState struct {
LastError pgtype.Text `json:"last_error"`
}
type TerminalSession struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
IssueID pgtype.UUID `json:"issue_id"`
TaskID pgtype.UUID `json:"task_id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
UserID pgtype.UUID `json:"user_id"`
WorkDir string `json:"work_dir"`
Shell string `json:"shell"`
StartedAt pgtype.Timestamptz `json:"started_at"`
EndedAt pgtype.Timestamptz `json:"ended_at"`
ExitCode pgtype.Int4 `json:"exit_code"`
CloseReason string `json:"close_reason"`
}
type User struct {
ID pgtype.UUID `json:"id"`
Name string `json:"name"`


@@ -0,0 +1,129 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// source: terminal_session.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const closeTerminalSession = `-- name: CloseTerminalSession :exec
UPDATE terminal_sessions
SET ended_at = $2,
exit_code = $3,
close_reason = $4
WHERE id = $1 AND ended_at IS NULL
`
type CloseTerminalSessionParams struct {
ID pgtype.UUID `json:"id"`
EndedAt pgtype.Timestamptz `json:"ended_at"`
ExitCode pgtype.Int4 `json:"exit_code"`
CloseReason string `json:"close_reason"`
}
// Idempotent: ended_at IS NULL guards against double-close (e.g. the
// daemon emitting terminal.exit and the user closing the tab racing).
// A second call after the first has stamped ended_at is a no-op.
func (q *Queries) CloseTerminalSession(ctx context.Context, arg CloseTerminalSessionParams) error {
_, err := q.db.Exec(ctx, closeTerminalSession,
arg.ID,
arg.EndedAt,
arg.ExitCode,
arg.CloseReason,
)
return err
}
const createTerminalSession = `-- name: CreateTerminalSession :one
INSERT INTO terminal_sessions (
id, workspace_id, issue_id, task_id, runtime_id, user_id, work_dir, shell, started_at
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9
) RETURNING id, workspace_id, issue_id, task_id, runtime_id, user_id, work_dir, shell, started_at, ended_at, exit_code, close_reason
`
type CreateTerminalSessionParams struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
IssueID pgtype.UUID `json:"issue_id"`
TaskID pgtype.UUID `json:"task_id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
UserID pgtype.UUID `json:"user_id"`
WorkDir string `json:"work_dir"`
Shell string `json:"shell"`
StartedAt pgtype.Timestamptz `json:"started_at"`
}
func (q *Queries) CreateTerminalSession(ctx context.Context, arg CreateTerminalSessionParams) (TerminalSession, error) {
row := q.db.QueryRow(ctx, createTerminalSession,
arg.ID,
arg.WorkspaceID,
arg.IssueID,
arg.TaskID,
arg.RuntimeID,
arg.UserID,
arg.WorkDir,
arg.Shell,
arg.StartedAt,
)
var i TerminalSession
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.IssueID,
&i.TaskID,
&i.RuntimeID,
&i.UserID,
&i.WorkDir,
&i.Shell,
&i.StartedAt,
&i.EndedAt,
&i.ExitCode,
&i.CloseReason,
)
return i, err
}
const listTerminalSessionsByIssue = `-- name: ListTerminalSessionsByIssue :many
SELECT id, workspace_id, issue_id, task_id, runtime_id, user_id, work_dir, shell, started_at, ended_at, exit_code, close_reason FROM terminal_sessions
WHERE issue_id = $1
ORDER BY started_at DESC
`
func (q *Queries) ListTerminalSessionsByIssue(ctx context.Context, issueID pgtype.UUID) ([]TerminalSession, error) {
rows, err := q.db.Query(ctx, listTerminalSessionsByIssue, issueID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []TerminalSession{}
for rows.Next() {
var i TerminalSession
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.IssueID,
&i.TaskID,
&i.RuntimeID,
&i.UserID,
&i.WorkDir,
&i.Shell,
&i.StartedAt,
&i.EndedAt,
&i.ExitCode,
&i.CloseReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}


@@ -0,0 +1,21 @@
-- name: CreateTerminalSession :one
INSERT INTO terminal_sessions (
id, workspace_id, issue_id, task_id, runtime_id, user_id, work_dir, shell, started_at
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9
) RETURNING *;
-- name: CloseTerminalSession :exec
-- Idempotent: ended_at IS NULL guards against double-close (e.g. the
-- daemon emitting terminal.exit and the user closing the tab racing).
-- A second call after the first has stamped ended_at is a no-op.
UPDATE terminal_sessions
SET ended_at = $2,
exit_code = $3,
close_reason = $4
WHERE id = $1 AND ended_at IS NULL;
-- name: ListTerminalSessionsByIssue :many
SELECT * FROM terminal_sessions
WHERE issue_id = $1
ORDER BY started_at DESC;
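
The effect of the `ended_at IS NULL` guard in CloseTerminalSession can be modeled in memory: the first close stamps the row, and any later close is a no-op. The sketch below is an illustration only. The `sessionRow` type and `closeSession` helper are hypothetical and do not touch a database.

```go
package main

import (
	"fmt"
	"time"
)

// sessionRow is a hypothetical in-memory model of a terminal_sessions row.
type sessionRow struct {
	endedAt     *time.Time // nil models ended_at IS NULL
	exitCode    *int32     // nil models a NULL exit_code
	closeReason string
}

// closeSession models
//   UPDATE ... WHERE id = $1 AND ended_at IS NULL
// and reports whether a row was actually updated.
func closeSession(rows map[string]*sessionRow, id string, exitCode int32, hasExit bool, reason string) bool {
	row, ok := rows[id]
	if !ok || row.endedAt != nil {
		return false // unknown session or already closed: no-op
	}
	now := time.Now()
	row.endedAt = &now
	if hasExit {
		row.exitCode = &exitCode
	}
	row.closeReason = reason
	return true
}

func main() {
	rows := map[string]*sessionRow{"s1": {}}
	// The daemon reports exit first and wins the race.
	fmt.Println(closeSession(rows, "s1", 0, true, "exited"))
	// The later browser disconnect does not overwrite the row.
	fmt.Println(closeSession(rows, "s1", 0, false, "browser_disconnect"))
	fmt.Println(rows["s1"].closeReason)
}
```

Whichever close arrives first determines `close_reason` and `exit_code`; this is why recording the close on terminal.exit preserves the real exit code even if the browser disconnects later.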


@@ -168,3 +168,99 @@ type DaemonHeartbeatPendingLocalSkillImport struct {
ID string `json:"id"`
SkillKey string `json:"skill_key"`
}
// Terminal WS message types. These flow over the existing daemonws hub
// between client (web/desktop/CLI) and daemon. Bytes payloads are base64
// encoded so they can travel as JSON text frames without binary framing.
const (
// TerminalOpen — client → daemon: request a new PTY session bound to a task workdir.
MessageTypeTerminalOpen = "terminal.open"
// TerminalOpened — daemon → client: ack carrying the session_id and resolved workdir.
MessageTypeTerminalOpened = "terminal.opened"
// TerminalData — bidirectional: PTY stdin (client→daemon) / stdout+stderr (daemon→client).
MessageTypeTerminalData = "terminal.data"
// TerminalResize — client → daemon: window-size change.
MessageTypeTerminalResize = "terminal.resize"
// TerminalClose — bidirectional: explicit teardown request / ack.
MessageTypeTerminalClose = "terminal.close"
// TerminalExit — daemon → client: child process exited; carries exit code and optional reason.
MessageTypeTerminalExit = "terminal.exit"
// TerminalError — daemon → client: open/resize/etc. failed; carries a machine-readable code and a human-readable message.
MessageTypeTerminalError = "terminal.error"
)
// TerminalOpenPayload requests a PTY session bound to the given task's
// workdir. WorkspaceID is the workspace the caller is acting in; the daemon
// must reject if it does not match the task's workspace.
//
// The server resolves WorkDir / IssueID / PriorSessionID from its own DB
// (the daemon has no persistent task cache) and embeds them here before
// forwarding to the daemon. The daemon trusts these fields because the
// daemonws connection is already authenticated and scoped — but it still
// rechecks WorkspaceID against the request body to catch a misrouted frame.
type TerminalOpenPayload struct {
RequestID string `json:"request_id"`
TaskID string `json:"task_id"`
WorkspaceID string `json:"workspace_id"`
UserID string `json:"user_id,omitempty"`
IssueID string `json:"issue_id,omitempty"`
WorkDir string `json:"work_dir,omitempty"`
PriorSessionID string `json:"prior_session_id,omitempty"`
Cols uint16 `json:"cols"`
Rows uint16 `json:"rows"`
}
// TerminalOpenedPayload echoes the request_id and carries the session_id the
// client must include on subsequent data/resize/close frames.
type TerminalOpenedPayload struct {
RequestID string `json:"request_id"`
SessionID string `json:"session_id"`
WorkDir string `json:"work_dir"`
Shell string `json:"shell"`
}
// TerminalDataPayload carries raw PTY bytes in base64.
type TerminalDataPayload struct {
SessionID string `json:"session_id"`
DataB64 string `json:"data_b64"`
}
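
Since PTY bytes travel as base64 inside JSON text frames, a round trip through TerminalDataPayload might look like the sketch below. The local `dataPayload` type mirrors the struct above. The use of standard (padded) base64 rather than the URL-safe alphabet is an assumption; the diff does not say which encoding the daemon and clients use.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// dataPayload mirrors protocol.TerminalDataPayload.
type dataPayload struct {
	SessionID string `json:"session_id"`
	DataB64   string `json:"data_b64"`
}

// encodeData wraps raw PTY bytes in a JSON payload. Standard base64 is
// an assumption; the real encoding is not shown in this change.
func encodeData(sessionID string, raw []byte) ([]byte, error) {
	return json.Marshal(dataPayload{
		SessionID: sessionID,
		DataB64:   base64.StdEncoding.EncodeToString(raw),
	})
}

// decodeData reverses encodeData and returns the session ID and raw bytes.
func decodeData(frame []byte) (string, []byte, error) {
	var p dataPayload
	if err := json.Unmarshal(frame, &p); err != nil {
		return "", nil, err
	}
	raw, err := base64.StdEncoding.DecodeString(p.DataB64)
	if err != nil {
		return "", nil, err
	}
	return p.SessionID, raw, nil
}

func main() {
	// Control bytes such as ESC survive the trip because they are
	// never embedded in the JSON string directly.
	frame, err := encodeData("s1", []byte("\x1b[31mhi\x1b[0m\r\n"))
	if err != nil {
		panic(err)
	}
	sid, raw, err := decodeData(frame)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %q\n", sid, raw)
}
```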
// TerminalResizePayload updates the PTY window size.
type TerminalResizePayload struct {
SessionID string `json:"session_id"`
Cols uint16 `json:"cols"`
Rows uint16 `json:"rows"`
}
// TerminalClosePayload requests teardown. Reason is informational.
type TerminalClosePayload struct {
SessionID string `json:"session_id"`
Reason string `json:"reason,omitempty"`
}
// TerminalExitPayload signals the child process exited.
type TerminalExitPayload struct {
SessionID string `json:"session_id"`
ExitCode int `json:"exit_code"`
Reason string `json:"reason,omitempty"`
}
// Terminal error codes returned in TerminalErrorPayload.Code.
const (
TerminalErrorCodeWorkspaceMismatch = "workspace_mismatch"
TerminalErrorCodeTaskNotFound = "task_not_found"
TerminalErrorCodeSessionNotFound = "session_not_found"
TerminalErrorCodeUnsupportedOS = "unsupported_os"
TerminalErrorCodeSpawnFailed = "spawn_failed"
TerminalErrorCodeInternal = "internal"
)
// TerminalErrorPayload reports a failure. RequestID is set when the error
// is a response to a specific open request; SessionID is set when it
// references an already-established session.
type TerminalErrorPayload struct {
RequestID string `json:"request_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
Code string `json:"code"`
Message string `json:"message"`
}