From cdf8a0dbcea554e2a83f4725112bd4a3b529a040 Mon Sep 17 00:00:00 2001 From: MaMe82 Date: Fri, 26 Oct 2018 15:07:58 +0200 Subject: [PATCH] Service: fix EventManager to unregister cancelled listeners immediately --- service/Event.go | 114 ++++++++++++++++++++++++----------------------- 1 file changed, 59 insertions(+), 55 deletions(-) diff --git a/service/Event.go b/service/Event.go index c62d423..7b5c24c 100644 --- a/service/Event.go +++ b/service/Event.go @@ -17,21 +17,19 @@ type EventManager struct { ctx context.Context cancel context.CancelFunc + registeredReceiversMutex *sync.Mutex registeredReceivers map[*EventReceiver]bool - receiverDeleteList map[*EventReceiver]bool - receiverRegisterList map[*EventReceiver]bool - receiverDelListMutex *sync.Mutex - receiverRegListMutex *sync.Mutex + registerReceiver chan *EventReceiver + unregisterReceiver chan *EventReceiver } func NewEventManager(queueSize int) *EventManager { EvMgr := &EventManager{ eventQueue: make(chan *pb.Event, queueSize), - receiverDelListMutex: &sync.Mutex{}, - receiverRegListMutex: &sync.Mutex{}, - receiverRegisterList: make(map[*EventReceiver]bool), registeredReceivers: make(map[*EventReceiver]bool), - receiverDeleteList: make(map[*EventReceiver]bool), + registerReceiver: make(chan *EventReceiver), + unregisterReceiver: make(chan *EventReceiver), + registeredReceiversMutex: &sync.Mutex{}, } EvMgr.ctx, EvMgr.cancel = context.WithCancel(context.Background()) return EvMgr @@ -39,7 +37,9 @@ func NewEventManager(queueSize int) *EventManager { func (evm *EventManager) Start() { log.Println("Event Manager: Starting event dispatcher") + go evm.register_unregister() go evm.dispatch() + } func (evm *EventManager) Stop() { @@ -69,11 +69,17 @@ func (em *EventManager) RegisterReceiver(filterEventType int64) *EventReceiver { Cancel: cancel, FilterEventType: filterEventType, } - fmt.Printf("Registered receiver for %d\n", er.FilterEventType) - em.receiverRegListMutex.Lock() - em.receiverRegisterList[er] = true - er.isRegistered = true - em.receiverRegListMutex.Unlock() + + + go func() { + em.registerReceiver <- er + er.isRegistered = true //asynchronous registering, as soon as possible + + <- er.Ctx.Done() //continue watching and assure unregister as soon as possible if canceled + em.UnregisterReceiver(er) + + + }() return er } @@ -82,10 +88,10 @@ func (em *EventManager) UnregisterReceiver(receiver *EventReceiver) { if !receiver.isRegistered { return } - em.receiverDelListMutex.Lock() - em.receiverDeleteList[receiver] = true + + //mark as unregistered receiver.isRegistered = false - em.receiverDelListMutex.Unlock() + em.unregisterReceiver <- receiver } func (em *EventManager) dispatch() { @@ -94,56 +100,54 @@ loop: for { select { case evToDispatch := <-em.eventQueue: - // delete receivers marked for deletion (only unregister function is allowed to put data into this map) - em.receiverDelListMutex.Lock() - for delReceiver := range em.receiverDeleteList { - delete(em.registeredReceivers, delReceiver) - delReceiver.Cancel() // cancel context BEFORE closing the eventQueue channel - close(delReceiver.EventQueue) - - } - //Replace the delete list with a new one and let the GC take care of the old - em.receiverDeleteList = make(map[*EventReceiver]bool) - em.receiverDelListMutex.Unlock() - - //add newly registered receivers - em.receiverRegListMutex.Lock() - for addReceiver := range em.receiverRegisterList { - em.registeredReceivers[addReceiver] = true - } - //Replace the register list with a new one and let the GC take care of the old - em.receiverRegisterList = make(map[*EventReceiver]bool) - em.receiverRegListMutex.Unlock() - - // distribute to registered receiver - // Note: no mutex on em.registeredReceivers needed, only accessed in this method + em.registeredReceiversMutex.Lock() for receiver := range em.registeredReceivers { // check if this receiver is listening for this event type - if receiver.FilterEventType == evToDispatch.Type || receiver.FilterEventType == common_web.EVT_ANY { - select { - case <-receiver.Ctx.Done(): - //receiver canceled - em.UnregisterReceiver(receiver) - continue // go on with next registered receiver - case receiver.EventQueue <- evToDispatch: - //Do nothing - } + if receiver.isRegistered && (receiver.FilterEventType == evToDispatch.Type || receiver.FilterEventType == common_web.EVT_ANY) { + receiver.EventQueue <- evToDispatch } } - - - + em.registeredReceiversMutex.Unlock() case <-em.ctx.Done(): - //EventManage aborted - - // ToDo: close all eventReceivers eventQueues, to notify them of the stopped dispatcher - //fmt.Println("EvtMgr cancelled") + em.registeredReceiversMutex.Lock() + for receiver := range em.registeredReceivers { + // Calling unregister directly would dead lock on registeredReceiversMutex, as a buffer-less channel is + // used and the receiver (register_unregister loop) locks registeredReceiversMutex, again + // Calling cancel on the receiver itself, isn't a problem as the unregister is called by a dedicated go + // routine per receiver, if the context is done. + receiver.Cancel() + } + em.registeredReceiversMutex.Unlock() break loop } } fmt.Println("Stopped event dispatcher") } +func (em *EventManager) register_unregister() { + fmt.Println("Started event receiver (un)register watcher") +loop: + for { + select { + case er := <- em.registerReceiver: + em.registeredReceiversMutex.Lock() + em.registeredReceivers[er] = true + fmt.Printf("Registered event receiver type %d, overall receiver count %d\n", er.FilterEventType, len(em.registeredReceivers)) + em.registeredReceiversMutex.Unlock() + case er := <- em.unregisterReceiver: + em.registeredReceiversMutex.Lock() + delete(em.registeredReceivers, er) + er.Cancel() // cancel context BEFORE closing the eventQueue channel + close(er.EventQueue) + fmt.Printf("Unregistered event receiver type %d, overall receiver count %d\n", er.FilterEventType, len(em.registeredReceivers)) + em.registeredReceiversMutex.Unlock() + case <-em.ctx.Done(): + break loop + } + } + fmt.Println("Stopped event receiver (un)register watcher") +} + type EventReceiver struct { isRegistered bool Ctx context.Context