Compare commits

...

5 Commits

Author SHA1 Message Date
Jiayuan Zhang
0a97663acb feat(terminal): Phase 2 — daemonws routing + server proxy + Desktop xterm.js (MUL-2295)
Wires the Phase 1 terminal.Manager into the live daemonws transport so a
browser tab on the Multica Desktop app can attach to a PTY running in
the daemon's task workdir. End-to-end frame path:

  Desktop (xterm.js)  → /ws/issues/{id}/terminal (server proxy, cookie
                       or first-frame JWT auth, workspace membership
                       enforced before upgrade)
  → daemonws hub      (new SendToRuntime + TerminalRouter routing
                       terminal.* frames back to the proxy by
                       request_id, then re-keyed on session_id)
  → daemon            (new terminal_bridge.go owns one Manager per WS
                       connection, drains PtySession.Output() into
                       terminal.data frames, surfaces ExitC as
                       terminal.exit; closeAll on WS disconnect)
  → terminal.Manager.OpenWithInfo (new method) — server resolves
    task.work_dir/issue_id/prior_session_id from its DB and embeds
    them on the protocol; daemon trusts the server payload, no
    daemon-local task cache needed.

Auth + ACL:
- Browser proxy enforces workspace membership before the upgrade.
- TerminalOpenPayload carries the resolved task info; cross-workspace
  is structurally impossible at the bridge layer because both
  OpenParams.WorkspaceID and TaskInfo.WorkspaceID come from the same
  server-resolved field.
- terminal_bridge maps terminal.{Manager.OpenWithInfo} errors to the
  protocol error codes (workspace_mismatch / task_not_found /
  unsupported_os / spawn_failed / internal).

Resume:
- Server passes prior_session_id; daemon injects CLAUDE_SESSION_ID +
  MULTICA_{WORKSPACE,ISSUE,TASK,USER}_ID into the PTY env per the RFC.
  `claude --resume \$CLAUDE_SESSION_ID` continues the agent's session.

Lifecycle:
- Daemon installs a fresh terminalBridge per daemonws connection and
  tears every PtySession down on disconnect; session_ids minted on one
  WS cannot be reused on a reconnect because the server-side routing
  registration is also gone.
- daemonws client.send buffer raised from 16 to 256 and read limit
  from 4KB to 64KB so PTY traffic fits without evicting connections
  used for heartbeat / wakeup hints.

Desktop UI:
- packages/views/issues/components/terminal-panel.tsx renders an
  xterm.js console with FitAddon, base64 wire encoding, ResizeObserver
  → terminal.resize, reconnect button, and a clear web-only placeholder
  when window.desktopAPI is absent.
- TerminalPanelSection wrapper hangs collapsed in the issue-detail
  sidebar next to the execution log so bootstrap doesn't run for
  every issue view.

Tests:
- terminal/manager_test.go: OpenWithInfo happy path + cross-workspace
  reject (16 tests total, all -race clean).
- daemonws/terminal_test.go: TerminalRouter request_id → session_id
  re-keying and unknown-session drop.
- daemon/terminal_bridge_test.go: server-supplied workdir round-trips
  through OpenWithInfo, missing-workdir surfaces task_not_found and
  never spawns, data+exit round trip.
- GOOS=windows go test -c clean for both daemon and daemon/terminal.

Phase 1 / 2 / 3 / 4 mapping unchanged: Phase 3 (CLI) and Phase 4
(execenv GC hook + issue runs entry + audit log) are still untouched.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 17:45:53 +08:00
Jiayuan Zhang
953fdd5003 fix(daemon/terminal): close re-entry barrier + reap orphan PTY (MUL-2295)
- Manager.Close concurrent re-entry now blocks late callers on closeDone
  so every Close() return shares the "manager drained" guarantee.
- Open cleanup path on lost race with Close calls pty.Wait() to reap the
  child synchronously (waitLoop never runs there).
- Tests: concurrent Close callers all observe drained state; Open cleanup
  invokes pty.Wait at least once.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 17:10:22 +08:00
Jiayuan Zhang
e70f44b92b fix(daemon/terminal): lock Done()/Manager.Close finalize order (MUL-2295)
Round 2 review fixes:

1. PtySession finalize sequence is now
   ExitC -> close(output) -> onClose/deregister -> close(done)
   so external waiters (bridge / GC hook / audit) can `<-Done()` and
   immediately query the manager without a race window.

2. Manager.Close now waits for each session's Done() (not just Close())
   so by the time it returns the registry is empty and every session
   is fully finalized.

Adds TestSession_DoneFiresAfterDeregister (locks the ordering contract)
and TestManager_CloseWaitsForSessionFinalize (fakePTY.Wait delay proves
Manager.Close blocks through finalize).

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 17:00:02 +08:00
Jiayuan Zhang
281f1073b5 fix(daemon/terminal): address Phase 1 review feedback (MUL-2295)
Wires in the four fixes Emacs flagged on the Phase 1 review:

1. Lifecycle: split stop/done with a WaitGroup. readLoop and idleLoop
   exit via <-stop; waitLoop is the finalizer that waits on the WG
   before closing output/done. Eliminates the "send on closed channel"
   race when the output buffer is saturated. Adds a regression test
   that fills output, calls Close, and verifies Done converges + ExitC
   fires before output closes (the doc contract).

2. Errors: Manager.Open wraps spawner errors with double-%w so
   errors.Is matches both ErrSpawnFailed and ErrUnsupportedOS. Adds a
   test with a fake spawner that returns ErrUnsupportedOS.

3. Close path on unix: SIGHUP to the process group, 250ms grace,
   SIGKILL, then close fd — comment now matches behavior. Skips the
   signal+sleep work entirely when the child already exited naturally.
   Manager.Close fans out per-session Close in parallel so the grace
   period doesn't multiply by session count.

4. IdleTimeout semantics: removes the NewManager default that
   silently rewrote 0 to 60min. Zero/negative now disables, per the
   doc comment. Added DefaultIdleTimeout for daemon wiring to opt in
   explicitly.

Verified: go test, go test -race, GOOS=windows go test -c.
Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 16:48:11 +08:00
Jiayuan Zhang
6758feba05 feat(daemon): add terminal Manager + PTY session (Phase 1, MUL-2295)
Daemon-side foundation for the Issue → Terminal feature. Manager owns
the lifecycle of all live PtySessions; sessions spawn a shell on a real
PTY via creack/pty (unix-only — Windows returns ErrUnsupportedOS until
ConPty support lands).

Open enforces the cross-workspace ACL — a client acting in workspace A
cannot attach to a task that belongs to workspace B. Each session
injects CLAUDE_SESSION_ID + MULTICA_{WORKSPACE,ISSUE,TASK,USER}_ID into
the child env so `claude --resume $CLAUDE_SESSION_ID` continues the
same session the agent run was using.

Adds the terminal.* WebSocket message types to server/pkg/protocol so
Phase 2 (daemonws routing) and Phase 3 (CLI) can land without touching
the manager.

Tests cover open, data round-trip, resize, explicit close, idle timeout
sweep, manager shutdown, cross-workspace rejection, and unknown task.
A fake Spawner backed by channels lets tests exercise lifecycle without
forking a real shell.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 16:32:39 +08:00
25 changed files with 3774 additions and 2 deletions

View File

@@ -53,6 +53,7 @@ import { ResolvedThreadBar } from "./resolved-thread-bar";
import { collectThreadReplies } from "./thread-utils";
import { AgentLiveCard } from "./agent-live-card";
import { ExecutionLogSection } from "./execution-log-section";
import { TerminalPanel } from "./terminal-panel";
import { PullRequestList } from "./pull-request-list";
import { useQuery } from "@tanstack/react-query";
import { useAuthStore } from "@multica/core/auth";
@@ -1366,6 +1367,12 @@ export function IssueDetail({ issueId, onDelete, onDone, defaultSidebarOpen = tr
when there are no runs to show. */}
<ExecutionLogSection issueId={id} />
{/* Terminal panel — attaches to the PTY running in the daemon's
workdir for the latest agent task. Desktop-only (the panel
itself renders an explanatory placeholder on web).
See MUL-2295. */}
<TerminalPanelSection issueId={id} workspaceId={wsId} />
{/* Token usage */}
{usage && usage.task_count > 0 && (
<div>
@@ -1904,3 +1911,23 @@ export function IssueDetail({ issueId, onDelete, onDone, defaultSidebarOpen = tr
</ResizablePanelGroup>
);
}
// TerminalPanelSection wraps TerminalPanel in a collapsible header that
// matches the existing sidebar sections (Token usage, etc.). Collapsed
// by default — opening a PTY is an explicit action, and ResizeObserver +
// xterm bootstrap should not run for every issue view.
function TerminalPanelSection({ issueId, workspaceId }: { issueId: string; workspaceId: string }) {
const [open, setOpen] = useState(false);
return (
<div className="mt-6">
<button
className={`flex w-full items-center gap-1 rounded-md px-2 py-1 text-xs font-medium transition-colors mb-2 hover:bg-accent/70 ${open ? "" : "text-muted-foreground hover:text-foreground"}`}
onClick={() => setOpen((v) => !v)}
>
Terminal
<ChevronRight className={`!size-3 shrink-0 stroke-[2.5] text-muted-foreground transition-transform ${open ? "rotate-90" : ""}`} />
</button>
{open && <TerminalPanel issueId={issueId} workspaceId={workspaceId} />}
</div>
);
}

View File

@@ -0,0 +1,342 @@
"use client";
import { useEffect, useMemo, useRef, useState } from "react";
import { Terminal as XTerminal } from "@xterm/xterm";
import { FitAddon } from "@xterm/addon-fit";
import { getApi } from "@multica/core/api";
import { Button } from "@multica/ui/components/ui/button";
import "@xterm/xterm/css/xterm.css";
// Protocol message types — kept in lockstep with
// server/pkg/protocol/messages.go. Strings are stable across daemon /
// server / browser, so duplicating them client-side is OK; if we ever
// regenerate types from Go we can swap these out.
const MSG_TERMINAL_DATA = "terminal.data";
const MSG_TERMINAL_RESIZE = "terminal.resize";
const MSG_TERMINAL_CLOSE = "terminal.close";
const MSG_TERMINAL_OPENED = "terminal.opened";
const MSG_TERMINAL_EXIT = "terminal.exit";
const MSG_TERMINAL_ERROR = "terminal.error";
interface Envelope {
type: string;
payload: unknown;
}
interface OpenedPayload {
request_id: string;
session_id: string;
work_dir: string;
shell: string;
}
interface DataPayload {
session_id: string;
data_b64: string;
}
interface ExitPayload {
session_id: string;
exit_code: number;
reason?: string;
}
interface ErrorPayload {
request_id?: string;
session_id?: string;
code: string;
message: string;
}
// Detect Electron — server-side render guard plus the desktop preload
// surface check. Mirrors the pattern used elsewhere in the desktop app;
// the Terminal panel is intentionally desktop-only because the daemon
// only runs on a developer machine.
function isDesktopRuntime(): boolean {
return typeof window !== "undefined" && "desktopAPI" in window;
}
interface TerminalPanelProps {
issueId: string;
workspaceId: string;
}
export function TerminalPanel({ issueId, workspaceId }: TerminalPanelProps) {
const containerRef = useRef<HTMLDivElement | null>(null);
const termRef = useRef<XTerminal | null>(null);
const fitRef = useRef<FitAddon | null>(null);
const wsRef = useRef<WebSocket | null>(null);
const sessionIdRef = useRef<string>("");
const [status, setStatus] = useState<
"idle" | "connecting" | "connected" | "closed" | "error"
>("idle");
const [errorMessage, setErrorMessage] = useState<string>("");
const [reconnectKey, setReconnectKey] = useState(0);
const wsUrl = useMemo(() => deriveTerminalWsUrl(issueId, workspaceId), [
issueId,
workspaceId,
]);
useEffect(() => {
if (!isDesktopRuntime()) return;
if (!containerRef.current) return;
const term = new XTerminal({
convertEol: true,
cursorBlink: true,
fontFamily:
"ui-monospace, SFMono-Regular, Menlo, Monaco, 'Cascadia Mono', 'Roboto Mono', 'Courier New', monospace",
fontSize: 13,
theme: { background: "#0b0b0b", foreground: "#e6e6e6" },
// Scrollback large enough to read a verbose `cargo build` or `git
// log` without auto-clipping the top.
scrollback: 5000,
});
const fit = new FitAddon();
term.loadAddon(fit);
term.open(containerRef.current);
fit.fit();
termRef.current = term;
fitRef.current = fit;
term.writeln("\x1b[90mconnecting to daemon…\x1b[0m");
setStatus("connecting");
const ws = new WebSocket(wsUrl);
wsRef.current = ws;
ws.onopen = () => {
// Cookie auth carries the session by default. If we ever flip to
// token-mode (no cookie), this is where we'd send an `auth` frame
// mirroring realtime/ws-client.ts. Server falls back gracefully.
setStatus("connected");
};
ws.onerror = () => {
// The browser only surfaces a generic Event; the server sends a
// structured terminal.error frame which we already render below.
// Keep this minimal so we don't double-up the error UI.
setStatus("error");
};
ws.onclose = (ev) => {
setStatus("closed");
term.writeln(
`\r\n\x1b[90mconnection closed (code=${ev.code})${
ev.reason ? ` reason=${ev.reason}` : ""
}\x1b[0m`,
);
};
ws.onmessage = (ev) => {
let env: Envelope;
try {
env = JSON.parse(typeof ev.data === "string" ? ev.data : "");
} catch {
return;
}
switch (env.type) {
case MSG_TERMINAL_OPENED: {
const p = env.payload as OpenedPayload;
sessionIdRef.current = p.session_id;
term.writeln(
`\x1b[90mattached to ${p.shell} (cwd: ${p.work_dir})\x1b[0m`,
);
// Send an initial resize matching the terminal's actual size,
// because the server-side open uses default 80x24 until we tell
// it otherwise.
const cols = term.cols;
const rows = term.rows;
ws.send(
JSON.stringify({
type: MSG_TERMINAL_RESIZE,
payload: {
session_id: p.session_id,
cols,
rows,
},
}),
);
break;
}
case MSG_TERMINAL_DATA: {
const p = env.payload as DataPayload;
if (typeof p.data_b64 !== "string") break;
const decoded = atobToUint8(p.data_b64);
// xterm.js accepts Uint8Array; we avoid the latin1 round-trip
// that would otherwise mangle UTF-8 PTY output.
term.write(decoded);
break;
}
case MSG_TERMINAL_EXIT: {
const p = env.payload as ExitPayload;
term.writeln(
`\r\n\x1b[90mprocess exited (code=${p.exit_code}${
p.reason ? `, reason=${p.reason}` : ""
})\x1b[0m`,
);
ws.close();
break;
}
case MSG_TERMINAL_ERROR: {
const p = env.payload as ErrorPayload;
setErrorMessage(`${p.code}: ${p.message}`);
term.writeln(`\r\n\x1b[31m${p.code}: ${p.message}\x1b[0m`);
break;
}
}
};
// Forward keystrokes as terminal.data with base64 of the UTF-8 bytes.
const dataSub = term.onData((data) => {
if (ws.readyState !== WebSocket.OPEN) return;
if (!sessionIdRef.current) return;
ws.send(
JSON.stringify({
type: MSG_TERMINAL_DATA,
payload: {
session_id: sessionIdRef.current,
data_b64: utf8ToBase64(data),
},
}),
);
});
const resizeSub = term.onResize(({ cols, rows }) => {
if (ws.readyState !== WebSocket.OPEN) return;
if (!sessionIdRef.current) return;
ws.send(
JSON.stringify({
type: MSG_TERMINAL_RESIZE,
payload: {
session_id: sessionIdRef.current,
cols,
rows,
},
}),
);
});
// Observe container size and re-fit so the PTY size tracks the panel
// (the right sidebar can be resized at runtime).
const ro = new ResizeObserver(() => {
try {
fit.fit();
} catch {
// fit() throws when the container has zero height during teardown;
// ignore — the next mount will rebind.
}
});
ro.observe(containerRef.current);
return () => {
dataSub.dispose();
resizeSub.dispose();
ro.disconnect();
try {
if (sessionIdRef.current && ws.readyState === WebSocket.OPEN) {
ws.send(
JSON.stringify({
type: MSG_TERMINAL_CLOSE,
payload: { session_id: sessionIdRef.current, reason: "panel_unmount" },
}),
);
}
} catch {
// ws may be already closing; nothing to do.
}
ws.close();
term.dispose();
termRef.current = null;
fitRef.current = null;
wsRef.current = null;
sessionIdRef.current = "";
};
}, [wsUrl, reconnectKey]);
if (!isDesktopRuntime()) {
return (
<div className="rounded-md border border-dashed p-4 text-sm text-muted-foreground">
The terminal is only available in the Multica Desktop app. It attaches
to the PTY hosted by the local daemon that ran the agent task.
</div>
);
}
return (
<div className="flex flex-col gap-2">
<div className="flex items-center justify-between text-xs text-muted-foreground">
<span>
Status: <span className="font-medium">{status}</span>
{errorMessage ? (
<span className="ml-2 text-destructive"> {errorMessage}</span>
) : null}
</span>
<Button
variant="ghost"
size="sm"
onClick={() => {
setErrorMessage("");
setReconnectKey((n) => n + 1);
}}
>
Reconnect
</Button>
</div>
<div
ref={containerRef}
className="h-[360px] w-full overflow-hidden rounded-md border bg-black"
/>
</div>
);
}
function deriveTerminalWsUrl(issueId: string, workspaceId: string): string {
// The API client knows the http(s) base URL; flip the scheme to ws(s)
// and target the proxy endpoint registered in router.go. Falls back to
// the page origin if for some reason the API base is empty (dev
// environments where the API lives on the same host).
let base = "";
try {
base = getApi().getBaseUrl();
} catch {
base = "";
}
if (!base && typeof window !== "undefined") {
base = window.location.origin;
}
const url = new URL(base);
if (url.protocol === "https:") {
url.protocol = "wss:";
} else if (url.protocol === "http:") {
url.protocol = "ws:";
}
url.pathname = url.pathname.replace(/\/$/, "") +
`/ws/issues/${encodeURIComponent(issueId)}/terminal`;
url.search = `?workspace_id=${encodeURIComponent(workspaceId)}&cols=120&rows=30`;
return url.toString();
}
function utf8ToBase64(s: string): string {
if (typeof TextEncoder !== "undefined") {
const bytes = new TextEncoder().encode(s);
let bin = "";
bytes.forEach((b) => {
bin += String.fromCharCode(b);
});
return btoa(bin);
}
// Fallback for old runtimes: assume latin1.
return btoa(s);
}
function atobToUint8(s: string): Uint8Array {
const bin = atob(s);
const out = new Uint8Array(bin.length);
for (let i = 0; i < bin.length; i++) {
out[i] = bin.charCodeAt(i);
}
return out;
}

View File

@@ -76,6 +76,8 @@
"@tiptap/react": "^3.22.1",
"@tiptap/starter-kit": "^3.22.1",
"@tiptap/suggestion": "^3.22.1",
"@xterm/xterm": "catalog:",
"@xterm/addon-fit": "catalog:",
"cmdk": "^1.1.1",
"hast-util-to-html": "^4.0.1",
"katex": "catalog:",

26
pnpm-lock.yaml generated
View File

@@ -33,6 +33,12 @@ catalogs:
'@vitejs/plugin-react':
specifier: ^6.0.1
version: 6.0.1
'@xterm/addon-fit':
specifier: ^0.10.0
version: 0.10.0
'@xterm/xterm':
specifier: ^5.5.0
version: 5.5.0
class-variance-authority:
specifier: ^0.7.1
version: 0.7.1
@@ -775,6 +781,12 @@ importers:
'@tiptap/suggestion':
specifier: ^3.22.1
version: 3.22.1(@tiptap/core@3.22.1(@tiptap/pm@3.22.1))(@tiptap/pm@3.22.1)
'@xterm/addon-fit':
specifier: 'catalog:'
version: 0.10.0(@xterm/xterm@5.5.0)
'@xterm/xterm':
specifier: 'catalog:'
version: 5.5.0
cmdk:
specifier: ^1.1.1
version: 1.1.1(@types/react-dom@19.2.3(@types/react@19.2.14))(@types/react@19.2.14)(react-dom@19.2.3(react@19.2.3))(react@19.2.3)
@@ -3318,6 +3330,14 @@ packages:
resolution: {integrity: sha512-9k/gHF6n/pAi/9tqr3m3aqkuiNosYTurLLUtc7xQ9sxB/wm7WPygCv8GYa6mS0fLJEHhqMC1ATYhz++U/lRHqg==}
engines: {node: '>=10.0.0'}
'@xterm/addon-fit@0.10.0':
resolution: {integrity: sha512-UFYkDm4HUahf2lnEyHvio51TNGiLK66mqP2JoATy7hRZeXaGMRDr00JiSF7m63vR5WKATF605yEggJKsw0JpMQ==}
peerDependencies:
'@xterm/xterm': ^5.0.0
'@xterm/xterm@5.5.0':
resolution: {integrity: sha512-hqJHYaQb5OptNunnyAnkHyM8aCjZ1MEIDTQu1iIbbTD/xops91NB5yq1ZK/dC2JDbVWtF23zUtl9JE2NqwT87A==}
abbrev@3.0.1:
resolution: {integrity: sha512-AO2ac6pjRB3SJmGJo+v5/aK6Omggp6fsLrs6wN9bd35ulu4cCwaAU9+7ZhXjeqHVkaHThLuzH0nZr0YpCDhygg==}
engines: {node: ^18.17.0 || >=20.5.0}
@@ -10103,6 +10123,12 @@ snapshots:
'@xmldom/xmldom@0.8.12': {}
'@xterm/addon-fit@0.10.0(@xterm/xterm@5.5.0)':
dependencies:
'@xterm/xterm': 5.5.0
'@xterm/xterm@5.5.0': {}
abbrev@3.0.1: {}
accepts@2.0.0:

View File

@@ -49,6 +49,10 @@ catalog:
# Virtualized timeline (issue detail comments)
react-virtuoso: "^4.14.0"
# Interactive PTY rendering (Issue → Terminal panel on Desktop, MUL-2295)
"@xterm/xterm": "^5.5.0"
"@xterm/addon-fit": "^0.10.0"
# Product analytics
posthog-js: "^1.176.1"

View File

@@ -197,6 +197,13 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
realtime.HandleWebSocket(hub, mc, pr, slugResolver, w, r)
})
// Terminal proxy WebSocket: browser ↔ server ↔ daemonws hub ↔ daemon PTY.
// Auth is cookie-or-first-frame (browsers can't set Authorization on a
// WS upgrade), matching the /ws pattern above. Workspace + issue
// membership is enforced inside the handler before the upgrade so a
// 403 surfaces as an HTTP response rather than a silent WS close.
r.Get("/ws/issues/{issue_id}/terminal", h.HandleIssueTerminalWS)
// Local file serving (when using local storage)
if local, ok := store.(*storage.LocalStorage); ok {
r.Get("/uploads/*", func(w http.ResponseWriter, r *http.Request) {

View File

@@ -8,6 +8,7 @@ require (
github.com/aws/aws-sdk-go-v2/credentials v1.19.13
github.com/aws/aws-sdk-go-v2/service/s3 v1.97.3
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.5
github.com/creack/pty v1.1.21
github.com/go-chi/chi/v5 v5.2.5
github.com/go-chi/cors v1.2.2
github.com/golang-jwt/jwt/v5 v5.3.1

View File

@@ -48,6 +48,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.21 h1:1/QdRyBaHHJP61QkWMXlOIBfsgdDeeKfK8SYVUWJKf0=
github.com/creack/pty v1.1.21/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@@ -18,6 +18,7 @@ import (
"github.com/multica-ai/multica/server/internal/cli"
"github.com/multica-ai/multica/server/internal/daemon/execenv"
"github.com/multica-ai/multica/server/internal/daemon/repocache"
"github.com/multica-ai/multica/server/internal/daemon/terminal"
"github.com/multica-ai/multica/server/pkg/agent"
)
@@ -138,6 +139,24 @@ type Daemon struct {
// deleted bare clone and an unrelated `not empty` cleanup failure.
bgSyncs sync.WaitGroup
// terminalManager owns every live PTY this daemon hosts on behalf of
// users opening the Issue → Terminal panel. Constructed in New() so
// tests can override the manager config via cfg fields if needed
// without forcing the production daemon to wire it lazily.
terminalManager *terminal.Manager
// terminalBridge mediates between the WS hub (server-side) and the
// terminalManager. Only non-nil while a daemonws connection is up; the
// wakeup loop swaps in a fresh bridge on every reconnect because
// session_ids cannot survive a hub disconnect.
terminalBridgeMu sync.RWMutex
terminalBridge *terminalBridge
// wsWritesMu guards wsWrites. wsWrites is the active daemonws writer
// queue (nil while disconnected); the terminalBridge funnels every
// outbound terminal.* frame through Daemon.sendWSFrame, which reads
// this pointer under the lock so reconnects don't race the bridge.
wsWritesMu sync.RWMutex
wsWrites chan<- []byte
runner taskRunner // executes agent tasks; set to d.runTask by New(), overridable in tests
cancelPollInterval time.Duration // how often handleTask polls for server-side cancellation; overridable in tests
// runUpdateFn executes the brew-or-download upgrade. Set to d.runUpdate by
@@ -171,9 +190,80 @@ func New(cfg Config, logger *slog.Logger) *Daemon {
}
d.runner = taskRunnerFunc(d.runTask)
d.runUpdateFn = d.runUpdate
// The terminal manager has no TaskLookup wired: every Open call goes
// through OpenWithInfo using a TaskInfo that the server resolved from
// the DB before forwarding terminal.open over daemonws (the daemon
// does not maintain a persistent task cache).
d.terminalManager = terminal.NewManager(terminal.ManagerConfig{
IdleTimeout: terminal.DefaultIdleTimeout,
Logger: logger,
}, nil)
return d
}
// sendWSFrame pushes a raw frame onto the current daemonws writer queue.
// Returns false when no connection is active or the writer queue is
// saturated — callers (today: terminalBridge) drop the frame rather than
// block, because the next reconnect or queue drain will unstick us.
func (d *Daemon) sendWSFrame(frame []byte) bool {
d.wsWritesMu.RLock()
writes := d.wsWrites
d.wsWritesMu.RUnlock()
if writes == nil {
return false
}
select {
case writes <- frame:
return true
default:
return false
}
}
// installWSWrites stores the connection-local writer queue so the bridge
// can address it through Daemon.sendWSFrame. A fresh terminalBridge is
// installed each call: session_ids minted on a previous WS connection are
// not valid on the new one (the server-side proxy registered routing
// against the old hub client), so it's cleaner to tear every PTY down
// than to half-revive them.
func (d *Daemon) installWSWrites(writes chan<- []byte) {
d.wsWritesMu.Lock()
d.wsWrites = writes
d.wsWritesMu.Unlock()
bridge := newTerminalBridge(d.terminalManager, d.logger, d.sendWSFrame)
d.terminalBridgeMu.Lock()
prev := d.terminalBridge
d.terminalBridge = bridge
d.terminalBridgeMu.Unlock()
if prev != nil {
prev.closeAll("ws_reconnect")
}
}
// clearWSWrites removes the writer pointer and tears down every live
// terminal session bound to this connection. Called from the wakeup
// connection's deferred cleanup.
func (d *Daemon) clearWSWrites() {
d.wsWritesMu.Lock()
d.wsWrites = nil
d.wsWritesMu.Unlock()
d.terminalBridgeMu.Lock()
bridge := d.terminalBridge
d.terminalBridge = nil
d.terminalBridgeMu.Unlock()
if bridge != nil {
bridge.closeAll("ws_disconnect")
}
}
func (d *Daemon) currentTerminalBridge() *terminalBridge {
d.terminalBridgeMu.RLock()
defer d.terminalBridgeMu.RUnlock()
return d.terminalBridge
}
// setAgentVersion records the detected CLI version for an agent provider so
// later task-dispatch code (e.g. Codex sandbox policy) can read it.
func (d *Daemon) setAgentVersion(provider, version string) {

View File

@@ -0,0 +1,14 @@
// Package terminal manages interactive PTY sessions bound to a task's
// workdir on the local daemon.
//
// A Manager owns the lifecycle of all live PtySessions. Callers (the
// daemonws bridge today, the CLI socket later) translate WebSocket
// terminal.* frames into method calls on Manager, and the Manager
// forwards PTY output back through a per-session Output channel.
//
// Sessions run with the daemon process's identity — there is no
// additional sandbox. This is the same trust boundary as agent runs
// (which are also daemon-spawned child processes). The Manager only
// enforces that the requesting client's workspace matches the task's
// workspace; anything beyond that is the OS's responsibility.
package terminal

View File

@@ -0,0 +1,14 @@
package terminal
import "errors"
// Sentinel errors returned by Manager. Callers map these to the
// protocol.TerminalErrorCode* constants when reporting to clients.
var (
ErrTaskNotFound = errors.New("terminal: task not found")
ErrWorkspaceMismatch = errors.New("terminal: task belongs to a different workspace")
ErrSessionNotFound = errors.New("terminal: session not found")
ErrUnsupportedOS = errors.New("terminal: PTY not supported on this OS")
ErrSpawnFailed = errors.New("terminal: failed to spawn shell")
ErrManagerClosed = errors.New("terminal: manager is shut down")
)

View File

@@ -0,0 +1,347 @@
package terminal
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"github.com/google/uuid"
)
// DefaultIdleTimeout is the recommended IdleTimeout for production
// daemon wiring. Callers must set ManagerConfig.IdleTimeout to this
// (or any positive duration) explicitly; zero/negative disables the
// idle sweep.
const DefaultIdleTimeout = 60 * time.Minute
// TaskInfo is the subset of task state the Manager needs to set up a PTY.
// The daemon resolves a TaskID into TaskInfo via TaskLookup at open time.
type TaskInfo struct {
TaskID string
WorkspaceID string
IssueID string
WorkDir string
PriorSessionID string // injected as CLAUDE_SESSION_ID for `claude --resume`
}
// TaskLookup resolves a TaskID into the workdir + workspace required to
// open a PTY. Returns ErrTaskNotFound when the task is unknown. Lookups
// hit the daemon's local task cache in production; tests supply a stub.
type TaskLookup func(ctx context.Context, taskID string) (TaskInfo, error)
// OpenParams is the input to Manager.Open.
type OpenParams struct {
// TaskID identifies the workdir the PTY should run in.
TaskID string
// WorkspaceID is the workspace the caller is acting on behalf of.
// Open rejects the request if it does not match the task's workspace
// (cross-workspace ACL — clients never see other workspaces' workdirs).
WorkspaceID string
// UserID is the human user who opened the terminal. Logged in audit
// records; the PTY itself runs as the daemon process owner.
UserID string
// Cols/Rows seed the initial PTY window size. Zero values default to 80x24.
Cols uint16
Rows uint16
}
// ManagerConfig tunes Manager behaviour. Zero values are sensible defaults.
type ManagerConfig struct {
// Shell to spawn for each session. Defaults to "bash" with "-l".
// Overridable for tests; the production daemon hardcodes bash for now
// (RFC open question #4 — shell selection deferred to a later release).
ShellPath string
ShellArgs []string
// IdleTimeout closes a session that has had no I/O for this long.
// Zero or negative disables the sweep entirely. Production daemon
// wiring should pass DefaultIdleTimeout explicitly; we intentionally
// don't default here so callers stay in control (the docs page for
// this package previously said "0 disables" while NewManager silently
// rewrote 0 to 60min — those two have to agree).
IdleTimeout time.Duration
// Spawner overrides PTY spawning. Defaults to ptyStartShell which
// shells out to creack/pty. Tests inject a fake to avoid forking.
Spawner Spawner
// Now returns the current time. Defaults to time.Now. Tests inject a
// fake clock to drive IdleTimeout deterministically.
Now func() time.Time
// Logger receives operational events. Defaults to slog.Default().
Logger *slog.Logger
}
// Manager owns all live PtySessions on this daemon. It is safe for
// concurrent use.
type Manager struct {
cfg ManagerConfig
lookup TaskLookup
mu sync.Mutex
sessions map[string]*PtySession
closed bool
// closeDone is closed by the first Close() caller AFTER finalize
// finishes (every session deregistered, Done() closed). Subsequent
// concurrent callers wait on it instead of racing past, so all
// Close() returns share the same "manager fully drained" guarantee.
closeDone chan struct{}
}
// NewManager constructs a Manager. lookup may be nil in tests that only
// exercise direct session APIs.
func NewManager(cfg ManagerConfig, lookup TaskLookup) *Manager {
if cfg.ShellPath == "" {
cfg.ShellPath = "bash"
cfg.ShellArgs = []string{"-l"}
}
// IdleTimeout intentionally not defaulted — see ManagerConfig.
if cfg.Spawner == nil {
cfg.Spawner = realSpawner{}
}
if cfg.Now == nil {
cfg.Now = time.Now
}
if cfg.Logger == nil {
cfg.Logger = slog.Default()
}
return &Manager{
cfg: cfg,
lookup: lookup,
sessions: make(map[string]*PtySession),
closeDone: make(chan struct{}),
}
}
// Open spawns a new PTY session for the given task. The returned
// session is also registered with the manager and retrievable via Get.
func (m *Manager) Open(ctx context.Context, p OpenParams) (*PtySession, error) {
if m.lookup == nil {
return nil, fmt.Errorf("terminal: Manager has no TaskLookup configured")
}
info, err := m.lookup(ctx, p.TaskID)
if err != nil {
return nil, err
}
if info.WorkspaceID != p.WorkspaceID {
return nil, ErrWorkspaceMismatch
}
if info.WorkDir == "" {
return nil, ErrTaskNotFound
}
return m.openWith(info, p)
}
// OpenWithInfo spawns a PTY against a caller-supplied TaskInfo, bypassing
// the configured TaskLookup. Production daemon wiring takes this path
// because the server resolves task metadata from its DB before forwarding
// terminal.open over the daemonws hub — the daemon trusts the server-
// authenticated payload and does not have its own task cache. The
// workspace mismatch check still runs so a server bug or a relayed frame
// cannot cross workspace boundaries.
func (m *Manager) OpenWithInfo(_ context.Context, info TaskInfo, p OpenParams) (*PtySession, error) {
if info.WorkspaceID != p.WorkspaceID {
return nil, ErrWorkspaceMismatch
}
if info.WorkDir == "" {
return nil, ErrTaskNotFound
}
if info.TaskID == "" {
info.TaskID = p.TaskID
}
return m.openWith(info, p)
}
func (m *Manager) openWith(info TaskInfo, p OpenParams) (*PtySession, error) {
cols, rows := normalizeSize(p.Cols, p.Rows)
env := buildEnv(info, p.UserID)
m.mu.Lock()
if m.closed {
m.mu.Unlock()
return nil, ErrManagerClosed
}
m.mu.Unlock()
startedAt := m.cfg.Now()
pty, err := m.cfg.Spawner.Start(SpawnRequest{
Shell: m.cfg.ShellPath,
Args: m.cfg.ShellArgs,
Cwd: info.WorkDir,
Env: env,
Cols: cols,
Rows: rows,
Started: startedAt,
})
if err != nil {
// Double-%w so errors.Is matches both ErrSpawnFailed AND any
// sentinel the spawner surfaced (notably ErrUnsupportedOS from
// the windows stub — the protocol layer needs to distinguish
// "no PTY on this OS" from generic spawn failures).
return nil, fmt.Errorf("%w: %w", ErrSpawnFailed, err)
}
sess := &PtySession{
id: uuid.NewString(),
taskID: info.TaskID,
workspaceID: info.WorkspaceID,
issueID: info.IssueID,
workDir: info.WorkDir,
userID: p.UserID,
shellPath: m.cfg.ShellPath,
cols: cols,
rows: rows,
pty: pty,
output: make(chan []byte, 64),
exit: make(chan ExitInfo, 1),
done: make(chan struct{}),
stop: make(chan struct{}),
now: m.cfg.Now,
idleTimeout: m.cfg.IdleTimeout,
startedAt: startedAt,
lastIO: startedAt,
logger: m.cfg.Logger.With("session_id_pending", true, "task_id", info.TaskID),
onClose: func(id string) { m.deregister(id) },
}
sess.logger = m.cfg.Logger.With("session_id", sess.id, "task_id", info.TaskID)
m.mu.Lock()
if m.closed {
m.mu.Unlock()
_ = pty.Close()
// We won that race: spawn succeeded but the manager closed before
// we could register the session, so waitLoop never runs. Reap the
// child synchronously here — pty.Close fires SIGHUP/SIGKILL but
// only Wait() collects the exit status, otherwise the unix child
// stays around as a zombie until the daemon process dies.
_, _ = pty.Wait()
return nil, ErrManagerClosed
}
m.sessions[sess.id] = sess
m.mu.Unlock()
sess.start()
return sess, nil
}
// Get returns the session with the given id, or ErrSessionNotFound.
func (m *Manager) Get(id string) (*PtySession, error) {
m.mu.Lock()
defer m.mu.Unlock()
sess, ok := m.sessions[id]
if !ok {
return nil, ErrSessionNotFound
}
return sess, nil
}
// Sessions returns a snapshot of currently registered session IDs.
func (m *Manager) Sessions() []string {
m.mu.Lock()
defer m.mu.Unlock()
ids := make([]string, 0, len(m.sessions))
for id := range m.sessions {
ids = append(ids, id)
}
return ids
}
// Close tears down every live session and refuses subsequent Open calls.
// Safe to call concurrently from multiple goroutines: the first caller
// runs the actual teardown, the rest block on closeDone until that
// teardown is fully observable. Every Close() return — first or Nth —
// thus carries the same "manager drained, every session finalized"
// guarantee that downstream GC/audit cleanup depends on.
func (m *Manager) Close() {
m.mu.Lock()
if m.closed {
done := m.closeDone
m.mu.Unlock()
<-done
return
}
m.closed = true
live := make([]*PtySession, 0, len(m.sessions))
for _, s := range m.sessions {
live = append(live, s)
}
m.mu.Unlock()
// Parallel: each session.Close blocks for the unix spawner's
// SIGHUP→grace→SIGKILL window. Running serially would multiply
// shutdown latency by N sessions. We additionally wait on each
// session's Done() so Manager.Close returning is a hard guarantee
// that every session finalized (output closed, deregistered, done
// fired) — downstream GC/audit cleanup relies on this.
var wg sync.WaitGroup
for _, s := range live {
wg.Add(1)
go func(s *PtySession) {
defer wg.Done()
s.Close("manager_shutdown")
<-s.Done()
}(s)
}
wg.Wait()
close(m.closeDone)
}
// CheckIdle walks every session and closes those whose idle interval
// has elapsed. The daemon's existing GC loop calls this periodically;
// each session also self-monitors via its own timer for cases where the
// outer loop runs at a coarser cadence than IdleTimeout.
func (m *Manager) CheckIdle() {
if m.cfg.IdleTimeout <= 0 {
return
}
m.mu.Lock()
sessions := make([]*PtySession, 0, len(m.sessions))
for _, s := range m.sessions {
sessions = append(sessions, s)
}
m.mu.Unlock()
now := m.cfg.Now()
for _, s := range sessions {
if now.Sub(s.LastIO()) >= m.cfg.IdleTimeout {
s.Close("idle_timeout")
}
}
}
func (m *Manager) deregister(id string) {
m.mu.Lock()
delete(m.sessions, id)
m.mu.Unlock()
}
func normalizeSize(cols, rows uint16) (uint16, uint16) {
if cols == 0 {
cols = 80
}
if rows == 0 {
rows = 24
}
return cols, rows
}
func buildEnv(info TaskInfo, userID string) []string {
env := []string{
"MULTICA_WORKSPACE_ID=" + info.WorkspaceID,
"MULTICA_TASK_ID=" + info.TaskID,
}
if info.IssueID != "" {
env = append(env, "MULTICA_ISSUE_ID="+info.IssueID)
}
if userID != "" {
env = append(env, "MULTICA_USER_ID="+userID)
}
if info.PriorSessionID != "" {
// Injected so `claude --resume $CLAUDE_SESSION_ID` continues the
// same session that the agent run was using (see RFC §Resume).
env = append(env, "CLAUDE_SESSION_ID="+info.PriorSessionID)
}
return env
}

View File

@@ -0,0 +1,898 @@
package terminal
import (
"context"
"errors"
"io"
"sync"
"sync/atomic"
"testing"
"time"
)
// fakePTY is a Spawner-served stand-in for a real PTY. Tests push child
// output via WriteFromChild and read client input via ReadFromClient.
type fakePTY struct {
t *testing.T
// child -> client (output queue, read by readLoop)
childToClient chan []byte
// client -> child (writes captured into a buffer slice under mu)
mu sync.Mutex
clientWrote [][]byte
cols, rows uint16
// closeOnce coordinates teardown
closeOnce sync.Once
closeCh chan struct{}
// waitDone signals Wait can return. Defaults closed by Close.
waitOnce sync.Once
waitDone chan struct{}
// waitDelay (optional) sleeps inside Wait AFTER waitDone fires.
// Lets tests prove Manager.Close waits for session finalize rather
// than just for s.Close() to return.
waitDelay time.Duration
exitCode int32
resizedCh chan [2]uint16
closed atomic.Bool
// waitCount tracks how many times Wait() was invoked. Lets tests
// assert the cleanup path reaped the child even when no session was
// ever registered (Manager.Close racing Open).
waitCount atomic.Int32
}
func newFakePTY(t *testing.T, cols, rows uint16) *fakePTY {
return &fakePTY{
t: t,
childToClient: make(chan []byte, 8),
cols: cols,
rows: rows,
closeCh: make(chan struct{}),
waitDone: make(chan struct{}),
resizedCh: make(chan [2]uint16, 8),
}
}
func (p *fakePTY) Read(b []byte) (int, error) {
select {
case chunk, ok := <-p.childToClient:
if !ok {
return 0, io.EOF
}
n := copy(b, chunk)
return n, nil
case <-p.closeCh:
return 0, io.EOF
}
}
func (p *fakePTY) Write(b []byte) (int, error) {
if p.closed.Load() {
return 0, io.ErrClosedPipe
}
p.mu.Lock()
c := make([]byte, len(b))
copy(c, b)
p.clientWrote = append(p.clientWrote, c)
p.mu.Unlock()
return len(b), nil
}
func (p *fakePTY) Resize(cols, rows uint16) error {
if p.closed.Load() {
return io.ErrClosedPipe
}
p.mu.Lock()
p.cols, p.rows = cols, rows
p.mu.Unlock()
select {
case p.resizedCh <- [2]uint16{cols, rows}:
default:
}
return nil
}
func (p *fakePTY) Wait() (int, error) {
p.waitCount.Add(1)
<-p.waitDone
if p.waitDelay > 0 {
time.Sleep(p.waitDelay)
}
return int(atomic.LoadInt32(&p.exitCode)), nil
}
func (p *fakePTY) Close() error {
p.closeOnce.Do(func() {
p.closed.Store(true)
close(p.closeCh)
close(p.childToClient)
p.waitOnce.Do(func() { close(p.waitDone) })
})
return nil
}
// pushChildOutput simulates the shell writing bytes to its stdout/stderr.
func (p *fakePTY) pushChildOutput(b []byte) {
select {
case p.childToClient <- b:
case <-time.After(time.Second):
p.t.Fatalf("childToClient send timed out — readLoop not draining")
}
}
func (p *fakePTY) writes() [][]byte {
p.mu.Lock()
defer p.mu.Unlock()
out := make([][]byte, len(p.clientWrote))
copy(out, p.clientWrote)
return out
}
func (p *fakePTY) size() (uint16, uint16) {
p.mu.Lock()
defer p.mu.Unlock()
return p.cols, p.rows
}
// fakeSpawner records each spawn so tests can inspect injected env / cwd.
type fakeSpawner struct {
t *testing.T
spawnsMu sync.Mutex
spawns []SpawnRequest
make func(*testing.T, SpawnRequest) (*fakePTY, error)
}
func (s *fakeSpawner) Start(req SpawnRequest) (PTY, error) {
s.spawnsMu.Lock()
s.spawns = append(s.spawns, req)
s.spawnsMu.Unlock()
pty, err := s.make(s.t, req)
if err != nil {
return nil, err
}
return pty, nil
}
func (s *fakeSpawner) lastRequest() SpawnRequest {
s.spawnsMu.Lock()
defer s.spawnsMu.Unlock()
if len(s.spawns) == 0 {
return SpawnRequest{}
}
return s.spawns[len(s.spawns)-1]
}
// helper: build a Manager with a default fake spawner and a single task.
type fixture struct {
mgr *Manager
spawner *fakeSpawner
tasks map[string]TaskInfo
now func() time.Time
clockMu sync.Mutex
clock time.Time
}
func newFixture(t *testing.T, opts ...func(*ManagerConfig)) *fixture {
f := &fixture{
tasks: map[string]TaskInfo{
"task-1": {
TaskID: "task-1",
WorkspaceID: "ws-A",
IssueID: "issue-1",
WorkDir: t.TempDir(),
PriorSessionID: "claude-session-xyz",
},
},
clock: time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC),
}
f.now = func() time.Time {
f.clockMu.Lock()
defer f.clockMu.Unlock()
return f.clock
}
f.spawner = &fakeSpawner{
t: t,
make: func(tt *testing.T, req SpawnRequest) (*fakePTY, error) { return newFakePTY(tt, req.Cols, req.Rows), nil },
}
cfg := ManagerConfig{
ShellPath: "/usr/bin/bash",
ShellArgs: []string{"-l"},
IdleTimeout: 0,
Spawner: f.spawner,
Now: f.now,
}
for _, opt := range opts {
opt(&cfg)
}
lookup := func(_ context.Context, id string) (TaskInfo, error) {
info, ok := f.tasks[id]
if !ok {
return TaskInfo{}, ErrTaskNotFound
}
return info, nil
}
f.mgr = NewManager(cfg, lookup)
return f
}
func (f *fixture) advance(d time.Duration) {
f.clockMu.Lock()
f.clock = f.clock.Add(d)
f.clockMu.Unlock()
}
// drainPTY pulls the *fakePTY back out of the spawner so tests can drive it.
func (f *fixture) lastPTY(t *testing.T) *fakePTY {
t.Helper()
req := f.spawner.lastRequest()
if req.Shell == "" {
t.Fatal("no spawn recorded")
}
// The Spawner.make closure always returns a *fakePTY; the manager
// wraps it as a PTY interface and we don't retain the concrete in
// the manager. Re-acquire via the registry by walking sessions.
for _, id := range f.mgr.Sessions() {
s, err := f.mgr.Get(id)
if err == nil {
if fp, ok := s.pty.(*fakePTY); ok {
return fp
}
}
}
t.Fatal("no fake PTY found in any registered session")
return nil
}
func TestManager_OpenSpawnsWithInjectedEnvAndCwd(t *testing.T) {
f := newFixture(t)
defer f.mgr.Close()
sess, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "task-1",
WorkspaceID: "ws-A",
UserID: "user-42",
Cols: 120,
Rows: 40,
})
if err != nil {
t.Fatalf("Open: %v", err)
}
if sess.ID() == "" {
t.Fatal("session ID empty")
}
if got := sess.WorkDir(); got != f.tasks["task-1"].WorkDir {
t.Errorf("workdir = %q, want %q", got, f.tasks["task-1"].WorkDir)
}
req := f.spawner.lastRequest()
if req.Cwd != f.tasks["task-1"].WorkDir {
t.Errorf("spawn cwd = %q, want %q", req.Cwd, f.tasks["task-1"].WorkDir)
}
if req.Cols != 120 || req.Rows != 40 {
t.Errorf("spawn size = %dx%d, want 120x40", req.Cols, req.Rows)
}
wantEnv := map[string]string{
"MULTICA_WORKSPACE_ID": "ws-A",
"MULTICA_TASK_ID": "task-1",
"MULTICA_ISSUE_ID": "issue-1",
"MULTICA_USER_ID": "user-42",
"CLAUDE_SESSION_ID": "claude-session-xyz",
}
envMap := map[string]string{}
for _, kv := range req.Env {
for i := 0; i < len(kv); i++ {
if kv[i] == '=' {
envMap[kv[:i]] = kv[i+1:]
break
}
}
}
for k, want := range wantEnv {
if got := envMap[k]; got != want {
t.Errorf("env %s = %q, want %q", k, got, want)
}
}
}
func TestManager_DefaultSize(t *testing.T) {
f := newFixture(t)
defer f.mgr.Close()
_, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "task-1",
WorkspaceID: "ws-A",
})
if err != nil {
t.Fatalf("Open: %v", err)
}
req := f.spawner.lastRequest()
if req.Cols != 80 || req.Rows != 24 {
t.Errorf("default size = %dx%d, want 80x24", req.Cols, req.Rows)
}
}
func TestManager_RejectsCrossWorkspace(t *testing.T) {
f := newFixture(t)
defer f.mgr.Close()
_, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "task-1",
WorkspaceID: "ws-B-not-the-tasks-workspace",
})
if !errors.Is(err, ErrWorkspaceMismatch) {
t.Fatalf("Open err = %v, want ErrWorkspaceMismatch", err)
}
if got := len(f.mgr.Sessions()); got != 0 {
t.Errorf("Sessions after rejected open = %d, want 0", got)
}
}
func TestManager_OpenWithInfoBypassesLookup(t *testing.T) {
// Manager is built with a nil lookup; OpenWithInfo must still succeed
// when the server-supplied TaskInfo is well-formed. This is the path
// daemonws hits in production — the daemon has no task cache, so the
// server resolves task metadata and embeds it on the protocol.
spawner := &fakeSpawner{
t: t,
make: func(tt *testing.T, req SpawnRequest) (*fakePTY, error) { return newFakePTY(tt, req.Cols, req.Rows), nil },
}
mgr := NewManager(ManagerConfig{
ShellPath: "/usr/bin/bash",
ShellArgs: []string{"-l"},
Spawner: spawner,
}, nil)
defer mgr.Close()
dir := t.TempDir()
info := TaskInfo{
TaskID: "task-via-ws",
WorkspaceID: "ws-A",
IssueID: "issue-1",
WorkDir: dir,
PriorSessionID: "claude-session-xyz",
}
sess, err := mgr.OpenWithInfo(context.Background(), info, OpenParams{
TaskID: "task-via-ws",
WorkspaceID: "ws-A",
UserID: "user-1",
})
if err != nil {
t.Fatalf("OpenWithInfo: %v", err)
}
if sess.WorkDir() != dir {
t.Errorf("WorkDir = %q, want %q", sess.WorkDir(), dir)
}
req := spawner.lastRequest()
if req.Cwd != dir {
t.Errorf("spawn cwd = %q, want %q", req.Cwd, dir)
}
gotEnv := map[string]string{}
for _, kv := range req.Env {
eq := -1
for i, c := range kv {
if c == '=' {
eq = i
break
}
}
if eq > 0 {
gotEnv[kv[:eq]] = kv[eq+1:]
}
}
if gotEnv["MULTICA_WORKSPACE_ID"] != "ws-A" {
t.Errorf("MULTICA_WORKSPACE_ID = %q, want ws-A", gotEnv["MULTICA_WORKSPACE_ID"])
}
if gotEnv["MULTICA_ISSUE_ID"] != "issue-1" {
t.Errorf("MULTICA_ISSUE_ID = %q, want issue-1", gotEnv["MULTICA_ISSUE_ID"])
}
if gotEnv["MULTICA_TASK_ID"] != "task-via-ws" {
t.Errorf("MULTICA_TASK_ID = %q, want task-via-ws", gotEnv["MULTICA_TASK_ID"])
}
if gotEnv["CLAUDE_SESSION_ID"] != "claude-session-xyz" {
t.Errorf("CLAUDE_SESSION_ID = %q, want claude-session-xyz", gotEnv["CLAUDE_SESSION_ID"])
}
}
func TestManager_OpenWithInfoRejectsCrossWorkspace(t *testing.T) {
// OpenWithInfo skips the lookup, but the workspace_id check on the
// caller-supplied OpenParams vs. the supplied TaskInfo still has to
// run — otherwise a misrouted frame from one workspace could attach
// to another workspace's workdir.
spawner := &fakeSpawner{
t: t,
make: func(tt *testing.T, req SpawnRequest) (*fakePTY, error) { return newFakePTY(tt, req.Cols, req.Rows), nil },
}
mgr := NewManager(ManagerConfig{Spawner: spawner}, nil)
defer mgr.Close()
_, err := mgr.OpenWithInfo(context.Background(), TaskInfo{
TaskID: "task-via-ws",
WorkspaceID: "ws-A",
WorkDir: t.TempDir(),
}, OpenParams{
TaskID: "task-via-ws",
WorkspaceID: "ws-B",
})
if !errors.Is(err, ErrWorkspaceMismatch) {
t.Fatalf("err = %v, want ErrWorkspaceMismatch", err)
}
if got := len(mgr.Sessions()); got != 0 {
t.Errorf("Sessions after rejected OpenWithInfo = %d, want 0", got)
}
}
func TestManager_RejectsUnknownTask(t *testing.T) {
f := newFixture(t)
defer f.mgr.Close()
_, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "does-not-exist",
WorkspaceID: "ws-A",
})
if !errors.Is(err, ErrTaskNotFound) {
t.Fatalf("Open err = %v, want ErrTaskNotFound", err)
}
}
func TestSession_DataRoundTrip(t *testing.T) {
f := newFixture(t)
defer f.mgr.Close()
sess, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "task-1",
WorkspaceID: "ws-A",
})
if err != nil {
t.Fatalf("Open: %v", err)
}
pty := f.lastPTY(t)
// client → child
if _, err := sess.Write([]byte("ls -al\n")); err != nil {
t.Fatalf("Write: %v", err)
}
// child → client
pty.pushChildOutput([]byte("total 0\n"))
select {
case got := <-sess.Output():
if string(got) != "total 0\n" {
t.Errorf("Output chunk = %q, want %q", got, "total 0\n")
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for Output chunk")
}
writes := pty.writes()
if len(writes) != 1 || string(writes[0]) != "ls -al\n" {
t.Errorf("recorded writes = %#v, want one 'ls -al\\n'", writes)
}
}
func TestSession_Resize(t *testing.T) {
f := newFixture(t)
defer f.mgr.Close()
sess, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "task-1",
WorkspaceID: "ws-A",
Cols: 80, Rows: 24,
})
if err != nil {
t.Fatalf("Open: %v", err)
}
pty := f.lastPTY(t)
if err := sess.Resize(132, 50); err != nil {
t.Fatalf("Resize: %v", err)
}
c, r := sess.Size()
if c != 132 || r != 50 {
t.Errorf("Size = %dx%d, want 132x50", c, r)
}
gc, gr := pty.size()
if gc != 132 || gr != 50 {
t.Errorf("PTY size = %dx%d, want 132x50", gc, gr)
}
}
func TestSession_CloseDeregistersAndDelivers(t *testing.T) {
f := newFixture(t)
defer f.mgr.Close()
sess, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "task-1",
WorkspaceID: "ws-A",
})
if err != nil {
t.Fatalf("Open: %v", err)
}
id := sess.ID()
sess.Close("user_requested")
select {
case info := <-sess.ExitC():
if info.Reason != "user_requested" {
t.Errorf("exit reason = %q, want user_requested", info.Reason)
}
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for ExitC")
}
// Output should close once exit fires; verify by ranging.
drained := false
for range sess.Output() {
drained = true
}
_ = drained
<-sess.Done()
// Session must be deregistered.
if _, err := f.mgr.Get(id); !errors.Is(err, ErrSessionNotFound) {
t.Errorf("Get after Close = %v, want ErrSessionNotFound", err)
}
}
func TestManager_IdleTimeoutSweep(t *testing.T) {
f := newFixture(t, func(c *ManagerConfig) {
c.IdleTimeout = 30 * time.Minute
})
defer f.mgr.Close()
sess, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "task-1",
WorkspaceID: "ws-A",
})
if err != nil {
t.Fatalf("Open: %v", err)
}
// 29 minutes — still active.
f.advance(29 * time.Minute)
f.mgr.CheckIdle()
if _, err := f.mgr.Get(sess.ID()); err != nil {
t.Fatalf("session evicted before idle timeout: %v", err)
}
// Cross the threshold.
f.advance(2 * time.Minute)
f.mgr.CheckIdle()
select {
case info := <-sess.ExitC():
if info.Reason != "idle_timeout" {
t.Errorf("exit reason = %q, want idle_timeout", info.Reason)
}
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for idle close")
}
if _, err := f.mgr.Get(sess.ID()); !errors.Is(err, ErrSessionNotFound) {
t.Errorf("session not deregistered after idle sweep")
}
}
func TestManager_CloseTearsDownAllSessions(t *testing.T) {
f := newFixture(t)
s1, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
s2, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
f.mgr.Close()
for _, s := range []*PtySession{s1, s2} {
select {
case <-s.Done():
case <-time.After(2 * time.Second):
t.Fatalf("session %s did not tear down", s.ID())
}
}
if got := len(f.mgr.Sessions()); got != 0 {
t.Errorf("Sessions after Manager.Close = %d, want 0", got)
}
// Subsequent opens must be rejected.
if _, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"}); !errors.Is(err, ErrManagerClosed) {
t.Errorf("Open after Close = %v, want ErrManagerClosed", err)
}
}
func TestSession_CloseWithFullOutputBufferDoesNotPanic(t *testing.T) {
// Regression: Close used to race with readLoop's "output <- chunk"
// when the channel was full. waitLoop closed output unconditionally,
// which could panic on send-to-closed-channel. The new lifecycle
// has waitLoop wait on a WaitGroup so readLoop's blocked send
// unblocks via <-stop before the close runs.
f := newFixture(t)
defer f.mgr.Close()
// Override on the existing spawner so newFixture's wiring (and
// f.spawner.lastRequest tracking) still works.
f.spawner.make = func(tt *testing.T, req SpawnRequest) (*fakePTY, error) {
p := newFakePTY(tt, req.Cols, req.Rows)
// Give the child-side queue plenty of room so the test can
// saturate the *session* output buffer before childToClient
// back-pressures the producer goroutine.
p.childToClient = make(chan []byte, 256)
return p, nil
}
sess, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
pty := f.lastPTY(t)
// Pump enough chunks to fill session.output (cap 64) and queue more
// on childToClient; readLoop ends up blocked on output <- chunk.
// Don't drain sess.Output() — that's the whole point. Producer runs
// to completion (and exits) BEFORE Close, otherwise producer's send
// races Close's pty.Close which closes childToClient.
producerDone := make(chan struct{})
go func() {
defer close(producerDone)
for i := 0; i < 200; i++ {
select {
case pty.childToClient <- []byte("x"):
case <-time.After(50 * time.Millisecond):
return
}
}
}()
<-producerDone
// Should not panic, should not hang.
sess.Close("user_requested")
select {
case <-sess.Done():
case <-time.After(2 * time.Second):
t.Fatal("Done did not converge after Close with saturated output buffer")
}
if _, err := f.mgr.Get(sess.ID()); !errors.Is(err, ErrSessionNotFound) {
t.Errorf("session not deregistered after Close")
}
// ExitC must have fired before Done — required by the Output() doc
// contract ("channel closes after the child exits and a value has
// been delivered on ExitC()").
select {
case info := <-sess.ExitC():
if info.Reason != "user_requested" {
t.Errorf("exit reason = %q, want user_requested", info.Reason)
}
default:
t.Error("ExitC was empty after Done — finalize order violated")
}
}
func TestManager_OpenPropagatesUnsupportedOS(t *testing.T) {
// Regression: Manager.Open used fmt.Errorf("%w: %v", ErrSpawnFailed, err)
// which swallowed the inner sentinel. The protocol layer needs
// errors.Is to match both ErrSpawnFailed and ErrUnsupportedOS so it
// can map to terminal.error code "unsupported_os" instead of a
// generic "spawn_failed". Switched to double-%w; both must match.
f := newFixture(t)
defer f.mgr.Close()
f.spawner.make = func(_ *testing.T, _ SpawnRequest) (*fakePTY, error) {
return nil, ErrUnsupportedOS
}
_, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err == nil {
t.Fatal("Open returned nil err with failing spawner")
}
if !errors.Is(err, ErrUnsupportedOS) {
t.Errorf("errors.Is(err, ErrUnsupportedOS) = false; err = %v", err)
}
if !errors.Is(err, ErrSpawnFailed) {
t.Errorf("errors.Is(err, ErrSpawnFailed) = false; err = %v", err)
}
}
func TestSession_WriteUpdatesLastIO(t *testing.T) {
f := newFixture(t, func(c *ManagerConfig) {
c.IdleTimeout = 30 * time.Minute
})
defer f.mgr.Close()
sess, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
f.advance(20 * time.Minute)
if _, err := sess.Write([]byte("echo hi\n")); err != nil {
t.Fatalf("Write: %v", err)
}
f.advance(20 * time.Minute) // total 40min, but 20 min since last IO
f.mgr.CheckIdle()
if _, err := f.mgr.Get(sess.ID()); err != nil {
t.Fatalf("session evicted despite recent write: %v", err)
}
}
func TestSession_DoneFiresAfterDeregister(t *testing.T) {
// Locks the finalize-order contract from Round 2 review:
// ExitC → close(output) → onClose/deregister → close(done)
// External waiters (daemonws bridge, GC hook, audit) use `<-Done()`
// as the signal that the session is fully torn down. Any consumer
// querying the manager immediately after Done() must observe the
// session deregistered.
f := newFixture(t)
defer f.mgr.Close()
sess, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
id := sess.ID()
sess.Close("user_requested")
<-sess.Done()
if _, err := f.mgr.Get(id); !errors.Is(err, ErrSessionNotFound) {
t.Fatalf("Get after <-Done() = %v, want ErrSessionNotFound (finalize order violated)", err)
}
}
func TestManager_CloseConcurrentReentryWaitsForFinalize(t *testing.T) {
// Regression for Round 3 review: a late Close() caller used to return
// immediately when it saw m.closed==true, even though the first caller
// was still in the middle of waiting for each session's Done(). That
// broke the "Manager.Close returning means everything is drained"
// contract for every caller but the first. With closeDone, all callers
// now share the same finalize barrier.
f := newFixture(t)
f.spawner.make = func(tt *testing.T, req SpawnRequest) (*fakePTY, error) {
p := newFakePTY(tt, req.Cols, req.Rows)
// Long enough that the second goroutine's Close call definitely
// observes the first one mid-flight rather than already-finished.
p.waitDelay = 200 * time.Millisecond
return p, nil
}
s1, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
const callers = 4
var wg sync.WaitGroup
wg.Add(callers)
sessionsAfter := make([]int, callers)
doneClosed := make([]bool, callers)
for i := 0; i < callers; i++ {
i := i
go func() {
defer wg.Done()
f.mgr.Close()
sessionsAfter[i] = len(f.mgr.Sessions())
select {
case <-s1.Done():
doneClosed[i] = true
default:
doneClosed[i] = false
}
}()
}
wg.Wait()
for i := 0; i < callers; i++ {
if sessionsAfter[i] != 0 {
t.Errorf("caller %d: Sessions() after Close = %d, want 0", i, sessionsAfter[i])
}
if !doneClosed[i] {
t.Errorf("caller %d: session Done not closed when Close returned", i)
}
}
}
func TestManager_OpenAfterCloseReapsSpawnedPTY(t *testing.T) {
// Regression for Round 3 review: Manager.Open's cleanup path used to
// only call pty.Close() when it lost the race with Manager.Close —
// no Wait(), so on a real unix PTY the killed child stayed as a zombie
// (waitLoop never ran because sess.start() never ran). The fix calls
// pty.Wait() synchronously to reap.
f := newFixture(t)
inSpawn := make(chan struct{})
releaseSpawn := make(chan struct{})
spawnedPTY := make(chan *fakePTY, 1)
f.spawner.make = func(tt *testing.T, req SpawnRequest) (*fakePTY, error) {
close(inSpawn)
<-releaseSpawn
p := newFakePTY(tt, req.Cols, req.Rows)
// Allow Wait() to return immediately once Close fires its waitDone.
spawnedPTY <- p
return p, nil
}
openDone := make(chan error, 1)
go func() {
_, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
openDone <- err
}()
// Wait until Open is parked inside the spawner, then close the manager
// with zero registered sessions. Open will lose the race when it
// reacquires the mu after the spawn returns.
<-inSpawn
f.mgr.Close()
// Let the spawn finish; Open should now hit the closed-manager cleanup
// path: pty.Close + pty.Wait + return ErrManagerClosed.
close(releaseSpawn)
select {
case err := <-openDone:
if !errors.Is(err, ErrManagerClosed) {
t.Fatalf("Open after Close = %v, want ErrManagerClosed", err)
}
case <-time.After(2 * time.Second):
t.Fatal("Open did not return after Manager.Close + spawn release")
}
pty := <-spawnedPTY
if got := pty.waitCount.Load(); got < 1 {
t.Fatalf("pty.Wait() called %d times in cleanup path, want >=1 — child not reaped", got)
}
}
func TestManager_CloseWaitsForSessionFinalize(t *testing.T) {
// Manager.Close used to only wait for s.Close() (which just initiates
// teardown — signals stop, closes the PTY). The waitLoop finalizer
// could still be running after Manager.Close returned, leaving the
// sessions map non-empty briefly. With Round 2 review's fix, each
// goroutine in Manager.Close additionally `<-s.Done()` so the manager
// is fully drained by the time Close returns. We inject a Wait delay
// to make the difference observable: without the fix, the session map
// is still populated when Manager.Close returns and `<-s.Done()` would
// block.
f := newFixture(t)
f.spawner.make = func(tt *testing.T, req SpawnRequest) (*fakePTY, error) {
p := newFakePTY(tt, req.Cols, req.Rows)
p.waitDelay = 150 * time.Millisecond
return p, nil
}
s1, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
s2, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
f.mgr.Close()
// After Close returns: registry empty AND every Done is already closed.
if got := len(f.mgr.Sessions()); got != 0 {
t.Errorf("Sessions after Manager.Close = %d, want 0", got)
}
for _, s := range []*PtySession{s1, s2} {
select {
case <-s.Done():
default:
t.Errorf("session %s Done not closed when Manager.Close returned", s.ID())
}
if _, err := f.mgr.Get(s.ID()); !errors.Is(err, ErrSessionNotFound) {
t.Errorf("session %s still registered after Manager.Close: %v", s.ID(), err)
}
}
}

View File

@@ -0,0 +1,35 @@
package terminal
import (
"io"
"time"
)
// PTY abstracts the platform PTY + child process so tests can swap in a
// fake without forking a real shell. Read returns child stdout/stderr;
// Write delivers stdin; Resize updates the window; Wait blocks for the
// child to exit and returns its exit code; Close terminates the child
// and releases the master fd.
type PTY interface {
io.ReadWriter
Resize(cols, rows uint16) error
Wait() (exitCode int, err error)
Close() error
}
// SpawnRequest is the input to Spawner.Start.
type SpawnRequest struct {
Shell string
Args []string
Cwd string
Env []string
Cols uint16
Rows uint16
Started time.Time
}
// Spawner creates new PTYs. Production uses realSpawner; tests inject a
// channel-backed fake (see fakePTY in manager_test.go).
type Spawner interface {
Start(SpawnRequest) (PTY, error)
}

View File

@@ -0,0 +1,275 @@
package terminal
import (
"errors"
"io"
"log/slog"
"sync"
"time"
)
// ExitInfo describes how a session terminated.
type ExitInfo struct {
ExitCode int
Reason string
}
// PtySession is a single live PTY + child shell. Methods are safe for
// concurrent use; readers consume from Output() and ExitC() until
// Output() is closed, which always follows an ExitC() send.
type PtySession struct {
id string
taskID string
workspaceID string
issueID string
workDir string
userID string
shellPath string
mu sync.Mutex
cols, rows uint16
pty PTY
output chan []byte
exit chan ExitInfo
done chan struct{}
stop chan struct{}
stopOnce sync.Once
closing bool
closeReason string
// wg tracks readLoop and idleLoop. waitLoop is the finalizer: it
// waits on wg before closing output/done so we never close the
// output channel while readLoop is mid-send.
wg sync.WaitGroup
now func() time.Time
idleTimeout time.Duration
startedAt time.Time
lastIO time.Time
logger *slog.Logger
onClose func(string)
}
// ID returns the session identifier.
func (s *PtySession) ID() string { return s.id }
// TaskID returns the task this session is bound to.
func (s *PtySession) TaskID() string { return s.taskID }
// WorkspaceID returns the workspace this session belongs to.
func (s *PtySession) WorkspaceID() string { return s.workspaceID }
// IssueID returns the issue this session was opened from, if any.
func (s *PtySession) IssueID() string { return s.issueID }
// WorkDir returns the cwd of the child shell.
func (s *PtySession) WorkDir() string { return s.workDir }
// UserID returns the human user who opened the session.
func (s *PtySession) UserID() string { return s.userID }
// Shell returns the shell binary path that was spawned.
func (s *PtySession) Shell() string { return s.shellPath }
// StartedAt returns the wall-clock time the session was spawned.
func (s *PtySession) StartedAt() time.Time { return s.startedAt }
// LastIO returns the most recent time data flowed in either direction.
func (s *PtySession) LastIO() time.Time {
s.mu.Lock()
defer s.mu.Unlock()
return s.lastIO
}
// Output yields PTY output chunks as they arrive. The channel closes
// after the child exits and a value has been delivered on ExitC().
func (s *PtySession) Output() <-chan []byte { return s.output }
// ExitC fires once when the child exits. After that, Output() closes.
func (s *PtySession) ExitC() <-chan ExitInfo { return s.exit }
// Done returns a channel closed when the session is fully torn down
// (all goroutines exited, registry deregistered).
func (s *PtySession) Done() <-chan struct{} { return s.done }
// Write forwards bytes to the PTY stdin. Returns the byte count actually
// written. Updates LastIO so idle detection sees the activity.
func (s *PtySession) Write(p []byte) (int, error) {
s.mu.Lock()
if s.closing {
s.mu.Unlock()
return 0, ErrSessionNotFound
}
pty := s.pty
s.lastIO = s.now()
s.mu.Unlock()
return pty.Write(p)
}
// Resize updates the PTY window size.
func (s *PtySession) Resize(cols, rows uint16) error {
cols, rows = normalizeSize(cols, rows)
s.mu.Lock()
if s.closing {
s.mu.Unlock()
return ErrSessionNotFound
}
s.cols = cols
s.rows = rows
pty := s.pty
s.lastIO = s.now()
s.mu.Unlock()
return pty.Resize(cols, rows)
}
// Size returns the current cols, rows of the PTY.
func (s *PtySession) Size() (uint16, uint16) {
s.mu.Lock()
defer s.mu.Unlock()
return s.cols, s.rows
}
// Close tears down the session. Subsequent calls are no-ops. The
// reason is recorded for audit logging and the terminal.exit payload.
//
// Close only initiates teardown — signals stop, closes the PTY, returns.
// waitLoop is the actual finalizer: it waits for readLoop + idleLoop
// to exit (via wg) before closing output/done. That ordering is what
// makes "Close while output buffer is full" safe — readLoop's blocked
// send unblocks on <-stop, and only then does the output channel close.
func (s *PtySession) Close(reason string) {
s.mu.Lock()
if s.closing {
s.mu.Unlock()
return
}
s.closing = true
s.closeReason = reason
pty := s.pty
s.mu.Unlock()
s.stopOnce.Do(func() { close(s.stop) })
if pty != nil {
// pty.Close on the unix spawner runs SIGHUP → grace → SIGKILL.
// It's idempotent (sync.Once), so the second call from waitLoop's
// finalizer is a no-op.
_ = pty.Close()
}
}
// start kicks off the reader, exit-watch, and (optional) idle
// goroutines. Manager.Open is the only caller. wg.Add runs
// synchronously before waitLoop is spawned so wg.Wait sees the
// correct count even if Close fires immediately.
func (s *PtySession) start() {
s.wg.Add(1)
go s.readLoop()
if s.idleTimeout > 0 {
s.wg.Add(1)
go s.idleLoop()
}
go s.waitLoop()
}
func (s *PtySession) readLoop() {
defer s.wg.Done()
buf := make([]byte, 4096)
for {
n, err := s.pty.Read(buf)
if n > 0 {
chunk := make([]byte, n)
copy(chunk, buf[:n])
s.mu.Lock()
s.lastIO = s.now()
s.mu.Unlock()
select {
case s.output <- chunk:
case <-s.stop:
return
}
}
if err != nil {
if !errors.Is(err, io.EOF) && err != io.ErrClosedPipe {
s.logger.Debug("pty read error", "err", err)
}
return
}
}
}
func (s *PtySession) waitLoop() {
code, waitErr := s.pty.Wait()
s.mu.Lock()
reason := s.closeReason
if reason == "" {
if waitErr != nil {
reason = "wait_error"
} else {
reason = "exited"
}
s.closeReason = reason
}
s.closing = true
s.mu.Unlock()
// Ensure the PTY fd is closed so readLoop's pty.Read returns EOF.
// pty.Close is idempotent (sync.Once on the unix spawner).
_ = s.pty.Close()
// Signal stop so idleLoop and any blocked send in readLoop exit.
s.stopOnce.Do(func() { close(s.stop) })
// Wait for readLoop + idleLoop before closing output/done. This is
// the invariant that prevents "send on closed channel" panics when
// output is full: readLoop is either past its send or unblocked via
// <-stop, but never racing with close(s.output).
s.wg.Wait()
// Finalize order is load-bearing: external waiters use `<-Done()` as
// a signal that the session is fully torn down AND deregistered from
// the manager. The sequence must be:
// ExitC → close(output) → onClose/deregister → close(done)
// so that any consumer doing `<-Done(); manager.Get(id)` after a
// teardown is guaranteed to observe ErrSessionNotFound.
select {
case s.exit <- ExitInfo{ExitCode: code, Reason: reason}:
default:
}
close(s.output)
if s.onClose != nil {
s.onClose(s.id)
}
close(s.done)
}
func (s *PtySession) idleLoop() {
defer s.wg.Done()
// Sample at IdleTimeout/4 so reaction time is bounded but ticks
// stay cheap with many sessions. Manager.CheckIdle catches anything
// this loop misses (e.g. when daemon's outer GC tick is coarser).
interval := s.idleTimeout / 4
if interval < time.Second {
interval = time.Second
}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-s.stop:
return
case <-t.C:
if s.now().Sub(s.LastIO()) >= s.idleTimeout {
// Close calls pty.Close + waits for wg in waitLoop. If
// we ran it inline, waitLoop's wg.Wait would block on
// this goroutine, which can't exit until Close returns
// — deadlock. Spawning lets idleLoop return and
// decrement wg.
go s.Close("idle_timeout")
return
}
}
}
}

View File

@@ -0,0 +1,102 @@
//go:build !windows
package terminal
import (
"fmt"
"os"
"os/exec"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/creack/pty"
)
// closeGracePeriod is the window between SIGHUP and SIGKILL during a
// Close. Long enough for interactive shells to run trap handlers and
// flush state; short enough that closing a tab feels instant.
const closeGracePeriod = 250 * time.Millisecond
// realSpawner forks the shell on a PTY using creack/pty. Linux/macOS
// only; Windows reaches the stub in spawner_windows.go and returns
// ErrUnsupportedOS.
type realSpawner struct{}
func (realSpawner) Start(req SpawnRequest) (PTY, error) {
cmd := exec.Command(req.Shell, req.Args...)
cmd.Dir = req.Cwd
// Inherit the daemon's PATH so users get whatever CLIs are installed
// in the daemon's environment (claude, codex, multica, etc.); merge
// in the per-session vars built by buildEnv.
env := os.Environ()
env = append(env, req.Env...)
cmd.Env = env
size := &pty.Winsize{Cols: req.Cols, Rows: req.Rows}
f, err := pty.StartWithSize(cmd, size)
if err != nil {
return nil, fmt.Errorf("pty.StartWithSize: %w", err)
}
return &unixPTY{cmd: cmd, file: f}, nil
}
type unixPTY struct {
cmd *exec.Cmd
file *os.File
exited atomic.Bool
closeOnce sync.Once
closeErr error
}
func (p *unixPTY) Read(b []byte) (int, error) { return p.file.Read(b) }
func (p *unixPTY) Write(b []byte) (int, error) { return p.file.Write(b) }
func (p *unixPTY) Resize(cols, rows uint16) error {
return pty.Setsize(p.file, &pty.Winsize{Cols: cols, Rows: rows})
}
func (p *unixPTY) Wait() (int, error) {
err := p.cmd.Wait()
p.exited.Store(true)
if p.cmd.ProcessState != nil {
return p.cmd.ProcessState.ExitCode(), err
}
return -1, err
}
// Close terminates the child shell and releases the PTY master fd.
// Closing a tab is a hangup, not an interrupt — so the signal path is
// SIGHUP → brief grace → SIGKILL → file.Close, in that order:
//
// - SIGHUP gives interactive shells a chance to run trap handlers,
// write history, etc. before the fd disappears.
// - The grace window is bounded; anything slower than that is stuck.
// - SIGKILL is the cliff for shells that ignore HUP.
// - file.Close releases the master fd last so the slave side keeps
// working during cleanup.
//
// Signals are sent to the negated pid so they hit the whole process
// group. creack/pty starts the child as a session leader (Setsid), so
// pid == pgid and any descendants the user spawned in the shell are
// caught by the same kill.
//
// If the child already exited naturally (Wait returned), all signal
// work is skipped — we only close the fd. That avoids a pointless
// 250ms sleep in the natural-exit teardown path.
func (p *unixPTY) Close() error {
p.closeOnce.Do(func() {
if p.cmd.Process != nil && !p.exited.Load() {
pid := p.cmd.Process.Pid
_ = syscall.Kill(-pid, syscall.SIGHUP)
time.Sleep(closeGracePeriod)
if !p.exited.Load() {
_ = syscall.Kill(-pid, syscall.SIGKILL)
}
}
p.closeErr = p.file.Close()
})
return p.closeErr
}

View File

@@ -0,0 +1,9 @@
//go:build windows
package terminal
// realSpawner on Windows always refuses — ConPty support is RFC P1 and
// the Desktop button + CLI both surface a clear error from this layer.
type realSpawner struct{}
func (realSpawner) Start(SpawnRequest) (PTY, error) { return nil, ErrUnsupportedOS }

View File

@@ -0,0 +1,260 @@
package daemon
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"log/slog"
"sync"
"github.com/multica-ai/multica/server/internal/daemon/terminal"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// terminalBridge adapts the daemon-side terminal.Manager to the daemonws
// WebSocket transport. Per session it:
//
// - relays PtySession.Output() → terminal.data frames (daemon→server)
// - relays PtySession.ExitC() → terminal.exit frames
// - tears the bridge goroutine down when Done() fires
//
// frameSender is the daemon's currently-active WS writer (see
// Daemon.sendWSFrame). It returns false when no connection is active or the
// outbound queue is saturated; we drop the frame in that case rather than
// stall the reader, because the next reconnect would unstick us anyway.
type terminalBridge struct {
manager *terminal.Manager
logger *slog.Logger
send func([]byte) bool
mu sync.Mutex
sessions map[string]*terminalRoute
}
type terminalRoute struct {
session *terminal.PtySession
cancel context.CancelFunc
}
func newTerminalBridge(mgr *terminal.Manager, logger *slog.Logger, send func([]byte) bool) *terminalBridge {
return &terminalBridge{
manager: mgr,
logger: logger,
send: send,
sessions: make(map[string]*terminalRoute),
}
}
// handleFrame dispatches a single terminal.* envelope from the server. The
// caller already decoded protocol.Message; we receive the inner type+payload.
func (b *terminalBridge) handleFrame(msgType string, payload json.RawMessage) {
switch msgType {
case protocol.MessageTypeTerminalOpen:
var p protocol.TerminalOpenPayload
if err := json.Unmarshal(payload, &p); err != nil {
b.logger.Debug("terminal.open invalid payload", "error", err)
return
}
b.handleOpen(p)
case protocol.MessageTypeTerminalData:
var p protocol.TerminalDataPayload
if err := json.Unmarshal(payload, &p); err != nil {
b.logger.Debug("terminal.data invalid payload", "error", err)
return
}
b.handleData(p)
case protocol.MessageTypeTerminalResize:
var p protocol.TerminalResizePayload
if err := json.Unmarshal(payload, &p); err != nil {
b.logger.Debug("terminal.resize invalid payload", "error", err)
return
}
b.handleResize(p)
case protocol.MessageTypeTerminalClose:
var p protocol.TerminalClosePayload
if err := json.Unmarshal(payload, &p); err != nil {
b.logger.Debug("terminal.close invalid payload", "error", err)
return
}
b.handleClose(p)
}
}
func (b *terminalBridge) handleOpen(p protocol.TerminalOpenPayload) {
info := terminal.TaskInfo{
TaskID: p.TaskID,
WorkspaceID: p.WorkspaceID,
IssueID: p.IssueID,
WorkDir: p.WorkDir,
PriorSessionID: p.PriorSessionID,
}
sess, err := b.manager.OpenWithInfo(context.Background(), info, terminal.OpenParams{
TaskID: p.TaskID,
WorkspaceID: p.WorkspaceID,
UserID: p.UserID,
Cols: p.Cols,
Rows: p.Rows,
})
if err != nil {
b.sendError(p.RequestID, "", mapTerminalError(err), err.Error())
return
}
ctx, cancel := context.WithCancel(context.Background())
b.mu.Lock()
b.sessions[sess.ID()] = &terminalRoute{session: sess, cancel: cancel}
b.mu.Unlock()
b.sendFrame(protocol.MessageTypeTerminalOpened, protocol.TerminalOpenedPayload{
RequestID: p.RequestID,
SessionID: sess.ID(),
WorkDir: sess.WorkDir(),
Shell: sess.Shell(),
})
go b.pump(ctx, sess)
}
func (b *terminalBridge) handleData(p protocol.TerminalDataPayload) {
sess, err := b.manager.Get(p.SessionID)
if err != nil {
b.sendError("", p.SessionID, protocol.TerminalErrorCodeSessionNotFound, err.Error())
return
}
data, err := base64.StdEncoding.DecodeString(p.DataB64)
if err != nil {
b.logger.Debug("terminal.data invalid base64", "error", err, "session_id", p.SessionID)
return
}
if _, err := sess.Write(data); err != nil {
b.logger.Debug("terminal.data write failed", "error", err, "session_id", p.SessionID)
}
}
func (b *terminalBridge) handleResize(p protocol.TerminalResizePayload) {
sess, err := b.manager.Get(p.SessionID)
if err != nil {
b.sendError("", p.SessionID, protocol.TerminalErrorCodeSessionNotFound, err.Error())
return
}
if err := sess.Resize(p.Cols, p.Rows); err != nil {
b.logger.Debug("terminal.resize failed", "error", err, "session_id", p.SessionID)
}
}
func (b *terminalBridge) handleClose(p protocol.TerminalClosePayload) {
sess, err := b.manager.Get(p.SessionID)
if err != nil {
// Already gone — nothing to do; the server side has already received
// a terminal.exit frame (or will, through the pump goroutine).
return
}
reason := p.Reason
if reason == "" {
reason = "client_close"
}
sess.Close(reason)
}
// pump bridges one session's output channel onto the WS as terminal.data
// frames, and emits a terminal.exit when the child exits. Returns when
// either the session is fully torn down or ctx is cancelled.
func (b *terminalBridge) pump(ctx context.Context, sess *terminal.PtySession) {
sessionID := sess.ID()
defer func() {
b.mu.Lock()
delete(b.sessions, sessionID)
b.mu.Unlock()
}()
for {
select {
case <-ctx.Done():
return
case chunk, ok := <-sess.Output():
if !ok {
// Output closed → child exited and waitLoop finalized.
// ExitC was already delivered (or about to be); pull it once
// non-blocking and forward, then exit the pump.
var info terminal.ExitInfo
select {
case info = <-sess.ExitC():
default:
}
b.sendFrame(protocol.MessageTypeTerminalExit, protocol.TerminalExitPayload{
SessionID: sessionID,
ExitCode: info.ExitCode,
Reason: info.Reason,
})
<-sess.Done()
return
}
b.sendFrame(protocol.MessageTypeTerminalData, protocol.TerminalDataPayload{
SessionID: sessionID,
DataB64: base64.StdEncoding.EncodeToString(chunk),
})
}
}
}
// closeAll tears down every live session. Called when the daemon
// disconnects from the server: the browser proxy will fail downstream,
// and a reconnect cannot resurrect the pre-existing PTYs because the
// session_ids only existed in the prior WS context.
func (b *terminalBridge) closeAll(reason string) {
b.mu.Lock()
routes := make([]*terminalRoute, 0, len(b.sessions))
for _, r := range b.sessions {
routes = append(routes, r)
}
b.mu.Unlock()
for _, r := range routes {
r.cancel()
r.session.Close(reason)
}
}
func (b *terminalBridge) sendFrame(msgType string, payload any) {
raw, err := json.Marshal(payload)
if err != nil {
b.logger.Debug("terminal frame marshal failed", "error", err, "type", msgType)
return
}
frame, err := json.Marshal(protocol.Message{Type: msgType, Payload: raw})
if err != nil {
b.logger.Debug("terminal envelope marshal failed", "error", err, "type", msgType)
return
}
if !b.send(frame) {
b.logger.Debug("terminal frame dropped: ws disconnected or backed up", "type", msgType)
}
}
func (b *terminalBridge) sendError(requestID, sessionID, code, message string) {
b.sendFrame(protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
RequestID: requestID,
SessionID: sessionID,
Code: code,
Message: message,
})
}
// mapTerminalError translates the terminal package's sentinel errors into
// protocol error codes the browser proxy can render. Anything we don't
// recognise falls back to TerminalErrorCodeInternal — drop information
// rather than surface internal wrap text to the user.
func mapTerminalError(err error) string {
switch {
case errors.Is(err, terminal.ErrWorkspaceMismatch):
return protocol.TerminalErrorCodeWorkspaceMismatch
case errors.Is(err, terminal.ErrTaskNotFound):
return protocol.TerminalErrorCodeTaskNotFound
case errors.Is(err, terminal.ErrSessionNotFound):
return protocol.TerminalErrorCodeSessionNotFound
case errors.Is(err, terminal.ErrUnsupportedOS):
return protocol.TerminalErrorCodeUnsupportedOS
case errors.Is(err, terminal.ErrSpawnFailed):
return protocol.TerminalErrorCodeSpawnFailed
}
return protocol.TerminalErrorCodeInternal
}

View File

@@ -0,0 +1,313 @@
package daemon
import (
"encoding/base64"
"encoding/json"
"log/slog"
"sync"
"testing"
"time"
"github.com/multica-ai/multica/server/internal/daemon/terminal"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// captureSender is the test stand-in for the daemon's outbound WS writer.
// Frames are kept in order so the test can wait for a specific message type
// to appear.
type captureSender struct {
mu sync.Mutex
frames [][]byte
}
func (c *captureSender) send(frame []byte) bool {
c.mu.Lock()
defer c.mu.Unlock()
cp := make([]byte, len(frame))
copy(cp, frame)
c.frames = append(c.frames, cp)
return true
}
func (c *captureSender) waitFor(t *testing.T, msgType string, timeout time.Duration) protocol.Message {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
c.mu.Lock()
for _, f := range c.frames {
var m protocol.Message
if err := json.Unmarshal(f, &m); err == nil && m.Type == msgType {
c.mu.Unlock()
return m
}
}
c.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("timeout waiting for frame of type %q (saw %d frames)", msgType, len(c.frames))
return protocol.Message{}
}
// fakeBridgePTY is a minimal PTY for the bridge integration: it lets the
// test push child output and read writes back. Wait blocks until Close.
type fakeBridgePTY struct {
out chan []byte
mu sync.Mutex
written []byte
cols uint16
rows uint16
closeCh chan struct{}
exit int
waitOnce sync.Once
waitDone chan struct{}
}
func newFakeBridgePTY(cols, rows uint16) *fakeBridgePTY {
return &fakeBridgePTY{
out: make(chan []byte, 4),
cols: cols,
rows: rows,
closeCh: make(chan struct{}),
waitDone: make(chan struct{}),
}
}
func (p *fakeBridgePTY) Read(b []byte) (int, error) {
select {
case chunk, ok := <-p.out:
if !ok {
return 0, errEOF
}
n := copy(b, chunk)
return n, nil
case <-p.closeCh:
return 0, errEOF
}
}
func (p *fakeBridgePTY) Write(b []byte) (int, error) {
p.mu.Lock()
defer p.mu.Unlock()
p.written = append(p.written, b...)
return len(b), nil
}
func (p *fakeBridgePTY) Resize(cols, rows uint16) error {
p.mu.Lock()
defer p.mu.Unlock()
p.cols = cols
p.rows = rows
return nil
}
func (p *fakeBridgePTY) Close() error {
p.waitOnce.Do(func() {
close(p.closeCh)
close(p.waitDone)
})
return nil
}
func (p *fakeBridgePTY) Wait() (int, error) {
<-p.waitDone
return p.exit, nil
}
type stringErr string
func (e stringErr) Error() string { return string(e) }
const errEOF = stringErr("EOF")
func TestTerminalBridge_OpenSendsOpenedFrameWithServerSuppliedWorkdir(t *testing.T) {
tmp := t.TempDir()
pty := newFakeBridgePTY(80, 24)
spawner := &stubSpawner{pty: pty}
mgr := terminal.NewManager(terminal.ManagerConfig{
Spawner: spawner,
Logger: slog.Default(),
}, nil)
defer mgr.Close()
sender := &captureSender{}
bridge := newTerminalBridge(mgr, slog.Default(), sender.send)
openPayload, err := json.Marshal(protocol.TerminalOpenPayload{
RequestID: "req-1",
TaskID: "task-via-ws",
WorkspaceID: "ws-A",
UserID: "user-1",
IssueID: "issue-1",
WorkDir: tmp,
PriorSessionID: "claude-xyz",
Cols: 120,
Rows: 30,
})
if err != nil {
t.Fatalf("marshal open: %v", err)
}
bridge.handleFrame(protocol.MessageTypeTerminalOpen, openPayload)
openedMsg := sender.waitFor(t, protocol.MessageTypeTerminalOpened, time.Second)
var opened protocol.TerminalOpenedPayload
if err := json.Unmarshal(openedMsg.Payload, &opened); err != nil {
t.Fatalf("opened payload: %v", err)
}
if opened.RequestID != "req-1" {
t.Errorf("opened.request_id = %q, want req-1", opened.RequestID)
}
if opened.SessionID == "" {
t.Errorf("opened.session_id is empty")
}
if opened.WorkDir != tmp {
t.Errorf("opened.work_dir = %q, want %q", opened.WorkDir, tmp)
}
}
func TestTerminalBridge_OpenWithoutWorkdirEmitsTaskNotFound(t *testing.T) {
// The server is supposed to resolve task.work_dir from its DB before
// forwarding terminal.open. If it forgets / fails, the daemon must
// not silently fall through to spawning bash in CWD — it has to
// surface a structured terminal.error and never call the spawner.
pty := newFakeBridgePTY(80, 24)
spawner := &stubSpawner{pty: pty}
mgr := terminal.NewManager(terminal.ManagerConfig{
Spawner: spawner,
Logger: slog.Default(),
}, nil)
defer mgr.Close()
sender := &captureSender{}
bridge := newTerminalBridge(mgr, slog.Default(), sender.send)
openPayload, _ := json.Marshal(protocol.TerminalOpenPayload{
RequestID: "req-2",
TaskID: "task-evil",
WorkspaceID: "ws-B",
WorkDir: "", // server failed to resolve
Cols: 80,
Rows: 24,
})
bridge.handleFrame(protocol.MessageTypeTerminalOpen, openPayload)
errMsg := sender.waitFor(t, protocol.MessageTypeTerminalError, time.Second)
var errPayload protocol.TerminalErrorPayload
if err := json.Unmarshal(errMsg.Payload, &errPayload); err != nil {
t.Fatalf("error payload: %v", err)
}
if errPayload.Code != protocol.TerminalErrorCodeTaskNotFound {
t.Errorf("error code = %q, want %q", errPayload.Code, protocol.TerminalErrorCodeTaskNotFound)
}
if errPayload.RequestID != "req-2" {
t.Errorf("error request_id = %q, want req-2", errPayload.RequestID)
}
if spawner.callCount() != 0 {
t.Errorf("spawner was invoked %d times despite resolve failure", spawner.callCount())
}
}
func TestTerminalBridge_DataAndExitRoundTrip(t *testing.T) {
tmp := t.TempDir()
pty := newFakeBridgePTY(80, 24)
spawner := &stubSpawner{pty: pty}
mgr := terminal.NewManager(terminal.ManagerConfig{
Spawner: spawner,
Logger: slog.Default(),
}, nil)
defer mgr.Close()
sender := &captureSender{}
bridge := newTerminalBridge(mgr, slog.Default(), sender.send)
openPayload, _ := json.Marshal(protocol.TerminalOpenPayload{
RequestID: "req-3",
TaskID: "task-3",
WorkspaceID: "ws-A",
WorkDir: tmp,
Cols: 80,
Rows: 24,
})
bridge.handleFrame(protocol.MessageTypeTerminalOpen, openPayload)
openedMsg := sender.waitFor(t, protocol.MessageTypeTerminalOpened, time.Second)
var opened protocol.TerminalOpenedPayload
_ = json.Unmarshal(openedMsg.Payload, &opened)
sessionID := opened.SessionID
// Push child output → bridge should emit terminal.data on the WS.
pty.out <- []byte("hello\n")
dataMsg := sender.waitFor(t, protocol.MessageTypeTerminalData, time.Second)
var dp protocol.TerminalDataPayload
if err := json.Unmarshal(dataMsg.Payload, &dp); err != nil {
t.Fatalf("data payload: %v", err)
}
if dp.SessionID != sessionID {
t.Errorf("data session_id = %q, want %q", dp.SessionID, sessionID)
}
decoded, err := base64.StdEncoding.DecodeString(dp.DataB64)
if err != nil {
t.Fatalf("decode data: %v", err)
}
if string(decoded) != "hello\n" {
t.Errorf("data bytes = %q, want %q", decoded, "hello\n")
}
// Send a write the other direction. The bridge should base64-decode
// and call PTY.Write.
inboundData, _ := json.Marshal(protocol.TerminalDataPayload{
SessionID: sessionID,
DataB64: base64.StdEncoding.EncodeToString([]byte("ls\n")),
})
bridge.handleFrame(protocol.MessageTypeTerminalData, inboundData)
// Allow Write to settle.
deadline := time.Now().Add(time.Second)
for time.Now().Before(deadline) {
pty.mu.Lock()
got := string(pty.written)
pty.mu.Unlock()
if got == "ls\n" {
break
}
time.Sleep(10 * time.Millisecond)
}
pty.mu.Lock()
if string(pty.written) != "ls\n" {
t.Errorf("PTY received %q, want %q", pty.written, "ls\n")
}
pty.mu.Unlock()
// Close from the client side. The bridge should propagate via
// session.Close → waitLoop → terminal.exit.
closePayload, _ := json.Marshal(protocol.TerminalClosePayload{
SessionID: sessionID,
Reason: "test",
})
bridge.handleFrame(protocol.MessageTypeTerminalClose, closePayload)
sender.waitFor(t, protocol.MessageTypeTerminalExit, time.Second)
}
// stubSpawner returns a single pre-built PTY on the first Start. callCount
// lets tests assert that no spawn happened on a reject path.
type stubSpawner struct {
pty *fakeBridgePTY
mu sync.Mutex
calls int
}
func (s *stubSpawner) Start(_ terminal.SpawnRequest) (terminal.PTY, error) {
s.mu.Lock()
s.calls++
s.mu.Unlock()
return s.pty, nil
}
func (s *stubSpawner) callCount() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.calls
}

View File

@@ -116,6 +116,13 @@ func (d *Daemon) runTaskWakeupConnection(ctx context.Context, runtimeIDs []strin
writerDone := make(chan struct{})
go d.runWSWriter(conn, writes, writerDone)
// Expose this connection's writer to the terminal bridge so it can push
// terminal.* frames back to the server. The defer in the parent cleanup
// block clears the pointer (and tears any live PTYs down) before the
// writes channel is closed.
d.installWSWrites(writes)
defer d.clearWSWrites()
heartbeatCtx, cancelHeartbeat := context.WithCancel(ctx)
hbDone := make(chan struct{})
go func() {
@@ -287,6 +294,13 @@ func (d *Daemon) readTaskWakeupMessages(conn *websocket.Conn, taskWakeups chan<-
continue
}
d.handleWSHeartbeatAck(context.Background(), &ack)
case protocol.MessageTypeTerminalOpen,
protocol.MessageTypeTerminalData,
protocol.MessageTypeTerminalResize,
protocol.MessageTypeTerminalClose:
if bridge := d.currentTerminalBridge(); bridge != nil {
bridge.handleFrame(msg.Type, msg.Payload)
}
}
}
}

View File

@@ -82,6 +82,9 @@ type Hub struct {
hbMu sync.RWMutex
onHeartbeat HeartbeatHandler
termMu sync.RWMutex
termRouter *TerminalRouter
}
func NewHub() *Hub {
@@ -146,7 +149,12 @@ func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request, identity C
c := &client{
hub: h,
conn: conn,
send: make(chan []byte, 16),
// Buffer sized for PTY traffic: terminal.data frames carry up to a
// few KB each and the read pump from xterm.js can fall behind during
// large bursts (build output, `cat` of a big file). 256 frames gives
// ~1MB of slack before we start dropping; wakeup hints and heartbeat
// acks coexist on the same queue but are tiny in comparison.
send: make(chan []byte, 256),
identity: identity,
runtimes: runtimes,
}
@@ -320,7 +328,12 @@ func (c *client) readPump() {
c.conn.Close()
}()
c.conn.SetReadLimit(4096)
// Terminal frames embed base64-encoded PTY chunks; readLoop on the
// daemon side caps each chunk at 4KB raw, which is ~5.6KB base64 plus
// JSON envelope. 64KB leaves headroom for future growth without
// disconnecting daemons that briefly exceed the legacy 4KB heartbeat-
// only ceiling.
c.conn.SetReadLimit(64 * 1024)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
@@ -348,6 +361,14 @@ func (c *client) handleFrame(raw []byte) {
switch msg.Type {
case protocol.EventDaemonHeartbeat:
c.handleHeartbeatFrame(msg.Payload)
case protocol.MessageTypeTerminalOpened,
protocol.MessageTypeTerminalData,
protocol.MessageTypeTerminalClose,
protocol.MessageTypeTerminalExit,
protocol.MessageTypeTerminalError:
if router := c.hub.terminalRouter(); router != nil {
router.Route(raw, msg.Type, msg.Payload)
}
default:
// Unknown app messages are intentionally ignored for forward
// compatibility with future daemon → server message types.

View File

@@ -0,0 +1,196 @@
package daemonws
import (
"encoding/json"
"errors"
"log/slog"
"sync"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// TerminalSink receives terminal.* frames addressed to a single browser
// WebSocket connection. The handler implementation owns the frame queue;
// implementations must be non-blocking — the hub drops the frame if Deliver
// returns false rather than back up the daemon read pump.
type TerminalSink interface {
Deliver(frame []byte) bool
}
// TerminalRouter is the daemonws-side multiplex for terminal.* frames coming
// back from a daemon. Browser proxy connections register under their pending
// request_id and, after terminal.opened arrives, re-register under the
// session_id the daemon picked.
type TerminalRouter struct {
mu sync.RWMutex
sinks map[string]TerminalSink
}
// NewTerminalRouter constructs an empty router. The Hub owns the only
// instance in production; tests can build their own.
func NewTerminalRouter() *TerminalRouter {
return &TerminalRouter{sinks: make(map[string]TerminalSink)}
}
// Register installs sink under the given key. The key is either a
// request_id (before the daemon assigns a session_id) or a session_id
// (after the open ack). Re-registering an existing key replaces the sink.
func (r *TerminalRouter) Register(key string, sink TerminalSink) {
if r == nil || key == "" || sink == nil {
return
}
r.mu.Lock()
r.sinks[key] = sink
r.mu.Unlock()
}
// Unregister removes the sink for key, if any.
func (r *TerminalRouter) Unregister(key string) {
if r == nil || key == "" {
return
}
r.mu.Lock()
delete(r.sinks, key)
r.mu.Unlock()
}
// Route extracts the routing key (request_id or session_id) from a
// terminal.* frame and forwards the raw frame to the registered sink.
// Unknown keys are dropped silently — the daemon-side session ultimately
// observes the dead client via send-side errors / idle timeout.
func (r *TerminalRouter) Route(frame []byte, msgType string, payload json.RawMessage) {
if r == nil {
return
}
key := terminalRouteKey(msgType, payload)
if key == "" {
return
}
r.mu.RLock()
sink := r.sinks[key]
r.mu.RUnlock()
if sink == nil {
return
}
if !sink.Deliver(frame) {
slog.Debug("daemon ws terminal frame dropped: slow sink", "type", msgType, "key", key)
}
}
func terminalRouteKey(msgType string, payload json.RawMessage) string {
switch msgType {
case protocol.MessageTypeTerminalOpened, protocol.MessageTypeTerminalError:
var p struct {
RequestID string `json:"request_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
}
if err := json.Unmarshal(payload, &p); err != nil {
return ""
}
// terminal.error may carry either request_id (pre-open failure) or
// session_id (post-open failure). Prefer request_id so the proxy
// receives the failure on the same key it registered.
if p.RequestID != "" {
return p.RequestID
}
return p.SessionID
case protocol.MessageTypeTerminalData,
protocol.MessageTypeTerminalClose,
protocol.MessageTypeTerminalExit:
var p struct {
SessionID string `json:"session_id"`
}
if err := json.Unmarshal(payload, &p); err != nil {
return ""
}
return p.SessionID
}
return ""
}
// SetTerminalRouter installs an explicit router. Tests use this to inject a
// fake; production code can rely on the auto-created router exposed by
// TerminalRouter() below.
func (h *Hub) SetTerminalRouter(r *TerminalRouter) {
if h == nil {
return
}
h.termMu.Lock()
h.termRouter = r
h.termMu.Unlock()
}
// TerminalRouter returns the hub's router, creating one on first access.
// The browser proxy WS handler grabs this to register per-session sinks;
// production wiring does not need an explicit SetTerminalRouter call.
func (h *Hub) TerminalRouter() *TerminalRouter {
if h == nil {
return nil
}
h.termMu.RLock()
r := h.termRouter
h.termMu.RUnlock()
if r != nil {
return r
}
h.termMu.Lock()
defer h.termMu.Unlock()
if h.termRouter == nil {
h.termRouter = NewTerminalRouter()
}
return h.termRouter
}
// terminalRouter is the internal read-only accessor used by handleFrame.
// It returns nil if no router has been configured, which short-circuits
// dispatch — the auto-create only happens through the public accessor.
func (h *Hub) terminalRouter() *TerminalRouter {
h.termMu.RLock()
defer h.termMu.RUnlock()
return h.termRouter
}
// ErrNoDaemonForRuntime is returned by SendToRuntime when no daemon is
// currently connected for the given runtime_id. The browser proxy uses this
// to fail the open request with a clear error.
var ErrNoDaemonForRuntime = errors.New("daemonws: no daemon connected for runtime")
// SendToRuntime delivers a raw frame to one daemon connection serving
// runtimeID. If multiple daemons are registered (rare — usually one per
// runtime), the first one wins. Returns ErrNoDaemonForRuntime when no
// connection exists, or a "buffer full" error when the daemon's outbound
// queue is saturated — callers should surface that as a transient failure
// to the browser rather than retrying tightly.
func (h *Hub) SendToRuntime(runtimeID string, frame []byte) error {
if h == nil || runtimeID == "" {
return ErrNoDaemonForRuntime
}
h.mu.RLock()
var target *client
for c := range h.byRuntime[runtimeID] {
target = c
break
}
h.mu.RUnlock()
if target == nil {
return ErrNoDaemonForRuntime
}
if !target.trySend(frame) {
return errors.New("daemonws: daemon send buffer full")
}
return nil
}
// trySend pushes frame onto the client's outbound queue without blocking.
// Returns false if the buffer is saturated. We deliberately do not evict
// the connection here — terminal back-pressure should slow the producing
// browser/server side, not tear down the entire daemonws connection (which
// would also break heartbeat + wakeup delivery for unrelated runtimes).
func (c *client) trySend(frame []byte) bool {
select {
case c.send <- frame:
return true
default:
return false
}
}

View File

@@ -0,0 +1,95 @@
package daemonws
import (
"encoding/json"
"testing"
"github.com/multica-ai/multica/server/pkg/protocol"
)
type collectingSink struct {
frames [][]byte
}
func (s *collectingSink) Deliver(frame []byte) bool {
cp := make([]byte, len(frame))
copy(cp, frame)
s.frames = append(s.frames, cp)
return true
}
func TestTerminalRouter_RoutesByRequestThenSession(t *testing.T) {
router := NewTerminalRouter()
sink := &collectingSink{}
router.Register("req-1", sink)
// terminal.error before the daemon picked a session_id: routed on
// request_id. This is the failure path browsers see when the daemon
// can't spawn a PTY (e.g. ErrUnsupportedOS on windows).
errFrame := mustEncode(t, protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
RequestID: "req-1",
Code: protocol.TerminalErrorCodeUnsupportedOS,
Message: "no PTY on windows",
})
router.Route(errFrame, protocol.MessageTypeTerminalError, mustPayload(t, errFrame))
if got, want := len(sink.frames), 1; got != want {
t.Fatalf("delivered = %d, want %d", got, want)
}
// Re-key to session_id after a hypothetical terminal.opened.
router.Register("sess-1", sink)
router.Unregister("req-1")
dataFrame := mustEncode(t, protocol.MessageTypeTerminalData, protocol.TerminalDataPayload{
SessionID: "sess-1",
DataB64: "Zm9vYmFy",
})
router.Route(dataFrame, protocol.MessageTypeTerminalData, mustPayload(t, dataFrame))
if got, want := len(sink.frames), 2; got != want {
t.Fatalf("after data delivered = %d, want %d", got, want)
}
// Frames for an unknown session must drop silently — never panic, and
// never leak into the wrong sink.
strayFrame := mustEncode(t, protocol.MessageTypeTerminalExit, protocol.TerminalExitPayload{
SessionID: "sess-2-unknown",
ExitCode: 0,
})
router.Route(strayFrame, protocol.MessageTypeTerminalExit, mustPayload(t, strayFrame))
if got, want := len(sink.frames), 2; got != want {
t.Fatalf("stray frame delivered to wrong sink: %d", got)
}
}
func TestTerminalRouter_UnknownSessionDropsSilently(t *testing.T) {
router := NewTerminalRouter()
frame := mustEncode(t, protocol.MessageTypeTerminalData, protocol.TerminalDataPayload{
SessionID: "ghost",
DataB64: "Zm9v",
})
// Should not panic / not deliver anywhere.
router.Route(frame, protocol.MessageTypeTerminalData, mustPayload(t, frame))
}
func mustEncode(t *testing.T, msgType string, payload any) []byte {
t.Helper()
raw, err := json.Marshal(payload)
if err != nil {
t.Fatalf("marshal payload: %v", err)
}
frame, err := json.Marshal(protocol.Message{Type: msgType, Payload: raw})
if err != nil {
t.Fatalf("marshal envelope: %v", err)
}
return frame
}
func mustPayload(t *testing.T, envelope []byte) json.RawMessage {
t.Helper()
var m protocol.Message
if err := json.Unmarshal(envelope, &m); err != nil {
t.Fatalf("unmarshal envelope: %v", err)
}
return m.Payload
}

View File

@@ -0,0 +1,582 @@
package handler
import (
"context"
"encoding/json"
"errors"
"log/slog"
"net/http"
"strconv"
"sync"
"time"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/auth"
"github.com/multica-ai/multica/server/internal/daemonws"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
"github.com/golang-jwt/jwt/v5"
)
// terminalWriteWait caps how long a single WriteMessage may block before
// we tear the browser connection down as slow-client. Matches the daemonws
// hub's writeWait so back-pressure semantics are consistent end-to-end.
const terminalWriteWait = 10 * time.Second
// terminalOpenTimeout caps how long the proxy waits for the daemon to
// respond to a terminal.open request with terminal.opened or terminal.error.
// 5s is generous: PTY spawn is local and synchronous on the daemon side.
const terminalOpenTimeout = 5 * time.Second
var terminalUpgrader = websocket.Upgrader{
// Browser cookie auth has already happened by the time we get here;
// cross-site cookie attacks are mitigated by SameSite=Lax on the auth
// cookie itself (see auth.SetAuthCookie). Re-evaluate if we ever
// support Authorization-header auth on this endpoint.
CheckOrigin: func(r *http.Request) bool { return true },
}
// HandleIssueTerminalWS proxies a browser WebSocket onto a PTY running on
// the daemon hosting the issue's most-recent agent task. The flow per
// connection:
//
// 1. Authenticate the user (cookie JWT preferred; first-message auth as
// fallback for clients that cannot set cookies — e.g. some Desktop
// dev modes).
// 2. Resolve issue → workspace → latest task with a non-empty work_dir +
// runtime_id. Fail closed if no such task exists; users see a clear
// "no task to attach to" error instead of a silent hang.
// 3. Register a sink on the daemonws TerminalRouter under a fresh
// request_id, then send terminal.open to the daemon.
// 4. On terminal.opened: re-register the sink under the session_id the
// daemon picked, drop the request_id route, and start the bidirectional
// pump until either side closes.
// 5. On disconnect: send terminal.close so the daemon tears the PTY down
// promptly rather than waiting for its idle sweep.
func (h *Handler) HandleIssueTerminalWS(w http.ResponseWriter, r *http.Request) {
if h.DaemonHub == nil {
http.Error(w, `{"error":"terminal proxy not configured"}`, http.StatusServiceUnavailable)
return
}
workspaceID := r.URL.Query().Get("workspace_id")
if workspaceID == "" {
http.Error(w, `{"error":"workspace_id required"}`, http.StatusBadRequest)
return
}
wsUUID, err := util.ParseUUID(workspaceID)
if err != nil {
http.Error(w, `{"error":"invalid workspace_id"}`, http.StatusBadRequest)
return
}
issueParam := chi.URLParam(r, "issue_id")
if issueParam == "" {
http.Error(w, `{"error":"issue_id required"}`, http.StatusBadRequest)
return
}
userID, errMsg := terminalAuthCookie(r, h)
if errMsg != "" && userID == "" {
// No cookie or invalid cookie. Defer auth to the first WS frame.
}
if userID != "" && !h.terminalIsMember(r.Context(), userID, workspaceID) {
http.Error(w, `{"error":"not a member of this workspace"}`, http.StatusForbidden)
return
}
conn, err := terminalUpgrader.Upgrade(w, r, nil)
if err != nil {
slog.Error("terminal ws upgrade failed", "error", err)
return
}
if userID == "" {
uid, errMsg := terminalFirstFrameAuth(conn, h)
if errMsg != "" {
_ = conn.WriteMessage(websocket.TextMessage, []byte(errMsg))
conn.Close()
return
}
if !h.terminalIsMember(r.Context(), uid, workspaceID) {
_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"error":"not a member of this workspace"}`))
conn.Close()
return
}
userID = uid
_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"auth_ack"}`))
}
issue, ok := h.terminalResolveIssue(r.Context(), issueParam, wsUUID)
if !ok {
sendTerminalErrorAndClose(conn, "", "", protocol.TerminalErrorCodeTaskNotFound, "issue not found")
return
}
task, ok := h.terminalLatestAttachableTask(r.Context(), issue.ID)
if !ok {
sendTerminalErrorAndClose(conn, "", "", protocol.TerminalErrorCodeTaskNotFound, "no agent task has run on this issue yet — trigger a run first")
return
}
cols := parseUint16Query(r, "cols", 80)
rows := parseUint16Query(r, "rows", 24)
proxy := newTerminalProxy(conn, h.DaemonHub, userID, util.UUIDToString(task.RuntimeID), util.UUIDToString(task.ID), workspaceID, util.UUIDToString(issue.ID), task.SessionID.String, task.WorkDir.String, cols, rows)
proxy.run()
}
// terminalProxy is the per-connection bridge between browser and daemon.
// Methods that touch the WS connection do so from one of two goroutines
// only: writePump owns conn writes and readPump owns conn reads, matching
// gorilla/websocket's single-goroutine-per-direction contract.
type terminalProxy struct {
conn *websocket.Conn
hub *daemonws.Hub
userID string
runtimeID string
taskID string
workspaceID string
issueID string
priorSess string
workDir string
requestID string
mu sync.Mutex
sessionID string
closedOnce sync.Once
closeCh chan struct{}
sendCh chan []byte
openedCh chan struct{}
openErr chan terminalOpenFailure
}
type terminalOpenFailure struct {
code string
message string
}
func newTerminalProxy(conn *websocket.Conn, hub *daemonws.Hub, userID, runtimeID, taskID, workspaceID, issueID, priorSess, workDir string, cols, rows uint16) *terminalProxy {
return &terminalProxy{
conn: conn,
hub: hub,
userID: userID,
runtimeID: runtimeID,
taskID: taskID,
workspaceID: workspaceID,
issueID: issueID,
priorSess: priorSess,
workDir: workDir,
requestID: uuid.NewString(),
closeCh: make(chan struct{}),
sendCh: make(chan []byte, 256),
openedCh: make(chan struct{}),
openErr: make(chan terminalOpenFailure, 1),
}
}
// Deliver implements daemonws.TerminalSink. The daemon hub's read pump
// invokes this for every terminal.* frame addressed to our request_id /
// session_id. We must stay non-blocking; the hub drops the frame on a
// full buffer rather than stalling its own pump.
func (p *terminalProxy) Deliver(frame []byte) bool {
select {
case p.sendCh <- frame:
return true
default:
return false
}
}
func (p *terminalProxy) run() {
defer p.conn.Close()
router := p.hub.TerminalRouter()
router.Register(p.requestID, p)
defer router.Unregister(p.requestID)
go p.writePump()
if err := p.sendOpenToDaemon(); err != nil {
// Couldn't reach the daemon at all — surface a structured error and
// bail before we register cleanup paths that assume a live session.
sendTerminalErrorOverChannel(p.sendCh, p.requestID, "", protocol.TerminalErrorCodeInternal, err.Error())
<-time.After(50 * time.Millisecond) // give writePump a tick to flush
p.shutdown()
return
}
// Block until the open ack arrives, the open is rejected, or the user
// disconnects mid-handshake. After this point sessionID is stable and
// we are routing on session_id rather than request_id.
select {
case <-p.openedCh:
// Open succeeded. The router is already re-keyed on session_id by
// observeOpen. Fall through to the bidirectional pump.
case failure := <-p.openErr:
sendTerminalErrorOverChannel(p.sendCh, p.requestID, "", failure.code, failure.message)
<-time.After(50 * time.Millisecond)
p.shutdown()
return
case <-p.closeCh:
return
case <-time.After(terminalOpenTimeout):
sendTerminalErrorOverChannel(p.sendCh, p.requestID, "", protocol.TerminalErrorCodeInternal, "daemon did not respond to terminal.open within timeout")
<-time.After(50 * time.Millisecond)
p.shutdown()
return
}
defer func() {
sid := p.SessionID()
if sid != "" {
router.Unregister(sid)
// Best-effort teardown on the daemon. If the connection to the
// daemon is already gone, the daemon's own clearWSWrites path
// will close the session — we just lose an idle slot's worth of
// latency before the GC catches it.
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalClose, protocol.TerminalClosePayload{
SessionID: sid,
Reason: "browser_disconnect",
})
if err == nil {
_ = p.hub.SendToRuntime(p.runtimeID, frame)
}
}
}()
p.readPump()
}
func (p *terminalProxy) SessionID() string {
p.mu.Lock()
defer p.mu.Unlock()
return p.sessionID
}
func (p *terminalProxy) sendOpenToDaemon() error {
payload := protocol.TerminalOpenPayload{
RequestID: p.requestID,
TaskID: p.taskID,
WorkspaceID: p.workspaceID,
UserID: p.userID,
IssueID: p.issueID,
WorkDir: p.workDir,
PriorSessionID: p.priorSess,
Cols: 80,
Rows: 24,
}
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalOpen, payload)
if err != nil {
return err
}
if err := p.hub.SendToRuntime(p.runtimeID, frame); err != nil {
if errors.Is(err, daemonws.ErrNoDaemonForRuntime) {
return errors.New("daemon offline for this runtime — start the agent's daemon and retry")
}
return err
}
return nil
}
// readPump reads browser frames and forwards terminal.data/resize/close to
// the daemon. Unknown frames are dropped silently. Returns when the
// connection closes; we then trigger shutdown so writePump exits too.
func (p *terminalProxy) readPump() {
defer p.shutdown()
p.conn.SetReadLimit(64 * 1024)
for {
_, raw, err := p.conn.ReadMessage()
if err != nil {
return
}
var env protocol.Message
if err := json.Unmarshal(raw, &env); err != nil {
continue
}
sid := p.SessionID()
// We force-stamp session_id on outbound frames: the browser may have
// sent its own session_id, but only the one the daemon assigned is
// trusted. This prevents a misbehaving client from addressing a
// session that another user opened against the same daemon.
switch env.Type {
case protocol.MessageTypeTerminalData:
var pl protocol.TerminalDataPayload
if err := json.Unmarshal(env.Payload, &pl); err != nil {
continue
}
pl.SessionID = sid
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalData, pl)
if err != nil {
continue
}
_ = p.hub.SendToRuntime(p.runtimeID, frame)
case protocol.MessageTypeTerminalResize:
var pl protocol.TerminalResizePayload
if err := json.Unmarshal(env.Payload, &pl); err != nil {
continue
}
pl.SessionID = sid
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalResize, pl)
if err != nil {
continue
}
_ = p.hub.SendToRuntime(p.runtimeID, frame)
case protocol.MessageTypeTerminalClose:
return
}
}
}
// writePump is the single owner of conn writes. It also watches for the
// terminal.opened / terminal.error handshake transition while it pumps,
// because separating those into a second goroutine would race over
// sendCh (only one reader per channel value).
//
// Once the session is open, this is just a straight relay. During the
// open window (sessionID empty), every frame is forwarded to the browser
// AND inspected — opened/error frames feed openedCh / openErr so run()
// can unblock or fail the handshake.
func (p *terminalProxy) writePump() {
router := p.hub.TerminalRouter()
for {
select {
case <-p.closeCh:
return
case frame, ok := <-p.sendCh:
if !ok {
return
}
p.forwardToBrowser(frame)
if p.SessionID() != "" {
continue
}
var env protocol.Message
if err := json.Unmarshal(frame, &env); err != nil {
continue
}
switch env.Type {
case protocol.MessageTypeTerminalOpened:
var op protocol.TerminalOpenedPayload
if err := json.Unmarshal(env.Payload, &op); err == nil && op.SessionID != "" {
p.mu.Lock()
p.sessionID = op.SessionID
p.mu.Unlock()
router.Register(op.SessionID, p)
router.Unregister(p.requestID)
close(p.openedCh)
}
case protocol.MessageTypeTerminalError:
var ep protocol.TerminalErrorPayload
if err := json.Unmarshal(env.Payload, &ep); err == nil {
select {
case p.openErr <- terminalOpenFailure{code: ep.Code, message: ep.Message}:
default:
}
}
}
}
}
}
func (p *terminalProxy) forwardToBrowser(frame []byte) {
p.conn.SetWriteDeadline(time.Now().Add(terminalWriteWait))
if err := p.conn.WriteMessage(websocket.TextMessage, frame); err != nil {
slog.Debug("terminal ws write to browser failed", "error", err)
p.shutdown()
}
}
func (p *terminalProxy) shutdown() {
p.closedOnce.Do(func() {
close(p.closeCh)
})
}
func sendTerminalErrorOverChannel(ch chan<- []byte, requestID, sessionID, code, message string) {
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
RequestID: requestID,
SessionID: sessionID,
Code: code,
Message: message,
})
if err != nil {
return
}
select {
case ch <- frame:
default:
}
}
func sendTerminalErrorAndClose(conn *websocket.Conn, requestID, sessionID, code, message string) {
frame, err := marshalTerminalFrame(protocol.MessageTypeTerminalError, protocol.TerminalErrorPayload{
RequestID: requestID,
SessionID: sessionID,
Code: code,
Message: message,
})
if err == nil {
_ = conn.WriteMessage(websocket.TextMessage, frame)
}
conn.Close()
}
func marshalTerminalFrame(msgType string, payload any) ([]byte, error) {
raw, err := json.Marshal(payload)
if err != nil {
return nil, err
}
return json.Marshal(protocol.Message{Type: msgType, Payload: raw})
}
func parseUint16Query(r *http.Request, key string, defaultVal uint16) uint16 {
raw := r.URL.Query().Get(key)
if raw == "" {
return defaultVal
}
v, err := strconv.ParseUint(raw, 10, 16)
if err != nil || v == 0 {
return defaultVal
}
return uint16(v)
}
// --- auth helpers (cookie + first-frame JWT/PAT) ---
func terminalAuthCookie(r *http.Request, h *Handler) (string, string) {
cookie, err := r.Cookie(auth.AuthCookieName)
if err != nil || cookie.Value == "" {
return "", ""
}
return terminalAuthToken(cookie.Value, h, r.Context())
}
func terminalFirstFrameAuth(conn *websocket.Conn, h *Handler) (string, string) {
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
defer conn.SetReadDeadline(time.Time{})
_, raw, err := conn.ReadMessage()
if err != nil {
return "", `{"error":"auth timeout or read error"}`
}
var msg struct {
Type string `json:"type"`
Payload struct {
Token string `json:"token"`
} `json:"payload"`
}
if err := json.Unmarshal(raw, &msg); err != nil || msg.Type != "auth" || msg.Payload.Token == "" {
return "", `{"error":"expected auth message as first frame"}`
}
return terminalAuthToken(msg.Payload.Token, h, context.Background())
}
func terminalAuthToken(tokenStr string, h *Handler, ctx context.Context) (string, string) {
if len(tokenStr) > 4 && tokenStr[:4] == "mul_" {
if h.PATCache == nil {
pat, err := h.Queries.GetPersonalAccessTokenByHash(ctx, auth.HashToken(tokenStr))
if err != nil {
return "", `{"error":"invalid token"}`
}
return util.UUIDToString(pat.UserID), ""
}
hash := auth.HashToken(tokenStr)
if uid, ok := h.PATCache.Get(ctx, hash); ok {
return uid, ""
}
pat, err := h.Queries.GetPersonalAccessTokenByHash(ctx, hash)
if err != nil {
return "", `{"error":"invalid token"}`
}
uid := util.UUIDToString(pat.UserID)
return uid, ""
}
token, err := jwt.Parse(tokenStr, func(token *jwt.Token) (any, error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, jwt.ErrSignatureInvalid
}
return auth.JWTSecret(), nil
})
if err != nil || !token.Valid {
return "", `{"error":"invalid token"}`
}
claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
return "", `{"error":"invalid claims"}`
}
uid, ok := claims["sub"].(string)
if !ok || uid == "" {
return "", `{"error":"invalid claims"}`
}
return uid, ""
}
func (h *Handler) terminalIsMember(ctx context.Context, userID, workspaceID string) bool {
userUUID, err := util.ParseUUID(userID)
if err != nil {
return false
}
wsUUID, err := util.ParseUUID(workspaceID)
if err != nil {
return false
}
_, err = h.Queries.GetMemberByUserAndWorkspace(ctx, db.GetMemberByUserAndWorkspaceParams{
UserID: userUUID,
WorkspaceID: wsUUID,
})
return err == nil
}
func (h *Handler) terminalResolveIssue(ctx context.Context, issueID string, wsUUID pgtype.UUID) (db.Issue, bool) {
if parts := splitIdentifier(issueID); parts != nil {
issue, err := h.Queries.GetIssueByNumber(ctx, db.GetIssueByNumberParams{
WorkspaceID: wsUUID,
Number: parts.number,
})
if err == nil {
return issue, true
}
}
issueUUID, err := util.ParseUUID(issueID)
if err != nil {
return db.Issue{}, false
}
issue, err := h.Queries.GetIssueInWorkspace(ctx, db.GetIssueInWorkspaceParams{
ID: issueUUID,
WorkspaceID: wsUUID,
})
if err != nil {
return db.Issue{}, false
}
return issue, true
}
// terminalLatestAttachableTask returns the most recent task on the issue
// that the proxy can attach to: must have a known work_dir, a runtime_id,
// and a daemon currently connected for that runtime. Falls back through
// the task history rather than picking only the absolute latest, because
// the most-recent row may be a queued task that never ran (no workdir yet).
func (h *Handler) terminalLatestAttachableTask(ctx context.Context, issueID pgtype.UUID) (db.AgentTaskQueue, bool) {
tasks, err := h.Queries.ListTasksByIssue(ctx, issueID)
if err != nil {
return db.AgentTaskQueue{}, false
}
for _, t := range tasks {
if !t.WorkDir.Valid || t.WorkDir.String == "" {
continue
}
if !t.RuntimeID.Valid {
continue
}
return t, true
}
return db.AgentTaskQueue{}, false
}

View File

@@ -168,3 +168,99 @@ type DaemonHeartbeatPendingLocalSkillImport struct {
ID string `json:"id"`
SkillKey string `json:"skill_key"`
}
// Terminal WS message types. These flow over the existing daemonws hub
// between client (web/desktop/CLI) and daemon. Bytes payloads are base64
// encoded so they can travel as JSON text frames without binary framing.
const (
// TerminalOpen — client → daemon: request a new PTY session bound to a task workdir.
MessageTypeTerminalOpen = "terminal.open"
// TerminalOpened — daemon → client: ack carrying the session_id and resolved workdir.
MessageTypeTerminalOpened = "terminal.opened"
// TerminalData — bidirectional: PTY stdin (client→daemon) / stdout+stderr (daemon→client).
MessageTypeTerminalData = "terminal.data"
// TerminalResize — client → daemon: window-size change.
MessageTypeTerminalResize = "terminal.resize"
// TerminalClose — bidirectional: explicit teardown request / ack.
MessageTypeTerminalClose = "terminal.close"
// TerminalExit — daemon → client: child process exited; carries exit code and optional reason.
MessageTypeTerminalExit = "terminal.exit"
// TerminalError — daemon → client: open/resize/etc. failed; carries human-readable code+message.
MessageTypeTerminalError = "terminal.error"
)
// TerminalOpenPayload requests a PTY session bound to the given task's
// workdir. WorkspaceID is the workspace the caller is acting in; the daemon
// must reject if it does not match the task's workspace.
//
// The server resolves WorkDir / IssueID / PriorSessionID from its own DB
// (the daemon has no persistent task cache) and embeds them here before
// forwarding to the daemon. The daemon trusts these fields because the
// daemonws connection is already authenticated and scoped — but it still
// rechecks WorkspaceID against the request body to catch a misrouted frame.
type TerminalOpenPayload struct {
RequestID string `json:"request_id"`
TaskID string `json:"task_id"`
WorkspaceID string `json:"workspace_id"`
UserID string `json:"user_id,omitempty"`
IssueID string `json:"issue_id,omitempty"`
WorkDir string `json:"work_dir,omitempty"`
PriorSessionID string `json:"prior_session_id,omitempty"`
Cols uint16 `json:"cols"`
Rows uint16 `json:"rows"`
}
// TerminalOpenedPayload echoes the request_id and carries the session_id the
// client must include on subsequent data/resize/close frames.
type TerminalOpenedPayload struct {
RequestID string `json:"request_id"`
SessionID string `json:"session_id"`
WorkDir string `json:"work_dir"`
Shell string `json:"shell"`
}
// TerminalDataPayload carries raw PTY bytes in base64.
type TerminalDataPayload struct {
SessionID string `json:"session_id"`
DataB64 string `json:"data_b64"`
}
// TerminalResizePayload updates the PTY window size.
type TerminalResizePayload struct {
SessionID string `json:"session_id"`
Cols uint16 `json:"cols"`
Rows uint16 `json:"rows"`
}
// TerminalClosePayload requests teardown. Reason is informational.
type TerminalClosePayload struct {
SessionID string `json:"session_id"`
Reason string `json:"reason,omitempty"`
}
// TerminalExitPayload signals the child process exited.
type TerminalExitPayload struct {
SessionID string `json:"session_id"`
ExitCode int `json:"exit_code"`
Reason string `json:"reason,omitempty"`
}
// Terminal error codes returned in TerminalErrorPayload.Code.
const (
TerminalErrorCodeWorkspaceMismatch = "workspace_mismatch"
TerminalErrorCodeTaskNotFound = "task_not_found"
TerminalErrorCodeSessionNotFound = "session_not_found"
TerminalErrorCodeUnsupportedOS = "unsupported_os"
TerminalErrorCodeSpawnFailed = "spawn_failed"
TerminalErrorCodeInternal = "internal"
)
// TerminalErrorPayload reports a failure. RequestID is set when the error
// is a response to a specific open request; SessionID is set when it
// references an already-established session.
type TerminalErrorPayload struct {
RequestID string `json:"request_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
Code string `json:"code"`
Message string `json:"message"`
}