mirror of
https://github.com/fiatjaf/khatru.git
synced 2025-03-17 13:22:56 +01:00
test expiration.
This commit is contained in:
parent
c73037ac82
commit
1a03178d83
@ -33,22 +33,24 @@ func (h *expiringEventHeap) Pop() interface{} {
|
|||||||
return x
|
return x
|
||||||
}
|
}
|
||||||
|
|
||||||
type ExpirationManager struct {
|
type expirationManager struct {
|
||||||
events expiringEventHeap
|
events expiringEventHeap
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
relay *Relay
|
relay *Relay
|
||||||
|
interval time.Duration
|
||||||
initialScanDone bool
|
initialScanDone bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newExpirationManager(relay *Relay) *ExpirationManager {
|
func newExpirationManager(relay *Relay) *expirationManager {
|
||||||
return &ExpirationManager{
|
return &expirationManager{
|
||||||
events: make(expiringEventHeap, 0),
|
events: make(expiringEventHeap, 0),
|
||||||
relay: relay,
|
relay: relay,
|
||||||
|
interval: time.Hour,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em *ExpirationManager) start(ctx context.Context) {
|
func (em *expirationManager) start(ctx context.Context) {
|
||||||
ticker := time.NewTicker(time.Hour)
|
ticker := time.NewTicker(em.interval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@ -66,7 +68,7 @@ func (em *ExpirationManager) start(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em *ExpirationManager) initialScan(ctx context.Context) {
|
func (em *expirationManager) initialScan(ctx context.Context) {
|
||||||
em.mu.Lock()
|
em.mu.Lock()
|
||||||
defer em.mu.Unlock()
|
defer em.mu.Unlock()
|
||||||
|
|
||||||
@ -90,7 +92,7 @@ func (em *ExpirationManager) initialScan(ctx context.Context) {
|
|||||||
heap.Init(&em.events)
|
heap.Init(&em.events)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em *ExpirationManager) checkExpiredEvents(ctx context.Context) {
|
func (em *expirationManager) checkExpiredEvents(ctx context.Context) {
|
||||||
em.mu.Lock()
|
em.mu.Lock()
|
||||||
defer em.mu.Unlock()
|
defer em.mu.Unlock()
|
||||||
|
|
||||||
@ -121,7 +123,7 @@ func (em *ExpirationManager) checkExpiredEvents(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em *ExpirationManager) trackEvent(evt *nostr.Event) {
|
func (em *expirationManager) trackEvent(evt *nostr.Event) {
|
||||||
if expiresAt := nip40.GetExpiration(evt.Tags); expiresAt != -1 {
|
if expiresAt := nip40.GetExpiration(evt.Tags); expiresAt != -1 {
|
||||||
em.mu.Lock()
|
em.mu.Lock()
|
||||||
heap.Push(&em.events, expiringEvent{
|
heap.Push(&em.events, expiringEvent{
|
||||||
|
2
relay.go
2
relay.go
@ -115,7 +115,7 @@ type Relay struct {
|
|||||||
MaxMessageSize int64 // Maximum message size allowed from peer.
|
MaxMessageSize int64 // Maximum message size allowed from peer.
|
||||||
|
|
||||||
// NIP-40 expiration manager
|
// NIP-40 expiration manager
|
||||||
expirationManager *ExpirationManager
|
expirationManager *expirationManager
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rl *Relay) getBaseURL(r *http.Request) string {
|
func (rl *Relay) getBaseURL(r *http.Request) string {
|
||||||
|
@ -3,6 +3,7 @@ package khatru
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -238,7 +239,89 @@ func TestBasicRelayFunctionality(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// test 5: unauthorized deletion
|
// test 5: event expiration
|
||||||
|
t.Run("event expiration", func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// create a new relay with shorter expiration check interval
|
||||||
|
relay := NewRelay()
|
||||||
|
relay.expirationManager.interval = 3 * time.Second // check every 3 seconds
|
||||||
|
store := slicestore.SliceStore{}
|
||||||
|
store.Init()
|
||||||
|
relay.StoreEvent = append(relay.StoreEvent, store.SaveEvent)
|
||||||
|
relay.QueryEvents = append(relay.QueryEvents, store.QueryEvents)
|
||||||
|
relay.DeleteEvent = append(relay.DeleteEvent, store.DeleteEvent)
|
||||||
|
|
||||||
|
// start test server
|
||||||
|
server := httptest.NewServer(relay)
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
// connect test client
|
||||||
|
url := "ws" + server.URL[4:]
|
||||||
|
client, err := nostr.RelayConnect(context.Background(), url)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to connect client: %v", err)
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// create event that expires in 2 seconds
|
||||||
|
expiration := strconv.FormatInt(int64(nostr.Now()+2), 10)
|
||||||
|
evt := createEvent(sk1, 1, "i will expire soon", nostr.Tags{{"expiration", expiration}})
|
||||||
|
err = client.Publish(ctx, evt)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to publish event: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify event exists initially
|
||||||
|
sub, err := client.Subscribe(ctx, []nostr.Filter{{
|
||||||
|
IDs: []string{evt.ID},
|
||||||
|
}})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to subscribe: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// should get the event
|
||||||
|
select {
|
||||||
|
case env := <-sub.Events:
|
||||||
|
if env.ID != evt.ID {
|
||||||
|
t.Error("got wrong event")
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Fatal("timeout waiting for event")
|
||||||
|
}
|
||||||
|
sub.Unsub()
|
||||||
|
|
||||||
|
// wait for expiration check (>3 seconds)
|
||||||
|
time.Sleep(4 * time.Second)
|
||||||
|
|
||||||
|
// verify event no longer exists
|
||||||
|
sub, err = client.Subscribe(ctx, []nostr.Filter{{
|
||||||
|
IDs: []string{evt.ID},
|
||||||
|
}})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to subscribe: %v", err)
|
||||||
|
}
|
||||||
|
defer sub.Unsub()
|
||||||
|
|
||||||
|
// should get EOSE without receiving the expired event
|
||||||
|
gotEvent := false
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-sub.Events:
|
||||||
|
gotEvent = true
|
||||||
|
case <-sub.EndOfStoredEvents:
|
||||||
|
if gotEvent {
|
||||||
|
t.Error("should not have received expired event")
|
||||||
|
}
|
||||||
|
return
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Fatal("timeout waiting for EOSE")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// test 6: unauthorized deletion
|
||||||
t.Run("unauthorized deletion", func(t *testing.T) {
|
t.Run("unauthorized deletion", func(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user