chainntfns/btcdnotify: add support for multi-dispatch spend notifications

This commit adds support for dispatching the same spend notification to
multiple clients. This is now required by the ChainNotiifer interface
documentation and will be needed within the daemon in order to support
some upcoming refactors.
This commit is contained in:
Olaoluwa Osuntokun 2016-11-27 19:17:27 -08:00
parent 0bd5c6790d
commit d1f12627d2
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

View File

@ -47,9 +47,7 @@ type BtcdNotifier struct {
notificationRegistry chan interface{} notificationRegistry chan interface{}
// TODO(roasbeef): make map point to slices? Would allow for multiple spendNotifications map[wire.OutPoint][]*spendNotification
// clients to listen for same spend. Would we ever need this?
spendNotifications map[wire.OutPoint]*spendNotification
confNotifications map[wire.ShaHash][]*confirmationsNotification confNotifications map[wire.ShaHash][]*confirmationsNotification
confHeap *confirmationHeap confHeap *confirmationHeap
@ -74,13 +72,13 @@ type BtcdNotifier struct {
var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil) var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil)
// New returns a new BtcdNotifier instance. This function assumes the btcd node // New returns a new BtcdNotifier instance. This function assumes the btcd node
// detailed in the passed configuration is already running, and // detailed in the passed configuration is already running, and willing to
// willing to accept new websockets clients. // accept new websockets clients.
func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) { func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) {
notifier := &BtcdNotifier{ notifier := &BtcdNotifier{
notificationRegistry: make(chan interface{}), notificationRegistry: make(chan interface{}),
spendNotifications: make(map[wire.OutPoint]*spendNotification), spendNotifications: make(map[wire.OutPoint][]*spendNotification),
confNotifications: make(map[wire.ShaHash][]*confirmationsNotification), confNotifications: make(map[wire.ShaHash][]*confirmationsNotification),
confHeap: newConfirmationHeap(), confHeap: newConfirmationHeap(),
@ -150,8 +148,10 @@ func (b *BtcdNotifier) Stop() error {
// Notify all pending clients of our shutdown by closing the related // Notify all pending clients of our shutdown by closing the related
// notification channels. // notification channels.
for _, spendClient := range b.spendNotifications { for _, spendClients := range b.spendNotifications {
close(spendClient.spendChan) for _, spendClient := range spendClients {
close(spendClient.spendChan)
}
} }
for _, confClients := range b.confNotifications { for _, confClients := range b.confNotifications {
for _, confClient := range confClients { for _, confClient := range confClients {
@ -194,8 +194,8 @@ func (b *BtcdNotifier) onBlockDisconnected(hash *wire.ShaHash, height int32, t t
// onRedeemingTx implements on OnRedeemingTx callback for btcrpcclient. // onRedeemingTx implements on OnRedeemingTx callback for btcrpcclient.
func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) { func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) {
// Append this new transaction update to the end of the queue of new chain // Append this new transaction update to the end of the queue of new
// updates. // chain updates.
b.txUpdateMtx.Lock() b.txUpdateMtx.Lock()
b.txUpdates = append(b.txUpdates, &txUpdate{tx, details}) b.txUpdates = append(b.txUpdates, &txUpdate{tx, details})
b.txUpdateMtx.Unlock() b.txUpdateMtx.Unlock()
@ -219,11 +219,15 @@ out:
case *spendNotification: case *spendNotification:
chainntnfs.Log.Infof("New spend subscription: "+ chainntnfs.Log.Infof("New spend subscription: "+
"utxo=%v", msg.targetOutpoint) "utxo=%v", msg.targetOutpoint)
b.spendNotifications[*msg.targetOutpoint] = msg op := *msg.targetOutpoint
b.spendNotifications[op] = append(b.spendNotifications[op], msg)
case *confirmationsNotification: case *confirmationsNotification:
chainntnfs.Log.Infof("New confirmations "+ chainntnfs.Log.Infof("New confirmations "+
"subscription: txid=%v, numconfs=%v", "subscription: txid=%v, numconfs=%v",
*msg.txid, msg.numConfirmations) *msg.txid, msg.numConfirmations)
// TODO(roasbeef): perform a N-block look
// behind to catch race-condition due to faster
// inter-block time?
txid := *msg.txid txid := *msg.txid
b.confNotifications[txid] = append(b.confNotifications[txid], msg) b.confNotifications[txid] = append(b.confNotifications[txid], msg)
case *blockEpochRegistration: case *blockEpochRegistration:
@ -264,9 +268,10 @@ out:
// Check if the inclusion of this transaction // Check if the inclusion of this transaction
// within a block by itself triggers a block // within a block by itself triggers a block
// confirmation threshold, if so send a // confirmation threshold, if so send a
// notification. Otherwise, place the notification // notification. Otherwise, place the
// on a heap to be triggered in the future once // notification on a heap to be triggered in
// additional confirmations are attained. // the future once additional confirmations are
// attained.
txSha := tx.Sha() txSha := tx.Sha()
b.checkConfirmationTrigger(txSha, newHeight) b.checkConfirmationTrigger(txSha, newHeight)
} }
@ -292,24 +297,27 @@ out:
prevOut := txIn.PreviousOutPoint prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an // If this transaction indeed does spend an
// output which we have a registered notification // output which we have a registered
// for, then create a spend summary, finally // notification for, then create a spend
// sending off the details to the notification // summary, finally sending off the details to
// subscriber. // the notification subscriber.
if ntfn, ok := b.spendNotifications[prevOut]; ok { if clients, ok := b.spendNotifications[prevOut]; ok {
spenderSha := newSpend.tx.Sha() spenderSha := newSpend.tx.Sha()
spendDetails := &chainntnfs.SpendDetail{ for _, ntfn := range clients {
SpentOutPoint: ntfn.targetOutpoint, spendDetails := &chainntnfs.SpendDetail{
SpenderTxHash: spenderSha, SpentOutPoint: ntfn.targetOutpoint,
// TODO(roasbeef): copy tx? SpenderTxHash: spenderSha,
SpendingTx: spendingTx.MsgTx(), // TODO(roasbeef): copy tx?
SpenderInputIndex: uint32(i), SpendingTx: spendingTx.MsgTx(),
SpenderInputIndex: uint32(i),
}
chainntnfs.Log.Infof("Dispatching "+
"spend notification for "+
"outpoint=%v", ntfn.targetOutpoint)
ntfn.spendChan <- spendDetails
} }
chainntnfs.Log.Infof("Dispatching "+
"spend notification for "+
"outpoint=%v", ntfn.targetOutpoint)
ntfn.spendChan <- spendDetails
delete(b.spendNotifications, prevOut) delete(b.spendNotifications, prevOut)
} }
} }