expose subscriptions from relay and keep that updated.

This commit is contained in:
fiatjaf 2023-05-30 14:55:44 -03:00
parent 0f7a3f01f2
commit a2941876e3
No known key found for this signature in database
GPG Key ID: BAD43C4BE5C1A3A1
2 changed files with 11 additions and 7 deletions

View File

@ -38,7 +38,7 @@ type Relay struct {
RequestHeader http.Header // e.g. for origin header RequestHeader http.Header // e.g. for origin header
Connection *Connection Connection *Connection
subscriptions *xsync.MapOf[string, *Subscription] Subscriptions *xsync.MapOf[string, *Subscription]
Challenges chan string // NIP-42 Challenges Challenges chan string // NIP-42 Challenges
Notices chan string Notices chan string
@ -59,7 +59,7 @@ func NewRelay(ctx context.Context, url string) *Relay {
return &Relay{ return &Relay{
URL: NormalizeURL(url), URL: NormalizeURL(url),
connectionContext: ctx, connectionContext: ctx,
subscriptions: xsync.NewMapOf[*Subscription](), Subscriptions: xsync.NewMapOf[*Subscription](),
okCallbacks: xsync.NewMapOf[func(bool, string)](), okCallbacks: xsync.NewMapOf[func(bool, string)](),
} }
} }
@ -184,7 +184,7 @@ func (r *Relay) Connect(ctx context.Context) error {
if env.SubscriptionID == nil { if env.SubscriptionID == nil {
continue continue
} }
if subscription, ok := r.subscriptions.Load(*env.SubscriptionID); !ok { if subscription, ok := r.Subscriptions.Load(*env.SubscriptionID); !ok {
InfoLogger.Printf("{%s} no subscription with id '%s'\n", r.URL, *env.SubscriptionID) InfoLogger.Printf("{%s} no subscription with id '%s'\n", r.URL, *env.SubscriptionID)
continue continue
} else { } else {
@ -218,7 +218,7 @@ func (r *Relay) Connect(ctx context.Context) error {
} }
case *EOSEEnvelope: case *EOSEEnvelope:
debugLog("{%s} %v\n", r.URL, message) debugLog("{%s} %v\n", r.URL, message)
if subscription, ok := r.subscriptions.Load(string(*env)); ok { if subscription, ok := r.Subscriptions.Load(string(*env)); ok {
subscription.emitEose.Do(func() { subscription.emitEose.Do(func() {
subscription.EndOfStoredEvents <- struct{}{} subscription.EndOfStoredEvents <- struct{}{}
}) })

View File

@ -47,10 +47,12 @@ func (sub *Subscription) Unsub() {
sub.mutex.Lock() sub.mutex.Lock()
defer sub.mutex.Unlock() defer sub.mutex.Unlock()
closeMsg := CloseEnvelope(sub.GetID()) id := sub.GetID()
closeMsg := CloseEnvelope(id)
closeb, _ := (&closeMsg).MarshalJSON() closeb, _ := (&closeMsg).MarshalJSON()
debugLog("{%s} sending %v", sub.Relay.URL, closeb) debugLog("{%s} sending %v", sub.Relay.URL, closeb)
sub.conn.WriteMessage(closeb) sub.conn.WriteMessage(closeb)
sub.Relay.Subscriptions.Delete(id)
if sub.stopped == false && sub.Events != nil { if sub.stopped == false && sub.Events != nil {
close(sub.Events) close(sub.Events)
@ -59,6 +61,7 @@ func (sub *Subscription) Unsub() {
} }
// Sub sets sub.Filters and then calls sub.Fire(ctx). // Sub sets sub.Filters and then calls sub.Fire(ctx).
// The subscription will be closed if the context expires.
func (sub *Subscription) Sub(ctx context.Context, filters Filters) { func (sub *Subscription) Sub(ctx context.Context, filters Filters) {
sub.Filters = filters sub.Filters = filters
sub.Fire() sub.Fire()
@ -66,9 +69,10 @@ func (sub *Subscription) Sub(ctx context.Context, filters Filters) {
// Fire sends the "REQ" command to the relay. // Fire sends the "REQ" command to the relay.
func (sub *Subscription) Fire() error { func (sub *Subscription) Fire() error {
sub.Relay.subscriptions.Store(sub.GetID(), sub) id := sub.GetID()
sub.Relay.Subscriptions.Store(id, sub)
reqb, _ := ReqEnvelope{sub.GetID(), sub.Filters}.MarshalJSON() reqb, _ := ReqEnvelope{id, sub.Filters}.MarshalJSON()
debugLog("{%s} sending %v", sub.Relay.URL, reqb) debugLog("{%s} sending %v", sub.Relay.URL, reqb)
err := sub.conn.WriteMessage(reqb) err := sub.conn.WriteMessage(reqb)
if err != nil { if err != nil {