diff --git a/integration/concurrency_test.go b/integration/concurrency_test.go
index efcd5410f6..cb44e900e2 100644
--- a/integration/concurrency_test.go
+++ b/integration/concurrency_test.go
@@ -109,6 +109,8 @@ func TestMultiModelStress(t *testing.T) {
     defer cancel()
     client, _, cleanup := InitServerConnection(ctx, t)
     defer cleanup()
+    initialTimeout := 120 * time.Second
+    streamTimeout := 20 * time.Second
 
     // Make sure all the models are pulled before we get started
     for _, model := range chosenModels {
@@ -147,6 +149,8 @@ chooseModels:
     for _, m := range models.Models {
         if m.SizeVRAM == 0 {
             slog.Info("model running on CPU", "name", m.Name, "target", targetLoadCount, "chosen", chosenModels[:targetLoadCount])
+            initialTimeout = 240 * time.Second
+            streamTimeout = 30 * time.Second
             break chooseModels
         }
     }
@@ -172,10 +176,7 @@ chooseModels:
                 k := r.Int() % len(reqs)
                 reqs[k].Model = chosenModels[i]
                 slog.Info("Starting", "model", reqs[k].Model, "iteration", j, "request", reqs[k].Messages[0].Content)
-                DoChat(ctx, t, client, reqs[k], resps[k],
-                    120*time.Second, // Be extra patient for the model to load initially
-                    10*time.Second, // Once results start streaming, fail if they stall
-                )
+                DoChat(ctx, t, client, reqs[k], resps[k], initialTimeout, streamTimeout)
             }
         }(i)
     }
diff --git a/integration/context_test.go b/integration/context_test.go
index c8a87160f6..1d2d655493 100644
--- a/integration/context_test.go
+++ b/integration/context_test.go
@@ -78,7 +78,7 @@ func TestContextExhaustion(t *testing.T) {
 
 // Send multiple generate requests with prior context and ensure the response is coherant and expected
 func TestParallelGenerateWithHistory(t *testing.T) {
-    modelOverride := "gpt-oss:20b"
+    modelName := "gpt-oss:20b"
     req, resp := GenerateRequests()
     numParallel := 2
     iterLimit := 2
@@ -88,15 +88,23 @@ func TestParallelGenerateWithHistory(t *testing.T) {
     defer cancel()
     client, _, cleanup := InitServerConnection(ctx, t)
     defer cleanup()
+    initialTimeout := 120 * time.Second
+    streamTimeout := 20 * time.Second
 
     // Get the server running (if applicable) warm the model up with a single initial request
-    slog.Info("loading", "model", modelOverride)
+    slog.Info("loading", "model", modelName)
     err := client.Generate(ctx,
-        &api.GenerateRequest{Model: modelOverride, KeepAlive: &api.Duration{Duration: 10 * time.Second}},
+        &api.GenerateRequest{Model: modelName, KeepAlive: &api.Duration{Duration: 10 * time.Second}},
         func(response api.GenerateResponse) error { return nil },
     )
     if err != nil {
-        t.Fatalf("failed to load model %s: %s", modelOverride, err)
+        t.Fatalf("failed to load model %s: %s", modelName, err)
+    }
+    gpuPercent := getGPUPercent(ctx, t, client, modelName)
+    if gpuPercent < 80 {
+        slog.Warn("Low GPU percentage - increasing timeouts", "percent", gpuPercent)
+        initialTimeout = 240 * time.Second
+        streamTimeout = 30 * time.Second
     }
 
     var wg sync.WaitGroup
@@ -105,7 +113,7 @@ func TestParallelGenerateWithHistory(t *testing.T) {
         go func(i int) {
             defer wg.Done()
             k := i % len(req)
-            req[k].Model = modelOverride
+            req[k].Model = modelName
             for j := 0; j < iterLimit; j++ {
                 if time.Now().Sub(started) > softTimeout {
                     slog.Info("exceeded soft timeout, winding down test")
@@ -114,7 +122,7 @@ func TestParallelGenerateWithHistory(t *testing.T) {
                 slog.Info("Starting", "thread", i, "iter", j)
                 // On slower GPUs it can take a while to process the concurrent requests
                 // so we allow a much longer initial timeout
-                c := DoGenerate(ctx, t, client, req[k], resp[k], 120*time.Second, 20*time.Second)
+                c := DoGenerate(ctx, t, client, req[k], resp[k], initialTimeout, streamTimeout)
                 req[k].Context = c
                 req[k].Prompt = "tell me more!"
             }
@@ -165,7 +173,7 @@ func TestGenerateWithHistory(t *testing.T) {
 
 // Send multiple chat requests with prior context and ensure the response is coherant and expected
 func TestParallelChatWithHistory(t *testing.T) {
-    modelOverride := "gpt-oss:20b"
+    modelName := "gpt-oss:20b"
     req, resp := ChatRequests()
     numParallel := 2
     iterLimit := 2
@@ -175,15 +183,23 @@ func TestParallelChatWithHistory(t *testing.T) {
     defer cancel()
     client, _, cleanup := InitServerConnection(ctx, t)
     defer cleanup()
+    initialTimeout := 120 * time.Second
+    streamTimeout := 20 * time.Second
 
     // Get the server running (if applicable) warm the model up with a single initial empty request
-    slog.Info("loading", "model", modelOverride)
+    slog.Info("loading", "model", modelName)
     err := client.Generate(ctx,
-        &api.GenerateRequest{Model: modelOverride, KeepAlive: &api.Duration{Duration: 10 * time.Second}},
+        &api.GenerateRequest{Model: modelName, KeepAlive: &api.Duration{Duration: 10 * time.Second}},
         func(response api.GenerateResponse) error { return nil },
     )
     if err != nil {
-        t.Fatalf("failed to load model %s: %s", modelOverride, err)
+        t.Fatalf("failed to load model %s: %s", modelName, err)
+    }
+    gpuPercent := getGPUPercent(ctx, t, client, modelName)
+    if gpuPercent < 80 {
+        slog.Warn("Low GPU percentage - increasing timeouts", "percent", gpuPercent)
+        initialTimeout = 240 * time.Second
+        streamTimeout = 30 * time.Second
     }
 
     var wg sync.WaitGroup
@@ -192,7 +208,7 @@ func TestParallelChatWithHistory(t *testing.T) {
         go func(i int) {
             defer wg.Done()
             k := i % len(req)
-            req[k].Model = modelOverride
+            req[k].Model = modelName
             for j := 0; j < iterLimit; j++ {
                 if time.Now().Sub(started) > softTimeout {
                     slog.Info("exceeded soft timeout, winding down test")
@@ -201,7 +217,7 @@ func TestParallelChatWithHistory(t *testing.T) {
                 slog.Info("Starting", "thread", i, "iter", j)
                 // On slower GPUs it can take a while to process the concurrent requests
                 // so we allow a much longer initial timeout
-                assistant := DoChat(ctx, t, client, req[k], resp[k], 120*time.Second, 20*time.Second)
+                assistant := DoChat(ctx, t, client, req[k], resp[k], initialTimeout, streamTimeout)
                 if assistant == nil {
                     t.Fatalf("didn't get an assistant response for context")
                 }
diff --git a/integration/model_arch_test.go b/integration/model_arch_test.go
index 88ef7eefd3..b09b6773c7 100644
--- a/integration/model_arch_test.go
+++ b/integration/model_arch_test.go
@@ -65,6 +65,23 @@ func TestModelsChat(t *testing.T) {
                     }
                 }
             }
+            initialTimeout := 120 * time.Second
+            streamTimeout := 30 * time.Second
+            slog.Info("loading", "model", model)
+            err := client.Generate(ctx,
+                &api.GenerateRequest{Model: model, KeepAlive: &api.Duration{Duration: 10 * time.Second}},
+                func(response api.GenerateResponse) error { return nil },
+            )
+            if err != nil {
+                t.Fatalf("failed to load model %s: %s", model, err)
+            }
+            gpuPercent := getGPUPercent(ctx, t, client, model)
+            if gpuPercent < 80 {
+                slog.Warn("Low GPU percentage - increasing timeouts", "percent", gpuPercent)
+                initialTimeout = 240 * time.Second
+                streamTimeout = 40 * time.Second
+            }
+
             // TODO - fiddle with context size
             req := api.ChatRequest{
                 Model: model,
@@ -80,7 +97,7 @@ func TestModelsChat(t *testing.T) {
                     "seed": 123,
                 },
             }
-            DoChat(ctx, t, client, req, blueSkyExpected, 120*time.Second, 30*time.Second)
+            DoChat(ctx, t, client, req, blueSkyExpected, initialTimeout, streamTimeout)
 
             // best effort unload once we're done with the model
             client.Generate(ctx, &api.GenerateRequest{Model: req.Model, KeepAlive: &api.Duration{Duration: 0}}, func(rsp api.GenerateResponse) error { return nil })
         })
diff --git a/integration/utils_test.go b/integration/utils_test.go
index bff549330e..c438aa9306 100644
--- a/integration/utils_test.go
+++ b/integration/utils_test.go
@@ -743,6 +743,13 @@ func skipUnderMinVRAM(t *testing.T, gb uint64) {
 
 // Skip if the target model isn't X% GPU loaded to avoid excessive runtime
 func skipIfNotGPULoaded(ctx context.Context, t *testing.T, client *api.Client, model string, minPercent int) {
+    gpuPercent := getGPUPercent(ctx, t, client, model)
+    if gpuPercent < minPercent {
+        t.Skip(fmt.Sprintf("test requires minimum %d%% GPU load, but model %s only has %d%%", minPercent, model, gpuPercent))
+    }
+}
+
+func getGPUPercent(ctx context.Context, t *testing.T, client *api.Client, model string) int {
     models, err := client.ListRunning(ctx)
     if err != nil {
         t.Fatalf("failed to list running models: %s", err)
@@ -772,12 +779,10 @@ func skipIfNotGPULoaded(ctx context.Context, t *testing.T, client *api.Client, m
             cpuPercent := math.Round(float64(sizeCPU) / float64(m.Size) * 100)
             gpuPercent = int(100 - cpuPercent)
         }
-        if gpuPercent < minPercent {
-            t.Skip(fmt.Sprintf("test requires minimum %d%% GPU load, but model %s only has %d%%", minPercent, model, gpuPercent))
-        }
-        return
+        return gpuPercent
     }
-    t.Skip(fmt.Sprintf("model %s not loaded - actually loaded: %v", model, loaded))
+    t.Fatalf("model %s not loaded - actually loaded: %v", model, loaded)
+    return 0
 }
 
 func getTimeouts(t *testing.T) (soft time.Duration, hard time.Duration) {
diff --git a/server/routes_debug_test.go b/server/routes_debug_test.go
index c27afdb4a4..466951a1d5 100644
--- a/server/routes_debug_test.go
+++ b/server/routes_debug_test.go
@@ -30,15 +30,15 @@ func TestGenerateDebugRenderOnly(t *testing.T) {
 
     s := Server{
         sched: &Scheduler{
-            pendingReqCh:  make(chan *LlmRequest, 1),
-            finishedReqCh: make(chan *LlmRequest, 1),
-            expiredCh:     make(chan *runnerRef, 1),
-            unloadedCh:    make(chan any, 1),
-            loaded:        make(map[string]*runnerRef),
-            newServerFn:   newMockServer(&mock),
-            getGpuFn:      getGpuFn,
-            getCpuFn:      getCpuFn,
-            reschedDelay:  250 * time.Millisecond,
+            pendingReqCh:    make(chan *LlmRequest, 1),
+            finishedReqCh:   make(chan *LlmRequest, 1),
+            expiredCh:       make(chan *runnerRef, 1),
+            unloadedCh:      make(chan any, 1),
+            loaded:          make(map[string]*runnerRef),
+            newServerFn:     newMockServer(&mock),
+            getGpuFn:        getGpuFn,
+            getCpuFn:        getCpuFn,
+            waitForRecovery: 250 * time.Millisecond,
             loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
                 // add small delay to simulate loading
                 time.Sleep(time.Millisecond)
@@ -223,15 +223,15 @@ func TestChatDebugRenderOnly(t *testing.T) {
 
     s := Server{
         sched: &Scheduler{
-            pendingReqCh:  make(chan *LlmRequest, 1),
-            finishedReqCh: make(chan *LlmRequest, 1),
-            expiredCh:     make(chan *runnerRef, 1),
-            unloadedCh:    make(chan any, 1),
-            loaded:        make(map[string]*runnerRef),
-            newServerFn:   newMockServer(&mock),
-            getGpuFn:      getGpuFn,
-            getCpuFn:      getCpuFn,
-            reschedDelay:  250 * time.Millisecond,
+            pendingReqCh:    make(chan *LlmRequest, 1),
+            finishedReqCh:   make(chan *LlmRequest, 1),
+            expiredCh:       make(chan *runnerRef, 1),
+            unloadedCh:      make(chan any, 1),
+            loaded:          make(map[string]*runnerRef),
+            newServerFn:     newMockServer(&mock),
+            getGpuFn:        getGpuFn,
+            getCpuFn:        getCpuFn,
+            waitForRecovery: 250 * time.Millisecond,
             loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
                 // add small delay to simulate loading
                 time.Sleep(time.Millisecond)
diff --git a/server/routes_generate_renderer_test.go b/server/routes_generate_renderer_test.go
index b280228630..ea18b1e55a 100644
--- a/server/routes_generate_renderer_test.go
+++ b/server/routes_generate_renderer_test.go
@@ -35,15 +35,15 @@ func TestGenerateWithBuiltinRenderer(t *testing.T) {
 
     s := Server{
         sched: &Scheduler{
-            pendingReqCh:  make(chan *LlmRequest, 1),
-            finishedReqCh: make(chan *LlmRequest, 1),
-            expiredCh:     make(chan *runnerRef, 1),
-            unloadedCh:    make(chan any, 1),
-            loaded:        make(map[string]*runnerRef),
-            newServerFn:   newMockServer(&mock),
-            getGpuFn:      getGpuFn,
-            getCpuFn:      getCpuFn,
-            reschedDelay:  250 * time.Millisecond,
+            pendingReqCh:    make(chan *LlmRequest, 1),
+            finishedReqCh:   make(chan *LlmRequest, 1),
+            expiredCh:       make(chan *runnerRef, 1),
+            unloadedCh:      make(chan any, 1),
+            loaded:          make(map[string]*runnerRef),
+            newServerFn:     newMockServer(&mock),
+            getGpuFn:        getGpuFn,
+            getCpuFn:        getCpuFn,
+            waitForRecovery: 250 * time.Millisecond,
             loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
                 time.Sleep(time.Millisecond)
                 req.successCh <- &runnerRef{
@@ -219,15 +219,15 @@ func TestGenerateWithDebugRenderOnly(t *testing.T) {
 
     s := Server{
         sched: &Scheduler{
-            pendingReqCh:  make(chan *LlmRequest, 1),
-            finishedReqCh: make(chan *LlmRequest, 1),
-            expiredCh:     make(chan *runnerRef, 1),
-            unloadedCh:    make(chan any, 1),
-            loaded:        make(map[string]*runnerRef),
-            newServerFn:   newMockServer(&mock),
-            getGpuFn:      getGpuFn,
-            getCpuFn:      getCpuFn,
-            reschedDelay:  250 * time.Millisecond,
+            pendingReqCh:    make(chan *LlmRequest, 1),
+            finishedReqCh:   make(chan *LlmRequest, 1),
+            expiredCh:       make(chan *runnerRef, 1),
+            unloadedCh:      make(chan any, 1),
+            loaded:          make(map[string]*runnerRef),
+            newServerFn:     newMockServer(&mock),
+            getGpuFn:        getGpuFn,
+            getCpuFn:        getCpuFn,
+            waitForRecovery: 250 * time.Millisecond,
             loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
                 time.Sleep(time.Millisecond)
                 req.successCh <- &runnerRef{
diff --git a/server/routes_generate_test.go b/server/routes_generate_test.go
index a86a70ba50..75d4f012ee 100644
--- a/server/routes_generate_test.go
+++ b/server/routes_generate_test.go
@@ -68,15 +68,15 @@ func TestGenerateChat(t *testing.T) {
 
     s := Server{
         sched: &Scheduler{
-            pendingReqCh:  make(chan *LlmRequest, 1),
-            finishedReqCh: make(chan *LlmRequest, 1),
-            expiredCh:     make(chan *runnerRef, 1),
-            unloadedCh:    make(chan any, 1),
-            loaded:        make(map[string]*runnerRef),
-            newServerFn:   newMockServer(&mock),
-            getGpuFn:      getGpuFn,
-            getCpuFn:      getCpuFn,
-            reschedDelay:  250 * time.Millisecond,
+            pendingReqCh:    make(chan *LlmRequest, 1),
+            finishedReqCh:   make(chan *LlmRequest, 1),
+            expiredCh:       make(chan *runnerRef, 1),
+            unloadedCh:      make(chan any, 1),
+            loaded:          make(map[string]*runnerRef),
+            newServerFn:     newMockServer(&mock),
+            getGpuFn:        getGpuFn,
+            getCpuFn:        getCpuFn,
+            waitForRecovery: 250 * time.Millisecond,
             loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
                 // add small delay to simulate loading
                 time.Sleep(time.Millisecond)
@@ -679,15 +679,15 @@ func TestGenerate(t *testing.T) {
 
     s := Server{
         sched: &Scheduler{
-            pendingReqCh:  make(chan *LlmRequest, 1),
-            finishedReqCh: make(chan *LlmRequest, 1),
-            expiredCh:     make(chan *runnerRef, 1),
-            unloadedCh:    make(chan any, 1),
-            loaded:        make(map[string]*runnerRef),
-            newServerFn:   newMockServer(&mock),
-            getGpuFn:      getGpuFn,
-            getCpuFn:      getCpuFn,
-            reschedDelay:  250 * time.Millisecond,
+            pendingReqCh:    make(chan *LlmRequest, 1),
+            finishedReqCh:   make(chan *LlmRequest, 1),
+            expiredCh:       make(chan *runnerRef, 1),
+            unloadedCh:      make(chan any, 1),
+            loaded:          make(map[string]*runnerRef),
+            newServerFn:     newMockServer(&mock),
+            getGpuFn:        getGpuFn,
+            getCpuFn:        getCpuFn,
+            waitForRecovery: 250 * time.Millisecond,
             loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
                 // add small delay to simulate loading
                 time.Sleep(time.Millisecond)
@@ -1104,15 +1104,15 @@ func TestChatWithPromptEndingInThinkTag(t *testing.T) {
 
     s := &Server{
         sched: &Scheduler{
-            pendingReqCh:  make(chan *LlmRequest, 1),
-            finishedReqCh: make(chan *LlmRequest, 1),
-            expiredCh:     make(chan *runnerRef, 1),
-            unloadedCh:    make(chan any, 1),
-            loaded:        make(map[string]*runnerRef),
-            newServerFn:   newMockServer(mock),
-            getGpuFn:      getGpuFn,
-            getCpuFn:      getCpuFn,
-            reschedDelay:  250 * time.Millisecond,
+            pendingReqCh:    make(chan *LlmRequest, 1),
+            finishedReqCh:   make(chan *LlmRequest, 1),
+            expiredCh:       make(chan *runnerRef, 1),
+            unloadedCh:      make(chan any, 1),
+            loaded:          make(map[string]*runnerRef),
+            newServerFn:     newMockServer(mock),
+            getGpuFn:        getGpuFn,
+            getCpuFn:        getCpuFn,
+            waitForRecovery: 250 * time.Millisecond,
             loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
                 time.Sleep(time.Millisecond)
                 req.successCh <- &runnerRef{llama: mock}
diff --git a/server/routes_harmony_streaming_test.go b/server/routes_harmony_streaming_test.go
index caadcb8728..caf2cf6d5b 100644
--- a/server/routes_harmony_streaming_test.go
+++ b/server/routes_harmony_streaming_test.go
@@ -268,15 +268,15 @@ func TestChatHarmonyParserStreamingRealtime(t *testing.T) {
 
     s := Server{
         sched: &Scheduler{
-            pendingReqCh:  make(chan *LlmRequest, 1),
-            finishedReqCh: make(chan *LlmRequest, 1),
-            expiredCh:     make(chan *runnerRef, 1),
-            unloadedCh:    make(chan any, 1),
-            loaded:        make(map[string]*runnerRef),
-            newServerFn:   newMockServer(&mock),
-            getGpuFn:      getGpuFn,
-            getCpuFn:      getCpuFn,
-            reschedDelay:  100 * time.Millisecond,
+            pendingReqCh:    make(chan *LlmRequest, 1),
+            finishedReqCh:   make(chan *LlmRequest, 1),
+            expiredCh:       make(chan *runnerRef, 1),
+            unloadedCh:      make(chan any, 1),
+            loaded:          make(map[string]*runnerRef),
+            newServerFn:     newMockServer(&mock),
+            getGpuFn:        getGpuFn,
+            getCpuFn:        getCpuFn,
+            waitForRecovery: 100 * time.Millisecond,
             loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
                 req.successCh <- &runnerRef{
                     llama: &mock,
@@ -419,15 +419,15 @@ func TestChatHarmonyParserStreamingSimple(t *testing.T) {
 
     s := Server{
         sched: &Scheduler{
-            pendingReqCh:  make(chan *LlmRequest, 1),
-            finishedReqCh: make(chan *LlmRequest, 1),
-            expiredCh:     make(chan *runnerRef, 1),
-            unloadedCh:    make(chan any, 1),
-            loaded:        make(map[string]*runnerRef),
-            newServerFn:   newMockServer(&mock),
-            getGpuFn:      getGpuFn,
-            getCpuFn:      getCpuFn,
-            reschedDelay:  100 * time.Millisecond,
+            pendingReqCh:    make(chan *LlmRequest, 1),
+            finishedReqCh:   make(chan *LlmRequest, 1),
+            expiredCh:       make(chan *runnerRef, 1),
+            unloadedCh:      make(chan any, 1),
+            loaded:          make(map[string]*runnerRef),
+            newServerFn:     newMockServer(&mock),
+            getGpuFn:        getGpuFn,
+            getCpuFn:        getCpuFn,
+            waitForRecovery: 100 * time.Millisecond,
             loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
                 req.successCh <- &runnerRef{
                     llama: &mock,
@@ -601,15 +601,15 @@ func TestChatHarmonyParserStreaming(t *testing.T) {
 
     s := Server{
         sched: &Scheduler{
-            pendingReqCh:  make(chan *LlmRequest, 1),
-            finishedReqCh: make(chan *LlmRequest, 1),
-            expiredCh:     make(chan *runnerRef, 1),
-            unloadedCh:    make(chan any, 1),
-            loaded:        make(map[string]*runnerRef),
-            newServerFn:   newMockServer(&mock),
-            getGpuFn:      getGpuFn,
-            getCpuFn:      getCpuFn,
-            reschedDelay:  250 * time.Millisecond,
+            pendingReqCh:    make(chan *LlmRequest, 1),
+            finishedReqCh:   make(chan *LlmRequest, 1),
+            expiredCh:       make(chan *runnerRef, 1),
+            unloadedCh:      make(chan any, 1),
+            loaded:          make(map[string]*runnerRef),
+            newServerFn:     newMockServer(&mock),
+            getGpuFn:        getGpuFn,
+            getCpuFn:        getCpuFn,
+            waitForRecovery: 250 * time.Millisecond,
             loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
                 req.successCh <- &runnerRef{
                     llama: &mock,
diff --git a/server/sched.go b/server/sched.go
index ac64600475..7c63995310 100644
--- a/server/sched.go
+++ b/server/sched.go
@@ -52,11 +52,13 @@ type Scheduler struct {
     activeLoading llm.LlamaServer
     loaded        map[string]*runnerRef
 
-    loadFn       func(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoList, requireFull bool) bool
-    newServerFn  func(gpus discover.GpuInfoList, model string, f *ggml.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error)
-    getGpuFn     func(ctx context.Context, runners []discover.FilteredRunnerDiscovery) discover.GpuInfoList
-    getCpuFn     func() discover.GpuInfo
-    reschedDelay time.Duration
+    loadFn      func(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoList, requireFull bool) bool
+    newServerFn func(gpus discover.GpuInfoList, model string, f *ggml.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error)
+    getGpuFn    func(ctx context.Context, runners []discover.FilteredRunnerDiscovery) discover.GpuInfoList
+    getCpuFn    func() discover.GpuInfo
+
+    // waitForRecovery sets the limit for how long to wait for memory usage to recover after unload before scheduling the next model
+    waitForRecovery time.Duration
 }
 
 // Default automatic value for number of models we allow per GPU
@@ -69,15 +71,15 @@ var ErrMaxQueue = errors.New("server busy, please try again. maximum pending re
 func InitScheduler(ctx context.Context) *Scheduler {
     maxQueue := envconfig.MaxQueue()
     sched := &Scheduler{
-        pendingReqCh:  make(chan *LlmRequest, maxQueue),
-        finishedReqCh: make(chan *LlmRequest, maxQueue),
-        expiredCh:     make(chan *runnerRef, maxQueue),
-        unloadedCh:    make(chan any, maxQueue),
-        loaded:        make(map[string]*runnerRef),
-        newServerFn:   llm.NewLlamaServer,
-        getGpuFn:      discover.GetGPUInfo,
-        getCpuFn:      discover.GetCPUInfo,
-        reschedDelay:  250 * time.Millisecond,
+        pendingReqCh:    make(chan *LlmRequest, maxQueue),
+        finishedReqCh:   make(chan *LlmRequest, maxQueue),
+        expiredCh:       make(chan *runnerRef, maxQueue),
+        unloadedCh:      make(chan any, maxQueue),
+        loaded:          make(map[string]*runnerRef),
+        newServerFn:     llm.NewLlamaServer,
+        getGpuFn:        discover.GetGPUInfo,
+        getCpuFn:        discover.GetCPUInfo,
+        waitForRecovery: 5 * time.Second,
     }
     sched.loadFn = sched.load
     return sched
@@ -650,8 +652,8 @@ func (s *Scheduler) waitForVRAMRecovery(runner *runnerRef, runners []discover.Fi
 
     freeMemoryNow := freeMemoryBefore
     go func() {
-        // typical convergence is 0.5-1.5s - If it takes more than 5 seconds to discover and converge, let the scheduler estimate VRAM usage
-        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+        // typical convergence is 0.5-1.5s - If it takes too long to discover and converge, let the scheduler estimate VRAM usage
+        ctx, cancel := context.WithTimeout(context.Background(), s.waitForRecovery)
         defer cancel()
         ticker := time.NewTicker(250 * time.Millisecond)
         defer ticker.Stop()
diff --git a/server/sched_test.go b/server/sched_test.go
index fd6309e33f..66d43338e3 100644
--- a/server/sched_test.go
+++ b/server/sched_test.go
@@ -26,7 +26,7 @@ func TestMain(m *testing.M) {
     os.Exit(m.Run())
 }
 
-func TestInitScheduler(t *testing.T) {
+func TestSchedInit(t *testing.T) {
     ctx, done := context.WithCancel(t.Context())
     defer done()
     s := InitScheduler(ctx)
@@ -35,10 +35,11 @@ func TestInitScheduler(t *testing.T) {
     s.loadedMu.Unlock()
 }
 
-func TestLoad(t *testing.T) {
+func TestSchedLoad(t *testing.T) {
     ctx, done := context.WithTimeout(t.Context(), 20*time.Millisecond)
     defer done()
     s := InitScheduler(ctx)
+    s.waitForRecovery = 10 * time.Millisecond
    var f *ggml.GGML // value not used in tests
     req := &LlmRequest{
         ctx: ctx,
@@ -167,10 +168,11 @@ func getCpuFn() discover.GpuInfo {
     return g
 }
 
-func TestRequestsSameModelSameRequest(t *testing.T) {
+func TestSchedRequestsSameModelSameRequest(t *testing.T) {
     ctx, done := context.WithTimeout(t.Context(), 500*time.Millisecond)
     defer done()
     s := InitScheduler(ctx)
+    s.waitForRecovery = 10 * time.Millisecond
     s.getGpuFn = getGpuFn
     s.getCpuFn = getCpuFn
     a := newScenarioRequest(t, ctx, "ollama-model-1", 10, &api.Duration{Duration: 5 * time.Millisecond}, nil)
@@ -210,10 +212,11 @@ func TestRequestsSameModelSameRequest(t *testing.T) {
     }
 }
 
-func TestRequestsSimpleReloadSameModel(t *testing.T) {
+func TestSchedRequestsSimpleReloadSameModel(t *testing.T) {
     ctx, done := context.WithTimeout(t.Context(), 5000*time.Millisecond)
     defer done()
     s := InitScheduler(ctx)
+    s.waitForRecovery = 10 * time.Millisecond
     s.getGpuFn = getGpuFn
     s.getCpuFn = getCpuFn
     a := newScenarioRequest(t, ctx, "ollama-model-1", 10, &api.Duration{Duration: 5 * time.Millisecond}, nil)
@@ -267,10 +270,11 @@ func TestRequestsSimpleReloadSameModel(t *testing.T) {
     }
 }
 
-func TestRequestsMultipleLoadedModels(t *testing.T) {
+func TestSchedRequestsMultipleLoadedModels(t *testing.T) {
     ctx, done := context.WithTimeout(t.Context(), 500*time.Millisecond)
     defer done()
     s := InitScheduler(ctx)
+    s.waitForRecovery = 10 * time.Millisecond
     s.getGpuFn = getGpuFn // 1 metal GPU
     s.getCpuFn = getCpuFn // 1 CPU
 
@@ -389,7 +393,7 @@ closeWait:
     s.loadedMu.Unlock()
 }
 
-func TestGetRunner(t *testing.T) {
+func TestSchedGetRunner(t *testing.T) {
     ctx, done := context.WithTimeout(t.Context(), 3*time.Second)
     defer done()
 
@@ -398,6 +402,7 @@ func TestGetRunner(t *testing.T) {
     c := newScenarioRequest(t, ctx, "ollama-model-1c", 10, &api.Duration{Duration: 2 * time.Millisecond}, nil)
     t.Setenv("OLLAMA_MAX_QUEUE", "1")
     s := InitScheduler(ctx)
+    s.waitForRecovery = 10 * time.Millisecond
     s.getGpuFn = getGpuFn
     s.getCpuFn = getCpuFn
     s.newServerFn = a.newServer
@@ -442,10 +447,11 @@ func TestGetRunner(t *testing.T) {
     b.ctxDone()
 }
 
-func TestExpireRunner(t *testing.T) {
+func TestSchedExpireRunner(t *testing.T) {
     ctx, done := context.WithTimeout(t.Context(), 20*time.Millisecond)
     defer done()
     s := InitScheduler(ctx)
+    s.waitForRecovery = 10 * time.Millisecond
     req := &LlmRequest{
         ctx: ctx,
         model: &Model{ModelPath: "foo"},
@@ -490,13 +496,14 @@ func TestExpireRunner(t *testing.T) {
 }
 
 // TODO - add one scenario that triggers the bogus finished event with positive ref count
-func TestPrematureExpired(t *testing.T) {
+func TestSchedPrematureExpired(t *testing.T) {
     ctx, done := context.WithTimeout(t.Context(), 500*time.Millisecond)
     defer done()
 
     // Same model, same request
     scenario1a := newScenarioRequest(t, ctx, "ollama-model-1a", 10, nil, nil)
     s := InitScheduler(ctx)
+    s.waitForRecovery = 10 * time.Millisecond
     s.getGpuFn = func(ctx context.Context, runners []discover.FilteredRunnerDiscovery) discover.GpuInfoList {
         g := discover.GpuInfo{DeviceID: ml.DeviceID{Library: "metal"}}
         g.TotalMemory = 24 * format.GigaByte
@@ -537,7 +544,7 @@ func TestPrematureExpired(t *testing.T) {
     time.Sleep(5 * time.Millisecond)
 }
 
-func TestUseLoadedRunner(t *testing.T) {
+func TestSchedUseLoadedRunner(t *testing.T) {
     ctx, done := context.WithTimeout(t.Context(), 100*time.Millisecond)
     req := &LlmRequest{
         ctx: ctx,
@@ -564,7 +571,7 @@ func TestUseLoadedRunner(t *testing.T) {
     require.Equal(t, req, fin)
 }
 
-func TestUpdateFreeSpace(t *testing.T) {
+func TestSchedUpdateFreeSpace(t *testing.T) {
     ctx, done := context.WithTimeout(t.Context(), 100*time.Millisecond)
     defer done()
     gpus := discover.GpuInfoList{
@@ -597,6 +604,7 @@ func TestUpdateFreeSpace(t *testing.T) {
     r2 := &runnerRef{llama: llm2, gpus: gpuIDs, numParallel: 1}
 
     s := InitScheduler(ctx)
+    s.waitForRecovery = 10 * time.Millisecond
     s.loadedMu.Lock()
     s.loaded["a"] = r1
     s.loaded["b"] = r2
@@ -607,7 +615,7 @@ func TestUpdateFreeSpace(t *testing.T) {
     require.Equal(t, uint64(2000-50-75), gpus[1].FreeMemory)
 }
 
-func TestFindRunnerToUnload(t *testing.T) {
+func TestSchedFindRunnerToUnload(t *testing.T) {
     ctx, done := context.WithTimeout(t.Context(), 100*time.Millisecond)
     defer done()
 
@@ -615,6 +623,7 @@ func TestFindRunnerToUnload(t *testing.T) {
     r2 := &runnerRef{sessionDuration: 2, numParallel: 1}
 
     s := InitScheduler(ctx)
+    s.waitForRecovery = 10 * time.Millisecond
     s.loadedMu.Lock()
     s.loaded["a"] = r1
     s.loaded["b"] = r2
@@ -627,7 +636,7 @@ func TestFindRunnerToUnload(t *testing.T) {
     require.Equal(t, r1, resp)
 }
 
-func TestNeedsReload(t *testing.T) {
+func TestSchedNeedsReload(t *testing.T) {
     ctx, done := context.WithTimeout(t.Context(), 100*time.Millisecond)
     defer done()
 
@@ -674,13 +683,14 @@ func TestNeedsReload(t *testing.T) {
     require.False(t, resp)
 }
 
-func TestUnloadAllRunners(t *testing.T) {
+func TestSchedUnloadAllRunners(t *testing.T) {
     ctx, done := context.WithTimeout(t.Context(), 100*time.Millisecond)
     defer done()
 
     llm1 := &mockLlm{vramByGPU: map[ml.DeviceID]uint64{}}
     llm2 := &mockLlm{vramByGPU: map[ml.DeviceID]uint64{}}
     s := InitScheduler(ctx)
+    s.waitForRecovery = 10 * time.Millisecond
     s.unloadAllRunners()
 
     r1 := &runnerRef{llama: llm1, numParallel: 1}
@@ -696,7 +706,7 @@ func TestUnloadAllRunners(t *testing.T) {
     require.True(t, llm2.closeCalled)
 }
 
-func TestUnload(t *testing.T) {
+func TestSchedUnload(t *testing.T) {
     llm1 := &mockLlm{vramByGPU: map[ml.DeviceID]uint64{}}
     r1 := &runnerRef{llama: llm1, numParallel: 1}
     r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}, numParallel: 1}
@@ -706,13 +716,14 @@ func TestUnload(t *testing.T) {
     require.Nil(t, r2.model)
 }
 
-func TestAlreadyCanceled(t *testing.T) {
+func TestSchedAlreadyCanceled(t *testing.T) {
     ctx, done := context.WithTimeout(t.Context(), 500*time.Millisecond)
     defer done()
     dctx, done2 := context.WithCancel(ctx)
     done2()
     scenario1a := newScenarioRequest(t, dctx, "ollama-model-1", 10, &api.Duration{Duration: 0}, nil)
     s := InitScheduler(ctx)
+    s.waitForRecovery = 10 * time.Millisecond
     slog.Info("scenario1a")
     s.pendingReqCh <- scenario1a.req
     require.Len(t, s.pendingReqCh, 1)