mirror of
https://github.com/nbd-wtf/go-nostr.git
synced 2025-08-26 13:41:15 +02:00
sdk: test for feeds and a fix.
This commit is contained in:
24
sdk/feeds.go
24
sdk/feeds.go
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
@@ -33,10 +34,16 @@ func (sys *System) StreamLiveFeed(
|
||||
) (<-chan *nostr.Event, error) {
|
||||
events := make(chan *nostr.Event)
|
||||
|
||||
active := atomic.Int32{}
|
||||
active.Add(int32(len(pubkeys)))
|
||||
|
||||
// start a subscription for each relay group
|
||||
for _, pubkey := range pubkeys {
|
||||
relays := sys.FetchOutboxRelays(ctx, pubkey, 2)
|
||||
if len(relays) == 0 {
|
||||
if active.Add(-1) == 0 {
|
||||
close(events)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -59,9 +66,9 @@ func (sys *System) StreamLiveFeed(
|
||||
Kinds: kinds,
|
||||
}
|
||||
|
||||
sub := sys.Pool.SubMany(ctx, relays, nostr.Filters{filter})
|
||||
for evt := range sub {
|
||||
go func() {
|
||||
go func() {
|
||||
sub := sys.Pool.SubMany(ctx, relays, nostr.Filters{filter})
|
||||
for evt := range sub {
|
||||
sys.StoreRelay.Publish(ctx, *evt.Event)
|
||||
if latest < evt.CreatedAt {
|
||||
latest = evt.CreatedAt
|
||||
@@ -73,9 +80,14 @@ func (sys *System) StreamLiveFeed(
|
||||
oldest = evt.CreatedAt
|
||||
sys.KVStore.Set(oldestKey, encodeTimestamp(oldest))
|
||||
}
|
||||
}()
|
||||
events <- evt.Event
|
||||
}
|
||||
|
||||
events <- evt.Event
|
||||
}
|
||||
|
||||
if active.Add(-1) == 0 {
|
||||
close(events)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return events, nil
|
||||
|
203
sdk/feeds_test.go
Normal file
203
sdk/feeds_test.go
Normal file
@@ -0,0 +1,203 @@
|
||||
package sdk
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fiatjaf/eventstore/slicestore"
|
||||
"github.com/fiatjaf/khatru"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestStreamLiveFeed(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// start 3 local relays
|
||||
relay1 := khatru.NewRelay()
|
||||
relay2 := khatru.NewRelay()
|
||||
relay3 := khatru.NewRelay()
|
||||
|
||||
for _, r := range []*khatru.Relay{relay1, relay2, relay3} {
|
||||
db := slicestore.SliceStore{}
|
||||
db.Init()
|
||||
r.QueryEvents = append(r.QueryEvents, db.QueryEvents)
|
||||
r.StoreEvent = append(r.StoreEvent, db.SaveEvent)
|
||||
r.ReplaceEvent = append(r.ReplaceEvent, db.ReplaceEvent)
|
||||
r.DeleteEvent = append(r.DeleteEvent, db.DeleteEvent)
|
||||
defer db.Close()
|
||||
}
|
||||
|
||||
s1 := make(chan bool)
|
||||
s2 := make(chan bool)
|
||||
s3 := make(chan bool)
|
||||
|
||||
go func() {
|
||||
err := relay1.Start("127.0.0.1", 48481, s1)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
go func() {
|
||||
err := relay2.Start("127.0.0.1", 48482, s2)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
go func() {
|
||||
err := relay3.Start("127.0.0.1", 48483, s3)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
defer relay1.Shutdown(ctx)
|
||||
defer relay2.Shutdown(ctx)
|
||||
defer relay3.Shutdown(ctx)
|
||||
|
||||
<-s1
|
||||
<-s2
|
||||
<-s3
|
||||
|
||||
// generate two random keypairs for testing
|
||||
sk1 := nostr.GeneratePrivateKey()
|
||||
pk1, _ := nostr.GetPublicKey(sk1)
|
||||
sk2 := nostr.GeneratePrivateKey()
|
||||
pk2, _ := nostr.GetPublicKey(sk2)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// first publish relay lists to relay1 for both users
|
||||
relayListEvt1 := nostr.Event{
|
||||
PubKey: pk1,
|
||||
CreatedAt: nostr.Now(),
|
||||
Kind: 10002,
|
||||
Tags: nostr.Tags{
|
||||
{"r", "ws://localhost:48482", "write"},
|
||||
{"r", "ws://localhost:48483", "write"},
|
||||
},
|
||||
Content: "",
|
||||
}
|
||||
relayListEvt1.Sign(sk1)
|
||||
|
||||
relayListEvt2 := nostr.Event{
|
||||
PubKey: pk2,
|
||||
CreatedAt: nostr.Now(),
|
||||
Kind: 10002,
|
||||
Tags: nostr.Tags{
|
||||
{"r", "ws://localhost:48482", "write"},
|
||||
{"r", "ws://localhost:48483", "write"},
|
||||
},
|
||||
Content: "",
|
||||
}
|
||||
relayListEvt2.Sign(sk2)
|
||||
|
||||
// publish relay lists to relay1
|
||||
relay, err := nostr.RelayConnect(ctx, "ws://localhost:48481")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to connect to relay1: %v", err)
|
||||
}
|
||||
if err := relay.Publish(ctx, relayListEvt1); err != nil {
|
||||
t.Fatalf("failed to publish relay list 1: %v", err)
|
||||
}
|
||||
if err := relay.Publish(ctx, relayListEvt2); err != nil {
|
||||
t.Fatalf("failed to publish relay list 2: %v", err)
|
||||
}
|
||||
relay.Close()
|
||||
|
||||
// create a new system instance pointing only to relay1 as the "indexer"
|
||||
sys := NewSystem(WithRelayListRelays([]string{
|
||||
"ws://localhost:48481",
|
||||
}))
|
||||
defer sys.Close()
|
||||
|
||||
// prepublish some events
|
||||
evt1 := nostr.Event{
|
||||
PubKey: pk1,
|
||||
CreatedAt: nostr.Now(),
|
||||
Kind: 1,
|
||||
Tags: nostr.Tags{},
|
||||
Content: "hello from user 1",
|
||||
}
|
||||
evt1.Sign(sk1)
|
||||
|
||||
evt2 := nostr.Event{
|
||||
PubKey: pk2,
|
||||
CreatedAt: nostr.Now(),
|
||||
Kind: 1,
|
||||
Tags: nostr.Tags{},
|
||||
Content: "hello from user 2",
|
||||
}
|
||||
evt2.Sign(sk2)
|
||||
|
||||
// publish events concurrently to relays 2 and 3
|
||||
go sys.Pool.PublishMany(ctx, []string{"ws://localhost:48482", "ws://localhost:48483"}, evt1)
|
||||
go sys.Pool.PublishMany(ctx, []string{"ws://localhost:48482", "ws://localhost:48483"}, evt2)
|
||||
|
||||
// start streaming events for both pubkeys
|
||||
events, err := sys.StreamLiveFeed(ctx, []string{pk1, pk2}, []int{1})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to start streaming: %v", err)
|
||||
}
|
||||
|
||||
{
|
||||
// wait for the prepublished events
|
||||
receivedEvt1 := false
|
||||
receivedEvt2 := false
|
||||
|
||||
timeout := time.After(5 * time.Second)
|
||||
for !receivedEvt1 || !receivedEvt2 {
|
||||
select {
|
||||
case evt := <-events:
|
||||
if evt.ID == evt1.ID {
|
||||
receivedEvt1 = true
|
||||
}
|
||||
if evt.ID == evt2.ID {
|
||||
receivedEvt2 = true
|
||||
}
|
||||
case <-timeout:
|
||||
t.Fatal("timeout waiting for events")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// publish some live events
|
||||
evt1 := nostr.Event{
|
||||
PubKey: pk1,
|
||||
CreatedAt: nostr.Now(),
|
||||
Kind: 1,
|
||||
Tags: nostr.Tags{},
|
||||
Content: "hello from user 1",
|
||||
}
|
||||
evt1.Sign(sk1)
|
||||
|
||||
evt2 := nostr.Event{
|
||||
PubKey: pk2,
|
||||
CreatedAt: nostr.Now(),
|
||||
Kind: 1,
|
||||
Tags: nostr.Tags{},
|
||||
Content: "hello from user 2",
|
||||
}
|
||||
evt2.Sign(sk2)
|
||||
|
||||
// publish events concurrently to relays 2 and 3
|
||||
go sys.Pool.PublishMany(ctx, []string{"ws://localhost:48482", "ws://localhost:48483"}, evt1)
|
||||
go sys.Pool.PublishMany(ctx, []string{"ws://localhost:48482", "ws://localhost:48483"}, evt2)
|
||||
|
||||
// wait for events
|
||||
receivedEvt1 := false
|
||||
receivedEvt2 := false
|
||||
|
||||
timeout := time.After(5 * time.Second)
|
||||
for !receivedEvt1 || !receivedEvt2 {
|
||||
select {
|
||||
case evt := <-events:
|
||||
if evt.ID == evt1.ID {
|
||||
receivedEvt1 = true
|
||||
}
|
||||
if evt.ID == evt2.ID {
|
||||
receivedEvt2 = true
|
||||
}
|
||||
case <-timeout:
|
||||
t.Fatal("timeout waiting for events")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@@ -31,10 +31,12 @@ func TestMetadataAndEvents(t *testing.T) {
|
||||
Authors: []string{meta.PubKey},
|
||||
Limit: 5,
|
||||
}
|
||||
events, err := sys.FetchUserEvents(ctx, filter)
|
||||
events := make([]*nostr.Event, 0, 5)
|
||||
for ie := range sys.Pool.SubManyEose(ctx, relays, nostr.Filters{filter}) {
|
||||
events = append(events, ie.Event)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, events[meta.PubKey])
|
||||
require.GreaterOrEqual(t, len(events[meta.PubKey]), 5)
|
||||
require.GreaterOrEqual(t, len(events), 5)
|
||||
}
|
||||
|
||||
func TestConcurrentMetadata(t *testing.T) {
|
||||
|
Reference in New Issue
Block a user