mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-06 09:34:13 +02:00
fn: add concurrent map operation for slices
This commit is contained in:
38
fn/slice.go
38
fn/slice.go
@@ -1,6 +1,13 @@
|
|||||||
package fn
|
package fn
|
||||||
|
|
||||||
import "golang.org/x/exp/constraints"
|
import (
|
||||||
|
"context"
|
||||||
|
"runtime"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"golang.org/x/exp/constraints"
|
||||||
|
"golang.org/x/sync/semaphore"
|
||||||
|
)
|
||||||
|
|
||||||
// Number is a type constraint for all numeric types in Go (integers,
|
// Number is a type constraint for all numeric types in Go (integers,
|
||||||
// float and complex numbers)
|
// float and complex numbers)
|
||||||
@@ -205,3 +212,32 @@ func Sum[B Number](items []B) B {
|
|||||||
func HasDuplicates[A comparable](items []A) bool {
|
func HasDuplicates[A comparable](items []A) bool {
|
||||||
return len(NewSet(items...)) != len(items)
|
return len(NewSet(items...)) != len(items)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ForEachConc maps the argument function over the slice, spawning a new
|
||||||
|
// goroutine for each element in the slice and then awaits all results before
|
||||||
|
// returning them.
|
||||||
|
func ForEachConc[A, B any](f func(A) B,
|
||||||
|
as []A) []B {
|
||||||
|
|
||||||
|
var wait sync.WaitGroup
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
sem := semaphore.NewWeighted(int64(runtime.NumCPU()))
|
||||||
|
|
||||||
|
bs := make([]B, len(as))
|
||||||
|
|
||||||
|
for i, a := range as {
|
||||||
|
i, a := i, a
|
||||||
|
sem.Acquire(ctx, 1)
|
||||||
|
wait.Add(1)
|
||||||
|
go func() {
|
||||||
|
bs[i] = f(a)
|
||||||
|
wait.Done()
|
||||||
|
sem.Release(1)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wait.Wait()
|
||||||
|
|
||||||
|
return bs
|
||||||
|
}
|
||||||
|
@@ -4,6 +4,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"slices"
|
"slices"
|
||||||
"testing"
|
"testing"
|
||||||
|
"testing/quick"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
@@ -281,3 +283,60 @@ func TestHasDuplicates(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestPropForEachConcMapIsomorphism ensures the property that ForEachConc and
|
||||||
|
// Map always yield the same results.
|
||||||
|
func TestPropForEachConcMapIsomorphism(t *testing.T) {
|
||||||
|
f := func(incSize int, s []int) bool {
|
||||||
|
inc := func(i int) int { return i + incSize }
|
||||||
|
mapped := Map(inc, s)
|
||||||
|
conc := ForEachConc(inc, s)
|
||||||
|
|
||||||
|
return slices.Equal(mapped, conc)
|
||||||
|
}
|
||||||
|
if err := quick.Check(f, nil); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPropForEachConcOutperformsMapWhenExpensive ensures the property that
|
||||||
|
// ForEachConc will beat Map in a race in circumstances where the computation in
|
||||||
|
// the argument closure is expensive.
|
||||||
|
func TestPropForEachConcOutperformsMapWhenExpensive(t *testing.T) {
|
||||||
|
f := func(incSize int, s []int) bool {
|
||||||
|
if len(s) < 2 {
|
||||||
|
// Intuitively we don't expect the extra overhead of
|
||||||
|
// ForEachConc to justify itself for list sizes of 1 or
|
||||||
|
// 0.
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
inc := func(i int) int {
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
return i + incSize
|
||||||
|
}
|
||||||
|
c := make(chan bool, 1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
Map(inc, s)
|
||||||
|
select {
|
||||||
|
case c <- false:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
ForEachConc(inc, s)
|
||||||
|
select {
|
||||||
|
case c <- true:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return <-c
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := quick.Check(f, nil); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user