channeldb: Update ChanUpdatesInHorizon

So that a start block and end block can also be passed in.
This commit is contained in:
Elle Mouton 2023-11-07 16:21:47 +02:00
parent deb4c2ba11
commit c248f60cbd
No known key found for this signature in database
GPG Key ID: D7D916376026F177
5 changed files with 93 additions and 41 deletions

View File

@ -2129,7 +2129,7 @@ type ChannelEdge struct {
// ChanUpdatesInHorizon returns all the known channel edges which have at least
// one edge that has an update timestamp within the specified horizon.
func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
endTime time.Time) ([]ChannelEdge, error) {
endTime time.Time, startBlock, endBlock uint32) ([]ChannelEdge, error) {
// To ensure we don't return duplicate ChannelEdges, we'll use an
// additional map to keep track of the edges already seen to prevent
@ -2138,50 +2138,30 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
var edgesToCache map[uint64]ChannelEdge
var edgesInHorizon []ChannelEdge
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
var hits int
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
edges := tx.ReadBucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrGraphNoEdgesFound
}
edgeUpdateIndex := edges.NestedReadBucket(edgeUpdateIndexBucket)
fetchUpdates := func(tx kvdb.RTx, edges, edgeIndex, nodes kvdb.RBucket,
updateIndexBkt []byte, startBytes, endBytes []byte,
chanIDFromKey func([]byte) []byte) error {
edgeUpdateIndex := edges.NestedReadBucket(updateIndexBkt)
if edgeUpdateIndex == nil {
return ErrGraphNoEdgesFound
}
nodes := tx.ReadBucket(nodeBucket)
if nodes == nil {
return ErrGraphNodesNotFound
}
// We'll now obtain a cursor to perform a range query within
// the index to find all channels within the horizon.
updateCursor := edgeUpdateIndex.ReadCursor()
var startTimeBytes, endTimeBytes [8 + 8]byte
byteOrder.PutUint64(
startTimeBytes[:8], uint64(startTime.Unix()),
)
byteOrder.PutUint64(
endTimeBytes[:8], uint64(endTime.Unix()),
)
// With our start and end times constructed, we'll step through
// the index collecting the info and policy of each update of
// each channel that has a last update within the time range.
for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
//nolint:lll
for indexKey, _ := updateCursor.Seek(startBytes); indexKey != nil &&
bytes.Compare(indexKey, endBytes) <= 0; indexKey, _ = updateCursor.Next() { //nolint:whitespace
// We have a new eligible entry, so we'll slice of the
// chan ID so we can query it in the DB.
chanID := indexKey[8:]
chanID := chanIDFromKey(indexKey)
// If we've already retrieved the info and policies for
// this edge, then we can skip it as we don't need to do
@ -2218,16 +2198,15 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
err)
}
var (
node1Bytes = edgeInfo.Node1Bytes()
node2Bytes = edgeInfo.Node2Bytes()
)
node1Bytes := edgeInfo.Node1Bytes()
node1, err := fetchLightningNode(nodes, node1Bytes[:])
if err != nil {
return err
}
node2Bytes := edgeInfo.Node2Bytes()
node2, err := fetchLightningNode(nodes, node2Bytes[:])
if err != nil {
return err
@ -2247,6 +2226,66 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
edgesToCache[chanIDInt] = channel
}
return nil
}
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
edges := tx.ReadBucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrGraphNoEdgesFound
}
nodes := tx.ReadBucket(nodeBucket)
if nodes == nil {
return ErrGraphNodesNotFound
}
var startTimeBytes, endTimeBytes [8 + 8]byte
byteOrder.PutUint64(
startTimeBytes[:8], uint64(startTime.Unix()),
)
byteOrder.PutUint64(
endTimeBytes[:8], uint64(endTime.Unix()),
)
var noEdgesFound bool
err := fetchUpdates(
tx, edges, edgeIndex, nodes, edgeUpdateIndexBucket,
startTimeBytes[:], endTimeBytes[:],
func(key []byte) []byte {
return key[8:]
},
)
if errors.Is(err, ErrGraphNoEdgesFound) {
noEdgesFound = true
} else if err != nil {
return err
}
var startBlockBytes, endBlockBytes [4 + 8]byte
byteOrder.PutUint32(startTimeBytes[:4], startBlock)
byteOrder.PutUint32(endTimeBytes[:4], endBlock)
err = fetchUpdates(
tx, edges, edgeIndex, nodes, edgeUpdate2IndexBucket,
startBlockBytes[:], endBlockBytes[:],
func(key []byte) []byte {
return key[4:]
},
)
if errors.Is(err, ErrGraphNoEdgesFound) && noEdgesFound {
return err
} else if err != nil {
return err
}
return nil
}, func() {
edgesSeen = make(map[uint64]struct{})
@ -3664,7 +3703,9 @@ func (c *ChannelGraph) FetchOtherNode(tx kvdb.RTx,
// otherwise we can use the existing db transaction.
var err error
if tx == nil {
err = kvdb.View(c.db, fetchNodeFunc, func() { targetNode = nil })
err = kvdb.View(c.db, fetchNodeFunc, func() {
targetNode = nil
})
} else {
err = fetchNodeFunc(tx)
}

View File

@ -1671,7 +1671,7 @@ func TestChanUpdatesInHorizon(t *testing.T) {
// If we issue an arbitrary query before any channel updates are
// inserted in the database, we should get zero results.
chanUpdates, err := graph.ChanUpdatesInHorizon(
time.Unix(999, 0), time.Unix(9999, 0),
time.Unix(999, 0), time.Unix(9999, 0), 0, 0,
)
require.NoError(t, err, "unable to updates for updates")
if len(chanUpdates) != 0 {
@ -1789,7 +1789,7 @@ func TestChanUpdatesInHorizon(t *testing.T) {
}
for _, queryCase := range queryCases {
resp, err := graph.ChanUpdatesInHorizon(
queryCase.start, queryCase.end,
queryCase.start, queryCase.end, 0, 0,
)
if err != nil {
t.Fatalf("unable to query for updates: %v", err)
@ -2317,6 +2317,7 @@ func TestStressTestChannelGraphAPI(t *testing.T) {
fn: func() error {
_, err := graph.ChanUpdatesInHorizon(
time.Now().Add(-time.Hour), time.Now(),
0, 0,
)
return err

View File

@ -112,7 +112,7 @@ func (c *ChanSeries) UpdatesInHorizon(chain chainhash.Hash,
// First, we'll query for all the set of channels that have an update
// that falls within the specified horizon.
chansInHorizon, err := c.graph.ChanUpdatesInHorizon(
startTime, endTime,
startTime, endTime, 0, 0,
)
if err != nil {
return nil, err

View File

@ -607,7 +607,17 @@ func (b *Builder) pruneZombieChans() error {
startTime := time.Unix(0, 0)
endTime := time.Now().Add(-1 * chanExpiry)
oldEdges, err := b.cfg.Graph.ChanUpdatesInHorizon(startTime, endTime)
startBlock := 0
_, bestBlock, err := b.cfg.Chain.GetBestBlock()
if err != nil {
return err
}
endBlock := uint32(bestBlock) - uint32(chanExpiry.Hours()*6)
oldEdges, err := b.cfg.Graph.ChanUpdatesInHorizon(
startTime, endTime, uint32(startBlock), endBlock,
)
if err != nil {
return fmt.Errorf("unable to fetch expired channel updates "+
"chans: %v", err)

View File

@ -147,8 +147,8 @@ type DB interface {
// ChanUpdatesInHorizon returns all the known channel edges which have
// at least one edge that has an update timestamp within the specified
// horizon.
ChanUpdatesInHorizon(startTime, endTime time.Time) (
[]channeldb.ChannelEdge, error)
ChanUpdatesInHorizon(startTime, endTime time.Time, startBlock,
endBlock uint32) ([]channeldb.ChannelEdge, error)
// DeleteChannelEdges removes edges with the given channel IDs from the
// database and marks them as zombies. This ensures that we're unable to