mirror of
https://github.com/nbd-wtf/go-nostr.git
synced 2025-05-20 15:39:56 +02:00
sdk: fix FetchFeedPage()
I'm not sure if it was even wrong before, but now at least I understand it and is checked against pen and paper.
This commit is contained in:
parent
4200144489
commit
b1571e0253
52
sdk/feeds.go
52
sdk/feeds.go
@ -67,7 +67,7 @@ func (sys *System) StreamLiveFeed(
|
|||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
sub := sys.Pool.SubMany(ctx, relays, nostr.Filters{filter})
|
sub := sys.Pool.SubMany(ctx, relays, nostr.Filters{filter}, nostr.WithLabel("livefeed"))
|
||||||
for evt := range sub {
|
for evt := range sub {
|
||||||
sys.StoreRelay.Publish(ctx, *evt.Event)
|
sys.StoreRelay.Publish(ctx, *evt.Event)
|
||||||
if latest < evt.CreatedAt {
|
if latest < evt.CreatedAt {
|
||||||
@ -114,15 +114,19 @@ func (sys *System) FetchFeedPage(
|
|||||||
|
|
||||||
for _, pubkey := range pubkeys {
|
for _, pubkey := range pubkeys {
|
||||||
oldestKey := makePubkeyStreamKey(pubkeyStreamOldestPrefix, pubkey)
|
oldestKey := makePubkeyStreamKey(pubkeyStreamOldestPrefix, pubkey)
|
||||||
|
var oldestTimestamp nostr.Timestamp
|
||||||
|
|
||||||
filter := nostr.Filter{Authors: []string{pubkey}, Kinds: kinds}
|
|
||||||
if data, _ := sys.KVStore.Get(oldestKey); data != nil {
|
if data, _ := sys.KVStore.Get(oldestKey); data != nil {
|
||||||
oldest := decodeTimestamp(data)
|
oldestTimestamp = decodeTimestamp(data)
|
||||||
filter.Since = &oldest
|
if oldestTimestamp == 0 {
|
||||||
|
oldestTimestamp = nostr.Now()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if filter.Since != nil && *filter.Since < until {
|
filter := nostr.Filter{Authors: []string{pubkey}, Kinds: kinds}
|
||||||
// eligible for a local query
|
|
||||||
|
if until > oldestTimestamp {
|
||||||
|
// we can use our local database
|
||||||
filter.Until = &until
|
filter.Until = &until
|
||||||
res, err := sys.StoreRelay.QuerySync(ctx, filter)
|
res, err := sys.StoreRelay.QuerySync(ctx, filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -137,29 +141,35 @@ func (sys *System) FetchFeedPage(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if we didn't query the local store or we didn't get enough, then we will fetch from relays
|
// if we didn't get enough events from local database
|
||||||
filter.Until = filter.Since
|
// OR if we are requesting for very old stuff
|
||||||
filter.Since = nil
|
// then we will query relays -- always with Until set to our oldestTimestamp+1
|
||||||
|
// (so we don't get events we already have)
|
||||||
relays := sys.FetchOutboxRelays(ctx, pubkey, 2)
|
relays := sys.FetchOutboxRelays(ctx, pubkey, 2)
|
||||||
if len(relays) == 0 {
|
if len(relays) == 0 {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
fUntil := oldestTimestamp + 1
|
||||||
|
filter.Until = &fUntil
|
||||||
|
filter.Since = nil
|
||||||
|
for ie := range sys.Pool.SubManyEose(ctx, relays, nostr.Filters{filter}, nostr.WithLabel("feedpage")) {
|
||||||
|
sys.StoreRelay.Publish(ctx, *ie.Event)
|
||||||
|
|
||||||
go func() {
|
// we shouldn't need this check here, but against rogue relays we'll do it
|
||||||
sub := sys.Pool.SubManyEose(ctx, relays, nostr.Filters{filter})
|
if ie.Event.CreatedAt < oldestTimestamp {
|
||||||
var oldest nostr.Timestamp
|
oldestTimestamp = ie.Event.CreatedAt
|
||||||
for ie := range sub {
|
}
|
||||||
sys.StoreRelay.Publish(ctx, *ie.Event)
|
|
||||||
oldest = ie.Event.CreatedAt
|
// we should check this because we might be just catching up to the point where the
|
||||||
|
// offset that was requested.
|
||||||
|
// so we don't add these events to our results, just to our local store (above)
|
||||||
|
if ie.Event.CreatedAt < until {
|
||||||
events = append(events, ie.Event)
|
events = append(events, ie.Event)
|
||||||
}
|
}
|
||||||
wg.Done()
|
}
|
||||||
if oldest != 0 && filter.Until != nil && oldest < *filter.Until {
|
wg.Done()
|
||||||
sys.KVStore.Set(oldestKey, encodeTimestamp(oldest))
|
sys.KVStore.Set(oldestKey, encodeTimestamp(oldestTimestamp))
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user