Compare commits

...

4 Commits

Author SHA1 Message Date
Jiayuan Zhang
953fdd5003 fix(daemon/terminal): close re-entry barrier + reap orphan PTY (MUL-2295)
- Manager.Close concurrent re-entry now blocks late callers on closeDone
  so every Close() return shares the "manager drained" guarantee.
- Open cleanup path on lost race with Close calls pty.Wait() to reap the
  child synchronously (waitLoop never runs there).
- Tests: concurrent Close callers all observe drained state; Open cleanup
  invokes pty.Wait at least once.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 17:10:22 +08:00
Jiayuan Zhang
e70f44b92b fix(daemon/terminal): lock Done()/Manager.Close finalize order (MUL-2295)
Round 2 review fixes:

1. PtySession finalize sequence is now
   ExitC -> close(output) -> onClose/deregister -> close(done)
   so external waiters (bridge / GC hook / audit) can `<-Done()` and
   immediately query the manager without a race window.

2. Manager.Close now waits for each session's Done() (not just Close())
   so by the time it returns the registry is empty and every session
   is fully finalized.

Adds TestSession_DoneFiresAfterDeregister (locks the ordering contract)
and TestManager_CloseWaitsForSessionFinalize (fakePTY.Wait delay proves
Manager.Close blocks through finalize).

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 17:00:02 +08:00
Jiayuan Zhang
281f1073b5 fix(daemon/terminal): address Phase 1 review feedback (MUL-2295)
Wires in the four fixes Emacs flagged on the Phase 1 review:

1. Lifecycle: split stop/done with a WaitGroup. readLoop and idleLoop
   exit via <-stop; waitLoop is the finalizer that waits on the WG
   before closing output/done. Eliminates the "send on closed channel"
   race when the output buffer is saturated. Adds a regression test
   that fills output, calls Close, and verifies Done converges + ExitC
   fires before output closes (the doc contract).

2. Errors: Manager.Open wraps spawner errors with double-%w so
   errors.Is matches both ErrSpawnFailed and ErrUnsupportedOS. Adds a
   test with a fake spawner that returns ErrUnsupportedOS.

3. Close path on unix: SIGHUP to the process group, 250ms grace,
   SIGKILL, then close fd — comment now matches behavior. Skips the
   signal+sleep work entirely when the child already exited naturally.
   Manager.Close fans out per-session Close in parallel so the grace
   period doesn't multiply by session count.

4. IdleTimeout semantics: removes the NewManager default that
   silently rewrote 0 to 60min. Zero/negative now disables, per the
   doc comment. Added DefaultIdleTimeout for daemon wiring to opt in
   explicitly.

Verified: go test, go test -race, GOOS=windows go test -c.
Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 16:48:11 +08:00
Jiayuan Zhang
6758feba05 feat(daemon): add terminal Manager + PTY session (Phase 1, MUL-2295)
Daemon-side foundation for the Issue → Terminal feature. Manager owns
the lifecycle of all live PtySessions; sessions spawn a shell on a real
PTY via creack/pty (unix-only — Windows returns ErrUnsupportedOS until
ConPty support lands).

Open enforces the cross-workspace ACL — a client acting in workspace A
cannot attach to a task that belongs to workspace B. Each session
injects CLAUDE_SESSION_ID + MULTICA_{WORKSPACE,ISSUE,TASK,USER}_ID into
the child env so `claude --resume $CLAUDE_SESSION_ID` continues the
same session the agent run was using.

Adds the terminal.* WebSocket message types to server/pkg/protocol so
Phase 2 (daemonws routing) and Phase 3 (CLI) can land without touching
the manager.

Tests cover open, data round-trip, resize, explicit close, idle timeout
sweep, manager shutdown, cross-workspace rejection, and unknown task.
A fake Spawner backed by channels lets tests exercise lifecycle without
forking a real shell.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-16 16:32:39 +08:00
11 changed files with 1667 additions and 0 deletions

View File

@@ -8,6 +8,7 @@ require (
github.com/aws/aws-sdk-go-v2/credentials v1.19.13
github.com/aws/aws-sdk-go-v2/service/s3 v1.97.3
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.5
github.com/creack/pty v1.1.21
github.com/go-chi/chi/v5 v5.2.5
github.com/go-chi/cors v1.2.2
github.com/golang-jwt/jwt/v5 v5.3.1

View File

@@ -48,6 +48,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.21 h1:1/QdRyBaHHJP61QkWMXlOIBfsgdDeeKfK8SYVUWJKf0=
github.com/creack/pty v1.1.21/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@@ -0,0 +1,14 @@
// Package terminal manages interactive PTY sessions bound to a task's
// workdir on the local daemon.
//
// A Manager owns the lifecycle of all live PtySessions. Callers (the
// daemonws bridge today, the CLI socket later) translate WebSocket
// terminal.* frames into method calls on Manager, and the Manager
// forwards PTY output back through a per-session Output channel.
//
// Sessions run with the daemon process's identity — there is no
// additional sandbox. This is the same trust boundary as agent runs
// (which are also daemon-spawned child processes). The Manager only
// enforces that the requesting client's workspace matches the task's
// workspace; anything beyond that is the OS's responsibility.
package terminal

View File

@@ -0,0 +1,14 @@
package terminal
import "errors"
// Sentinel errors returned by Manager. Callers map these to the
// protocol.TerminalErrorCode* constants when reporting to clients.
var (
ErrTaskNotFound = errors.New("terminal: task not found")
ErrWorkspaceMismatch = errors.New("terminal: task belongs to a different workspace")
ErrSessionNotFound = errors.New("terminal: session not found")
ErrUnsupportedOS = errors.New("terminal: PTY not supported on this OS")
ErrSpawnFailed = errors.New("terminal: failed to spawn shell")
ErrManagerClosed = errors.New("terminal: manager is shut down")
)

View File

@@ -0,0 +1,324 @@
package terminal
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"github.com/google/uuid"
)
// DefaultIdleTimeout is the recommended IdleTimeout for production
// daemon wiring. Callers must set ManagerConfig.IdleTimeout to this
// (or any positive duration) explicitly; zero/negative disables the
// idle sweep.
const DefaultIdleTimeout = 60 * time.Minute
// TaskInfo is the subset of task state the Manager needs to set up a PTY.
// The daemon resolves a TaskID into TaskInfo via TaskLookup at open time.
type TaskInfo struct {
TaskID string
WorkspaceID string
IssueID string
WorkDir string
PriorSessionID string // injected as CLAUDE_SESSION_ID for `claude --resume`
}
// TaskLookup resolves a TaskID into the workdir + workspace required to
// open a PTY. Returns ErrTaskNotFound when the task is unknown. Lookups
// hit the daemon's local task cache in production; tests supply a stub.
type TaskLookup func(ctx context.Context, taskID string) (TaskInfo, error)
// OpenParams is the input to Manager.Open.
type OpenParams struct {
// TaskID identifies the workdir the PTY should run in.
TaskID string
// WorkspaceID is the workspace the caller is acting on behalf of.
// Open rejects the request if it does not match the task's workspace
// (cross-workspace ACL — clients never see other workspaces' workdirs).
WorkspaceID string
// UserID is the human user who opened the terminal. Logged in audit
// records; the PTY itself runs as the daemon process owner.
UserID string
// Cols/Rows seed the initial PTY window size. Zero values default to 80x24.
Cols uint16
Rows uint16
}
// ManagerConfig tunes Manager behaviour. Zero values are sensible defaults.
type ManagerConfig struct {
// Shell to spawn for each session. Defaults to "bash" with "-l".
// Overridable for tests; the production daemon hardcodes bash for now
// (RFC open question #4 — shell selection deferred to a later release).
ShellPath string
ShellArgs []string
// IdleTimeout closes a session that has had no I/O for this long.
// Zero or negative disables the sweep entirely. Production daemon
// wiring should pass DefaultIdleTimeout explicitly; we intentionally
// don't default here so callers stay in control (the docs page for
// this package previously said "0 disables" while NewManager silently
// rewrote 0 to 60min — those two have to agree).
IdleTimeout time.Duration
// Spawner overrides PTY spawning. Defaults to ptyStartShell which
// shells out to creack/pty. Tests inject a fake to avoid forking.
Spawner Spawner
// Now returns the current time. Defaults to time.Now. Tests inject a
// fake clock to drive IdleTimeout deterministically.
Now func() time.Time
// Logger receives operational events. Defaults to slog.Default().
Logger *slog.Logger
}
// Manager owns all live PtySessions on this daemon. It is safe for
// concurrent use.
type Manager struct {
cfg ManagerConfig
lookup TaskLookup
mu sync.Mutex
sessions map[string]*PtySession
closed bool
// closeDone is closed by the first Close() caller AFTER finalize
// finishes (every session deregistered, Done() closed). Subsequent
// concurrent callers wait on it instead of racing past, so all
// Close() returns share the same "manager fully drained" guarantee.
closeDone chan struct{}
}
// NewManager constructs a Manager. lookup may be nil in tests that only
// exercise direct session APIs.
func NewManager(cfg ManagerConfig, lookup TaskLookup) *Manager {
if cfg.ShellPath == "" {
cfg.ShellPath = "bash"
cfg.ShellArgs = []string{"-l"}
}
// IdleTimeout intentionally not defaulted — see ManagerConfig.
if cfg.Spawner == nil {
cfg.Spawner = realSpawner{}
}
if cfg.Now == nil {
cfg.Now = time.Now
}
if cfg.Logger == nil {
cfg.Logger = slog.Default()
}
return &Manager{
cfg: cfg,
lookup: lookup,
sessions: make(map[string]*PtySession),
closeDone: make(chan struct{}),
}
}
// Open spawns a new PTY session for the given task. The returned
// session is also registered with the manager and retrievable via Get.
func (m *Manager) Open(ctx context.Context, p OpenParams) (*PtySession, error) {
if m.lookup == nil {
return nil, fmt.Errorf("terminal: Manager has no TaskLookup configured")
}
info, err := m.lookup(ctx, p.TaskID)
if err != nil {
return nil, err
}
if info.WorkspaceID != p.WorkspaceID {
return nil, ErrWorkspaceMismatch
}
if info.WorkDir == "" {
return nil, ErrTaskNotFound
}
cols, rows := normalizeSize(p.Cols, p.Rows)
env := buildEnv(info, p.UserID)
m.mu.Lock()
if m.closed {
m.mu.Unlock()
return nil, ErrManagerClosed
}
m.mu.Unlock()
startedAt := m.cfg.Now()
pty, err := m.cfg.Spawner.Start(SpawnRequest{
Shell: m.cfg.ShellPath,
Args: m.cfg.ShellArgs,
Cwd: info.WorkDir,
Env: env,
Cols: cols,
Rows: rows,
Started: startedAt,
})
if err != nil {
// Double-%w so errors.Is matches both ErrSpawnFailed AND any
// sentinel the spawner surfaced (notably ErrUnsupportedOS from
// the windows stub — the protocol layer needs to distinguish
// "no PTY on this OS" from generic spawn failures).
return nil, fmt.Errorf("%w: %w", ErrSpawnFailed, err)
}
sess := &PtySession{
id: uuid.NewString(),
taskID: info.TaskID,
workspaceID: info.WorkspaceID,
issueID: info.IssueID,
workDir: info.WorkDir,
userID: p.UserID,
shellPath: m.cfg.ShellPath,
cols: cols,
rows: rows,
pty: pty,
output: make(chan []byte, 64),
exit: make(chan ExitInfo, 1),
done: make(chan struct{}),
stop: make(chan struct{}),
now: m.cfg.Now,
idleTimeout: m.cfg.IdleTimeout,
startedAt: startedAt,
lastIO: startedAt,
logger: m.cfg.Logger.With("session_id_pending", true, "task_id", info.TaskID),
onClose: func(id string) { m.deregister(id) },
}
sess.logger = m.cfg.Logger.With("session_id", sess.id, "task_id", info.TaskID)
m.mu.Lock()
if m.closed {
m.mu.Unlock()
_ = pty.Close()
// We won that race: spawn succeeded but the manager closed before
// we could register the session, so waitLoop never runs. Reap the
// child synchronously here — pty.Close fires SIGHUP/SIGKILL but
// only Wait() collects the exit status, otherwise the unix child
// stays around as a zombie until the daemon process dies.
_, _ = pty.Wait()
return nil, ErrManagerClosed
}
m.sessions[sess.id] = sess
m.mu.Unlock()
sess.start()
return sess, nil
}
// Get returns the session with the given id, or ErrSessionNotFound.
func (m *Manager) Get(id string) (*PtySession, error) {
m.mu.Lock()
defer m.mu.Unlock()
sess, ok := m.sessions[id]
if !ok {
return nil, ErrSessionNotFound
}
return sess, nil
}
// Sessions returns a snapshot of currently registered session IDs.
func (m *Manager) Sessions() []string {
m.mu.Lock()
defer m.mu.Unlock()
ids := make([]string, 0, len(m.sessions))
for id := range m.sessions {
ids = append(ids, id)
}
return ids
}
// Close tears down every live session and refuses subsequent Open calls.
// Safe to call concurrently from multiple goroutines: the first caller
// runs the actual teardown, the rest block on closeDone until that
// teardown is fully observable. Every Close() return — first or Nth —
// thus carries the same "manager drained, every session finalized"
// guarantee that downstream GC/audit cleanup depends on.
func (m *Manager) Close() {
m.mu.Lock()
if m.closed {
done := m.closeDone
m.mu.Unlock()
<-done
return
}
m.closed = true
live := make([]*PtySession, 0, len(m.sessions))
for _, s := range m.sessions {
live = append(live, s)
}
m.mu.Unlock()
// Parallel: each session.Close blocks for the unix spawner's
// SIGHUP→grace→SIGKILL window. Running serially would multiply
// shutdown latency by N sessions. We additionally wait on each
// session's Done() so Manager.Close returning is a hard guarantee
// that every session finalized (output closed, deregistered, done
// fired) — downstream GC/audit cleanup relies on this.
var wg sync.WaitGroup
for _, s := range live {
wg.Add(1)
go func(s *PtySession) {
defer wg.Done()
s.Close("manager_shutdown")
<-s.Done()
}(s)
}
wg.Wait()
close(m.closeDone)
}
// CheckIdle walks every session and closes those whose idle interval
// has elapsed. The daemon's existing GC loop calls this periodically;
// each session also self-monitors via its own timer for cases where the
// outer loop runs at a coarser cadence than IdleTimeout.
func (m *Manager) CheckIdle() {
if m.cfg.IdleTimeout <= 0 {
return
}
m.mu.Lock()
sessions := make([]*PtySession, 0, len(m.sessions))
for _, s := range m.sessions {
sessions = append(sessions, s)
}
m.mu.Unlock()
now := m.cfg.Now()
for _, s := range sessions {
if now.Sub(s.LastIO()) >= m.cfg.IdleTimeout {
s.Close("idle_timeout")
}
}
}
func (m *Manager) deregister(id string) {
m.mu.Lock()
delete(m.sessions, id)
m.mu.Unlock()
}
func normalizeSize(cols, rows uint16) (uint16, uint16) {
if cols == 0 {
cols = 80
}
if rows == 0 {
rows = 24
}
return cols, rows
}
func buildEnv(info TaskInfo, userID string) []string {
env := []string{
"MULTICA_WORKSPACE_ID=" + info.WorkspaceID,
"MULTICA_TASK_ID=" + info.TaskID,
}
if info.IssueID != "" {
env = append(env, "MULTICA_ISSUE_ID="+info.IssueID)
}
if userID != "" {
env = append(env, "MULTICA_USER_ID="+userID)
}
if info.PriorSessionID != "" {
// Injected so `claude --resume $CLAUDE_SESSION_ID` continues the
// same session that the agent run was using (see RFC §Resume).
env = append(env, "CLAUDE_SESSION_ID="+info.PriorSessionID)
}
return env
}

View File

@@ -0,0 +1,804 @@
package terminal
import (
"context"
"errors"
"io"
"sync"
"sync/atomic"
"testing"
"time"
)
// fakePTY is a Spawner-served stand-in for a real PTY. Tests push child
// output via WriteFromChild and read client input via ReadFromClient.
type fakePTY struct {
t *testing.T
// child -> client (output queue, read by readLoop)
childToClient chan []byte
// client -> child (writes captured into a buffer slice under mu)
mu sync.Mutex
clientWrote [][]byte
cols, rows uint16
// closeOnce coordinates teardown
closeOnce sync.Once
closeCh chan struct{}
// waitDone signals Wait can return. Defaults closed by Close.
waitOnce sync.Once
waitDone chan struct{}
// waitDelay (optional) sleeps inside Wait AFTER waitDone fires.
// Lets tests prove Manager.Close waits for session finalize rather
// than just for s.Close() to return.
waitDelay time.Duration
exitCode int32
resizedCh chan [2]uint16
closed atomic.Bool
// waitCount tracks how many times Wait() was invoked. Lets tests
// assert the cleanup path reaped the child even when no session was
// ever registered (Manager.Close racing Open).
waitCount atomic.Int32
}
func newFakePTY(t *testing.T, cols, rows uint16) *fakePTY {
return &fakePTY{
t: t,
childToClient: make(chan []byte, 8),
cols: cols,
rows: rows,
closeCh: make(chan struct{}),
waitDone: make(chan struct{}),
resizedCh: make(chan [2]uint16, 8),
}
}
func (p *fakePTY) Read(b []byte) (int, error) {
select {
case chunk, ok := <-p.childToClient:
if !ok {
return 0, io.EOF
}
n := copy(b, chunk)
return n, nil
case <-p.closeCh:
return 0, io.EOF
}
}
func (p *fakePTY) Write(b []byte) (int, error) {
if p.closed.Load() {
return 0, io.ErrClosedPipe
}
p.mu.Lock()
c := make([]byte, len(b))
copy(c, b)
p.clientWrote = append(p.clientWrote, c)
p.mu.Unlock()
return len(b), nil
}
func (p *fakePTY) Resize(cols, rows uint16) error {
if p.closed.Load() {
return io.ErrClosedPipe
}
p.mu.Lock()
p.cols, p.rows = cols, rows
p.mu.Unlock()
select {
case p.resizedCh <- [2]uint16{cols, rows}:
default:
}
return nil
}
func (p *fakePTY) Wait() (int, error) {
p.waitCount.Add(1)
<-p.waitDone
if p.waitDelay > 0 {
time.Sleep(p.waitDelay)
}
return int(atomic.LoadInt32(&p.exitCode)), nil
}
func (p *fakePTY) Close() error {
p.closeOnce.Do(func() {
p.closed.Store(true)
close(p.closeCh)
close(p.childToClient)
p.waitOnce.Do(func() { close(p.waitDone) })
})
return nil
}
// pushChildOutput simulates the shell writing bytes to its stdout/stderr.
func (p *fakePTY) pushChildOutput(b []byte) {
select {
case p.childToClient <- b:
case <-time.After(time.Second):
p.t.Fatalf("childToClient send timed out — readLoop not draining")
}
}
func (p *fakePTY) writes() [][]byte {
p.mu.Lock()
defer p.mu.Unlock()
out := make([][]byte, len(p.clientWrote))
copy(out, p.clientWrote)
return out
}
func (p *fakePTY) size() (uint16, uint16) {
p.mu.Lock()
defer p.mu.Unlock()
return p.cols, p.rows
}
// fakeSpawner records each spawn so tests can inspect injected env / cwd.
type fakeSpawner struct {
t *testing.T
spawnsMu sync.Mutex
spawns []SpawnRequest
make func(*testing.T, SpawnRequest) (*fakePTY, error)
}
func (s *fakeSpawner) Start(req SpawnRequest) (PTY, error) {
s.spawnsMu.Lock()
s.spawns = append(s.spawns, req)
s.spawnsMu.Unlock()
pty, err := s.make(s.t, req)
if err != nil {
return nil, err
}
return pty, nil
}
func (s *fakeSpawner) lastRequest() SpawnRequest {
s.spawnsMu.Lock()
defer s.spawnsMu.Unlock()
if len(s.spawns) == 0 {
return SpawnRequest{}
}
return s.spawns[len(s.spawns)-1]
}
// helper: build a Manager with a default fake spawner and a single task.
type fixture struct {
mgr *Manager
spawner *fakeSpawner
tasks map[string]TaskInfo
now func() time.Time
clockMu sync.Mutex
clock time.Time
}
func newFixture(t *testing.T, opts ...func(*ManagerConfig)) *fixture {
f := &fixture{
tasks: map[string]TaskInfo{
"task-1": {
TaskID: "task-1",
WorkspaceID: "ws-A",
IssueID: "issue-1",
WorkDir: t.TempDir(),
PriorSessionID: "claude-session-xyz",
},
},
clock: time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC),
}
f.now = func() time.Time {
f.clockMu.Lock()
defer f.clockMu.Unlock()
return f.clock
}
f.spawner = &fakeSpawner{
t: t,
make: func(tt *testing.T, req SpawnRequest) (*fakePTY, error) { return newFakePTY(tt, req.Cols, req.Rows), nil },
}
cfg := ManagerConfig{
ShellPath: "/usr/bin/bash",
ShellArgs: []string{"-l"},
IdleTimeout: 0,
Spawner: f.spawner,
Now: f.now,
}
for _, opt := range opts {
opt(&cfg)
}
lookup := func(_ context.Context, id string) (TaskInfo, error) {
info, ok := f.tasks[id]
if !ok {
return TaskInfo{}, ErrTaskNotFound
}
return info, nil
}
f.mgr = NewManager(cfg, lookup)
return f
}
func (f *fixture) advance(d time.Duration) {
f.clockMu.Lock()
f.clock = f.clock.Add(d)
f.clockMu.Unlock()
}
// drainPTY pulls the *fakePTY back out of the spawner so tests can drive it.
func (f *fixture) lastPTY(t *testing.T) *fakePTY {
t.Helper()
req := f.spawner.lastRequest()
if req.Shell == "" {
t.Fatal("no spawn recorded")
}
// The Spawner.make closure always returns a *fakePTY; the manager
// wraps it as a PTY interface and we don't retain the concrete in
// the manager. Re-acquire via the registry by walking sessions.
for _, id := range f.mgr.Sessions() {
s, err := f.mgr.Get(id)
if err == nil {
if fp, ok := s.pty.(*fakePTY); ok {
return fp
}
}
}
t.Fatal("no fake PTY found in any registered session")
return nil
}
func TestManager_OpenSpawnsWithInjectedEnvAndCwd(t *testing.T) {
f := newFixture(t)
defer f.mgr.Close()
sess, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "task-1",
WorkspaceID: "ws-A",
UserID: "user-42",
Cols: 120,
Rows: 40,
})
if err != nil {
t.Fatalf("Open: %v", err)
}
if sess.ID() == "" {
t.Fatal("session ID empty")
}
if got := sess.WorkDir(); got != f.tasks["task-1"].WorkDir {
t.Errorf("workdir = %q, want %q", got, f.tasks["task-1"].WorkDir)
}
req := f.spawner.lastRequest()
if req.Cwd != f.tasks["task-1"].WorkDir {
t.Errorf("spawn cwd = %q, want %q", req.Cwd, f.tasks["task-1"].WorkDir)
}
if req.Cols != 120 || req.Rows != 40 {
t.Errorf("spawn size = %dx%d, want 120x40", req.Cols, req.Rows)
}
wantEnv := map[string]string{
"MULTICA_WORKSPACE_ID": "ws-A",
"MULTICA_TASK_ID": "task-1",
"MULTICA_ISSUE_ID": "issue-1",
"MULTICA_USER_ID": "user-42",
"CLAUDE_SESSION_ID": "claude-session-xyz",
}
envMap := map[string]string{}
for _, kv := range req.Env {
for i := 0; i < len(kv); i++ {
if kv[i] == '=' {
envMap[kv[:i]] = kv[i+1:]
break
}
}
}
for k, want := range wantEnv {
if got := envMap[k]; got != want {
t.Errorf("env %s = %q, want %q", k, got, want)
}
}
}
func TestManager_DefaultSize(t *testing.T) {
f := newFixture(t)
defer f.mgr.Close()
_, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "task-1",
WorkspaceID: "ws-A",
})
if err != nil {
t.Fatalf("Open: %v", err)
}
req := f.spawner.lastRequest()
if req.Cols != 80 || req.Rows != 24 {
t.Errorf("default size = %dx%d, want 80x24", req.Cols, req.Rows)
}
}
func TestManager_RejectsCrossWorkspace(t *testing.T) {
f := newFixture(t)
defer f.mgr.Close()
_, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "task-1",
WorkspaceID: "ws-B-not-the-tasks-workspace",
})
if !errors.Is(err, ErrWorkspaceMismatch) {
t.Fatalf("Open err = %v, want ErrWorkspaceMismatch", err)
}
if got := len(f.mgr.Sessions()); got != 0 {
t.Errorf("Sessions after rejected open = %d, want 0", got)
}
}
func TestManager_RejectsUnknownTask(t *testing.T) {
f := newFixture(t)
defer f.mgr.Close()
_, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "does-not-exist",
WorkspaceID: "ws-A",
})
if !errors.Is(err, ErrTaskNotFound) {
t.Fatalf("Open err = %v, want ErrTaskNotFound", err)
}
}
func TestSession_DataRoundTrip(t *testing.T) {
f := newFixture(t)
defer f.mgr.Close()
sess, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "task-1",
WorkspaceID: "ws-A",
})
if err != nil {
t.Fatalf("Open: %v", err)
}
pty := f.lastPTY(t)
// client → child
if _, err := sess.Write([]byte("ls -al\n")); err != nil {
t.Fatalf("Write: %v", err)
}
// child → client
pty.pushChildOutput([]byte("total 0\n"))
select {
case got := <-sess.Output():
if string(got) != "total 0\n" {
t.Errorf("Output chunk = %q, want %q", got, "total 0\n")
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for Output chunk")
}
writes := pty.writes()
if len(writes) != 1 || string(writes[0]) != "ls -al\n" {
t.Errorf("recorded writes = %#v, want one 'ls -al\\n'", writes)
}
}
func TestSession_Resize(t *testing.T) {
f := newFixture(t)
defer f.mgr.Close()
sess, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "task-1",
WorkspaceID: "ws-A",
Cols: 80, Rows: 24,
})
if err != nil {
t.Fatalf("Open: %v", err)
}
pty := f.lastPTY(t)
if err := sess.Resize(132, 50); err != nil {
t.Fatalf("Resize: %v", err)
}
c, r := sess.Size()
if c != 132 || r != 50 {
t.Errorf("Size = %dx%d, want 132x50", c, r)
}
gc, gr := pty.size()
if gc != 132 || gr != 50 {
t.Errorf("PTY size = %dx%d, want 132x50", gc, gr)
}
}
func TestSession_CloseDeregistersAndDelivers(t *testing.T) {
f := newFixture(t)
defer f.mgr.Close()
sess, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "task-1",
WorkspaceID: "ws-A",
})
if err != nil {
t.Fatalf("Open: %v", err)
}
id := sess.ID()
sess.Close("user_requested")
select {
case info := <-sess.ExitC():
if info.Reason != "user_requested" {
t.Errorf("exit reason = %q, want user_requested", info.Reason)
}
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for ExitC")
}
// Output should close once exit fires; verify by ranging.
drained := false
for range sess.Output() {
drained = true
}
_ = drained
<-sess.Done()
// Session must be deregistered.
if _, err := f.mgr.Get(id); !errors.Is(err, ErrSessionNotFound) {
t.Errorf("Get after Close = %v, want ErrSessionNotFound", err)
}
}
func TestManager_IdleTimeoutSweep(t *testing.T) {
f := newFixture(t, func(c *ManagerConfig) {
c.IdleTimeout = 30 * time.Minute
})
defer f.mgr.Close()
sess, err := f.mgr.Open(context.Background(), OpenParams{
TaskID: "task-1",
WorkspaceID: "ws-A",
})
if err != nil {
t.Fatalf("Open: %v", err)
}
// 29 minutes — still active.
f.advance(29 * time.Minute)
f.mgr.CheckIdle()
if _, err := f.mgr.Get(sess.ID()); err != nil {
t.Fatalf("session evicted before idle timeout: %v", err)
}
// Cross the threshold.
f.advance(2 * time.Minute)
f.mgr.CheckIdle()
select {
case info := <-sess.ExitC():
if info.Reason != "idle_timeout" {
t.Errorf("exit reason = %q, want idle_timeout", info.Reason)
}
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for idle close")
}
if _, err := f.mgr.Get(sess.ID()); !errors.Is(err, ErrSessionNotFound) {
t.Errorf("session not deregistered after idle sweep")
}
}
func TestManager_CloseTearsDownAllSessions(t *testing.T) {
f := newFixture(t)
s1, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
s2, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
f.mgr.Close()
for _, s := range []*PtySession{s1, s2} {
select {
case <-s.Done():
case <-time.After(2 * time.Second):
t.Fatalf("session %s did not tear down", s.ID())
}
}
if got := len(f.mgr.Sessions()); got != 0 {
t.Errorf("Sessions after Manager.Close = %d, want 0", got)
}
// Subsequent opens must be rejected.
if _, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"}); !errors.Is(err, ErrManagerClosed) {
t.Errorf("Open after Close = %v, want ErrManagerClosed", err)
}
}
func TestSession_CloseWithFullOutputBufferDoesNotPanic(t *testing.T) {
// Regression: Close used to race with readLoop's "output <- chunk"
// when the channel was full. waitLoop closed output unconditionally,
// which could panic on send-to-closed-channel. The new lifecycle
// has waitLoop wait on a WaitGroup so readLoop's blocked send
// unblocks via <-stop before the close runs.
f := newFixture(t)
defer f.mgr.Close()
// Override on the existing spawner so newFixture's wiring (and
// f.spawner.lastRequest tracking) still works.
f.spawner.make = func(tt *testing.T, req SpawnRequest) (*fakePTY, error) {
p := newFakePTY(tt, req.Cols, req.Rows)
// Give the child-side queue plenty of room so the test can
// saturate the *session* output buffer before childToClient
// back-pressures the producer goroutine.
p.childToClient = make(chan []byte, 256)
return p, nil
}
sess, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
pty := f.lastPTY(t)
// Pump enough chunks to fill session.output (cap 64) and queue more
// on childToClient; readLoop ends up blocked on output <- chunk.
// Don't drain sess.Output() — that's the whole point. Producer runs
// to completion (and exits) BEFORE Close, otherwise producer's send
// races Close's pty.Close which closes childToClient.
producerDone := make(chan struct{})
go func() {
defer close(producerDone)
for i := 0; i < 200; i++ {
select {
case pty.childToClient <- []byte("x"):
case <-time.After(50 * time.Millisecond):
return
}
}
}()
<-producerDone
// Should not panic, should not hang.
sess.Close("user_requested")
select {
case <-sess.Done():
case <-time.After(2 * time.Second):
t.Fatal("Done did not converge after Close with saturated output buffer")
}
if _, err := f.mgr.Get(sess.ID()); !errors.Is(err, ErrSessionNotFound) {
t.Errorf("session not deregistered after Close")
}
// ExitC must have fired before Done — required by the Output() doc
// contract ("channel closes after the child exits and a value has
// been delivered on ExitC()").
select {
case info := <-sess.ExitC():
if info.Reason != "user_requested" {
t.Errorf("exit reason = %q, want user_requested", info.Reason)
}
default:
t.Error("ExitC was empty after Done — finalize order violated")
}
}
func TestManager_OpenPropagatesUnsupportedOS(t *testing.T) {
// Regression: Manager.Open used fmt.Errorf("%w: %v", ErrSpawnFailed, err)
// which swallowed the inner sentinel. The protocol layer needs
// errors.Is to match both ErrSpawnFailed and ErrUnsupportedOS so it
// can map to terminal.error code "unsupported_os" instead of a
// generic "spawn_failed". Switched to double-%w; both must match.
f := newFixture(t)
defer f.mgr.Close()
f.spawner.make = func(_ *testing.T, _ SpawnRequest) (*fakePTY, error) {
return nil, ErrUnsupportedOS
}
_, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err == nil {
t.Fatal("Open returned nil err with failing spawner")
}
if !errors.Is(err, ErrUnsupportedOS) {
t.Errorf("errors.Is(err, ErrUnsupportedOS) = false; err = %v", err)
}
if !errors.Is(err, ErrSpawnFailed) {
t.Errorf("errors.Is(err, ErrSpawnFailed) = false; err = %v", err)
}
}
func TestSession_WriteUpdatesLastIO(t *testing.T) {
f := newFixture(t, func(c *ManagerConfig) {
c.IdleTimeout = 30 * time.Minute
})
defer f.mgr.Close()
sess, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
f.advance(20 * time.Minute)
if _, err := sess.Write([]byte("echo hi\n")); err != nil {
t.Fatalf("Write: %v", err)
}
f.advance(20 * time.Minute) // total 40min, but 20 min since last IO
f.mgr.CheckIdle()
if _, err := f.mgr.Get(sess.ID()); err != nil {
t.Fatalf("session evicted despite recent write: %v", err)
}
}
func TestSession_DoneFiresAfterDeregister(t *testing.T) {
// Locks the finalize-order contract from Round 2 review:
// ExitC → close(output) → onClose/deregister → close(done)
// External waiters (daemonws bridge, GC hook, audit) use `<-Done()`
// as the signal that the session is fully torn down. Any consumer
// querying the manager immediately after Done() must observe the
// session deregistered.
f := newFixture(t)
defer f.mgr.Close()
sess, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
id := sess.ID()
sess.Close("user_requested")
<-sess.Done()
if _, err := f.mgr.Get(id); !errors.Is(err, ErrSessionNotFound) {
t.Fatalf("Get after <-Done() = %v, want ErrSessionNotFound (finalize order violated)", err)
}
}
func TestManager_CloseConcurrentReentryWaitsForFinalize(t *testing.T) {
// Regression for Round 3 review: a late Close() caller used to return
// immediately when it saw m.closed==true, even though the first caller
// was still in the middle of waiting for each session's Done(). That
// broke the "Manager.Close returning means everything is drained"
// contract for every caller but the first. With closeDone, all callers
// now share the same finalize barrier.
f := newFixture(t)
f.spawner.make = func(tt *testing.T, req SpawnRequest) (*fakePTY, error) {
p := newFakePTY(tt, req.Cols, req.Rows)
// Long enough that the second goroutine's Close call definitely
// observes the first one mid-flight rather than already-finished.
p.waitDelay = 200 * time.Millisecond
return p, nil
}
s1, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
const callers = 4
var wg sync.WaitGroup
wg.Add(callers)
sessionsAfter := make([]int, callers)
doneClosed := make([]bool, callers)
for i := 0; i < callers; i++ {
i := i
go func() {
defer wg.Done()
f.mgr.Close()
sessionsAfter[i] = len(f.mgr.Sessions())
select {
case <-s1.Done():
doneClosed[i] = true
default:
doneClosed[i] = false
}
}()
}
wg.Wait()
for i := 0; i < callers; i++ {
if sessionsAfter[i] != 0 {
t.Errorf("caller %d: Sessions() after Close = %d, want 0", i, sessionsAfter[i])
}
if !doneClosed[i] {
t.Errorf("caller %d: session Done not closed when Close returned", i)
}
}
}
func TestManager_OpenAfterCloseReapsSpawnedPTY(t *testing.T) {
// Regression for Round 3 review: Manager.Open's cleanup path used to
// only call pty.Close() when it lost the race with Manager.Close —
// no Wait(), so on a real unix PTY the killed child stayed as a zombie
// (waitLoop never ran because sess.start() never ran). The fix calls
// pty.Wait() synchronously to reap.
f := newFixture(t)
inSpawn := make(chan struct{})
releaseSpawn := make(chan struct{})
spawnedPTY := make(chan *fakePTY, 1)
f.spawner.make = func(tt *testing.T, req SpawnRequest) (*fakePTY, error) {
close(inSpawn)
<-releaseSpawn
p := newFakePTY(tt, req.Cols, req.Rows)
// Allow Wait() to return immediately once Close fires its waitDone.
spawnedPTY <- p
return p, nil
}
openDone := make(chan error, 1)
go func() {
_, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
openDone <- err
}()
// Wait until Open is parked inside the spawner, then close the manager
// with zero registered sessions. Open will lose the race when it
// reacquires the mu after the spawn returns.
<-inSpawn
f.mgr.Close()
// Let the spawn finish; Open should now hit the closed-manager cleanup
// path: pty.Close + pty.Wait + return ErrManagerClosed.
close(releaseSpawn)
select {
case err := <-openDone:
if !errors.Is(err, ErrManagerClosed) {
t.Fatalf("Open after Close = %v, want ErrManagerClosed", err)
}
case <-time.After(2 * time.Second):
t.Fatal("Open did not return after Manager.Close + spawn release")
}
pty := <-spawnedPTY
if got := pty.waitCount.Load(); got < 1 {
t.Fatalf("pty.Wait() called %d times in cleanup path, want >=1 — child not reaped", got)
}
}
func TestManager_CloseWaitsForSessionFinalize(t *testing.T) {
// Manager.Close used to only wait for s.Close() (which just initiates
// teardown — signals stop, closes the PTY). The waitLoop finalizer
// could still be running after Manager.Close returned, leaving the
// sessions map non-empty briefly. With Round 2 review's fix, each
// goroutine in Manager.Close additionally `<-s.Done()` so the manager
// is fully drained by the time Close returns. We inject a Wait delay
// to make the difference observable: without the fix, the session map
// is still populated when Manager.Close returns and `<-s.Done()` would
// block.
f := newFixture(t)
f.spawner.make = func(tt *testing.T, req SpawnRequest) (*fakePTY, error) {
p := newFakePTY(tt, req.Cols, req.Rows)
p.waitDelay = 150 * time.Millisecond
return p, nil
}
s1, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
s2, err := f.mgr.Open(context.Background(), OpenParams{TaskID: "task-1", WorkspaceID: "ws-A"})
if err != nil {
t.Fatalf("Open: %v", err)
}
f.mgr.Close()
// After Close returns: registry empty AND every Done is already closed.
if got := len(f.mgr.Sessions()); got != 0 {
t.Errorf("Sessions after Manager.Close = %d, want 0", got)
}
for _, s := range []*PtySession{s1, s2} {
select {
case <-s.Done():
default:
t.Errorf("session %s Done not closed when Manager.Close returned", s.ID())
}
if _, err := f.mgr.Get(s.ID()); !errors.Is(err, ErrSessionNotFound) {
t.Errorf("session %s still registered after Manager.Close: %v", s.ID(), err)
}
}
}

View File

@@ -0,0 +1,35 @@
package terminal
import (
"io"
"time"
)
// PTY abstracts the platform PTY + child process so tests can swap in a
// fake without forking a real shell. Read returns child stdout/stderr;
// Write delivers stdin; Resize updates the window; Wait blocks for the
// child to exit and returns its exit code; Close terminates the child
// and releases the master fd.
type PTY interface {
io.ReadWriter
Resize(cols, rows uint16) error
Wait() (exitCode int, err error)
Close() error
}
// SpawnRequest is the input to Spawner.Start.
type SpawnRequest struct {
Shell string
Args []string
Cwd string
Env []string
Cols uint16
Rows uint16
Started time.Time
}
// Spawner creates new PTYs. Production uses realSpawner; tests inject a
// channel-backed fake (see fakePTY in manager_test.go).
type Spawner interface {
Start(SpawnRequest) (PTY, error)
}

View File

@@ -0,0 +1,275 @@
package terminal
import (
"errors"
"io"
"log/slog"
"sync"
"time"
)
// ExitInfo describes how a session terminated.
type ExitInfo struct {
ExitCode int
Reason string
}
// PtySession is a single live PTY + child shell. Methods are safe for
// concurrent use; readers consume from Output() and ExitC() until
// Output() is closed, which always follows an ExitC() send.
type PtySession struct {
id string
taskID string
workspaceID string
issueID string
workDir string
userID string
shellPath string
mu sync.Mutex
cols, rows uint16
pty PTY
output chan []byte
exit chan ExitInfo
done chan struct{}
stop chan struct{}
stopOnce sync.Once
closing bool
closeReason string
// wg tracks readLoop and idleLoop. waitLoop is the finalizer: it
// waits on wg before closing output/done so we never close the
// output channel while readLoop is mid-send.
wg sync.WaitGroup
now func() time.Time
idleTimeout time.Duration
startedAt time.Time
lastIO time.Time
logger *slog.Logger
onClose func(string)
}
// ID returns the session identifier.
func (s *PtySession) ID() string { return s.id }
// TaskID returns the task this session is bound to.
func (s *PtySession) TaskID() string { return s.taskID }
// WorkspaceID returns the workspace this session belongs to.
func (s *PtySession) WorkspaceID() string { return s.workspaceID }
// IssueID returns the issue this session was opened from, if any.
func (s *PtySession) IssueID() string { return s.issueID }
// WorkDir returns the cwd of the child shell.
func (s *PtySession) WorkDir() string { return s.workDir }
// UserID returns the human user who opened the session.
func (s *PtySession) UserID() string { return s.userID }
// Shell returns the shell binary path that was spawned.
func (s *PtySession) Shell() string { return s.shellPath }
// StartedAt returns the wall-clock time the session was spawned.
func (s *PtySession) StartedAt() time.Time { return s.startedAt }
// LastIO returns the most recent time data flowed in either direction.
func (s *PtySession) LastIO() time.Time {
s.mu.Lock()
defer s.mu.Unlock()
return s.lastIO
}
// Output yields PTY output chunks as they arrive. The channel closes
// after the child exits and a value has been delivered on ExitC().
func (s *PtySession) Output() <-chan []byte { return s.output }
// ExitC fires once when the child exits. After that, Output() closes.
func (s *PtySession) ExitC() <-chan ExitInfo { return s.exit }
// Done returns a channel closed when the session is fully torn down
// (all goroutines exited, registry deregistered).
func (s *PtySession) Done() <-chan struct{} { return s.done }
// Write forwards bytes to the PTY stdin. Returns the byte count actually
// written. Updates LastIO so idle detection sees the activity.
func (s *PtySession) Write(p []byte) (int, error) {
s.mu.Lock()
if s.closing {
s.mu.Unlock()
return 0, ErrSessionNotFound
}
pty := s.pty
s.lastIO = s.now()
s.mu.Unlock()
return pty.Write(p)
}
// Resize updates the PTY window size.
func (s *PtySession) Resize(cols, rows uint16) error {
cols, rows = normalizeSize(cols, rows)
s.mu.Lock()
if s.closing {
s.mu.Unlock()
return ErrSessionNotFound
}
s.cols = cols
s.rows = rows
pty := s.pty
s.lastIO = s.now()
s.mu.Unlock()
return pty.Resize(cols, rows)
}
// Size returns the current cols, rows of the PTY.
func (s *PtySession) Size() (uint16, uint16) {
s.mu.Lock()
defer s.mu.Unlock()
return s.cols, s.rows
}
// Close tears down the session. Subsequent calls are no-ops. The
// reason is recorded for audit logging and the terminal.exit payload.
//
// Close only initiates teardown — signals stop, closes the PTY, returns.
// waitLoop is the actual finalizer: it waits for readLoop + idleLoop
// to exit (via wg) before closing output/done. That ordering is what
// makes "Close while output buffer is full" safe — readLoop's blocked
// send unblocks on <-stop, and only then does the output channel close.
func (s *PtySession) Close(reason string) {
s.mu.Lock()
if s.closing {
s.mu.Unlock()
return
}
s.closing = true
s.closeReason = reason
pty := s.pty
s.mu.Unlock()
s.stopOnce.Do(func() { close(s.stop) })
if pty != nil {
// pty.Close on the unix spawner runs SIGHUP → grace → SIGKILL.
// It's idempotent (sync.Once), so the second call from waitLoop's
// finalizer is a no-op.
_ = pty.Close()
}
}
// start kicks off the reader, exit-watch, and (optional) idle
// goroutines. Manager.Open is the only caller. wg.Add runs
// synchronously before waitLoop is spawned so wg.Wait sees the
// correct count even if Close fires immediately.
func (s *PtySession) start() {
s.wg.Add(1)
go s.readLoop()
if s.idleTimeout > 0 {
s.wg.Add(1)
go s.idleLoop()
}
go s.waitLoop()
}
func (s *PtySession) readLoop() {
defer s.wg.Done()
buf := make([]byte, 4096)
for {
n, err := s.pty.Read(buf)
if n > 0 {
chunk := make([]byte, n)
copy(chunk, buf[:n])
s.mu.Lock()
s.lastIO = s.now()
s.mu.Unlock()
select {
case s.output <- chunk:
case <-s.stop:
return
}
}
if err != nil {
if !errors.Is(err, io.EOF) && err != io.ErrClosedPipe {
s.logger.Debug("pty read error", "err", err)
}
return
}
}
}
func (s *PtySession) waitLoop() {
code, waitErr := s.pty.Wait()
s.mu.Lock()
reason := s.closeReason
if reason == "" {
if waitErr != nil {
reason = "wait_error"
} else {
reason = "exited"
}
s.closeReason = reason
}
s.closing = true
s.mu.Unlock()
// Ensure the PTY fd is closed so readLoop's pty.Read returns EOF.
// pty.Close is idempotent (sync.Once on the unix spawner).
_ = s.pty.Close()
// Signal stop so idleLoop and any blocked send in readLoop exit.
s.stopOnce.Do(func() { close(s.stop) })
// Wait for readLoop + idleLoop before closing output/done. This is
// the invariant that prevents "send on closed channel" panics when
// output is full: readLoop is either past its send or unblocked via
// <-stop, but never racing with close(s.output).
s.wg.Wait()
// Finalize order is load-bearing: external waiters use `<-Done()` as
// a signal that the session is fully torn down AND deregistered from
// the manager. The sequence must be:
// ExitC → close(output) → onClose/deregister → close(done)
// so that any consumer doing `<-Done(); manager.Get(id)` after a
// teardown is guaranteed to observe ErrSessionNotFound.
select {
case s.exit <- ExitInfo{ExitCode: code, Reason: reason}:
default:
}
close(s.output)
if s.onClose != nil {
s.onClose(s.id)
}
close(s.done)
}
func (s *PtySession) idleLoop() {
defer s.wg.Done()
// Sample at IdleTimeout/4 so reaction time is bounded but ticks
// stay cheap with many sessions. Manager.CheckIdle catches anything
// this loop misses (e.g. when daemon's outer GC tick is coarser).
interval := s.idleTimeout / 4
if interval < time.Second {
interval = time.Second
}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-s.stop:
return
case <-t.C:
if s.now().Sub(s.LastIO()) >= s.idleTimeout {
// Close calls pty.Close + waits for wg in waitLoop. If
// we ran it inline, waitLoop's wg.Wait would block on
// this goroutine, which can't exit until Close returns
// — deadlock. Spawning lets idleLoop return and
// decrement wg.
go s.Close("idle_timeout")
return
}
}
}
}

View File

@@ -0,0 +1,102 @@
//go:build !windows
package terminal
import (
"fmt"
"os"
"os/exec"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/creack/pty"
)
// closeGracePeriod is the window between SIGHUP and SIGKILL during a
// Close. Long enough for interactive shells to run trap handlers and
// flush state; short enough that closing a tab feels instant.
const closeGracePeriod = 250 * time.Millisecond
// realSpawner forks the shell on a PTY using creack/pty. Linux/macOS
// only; Windows reaches the stub in spawner_windows.go and returns
// ErrUnsupportedOS.
type realSpawner struct{}
func (realSpawner) Start(req SpawnRequest) (PTY, error) {
cmd := exec.Command(req.Shell, req.Args...)
cmd.Dir = req.Cwd
// Inherit the daemon's PATH so users get whatever CLIs are installed
// in the daemon's environment (claude, codex, multica, etc.); merge
// in the per-session vars built by buildEnv.
env := os.Environ()
env = append(env, req.Env...)
cmd.Env = env
size := &pty.Winsize{Cols: req.Cols, Rows: req.Rows}
f, err := pty.StartWithSize(cmd, size)
if err != nil {
return nil, fmt.Errorf("pty.StartWithSize: %w", err)
}
return &unixPTY{cmd: cmd, file: f}, nil
}
type unixPTY struct {
cmd *exec.Cmd
file *os.File
exited atomic.Bool
closeOnce sync.Once
closeErr error
}
func (p *unixPTY) Read(b []byte) (int, error) { return p.file.Read(b) }
func (p *unixPTY) Write(b []byte) (int, error) { return p.file.Write(b) }
func (p *unixPTY) Resize(cols, rows uint16) error {
return pty.Setsize(p.file, &pty.Winsize{Cols: cols, Rows: rows})
}
func (p *unixPTY) Wait() (int, error) {
err := p.cmd.Wait()
p.exited.Store(true)
if p.cmd.ProcessState != nil {
return p.cmd.ProcessState.ExitCode(), err
}
return -1, err
}
// Close terminates the child shell and releases the PTY master fd.
// Closing a tab is a hangup, not an interrupt — so the signal path is
// SIGHUP → brief grace → SIGKILL → file.Close, in that order:
//
// - SIGHUP gives interactive shells a chance to run trap handlers,
// write history, etc. before the fd disappears.
// - The grace window is bounded; anything slower than that is stuck.
// - SIGKILL is the cliff for shells that ignore HUP.
// - file.Close releases the master fd last so the slave side keeps
// working during cleanup.
//
// Signals are sent to the negated pid so they hit the whole process
// group. creack/pty starts the child as a session leader (Setsid), so
// pid == pgid and any descendants the user spawned in the shell are
// caught by the same kill.
//
// If the child already exited naturally (Wait returned), all signal
// work is skipped — we only close the fd. That avoids a pointless
// 250ms sleep in the natural-exit teardown path.
func (p *unixPTY) Close() error {
p.closeOnce.Do(func() {
if p.cmd.Process != nil && !p.exited.Load() {
pid := p.cmd.Process.Pid
_ = syscall.Kill(-pid, syscall.SIGHUP)
time.Sleep(closeGracePeriod)
if !p.exited.Load() {
_ = syscall.Kill(-pid, syscall.SIGKILL)
}
}
p.closeErr = p.file.Close()
})
return p.closeErr
}

View File

@@ -0,0 +1,9 @@
//go:build windows
package terminal
// realSpawner on Windows always refuses — ConPty support is RFC P1 and
// the Desktop button + CLI both surface a clear error from this layer.
type realSpawner struct{}
func (realSpawner) Start(SpawnRequest) (PTY, error) { return nil, ErrUnsupportedOS }

View File

@@ -168,3 +168,90 @@ type DaemonHeartbeatPendingLocalSkillImport struct {
ID string `json:"id"`
SkillKey string `json:"skill_key"`
}
// Terminal WS message types. These flow over the existing daemonws hub
// between client (web/desktop/CLI) and daemon. Bytes payloads are base64
// encoded so they can travel as JSON text frames without binary framing.
const (
// TerminalOpen — client → daemon: request a new PTY session bound to a task workdir.
MessageTypeTerminalOpen = "terminal.open"
// TerminalOpened — daemon → client: ack carrying the session_id and resolved workdir.
MessageTypeTerminalOpened = "terminal.opened"
// TerminalData — bidirectional: PTY stdin (client→daemon) / stdout+stderr (daemon→client).
MessageTypeTerminalData = "terminal.data"
// TerminalResize — client → daemon: window-size change.
MessageTypeTerminalResize = "terminal.resize"
// TerminalClose — bidirectional: explicit teardown request / ack.
MessageTypeTerminalClose = "terminal.close"
// TerminalExit — daemon → client: child process exited; carries exit code and optional reason.
MessageTypeTerminalExit = "terminal.exit"
// TerminalError — daemon → client: open/resize/etc. failed; carries human-readable code+message.
MessageTypeTerminalError = "terminal.error"
)
// TerminalOpenPayload requests a PTY session bound to the given task's
// workdir. WorkspaceID is the workspace the caller is acting in; the daemon
// must reject if it does not match the task's workspace.
type TerminalOpenPayload struct {
RequestID string `json:"request_id"`
TaskID string `json:"task_id"`
WorkspaceID string `json:"workspace_id"`
UserID string `json:"user_id,omitempty"`
Cols uint16 `json:"cols"`
Rows uint16 `json:"rows"`
}
// TerminalOpenedPayload echoes the request_id and carries the session_id the
// client must include on subsequent data/resize/close frames.
type TerminalOpenedPayload struct {
RequestID string `json:"request_id"`
SessionID string `json:"session_id"`
WorkDir string `json:"work_dir"`
Shell string `json:"shell"`
}
// TerminalDataPayload carries raw PTY bytes in base64.
type TerminalDataPayload struct {
SessionID string `json:"session_id"`
DataB64 string `json:"data_b64"`
}
// TerminalResizePayload updates the PTY window size.
type TerminalResizePayload struct {
SessionID string `json:"session_id"`
Cols uint16 `json:"cols"`
Rows uint16 `json:"rows"`
}
// TerminalClosePayload requests teardown. Reason is informational.
type TerminalClosePayload struct {
SessionID string `json:"session_id"`
Reason string `json:"reason,omitempty"`
}
// TerminalExitPayload signals the child process exited.
type TerminalExitPayload struct {
SessionID string `json:"session_id"`
ExitCode int `json:"exit_code"`
Reason string `json:"reason,omitempty"`
}
// Terminal error codes returned in TerminalErrorPayload.Code.
const (
TerminalErrorCodeWorkspaceMismatch = "workspace_mismatch"
TerminalErrorCodeTaskNotFound = "task_not_found"
TerminalErrorCodeSessionNotFound = "session_not_found"
TerminalErrorCodeUnsupportedOS = "unsupported_os"
TerminalErrorCodeSpawnFailed = "spawn_failed"
TerminalErrorCodeInternal = "internal"
)
// TerminalErrorPayload reports a failure. RequestID is set when the error
// is a response to a specific open request; SessionID is set when it
// references an already-established session.
type TerminalErrorPayload struct {
RequestID string `json:"request_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
Code string `json:"code"`
Message string `json:"message"`
}