diff --git a/contractcourt/briefcase.go b/contractcourt/briefcase.go index 7d199c5c2..3e58147c6 100644 --- a/contractcourt/briefcase.go +++ b/contractcourt/briefcase.go @@ -249,6 +249,15 @@ func (a ArbitratorState) String() string { } } +// IsContractClosed returns a bool to indicate whether the closing/breaching tx +// has been confirmed onchain. If the state is StateContractClosed, +// StateWaitingFullResolution, or StateFullyResolved, it means the contract has +// been closed and all related contracts have been launched. +func (a ArbitratorState) IsContractClosed() bool { + return a == StateContractClosed || a == StateWaitingFullResolution || + a == StateFullyResolved +} + // resolverType is an enum that enumerates the various types of resolvers. When // writing resolvers to disk, we prepend this to the raw bytes stored. This // allows us to properly decode the resolver into the proper type. diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 10f7bc21b..83ae56c7a 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -578,9 +578,6 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error { // Set the current beat. c.beat = beat - log.Infof("ChainArbitrator starting at height %d with budget=[%v]", - &c.cfg.Budget, c.beat.Height()) - // First, we'll fetch all the channels that are still open, in order to // collect them within our set of active contracts. if err := c.loadOpenChannels(); err != nil { @@ -690,6 +687,11 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error { c.dispatchBlocks() }() + log.Infof("ChainArbitrator starting at height %d with %d chain "+ + "watchers, %d channel arbitrators, and budget config=[%v]", + c.beat.Height(), len(c.activeWatchers), len(c.activeChannels), + &c.cfg.Budget) + // TODO(roasbeef): eventually move all breach watching here return nil @@ -1061,8 +1063,8 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error chanPoint := newChan.FundingOutpoint - log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)", - chanPoint) + log.Infof("Creating new chainWatcher and ChannelArbitrator for "+ + "ChannelPoint(%v)", chanPoint) // If we're already watching this channel, then we'll ignore this // request. diff --git a/contractcourt/channel_arbitrator.go b/contractcourt/channel_arbitrator.go index 45c7b5d37..f193ee68d 100644 --- a/contractcourt/channel_arbitrator.go +++ b/contractcourt/channel_arbitrator.go @@ -2992,6 +2992,22 @@ func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error { // Notify we've processed the block. defer c.NotifyBlockProcessed(beat, nil) + // If the state is StateContractClosed, StateWaitingFullResolution, or + // StateFullyResolved, there's no need to read the close event channel + // and launch the resolvers since the arbitrator can only get to this + // state after processing a previous close event and launched all its + // resolvers. + if c.state.IsContractClosed() { + log.Infof("ChannelArbitrator(%v): skipping launching "+ + "resolvers in state=%v", c.cfg.ChanPoint, c.state) + + return nil + } + + // Perform a non-blocking read on the close events in case the channel + // is closed in this blockbeat. + c.receiveAndProcessCloseEvent() + // Try to advance the state if we are in StateDefault. if c.state == StateDefault { // Now that a new block has arrived, we'll attempt to advance @@ -3010,6 +3026,60 @@ func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error { return nil } +// receiveAndProcessCloseEvent does a non-blocking read on all the channel +// close event channels. If an event is received, it will be further processed. +func (c *ChannelArbitrator) receiveAndProcessCloseEvent() { + select { + // Received a coop close event, we now mark the channel as resolved and + // exit. + case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure: + err := c.handleCoopCloseEvent(closeInfo) + if err != nil { + log.Errorf("Failed to handle coop close: %v", err) + return + } + + // We have broadcast our commitment, and it is now confirmed onchain. + case closeInfo := <-c.cfg.ChainEvents.LocalUnilateralClosure: + if c.state != StateCommitmentBroadcasted { + log.Errorf("ChannelArbitrator(%v): unexpected "+ + "local on-chain channel close", c.cfg.ChanPoint) + } + + err := c.handleLocalForceCloseEvent(closeInfo) + if err != nil { + log.Errorf("Failed to handle local force close: %v", + err) + + return + } + + // The remote party has broadcast the commitment. We'll examine our + // state to determine if we need to act at all. + case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure: + err := c.handleRemoteForceCloseEvent(uniClosure) + if err != nil { + log.Errorf("Failed to handle remote force close: %v", + err) + + return + } + + // The remote has breached the channel! We now launch the breach + // contract resolvers. + case breachInfo := <-c.cfg.ChainEvents.ContractBreach: + err := c.handleContractBreach(breachInfo) + if err != nil { + log.Errorf("Failed to handle contract breach: %v", err) + return + } + + default: + log.Infof("ChannelArbitrator(%v) no close event", + c.cfg.ChanPoint) + } +} + // Name returns a human-readable string for this subsystem. // // NOTE: Part of chainio.Consumer interface.