htlcswitch: add handler resumeLink

This commit is contained in:
yyforyongyu
2025-06-30 21:07:32 +08:00
parent ca770bb4af
commit 3af8281370

View File

@@ -1272,80 +1272,12 @@ func (l *channelLink) htlcManager(ctx context.Context) {
// TODO(roasbeef): need to call wipe chan whenever D/C?
// If this isn't the first time that this channel link has been
// created, then we'll need to check to see if we need to
// re-synchronize state with the remote peer. settledHtlcs is a map of
// HTLC's that we re-settled as part of the channel state sync.
if l.cfg.SyncStates {
err := l.syncChanStates(ctx)
if err != nil {
l.handleChanSyncErr(err)
return
}
}
// If a shutdown message has previously been sent on this link, then we
// need to make sure that we have disabled any HTLC adds on the outgoing
// direction of the link and that we re-resend the same shutdown message
// that we previously sent.
l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
// Immediately disallow any new outgoing HTLCs.
if !l.DisableAdds(Outgoing) {
l.log.Warnf("Outgoing link adds already disabled")
}
// Re-send the shutdown message the peer. Since syncChanStates
// would have sent any outstanding CommitSig, it is fine for us
// to immediately queue the shutdown message now.
err := l.cfg.Peer.SendMessage(false, &shutdown)
if err != nil {
l.log.Warnf("Error sending shutdown message: %v", err)
}
})
// We've successfully reestablished the channel, mark it as such to
// allow the switch to forward HTLCs in the outbound direction.
l.markReestablished()
// With the channel states synced, we now reset the mailbox to ensure
// we start processing all unacked packets in order. This is done here
// to ensure that all acknowledgments that occur during channel
// resynchronization have taken affect, causing us only to pull unacked
// packets after starting to read from the downstream mailbox.
l.mailBox.ResetPackets()
// After cleaning up any memory pertaining to incoming packets, we now
// replay our forwarding packages to handle any htlcs that can be
// processed locally, or need to be forwarded out to the switch. We will
// only attempt to resolve packages if our short chan id indicates that
// the channel is not pending, otherwise we should have no htlcs to
// reforward.
if l.ShortChanID() != hop.Source {
err := l.resolveFwdPkgs(ctx)
switch err {
// No error was encountered, success.
case nil:
// If the duplicate keystone error was encountered, we'll fail
// without sending an Error message to the peer.
case ErrDuplicateKeystone:
l.failf(LinkFailureError{code: ErrCircuitError},
"temporary circuit error: %v", err)
return
// A non-nil error was encountered, send an Error message to
// the peer.
default:
l.failf(LinkFailureError{code: ErrInternalError},
"unable to resolve fwd pkgs: %v", err)
return
}
// With our link's in-memory state fully reconstructed, spawn a
// goroutine to manage the reclamation of disk space occupied by
// completed forwarding packages.
l.cg.WgAdd(1)
go l.fwdPkgGarbager()
// If the link is not started for the first time, we need to take extra
// steps to resume its state.
err := l.resumeLink(ctx)
if err != nil {
l.log.Errorf("resuming link failed: %v", err)
return
}
// Now that we've received both channel_ready and channel reestablish,
@@ -4626,3 +4558,92 @@ func (l *channelLink) toggleBatchTicker() {
l.log.Trace("BatchTicker paused due to zero NumPendingUpdates" +
"(Local, Remote)")
}
// resumeLink is called when starting a previous link. It will go through the
// reestablishment protocol and reforwarding packets that are yet resolved.
func (l *channelLink) resumeLink(ctx context.Context) error {
// If this isn't the first time that this channel link has been created,
// then we'll need to check to see if we need to re-synchronize state
// with the remote peer. settledHtlcs is a map of HTLC's that we
// re-settled as part of the channel state sync.
if l.cfg.SyncStates {
err := l.syncChanStates(ctx)
if err != nil {
l.handleChanSyncErr(err)
return err
}
}
// If a shutdown message has previously been sent on this link, then we
// need to make sure that we have disabled any HTLC adds on the outgoing
// direction of the link and that we re-resend the same shutdown message
// that we previously sent.
//
// TODO(yy): we should either move this to chanCloser, or move all
// shutdown handling logic to be managed by the link, but not a mixed of
// partial management by two subsystems.
l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
// Immediately disallow any new outgoing HTLCs.
if !l.DisableAdds(Outgoing) {
l.log.Warnf("Outgoing link adds already disabled")
}
// Re-send the shutdown message the peer. Since syncChanStates
// would have sent any outstanding CommitSig, it is fine for us
// to immediately queue the shutdown message now.
err := l.cfg.Peer.SendMessage(false, &shutdown)
if err != nil {
l.log.Warnf("Error sending shutdown message: %v", err)
}
})
// We've successfully reestablished the channel, mark it as such to
// allow the switch to forward HTLCs in the outbound direction.
l.markReestablished()
// With the channel states synced, we now reset the mailbox to ensure we
// start processing all unacked packets in order. This is done here to
// ensure that all acknowledgments that occur during channel
// resynchronization have taken affect, causing us only to pull unacked
// packets after starting to read from the downstream mailbox.
l.mailBox.ResetPackets()
// If the channel is pending, there's no need to reforwarding packets.
if l.ShortChanID() == hop.Source {
return nil
}
// After cleaning up any memory pertaining to incoming packets, we now
// replay our forwarding packages to handle any htlcs that can be
// processed locally, or need to be forwarded out to the switch. We will
// only attempt to resolve packages if our short chan id indicates that
// the channel is not pending, otherwise we should have no htlcs to
// reforward.
err := l.resolveFwdPkgs(ctx)
switch err {
// No error was encountered, success.
case nil:
// With our link's in-memory state fully reconstructed, spawn a
// goroutine to manage the reclamation of disk space occupied by
// completed forwarding packages.
l.cg.WgAdd(1)
go l.fwdPkgGarbager()
return nil
// If the duplicate keystone error was encountered, we'll fail
// without sending an Error message to the peer.
case ErrDuplicateKeystone:
l.failf(LinkFailureError{code: ErrCircuitError},
"temporary circuit error: %v", err)
// A non-nil error was encountered, send an Error message to
// the peer.
default:
l.failf(LinkFailureError{code: ErrInternalError},
"unable to resolve fwd pkgs: %v", err)
}
return err
}