diff --git a/htlcswitch/link.go b/htlcswitch/link.go index d1f27fa12..194415f4c 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1272,80 +1272,12 @@ func (l *channelLink) htlcManager(ctx context.Context) { // TODO(roasbeef): need to call wipe chan whenever D/C? - // If this isn't the first time that this channel link has been - // created, then we'll need to check to see if we need to - // re-synchronize state with the remote peer. settledHtlcs is a map of - // HTLC's that we re-settled as part of the channel state sync. - if l.cfg.SyncStates { - err := l.syncChanStates(ctx) - if err != nil { - l.handleChanSyncErr(err) - return - } - } - - // If a shutdown message has previously been sent on this link, then we - // need to make sure that we have disabled any HTLC adds on the outgoing - // direction of the link and that we re-resend the same shutdown message - // that we previously sent. - l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) { - // Immediately disallow any new outgoing HTLCs. - if !l.DisableAdds(Outgoing) { - l.log.Warnf("Outgoing link adds already disabled") - } - - // Re-send the shutdown message the peer. Since syncChanStates - // would have sent any outstanding CommitSig, it is fine for us - // to immediately queue the shutdown message now. - err := l.cfg.Peer.SendMessage(false, &shutdown) - if err != nil { - l.log.Warnf("Error sending shutdown message: %v", err) - } - }) - - // We've successfully reestablished the channel, mark it as such to - // allow the switch to forward HTLCs in the outbound direction. - l.markReestablished() - - // With the channel states synced, we now reset the mailbox to ensure - // we start processing all unacked packets in order. This is done here - // to ensure that all acknowledgments that occur during channel - // resynchronization have taken affect, causing us only to pull unacked - // packets after starting to read from the downstream mailbox. - l.mailBox.ResetPackets() - - // After cleaning up any memory pertaining to incoming packets, we now - // replay our forwarding packages to handle any htlcs that can be - // processed locally, or need to be forwarded out to the switch. We will - // only attempt to resolve packages if our short chan id indicates that - // the channel is not pending, otherwise we should have no htlcs to - // reforward. - if l.ShortChanID() != hop.Source { - err := l.resolveFwdPkgs(ctx) - switch err { - // No error was encountered, success. - case nil: - - // If the duplicate keystone error was encountered, we'll fail - // without sending an Error message to the peer. - case ErrDuplicateKeystone: - l.failf(LinkFailureError{code: ErrCircuitError}, - "temporary circuit error: %v", err) - return - - // A non-nil error was encountered, send an Error message to - // the peer. - default: - l.failf(LinkFailureError{code: ErrInternalError}, - "unable to resolve fwd pkgs: %v", err) - return - } - - // With our link's in-memory state fully reconstructed, spawn a - // goroutine to manage the reclamation of disk space occupied by - // completed forwarding packages. - l.cg.WgAdd(1) - go l.fwdPkgGarbager() + // If the link is not started for the first time, we need to take extra + // steps to resume its state. + err := l.resumeLink(ctx) + if err != nil { + l.log.Errorf("resuming link failed: %v", err) + return } // Now that we've received both channel_ready and channel reestablish, @@ -4626,3 +4558,92 @@ func (l *channelLink) toggleBatchTicker() { l.log.Trace("BatchTicker paused due to zero NumPendingUpdates" + "(Local, Remote)") } + +// resumeLink is called when starting a previous link. It will go through the +// reestablishment protocol and reforwarding packets that are yet resolved. +func (l *channelLink) resumeLink(ctx context.Context) error { + // If this isn't the first time that this channel link has been created, + // then we'll need to check to see if we need to re-synchronize state + // with the remote peer. settledHtlcs is a map of HTLC's that we + // re-settled as part of the channel state sync. + if l.cfg.SyncStates { + err := l.syncChanStates(ctx) + if err != nil { + l.handleChanSyncErr(err) + + return err + } + } + + // If a shutdown message has previously been sent on this link, then we + // need to make sure that we have disabled any HTLC adds on the outgoing + // direction of the link and that we re-resend the same shutdown message + // that we previously sent. + // + // TODO(yy): we should either move this to chanCloser, or move all + // shutdown handling logic to be managed by the link, but not a mixed of + // partial management by two subsystems. + l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) { + // Immediately disallow any new outgoing HTLCs. + if !l.DisableAdds(Outgoing) { + l.log.Warnf("Outgoing link adds already disabled") + } + + // Re-send the shutdown message the peer. Since syncChanStates + // would have sent any outstanding CommitSig, it is fine for us + // to immediately queue the shutdown message now. + err := l.cfg.Peer.SendMessage(false, &shutdown) + if err != nil { + l.log.Warnf("Error sending shutdown message: %v", err) + } + }) + + // We've successfully reestablished the channel, mark it as such to + // allow the switch to forward HTLCs in the outbound direction. + l.markReestablished() + + // With the channel states synced, we now reset the mailbox to ensure we + // start processing all unacked packets in order. This is done here to + // ensure that all acknowledgments that occur during channel + // resynchronization have taken affect, causing us only to pull unacked + // packets after starting to read from the downstream mailbox. + l.mailBox.ResetPackets() + + // If the channel is pending, there's no need to reforwarding packets. + if l.ShortChanID() == hop.Source { + return nil + } + + // After cleaning up any memory pertaining to incoming packets, we now + // replay our forwarding packages to handle any htlcs that can be + // processed locally, or need to be forwarded out to the switch. We will + // only attempt to resolve packages if our short chan id indicates that + // the channel is not pending, otherwise we should have no htlcs to + // reforward. + err := l.resolveFwdPkgs(ctx) + switch err { + // No error was encountered, success. + case nil: + // With our link's in-memory state fully reconstructed, spawn a + // goroutine to manage the reclamation of disk space occupied by + // completed forwarding packages. + l.cg.WgAdd(1) + go l.fwdPkgGarbager() + + return nil + + // If the duplicate keystone error was encountered, we'll fail + // without sending an Error message to the peer. + case ErrDuplicateKeystone: + l.failf(LinkFailureError{code: ErrCircuitError}, + "temporary circuit error: %v", err) + + // A non-nil error was encountered, send an Error message to + // the peer. + default: + l.failf(LinkFailureError{code: ErrInternalError}, + "unable to resolve fwd pkgs: %v", err) + } + + return err +}