mirror of
https://github.com/RoganDawes/P4wnP1_aloa.git
synced 2025-03-18 05:41:55 +01:00
Service: fix EventManager to unregister cancelled listeners immediately
This commit is contained in:
parent
b15be2bf8f
commit
cdf8a0dbce
114
service/Event.go
114
service/Event.go
@ -17,21 +17,19 @@ type EventManager struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
registeredReceiversMutex *sync.Mutex
|
||||
registeredReceivers map[*EventReceiver]bool
|
||||
receiverDeleteList map[*EventReceiver]bool
|
||||
receiverRegisterList map[*EventReceiver]bool
|
||||
receiverDelListMutex *sync.Mutex
|
||||
receiverRegListMutex *sync.Mutex
|
||||
registerReceiver chan *EventReceiver
|
||||
unregisterReceiver chan *EventReceiver
|
||||
}
|
||||
|
||||
func NewEventManager(queueSize int) *EventManager {
|
||||
EvMgr := &EventManager{
|
||||
eventQueue: make(chan *pb.Event, queueSize),
|
||||
receiverDelListMutex: &sync.Mutex{},
|
||||
receiverRegListMutex: &sync.Mutex{},
|
||||
receiverRegisterList: make(map[*EventReceiver]bool),
|
||||
registeredReceivers: make(map[*EventReceiver]bool),
|
||||
receiverDeleteList: make(map[*EventReceiver]bool),
|
||||
registerReceiver: make(chan *EventReceiver),
|
||||
unregisterReceiver: make(chan *EventReceiver),
|
||||
registeredReceiversMutex: &sync.Mutex{},
|
||||
}
|
||||
EvMgr.ctx, EvMgr.cancel = context.WithCancel(context.Background())
|
||||
return EvMgr
|
||||
@ -39,7 +37,9 @@ func NewEventManager(queueSize int) *EventManager {
|
||||
|
||||
func (evm *EventManager) Start() {
|
||||
log.Println("Event Manager: Starting event dispatcher")
|
||||
go evm.register_unregister()
|
||||
go evm.dispatch()
|
||||
|
||||
}
|
||||
|
||||
func (evm *EventManager) Stop() {
|
||||
@ -69,11 +69,17 @@ func (em *EventManager) RegisterReceiver(filterEventType int64) *EventReceiver {
|
||||
Cancel: cancel,
|
||||
FilterEventType: filterEventType,
|
||||
}
|
||||
fmt.Printf("Registered receiver for %d\n", er.FilterEventType)
|
||||
em.receiverRegListMutex.Lock()
|
||||
em.receiverRegisterList[er] = true
|
||||
er.isRegistered = true
|
||||
em.receiverRegListMutex.Unlock()
|
||||
|
||||
|
||||
go func() {
|
||||
em.registerReceiver <- er
|
||||
er.isRegistered = true //asynchronous registering, as soon as possible
|
||||
|
||||
<- er.Ctx.Done() //continue watching and assure unregister as soon as possible if canceled
|
||||
em.UnregisterReceiver(er)
|
||||
|
||||
|
||||
}()
|
||||
|
||||
return er
|
||||
}
|
||||
@ -82,10 +88,10 @@ func (em *EventManager) UnregisterReceiver(receiver *EventReceiver) {
|
||||
if !receiver.isRegistered {
|
||||
return
|
||||
}
|
||||
em.receiverDelListMutex.Lock()
|
||||
em.receiverDeleteList[receiver] = true
|
||||
|
||||
//mark as unregistered
|
||||
receiver.isRegistered = false
|
||||
em.receiverDelListMutex.Unlock()
|
||||
em.unregisterReceiver <- receiver
|
||||
}
|
||||
|
||||
func (em *EventManager) dispatch() {
|
||||
@ -94,56 +100,54 @@ loop:
|
||||
for {
|
||||
select {
|
||||
case evToDispatch := <-em.eventQueue:
|
||||
// delete receivers marked for deletion (only unregister function is allowed to put data into this map)
|
||||
em.receiverDelListMutex.Lock()
|
||||
for delReceiver := range em.receiverDeleteList {
|
||||
delete(em.registeredReceivers, delReceiver)
|
||||
delReceiver.Cancel() // cancel context BEFORE closing the eventQueue channel
|
||||
close(delReceiver.EventQueue)
|
||||
|
||||
}
|
||||
//Replace the delete list with a new one and let the GC take care of the old
|
||||
em.receiverDeleteList = make(map[*EventReceiver]bool)
|
||||
em.receiverDelListMutex.Unlock()
|
||||
|
||||
//add newly registered receivers
|
||||
em.receiverRegListMutex.Lock()
|
||||
for addReceiver := range em.receiverRegisterList {
|
||||
em.registeredReceivers[addReceiver] = true
|
||||
}
|
||||
//Replace the register list with a new one and let the GC take care of the old
|
||||
em.receiverRegisterList = make(map[*EventReceiver]bool)
|
||||
em.receiverRegListMutex.Unlock()
|
||||
|
||||
// distribute to registered receiver
|
||||
// Note: no mutex on em.registeredReceivers needed, only accessed in this method
|
||||
em.registeredReceiversMutex.Lock()
|
||||
for receiver := range em.registeredReceivers {
|
||||
// check if this receiver is listening for this event type
|
||||
if receiver.FilterEventType == evToDispatch.Type || receiver.FilterEventType == common_web.EVT_ANY {
|
||||
select {
|
||||
case <-receiver.Ctx.Done():
|
||||
//receiver canceled
|
||||
em.UnregisterReceiver(receiver)
|
||||
continue // go on with next registered receiver
|
||||
case receiver.EventQueue <- evToDispatch:
|
||||
//Do nothing
|
||||
}
|
||||
if receiver.isRegistered && (receiver.FilterEventType == evToDispatch.Type || receiver.FilterEventType == common_web.EVT_ANY) {
|
||||
receiver.EventQueue <- evToDispatch
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
em.registeredReceiversMutex.Unlock()
|
||||
case <-em.ctx.Done():
|
||||
//EventManage aborted
|
||||
|
||||
// ToDo: close all eventReceivers eventQueues, to notify them of the stopped dispatcher
|
||||
//fmt.Println("EvtMgr cancelled")
|
||||
em.registeredReceiversMutex.Lock()
|
||||
for receiver := range em.registeredReceivers {
|
||||
// Calling unregister directly would dead lock on registeredReceiversMutex, as a buffer-less channel is
|
||||
// used and the receiver (register_unregister loop) locks registeredReceiversMutex, again
|
||||
// Calling cancel on the receiver itself, isn't a problem as the unregister is called by a dedicated go
|
||||
// routine per receiver, if the context is done.
|
||||
receiver.Cancel()
|
||||
}
|
||||
em.registeredReceiversMutex.Unlock()
|
||||
break loop
|
||||
}
|
||||
}
|
||||
fmt.Println("Stopped event dispatcher")
|
||||
}
|
||||
|
||||
func (em *EventManager) register_unregister() {
|
||||
fmt.Println("Started event receiver (un)register watcher")
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case er := <- em.registerReceiver:
|
||||
em.registeredReceiversMutex.Lock()
|
||||
em.registeredReceivers[er] = true
|
||||
fmt.Printf("Registered event receiver type %d, overall receiver count %d\n", er.FilterEventType, len(em.registeredReceivers))
|
||||
em.registeredReceiversMutex.Unlock()
|
||||
case er := <- em.unregisterReceiver:
|
||||
em.registeredReceiversMutex.Lock()
|
||||
delete(em.registeredReceivers, er)
|
||||
er.Cancel() // cancel context BEFORE closing the eventQueue channel
|
||||
close(er.EventQueue)
|
||||
fmt.Printf("Unregistered event receiver type %d, overall receiver count %d\n", er.FilterEventType, len(em.registeredReceivers))
|
||||
em.registeredReceiversMutex.Unlock()
|
||||
case <-em.ctx.Done():
|
||||
break loop
|
||||
}
|
||||
}
|
||||
fmt.Println("Stopped event receiver (un)register watcher")
|
||||
}
|
||||
|
||||
type EventReceiver struct {
|
||||
isRegistered bool
|
||||
Ctx context.Context
|
||||
|
Loading…
x
Reference in New Issue
Block a user