From 86d48390ca1d2228d87cbaede5e8eb7d46ac6b2f Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Mon, 19 May 2025 11:38:06 +0200 Subject: [PATCH] sqldb+graph/db: implement SQLStore.NodeUpdatesInHorizon In this commit we add the necessary SQL queries and then implement the SQLStore's NodeUpdatesInHorizon method. This lets us run the TestNodeUpdatesInHorizon unit tests against SQL backends. --- graph/db/graph_test.go | 24 +++++-------------- graph/db/sql_store.go | 46 ++++++++++++++++++++++++++++++++++++ sqldb/sqlc/graph.sql.go | 43 +++++++++++++++++++++++++++++++++ sqldb/sqlc/querier.go | 1 + sqldb/sqlc/queries/graph.sql | 6 +++++ 5 files changed, 102 insertions(+), 18 deletions(-) diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 2b5d0dbce..42c88d49c 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -2080,7 +2080,7 @@ func TestChanUpdatesInHorizon(t *testing.T) { func TestNodeUpdatesInHorizon(t *testing.T) { t.Parallel() - graph := MakeTestGraph(t) + graph := MakeTestGraphNew(t) startTime := time.Unix(1234, 0) endTime := startTime @@ -2091,10 +2091,7 @@ func TestNodeUpdatesInHorizon(t *testing.T) { time.Unix(999, 0), time.Unix(9999, 0), ) require.NoError(t, err, "unable to query for node updates") - if len(nodeUpdates) != 0 { - t.Fatalf("expected 0 node updates, instead got %v", - len(nodeUpdates)) - } + require.Len(t, nodeUpdates, 0) // We'll create 10 node announcements, each with an update timestamp 10 // seconds after the other. @@ -2113,9 +2110,7 @@ func TestNodeUpdatesInHorizon(t *testing.T) { nodeAnns = append(nodeAnns, *nodeAnn) - if err := graph.AddLightningNode(nodeAnn); err != nil { - t.Fatalf("unable to add lightning node: %v", err) - } + require.NoError(t, graph.AddLightningNode(nodeAnn)) } queryCases := []struct { @@ -2169,15 +2164,8 @@ func TestNodeUpdatesInHorizon(t *testing.T) { resp, err := graph.NodeUpdatesInHorizon( queryCase.start, queryCase.end, ) - if err != nil { - t.Fatalf("unable to query for nodes: %v", err) - } - - if len(resp) != len(queryCase.resp) { - t.Fatalf("expected %v nodes, got %v nodes", - len(queryCase.resp), len(resp)) - - } + require.NoError(t, err) + require.Len(t, resp, len(queryCase.resp)) for i := 0; i < len(resp); i++ { compareNodes(t, &queryCase.resp[i], &resp[i]) @@ -3382,7 +3370,7 @@ func TestAddChannelEdgeShellNodes(t *testing.T) { func TestNodePruningUpdateIndexDeletion(t *testing.T) { t.Parallel() - graph := MakeTestGraph(t) + graph := MakeTestGraphNew(t) // We'll first populate our graph with a single node that will be // removed shortly. diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 971a3acff..33fee1ae9 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -48,6 +48,7 @@ type SQLQueries interface { */ UpsertNode(ctx context.Context, arg sqlc.UpsertNodeParams) (int64, error) GetNodeByPubKey(ctx context.Context, arg sqlc.GetNodeByPubKeyParams) (sqlc.Node, error) + GetNodesByLastUpdateRange(ctx context.Context, arg sqlc.GetNodesByLastUpdateRangeParams) ([]sqlc.Node, error) DeleteNodeByPubKey(ctx context.Context, arg sqlc.DeleteNodeByPubKeyParams) (sql.Result, error) GetExtraNodeTypes(ctx context.Context, nodeID int64) ([]sqlc.NodeExtraType, error) @@ -371,6 +372,51 @@ func (s *SQLStore) LookupAlias(pub *btcec.PublicKey) (string, error) { return alias, nil } +// NodeUpdatesInHorizon returns all the known lightning node which have an +// update timestamp within the passed range. This method can be used by two +// nodes to quickly determine if they have the same set of up to date node +// announcements. +// +// NOTE: This is part of the V1Store interface. +func (s *SQLStore) NodeUpdatesInHorizon(startTime, + endTime time.Time) ([]models.LightningNode, error) { + + ctx := context.TODO() + + var ( + readTx = NewReadTx() + nodes []models.LightningNode + ) + err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error { + dbNodes, err := db.GetNodesByLastUpdateRange( + ctx, sqlc.GetNodesByLastUpdateRangeParams{ + StartTime: sqldb.SQLInt64(startTime.Unix()), + EndTime: sqldb.SQLInt64(endTime.Unix()), + }, + ) + if err != nil { + return fmt.Errorf("unable to fetch nodes: %w", err) + } + + for _, dbNode := range dbNodes { + node, err := buildNode(ctx, db, &dbNode) + if err != nil { + return fmt.Errorf("unable to build node: %w", + err) + } + + nodes = append(nodes, *node) + } + + return nil + }, func() {}) + if err != nil { + return nil, fmt.Errorf("unable to fetch nodes: %w", err) + } + + return nodes, nil +} + // getNodeByPubKey attempts to look up a target node by its public key. func getNodeByPubKey(ctx context.Context, db SQLQueries, pubKey route.Vertex) (int64, *models.LightningNode, error) { diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 1fadc7e3f..cde66e77e 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -229,6 +229,49 @@ func (q *Queries) GetNodeFeaturesByPubKey(ctx context.Context, arg GetNodeFeatur return items, nil } +const getNodesByLastUpdateRange = `-- name: GetNodesByLastUpdateRange :many +SELECT id, version, pub_key, alias, last_update, color, signature +FROM nodes +WHERE last_update >= $1 + AND last_update < $2 +` + +type GetNodesByLastUpdateRangeParams struct { + StartTime sql.NullInt64 + EndTime sql.NullInt64 +} + +func (q *Queries) GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByLastUpdateRangeParams) ([]Node, error) { + rows, err := q.db.QueryContext(ctx, getNodesByLastUpdateRange, arg.StartTime, arg.EndTime) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Node + for rows.Next() { + var i Node + if err := rows.Scan( + &i.ID, + &i.Version, + &i.PubKey, + &i.Alias, + &i.LastUpdate, + &i.Color, + &i.Signature, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const insertNodeAddress = `-- name: InsertNodeAddress :exec /* ───────────────────────────────────────────── node_addresses table queries diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 557bb1dfe..e26bf05c7 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -40,6 +40,7 @@ type Querier interface { GetNodeByPubKey(ctx context.Context, arg GetNodeByPubKeyParams) (Node, error) GetNodeFeatures(ctx context.Context, nodeID int64) ([]NodeFeature, error) GetNodeFeaturesByPubKey(ctx context.Context, arg GetNodeFeaturesByPubKeyParams) ([]int32, error) + GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByLastUpdateRangeParams) ([]Node, error) InsertAMPSubInvoice(ctx context.Context, arg InsertAMPSubInvoiceParams) error InsertAMPSubInvoiceHTLC(ctx context.Context, arg InsertAMPSubInvoiceHTLCParams) error InsertInvoice(ctx context.Context, arg InsertInvoiceParams) (int64, error) diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index 390870086..5c8c9b800 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -82,6 +82,12 @@ LEFT JOIN node_addresses a ON a.node_id = n.id WHERE n.pub_key = $1 AND n.version = $2 ORDER BY a.type ASC, a.position ASC; +-- name: GetNodesByLastUpdateRange :many +SELECT * +FROM nodes +WHERE last_update >= @start_time + AND last_update < @end_time; + -- name: DeleteNodeAddresses :exec DELETE FROM node_addresses WHERE node_id = $1;