go-nostr/sdk/feeds_test.go
2025-01-20 13:45:43 -03:00

204 lines
4.7 KiB
Go

package sdk
import (
"context"
"testing"
"time"
"github.com/fiatjaf/eventstore/slicestore"
"github.com/fiatjaf/khatru"
"github.com/nbd-wtf/go-nostr"
"github.com/stretchr/testify/require"
)
// TestStreamLiveFeed spins up three local relays, publishes kind-10002 relay
// lists for two fresh keypairs to the first relay (which the System under test
// uses as its relay-list "indexer"), and then checks that StreamLiveFeed
// delivers kind-1 notes from both authors that were published to the other two
// relays: first a pair published right before the stream is opened, then a
// pair published while the stream is already running.
func TestStreamLiveFeed(t *testing.T) {
	// start 3 local relays, each backed by its own in-memory store
	relays := []*khatru.Relay{khatru.NewRelay(), khatru.NewRelay(), khatru.NewRelay()}
	ports := []int{48481, 48482, 48483}
	for i, r := range relays {
		db := slicestore.SliceStore{}
		db.Init()
		r.QueryEvents = append(r.QueryEvents, db.QueryEvents)
		r.StoreEvent = append(r.StoreEvent, db.SaveEvent)
		r.ReplaceEvent = append(r.ReplaceEvent, db.ReplaceEvent)
		r.DeleteEvent = append(r.DeleteEvent, db.DeleteEvent)
		defer db.Close()

		// Start blocks while the relay is serving, so it runs in its own
		// goroutine. Its result is handed back over a buffered channel rather
		// than asserted in place: FailNow-style assertions must only be made
		// from the goroutine running the test.
		started := make(chan bool)
		startErr := make(chan error, 1)
		go func(r *khatru.Relay, port int) {
			startErr <- r.Start("127.0.0.1", port, started)
		}(r, ports[i])

		// wait until the relay is up, or fail right away instead of hanging
		// forever on a relay that could not start (e.g. port already in use)
		select {
		case <-started:
		case err := <-startErr:
			t.Fatalf("relay on port %d exited before starting: %v", ports[i], err)
		}

		// only relays that actually started are shut down
		defer r.Shutdown(context.Background())
	}

	// generate two random keypairs for testing
	sk1 := nostr.GeneratePrivateKey()
	pk1, err := nostr.GetPublicKey(sk1)
	require.NoError(t, err)
	sk2 := nostr.GeneratePrivateKey()
	pk2, err := nostr.GetPublicKey(sk2)
	require.NoError(t, err)

	// everything that talks to the relays below is bounded by this deadline
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// relays 2 and 3 are where both users write their notes
	writeRelays := []string{"ws://localhost:48482", "ws://localhost:48483"}

	// newRelayList builds and signs a kind-10002 event declaring relays 2 and 3
	// as write relays of the given key
	newRelayList := func(sk, pk string) nostr.Event {
		evt := nostr.Event{
			PubKey:    pk,
			CreatedAt: nostr.Now(),
			Kind:      10002,
			Tags: nostr.Tags{
				{"r", "ws://localhost:48482", "write"},
				{"r", "ws://localhost:48483", "write"},
			},
			Content: "",
		}
		require.NoError(t, evt.Sign(sk))
		return evt
	}

	// newNote builds and signs a kind-1 event with the given content
	newNote := func(sk, pk, content string) nostr.Event {
		evt := nostr.Event{
			PubKey:    pk,
			CreatedAt: nostr.Now(),
			Kind:      1,
			Tags:      nostr.Tags{},
			Content:   content,
		}
		require.NoError(t, evt.Sign(sk))
		return evt
	}

	// first publish relay lists to relay1 for both users
	relayListEvt1 := newRelayList(sk1, pk1)
	relayListEvt2 := newRelayList(sk2, pk2)

	relay, err := nostr.RelayConnect(ctx, "ws://localhost:48481")
	if err != nil {
		t.Fatalf("failed to connect to relay1: %v", err)
	}
	if err := relay.Publish(ctx, relayListEvt1); err != nil {
		t.Fatalf("failed to publish relay list 1: %v", err)
	}
	if err := relay.Publish(ctx, relayListEvt2); err != nil {
		t.Fatalf("failed to publish relay list 2: %v", err)
	}
	relay.Close()

	// create a new system instance pointing only to relay1 as the "indexer"
	sys := NewSystem(WithRelayListRelays([]string{
		"ws://localhost:48481",
	}))
	defer sys.Close()

	// prepublish some events, concurrently, to relays 2 and 3
	evt1 := newNote(sk1, pk1, "hello from user 1")
	evt2 := newNote(sk2, pk2, "hello from user 2")
	go sys.Pool.PublishMany(ctx, writeRelays, evt1)
	go sys.Pool.PublishMany(ctx, writeRelays, evt2)

	// start streaming events for both pubkeys
	events, err := sys.StreamLiveFeed(ctx, []string{pk1, pk2}, []int{1})
	if err != nil {
		t.Fatalf("failed to start streaming: %v", err)
	}

	// waitFor reads from the feed until both given events have been seen,
	// failing the test if that takes longer than 5 seconds or if the feed is
	// closed first. Unrelated events arriving on the feed are skipped.
	waitFor := func(a, b nostr.Event) {
		t.Helper()
		seenA, seenB := false, false
		timeout := time.After(5 * time.Second)
		for !seenA || !seenB {
			select {
			case evt, ok := <-events:
				if !ok {
					t.Fatal("feed closed while waiting for events")
				}
				if evt.ID == a.ID {
					seenA = true
				}
				if evt.ID == b.ID {
					seenB = true
				}
			case <-timeout:
				t.Fatal("timeout waiting for events")
			}
		}
	}

	// wait for the prepublished events
	waitFor(evt1, evt2)

	// publish some live events. Their content must differ from the prepublished
	// notes: the event id is derived from pubkey, created_at (in seconds), kind,
	// tags and content, so an otherwise identical note created within the same
	// second would get the very same id and could not be told apart from the
	// prepublished one.
	liveEvt1 := newNote(sk1, pk1, "live hello from user 1")
	liveEvt2 := newNote(sk2, pk2, "live hello from user 2")
	go sys.Pool.PublishMany(ctx, writeRelays, liveEvt1)
	go sys.Pool.PublishMany(ctx, writeRelays, liveEvt2)

	// wait for the live events
	waitFor(liveEvt1, liveEvt2)
}