From c88647104d1621b5021d6e3ca18fb8a1f5cdd394 Mon Sep 17 00:00:00 2001 From: Daniel Hiltgen Date: Wed, 29 Oct 2025 11:50:56 -0700 Subject: [PATCH] int: harden server lifecycle (#12835) this should reduce zombies during integration runs --- integration/README.md | 2 +- integration/utils_test.go | 166 ++++++++++++++++++-------------------- 2 files changed, 78 insertions(+), 90 deletions(-) diff --git a/integration/README.md b/integration/README.md index 1dfd0e359b..5d2acc456e 100644 --- a/integration/README.md +++ b/integration/README.md @@ -7,7 +7,7 @@ By default, these tests are disabled so `go test ./...` will exercise only unit The integration tests have 2 modes of operating. -1. By default, they will start the server on a random port, run the tests, and then shutdown the server. +1. By default, on Unix systems, they will start the server on a random port, run the tests, and then shutdown the server. On Windows you must ALWAYS run the server on OLLAMA_HOST for the tests to work. 2. If `OLLAMA_TEST_EXISTING` is set to a non-empty string, the tests will run against an existing running server, which can be remote based on your `OLLAMA_HOST` environment variable > [!IMPORTANT] diff --git a/integration/utils_test.go b/integration/utils_test.go index 66e8d73165..c0bac5e14f 100644 --- a/integration/utils_test.go +++ b/integration/utils_test.go @@ -323,7 +323,7 @@ func GetTestEndpoint() (*api.Client, string) { } } - if os.Getenv("OLLAMA_TEST_EXISTING") == "" && port == defaultPort { + if os.Getenv("OLLAMA_TEST_EXISTING") == "" && runtime.GOOS != "windows" && port == defaultPort { port = FindPort() } @@ -337,15 +337,20 @@ func GetTestEndpoint() (*api.Client, string) { http.DefaultClient), fmt.Sprintf("%s:%s", host, port) } -var serverMutex sync.Mutex -var serverReady bool -var serverLogFile string +// Server lifecycle management +var ( + serverMutex sync.Mutex + serverReady bool + serverLog bytes.Buffer + serverDone chan int + serverCmd *exec.Cmd +) func startServer(t *testing.T, ctx context.Context, ollamaHost string) error { // Make sure the server has been built CLIName, err := filepath.Abs("../ollama") if err != nil { - return err + return fmt.Errorf("failed to get absolute path: %w", err) } if runtime.GOOS == "windows" { @@ -353,72 +358,42 @@ func startServer(t *testing.T, ctx context.Context, ollamaHost string) error { } _, err = os.Stat(CLIName) if err != nil { - return fmt.Errorf("CLI missing, did you forget to build first? %w", err) + return fmt.Errorf("CLI missing, did you forget to 'go build .' first? %w", err) } serverMutex.Lock() defer serverMutex.Unlock() if serverReady { return nil } + serverDone = make(chan int) + serverLog.Reset() if tmp := os.Getenv("OLLAMA_HOST"); tmp != ollamaHost { slog.Info("setting env", "OLLAMA_HOST", ollamaHost) t.Setenv("OLLAMA_HOST", ollamaHost) } - logDir := t.TempDir() - slog.Info("starting server", "url", ollamaHost) - done, err := SpawnServer(ctx, "../ollama", logDir) - if err != nil { - return fmt.Errorf("failed to start server: %w", err) - } - + serverCmd = exec.Command(CLIName, "serve") + serverCmd.Stderr = &serverLog + serverCmd.Stdout = &serverLog go func() { - <-ctx.Done() - serverMutex.Lock() - defer serverMutex.Unlock() - exitCode := <-done - if exitCode > 0 { - slog.Warn("server failure", "exit", exitCode) - } - serverReady = false - }() - - // TODO wait only long enough for the server to be responsive... - time.Sleep(500 * time.Millisecond) - - serverReady = true - return nil -} - -func SpawnServer(ctx context.Context, command, logDir string) (chan int, error) { - done := make(chan int) - fp, err := os.CreateTemp(logDir, "ollama-server-*.log") - if err != nil { - return nil, fmt.Errorf("failed to create log file: %w", err) - } - serverLogFile = fp.Name() - - cmd := exec.CommandContext(ctx, command, "serve") - cmd.Stderr = fp - cmd.Stdout = fp - - go func() { - slog.Info("starting server...") - if err := cmd.Run(); err != nil { - // "signal: killed" expected + slog.Info("starting server", "url", ollamaHost) + if err := serverCmd.Run(); err != nil { + // "signal: killed" expected during normal shutdown if !strings.Contains(err.Error(), "signal") { slog.Info("failed to run server", "error", err) } } var code int - if cmd.ProcessState != nil { - code = cmd.ProcessState.ExitCode() + if serverCmd.ProcessState != nil { + code = serverCmd.ProcessState.ExitCode() } slog.Info("server exited") - done <- code + serverDone <- code }() - return done, nil + + serverReady = true + return nil } func PullIfMissing(ctx context.Context, client *api.Client, modelName string) error { @@ -479,52 +454,65 @@ var serverProcMutex sync.Mutex // Starts the server if needed func InitServerConnection(ctx context.Context, t *testing.T) (*api.Client, string, func()) { client, testEndpoint := GetTestEndpoint() - if os.Getenv("OLLAMA_TEST_EXISTING") == "" { - serverProcMutex.Lock() - if err := startServer(t, ctx, testEndpoint); err != nil { + cleanup := func() {} + if os.Getenv("OLLAMA_TEST_EXISTING") == "" && runtime.GOOS != "windows" { + var err error + err = startServer(t, ctx, testEndpoint) + if err != nil { t.Fatal(err) } + cleanup = func() { + serverMutex.Lock() + defer serverMutex.Unlock() + serverReady = false + + slog.Info("shutting down server") + serverCmd.Process.Signal(os.Interrupt) + slog.Info("waiting for server to exit") + <-serverDone + slog.Info("terminate complete") + + if t.Failed() { + slog.Warn("SERVER LOG FOLLOWS") + io.Copy(os.Stderr, &serverLog) + slog.Warn("END OF SERVER") + } + slog.Info("cleanup complete", "failed", t.Failed()) + } } // Make sure server is online and healthy before returning - listCtx, cancel := context.WithDeadlineCause( - ctx, - time.Now().Add(120*time.Second), - fmt.Errorf("list models took too long"), - ) - defer cancel() - models, err := client.ListRunning(listCtx) - if err != nil { - t.Fatal(err) - } - if len(models.Models) > 0 { - names := make([]string, len(models.Models)) - for i, m := range models.Models { - names[i] = m.Name + for { + select { + case <-ctx.Done(): + t.Fatalf("context done before server ready: %v", ctx.Err()) + break + default: } - slog.Info("currently loaded", "models", names) + listCtx, cancel := context.WithDeadlineCause( + ctx, + time.Now().Add(10*time.Second), + fmt.Errorf("list models took too long"), + ) + defer cancel() + models, err := client.ListRunning(listCtx) + if err != nil { + if runtime.GOOS == "windows" { + t.Fatalf("did you forget to start the server: %v", err) + } + time.Sleep(10 * time.Millisecond) + continue + } + if len(models.Models) > 0 { + names := make([]string, len(models.Models)) + for i, m := range models.Models { + names[i] = m.Name + } + slog.Info("currently loaded", "models", names) + } + break } - return client, testEndpoint, func() { - if os.Getenv("OLLAMA_TEST_EXISTING") == "" { - defer serverProcMutex.Unlock() - if t.Failed() { - fp, err := os.Open(serverLogFile) - if err != nil { - slog.Error("failed to open server log", "logfile", serverLogFile, "error", err) - return - } - defer fp.Close() - data, err := io.ReadAll(fp) - if err != nil { - slog.Error("failed to read server log", "logfile", serverLogFile, "error", err) - return - } - slog.Warn("SERVER LOG FOLLOWS") - os.Stderr.Write(data) - slog.Warn("END OF SERVER") - } - } - } + return client, testEndpoint, cleanup } func ChatTestHelper(ctx context.Context, t *testing.T, req api.ChatRequest, anyResp []string) {