mirror of
https://github.com/nbd-wtf/go-nostr.git
synced 2025-06-11 01:10:48 +02:00
sdk: optimize caching lists (so we don't fetch twice in a row).
This commit is contained in:
parent
1b786ab213
commit
2edc0fb713
@ -1,21 +1,35 @@
|
|||||||
package sdk
|
package sdk
|
||||||
|
|
||||||
import (
|
import "time"
|
||||||
"strings"
|
|
||||||
)
|
|
||||||
|
|
||||||
// IsVirtualRelay returns true if the given normalized relay URL shouldn't be considered for outbox-model calculations.
|
var serial = 0
|
||||||
func IsVirtualRelay(url string) bool {
|
|
||||||
if len(url) < 6 {
|
|
||||||
// this is just invalid
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
if strings.HasPrefix(url, "wss://feeds.nostr.band") ||
|
func pickNext(list []string) string {
|
||||||
strings.HasPrefix(url, "wss://filter.nostr.wine") ||
|
serial++
|
||||||
strings.HasPrefix(url, "wss://cache") {
|
return list[serial%len(list)]
|
||||||
return true
|
}
|
||||||
}
|
|
||||||
|
func doThisNotMoreThanOnceAnHour(key string) (doItNow bool) {
|
||||||
return false
|
if _dtnmtoah == nil {
|
||||||
|
go func() {
|
||||||
|
_dtnmtoah = make(map[string]time.Time)
|
||||||
|
for {
|
||||||
|
time.Sleep(time.Minute * 10)
|
||||||
|
_dtnmtoahLock.Lock()
|
||||||
|
now := time.Now()
|
||||||
|
for k, v := range _dtnmtoah {
|
||||||
|
if v.Before(now) {
|
||||||
|
delete(_dtnmtoah, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_dtnmtoahLock.Unlock()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
_dtnmtoahLock.Lock()
|
||||||
|
defer _dtnmtoahLock.Unlock()
|
||||||
|
|
||||||
|
_, exists := _dtnmtoah[key]
|
||||||
|
return !exists
|
||||||
}
|
}
|
||||||
|
47
sdk/list.go
47
sdk/list.go
@ -3,6 +3,7 @@ package sdk
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"slices"
|
"slices"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
@ -20,6 +21,11 @@ type TagItemWithValue interface {
|
|||||||
Value() string
|
Value() string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
genericListMutexes = [24]sync.Mutex{}
|
||||||
|
valueWasJustCached = [24]bool{}
|
||||||
|
)
|
||||||
|
|
||||||
func fetchGenericList[I TagItemWithValue](
|
func fetchGenericList[I TagItemWithValue](
|
||||||
sys *System,
|
sys *System,
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
@ -29,36 +35,45 @@ func fetchGenericList[I TagItemWithValue](
|
|||||||
cache cache.Cache32[GenericList[I]],
|
cache cache.Cache32[GenericList[I]],
|
||||||
skipFetch bool,
|
skipFetch bool,
|
||||||
) (fl GenericList[I], fromInternal bool) {
|
) (fl GenericList[I], fromInternal bool) {
|
||||||
if cache != nil {
|
// we have 24 mutexes, so we can load up to 24 lists at the same time, but if we do the same exact
|
||||||
if v, ok := cache.Get(pubkey); ok {
|
// call that will do it only once, the subsequent ones will wait for a result to be cached
|
||||||
return v, true
|
// and then return it from cache -- 13 is an arbitrary index for the pubkey
|
||||||
}
|
lockIdx := (int(pubkey[13]) + kind) % 24
|
||||||
|
genericListMutexes[lockIdx].Lock()
|
||||||
|
|
||||||
|
if valueWasJustCached[lockIdx] {
|
||||||
|
// this ensures the cache has had time to commit the values
|
||||||
|
// so we don't repeat a fetch immediately after the other
|
||||||
|
valueWasJustCached[lockIdx] = false
|
||||||
|
time.Sleep(time.Millisecond * 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
events, _ := sys.StoreRelay.QuerySync(ctx, nostr.Filter{Kinds: []int{kind}, Authors: []string{pubkey}})
|
defer genericListMutexes[lockIdx].Unlock()
|
||||||
if len(events) != 0 {
|
|
||||||
items := parseItemsFromEventTags(events[0], parseTag)
|
if v, ok := cache.Get(pubkey); ok {
|
||||||
v := GenericList[I]{
|
|
||||||
PubKey: pubkey,
|
|
||||||
Event: events[0],
|
|
||||||
Items: items,
|
|
||||||
}
|
|
||||||
cache.SetWithTTL(pubkey, v, time.Hour*6)
|
|
||||||
return v, true
|
return v, true
|
||||||
}
|
}
|
||||||
|
|
||||||
v := GenericList[I]{PubKey: pubkey}
|
v := GenericList[I]{PubKey: pubkey}
|
||||||
|
|
||||||
|
events, _ := sys.StoreRelay.QuerySync(ctx, nostr.Filter{Kinds: []int{kind}, Authors: []string{pubkey}})
|
||||||
|
if len(events) != 0 {
|
||||||
|
items := parseItemsFromEventTags(events[0], parseTag)
|
||||||
|
v.Event = events[0]
|
||||||
|
v.Items = items
|
||||||
|
valueWasJustCached[lockIdx] = true
|
||||||
|
return v, true
|
||||||
|
}
|
||||||
|
|
||||||
if !skipFetch {
|
if !skipFetch {
|
||||||
thunk := sys.replaceableLoaders[kind].Load(ctx, pubkey)
|
thunk := sys.replaceableLoaders[kind].Load(ctx, pubkey)
|
||||||
evt, err := thunk()
|
evt, err := thunk()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
items := parseItemsFromEventTags(evt, parseTag)
|
items := parseItemsFromEventTags(evt, parseTag)
|
||||||
v.Items = items
|
v.Items = items
|
||||||
if cache != nil {
|
|
||||||
cache.SetWithTTL(pubkey, v, time.Hour*6)
|
|
||||||
}
|
|
||||||
sys.StoreRelay.Publish(ctx, *evt)
|
sys.StoreRelay.Publish(ctx, *evt)
|
||||||
}
|
}
|
||||||
|
valueWasJustCached[lockIdx] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
return v, false
|
return v, false
|
||||||
|
@ -17,13 +17,10 @@ func (sys *System) FetchOutboxRelays(ctx context.Context, pubkey string, n int)
|
|||||||
return relays
|
return relays
|
||||||
}
|
}
|
||||||
|
|
||||||
if rl, ok := sys.RelayListCache.Get(pubkey); !ok || (rl.Event != nil && rl.Event.CreatedAt < nostr.Now()-60*60*24*7) {
|
// if we have it cached that means we have at least tried to fetch recently and it won't be tried again
|
||||||
// try to fetch relays list again if we don't have one or if ours is a week old
|
fetchGenericList(sys, ctx, pubkey, 10002, parseRelayFromKind10002, sys.RelayListCache, false)
|
||||||
fetchGenericList(sys, ctx, pubkey, 10002, parseRelayFromKind10002, sys.RelayListCache, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
relays := sys.Hints.TopN(pubkey, 6)
|
relays := sys.Hints.TopN(pubkey, 6)
|
||||||
|
|
||||||
if len(relays) == 0 {
|
if len(relays) == 0 {
|
||||||
return []string{"wss://relay.damus.io", "wss://nos.lol"}
|
return []string{"wss://relay.damus.io", "wss://nos.lol"}
|
||||||
}
|
}
|
||||||
|
@ -22,11 +22,8 @@ func (sys *System) initializeDataloaders() {
|
|||||||
|
|
||||||
func (sys *System) createReplaceableDataloader(kind int) *dataloader.Loader[string, *nostr.Event] {
|
func (sys *System) createReplaceableDataloader(kind int) *dataloader.Loader[string, *nostr.Event] {
|
||||||
return dataloader.NewBatchedLoader(
|
return dataloader.NewBatchedLoader(
|
||||||
func(
|
func(_ context.Context, pubkeys []string) []*dataloader.Result[*nostr.Event] {
|
||||||
ctx context.Context,
|
return sys.batchLoadReplaceableEvents(kind, pubkeys)
|
||||||
pubkeys []string,
|
|
||||||
) []*dataloader.Result[*nostr.Event] {
|
|
||||||
return sys.batchLoadReplaceableEvents(ctx, kind, pubkeys)
|
|
||||||
},
|
},
|
||||||
dataloader.WithBatchCapacity[string, *nostr.Event](60),
|
dataloader.WithBatchCapacity[string, *nostr.Event](60),
|
||||||
dataloader.WithClearCacheOnBatch[string, *nostr.Event](),
|
dataloader.WithClearCacheOnBatch[string, *nostr.Event](),
|
||||||
@ -35,7 +32,6 @@ func (sys *System) createReplaceableDataloader(kind int) *dataloader.Loader[stri
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sys *System) batchLoadReplaceableEvents(
|
func (sys *System) batchLoadReplaceableEvents(
|
||||||
ctx context.Context,
|
|
||||||
kind int,
|
kind int,
|
||||||
pubkeys []string,
|
pubkeys []string,
|
||||||
) []*dataloader.Result[*nostr.Event] {
|
) []*dataloader.Result[*nostr.Event] {
|
||||||
@ -67,7 +63,7 @@ func (sys *System) batchLoadReplaceableEvents(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// save attempts here so we don't try the same failed query over and over
|
// save attempts here so we don't try the same failed query over and over
|
||||||
if doItNow := DoThisNotMoreThanOnceAnHour("repl:" + strconv.Itoa(kind) + pubkey); !doItNow {
|
if doItNow := doThisNotMoreThanOnceAnHour("repl:" + strconv.Itoa(kind) + pubkey); !doItNow {
|
||||||
results[i] = &dataloader.Result[*nostr.Event]{
|
results[i] = &dataloader.Result[*nostr.Event]{
|
||||||
Error: fmt.Errorf("last attempt failed, waiting more to try again"),
|
Error: fmt.Errorf("last attempt failed, waiting more to try again"),
|
||||||
}
|
}
|
||||||
|
39
sdk/utils.go
39
sdk/utils.go
@ -1,6 +1,7 @@
|
|||||||
package sdk
|
package sdk
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -10,34 +11,18 @@ var (
|
|||||||
_dtnmtoahLock sync.Mutex
|
_dtnmtoahLock sync.Mutex
|
||||||
)
|
)
|
||||||
|
|
||||||
func DoThisNotMoreThanOnceAnHour(key string) (doItNow bool) {
|
// IsVirtualRelay returns true if the given normalized relay URL shouldn't be considered for outbox-model calculations.
|
||||||
if _dtnmtoah == nil {
|
func IsVirtualRelay(url string) bool {
|
||||||
go func() {
|
if len(url) < 6 {
|
||||||
_dtnmtoah = make(map[string]time.Time)
|
// this is just invalid
|
||||||
for {
|
return true
|
||||||
time.Sleep(time.Minute * 10)
|
|
||||||
_dtnmtoahLock.Lock()
|
|
||||||
now := time.Now()
|
|
||||||
for k, v := range _dtnmtoah {
|
|
||||||
if v.Before(now) {
|
|
||||||
delete(_dtnmtoah, k)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_dtnmtoahLock.Unlock()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_dtnmtoahLock.Lock()
|
if strings.HasPrefix(url, "wss://feeds.nostr.band") ||
|
||||||
defer _dtnmtoahLock.Unlock()
|
strings.HasPrefix(url, "wss://filter.nostr.wine") ||
|
||||||
|
strings.HasPrefix(url, "wss://cache") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
_, exists := _dtnmtoah[key]
|
return false
|
||||||
return !exists
|
|
||||||
}
|
|
||||||
|
|
||||||
var serial = 0
|
|
||||||
|
|
||||||
func pickNext(list []string) string {
|
|
||||||
serial++
|
|
||||||
return list[serial%len(list)]
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user