mirror of
https://github.com/fiatjaf/khatru.git
synced 2026-04-07 14:06:51 +02:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
583f712fe4 | ||
|
|
28b1061166 | ||
|
|
25f19ce46e | ||
|
|
33545587b6 | ||
|
|
214371f8bd | ||
|
|
fbb40f3b74 |
176
adding.go
176
adding.go
@@ -11,34 +11,90 @@ import (
|
||||
|
||||
// AddEvent sends an event through then normal add pipeline, as if it was received from a websocket.
|
||||
func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (skipBroadcast bool, writeError error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if evt == nil {
|
||||
return false, errors.New("error: event is nil")
|
||||
}
|
||||
|
||||
if nostr.IsEphemeralKind(evt.Kind) {
|
||||
return false, rl.handleEphemeral(ctx, evt)
|
||||
} else {
|
||||
return rl.handleNormal(ctx, evt)
|
||||
}
|
||||
}
|
||||
|
||||
func (rl *Relay) handleNormal(ctx context.Context, evt *nostr.Event) (skipBroadcast bool, writeError error) {
|
||||
for _, reject := range rl.RejectEvent {
|
||||
if reject, msg := reject(ctx, evt); reject {
|
||||
if msg == "" {
|
||||
return false, errors.New("blocked: no reason")
|
||||
return true, errors.New("blocked: no reason")
|
||||
} else {
|
||||
return false, errors.New(nostr.NormalizeOKMessage(msg, "blocked"))
|
||||
return true, errors.New(nostr.NormalizeOKMessage(msg, "blocked"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if 20000 <= evt.Kind && evt.Kind < 30000 {
|
||||
// do not store ephemeral events
|
||||
for _, oee := range rl.OnEphemeralEvent {
|
||||
oee(ctx, evt)
|
||||
// Check to see if the event has been deleted by ID
|
||||
for _, query := range rl.QueryEvents {
|
||||
ch, err := query(ctx, nostr.Filter{
|
||||
Kinds: []int{5},
|
||||
Tags: nostr.TagMap{"#e": []string{evt.ID}},
|
||||
})
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
target := <-ch
|
||||
if target == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
return true, errors.New("blocked: this event has been deleted")
|
||||
}
|
||||
|
||||
// will store
|
||||
// regular kinds are just saved directly
|
||||
if nostr.IsRegularKind(evt.Kind) {
|
||||
for _, store := range rl.StoreEvent {
|
||||
if err := store(ctx, evt); err != nil {
|
||||
switch err {
|
||||
case eventstore.ErrDupEvent:
|
||||
return true, nil
|
||||
default:
|
||||
return false, fmt.Errorf("%s", nostr.NormalizeOKMessage(err.Error(), "error"))
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// will store
|
||||
// regular kinds are just saved directly
|
||||
if nostr.IsRegularKind(evt.Kind) {
|
||||
for _, store := range rl.StoreEvent {
|
||||
if err := store(ctx, evt); err != nil {
|
||||
// Check to see if the event has been deleted by address
|
||||
for _, query := range rl.QueryEvents {
|
||||
dTagValue := ""
|
||||
for _, tag := range evt.Tags {
|
||||
if len(tag) > 0 && tag[0] == "d" {
|
||||
dTagValue = tag[1]
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
address := fmt.Sprintf("%d:%s:%s", evt.Kind, evt.PubKey, dTagValue)
|
||||
ch, err := query(ctx, nostr.Filter{
|
||||
Kinds: []int{5},
|
||||
Since: &evt.CreatedAt,
|
||||
Tags: nostr.TagMap{"#a": []string{address}},
|
||||
})
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
target := <-ch
|
||||
if target == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
return true, errors.New("blocked: this event has been deleted")
|
||||
}
|
||||
|
||||
// otherwise it's a replaceable -- so we'll use the replacer functions if we have any
|
||||
if len(rl.ReplaceEvent) > 0 {
|
||||
for _, repl := range rl.ReplaceEvent {
|
||||
if err := repl(ctx, evt); err != nil {
|
||||
switch err {
|
||||
case eventstore.ErrDupEvent:
|
||||
return true, nil
|
||||
@@ -48,68 +104,54 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (skipBroadcast
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// otherwise it's a replaceable -- so we'll use the replacer functions if we have any
|
||||
if len(rl.ReplaceEvent) > 0 {
|
||||
for _, repl := range rl.ReplaceEvent {
|
||||
if err := repl(ctx, evt); err != nil {
|
||||
switch err {
|
||||
// otherwise do it the manual way
|
||||
filter := nostr.Filter{Limit: 1, Kinds: []int{evt.Kind}, Authors: []string{evt.PubKey}}
|
||||
if nostr.IsAddressableKind(evt.Kind) {
|
||||
// when addressable, add the "d" tag to the filter
|
||||
filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}}
|
||||
}
|
||||
|
||||
// now we fetch old events and delete them
|
||||
shouldStore := true
|
||||
for _, query := range rl.QueryEvents {
|
||||
ch, err := query(ctx, filter)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for previous := range ch {
|
||||
if isOlder(previous, evt) {
|
||||
for _, del := range rl.DeleteEvent {
|
||||
del(ctx, previous)
|
||||
}
|
||||
} else {
|
||||
// we found a more recent event, so we won't delete it and also will not store this new one
|
||||
shouldStore = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// store
|
||||
if shouldStore {
|
||||
for _, store := range rl.StoreEvent {
|
||||
if saveErr := store(ctx, evt); saveErr != nil {
|
||||
switch saveErr {
|
||||
case eventstore.ErrDupEvent:
|
||||
return true, nil
|
||||
default:
|
||||
return false, fmt.Errorf("%s", nostr.NormalizeOKMessage(err.Error(), "error"))
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// otherwise do it the manual way
|
||||
filter := nostr.Filter{Limit: 1, Kinds: []int{evt.Kind}, Authors: []string{evt.PubKey}}
|
||||
if nostr.IsAddressableKind(evt.Kind) {
|
||||
// when addressable, add the "d" tag to the filter
|
||||
filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}}
|
||||
}
|
||||
|
||||
// now we fetch old events and delete them
|
||||
shouldStore := true
|
||||
for _, query := range rl.QueryEvents {
|
||||
ch, err := query(ctx, filter)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for previous := range ch {
|
||||
if isOlder(previous, evt) {
|
||||
for _, del := range rl.DeleteEvent {
|
||||
del(ctx, previous)
|
||||
}
|
||||
} else {
|
||||
// we found a more recent event, so we won't delete it and also will not store this new one
|
||||
shouldStore = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// store
|
||||
if shouldStore {
|
||||
for _, store := range rl.StoreEvent {
|
||||
if saveErr := store(ctx, evt); saveErr != nil {
|
||||
switch saveErr {
|
||||
case eventstore.ErrDupEvent:
|
||||
return true, nil
|
||||
default:
|
||||
return false, fmt.Errorf("%s", nostr.NormalizeOKMessage(saveErr.Error(), "error"))
|
||||
}
|
||||
return false, fmt.Errorf("%s", nostr.NormalizeOKMessage(saveErr.Error(), "error"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, ons := range rl.OnEventSaved {
|
||||
ons(ctx, evt)
|
||||
}
|
||||
|
||||
// track event expiration if applicable
|
||||
rl.expirationManager.trackEvent(evt)
|
||||
}
|
||||
|
||||
for _, ons := range rl.OnEventSaved {
|
||||
ons(ctx, evt)
|
||||
}
|
||||
|
||||
// track event expiration if applicable
|
||||
rl.expirationManager.trackEvent(evt)
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ func (bs BlossomServer) handleUploadCheck(w http.ResponseWriter, r *http.Request
|
||||
blossomError(w, "missing \"Authorization\" header", 401)
|
||||
return
|
||||
}
|
||||
if auth.Tags.GetFirst([]string{"t", "upload"}) == nil {
|
||||
if auth.Tags.FindWithValue("t", "upload") == nil {
|
||||
blossomError(w, "invalid \"Authorization\" event \"t\" tag", 403)
|
||||
return
|
||||
}
|
||||
@@ -59,7 +59,7 @@ func (bs BlossomServer) handleUpload(w http.ResponseWriter, r *http.Request) {
|
||||
blossomError(w, "missing \"Authorization\" header", 401)
|
||||
return
|
||||
}
|
||||
if auth.Tags.GetFirst([]string{"t", "upload"}) == nil {
|
||||
if auth.Tags.FindWithValue("t", "upload") == nil {
|
||||
blossomError(w, "invalid \"Authorization\" event \"t\" tag", 403)
|
||||
return
|
||||
}
|
||||
@@ -163,13 +163,13 @@ func (bs BlossomServer) handleGetBlob(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// if there is one, we check if it has the extra requirements
|
||||
if auth != nil {
|
||||
if auth.Tags.GetFirst([]string{"t", "get"}) == nil {
|
||||
if auth.Tags.FindWithValue("t", "get") == nil {
|
||||
blossomError(w, "invalid \"Authorization\" event \"t\" tag", 403)
|
||||
return
|
||||
}
|
||||
|
||||
if auth.Tags.GetFirst([]string{"x", hhash}) == nil &&
|
||||
auth.Tags.GetFirst([]string{"server", bs.ServiceURL}) == nil {
|
||||
if auth.Tags.FindWithValue("x", hhash) == nil &&
|
||||
auth.Tags.FindWithValue("server", bs.ServiceURL) == nil {
|
||||
blossomError(w, "invalid \"Authorization\" event \"x\" or \"server\" tag", 403)
|
||||
return
|
||||
}
|
||||
@@ -239,7 +239,7 @@ func (bs BlossomServer) handleList(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// if there is one, we check if it has the extra requirements
|
||||
if auth != nil {
|
||||
if auth.Tags.GetFirst([]string{"t", "list"}) == nil {
|
||||
if auth.Tags.FindWithValue("t", "list") == nil {
|
||||
blossomError(w, "invalid \"Authorization\" event \"t\" tag", 403)
|
||||
return
|
||||
}
|
||||
@@ -283,7 +283,7 @@ func (bs BlossomServer) handleDelete(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
if auth != nil {
|
||||
if auth.Tags.GetFirst([]string{"t", "delete"}) == nil {
|
||||
if auth.Tags.FindWithValue("t", "delete") == nil {
|
||||
blossomError(w, "invalid \"Authorization\" event \"t\" tag", 403)
|
||||
return
|
||||
}
|
||||
@@ -296,8 +296,8 @@ func (bs BlossomServer) handleDelete(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
hhash = hhash[1:]
|
||||
if auth.Tags.GetFirst([]string{"x", hhash}) == nil &&
|
||||
auth.Tags.GetFirst([]string{"server", bs.ServiceURL}) == nil {
|
||||
if auth.Tags.FindWithValue("x", hhash) == nil &&
|
||||
auth.Tags.FindWithValue("server", bs.ServiceURL) == nil {
|
||||
blossomError(w, "invalid \"Authorization\" event \"x\" or \"server\" tag", 403)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -6,6 +6,6 @@ import (
|
||||
|
||||
// BroadcastEvent emits an event to all listeners whose filters' match, skipping all filters and actions
|
||||
// it also doesn't attempt to store the event or trigger any reactions or callbacks
|
||||
func (rl *Relay) BroadcastEvent(evt *nostr.Event) {
|
||||
rl.notifyListeners(evt)
|
||||
func (rl *Relay) BroadcastEvent(evt *nostr.Event) int {
|
||||
return rl.notifyListeners(evt)
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ router.Route().
|
||||
return true
|
||||
case event.Kind <= 12 && event.Kind >= 9:
|
||||
return true
|
||||
case event.Tags.GetFirst([]string{"h", ""}) != nil:
|
||||
case event.Tags.Find("h") != nil:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
|
||||
26
ephemeral.go
Normal file
26
ephemeral.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package khatru
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
func (rl *Relay) handleEphemeral(ctx context.Context, evt *nostr.Event) error {
|
||||
for _, reject := range rl.RejectEvent {
|
||||
if reject, msg := reject(ctx, evt); reject {
|
||||
if msg == "" {
|
||||
return errors.New("blocked: no reason")
|
||||
} else {
|
||||
return errors.New(nostr.NormalizeOKMessage(msg, "blocked"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, oee := range rl.OnEphemeralEvent {
|
||||
oee(ctx, evt)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -13,7 +13,7 @@ func main() {
|
||||
relay := khatru.NewRelay()
|
||||
|
||||
db := lmdb.LMDBBackend{Path: "/tmp/khatru-lmdb-tmp"}
|
||||
os.MkdirAll(db.Path, 0755)
|
||||
os.MkdirAll(db.Path, 0o755)
|
||||
if err := db.Init(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ func main() {
|
||||
relay := khatru.NewRelay()
|
||||
|
||||
db := lmdb.LMDBBackend{Path: "/tmp/exclusive"}
|
||||
os.MkdirAll(db.Path, 0755)
|
||||
os.MkdirAll(db.Path, 0o755)
|
||||
if err := db.Init(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ func main() {
|
||||
return slices.Contains(filter.Kinds, 1) && slices.Contains(filter.Tags["t"], "spam")
|
||||
}).
|
||||
Event(func(event *nostr.Event) bool {
|
||||
return event.Kind == 1 && event.Tags.GetFirst([]string{"t", "spam"}) != nil
|
||||
return event.Kind == 1 && event.Tags.FindWithValue("t", "spam") != nil
|
||||
}).
|
||||
Relay(r2)
|
||||
|
||||
|
||||
31
handlers.go
31
handlers.go
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -141,6 +142,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
continue
|
||||
}
|
||||
|
||||
// this is safe because ReadMessage() will always create a new slice
|
||||
message := unsafe.String(unsafe.SliceData(msgb), len(msgb))
|
||||
|
||||
// parse messages sequentially otherwise sonic breaks
|
||||
@@ -215,9 +217,16 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
if env.Event.Kind == 5 {
|
||||
// this always returns "blocked: " whenever it returns an error
|
||||
writeErr = srl.handleDeleteRequest(ctx, &env.Event)
|
||||
} else {
|
||||
// this will also always return a prefixed reason
|
||||
skipBroadcast, writeErr = srl.AddEvent(ctx, &env.Event)
|
||||
}
|
||||
|
||||
if writeErr == nil {
|
||||
if nostr.IsEphemeralKind(env.Event.Kind) {
|
||||
// this will also always return a prefixed reason
|
||||
writeErr = srl.handleEphemeral(ctx, &env.Event)
|
||||
} else {
|
||||
// this will also always return a prefixed reason
|
||||
skipBroadcast, writeErr = srl.handleNormal(ctx, &env.Event)
|
||||
}
|
||||
}
|
||||
|
||||
var reason string
|
||||
@@ -227,9 +236,20 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
ovw(ctx, &env.Event)
|
||||
}
|
||||
if !skipBroadcast {
|
||||
srl.notifyListeners(&env.Event)
|
||||
n := srl.notifyListeners(&env.Event)
|
||||
|
||||
// the number of notified listeners matters in ephemeral events
|
||||
if nostr.IsEphemeralKind(env.Event.Kind) {
|
||||
if n == 0 {
|
||||
ok = false
|
||||
reason = "mute: no one was listening for this"
|
||||
} else {
|
||||
reason = "broadcasted to " + strconv.Itoa(n) + " listeners"
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ok = false
|
||||
reason = writeErr.Error()
|
||||
if strings.HasPrefix(reason, "auth-required:") {
|
||||
RequestAuth(ctx)
|
||||
@@ -244,14 +264,13 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
var total int64
|
||||
var hll *hyperloglog.HyperLogLog
|
||||
uneligibleForHLL := false
|
||||
|
||||
srl := rl
|
||||
if rl.getSubRelayFromFilter != nil {
|
||||
srl = rl.getSubRelayFromFilter(env.Filter)
|
||||
}
|
||||
|
||||
if offset := nip45.HyperLogLogEventPubkeyOffsetForFilter(env.Filter); offset != -1 && !uneligibleForHLL {
|
||||
if offset := nip45.HyperLogLogEventPubkeyOffsetForFilter(env.Filter); offset != -1 {
|
||||
total, hll = srl.handleCountRequestWithHLL(ctx, ws, env.Filter, offset)
|
||||
} else {
|
||||
total = srl.handleCountRequest(ctx, ws, env.Filter)
|
||||
|
||||
@@ -132,15 +132,20 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
|
||||
delete(rl.clients, ws)
|
||||
}
|
||||
|
||||
func (rl *Relay) notifyListeners(event *nostr.Event) {
|
||||
// returns how many listeners were notified
|
||||
func (rl *Relay) notifyListeners(event *nostr.Event) int {
|
||||
count := 0
|
||||
listenersloop:
|
||||
for _, listener := range rl.listeners {
|
||||
if listener.filter.Matches(event) {
|
||||
for _, pb := range rl.PreventBroadcast {
|
||||
if pb(listener.ws, event) {
|
||||
return
|
||||
continue listenersloop
|
||||
}
|
||||
}
|
||||
listener.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &listener.id, Event: *event})
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
7
nip86.go
7
nip86.go
@@ -86,10 +86,11 @@ func (rl *Relay) HandleNIP86(w http.ResponseWriter, r *http.Request) {
|
||||
goto respond
|
||||
}
|
||||
|
||||
if uTag := evt.Tags.GetFirst([]string{"u", ""}); uTag == nil || rl.getBaseURL(r) != (*uTag)[1] {
|
||||
resp.Error = "invalid 'u' tag"
|
||||
if uTag := evt.Tags.Find("u"); uTag == nil || nostr.NormalizeURL(rl.getBaseURL(r)) != nostr.NormalizeURL(uTag[1]) {
|
||||
resp.Error = fmt.Sprintf("invalid 'u' tag, got '%s', expected '%s'",
|
||||
nostr.NormalizeURL(rl.getBaseURL(r)), nostr.NormalizeURL(uTag[1]))
|
||||
goto respond
|
||||
} else if pht := evt.Tags.GetFirst([]string{"payload", hex.EncodeToString(payloadHash[:])}); pht == nil {
|
||||
} else if pht := evt.Tags.FindWithValue("payload", hex.EncodeToString(payloadHash[:])); pht == nil {
|
||||
resp.Error = "invalid auth event payload hash"
|
||||
goto respond
|
||||
} else if evt.CreatedAt < nostr.Now()-30 {
|
||||
|
||||
@@ -2,7 +2,6 @@ package policies
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"slices"
|
||||
|
||||
"github.com/fiatjaf/khatru"
|
||||
|
||||
@@ -151,33 +151,64 @@ func TestBasicRelayFunctionality(t *testing.T) {
|
||||
t.Fatalf("failed to publish deletion event: %v", err)
|
||||
}
|
||||
|
||||
// Try to query the deleted event
|
||||
sub, err := client2.Subscribe(ctx, []nostr.Filter{{
|
||||
IDs: []string{evt3.ID},
|
||||
}})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to subscribe: %v", err)
|
||||
}
|
||||
defer sub.Unsub()
|
||||
{
|
||||
// Try to query the deleted event
|
||||
sub, err := client2.Subscribe(ctx, []nostr.Filter{{
|
||||
IDs: []string{evt3.ID},
|
||||
}})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to subscribe: %v", err)
|
||||
}
|
||||
defer sub.Unsub()
|
||||
|
||||
// Should get EOSE without receiving the deleted event
|
||||
gotEvent := false
|
||||
for {
|
||||
select {
|
||||
case <-sub.Events:
|
||||
gotEvent = true
|
||||
case <-sub.EndOfStoredEvents:
|
||||
if gotEvent {
|
||||
t.Error("should not have received deleted event")
|
||||
// Should get EOSE without receiving the deleted event
|
||||
gotEvent := false
|
||||
DeletedLoop:
|
||||
for {
|
||||
select {
|
||||
case <-sub.Events:
|
||||
gotEvent = true
|
||||
case <-sub.EndOfStoredEvents:
|
||||
if gotEvent {
|
||||
t.Error("should not have received deleted event")
|
||||
}
|
||||
break DeletedLoop
|
||||
case <-ctx.Done():
|
||||
t.Fatal("timeout waiting for EOSE")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// Try to query the deletion itself
|
||||
sub, err := client2.Subscribe(ctx, []nostr.Filter{{
|
||||
Kinds: []int{5},
|
||||
}})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to subscribe: %v", err)
|
||||
}
|
||||
defer sub.Unsub()
|
||||
|
||||
// Should get EOSE without receiving the deleted event
|
||||
gotEvent := false
|
||||
DeletionLoop:
|
||||
for {
|
||||
select {
|
||||
case <-sub.Events:
|
||||
gotEvent = true
|
||||
case <-sub.EndOfStoredEvents:
|
||||
if !gotEvent {
|
||||
t.Error("should have received deletion event")
|
||||
}
|
||||
break DeletionLoop
|
||||
case <-ctx.Done():
|
||||
t.Fatal("timeout waiting for EOSE")
|
||||
}
|
||||
return
|
||||
case <-ctx.Done():
|
||||
t.Fatal("timeout waiting for EOSE")
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// test 4: teplaceable events
|
||||
// test 4: replaceable events
|
||||
t.Run("replaceable events", func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
Reference in New Issue
Block a user