contractcourt: process channel close event on new beat

This commit is contained in:
yyforyongyu 2024-11-14 01:39:36 +08:00
parent c5b3033427
commit a6d3a0fa99
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
3 changed files with 86 additions and 5 deletions

View File

@ -249,6 +249,15 @@ func (a ArbitratorState) String() string {
}
}
// IsContractClosed returns a bool to indicate whether the closing/breaching tx
// has been confirmed onchain. If the state is StateContractClosed,
// StateWaitingFullResolution, or StateFullyResolved, it means the contract has
// been closed and all related contracts have been launched.
func (a ArbitratorState) IsContractClosed() bool {
return a == StateContractClosed || a == StateWaitingFullResolution ||
a == StateFullyResolved
}
// resolverType is an enum that enumerates the various types of resolvers. When
// writing resolvers to disk, we prepend this to the raw bytes stored. This
// allows us to properly decode the resolver into the proper type.

View File

@ -578,9 +578,6 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
// Set the current beat.
c.beat = beat
log.Infof("ChainArbitrator starting at height %d with budget=[%v]",
&c.cfg.Budget, c.beat.Height())
// First, we'll fetch all the channels that are still open, in order to
// collect them within our set of active contracts.
if err := c.loadOpenChannels(); err != nil {
@ -690,6 +687,11 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
c.dispatchBlocks()
}()
log.Infof("ChainArbitrator starting at height %d with %d chain "+
"watchers, %d channel arbitrators, and budget config=[%v]",
c.beat.Height(), len(c.activeWatchers), len(c.activeChannels),
&c.cfg.Budget)
// TODO(roasbeef): eventually move all breach watching here
return nil
@ -1061,8 +1063,8 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error
chanPoint := newChan.FundingOutpoint
log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)",
chanPoint)
log.Infof("Creating new chainWatcher and ChannelArbitrator for "+
"ChannelPoint(%v)", chanPoint)
// If we're already watching this channel, then we'll ignore this
// request.

View File

@ -2992,6 +2992,22 @@ func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error {
// Notify we've processed the block.
defer c.NotifyBlockProcessed(beat, nil)
// If the state is StateContractClosed, StateWaitingFullResolution, or
// StateFullyResolved, there's no need to read the close event channel
// and launch the resolvers since the arbitrator can only get to this
// state after processing a previous close event and launched all its
// resolvers.
if c.state.IsContractClosed() {
log.Infof("ChannelArbitrator(%v): skipping launching "+
"resolvers in state=%v", c.cfg.ChanPoint, c.state)
return nil
}
// Perform a non-blocking read on the close events in case the channel
// is closed in this blockbeat.
c.receiveAndProcessCloseEvent()
// Try to advance the state if we are in StateDefault.
if c.state == StateDefault {
// Now that a new block has arrived, we'll attempt to advance
@ -3010,6 +3026,60 @@ func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error {
return nil
}
// receiveAndProcessCloseEvent does a non-blocking read on all the channel
// close event channels. If an event is received, it will be further processed.
func (c *ChannelArbitrator) receiveAndProcessCloseEvent() {
select {
// Received a coop close event, we now mark the channel as resolved and
// exit.
case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure:
err := c.handleCoopCloseEvent(closeInfo)
if err != nil {
log.Errorf("Failed to handle coop close: %v", err)
return
}
// We have broadcast our commitment, and it is now confirmed onchain.
case closeInfo := <-c.cfg.ChainEvents.LocalUnilateralClosure:
if c.state != StateCommitmentBroadcasted {
log.Errorf("ChannelArbitrator(%v): unexpected "+
"local on-chain channel close", c.cfg.ChanPoint)
}
err := c.handleLocalForceCloseEvent(closeInfo)
if err != nil {
log.Errorf("Failed to handle local force close: %v",
err)
return
}
// The remote party has broadcast the commitment. We'll examine our
// state to determine if we need to act at all.
case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure:
err := c.handleRemoteForceCloseEvent(uniClosure)
if err != nil {
log.Errorf("Failed to handle remote force close: %v",
err)
return
}
// The remote has breached the channel! We now launch the breach
// contract resolvers.
case breachInfo := <-c.cfg.ChainEvents.ContractBreach:
err := c.handleContractBreach(breachInfo)
if err != nil {
log.Errorf("Failed to handle contract breach: %v", err)
return
}
default:
log.Infof("ChannelArbitrator(%v) no close event",
c.cfg.ChanPoint)
}
}
// Name returns a human-readable string for this subsystem.
//
// NOTE: Part of chainio.Consumer interface.