int: harden server lifecycle (#12835)

This should reduce zombie server processes left behind during integration test runs.
This commit is contained in:
Daniel Hiltgen
2025-10-29 11:50:56 -07:00
committed by GitHub
parent 05aff4a4f1
commit c88647104d
2 changed files with 78 additions and 90 deletions

View File

@@ -7,7 +7,7 @@ By default, these tests are disabled so `go test ./...` will exercise only unit
The integration tests have 2 modes of operating. The integration tests have 2 modes of operating.
1. By default, they will start the server on a random port, run the tests, and then shut down the server. 1. By default, on Unix systems, they will start the server on a random port, run the tests, and then shut down the server. On Windows you must ALWAYS run the server on `OLLAMA_HOST` for the tests to work.
2. If `OLLAMA_TEST_EXISTING` is set to a non-empty string, the tests will run against an existing running server, which can be remote based on your `OLLAMA_HOST` environment variable 2. If `OLLAMA_TEST_EXISTING` is set to a non-empty string, the tests will run against an existing running server, which can be remote based on your `OLLAMA_HOST` environment variable
> [!IMPORTANT] > [!IMPORTANT]

View File

@@ -323,7 +323,7 @@ func GetTestEndpoint() (*api.Client, string) {
} }
} }
if os.Getenv("OLLAMA_TEST_EXISTING") == "" && port == defaultPort { if os.Getenv("OLLAMA_TEST_EXISTING") == "" && runtime.GOOS != "windows" && port == defaultPort {
port = FindPort() port = FindPort()
} }
@@ -337,15 +337,20 @@ func GetTestEndpoint() (*api.Client, string) {
http.DefaultClient), fmt.Sprintf("%s:%s", host, port) http.DefaultClient), fmt.Sprintf("%s:%s", host, port)
} }
var serverMutex sync.Mutex // Server lifecycle management
var serverReady bool var (
var serverLogFile string serverMutex sync.Mutex
serverReady bool
serverLog bytes.Buffer
serverDone chan int
serverCmd *exec.Cmd
)
func startServer(t *testing.T, ctx context.Context, ollamaHost string) error { func startServer(t *testing.T, ctx context.Context, ollamaHost string) error {
// Make sure the server has been built // Make sure the server has been built
CLIName, err := filepath.Abs("../ollama") CLIName, err := filepath.Abs("../ollama")
if err != nil { if err != nil {
return err return fmt.Errorf("failed to get absolute path: %w", err)
} }
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
@@ -353,72 +358,42 @@ func startServer(t *testing.T, ctx context.Context, ollamaHost string) error {
} }
_, err = os.Stat(CLIName) _, err = os.Stat(CLIName)
if err != nil { if err != nil {
return fmt.Errorf("CLI missing, did you forget to build first? %w", err) return fmt.Errorf("CLI missing, did you forget to 'go build .' first? %w", err)
} }
serverMutex.Lock() serverMutex.Lock()
defer serverMutex.Unlock() defer serverMutex.Unlock()
if serverReady { if serverReady {
return nil return nil
} }
serverDone = make(chan int)
serverLog.Reset()
if tmp := os.Getenv("OLLAMA_HOST"); tmp != ollamaHost { if tmp := os.Getenv("OLLAMA_HOST"); tmp != ollamaHost {
slog.Info("setting env", "OLLAMA_HOST", ollamaHost) slog.Info("setting env", "OLLAMA_HOST", ollamaHost)
t.Setenv("OLLAMA_HOST", ollamaHost) t.Setenv("OLLAMA_HOST", ollamaHost)
} }
logDir := t.TempDir() serverCmd = exec.Command(CLIName, "serve")
serverCmd.Stderr = &serverLog
serverCmd.Stdout = &serverLog
go func() {
slog.Info("starting server", "url", ollamaHost) slog.Info("starting server", "url", ollamaHost)
done, err := SpawnServer(ctx, "../ollama", logDir) if err := serverCmd.Run(); err != nil {
if err != nil { // "signal: killed" expected during normal shutdown
return fmt.Errorf("failed to start server: %w", err)
}
go func() {
<-ctx.Done()
serverMutex.Lock()
defer serverMutex.Unlock()
exitCode := <-done
if exitCode > 0 {
slog.Warn("server failure", "exit", exitCode)
}
serverReady = false
}()
// TODO wait only long enough for the server to be responsive...
time.Sleep(500 * time.Millisecond)
serverReady = true
return nil
}
func SpawnServer(ctx context.Context, command, logDir string) (chan int, error) {
done := make(chan int)
fp, err := os.CreateTemp(logDir, "ollama-server-*.log")
if err != nil {
return nil, fmt.Errorf("failed to create log file: %w", err)
}
serverLogFile = fp.Name()
cmd := exec.CommandContext(ctx, command, "serve")
cmd.Stderr = fp
cmd.Stdout = fp
go func() {
slog.Info("starting server...")
if err := cmd.Run(); err != nil {
// "signal: killed" expected
if !strings.Contains(err.Error(), "signal") { if !strings.Contains(err.Error(), "signal") {
slog.Info("failed to run server", "error", err) slog.Info("failed to run server", "error", err)
} }
} }
var code int var code int
if cmd.ProcessState != nil { if serverCmd.ProcessState != nil {
code = cmd.ProcessState.ExitCode() code = serverCmd.ProcessState.ExitCode()
} }
slog.Info("server exited") slog.Info("server exited")
done <- code serverDone <- code
}() }()
return done, nil
serverReady = true
return nil
} }
func PullIfMissing(ctx context.Context, client *api.Client, modelName string) error { func PullIfMissing(ctx context.Context, client *api.Client, modelName string) error {
@@ -479,22 +454,53 @@ var serverProcMutex sync.Mutex
// Starts the server if needed // Starts the server if needed
func InitServerConnection(ctx context.Context, t *testing.T) (*api.Client, string, func()) { func InitServerConnection(ctx context.Context, t *testing.T) (*api.Client, string, func()) {
client, testEndpoint := GetTestEndpoint() client, testEndpoint := GetTestEndpoint()
if os.Getenv("OLLAMA_TEST_EXISTING") == "" { cleanup := func() {}
serverProcMutex.Lock() if os.Getenv("OLLAMA_TEST_EXISTING") == "" && runtime.GOOS != "windows" {
if err := startServer(t, ctx, testEndpoint); err != nil { var err error
err = startServer(t, ctx, testEndpoint)
if err != nil {
t.Fatal(err) t.Fatal(err)
} }
cleanup = func() {
serverMutex.Lock()
defer serverMutex.Unlock()
serverReady = false
slog.Info("shutting down server")
serverCmd.Process.Signal(os.Interrupt)
slog.Info("waiting for server to exit")
<-serverDone
slog.Info("terminate complete")
if t.Failed() {
slog.Warn("SERVER LOG FOLLOWS")
io.Copy(os.Stderr, &serverLog)
slog.Warn("END OF SERVER")
}
slog.Info("cleanup complete", "failed", t.Failed())
}
} }
// Make sure server is online and healthy before returning // Make sure server is online and healthy before returning
for {
select {
case <-ctx.Done():
t.Fatalf("context done before server ready: %v", ctx.Err())
break
default:
}
listCtx, cancel := context.WithDeadlineCause( listCtx, cancel := context.WithDeadlineCause(
ctx, ctx,
time.Now().Add(120*time.Second), time.Now().Add(10*time.Second),
fmt.Errorf("list models took too long"), fmt.Errorf("list models took too long"),
) )
defer cancel() defer cancel()
models, err := client.ListRunning(listCtx) models, err := client.ListRunning(listCtx)
if err != nil { if err != nil {
t.Fatal(err) if runtime.GOOS == "windows" {
t.Fatalf("did you forget to start the server: %v", err)
}
time.Sleep(10 * time.Millisecond)
continue
} }
if len(models.Models) > 0 { if len(models.Models) > 0 {
names := make([]string, len(models.Models)) names := make([]string, len(models.Models))
@@ -503,28 +509,10 @@ func InitServerConnection(ctx context.Context, t *testing.T) (*api.Client, strin
} }
slog.Info("currently loaded", "models", names) slog.Info("currently loaded", "models", names)
} }
break
}
return client, testEndpoint, func() { return client, testEndpoint, cleanup
if os.Getenv("OLLAMA_TEST_EXISTING") == "" {
defer serverProcMutex.Unlock()
if t.Failed() {
fp, err := os.Open(serverLogFile)
if err != nil {
slog.Error("failed to open server log", "logfile", serverLogFile, "error", err)
return
}
defer fp.Close()
data, err := io.ReadAll(fp)
if err != nil {
slog.Error("failed to read server log", "logfile", serverLogFile, "error", err)
return
}
slog.Warn("SERVER LOG FOLLOWS")
os.Stderr.Write(data)
slog.Warn("END OF SERVER")
}
}
}
} }
func ChatTestHelper(ctx context.Context, t *testing.T, req api.ChatRequest, anyResp []string) { func ChatTestHelper(ctx context.Context, t *testing.T, req api.ChatRequest, anyResp []string) {