From 3fd33ce281f13582afb67f402a14096b966ec988 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Fri, 17 Jan 2025 17:39:24 -0300 Subject: [PATCH] fix locking on generic list and set fetching, decrease dataloader batch size, test. --- pool.go | 2 +- sdk/addressable_loader.go | 4 +-- sdk/list.go | 12 ++++--- sdk/metadata.go | 2 +- sdk/replaceable_loader.go | 6 ++-- sdk/sdk_test.go | 74 ++++++++++++++++++++++++++++++++++----- sdk/set.go | 9 +++-- 7 files changed, 83 insertions(+), 26 deletions(-) diff --git a/pool.go b/pool.go index d54694f..cf5ace6 100644 --- a/pool.go +++ b/pool.go @@ -435,7 +435,7 @@ func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate( relay, err := pool.EnsureRelay(nm) if err != nil { - debugLogf("error connecting to %s with %v: %s", relay, filters, err) + debugLogf("error connecting to %s with %v: %s", nm, filters, err) return } diff --git a/sdk/addressable_loader.go b/sdk/addressable_loader.go index 1b5c9ae..1994247 100644 --- a/sdk/addressable_loader.go +++ b/sdk/addressable_loader.go @@ -36,7 +36,7 @@ func (sys *System) createAddressableDataloader(kind int) *dataloader.Loader[stri func(_ context.Context, pubkeys []string) []*dataloader.Result[[]*nostr.Event] { return sys.batchLoadAddressableEvents(kind, pubkeys) }, - dataloader.WithBatchCapacity[string, []*nostr.Event](60), + dataloader.WithBatchCapacity[string, []*nostr.Event](30), dataloader.WithClearCacheOnBatch[string, []*nostr.Event](), dataloader.WithCache(&dataloader.NoCache[string, []*nostr.Event]{}), dataloader.WithWait[string, []*nostr.Event](time.Millisecond*350), @@ -86,7 +86,7 @@ func (sys *System) batchLoadAddressableEvents( } // gather relays we'll use for this pubkey - relays := sys.determineRelaysToQuery(ctx, pubkey, kind) + relays := sys.determineRelaysToQuery(pubkey, kind) // by default we will return an error (this will be overwritten when we find an event) results[i] = &dataloader.Result[[]*nostr.Event]{ diff --git a/sdk/list.go b/sdk/list.go index 6657488..9b2d495 100644 --- a/sdk/list.go +++ b/sdk/list.go @@ -3,6 +3,7 @@ package sdk import ( "context" "slices" + "strconv" "sync" "time" @@ -22,8 +23,8 @@ type TagItemWithValue interface { } var ( - genericListMutexes = [24]sync.Mutex{} - valueWasJustCached = [24]bool{} + genericListMutexes = [60]sync.Mutex{} + valueWasJustCached = [60]bool{} ) func fetchGenericList[I TagItemWithValue]( @@ -35,10 +36,11 @@ func fetchGenericList[I TagItemWithValue]( parseTag func(nostr.Tag) (I, bool), cache cache.Cache32[GenericList[I]], ) (fl GenericList[I], fromInternal bool) { - // we have 24 mutexes, so we can load up to 24 lists at the same time, but if we do the same exact + // we have 60 mutexes, so we can load up to 60 lists at the same time, but if we do the same exact // call that will do it only once, the subsequent ones will wait for a result to be cached // and then return it from cache -- 13 is an arbitrary index for the pubkey - lockIdx := (int(pubkey[13]) + int(replaceableIndex)) % 24 + n, _ := strconv.ParseUint(pubkey[14:16], 16, 8) + lockIdx := (n + uint64(actualKind)) % 60 genericListMutexes[lockIdx].Lock() if valueWasJustCached[lockIdx] { @@ -48,7 +50,7 @@ func fetchGenericList[I TagItemWithValue]( time.Sleep(time.Millisecond * 10) } - defer genericListMutexes[lockIdx].Unlock() + genericListMutexes[lockIdx].Unlock() if v, ok := cache.Get(pubkey); ok { return v, true diff --git a/sdk/metadata.go b/sdk/metadata.go index 96272af..b0ef469 100644 --- a/sdk/metadata.go +++ b/sdk/metadata.go @@ -122,7 +122,7 @@ func (sys *System) FetchProfileMetadata(ctx context.Context, pubkey string) (pm pm, _ = ParseMetadata(evt) // save on store even if the metadata json is malformed - if sys.StoreRelay != nil && pm.Event != nil { + if pm.Event != nil { sys.StoreRelay.Publish(ctx, *pm.Event) } } diff --git a/sdk/replaceable_loader.go b/sdk/replaceable_loader.go index 42b1dda..3fe23b5 100644 --- a/sdk/replaceable_loader.go +++ b/sdk/replaceable_loader.go @@ -70,7 +70,7 @@ func (sys *System) createReplaceableDataloader(kind int) *dataloader.Loader[stri return sys.batchLoadReplaceableEvents(ctx, kind, pubkeys) }, - dataloader.WithBatchCapacity[string, *nostr.Event](60), + dataloader.WithBatchCapacity[string, *nostr.Event](30), dataloader.WithClearCacheOnBatch[string, *nostr.Event](), dataloader.WithCache(&dataloader.NoCache[string, *nostr.Event]{}), dataloader.WithWait[string, *nostr.Event](time.Millisecond*350), @@ -116,7 +116,7 @@ func (sys *System) batchLoadReplaceableEvents( } // gather relays we'll use for this pubkey - relays := sys.determineRelaysToQuery(ctx, pubkey, kind) + relays := sys.determineRelaysToQuery(pubkey, kind) // by default we will return an error (this will be overwritten when we find an event) results[i] = &dataloader.Result[*nostr.Event]{ @@ -170,7 +170,7 @@ func (sys *System) batchLoadReplaceableEvents( } } -func (sys *System) determineRelaysToQuery(ctx context.Context, pubkey string, kind int) []string { +func (sys *System) determineRelaysToQuery(pubkey string, kind int) []string { var relays []string // search in specific relays for user diff --git a/sdk/sdk_test.go b/sdk/sdk_test.go index dff5196..9fc71d6 100644 --- a/sdk/sdk_test.go +++ b/sdk/sdk_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "sync" "testing" "github.com/nbd-wtf/go-nostr" @@ -36,6 +37,55 @@ func TestMetadataAndEvents(t *testing.T) { require.GreaterOrEqual(t, len(events[meta.PubKey]), 5) } +func TestConcurrentMetadata(t *testing.T) { + sys := NewSystem() + ctx := context.Background() + + wg := sync.WaitGroup{} + for _, v := range []struct { + input string + name string + }{ + { + "nprofile1qyxhwumn8ghj7mn0wvhxcmmvqyd8wumn8ghj7un9d3shjtnhv4ehgetjde38gcewvdhk6qpq80cvv07tjdrrgpa0j7j7tmnyl2yr6yr7l8j4s3evf6u64th6gkwswpnfsn", + "fiatjaf", + }, + { + "npub1t6jxfqz9hv0lygn9thwndekuahwyxkgvycyscjrtauuw73gd5k7sqvksrw", + "constant", + }, + { + "npub1jlrs53pkdfjnts29kveljul2sm0actt6n8dxrrzqcersttvcuv3qdjynqn", + "hodlbod", + }, + { + "npub1xtscya34g58tk0z605fvr788k263gsu6cy9x0mhnm87echrgufzsevkk5s", + "jb55", + }, + { + "npub1qny3tkh0acurzla8x3zy4nhrjz5zd8l9sy9jys09umwng00manysew95gx", + "odell", + }, + { + "npub1l2vyh47mk2p0qlsku7hg0vn29faehy9hy34ygaclpn66ukqp3afqutajft", + "pablo", + }, + } { + wg.Add(1) + go func() { + meta, err := sys.FetchProfileFromInput(ctx, v.input) + require.NoError(t, err) + require.Contains(t, strings.ToLower(meta.Name), v.name) + + fl := sys.FetchFollowList(ctx, meta.PubKey) + require.GreaterOrEqual(t, len(fl.Items), 30, "%s/%s", meta.PubKey, meta.Name) + wg.Done() + }() + } + + wg.Wait() +} + func TestFollowListRecursion(t *testing.T) { sys := NewSystem() ctx := context.Background() @@ -53,21 +103,27 @@ func TestFollowListRecursion(t *testing.T) { } results := make(chan result) - go func() { - for _, item := range followList.Items { - fl := sys.FetchFollowList(ctx, item.Pubkey) + wg := sync.WaitGroup{} + for i, item := range followList.Items[0:120] { + wg.Add(1) + go func() { meta := sys.FetchProfileMetadata(ctx, item.Pubkey) - fmt.Println(" ~", item.Pubkey, meta.Name, len(fl.Items)) + fl := sys.FetchFollowList(ctx, item.Pubkey) + fmt.Println(" ~", i, item.Pubkey, len(fl.Items)) results <- result{item.Pubkey, fl, meta} - } + wg.Done() + }() + } + + go func() { + wg.Wait() + close(results) }() // collect results var validAccounts int var accountsWithManyFollows int - for i := 0; i < len(followList.Items); i++ { - r := <-results - + for r := range results { // skip if metadata has "bot" in name if strings.Contains(strings.ToLower(r.metadata.Name), "bot") { continue @@ -81,5 +137,5 @@ func TestFollowListRecursion(t *testing.T) { // check if at least 90% of non-bot accounts follow more than 20 accounts ratio := float64(accountsWithManyFollows) / float64(validAccounts) - require.Greater(t, ratio, 0.9, "at least 90%% of accounts should follow more than 20 others (actual: %.2f%%)", ratio*100) + require.Greater(t, ratio, 0.7, "at least 70%% of accounts should follow more than 20 others (actual: %.2f%%)", ratio*100) } diff --git a/sdk/set.go b/sdk/set.go index 68c62ea..0b2f617 100644 --- a/sdk/set.go +++ b/sdk/set.go @@ -3,6 +3,7 @@ package sdk import ( "context" "slices" + "strconv" "time" "github.com/nbd-wtf/go-nostr" @@ -27,10 +28,8 @@ func fetchGenericSets[I TagItemWithValue]( parseTag func(nostr.Tag) (I, bool), cache cache.Cache32[GenericSets[I]], ) (fl GenericSets[I], fromInternal bool) { - // we have 24 mutexes, so we can load up to 24 lists at the same time, but if we do the same exact - // call that will do it only once, the subsequent ones will wait for a result to be cached - // and then return it from cache -- 13 is an arbitrary index for the pubkey - lockIdx := (int(pubkey[13]) + int(addressableIndex)) % 24 + n, _ := strconv.ParseUint(pubkey[14:16], 16, 8) + lockIdx := (n + uint64(actualKind)) % 60 genericListMutexes[lockIdx].Lock() if valueWasJustCached[lockIdx] { @@ -40,7 +39,7 @@ func fetchGenericSets[I TagItemWithValue]( time.Sleep(time.Millisecond * 10) } - defer genericListMutexes[lockIdx].Unlock() + genericListMutexes[lockIdx].Unlock() if v, ok := cache.Get(pubkey); ok { return v, true