Fix: EventManager error on unregister of receiver

This commit is contained in:
MaMe82 2018-09-25 23:06:23 +02:00
parent bd47bf0e47
commit d8c2cafe38
2 changed files with 26 additions and 18 deletions

View File

@ -71,14 +71,17 @@ func (em *EventManager) RegisterReceiver(filterEventType int64) *EventReceiver {
fmt.Printf("Registered receiver for %d\n", er.FilterEventType)
em.receiverRegListMutex.Lock()
em.receiverRegisterList[er] = true
er.isRegistered = true
em.receiverRegListMutex.Unlock()
return er
}
func (em *EventManager) UnregisterReceiver(receiver *EventReceiver) {
if !receiver.isRegistered { return }
em.receiverDelListMutex.Lock()
em.receiverDeleteList[receiver] = true
receiver.isRegistered = false
em.receiverDelListMutex.Unlock()
}
@ -88,6 +91,27 @@ loop:
for {
select {
case evToDispatch := <-em.eventQueue:
// delete receivers marked for deletion (only unregister function is allowed to put data into this map)
em.receiverDelListMutex.Lock()
for delReceiver := range em.receiverDeleteList {
delete(em.registeredReceivers, delReceiver)
close(delReceiver.EventQueue)
delReceiver.Cancel()
}
//Replace the delete list with a new one and let the GC take care of the old
em.receiverDeleteList = make(map[*EventReceiver]bool)
em.receiverDelListMutex.Unlock()
//add newly registered receivers
em.receiverRegListMutex.Lock()
for addReceiver := range em.receiverRegisterList {
em.registeredReceivers[addReceiver] = true
}
//Replace the register list with a new one and let the GC take care of the old
em.receiverRegisterList = make(map[*EventReceiver]bool)
em.receiverRegListMutex.Unlock()
// distribute to registered receiver
// Note: no mutex on em.registeredReceivers needed, only accessed in this method
for receiver := range em.registeredReceivers {
@ -104,24 +128,6 @@ loop:
}
}
// delete receivers marked for deletion (only unregister function is allowed to put data into this map)
em.receiverDelListMutex.Lock()
for delReceiver := range em.receiverDeleteList {
delete(em.registeredReceivers, delReceiver)
close(delReceiver.EventQueue)
}
//Replace the delete list with a new one and let the GC take care of the old
em.receiverDeleteList = make(map[*EventReceiver]bool)
em.receiverDelListMutex.Unlock()
//add newly registered receivers
em.receiverRegListMutex.Lock()
for addReceiver := range em.receiverRegisterList {
em.registeredReceivers[addReceiver] = true
}
//Replace the register list with a new one and let the GC take care of the old
em.receiverRegisterList = make(map[*EventReceiver]bool)
em.receiverRegListMutex.Unlock()
case <-em.ctx.Done():
@ -136,6 +142,7 @@ loop:
}
type EventReceiver struct {
isRegistered bool
Ctx context.Context
Cancel context.CancelFunc
EventQueue chan *pb.Event

1
service/triggerAction.go Normal file
View File

@ -0,0 +1 @@
package service