contractcourt/channel_arbitrator: only act on close signal from chain_watcher

This commit changes the channel arbitrator state machine to act only
on commitment transactions that the chain_watcher reports as confirmed
on-chain. This handles the case where we broadcast our commitment,
expecting it to confirm, but a competing transaction is confirmed
instead.

This commit readies the ChannelArbitrator state machine for the change
that will make the ChainWatcher only notify on confirmed commitments.

The state machine gains a new state, StateCommitmentBroadcasted, which
we transition to after we have broadcast our own commitment. From this
state we go to StateContractClosed regardless of which commitment the
ChainWatcher notifies about, unifying contract resolution between the
local and remote force close.
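
To make the new flow easier to follow, below is a minimal,
self-contained Go sketch of the transition logic this commit
introduces. The state and trigger names mirror the diff, but step is an
illustration only, not the real ChannelArbitrator.stateStep, which also
broadcasts the commitment, persists its state, and hands off contract
resolutions.

package main

import "fmt"

// Simplified, hypothetical mirrors of the arbitrator's states and
// triggers; only the transitions touched by this commit are modeled.
type state int

const (
	stateDefault state = iota
	stateBroadcastCommit
	stateCommitmentBroadcasted
	stateContractClosed
)

type trigger int

const (
	chainTrigger trigger = iota
	userTrigger
	remoteCloseTrigger
	localCloseTrigger
)

// step sketches the new flow: a user trigger starts the force close,
// and once a commitment is broadcast, either close trigger (local or
// remote) moves us to stateContractClosed.
func step(s state, t trigger) state {
	switch s {
	case stateDefault:
		switch t {
		case userTrigger:
			return stateBroadcastCommit
		case localCloseTrigger, remoteCloseTrigger:
			// The real code logs an error for an unexpected
			// local close here, then falls through.
			return stateContractClosed
		}
	case stateCommitmentBroadcasted:
		switch t {
		case localCloseTrigger, remoteCloseTrigger:
			return stateContractClosed
		}
	}

	// chainTrigger and userTrigger are no-ops while we wait for a
	// commitment to confirm.
	return s
}

func main() {
	// A user-requested force close leaves StateDefault.
	fmt.Println(step(stateDefault, userTrigger) == stateBroadcastCommit)

	// While waiting in StateCommitmentBroadcasted, any confirmed
	// commitment closes the contract, ours or the remote party's.
	fmt.Println(step(stateCommitmentBroadcasted, localCloseTrigger) ==
		stateContractClosed)
	fmt.Println(step(stateCommitmentBroadcasted, remoteCloseTrigger) ==
		stateContractClosed)
}
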
Johan T. Halseth
2018-04-03 13:34:07 +02:00
parent a60e621b5f
commit d2d87758f7


@@ -319,14 +319,18 @@ const (
 	// being attached.
 	chainTrigger transitionTrigger = iota
 
-	// remotePeerTrigger is a transition trigger driven by actions of the
-	// remote peer.
-	remotePeerTrigger
-
 	// userTrigger is a transition trigger driven by user action. Examples
-	// of such a trigger include a user requesting a forgive closure of the
+	// of such a trigger include a user requesting a force closure of the
 	// channel.
 	userTrigger
+
+	// remoteCloseTrigger is a transition trigger driven by the remote
+	// peer's commitment being confirmed.
+	remoteCloseTrigger
+
+	// localCloseTrigger is a transition trigger driven by our commitment
+	// being confirmed.
+	localCloseTrigger
 )
 
 // String returns a human readable string describing the passed
@@ -336,12 +340,15 @@ func (t transitionTrigger) String() string {
 	case chainTrigger:
 		return "chainTrigger"
 
-	case remotePeerTrigger:
-		return "remotePeerTrigger"
+	case remoteCloseTrigger:
+		return "remoteCloseTrigger"
 
 	case userTrigger:
 		return "userTrigger"
 
+	case localCloseTrigger:
+		return "localCloseTrigger"
+
 	default:
 		return "unknown trigger"
 	}
@@ -407,10 +414,16 @@ func (c *ChannelArbitrator) stateStep(triggerHeight uint32,
 		case userTrigger:
 			nextState = StateBroadcastCommit
 
-		// Otherwise, if this state advance was triggered by the remote
-		// peer, then we'll jump straight to the state where the
-		// contract has already been closed.
-		case remotePeerTrigger:
+		// Otherwise, if this state advance was triggered by a
+		// commitment being confirmed on chain, then we'll jump
+		// straight to the state where the contract has already been
+		// closed.
+		case localCloseTrigger:
+			log.Errorf("ChannelArbitrator(%v): unexpected local "+
+				"commitment confirmed while in StateDefault",
+				c.cfg.ChanPoint)
+			fallthrough
+		case remoteCloseTrigger:
 			nextState = StateContractClosed
 		}
@@ -454,26 +467,6 @@ func (c *ChannelArbitrator) stateStep(triggerHeight uint32,
 			}
 		}
 
-		// As we've have broadcast the commitment transaction, we send
-		// out commitment output for incubation, but only if it wasn't
-		// trimmed. We'll need to wait for a CSV timeout before we can
-		// reclaim the funds.
-		if closeSummary.CommitResolution != nil {
-			log.Infof("ChannelArbitrator(%v): sending commit "+
-				"output for incubation", c.cfg.ChanPoint)
-
-			err = c.cfg.IncubateOutputs(
-				c.cfg.ChanPoint, closeSummary.CommitResolution,
-				nil, nil,
-			)
-			if err != nil {
-				// TODO(roasbeef): check for AlreadyExists errors
-				log.Errorf("unable to incubate commitment "+
-					"output: %v", err)
-				return StateError, closeTx, err
-			}
-		}
-
 		contractRes := ContractResolutions{
 			CommitHash:       closeTx.TxHash(),
 			CommitResolution: closeSummary.CommitResolution,
@@ -491,23 +484,32 @@ func (c *ChannelArbitrator) stateStep(triggerHeight uint32,
 			return StateError, closeTx, err
 		}
 
-		// With the channel force closed, we'll now log our
-		// resolutions, then advance our state forward.
-		log.Infof("ChannelArbitrator(%v): logging contract "+
-			"resolutions: commit=%v, num_htlcs=%v",
-			c.cfg.ChanPoint,
-			closeSummary.CommitResolution != nil,
-			len(closeSummary.HtlcResolutions.IncomingHTLCs)+
-			len(closeSummary.HtlcResolutions.OutgoingHTLCs))
-
-		err = c.log.LogContractResolutions(&contractRes)
-		if err != nil {
-			log.Errorf("unable to write resolutions: %v", err)
-			return StateError, closeTx, err
-		}
-
-		nextState = StateContractClosed
+		// We go to the StateCommitmentBroadcasted state, where we'll
+		// be waiting for the commitment to be confirmed.
+		nextState = StateCommitmentBroadcasted
+
+	// In this state we have broadcasted our own commitment, and will need
+	// to wait for a commitment (not necessarily the one we broadcasted!)
+	// to be confirmed.
+	case StateCommitmentBroadcasted:
+		switch trigger {
+		// We are waiting for a commitment to be confirmed, so any
+		// other trigger will be ignored.
+		case chainTrigger, userTrigger:
+			log.Infof("ChannelArbitrator(%v): noop state %v",
+				c.cfg.ChanPoint, trigger)
+			nextState = StateCommitmentBroadcasted
+
+		// If this state advance was triggered by any of the
+		// commitments being confirmed, then we'll jump to the state
+		// where the contract has been closed.
+		case localCloseTrigger, remoteCloseTrigger:
+			log.Infof("ChannelArbitrator(%v): state %v, "+
+				" going to StateContractClosed",
+				c.cfg.ChanPoint, trigger)
+			nextState = StateContractClosed
+		}
 
 	// If we're in this state, then the contract has been fully closed to
 	// outside sub-systems, so we'll process the prior set of on-chain
 	// contract actions and launch a set of resolvers.
@@ -537,6 +539,27 @@ func (c *ChannelArbitrator) stateStep(triggerHeight uint32,
 			break
 		}
 
+		// If we've have broadcast the commitment transaction, we send
+		// our commitment output for incubation, but only if it wasn't
+		// trimmed. We'll need to wait for a CSV timeout before we can
+		// reclaim the funds.
+		commitRes := contractResolutions.CommitResolution
+		if commitRes != nil && commitRes.MaturityDelay > 0 {
+			log.Infof("ChannelArbitrator(%v): sending commit "+
+				"output for incubation", c.cfg.ChanPoint)
+
+			err = c.cfg.IncubateOutputs(
+				c.cfg.ChanPoint, commitRes,
+				nil, nil,
+			)
+			if err != nil {
+				// TODO(roasbeef): check for AlreadyExists errors
+				log.Errorf("unable to incubate commitment "+
+					"output: %v", err)
+				return StateError, closeTx, err
+			}
+		}
+
 		// Now that we know we'll need to act, we'll process the htlc
 		// actions, wen create the structures we need to resolve all
 		// outstanding contracts.
@@ -629,13 +652,13 @@ func (c *ChannelArbitrator) advanceState(triggerHeight uint32,
 		forceCloseTx *wire.MsgTx
 	)
 
-	log.Tracef("ChannelArbitrator(%v): attempting state step with "+
-		"trigger=%v", c.cfg.ChanPoint, trigger)
-
 	// We'll continue to advance our state forward until the state we
 	// transition to is that same state that we started at.
 	for {
 		priorState = c.state
+		log.Tracef("ChannelArbitrator(%v): attempting state step with "+
+			"trigger=%v from state=%v", c.cfg.ChanPoint, trigger,
+			priorState)
 
 		nextState, closeTx, err := c.stateStep(
 			triggerHeight, trigger,
@@ -1348,20 +1371,67 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
 				}),
 			)
 
 		// We've cooperatively closed the channel, so we're no longer
 		// needed.
 		case <-c.cfg.ChainEvents.CooperativeClosure:
 			log.Infof("ChannelArbitrator(%v) closing due to co-op "+
 				"closure", c.cfg.ChanPoint)
 			return
 
+		// We have broadcasted our commitment, and it is now confirmed
+		// on-chain.
+		case closeInfo := <-c.cfg.ChainEvents.LocalUnilateralClosure:
+			log.Infof("ChannelArbitrator(%v): local on-chain "+
+				"channel close", c.cfg.ChanPoint)
+
+			if c.state != StateCommitmentBroadcasted {
+				log.Errorf("ChannelArbitrator(%v): unexpected "+
+					"local on-chain channel close",
+					c.cfg.ChanPoint)
+			}
+
+			closeTx := closeInfo.CloseTx
+
+			contractRes := &ContractResolutions{
+				CommitHash:       closeTx.TxHash(),
+				CommitResolution: closeInfo.CommitResolution,
+				HtlcResolutions:  *closeInfo.HtlcResolutions,
+			}
+
+			// When processing a unilateral close event, we'll
+			// transition directly to the ContractClosed state.
+			// When the state machine reaches that state, we'll log
+			// out the set of resolutions.
+			stateCb := func(nextState ArbitratorState) error {
+				if nextState != StateContractClosed {
+					return nil
+				}
+
+				err := c.log.LogContractResolutions(
+					contractRes,
+				)
+				if err != nil {
+					return fmt.Errorf("unable to "+
+						"write resolutions: %v",
+						err)
+				}
+
+				return nil
+			}
+
+			// We'll now advance our state machine until it reaches
+			// a terminal state.
+			_, _, err := c.advanceState(
+				uint32(closeInfo.SpendingHeight),
+				localCloseTrigger, stateCb,
+			)
+			if err != nil {
+				log.Errorf("unable to advance state: %v", err)
+			}
+
 		// The remote party has broadcast the commitment on-chain.
 		// We'll examine our state to determine if we need to act at
 		// all.
 		case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure:
-			if c.state != StateDefault {
-				continue
-			}
-
 			log.Infof("ChannelArbitrator(%v): remote party has "+
 				"closed channel out on-chain", c.cfg.ChanPoint)
@@ -1384,19 +1454,21 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
 			// present on their commitment.
 			c.activeHTLCs = newHtlcSet(uniClosure.RemoteCommit.Htlcs)
 
-			// When processing a remote party initiated event,
-			// we'll skip the BroadcastCommit state, and transition
-			// directly to the ContractClosed state. As a result,
-			// we'll now manually log out set of resolutions.
+			// When processing a unilateral close event, we'll
+			// transition directly to the ContractClosed state.
+			// When the state machine reaches that state, we'll log
+			// out the set of resolutions.
 			stateCb := func(nextState ArbitratorState) error {
-				if nextState == StateContractClosed {
-					err := c.log.LogContractResolutions(
-						contractRes,
-					)
-					if err != nil {
-						return fmt.Errorf("unable to write "+
-							"resolutions: %v", err)
-					}
+				if nextState != StateContractClosed {
+					return nil
+				}
+
+				err := c.log.LogContractResolutions(
+					contractRes,
+				)
+				if err != nil {
+					return fmt.Errorf("unable to write "+
+						"resolutions: %v", err)
 				}
 
 				return nil
@@ -1406,7 +1478,7 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
 			// a terminal state.
 			_, _, err := c.advanceState(
 				uint32(uniClosure.SpendingHeight),
-				remotePeerTrigger, stateCb,
+				remoteCloseTrigger, stateCb,
 			)
 			if err != nil {
 				log.Errorf("unable to advance state: %v", err)