mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-13 18:10:25 +02:00
contractcourt: add a new chan resolvedChan
to handle resolved channels
Thus we can mark channels as resolved in an async way to avoid deadlock.
This commit is contained in:
@@ -270,6 +270,11 @@ type ChainArbitrator struct {
|
|||||||
// beat is the current best known blockbeat.
|
// beat is the current best known blockbeat.
|
||||||
beat chainio.Blockbeat
|
beat chainio.Blockbeat
|
||||||
|
|
||||||
|
// resolvedChan is used to signal that the given channel outpoint has
|
||||||
|
// been resolved onchain. Once received, chain arbitrator will perform
|
||||||
|
// cleanups.
|
||||||
|
resolvedChan chan wire.OutPoint
|
||||||
|
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
@@ -286,6 +291,7 @@ func NewChainArbitrator(cfg ChainArbitratorConfig,
|
|||||||
activeWatchers: make(map[wire.OutPoint]*chainWatcher),
|
activeWatchers: make(map[wire.OutPoint]*chainWatcher),
|
||||||
chanSource: db,
|
chanSource: db,
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
|
resolvedChan: make(chan wire.OutPoint),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mount the block consumer.
|
// Mount the block consumer.
|
||||||
@@ -578,6 +584,17 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
|
|||||||
// Set the current beat.
|
// Set the current beat.
|
||||||
c.beat = beat
|
c.beat = beat
|
||||||
|
|
||||||
|
// Start the goroutine which listens for signals to mark the channel as
|
||||||
|
// resolved.
|
||||||
|
//
|
||||||
|
// NOTE: We must start this goroutine here we won't block the following
|
||||||
|
// channel loading.
|
||||||
|
c.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer c.wg.Done()
|
||||||
|
c.resolveContracts()
|
||||||
|
}()
|
||||||
|
|
||||||
// First, we'll fetch all the channels that are still open, in order to
|
// First, we'll fetch all the channels that are still open, in order to
|
||||||
// collect them within our set of active contracts.
|
// collect them within our set of active contracts.
|
||||||
if err := c.loadOpenChannels(); err != nil {
|
if err := c.loadOpenChannels(); err != nil {
|
||||||
@@ -697,6 +714,32 @@ func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// resolveContracts listens to the `resolvedChan` to mark a given channel as
|
||||||
|
// fully resolved.
|
||||||
|
func (c *ChainArbitrator) resolveContracts() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
// The channel arbitrator signals that a given channel has been
|
||||||
|
// resolved, we now update chain arbitrator's internal state for
|
||||||
|
// this channel.
|
||||||
|
case cp := <-c.resolvedChan:
|
||||||
|
if c.cfg.NotifyFullyResolvedChannel != nil {
|
||||||
|
c.cfg.NotifyFullyResolvedChannel(cp)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := c.ResolveContract(cp)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed to resolve contract for "+
|
||||||
|
"channel %v", cp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exit if the chain arbitrator is shutting down.
|
||||||
|
case <-c.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// dispatchBlocks consumes a block epoch notification stream and dispatches
|
// dispatchBlocks consumes a block epoch notification stream and dispatches
|
||||||
// blocks to each of the chain arb's active channel arbitrators. This function
|
// blocks to each of the chain arb's active channel arbitrators. This function
|
||||||
// must be run in a goroutine.
|
// must be run in a goroutine.
|
||||||
@@ -762,6 +805,16 @@ func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) {
|
|||||||
c.NotifyBlockProcessed(beat, err)
|
c.NotifyBlockProcessed(beat, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// notifyChannelResolved is used by the channel arbitrator to signal that a
|
||||||
|
// given channel has been resolved.
|
||||||
|
func (c *ChainArbitrator) notifyChannelResolved(cp wire.OutPoint) {
|
||||||
|
select {
|
||||||
|
case c.resolvedChan <- cp:
|
||||||
|
case <-c.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// republishClosingTxs will load any stored cooperative or unilateral closing
|
// republishClosingTxs will load any stored cooperative or unilateral closing
|
||||||
// transactions and republish them. This helps ensure propagation of the
|
// transactions and republish them. This helps ensure propagation of the
|
||||||
// transactions in the event that prior publications failed.
|
// transactions in the event that prior publications failed.
|
||||||
|
Reference in New Issue
Block a user