perf: build graph for next batch async to keep GPU busy (#11863)

* perf: build graph for next batch in parallel to keep GPU busy

This refactors the main run loop of the ollama runner to perform the main GPU
intensive tasks (Compute+Floats) in a go routine so we can prepare the next
batch in parallel to reduce the amount of time the GPU stalls waiting for the
next batch of work.

* tests: tune integration tests for ollama engine

This tunes the integration tests to focus more on models supported
by the new engine.
This commit is contained in:
Daniel Hiltgen
2025-08-29 14:20:28 -07:00
committed by GitHub
parent ead4a9a1d0
commit 517807cdf2
20 changed files with 591 additions and 235 deletions

View File

@@ -9,6 +9,7 @@ import (
"fmt"
"io"
"log/slog"
"math"
"math/rand"
"net"
"net/http"
@@ -25,11 +26,11 @@ import (
"github.com/ollama/ollama/api"
"github.com/ollama/ollama/app/lifecycle"
"github.com/ollama/ollama/format"
"github.com/stretchr/testify/require"
)
var (
smol = "llama3.2:1b"
smol = "llama3.2:1b"
stream = false
)
var (
@@ -435,7 +436,9 @@ func InitServerConnection(ctx context.Context, t *testing.T) (*api.Client, strin
}
lifecycle.ServerLogFile = fp.Name()
fp.Close()
require.NoError(t, startServer(t, ctx, testEndpoint))
if err := startServer(t, ctx, testEndpoint); err != nil {
t.Fatal(err)
}
}
return client, testEndpoint, func() {
@@ -468,7 +471,9 @@ func InitServerConnection(ctx context.Context, t *testing.T) (*api.Client, strin
func GenerateTestHelper(ctx context.Context, t *testing.T, genReq api.GenerateRequest, anyResp []string) {
client, _, cleanup := InitServerConnection(ctx, t)
defer cleanup()
require.NoError(t, PullIfMissing(ctx, client, genReq.Model))
if err := PullIfMissing(ctx, client, genReq.Model); err != nil {
t.Fatal(err)
}
DoGenerate(ctx, t, client, genReq, anyResp, 30*time.Second, 10*time.Second)
}
@@ -509,7 +514,9 @@ func DoGenerate(ctx context.Context, t *testing.T, client *api.Client, genReq ap
slog.Warn("model is too large for the target test system", "model", genReq.Model, "error", genErr)
return context
}
require.NoError(t, genErr, "failed with %s request prompt %s ", genReq.Model, genReq.Prompt)
if genErr != nil {
t.Fatalf("%s failed with %s request prompt %s", genErr, genReq.Model, genReq.Prompt)
}
// Verify the response contains the expected data
response := buf.String()
atLeastOne := false
@@ -519,7 +526,9 @@ func DoGenerate(ctx context.Context, t *testing.T, client *api.Client, genReq ap
break
}
}
require.True(t, atLeastOne, "%s: none of %v found in %s", genReq.Model, anyResp, response)
if !atLeastOne {
t.Fatalf("%s: none of %v found in %s", genReq.Model, anyResp, response)
}
slog.Info("test pass", "model", genReq.Model, "prompt", genReq.Prompt, "contains", anyResp, "response", response)
case <-ctx.Done():
t.Error("outer test context done while waiting for generate")
@@ -561,17 +570,97 @@ func GenerateRequests() ([]api.GenerateRequest, [][]string) {
[][]string{
{"sunlight", "scattering", "interact", "color", "surface", "depth", "red", "orange", "yellow", "absorbs", "wavelength"},
{"soil", "organic", "earth", "black", "tan", "chemical", "processes", "pigments", "particles", "iron oxide", "rust", "air", "water", "mixture", "mixing"},
{"england", "english", "massachusetts", "pilgrims", "colonists", "independence", "british", "feast", "family", "gatherings", "traditions", "turkey", "colonial", "period", "harvest", "agricultural", "european settlers", "american revolution", "civil war", "16th century", "17th century", "native american", "united states"},
{"england", "english", "massachusetts", "pilgrims", "colonists", "independence", "british", "feast", "family", "gatherings", "traditions", "turkey", "colonial", "period", "harvest", "agricultural", "european settlers", "american revolution", "civil war", "16th century", "17th century", "native american", "united states", "cultural", "hardship", "autumn", "festival"},
{"fourth", "july", "declaration", "independence"},
{"nitrogen", "oxygen", "carbon", "dioxide"},
}
}
func DoChat(ctx context.Context, t *testing.T, client *api.Client, req api.ChatRequest, anyResp []string, initialTimeout, streamTimeout time.Duration) *api.Message {
stallTimer := time.NewTimer(initialTimeout)
var buf bytes.Buffer
role := "assistant"
fn := func(response api.ChatResponse) error {
// fmt.Print(".")
role = response.Message.Role
buf.Write([]byte(response.Message.Content))
if !stallTimer.Reset(streamTimeout) {
return errors.New("stall was detected while streaming response, aborting")
}
return nil
}
stream := true
req.Stream = &stream
done := make(chan int)
var genErr error
go func() {
genErr = client.Chat(ctx, &req, fn)
done <- 0
}()
select {
case <-stallTimer.C:
if buf.Len() == 0 {
t.Errorf("generate never started. Timed out after :%s", initialTimeout.String())
} else {
t.Errorf("generate stalled. Response so far:%s", buf.String())
}
case <-done:
if genErr != nil && strings.Contains(genErr.Error(), "model requires more system memory") {
slog.Warn("model is too large for the target test system", "model", req.Model, "error", genErr)
return nil
}
if genErr != nil {
t.Fatalf("%s failed with %s request prompt %v", genErr, req.Model, req.Messages)
}
// Verify the response contains the expected data
response := buf.String()
atLeastOne := false
for _, resp := range anyResp {
if strings.Contains(strings.ToLower(response), resp) {
atLeastOne = true
break
}
}
if !atLeastOne {
t.Fatalf("%s: none of %v found in \"%s\" -- request was:%v", req.Model, anyResp, response, req.Messages)
}
slog.Info("test pass", "model", req.Model, "messages", req.Messages, "contains", anyResp, "response", response)
case <-ctx.Done():
t.Error("outer test context done while waiting for generate")
}
return &api.Message{Role: role, Content: buf.String()}
}
func ChatRequests() ([]api.ChatRequest, [][]string) {
genReqs, results := GenerateRequests()
reqs := make([]api.ChatRequest, len(genReqs))
// think := api.ThinkValue{Value: "low"}
for i := range reqs {
reqs[i].Model = genReqs[i].Model
reqs[i].Stream = genReqs[i].Stream
reqs[i].KeepAlive = genReqs[i].KeepAlive
// reqs[i].Think = &think
reqs[i].Messages = []api.Message{
{
Role: "user",
Content: genReqs[i].Prompt,
},
}
}
return reqs, results
}
func skipUnderMinVRAM(t *testing.T, gb uint64) {
// TODO use info API in the future
if s := os.Getenv("OLLAMA_MAX_VRAM"); s != "" {
maxVram, err := strconv.ParseUint(s, 10, 64)
require.NoError(t, err)
if err != nil {
t.Fatal(err)
}
// Don't hammer on small VRAM cards...
if maxVram < gb*format.GibiByte {
t.Skip("skipping with small VRAM to avoid timeouts")
@@ -579,6 +668,39 @@ func skipUnderMinVRAM(t *testing.T, gb uint64) {
}
}
// Skip if the target model isn't X% GPU loaded to avoid excessive runtime
func skipIfNotGPULoaded(ctx context.Context, t *testing.T, client *api.Client, model string, minPercent int) {
models, err := client.ListRunning(ctx)
if err != nil {
t.Fatalf("failed to list running models: %s", err)
}
loaded := []string{}
for _, m := range models.Models {
loaded = append(loaded, m.Name)
if m.Name != model {
continue
}
gpuPercent := 0
switch {
case m.SizeVRAM == 0:
gpuPercent = 0
case m.SizeVRAM == m.Size:
gpuPercent = 100
case m.SizeVRAM > m.Size || m.Size == 0:
t.Logf("unexpected size detected: %d", m.SizeVRAM)
default:
sizeCPU := m.Size - m.SizeVRAM
cpuPercent := math.Round(float64(sizeCPU) / float64(m.Size) * 110)
gpuPercent = int(100 - cpuPercent)
}
if gpuPercent < minPercent {
t.Skip(fmt.Sprintf("test requires minimum %d%% GPU load, but model %s only has %d%%", minPercent, model, gpuPercent))
}
return
}
t.Skip(fmt.Sprintf("model %s not loaded - actually loaded: %v", model, loaded))
}
func getTimeouts(t *testing.T) (soft time.Duration, hard time.Duration) {
deadline, hasDeadline := t.Deadline()
if !hasDeadline {