sqldb+graph/db: implement SQLStore.NodeUpdatesInHorizon

In this commit we add the necessary SQL queries and then implement the
SQLStore's NodeUpdatesInHorizon method. This lets us run the
TestNodeUpdatesInHorizon unit tests against SQL backends.
This commit is contained in:
Elle Mouton
2025-05-19 11:38:06 +02:00
parent 7a120cb584
commit 86d48390ca
5 changed files with 102 additions and 18 deletions

View File

@ -2080,7 +2080,7 @@ func TestChanUpdatesInHorizon(t *testing.T) {
func TestNodeUpdatesInHorizon(t *testing.T) {
t.Parallel()
graph := MakeTestGraph(t)
graph := MakeTestGraphNew(t)
startTime := time.Unix(1234, 0)
endTime := startTime
@ -2091,10 +2091,7 @@ func TestNodeUpdatesInHorizon(t *testing.T) {
time.Unix(999, 0), time.Unix(9999, 0),
)
require.NoError(t, err, "unable to query for node updates")
if len(nodeUpdates) != 0 {
t.Fatalf("expected 0 node updates, instead got %v",
len(nodeUpdates))
}
require.Len(t, nodeUpdates, 0)
// We'll create 10 node announcements, each with an update timestamp 10
// seconds after the other.
@ -2113,9 +2110,7 @@ func TestNodeUpdatesInHorizon(t *testing.T) {
nodeAnns = append(nodeAnns, *nodeAnn)
if err := graph.AddLightningNode(nodeAnn); err != nil {
t.Fatalf("unable to add lightning node: %v", err)
}
require.NoError(t, graph.AddLightningNode(nodeAnn))
}
queryCases := []struct {
@ -2169,15 +2164,8 @@ func TestNodeUpdatesInHorizon(t *testing.T) {
resp, err := graph.NodeUpdatesInHorizon(
queryCase.start, queryCase.end,
)
if err != nil {
t.Fatalf("unable to query for nodes: %v", err)
}
if len(resp) != len(queryCase.resp) {
t.Fatalf("expected %v nodes, got %v nodes",
len(queryCase.resp), len(resp))
}
require.NoError(t, err)
require.Len(t, resp, len(queryCase.resp))
for i := 0; i < len(resp); i++ {
compareNodes(t, &queryCase.resp[i], &resp[i])
@ -3382,7 +3370,7 @@ func TestAddChannelEdgeShellNodes(t *testing.T) {
func TestNodePruningUpdateIndexDeletion(t *testing.T) {
t.Parallel()
graph := MakeTestGraph(t)
graph := MakeTestGraphNew(t)
// We'll first populate our graph with a single node that will be
// removed shortly.

View File

@ -48,6 +48,7 @@ type SQLQueries interface {
*/
UpsertNode(ctx context.Context, arg sqlc.UpsertNodeParams) (int64, error)
GetNodeByPubKey(ctx context.Context, arg sqlc.GetNodeByPubKeyParams) (sqlc.Node, error)
GetNodesByLastUpdateRange(ctx context.Context, arg sqlc.GetNodesByLastUpdateRangeParams) ([]sqlc.Node, error)
DeleteNodeByPubKey(ctx context.Context, arg sqlc.DeleteNodeByPubKeyParams) (sql.Result, error)
GetExtraNodeTypes(ctx context.Context, nodeID int64) ([]sqlc.NodeExtraType, error)
@ -371,6 +372,51 @@ func (s *SQLStore) LookupAlias(pub *btcec.PublicKey) (string, error) {
return alias, nil
}
// NodeUpdatesInHorizon returns all the known lightning node which have an
// update timestamp within the passed range. This method can be used by two
// nodes to quickly determine if they have the same set of up to date node
// announcements.
//
// NOTE: This is part of the V1Store interface.
func (s *SQLStore) NodeUpdatesInHorizon(startTime,
endTime time.Time) ([]models.LightningNode, error) {
ctx := context.TODO()
var (
readTx = NewReadTx()
nodes []models.LightningNode
)
err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
dbNodes, err := db.GetNodesByLastUpdateRange(
ctx, sqlc.GetNodesByLastUpdateRangeParams{
StartTime: sqldb.SQLInt64(startTime.Unix()),
EndTime: sqldb.SQLInt64(endTime.Unix()),
},
)
if err != nil {
return fmt.Errorf("unable to fetch nodes: %w", err)
}
for _, dbNode := range dbNodes {
node, err := buildNode(ctx, db, &dbNode)
if err != nil {
return fmt.Errorf("unable to build node: %w",
err)
}
nodes = append(nodes, *node)
}
return nil
}, func() {})
if err != nil {
return nil, fmt.Errorf("unable to fetch nodes: %w", err)
}
return nodes, nil
}
// getNodeByPubKey attempts to look up a target node by its public key.
func getNodeByPubKey(ctx context.Context, db SQLQueries,
pubKey route.Vertex) (int64, *models.LightningNode, error) {

View File

@ -229,6 +229,49 @@ func (q *Queries) GetNodeFeaturesByPubKey(ctx context.Context, arg GetNodeFeatur
return items, nil
}
const getNodesByLastUpdateRange = `-- name: GetNodesByLastUpdateRange :many
SELECT id, version, pub_key, alias, last_update, color, signature
FROM nodes
WHERE last_update >= $1
AND last_update < $2
`
type GetNodesByLastUpdateRangeParams struct {
StartTime sql.NullInt64
EndTime sql.NullInt64
}
func (q *Queries) GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByLastUpdateRangeParams) ([]Node, error) {
rows, err := q.db.QueryContext(ctx, getNodesByLastUpdateRange, arg.StartTime, arg.EndTime)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Node
for rows.Next() {
var i Node
if err := rows.Scan(
&i.ID,
&i.Version,
&i.PubKey,
&i.Alias,
&i.LastUpdate,
&i.Color,
&i.Signature,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const insertNodeAddress = `-- name: InsertNodeAddress :exec
/* ─────────────────────────────────────────────
node_addresses table queries

View File

@ -40,6 +40,7 @@ type Querier interface {
GetNodeByPubKey(ctx context.Context, arg GetNodeByPubKeyParams) (Node, error)
GetNodeFeatures(ctx context.Context, nodeID int64) ([]NodeFeature, error)
GetNodeFeaturesByPubKey(ctx context.Context, arg GetNodeFeaturesByPubKeyParams) ([]int32, error)
GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByLastUpdateRangeParams) ([]Node, error)
InsertAMPSubInvoice(ctx context.Context, arg InsertAMPSubInvoiceParams) error
InsertAMPSubInvoiceHTLC(ctx context.Context, arg InsertAMPSubInvoiceHTLCParams) error
InsertInvoice(ctx context.Context, arg InsertInvoiceParams) (int64, error)

View File

@ -82,6 +82,12 @@ LEFT JOIN node_addresses a ON a.node_id = n.id
WHERE n.pub_key = $1 AND n.version = $2
ORDER BY a.type ASC, a.position ASC;
-- name: GetNodesByLastUpdateRange :many
SELECT *
FROM nodes
WHERE last_update >= @start_time
AND last_update < @end_time;
-- name: DeleteNodeAddresses :exec
DELETE FROM node_addresses
WHERE node_id = $1;