diff --git a/channeldb/forwarding_package.go b/channeldb/forwarding_package.go index 11c0099e7..c393a53b3 100644 --- a/channeldb/forwarding_package.go +++ b/channeldb/forwarding_package.go @@ -248,6 +248,11 @@ type FwdPkg struct { // FwdFilter is a filter containing the indices of all Adds that were // forwarded to the switch. + // + // NOTE: This value signals when persisted to disk that the fwd package + // has been processed and garbage collection can happen. So it also + // has to be set for packages with no adds (empty packages or only + // settle/fail packages) so that they can be garbage collected as well. FwdFilter *PkgFilter // AckFilter is a filter containing the indices of all Adds for which @@ -678,24 +683,31 @@ func loadFwdPkg(fwdPkgBkt kvdb.RBucket, source lnwire.ShortChannelID, SettleFailFilter: settleFailFilter, } - // Check to see if we have written the set exported filter adds to - // disk. If we haven't, processing of this package was never started, or - // failed during the last attempt. + // Check if the forward filter has been persisted to disk. + // This indicates whether the Adds in this package have been processed. + // + // NOTE: We also expect packages with no Adds (settle/fail only packages + // or empty packages) to have the fwd filter set to signal that the + // packages have been processed. fwdFilterBytes := heightBkt.Get(fwdFilterKey) + + // Handle packages with Adds that haven't been processed yet. if fwdFilterBytes == nil { + // Create a new forward filter for the unprocessed Adds. nAdds := uint16(len(adds)) fwdPkg.FwdFilter = NewPkgFilter(nAdds) + return fwdPkg, nil } + // Load the existing forward filter from disk. fwdFilterReader := bytes.NewReader(fwdFilterBytes) fwdPkg.FwdFilter = &PkgFilter{} if err := fwdPkg.FwdFilter.Decode(fwdFilterReader); err != nil { return nil, err } - // Otherwise, a complete round of processing was completed, and we - // advance the package to FwdStateProcessed. + // Mark the package as processed since the forward filter exists. fwdPkg.State = FwdStateProcessed // If every add, settle, and fail has been fully acknowledged, we can diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 3aa47caf3..33551fa3a 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -3747,15 +3747,12 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) { // whether we are reprocessing as a result of a failure or restart. Adds that // have already been acknowledged in the forwarding package will be ignored. // +// NOTE: This function needs also be called for fwd packages with no ADDs +// because it marks the fwdPkg as processed by writing the FwdFilter into the +// database. +// //nolint:funlen func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { - // Exit early if there are no adds to process. - if len(fwdPkg.Adds) == 0 { - l.log.Trace("fwd package has no adds to process exiting early") - - return - } - // Exit early if the fwdPkg is already processed. if fwdPkg.State == channeldb.FwdStateCompleted { l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)