From f99933fa69bee111a698538d4e9a4a74fbee6999 Mon Sep 17 00:00:00 2001 From: eugene Date: Thu, 13 Jan 2022 14:45:10 -0500 Subject: [PATCH] server+contractcourt: add breachResolver that subscribes to breacharbiter Introduces a breachResolver that subscribes to the breacharbiter to determine if the final justice transaction has confirmed and can clean itself up. --- contractcourt/breach_resolver.go | 121 ++++++++++++++++++++++++++++++ contractcourt/breacharbiter.go | 56 +++++++++++++- contractcourt/chain_arbitrator.go | 6 ++ server.go | 29 +++---- 4 files changed, 196 insertions(+), 16 deletions(-) create mode 100644 contractcourt/breach_resolver.go diff --git a/contractcourt/breach_resolver.go b/contractcourt/breach_resolver.go new file mode 100644 index 000000000..922ec609a --- /dev/null +++ b/contractcourt/breach_resolver.go @@ -0,0 +1,121 @@ +package contractcourt + +import ( + "encoding/binary" + "io" + + "github.com/lightningnetwork/lnd/channeldb" +) + +// breachResolver is a resolver that will handle breached closes. In the +// future, this will likely take over the duties the current breacharbiter has. +type breachResolver struct { + // resolved reflects if the contract has been fully resolved or not. + resolved bool + + // subscribed denotes whether or not the breach resolver has subscribed + // to the breacharbiter for breach resolution. + subscribed bool + + // replyChan is closed when the breach arbiter has completed serving + // justice. + replyChan chan struct{} + + contractResolverKit +} + +// newBreachResolver instantiates a new breach resolver. +func newBreachResolver(resCfg ResolverConfig) *breachResolver { + r := &breachResolver{ + contractResolverKit: *newContractResolverKit(resCfg), + replyChan: make(chan struct{}), + } + + r.initLogger(r) + + return r +} + +// ResolverKey returns the unique identifier for this resolver. +func (b *breachResolver) ResolverKey() []byte { + key := newResolverID(b.ChanPoint) + return key[:] +} + +// Resolve queries the breacharbiter to see if the justice transaction has been +// broadcast. +func (b *breachResolver) Resolve() (ContractResolver, error) { + if !b.subscribed { + complete, err := b.SubscribeBreachComplete( + &b.ChanPoint, b.replyChan, + ) + if err != nil { + return nil, err + } + + // If the breach resolution process is already complete, then + // we can cleanup and checkpoint the resolved state. + if complete { + b.resolved = true + return nil, b.Checkpoint(b) + } + + // Prevent duplicate subscriptions. + b.subscribed = true + } + + select { + case <-b.replyChan: + // The replyChan has been closed, signalling that the breach + // has been fully resolved. Checkpoint the resolved state and + // exit. + b.resolved = true + return nil, b.Checkpoint(b) + case <-b.quit: + } + + return nil, errResolverShuttingDown +} + +// Stop signals the breachResolver to stop. +func (b *breachResolver) Stop() { + close(b.quit) +} + +// IsResolved returns true if the breachResolver is fully resolved and cleanup +// can occur. +func (b *breachResolver) IsResolved() bool { + return b.resolved +} + +// SupplementState adds additional state to the breachResolver. +func (b *breachResolver) SupplementState(_ *channeldb.OpenChannel) { +} + +// Encode encodes the breachResolver to the passed writer. +func (b *breachResolver) Encode(w io.Writer) error { + return binary.Write(w, endian, b.resolved) +} + +// newBreachResolverFromReader attempts to decode an encoded breachResolver +// from the passed Reader instance, returning an active breachResolver. +func newBreachResolverFromReader(r io.Reader, resCfg ResolverConfig) ( + *breachResolver, error) { + + b := &breachResolver{ + contractResolverKit: *newContractResolverKit(resCfg), + replyChan: make(chan struct{}), + } + + if err := binary.Read(r, endian, &b.resolved); err != nil { + return nil, err + } + + b.initLogger(b) + + return b, nil +} + +// A compile time assertion to ensure breachResolver meets the ContractResolver +// interface. +var _ ContractResolver = (*breachResolver)(nil) diff --git a/contractcourt/breacharbiter.go b/contractcourt/breacharbiter.go index 6d20580a7..b1daa857d 100644 --- a/contractcourt/breacharbiter.go +++ b/contractcourt/breacharbiter.go @@ -185,6 +185,8 @@ type BreachArbiter struct { cfg *BreachConfig + subscriptions map[wire.OutPoint]chan struct{} + quit chan struct{} wg sync.WaitGroup sync.Mutex @@ -194,8 +196,9 @@ type BreachArbiter struct { // its dependent objects. func NewBreachArbiter(cfg *BreachConfig) *BreachArbiter { return &BreachArbiter{ - cfg: cfg, - quit: make(chan struct{}), + cfg: cfg, + subscriptions: make(map[wire.OutPoint]chan struct{}), + quit: make(chan struct{}), } } @@ -322,6 +325,47 @@ func (b *BreachArbiter) IsBreached(chanPoint *wire.OutPoint) (bool, error) { return b.cfg.Store.IsBreached(chanPoint) } +// SubscribeBreachComplete is used by outside subsystems to be notified of a +// successful breach resolution. +func (b *BreachArbiter) SubscribeBreachComplete(chanPoint *wire.OutPoint, + c chan struct{}) (bool, error) { + + breached, err := b.cfg.Store.IsBreached(chanPoint) + if err != nil { + // If an error occurs, no subscription will be registered. + return false, err + } + + if !breached { + // If chanPoint no longer exists in the Store, then the breach + // was cleaned up successfully. Any subscription that occurs + // happens after the breach information was persisted to the + // underlying store. + return true, nil + } + + // Otherwise since the channel point is not resolved, add a + // subscription. There can only be one subscription per channel point. + b.Lock() + defer b.Unlock() + b.subscriptions[*chanPoint] = c + + return false, nil +} + +// notifyBreachComplete is used by the BreachArbiter to notify outside +// subsystems that the breach resolution process is complete. +func (b *BreachArbiter) notifyBreachComplete(chanPoint *wire.OutPoint) { + b.Lock() + defer b.Unlock() + if c, ok := b.subscriptions[*chanPoint]; ok { + close(c) + } + + // Remove the subscription. + delete(b.subscriptions, *chanPoint) +} + // contractObserver is the primary goroutine for the BreachArbiter. This // goroutine is responsible for handling breach events coming from the // contractcourt on the ContractBreaches channel. If a channel breach is @@ -857,6 +901,14 @@ func (b *BreachArbiter) cleanupBreach(chanPoint *wire.OutPoint) error { err) } + // This is after the Remove call so that the chan passed in via + // SubscribeBreachComplete is always notified, no matter when it is + // called. Otherwise, if notifyBreachComplete was before Remove, a + // very rare edge case could occur in which SubscribeBreachComplete + // is called after notifyBreachComplete and before Remove, meaning the + // caller would never be notified. + b.notifyBreachComplete(chanPoint) + return nil } diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index a39d939f6..c434513d7 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -183,6 +183,12 @@ type ChainArbitratorConfig struct { // Clock is the clock implementation that ChannelArbitrator uses. // It is useful for testing. Clock clock.Clock + + // SubscribeBreachComplete is used by the breachResolver to register a + // subscription that notifies when the breach resolution process is + // complete. + SubscribeBreachComplete func(op *wire.OutPoint, c chan struct{}) ( + bool, error) } // ChainArbitrator is a sub-system that oversees the on-chain resolution of all diff --git a/server.go b/server.go index e401e1705..40c1a4a24 100644 --- a/server.go +++ b/server.go @@ -1026,6 +1026,20 @@ func newServer(cfg *Config, listenAddrs []net.Addr, // breach events from the ChannelArbitrator to the breachArbiter, contractBreaches := make(chan *contractcourt.ContractBreachEvent, 1) + s.breachArbiter = contractcourt.NewBreachArbiter(&contractcourt.BreachConfig{ + CloseLink: closeLink, + DB: s.chanStateDB, + Estimator: s.cc.FeeEstimator, + GenSweepScript: newSweepPkScriptGen(cc.Wallet), + Notifier: cc.ChainNotifier, + PublishTransaction: cc.Wallet.PublishTransaction, + ContractBreaches: contractBreaches, + Signer: cc.Wallet.Cfg.Signer, + Store: contractcourt.NewRetributionStore( + dbs.ChanStateDB, + ), + }) + s.chainArb = contractcourt.NewChainArbitrator(contractcourt.ChainArbitratorConfig{ ChainHash: *s.cfg.ActiveNetParams.GenesisHash, IncomingBroadcastDelta: lncfg.DefaultIncomingBroadcastDelta, @@ -1125,22 +1139,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr, PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod, IsForwardedHTLC: s.htlcSwitch.IsForwardedHTLC, Clock: clock.NewDefaultClock(), + SubscribeBreachComplete: s.breachArbiter.SubscribeBreachComplete, }, dbs.ChanStateDB) - s.breachArbiter = contractcourt.NewBreachArbiter(&contractcourt.BreachConfig{ - CloseLink: closeLink, - DB: s.chanStateDB, - Estimator: s.cc.FeeEstimator, - GenSweepScript: newSweepPkScriptGen(cc.Wallet), - Notifier: cc.ChainNotifier, - PublishTransaction: cc.Wallet.PublishTransaction, - ContractBreaches: contractBreaches, - Signer: cc.Wallet.Cfg.Signer, - Store: contractcourt.NewRetributionStore( - dbs.ChanStateDB, - ), - }) - // Select the configuration and furnding parameters for Bitcoin or // Litecoin, depending on the primary registered chain. primaryChain := cfg.registeredChains.PrimaryChain()