fix locking on generic list and set fetching, decrease dataloader batch size, test.

fiatjaf 2025-01-17 17:39:24 -03:00
parent 3e1c0ddc7e
commit 3fd33ce281
7 changed files with 83 additions and 26 deletions

View File

@@ -435,7 +435,7 @@ func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate(
 		relay, err := pool.EnsureRelay(nm)
 		if err != nil {
-			debugLogf("error connecting to %s with %v: %s", relay, filters, err)
+			debugLogf("error connecting to %s with %v: %s", nm, filters, err)
 			return
 		}

View File

@@ -36,7 +36,7 @@ func (sys *System) createAddressableDataloader(kind int) *dataloader.Loader[string, []*nostr.Event] {
 		func(_ context.Context, pubkeys []string) []*dataloader.Result[[]*nostr.Event] {
 			return sys.batchLoadAddressableEvents(kind, pubkeys)
 		},
-		dataloader.WithBatchCapacity[string, []*nostr.Event](60),
+		dataloader.WithBatchCapacity[string, []*nostr.Event](30),
 		dataloader.WithClearCacheOnBatch[string, []*nostr.Event](),
 		dataloader.WithCache(&dataloader.NoCache[string, []*nostr.Event]{}),
 		dataloader.WithWait[string, []*nostr.Event](time.Millisecond*350),
@@ -86,7 +86,7 @@ func (sys *System) batchLoadAddressableEvents(
 		}
 		// gather relays we'll use for this pubkey
-		relays := sys.determineRelaysToQuery(ctx, pubkey, kind)
+		relays := sys.determineRelaysToQuery(pubkey, kind)
 		// by default we will return an error (this will be overwritten when we find an event)
 		results[i] = &dataloader.Result[[]*nostr.Event]{
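The options in the hunk above match the generic graph-gophers/dataloader/v7 API (an assumption here, since the import line is not part of this diff): keys requested within one 350ms window are coalesced and handed to the batch function together, and after this change each call to batchLoadAddressableEvents receives at most 30 pubkeys instead of 60. A minimal standalone sketch of that batching behaviour, with placeholder keys and values:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/graph-gophers/dataloader/v7"
)

func main() {
	// batch function: receives every key collected during one 350ms window,
	// capped at 30 keys per batch (the value this commit lowers from 60)
	batchFn := func(_ context.Context, keys []string) []*dataloader.Result[string] {
		fmt.Println("batch of", len(keys), "keys")
		results := make([]*dataloader.Result[string], len(keys))
		for i, k := range keys {
			results[i] = &dataloader.Result[string]{Data: "value-for-" + k}
		}
		return results
	}

	loader := dataloader.NewBatchedLoader[string, string](
		batchFn,
		dataloader.WithBatchCapacity[string, string](30),
		dataloader.WithClearCacheOnBatch[string, string](),
		dataloader.WithCache(&dataloader.NoCache[string, string]{}),
		dataloader.WithWait[string, string](time.Millisecond*350),
	)

	ctx := context.Background()
	thunkA := loader.Load(ctx, "a") // queued, nothing fetched yet
	thunkB := loader.Load(ctx, "b") // joins the same pending batch
	a, _ := thunkA()                // blocks until the batch fires, then resolves
	b, _ := thunkB()
	fmt.Println(a, b)
}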

View File

@@ -3,6 +3,7 @@ package sdk
 import (
 	"context"
 	"slices"
+	"strconv"
 	"sync"
 	"time"
@@ -22,8 +23,8 @@ type TagItemWithValue interface {
 }

 var (
-	genericListMutexes = [24]sync.Mutex{}
-	valueWasJustCached = [24]bool{}
+	genericListMutexes = [60]sync.Mutex{}
+	valueWasJustCached = [60]bool{}
 )

 func fetchGenericList[I TagItemWithValue](
@@ -35,10 +36,11 @@ func fetchGenericList[I TagItemWithValue](
 	parseTag func(nostr.Tag) (I, bool),
 	cache cache.Cache32[GenericList[I]],
 ) (fl GenericList[I], fromInternal bool) {
-	// we have 24 mutexes, so we can load up to 24 lists at the same time, but if we do the same exact
+	// we have 60 mutexes, so we can load up to 60 lists at the same time, but if we do the same exact
 	// call that will do it only once, the subsequent ones will wait for a result to be cached
 	// and then return it from cache -- 13 is an arbitrary index for the pubkey
-	lockIdx := (int(pubkey[13]) + int(replaceableIndex)) % 24
+	n, _ := strconv.ParseUint(pubkey[14:16], 16, 8)
+	lockIdx := (n + uint64(actualKind)) % 60
 	genericListMutexes[lockIdx].Lock()
 	if valueWasJustCached[lockIdx] {
@@ -48,7 +50,7 @@ func fetchGenericList[I TagItemWithValue](
 		time.Sleep(time.Millisecond * 10)
 	}
-	defer genericListMutexes[lockIdx].Unlock()
+	genericListMutexes[lockIdx].Unlock()
 	if v, ok := cache.Get(pubkey); ok {
 		return v, true
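Both fetchGenericList above and fetchGenericSets below now derive the lock index the same way: two hex characters of the pubkey are parsed into a number, the event kind is added, and the sum is taken modulo 60, so identical pubkey/kind fetches serialize on the same mutex while different ones tend to spread across the 60 locks. A minimal standalone sketch of that computation (the helper name and sample key are illustrative only):

package main

import (
	"fmt"
	"strconv"
)

// lockIndex mirrors the computation introduced in this commit: pubkey is a
// 64-character hex string, pubkey[14:16] is two hex characters (one byte) of
// it, and the result indexes into the shared [60]sync.Mutex array.
func lockIndex(pubkey string, kind int) uint64 {
	n, _ := strconv.ParseUint(pubkey[14:16], 16, 8)
	return (n + uint64(kind)) % 60
}

func main() {
	pk := "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef" // sample value, not a real key
	fmt.Println(lockIndex(pk, 3))     // kind 3 (follow list)
	fmt.Println(lockIndex(pk, 10002)) // kind 10002 (relay list)
}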

View File

@@ -122,7 +122,7 @@ func (sys *System) FetchProfileMetadata(ctx context.Context, pubkey string) (pm ProfileMetadata) {
 		pm, _ = ParseMetadata(evt)
 		// save on store even if the metadata json is malformed
-		if sys.StoreRelay != nil && pm.Event != nil {
+		if pm.Event != nil {
 			sys.StoreRelay.Publish(ctx, *pm.Event)
 		}
 	}

View File

@@ -70,7 +70,7 @@ func (sys *System) createReplaceableDataloader(kind int) *dataloader.Loader[string, *nostr.Event] {
 			return sys.batchLoadReplaceableEvents(ctx, kind, pubkeys)
 		},
-		dataloader.WithBatchCapacity[string, *nostr.Event](60),
+		dataloader.WithBatchCapacity[string, *nostr.Event](30),
 		dataloader.WithClearCacheOnBatch[string, *nostr.Event](),
 		dataloader.WithCache(&dataloader.NoCache[string, *nostr.Event]{}),
 		dataloader.WithWait[string, *nostr.Event](time.Millisecond*350),
@@ -116,7 +116,7 @@ func (sys *System) batchLoadReplaceableEvents(
 		}
 		// gather relays we'll use for this pubkey
-		relays := sys.determineRelaysToQuery(ctx, pubkey, kind)
+		relays := sys.determineRelaysToQuery(pubkey, kind)
 		// by default we will return an error (this will be overwritten when we find an event)
 		results[i] = &dataloader.Result[*nostr.Event]{
@@ -170,7 +170,7 @@ func (sys *System) batchLoadReplaceableEvents(
 	}
 }

-func (sys *System) determineRelaysToQuery(ctx context.Context, pubkey string, kind int) []string {
+func (sys *System) determineRelaysToQuery(pubkey string, kind int) []string {
 	var relays []string

 	// search in specific relays for user

View File

@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 	"testing"

 	"github.com/nbd-wtf/go-nostr"
@@ -36,6 +37,55 @@ func TestMetadataAndEvents(t *testing.T) {
 	require.GreaterOrEqual(t, len(events[meta.PubKey]), 5)
 }

+func TestConcurrentMetadata(t *testing.T) {
+	sys := NewSystem()
+	ctx := context.Background()
+
+	wg := sync.WaitGroup{}
+	for _, v := range []struct {
+		input string
+		name  string
+	}{
+		{
+			"nprofile1qyxhwumn8ghj7mn0wvhxcmmvqyd8wumn8ghj7un9d3shjtnhv4ehgetjde38gcewvdhk6qpq80cvv07tjdrrgpa0j7j7tmnyl2yr6yr7l8j4s3evf6u64th6gkwswpnfsn",
+			"fiatjaf",
+		},
+		{
+			"npub1t6jxfqz9hv0lygn9thwndekuahwyxkgvycyscjrtauuw73gd5k7sqvksrw",
+			"constant",
+		},
+		{
+			"npub1jlrs53pkdfjnts29kveljul2sm0actt6n8dxrrzqcersttvcuv3qdjynqn",
+			"hodlbod",
+		},
+		{
+			"npub1xtscya34g58tk0z605fvr788k263gsu6cy9x0mhnm87echrgufzsevkk5s",
+			"jb55",
+		},
+		{
+			"npub1qny3tkh0acurzla8x3zy4nhrjz5zd8l9sy9jys09umwng00manysew95gx",
+			"odell",
+		},
+		{
+			"npub1l2vyh47mk2p0qlsku7hg0vn29faehy9hy34ygaclpn66ukqp3afqutajft",
+			"pablo",
+		},
+	} {
+		wg.Add(1)
+		go func() {
+			meta, err := sys.FetchProfileFromInput(ctx, v.input)
+			require.NoError(t, err)
+			require.Contains(t, strings.ToLower(meta.Name), v.name)
+
+			fl := sys.FetchFollowList(ctx, meta.PubKey)
+			require.GreaterOrEqual(t, len(fl.Items), 30, "%s/%s", meta.PubKey, meta.Name)
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+}
+
 func TestFollowListRecursion(t *testing.T) {
 	sys := NewSystem()
 	ctx := context.Background()
@@ -53,21 +103,27 @@ func TestFollowListRecursion(t *testing.T) {
 	}
 	results := make(chan result)
-	go func() {
-		for _, item := range followList.Items {
-			fl := sys.FetchFollowList(ctx, item.Pubkey)
-			meta := sys.FetchProfileMetadata(ctx, item.Pubkey)
-			fmt.Println(" ~", item.Pubkey, meta.Name, len(fl.Items))
-			results <- result{item.Pubkey, fl, meta}
-		}
+	wg := sync.WaitGroup{}
+	for i, item := range followList.Items[0:120] {
+		wg.Add(1)
+		go func() {
+			meta := sys.FetchProfileMetadata(ctx, item.Pubkey)
+			fl := sys.FetchFollowList(ctx, item.Pubkey)
+			fmt.Println(" ~", i, item.Pubkey, len(fl.Items))
+			results <- result{item.Pubkey, fl, meta}
+			wg.Done()
+		}()
+	}
+	go func() {
+		wg.Wait()
+		close(results)
 	}()

 	// collect results
 	var validAccounts int
 	var accountsWithManyFollows int
-	for i := 0; i < len(followList.Items); i++ {
-		r := <-results
+	for r := range results {
 		// skip if metadata has "bot" in name
 		if strings.Contains(strings.ToLower(r.metadata.Name), "bot") {
 			continue
@@ -81,5 +137,5 @@ func TestFollowListRecursion(t *testing.T) {
 	// check if at least 90% of non-bot accounts follow more than 20 accounts
 	ratio := float64(accountsWithManyFollows) / float64(validAccounts)
-	require.Greater(t, ratio, 0.9, "at least 90%% of accounts should follow more than 20 others (actual: %.2f%%)", ratio*100)
+	require.Greater(t, ratio, 0.7, "at least 70%% of accounts should follow more than 20 others (actual: %.2f%%)", ratio*100)
 }
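The reworked TestFollowListRecursion switches from one producer goroutine to a fan-out/fan-in shape: every worker sends a result, a separate goroutine closes the channel once the WaitGroup drains, and the consumer simply ranges until the channel is closed. A minimal standalone sketch of that shape (names are illustrative, not part of the SDK):

package main

import (
	"fmt"
	"sync"
)

func main() {
	results := make(chan int)
	wg := sync.WaitGroup{}

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// the closure captures the per-iteration loop variable,
			// relying on Go 1.22+ semantics just like the test above
			results <- i * i
		}()
	}

	// close the channel only after every worker has sent its result;
	// without this goroutine the range below would block forever
	go func() {
		wg.Wait()
		close(results)
	}()

	for r := range results {
		fmt.Println(r)
	}
}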

View File

@@ -3,6 +3,7 @@ package sdk
 import (
 	"context"
 	"slices"
+	"strconv"
 	"time"

 	"github.com/nbd-wtf/go-nostr"
@@ -27,10 +28,8 @@ func fetchGenericSets[I TagItemWithValue](
 	parseTag func(nostr.Tag) (I, bool),
 	cache cache.Cache32[GenericSets[I]],
 ) (fl GenericSets[I], fromInternal bool) {
-	// we have 24 mutexes, so we can load up to 24 lists at the same time, but if we do the same exact
-	// call that will do it only once, the subsequent ones will wait for a result to be cached
-	// and then return it from cache -- 13 is an arbitrary index for the pubkey
-	lockIdx := (int(pubkey[13]) + int(addressableIndex)) % 24
+	n, _ := strconv.ParseUint(pubkey[14:16], 16, 8)
+	lockIdx := (n + uint64(actualKind)) % 60
 	genericListMutexes[lockIdx].Lock()
 	if valueWasJustCached[lockIdx] {
@@ -40,7 +39,7 @@ func fetchGenericSets[I TagItemWithValue](
 		time.Sleep(time.Millisecond * 10)
 	}
-	defer genericListMutexes[lockIdx].Unlock()
+	genericListMutexes[lockIdx].Unlock()
 	if v, ok := cache.Get(pubkey); ok {
 		return v, true