diff --git a/sdk/feeds.go b/sdk/feeds.go index 1fb8a6b..0ec40a4 100644 --- a/sdk/feeds.go +++ b/sdk/feeds.go @@ -67,7 +67,7 @@ func (sys *System) StreamLiveFeed( } go func() { - sub := sys.Pool.SubMany(ctx, relays, nostr.Filters{filter}) + sub := sys.Pool.SubMany(ctx, relays, nostr.Filters{filter}, nostr.WithLabel("livefeed")) for evt := range sub { sys.StoreRelay.Publish(ctx, *evt.Event) if latest < evt.CreatedAt { @@ -114,15 +114,19 @@ func (sys *System) FetchFeedPage( for _, pubkey := range pubkeys { oldestKey := makePubkeyStreamKey(pubkeyStreamOldestPrefix, pubkey) + var oldestTimestamp nostr.Timestamp - filter := nostr.Filter{Authors: []string{pubkey}, Kinds: kinds} if data, _ := sys.KVStore.Get(oldestKey); data != nil { - oldest := decodeTimestamp(data) - filter.Since = &oldest + oldestTimestamp = decodeTimestamp(data) + if oldestTimestamp == 0 { + oldestTimestamp = nostr.Now() + } } - if filter.Since != nil && *filter.Since < until { - // eligible for a local query + filter := nostr.Filter{Authors: []string{pubkey}, Kinds: kinds} + + if until > oldestTimestamp { + // we can use our local database filter.Until = &until res, err := sys.StoreRelay.QuerySync(ctx, filter) if err != nil { @@ -137,29 +141,35 @@ func (sys *System) FetchFeedPage( } } - // if we didn't query the local store or we didn't get enough, then we will fetch from relays - filter.Until = filter.Since - filter.Since = nil - + // if we didn't get enough events from local database + // OR if we are requesting for very old stuff + // then we will query relays -- always with Until set to our oldestTimestamp+1 + // (so we don't get events we already have) relays := sys.FetchOutboxRelays(ctx, pubkey, 2) if len(relays) == 0 { wg.Done() continue } + fUntil := oldestTimestamp + 1 + filter.Until = &fUntil + filter.Since = nil + for ie := range sys.Pool.SubManyEose(ctx, relays, nostr.Filters{filter}, nostr.WithLabel("feedpage")) { + sys.StoreRelay.Publish(ctx, *ie.Event) - go func() { - sub := sys.Pool.SubManyEose(ctx, relays, nostr.Filters{filter}) - var oldest nostr.Timestamp - for ie := range sub { - sys.StoreRelay.Publish(ctx, *ie.Event) - oldest = ie.Event.CreatedAt + // we shouldn't need this check here, but against rogue relays we'll do it + if ie.Event.CreatedAt < oldestTimestamp { + oldestTimestamp = ie.Event.CreatedAt + } + + // we should check this because we might be just catching up to the point where the + // offset that was requested. + // so we don't add these events to our results, just to our local store (above) + if ie.Event.CreatedAt < until { events = append(events, ie.Event) } - wg.Done() - if oldest != 0 && filter.Until != nil && oldest < *filter.Until { - sys.KVStore.Set(oldestKey, encodeTimestamp(oldest)) - } - }() + } + wg.Done() + sys.KVStore.Set(oldestKey, encodeTimestamp(oldestTimestamp)) } wg.Wait()