From 45df786f09fec84ff68795845a4b3395cc0def90 Mon Sep 17 00:00:00 2001
From: Michael Yang
Date: Tue, 4 Mar 2025 13:06:56 -0800
Subject: [PATCH] comments

---
 ml/backend/ggml/ggml.go | 212 ++++++++++++++++++++++------------------
 1 file changed, 117 insertions(+), 95 deletions(-)

diff --git a/ml/backend/ggml/ggml.go b/ml/backend/ggml/ggml.go
index 9cee8216b..c4adcd98f 100644
--- a/ml/backend/ggml/ggml.go
+++ b/ml/backend/ggml/ggml.go
@@ -12,7 +12,6 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"iter"
 	"log/slog"
 	"maps"
 	"os"
@@ -29,26 +28,34 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-func devices() iter.Seq[*C.struct_ggml_backend_device] {
-	return func(yield func(*C.struct_ggml_backend_device) bool) {
-		ggml.OnceLoad()
-		for i := range C.ggml_backend_dev_count() {
-			if !yield(C.ggml_backend_dev_get(i)) {
-				return
-			}
-		}
+func devices() []*C.struct_ggml_backend_device {
+	ggml.OnceLoad()
+	ds := make([]*C.struct_ggml_backend_device, C.ggml_backend_dev_count())
+	for i := range ds {
+		ds[i] = C.ggml_backend_dev_get(C.size_t(i))
 	}
+
+	return ds
 }
 
 type Backend struct {
 	meta    *fs.GGML
 	sched   *C.struct_ggml_backend_sched
 	tensors map[string]*C.struct_ggml_tensor
-	input   *C.struct_ggml_backend
-	output  *C.struct_ggml_backend
-	layers  map[int]*C.struct_ggml_backend
+
+	// input is the backend used for inputs
+	input *C.struct_ggml_backend
+
+	// output is the backend used for outputs
+	output *C.struct_ggml_backend
+
+	// layers is the backend used for repeating layers
+	layers map[int]*C.struct_ggml_backend
 
 	flashAttention bool
+
+	// maxGraphNodes is the maximum allowed number of graph nodes in this scheduler
+	maxGraphNodes int
 }
 
 func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
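Not part of the patch, but a note on the devices() change above: the lazy iter.Seq is replaced with an eagerly built slice, which gives callers indexing, len(), and repeatable iteration. A minimal sketch of the same pattern with ordinary values (devCount/devGet are hypothetical stand-ins for the cgo calls):

package main

import "fmt"

// hypothetical stand-ins for C.ggml_backend_dev_count / C.ggml_backend_dev_get
func devCount() int       { return 3 }
func devGet(i int) string { return fmt.Sprintf("device-%d", i) }

// devices eagerly collects every device into a slice, mirroring the patched helper.
func devices() []string {
	ds := make([]string, devCount())
	for i := range ds {
		ds[i] = devGet(i)
	}
	return ds
}

func main() {
	// a slice supports index-based iteration and multiple passes,
	// which the one-shot iter.Seq version did not give callers directly
	for i, d := range devices() {
		fmt.Println(i, d)
	}
}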
@@ -73,7 +80,7 @@ func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
 	}
 
 	var cpus, accels, gpus []*C.struct_ggml_backend_device
-	for d := range devices() {
+	for _, d := range devices() {
 		switch C.ggml_backend_dev_type(d) {
 		case C.GGML_BACKEND_DEVICE_TYPE_CPU:
 			cpus = append(cpus, d)
@@ -84,6 +91,7 @@ func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
 		}
 	}
 
+	// create list of buffer types for the cpu
 	cpuDeviceBufferType := deviceBufferType{d: C.ggml_backend_dev_by_type(C.GGML_BACKEND_DEVICE_TYPE_CPU)}
 	for _, d := range append(accels, append(gpus, cpus...)...) {
 		switch C.ggml_backend_dev_type(d) {
@@ -93,6 +101,7 @@ func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
 		}
 	}
 
+	// create list of buffer types for each gpu
 	var gpuDeviceBufferTypes []deviceBufferType
 	for _, d := range gpus {
 		bt := C.ggml_backend_dev_buffer_type(d)
@@ -102,44 +111,53 @@ func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
 		})
 	}
 
-	splits := make([]float32, len(gpus))
-	if func() bool {
-		for _, s := range params.TensorSplit {
-			if s != 0 {
-				return true
-			}
+	useDefaultSplit := true
+	for _, s := range params.TensorSplit {
+		if s != 0 {
+			useDefaultSplit = false
+			break
 		}
+	}
 
-		return false
-	}() {
-		splits = params.TensorSplit
-	} else {
+	// calculate splits
+	splits := make([]float32, len(gpus))
+	if useDefaultSplit {
+		// default: split on free memory
 		for i := range splits {
 			var free, total C.size_t
 			C.ggml_backend_dev_memory(gpus[i], &free, &total)
			splits[i] = float32(free)
 		}
+	} else {
+		splits = params.TensorSplit
 	}
 
 	var sum float32
+	// cumulative sum of all splits
 	for i := range splits {
 		sum += splits[i]
 		splits[i] = sum
 	}
 
+	// normalize splits
 	for i := range splits {
 		splits[i] /= sum
 	}
 
+	// inputs always use cpu
 	input := cpuDeviceBufferType
 
 	blocks := int(meta.KV().BlockCount())
+
+	// define a range of gpu layers. anything outside of this range is assigned to the cpu
+	gpuRangeStart := max(0, blocks-params.NumGPULayers)
+	gpuRangeStop := min(gpuRangeStart+params.NumGPULayers, blocks+1)
 	assignLayer := func(i int) deviceBufferType {
-		if i >= params.NumGPULayers {
+		if i < gpuRangeStart || i >= gpuRangeStop {
 			return cpuDeviceBufferType
 		}
 
-		index := slices.IndexFunc(splits, func(f float32) bool { return float32(i)/float32(blocks+1) < f })
+		index := slices.IndexFunc(splits, func(f float32) bool { return float32(i-gpuRangeStart)/float32(gpuRangeStop-gpuRangeStart) < f })
 		if index < 0 || index >= len(gpuDeviceBufferTypes) {
 			return cpuDeviceBufferType
 		}
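The split handling above is easier to follow with concrete numbers. A self-contained sketch (made-up free-memory values and layer counts, no cgo) of the cumulative-sum, normalization, and layer-bucketing arithmetic:

package main

import (
	"fmt"
	"slices"
)

func main() {
	// hypothetical free-memory readings for two gpus, standing in for ggml_backend_dev_memory
	splits := []float32{8e9, 4e9}

	// cumulative sum of all splits
	var sum float32
	for i := range splits {
		sum += splits[i]
		splits[i] = sum
	}
	// normalize splits: splits is now roughly {0.667, 1.0}
	for i := range splits {
		splits[i] /= sum
	}

	// assign 16 of 32 repeating layers to the gpus; everything else stays on cpu
	blocks, numGPULayers := 32, 16
	gpuRangeStart := max(0, blocks-numGPULayers)
	gpuRangeStop := min(gpuRangeStart+numGPULayers, blocks+1)

	for i := 0; i <= blocks; i++ { // blocks+1 iterations: the last index is the output "layer"
		gpu := -1 // -1 means cpu
		if i >= gpuRangeStart && i < gpuRangeStop {
			gpu = slices.IndexFunc(splits, func(f float32) bool {
				return float32(i-gpuRangeStart)/float32(gpuRangeStop-gpuRangeStart) < f
			})
		}
		fmt.Printf("layer %2d -> gpu %d\n", i, gpu)
	}
}

With these numbers, layers 0-15 land on the cpu, layers 16-26 on gpu 0, and layers 27-31 on gpu 1.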
@@ -147,15 +165,18 @@ func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
 		return gpuDeviceBufferTypes[index]
 	}
 
+	// repeating layers are assigned based on their index in reverse order, e.g. i / (block_count + 1)
 	layers := make([]deviceBufferType, blocks)
 	for i := range layers {
 		layers[i] = assignLayer(i)
 	}
 
+	// outputs are assigned iff allowed by splits and configured number of gpu layers
 	output := assignLayer(blocks)
 
 	maxTensors := len(meta.Tensors().Items())
 	maxTensors += 1
+	// each layer has at most 2 extra tensors for rope operations
 	maxTensors += blocks * 2
 
 	type tensor struct {
@@ -163,8 +184,10 @@ func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
 		target string
 	}
 
+	// some tensors are mapped to different names so keep a list
 	targets := make(map[string][]string)
 
+	// contexts are shared by tensors of the same buffer type
 	ctxs := make(map[*C.struct_ggml_backend_buffer_type]*C.struct_ggml_context)
 	createTensor := func(t tensor, bts []*C.struct_ggml_backend_buffer_type) *C.struct_ggml_tensor {
 		for _, bt := range bts {
@@ -217,19 +240,21 @@ func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
 		case contains(t.Name, "cls", "output", "output_norm"):
 			createTensor(tensor{source: t}, output.bts)
 		case strings.HasPrefix(t.Name, "v.") || strings.HasPrefix(t.Name, "mm."):
+			// TODO: assign vision tensors to the gpu if possible
 			createTensor(tensor{source: t}, input.bts)
 		default:
-			if i := func() int {
-				if fields := strings.FieldsFunc(t.Name, func(r rune) bool { return !unicode.IsNumber(r) }); len(fields) > 0 {
-					if i, err := strconv.Atoi(fields[0]); err == nil {
-						return i
-					}
+			layerIndex := -1
+			if fields := strings.FieldsFunc(t.Name, func(r rune) bool { return !unicode.IsNumber(r) }); len(fields) > 0 {
+				if i, err := strconv.Atoi(fields[0]); err == nil {
+					layerIndex = i
 				}
+			}
 
-				return -1
-			}(); i >= 0 {
-				createTensor(tensor{source: t}, layers[i].bts)
+			if layerIndex >= 0 {
+				createTensor(tensor{source: t}, layers[layerIndex].bts)
 			} else {
+				// this is a repeating tensor that isn't explicitly associated with a layer so
+				// duplicate it for each layer
 				for i, layer := range layers {
 					createTensor(tensor{
 						source: t,
@@ -240,8 +265,8 @@ func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
 		}
 	}
 
-	bbs := make(map[*C.struct_ggml_context][]*C.struct_ggml_backend_buffer, len(ctxs))
-
+	// allocate buffers for each context
+	bbs := make(map[*C.struct_ggml_context]*C.struct_ggml_backend_buffer, len(ctxs))
 	for bt, c := range ctxs {
 		if C.ggml_get_first_tensor(c) == nil {
 			continue
@@ -249,15 +274,14 @@ func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
 
 		b := C.ggml_backend_alloc_ctx_tensors_from_buft(c, bt)
 		C.ggml_backend_buffer_set_usage(b, C.GGML_BACKEND_BUFFER_USAGE_WEIGHTS)
-		bbs[c] = append(bbs[c], b)
+		bbs[c] = b
 	}
 
 	for bs := range maps.Values(bbs) {
-		for _, b := range bs {
-			slog.Info("model weights", "buffer", C.GoString(C.ggml_backend_buffer_name(b)), "size", format.HumanBytes2(uint64(C.ggml_backend_buffer_get_size(b))))
-		}
+		slog.Info("model weights", "buffer", C.GoString(C.ggml_backend_buffer_name(bs)), "size", format.HumanBytes2(uint64(C.ggml_backend_buffer_get_size(bs))))
 	}
 
+	// map tensor names to tensors for easy lookup later
 	tensors := make(map[string]*C.struct_ggml_tensor)
 	for _, c := range ctxs {
 		for t := C.ggml_get_first_tensor(c); t != nil; t = C.ggml_get_next_tensor(c, t) {
@@ -265,6 +289,7 @@ func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
 		}
 	}
 
+	// concurrently read in tensor data. uses a section reader which is safe for concurrent reads
 	sr := io.NewSectionReader(r, int64(meta.Tensors().Offset), n-int64(meta.Tensors().Offset))
 	var g errgroup.Group
 	for _, t := range meta.Tensors().Items() {
@@ -289,10 +314,7 @@ func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
 				return errors.New("short read")
 			}
 
-			cname := C.CString(t.Name)
 			C.ggml_backend_tensor_set(tt, unsafe.Pointer(&bts[0]), 0, C.size_t(t.Size()))
-			C.free(unsafe.Pointer(cname))
-
 			return nil
 		})
 	}
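The loading path above leans on ReadAt-style access being safe for concurrent use, so every tensor can be read by its own goroutine in an errgroup. A stripped-down illustration of that pattern over an in-memory ReaderAt (hypothetical offsets, no ggml involved):

package main

import (
	"fmt"
	"io"
	"strings"

	"golang.org/x/sync/errgroup"
)

func main() {
	// any io.ReaderAt works; ReadAt has no shared cursor, unlike Read
	data := strings.NewReader("tensor-a|tensor-b|tensor-c")
	sr := io.NewSectionReader(data, 0, int64(data.Len()))

	// hypothetical tensor records: offset and size within the blob
	sections := []struct{ off, n int64 }{{0, 8}, {9, 8}, {18, 8}}

	var g errgroup.Group
	out := make([]string, len(sections))
	for i, s := range sections { // Go 1.22+: i and s are per-iteration variables
		g.Go(func() error {
			// concurrent goroutines each read their own byte range
			buf := make([]byte, s.n)
			if _, err := sr.ReadAt(buf, s.off); err != nil {
				return err
			}
			out[i] = string(buf)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
	fmt.Println(out) // [tensor-a tensor-b tensor-c]
}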
@@ -302,39 +324,45 @@ func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
 		return nil, err
 	}
 
+	// map devices to backends so tensors created post initialization can be assigned to the correct device
 	deviceBackends := make(map[*C.struct_ggml_backend_device]*C.struct_ggml_backend)
-	var backends []*C.struct_ggml_backend
-	var bufts []*C.struct_ggml_backend_buffer_type
+
+	// create backends and buffer types used for the compute graph scheduler
+	var schedBackends []*C.struct_ggml_backend
+	var schedBufts []*C.struct_ggml_backend_buffer_type
 	for _, d := range append(gpus, append(accels, cpus...)...) {
 		b := C.ggml_backend_dev_init(d, nil)
-		backends = append(backends, b)
+		schedBackends = append(schedBackends, b)
 		deviceBackends[d] = b
 
 		bt := C.ggml_backend_get_default_buffer_type(b)
+		// use the first gpu host buffer type for gpu if possible
 		if d := C.ggml_backend_get_device(b); C.ggml_backend_dev_type(d) == C.GGML_BACKEND_DEVICE_TYPE_CPU && len(gpus) > 0 {
 			if hbt := C.ggml_backend_dev_host_buffer_type(d); hbt != nil {
 				bt = hbt
 			}
 		}
 
-		bufts = append(bufts, bt)
+		schedBufts = append(schedBufts, bt)
 		slog.Info("compute graph", "backend", C.GoString(C.ggml_backend_name(b)), "buffer_type", C.GoString(C.ggml_backend_buft_name(bt)))
 
 		if C.ggml_backend_is_cpu(b) {
+			// set number of threads for cpu backend
 			C.ggml_backend_cpu_set_n_threads(b, C.int(params.NumThreads))
 		}
 	}
 
+	maxGraphNodes := max(8192, len(meta.Tensors().Items())*5)
 	return &Backend{
 		flashAttention: params.FlashAttention,
 		meta:           meta,
 		tensors:        tensors,
 		sched: C.ggml_backend_sched_new(
-			(*C.ggml_backend_t)(unsafe.Pointer(&backends[0])),
-			(*C.ggml_backend_buffer_type_t)(unsafe.Pointer(&bufts[0])),
-			C.int(len(backends)),
-			C.size_t(max(8192, len(meta.Tensors().Items())*5)),
+			(*C.ggml_backend_t)(unsafe.Pointer(&schedBackends[0])),
+			(*C.ggml_backend_buffer_type_t)(unsafe.Pointer(&schedBufts[0])),
			C.int(len(schedBackends)),
+			C.size_t(maxGraphNodes),
 			true,
 		),
 		input:  deviceBackends[input.d],
@@ -346,6 +374,7 @@ func New(r *os.File, params ml.BackendParams) (ml.Backend, error) {
 			}
 			return m
 		}(),
+		maxGraphNodes: maxGraphNodes,
 	}, nil
 }
 
@@ -366,10 +395,11 @@ func (b *Backend) Get(name string) ml.Tensor {
 }
 
 func (b *Backend) NewContext() ml.Context {
-	return b.NewContextSize(max(8192, len(b.meta.Tensors().Items())*5))
+	return b.NewContextSize(b.maxGraphNodes)
 }
 
 func (b *Backend) NewContextSize(n int) ml.Context {
+	n = min(n, b.maxGraphNodes)
 	return &Context{
 		b:   b,
 		ctx: C.ggml_init(C.struct_ggml_init_params{
@@ -378,9 +408,6 @@ func (b *Backend) NewContextSize(n int) ml.Context {
 		}),
 		backend:       C.ggml_backend_sched_get_backend(b.sched, 0),
 		maxGraphNodes: n,
-		input:         b.input,
-		output:        b.output,
-		layers:        b.layers,
 	}
 }
 
@@ -401,46 +428,38 @@ type Context struct {
 	// backend is the backend used for new tensors
 	backend *C.struct_ggml_backend
 
-	// input is the backend used for inputs
-	input *C.struct_ggml_backend
-
-	// output is the backend used for outputs
-	output *C.struct_ggml_backend
-
-	// output is the backend used for repeating layers
-	layers map[int]*C.struct_ggml_backend
-
+	// maxGraphNodes is the maximum allowed number of graph nodes in this context
 	maxGraphNodes int
 }
 
-func (c *Context) Input() ml.Context {
-	if c.input != nil {
+func (c Context) Input() ml.Context {
+	if c.b.input != nil {
 		return &Context{
 			b:             c.b,
 			ctx:           c.ctx,
-			backend:       c.input,
+			backend:       c.b.input,
 			maxGraphNodes: c.maxGraphNodes,
 		}
 	}
 
-	return c
+	return &c
 }
 
-func (c *Context) Output() ml.Context {
-	if c.output != nil {
+func (c Context) Output() ml.Context {
+	if c.b.output != nil {
 		return &Context{
 			b:             c.b,
 			ctx:           c.ctx,
-			backend:       c.output,
+			backend:       c.b.output,
 			maxGraphNodes: c.maxGraphNodes,
 		}
 	}
 
-	return c
+	return &c
 }
 
-func (c *Context) Layer(i int) ml.Context {
-	if backend, ok := c.layers[i]; ok {
+func (c Context) Layer(i int) ml.Context {
+	if backend, ok := c.b.layers[i]; ok {
 		return &Context{
 			b:             c.b,
 			ctx:           c.ctx,
@@ -449,7 +468,7 @@ func (c *Context) Layer(i int) ml.Context {
 		}
 	}
 
-	return c
+	return &c
 }
 
 func (c *Context) Forward(tensors ...ml.Tensor) ml.Context {
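The Input/Output/Layer methods above now take value receivers and fall back to `return &c`, i.e. a pointer to the method's local copy rather than to the caller's value. A toy example of that Go semantics point (not the real Context type):

package main

import "fmt"

// toy stand-in for the patched Context: a value receiver that may return itself
type ctx struct {
	backend string
}

func (c ctx) withBackend(b string) *ctx {
	if b != "" {
		return &ctx{backend: b}
	}
	// &c points at this method's copy, so the caller's value is never aliased or mutated
	return &c
}

func main() {
	base := ctx{backend: "cpu"}

	a := base.withBackend("gpu0")
	b := base.withBackend("") // falls back to a copy of base

	b.backend = "scratch"
	fmt.Println(base.backend, a.backend, b.backend) // cpu gpu0 scratch
}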
@@ -464,10 +483,9 @@ func (c *Context) Forward(tensors ...ml.Tensor) ml.Context {
 	return c
 }
 
-func (c *Context) Compute(tensors ...ml.Tensor) {
-	C.ggml_backend_sched_reset(c.b.sched)
-	C.ggml_backend_sched_alloc_graph(c.b.sched, c.graph)
+func (c Context) Compute(tensors ...ml.Tensor) {
 	C.ggml_backend_sched_graph_compute_async(c.b.sched, c.graph)
+	C.ggml_backend_sched_reset(c.b.sched)
 
 	needSync := true
 	sync := func() {
@@ -484,7 +502,7 @@ func (c *Context) Compute(tensors ...ml.Tensor) {
 	}
 }
 
-func (c *Context) MaxGraphNodes() int {
+func (c Context) MaxGraphNodes() int {
 	return c.maxGraphNodes
 }
 
@@ -498,7 +516,22 @@ func shapeToGGML(shape []int) *C.int64_t {
 }
 
 func (c Context) newTensor(dtype ml.DType, shape []int) ml.Tensor {
-	if len(shape) < 1 || len(shape) > 4 {
+	var cdtype uint32
+	switch dtype {
+	case ml.DTypeF32:
+		cdtype = C.GGML_TYPE_F32
+	case ml.DTypeF16:
+		cdtype = C.GGML_TYPE_F16
+	case ml.DTypeI32:
+		cdtype = C.GGML_TYPE_I32
+	default:
+		panic("unsupported dtype")
+	}
+
+	if len(shape) < 1 {
+		var shape C.int64_t = 0
+		return &Tensor{b: c.b, t: C.ggml_new_tensor(c.ctx, cdtype, 1, &shape)}
+	} else if len(shape) > 4 {
 		panic("unsupported number of dimensions")
 	}
 
@@ -508,18 +541,7 @@ func (c Context) newTensor(dtype ml.DType, shape []int) ml.Tensor {
 		}
 	}
 
-	var t *C.struct_ggml_tensor
-	switch dtype {
-	case ml.DTypeF32:
-		t = C.ggml_new_tensor(c.ctx, C.GGML_TYPE_F32, C.int(len(shape)), shapeToGGML(shape))
-	case ml.DTypeF16:
-		t = C.ggml_new_tensor(c.ctx, C.GGML_TYPE_F16, C.int(len(shape)), shapeToGGML(shape))
-	case ml.DTypeI32:
-		t = C.ggml_new_tensor(c.ctx, C.GGML_TYPE_I32, C.int(len(shape)), shapeToGGML(shape))
-	default:
-		panic("unsupported dtype")
-	}
-
+	t := C.ggml_new_tensor(c.ctx, cdtype, C.int(len(shape)), shapeToGGML(shape))
 	b := C.ggml_backend_alloc_buffer(c.backend, C.ggml_nbytes(t))
 	C.ggml_backend_tensor_alloc(b, t, C.ggml_backend_buffer_get_base(b))
 	return &Tensor{b: c.b, t: t}
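A rough pure-Go analogue of the newTensor restructuring above: resolve the dtype once up front, then share it between the scalar (empty shape) branch and the general constructor. The types and element sizes here are made up for illustration:

package main

import "fmt"

type dtype int

const (
	f32 dtype = iota
	f16
	i32
)

// resolve the element size once, the way the patch hoists the
// ml.DType -> GGML_TYPE_* switch ahead of both construction paths
func elemSize(dt dtype) int {
	switch dt {
	case f32, i32:
		return 4
	case f16:
		return 2
	default:
		panic("unsupported dtype")
	}
}

// nbytes sketches the patched control flow: shared dtype resolution,
// a dedicated branch for an empty shape, and a cap of 4 dimensions
func nbytes(dt dtype, shape ...int) int {
	n := elemSize(dt)
	if len(shape) < 1 {
		return 0 // analogous to the zero-length 1-D tensor the patch creates
	} else if len(shape) > 4 {
		panic("unsupported number of dimensions")
	}
	for _, d := range shape {
		n *= d
	}
	return n
}

func main() {
	fmt.Println(nbytes(f32, 2, 3)) // 24
	fmt.Println(nbytes(f16))       // 0
}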
@@ -549,7 +571,7 @@ func checkShape[S ~[]E, E any](s S, shape ...int) error {
 }
 
 func (c Context) FromFloatSlice(s []float32, shape ...int) (ml.Tensor, error) {
-	if err := checkShape(s, shape...); err != nil {
+	if err := checkShape(s, shape...); err != nil && len(shape) > 0 {
 		return nil, err
 	}
 
@@ -559,7 +581,7 @@ func (c Context) FromFloatSlice(s []float32, shape ...int) (ml.Tensor, error) {
 }
 
 func (c Context) FromIntSlice(s []int32, shape ...int) (ml.Tensor, error) {
-	if err := checkShape(s, shape...); err != nil {
+	if err := checkShape(s, shape...); err != nil && len(shape) > 0 {
 		return nil, err
 	}
 
@@ -568,8 +590,8 @@ func (c Context) FromIntSlice(s []int32, shape ...int) (ml.Tensor, error) {
 	return t, nil
 }
 
-func (c Context) Close() {
-	if c.ctx != nil {
+func (c *Context) Close() {
+	if c != nil {
 		C.ggml_free(c.ctx)
 	}
 }
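The relaxed guards in FromFloatSlice/FromIntSlice only treat a shape mismatch as fatal when a shape was actually supplied, which is what lets a call with no shape reach the new scalar path in newTensor. A sketch of which calls the new condition lets through, with an assumed checkShape (the real body is not part of this diff):

package main

import "fmt"

// stand-in for the package's checkShape: assume it requires the product of
// the dimensions to match the element count (the real implementation may differ)
func checkShape[S ~[]E, E any](s S, shape ...int) error {
	n := 1
	for _, d := range shape {
		n *= d
	}
	if n != len(s) {
		return fmt.Errorf("invalid shape %v for %d elements", shape, len(s))
	}
	return nil
}

func accepted(s []float32, shape ...int) bool {
	// the patched condition: a shape error is only fatal when a shape was given
	if err := checkShape(s, shape...); err != nil && len(shape) > 0 {
		return false
	}
	return true
}

func main() {
	fmt.Println(accepted([]float32{1, 2, 3, 4}, 2, 2)) // true: shape matches the data
	fmt.Println(accepted([]float32{1, 2, 3}, 2, 2))    // false: mismatch with an explicit shape
	fmt.Println(accepted([]float32{1.5}))              // true: no shape, falls through to the scalar path
}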