channeldb: update reject cache

To take block heights too.
This commit is contained in:
Elle Mouton 2023-11-06 12:36:24 +02:00
parent 84abc0a311
commit b6410bf65d
No known key found for this signature in database
GPG Key ID: D7D916376026F177
5 changed files with 136 additions and 54 deletions

@ -1162,10 +1162,10 @@ func (c *ChannelGraph) HasChannelEdge(
// We'll query the cache with the shared lock held to allow multiple
// readers to access values in the cache concurrently if they exist.
c.cacheMu.RLock()
if entry, ok := c.rejectCache.get(chanID); ok {
if entry, ok := c.rejectCache.get(chanID); ok && entry.times != nil {
c.cacheMu.RUnlock()
upd1Time = time.Unix(entry.upd1Time, 0)
upd2Time = time.Unix(entry.upd2Time, 0)
upd1Time = time.Unix(entry.times.upd1Time, 0)
upd2Time = time.Unix(entry.times.upd2Time, 0)
exists, isZombie = entry.flags.unpack()
return upd1Time, upd2Time, exists, isZombie, nil
}
@ -1177,9 +1177,9 @@ func (c *ChannelGraph) HasChannelEdge(
// The item was not found with the shared lock, so we'll acquire the
// exclusive lock and check the cache again in case another method added
// the entry to the cache while no lock was held.
if entry, ok := c.rejectCache.get(chanID); ok {
upd1Time = time.Unix(entry.upd1Time, 0)
upd2Time = time.Unix(entry.upd2Time, 0)
if entry, ok := c.rejectCache.get(chanID); ok && entry.times != nil {
upd1Time = time.Unix(entry.times.upd1Time, 0)
upd2Time = time.Unix(entry.times.upd2Time, 0)
exists, isZombie = entry.flags.unpack()
return upd1Time, upd2Time, exists, isZombie, nil
}
@ -1244,9 +1244,11 @@ func (c *ChannelGraph) HasChannelEdge(
}
c.rejectCache.insert(chanID, rejectCacheEntry{
upd1Time: upd1Time.Unix(),
upd2Time: upd2Time.Unix(),
flags: packRejectFlags(exists, isZombie),
times: &updateTimes{
upd1Time: upd1Time.Unix(),
upd2Time: upd2Time.Unix(),
},
flags: packRejectFlags(exists, isZombie),
})
return upd1Time, upd2Time, exists, isZombie, nil
@ -2817,33 +2819,39 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy1,
return c.chanScheduler.Execute(r)
}
func (c *ChannelGraph) updateEdgeCache(e *models.ChannelEdgePolicy1,
func (c *ChannelGraph) updateEdgeCache(e models.ChannelEdgePolicy,
isUpdate1 bool) {
// If an entry for this channel is found in reject cache, we'll modify
// the entry with the updated timestamp for the direction that was just
// written. If the edge doesn't exist, we'll load the cache entry lazily
// during the next query for this edge.
if entry, ok := c.rejectCache.get(e.ChannelID); ok {
if isUpdate1 {
entry.upd1Time = e.LastUpdate.Unix()
} else {
entry.upd2Time = e.LastUpdate.Unix()
}
c.rejectCache.insert(e.ChannelID, entry)
chanID := e.SCID().ToUint64()
if entry, ok := c.rejectCache.get(chanID); ok {
entry.update(isUpdate1, e)
c.rejectCache.insert(chanID, entry)
}
// If an entry for this channel is found in channel cache, we'll modify
// the entry with the updated policy for the direction that was just
// written. If the edge doesn't exist, we'll defer loading the info and
// policies and lazily read from disk during the next query.
if channel, ok := c.chanCache.get(e.ChannelID); ok {
if isUpdate1 {
channel.Policy1 = e
} else {
channel.Policy2 = e
if channel, ok := c.chanCache.get(chanID); ok {
edge, ok := e.(*models.ChannelEdgePolicy1)
if !ok {
log.Errorf("expected *models.ChannelEdgePolicy1, "+
"got: %T", e)
return
}
c.chanCache.insert(e.ChannelID, channel)
if isUpdate1 {
channel.Policy1 = edge
} else {
channel.Policy2 = edge
}
c.chanCache.insert(chanID, channel)
}
}

@ -3622,7 +3622,17 @@ func compareNodes(a, b *LightningNode) error {
// compareEdgePolicies is used to compare two ChannelEdgePolices using
// compareNodes, so as to exclude comparisons of the Nodes' Features struct.
func compareEdgePolicies(a, b *models.ChannelEdgePolicy1) error {
func compareEdgePolicies(edgeA, edgeB models.ChannelEdgePolicy) error {
a, ok := edgeA.(*models.ChannelEdgePolicy1)
if !ok {
return fmt.Errorf("wanted edge policy 1")
}
b, ok := edgeB.(*models.ChannelEdgePolicy1)
if !ok {
return fmt.Errorf("wanted edge policy 1")
}
if a.ChannelID != b.ChannelID {
return fmt.Errorf("ChannelID doesn't match: expected %v, "+
"got %v", a.ChannelID, b.ChannelID)

@ -1,5 +1,7 @@
package channeldb
import "github.com/lightningnetwork/lnd/channeldb/models"
// rejectFlags is a compact representation of various metadata stored by the
// reject cache about a particular channel.
type rejectFlags uint8
@ -41,9 +43,47 @@ func (f rejectFlags) unpack() (bool, bool) {
// including the timestamps of its latest edge policies and whether or not the
// channel exists in the graph.
type rejectCacheEntry struct {
times *updateTimes
blocks *updateBlocks
flags rejectFlags
}
type updateTimes struct {
upd1Time int64
upd2Time int64
flags rejectFlags
}
type updateBlocks struct {
updBlock1 uint32
updBlock2 uint32
}
func (e *rejectCacheEntry) update(isUpdate1 bool,
policy models.ChannelEdgePolicy) {
switch pol := policy.(type) {
case *models.ChannelEdgePolicy1:
if e.times == nil {
e.times = &updateTimes{}
}
if isUpdate1 {
e.times.upd1Time = pol.LastUpdate.Unix()
} else {
e.times.upd2Time = pol.LastUpdate.Unix()
}
case *models.ChannelEdgePolicy2:
if e.blocks == nil {
e.blocks = &updateBlocks{}
}
if isUpdate1 {
e.blocks.updBlock1 = pol.BlockHeight.Val
} else {
e.blocks.updBlock2 = pol.BlockHeight.Val
}
}
}
// rejectCache is an in-memory cache used to improve the performance of

@ -100,8 +100,10 @@ func entryForInt(i uint64) rejectCacheEntry {
exists := i%2 == 0
isZombie := i%3 == 0
return rejectCacheEntry{
upd1Time: int64(2 * i),
upd2Time: int64(2*i + 1),
flags: packRejectFlags(exists, isZombie),
times: &updateTimes{
upd1Time: int64(2 * i),
upd2Time: int64(2*i + 1),
},
flags: packRejectFlags(exists, isZombie),
}
}

@ -474,30 +474,6 @@ func (b *Builder) syncGraphWithChain() error {
return nil
}
// isZombieChannel takes two edge policy updates and determines if the
// corresponding channel should be considered a zombie. The first boolean is
// true if the policy update from node 1 is considered a zombie, the second
// boolean is that of node 2, and the final boolean is true if the channel
// is considered a zombie.
func (b *Builder) isZombieChannel(e1,
e2 *models.ChannelEdgePolicy1) (bool, bool, bool) {
chanExpiry := b.cfg.ChannelPruneExpiry
e1Zombie := e1 == nil || time.Since(e1.LastUpdate) >= chanExpiry
e2Zombie := e2 == nil || time.Since(e2.LastUpdate) >= chanExpiry
var e1Time, e2Time time.Time
if e1 != nil {
e1Time = e1.LastUpdate
}
if e2 != nil {
e2Time = e2.LastUpdate
}
return e1Zombie, e2Zombie, b.IsZombieChannel(e1Time, e2Time)
}
// IsZombieChannel takes the timestamps of the latest channel updates for a
// channel and returns true if the channel should be considered a zombie based
// on these timestamps.
@ -512,6 +488,10 @@ func (b *Builder) IsZombieChannel(updateTime1,
e2Zombie := updateTime2.IsZero() ||
time.Since(updateTime2) >= chanExpiry
return b.isZombieChannel(e1Zombie, e2Zombie)
}
func (b *Builder) isZombieChannel(e1Zombie, e2Zombie bool) bool {
// If we're using strict zombie pruning, then a channel is only
// considered live if both edges have a recent update we know of.
if b.cfg.StrictZombiePruning {
@ -562,7 +542,15 @@ func (b *Builder) pruneZombieChans() error {
return nil
}
e1Zombie, e2Zombie, isZombieChan := b.isZombieChannel(e1, e2)
e1Zombie, err := b.isZombieEdge(e1)
if err != nil {
return err
}
e2Zombie, err := b.isZombieEdge(e2)
if err != nil {
return err
}
if e1Zombie {
log.Tracef("Node1 pubkey=%x of chan_id=%v is zombie",
@ -577,7 +565,7 @@ func (b *Builder) pruneZombieChans() error {
// If either edge hasn't been updated for a period of
// chanExpiry, then we'll mark the channel itself as eligible
// for graph pruning.
if !isZombieChan {
if !b.isZombieChannel(e1Zombie, e2Zombie) {
return nil
}
@ -662,6 +650,40 @@ func (b *Builder) pruneZombieChans() error {
return nil
}
func (b *Builder) isZombieEdge(edge models.ChannelEdgePolicy) (bool,
error) {
if edge == nil {
return true, nil
}
switch e := edge.(type) {
case *models.ChannelEdgePolicy1:
chanExpiry := b.cfg.ChannelPruneExpiry
if e == nil {
return true, nil
}
return time.Since(e.LastUpdate) >= chanExpiry, nil
case *models.ChannelEdgePolicy2:
chanExpiryBlocks := uint32(b.cfg.ChannelPruneExpiry.Hours() * 6)
if e == nil {
return true, nil
}
blockSince := b.SyncedHeight() - e.BlockHeight.Val
return blockSince >= chanExpiryBlocks, nil
default:
return false, fmt.Errorf("unhandled implementation of "+
"models.ChannelEdgePolicy: %T", edge)
}
}
// handleNetworkUpdate is responsible for processing the update message and
// notifies topology changes, if any.
//