graph/db: add tests for iterator implementations

This commit is contained in:
Olaoluwa Osuntokun
2025-08-04 16:01:50 -07:00
parent 31ab2ae4b6
commit 32528afa0a

View File

@@ -2028,11 +2028,13 @@ func TestChanUpdatesInHorizon(t *testing.T) {
// If we issue an arbitrary query before any channel updates are
// inserted in the database, we should get zero results.
chanUpdatesIter, err := graph.ChanUpdatesInHorizon(
chanIter, err := graph.ChanUpdatesInHorizon(
time.Unix(999, 0), time.Unix(9999, 0),
)
require.NoError(t, err, "unable to updates for updates")
chanUpdates := fn.Collect(chanUpdatesIter)
chanUpdates := fn.Collect(chanIter)
if len(chanUpdates) != 0 {
t.Fatalf("expected 0 chan updates, instead got %v",
len(chanUpdates))
@@ -2151,6 +2153,7 @@ func TestChanUpdatesInHorizon(t *testing.T) {
if err != nil {
t.Fatalf("unable to query for updates: %v", err)
}
resp := fn.Collect(respIter)
if len(resp) != len(queryCase.resp) {
@@ -2191,10 +2194,11 @@ func TestNodeUpdatesInHorizon(t *testing.T) {
// If we issue an arbitrary query before we insert any nodes into the
// database, then we shouldn't get any results back.
nodeUpdates, err := graph.NodeUpdatesInHorizon(
nodeUpdatesIter, err := graph.NodeUpdatesInHorizon(
time.Unix(999, 0), time.Unix(9999, 0),
)
require.NoError(t, err, "unable to query for node updates")
nodeUpdates := fn.Collect(nodeUpdatesIter)
require.Len(t, nodeUpdates, 0)
// We'll create 10 node announcements, each with an update timestamp 10
@@ -2255,22 +2259,22 @@ func TestNodeUpdatesInHorizon(t *testing.T) {
resp: nodeAnns,
},
// If we reduce the ending time by 10 seconds, then we should
// get all but the last node we inserted.
// If we reduce the ending time by 1 nanosecond before the last
// node's timestamp, then we should get all but the last node.
{
start: startTime,
end: endTime.Add(-time.Second * 10),
end: endTime.Add(-time.Second*10 - time.Nanosecond),
resp: nodeAnns[:9],
},
}
for _, queryCase := range queryCases {
respIter, err := graph.NodeUpdatesInHorizon(
iter, err := graph.NodeUpdatesInHorizon(
queryCase.start, queryCase.end,
)
require.NoError(t, err, "unable to query for node updates")
resp := fn.Collect(respIter)
require.NoError(t, err)
resp := fn.Collect(iter)
require.Len(t, resp, len(queryCase.resp))
for i := 0; i < len(resp); i++ {
@@ -2279,6 +2283,278 @@ func TestNodeUpdatesInHorizon(t *testing.T) {
}
}
// TestNodeUpdatesInHorizonBoundaryConditions tests the iterator boundary
// conditions, specifically around batch boundaries and edge cases.
func TestNodeUpdatesInHorizonBoundaryConditions(t *testing.T) {
t.Parallel()
ctx := t.Context()
// Test with various batch sizes to ensure the iterator works correctly
// across batch boundaries.
batchSizes := []int{1, 3, 5, 10, 25, 100}
for _, batchSize := range batchSizes {
t.Run(fmt.Sprintf("BatchSize%d", batchSize), func(t *testing.T) {
// Create a fresh graph for each test.
testGraph := MakeTestGraph(t)
// Add 25 nodes with increasing timestamps.
startTime := time.Unix(1234567890, 0)
var nodeAnns []models.Node
for i := 0; i < 25; i++ {
nodeAnn := createTestVertex(t)
nodeAnn.LastUpdate = startTime.Add(
time.Duration(i) * time.Hour,
)
nodeAnns = append(nodeAnns, *nodeAnn)
require.NoError(
t, testGraph.AddNode(ctx, nodeAnn),
)
}
testCases := []struct {
name string
start time.Time
end time.Time
want int
}{
{
name: "all nodes",
start: startTime,
end: startTime.Add(26 * time.Hour),
want: 25,
},
{
name: "first batch only",
start: startTime,
end: startTime.Add(
time.Duration(
min(batchSize, 25)-1,
) * time.Hour,
),
want: min(batchSize, 25),
},
{
name: "cross batch boundary",
start: startTime,
end: startTime.Add(
time.Duration(
min(batchSize, 24),
) * time.Hour,
),
want: min(batchSize+1, 25),
},
{
name: "exact boundary",
start: func() time.Time {
// Test querying exactly at a
// batch boundary.
if batchSize <= 25 {
return startTime.Add(
time.Duration(
batchSize-1,
) * time.Hour,
)
}
// For batch sizes > 25, test
// beyond our data range.
return startTime.Add(
time.Duration(25) * time.Hour,
)
}(),
end: func() time.Time {
if batchSize <= 25 {
return startTime.Add(
time.Duration(
batchSize-1,
) * time.Hour,
)
}
return startTime.Add(
time.Duration(25) * time.Hour,
)
}(),
want: func() int {
if batchSize <= 25 {
return 1
}
// No nodes exist at hour 25 or
// beyond.
return 0
}(),
},
{
name: "empty range before",
start: startTime.Add(-time.Hour),
end: startTime.Add(-time.Minute),
want: 0,
},
{
name: "empty range after",
start: startTime.Add(30 * time.Hour),
end: startTime.Add(40 * time.Hour),
want: 0,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
iter, err := testGraph.NodeUpdatesInHorizon(
tc.start, tc.end,
WithNodeUpdateIterBatchSize(
batchSize,
),
)
require.NoError(t, err)
nodes := fn.Collect(iter)
require.Len(
t, nodes, tc.want,
"expected %d nodes, got %d",
tc.want, len(nodes),
)
// Verify nodes are in the correct time
// order.
for i := 1; i < len(nodes); i++ {
require.True(t,
nodes[i-1].LastUpdate.Before(
nodes[i].LastUpdate,
) || nodes[i-1].LastUpdate.Equal(
nodes[i].LastUpdate,
),
"nodes should be in "+
"chronological order",
)
}
})
}
})
}
}
// TestNodeUpdatesInHorizonEarlyTermination tests that the iterator properly
// handles early termination when the caller stops iterating.
func TestNodeUpdatesInHorizonEarlyTermination(t *testing.T) {
t.Parallel()
ctx := t.Context()
graph := MakeTestGraph(t)
// We'll start by creating 100 nodes, each with an update time spaced
// one hour apart.
startTime := time.Unix(1234567890, 0)
for i := 0; i < 100; i++ {
nodeAnn := createTestVertex(t)
nodeAnn.LastUpdate = startTime.Add(time.Duration(i) * time.Hour)
require.NoError(t, graph.AddNode(ctx, nodeAnn))
}
// Test early termination at various points
terminationPoints := []int{0, 1, 5, 10, 23, 50, 99}
for _, stopAt := range terminationPoints {
t.Run(fmt.Sprintf("StopAt%d", stopAt), func(t *testing.T) {
iter, err := graph.NodeUpdatesInHorizon(
startTime, startTime.Add(200*time.Hour),
WithNodeUpdateIterBatchSize(10),
)
require.NoError(t, err)
// Collect only up to stopAt nodes, breaking afterwards.
var collected []models.Node
count := 0
for node := range iter {
if count >= stopAt {
break
}
collected = append(collected, node)
count++
}
require.Len(
t, collected, stopAt,
"should have collected exactly %d nodes",
stopAt,
)
})
}
}
// TestChanUpdatesInHorizonBoundaryConditions tests the channel iterator
// boundary conditions.
func TestChanUpdatesInHorizonBoundaryConditions(t *testing.T) {
t.Parallel()
ctx := t.Context()
batchSizes := []int{1, 3, 5, 10}
for _, batchSize := range batchSizes {
t.Run(fmt.Sprintf("BatchSize%d", batchSize), func(t *testing.T) {
// Create a fresh graph for each test, then add two new
// nodes to the graph.
graph := MakeTestGraph(t)
node1 := createTestVertex(t)
node2 := createTestVertex(t)
require.NoError(t, graph.AddNode(ctx, node1))
require.NoError(t, graph.AddNode(ctx, node2))
// Next, we'll create 25 channels between the two nodes,
// each with increasing timestamps.
startTime := time.Unix(1234567890, 0)
const numChans = 25
for i := 0; i < numChans; i++ {
updateTime := startTime.Add(
time.Duration(i) * time.Hour,
)
channel, chanID := createEdge(
uint32(i*10), 0, 0, 0, node1, node2,
)
require.NoError(t, graph.AddChannelEdge(ctx, &channel))
edge1 := newEdgePolicy(
chanID.ToUint64(), updateTime.Unix(),
)
edge1.ChannelFlags = 0
edge1.ToNode = node2.PubKeyBytes
edge1.SigBytes = testSig.Serialize()
require.NoError(
t, graph.UpdateEdgePolicy(ctx, edge1),
)
edge2 := newEdgePolicy(
chanID.ToUint64(), updateTime.Unix(),
)
edge2.ChannelFlags = 1
edge2.ToNode = node1.PubKeyBytes
edge2.SigBytes = testSig.Serialize()
require.NoError(t, graph.UpdateEdgePolicy(ctx, edge2))
}
// Now we'll run the main query, and verify that we get
// back the expected number of channels.
iter, err := graph.ChanUpdatesInHorizon(
startTime, startTime.Add(26*time.Hour),
WithChanUpdateIterBatchSize(batchSize),
)
require.NoError(t, err)
channels := fn.Collect(iter)
require.Len(
t, channels, numChans,
"expected %d channels, got %d", numChans,
len(channels),
)
})
}
}
// TestFilterKnownChanIDsZombieRevival tests that if a ChannelUpdateInfo is
// passed to FilterKnownChanIDs that contains a channel that we have marked as
// a zombie, then we will mark it as live again if the new ChannelUpdate has
@@ -3532,6 +3808,7 @@ func TestNodePruningUpdateIndexDeletion(t *testing.T) {
// the horizon. This time we should have no nodes at all.
nodesInHorizonIter, err = graph.NodeUpdatesInHorizon(startTime, endTime)
require.NoError(t, err, "unable to fetch nodes in horizon")
nodesInHorizon = fn.Collect(nodesInHorizonIter)
if len(fn.Collect(nodesInHorizonIter)) != 0 {
t.Fatalf("should have zero nodes instead have: %v",