Merge pull request #750 from Roasbeef/block-epoch-queue

chainntnfs: ensure all block epoch notifications are sent *in order*
This commit is contained in:
Olaoluwa Osuntokun
2018-02-12 15:15:55 -08:00
committed by GitHub
3 changed files with 172 additions and 57 deletions

View File

@ -174,6 +174,9 @@ func (b *BitcoindNotifier) Stop() error {
}
}
for _, epochClient := range b.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
b.txConfNotifier.TearDown()
@ -213,7 +216,13 @@ out:
chainntnfs.Log.Infof("Cancelling epoch "+
"notification, epoch_id=%v", msg.epochID)
// First, close the cancel channel for this
// First, we'll lookup the original
// registration in order to stop the active
// queue goroutine.
reg := b.blockEpochClients[msg.epochID]
reg.epochQueue.Stop()
// Next, close the cancel channel for this
// specific client, and wait for the client to
// exit.
close(b.blockEpochClients[msg.epochID].cancelChan)
@ -441,27 +450,14 @@ func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.
}
for _, epochClient := range b.blockEpochClients {
b.wg.Add(1)
epochClient.wg.Add(1)
go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{},
clientWg *sync.WaitGroup) {
// TODO(roasbeef): move to goroutine per client, use sync queue
defer clientWg.Done()
defer b.wg.Done()
select {
case ntfnChan <- epoch:
case <-cancelChan:
return
case epochClient.epochQueue.ChanIn() <- epoch:
case <-epochClient.cancelChan:
case <-b.quit:
return
}
}(epochClient.epochChan, epochClient.cancelChan, &epochClient.wg)
}
}
@ -628,6 +624,8 @@ type blockEpochRegistration struct {
epochChan chan *chainntnfs.BlockEpoch
epochQueue *chainntnfs.ConcurrentQueue
cancelChan chan struct{}
wg sync.WaitGroup
@ -643,22 +641,58 @@ type epochCancel struct {
// caller to receive notifications, of each new block connected to the main
// chain.
func (b *BitcoindNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
registration := &blockEpochRegistration{
reg := &blockEpochRegistration{
epochQueue: chainntnfs.NewConcurrentQueue(20),
epochChan: make(chan *chainntnfs.BlockEpoch, 20),
cancelChan: make(chan struct{}),
epochID: atomic.AddUint64(&b.epochClientCounter, 1),
}
reg.epochQueue.Start()
// Before we send the request to the main goroutine, we'll launch a new
// goroutine to proxy items added to our queue to the client itself.
// This ensures that all notifications are received *in order*.
reg.wg.Add(1)
go func() {
defer reg.wg.Done()
for {
select {
case ntfn := <-reg.epochQueue.ChanOut():
blockNtfn := ntfn.(*chainntnfs.BlockEpoch)
select {
case reg.epochChan <- blockNtfn:
case <-reg.cancelChan:
return
case <-b.quit:
return
}
case <-reg.cancelChan:
return
case <-b.quit:
return
}
}
}()
select {
case <-b.quit:
// As we're exiting before the registration could be sent,
// we'll stop the queue now ourselves.
reg.epochQueue.Stop()
return nil, errors.New("chainntnfs: system interrupt while " +
"attempting to register for block epoch notification.")
case b.notificationRegistry <- registration:
case b.notificationRegistry <- reg:
return &chainntnfs.BlockEpochEvent{
Epochs: registration.epochChan,
Epochs: reg.epochChan,
Cancel: func() {
cancel := &epochCancel{
epochID: registration.epochID,
epochID: reg.epochID,
}
// Submit epoch cancellation to notification dispatcher.
@ -668,7 +702,7 @@ func (b *BitcoindNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent
// closed before yielding to caller.
for {
select {
case _, ok := <-registration.epochChan:
case _, ok := <-reg.epochChan:
if !ok {
return
}

View File

@ -184,6 +184,9 @@ func (b *BtcdNotifier) Stop() error {
}
}
for _, epochClient := range b.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
b.txConfNotifier.TearDown()
@ -247,7 +250,13 @@ out:
chainntnfs.Log.Infof("Cancelling epoch "+
"notification, epoch_id=%v", msg.epochID)
// First, close the cancel channel for this
// First, we'll lookup the original
// registration in order to stop the active
// queue goroutine.
reg := b.blockEpochClients[msg.epochID]
reg.epochQueue.Stop()
// Next, close the cancel channel for this
// specific client, and wait for the client to
// exit.
close(b.blockEpochClients[msg.epochID].cancelChan)
@ -260,7 +269,6 @@ out:
// cancelled.
close(b.blockEpochClients[msg.epochID].epochChan)
delete(b.blockEpochClients, msg.epochID)
}
case registerMsg := <-b.notificationRegistry:
switch msg := registerMsg.(type) {
@ -462,27 +470,14 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash
}
for _, epochClient := range b.blockEpochClients {
b.wg.Add(1)
epochClient.wg.Add(1)
go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{},
clientWg *sync.WaitGroup) {
// TODO(roasbeef): move to goroutine per client, use sync queue
defer clientWg.Done()
defer b.wg.Done()
select {
case ntfnChan <- epoch:
case <-cancelChan:
return
case epochClient.epochQueue.ChanIn() <- epoch:
case <-epochClient.cancelChan:
case <-b.quit:
return
}
}(epochClient.epochChan, epochClient.cancelChan, &epochClient.wg)
}
}
@ -631,6 +626,8 @@ type blockEpochRegistration struct {
epochChan chan *chainntnfs.BlockEpoch
epochQueue *chainntnfs.ConcurrentQueue
cancelChan chan struct{}
wg sync.WaitGroup
@ -646,32 +643,69 @@ type epochCancel struct {
// caller to receive notifications, of each new block connected to the main
// chain.
func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
registration := &blockEpochRegistration{
reg := &blockEpochRegistration{
epochQueue: chainntnfs.NewConcurrentQueue(20),
epochChan: make(chan *chainntnfs.BlockEpoch, 20),
cancelChan: make(chan struct{}),
epochID: atomic.AddUint64(&b.epochClientCounter, 1),
}
reg.epochQueue.Start()
// Before we send the request to the main goroutine, we'll launch a new
// goroutine to proxy items added to our queue to the client itself.
// This ensures that all notifications are received *in order*.
reg.wg.Add(1)
go func() {
defer reg.wg.Done()
for {
select {
case ntfn := <-reg.epochQueue.ChanOut():
blockNtfn := ntfn.(*chainntnfs.BlockEpoch)
select {
case reg.epochChan <- blockNtfn:
case <-reg.cancelChan:
return
case <-b.quit:
return
}
case <-reg.cancelChan:
return
case <-b.quit:
return
}
}
}()
select {
case <-b.quit:
// As we're exiting before the registration could be sent,
// we'll stop the queue now ourselves.
reg.epochQueue.Stop()
return nil, errors.New("chainntnfs: system interrupt while " +
"attempting to register for block epoch notification.")
case b.notificationRegistry <- registration:
case b.notificationRegistry <- reg:
return &chainntnfs.BlockEpochEvent{
Epochs: registration.epochChan,
Epochs: reg.epochChan,
Cancel: func() {
cancel := &epochCancel{
epochID: registration.epochID,
epochID: reg.epochID,
}
// Submit epoch cancellation to notification dispatcher.
select {
case b.notificationCancels <- cancel:
// Cancellation is being handled, drain the epoch channel until it is
// closed before yielding to caller.
// Cancellation is being handled, drain
// the epoch channel until it is closed
// before yielding to caller.
for {
select {
case _, ok := <-registration.epochChan:
case _, ok := <-reg.epochChan:
if !ok {
return
}

View File

@ -181,6 +181,9 @@ func (n *NeutrinoNotifier) Stop() error {
}
}
for _, epochClient := range n.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
n.txConfNotifier.TearDown()
@ -257,7 +260,13 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
chainntnfs.Log.Infof("Cancelling epoch "+
"notification, epoch_id=%v", msg.epochID)
// First, close the cancel channel for this
// First, we'll lookup the original
// registration in order to stop the active
// queue goroutine.
reg := n.blockEpochClients[msg.epochID]
reg.epochQueue.Stop()
// Next, close the cancel channel for this
// specific client, and wait for the client to
// exit.
close(n.blockEpochClients[msg.epochID].cancelChan)
@ -715,6 +724,8 @@ type blockEpochRegistration struct {
epochChan chan *chainntnfs.BlockEpoch
epochQueue *chainntnfs.ConcurrentQueue
cancelChan chan struct{}
wg sync.WaitGroup
@ -729,22 +740,58 @@ type epochCancel struct {
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the caller
// to receive notifications, of each new block connected to the main chain.
func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
registration := &blockEpochRegistration{
reg := &blockEpochRegistration{
epochQueue: chainntnfs.NewConcurrentQueue(20),
epochChan: make(chan *chainntnfs.BlockEpoch, 20),
cancelChan: make(chan struct{}),
epochID: atomic.AddUint64(&n.epochClientCounter, 1),
}
reg.epochQueue.Start()
// Before we send the request to the main goroutine, we'll launch a new
// goroutine to proxy items added to our queue to the client itself.
// This ensures that all notifications are received *in order*.
reg.wg.Add(1)
go func() {
defer reg.wg.Done()
for {
select {
case ntfn := <-reg.epochQueue.ChanOut():
blockNtfn := ntfn.(*chainntnfs.BlockEpoch)
select {
case reg.epochChan <- blockNtfn:
case <-reg.cancelChan:
return
case <-n.quit:
return
}
case <-reg.cancelChan:
return
case <-n.quit:
return
}
}
}()
select {
case <-n.quit:
// As we're exiting before the registration could be sent,
// we'll stop the queue now ourselves.
reg.epochQueue.Stop()
return nil, errors.New("chainntnfs: system interrupt while " +
"attempting to register for block epoch notification.")
case n.notificationRegistry <- registration:
case n.notificationRegistry <- reg:
return &chainntnfs.BlockEpochEvent{
Epochs: registration.epochChan,
Epochs: reg.epochChan,
Cancel: func() {
cancel := &epochCancel{
epochID: registration.epochID,
epochID: reg.epochID,
}
// Submit epoch cancellation to notification dispatcher.
@ -754,7 +801,7 @@ func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent
// closed before yielding to caller.
for {
select {
case _, ok := <-registration.epochChan:
case _, ok := <-reg.epochChan:
if !ok {
return
}