diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 38b0330fc..66f2883e8 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -47,9 +47,7 @@ type BtcdNotifier struct { notificationRegistry chan interface{} - // TODO(roasbeef): make map point to slices? Would allow for multiple - // clients to listen for same spend. Would we ever need this? - spendNotifications map[wire.OutPoint]*spendNotification + spendNotifications map[wire.OutPoint][]*spendNotification confNotifications map[wire.ShaHash][]*confirmationsNotification confHeap *confirmationHeap @@ -74,13 +72,13 @@ type BtcdNotifier struct { var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil) // New returns a new BtcdNotifier instance. This function assumes the btcd node -// detailed in the passed configuration is already running, and -// willing to accept new websockets clients. +// detailed in the passed configuration is already running, and willing to +// accept new websockets clients. func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) { notifier := &BtcdNotifier{ notificationRegistry: make(chan interface{}), - spendNotifications: make(map[wire.OutPoint]*spendNotification), + spendNotifications: make(map[wire.OutPoint][]*spendNotification), confNotifications: make(map[wire.ShaHash][]*confirmationsNotification), confHeap: newConfirmationHeap(), @@ -150,8 +148,10 @@ func (b *BtcdNotifier) Stop() error { // Notify all pending clients of our shutdown by closing the related // notification channels. - for _, spendClient := range b.spendNotifications { - close(spendClient.spendChan) + for _, spendClients := range b.spendNotifications { + for _, spendClient := range spendClients { + close(spendClient.spendChan) + } } for _, confClients := range b.confNotifications { for _, confClient := range confClients { @@ -194,8 +194,8 @@ func (b *BtcdNotifier) onBlockDisconnected(hash *wire.ShaHash, height int32, t t // onRedeemingTx implements on OnRedeemingTx callback for btcrpcclient. func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) { - // Append this new transaction update to the end of the queue of new chain - // updates. + // Append this new transaction update to the end of the queue of new + // chain updates. b.txUpdateMtx.Lock() b.txUpdates = append(b.txUpdates, &txUpdate{tx, details}) b.txUpdateMtx.Unlock() @@ -219,11 +219,15 @@ out: case *spendNotification: chainntnfs.Log.Infof("New spend subscription: "+ "utxo=%v", msg.targetOutpoint) - b.spendNotifications[*msg.targetOutpoint] = msg + op := *msg.targetOutpoint + b.spendNotifications[op] = append(b.spendNotifications[op], msg) case *confirmationsNotification: chainntnfs.Log.Infof("New confirmations "+ "subscription: txid=%v, numconfs=%v", *msg.txid, msg.numConfirmations) + // TODO(roasbeef): perform a N-block look + // behind to catch race-condition due to faster + // inter-block time? txid := *msg.txid b.confNotifications[txid] = append(b.confNotifications[txid], msg) case *blockEpochRegistration: @@ -264,9 +268,10 @@ out: // Check if the inclusion of this transaction // within a block by itself triggers a block // confirmation threshold, if so send a - // notification. Otherwise, place the notification - // on a heap to be triggered in the future once - // additional confirmations are attained. + // notification. Otherwise, place the + // notification on a heap to be triggered in + // the future once additional confirmations are + // attained. txSha := tx.Sha() b.checkConfirmationTrigger(txSha, newHeight) } @@ -292,24 +297,27 @@ out: prevOut := txIn.PreviousOutPoint // If this transaction indeed does spend an - // output which we have a registered notification - // for, then create a spend summary, finally - // sending off the details to the notification - // subscriber. - if ntfn, ok := b.spendNotifications[prevOut]; ok { + // output which we have a registered + // notification for, then create a spend + // summary, finally sending off the details to + // the notification subscriber. + if clients, ok := b.spendNotifications[prevOut]; ok { spenderSha := newSpend.tx.Sha() - spendDetails := &chainntnfs.SpendDetail{ - SpentOutPoint: ntfn.targetOutpoint, - SpenderTxHash: spenderSha, - // TODO(roasbeef): copy tx? - SpendingTx: spendingTx.MsgTx(), - SpenderInputIndex: uint32(i), + for _, ntfn := range clients { + spendDetails := &chainntnfs.SpendDetail{ + SpentOutPoint: ntfn.targetOutpoint, + SpenderTxHash: spenderSha, + // TODO(roasbeef): copy tx? + SpendingTx: spendingTx.MsgTx(), + SpenderInputIndex: uint32(i), + } + + chainntnfs.Log.Infof("Dispatching "+ + "spend notification for "+ + "outpoint=%v", ntfn.targetOutpoint) + ntfn.spendChan <- spendDetails } - chainntnfs.Log.Infof("Dispatching "+ - "spend notification for "+ - "outpoint=%v", ntfn.targetOutpoint) - ntfn.spendChan <- spendDetails delete(b.spendNotifications, prevOut) } }