go-nostr/sdk/feeds.go
2025-01-20 13:45:43 -03:00

170 lines
4.3 KiB
Go

package sdk
import (
"context"
"encoding/hex"
"fmt"
"slices"
"sync"
"sync/atomic"
"github.com/nbd-wtf/go-nostr"
)
const (
pubkeyStreamLatestPrefix = byte('L')
pubkeyStreamOldestPrefix = byte('O')
)
func makePubkeyStreamKey(prefix byte, pubkey string) []byte {
key := make([]byte, 1+8)
key[0] = prefix
hex.Decode(key[1:], []byte(pubkey[0:16]))
return key
}
// StreamPubkeysForward starts listening for new events from the given pubkeys,
// taking into account their outbox relays. It returns a channel that emits events
// continuously. The events are fetched from the time of the last seen event for
// each pubkey (stored in KVStore) onwards.
func (sys *System) StreamLiveFeed(
ctx context.Context,
pubkeys []string,
kinds []int,
) (<-chan *nostr.Event, error) {
events := make(chan *nostr.Event)
active := atomic.Int32{}
active.Add(int32(len(pubkeys)))
// start a subscription for each relay group
for _, pubkey := range pubkeys {
relays := sys.FetchOutboxRelays(ctx, pubkey, 2)
if len(relays) == 0 {
if active.Add(-1) == 0 {
close(events)
}
continue
}
latestKey := makePubkeyStreamKey(pubkeyStreamLatestPrefix, pubkey)
latest := nostr.Timestamp(0)
oldestKey := makePubkeyStreamKey(pubkeyStreamOldestPrefix, pubkey)
oldest := nostr.Timestamp(0)
serial := 0
var since *nostr.Timestamp
if data, _ := sys.KVStore.Get(latestKey); data != nil {
latest = decodeTimestamp(data)
since = &latest
}
filter := nostr.Filter{
Authors: []string{pubkey},
Since: since,
Kinds: kinds,
}
go func() {
sub := sys.Pool.SubMany(ctx, relays, nostr.Filters{filter})
for evt := range sub {
sys.StoreRelay.Publish(ctx, *evt.Event)
if latest < evt.CreatedAt {
latest = evt.CreatedAt
serial++
if serial%10 == 0 {
sys.KVStore.Set(latestKey, encodeTimestamp(latest))
}
} else if oldest > evt.CreatedAt {
oldest = evt.CreatedAt
sys.KVStore.Set(oldestKey, encodeTimestamp(oldest))
}
events <- evt.Event
}
if active.Add(-1) == 0 {
close(events)
}
}()
}
return events, nil
}
// FetchFeedNextPage fetches historical events from the given pubkeys in descending order starting from the
// given until timestamp. The limit argument is just a hint of how much content you want for the entire list,
// it isn't guaranteed that this quantity of events will be returned -- it could be more or less.
//
// It relies on KVStore's latestKey and oldestKey in order to determine if we should go to relays to ask
// for events or if we should just return what we have stored locally.
func (sys *System) FetchFeedPage(
ctx context.Context,
pubkeys []string,
kinds []int,
until nostr.Timestamp,
totalLimit int,
) ([]*nostr.Event, error) {
limitPerKey := PerQueryLimitInBatch(totalLimit, len(pubkeys))
events := make([]*nostr.Event, 0, len(pubkeys)*limitPerKey)
wg := sync.WaitGroup{}
wg.Add(len(pubkeys))
for _, pubkey := range pubkeys {
oldestKey := makePubkeyStreamKey(pubkeyStreamOldestPrefix, pubkey)
filter := nostr.Filter{Authors: []string{pubkey}, Kinds: kinds}
if data, _ := sys.KVStore.Get(oldestKey); data != nil {
oldest := decodeTimestamp(data)
filter.Since = &oldest
}
if filter.Since != nil && *filter.Until < until {
// eligible for a local query
filter.Until = &until
res, err := sys.StoreRelay.QuerySync(ctx, filter)
if err != nil {
return nil, fmt.Errorf("query failure at '%s': %w", pubkey, err)
}
if len(res) >= limitPerKey {
// we got enough from the local store
events = append(events, res...)
wg.Done()
continue
}
}
// if we didn't query the local store or we didn't get enough, then we will fetch from relays
filter.Until = filter.Since
filter.Since = nil
relays := sys.FetchOutboxRelays(ctx, pubkey, 2)
if len(relays) == 0 {
wg.Done()
continue
}
go func() {
sub := sys.Pool.SubManyEose(ctx, relays, nostr.Filters{filter})
var oldest nostr.Timestamp
for ie := range sub {
sys.StoreRelay.Publish(ctx, *ie.Event)
oldest = ie.Event.CreatedAt
events = append(events, ie.Event)
}
wg.Done()
if oldest != 0 && oldest < *filter.Until {
sys.KVStore.Set(oldestKey, encodeTimestamp(oldest))
}
}()
}
wg.Wait()
slices.SortFunc(events, nostr.CompareEventPtrReverse)
return events, nil
}