From 5443cdddbf955e8ba847027458cb3fd016e00bf1 Mon Sep 17 00:00:00 2001 From: highperfocused Date: Thu, 12 Mar 2026 11:50:15 +0100 Subject: [PATCH 1/8] Add HTTP gateway with streaming chat and multi-client conversation mapping --- .env.example | 52 +++-- .gitignore | 3 + Dockerfile | 6 + README.md | 130 +++++++++++ compose.yaml | 19 +- package.json | 2 + src/agent-session-factory.ts | 158 +++++++++++++ src/config.ts | 129 +++++++++++ src/conversation-manager.ts | 410 +++++++++++++++++++++++++++++++++ src/gateway/events.ts | 119 ++++++++++ src/gateway/server.ts | 432 +++++++++++++++++++++++++++++++++++ src/gateway/web-ui.ts | 296 ++++++++++++++++++++++++ src/index.ts | 335 +++------------------------ src/single-shot.ts | 54 +++++ 14 files changed, 1827 insertions(+), 318 deletions(-) create mode 100644 README.md create mode 100644 src/agent-session-factory.ts create mode 100644 src/config.ts create mode 100644 src/conversation-manager.ts create mode 100644 src/gateway/events.ts create mode 100644 src/gateway/server.ts create mode 100644 src/gateway/web-ui.ts create mode 100644 src/single-shot.ts diff --git a/.env.example b/.env.example index 333699c..ff0c7d7 100644 --- a/.env.example +++ b/.env.example @@ -1,4 +1,19 @@ -# ── Ollama (local / self-hosted) ──────────────────────────────────────────────── +# ── Run mode ──────────────────────────────────────────────────────────────── +# gateway (default) starts HTTP API + streaming endpoint + simple web UI +# single runs one-shot mode (PROMPT/stdin) and exits +RUN_MODE=gateway + +# ── Gateway server ─────────────────────────────────────────────────────────── +GATEWAY_HOST=0.0.0.0 +GATEWAY_PORT=8787 +# Set to * or a specific origin for browser apps hosted elsewhere +# GATEWAY_CORS_ORIGIN=* +# Optional bearer token required on every API request +# GATEWAY_AUTH_TOKEN=replace-me +# Disable built-in web UI at / +GATEWAY_ENABLE_WEB_UI=true + +# ── Ollama (local / self-hosted) ──────────────────────────────────────────── # To use a local Ollama instance, override PROVIDER and MODEL: # PROVIDER=ollama # MODEL=llama3.2 # or qwen2.5-coder, mistral, deepseek-r1:14b … @@ -12,7 +27,7 @@ # Locally (no Docker): change to http://localhost:11434/v1 # # OLLAMA_BASE_URL=http://host.docker.internal:11434/v1 -# OLLAMA_CONTEXT_WINDOW=32768 # tokens passed as num_ctx +# OLLAMA_CONTEXT_WINDOW=32768 # OLLAMA_MAX_TOKENS=8192 # ── Model ──────────────────────────────────────────────────────────────────── @@ -22,7 +37,7 @@ PROVIDER=anthropic MODEL=claude-sonnet-4-20250514 -# ── Authentication ──────────────────────────────────────────────────────────── +# ── Authentication ─────────────────────────────────────────────────────────── # Generic API key for the selected PROVIDER (injected at runtime, not stored). API_KEY=sk-ant-... @@ -36,36 +51,37 @@ API_KEY=sk-ant-... # XAI_API_KEY=... # OPENROUTER_API_KEY=... -# ── Prompt ──────────────────────────────────────────────────────────────────── -# The user message to send. Alternatively pipe via stdin. -PROMPT=List all .ts files in the current directory. +# ── Prompt (single mode only) ─────────────────────────────────────────────── +# Used only when RUN_MODE=single. +# PROMPT=List all .ts files in the current directory. -# ── System prompt ───────────────────────────────────────────────────────────── +# ── System prompt ──────────────────────────────────────────────────────────── # Completely replaces the default system prompt (optional). # SYSTEM_PROMPT=You are a helpful assistant. Work inside /app only. # Appended to the (possibly overridden) system prompt (optional). # APPEND_SYSTEM_PROMPT=Always answer in English. -# ── Thinking ────────────────────────────────────────────────────────────────── -# off | minimal | low | medium | high | xhigh (default: off) +# ── Thinking ───────────────────────────────────────────────────────────────── +# off | minimal | low | medium | high | xhigh THINKING_LEVEL=off -# ── Tools ───────────────────────────────────────────────────────────────────── -# all → read, bash, edit, write (default) -# readonly → read only +# ── Tools ──────────────────────────────────────────────────────────────────── +# all → read, bash, edit, write (default) +# readonly → read, grep, find, ls # none → no built-in tools -# Or a comma-separated list: read,bash +# Or a comma-separated subset of coding tools: read,bash,edit,write TOOLS=all -# ── Working directory ───────────────────────────────────────────────────────── +# ── Working directory ──────────────────────────────────────────────────────── # Directory the agent reads/writes. Maps to the Docker volume mount point. CWD=/app -# ── Session persistence ─────────────────────────────────────────────────────── -# true → persist session files under CWD; false/unset → in-memory (default) -SESSION_PERSIST=false +# ── Session persistence ────────────────────────────────────────────────────── +# true → persist agent sessions and gateway conversation index under CWD/.gateway +# false → in-memory sessions only +SESSION_PERSIST=true -# ── Verbose tool logging ────────────────────────────────────────────────────── +# ── Verbose tool logging ───────────────────────────────────────────────────── # true → log tool start/end events to stderr VERBOSE_TOOLS=false diff --git a/.gitignore b/.gitignore index 1e56f98..acee971 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ node_modules/ +dist/ data/** +.pi/ +.DS_Store .env \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index b4c8e88..250fa56 100644 --- a/Dockerfile +++ b/Dockerfile @@ -34,5 +34,11 @@ VOLUME ["/app"] ENV NODE_ENV=production # Default working directory for the agent (overridable via CWD env var) ENV CWD=/app +# Gateway defaults +ENV RUN_MODE=gateway +ENV GATEWAY_HOST=0.0.0.0 +ENV GATEWAY_PORT=8787 + +EXPOSE 8787 CMD ["node", "dist/index.js"] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..ee216d2 --- /dev/null +++ b/README.md @@ -0,0 +1,130 @@ +# MoA Agent Gateway + +This project now runs as a **chat gateway** in front of `@mariozechner/pi-coding-agent`. + +It supports: +- long-lived agent conversations +- HTTP JSON API +- streaming responses via Server-Sent Events (SSE) +- a built-in browser chat UI at `/` + +You can also keep one-shot mode (`RUN_MODE=single`) for script usage. + +--- + +## Run + +```bash +cp .env.example .env +# set provider/model/key in .env + +docker compose up --build +``` + +Gateway default URL: +- `http://localhost:8787` + +Health check: + +```bash +curl http://localhost:8787/health +``` + +One-shot mode (legacy behavior): + +```bash +RUN_MODE=single PROMPT="List files" npm start +``` + +--- + +## API + +### Create/list conversations + +```bash +curl -X POST http://localhost:8787/v1/conversations \ + -H 'content-type: application/json' \ + -d '{"conversationId":"web:user-42:chat-a"}' + +curl http://localhost:8787/v1/conversations +``` + +### Chat (non-streaming) + +```bash +curl -X POST http://localhost:8787/v1/chat \ + -H 'content-type: application/json' \ + -d '{"conversationId":"web:user-42:chat-a","message":"Summarize this repo"}' +``` + +### Chat (streaming SSE) + +```bash +curl -N -X POST http://localhost:8787/v1/chat/stream \ + -H 'content-type: application/json' \ + -d '{"conversationId":"web:user-42:chat-a","message":"List TypeScript files"}' +``` + +### Adapter-friendly endpoint (Slack/Matrix/etc) + +Instead of constructing `conversationId` in your adapter, you can call: + +- `POST /v1/adapters/chat` +- `POST /v1/adapters/chat/stream` + +Body fields: + +```json +{ + "source": "slack", + "workspaceId": "T123", + "channelId": "C456", + "threadId": "1712233.991", + "userId": "U789", + "message": "Summarize the thread" +} +``` + +The gateway derives a stable conversation id: +`source:workspaceId:channelId:threadId` + +SSE events include: +- `assistant_text_delta` +- `assistant_thinking_delta` +- `tool_start`, `tool_update`, `tool_end` +- `agent_start`, `agent_end` +- `done` (final response payload) +- `error` + +--- + +## Built-in Web UI + +Open: + +- `http://localhost:8787/` + +The UI stores `conversationId` in local storage and reuses it to keep context. + +--- + +## Auth and CORS + +Optional env vars: +- `GATEWAY_AUTH_TOKEN`: require `Authorization: Bearer ` +- `GATEWAY_CORS_ORIGIN`: e.g. `*` or `https://my-ui.example` + +--- + +## Multi-client strategy (Slack / Matrix / custom UI) + +The gateway is transport-agnostic. For each upstream client, map its thread identity to a stable `conversationId`, for example: + +- Web: `web::` +- Slack: `slack:::` +- Matrix: `matrix::` + +Then post messages to `/v1/chat` or `/v1/chat/stream` with that `conversationId`. + +This keeps one agent session per thread across transports. diff --git a/compose.yaml b/compose.yaml index 4edcddc..f43540f 100644 --- a/compose.yaml +++ b/compose.yaml @@ -2,7 +2,21 @@ services: agent: # image: agent:latest build: . + ports: + - "${GATEWAY_PORT:-8787}:${GATEWAY_PORT:-8787}" environment: + # ── Run mode ─────────────────────────────────────────────────────────── + # gateway (default) starts the HTTP gateway + web UI + # single runs one-shot prompt mode (PROMPT/stdin) and exits + - RUN_MODE=${RUN_MODE:-gateway} + + # ── Gateway server ───────────────────────────────────────────────────── + - GATEWAY_HOST=0.0.0.0 + - GATEWAY_PORT=${GATEWAY_PORT:-8787} + - GATEWAY_CORS_ORIGIN=${GATEWAY_CORS_ORIGIN:-} + - GATEWAY_AUTH_TOKEN=${GATEWAY_AUTH_TOKEN:-} + - GATEWAY_ENABLE_WEB_UI=${GATEWAY_ENABLE_WEB_UI:-true} + # ── Model ────────────────────────────────────────────────────────────── # Both PROVIDER and MODEL must be set together to select a specific model. # If omitted, the agent uses the first available / settings model. @@ -32,7 +46,8 @@ services: - OLLAMA_CONTEXT_WINDOW=${OLLAMA_CONTEXT_WINDOW:-32768} - OLLAMA_MAX_TOKENS=${OLLAMA_MAX_TOKENS:-8192} - # ── Prompt ───────────────────────────────────────────────────────────── + # ── Prompt (single mode only) ───────────────────────────────────────── + # Used only when RUN_MODE=single. - PROMPT=${PROMPT:-} # ── System prompt ────────────────────────────────────────────────────── @@ -51,7 +66,7 @@ services: - CWD=/app # ── Session persistence ──────────────────────────────────────────────── - - SESSION_PERSIST=${SESSION_PERSIST:-false} + - SESSION_PERSIST=${SESSION_PERSIST:-true} # ── Verbose tool logging ─────────────────────────────────────────────── - VERBOSE_TOOLS=${VERBOSE_TOOLS:-false} diff --git a/package.json b/package.json index 0e9ff33..bc4bf2c 100644 --- a/package.json +++ b/package.json @@ -7,6 +7,8 @@ "scripts": { "build": "tsc", "start": "node dist/index.js", + "start:single": "RUN_MODE=single node dist/index.js", + "start:gateway": "RUN_MODE=gateway node dist/index.js", "dev": "tsx src/index.ts" }, "dependencies": { diff --git a/src/agent-session-factory.ts b/src/agent-session-factory.ts new file mode 100644 index 0000000..0623c1d --- /dev/null +++ b/src/agent-session-factory.ts @@ -0,0 +1,158 @@ +import { getModel, type Model } from "@mariozechner/pi-ai"; +import type { AgentTool, ThinkingLevel } from "@mariozechner/pi-agent-core"; +import { + AuthStorage, + createAgentSession, + createBashTool, + createCodingTools, + createEditTool, + createReadOnlyTools, + createReadTool, + createWriteTool, + DefaultResourceLoader, + ModelRegistry, + type AgentSession, + type SessionManager, +} from "@mariozechner/pi-coding-agent"; +import type { AgentConfig } from "./config.js"; + +function resolveModel(config: AgentConfig): Model | undefined { + const providerName = config.providerName; + const modelId = config.modelId; + + if (providerName?.toLowerCase() === "ollama") { + if (!modelId) { + throw new Error("PROVIDER=ollama requires MODEL to be set."); + } + + return { + id: modelId, + name: modelId, + api: "openai-completions", + provider: "ollama", + baseUrl: config.ollamaBaseUrl, + reasoning: false, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: config.ollamaContextWindow, + maxTokens: config.ollamaMaxTokens, + compat: { + supportsStore: false, + supportsDeveloperRole: false, + supportsReasoningEffort: false, + supportsStrictMode: false, + maxTokensField: "max_tokens", + }, + } satisfies Model<"openai-completions">; + } + + if (providerName && modelId) { + const model = (getModel as (provider: string, model: string) => Model | undefined)( + providerName, + modelId, + ); + if (!model) { + throw new Error( + `Model "${providerName}/${modelId}" not found. Run \"pi --list-models\" to inspect available models.`, + ); + } + return model; + } + + if (providerName || modelId) { + console.warn( + "[agent] PROVIDER and MODEL must be set together. Falling back to default model resolution.", + ); + } + + return undefined; +} + +function resolveTools(cwd: string, toolsEnv: string): AgentTool[] { + const normalized = toolsEnv.toLowerCase().trim(); + + if (normalized === "all") { + return createCodingTools(cwd); + } + + if (normalized === "readonly" || normalized === "read-only") { + return createReadOnlyTools(cwd); + } + + if (normalized === "none") { + return []; + } + + const customFactories: Record AgentTool> = { + read: createReadTool, + bash: createBashTool, + edit: createEditTool, + write: createWriteTool, + }; + + const tools = normalized + .split(",") + .map((name) => name.trim()) + .filter(Boolean) + .map((name) => { + const factory = customFactories[name]; + if (!factory) { + console.warn(`[agent] Unknown tool \"${name}\", ignoring.`); + return undefined; + } + return factory(cwd); + }) + .filter((tool): tool is AgentTool => tool !== undefined); + + return tools; +} + +export class AgentSessionFactory { + constructor(private readonly config: AgentConfig) {} + + async createSession(sessionManager: SessionManager): Promise { + const authStorage = AuthStorage.create(); + + if (this.config.providerName && this.config.apiKey) { + authStorage.setRuntimeApiKey(this.config.providerName, this.config.apiKey); + } + + if (this.config.providerName?.toLowerCase() === "ollama" && !this.config.apiKey) { + authStorage.setRuntimeApiKey("ollama", "ollama"); + } + + const modelRegistry = new ModelRegistry(authStorage); + const model = resolveModel(this.config); + const tools = resolveTools(this.config.cwd, this.config.toolsEnv); + + let resourceLoader: DefaultResourceLoader | undefined; + if (this.config.systemPrompt !== undefined || this.config.appendSystemPrompt !== undefined) { + resourceLoader = new DefaultResourceLoader({ + cwd: this.config.cwd, + systemPromptOverride: (base: string | undefined) => { + let result = this.config.systemPrompt !== undefined ? this.config.systemPrompt : base; + if (this.config.appendSystemPrompt) { + result = result + ? `${result}\n\n${this.config.appendSystemPrompt}` + : this.config.appendSystemPrompt; + } + return result; + }, + }); + await resourceLoader.reload(); + } + + const { session } = await createAgentSession({ + ...(model ? { model } : {}), + ...(resourceLoader ? { resourceLoader } : {}), + thinkingLevel: this.config.thinkingLevel as ThinkingLevel, + tools, + sessionManager, + authStorage, + modelRegistry, + cwd: this.config.cwd, + }); + + return session; + } +} diff --git a/src/config.ts b/src/config.ts new file mode 100644 index 0000000..a674c0f --- /dev/null +++ b/src/config.ts @@ -0,0 +1,129 @@ +import path from "node:path"; + +export type ThinkingLevel = "off" | "minimal" | "low" | "medium" | "high" | "xhigh"; + +export interface AgentConfig { + providerName?: string; + modelId?: string; + apiKey?: string; + ollamaBaseUrl: string; + ollamaContextWindow: number; + ollamaMaxTokens: number; + thinkingLevel: ThinkingLevel; + toolsEnv: string; + systemPrompt?: string; + appendSystemPrompt?: string; + promptText?: string; + cwd: string; + sessionPersist: boolean; + verboseTools: boolean; +} + +export interface GatewayConfig { + runMode: "gateway" | "single"; + host: string; + port: number; + corsOrigin?: string; + authToken?: string; + enableWebUi: boolean; +} + +export interface AppConfig { + agent: AgentConfig; + gateway: GatewayConfig; +} + +function env(name: string): string | undefined { + const value = process.env[name]; + if (value === undefined || value === "") { + return undefined; + } + return value; +} + +function parseIntEnv(name: string, fallback: number): number { + const value = env(name); + if (!value) { + return fallback; + } + const parsed = Number.parseInt(value, 10); + if (Number.isNaN(parsed)) { + throw new Error(`Environment variable ${name} must be a valid integer.`); + } + return parsed; +} + +function parseBoolEnv(name: string, fallback: boolean): boolean { + const value = env(name); + if (!value) { + return fallback; + } + switch (value.toLowerCase()) { + case "1": + case "true": + case "yes": + case "on": + return true; + case "0": + case "false": + case "no": + case "off": + return false; + default: + throw new Error(`Environment variable ${name} must be a boolean (true/false).`); + } +} + +function resolveThinkingLevel(value: string | undefined): ThinkingLevel { + const normalized = (value ?? "off").toLowerCase(); + const valid: ThinkingLevel[] = ["off", "minimal", "low", "medium", "high", "xhigh"]; + if (!valid.includes(normalized as ThinkingLevel)) { + throw new Error( + `Invalid THINKING_LEVEL "${value}". Expected one of: ${valid.join(", ")}.`, + ); + } + return normalized as ThinkingLevel; +} + +export function loadConfig(): AppConfig { + const cwd = path.resolve(env("CWD") ?? "/app"); + + const runModeRaw = (env("RUN_MODE") ?? "gateway").toLowerCase(); + const runMode = runModeRaw === "single" ? "single" : "gateway"; + + return { + agent: { + providerName: env("PROVIDER"), + modelId: env("MODEL"), + apiKey: env("API_KEY"), + ollamaBaseUrl: env("OLLAMA_BASE_URL") ?? "http://host.docker.internal:11434/v1", + ollamaContextWindow: parseIntEnv("OLLAMA_CONTEXT_WINDOW", 32768), + ollamaMaxTokens: parseIntEnv("OLLAMA_MAX_TOKENS", 8192), + thinkingLevel: resolveThinkingLevel(env("THINKING_LEVEL")), + toolsEnv: env("TOOLS") ?? "all", + systemPrompt: env("SYSTEM_PROMPT"), + appendSystemPrompt: env("APPEND_SYSTEM_PROMPT"), + promptText: env("PROMPT"), + cwd, + sessionPersist: parseBoolEnv("SESSION_PERSIST", false), + verboseTools: parseBoolEnv("VERBOSE_TOOLS", false), + }, + gateway: { + runMode, + host: env("GATEWAY_HOST") ?? "0.0.0.0", + port: parseIntEnv("GATEWAY_PORT", 8787), + corsOrigin: env("GATEWAY_CORS_ORIGIN"), + authToken: env("GATEWAY_AUTH_TOKEN"), + enableWebUi: parseBoolEnv("GATEWAY_ENABLE_WEB_UI", true), + }, + }; +} + +export function readStdin(): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + process.stdin.on("data", (chunk) => chunks.push(chunk)); + process.stdin.on("end", () => resolve(Buffer.concat(chunks).toString("utf8").trim())); + process.stdin.on("error", reject); + }); +} diff --git a/src/conversation-manager.ts b/src/conversation-manager.ts new file mode 100644 index 0000000..1ad0ba2 --- /dev/null +++ b/src/conversation-manager.ts @@ -0,0 +1,410 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { randomUUID } from "node:crypto"; +import { + SessionManager, + type AgentSession, + type AgentSessionEvent, +} from "@mariozechner/pi-coding-agent"; +import type { ImageContent } from "@mariozechner/pi-ai"; +import type { AgentConfig } from "./config.js"; +import { AgentSessionFactory } from "./agent-session-factory.js"; + +export interface ChatRequest { + conversationId?: string; + message: string; + images?: ImageContent[]; + streamingBehavior?: "steer" | "followUp"; +} + +export interface ChatExecutionOptions { + includeEvents?: boolean; + onEvent?: (event: AgentSessionEvent) => void; +} + +export interface ChatResult { + conversationId: string; + sessionId: string; + sessionFile?: string; + assistantText: string; + eventCount: number; + events?: AgentSessionEvent[]; +} + +export interface ConversationSummary { + id: string; + createdAt: string; + updatedAt: string; + sessionId?: string; + sessionFile?: string; + isLoaded: boolean; + isStreaming: boolean; +} + +interface PersistedConversation { + id: string; + createdAt: string; + updatedAt: string; + sessionFile?: string; + sessionId?: string; +} + +interface PersistedConversationIndex { + version: 1; + conversations: PersistedConversation[]; +} + +interface ConversationRecord extends PersistedConversation { + queue: Promise; + session?: AgentSession; + sessionInit?: Promise; +} + +export class ConversationManager { + private readonly sessionFactory: AgentSessionFactory; + private readonly conversations = new Map(); + private initialized = false; + private persistQueue: Promise = Promise.resolve(); + + private readonly gatewayRootDir: string; + private readonly sessionDir: string; + private readonly indexPath: string; + + constructor(private readonly config: AgentConfig) { + this.sessionFactory = new AgentSessionFactory(config); + this.gatewayRootDir = path.join(this.config.cwd, ".gateway"); + this.sessionDir = path.join(this.gatewayRootDir, "sessions"); + this.indexPath = path.join(this.gatewayRootDir, "conversations.json"); + } + + async init(): Promise { + if (this.initialized) { + return; + } + + if (!this.config.sessionPersist) { + this.initialized = true; + return; + } + + try { + const raw = await fs.readFile(this.indexPath, "utf8"); + const parsed = JSON.parse(raw) as PersistedConversationIndex; + if (parsed.version !== 1 || !Array.isArray(parsed.conversations)) { + throw new Error("Unsupported conversation index format."); + } + + for (const item of parsed.conversations) { + if (!item.id || typeof item.id !== "string") { + continue; + } + this.conversations.set(item.id, { + id: item.id, + createdAt: item.createdAt ?? new Date().toISOString(), + updatedAt: item.updatedAt ?? new Date().toISOString(), + sessionFile: item.sessionFile, + sessionId: item.sessionId, + queue: Promise.resolve(), + }); + } + + console.error(`[gateway] Restored ${this.conversations.size} persisted conversation(s).`); + } catch (error) { + const nodeError = error as NodeJS.ErrnoException; + if (nodeError.code !== "ENOENT") { + console.warn(`[gateway] Failed to load conversation index: ${nodeError.message}`); + } + } + + this.initialized = true; + } + + listConversations(): ConversationSummary[] { + return [...this.conversations.values()] + .map((record) => this.toSummary(record)) + .sort((a, b) => b.updatedAt.localeCompare(a.updatedAt)); + } + + getConversation(conversationId: string): ConversationSummary | undefined { + const record = this.conversations.get(conversationId); + return record ? this.toSummary(record) : undefined; + } + + async createConversation(preferredId?: string): Promise { + await this.init(); + const id = this.resolveConversationId(preferredId); + const record = this.getOrCreateRecord(id); + await this.persistIndex(); + return this.toSummary(record); + } + + async deleteConversation(conversationId: string): Promise { + await this.init(); + const record = this.conversations.get(conversationId); + if (!record) { + return false; + } + + if (record.session) { + record.session.dispose(); + record.session = undefined; + } + + if (record.sessionFile && this.config.sessionPersist) { + try { + await fs.unlink(record.sessionFile); + } catch (error) { + const nodeError = error as NodeJS.ErrnoException; + if (nodeError.code !== "ENOENT") { + console.warn(`[gateway] Failed to delete session file ${record.sessionFile}: ${nodeError.message}`); + } + } + } + + this.conversations.delete(conversationId); + await this.persistIndex(); + return true; + } + + async abortConversation(conversationId: string): Promise { + await this.init(); + const record = this.conversations.get(conversationId); + if (!record?.session) { + return; + } + await record.session.abort(); + } + + async chat(request: ChatRequest, options: ChatExecutionOptions = {}): Promise { + await this.init(); + + const message = request.message.trim(); + if (!message) { + throw new Error("message must be a non-empty string."); + } + + const conversationId = this.resolveConversationId(request.conversationId); + const record = this.getOrCreateRecord(conversationId); + + return this.enqueue(record, async () => { + const session = await this.ensureSession(record); + const collectedEvents: AgentSessionEvent[] | undefined = options.includeEvents ? [] : undefined; + const assistantTextChunks: string[] = []; + + const unsubscribe = session.subscribe((event) => { + if (collectedEvents) { + collectedEvents.push(event); + } + options.onEvent?.(event); + + if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") { + assistantTextChunks.push(event.assistantMessageEvent.delta); + } + }); + + try { + const streamingBehavior = request.streamingBehavior ?? (session.isStreaming ? "followUp" : undefined); + + await session.prompt(message, { + ...(request.images ? { images: request.images } : {}), + ...(streamingBehavior ? { streamingBehavior } : {}), + }); + } finally { + unsubscribe(); + } + + record.updatedAt = new Date().toISOString(); + record.sessionFile = session.sessionFile; + record.sessionId = session.sessionId; + await this.persistIndex(); + + const assistantText = assistantTextChunks.join("") || this.getLastAssistantText(session); + + return { + conversationId: record.id, + sessionId: session.sessionId, + sessionFile: session.sessionFile, + assistantText, + eventCount: collectedEvents?.length ?? 0, + ...(collectedEvents ? { events: collectedEvents } : {}), + }; + }); + } + + async shutdown(): Promise { + for (const record of this.conversations.values()) { + if (record.session) { + record.session.dispose(); + record.session = undefined; + } + } + await this.persistIndex(); + } + + private toSummary(record: ConversationRecord): ConversationSummary { + return { + id: record.id, + createdAt: record.createdAt, + updatedAt: record.updatedAt, + sessionId: record.session?.sessionId ?? record.sessionId, + sessionFile: record.session?.sessionFile ?? record.sessionFile, + isLoaded: record.session !== undefined, + isStreaming: record.session?.isStreaming ?? false, + }; + } + + private resolveConversationId(preferredId?: string): string { + if (!preferredId) { + return randomUUID(); + } + + const normalized = preferredId.trim(); + if (!normalized) { + return randomUUID(); + } + + if (normalized.length > 200) { + throw new Error("conversationId must be 200 characters or fewer."); + } + + if (/\r|\n/.test(normalized)) { + throw new Error("conversationId must not contain newlines."); + } + + return normalized; + } + + private getOrCreateRecord(conversationId: string): ConversationRecord { + const existing = this.conversations.get(conversationId); + if (existing) { + return existing; + } + + const now = new Date().toISOString(); + const created: ConversationRecord = { + id: conversationId, + createdAt: now, + updatedAt: now, + queue: Promise.resolve(), + }; + + this.conversations.set(conversationId, created); + return created; + } + + private enqueue(record: ConversationRecord, task: () => Promise): Promise { + const run = record.queue.then(task, task); + record.queue = run.then( + () => undefined, + () => undefined, + ); + return run; + } + + private async ensureSession(record: ConversationRecord): Promise { + if (record.session) { + return record.session; + } + + if (!record.sessionInit) { + record.sessionInit = this.createSessionForRecord(record) + .then((session) => { + record.session = session; + return session; + }) + .finally(() => { + record.sessionInit = undefined; + }); + } + + return record.sessionInit; + } + + private async createSessionForRecord(record: ConversationRecord): Promise { + let sessionManager: SessionManager; + + if (!this.config.sessionPersist) { + sessionManager = SessionManager.inMemory(); + } else if (record.sessionFile) { + try { + sessionManager = SessionManager.open(record.sessionFile); + } catch (error) { + console.warn( + `[gateway] Failed to open persisted session ${record.sessionFile}, creating a new session.`, + error, + ); + sessionManager = SessionManager.create(this.config.cwd, this.sessionDir); + } + } else { + sessionManager = SessionManager.create(this.config.cwd, this.sessionDir); + } + + const session = await this.sessionFactory.createSession(sessionManager); + + if (this.config.verboseTools) { + session.subscribe((event) => { + if (event.type === "tool_execution_start") { + process.stderr.write(`\n[${record.id}] [tool:start] ${event.toolName}\n`); + } else if (event.type === "tool_execution_end") { + process.stderr.write(`[${record.id}] [tool:end] ${event.toolName}\n`); + } + }); + } + + record.sessionFile = session.sessionFile; + record.sessionId = session.sessionId; + + await this.persistIndex(); + + return session; + } + + private getLastAssistantText(session: AgentSession): string { + for (let i = session.messages.length - 1; i >= 0; i -= 1) { + const message = session.messages[i]; + if (message.role !== "assistant") { + continue; + } + + return message.content + .filter((contentPart): contentPart is { type: "text"; text: string } => contentPart.type === "text") + .map((contentPart) => contentPart.text) + .join(""); + } + + return ""; + } + + private async persistIndex(): Promise { + if (!this.config.sessionPersist) { + return; + } + + this.persistQueue = this.persistQueue + .catch((error) => { + console.warn("[gateway] Previous persistence operation failed:", error); + }) + .then(async () => { + await fs.mkdir(this.gatewayRootDir, { recursive: true }); + await fs.mkdir(this.sessionDir, { recursive: true }); + + const payload: PersistedConversationIndex = { + version: 1, + conversations: [...this.conversations.values()].map((record) => ({ + id: record.id, + createdAt: record.createdAt, + updatedAt: record.updatedAt, + sessionFile: record.session?.sessionFile ?? record.sessionFile, + sessionId: record.session?.sessionId ?? record.sessionId, + })), + }; + + const serialized = `${JSON.stringify(payload, null, 2)}\n`; + const tempPath = `${this.indexPath}.tmp`; + await fs.writeFile(tempPath, serialized, "utf8"); + await fs.rename(tempPath, this.indexPath); + }); + + await this.persistQueue; + } +} diff --git a/src/gateway/events.ts b/src/gateway/events.ts new file mode 100644 index 0000000..a6e2162 --- /dev/null +++ b/src/gateway/events.ts @@ -0,0 +1,119 @@ +import type { AgentSessionEvent } from "@mariozechner/pi-coding-agent"; + +export interface GatewayEvent { + type: string; + data: Record; +} + +export function toGatewayEvents(event: AgentSessionEvent): GatewayEvent[] { + switch (event.type) { + case "message_update": { + if (event.assistantMessageEvent.type === "text_delta") { + return [ + { + type: "assistant_text_delta", + data: { delta: event.assistantMessageEvent.delta }, + }, + ]; + } + + if (event.assistantMessageEvent.type === "thinking_delta") { + return [ + { + type: "assistant_thinking_delta", + data: { delta: event.assistantMessageEvent.delta }, + }, + ]; + } + + return [ + { + type: "assistant_message_update", + data: { updateType: event.assistantMessageEvent.type }, + }, + ]; + } + + case "tool_execution_start": + return [ + { + type: "tool_start", + data: { + toolName: event.toolName, + toolCallId: event.toolCallId, + }, + }, + ]; + + case "tool_execution_update": + return [ + { + type: "tool_update", + data: { + toolName: event.toolName, + toolCallId: event.toolCallId, + }, + }, + ]; + + case "tool_execution_end": + return [ + { + type: "tool_end", + data: { + toolName: event.toolName, + toolCallId: event.toolCallId, + isError: event.isError, + }, + }, + ]; + + case "agent_start": + return [{ type: "agent_start", data: {} }]; + + case "agent_end": + return [{ type: "agent_end", data: {} }]; + + case "auto_retry_start": + return [ + { + type: "retry_start", + data: { + attempt: event.attempt, + maxAttempts: event.maxAttempts, + delayMs: event.delayMs, + }, + }, + ]; + + case "auto_retry_end": + return [ + { + type: "retry_end", + data: { + success: event.success, + attempt: event.attempt, + finalError: event.finalError, + }, + }, + ]; + + case "auto_compaction_start": + return [{ type: "compaction_start", data: { reason: event.reason } }]; + + case "auto_compaction_end": + return [ + { + type: "compaction_end", + data: { + aborted: event.aborted, + willRetry: event.willRetry, + errorMessage: event.errorMessage, + }, + }, + ]; + + default: + return [{ type: event.type, data: {} }]; + } +} diff --git a/src/gateway/server.ts b/src/gateway/server.ts new file mode 100644 index 0000000..e79daf3 --- /dev/null +++ b/src/gateway/server.ts @@ -0,0 +1,432 @@ +import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http"; +import { URL } from "node:url"; +import type { ImageContent } from "@mariozechner/pi-ai"; +import type { GatewayConfig } from "../config.js"; +import type { ChatRequest } from "../conversation-manager.js"; +import { ConversationManager } from "../conversation-manager.js"; +import { toGatewayEvents } from "./events.js"; +import { getWebUiHtml } from "./web-ui.js"; + +interface ChatRequestBody { + conversationId?: unknown; + message?: unknown; + images?: unknown; + streamingBehavior?: unknown; + includeEvents?: unknown; +} + +interface AdapterChatRequestBody extends ChatRequestBody { + source?: unknown; + workspaceId?: unknown; + channelId?: unknown; + threadId?: unknown; + userId?: unknown; +} + +class HttpError extends Error { + constructor( + readonly statusCode: number, + message: string, + ) { + super(message); + } +} + +export class GatewayHttpServer { + private readonly server: Server; + + constructor( + private readonly manager: ConversationManager, + private readonly config: GatewayConfig, + ) { + this.server = createServer((req, res) => { + this.handleRequest(req, res).catch((error) => { + if (error instanceof HttpError) { + this.sendJson(res, error.statusCode, { error: error.message }); + return; + } + + console.error("[gateway] Unhandled request error", error); + if (!res.headersSent) { + this.sendJson(res, 500, { error: "Internal server error" }); + } else { + res.end(); + } + }); + }); + } + + async start(): Promise { + await new Promise((resolve, reject) => { + this.server.once("error", reject); + this.server.listen(this.config.port, this.config.host, () => { + this.server.off("error", reject); + resolve(); + }); + }); + + console.error(`[gateway] Listening on http://${this.config.host}:${this.config.port}`); + } + + async stop(): Promise { + await new Promise((resolve, reject) => { + this.server.close((error) => { + if (error) { + reject(error); + return; + } + resolve(); + }); + }); + } + + private async handleRequest(req: IncomingMessage, res: ServerResponse): Promise { + if (this.handleCors(req, res)) { + return; + } + + if (!this.isAuthorized(req, res)) { + return; + } + + const url = new URL(req.url ?? "/", `http://${req.headers.host ?? "localhost"}`); + const { pathname } = url; + const method = req.method ?? "GET"; + + if (method === "GET" && pathname === "/health") { + this.sendJson(res, 200, { ok: true }); + return; + } + + if (method === "GET" && pathname === "/") { + if (!this.config.enableWebUi) { + this.sendJson(res, 404, { error: "Web UI disabled" }); + return; + } + + this.sendHtml(res, 200, getWebUiHtml()); + return; + } + + if (method === "GET" && pathname === "/v1/conversations") { + this.sendJson(res, 200, { conversations: this.manager.listConversations() }); + return; + } + + if (method === "POST" && pathname === "/v1/conversations") { + const body = (await this.readJsonBody(req)) as { conversationId?: unknown }; + if (body.conversationId !== undefined && typeof body.conversationId !== "string") { + throw new HttpError(400, "conversationId must be a string when provided."); + } + + const created = await this.manager.createConversation(body.conversationId as string | undefined); + this.sendJson(res, 201, created); + return; + } + + const conversationMatch = pathname.match(/^\/v1\/conversations\/([^/]+)$/); + if (method === "GET" && conversationMatch) { + const conversationId = this.decodePathSegment(conversationMatch[1]); + const conversation = this.manager.getConversation(conversationId); + if (!conversation) { + this.sendJson(res, 404, { error: "Conversation not found" }); + return; + } + this.sendJson(res, 200, conversation); + return; + } + + if (method === "DELETE" && conversationMatch) { + const conversationId = this.decodePathSegment(conversationMatch[1]); + const deleted = await this.manager.deleteConversation(conversationId); + this.sendJson(res, deleted ? 200 : 404, { deleted }); + return; + } + + const abortMatch = pathname.match(/^\/v1\/conversations\/([^/]+)\/abort$/); + if (method === "POST" && abortMatch) { + const conversationId = this.decodePathSegment(abortMatch[1]); + await this.manager.abortConversation(conversationId); + this.sendJson(res, 200, { aborted: true }); + return; + } + + if (method === "POST" && pathname === "/v1/adapters/chat") { + const body = (await this.readJsonBody(req)) as AdapterChatRequestBody; + const { chatRequest, adapterKey } = this.parseAdapterChatRequest(body); + const includeEvents = body.includeEvents === true; + + const result = await this.manager.chat(chatRequest, { includeEvents }); + this.sendJson(res, 200, { adapterKey, ...result }); + return; + } + + if (method === "POST" && pathname === "/v1/adapters/chat/stream") { + const body = (await this.readJsonBody(req)) as AdapterChatRequestBody; + const { chatRequest, adapterKey } = this.parseAdapterChatRequest(body); + await this.handleChatStream(req, res, chatRequest, { adapterKey }); + return; + } + + if (method === "POST" && pathname === "/v1/chat") { + const body = (await this.readJsonBody(req)) as ChatRequestBody; + const chatRequest = this.parseChatRequest(body); + const includeEvents = body.includeEvents === true; + + const result = await this.manager.chat(chatRequest, { includeEvents }); + this.sendJson(res, 200, result); + return; + } + + if (method === "POST" && pathname === "/v1/chat/stream") { + const body = (await this.readJsonBody(req)) as ChatRequestBody; + const chatRequest = this.parseChatRequest(body); + await this.handleChatStream(req, res, chatRequest); + return; + } + + this.sendJson(res, 404, { error: "Not found" }); + } + + private async handleChatStream( + req: IncomingMessage, + res: ServerResponse, + chatRequest: ChatRequest, + metadata: Record = {}, + ): Promise { + this.writeSseHeaders(res); + this.writeSse(res, "ready", { ok: true }); + + let finished = false; + const providedConversationId = chatRequest.conversationId; + + req.on("close", () => { + if (!finished && providedConversationId) { + void this.manager.abortConversation(providedConversationId).catch((error) => { + console.warn("[gateway] Failed to abort conversation after client disconnect", error); + }); + } + }); + + try { + const result = await this.manager.chat(chatRequest, { + onEvent: (event) => { + for (const gatewayEvent of toGatewayEvents(event)) { + this.writeSse(res, gatewayEvent.type, gatewayEvent.data); + } + }, + }); + + this.writeSse(res, "done", { + ...metadata, + conversationId: result.conversationId, + sessionId: result.sessionId, + sessionFile: result.sessionFile, + assistantText: result.assistantText, + }); + + finished = true; + res.end(); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.writeSse(res, "error", { message }); + finished = true; + res.end(); + } + } + + private parseChatRequest(body: ChatRequestBody): ChatRequest { + if (typeof body.message !== "string") { + throw new HttpError(400, "message is required and must be a string."); + } + + if (!body.message.trim()) { + throw new HttpError(400, "message must not be empty."); + } + + const conversationId = + typeof body.conversationId === "string" ? body.conversationId : undefined; + + if (conversationId) { + if (conversationId.length > 200) { + throw new HttpError(400, "conversationId must be 200 characters or fewer."); + } + if (/\r|\n/.test(conversationId)) { + throw new HttpError(400, "conversationId must not contain newlines."); + } + } + + let images: ImageContent[] | undefined; + if (body.images !== undefined) { + if (!Array.isArray(body.images)) { + throw new HttpError(400, "images must be an array when provided."); + } + images = body.images as ImageContent[]; + } + + let streamingBehavior: "steer" | "followUp" | undefined; + if (body.streamingBehavior !== undefined) { + if (body.streamingBehavior === "steer" || body.streamingBehavior === "followUp") { + streamingBehavior = body.streamingBehavior; + } else { + throw new HttpError(400, 'streamingBehavior must be "steer" or "followUp".'); + } + } + + return { + conversationId, + message: body.message, + ...(images ? { images } : {}), + ...(streamingBehavior ? { streamingBehavior } : {}), + }; + } + + private parseAdapterChatRequest(body: AdapterChatRequestBody): { + chatRequest: ChatRequest; + adapterKey: string; + } { + const source = this.parseOptionalSegment(body.source, "source") ?? "generic"; + const workspaceId = this.parseOptionalSegment(body.workspaceId, "workspaceId") ?? "default"; + const channelId = this.parseRequiredSegment(body.channelId, "channelId"); + const threadId = this.parseOptionalSegment(body.threadId, "threadId") ?? "root"; + const userId = this.parseOptionalSegment(body.userId, "userId") ?? "anonymous"; + + const normalizedBody: ChatRequestBody = { + ...body, + conversationId: `${source}:${workspaceId}:${channelId}:${threadId}`, + }; + + return { + adapterKey: `${source}:${workspaceId}:${channelId}:${threadId}:${userId}`, + chatRequest: this.parseChatRequest(normalizedBody), + }; + } + + private parseOptionalSegment(value: unknown, fieldName: string): string | undefined { + if (value === undefined || value === null) { + return undefined; + } + + if (typeof value !== "string") { + throw new HttpError(400, `${fieldName} must be a string when provided.`); + } + + const trimmed = value.trim(); + if (!trimmed) { + return undefined; + } + + if (trimmed.includes(":")) { + throw new HttpError(400, `${fieldName} must not contain ':'.`); + } + + return trimmed; + } + + private parseRequiredSegment(value: unknown, fieldName: string): string { + const segment = this.parseOptionalSegment(value, fieldName); + if (!segment) { + throw new HttpError(400, `${fieldName} is required.`); + } + return segment; + } + + private async readJsonBody(req: IncomingMessage, maxBytes = 1024 * 1024): Promise { + const chunks: Buffer[] = []; + let total = 0; + + for await (const chunk of req) { + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); + total += buffer.length; + if (total > maxBytes) { + throw new HttpError(413, `Request body exceeds ${maxBytes} bytes.`); + } + chunks.push(buffer); + } + + if (chunks.length === 0) { + return {}; + } + + const raw = Buffer.concat(chunks).toString("utf8").trim(); + if (!raw) { + return {}; + } + + try { + return JSON.parse(raw); + } catch { + throw new HttpError(400, "Request body must be valid JSON."); + } + } + + private handleCors(req: IncomingMessage, res: ServerResponse): boolean { + if (!this.config.corsOrigin) { + return false; + } + + res.setHeader("Access-Control-Allow-Origin", this.config.corsOrigin); + res.setHeader("Access-Control-Allow-Headers", "Content-Type, Authorization"); + res.setHeader("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS"); + + if (req.method === "OPTIONS") { + res.statusCode = 204; + res.end(); + return true; + } + + return false; + } + + private isAuthorized(req: IncomingMessage, res: ServerResponse): boolean { + if (!this.config.authToken) { + return true; + } + + const authHeader = req.headers.authorization; + if (authHeader === `Bearer ${this.config.authToken}`) { + return true; + } + + this.sendJson(res, 401, { error: "Unauthorized" }); + return false; + } + + private decodePathSegment(value: string): string { + try { + return decodeURIComponent(value); + } catch { + throw new HttpError(400, "Invalid path segment encoding."); + } + } + + private sendJson(res: ServerResponse, statusCode: number, body: unknown): void { + res.statusCode = statusCode; + res.setHeader("Content-Type", "application/json; charset=utf-8"); + res.end(`${JSON.stringify(body)}\n`); + } + + private sendHtml(res: ServerResponse, statusCode: number, html: string): void { + res.statusCode = statusCode; + res.setHeader("Content-Type", "text/html; charset=utf-8"); + res.end(html); + } + + private writeSseHeaders(res: ServerResponse): void { + res.statusCode = 200; + res.setHeader("Content-Type", "text/event-stream; charset=utf-8"); + res.setHeader("Cache-Control", "no-cache, no-transform"); + res.setHeader("Connection", "keep-alive"); + res.setHeader("X-Accel-Buffering", "no"); + } + + private writeSse(res: ServerResponse, event: string, data: unknown): void { + if (res.writableEnded || res.destroyed) { + return; + } + + const payload = typeof data === "string" ? data : JSON.stringify(data); + res.write(`event: ${event}\n`); + res.write(`data: ${payload}\n\n`); + } +} diff --git a/src/gateway/web-ui.ts b/src/gateway/web-ui.ts new file mode 100644 index 0000000..99120d5 --- /dev/null +++ b/src/gateway/web-ui.ts @@ -0,0 +1,296 @@ +export function getWebUiHtml(): string { + return ` + + + + + Pi Gateway Chat + + + +
+
+
Gateway endpoint: /v1/chat/stream
+
+ + +
+
+ + +
+
+ +
+ +
+
+ + +
+
+
Idle
+ +
+
+
+ + + +`; +} diff --git a/src/index.ts b/src/index.ts index ba22bdf..f3dc76c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,306 +1,45 @@ -/** - * MoA Agent – pi coding agent SDK wrapper - * - * All behaviour is driven by environment variables so the container - * works as a drop-in Docker service without any code changes. - * - * Environment variables - * ───────────────────── - * PROVIDER Provider name (anthropic | openai | google | mistral | - * groq | cerebras | xai | openrouter | ollama | …) - * When set together with MODEL, the exact model is resolved. - * If omitted the session uses whatever model is already in - * settings / the first available one. - * - * ── Ollama (local) ────────────────────────────────────── - * Set PROVIDER=ollama and MODEL= (e.g. llama3.2, - * qwen2.5-coder, mistral, deepseek-r1:14b …). - * No API key is needed. - * OLLAMA_BASE_URL URL of the Ollama HTTP API. - * Default in Docker: - * http://host.docker.internal:11434/v1 - * Default locally: - * http://localhost:11434/v1 - * OLLAMA_CONTEXT_WINDOW Context size in tokens (default: 32768) - * OLLAMA_MAX_TOKENS Max output tokens (default: 8192) - * - * MODEL Model ID as understood by getModel(), e.g. - * "claude-sonnet-4-20250514" or "gpt-4o". - * - * API_KEY Generic API key for the selected PROVIDER. - * Stored as a runtime (non-persisted) key. - * Provider-specific env vars (ANTHROPIC_API_KEY, - * OPENAI_API_KEY, GEMINI_API_KEY, …) are also honoured - * automatically by the underlying pi-ai library. - * Not required when PROVIDER=ollama. - * - * THINKING_LEVEL off | minimal | low | medium | high | xhigh (default: off) - * - * TOOLS all – read, bash, edit, write (default) - * readonly – read only - * none – no built-in tools - * Or a comma-separated list: read,bash,edit,write - * - * SYSTEM_PROMPT Completely replaces the default system prompt. - * APPEND_SYSTEM_PROMPT Appended to the (possibly overridden) system prompt. - * - * PROMPT The user prompt to send. If omitted the agent reads from - * stdin. Exactly one of the two must be provided. - * - * CWD Working directory the agent operates in. - * Defaults to /app (the Docker volume mount point). - * - * SESSION_PERSIST Set to "true" to persist the session under CWD. - * Defaults to in-memory (ephemeral). - * - * VERBOSE_TOOLS Set to "true" to log tool start/end events to stderr. - */ +import { loadConfig } from "./config.js"; +import { runSingleShot } from "./single-shot.js"; +import { ConversationManager } from "./conversation-manager.js"; +import { GatewayHttpServer } from "./gateway/server.js"; -import { getModel, type Model } from "@mariozechner/pi-ai"; -import { - AuthStorage, - createAgentSession, - DefaultResourceLoader, - ModelRegistry, - SessionManager, - codingTools, - readOnlyTools, - readTool, - bashTool, - editTool, - writeTool, -} from "@mariozechner/pi-coding-agent"; +async function run(): Promise { + const config = loadConfig(); -// ─── helpers ──────────────────────────────────────────────────────────────── - -function env(name: string): string | undefined { - const v = process.env[name]; - return v === "" ? undefined : v; -} - -function requiredEnv(name: string): string { - const v = env(name); - if (!v) { - console.error(`[agent] Required environment variable ${name} is not set.`); - process.exit(1); + if (config.gateway.runMode === "single") { + await runSingleShot(config.agent); + return; } - return v; -} -function readStdin(): Promise { - return new Promise((resolve, reject) => { - const chunks: Buffer[] = []; - process.stdin.on("data", (chunk) => chunks.push(chunk)); - process.stdin.on("end", () => resolve(Buffer.concat(chunks).toString("utf8").trim())); - process.stdin.on("error", reject); + const manager = new ConversationManager(config.agent); + await manager.init(); + + const server = new GatewayHttpServer(manager, config.gateway); + await server.start(); + + let shuttingDown = false; + const shutdown = async (signal: string): Promise => { + if (shuttingDown) { + return; + } + shuttingDown = true; + + console.error(`\n[gateway] Received ${signal}, shutting down...`); + await server.stop(); + await manager.shutdown(); + process.exit(0); + }; + + process.on("SIGINT", () => { + void shutdown("SIGINT"); + }); + + process.on("SIGTERM", () => { + void shutdown("SIGTERM"); }); } -// ─── configuration ────────────────────────────────────────────────────────── - -const providerName = env("PROVIDER"); -const modelId = env("MODEL"); -const apiKey = env("API_KEY"); -const ollamaBaseUrl = env("OLLAMA_BASE_URL") ?? "http://host.docker.internal:11434/v1"; -const ollamaCtxWin = parseInt(env("OLLAMA_CONTEXT_WINDOW") ?? "32768", 10); -const ollamaMaxTok = parseInt(env("OLLAMA_MAX_TOKENS") ?? "8192", 10); -const thinkingLevel = (env("THINKING_LEVEL") ?? "off") as - "off" | "minimal" | "low" | "medium" | "high" | "xhigh"; -const toolsEnv = env("TOOLS") ?? "all"; -const systemPrompt = env("SYSTEM_PROMPT"); -const appendPrompt = env("APPEND_SYSTEM_PROMPT"); -const promptText = env("PROMPT"); -const cwd = env("CWD") ?? "/app"; -const sessionPersist = env("SESSION_PERSIST") === "true"; -const verboseTools = env("VERBOSE_TOOLS") === "true"; - -// ─── auth & model registry ────────────────────────────────────────────────── - -const authStorage = AuthStorage.create(); - -// Inject the generic API_KEY for the chosen provider at runtime so it is -// never written to disk inside the container. -if (providerName && apiKey) { - authStorage.setRuntimeApiKey(providerName, apiKey); -} - -// Ollama doesn't require a real key; set a dummy to satisfy the auth layer. -if (providerName?.toLowerCase() === "ollama" && !apiKey) { - authStorage.setRuntimeApiKey("ollama", "ollama"); -} - -const modelRegistry = new ModelRegistry(authStorage); - -// ─── model resolution ─────────────────────────────────────────────────────── - -let model: Model | undefined; - -if (providerName?.toLowerCase() === "ollama") { - // Ollama is OpenAI-compatible; build a custom model object. - if (!modelId) { - console.error("[agent] PROVIDER=ollama requires MODEL to be set (e.g. MODEL=llama3.2)."); - process.exit(1); - } - model = { - id: modelId, - name: modelId, - api: "openai-completions", - provider: "ollama", - baseUrl: ollamaBaseUrl, - reasoning: false, - input: ["text"], - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, - contextWindow: ollamaCtxWin, - maxTokens: ollamaMaxTok, - compat: { - supportsStore: false, - supportsDeveloperRole: false, - supportsReasoningEffort: false, - supportsStrictMode: false, - maxTokensField: "max_tokens", - }, - } satisfies Model<"openai-completions">; - console.error(`[agent] Using Ollama model "${modelId}" at ${ollamaBaseUrl}`); - -} else if (providerName && modelId) { - model = (getModel as (provider: string, model: string) => Model | undefined)(providerName, modelId); - if (!model) { - console.error( - `[agent] Model "${providerName}/${modelId}" not found. ` + - `Run "pi --list-models" to check available models.` - ); - process.exit(1); - } - console.error(`[agent] Using model: ${model.provider}/${model.id}`); - -} else if (providerName || modelId) { - console.error( - "[agent] Both PROVIDER and MODEL must be set together. " + - "Falling back to default model." - ); -} - -// ─── tools ────────────────────────────────────────────────────────────────── - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -const toolMap: Record = { - read: readTool, - bash: bashTool, - edit: editTool, - write: writeTool, -}; - -let tools: typeof codingTools; - -switch (toolsEnv.toLowerCase()) { - case "all": - tools = codingTools; - break; - case "readonly": - case "read-only": - tools = readOnlyTools; - break; - case "none": - tools = []; - break; - default: - tools = toolsEnv - .split(",") - .map((t) => t.trim().toLowerCase()) - .filter(Boolean) - .map((name) => { - const tool = toolMap[name]; - if (!tool) { - console.error(`[agent] Unknown tool "${name}", ignoring.`); - return null; - } - return tool; - }) - .filter((t): t is typeof readTool => t !== null); -} - -// ─── resource loader (system prompt) ──────────────────────────────────────── - -let resourceLoader: DefaultResourceLoader | undefined; - -if (systemPrompt !== undefined || appendPrompt !== undefined) { - resourceLoader = new DefaultResourceLoader({ - systemPromptOverride: (base: string | undefined) => { - let result = systemPrompt !== undefined ? systemPrompt : base; - if (appendPrompt) { - result = result ? `${result}\n\n${appendPrompt}` : appendPrompt; - } - return result; - }, - }); - await resourceLoader.reload(); -} - -// ─── session manager ──────────────────────────────────────────────────────── - -const sessionManager = sessionPersist - ? SessionManager.create(cwd) - : SessionManager.inMemory(); - -// ─── create session ────────────────────────────────────────────────────────── - -const { session } = await createAgentSession({ - ...(model ? { model } : {}), - ...(resourceLoader ? { resourceLoader } : {}), - thinkingLevel, - tools, - sessionManager, - authStorage, - modelRegistry, - cwd, -}); - -// ─── event handling ────────────────────────────────────────────────────────── - -session.subscribe((event) => { - switch (event.type) { - case "message_update": - if (event.assistantMessageEvent.type === "text_delta") { - process.stdout.write(event.assistantMessageEvent.delta); - } - break; - - case "tool_execution_start": - if (verboseTools) { - process.stderr.write(`\n[tool:start] ${event.toolName}\n`); - } - break; - - case "tool_execution_end": - if (verboseTools) { - process.stderr.write(`[tool:end] ${event.toolName}\n`); - } - break; - - case "agent_end": - process.stdout.write("\n"); - break; - } -}); - -// ─── run prompt ────────────────────────────────────────────────────────────── - -let userPrompt: string; - -if (promptText) { - userPrompt = promptText; -} else if (!process.stdin.isTTY) { - userPrompt = await readStdin(); - if (!userPrompt) { - console.error("[agent] Stdin was empty. Set PROMPT or pipe a non-empty message."); - process.exit(1); - } -} else { - console.error( - "[agent] No prompt provided.\n" + - " Set the PROMPT environment variable, or pipe input via stdin." - ); +run().catch((error) => { + console.error("[app] Fatal error:", error); process.exit(1); -} - -await session.prompt(userPrompt); +}); diff --git a/src/single-shot.ts b/src/single-shot.ts new file mode 100644 index 0000000..ae563cd --- /dev/null +++ b/src/single-shot.ts @@ -0,0 +1,54 @@ +import { SessionManager } from "@mariozechner/pi-coding-agent"; +import type { AgentConfig } from "./config.js"; +import { readStdin } from "./config.js"; +import { AgentSessionFactory } from "./agent-session-factory.js"; + +export async function runSingleShot(config: AgentConfig): Promise { + const sessionManager = config.sessionPersist + ? SessionManager.create(config.cwd) + : SessionManager.inMemory(); + + const factory = new AgentSessionFactory(config); + const session = await factory.createSession(sessionManager); + + session.subscribe((event) => { + switch (event.type) { + case "message_update": + if (event.assistantMessageEvent.type === "text_delta") { + process.stdout.write(event.assistantMessageEvent.delta); + } + break; + + case "tool_execution_start": + if (config.verboseTools) { + process.stderr.write(`\n[tool:start] ${event.toolName}\n`); + } + break; + + case "tool_execution_end": + if (config.verboseTools) { + process.stderr.write(`[tool:end] ${event.toolName}\n`); + } + break; + + case "agent_end": + process.stdout.write("\n"); + break; + } + }); + + let userPrompt: string; + + if (config.promptText) { + userPrompt = config.promptText; + } else if (!process.stdin.isTTY) { + userPrompt = await readStdin(); + if (!userPrompt) { + throw new Error("Stdin was empty. Set PROMPT or pipe a non-empty message."); + } + } else { + throw new Error("No prompt provided. Set PROMPT or pipe input via stdin."); + } + + await session.prompt(userPrompt); +} -- 2.49.1 From 8f1352013c7fa41b0a97c6d1b5c9a2559556b780 Mon Sep 17 00:00:00 2001 From: highperfocused Date: Thu, 12 Mar 2026 12:07:54 +0100 Subject: [PATCH 2/8] Fix Docker metadata SHA tag for PR builds --- .gitea/workflows/docker-build.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.gitea/workflows/docker-build.yml b/.gitea/workflows/docker-build.yml index 168cccf..6d8a470 100644 --- a/.gitea/workflows/docker-build.yml +++ b/.gitea/workflows/docker-build.yml @@ -43,7 +43,9 @@ jobs: type=ref,event=pr type=semver,pattern={{version}} type=semver,pattern={{major}}.{{minor}} - type=sha,prefix={{branch}}- + # Use a stable non-empty prefix. {{branch}} can be empty on PR events, + # which would otherwise generate an invalid tag like ":-". + type=sha,prefix=sha- type=raw,value=latest,enable={{is_default_branch}} - name: Build and push Docker image -- 2.49.1 From e41edea7455f0e9de289d57e49d6e368d037c05e Mon Sep 17 00:00:00 2001 From: highperfocused Date: Thu, 12 Mar 2026 12:11:22 +0100 Subject: [PATCH 3/8] Export PR Docker builds with --load to avoid no-output warning --- .gitea/workflows/docker-build.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitea/workflows/docker-build.yml b/.gitea/workflows/docker-build.yml index 6d8a470..2f1ff96 100644 --- a/.gitea/workflows/docker-build.yml +++ b/.gitea/workflows/docker-build.yml @@ -53,6 +53,9 @@ jobs: with: context: . file: ./Dockerfile + # Push on branch builds; on PR builds export to local Docker daemon + # to avoid "No output specified" warnings while still validating the build. push: ${{ gitea.event_name != 'pull_request' }} + load: ${{ gitea.event_name == 'pull_request' }} tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} -- 2.49.1 From bb90b237f4ab27b9e2a523ad6e0d09c438fd5389 Mon Sep 17 00:00:00 2001 From: highperfocused Date: Thu, 12 Mar 2026 12:11:57 +0100 Subject: [PATCH 4/8] =?UTF-8?q?Entferne=20die=20Push-=20und=20Load-Optione?= =?UTF-8?q?n=20f=C3=BCr=20PR-Bauten=20im=20Docker-Workflow,=20um=20Warnung?= =?UTF-8?q?en=20zu=20vermeiden.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitea/workflows/docker-build.yml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/.gitea/workflows/docker-build.yml b/.gitea/workflows/docker-build.yml index 2f1ff96..2e4cff6 100644 --- a/.gitea/workflows/docker-build.yml +++ b/.gitea/workflows/docker-build.yml @@ -53,9 +53,6 @@ jobs: with: context: . file: ./Dockerfile - # Push on branch builds; on PR builds export to local Docker daemon - # to avoid "No output specified" warnings while still validating the build. - push: ${{ gitea.event_name != 'pull_request' }} - load: ${{ gitea.event_name == 'pull_request' }} + # push: ${{ gitea.event_name != 'pull_request' }} tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} -- 2.49.1 From 77e3e8ac528deceb40efb60639cbd727ef9467e9 Mon Sep 17 00:00:00 2001 From: highperfocused Date: Thu, 12 Mar 2026 12:16:44 +0100 Subject: [PATCH 5/8] =?UTF-8?q?Aktiviere=20die=20Push-Option=20f=C3=BCr=20?= =?UTF-8?q?Docker-Bauten=20im=20Workflow,=20um=20die=20Handhabung=20von=20?= =?UTF-8?q?Pull-Requests=20zu=20optimieren.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitea/workflows/docker-build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitea/workflows/docker-build.yml b/.gitea/workflows/docker-build.yml index 2e4cff6..6d8a470 100644 --- a/.gitea/workflows/docker-build.yml +++ b/.gitea/workflows/docker-build.yml @@ -53,6 +53,6 @@ jobs: with: context: . file: ./Dockerfile - # push: ${{ gitea.event_name != 'pull_request' }} + push: ${{ gitea.event_name != 'pull_request' }} tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} -- 2.49.1 From 9c71a5d431ed9da83bba112a007bac4b5e831a58 Mon Sep 17 00:00:00 2001 From: highperfocused Date: Thu, 12 Mar 2026 12:17:37 +0100 Subject: [PATCH 6/8] =?UTF-8?q?Aktiviere=20das=20Pushen=20von=20Docker-Ima?= =?UTF-8?q?ges=20im=20Workflow=20f=C3=BCr=20alle=20Builds,=20um=20die=20Ko?= =?UTF-8?q?nsistenz=20zu=20gew=C3=A4hrleisten.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitea/workflows/docker-build.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitea/workflows/docker-build.yml b/.gitea/workflows/docker-build.yml index 6d8a470..65c1f24 100644 --- a/.gitea/workflows/docker-build.yml +++ b/.gitea/workflows/docker-build.yml @@ -53,6 +53,7 @@ jobs: with: context: . file: ./Dockerfile - push: ${{ gitea.event_name != 'pull_request' }} + # push: ${{ gitea.event_name != 'pull_request' }} + push: true tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} -- 2.49.1 From d9e3baaa3617268c43f210bbb2f171c455677c06 Mon Sep 17 00:00:00 2001 From: highperfocused Date: Thu, 12 Mar 2026 14:32:43 +0100 Subject: [PATCH 7/8] Add gateway/web UI docs and new session button --- README.md | 7 ++ docs/README.md | 4 + docs/gateway.md | 226 ++++++++++++++++++++++++++++++++++++++++++ docs/web-ui.md | 157 +++++++++++++++++++++++++++++ src/gateway/web-ui.ts | 24 ++++- 5 files changed, 417 insertions(+), 1 deletion(-) create mode 100644 docs/README.md create mode 100644 docs/gateway.md create mode 100644 docs/web-ui.md diff --git a/README.md b/README.md index ee216d2..8ec8602 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,13 @@ You can also keep one-shot mode (`RUN_MODE=single`) for script usage. --- +## Documentation + +- Gateway internals: `docs/gateway.md` +- Web UI internals: `docs/web-ui.md` + +--- + ## Run ```bash diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..868deb2 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,4 @@ +# Documentation + +- [Gateway: how it works](./gateway.md) +- [Web UI: how it works](./web-ui.md) diff --git a/docs/gateway.md b/docs/gateway.md new file mode 100644 index 0000000..863e274 --- /dev/null +++ b/docs/gateway.md @@ -0,0 +1,226 @@ +# Gateway: how it works + +This document explains how the HTTP gateway in this repository works. + +## Overview + +The gateway is a thin HTTP layer around `@mariozechner/pi-coding-agent` sessions. + +Main goals: + +- expose chat over HTTP (`/v1/chat`, `/v1/chat/stream`) +- keep long-lived conversation state per `conversationId` +- support adapter-friendly IDs (Slack/Matrix/etc.) +- optionally expose a built-in browser UI at `/` + +Key source files: + +- `src/index.ts` +- `src/gateway/server.ts` +- `src/conversation-manager.ts` +- `src/agent-session-factory.ts` +- `src/gateway/events.ts` + +--- + +## Startup flow + +1. `src/index.ts` loads env config via `loadConfig()`. +2. If `RUN_MODE=single`, one-shot mode is executed and exits. +3. Otherwise (`RUN_MODE=gateway`), it: + - creates `ConversationManager` + - initializes persisted conversation metadata (if enabled) + - starts `GatewayHttpServer` +4. On `SIGINT`/`SIGTERM`, it stops the HTTP server and disposes sessions. + +--- + +## Core components + +### 1) `GatewayHttpServer` (`src/gateway/server.ts`) + +Responsible for: + +- request routing +- auth and CORS handling +- request validation +- SSE streaming responses +- JSON/HTML responses + +### 2) `ConversationManager` (`src/conversation-manager.ts`) + +Responsible for: + +- creating and tracking conversation records +- loading/opening/creating agent sessions +- serializing prompts per conversation (queue) +- persisting conversation index + session metadata +- aborting/deleting sessions + +### 3) `AgentSessionFactory` (`src/agent-session-factory.ts`) + +Responsible for constructing agent sessions with: + +- model/provider selection (including Ollama support) +- tool selection (`all`, `readonly`, `none`, or subset) +- optional system prompt override/append +- auth storage and model registry wiring + +--- + +## Conversation model + +A conversation is identified by `conversationId`. + +- If client provides no ID, a UUID is generated. +- Each conversation maps to one `AgentSession`. +- Multiple requests for the same conversation are queued and processed in order. +- Metadata is exposed via `/v1/conversations` endpoints. + +Validation rules: + +- `conversationId` max length: 200 +- `conversationId` must not contain `\n`/`\r` +- `message` must be a non-empty string +- `images` must be an array when provided +- `streamingBehavior` must be `"steer"` or `"followUp"` when provided + +--- + +## Persistence behavior + +Controlled by `SESSION_PERSIST`. + +### `SESSION_PERSIST=true` + +Data is stored under: + +- `/.gateway/conversations.json` (conversation index) +- `/.gateway/sessions/...` (session files) + +At startup, the index is loaded and conversations are restored as unloaded records. +The actual `AgentSession` is lazily opened when that conversation is used. + +### `SESSION_PERSIST=false` + +Everything is in memory and lost on process exit. + +--- + +## API routes + +### Health/UI + +- `GET /health` → `{ "ok": true }` +- `GET /` → built-in Web UI HTML (if `GATEWAY_ENABLE_WEB_UI=true`) + +### Conversation management + +- `GET /v1/conversations` +- `POST /v1/conversations` +- `GET /v1/conversations/:id` +- `DELETE /v1/conversations/:id` +- `POST /v1/conversations/:id/abort` + +### Chat + +- `POST /v1/chat` (JSON response) +- `POST /v1/chat/stream` (SSE response) + +### Adapter endpoints + +- `POST /v1/adapters/chat` +- `POST /v1/adapters/chat/stream` + +Adapter request fields (`source`, `workspaceId`, `channelId`, `threadId`, `userId`) are normalized into: + +- `conversationId = source:workspaceId:channelId:threadId` +- `adapterKey = source:workspaceId:channelId:threadId:userId` + +`channelId` is required. `:` is not allowed inside segment values. + +--- + +## Streaming (SSE) behavior + +For `/v1/chat/stream` and `/v1/adapters/chat/stream`: + +1. Response starts with SSE headers. +2. A `ready` event is emitted. +3. Agent session events are mapped to gateway events (`src/gateway/events.ts`). +4. A final `done` event is emitted with summary payload. +5. On failure, an `error` event is emitted and stream ends. + +Common emitted event types: + +- `assistant_text_delta` +- `assistant_thinking_delta` +- `assistant_message_update` +- `tool_start`, `tool_update`, `tool_end` +- `agent_start`, `agent_end` +- `retry_start`, `retry_end` +- `compaction_start`, `compaction_end` +- `done` +- `error` + +`done` includes: + +- `conversationId` +- `sessionId` +- `sessionFile` +- `assistantText` +- plus `adapterKey` on adapter streaming routes + +Disconnect behavior: + +- if client disconnects mid-stream **and** the request had a `conversationId`, the server attempts to abort that conversation. + +--- + +## Auth and CORS + +### Bearer auth + +If `GATEWAY_AUTH_TOKEN` is set, requests must include: + +`Authorization: Bearer ` + +Otherwise server returns `401`. + +Note: auth is checked before route handling, so this applies to all routes (including `GET /` and `GET /health`). + +### CORS + +If `GATEWAY_CORS_ORIGIN` is set, server adds: + +- `Access-Control-Allow-Origin` +- `Access-Control-Allow-Headers: Content-Type, Authorization` +- `Access-Control-Allow-Methods: GET, POST, DELETE, OPTIONS` + +`OPTIONS` preflight returns `204`. + +--- + +## Request limits and errors + +- JSON body max size: 1 MiB (413 if exceeded) +- invalid JSON: 400 +- invalid payload field types: 400 +- unknown route: 404 +- unexpected errors: 500 + +--- + +## Environment variables (gateway-relevant) + +- `RUN_MODE` (`gateway` | `single`) +- `GATEWAY_HOST` +- `GATEWAY_PORT` +- `GATEWAY_CORS_ORIGIN` +- `GATEWAY_AUTH_TOKEN` +- `GATEWAY_ENABLE_WEB_UI` +- `SESSION_PERSIST` +- `VERBOSE_TOOLS` +- `CWD` + +See `.env.example` for complete defaults and comments. diff --git a/docs/web-ui.md b/docs/web-ui.md new file mode 100644 index 0000000..1965c00 --- /dev/null +++ b/docs/web-ui.md @@ -0,0 +1,157 @@ +# Web UI: how it works + +This document explains the built-in browser UI served by the gateway. + +Source file: + +- `src/gateway/web-ui.ts` + +--- + +## Overview + +The Web UI is a single HTML page returned by `GET /` (when enabled). + +It is intentionally simple: + +- plain HTML/CSS/JS (no framework) +- sends requests to `/v1/chat/stream` +- renders streamed assistant text in real time +- stores and reuses `conversationId` in `localStorage` + +--- + +## Availability + +The UI route is controlled by `GATEWAY_ENABLE_WEB_UI`: + +- `true` (default): `GET /` returns UI +- `false`: `GET /` returns `404` with `{ "error": "Web UI disabled" }` + +If `GATEWAY_AUTH_TOKEN` is enabled, `GET /` also requires an `Authorization` header, because auth is global in the gateway. + +--- + +## UI sections + +### 1) Session/header card + +- **Conversation ID input** (`#conversationId`) + - if empty, server auto-creates one during first message + - persisted locally under `pi_gateway_conversation_id` +- **Auth token input** (`#token`) + - optional bearer token included in API requests from the page + - this affects `fetch` calls only; it does not add auth headers to the initial page load + +### 2) Messages card + +- container `#messages` +- each message is appended as a `.msg.user` or `.msg.assistant` block +- text is rendered as plain text (`textContent`), not Markdown/HTML + +### 3) Composer card + +- textarea `#message` +- status text `#status` +- buttons: + - `Send` + - `New session` + +--- + +## Local state + +The page keeps only minimal browser-side state: + +- `conversationId` in input + local storage +- rendered message list in DOM +- current request state via button disabled/enabled + +Storage key: + +- `pi_gateway_conversation_id` + +On load, if this key exists, it pre-fills the conversation input. + +--- + +## Send flow + +When user presses **Send** (or Cmd/Ctrl + Enter): + +1. Trim textarea value; ignore empty input. +2. Disable `Send` and `New session` buttons. +3. Append user message bubble. +4. Append empty assistant bubble. +5. Build payload: + - required: `message` + - optional: `conversationId` (if input non-empty) +6. POST to `/v1/chat/stream` with JSON body. +7. Parse SSE stream incrementally. +8. Update assistant bubble and status based on events. +9. Re-enable buttons when request finishes/fails. + +--- + +## SSE event handling in UI + +Handled events: + +- `assistant_text_delta` + - appends `data.delta` to assistant message bubble +- `done` + - reads `data.conversationId` + - updates conversation input + - writes `pi_gateway_conversation_id` + - status becomes `Done • conversation ` +- `error` + - status becomes error text + - writes fallback error into assistant bubble if empty + +Other event types are currently ignored by the UI. + +--- + +## New session button behavior + +Clicking **New session**: + +- does nothing if a request is currently streaming (`Send` disabled) +- clears conversation ID input +- removes `pi_gateway_conversation_id` from local storage +- clears rendered message list +- sets status to `New session ready` +- focuses the message textarea + +This starts a fresh client-side chat thread. The next send will create a new conversation on the server. + +--- + +## Keyboard shortcut + +In the message textarea: + +- `Cmd + Enter` (macOS) or `Ctrl + Enter` (Windows/Linux) +- triggers the same send flow as the Send button + +--- + +## Limitations + +Current UI is intentionally minimal: + +- no server-side message history loading +- no cancel/abort button for in-flight response +- no rendering for tool events/thinking events +- no Markdown formatting +- no multi-conversation sidebar + +It is best used as a lightweight test/debug interface. + +--- + +## Related API docs + +For full gateway/API details, see: + +- `docs/gateway.md` diff --git a/src/gateway/web-ui.ts b/src/gateway/web-ui.ts index 99120d5..a54ad06 100644 --- a/src/gateway/web-ui.ts +++ b/src/gateway/web-ui.ts @@ -69,8 +69,9 @@ export function getWebUiHtml(): string { .actions { display: grid; - grid-template-columns: 1fr auto; + grid-template-columns: 1fr auto auto; gap: 10px; + align-items: center; } button { @@ -80,6 +81,10 @@ export function getWebUiHtml(): string { padding: 10px 18px; } + button.secondary { + background: #1f2734; + } + button:disabled { opacity: 0.6; cursor: default; @@ -136,6 +141,7 @@ export function getWebUiHtml(): string {
Idle
+
@@ -146,6 +152,7 @@ export function getWebUiHtml(): string { const tokenInput = document.getElementById("token"); const messageInput = document.getElementById("message"); const sendButton = document.getElementById("send"); + const newSessionButton = document.getElementById("newSession"); const messagesEl = document.getElementById("messages"); const statusEl = document.getElementById("status"); @@ -167,6 +174,18 @@ export function getWebUiHtml(): string { return el; } + function startNewSession() { + if (sendButton.disabled) { + return; + } + + conversationInput.value = ""; + localStorage.removeItem("pi_gateway_conversation_id"); + messagesEl.textContent = ""; + setStatus("New session ready"); + messageInput.focus(); + } + async function consumeSse(body, onEvent) { const reader = body.getReader(); const decoder = new TextDecoder(); @@ -214,6 +233,7 @@ export function getWebUiHtml(): string { } sendButton.disabled = true; + newSessionButton.disabled = true; setStatus("Streaming response..."); addMessage("user", message); @@ -280,9 +300,11 @@ export function getWebUiHtml(): string { setStatus("Request failed"); } finally { sendButton.disabled = false; + newSessionButton.disabled = false; } } + newSessionButton.addEventListener("click", startNewSession); sendButton.addEventListener("click", sendMessage); messageInput.addEventListener("keydown", (event) => { if ((event.metaKey || event.ctrlKey) && event.key === "Enter") { -- 2.49.1 From 2ed59c03343a048edc8eb66bbd506b91a1d9a667 Mon Sep 17 00:00:00 2001 From: highperfocused Date: Thu, 12 Mar 2026 14:44:01 +0100 Subject: [PATCH 8/8] docs: add channel integration guide --- README.md | 2 + docs/README.md | 1 + docs/channels.md | 231 +++++++++++++++++++++++++++++++++++++++++++++++ docs/gateway.md | 2 + 4 files changed, 236 insertions(+) create mode 100644 docs/channels.md diff --git a/README.md b/README.md index 8ec8602..5fe6fc2 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ You can also keep one-shot mode (`RUN_MODE=single`) for script usage. - Gateway internals: `docs/gateway.md` - Web UI internals: `docs/web-ui.md` +- Channel integrations (Web UI, Slack, Matrix, custom): `docs/channels.md` --- @@ -135,3 +136,4 @@ The gateway is transport-agnostic. For each upstream client, map its thread iden Then post messages to `/v1/chat` or `/v1/chat/stream` with that `conversationId`. This keeps one agent session per thread across transports. +For detailed setup guidance per transport, see `docs/channels.md`. diff --git a/docs/README.md b/docs/README.md index 868deb2..6f1b798 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2,3 +2,4 @@ - [Gateway: how it works](./gateway.md) - [Web UI: how it works](./web-ui.md) +- [Channels: Web UI, Slack, Matrix, custom adapters](./channels.md) diff --git a/docs/channels.md b/docs/channels.md new file mode 100644 index 0000000..6a0bb02 --- /dev/null +++ b/docs/channels.md @@ -0,0 +1,231 @@ +# Channels: Web UI, Slack, Matrix, and custom adapters + +This document explains how **channels** work in the gateway and how to integrate transports like Web UI, Slack, Matrix, or your own chat source. + +--- + +## What is a "channel" in this project? + +In this gateway, a channel is not a separate server object you provision. + +A channel is simply an upstream conversation context (for example: a Slack thread, a Matrix room thread, or a browser chat tab) that maps to one stable `conversationId`. + +The gateway uses that `conversationId` to keep one long-lived agent session. + +--- + +## How channels are represented internally + +You have two integration options: + +1. **Direct chat endpoints** + - `POST /v1/chat` + - `POST /v1/chat/stream` + - You provide `conversationId` directly. + +2. **Adapter endpoints** (recommended for Slack/Matrix/etc.) + - `POST /v1/adapters/chat` + - `POST /v1/adapters/chat/stream` + - You provide channel/thread fields; the gateway derives `conversationId` for you. + +Adapter ID format: + +- `conversationId = source:workspaceId:channelId:threadId` +- `adapterKey = source:workspaceId:channelId:threadId:userId` + +Field behavior on adapter routes: + +- `source` (optional, default: `generic`) +- `workspaceId` (optional, default: `default`) +- `channelId` (**required**) +- `threadId` (optional, default: `root`) +- `userId` (optional, default: `anonymous`) + +Important constraints: + +- segment values must be strings +- `channelId` must not be empty +- segment values must not contain `:` + +If your upstream IDs contain `:` (common in Matrix), sanitize/encode them before sending to adapter routes. + +--- + +## Do I need to "create" a channel first? + +Usually: **no**. + +A conversation/session is auto-created on first message. You can optionally pre-create one via: + +- `POST /v1/conversations` + +But for most channel integrations, you just start sending messages to chat endpoints with stable IDs. + +--- + +## Requirements checklist (all channels) + +1. Gateway is running (`RUN_MODE=gateway`) +2. Model/provider credentials are configured (`PROVIDER`, `MODEL`, `API_KEY` or provider-specific key) +3. Your adapter/client can reach the gateway URL +4. If `GATEWAY_AUTH_TOKEN` is set, send `Authorization: Bearer ` +5. For browser apps on another origin, set `GATEWAY_CORS_ORIGIN` +6. If you want restart-safe sessions, set `SESSION_PERSIST=true` + +--- + +## Web UI channel + +### Built-in UI + +The built-in UI is available at `GET /` when: + +- `GATEWAY_ENABLE_WEB_UI=true` + +How it works: + +- sends messages to `/v1/chat/stream` +- stores `conversationId` in browser local storage +- "New session" clears local `conversationId` and starts a new thread + +### Custom web app + +If you build your own web frontend, call `/v1/chat` or `/v1/chat/stream` and choose your own ID format, e.g.: + +- `web::` + +Example: + +```json +{ + "conversationId": "web:user-42:chat-main", + "message": "Summarize the last response" +} +``` + +--- + +## Slack channel integration + +There is no built-in Slack bot in this repository; you run a small Slack adapter service. + +Typical adapter needs: + +- Slack bot token + app configuration (events/interactions) +- webhook/event receiver in your adapter +- code that forwards messages to gateway and posts replies back to Slack + +Recommended mapping from Slack event payload: + +- `source`: `"slack"` +- `workspaceId`: Slack team/workspace ID (e.g. `T123`) +- `channelId`: Slack channel ID (e.g. `C456`) **required** +- `threadId`: Slack `thread_ts` (or fallback strategy) +- `userId`: Slack user ID (e.g. `U789`) + +Example request to gateway: + +```bash +curl -N -X POST http://localhost:8787/v1/adapters/chat/stream \ + -H 'content-type: application/json' \ + -d '{ + "source": "slack", + "workspaceId": "T123", + "channelId": "C456", + "threadId": "1712233.991", + "userId": "U789", + "message": "Please summarize this thread" + }' +``` + +Threading strategy tip: + +- use a stable `threadId` per Slack thread to keep context isolated per thread +- if you omit `threadId`, gateway uses `root` (all messages for that channel collapse into one thread context) + +--- + +## Matrix channel integration + +There is no built-in Matrix bot in this repository; use a Matrix bot/bridge process as adapter. + +Typical adapter needs: + +- Matrix bot account + access token +- event listener for room messages +- logic to send assistant output back into room/thread + +Recommended mapping: + +- `source`: `"matrix"` +- `workspaceId`: homeserver or deployment identifier (optional but useful) +- `channelId`: Matrix `room_id` (**required**, encoded/sanitized for adapter route) +- `threadId`: Matrix thread root event ID (encoded/sanitized, or omit for `root`) +- `userId`: Matrix sender ID (encoded/sanitized) + +Because adapter segments cannot contain `:`, Matrix IDs should be encoded in the adapter, e.g.: + +```ts +const encodeSegment = (value: string) => encodeURIComponent(value); +``` + +Example payload (encoded values): + +```json +{ + "source": "matrix", + "workspaceId": "matrix.org", + "channelId": "%21roomid%3Amatrix.org", + "threadId": "%24rootEventId%3Amatrix.org", + "userId": "%40alice%3Amatrix.org", + "message": "What did we decide in this thread?" +} +``` + +Alternative: call `/v1/chat` directly and provide your own `conversationId` format if you do not want per-segment adapter normalization. + +--- + +## Custom channel template (Discord, Telegram, email, etc.) + +For any source, map your upstream identifiers into adapter fields and keep that mapping stable. + +Minimal payload: + +```json +{ + "channelId": "your-channel-id", + "message": "Hello" +} +``` + +Full payload template: + +```json +{ + "source": "custom", + "workspaceId": "tenant-a", + "channelId": "channel-123", + "threadId": "thread-456", + "userId": "user-789", + "message": "Hello" +} +``` + +--- + +## Best practices + +- Keep `threadId` stable per thread (do not generate random values per message) +- Include `userId` for audit/telemetry (`adapterKey`) even though it does not change `conversationId` +- Avoid `:` in any adapter segment value +- Use streaming endpoints for better UX (`/stream`) +- Persist sessions in production (`SESSION_PERSIST=true`) +- Add retry/backoff logic in your adapter for network failures + +--- + +## Related docs + +- Gateway internals/API: `docs/gateway.md` +- Built-in browser UI: `docs/web-ui.md` diff --git a/docs/gateway.md b/docs/gateway.md index 863e274..49681e8 100644 --- a/docs/gateway.md +++ b/docs/gateway.md @@ -139,6 +139,8 @@ Adapter request fields (`source`, `workspaceId`, `channelId`, `threadId`, `userI `channelId` is required. `:` is not allowed inside segment values. +For practical setup patterns per transport (Web UI, Slack, Matrix, custom), see `docs/channels.md`. + --- ## Streaming (SSE) behavior -- 2.49.1