Mirror of https://github.com/nbd-wtf/go-nostr.git

commit f992c6e7ea (parent 40538486d5)

    sdk: pubkey feeds, live and past pages.

sdk/feeds.go (new file, 157 lines)
@@ -0,0 +1,157 @@
package sdk

import (
	"context"
	"encoding/hex"
	"fmt"
	"slices"
	"sync"

	"github.com/nbd-wtf/go-nostr"
)

const (
	pubkeyStreamLatestPrefix = byte('L')
	pubkeyStreamOldestPrefix = byte('O')
)

// makePubkeyStreamKey builds a 9-byte KVStore key: the given prefix byte
// followed by the first 8 bytes (16 hex characters) of the pubkey.
func makePubkeyStreamKey(prefix byte, pubkey string) []byte {
	key := make([]byte, 1+8)
	key[0] = prefix
	hex.Decode(key[1:], []byte(pubkey[0:16]))
	return key
}
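For reference, a runnable sketch of the key layout this helper produces (the pubkey below is a hypothetical 64-character hex string, not one from the diff):

package main

import (
	"encoding/hex"
	"fmt"
)

func makePubkeyStreamKey(prefix byte, pubkey string) []byte {
	key := make([]byte, 1+8)
	key[0] = prefix
	hex.Decode(key[1:], []byte(pubkey[0:16]))
	return key
}

func main() {
	pk := "3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d"
	fmt.Printf("% x\n", makePubkeyStreamKey('L', pk))
	// prints: 4c 3b f0 c6 3f cb 93 46 34
	// i.e. the prefix byte 'L' followed by the first 8 bytes of the pubkey
}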
// StreamLiveFeed starts listening for new events from the given pubkeys,
// taking into account their outbox relays. It returns a channel that emits events
// continuously. The events are fetched from the time of the last seen event for
// each pubkey (stored in KVStore) onwards.
func (sys *System) StreamLiveFeed(
	ctx context.Context,
	pubkeys []string,
	kinds []int,
) (<-chan *nostr.Event, error) {
	events := make(chan *nostr.Event)

	// start a subscription for each relay group
	for _, pubkey := range pubkeys {
		relays := sys.FetchOutboxRelays(ctx, pubkey, 2)
		if len(relays) == 0 {
			continue
		}

		latestKey := makePubkeyStreamKey(pubkeyStreamLatestPrefix, pubkey)
		latest := nostr.Timestamp(0)
		oldestKey := makePubkeyStreamKey(pubkeyStreamOldestPrefix, pubkey)
		oldest := nostr.Timestamp(0)

		serial := 0

		var since *nostr.Timestamp
		if data, _ := sys.KVStore.Get(latestKey); data != nil {
			latest = decodeTimestamp(data)
			since = &latest
		}
		if data, _ := sys.KVStore.Get(oldestKey); data != nil {
			oldest = decodeTimestamp(data)
		}

		filter := nostr.Filter{
			Authors: []string{pubkey},
			Since:   since,
			Kinds:   kinds,
		}

		sub := sys.Pool.SubMany(ctx, relays, nostr.Filters{filter})

		// consume each subscription in its own goroutine so this function
		// can return the channel immediately instead of blocking on the
		// first pubkey's subscription
		go func() {
			for evt := range sub {
				sys.StoreRelay.Publish(ctx, *evt.Event)
				if latest < evt.CreatedAt {
					latest = evt.CreatedAt
					serial++
					// persist the "latest seen" watermark only every 10 events
					if serial%10 == 0 {
						sys.KVStore.Set(latestKey, encodeTimestamp(latest))
					}
				} else if oldest == 0 || oldest > evt.CreatedAt {
					// treat the zero value as "no oldest recorded yet"
					oldest = evt.CreatedAt
					sys.KVStore.Set(oldestKey, encodeTimestamp(oldest))
				}

				select {
				case events <- evt.Event:
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	return events, nil
}
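Note that StreamLiveFeed never closes the returned channel itself, so consumers should select on their own context. A minimal consumer sketch, assuming a configured *sdk.System as sys (the kind list and function name here are illustrative):

func consume(ctx context.Context, sys *sdk.System, pubkeys []string) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	feed, err := sys.StreamLiveFeed(ctx, pubkeys, []int{1})
	if err != nil {
		return err
	}
	for {
		select {
		case evt := <-feed:
			fmt.Println(evt.CreatedAt, evt.Content)
		case <-ctx.Done():
			return nil
		}
	}
}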
// FetchFeedPage fetches historical events from the given pubkeys in descending order starting from the
// given until timestamp. The totalLimit argument is just a hint of how much content you want for the
// entire list; it isn't guaranteed that this quantity of events will be returned -- it could be more or less.
//
// It relies on KVStore's latestKey and oldestKey in order to determine if we should go to relays to ask
// for events or if we should just return what we have stored locally.
func (sys *System) FetchFeedPage(
	ctx context.Context,
	pubkeys []string,
	kinds []int,
	until nostr.Timestamp,
	totalLimit int,
) ([]*nostr.Event, error) {
	limitPerKey := PerQueryLimitInBatch(totalLimit, len(pubkeys))
	events := make([]*nostr.Event, 0, len(pubkeys)*limitPerKey)

	mu := sync.Mutex{} // guards appends to events across the goroutines below
	wg := sync.WaitGroup{}
	wg.Add(len(pubkeys))

	for _, pubkey := range pubkeys {
		oldestKey := makePubkeyStreamKey(pubkeyStreamOldestPrefix, pubkey)

		filter := nostr.Filter{Authors: []string{pubkey}, Kinds: kinds}
		if data, _ := sys.KVStore.Get(oldestKey); data != nil {
			oldest := decodeTimestamp(data)
			filter.Since = &oldest
		}

		if filter.Since != nil && *filter.Since < until {
			// eligible for a local query
			filter.Until = &until
			res, err := sys.StoreRelay.QuerySync(ctx, filter)
			if err != nil {
				return nil, fmt.Errorf("query failure at '%s': %w", pubkey, err)
			}

			if len(res) >= limitPerKey {
				// we got enough from the local store
				mu.Lock()
				events = append(events, res...)
				mu.Unlock()
				wg.Done()
				continue
			}
		}

		// if we didn't query the local store or we didn't get enough, then we will fetch from relays,
		// asking only for events older than the oldest we have stored locally (or older than the
		// requested page boundary when we have nothing stored)
		if filter.Since != nil {
			filter.Until = filter.Since
			filter.Since = nil
		} else {
			filter.Until = &until
		}

		relays := sys.FetchOutboxRelays(ctx, pubkey, 2)
		if len(relays) == 0 {
			wg.Done()
			continue
		}

		go func() {
			sub := sys.Pool.SubManyEose(ctx, relays, nostr.Filters{filter})
			var oldest nostr.Timestamp
			for ie := range sub {
				sys.StoreRelay.Publish(ctx, *ie.Event)
				oldest = ie.Event.CreatedAt
				mu.Lock()
				events = append(events, ie.Event)
				mu.Unlock()
			}
			wg.Done()
			if oldest != 0 && oldest < *filter.Until {
				sys.KVStore.Set(oldestKey, encodeTimestamp(oldest))
			}
		}()
	}

	wg.Wait()
	slices.SortFunc(events, nostr.CompareEventPtrReverse)

	return events, nil
}
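A hedged pagination sketch, again assuming a configured *sdk.System as sys: results come back sorted newest-first (CompareEventPtrReverse), so the oldest event of one page determines the until for the next call.

func backfill(ctx context.Context, sys *sdk.System, pubkeys []string) error {
	until := nostr.Now()
	for page := 0; page < 3; page++ {
		evts, err := sys.FetchFeedPage(ctx, pubkeys, []int{1}, until, 60)
		if err != nil {
			return err
		}
		if len(evts) == 0 {
			break
		}
		for _, evt := range evts {
			fmt.Println(evt.CreatedAt, evt.Content)
		}
		// descending order: the last event is the oldest on this page
		until = evts[len(evts)-1].CreatedAt - 1
	}
	return nil
}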
@@ -24,7 +24,7 @@ func (s *Store) Get(key []byte) ([]byte, error) {
 	defer s.RUnlock()

 	if val, ok := s.data[string(key)]; ok {
-		// Return a copy to prevent modification of stored data
+		// return a copy to prevent modification of stored data
 		cp := make([]byte, len(val))
 		copy(cp, val)
 		return cp, nil
@@ -36,7 +36,7 @@ func (s *Store) Set(key []byte, value []byte) error {
 	s.Lock()
 	defer s.Unlock()

-	// Store a copy to prevent modification of stored data
+	// store a copy to prevent modification of stored data
 	cp := make([]byte, len(value))
 	copy(cp, value)
 	s.data[string(key)] = cp
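The two hunks above only touch comment casing, but the copy semantics they describe are worth illustrating. A minimal sketch, assuming this in-memory Store and a hypothetical NewStore constructor (not shown in the diff):

s := NewStore()
s.Set([]byte("k"), []byte{1, 2, 3})

v, _ := s.Get([]byte("k"))
v[0] = 99 // mutate the returned copy...

w, _ := s.Get([]byte("k"))
fmt.Println(w[0]) // ...the stored value is untouched: prints 1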
@@ -28,9 +28,3 @@ func encodeTimestamp(t nostr.Timestamp) []byte {
 func decodeTimestamp(b []byte) nostr.Timestamp {
 	return nostr.Timestamp(binary.BigEndian.Uint32(b))
 }
-
-// shouldRefreshFromNetwork checks if we should try fetching from network
-func shouldRefreshFromNetwork(lastFetchData []byte) bool {
-	lastFetch := decodeTimestamp(lastFetchData)
-	return nostr.Now()-lastFetch > 7*24*60*60
-}
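For reference, a round-trip sketch of the 4-byte big-endian encoding these helpers use. The encodeTimestamp body isn't shown in this hunk; the PutUint32 counterpart below is an assumption consistent with the decodeTimestamp that is shown:

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/nbd-wtf/go-nostr"
)

// assumed counterpart of the decodeTimestamp above
func encodeTimestamp(t nostr.Timestamp) []byte {
	b := make([]byte, 4)
	binary.BigEndian.PutUint32(b, uint32(t))
	return b
}

func decodeTimestamp(b []byte) nostr.Timestamp {
	return nostr.Timestamp(binary.BigEndian.Uint32(b))
}

func main() {
	now := nostr.Now()
	// uint32 holds second-resolution timestamps until the year 2106
	fmt.Println(decodeTimestamp(encodeTimestamp(now)) == now) // true
}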
@@ -3,7 +3,6 @@ package sdk
 import (
 	"context"
 	"fmt"
-	"sync"
 	"time"

 	"github.com/nbd-wtf/go-nostr"
@@ -140,30 +139,6 @@ func (sys *System) FetchProfileMetadata(ctx context.Context, pubkey string) (pm
 	return pm
 }
-
-// FetchUserEvents fetches events from each users' outbox relays, grouping queries when possible.
-func (sys *System) FetchUserEvents(ctx context.Context, filter nostr.Filter) (map[string][]*nostr.Event, error) {
-	filters, err := sys.ExpandQueriesByAuthorAndRelays(ctx, filter)
-	if err != nil {
-		return nil, fmt.Errorf("failed to expand queries: %w", err)
-	}
-
-	results := make(map[string][]*nostr.Event)
-	wg := sync.WaitGroup{}
-	wg.Add(len(filters))
-	for relayURL, filter := range filters {
-		go func(relayURL string, filter nostr.Filter) {
-			defer wg.Done()
-			filter.Limit = filter.Limit * len(filter.Authors) // hack
-			for ie := range sys.Pool.SubManyEose(ctx, []string{relayURL}, nostr.Filters{filter}, nostr.WithLabel("userevts")) {
-				results[ie.PubKey] = append(results[ie.PubKey], ie.Event)
-			}
-		}(relayURL, filter)
-	}
-	wg.Wait()
-
-	return results, nil
-}
-
 func (sys *System) tryFetchMetadataFromNetwork(ctx context.Context, pubkey string) *ProfileMetadata {
 	thunk0 := sys.replaceableLoaders[kind_0].Load(ctx, pubkey)
 	evt, err := thunk0()
@@ -2,12 +2,8 @@ package sdk

 import (
 	"context"
-	"fmt"
 	"strconv"
-	"sync"
 	"time"
-
-	"github.com/nbd-wtf/go-nostr"
 )

 var outboxShortTermCache = [256]ostcEntry{}
@@ -45,54 +41,3 @@ func (sys *System) FetchOutboxRelays(ctx context.Context, pubkey string, n int)

 	return relays
 }
-
-func (sys *System) ExpandQueriesByAuthorAndRelays(
-	ctx context.Context,
-	filter nostr.Filter,
-) (map[string]nostr.Filter, error) {
-	n := len(filter.Authors)
-	if n == 0 {
-		return nil, fmt.Errorf("no authors in filter")
-	}
-
-	relaysForPubkey := make(map[string][]string, n)
-	mu := sync.Mutex{}
-
-	wg := sync.WaitGroup{}
-	wg.Add(n)
-	for _, pubkey := range filter.Authors {
-		go func(pubkey string) {
-			defer wg.Done()
-			relayURLs := sys.FetchOutboxRelays(ctx, pubkey, 3)
-			c := 0
-			for _, r := range relayURLs {
-				relay, err := sys.Pool.EnsureRelay(r)
-				if err != nil {
-					continue
-				}
-				mu.Lock()
-				relaysForPubkey[pubkey] = append(relaysForPubkey[pubkey], relay.URL)
-				mu.Unlock()
-				c++
-				if c == 3 {
-					return
-				}
-			}
-		}(pubkey)
-	}
-	wg.Wait()
-
-	filterForRelay := make(map[string]nostr.Filter, n) // { [relay]: filter }
-	for pubkey, relays := range relaysForPubkey {
-		for _, relay := range relays {
-			flt, ok := filterForRelay[relay]
-			if !ok {
-				flt = filter.Clone()
-				filterForRelay[relay] = flt
-			}
-			flt.Authors = append(flt.Authors, pubkey)
-		}
-	}
-
-	return filterForRelay, nil
-}