mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-06-01 10:41:34 +02:00
fn: add new EventPublisher event pub/sub system
This builds on the concurrent queue to create a generic way to allow goroutines to pub/sub information. An example includes being notified each time a state machine is able to carry out a new state transition.
This commit is contained in:
parent
1cbfda525a
commit
d702158d9b
142
fn/events.go
Normal file
142
fn/events.go
Normal file
@ -0,0 +1,142 @@
|
||||
package fn
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultQueueSize is the default size to use for concurrent queues.
|
||||
DefaultQueueSize = 10
|
||||
)
|
||||
|
||||
var (
|
||||
// nextID is the next subscription ID that will be used for a new event
|
||||
// receiver. This MUST be used atomically.
|
||||
nextID uint64
|
||||
)
|
||||
|
||||
// EventReceiver is a struct type that holds two queues for new and removed
|
||||
// items respectively.
|
||||
type EventReceiver[T any] struct {
|
||||
// id is the internal process-unique ID of the subscription.
|
||||
id uint64
|
||||
|
||||
// NewItemCreated is sent to when a new item was created successfully.
|
||||
NewItemCreated *ConcurrentQueue[T]
|
||||
|
||||
// ItemRemoved is sent to when an existing item was removed.
|
||||
ItemRemoved *ConcurrentQueue[T]
|
||||
}
|
||||
|
||||
// ID returns the internal process-unique ID of the subscription.
|
||||
func (e *EventReceiver[T]) ID() uint64 {
|
||||
return e.id
|
||||
}
|
||||
|
||||
// Stop stops the receiver from processing events.
|
||||
func (e *EventReceiver[T]) Stop() {
|
||||
e.NewItemCreated.Stop()
|
||||
e.ItemRemoved.Stop()
|
||||
}
|
||||
|
||||
// NewEventReceiver creates a new event receiver with concurrent queues of the
|
||||
// given size.
|
||||
func NewEventReceiver[T any](queueSize int) *EventReceiver[T] {
|
||||
created := NewConcurrentQueue[T](queueSize)
|
||||
created.Start()
|
||||
removed := NewConcurrentQueue[T](queueSize)
|
||||
removed.Start()
|
||||
|
||||
id := atomic.AddUint64(&nextID, 1)
|
||||
|
||||
return &EventReceiver[T]{
|
||||
id: id,
|
||||
NewItemCreated: created,
|
||||
ItemRemoved: removed,
|
||||
}
|
||||
}
|
||||
|
||||
// EventPublisher is an interface type for a component that offers event based
|
||||
// subscriptions for publishing events.
|
||||
type EventPublisher[T any, Q any] interface {
|
||||
// RegisterSubscriber adds a new subscriber for receiving events. The
|
||||
// deliverExisting boolean indicates whether already existing items
|
||||
// should be sent to the NewItemCreated channel when the subscription is
|
||||
// started. An optional deliverFrom can be specified to indicate from
|
||||
// which timestamp/index/marker onward existing items should be
|
||||
// delivered on startup. If deliverFrom is nil/zero/empty then all
|
||||
// existing items will be delivered.
|
||||
RegisterSubscriber(receiver *EventReceiver[T], deliverExisting bool,
|
||||
deliverFrom Q) error
|
||||
|
||||
// RemoveSubscriber removes the given subscriber and also stops it from
|
||||
// processing events.
|
||||
RemoveSubscriber(subscriber *EventReceiver[T]) error
|
||||
}
|
||||
|
||||
// Event is a generic event that can be sent to a subscriber.
|
||||
type Event interface {
|
||||
Timestamp() time.Time
|
||||
}
|
||||
|
||||
// EventDistributor is a struct type that helps to distribute events to multiple
|
||||
// subscribers.
|
||||
type EventDistributor[T any] struct {
|
||||
// subscribers is a map of components that want to be notified on new
|
||||
// events, keyed by their subscription ID.
|
||||
subscribers map[uint64]*EventReceiver[T]
|
||||
|
||||
// subscriberMtx guards the subscribers map and access to the
|
||||
// subscriptionID.
|
||||
subscriberMtx sync.Mutex
|
||||
}
|
||||
|
||||
// NewEventDistributor creates a new event distributor of the declared type.
|
||||
func NewEventDistributor[T any]() *EventDistributor[T] {
|
||||
return &EventDistributor[T]{
|
||||
subscribers: make(map[uint64]*EventReceiver[T]),
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterSubscriber adds a new subscriber for receiving events.
|
||||
func (d *EventDistributor[T]) RegisterSubscriber(subscriber *EventReceiver[T]) {
|
||||
d.subscriberMtx.Lock()
|
||||
defer d.subscriberMtx.Unlock()
|
||||
|
||||
d.subscribers[subscriber.ID()] = subscriber
|
||||
}
|
||||
|
||||
// RemoveSubscriber removes the given subscriber and also stops it from
|
||||
// processing events.
|
||||
func (d *EventDistributor[T]) RemoveSubscriber(
|
||||
subscriber *EventReceiver[T]) error {
|
||||
|
||||
d.subscriberMtx.Lock()
|
||||
defer d.subscriberMtx.Unlock()
|
||||
|
||||
_, ok := d.subscribers[subscriber.ID()]
|
||||
if !ok {
|
||||
return fmt.Errorf("subscriber with ID %d not found",
|
||||
subscriber.ID())
|
||||
}
|
||||
|
||||
subscriber.Stop()
|
||||
delete(d.subscribers, subscriber.ID())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NotifySubscribers sends the given events to all subscribers.
|
||||
func (d *EventDistributor[T]) NotifySubscribers(events ...T) {
|
||||
d.subscriberMtx.Lock()
|
||||
for i := range events {
|
||||
event := events[i]
|
||||
for id := range d.subscribers {
|
||||
d.subscribers[id].NewItemCreated.ChanIn() <- event
|
||||
}
|
||||
}
|
||||
d.subscriberMtx.Unlock()
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user