contractcourt: add a new chan resolvedChan to handle resolved channels

Thus we can mark channels as resolved in an async way to avoid deadlock.
This commit is contained in:
yyforyongyu
2025-07-22 22:49:09 +08:00
committed by Oliver Gugger
parent d6523d7490
commit 91000b9b98

View File

@@ -270,6 +270,11 @@ type ChainArbitrator struct {
// beat is the current best known blockbeat.
beat chainio.Blockbeat
// resolvedChan is used to signal that the given channel outpoint has
// been resolved onchain. Once received, chain arbitrator will perform
// cleanups.
resolvedChan chan wire.OutPoint
quit chan struct{}
wg sync.WaitGroup
@@ -286,6 +291,7 @@ func NewChainArbitrator(cfg ChainArbitratorConfig,
activeWatchers: make(map[wire.OutPoint]*chainWatcher),
chanSource: db,
quit: make(chan struct{}),
resolvedChan: make(chan wire.OutPoint),
}
// Mount the block consumer.
@@ -578,6 +584,17 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
// Set the current beat.
c.beat = beat
// Start the goroutine which listens for signals to mark the channel as
// resolved.
//
// NOTE: We must start this goroutine here we won't block the following
// channel loading.
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.resolveContracts()
}()
// First, we'll fetch all the channels that are still open, in order to
// collect them within our set of active contracts.
if err := c.loadOpenChannels(); err != nil {
@@ -697,6 +714,32 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
return nil
}
// resolveContracts listens to the `resolvedChan` to mark a given channel as
// fully resolved.
func (c *ChainArbitrator) resolveContracts() {
for {
select {
// The channel arbitrator signals that a given channel has been
// resolved, we now update chain arbitrator's internal state for
// this channel.
case cp := <-c.resolvedChan:
if c.cfg.NotifyFullyResolvedChannel != nil {
c.cfg.NotifyFullyResolvedChannel(cp)
}
err := c.ResolveContract(cp)
if err != nil {
log.Errorf("Failed to resolve contract for "+
"channel %v", cp)
}
// Exit if the chain arbitrator is shutting down.
case <-c.quit:
return
}
}
}
// dispatchBlocks consumes a block epoch notification stream and dispatches
// blocks to each of the chain arb's active channel arbitrators. This function
// must be run in a goroutine.
@@ -762,6 +805,16 @@ func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) {
c.NotifyBlockProcessed(beat, err)
}
// notifyChannelResolved is used by the channel arbitrator to signal that a
// given channel has been resolved.
func (c *ChainArbitrator) notifyChannelResolved(cp wire.OutPoint) {
select {
case c.resolvedChan <- cp:
case <-c.quit:
return
}
}
// republishClosingTxs will load any stored cooperative or unilateral closing
// transactions and republish them. This helps ensure propagation of the
// transactions in the event that prior publications failed.