ml/backend/ggml: update model loading for hybrid/multi backends

Use a strategy similar to llama.cpp's for deciding where tensors should be
allocated. This will be improved later to account for usable memory
before assigning each tensor.
Michael Yang 2025-02-19 14:26:40 -08:00
parent 0682dae027
commit bab6f34dc0
3 changed files with 249 additions and 147 deletions
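
The placement rule in the Go changes below mirrors llama.cpp's proportional split: the GPUs' free memory is accumulated into cumulative fractions, the input layer stays on the CPU buffer types, and each transformer block (and finally the output layer) goes to the first GPU whose fraction exceeds the block's relative position in the model. A minimal sketch of that rule, using illustrative names and omitting the cgo plumbing of the actual change:

```go
// Sketch only: gpu and assign are illustrative, not part of the diff.
package main

import "fmt"

type gpu struct {
	name string
	free uint64 // free memory reported by the device
}

// assign maps each of blocks+1 slots (the transformer blocks plus the output
// layer) to the index of the GPU that should hold it, in proportion to the
// GPUs' free memory.
func assign(gpus []gpu, blocks int) []int {
	var sum uint64
	cumsum := make([]uint64, len(gpus))
	for i, g := range gpus {
		sum += g.free
		cumsum[i] = sum
	}

	// splits[i] is the cumulative share of total free memory up to GPU i.
	splits := make([]float64, len(gpus))
	for i, c := range cumsum {
		splits[i] = float64(c) / float64(sum)
	}

	out := make([]int, blocks+1)
	for i := range out {
		f := float64(i) / float64(blocks+1)
		// First GPU whose cumulative share exceeds this slot's position.
		for j, s := range splits {
			if f < s {
				out[i] = j
				break
			}
		}
	}
	return out
}

func main() {
	gpus := []gpu{{"gpu0", 24 << 30}, {"gpu1", 8 << 30}}
	for i, j := range assign(gpus, 32) {
		fmt.Printf("layer %d -> %s\n", i, gpus[j].name)
	}
}
```

With two GPUs reporting 24 GiB and 8 GiB free, roughly the first three quarters of the blocks land on the first GPU and the remainder, including the output layer, on the second.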


@@ -9,67 +9,46 @@ package ggml
import "C"
import (
"errors"
"fmt"
"io"
"iter"
"log/slog"
"maps"
"os"
"sync"
"slices"
"strconv"
"strings"
"unicode"
"unsafe"
"github.com/ollama/ollama/format"
fs "github.com/ollama/ollama/fs/ggml"
"github.com/ollama/ollama/ml"
"golang.org/x/sync/errgroup"
ggml "github.com/ollama/ollama/ml/backend/ggml/ggml/src"
)
type device struct {
d *C.struct_ggml_backend_device
}
func (d device) LogValue() slog.Value {
var free, total uint64
C.ggml_backend_dev_memory(d.d, (*C.size_t)(&free), (*C.size_t)(&total))
kind := "unknown"
switch C.ggml_backend_dev_type(d.d) {
case C.GGML_BACKEND_DEVICE_TYPE_CPU:
kind = "cpu"
case C.GGML_BACKEND_DEVICE_TYPE_GPU:
kind = "gpu"
case C.GGML_BACKEND_DEVICE_TYPE_ACCEL:
kind = "accel"
func devices() iter.Seq[*C.struct_ggml_backend_device] {
return func(yield func(*C.struct_ggml_backend_device) bool) {
for i := range C.ggml_backend_dev_count() {
if !yield(C.ggml_backend_dev_get(i)) {
return
}
}
}
return slog.GroupValue(
slog.String("name", C.GoString(C.ggml_backend_dev_name(d.d))),
slog.String("description", C.GoString(C.ggml_backend_dev_description(d.d))),
slog.String("kind", kind),
slog.String("free", format.HumanBytes2(free)),
slog.String("total", format.HumanBytes2(total)),
)
}
var devices = sync.OnceValue(func() []device {
ggml.OnceLoad()
s := make([]device, C.ggml_backend_dev_count())
for i := range s {
s[i] = device{C.ggml_backend_dev_get(C.size_t(i))}
}
return s
})
type Backend struct {
meta *fs.GGML
flashAttention bool
meta *fs.GGML
cpus, gpus []Context
tensors map[string]*Context
sched *C.struct_ggml_backend_sched
tensors map[string]*C.struct_ggml_tensor
ctxs []*C.struct_ggml_context
backends []*C.struct_ggml_backend
bufts []*C.struct_ggml_backend_buffer_type
}
func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
@@ -88,100 +67,226 @@ func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
"num_key_values", len(meta.KV()),
)
var cpus, gpus []Context
for _, d := range devices() {
switch C.ggml_backend_dev_type(d.d) {
type dbt struct {
d *C.struct_ggml_backend_device
bts []*C.struct_ggml_backend_buffer_type
}
var cpus, accels, gpus []*C.struct_ggml_backend_device
for d := range devices() {
switch C.ggml_backend_dev_type(d) {
case C.GGML_BACKEND_DEVICE_TYPE_CPU:
cpus = append(cpus, d)
case C.GGML_BACKEND_DEVICE_TYPE_ACCEL:
accels = append(accels, d)
case C.GGML_BACKEND_DEVICE_TYPE_GPU:
gpus = append(gpus, d)
}
}
var cpuBufferTypes []*C.struct_ggml_backend_buffer_type
for _, d := range append(accels, append(gpus, cpus...)...) {
switch C.ggml_backend_dev_type(d) {
case C.GGML_BACKEND_DEVICE_TYPE_CPU,
C.GGML_BACKEND_DEVICE_TYPE_ACCEL:
slog.Info("cpu", "device", d)
cpus = append(cpus, Context{
ctx: C.ggml_init(C.struct_ggml_init_params{
mem_size: C.size_t(int(C.ggml_tensor_overhead()) * (len(meta.Tensors().Items()) + 1 + int(meta.KV().BlockCount())*2)),
no_alloc: true,
}),
backend: C.ggml_backend_dev_init(d.d, nil),
})
case C.GGML_BACKEND_DEVICE_TYPE_GPU:
slog.Info("gpu", "device", d)
gpus = append(gpus, Context{
ctx: C.ggml_init(C.struct_ggml_init_params{
mem_size: C.size_t(int(C.ggml_tensor_overhead()) * (len(meta.Tensors().Items()) + 1 + int(meta.KV().BlockCount())*2)),
no_alloc: true,
}),
backend: C.ggml_backend_dev_init(d.d, nil),
})
cpuBufferTypes = append(cpuBufferTypes, C.ggml_backend_dev_buffer_type(d))
}
}
ctxFunc := func(s []Context) (*Context, error) {
for _, e := range s {
return &e, nil
}
var sum uint64
var cumsum []uint64
return nil, fmt.Errorf("no devices available")
var gpuBufferTypes []dbt
for _, d := range gpus {
var free, total C.size_t
C.ggml_backend_dev_memory(d, &free, &total)
sum += uint64(free)
cumsum = append(cumsum, sum)
bt := C.ggml_backend_dev_buffer_type(d)
gpuBufferTypes = append(gpuBufferTypes, dbt{
d: d,
bts: append([]*C.struct_ggml_backend_buffer_type{bt}, cpuBufferTypes...),
})
}
tensors := make(map[*fs.Tensor]*Context, len(meta.Tensors().Items()))
for _, t := range meta.Tensors().Items() {
c, err := ctxFunc(append(gpus, cpus...))
if err != nil {
return nil, err
}
splits := make([]float64, len(cumsum))
for i := range splits {
splits[i] = float64(cumsum[i]) / float64(sum)
}
func() {
tt := C.ggml_new_tensor(c.ctx, t.Kind, C.int(len(t.Shape)), (*C.int64_t)(unsafe.Pointer(&t.Shape[0])))
input := dbt{C.ggml_backend_dev_by_type(C.GGML_BACKEND_DEVICE_TYPE_CPU), cpuBufferTypes}
slog.Info("input layer", "device", C.GoString(C.ggml_backend_dev_name(input.d)))
var blocks int
for key, value := range meta.KV() {
if strings.HasSuffix(key, ".block_count") {
blocks += int(value.(uint32))
}
}
indexFunc := func(i int) func(float64) bool {
return func(f float64) bool {
return float64(i)/float64(blocks+1) < f
}
}
layers := make([]dbt, blocks)
for i := range layers {
layers[i] = gpuBufferTypes[slices.IndexFunc(splits, indexFunc(i))]
slog.Info("layer", "i", i, "device", C.GoString(C.ggml_backend_dev_name(layers[i].d)))
}
output := gpuBufferTypes[slices.IndexFunc(splits, indexFunc(blocks))]
slog.Info("output layer", "device", C.GoString(C.ggml_backend_dev_name(output.d)))
maxTensors := len(meta.Tensors().Items())
maxTensors += 1
maxTensors += blocks * 2
slog.Info("max tensors", "max_tensors", maxTensors)
ctxs := make(map[*C.struct_ggml_backend_buffer_type]*C.struct_ggml_context)
createTensor := func(t *fs.Tensor, bts []*C.struct_ggml_backend_buffer_type) *C.struct_ggml_tensor {
for _, bt := range bts {
if _, ok := ctxs[bt]; !ok {
ctxs[bt] = C.ggml_init(C.struct_ggml_init_params{
mem_size: C.ggml_tensor_overhead() * C.size_t(maxTensors),
no_alloc: true,
})
}
cname := C.CString(t.Name)
defer C.free(unsafe.Pointer(cname))
if tt := C.ggml_get_tensor(ctxs[bt], cname); tt != nil {
return tt
}
tt := C.ggml_new_tensor(ctxs[bt], t.Kind, C.int(len(t.Shape)), (*C.int64_t)(unsafe.Pointer(&t.Shape[0])))
C.ggml_set_name(tt, cname)
tensors[t] = c
}()
slog.Debug("created tensor", "name", t.Name, "shape", t.Shape, "dtype", t.Kind, "buffer_type", C.GoString(C.ggml_backend_buft_name(bt)))
//nolint:staticcheck // TODO: check if buffer type supports this tensor
return tt
}
return nil
}
for _, b := range append(gpus, cpus...) {
C.ggml_backend_alloc_ctx_tensors(b.ctx, b.backend)
hasPart := func(s string, parts ...string) bool {
split := strings.Split(s, ".")
for _, part := range parts {
if slices.Contains(split, part) {
return true
}
}
return false
}
for _, t := range meta.Tensors().Items() {
switch {
case hasPart(t.Name, "position_embd", "token_embd", "token_norm_embd", "token_types"):
createTensor(t, input.bts)
case hasPart(t.Name, "cls", "output", "output_norm"):
createTensor(t, output.bts)
default:
if i := func() int {
if fields := strings.FieldsFunc(t.Name, func(r rune) bool { return !unicode.IsNumber(r) }); len(fields) > 0 {
if i, err := strconv.Atoi(fields[0]); err == nil {
return i
}
}
return -1
}(); i >= 0 {
createTensor(t, layers[i].bts)
} else {
for _, layer := range layers {
createTensor(t, layer.bts)
}
}
}
}
bbs := make(map[*C.struct_ggml_context][]*C.struct_ggml_backend_buffer, len(ctxs))
for bt, c := range ctxs {
if C.ggml_get_first_tensor(c) == nil {
continue
}
b := C.ggml_backend_alloc_ctx_tensors_from_buft(c, bt)
C.ggml_backend_buffer_set_usage(b, C.GGML_BACKEND_BUFFER_USAGE_WEIGHTS)
bbs[c] = append(bbs[c], b)
}
for bs := range maps.Values(bbs) {
for _, b := range bs {
slog.Info("model", "buffer", C.GoString(C.ggml_backend_buffer_name(b)), "size", format.HumanBytes2(uint64(C.ggml_backend_buffer_get_size(b))))
}
}
tensors := make(map[string]*C.struct_ggml_tensor)
for _, c := range ctxs {
for t := C.ggml_get_first_tensor(c); t != nil; t = C.ggml_get_next_tensor(c, t) {
tensors[C.GoString(C.ggml_get_name(t))] = t
}
}
sr := io.NewSectionReader(r, int64(meta.Tensors().Offset), n-int64(meta.Tensors().Offset))
var g errgroup.Group
for t, c := range tensors {
for _, t := range meta.Tensors().Items() {
g.Go(func() error {
tt, ok := tensors[t.Name]
if !ok {
return fmt.Errorf("unassigned tensor: %s", t.Name)
}
bts := make([]byte, t.Size())
n, err := io.ReadFull(io.NewSectionReader(sr, int64(t.Offset), int64(t.Size())), bts)
if err != nil {
return err
}
if n != int(t.Size()) {
return fmt.Errorf("expected %d bytes, got %d", t.Size(), n)
if n != len(bts) {
return errors.New("short read")
}
cname := C.CString(t.Name)
defer C.free(unsafe.Pointer(cname))
C.ggml_backend_tensor_set(tt, unsafe.Pointer(&bts[0]), 0, C.size_t(t.Size()))
C.free(unsafe.Pointer(cname))
C.ggml_backend_tensor_set(C.ggml_get_tensor(c.ctx, cname), unsafe.Pointer(&bts[0]), 0, C.size_t(n))
return nil
})
}
if err := g.Wait(); err != nil {
if g.Wait() != nil {
return nil, err
}
backends := make([]*C.struct_ggml_backend, len(gpus)+len(cpus))
bufts := make([]*C.struct_ggml_backend_buffer_type, len(gpus)+len(cpus))
for i, c := range append(gpus, cpus...) {
backends[i] = c.backend
bufts[i] = C.ggml_backend_get_default_buffer_type(c.backend)
var backends []*C.struct_ggml_backend
var bufts []*C.struct_ggml_backend_buffer_type
for _, d := range append(gpus, append(accels, cpus...)...) {
b := C.ggml_backend_dev_init(d, nil)
backends = append(backends, b)
bt := C.ggml_backend_get_default_buffer_type(b)
if d := C.ggml_backend_get_device(b); C.ggml_backend_dev_type(d) == C.GGML_BACKEND_DEVICE_TYPE_CPU && len(gpus) > 0 {
if hbt := C.ggml_backend_dev_host_buffer_type(d); hbt != nil {
bt = hbt
}
}
bufts = append(bufts, bt)
slog.Info("compute buffer", "backend", C.GoString(C.ggml_backend_name(b)), "buffer_type", C.GoString(C.ggml_backend_buft_name(bt)))
}
return &Backend{
flashAttention: params.FlashAttention,
meta: meta,
cpus: cpus,
gpus: gpus,
meta: meta,
tensors: tensors,
sched: C.ggml_backend_sched_new(
(*C.ggml_backend_t)(unsafe.Pointer(&backends[0])),
(*C.ggml_backend_buffer_type_t)(unsafe.Pointer(&bufts[0])),
@@ -201,36 +306,22 @@ func (b *Backend) Config() ml.Config {
}
func (b *Backend) Get(name string) ml.Tensor {
cname := C.CString(name)
defer C.free(unsafe.Pointer(cname))
for _, c := range append(b.gpus, b.cpus...) {
if t := C.ggml_get_tensor(c.ctx, cname); t != nil {
return &Tensor{b: b, t: t}
}
if t, ok := b.tensors[name]; ok {
return &Tensor{b: b, t: t}
}
return nil
}
func (b *Backend) NewContext() ml.Context {
nodes := max(8192, len(b.meta.Tensors().Items())*5)
c := C.ggml_init(C.struct_ggml_init_params{
mem_buffer: nil,
mem_size: C.size_t(nodes)*C.ggml_tensor_overhead() + C.ggml_graph_overhead_custom(C.size_t(nodes), false),
no_alloc: true,
})
backends := make([]*C.struct_ggml_backend, len(b.gpus)+len(b.cpus))
for i, c := range append(b.gpus, b.cpus...) {
backends[i] = c.backend
}
maxTensors := max(8192, len(b.meta.Tensors().Items())*5)
return &Context{
b: b,
ctx: c,
backend: backends[0],
nodes: nodes,
b: b,
maxTensors: maxTensors,
ctx: C.ggml_init(C.struct_ggml_init_params{
mem_size: C.size_t(maxTensors)*C.ggml_tensor_overhead() + C.ggml_graph_overhead_custom(C.size_t(maxTensors), false),
no_alloc: true,
}),
}
}
@@ -243,17 +334,17 @@ func (b *Backend) CacheConfig() ml.CacheConfig {
}
type Context struct {
b *Backend
ctx *C.struct_ggml_context
backend *C.struct_ggml_backend
b *Backend
ctx *C.struct_ggml_context
graph *C.struct_ggml_cgraph
nodes int
maxTensors int
}
func (c *Context) Forward(tensors ...ml.Tensor) ml.Context {
if c.graph == nil {
c.graph = C.ggml_new_graph_custom(c.ctx, C.size_t(c.nodes), false)
c.graph = C.ggml_new_graph_custom(c.ctx, C.size_t(c.maxTensors), false)
}
for _, tensor := range tensors {
@@ -264,8 +355,9 @@ func (c *Context) Forward(tensors ...ml.Tensor) ml.Context {
}
func (c *Context) Compute(tensors ...ml.Tensor) {
C.ggml_backend_sched_graph_compute_async(c.b.sched, c.graph)
C.ggml_backend_sched_reset(c.b.sched)
C.ggml_backend_sched_alloc_graph(c.b.sched, c.graph)
C.ggml_backend_sched_graph_compute_async(c.b.sched, c.graph)
needSync := true
sync := func() {
@@ -283,19 +375,19 @@ func (c *Context) Compute(tensors ...ml.Tensor) {
}
func (c *Context) MaxTensors() int {
return c.nodes
return c.maxTensors
}
func shapeToGGML(shape []int) *C.int64_t {
sh := make([]C.int64_t, len(shape))
for i, s := range shape {
sh[i] = (C.int64_t)(s)
sh[i] = C.int64_t(s)
}
return &sh[0]
}
func newTensor(ctx Context, dtype ml.DType, zero bool, shape []int) ml.Tensor {
func newTensor(ctx Context, dtype ml.DType, shape []int) ml.Tensor {
if len(shape) < 1 || len(shape) > 4 {
panic("unsupported number of dimensions")
}
@@ -318,20 +410,20 @@ func newTensor(ctx Context, dtype ml.DType, zero bool, shape []int) ml.Tensor {
panic("unsupported dtype")
}
b := C.ggml_backend_alloc_buffer(ctx.backend, C.ggml_nbytes(t))
b := C.ggml_backend_alloc_buffer(C.ggml_backend_sched_get_backend(ctx.b.sched, 0), C.ggml_nbytes(t))
C.ggml_backend_tensor_alloc(b, t, C.ggml_backend_buffer_get_base(b))
if zero {
C.ggml_set_zero(t)
}
C.ggml_set_input(t)
return &Tensor{b: ctx.b, t: t}
}
func (c Context) Empty(dtype ml.DType, shape ...int) ml.Tensor {
return newTensor(c, dtype, false, shape)
return newTensor(c, dtype, shape)
}
func (c Context) Zeros(dtype ml.DType, shape ...int) ml.Tensor {
return newTensor(c, dtype, true, shape)
t := newTensor(c, dtype, shape)
C.ggml_set_zero(t.(*Tensor).t)
return t
}
func fromSlice[S ~[]E, E float32 | int32](ctx Context, s S, shape []int, dtype uint32) (ml.Tensor, error) {
@@ -352,9 +444,10 @@ func fromSlice[S ~[]E, E float32 | int32](ctx Context, s S, shape []int, dtype u
}
t := C.ggml_new_tensor(ctx.ctx, dtype, C.int(len(shape)), shapeToGGML(shape))
b := C.ggml_backend_alloc_buffer(ctx.backend, C.ggml_nbytes(t))
b := C.ggml_backend_alloc_buffer(C.ggml_backend_sched_get_backend(ctx.b.sched, 0), C.ggml_nbytes(t))
C.ggml_backend_tensor_alloc(b, t, C.ggml_backend_buffer_get_base(b))
C.ggml_backend_tensor_set(t, unsafe.Pointer(&s[0]), 0, C.ggml_nbytes(t))
C.ggml_set_input(t)
return &Tensor{b: ctx.b, t: t}, nil
}


@@ -207,13 +207,7 @@ struct ggml_backend_registry {
for (size_t i = 0; i < ggml_backend_reg_dev_count(reg); i++) {
register_device(ggml_backend_reg_dev_get(reg, i), score);
}
}
void register_device(ggml_backend_dev_t device, int score = -1) {
#ifndef NDEBUG
GGML_LOG_DEBUG("%s: registered device %s (%s)\n", __func__, ggml_backend_dev_name(device), ggml_backend_dev_description(device));
#endif
devices.push_back({device, score});
std::stable_sort(devices.begin(), devices.end(),
[](const auto & a, const auto & b) {
return a.second > b.second;
@@ -221,6 +215,21 @@
);
}
void register_device(ggml_backend_dev_t device, int score = -1) {
switch (ggml_backend_dev_type(device)) {
case GGML_BACKEND_DEVICE_TYPE_CPU:
case GGML_BACKEND_DEVICE_TYPE_GPU:
score += 1 << 16;
case GGML_BACKEND_DEVICE_TYPE_ACCEL:
score += 1 << 20;
}
#ifndef NDEBUG
GGML_LOG_DEBUG("%s: registered device %s (%s)\n", __func__, ggml_backend_dev_name(device), ggml_backend_dev_description(device));
#endif
devices.push_back({device, score});
}
ggml_backend_reg_t load_backend(const std::filesystem::path & path, bool silent) {
dl_handle_ptr handle { dl_load_library(path) };
if (!handle) {


@@ -12,7 +12,6 @@ import (
)
type Options struct {
RopeFactors ml.Tensor `gguf:"rope_freqs.weight"`
hiddenSize, numHeads, numKVHeads int
eps, ropeBase, ropeScale float32
ropeDim uint32
@@ -66,10 +65,11 @@ func New(c ml.Config) (model.Model, error) {
}
type SelfAttention struct {
Query *nn.Linear `gguf:"attn_q"`
Key *nn.Linear `gguf:"attn_k"`
Value *nn.Linear `gguf:"attn_v"`
Output *nn.Linear `gguf:"attn_output"`
Query *nn.Linear `gguf:"attn_q"`
Key *nn.Linear `gguf:"attn_k"`
Value *nn.Linear `gguf:"attn_v"`
Output *nn.Linear `gguf:"attn_output"`
RopeFactors ml.Tensor `gguf:"rope_freqs.weight"`
}
func (sa *SelfAttention) Forward(ctx ml.Context, hiddenState, positionIDs ml.Tensor, cache kvcache.Cache, opts *Options) ml.Tensor {
@@ -78,11 +78,11 @@ func (sa *SelfAttention) Forward(ctx ml.Context, hiddenState, positionIDs ml.Ten
q := sa.Query.Forward(ctx, hiddenState)
q = q.Reshape(ctx, headDim, opts.numHeads, batchSize)
q = q.RoPE(ctx, positionIDs, opts.RopeFactors, opts.ropeDim, opts.ropeBase, opts.ropeScale)
q = q.RoPE(ctx, positionIDs, sa.RopeFactors, opts.ropeDim, opts.ropeBase, opts.ropeScale)
k := sa.Key.Forward(ctx, hiddenState)
k = k.Reshape(ctx, headDim, opts.numKVHeads, batchSize)
k = k.RoPE(ctx, positionIDs, opts.RopeFactors, opts.ropeDim, opts.ropeBase, opts.ropeScale)
k = k.RoPE(ctx, positionIDs, sa.RopeFactors, opts.ropeDim, opts.ropeBase, opts.ropeScale)
v := sa.Value.Forward(ctx, hiddenState)
v = v.Reshape(ctx, headDim, opts.numKVHeads, batchSize)
@@ -95,7 +95,7 @@ func (sa *SelfAttention) Forward(ctx ml.Context, hiddenState, positionIDs ml.Ten
}
func (m *Model) Shift(ctx ml.Context, layer int, key, shift ml.Tensor) (ml.Tensor, error) {
return key.RoPE(ctx, shift, m.Options.RopeFactors, m.Options.ropeDim, m.Options.ropeBase, m.Options.ropeScale), nil
return key.RoPE(ctx, shift, m.Layers[layer].SelfAttention.RopeFactors, m.ropeDim, m.ropeBase, m.ropeScale), nil
}
type MLP struct {