From 5902aa51593243d4151ae41ec693a059140fcec2 Mon Sep 17 00:00:00 2001 From: Keagan McClelland Date: Fri, 12 Apr 2024 17:51:00 -0600 Subject: [PATCH] fn: add concurrent map operation for slices --- fn/slice.go | 38 ++++++++++++++++++++++++++++++- fn/slice_test.go | 59 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 1 deletion(-) diff --git a/fn/slice.go b/fn/slice.go index f42d34544..32887bdb7 100644 --- a/fn/slice.go +++ b/fn/slice.go @@ -1,6 +1,13 @@ package fn -import "golang.org/x/exp/constraints" +import ( + "context" + "runtime" + "sync" + + "golang.org/x/exp/constraints" + "golang.org/x/sync/semaphore" +) // Number is a type constraint for all numeric types in Go (integers, // float and complex numbers) @@ -205,3 +212,32 @@ func Sum[B Number](items []B) B { func HasDuplicates[A comparable](items []A) bool { return len(NewSet(items...)) != len(items) } + +// ForEachConc maps the argument function over the slice, spawning a new +// goroutine for each element in the slice and then awaits all results before +// returning them. +func ForEachConc[A, B any](f func(A) B, + as []A) []B { + + var wait sync.WaitGroup + ctx := context.Background() + + sem := semaphore.NewWeighted(int64(runtime.NumCPU())) + + bs := make([]B, len(as)) + + for i, a := range as { + i, a := i, a + sem.Acquire(ctx, 1) + wait.Add(1) + go func() { + bs[i] = f(a) + wait.Done() + sem.Release(1) + }() + } + + wait.Wait() + + return bs +} diff --git a/fn/slice_test.go b/fn/slice_test.go index 016ef87d3..47ca1f7f8 100644 --- a/fn/slice_test.go +++ b/fn/slice_test.go @@ -4,6 +4,8 @@ import ( "fmt" "slices" "testing" + "testing/quick" + "time" "github.com/stretchr/testify/require" ) @@ -281,3 +283,60 @@ func TestHasDuplicates(t *testing.T) { }) } } + +// TestPropForEachConcMapIsomorphism ensures the property that ForEachConc and +// Map always yield the same results. +func TestPropForEachConcMapIsomorphism(t *testing.T) { + f := func(incSize int, s []int) bool { + inc := func(i int) int { return i + incSize } + mapped := Map(inc, s) + conc := ForEachConc(inc, s) + + return slices.Equal(mapped, conc) + } + if err := quick.Check(f, nil); err != nil { + t.Fatal(err) + } +} + +// TestPropForEachConcOutperformsMapWhenExpensive ensures the property that +// ForEachConc will beat Map in a race in circumstances where the computation in +// the argument closure is expensive. +func TestPropForEachConcOutperformsMapWhenExpensive(t *testing.T) { + f := func(incSize int, s []int) bool { + if len(s) < 2 { + // Intuitively we don't expect the extra overhead of + // ForEachConc to justify itself for list sizes of 1 or + // 0. + return true + } + + inc := func(i int) int { + time.Sleep(time.Millisecond) + return i + incSize + } + c := make(chan bool, 1) + + go func() { + Map(inc, s) + select { + case c <- false: + default: + } + }() + + go func() { + ForEachConc(inc, s) + select { + case c <- true: + default: + } + }() + + return <-c + } + + if err := quick.Check(f, nil); err != nil { + t.Fatal(err) + } +}