diff --git a/channeldb/graph.go b/channeldb/graph.go index 7306cbcdc..f19cebb19 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -1162,10 +1162,10 @@ func (c *ChannelGraph) HasChannelEdge( // We'll query the cache with the shared lock held to allow multiple // readers to access values in the cache concurrently if they exist. c.cacheMu.RLock() - if entry, ok := c.rejectCache.get(chanID); ok { + if entry, ok := c.rejectCache.get(chanID); ok && entry.times != nil { c.cacheMu.RUnlock() - upd1Time = time.Unix(entry.upd1Time, 0) - upd2Time = time.Unix(entry.upd2Time, 0) + upd1Time = time.Unix(entry.times.upd1Time, 0) + upd2Time = time.Unix(entry.times.upd2Time, 0) exists, isZombie = entry.flags.unpack() return upd1Time, upd2Time, exists, isZombie, nil } @@ -1177,9 +1177,9 @@ func (c *ChannelGraph) HasChannelEdge( // The item was not found with the shared lock, so we'll acquire the // exclusive lock and check the cache again in case another method added // the entry to the cache while no lock was held. - if entry, ok := c.rejectCache.get(chanID); ok { - upd1Time = time.Unix(entry.upd1Time, 0) - upd2Time = time.Unix(entry.upd2Time, 0) + if entry, ok := c.rejectCache.get(chanID); ok && entry.times != nil { + upd1Time = time.Unix(entry.times.upd1Time, 0) + upd2Time = time.Unix(entry.times.upd2Time, 0) exists, isZombie = entry.flags.unpack() return upd1Time, upd2Time, exists, isZombie, nil } @@ -1244,9 +1244,11 @@ func (c *ChannelGraph) HasChannelEdge( } c.rejectCache.insert(chanID, rejectCacheEntry{ - upd1Time: upd1Time.Unix(), - upd2Time: upd2Time.Unix(), - flags: packRejectFlags(exists, isZombie), + times: &updateTimes{ + upd1Time: upd1Time.Unix(), + upd2Time: upd2Time.Unix(), + }, + flags: packRejectFlags(exists, isZombie), }) return upd1Time, upd2Time, exists, isZombie, nil @@ -2817,33 +2819,39 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy1, return c.chanScheduler.Execute(r) } -func (c *ChannelGraph) updateEdgeCache(e *models.ChannelEdgePolicy1, +func (c *ChannelGraph) updateEdgeCache(e models.ChannelEdgePolicy, isUpdate1 bool) { // If an entry for this channel is found in reject cache, we'll modify // the entry with the updated timestamp for the direction that was just // written. If the edge doesn't exist, we'll load the cache entry lazily // during the next query for this edge. - if entry, ok := c.rejectCache.get(e.ChannelID); ok { - if isUpdate1 { - entry.upd1Time = e.LastUpdate.Unix() - } else { - entry.upd2Time = e.LastUpdate.Unix() - } - c.rejectCache.insert(e.ChannelID, entry) + chanID := e.SCID().ToUint64() + if entry, ok := c.rejectCache.get(chanID); ok { + entry.update(isUpdate1, e) + c.rejectCache.insert(chanID, entry) } // If an entry for this channel is found in channel cache, we'll modify // the entry with the updated policy for the direction that was just // written. If the edge doesn't exist, we'll defer loading the info and // policies and lazily read from disk during the next query. - if channel, ok := c.chanCache.get(e.ChannelID); ok { - if isUpdate1 { - channel.Policy1 = e - } else { - channel.Policy2 = e + if channel, ok := c.chanCache.get(chanID); ok { + edge, ok := e.(*models.ChannelEdgePolicy1) + if !ok { + log.Errorf("expected *models.ChannelEdgePolicy1, "+ + "got: %T", e) + + return } - c.chanCache.insert(e.ChannelID, channel) + + if isUpdate1 { + channel.Policy1 = edge + } else { + channel.Policy2 = edge + } + + c.chanCache.insert(chanID, channel) } } diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index ba4567a58..afdd8bced 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -3622,7 +3622,17 @@ func compareNodes(a, b *LightningNode) error { // compareEdgePolicies is used to compare two ChannelEdgePolices using // compareNodes, so as to exclude comparisons of the Nodes' Features struct. -func compareEdgePolicies(a, b *models.ChannelEdgePolicy1) error { +func compareEdgePolicies(edgeA, edgeB models.ChannelEdgePolicy) error { + a, ok := edgeA.(*models.ChannelEdgePolicy1) + if !ok { + return fmt.Errorf("wanted edge policy 1") + } + + b, ok := edgeB.(*models.ChannelEdgePolicy1) + if !ok { + return fmt.Errorf("wanted edge policy 1") + } + if a.ChannelID != b.ChannelID { return fmt.Errorf("ChannelID doesn't match: expected %v, "+ "got %v", a.ChannelID, b.ChannelID) diff --git a/channeldb/reject_cache.go b/channeldb/reject_cache.go index acadb8780..a76f80d7c 100644 --- a/channeldb/reject_cache.go +++ b/channeldb/reject_cache.go @@ -1,5 +1,7 @@ package channeldb +import "github.com/lightningnetwork/lnd/channeldb/models" + // rejectFlags is a compact representation of various metadata stored by the // reject cache about a particular channel. type rejectFlags uint8 @@ -41,9 +43,47 @@ func (f rejectFlags) unpack() (bool, bool) { // including the timestamps of its latest edge policies and whether or not the // channel exists in the graph. type rejectCacheEntry struct { + times *updateTimes + blocks *updateBlocks + flags rejectFlags +} + +type updateTimes struct { upd1Time int64 upd2Time int64 - flags rejectFlags +} + +type updateBlocks struct { + updBlock1 uint32 + updBlock2 uint32 +} + +func (e *rejectCacheEntry) update(isUpdate1 bool, + policy models.ChannelEdgePolicy) { + + switch pol := policy.(type) { + case *models.ChannelEdgePolicy1: + if e.times == nil { + e.times = &updateTimes{} + } + + if isUpdate1 { + e.times.upd1Time = pol.LastUpdate.Unix() + } else { + e.times.upd2Time = pol.LastUpdate.Unix() + } + + case *models.ChannelEdgePolicy2: + if e.blocks == nil { + e.blocks = &updateBlocks{} + } + + if isUpdate1 { + e.blocks.updBlock1 = pol.BlockHeight.Val + } else { + e.blocks.updBlock2 = pol.BlockHeight.Val + } + } } // rejectCache is an in-memory cache used to improve the performance of diff --git a/channeldb/reject_cache_test.go b/channeldb/reject_cache_test.go index 6974f4257..a4ab798c0 100644 --- a/channeldb/reject_cache_test.go +++ b/channeldb/reject_cache_test.go @@ -100,8 +100,10 @@ func entryForInt(i uint64) rejectCacheEntry { exists := i%2 == 0 isZombie := i%3 == 0 return rejectCacheEntry{ - upd1Time: int64(2 * i), - upd2Time: int64(2*i + 1), - flags: packRejectFlags(exists, isZombie), + times: &updateTimes{ + upd1Time: int64(2 * i), + upd2Time: int64(2*i + 1), + }, + flags: packRejectFlags(exists, isZombie), } } diff --git a/graph/builder.go b/graph/builder.go index a736330b6..b95d0d4da 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -474,30 +474,6 @@ func (b *Builder) syncGraphWithChain() error { return nil } -// isZombieChannel takes two edge policy updates and determines if the -// corresponding channel should be considered a zombie. The first boolean is -// true if the policy update from node 1 is considered a zombie, the second -// boolean is that of node 2, and the final boolean is true if the channel -// is considered a zombie. -func (b *Builder) isZombieChannel(e1, - e2 *models.ChannelEdgePolicy1) (bool, bool, bool) { - - chanExpiry := b.cfg.ChannelPruneExpiry - - e1Zombie := e1 == nil || time.Since(e1.LastUpdate) >= chanExpiry - e2Zombie := e2 == nil || time.Since(e2.LastUpdate) >= chanExpiry - - var e1Time, e2Time time.Time - if e1 != nil { - e1Time = e1.LastUpdate - } - if e2 != nil { - e2Time = e2.LastUpdate - } - - return e1Zombie, e2Zombie, b.IsZombieChannel(e1Time, e2Time) -} - // IsZombieChannel takes the timestamps of the latest channel updates for a // channel and returns true if the channel should be considered a zombie based // on these timestamps. @@ -512,6 +488,10 @@ func (b *Builder) IsZombieChannel(updateTime1, e2Zombie := updateTime2.IsZero() || time.Since(updateTime2) >= chanExpiry + return b.isZombieChannel(e1Zombie, e2Zombie) +} + +func (b *Builder) isZombieChannel(e1Zombie, e2Zombie bool) bool { // If we're using strict zombie pruning, then a channel is only // considered live if both edges have a recent update we know of. if b.cfg.StrictZombiePruning { @@ -562,7 +542,15 @@ func (b *Builder) pruneZombieChans() error { return nil } - e1Zombie, e2Zombie, isZombieChan := b.isZombieChannel(e1, e2) + e1Zombie, err := b.isZombieEdge(e1) + if err != nil { + return err + } + + e2Zombie, err := b.isZombieEdge(e2) + if err != nil { + return err + } if e1Zombie { log.Tracef("Node1 pubkey=%x of chan_id=%v is zombie", @@ -577,7 +565,7 @@ func (b *Builder) pruneZombieChans() error { // If either edge hasn't been updated for a period of // chanExpiry, then we'll mark the channel itself as eligible // for graph pruning. - if !isZombieChan { + if !b.isZombieChannel(e1Zombie, e2Zombie) { return nil } @@ -662,6 +650,40 @@ func (b *Builder) pruneZombieChans() error { return nil } +func (b *Builder) isZombieEdge(edge models.ChannelEdgePolicy) (bool, + error) { + + if edge == nil { + return true, nil + } + + switch e := edge.(type) { + case *models.ChannelEdgePolicy1: + chanExpiry := b.cfg.ChannelPruneExpiry + + if e == nil { + return true, nil + } + + return time.Since(e.LastUpdate) >= chanExpiry, nil + + case *models.ChannelEdgePolicy2: + chanExpiryBlocks := uint32(b.cfg.ChannelPruneExpiry.Hours() * 6) + + if e == nil { + return true, nil + } + + blockSince := b.SyncedHeight() - e.BlockHeight.Val + + return blockSince >= chanExpiryBlocks, nil + + default: + return false, fmt.Errorf("unhandled implementation of "+ + "models.ChannelEdgePolicy: %T", edge) + } +} + // handleNetworkUpdate is responsible for processing the update message and // notifies topology changes, if any. //