contractcourt: refactor start up of arbitrators
We decouple the state machine of the channel arbitrator from the start-up process so that we can start up the whole daemon reliably.
parent 48fba10562
commit 17bc8827c5
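Before the diff, here is a minimal, self-contained sketch of the control flow this commit moves to: Start only registers the attendant goroutine, the attendant drives the post-restart state transition, and a failure there is logged rather than returned from Start. The names below (arbitrator, attendant, progressAfterRestart) are illustrative stand-ins, not the real lnd types or API.

```go
package main

import "log"

// arbitrator is a stripped-down, hypothetical stand-in for ChannelArbitrator,
// used only to illustrate the reordered start-up flow; the real type lives in
// lnd's contractcourt package and carries far more state.
type arbitrator struct {
	quit chan struct{}
}

// start mirrors the shape of the new ChannelArbitrator.Start: it no longer
// drives the state machine itself, it only launches the attendant goroutine,
// so a failed state transition can no longer abort daemon start-up.
func (a *arbitrator) start(bestHeight int32) error {
	go a.attendant(bestHeight)
	return nil
}

// attendant mirrors channelAttendant: it first tries to progress the state
// machine after the restart and, on failure, only logs before entering its
// event loop.
func (a *arbitrator) attendant(bestHeight int32) {
	if err := a.progressAfterRestart(bestHeight); err != nil {
		log.Printf("unable to progress state machine after restart: %v", err)
	}

	// Stand-in for the real select-based event loop.
	<-a.quit
}

// progressAfterRestart is a placeholder for the real
// progressStateMachineAfterRestart, which re-runs the state transition and
// relaunches resolvers for channels that are still pending close.
func (a *arbitrator) progressAfterRestart(bestHeight int32) error {
	return nil
}

func main() {
	a := &arbitrator{quit: make(chan struct{})}
	if err := a.start(100); err != nil {
		log.Fatal(err)
	}
	close(a.quit)
}
```

The practical effect is that a single channel whose state machine cannot be progressed no longer blocks the rest of the daemon from starting.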
contractcourt/channel_arbitrator.go

@@ -482,6 +482,20 @@ func (c *ChannelArbitrator) Start(state *chanArbStartState) error {
         return err
     }

+    c.wg.Add(1)
+    go c.channelAttendant(bestHeight, state.commitSet)
+
+    return nil
+}
+
+// progressStateMachineAfterRestart attempts to progress the state machine
+// after a restart. This makes sure that if the state transition failed, we
+// will try to progress the state machine again. Moreover it will relaunch
+// resolvers if the channel is still in the pending close state and has not
+// been fully resolved yet.
+func (c *ChannelArbitrator) progressStateMachineAfterRestart(bestHeight int32,
+    commitSet *CommitSet) error {
+
     // If the channel has been marked pending close in the database, and we
     // haven't transitioned the state machine to StateContractClosed (or a
     // succeeding state), then a state transition most likely failed. We'll
@@ -527,7 +541,7 @@ func (c *ChannelArbitrator) Start(state *chanArbStartState) error {
     // on-chain state, and our set of active contracts.
     startingState := c.state
     nextState, _, err := c.advanceState(
-        triggerHeight, trigger, state.commitSet,
+        triggerHeight, trigger, commitSet,
     )
     if err != nil {
         switch err {
@@ -564,14 +578,12 @@ func (c *ChannelArbitrator) Start(state *chanArbStartState) error {
         // receive a chain event from the chain watcher that the
         // commitment has been confirmed on chain, and before we
         // advance our state step, we call InsertConfirmedCommitSet.
-        err := c.relaunchResolvers(state.commitSet, triggerHeight)
+        err := c.relaunchResolvers(commitSet, triggerHeight)
         if err != nil {
             return err
         }
     }

-    c.wg.Add(1)
-    go c.channelAttendant(bestHeight)
     return nil
 }

@@ -2716,13 +2728,28 @@ func (c *ChannelArbitrator) updateActiveHTLCs() {
 // Nursery for incubation, and ultimate sweeping.
 //
 // NOTE: This MUST be run as a goroutine.
-func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
+//
+//nolint:funlen
+func (c *ChannelArbitrator) channelAttendant(bestHeight int32,
+    commitSet *CommitSet) {

     // TODO(roasbeef): tell top chain arb we're done
     defer func() {
         c.wg.Done()
     }()

+    err := c.progressStateMachineAfterRestart(bestHeight, commitSet)
+    if err != nil {
+        // In case of an error, we return early but we do not shut down
+        // LND, because there might be other channels that still can be
+        // resolved and we don't want to interfere with that.
+        // We continue to run the channel attendant in case the channel
+        // closes via other means, for example the remote party force
+        // closes the channel. So we log the error and continue.
+        log.Errorf("Unable to progress state machine after "+
+            "restart: %v", err)
+    }
+
     for {
         select {
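The hunks above keep the c.wg.Add(1) / defer c.wg.Done() pairing across the refactor: the counter is incremented in Start before the goroutine is launched, and released by the attendant itself, so a shutdown path can wait for the attendant to exit. A small, generic sketch of that hand-off pattern (not lnd code):

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a generic stand-in showing the hand-off: Add(1) is called by the
// starter *before* spawning the goroutine, and the goroutine defers Done(),
// so a later shutdown can wait for it deterministically.
type worker struct {
	wg   sync.WaitGroup
	quit chan struct{}
}

func (w *worker) start() {
	w.wg.Add(1)
	go w.attendant()
}

func (w *worker) attendant() {
	defer w.wg.Done()

	// Stand-in for the channel attendant's event loop.
	<-w.quit
}

func (w *worker) stop() {
	close(w.quit)
	w.wg.Wait()
}

func main() {
	w := &worker{quit: make(chan struct{})}
	w.start()
	w.stop()
	fmt.Println("attendant exited cleanly")
}
```

Incrementing before the go statement matters: doing the Add inside the goroutine would race with a Wait that runs before the goroutine is scheduled.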
contractcourt/channel_arbitrator_test.go

@@ -21,6 +21,7 @@ import (
     "github.com/lightningnetwork/lnd/input"
     "github.com/lightningnetwork/lnd/kvdb"
     "github.com/lightningnetwork/lnd/lntest/mock"
+    "github.com/lightningnetwork/lnd/lntest/wait"
     "github.com/lightningnetwork/lnd/lntypes"
     "github.com/lightningnetwork/lnd/lnwallet"
     "github.com/lightningnetwork/lnd/lnwire"
@@ -1045,10 +1046,19 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {

     // Post restart, it should be the case that our resolver was properly
     // supplemented, and we only have a single resolver in the final set.
-    if len(chanArb.activeResolvers) != 1 {
-        t.Fatalf("expected single resolver, instead got: %v",
-            len(chanArb.activeResolvers))
-    }
+    // The resolvers are added concurrently so we need to wait here.
+    err = wait.NoError(func() error {
+        chanArb.activeResolversLock.Lock()
+        defer chanArb.activeResolversLock.Unlock()
+
+        if len(chanArb.activeResolvers) != 1 {
+            return fmt.Errorf("expected single resolver, instead "+
+                "got: %v", len(chanArb.activeResolvers))
+        }
+
+        return nil
+    }, defaultTimeout)
+    require.NoError(t, err)

     // We'll now examine the in-memory state of the active resolvers to
     // ensure they were populated properly.
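Because the resolvers are now launched from the attendant goroutine rather than synchronously in Start, the test above polls with lntest/wait.NoError (bounded by defaultTimeout) instead of asserting immediately. A rough, self-contained sketch of that polling idea; waitNoError here is a simplified stand-in, not the real lntest/wait implementation:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waitNoError is a simplified stand-in for lntest/wait.NoError: it retries
// the predicate until it returns nil or the timeout expires.
func waitNoError(pred func() error, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)

	var lastErr error
	for time.Now().Before(deadline) {
		if lastErr = pred(); lastErr == nil {
			return nil
		}
		time.Sleep(10 * time.Millisecond)
	}

	return fmt.Errorf("predicate not satisfied after %v: %w",
		timeout, lastErr)
}

func main() {
	// Simulate state that is only populated by a concurrently running
	// goroutine, like the resolvers launched by the channel attendant.
	var resolvers atomic.Int32
	go func() {
		time.Sleep(50 * time.Millisecond)
		resolvers.Store(1)
	}()

	err := waitNoError(func() error {
		if n := resolvers.Load(); n != 1 {
			return fmt.Errorf("expected single resolver, got: %d", n)
		}
		return nil
	}, time.Second)

	fmt.Println("wait result:", err)
}
```

Holding activeResolversLock inside the predicate, as the test does, is what makes the repeated read safe against the attendant adding resolvers concurrently.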
@@ -3000,9 +3010,12 @@ func TestChannelArbitratorStartForceCloseFail(t *testing.T) {
         {
             name: "Commitment is rejected with an " +
                 "unmatched error",
-            broadcastErr: fmt.Errorf("Reject Commitment Tx"),
-            expectedState: StateBroadcastCommit,
-            expectedStartup: false,
+            broadcastErr: fmt.Errorf("Reject Commitment Tx"),
+            expectedState: StateBroadcastCommit,
+            // We should still be able to start up since other
+            // channels might be closing as well and we should
+            // resolve the contracts.
+            expectedStartup: true,
         },

         // We started after the DLP was triggered, and try to force