allow more fine-grained control over subscription filters.

This commit is contained in:
fiatjaf 2022-11-19 14:00:29 -03:00
parent 8bc91a894c
commit b0ae497656
No known key found for this signature in database
GPG Key ID: BAD43C4BE5C1A3A1
3 changed files with 24 additions and 17 deletions

View File

@ -125,7 +125,7 @@ func (r *Relay) Connect() error {
} }
// check if the event matches the desired filter, ignore otherwise // check if the event matches the desired filter, ignore otherwise
if !subscription.filters.Match(&event) { if !subscription.Filters.Match(&event) {
continue continue
} }
@ -212,24 +212,30 @@ func (r *Relay) Subscribe(filters Filters) *Subscription {
panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()")) panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()"))
} }
sub := r.PrepareSubscription()
sub.Filters = filters
sub.Fire()
return sub
}
func (r *Relay) PrepareSubscription() *Subscription {
random := make([]byte, 7) random := make([]byte, 7)
rand.Read(random) rand.Read(random)
id := hex.EncodeToString(random) id := hex.EncodeToString(random)
return r.subscribe(id, filters)
return r.prepareSubscription(id)
} }
func (r *Relay) subscribe(id string, filters Filters) *Subscription { func (r *Relay) prepareSubscription(id string) *Subscription {
sub := Subscription{ sub := &Subscription{
conn: r.Connection, conn: r.Connection,
id: id, id: id,
Events: make(chan Event), Events: make(chan Event),
EndOfStoredEvents: make(chan struct{}, 1), EndOfStoredEvents: make(chan struct{}, 1),
} }
r.subscriptions.Store(sub.id, &sub) r.subscriptions.Store(sub.id, sub)
return sub
sub.Sub(filters)
return &sub
} }
func (r *Relay) Close() error { func (r *Relay) Close() error {

View File

@ -78,7 +78,8 @@ func (r *RelayPool) Add(url string, policy RelayPoolPolicy) chan error {
r.Relays.Store(relay.URL, relay) r.Relays.Store(relay.URL, relay)
r.subscriptions.Range(func(id string, filters Filters) bool { r.subscriptions.Range(func(id string, filters Filters) bool {
sub := relay.subscribe(id, filters) sub := relay.prepareSubscription(id)
sub.Sub(filters)
eventStream, _ := r.eventStreams.Load(id) eventStream, _ := r.eventStreams.Load(id)
go func(sub *Subscription) { go func(sub *Subscription) {
@ -119,7 +120,8 @@ func (r *RelayPool) Sub(filters Filters) (string, chan EventMessage) {
r.eventStreams.Store(id, eventStream) r.eventStreams.Store(id, eventStream)
r.Relays.Range(func(_ string, relay *Relay) bool { r.Relays.Range(func(_ string, relay *Relay) bool {
sub := relay.subscribe(id, filters) sub := relay.prepareSubscription(id)
sub.Sub(filters)
go func(sub *Subscription) { go func(sub *Subscription) {
for evt := range sub.Events { for evt := range sub.Events {

View File

@ -6,7 +6,7 @@ type Subscription struct {
id string id string
conn *Connection conn *Connection
filters Filters Filters Filters
Events chan Event Events chan Event
EndOfStoredEvents chan struct{} EndOfStoredEvents chan struct{}
@ -19,10 +19,6 @@ type EventMessage struct {
Relay string Relay string
} }
func (sub Subscription) GetFilters() Filters {
return sub.filters
}
func (sub Subscription) Unsub() { func (sub Subscription) Unsub() {
sub.conn.WriteJSON([]interface{}{"CLOSE", sub.id}) sub.conn.WriteJSON([]interface{}{"CLOSE", sub.id})
@ -33,10 +29,13 @@ func (sub Subscription) Unsub() {
} }
func (sub *Subscription) Sub(filters Filters) { func (sub *Subscription) Sub(filters Filters) {
sub.filters = filters sub.Filters = filters
sub.Fire()
}
func (sub *Subscription) Fire() {
message := []interface{}{"REQ", sub.id} message := []interface{}{"REQ", sub.id}
for _, filter := range sub.filters { for _, filter := range sub.Filters {
message = append(message, filter) message = append(message, filter)
} }