diff --git a/breacharbiter.go b/breacharbiter.go index 4fa713e6e..ffad00adc 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -15,6 +15,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnwallet" + "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" @@ -27,7 +28,7 @@ import ( // is critical that such state is persisted on disk, so that if our node // restarts at any point during the retribution procedure, we can recover and // continue from the persisted state. -var retributionBucket = []byte("ret") +var retributionBucket = []byte("retribution") // breachArbiter is a special subsystem which is responsible for watching and // acting on the detection of any attempted uncooperative channel breaches by @@ -38,13 +39,14 @@ var retributionBucket = []byte("ret") // counterparties. // TODO(roasbeef): closures in config for subsystem pointers to decouple? type breachArbiter struct { - wallet *lnwallet.LightningWallet - db *channeldb.DB - notifier chainntnfs.ChainNotifier - htlcSwitch *htlcswitch.Switch - chainIO lnwallet.BlockChainIO - estimator lnwallet.FeeEstimator - retributionStore *retributionStore + wallet *lnwallet.LightningWallet + db *channeldb.DB + notifier chainntnfs.ChainNotifier + chainIO lnwallet.BlockChainIO + estimator lnwallet.FeeEstimator + htlcSwitch *htlcswitch.Switch + + retributionStore RetributionStore // breachObservers is a map which tracks all the active breach // observers we're currently managing. The key of the map is the @@ -84,10 +86,13 @@ func newBreachArbiter(wallet *lnwallet.LightningWallet, db *channeldb.DB, chain lnwallet.BlockChainIO, fe lnwallet.FeeEstimator) *breachArbiter { return &breachArbiter{ - wallet: wallet, - notifier: notifier, - htlcSwitch: h, - db: db, + wallet: wallet, + db: db, + notifier: notifier, + chainIO: chain, + htlcSwitch: h, + estimator: fe, + retributionStore: newRetributionStore(db), breachObservers: make(map[wire.OutPoint]chan struct{}), @@ -107,30 +112,30 @@ func (b *breachArbiter) Start() error { brarLog.Tracef("Starting breach arbiter") - // TODO(roasbeef): instead use closure height of channel - _, currentHeight, err := b.chainIO.GetBestBlock() - if err != nil { - return err - } + // We load all pending retributions from the database and + // deterministically reconstruct a channel close summary for each. In + // the event that a channel is still open after being breached, we can + // use the close summary to reinitiate a channel close so that the + // breach is reflected in channeldb. + breachRetInfos := make(map[wire.OutPoint]retributionInfo) + closeSummaries := make(map[wire.OutPoint]channeldb.ChannelCloseSummary) + err := b.retributionStore.ForAll(func(ret *retributionInfo) error { + // Extract emitted retribution information. + breachRetInfos[ret.chanPoint] = *ret - // We load any pending retributions from the database. For each retribution - // we need to restart the retribution procedure to claim our just reward. - err = b.retributionStore.ForAll(func(ret *retributionInfo) error { - // Register for a notification when the breach transaction is confirmed - // on chain. - breachTXID := &ret.commitHash - confChan, err := b.notifier.RegisterConfirmationsNtfn(breachTXID, 1, - uint32(currentHeight)) - if err != nil { - brarLog.Errorf("unable to register for conf updates for txid: "+ - "%v, err: %v", breachTXID, err) - return err + // Deterministically reconstruct channel close summary from + // persisted retribution information and record in breach close + // summaries map under the corresponding channel point. + closeSummary := channeldb.ChannelCloseSummary{ + ChanPoint: ret.chanPoint, + ClosingTXID: ret.commitHash, + RemotePub: &ret.remoteIdentity, + Capacity: ret.capacity, + SettledBalance: ret.settledBalance, + CloseType: channeldb.BreachClose, + IsPending: true, } - - // Launch a new goroutine which to finalize the channel retribution - // after the breach transaction confirms. - b.wg.Add(1) - go b.exactRetribution(confChan, ret) + closeSummaries[ret.chanPoint] = closeSummary return nil }) @@ -147,15 +152,39 @@ func (b *breachArbiter) Start() error { return err } - if len(activeChannels) > 0 { + nActive := len(activeChannels) + if nActive > 0 { brarLog.Infof("Retrieved %v channels from database, watching "+ "with vigilance!", len(activeChannels)) } - // For each of the channels read from disk, we'll create a channel - // state machine in order to watch for any potential channel closures. - channelsToWatch := make([]*lnwallet.LightningChannel, len(activeChannels)) - for i, chanState := range activeChannels { + // Here we will determine a set of channels that will need to be managed + // by the contractObserver. For each of the open channels read from + // disk, we will create a channel state machine that can be used to + // watch for any potential channel closures. We must first exclude any + // channel whose retribution process has been initiated, and proceed to + // mark them as closed. + // The state machines generated for these filtered channels can be + // discarded, as their fate will be placed in the hands of an + // exactRetribution task spawned later. + // + // NOTE Spawning of the exactRetribution task is intentionally postponed + // until after this step in order to ensure that the all breached + // channels are reflected as closed in channeldb and consistent with + // what is checkpointed by the breach arbiter. Instead of treating the + // breached-and-closed and breached-but-still-active channels as + // separate sets of channels, we first + // ensure that all breach-but-still-active channels are promoted to + // breached-and-closed during restart, allowing us to treat them as a + // single set from here on out. This approach also has the added benefit + // of minimizing the likelihood that the wrong number of tasks are + // spawned per breached channel, and prevents us from being in a + // position where + // retribution has completed but the channel is still marked as open in + // channeldb. + channelsToWatch := make([]*lnwallet.LightningChannel, 0, nActive) + for _, chanState := range activeChannels { + // Initialize active channel from persisted channel state. channel, err := lnwallet.NewLightningChannel(nil, b.notifier, b.estimator, chanState) if err != nil { @@ -164,9 +193,73 @@ func (b *breachArbiter) Start() error { return err } - channelsToWatch[i] = channel + // Before marking this as an active channel that the breach + // arbiter should watch, check to see if this channel was + // previously breached. If so, we attempt to reflect this in the + // channeldb by closing the channel. Upon success, we continue + // because the channel is no longer open, and thus does not need + // to be managed by the contractObserver. + chanPoint := chanState.FundingOutpoint + if closeSummary, ok := closeSummaries[chanPoint]; ok { + // Since this channel should not be open, we immediately + // notify the HTLC switch that this link should be + // closed, and that all activity on the link should + // cease. + b.htlcSwitch.CloseLink( + &chanState.FundingOutpoint, + htlcswitch.CloseBreach, + ) + + // Ensure channeldb is consistent with the persisted + // breach. + err := channel.DeleteState(&closeSummary) + if err != nil { + brarLog.Errorf("unable to delete channel "+ + "state: %v", err) + return err + } + + // Now that this channel is both breached _and_ closed, + // we can skip adding it to the `channelsToWatch` since + // we can begin the retribution process immediately. + continue + } + + // Finally, add this channel to breach arbiter's list of + // channels to watch. + channelsToWatch = append(channelsToWatch, channel) + } + // Trim channels in the event that some were filtered. + channelsToWatch = channelsToWatch[:] + + // TODO(roasbeef): instead use closure height of channel + _, currentHeight, err := b.chainIO.GetBestBlock() + if err != nil { + return err } + // Spawn the exactRetribution tasks to monitor and resolve any breaches + // that were loaded from the retribution store. + for chanPoint, closeSummary := range closeSummaries { + // Register for a notification when the breach transaction is + // confirmed on chain. + breachTXID := closeSummary.ClosingTXID + confChan, err := b.notifier.RegisterConfirmationsNtfn( + &breachTXID, 1, uint32(currentHeight)) + if err != nil { + brarLog.Errorf("unable to register for conf updates "+ + "for txid: %v, err: %v", breachTXID, err) + return err + } + + // Launch a new goroutine which to finalize the channel + // retribution after the breach transaction confirms. + retInfo := breachRetInfos[chanPoint] + b.wg.Add(1) + go b.exactRetribution(confChan, &retInfo) + } + + // Start watching the remaining active channels! b.wg.Add(1) go b.contractObserver(channelsToWatch) @@ -191,34 +284,45 @@ func (b *breachArbiter) Start() error { brarLog.Infof("Watching for the closure of ChannelPoint(%v)", pendingClose.ChanPoint) - chanPoint := &pendingClose.ChanPoint - closeTXID := &pendingClose.ClosingTXID + closeTXID := pendingClose.ClosingTXID confNtfn, err := b.notifier.RegisterConfirmationsNtfn( - closeTXID, 1, uint32(currentHeight), + &closeTXID, 1, uint32(currentHeight), ) if err != nil { return err } - go func() { + b.wg.Add(1) + go func(chanPoint wire.OutPoint) { + defer b.wg.Done() + // In the case that the ChainNotifier is shutting down, // all subscriber notification channels will be closed, // generating a nil receive. - confInfo, ok := <-confNtfn.Confirmed - if !ok { + select { + case confInfo, ok := <-confNtfn.Confirmed: + if !ok { + return + } + + brarLog.Infof("ChannelPoint(%v) is "+ + "fully closed, at height: %v", + chanPoint, confInfo.BlockHeight) + + // TODO(roasbeef): need to store + // UnilateralCloseSummary on disk so can + // possibly sweep output here + + err := b.db.MarkChanFullyClosed(&chanPoint) + if err != nil { + brarLog.Errorf("unable to mark chan "+ + "as closed: %v", err) + } + + case <-b.quit: return } - - brarLog.Infof("ChannelPoint(%v) is fully closed, "+ - "at height: %v", chanPoint, confInfo.BlockHeight) - - // TODO(roasbeef): need to store UnilateralCloseSummary - // on disk so can possibly sweep output here - - if err := b.db.MarkChanFullyClosed(chanPoint); err != nil { - brarLog.Errorf("unable to mark chan as closed: %v", err) - } - }() + }(pendingClose.ChanPoint) } return nil @@ -248,7 +352,9 @@ func (b *breachArbiter) Stop() error { // channel into the daemon's wallet. // // NOTE: This MUST be run as a goroutine. -func (b *breachArbiter) contractObserver(activeChannels []*lnwallet.LightningChannel) { +func (b *breachArbiter) contractObserver( + activeChannels []*lnwallet.LightningChannel) { + defer b.wg.Done() // For each active channel found within the database, we launch a @@ -273,7 +379,8 @@ out: case breachInfo := <-b.breachedContracts: _, currentHeight, err := b.chainIO.GetBestBlock() if err != nil { - brarLog.Errorf("unable to get best height: %v", err) + brarLog.Errorf( + "unable to get best height: %v", err) } // A new channel contract has just been breached! We @@ -286,28 +393,26 @@ out: breachTXID, 1, uint32(currentHeight), ) if err != nil { - brarLog.Errorf("unable to register for conf updates for txid: "+ - "%v, err: %v", breachTXID, err) + brarLog.Errorf("unable to register for conf "+ + "updates for txid: %v, err: %v", + breachTXID, err) continue } - brarLog.Warnf("A channel has been breached with txid: %v. "+ - "Waiting for confirmation, then justice will be served!", - breachTXID) + brarLog.Warnf("A channel has been breached with "+ + "txid: %v. Waiting for confirmation, then "+ + "justice will be served!", breachTXID) - // Persist the pending retribution state to disk. - if err := b.retributionStore.Add(breachInfo); err != nil { - brarLog.Errorf("unable to persist breach info to db: %v", err) - continue - } - - // With the notification registered and retribution state persisted, - // we launch a new goroutine which will finalize the channel - // retribution after the breach transaction has been confirmed. + // With the retribution state persisted, channel close + // persisted, and notification registered, we launch a + // new goroutine which will finalize the channel + // retribution after the breach transaction has been + // confirmed. b.wg.Add(1) go b.exactRetribution(confChan, breachInfo) delete(b.breachObservers, breachInfo.chanPoint) + case contract := <-b.newContracts: // A new channel has just been opened within the // daemon, so we launch a new breachObserver to handle @@ -335,9 +440,10 @@ out: b.wg.Add(1) go b.breachObserver(contract, settleSignal) - // TODO(roasbeef): add doneChan to signal to peer continue - // * peer send over to us on loadActiveChanenls, sync - // until we're aware so no state transitions + // TODO(roasbeef): add doneChan to signal to peer + // continue * peer send over to us on + // loadActiveChanenls, sync until we're aware so no + // state transitions case chanPoint := <-b.settledContracts: // A new channel has been closed either unilaterally or // cooperatively, as a result we no longer need a @@ -371,7 +477,8 @@ out: // the lingering funds within the channel into the daemon's wallet. // // NOTE: This MUST be run as a goroutine. -func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, +func (b *breachArbiter) exactRetribution( + confChan *chainntnfs.ConfirmationEvent, breachInfo *retributionInfo) { defer b.wg.Done() @@ -403,9 +510,11 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, return } - brarLog.Debugf("Broadcasting justice tx: %v", newLogClosure(func() string { - return spew.Sdump(justiceTx) - })) + brarLog.Debugf( + "Broadcasting justice tx: %v", + newLogClosure(func() string { + return spew.Sdump(justiceTx) + })) _, currentHeight, err := b.chainIO.GetBestBlock() if err != nil { @@ -455,16 +564,18 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, brarLog.Errorf("unable to mark chan as closed: %v", err) } - // Justice has been carried out; we can safely delete the retribution - // info from the database. + // Justice has been carried out; we can safely delete the + // retribution info from the database. err = b.retributionStore.Remove(&breachInfo.chanPoint) if err != nil { - brarLog.Errorf("unable to remove retribution from the db: %v", err) + brarLog.Errorf("unable to remove retribution "+ + "from the db: %v", err) } // TODO(roasbeef): add peer to blacklist? - // TODO(roasbeef): close other active channels with offending peer + // TODO(roasbeef): close other active channels with offending + // peer close(breachInfo.doneChan) @@ -488,7 +599,8 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, chanPoint := contract.ChannelPoint() - brarLog.Debugf("Breach observer for ChannelPoint(%v) started", chanPoint) + brarLog.Debugf( + "Breach observer for ChannelPoint(%v) started", chanPoint) select { // A read from this channel indicates that the contract has been @@ -502,18 +614,32 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, case closeInfo := <-contract.UnilateralClose: // Launch a goroutine to cancel out this contract within the // breachArbiter's main goroutine. + b.wg.Add(1) go func() { - b.settledContracts <- chanPoint + defer b.wg.Done() + + select { + case b.settledContracts <- chanPoint: + case <-b.quit: + } }() // Next, we'll launch a goroutine to wait until the closing // transaction has been confirmed so we can mark the contract - // as resolved in the database. + // as resolved in the database. This go routine is _not_ + // tracked by the breach aribter's wait group since the callback + // may not be executed before shutdown, potentially leading to + // a deadlock. // // TODO(roasbeef): also notify utxoNursery, might've had // outbound HTLC's in flight - go waitForChanToClose(uint32(closeInfo.SpendingHeight), b.notifier, - nil, chanPoint, closeInfo.SpenderTxHash, func() { + go waitForChanToClose( + uint32(closeInfo.SpendingHeight), + b.notifier, + nil, + chanPoint, + closeInfo.SpenderTxHash, + func() { // As we just detected a channel was closed via // a unilateral commitment broadcast by the // remote party, we'll need to sweep our main @@ -528,11 +654,14 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, ) if err != nil { brarLog.Errorf("unable to "+ - "generate sweep tx: %v", err) + "generate sweep tx: %v", + err) goto close } - err = b.wallet.PublishTransaction(sweepTx) + err = b.wallet.PublishTransaction( + sweepTx, + ) if err != nil { brarLog.Errorf("unable to "+ "broadcast tx: %v", err) @@ -540,11 +669,14 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, } close: - brarLog.Infof("Force closed ChannelPoint(%v) is "+ - "fully closed, updating DB", chanPoint) + brarLog.Infof("Force closed ChannelPoint(%v) "+ + "is fully closed, updating DB", + chanPoint) - if err := b.db.MarkChanFullyClosed(chanPoint); err != nil { - brarLog.Errorf("unable to mark chan as closed: %v", err) + err := b.db.MarkChanFullyClosed(chanPoint) + if err != nil { + brarLog.Errorf("unable to mark chan "+ + "as closed: %v", err) } }) @@ -563,21 +695,10 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, // links associated with this peer. b.htlcSwitch.CloseLink(chanPoint, htlcswitch.CloseBreach) chanInfo := contract.StateSnapshot() - closeInfo := &channeldb.ChannelCloseSummary{ - ChanPoint: *chanPoint, - ClosingTXID: breachInfo.BreachTransaction.TxHash(), - RemotePub: &chanInfo.RemoteIdentity, - Capacity: chanInfo.Capacity, - SettledBalance: chanInfo.LocalBalance.ToSatoshis(), - CloseType: channeldb.BreachClose, - IsPending: true, - } - if err := contract.DeleteState(closeInfo); err != nil { - brarLog.Errorf("unable to delete channel state: %v", err) - } // TODO(roasbeef): need to handle case of remote broadcast - // mid-local initiated state-transition, possible false-positive? + // mid-local initiated state-transition, possible + // false-positive? // First we generate the witness generation function which will // be used to sweep the output only we can satisfy on the @@ -591,7 +712,8 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, desc.SigHashes = hc desc.InputIndex = inputIndex - return lnwallet.CommitSpendNoDelay(b.wallet.Cfg.Signer, &desc, tx) + return lnwallet.CommitSpendNoDelay( + b.wallet.Cfg.Signer, &desc, tx) } // Next we create the witness generation function that will be @@ -606,32 +728,67 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, desc.SigHashes = hc desc.InputIndex = inputIndex - return lnwallet.CommitSpendRevoke(b.wallet.Cfg.Signer, &desc, tx) + return lnwallet.CommitSpendRevoke( + b.wallet.Cfg.Signer, &desc, tx) } - // Finally, we send the retribution information into the breachArbiter - // event loop to deal swift justice. + // Assemble the retribution information that parameterizes the + // construction of transactions required to correct the breach. // TODO(roasbeef): populate htlc breaches - b.breachedContracts <- &retributionInfo{ + retInfo := &retributionInfo{ commitHash: breachInfo.BreachTransaction.TxHash(), chanPoint: *chanPoint, + remoteIdentity: chanInfo.RemoteIdentity, + capacity: chanInfo.Capacity, + settledBalance: chanInfo.LocalBalance.ToSatoshis(), + selfOutput: &breachedOutput{ amt: btcutil.Amount(localSignDesc.Output.Value), outpoint: breachInfo.LocalOutpoint, signDescriptor: localSignDesc, - witnessType: localWitnessType, + witnessType: lnwallet.CommitmentNoDelay, + witnessFunc: localWitness, }, revokedOutput: &breachedOutput{ amt: btcutil.Amount(remoteSignDesc.Output.Value), outpoint: breachInfo.RemoteOutpoint, signDescriptor: remoteSignDesc, - witnessType: remoteWitnessType, + witnessType: lnwallet.CommitmentRevoke, + witnessFunc: remoteWitness, }, htlcOutputs: []*breachedOutput{}, - doneChan: make(chan struct{}), + + doneChan: make(chan struct{}), + } + + // Persist the pending retribution state to disk. + if err := b.retributionStore.Add(retInfo); err != nil { + brarLog.Errorf("unable to persist "+ + "retribution info to db: %v", err) + } + + closeInfo := &channeldb.ChannelCloseSummary{ + ChanPoint: *chanPoint, + ClosingTXID: breachInfo.BreachTransaction.TxHash(), + RemotePub: &chanInfo.RemoteIdentity, + Capacity: chanInfo.Capacity, + SettledBalance: chanInfo.LocalBalance.ToSatoshis(), + CloseType: channeldb.BreachClose, + IsPending: true, + } + if err := contract.DeleteState(closeInfo); err != nil { + brarLog.Errorf( + "unable to delete channel state: %v", err) + } + + // Finally, we send the retribution information into the + // breachArbiter event loop to deal swift justice. + select { + case b.breachedContracts <- retInfo: + case <-b.quit: } case <-b.quit: @@ -646,8 +803,9 @@ type breachedOutput struct { amt btcutil.Amount outpoint wire.OutPoint - signDescriptor *lnwallet.SignDescriptor + signDescriptor lnwallet.SignDescriptor witnessType lnwallet.WitnessType + witnessFunc lnwallet.WitnessGenerator twoStageClaim bool } @@ -661,6 +819,14 @@ type retributionInfo struct { commitHash chainhash.Hash chanPoint wire.OutPoint + // Fields copied from channel snapshot when a breach is detected. This + // is necessary for deterministically constructing the channel close + // summary in the event that the breach arbiter crashes before closing + // the channel. + remoteIdentity btcec.PublicKey + capacity btcutil.Amount + settledBalance btcutil.Amount + selfOutput *breachedOutput revokedOutput *breachedOutput @@ -674,7 +840,9 @@ type retributionInfo struct { // the funds within the channel which we are now entitled to due to a breach of // the channel's contract by the counterparty. This function returns a *fully* // signed transaction with the witness for each input fully in place. -func (b *breachArbiter) createJusticeTx(r *retributionInfo) (*wire.MsgTx, error) { +func (b *breachArbiter) createJusticeTx( + r *retributionInfo) (*wire.MsgTx, error) { + // First, we obtain a new public key script from the wallet which we'll // sweep the funds to. // TODO(roasbeef): possibly create many outputs to minimize change in @@ -684,8 +852,19 @@ func (b *breachArbiter) createJusticeTx(r *retributionInfo) (*wire.MsgTx, error) return nil, err } - // Before creating the actual TxOut, we'll need to calculate the proper fee - // to attach to the transaction to ensure a timely confirmation. + r.selfOutput.witnessFunc = r.selfOutput.witnessType.GenWitnessFunc( + &b.wallet.Cfg.Signer, &r.selfOutput.signDescriptor) + + r.revokedOutput.witnessFunc = r.revokedOutput.witnessType.GenWitnessFunc( + &b.wallet.Cfg.Signer, &r.revokedOutput.signDescriptor) + + for i := range r.htlcOutputs { + r.htlcOutputs[i].witnessFunc = r.htlcOutputs[i].witnessType.GenWitnessFunc( + &b.wallet.Cfg.Signer, &r.htlcOutputs[i].signDescriptor) + } + + // Before creating the actual TxOut, we'll need to calculate the proper + // fee to attach to the transaction to ensure a timely confirmation. // TODO(roasbeef): remove hard-coded fee totalAmt := r.selfOutput.amt + r.revokedOutput.amt sweepedAmt := int64(totalAmt - 5000) @@ -711,17 +890,13 @@ func (b *breachArbiter) createJusticeTx(r *retributionInfo) (*wire.MsgTx, error) // witnesses for both commitment outputs, and all the pending HTLCs at // this state in the channel's history. // TODO(roasbeef): handle the 2-layer HTLCs - localWitnessFunc := r.selfOutput.witnessType.GenWitnessFunc( - &b.wallet.Signer, r.selfOutput.signDescriptor) - localWitness, err := localWitnessFunc(justiceTx, hashCache, 0) + localWitness, err := r.selfOutput.witnessFunc(justiceTx, hashCache, 0) if err != nil { return nil, err } justiceTx.TxIn[0].Witness = localWitness - remoteWitnessFunc := r.revokedOutput.witnessType.GenWitnessFunc( - &b.wallet.Signer, r.revokedOutput.signDescriptor) - remoteWitness, err := remoteWitnessFunc(justiceTx, hashCache, 1) + remoteWitness, err := r.revokedOutput.witnessFunc(justiceTx, hashCache, 1) if err != nil { return nil, err } @@ -738,7 +913,9 @@ func (b *breachArbiter) createJusticeTx(r *retributionInfo) (*wire.MsgTx, error) // TODO(roasbeef): alternative options // * leave the output in the chain, use as input to future funding tx // * leave output in the chain, extend wallet to add knowledge of how to claim -func (b *breachArbiter) craftCommitSweepTx(closeInfo *lnwallet.UnilateralCloseSummary) (*wire.MsgTx, error) { +func (b *breachArbiter) craftCommitSweepTx( + closeInfo *lnwallet.UnilateralCloseSummary) (*wire.MsgTx, error) { + // First, we'll fetch a fresh script that we can use to sweep the funds // under the control of the wallet. sweepPkScript, err := newSweepPkScript(b.wallet) @@ -796,33 +973,32 @@ func (b *breachArbiter) craftCommitSweepTx(closeInfo *lnwallet.UnilateralCloseSu return sweepTx, nil } -// breachedOutput contains all the information needed to sweep a breached -// output. A breached output is an output that we are now entitled to due to a -// revoked commitment transaction being broadcast. -type breachedOutput struct { - amt btcutil.Amount - outpoint wire.OutPoint +// RetributionStore provides an interface for managing a persistent map from +// wire.OutPoint -> retributionInfo. Upon learning of a breach, a BreachArbiter +// should record the retributionInfo for the breached channel, which serves a +// checkpoint in the event that retribution needs to be resumed after failure. +// A RetributionStore provides an interface for managing the persisted set, as +// well as mapping user defined functions over the entire on-disk contents. +// +// Calls to RetributionStore may occur concurrently. A concrete instance of +// RetributionStore should use appropriate synchronization primitives, or +// be otherwise safe for concurrent access. +type RetributionStore interface { - signDescriptor *lnwallet.SignDescriptor - witnessType lnwallet.WitnessType + // Add persists the retributionInfo to disk, using the information's + // chanPoint as the key. This method should overwrite any existing + // entires found under the same key, and an error should be raised if + // the addition fails. + Add(retInfo *retributionInfo) error - twoStageClaim bool -} + // Remove deletes the retributionInfo from disk, if any exists, under + // the given key. An error should be re raised if the removal fails. + Remove(key *wire.OutPoint) error -// retribution encapsulates all the data needed to sweep all the contested -// funds within a channel whose contract has been breached by the prior -// counterparty. This struct is used to create the justice transaction which -// spends all outputs of the commitment transaction into an output controlled -// by the wallet. -type retributionInfo struct { - commitHash chainhash.Hash - chanPoint wire.OutPoint - - selfOutput *breachedOutput - revokedOutput *breachedOutput - htlcOutputs []*breachedOutput - - doneChan chan struct{} + // ForAll iterates over the existing on-disk contents and applies a + // chosen, read-only callback to each. This method should ensure that it + // immediately propagate any errors generated by the callback. + ForAll(cb func(*retributionInfo) error) error } // retributionStore handles persistence of retribution states to disk and is @@ -844,8 +1020,8 @@ func newRetributionStore(db *channeldb.DB) *retributionStore { // to disk. func (rs *retributionStore) Add(ret *retributionInfo) error { return rs.db.Update(func(tx *bolt.Tx) error { - // If this is our first contract breach, the retributionBucket won't - // exist, in which case, we just create a new bucket. + // If this is our first contract breach, the retributionBucket + // won't exist, in which case, we just create a new bucket. retBucket, err := tx.CreateBucketIfNotExists(retributionBucket) if err != nil { return err @@ -861,7 +1037,10 @@ func (rs *retributionStore) Add(ret *retributionInfo) error { return err } - if err := retBucket.Put(outBuf.Bytes(), retBuf.Bytes()); err != nil { + if err := retBucket.Put( + outBuf.Bytes(), + retBuf.Bytes(), + ); err != nil { return err } @@ -874,12 +1053,13 @@ func (rs *retributionStore) Remove(key *wire.OutPoint) error { return rs.db.Update(func(tx *bolt.Tx) error { retBucket := tx.Bucket(retributionBucket) - // We return an error if the bucket is not already created, since normal - // operation of the breach arbiter should never try to remove a - // finalized retribution state that is not already stored in the db. + // We return an error if the bucket is not already created, + // since normal operation of the breach arbiter should never try + // to remove a finalized retribution state that is not already + // stored in the db. if retBucket == nil { - return errors.New("unable to remove retribution because the " + - "db bucket doesn't exist.") + return errors.New("unable to remove retribution " + + "because the db bucket doesn't exist.") } var outBuf bytes.Buffer @@ -899,17 +1079,21 @@ func (rs *retributionStore) Remove(key *wire.OutPoint) error { // callback function on each retribution. func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error { return rs.db.View(func(tx *bolt.Tx) error { - // If the bucket does not exist, then there are no pending retributions. + // If the bucket does not exist, then there are no pending + // retributions. retBucket := tx.Bucket(retributionBucket) if retBucket == nil { return nil } - // Otherwise, we fetch each serialized retribution info, deserialize - // it, and execute the passed in callback function on it. + // Otherwise, we fetch each serialized retribution info, + // deserialize it, and execute the passed in callback function + // on it. return retBucket.ForEach(func(outBytes, retBytes []byte) error { ret := &retributionInfo{} - if err := ret.Decode(bytes.NewBuffer(retBytes)); err != nil { + if err := ret.Decode( + bytes.NewBuffer(retBytes), + ); err != nil { return err } @@ -920,6 +1104,8 @@ func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error { // Encode serializes the retribution into the passed byte stream. func (ret *retributionInfo) Encode(w io.Writer) error { + var scratch [8]byte + if _, err := w.Write(ret.commitHash[:]); err != nil { return err } @@ -928,6 +1114,21 @@ func (ret *retributionInfo) Encode(w io.Writer) error { return err } + if _, err := w.Write( + ret.remoteIdentity.SerializeCompressed()); err != nil { + return err + } + + binary.BigEndian.PutUint64(scratch[:8], uint64(ret.capacity)) + if _, err := w.Write(scratch[:8]); err != nil { + return err + } + + binary.BigEndian.PutUint64(scratch[:8], uint64(ret.settledBalance)) + if _, err := w.Write(scratch[:8]); err != nil { + return err + } + if err := ret.selfOutput.Encode(w); err != nil { return err } @@ -952,12 +1153,12 @@ func (ret *retributionInfo) Encode(w io.Writer) error { // Dencode deserializes a retribution from the passed byte stream. func (ret *retributionInfo) Decode(r io.Reader) error { - var scratch [32]byte + var scratch [33]byte - if _, err := io.ReadFull(r, scratch[:]); err != nil { + if _, err := io.ReadFull(r, scratch[:32]); err != nil { return err } - hash, err := chainhash.NewHash(scratch[:]) + hash, err := chainhash.NewHash(scratch[:32]) if err != nil { return err } @@ -967,6 +1168,26 @@ func (ret *retributionInfo) Decode(r io.Reader) error { return err } + if _, err = io.ReadFull(r, scratch[:33]); err != nil { + return err + } + remoteIdentity, err := btcec.ParsePubKey(scratch[:33], btcec.S256()) + if err != nil { + return err + } + ret.remoteIdentity = *remoteIdentity + + if _, err := io.ReadFull(r, scratch[:8]); err != nil { + return err + } + ret.capacity = btcutil.Amount(binary.BigEndian.Uint64(scratch[:8])) + + if _, err := io.ReadFull(r, scratch[:8]); err != nil { + return err + } + ret.settledBalance = btcutil.Amount( + binary.BigEndian.Uint64(scratch[:8])) + ret.selfOutput = &breachedOutput{} if err := ret.selfOutput.Decode(r); err != nil { return err @@ -1007,7 +1228,8 @@ func (bo *breachedOutput) Encode(w io.Writer) error { return err } - if err := lnwallet.WriteSignDescriptor(w, bo.signDescriptor); err != nil { + if err := lnwallet.WriteSignDescriptor( + w, &bo.signDescriptor); err != nil { return err } @@ -1041,16 +1263,16 @@ func (bo *breachedOutput) Decode(r io.Reader) error { return err } - signDescriptor := lnwallet.SignDescriptor{} - if err := lnwallet.ReadSignDescriptor(r, &signDescriptor); err != nil { + if err := lnwallet.ReadSignDescriptor( + r, &bo.signDescriptor); err != nil { return err } - bo.signDescriptor = &signDescriptor if _, err := io.ReadFull(r, scratch[:2]); err != nil { return err } - bo.witnessType = lnwallet.WitnessType(binary.BigEndian.Uint16(scratch[:2])) + bo.witnessType = lnwallet.WitnessType( + binary.BigEndian.Uint16(scratch[:2])) if _, err := io.ReadFull(r, scratch[:1]); err != nil { return err diff --git a/breacharbiter_test.go b/breacharbiter_test.go index d2732d8fa..74722beba 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "os" "reflect" + "sync" "testing" "github.com/lightningnetwork/lnd/channeldb" @@ -77,7 +78,7 @@ var ( breachSignDescs = []lnwallet.SignDescriptor{ { - PrivateTweak: []byte{ + SingleTweak: []byte{ 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, @@ -107,7 +108,7 @@ var ( HashType: txscript.SigHashAll, }, { - PrivateTweak: []byte{ + SingleTweak: []byte{ 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, @@ -137,7 +138,7 @@ var ( HashType: txscript.SigHashAll, }, { - PrivateTweak: []byte{ + SingleTweak: []byte{ 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, @@ -199,10 +200,12 @@ var ( 0x4f, 0x2f, 0x6f, 0x25, 0x88, 0xa3, 0xef, 0xb9, 0x6a, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53, }, - chanPoint: breachOutPoints[0], - selfOutput: &breachedOutputs[0], - revokedOutput: &breachedOutputs[1], - htlcOutputs: []*breachedOutput{}, + chanPoint: breachOutPoints[0], + capacity: btcutil.Amount(1e7), + settledBalance: btcutil.Amount(1e7), + selfOutput: &breachedOutputs[0], + revokedOutput: &breachedOutputs[1], + htlcOutputs: []*breachedOutput{}, }, { commitHash: [chainhash.HashSize]byte{ @@ -211,9 +214,11 @@ var ( 0x2d, 0xe7, 0x93, 0xe4, 0xb7, 0x25, 0xb8, 0x4d, 0x1f, 0xb, 0x4c, 0xf9, 0x9e, 0xc5, 0x8c, 0xe9, }, - chanPoint: breachOutPoints[1], - selfOutput: &breachedOutputs[0], - revokedOutput: &breachedOutputs[1], + chanPoint: breachOutPoints[1], + capacity: btcutil.Amount(1e7), + settledBalance: btcutil.Amount(1e7), + selfOutput: &breachedOutputs[0], + revokedOutput: &breachedOutputs[1], htlcOutputs: []*breachedOutput{ &breachedOutputs[1], &breachedOutputs[2], @@ -224,7 +229,7 @@ var ( // Parse the pubkeys in the breached outputs. func initBreachedOutputs() error { - for i := 0; i < len(breachedOutputs); i++ { + for i := range breachedOutputs { bo := &breachedOutputs[i] // Parse the sign descriptor's pubkey. @@ -234,7 +239,7 @@ func initBreachedOutputs() error { return fmt.Errorf("unable to parse pubkey: %v", breachKeys[i]) } sd.PubKey = pubkey - bo.signDescriptor = sd + bo.signDescriptor = *sd } return nil @@ -278,6 +283,12 @@ func TestRetributionSerialization(t *testing.T) { for i := 0; i < len(retributions); i++ { ret := &retributions[i] + remoteIdentity, err := btcec.ParsePubKey(breachKeys[i], btcec.S256()) + if err != nil { + t.Fatalf("unable to parse public key [%v]: %v", i, err) + } + ret.remoteIdentity = *remoteIdentity + var buf bytes.Buffer if err := ret.Encode(&buf); err != nil { @@ -298,37 +309,104 @@ func TestRetributionSerialization(t *testing.T) { } } -// TODO(phlip9): reuse existing function? -// makeTestDB creates a new instance of the ChannelDB for testing purposes. A -// callback which cleans up the created temporary directories is also returned -// and intended to be executed after the test completes. -func makeTestDB() (*channeldb.DB, func(), error) { - var db *channeldb.DB +// copyRetInfo creates a complete copy of the given retributionInfo. +func copyRetInfo(retInfo *retributionInfo) *retributionInfo { + ret := &retributionInfo{ + commitHash: retInfo.commitHash, + chanPoint: retInfo.chanPoint, + remoteIdentity: retInfo.remoteIdentity, + capacity: retInfo.capacity, + settledBalance: retInfo.settledBalance, + selfOutput: retInfo.selfOutput, + revokedOutput: retInfo.revokedOutput, + htlcOutputs: make([]*breachedOutput, len(retInfo.htlcOutputs)), + doneChan: make(chan struct{}), + } + for i, htlco := range retInfo.htlcOutputs { + ret.htlcOutputs[i] = htlco + } + + return ret +} + +// mockRetributionStore implements the RetributionStore interface and is backed +// by an in-memory map. Access to the internal state is provided by a mutex. +// TODO(cfromknecht) extend to support and test controlled failures. +type mockRetributionStore struct { + mu sync.Mutex + state map[wire.OutPoint]*retributionInfo +} + +func newMockRetributionStore() *mockRetributionStore { + return &mockRetributionStore{ + mu: sync.Mutex{}, + state: make(map[wire.OutPoint]*retributionInfo), + } +} + +func (rs *mockRetributionStore) Add(retInfo *retributionInfo) error { + rs.mu.Lock() + rs.state[retInfo.chanPoint] = copyRetInfo(retInfo) + rs.mu.Unlock() + + return nil +} + +func (rs *mockRetributionStore) Remove(key *wire.OutPoint) error { + rs.mu.Lock() + delete(rs.state, *key) + rs.mu.Unlock() + + return nil +} + +func (rs *mockRetributionStore) ForAll(cb func(*retributionInfo) error) error { + rs.mu.Lock() + defer rs.mu.Unlock() + + for _, retInfo := range rs.state { + if err := cb(copyRetInfo(retInfo)); err != nil { + return err + } + } + + return nil +} + +// TestMockRetributionStore instantiates a mockRetributionStore and tests its +// behavior using the general RetributionStore test suite. +func TestMockRetributionStore(t *testing.T) { + mrs := newMockRetributionStore() + testRetributionStore(mrs, t) +} + +// TestChannelDBRetributionStore instantiates a retributionStore backed by a +// channeldb.DB, and tests its behavior using the general RetributionStore test +// suite. +func TestChannelDBRetributionStore(t *testing.T) { // First, create a temporary directory to be used for the duration of // this test. tempDirName, err := ioutil.TempDir("", "channeldb") if err != nil { - return nil, nil, err + t.Fatalf("unable to initialize temp directory for channeldb: %v", err) } + defer os.RemoveAll(tempDirName) // Next, create channeldb for the first time. - db, err = channeldb.Open(tempDirName) + db, err := channeldb.Open(tempDirName) if err != nil { - return nil, nil, err + t.Fatalf("unable to open channeldb: %v", err) } + defer db.Close() - cleanUp := func() { - if db != nil { - db.Close() - } - os.RemoveAll(tempDirName) - } - - return db, cleanUp, nil + // Finally, instantiate retribution store and execute RetributionStore test + // suite. + rs := newRetributionStore(db) + testRetributionStore(rs, t) } -func countRetributions(t *testing.T, rs *retributionStore) int { +func countRetributions(t *testing.T, rs RetributionStore) int { count := 0 err := rs.ForAll(func(_ *retributionInfo) error { count++ @@ -341,32 +419,29 @@ func countRetributions(t *testing.T, rs *retributionStore) int { } // Test that the retribution persistence layer works. -func TestRetributionStore(t *testing.T) { - db, cleanUp, err := makeTestDB() - defer cleanUp() - if err != nil { - t.Fatalf("unable to create test db: %v", err) - } - +func testRetributionStore(rs RetributionStore, t *testing.T) { if err := initBreachedOutputs(); err != nil { t.Fatalf("unable to init breached outputs: %v", err) } - rs := newRetributionStore(db) - // Make sure that a new retribution store is actually emtpy. if count := countRetributions(t, rs); count != 0 { t.Fatalf("expected 0 retributions, found %v", count) } - // Add some retribution states to the store. + // Add first retribution state to the store. if err := rs.Add(&retributions[0]); err != nil { t.Fatalf("unable to add to retribution store: %v", err) } + // Ensure that the retribution store has one retribution. + if count := countRetributions(t, rs); count != 1 { + t.Fatalf("expected 1 retributions, found %v", count) + } + + // Add second retribution state to the store. if err := rs.Add(&retributions[1]); err != nil { t.Fatalf("unable to add to retribution store: %v", err) } - // There should be 2 retributions in the store. if count := countRetributions(t, rs); count != 2 { t.Fatalf("expected 2 retributions, found %v", count) @@ -387,6 +462,11 @@ func TestRetributionStore(t *testing.T) { if err := rs.Remove(&retributions[0].chanPoint); err != nil { t.Fatalf("unable to remove from retribution store: %v", err) } + // Ensure that the retribution store has one retribution. + if count := countRetributions(t, rs); count != 1 { + t.Fatalf("expected 1 retributions, found %v", count) + } + if err := rs.Remove(&retributions[1].chanPoint); err != nil { t.Fatalf("unable to remove from retribution store: %v", err) }