mirror of
https://github.com/nbd-wtf/go-nostr.git
synced 2025-12-09 20:31:04 +01:00
sdk: fetching sets.
This commit is contained in:
183
sdk/addressable_loader.go
Normal file
183
sdk/addressable_loader.go
Normal file
@@ -0,0 +1,183 @@
|
||||
package sdk
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/graph-gophers/dataloader/v7"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
// this is similar to replaceable_loader and reuses logic from that.
|
||||
|
||||
type addressableIndex int
|
||||
|
||||
const (
|
||||
kind_30000 addressableIndex = 0
|
||||
kind_30002 addressableIndex = 1
|
||||
kind_30015 addressableIndex = 2
|
||||
kind_30030 addressableIndex = 3
|
||||
)
|
||||
|
||||
func (sys *System) initializeAddressableDataloaders() {
|
||||
sys.addressableLoaders[kind_30000] = sys.createAddressableDataloader(30000)
|
||||
sys.addressableLoaders[kind_30002] = sys.createAddressableDataloader(30002)
|
||||
sys.addressableLoaders[kind_30015] = sys.createAddressableDataloader(30015)
|
||||
sys.addressableLoaders[kind_30030] = sys.createAddressableDataloader(30030)
|
||||
}
|
||||
|
||||
func (sys *System) createAddressableDataloader(kind int) *dataloader.Loader[string, []*nostr.Event] {
|
||||
return dataloader.NewBatchedLoader(
|
||||
func(_ context.Context, pubkeys []string) []*dataloader.Result[[]*nostr.Event] {
|
||||
return sys.batchLoadAddressableEvents(kind, pubkeys)
|
||||
},
|
||||
dataloader.WithBatchCapacity[string, []*nostr.Event](60),
|
||||
dataloader.WithClearCacheOnBatch[string, []*nostr.Event](),
|
||||
dataloader.WithWait[string, []*nostr.Event](time.Millisecond*350),
|
||||
)
|
||||
}
|
||||
|
||||
func (sys *System) batchLoadAddressableEvents(
|
||||
kind int,
|
||||
pubkeys []string,
|
||||
) []*dataloader.Result[[]*nostr.Event] {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
|
||||
defer cancel()
|
||||
|
||||
batchSize := len(pubkeys)
|
||||
results := make([]*dataloader.Result[[]*nostr.Event], batchSize)
|
||||
keyPositions := make(map[string]int) // { [pubkey]: slice_index }
|
||||
relayFilters := make(map[string]nostr.Filter) // { [relayUrl]: filter }
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(pubkeys))
|
||||
cm := sync.Mutex{}
|
||||
|
||||
for i, pubkey := range pubkeys {
|
||||
// build batched queries for the external relays
|
||||
keyPositions[pubkey] = i // this is to help us know where to save the result later
|
||||
|
||||
go func(i int, pubkey string) {
|
||||
defer wg.Done()
|
||||
|
||||
// if we're attempting this query with a short key (last 8 characters), stop here
|
||||
if len(pubkey) != 64 {
|
||||
results[i] = &dataloader.Result[[]*nostr.Event]{
|
||||
Error: fmt.Errorf("won't proceed to query relays with a shortened key (%d)", kind),
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// save attempts here so we don't try the same failed query over and over
|
||||
if doItNow := doThisNotMoreThanOnceAnHour("repl:" + strconv.Itoa(kind) + pubkey); !doItNow {
|
||||
results[i] = &dataloader.Result[[]*nostr.Event]{
|
||||
Error: fmt.Errorf("last attempt failed, waiting more to try again"),
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// gather relays we'll use for this pubkey
|
||||
relays := sys.determineRelaysToQuery(ctx, pubkey, kind)
|
||||
|
||||
// by default we will return an error (this will be overwritten when we find an event)
|
||||
results[i] = &dataloader.Result[[]*nostr.Event]{
|
||||
Error: fmt.Errorf("couldn't find a kind %d event anywhere %v", kind, relays),
|
||||
}
|
||||
|
||||
cm.Lock()
|
||||
for _, relay := range relays {
|
||||
// each relay will have a custom filter
|
||||
filter, ok := relayFilters[relay]
|
||||
if !ok {
|
||||
filter = nostr.Filter{
|
||||
Kinds: []int{kind},
|
||||
Authors: make([]string, 0, batchSize-i /* this and all pubkeys after this can be added */),
|
||||
}
|
||||
}
|
||||
filter.Authors = append(filter.Authors, pubkey)
|
||||
relayFilters[relay] = filter
|
||||
}
|
||||
cm.Unlock()
|
||||
}(i, pubkey)
|
||||
}
|
||||
|
||||
// query all relays with the prepared filters
|
||||
wg.Wait()
|
||||
multiSubs := sys.batchAddressableRelayQueries(ctx, relayFilters)
|
||||
nextEvent:
|
||||
for {
|
||||
select {
|
||||
case evt, more := <-multiSubs:
|
||||
if !more {
|
||||
return results
|
||||
}
|
||||
|
||||
// insert this event at the desired position
|
||||
pos := keyPositions[evt.PubKey] // @unchecked: it must succeed because it must be a key we passed
|
||||
|
||||
events := results[pos].Data
|
||||
if events == nil {
|
||||
// no events found, so just add this and end
|
||||
results[pos] = &dataloader.Result[[]*nostr.Event]{Data: []*nostr.Event{evt}}
|
||||
continue nextEvent
|
||||
}
|
||||
|
||||
// there are events, so look for a match
|
||||
d := evt.Tags.GetD()
|
||||
for i, event := range events {
|
||||
if event.Tags.GetD() == d {
|
||||
// there is a match
|
||||
if event.CreatedAt < evt.CreatedAt {
|
||||
// ...and this one is newer, so replace
|
||||
events[i] = evt
|
||||
} else {
|
||||
// ... but this one is older, so ignore
|
||||
}
|
||||
// in any case we end this here
|
||||
continue nextEvent
|
||||
}
|
||||
}
|
||||
|
||||
// there is no match, so add to the end
|
||||
events = append(events, evt)
|
||||
results[pos].Data = events
|
||||
case <-ctx.Done():
|
||||
return results
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// batchAddressableRelayQueries is like batchReplaceableRelayQueries, except it doesn't count results to
|
||||
// try to exit early.
|
||||
func (sys *System) batchAddressableRelayQueries(
|
||||
ctx context.Context,
|
||||
relayFilters map[string]nostr.Filter,
|
||||
) <-chan *nostr.Event {
|
||||
all := make(chan *nostr.Event)
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(relayFilters))
|
||||
for url, filter := range relayFilters {
|
||||
go func(url string, filter nostr.Filter) {
|
||||
defer wg.Done()
|
||||
n := len(filter.Authors)
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*450+time.Millisecond*50*time.Duration(n))
|
||||
defer cancel()
|
||||
|
||||
for ie := range sys.Pool.SubManyEose(ctx, []string{url}, nostr.Filters{filter}, nostr.WithLabel("addr")) {
|
||||
all <- ie.Event
|
||||
}
|
||||
}(url, filter)
|
||||
}
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(all)
|
||||
}()
|
||||
|
||||
return all
|
||||
}
|
||||
Reference in New Issue
Block a user