mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-28 18:09:14 +02:00
Compare commits
2 Commits
agent/lamb
...
agent/j/e7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a6002c58cb | ||
|
|
dac739205a |
@@ -715,7 +715,7 @@ multica repo list | add | update | delete
|
||||
#### Runtime
|
||||
|
||||
```bash
|
||||
multica runtime list | get | ping | delete
|
||||
multica runtime list | usage | activity | update
|
||||
```
|
||||
|
||||
#### 配置 / 更新
|
||||
|
||||
@@ -33,7 +33,6 @@ import type {
|
||||
RuntimeUsage,
|
||||
IssueUsageSummary,
|
||||
RuntimeHourlyActivity,
|
||||
RuntimePing,
|
||||
RuntimeUpdate,
|
||||
RuntimeModelListRequest,
|
||||
RuntimeLocalSkillListRequest,
|
||||
@@ -583,14 +582,6 @@ export class ApiClient {
|
||||
return this.fetch(`/api/runtimes/${runtimeId}/activity`);
|
||||
}
|
||||
|
||||
async pingRuntime(runtimeId: string): Promise<RuntimePing> {
|
||||
return this.fetch(`/api/runtimes/${runtimeId}/ping`, { method: "POST" });
|
||||
}
|
||||
|
||||
async getPingResult(runtimeId: string, pingId: string): Promise<RuntimePing> {
|
||||
return this.fetch(`/api/runtimes/${runtimeId}/ping/${pingId}`);
|
||||
}
|
||||
|
||||
async initiateUpdate(
|
||||
runtimeId: string,
|
||||
targetVersion: string,
|
||||
|
||||
@@ -146,19 +146,6 @@ export interface SetAgentSkillsRequest {
|
||||
skill_ids: string[];
|
||||
}
|
||||
|
||||
export type RuntimePingStatus = "pending" | "running" | "completed" | "failed" | "timeout";
|
||||
|
||||
export interface RuntimePing {
|
||||
id: string;
|
||||
runtime_id: string;
|
||||
status: RuntimePingStatus;
|
||||
output?: string;
|
||||
error?: string;
|
||||
duration_ms?: number;
|
||||
created_at: string;
|
||||
updated_at: string;
|
||||
}
|
||||
|
||||
export interface IssueUsageSummary {
|
||||
total_input_tokens: number;
|
||||
total_output_tokens: number;
|
||||
|
||||
@@ -16,8 +16,6 @@ export type {
|
||||
SetAgentSkillsRequest,
|
||||
RuntimeUsage,
|
||||
RuntimeHourlyActivity,
|
||||
RuntimePing,
|
||||
RuntimePingStatus,
|
||||
RuntimeUpdate,
|
||||
RuntimeUpdateStatus,
|
||||
RuntimeModel,
|
||||
|
||||
@@ -1,120 +0,0 @@
|
||||
import { useState, useEffect, useCallback, useRef } from "react";
|
||||
import { Loader2, CheckCircle2, XCircle, Zap } from "lucide-react";
|
||||
import { Button } from "@multica/ui/components/ui/button";
|
||||
import { api } from "@multica/core/api";
|
||||
import type { RuntimePingStatus } from "@multica/core/types";
|
||||
|
||||
const pingStatusConfig: Record<
|
||||
RuntimePingStatus,
|
||||
{ label: string; icon: typeof Loader2; color: string }
|
||||
> = {
|
||||
pending: { label: "Waiting for daemon...", icon: Loader2, color: "text-muted-foreground" },
|
||||
running: { label: "Running test...", icon: Loader2, color: "text-info" },
|
||||
completed: { label: "Connected", icon: CheckCircle2, color: "text-success" },
|
||||
failed: { label: "Failed", icon: XCircle, color: "text-destructive" },
|
||||
timeout: { label: "Timeout", icon: XCircle, color: "text-warning" },
|
||||
};
|
||||
|
||||
export function PingSection({ runtimeId }: { runtimeId: string }) {
|
||||
const [status, setStatus] = useState<RuntimePingStatus | null>(null);
|
||||
const [output, setOutput] = useState("");
|
||||
const [error, setError] = useState("");
|
||||
const [durationMs, setDurationMs] = useState<number | null>(null);
|
||||
const [testing, setTesting] = useState(false);
|
||||
const pollRef = useRef<ReturnType<typeof setInterval> | null>(null);
|
||||
|
||||
const cleanup = useCallback(() => {
|
||||
if (pollRef.current) {
|
||||
clearInterval(pollRef.current);
|
||||
pollRef.current = null;
|
||||
}
|
||||
}, []);
|
||||
|
||||
useEffect(() => cleanup, [cleanup]);
|
||||
|
||||
const handleTest = async () => {
|
||||
cleanup();
|
||||
setTesting(true);
|
||||
setStatus("pending");
|
||||
setOutput("");
|
||||
setError("");
|
||||
setDurationMs(null);
|
||||
|
||||
try {
|
||||
const ping = await api.pingRuntime(runtimeId);
|
||||
|
||||
pollRef.current = setInterval(async () => {
|
||||
try {
|
||||
const result = await api.getPingResult(runtimeId, ping.id);
|
||||
setStatus(result.status as RuntimePingStatus);
|
||||
|
||||
if (result.status === "completed") {
|
||||
setOutput(result.output ?? "");
|
||||
setDurationMs(result.duration_ms ?? null);
|
||||
setTesting(false);
|
||||
cleanup();
|
||||
} else if (result.status === "failed" || result.status === "timeout") {
|
||||
setError(result.error ?? "Unknown error");
|
||||
setDurationMs(result.duration_ms ?? null);
|
||||
setTesting(false);
|
||||
cleanup();
|
||||
}
|
||||
} catch {
|
||||
// ignore poll errors
|
||||
}
|
||||
}, 2000);
|
||||
} catch {
|
||||
setStatus("failed");
|
||||
setError("Failed to initiate test");
|
||||
setTesting(false);
|
||||
}
|
||||
};
|
||||
|
||||
const config = status ? pingStatusConfig[status] : null;
|
||||
const Icon = config?.icon;
|
||||
const isActive = status === "pending" || status === "running";
|
||||
|
||||
return (
|
||||
<div className="space-y-2">
|
||||
<div className="flex items-center gap-2">
|
||||
<Button
|
||||
variant="outline"
|
||||
size="xs"
|
||||
onClick={handleTest}
|
||||
disabled={testing}
|
||||
>
|
||||
{testing ? (
|
||||
<Loader2 className="h-3 w-3 animate-spin" />
|
||||
) : (
|
||||
<Zap className="h-3 w-3" />
|
||||
)}
|
||||
{testing ? "Testing..." : "Test Connection"}
|
||||
</Button>
|
||||
|
||||
{config && Icon && (
|
||||
<span className={`inline-flex items-center gap-1 text-xs ${config.color}`}>
|
||||
<Icon className={`h-3 w-3 ${isActive ? "animate-spin" : ""}`} />
|
||||
{config.label}
|
||||
{durationMs != null && (
|
||||
<span className="text-muted-foreground">
|
||||
({(durationMs / 1000).toFixed(1)}s)
|
||||
</span>
|
||||
)}
|
||||
</span>
|
||||
)}
|
||||
</div>
|
||||
|
||||
{status === "completed" && output && (
|
||||
<div className="rounded-lg border bg-success/5 px-3 py-2">
|
||||
<pre className="text-xs font-mono whitespace-pre-wrap">{output}</pre>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{(status === "failed" || status === "timeout") && error && (
|
||||
<div className="rounded-lg border border-destructive/20 bg-destructive/5 px-3 py-2">
|
||||
<p className="text-xs text-destructive">{error}</p>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
@@ -24,7 +24,6 @@ import { ActorAvatar } from "../../common/actor-avatar";
|
||||
import { formatLastSeen } from "../utils";
|
||||
import { StatusBadge, InfoField } from "./shared";
|
||||
import { ProviderLogo } from "./provider-logo";
|
||||
import { PingSection } from "./ping-section";
|
||||
import { UpdateSection } from "./update-section";
|
||||
import { UsageSection } from "./usage-section";
|
||||
|
||||
@@ -164,14 +163,6 @@ export function RuntimeDetail({ runtime }: { runtime: AgentRuntime }) {
|
||||
</div>
|
||||
)}
|
||||
|
||||
{/* Connection Test */}
|
||||
<div>
|
||||
<h3 className="text-xs font-medium text-muted-foreground mb-3">
|
||||
Connection Test
|
||||
</h3>
|
||||
<PingSection runtimeId={runtime.id} />
|
||||
</div>
|
||||
|
||||
{/* Usage */}
|
||||
<div>
|
||||
<h3 className="text-xs font-medium text-muted-foreground mb-3">
|
||||
|
||||
@@ -36,13 +36,6 @@ var runtimeActivityCmd = &cobra.Command{
|
||||
RunE: runRuntimeActivity,
|
||||
}
|
||||
|
||||
var runtimePingCmd = &cobra.Command{
|
||||
Use: "ping <runtime-id>",
|
||||
Short: "Ping a runtime to check connectivity",
|
||||
Args: exactArgs(1),
|
||||
RunE: runRuntimePing,
|
||||
}
|
||||
|
||||
var runtimeUpdateCmd = &cobra.Command{
|
||||
Use: "update <runtime-id>",
|
||||
Short: "Initiate a CLI update on a runtime",
|
||||
@@ -54,7 +47,6 @@ func init() {
|
||||
runtimeCmd.AddCommand(runtimeListCmd)
|
||||
runtimeCmd.AddCommand(runtimeUsageCmd)
|
||||
runtimeCmd.AddCommand(runtimeActivityCmd)
|
||||
runtimeCmd.AddCommand(runtimePingCmd)
|
||||
runtimeCmd.AddCommand(runtimeUpdateCmd)
|
||||
|
||||
// runtime list
|
||||
@@ -67,10 +59,6 @@ func init() {
|
||||
// runtime activity
|
||||
runtimeActivityCmd.Flags().String("output", "table", "Output format: table or json")
|
||||
|
||||
// runtime ping
|
||||
runtimePingCmd.Flags().String("output", "json", "Output format: table or json")
|
||||
runtimePingCmd.Flags().Bool("wait", false, "Wait for ping to complete (poll until done)")
|
||||
|
||||
// runtime update
|
||||
runtimeUpdateCmd.Flags().String("target-version", "", "Target version to update to (required)")
|
||||
runtimeUpdateCmd.Flags().String("output", "json", "Output format: table or json")
|
||||
@@ -189,60 +177,6 @@ func runRuntimeActivity(cmd *cobra.Command, args []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func runRuntimePing(cmd *cobra.Command, args []string) error {
|
||||
client, err := newAPIClient(cmd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Initiate ping.
|
||||
var ping map[string]any
|
||||
if err := client.PostJSON(ctx, "/api/runtimes/"+args[0]+"/ping", nil, &ping); err != nil {
|
||||
return fmt.Errorf("initiate ping: %w", err)
|
||||
}
|
||||
|
||||
wait, _ := cmd.Flags().GetBool("wait")
|
||||
if !wait {
|
||||
output, _ := cmd.Flags().GetString("output")
|
||||
if output == "json" {
|
||||
return cli.PrintJSON(os.Stdout, ping)
|
||||
}
|
||||
fmt.Printf("Ping initiated: %s (status: %s)\n", strVal(ping, "id"), strVal(ping, "status"))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Poll until completed/failed/timeout.
|
||||
pingID := strVal(ping, "id")
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("timed out waiting for ping (last status: %s)", strVal(ping, "status"))
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
|
||||
if err := client.GetJSON(ctx, "/api/runtimes/"+args[0]+"/ping/"+pingID, &ping); err != nil {
|
||||
return fmt.Errorf("get ping status: %w", err)
|
||||
}
|
||||
|
||||
status := strVal(ping, "status")
|
||||
if status == "completed" || status == "failed" || status == "timeout" {
|
||||
output, _ := cmd.Flags().GetString("output")
|
||||
if output == "json" {
|
||||
return cli.PrintJSON(os.Stdout, ping)
|
||||
}
|
||||
if status == "completed" {
|
||||
fmt.Printf("Ping completed in %sms\n", strVal(ping, "duration_ms"))
|
||||
} else {
|
||||
fmt.Printf("Ping %s: %s\n", status, strVal(ping, "error"))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func runRuntimeUpdate(cmd *cobra.Command, args []string) error {
|
||||
client, err := newAPIClient(cmd)
|
||||
if err != nil {
|
||||
|
||||
@@ -158,7 +158,6 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analytics
|
||||
|
||||
r.Post("/runtimes/{runtimeId}/tasks/claim", h.ClaimTaskByRuntime)
|
||||
r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime)
|
||||
r.Post("/runtimes/{runtimeId}/ping/{pingId}/result", h.ReportPingResult)
|
||||
r.Post("/runtimes/{runtimeId}/update/{updateId}/result", h.ReportUpdateResult)
|
||||
r.Post("/runtimes/{runtimeId}/models/{requestId}/result", h.ReportModelListResult)
|
||||
r.Post("/runtimes/{runtimeId}/local-skills/{requestId}/result", h.ReportLocalSkillListResult)
|
||||
@@ -369,8 +368,6 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analytics
|
||||
r.Route("/{runtimeId}", func(r chi.Router) {
|
||||
r.Get("/usage", h.GetRuntimeUsage)
|
||||
r.Get("/activity", h.GetRuntimeTaskActivity)
|
||||
r.Post("/ping", h.InitiatePing)
|
||||
r.Get("/ping/{pingId}", h.GetPing)
|
||||
r.Post("/update", h.InitiateUpdate)
|
||||
r.Get("/update/{updateId}", h.GetUpdate)
|
||||
r.Post("/models", h.InitiateListModels)
|
||||
|
||||
@@ -217,18 +217,12 @@ func (c *Client) GetTaskStatus(ctx context.Context, taskID string) (string, erro
|
||||
// HeartbeatResponse contains the server's response to a heartbeat, including any pending actions.
|
||||
type HeartbeatResponse struct {
|
||||
Status string `json:"status"`
|
||||
PendingPing *PendingPing `json:"pending_ping,omitempty"`
|
||||
PendingUpdate *PendingUpdate `json:"pending_update,omitempty"`
|
||||
PendingModelList *PendingModelList `json:"pending_model_list,omitempty"`
|
||||
PendingLocalSkills *PendingLocalSkills `json:"pending_local_skills,omitempty"`
|
||||
PendingLocalSkillImport *PendingLocalSkillImport `json:"pending_local_skill_import,omitempty"`
|
||||
}
|
||||
|
||||
// PendingPing represents a ping test request from the server.
|
||||
type PendingPing struct {
|
||||
ID string `json:"id"`
|
||||
}
|
||||
|
||||
// PendingUpdate represents a CLI update request from the server.
|
||||
type PendingUpdate struct {
|
||||
ID string `json:"id"`
|
||||
@@ -261,10 +255,6 @@ func (c *Client) SendHeartbeat(ctx context.Context, runtimeID string) (*Heartbea
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
func (c *Client) ReportPingResult(ctx context.Context, runtimeID, pingID string, result map[string]any) error {
|
||||
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/ping/%s/result", runtimeID, pingID), result, nil)
|
||||
}
|
||||
|
||||
// ReportUpdateResult sends the CLI update result back to the server.
|
||||
func (c *Client) ReportUpdateResult(ctx context.Context, runtimeID, updateID string, result map[string]any) error {
|
||||
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/update/%s/result", runtimeID, updateID), result, nil)
|
||||
|
||||
@@ -498,14 +498,6 @@ func (d *Daemon) heartbeatLoop(ctx context.Context) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Handle pending ping requests.
|
||||
if resp.PendingPing != nil {
|
||||
rt := d.findRuntime(rid)
|
||||
if rt != nil {
|
||||
go d.handlePing(ctx, *rt, resp.PendingPing.ID)
|
||||
}
|
||||
}
|
||||
|
||||
// Handle pending update requests.
|
||||
if resp.PendingUpdate != nil {
|
||||
go d.handleUpdate(ctx, rid, resp.PendingUpdate)
|
||||
@@ -634,91 +626,6 @@ func (d *Daemon) handleLocalSkillImport(ctx context.Context, rt Runtime, pending
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Daemon) handlePing(ctx context.Context, rt Runtime, pingID string) {
|
||||
d.logger.Info("ping requested", "runtime_id", rt.ID, "ping_id", pingID, "provider", rt.Provider)
|
||||
|
||||
start := time.Now()
|
||||
|
||||
entry, ok := d.cfg.Agents[rt.Provider]
|
||||
if !ok {
|
||||
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
|
||||
"status": "failed",
|
||||
"error": fmt.Sprintf("no agent configured for provider %q", rt.Provider),
|
||||
"duration_ms": time.Since(start).Milliseconds(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
backend, err := agent.New(rt.Provider, agent.Config{
|
||||
ExecutablePath: entry.Path,
|
||||
Logger: d.logger,
|
||||
})
|
||||
if err != nil {
|
||||
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
|
||||
"status": "failed",
|
||||
"error": err.Error(),
|
||||
"duration_ms": time.Since(start).Milliseconds(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
pingCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
||||
defer cancel()
|
||||
|
||||
session, err := backend.Execute(pingCtx, "Respond with exactly one word: pong", agent.ExecOptions{
|
||||
MaxTurns: 1,
|
||||
Timeout: 60 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
|
||||
"status": "failed",
|
||||
"error": err.Error(),
|
||||
"duration_ms": time.Since(start).Milliseconds(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Drain messages
|
||||
go func() {
|
||||
for range session.Messages {
|
||||
}
|
||||
}()
|
||||
|
||||
var result agent.Result
|
||||
select {
|
||||
case result = <-session.Result:
|
||||
case <-pingCtx.Done():
|
||||
d.logger.Warn("ping timed out waiting for result", "runtime_id", rt.ID, "ping_id", pingID)
|
||||
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
|
||||
"status": "failed",
|
||||
"error": "ping context cancelled while waiting for result",
|
||||
"duration_ms": time.Since(start).Milliseconds(),
|
||||
})
|
||||
return
|
||||
}
|
||||
durationMs := time.Since(start).Milliseconds()
|
||||
|
||||
if result.Status == "completed" {
|
||||
d.logger.Info("ping completed", "runtime_id", rt.ID, "ping_id", pingID, "duration_ms", durationMs)
|
||||
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
|
||||
"status": "completed",
|
||||
"output": result.Output,
|
||||
"duration_ms": durationMs,
|
||||
})
|
||||
} else {
|
||||
errMsg := result.Error
|
||||
if errMsg == "" {
|
||||
errMsg = fmt.Sprintf("agent returned status: %s", result.Status)
|
||||
}
|
||||
d.logger.Warn("ping failed", "runtime_id", rt.ID, "ping_id", pingID, "error", errMsg)
|
||||
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
|
||||
"status": "failed",
|
||||
"error": errMsg,
|
||||
"duration_ms": durationMs,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// handleUpdate performs the CLI update when triggered by the server via heartbeat.
|
||||
func (d *Daemon) handleUpdate(ctx context.Context, runtimeID string, update *PendingUpdate) {
|
||||
// Desktop-managed daemons share their CLI binary with the Electron app,
|
||||
|
||||
@@ -510,11 +510,6 @@ func (h *Handler) DaemonHeartbeat(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
resp := map[string]any{"status": "ok"}
|
||||
|
||||
// Check for pending ping requests for this runtime.
|
||||
if pending := h.PingStore.PopPending(req.RuntimeID); pending != nil {
|
||||
resp["pending_ping"] = map[string]string{"id": pending.ID}
|
||||
}
|
||||
|
||||
// Check for pending update requests for this runtime.
|
||||
if pending := h.UpdateStore.PopPending(req.RuntimeID); pending != nil {
|
||||
resp["pending_update"] = map[string]string{
|
||||
|
||||
@@ -2,6 +2,8 @@ package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log/slog"
|
||||
@@ -22,6 +24,14 @@ import (
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
)
|
||||
|
||||
// randomID returns a random 16-byte hex string used as a request ID for
|
||||
// in-memory stores (model list, local skills, CLI update, etc.).
|
||||
func randomID() string {
|
||||
b := make([]byte, 16)
|
||||
rand.Read(b)
|
||||
return hex.EncodeToString(b)
|
||||
}
|
||||
|
||||
type txStarter interface {
|
||||
Begin(ctx context.Context) (pgx.Tx, error)
|
||||
}
|
||||
@@ -47,7 +57,6 @@ type Handler struct {
|
||||
TaskService *service.TaskService
|
||||
AutopilotService *service.AutopilotService
|
||||
EmailService *service.EmailService
|
||||
PingStore *PingStore
|
||||
UpdateStore *UpdateStore
|
||||
ModelListStore *ModelListStore
|
||||
LocalSkillListStore *RuntimeLocalSkillListStore
|
||||
@@ -78,7 +87,6 @@ func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *event
|
||||
TaskService: taskSvc,
|
||||
AutopilotService: service.NewAutopilotService(queries, txStarter, bus, taskSvc),
|
||||
EmailService: emailService,
|
||||
PingStore: NewPingStore(),
|
||||
UpdateStore: NewUpdateStore(),
|
||||
ModelListStore: NewModelListStore(),
|
||||
LocalSkillListStore: NewRuntimeLocalSkillListStore(),
|
||||
|
||||
@@ -15,8 +15,8 @@ import (
|
||||
// ---------------------------------------------------------------------------
|
||||
//
|
||||
// The server cannot call the daemon directly (the daemon is behind the user's
|
||||
// NAT and only polls the server). So "list models for this runtime" uses the
|
||||
// same pattern as PingStore: server creates a pending request, daemon pops it
|
||||
// NAT and only polls the server). So "list models for this runtime" uses a
|
||||
// pending-request pattern: server creates a pending request, daemon pops it
|
||||
// on the next heartbeat, executes locally, and reports the result back.
|
||||
|
||||
// ModelListStatus represents the lifecycle of a model list request.
|
||||
|
||||
@@ -1,207 +0,0 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// In-memory ping store
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// PingStatus represents the lifecycle of a runtime ping test.
|
||||
type PingStatus string
|
||||
|
||||
const (
|
||||
PingPending PingStatus = "pending"
|
||||
PingRunning PingStatus = "running"
|
||||
PingCompleted PingStatus = "completed"
|
||||
PingFailed PingStatus = "failed"
|
||||
PingTimeout PingStatus = "timeout"
|
||||
)
|
||||
|
||||
// PingRequest represents a pending or completed ping test.
|
||||
type PingRequest struct {
|
||||
ID string `json:"id"`
|
||||
RuntimeID string `json:"runtime_id"`
|
||||
Status PingStatus `json:"status"`
|
||||
Output string `json:"output,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
DurationMs int64 `json:"duration_ms,omitempty"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
}
|
||||
|
||||
// PingStore is a thread-safe in-memory store for ping requests.
|
||||
// Pings expire after 2 minutes.
|
||||
type PingStore struct {
|
||||
mu sync.Mutex
|
||||
pings map[string]*PingRequest // keyed by ping ID
|
||||
}
|
||||
|
||||
func NewPingStore() *PingStore {
|
||||
return &PingStore{
|
||||
pings: make(map[string]*PingRequest),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *PingStore) Create(runtimeID string) *PingRequest {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Clean up old pings for this runtime
|
||||
for id, p := range s.pings {
|
||||
if time.Since(p.CreatedAt) > 2*time.Minute {
|
||||
delete(s.pings, id)
|
||||
}
|
||||
}
|
||||
|
||||
ping := &PingRequest{
|
||||
ID: randomID(),
|
||||
RuntimeID: runtimeID,
|
||||
Status: PingPending,
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
s.pings[ping.ID] = ping
|
||||
return ping
|
||||
}
|
||||
|
||||
func (s *PingStore) Get(id string) *PingRequest {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
p, ok := s.pings[id]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
// Check for timeout
|
||||
if p.Status == PingPending && time.Since(p.CreatedAt) > 60*time.Second {
|
||||
p.Status = PingTimeout
|
||||
p.Error = "daemon did not respond within 60 seconds"
|
||||
p.UpdatedAt = time.Now()
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
// PopPending returns and removes the oldest pending ping for a runtime.
|
||||
func (s *PingStore) PopPending(runtimeID string) *PingRequest {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
var oldest *PingRequest
|
||||
for _, p := range s.pings {
|
||||
if p.RuntimeID == runtimeID && p.Status == PingPending {
|
||||
if oldest == nil || p.CreatedAt.Before(oldest.CreatedAt) {
|
||||
oldest = p
|
||||
}
|
||||
}
|
||||
}
|
||||
if oldest != nil {
|
||||
oldest.Status = PingRunning
|
||||
oldest.UpdatedAt = time.Now()
|
||||
}
|
||||
return oldest
|
||||
}
|
||||
|
||||
func (s *PingStore) Complete(id string, output string, durationMs int64) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if p, ok := s.pings[id]; ok {
|
||||
p.Status = PingCompleted
|
||||
p.Output = output
|
||||
p.DurationMs = durationMs
|
||||
p.UpdatedAt = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *PingStore) Fail(id string, errMsg string, durationMs int64) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if p, ok := s.pings[id]; ok {
|
||||
p.Status = PingFailed
|
||||
p.Error = errMsg
|
||||
p.DurationMs = durationMs
|
||||
p.UpdatedAt = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
func randomID() string {
|
||||
b := make([]byte, 16)
|
||||
rand.Read(b)
|
||||
return hex.EncodeToString(b)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Handlers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// InitiatePing creates a new ping request (protected route, called by frontend).
|
||||
func (h *Handler) InitiatePing(w http.ResponseWriter, r *http.Request) {
|
||||
runtimeID := chi.URLParam(r, "runtimeId")
|
||||
|
||||
rt, err := h.Queries.GetAgentRuntime(r.Context(), parseUUID(runtimeID))
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, "runtime not found")
|
||||
return
|
||||
}
|
||||
|
||||
if _, ok := h.requireWorkspaceMember(w, r, uuidToString(rt.WorkspaceID), "runtime not found"); !ok {
|
||||
return
|
||||
}
|
||||
|
||||
ping := h.PingStore.Create(runtimeID)
|
||||
writeJSON(w, http.StatusOK, ping)
|
||||
}
|
||||
|
||||
// GetPing returns the status of a ping request (protected route, called by frontend).
|
||||
func (h *Handler) GetPing(w http.ResponseWriter, r *http.Request) {
|
||||
pingID := chi.URLParam(r, "pingId")
|
||||
|
||||
ping := h.PingStore.Get(pingID)
|
||||
if ping == nil {
|
||||
writeError(w, http.StatusNotFound, "ping not found")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, ping)
|
||||
}
|
||||
|
||||
// ReportPingResult receives the ping result from the daemon.
|
||||
func (h *Handler) ReportPingResult(w http.ResponseWriter, r *http.Request) {
|
||||
runtimeID := chi.URLParam(r, "runtimeId")
|
||||
|
||||
// Verify the caller owns this runtime's workspace.
|
||||
if _, ok := h.requireDaemonRuntimeAccess(w, r, runtimeID); !ok {
|
||||
return
|
||||
}
|
||||
|
||||
pingID := chi.URLParam(r, "pingId")
|
||||
|
||||
var req struct {
|
||||
Status string `json:"status"` // "completed" or "failed"
|
||||
Output string `json:"output"`
|
||||
Error string `json:"error"`
|
||||
DurationMs int64 `json:"duration_ms"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
if req.Status == "completed" {
|
||||
h.PingStore.Complete(pingID, req.Output, req.DurationMs)
|
||||
} else {
|
||||
h.PingStore.Fail(pingID, req.Error, req.DurationMs)
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
}
|
||||
Reference in New Issue
Block a user