DRY out the runner lifecycle code (#12540)

* DRY out the runner lifecycle code

Now that discovery uses the runners as well, this unifies the runner spawning code
into a single place. This also unifies the GPU discovery types with the newer ml.DeviceInfo.
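
For orientation, a rough sketch of the device metadata the scheduler now consumes, inferred only from the fields referenced in the diff below (DeviceID, Library, Integrated, FreeMemory, TotalMemory); the authoritative definitions live in the ml package and carry more fields than shown here.

// Sketch only: approximate shape, not the actual ml package source.
package ml

// DeviceID identifies a single accelerator. It is comparable, so the
// scheduler can key maps by it (see predMap in updateFreeSpace below).
type DeviceID struct {
	ID      string // backend-specific device identifier
	Library string // e.g. "CUDA", "ROCm", "Metal"
}

// DeviceInfo is the per-device data the scheduler reads.
type DeviceInfo struct {
	DeviceID
	Integrated  bool   // true for iGPUs that share system memory
	TotalMemory uint64 // bytes
	FreeMemory  uint64 // bytes
}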

* win: make incremental builds better

Place build artifacts in discrete directories so incremental builds don't have to start fresh.

* Adjust sort order to consider iGPUs
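
The comparator itself is not in the file shown below; as a purely illustrative sketch of the idea (an assumed ordering, not the project's actual code), discovery would rank discrete GPUs ahead of integrated ones and then prefer more free VRAM:

// Hypothetical helper for illustration only; assumes the ml.DeviceInfo
// fields sketched above and the standard library sort package.
func sortForScheduling(gpus []ml.DeviceInfo) {
	sort.SliceStable(gpus, func(i, j int) bool {
		if gpus[i].Integrated != gpus[j].Integrated {
			return !gpus[i].Integrated // discrete before integrated
		}
		return gpus[i].FreeMemory > gpus[j].FreeMemory // then more free VRAM first
	})
}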

* Handle CPU inference OOM scenarios
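
In this file that surfaces as the new llm.ErrLoadRequiredFull handling in load() (see the hunk around llama.Load below): when requireFull is false there are no other models to evict, so the scheduler reports the failure to the request instead of looping. A condensed sketch of that branch, with names taken from the diff:

// Condensed from the diff below; not a drop-in replacement.
// gpuIDs is used further down to build the runnerRef.
gpuIDs, err := llama.Load(req.ctx, systemInfo, gpus, requireFull)
if err != nil {
	if errors.Is(err, llm.ErrLoadRequiredFull) {
		if !requireFull {
			// Nothing else is loaded and the model still does not fit
			// (e.g. CPU inference exceeding system memory): surface the
			// OOM to the caller rather than attempting eviction.
			s.activeLoading.Close()
			s.activeLoading = nil
			req.errCh <- err
		}
		return true
	}
	// other load errors fall through to the existing handling
}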

* Address review comments
Author: Daniel Hiltgen
Date:   2025-10-23 11:20:02 -07:00 (committed by GitHub)
Parent: 1c093e97af
Commit: 3258a89b6e
16 changed files with 720 additions and 924 deletions

server/sched.go

@@ -5,12 +5,9 @@ import (
"errors"
"fmt"
"log/slog"
"os"
"reflect"
"runtime"
"slices"
"sort"
"strconv"
"strings"
"sync"
"time"
@@ -52,12 +49,10 @@ type Scheduler struct {
activeLoading llm.LlamaServer
loaded map[string]*runnerRef
- loadFn func(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoList, requireFull bool) bool
- newServerFn func(gpus discover.GpuInfoList, model string, f *ggml.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error)
- getGpuFn func(ctx context.Context, runners []discover.FilteredRunnerDiscovery) discover.GpuInfoList
- getCpuFn func() discover.GpuInfo
// waitForRecovery sets the limit for how long to wait for memory usage to recover after unload before scheduling the next model
+ loadFn func(req *LlmRequest, f *ggml.GGML, systemInfo ml.SystemInfo, gpus []ml.DeviceInfo, requireFull bool) bool
+ newServerFn func(systemInfo ml.SystemInfo, gpus []ml.DeviceInfo, model string, f *ggml.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error)
+ getGpuFn func(ctx context.Context, runners []ml.FilteredRunnerDiscovery) []ml.DeviceInfo
+ getSystemInfoFn func() ml.SystemInfo
waitForRecovery time.Duration
}
@@ -77,8 +72,8 @@ func InitScheduler(ctx context.Context) *Scheduler {
unloadedCh: make(chan any, maxQueue),
loaded: make(map[string]*runnerRef),
newServerFn: llm.NewLlamaServer,
- getGpuFn: discover.GetGPUInfo,
- getCpuFn: discover.GetCPUInfo,
+ getGpuFn: discover.GPUDevices,
+ getSystemInfoFn: discover.GetSystemInfo,
waitForRecovery: 5 * time.Second,
}
sched.loadFn = sched.load
@@ -133,6 +128,8 @@ func (s *Scheduler) Run(ctx context.Context) {
}
func (s *Scheduler) processPending(ctx context.Context) {
+ maxRunners := envconfig.MaxRunners()
for {
select {
case <-ctx.Done():
@@ -152,7 +149,7 @@ func (s *Scheduler) processPending(ctx context.Context) {
s.loadedMu.Lock()
runner := s.loaded[pending.model.ModelPath]
loadedCount := len(s.loaded)
- runnersSnapshot := make([]discover.FilteredRunnerDiscovery, 0, len(s.loaded))
+ runnersSnapshot := make([]ml.FilteredRunnerDiscovery, 0, len(s.loaded))
for _, r := range s.loaded {
runnersSnapshot = append(runnersSnapshot, r)
}
@@ -167,39 +164,29 @@ func (s *Scheduler) processPending(ctx context.Context) {
pending.useLoadedRunner(runner, s.finishedReqCh)
break
}
- } else if envconfig.MaxRunners() > 0 && loadedCount >= int(envconfig.MaxRunners()) {
+ } else if maxRunners > 0 && loadedCount >= int(maxRunners) {
slog.Debug("max runners achieved, unloading one to make room", "runner_count", loadedCount)
runnerToExpire = s.findRunnerToUnload()
} else {
// Either no models are loaded or below envconfig.MaxRunners
// Get a refreshed GPU list
- var gpus discover.GpuInfoList
+ var gpus []ml.DeviceInfo
if pending.opts.NumGPU == 0 {
- gpus = discover.GpuInfoList{s.getCpuFn()}
+ gpus = []ml.DeviceInfo{}
} else {
gpus = s.getGpuFn(ctx, runnersSnapshot)
}
- if envconfig.MaxRunners() <= 0 {
- // No user specified MaxRunners, so figure out what automatic setting to use
- // If all GPUs have reliable free memory reporting, defaultModelsPerGPU * the number of GPUs
- // if any GPU has unreliable free memory reporting, 1x the number of GPUs
- allReliable := true
- for _, gpu := range gpus {
- if gpu.UnreliableFreeMemory {
- allReliable = false
- break
- }
- }
- if allReliable {
- // HACK
- os.Setenv("OLLAMA_MAX_LOADED_MODELS", strconv.Itoa(defaultModelsPerGPU*len(gpus)))
- slog.Debug("updating default concurrency", "OLLAMA_MAX_LOADED_MODELS", envconfig.MaxRunners(), "gpu_count", len(gpus))
+ systemInfo := s.getSystemInfoFn()
+ if maxRunners <= 0 {
+ // No user specified MaxRunners, so figure out what automatic setting to use for the next load attempt
+ if pending.opts.NumGPU == 0 {
+ // Need to get actual GPU list to set the correct default max models
+ g := s.getGpuFn(ctx, runnersSnapshot)
+ maxRunners = uint(defaultModelsPerGPU * max(len(g), 1))
} else {
- // HACK
- os.Setenv("OLLAMA_MAX_LOADED_MODELS", strconv.Itoa(len(gpus)))
- slog.Info("one or more GPUs detected that are unable to accurately report free memory - disabling default concurrency")
+ maxRunners = uint(defaultModelsPerGPU * max(len(gpus), 1))
}
+ slog.Debug("updating default concurrency", "OLLAMA_MAX_LOADED_MODELS", maxRunners, "gpu_count", len(gpus))
}
// Load model for fitting
@@ -215,14 +202,14 @@ func (s *Scheduler) processPending(ctx context.Context) {
if loadedCount == 0 {
// No models loaded. Load the model but prefer the best fit.
slog.Debug("loading first model", "model", pending.model.ModelPath)
- s.loadFn(pending, ggml, gpus, false)
+ s.loadFn(pending, ggml, systemInfo, gpus, false)
break
}
// More than one loaded model, so we have to see if the
// new one fits
- needEvict := s.loadFn(pending, ggml, gpus, true)
+ needEvict := s.loadFn(pending, ggml, systemInfo, gpus, true)
if !needEvict {
slog.Debug("new model fits with existing models, loading")
break
@@ -353,7 +340,7 @@ func (s *Scheduler) processCompleted(ctx context.Context) {
runner.refMu.Unlock()
} else {
slog.Debug("starting background wait for VRAM recovery", "runner", runner)
- runnersSnapshot := make([]discover.FilteredRunnerDiscovery, 0, len(s.loaded))
+ runnersSnapshot := make([]ml.FilteredRunnerDiscovery, 0, len(s.loaded))
for _, r := range s.loaded {
runnersSnapshot = append(runnersSnapshot, r)
}
@@ -395,7 +382,7 @@ func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *Llm
// load creates a new model based on req and loads it. If requireFull is true then the model must be loaded fully onto GPUs
// (if any). Returns whether the scheduler needs to evict a model to make this one fit.
- func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoList, requireFull bool) bool {
+ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, systemInfo ml.SystemInfo, gpus []ml.DeviceInfo, requireFull bool) bool {
numParallel := max(int(envconfig.NumParallel()), 1)
// Embedding models should always be loaded with parallel=1
@@ -420,7 +407,7 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis
if llama == nil {
var err error
- llama, err = s.newServerFn(gpus, req.model.ModelPath, f, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts, numParallel)
+ llama, err = s.newServerFn(systemInfo, gpus, req.model.ModelPath, f, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts, numParallel)
if err != nil {
// some older models are not compatible with newer versions of llama.cpp
// show a generalized compatibility error until there is a better way to
@@ -443,9 +430,16 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis
s.loadedMu.Unlock()
- gpuIDs, err := llama.Load(req.ctx, gpus, requireFull)
+ gpuIDs, err := llama.Load(req.ctx, systemInfo, gpus, requireFull)
if err != nil {
if errors.Is(err, llm.ErrLoadRequiredFull) {
+ if !requireFull {
+ // No other models loaded, yet we still don't fit, so report an error
+ slog.Info("model is too large for system memory", "requireFull", requireFull)
+ s.activeLoading.Close()
+ s.activeLoading = nil
+ req.errCh <- err
+ }
return true
}
@@ -456,6 +450,20 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis
return false
}
+ // Determine if we have discrete GPUs which we should monitor VRAM usage on during shutdown
+ discreteGPUs := false
+ iGPUScan:
+ for _, devid := range gpuIDs {
+ for _, dev := range gpus {
+ if dev.DeviceID == devid {
+ if !dev.Integrated {
+ discreteGPUs = true
+ break iGPUScan
+ }
+ }
+ }
+ }
runner := &runnerRef{
model: req.model,
modelPath: req.model.ModelPath,
@@ -463,6 +471,7 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis
Options: &req.opts,
sessionDuration: sessionDuration,
gpus: gpuIDs,
+ discreteGPUs: discreteGPUs,
vramSize: llama.VRAMSize(),
totalSize: llama.TotalSize(),
loading: true,
@@ -510,7 +519,10 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis
return false
}
- func (s *Scheduler) updateFreeSpace(allGpus discover.GpuInfoList) {
+ func (s *Scheduler) updateFreeSpace(allGpus []ml.DeviceInfo) {
+ if len(allGpus) == 0 {
+ return
+ }
predMap := map[ml.DeviceID]uint64{} // Sum up the total predicted usage per GPU for all runners
s.loadedMu.Lock()
runners := make([]*runnerRef, 0, len(s.loaded))
@@ -554,12 +566,13 @@ type runnerRef struct {
refMu sync.Mutex
refCount uint // prevent unloading if > 0
- llama llm.LlamaServer
- pid int
- loading bool // True only during initial load, then false forever
- gpus []ml.DeviceID // Recorded at time of provisioning
- vramSize uint64
- totalSize uint64
+ llama llm.LlamaServer
+ pid int
+ loading bool // True only during initial load, then false forever
+ gpus []ml.DeviceID // Recorded at time of provisioning
+ discreteGPUs bool // True if all devices are discrete GPUs - used to skip VRAM recovery check for iGPUs
+ vramSize uint64
+ totalSize uint64
sessionDuration time.Duration
expireTimer *time.Timer
@@ -627,14 +640,12 @@ func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool
// a before and after GPU memory allocation. The returned channel
// will be notified when we're done waiting, or have timed out and should
// proceed anyway
- func (s *Scheduler) waitForVRAMRecovery(runner *runnerRef, runners []discover.FilteredRunnerDiscovery) chan any {
+ func (s *Scheduler) waitForVRAMRecovery(runner *runnerRef, runners []ml.FilteredRunnerDiscovery) chan any {
finished := make(chan any, 1)
- // CPU or Metal don't need checking, so no waiting required
- // windows can page VRAM, only cuda currently can report accurate used vram usage
- if len(runner.gpus) == 0 ||
- (len(runner.gpus) == 1 && (runner.gpus[0].Library == "cpu" || runner.gpus[0].Library == "Metal")) ||
- (runtime.GOOS == "windows" && runner.gpus[0].Library != "CUDA") {
+ // CPU, Metal and iGPUs don't need checking, so no waiting required
+ if len(runner.gpus) == 0 || !runner.discreteGPUs ||
+ (len(runner.gpus) == 1 && runner.gpus[0].Library == "Metal") {
finished <- struct{}{}
slog.Debug("no need to wait for VRAM recovery", "runner", runner)
return finished
@@ -668,7 +679,11 @@ func (s *Scheduler) waitForVRAMRecovery(runner *runnerRef, runners []discover.Fi
totalMemoryNow += gpu.TotalMemory
freeMemoryNow += gpu.FreeMemory
}
logutil.Trace("gpu VRAM convergence", "percent", int(max(float32(freeMemoryNow-freeMemoryBefore), 0.0)/float32(runner.vramSize)*100))
if freeMemoryNow > freeMemoryBefore {
logutil.Trace("gpu VRAM convergence", "percent", int(float32(freeMemoryNow-freeMemoryBefore)/float32(runner.vramSize)*100))
} else {
logutil.Trace("gpu VRAM convergence", "percent", 0)
}
// If we're within ~75% of the estimated memory usage recovered, bail out
if float32(freeMemoryNow-freeMemoryBefore) > float32(runner.vramSize)*0.75 {
slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()), "free_before", format.HumanBytes2(freeMemoryBefore), "free_now", format.HumanBytes2(freeMemoryNow), "runner", runner)