diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index eb04ef20b..cf3ef3deb 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -3291,7 +3291,7 @@ func TestPruneGraphNodes(t *testing.T) { t.Parallel() ctx := context.Background() - graph := MakeTestGraph(t) + graph := MakeTestGraphNew(t) // We'll start off by inserting our source node, to ensure that it's // the only node left after we prune the graph. diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 6d2dd85ae..c3f9541c7 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -63,7 +63,9 @@ type SQLQueries interface { ListNodesPaginated(ctx context.Context, arg sqlc.ListNodesPaginatedParams) ([]sqlc.Node, error) ListNodeIDsAndPubKeys(ctx context.Context, arg sqlc.ListNodeIDsAndPubKeysParams) ([]sqlc.ListNodeIDsAndPubKeysRow, error) IsPublicV1Node(ctx context.Context, pubKey []byte) (bool, error) + GetUnconnectedNodes(ctx context.Context) ([]sqlc.GetUnconnectedNodesRow, error) DeleteNodeByPubKey(ctx context.Context, arg sqlc.DeleteNodeByPubKeyParams) (sql.Result, error) + DeleteNode(ctx context.Context, id int64) error GetExtraNodeTypes(ctx context.Context, nodeID int64) ([]sqlc.NodeExtraType, error) UpsertNodeExtraType(ctx context.Context, arg sqlc.UpsertNodeExtraTypeParams) error @@ -2200,6 +2202,70 @@ func (s *SQLStore) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo) ([]uint64, return newChanIDs, knownZombies, nil } +// PruneGraphNodes is a garbage collection method which attempts to prune out +// any nodes from the channel graph that are currently unconnected. This ensure +// that we only maintain a graph of reachable nodes. In the event that a pruned +// node gains more channels, it will be re-added back to the graph. +// +// NOTE: this prunes nodes across protocol versions. It will never prune the +// source nodes. +// +// NOTE: part of the V1Store interface. +func (s *SQLStore) PruneGraphNodes() ([]route.Vertex, error) { + var ctx = context.TODO() + + var prunedNodes []route.Vertex + err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error { + var err error + prunedNodes, err = s.pruneGraphNodes(ctx, db) + + return err + }, func() { + prunedNodes = nil + }) + if err != nil { + return nil, fmt.Errorf("unable to prune nodes: %w", err) + } + + return prunedNodes, nil +} + +// pruneGraphNodes deletes any node in the DB that doesn't have a channel. +// +// NOTE: this prunes nodes across protocol versions. It will never prune the +// source nodes. +func (s *SQLStore) pruneGraphNodes(ctx context.Context, + db SQLQueries) ([]route.Vertex, error) { + + // Fetch all un-connected nodes from the database. + // NOTE: this will not include any nodes that are listed in the + // source table. + nodes, err := db.GetUnconnectedNodes(ctx) + if err != nil { + return nil, fmt.Errorf("unable to fetch unconnected nodes: %w", + err) + } + + prunedNodes := make([]route.Vertex, 0, len(nodes)) + for _, node := range nodes { + // TODO(elle): update to use sqlc.slice() once that works. + if err = db.DeleteNode(ctx, node.ID); err != nil { + return nil, fmt.Errorf("unable to delete "+ + "node(id=%d): %w", node.ID, err) + } + + pubKey, err := route.NewVertexFromBytes(node.PubKey) + if err != nil { + return nil, fmt.Errorf("unable to parse pubkey "+ + "for node(id=%d): %w", node.ID, err) + } + + prunedNodes = append(prunedNodes, pubKey) + } + + return prunedNodes, nil +} + // forEachNodeDirectedChannel iterates through all channels of a given // node, executing the passed callback on the directed edge representing the // channel and its incoming policy. If the node is not found, no error is diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 6463885cd..b25472aa4 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -149,6 +149,16 @@ func (q *Queries) DeleteExtraNodeType(ctx context.Context, arg DeleteExtraNodeTy return err } +const deleteNode = `-- name: DeleteNode :exec +DELETE FROM nodes +WHERE id = $1 +` + +func (q *Queries) DeleteNode(ctx context.Context, id int64) error { + _, err := q.db.ExecContext(ctx, deleteNode, id) + return err +} + const deleteNodeAddresses = `-- name: DeleteNodeAddresses :exec DELETE FROM node_addresses WHERE node_id = $1 @@ -1219,6 +1229,51 @@ func (q *Queries) GetSourceNodesByVersion(ctx context.Context, version int16) ([ return items, nil } +const getUnconnectedNodes = `-- name: GetUnconnectedNodes :many +SELECT n.id, n.pub_key +FROM nodes n +WHERE NOT EXISTS ( + SELECT 1 + FROM channels c + WHERE c.node_id_1 = n.id OR c.node_id_2 = n.id +) +AND NOT EXISTS ( + SELECT 1 + FROM source_nodes sn + WHERE sn.node_id = n.id +) +` + +type GetUnconnectedNodesRow struct { + ID int64 + PubKey []byte +} + +// Select all nodes that do not have any channels. +// Ignore any of our source nodes. +func (q *Queries) GetUnconnectedNodes(ctx context.Context) ([]GetUnconnectedNodesRow, error) { + rows, err := q.db.QueryContext(ctx, getUnconnectedNodes) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetUnconnectedNodesRow + for rows.Next() { + var i GetUnconnectedNodesRow + if err := rows.Scan(&i.ID, &i.PubKey); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getV1DisabledSCIDs = `-- name: GetV1DisabledSCIDs :many SELECT c.scid FROM channels c diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 83ee30a8a..4b8b49a2d 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -21,6 +21,7 @@ type Querier interface { DeleteChannelPolicyExtraTypes(ctx context.Context, channelPolicyID int64) error DeleteExtraNodeType(ctx context.Context, arg DeleteExtraNodeTypeParams) error DeleteInvoice(ctx context.Context, arg DeleteInvoiceParams) (sql.Result, error) + DeleteNode(ctx context.Context, id int64) error DeleteNodeAddresses(ctx context.Context, nodeID int64) error DeleteNodeByPubKey(ctx context.Context, arg DeleteNodeByPubKeyParams) (sql.Result, error) DeleteNodeFeature(ctx context.Context, arg DeleteNodeFeatureParams) error @@ -60,6 +61,9 @@ type Querier interface { GetPublicV1ChannelsBySCID(ctx context.Context, arg GetPublicV1ChannelsBySCIDParams) ([]Channel, error) GetSCIDByOutpoint(ctx context.Context, arg GetSCIDByOutpointParams) ([]byte, error) GetSourceNodesByVersion(ctx context.Context, version int16) ([]GetSourceNodesByVersionRow, error) + // Select all nodes that do not have any channels. + // Ignore any of our source nodes. + GetUnconnectedNodes(ctx context.Context) ([]GetUnconnectedNodesRow, error) // NOTE: this is V1 specific since for V1, disabled is a // simple, single boolean. The proposed V2 policy // structure will have a more complex disabled bit vector diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index ca40db176..c0ac68121 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -65,11 +65,31 @@ SELECT EXISTS ( AND n.pub_key = $1 ); +-- name: GetUnconnectedNodes :many +SELECT n.id, n.pub_key +FROM nodes n +-- Select all nodes that do not have any channels. +WHERE NOT EXISTS ( + SELECT 1 + FROM channels c + WHERE c.node_id_1 = n.id OR c.node_id_2 = n.id +) +-- Ignore any of our source nodes. +AND NOT EXISTS ( + SELECT 1 + FROM source_nodes sn + WHERE sn.node_id = n.id +); + -- name: DeleteNodeByPubKey :execresult DELETE FROM nodes WHERE pub_key = $1 AND version = $2; +-- name: DeleteNode :exec +DELETE FROM nodes +WHERE id = $1; + /* ───────────────────────────────────────────── node_features table queries ─────────────────────────────────────────────