From d3501197be646e49a763bf1a71250372c04b034b Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 22 Jul 2025 22:49:09 +0800 Subject: [PATCH] contractcourt: add a new chan `resolvedChan` to handle resolved channels Thus we can mark channels as resolved in an async way to avoid deadlock. --- contractcourt/chain_arbitrator.go | 53 +++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 83ae56c7a..02e28cb26 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -270,6 +270,11 @@ type ChainArbitrator struct { // beat is the current best known blockbeat. beat chainio.Blockbeat + // resolvedChan is used to signal that the given channel outpoint has + // been resolved onchain. Once received, chain arbitrator will perform + // cleanups. + resolvedChan chan wire.OutPoint + quit chan struct{} wg sync.WaitGroup @@ -286,6 +291,7 @@ func NewChainArbitrator(cfg ChainArbitratorConfig, activeWatchers: make(map[wire.OutPoint]*chainWatcher), chanSource: db, quit: make(chan struct{}), + resolvedChan: make(chan wire.OutPoint), } // Mount the block consumer. @@ -578,6 +584,17 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error { // Set the current beat. c.beat = beat + // Start the goroutine which listens for signals to mark the channel as + // resolved. + // + // NOTE: We must start this goroutine here we won't block the following + // channel loading. + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.resolveContracts() + }() + // First, we'll fetch all the channels that are still open, in order to // collect them within our set of active contracts. if err := c.loadOpenChannels(); err != nil { @@ -697,6 +714,32 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error { return nil } +// resolveContracts listens to the `resolvedChan` to mark a given channel as +// fully resolved. +func (c *ChainArbitrator) resolveContracts() { + for { + select { + // The channel arbitrator signals that a given channel has been + // resolved, we now update chain arbitrator's internal state for + // this channel. + case cp := <-c.resolvedChan: + if c.cfg.NotifyFullyResolvedChannel != nil { + c.cfg.NotifyFullyResolvedChannel(cp) + } + + err := c.ResolveContract(cp) + if err != nil { + log.Errorf("Failed to resolve contract for "+ + "channel %v", cp) + } + + // Exit if the chain arbitrator is shutting down. + case <-c.quit: + return + } + } +} + // dispatchBlocks consumes a block epoch notification stream and dispatches // blocks to each of the chain arb's active channel arbitrators. This function // must be run in a goroutine. @@ -762,6 +805,16 @@ func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) { c.NotifyBlockProcessed(beat, err) } +// notifyChannelResolved is used by the channel arbitrator to signal that a +// given channel has been resolved. +func (c *ChainArbitrator) notifyChannelResolved(cp wire.OutPoint) { + select { + case c.resolvedChan <- cp: + case <-c.quit: + return + } +} + // republishClosingTxs will load any stored cooperative or unilateral closing // transactions and republish them. This helps ensure propagation of the // transactions in the event that prior publications failed.