chainntnfs/btcdnotify: implement spend+epoch ntfn cancellations

This commit minifies the BtcdNotifier concrete implementation of the
ChainNotifier interface to allow callers to optionally cancel an
outstanding block epoch or spend notificaiton intent.

To do this efficiently, we now give each notification intent a unique
ID based on if it’s an epoch intent or a spend intent. We then use this
ID to reference back to the original un-dispatched notification intent
when the caller wishes to cancel the intent.
This commit is contained in:
Olaoluwa Osuntokun 2017-02-20 16:31:16 -08:00
parent a3319bb21a
commit 73cc28d5fb
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

View File

@ -43,23 +43,30 @@ type txUpdate struct {
details *btcjson.BlockDetails details *btcjson.BlockDetails
} }
// TODO(roasbeef): generalize struct below:
// * move chans to config, allow outside callers to handle send conditions
// BtcdNotifier implements the ChainNotifier interface using btcd's websockets // BtcdNotifier implements the ChainNotifier interface using btcd's websockets
// notifications. Multiple concurrent clients are supported. All notifications // notifications. Multiple concurrent clients are supported. All notifications
// are achieved via non-blocking sends on client channels. // are achieved via non-blocking sends on client channels.
type BtcdNotifier struct { type BtcdNotifier struct {
spendClientCounter uint64 // To be used atomically.
epochClientCounter uint64 // To be used atomically.
started int32 // To be used atomically. started int32 // To be used atomically.
stopped int32 // To be used atomically. stopped int32 // To be used atomically.
chainConn *btcrpcclient.Client chainConn *btcrpcclient.Client
notificationCancels chan interface{}
notificationRegistry chan interface{} notificationRegistry chan interface{}
spendNotifications map[wire.OutPoint][]*spendNotification spendNotifications map[wire.OutPoint]map[uint64]*spendNotification
confNotifications map[chainhash.Hash][]*confirmationsNotification confNotifications map[chainhash.Hash][]*confirmationsNotification
confHeap *confirmationHeap confHeap *confirmationHeap
blockEpochClients []chan *chainntnfs.BlockEpoch blockEpochClients map[uint64]chan *chainntnfs.BlockEpoch
disconnectedBlockHashes chan *blockNtfn disconnectedBlockHashes chan *blockNtfn
@ -83,11 +90,15 @@ var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil)
// accept new websockets clients. // accept new websockets clients.
func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) { func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) {
notifier := &BtcdNotifier{ notifier := &BtcdNotifier{
notificationCancels: make(chan interface{}),
notificationRegistry: make(chan interface{}), notificationRegistry: make(chan interface{}),
spendNotifications: make(map[wire.OutPoint][]*spendNotification), blockEpochClients: make(map[uint64]chan *chainntnfs.BlockEpoch),
confNotifications: make(map[chainhash.Hash][]*confirmationsNotification),
confHeap: newConfirmationHeap(), spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification),
confNotifications: make(map[chainhash.Hash][]*confirmationsNotification),
confHeap: newConfirmationHeap(),
disconnectedBlockHashes: make(chan *blockNtfn, 20), disconnectedBlockHashes: make(chan *blockNtfn, 20),
@ -229,13 +240,38 @@ func (b *BtcdNotifier) notificationDispatcher(currentHeight int32) {
out: out:
for { for {
select { select {
case cancelMsg := <-b.notificationCancels:
switch msg := cancelMsg.(type) {
case *spendCancel:
chainntnfs.Log.Infof("Cancelling spend "+
"notification for out_point=%v, "+
"spend_id=%v", msg.op, msg.spendID)
// Before we attempt to close the spendChan,
// ensure that the notification hasn't already
// yet been dispatched.
if outPointClients, ok := b.spendNotifications[msg.op]; ok {
close(outPointClients[msg.spendID].spendChan)
delete(b.spendNotifications[msg.op], msg.spendID)
}
case *epochCancel:
chainntnfs.Log.Infof("Cancelling epoch "+
"notification, epoch_id=%v", msg.epochID)
close(b.blockEpochClients[msg.epochID])
delete(b.blockEpochClients, msg.epochID)
}
case registerMsg := <-b.notificationRegistry: case registerMsg := <-b.notificationRegistry:
switch msg := registerMsg.(type) { switch msg := registerMsg.(type) {
case *spendNotification: case *spendNotification:
chainntnfs.Log.Infof("New spend subscription: "+ chainntnfs.Log.Infof("New spend subscription: "+
"utxo=%v", msg.targetOutpoint) "utxo=%v", msg.targetOutpoint)
op := *msg.targetOutpoint op := *msg.targetOutpoint
b.spendNotifications[op] = append(b.spendNotifications[op], msg)
if _, ok := b.spendNotifications[op]; !ok {
b.spendNotifications[op] = make(map[uint64]*spendNotification)
}
b.spendNotifications[op][msg.spendID] = msg
case *confirmationsNotification: case *confirmationsNotification:
chainntnfs.Log.Infof("New confirmations "+ chainntnfs.Log.Infof("New confirmations "+
"subscription: txid=%v, numconfs=%v", "subscription: txid=%v, numconfs=%v",
@ -252,9 +288,9 @@ out:
b.confNotifications[txid] = append(b.confNotifications[txid], msg) b.confNotifications[txid] = append(b.confNotifications[txid], msg)
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients = append(b.blockEpochClients, b.blockEpochClients[msg.epochID] = msg.epochChan
msg.epochChan)
} }
case staleBlockHash := <-b.disconnectedBlockHashes: case staleBlockHash := <-b.disconnectedBlockHashes:
// TODO(roasbeef): re-orgs // TODO(roasbeef): re-orgs
// * second channel to notify of confirmation decrementing // * second channel to notify of confirmation decrementing
@ -262,6 +298,7 @@ out:
// * notify of negative confirmations // * notify of negative confirmations
chainntnfs.Log.Warnf("Block disconnected from main "+ chainntnfs.Log.Warnf("Block disconnected from main "+
"chain: %v", staleBlockHash) "chain: %v", staleBlockHash)
case <-b.chainUpdateSignal: case <-b.chainUpdateSignal:
// A new update is available, so pop the new chain // A new update is available, so pop the new chain
// update from the front of the update queue. // update from the front of the update queue.
@ -303,6 +340,7 @@ out:
// chain. Send out any N confirmation notifications // chain. Send out any N confirmation notifications
// which may have been triggered by this new block. // which may have been triggered by this new block.
b.notifyConfs(newHeight) b.notifyConfs(newHeight)
case <-b.txUpdateSignal: case <-b.txUpdateSignal:
// A new update is available, so pop the new chain // A new update is available, so pop the new chain
// update from the front of the update queue. // update from the front of the update queue.
@ -344,6 +382,7 @@ out:
delete(b.spendNotifications, prevOut) delete(b.spendNotifications, prevOut)
} }
} }
case <-b.quit: case <-b.quit:
break out break out
} }
@ -437,7 +476,6 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash
Hash: newSha, Hash: newSha,
} }
// TODO(roasbeef): spwan a new goroutine for each client instead?
for _, epochChan := range b.blockEpochClients { for _, epochChan := range b.blockEpochClients {
// Attempt a non-blocking send. If the buffered channel is // Attempt a non-blocking send. If the buffered channel is
// full, then we no-op and move onto the next client. // full, then we no-op and move onto the next client.
@ -542,6 +580,18 @@ type spendNotification struct {
targetOutpoint *wire.OutPoint targetOutpoint *wire.OutPoint
spendChan chan *chainntnfs.SpendDetail spendChan chan *chainntnfs.SpendDetail
spendID uint64
}
// spendCancel is a message sent to the BtcdNotifier when a client wishes to
// cancel an outstanding spend notification that has yet to be dispatched.
type spendCancel struct {
// op is the target outpoint of the notification to be cancelled.
op wire.OutPoint
// spendID the ID of the notification to cancel.
spendID uint64
} }
// RegisterSpendNotification registers an intent to be notified once the target // RegisterSpendNotification registers an intent to be notified once the target
@ -557,6 +607,7 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.S
ntfn := &spendNotification{ ntfn := &spendNotification{
targetOutpoint: outpoint, targetOutpoint: outpoint,
spendChan: make(chan *chainntnfs.SpendDetail, 1), spendChan: make(chan *chainntnfs.SpendDetail, 1),
spendID: atomic.AddUint64(&b.spendClientCounter, 1),
} }
select { select {
@ -594,6 +645,16 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.S
return &chainntnfs.SpendEvent{ return &chainntnfs.SpendEvent{
Spend: ntfn.spendChan, Spend: ntfn.spendChan,
Cancel: func() {
select {
case b.notificationCancels <- &spendCancel{
op: *outpoint,
spendID: ntfn.spendID,
}:
case <-b.quit:
return
}
},
}, nil }, nil
} }
@ -637,14 +698,23 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
// notification with each newly connected block. // notification with each newly connected block.
type blockEpochRegistration struct { type blockEpochRegistration struct {
epochChan chan *chainntnfs.BlockEpoch epochChan chan *chainntnfs.BlockEpoch
epochID uint64
}
// epochCancel is a message sent to the BtcdNotifier when a client wishes to
// cancel an outstanding epoch notification that has yet to be dispatched.
type epochCancel struct {
epochID uint64
} }
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the
// caller to receive notificationsm, of each new block connected to the main // caller to receive notifications, of each new block connected to the main
// chain. // chain.
func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
registration := &blockEpochRegistration{ registration := &blockEpochRegistration{
epochChan: make(chan *chainntnfs.BlockEpoch, 20), epochChan: make(chan *chainntnfs.BlockEpoch, 20),
epochID: atomic.AddUint64(&b.epochClientCounter, 1),
} }
select { select {
@ -654,6 +724,15 @@ func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, er
case b.notificationRegistry <- registration: case b.notificationRegistry <- registration:
return &chainntnfs.BlockEpochEvent{ return &chainntnfs.BlockEpochEvent{
Epochs: registration.epochChan, Epochs: registration.epochChan,
Cancel: func() {
select {
case b.notificationCancels <- &epochCancel{
epochID: registration.epochID,
}:
case <-b.quit:
return
}
},
}, nil }, nil
} }
} }