From 4cc5cfef2a06ddae5efa9f21ed50b1c6dbe9bea5 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 5 Aug 2025 11:44:53 +0200 Subject: [PATCH 1/9] graph/db: add ChanUpdatesInHorizon benchmark --- graph/db/benchmark_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/graph/db/benchmark_test.go b/graph/db/benchmark_test.go index 08dba16de..1022197c5 100644 --- a/graph/db/benchmark_test.go +++ b/graph/db/benchmark_test.go @@ -736,6 +736,15 @@ func BenchmarkGraphReadMethods(b *testing.B) { require.NoError(b, err) }, }, + { + name: "ChanUpdatesInHorizon", + fn: func(b testing.TB, store V1Store) { + _, err := store.ChanUpdatesInHorizon( + time.Unix(0, 0), time.Now(), + ) + require.NoError(b, err) + }, + }, } for _, test := range tests { From f51adaf31fdc6473fbcd91a41e7f05b997e98fc3 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 5 Aug 2025 13:51:13 +0200 Subject: [PATCH 2/9] graph/db: refactor makeZombiePubkeys Let the helper method only take the params it needs so that we dont need to construct an entire models.ChannelEdgeInfo object to pass to it. This will be useful later on. --- graph/db/kv_store.go | 13 +++++++------ graph/db/sql_store.go | 3 ++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index 49191d7c2..1abe34ba4 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -2789,7 +2789,8 @@ func (c *KVStore) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex, } nodeKey1, nodeKey2 = makeZombiePubkeys( - &edgeInfo, e1UpdateTime, e2UpdateTime, + edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes, + e1UpdateTime, e2UpdateTime, ) } @@ -2814,27 +2815,27 @@ func (c *KVStore) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex, // the channel. If the channel were to be marked zombie again, it would be // marked with the correct lagging channel since we received an update from only // one side. -func makeZombiePubkeys(info *models.ChannelEdgeInfo, - e1, e2 *time.Time) ([33]byte, [33]byte) { +func makeZombiePubkeys(node1, node2 [33]byte, e1, e2 *time.Time) ([33]byte, + [33]byte) { switch { // If we don't have either edge policy, we'll return both pubkeys so // that the channel can be resurrected by either party. case e1 == nil && e2 == nil: - return info.NodeKey1Bytes, info.NodeKey2Bytes + return node1, node2 // If we're missing edge1, or if both edges are present but edge1 is // older, we'll return edge1's pubkey and a blank pubkey for edge2. This // means that only an update from edge1 will be able to resurrect the // channel. case e1 == nil || (e2 != nil && e1.Before(*e2)): - return info.NodeKey1Bytes, [33]byte{} + return node1, [33]byte{} // Otherwise, we're missing edge2 or edge2 is the older side, so we // return a blank pubkey for edge1. In this case, only an update from // edge2 can resurect the channel. default: - return [33]byte{}, info.NodeKey2Bytes + return [33]byte{}, node1 } } diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 7a7987855..c991c3b30 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -1680,7 +1680,8 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool, } nodeKey1, nodeKey2 = makeZombiePubkeys( - info, e1UpdateTime, e2UpdateTime, + info.NodeKey1Bytes, info.NodeKey2Bytes, + e1UpdateTime, e2UpdateTime, ) } From ce3401ee5d7450829f20866ecae6e49a0ebb04fd Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 5 Aug 2025 13:53:40 +0200 Subject: [PATCH 3/9] graph/db: refactor buildNode to not take a pointer Since the type returned from the DB is not a pointer. This will be useful later on. --- graph/db/sql_migration.go | 2 +- graph/db/sql_store.go | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index e1ff256a2..8a529765b 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -192,7 +192,7 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend, pub, id, dbNode.ID) } - migratedNode, err := buildNode(ctx, sqlDB, &dbNode) + migratedNode, err := buildNode(ctx, sqlDB, dbNode) if err != nil { return fmt.Errorf("could not build migrated node "+ "from dbNode(db id: %d, node pub: %x): %w", diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index c991c3b30..95b4ad2ad 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -3231,7 +3231,7 @@ func getNodeByPubKey(ctx context.Context, db SQLQueries, return 0, nil, fmt.Errorf("unable to fetch node: %w", err) } - node, err := buildNode(ctx, db, &dbNode) + node, err := buildNode(ctx, db, dbNode) if err != nil { return 0, nil, fmt.Errorf("unable to build node: %w", err) } @@ -3256,7 +3256,7 @@ func buildCacheableChannelInfo(scid []byte, capacity int64, node1Pub, // record. The node's features, addresses and extra signed fields are also // fetched from the database and set on the node. func buildNode(ctx context.Context, db SQLQueries, - dbNode *sqlc.GraphNode) (*models.LightningNode, error) { + dbNode sqlc.GraphNode) (*models.LightningNode, error) { // NOTE: buildNode is only used to load the data for a single node, and // so no paged queries will be performed. This means that it's ok to @@ -3276,7 +3276,7 @@ func buildNode(ctx context.Context, db SQLQueries, // from the provided sqlc.GraphNode and batchNodeData. If the node does have // features/addresses/extra fields, then the corresponding fields are expected // to be present in the batchNodeData. -func buildNodeWithBatchData(dbNode *sqlc.GraphNode, +func buildNodeWithBatchData(dbNode sqlc.GraphNode, batchData *batchNodeData) (*models.LightningNode, error) { if dbNode.Version != int16(ProtocolV1) { @@ -3364,7 +3364,7 @@ func forEachNodeInBatch(ctx context.Context, cfg *sqldb.QueryConfig, } for _, dbNode := range nodes { - node, err := buildNodeWithBatchData(&dbNode, batchData) + node, err := buildNodeWithBatchData(dbNode, batchData) if err != nil { return fmt.Errorf("unable to build node(id=%d): %w", dbNode.ID, err) @@ -4235,12 +4235,12 @@ func buildNodes(ctx context.Context, db SQLQueries, dbNode1, dbNode2 sqlc.GraphNode) (*models.LightningNode, *models.LightningNode, error) { - node1, err := buildNode(ctx, db, &dbNode1) + node1, err := buildNode(ctx, db, dbNode1) if err != nil { return nil, nil, err } - node2, err := buildNode(ctx, db, &dbNode2) + node2, err := buildNode(ctx, db, dbNode2) if err != nil { return nil, nil, err } @@ -5090,7 +5090,7 @@ func forEachNodePaginated(ctx context.Context, cfg *sqldb.QueryConfig, processItem := func(ctx context.Context, dbNode sqlc.GraphNode, batchData *batchNodeData) error { - node, err := buildNodeWithBatchData(&dbNode, batchData) + node, err := buildNodeWithBatchData(dbNode, batchData) if err != nil { return fmt.Errorf("unable to build "+ "node(id=%d): %w", dbNode.ID, err) From ebe6a8af9f1f3d0b2e52252a5f7b3dc557221c67 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 5 Aug 2025 12:08:26 +0200 Subject: [PATCH 4/9] graph/db: use batch loading for ChanUpdatesInHorizon MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The following performance gains were measured using the new benchmark test. ``` name old time/op new time/op delta ChanUpdatesInHorizon-native-sqlite-10 18.5s ± 3% 2.0s ± 5% -89.11% (p=0.000 n=9+9) ChanUpdatesInHorizon-native-postgres-10 59.0s ± 3% 0.8s ±10% -98.65% (p=0.000 n=10+9) ``` --- graph/db/sql_store.go | 210 ++++++++++++++++++++++++++++++---------- sqldb/sqlc/db_custom.go | 34 +++++++ 2 files changed, 193 insertions(+), 51 deletions(-) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 95b4ad2ad..d6d7c54ad 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -911,6 +911,7 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime, edges []ChannelEdge hits int ) + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { rows, err := db.GetChannelsByPolicyLastUpdateRange( ctx, sqlc.GetChannelsByPolicyLastUpdateRangeParams{ @@ -923,72 +924,61 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime, return err } + if len(rows) == 0 { + return nil + } + + // We'll pre-allocate the slices and maps here with a best + // effort size in order to avoid unnecessary allocations later + // on. + uncachedRows := make( + []sqlc.GetChannelsByPolicyLastUpdateRangeRow, 0, + len(rows), + ) + edgesToCache = make(map[uint64]ChannelEdge, len(rows)) + edgesSeen = make(map[uint64]struct{}, len(rows)) + edges = make([]ChannelEdge, 0, len(rows)) + + // Separate cached from non-cached channels since we will only + // batch load the data for the ones we haven't cached yet. for _, row := range rows { - // If we've already retrieved the info and policies for - // this edge, then we can skip it as we don't need to do - // so again. chanIDInt := byteOrder.Uint64(row.GraphChannel.Scid) + + // Skip duplicates. if _, ok := edgesSeen[chanIDInt]; ok { continue } + edgesSeen[chanIDInt] = struct{}{} + // Check cache first. if channel, ok := s.chanCache.get(chanIDInt); ok { hits++ - edgesSeen[chanIDInt] = struct{}{} edges = append(edges, channel) - continue } - node1, node2, err := buildNodes( - ctx, db, row.GraphNode, row.GraphNode_2, - ) - if err != nil { - return err - } - - channel, err := getAndBuildEdgeInfo( - ctx, db, s.cfg.ChainHash, row.GraphChannel, - node1.PubKeyBytes, node2.PubKeyBytes, - ) - if err != nil { - return fmt.Errorf("unable to build channel "+ - "info: %w", err) - } - - dbPol1, dbPol2, err := extractChannelPolicies(row) - if err != nil { - return fmt.Errorf("unable to extract channel "+ - "policies: %w", err) - } - - p1, p2, err := getAndBuildChanPolicies( - ctx, db, dbPol1, dbPol2, channel.ChannelID, - node1.PubKeyBytes, node2.PubKeyBytes, - ) - if err != nil { - return fmt.Errorf("unable to build channel "+ - "policies: %w", err) - } - - edgesSeen[chanIDInt] = struct{}{} - chanEdge := ChannelEdge{ - Info: channel, - Policy1: p1, - Policy2: p2, - Node1: node1, - Node2: node2, - } - edges = append(edges, chanEdge) - edgesToCache[chanIDInt] = chanEdge + // Mark this row as one we need to batch load data for. + uncachedRows = append(uncachedRows, row) } + // If there are no uncached rows, then we can return early. + if len(uncachedRows) == 0 { + return nil + } + + // Batch load data for all uncached channels. + newEdges, err := batchBuildChannelEdges( + ctx, s.cfg, db, uncachedRows, + ) + if err != nil { + return fmt.Errorf("unable to batch build channel "+ + "edges: %w", err) + } + + edges = append(edges, newEdges...) + return nil - }, func() { - edgesSeen = make(map[uint64]struct{}) - edgesToCache = make(map[uint64]ChannelEdge) - edges = nil - }) + }, sqldb.NoOpReset) if err != nil { return nil, fmt.Errorf("unable to fetch channels: %w", err) } @@ -5298,3 +5288,121 @@ func buildDirectedChannel(chain chainhash.Hash, nodeID int64, return directedChannel, nil } + +// batchBuildChannelEdges builds a slice of ChannelEdge instances from the +// provided rows. It uses batch loading for channels, policies, and nodes. +func batchBuildChannelEdges[T sqlc.ChannelAndNodes](ctx context.Context, + cfg *SQLStoreConfig, db SQLQueries, rows []T) ([]ChannelEdge, error) { + + var ( + channelIDs = make([]int64, len(rows)) + policyIDs = make([]int64, 0, len(rows)*2) + nodeIDs = make([]int64, 0, len(rows)*2) + + // nodeIDSet is used to ensure we only collect unique node IDs. + nodeIDSet = make(map[int64]bool) + + // edges will hold the final channel edges built from the rows. + edges = make([]ChannelEdge, 0, len(rows)) + ) + + // Collect all IDs needed for batch loading. + for i, row := range rows { + channelIDs[i] = row.Channel().ID + + // Collect policy IDs + dbPol1, dbPol2, err := extractChannelPolicies(row) + if err != nil { + return nil, fmt.Errorf("unable to extract channel "+ + "policies: %w", err) + } + if dbPol1 != nil { + policyIDs = append(policyIDs, dbPol1.ID) + } + if dbPol2 != nil { + policyIDs = append(policyIDs, dbPol2.ID) + } + + var ( + node1ID = row.Node1().ID + node2ID = row.Node2().ID + ) + + // Collect unique node IDs. + if !nodeIDSet[node1ID] { + nodeIDs = append(nodeIDs, node1ID) + nodeIDSet[node1ID] = true + } + + if !nodeIDSet[node2ID] { + nodeIDs = append(nodeIDs, node2ID) + nodeIDSet[node2ID] = true + } + } + + // Batch the data for all the channels and policies. + channelBatchData, err := batchLoadChannelData( + ctx, cfg.QueryCfg, db, channelIDs, policyIDs, + ) + if err != nil { + return nil, fmt.Errorf("unable to batch load channel and "+ + "policy data: %w", err) + } + + // Batch the data for all the nodes. + nodeBatchData, err := batchLoadNodeData(ctx, cfg.QueryCfg, db, nodeIDs) + if err != nil { + return nil, fmt.Errorf("unable to batch load node data: %w", + err) + } + + // Build all channel edges using batch data. + for _, row := range rows { + // Build nodes using batch data. + node1, err := buildNodeWithBatchData(row.Node1(), nodeBatchData) + if err != nil { + return nil, fmt.Errorf("unable to build node1: %w", err) + } + + node2, err := buildNodeWithBatchData(row.Node2(), nodeBatchData) + if err != nil { + return nil, fmt.Errorf("unable to build node2: %w", err) + } + + // Build channel info using batch data. + channel, err := buildEdgeInfoWithBatchData( + cfg.ChainHash, row.Channel(), node1.PubKeyBytes, + node2.PubKeyBytes, channelBatchData, + ) + if err != nil { + return nil, fmt.Errorf("unable to build channel "+ + "info: %w", err) + } + + // Extract and build policies using batch data. + dbPol1, dbPol2, err := extractChannelPolicies(row) + if err != nil { + return nil, fmt.Errorf("unable to extract channel "+ + "policies: %w", err) + } + + p1, p2, err := buildChanPoliciesWithBatchData( + dbPol1, dbPol2, channel.ChannelID, + node1.PubKeyBytes, node2.PubKeyBytes, channelBatchData, + ) + if err != nil { + return nil, fmt.Errorf("unable to build channel "+ + "policies: %w", err) + } + + edges = append(edges, ChannelEdge{ + Info: channel, + Policy1: p1, + Policy2: p2, + Node1: node1, + Node2: node2, + }) + } + + return edges, nil +} diff --git a/sqldb/sqlc/db_custom.go b/sqldb/sqlc/db_custom.go index 2490e5feb..64440a262 100644 --- a/sqldb/sqlc/db_custom.go +++ b/sqldb/sqlc/db_custom.go @@ -37,3 +37,37 @@ func makeQueryParams(numTotalArgs, numListArgs int) string { return b.String() } + +// ChannelAndNodes is an interface that provides access to a channel and its +// two nodes. +type ChannelAndNodes interface { + // Channel returns the GraphChannel associated with this interface. + Channel() GraphChannel + + // Node1 returns the first GraphNode associated with this channel. + Node1() GraphNode + + // Node2 returns the second GraphNode associated with this channel. + Node2() GraphNode +} + +// Channel returns the GraphChannel associated with this interface. +// +// NOTE: This method is part of the ChannelAndNodes interface. +func (r GetChannelsByPolicyLastUpdateRangeRow) Channel() GraphChannel { + return r.GraphChannel +} + +// Node1 returns the first GraphNode associated with this channel. +// +// NOTE: This method is part of the ChannelAndNodes interface. +func (r GetChannelsByPolicyLastUpdateRangeRow) Node1() GraphNode { + return r.GraphNode +} + +// Node2 returns the second GraphNode associated with this channel. +// +// NOTE: This method is part of the ChannelAndNodes interface. +func (r GetChannelsByPolicyLastUpdateRangeRow) Node2() GraphNode { + return r.GraphNode_2 +} From 556af8e22161a717ddfa8c6ea34a54de8943c42a Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 5 Aug 2025 14:04:35 +0200 Subject: [PATCH 5/9] graph/db: use batch fetching for DeleteChannelEdges --- graph/db/sql_store.go | 186 ++++++++++++++++++++++++++-------------- sqldb/sqlc/db_custom.go | 34 ++++++++ 2 files changed, 156 insertions(+), 64 deletions(-) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index d6d7c54ad..cd960b166 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -1615,11 +1615,12 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool, } var ( - ctx = context.TODO() - deleted []*models.ChannelEdgeInfo + ctx = context.TODO() + edges []*models.ChannelEdgeInfo ) err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error { - chanIDsToDelete := make([]int64, 0, len(chanIDs)) + // First, collect all channel rows. + var channelRows []sqlc.GetChannelsBySCIDWithPoliciesRow chanCallBack := func(ctx context.Context, row sqlc.GetChannelsBySCIDWithPoliciesRow) error { @@ -1628,65 +1629,7 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool, scid := byteOrder.Uint64(row.GraphChannel.Scid) delete(chanLookup, scid) - node1, node2, err := buildNodeVertices( - row.GraphNode.PubKey, row.GraphNode_2.PubKey, - ) - if err != nil { - return err - } - - info, err := getAndBuildEdgeInfo( - ctx, db, s.cfg.ChainHash, row.GraphChannel, - node1, node2, - ) - if err != nil { - return err - } - - deleted = append(deleted, info) - chanIDsToDelete = append( - chanIDsToDelete, row.GraphChannel.ID, - ) - - if !markZombie { - return nil - } - - nodeKey1, nodeKey2 := info.NodeKey1Bytes, - info.NodeKey2Bytes - if strictZombiePruning { - var e1UpdateTime, e2UpdateTime *time.Time - if row.Policy1LastUpdate.Valid { - e1Time := time.Unix( - row.Policy1LastUpdate.Int64, 0, - ) - e1UpdateTime = &e1Time - } - if row.Policy2LastUpdate.Valid { - e2Time := time.Unix( - row.Policy2LastUpdate.Int64, 0, - ) - e2UpdateTime = &e2Time - } - - nodeKey1, nodeKey2 = makeZombiePubkeys( - info.NodeKey1Bytes, info.NodeKey2Bytes, - e1UpdateTime, e2UpdateTime, - ) - } - - err = db.UpsertZombieChannel( - ctx, sqlc.UpsertZombieChannelParams{ - Version: int16(ProtocolV1), - Scid: channelIDToBytes(scid), - NodeKey1: nodeKey1[:], - NodeKey2: nodeKey2[:], - }, - ) - if err != nil { - return fmt.Errorf("unable to mark channel as "+ - "zombie: %w", err) - } + channelRows = append(channelRows, row) return nil } @@ -1702,9 +1645,37 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool, return ErrEdgeNotFound } + if len(channelRows) == 0 { + return nil + } + + // Batch build all channel edges. + var chanIDsToDelete []int64 + edges, chanIDsToDelete, err = batchBuildChannelInfo( + ctx, s.cfg, db, channelRows, + ) + if err != nil { + return err + } + + if markZombie { + for i, row := range channelRows { + scid := byteOrder.Uint64(row.GraphChannel.Scid) + + err := handleZombieMarking( + ctx, db, row, edges[i], + strictZombiePruning, scid, + ) + if err != nil { + return fmt.Errorf("unable to mark "+ + "channel as zombie: %w", err) + } + } + } + return s.deleteChannels(ctx, db, chanIDsToDelete) }, func() { - deleted = nil + edges = nil // Re-fill the lookup map. for _, chanID := range chanIDs { @@ -1721,7 +1692,7 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool, s.chanCache.remove(chanID) } - return deleted, nil + return edges, nil } // FetchChannelEdgesByID attempts to lookup the two directed edges for the @@ -5406,3 +5377,90 @@ func batchBuildChannelEdges[T sqlc.ChannelAndNodes](ctx context.Context, return edges, nil } + +// batchBuildChannelInfo builds a slice of models.ChannelEdgeInfo +// instances from the provided rows using batch loading for channel data. +func batchBuildChannelInfo[T sqlc.ChannelAndNodeIDs](ctx context.Context, + cfg *SQLStoreConfig, db SQLQueries, rows []T) ( + []*models.ChannelEdgeInfo, []int64, error) { + + if len(rows) == 0 { + return nil, nil, nil + } + + // Collect all the channel IDs needed for batch loading. + channelIDs := make([]int64, len(rows)) + for i, row := range rows { + channelIDs[i] = row.Channel().ID + } + + // Batch load the channel data. + channelBatchData, err := batchLoadChannelData( + ctx, cfg.QueryCfg, db, channelIDs, nil, + ) + if err != nil { + return nil, nil, fmt.Errorf("unable to batch load channel "+ + "data: %w", err) + } + + // Build all channel edges using batch data. + edges := make([]*models.ChannelEdgeInfo, 0, len(rows)) + for _, row := range rows { + node1, node2, err := buildNodeVertices( + row.Node1Pub(), row.Node2Pub(), + ) + if err != nil { + return nil, nil, err + } + + // Build channel info using batch data + info, err := buildEdgeInfoWithBatchData( + cfg.ChainHash, row.Channel(), node1, node2, + channelBatchData, + ) + if err != nil { + return nil, nil, err + } + + edges = append(edges, info) + } + + return edges, channelIDs, nil +} + +// handleZombieMarking is a helper function that handles the logic of +// marking a channel as a zombie in the database. It takes into account whether +// we are in strict zombie pruning mode, and adjusts the node public keys +// accordingly based on the last update timestamps of the channel policies. +func handleZombieMarking(ctx context.Context, db SQLQueries, + row sqlc.GetChannelsBySCIDWithPoliciesRow, info *models.ChannelEdgeInfo, + strictZombiePruning bool, scid uint64) error { + + nodeKey1, nodeKey2 := info.NodeKey1Bytes, info.NodeKey2Bytes + + if strictZombiePruning { + var e1UpdateTime, e2UpdateTime *time.Time + if row.Policy1LastUpdate.Valid { + e1Time := time.Unix(row.Policy1LastUpdate.Int64, 0) + e1UpdateTime = &e1Time + } + if row.Policy2LastUpdate.Valid { + e2Time := time.Unix(row.Policy2LastUpdate.Int64, 0) + e2UpdateTime = &e2Time + } + + nodeKey1, nodeKey2 = makeZombiePubkeys( + info.NodeKey1Bytes, info.NodeKey2Bytes, e1UpdateTime, + e2UpdateTime, + ) + } + + return db.UpsertZombieChannel( + ctx, sqlc.UpsertZombieChannelParams{ + Version: int16(ProtocolV1), + Scid: channelIDToBytes(scid), + NodeKey1: nodeKey1[:], + NodeKey2: nodeKey2[:], + }, + ) +} diff --git a/sqldb/sqlc/db_custom.go b/sqldb/sqlc/db_custom.go index 64440a262..823002817 100644 --- a/sqldb/sqlc/db_custom.go +++ b/sqldb/sqlc/db_custom.go @@ -71,3 +71,37 @@ func (r GetChannelsByPolicyLastUpdateRangeRow) Node1() GraphNode { func (r GetChannelsByPolicyLastUpdateRangeRow) Node2() GraphNode { return r.GraphNode_2 } + +// ChannelAndNodeIDs is an interface that provides access to a channel and its +// two node public keys. +type ChannelAndNodeIDs interface { + // Channel returns the GraphChannel associated with this interface. + Channel() GraphChannel + + // Node1Pub returns the public key of the first node as a byte slice. + Node1Pub() []byte + + // Node2Pub returns the public key of the second node as a byte slice. + Node2Pub() []byte +} + +// Channel returns the GraphChannel associated with this interface. +// +// NOTE: This method is part of the ChannelAndNodeIDs interface. +func (r GetChannelsBySCIDWithPoliciesRow) Channel() GraphChannel { + return r.GraphChannel +} + +// Node1Pub returns the public key of the first node as a byte slice. +// +// NOTE: This method is part of the ChannelAndNodeIDs interface. +func (r GetChannelsBySCIDWithPoliciesRow) Node1Pub() []byte { + return r.GraphNode.PubKey +} + +// Node2Pub returns the public key of the second node as a byte slice. +// +// NOTE: This method is part of the ChannelAndNodeIDs interface. +func (r GetChannelsBySCIDWithPoliciesRow) Node2Pub() []byte { + return r.GraphNode_2.PubKey +} From 69bcf47dca2e1bd7f5476b91c96ebf8528d4d3d4 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 5 Aug 2025 14:07:16 +0200 Subject: [PATCH 6/9] graph/db: use batch loading for PruneGraph --- graph/db/sql_store.go | 51 +++++++++++++++++++++++------------------ sqldb/sqlc/db_custom.go | 21 +++++++++++++++++ 2 files changed, 50 insertions(+), 22 deletions(-) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index cd960b166..f1b34b60e 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -2325,31 +2325,12 @@ func (s *SQLStore) PruneGraph(spentOutputs []*wire.OutPoint, prunedNodes []route.Vertex ) err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error { - var chansToDelete []int64 - - // Define the callback function for processing each channel. + // First, collect all channel rows that need to be pruned. + var channelRows []sqlc.GetChannelsByOutpointsRow channelCallback := func(ctx context.Context, row sqlc.GetChannelsByOutpointsRow) error { - node1, node2, err := buildNodeVertices( - row.Node1Pubkey, row.Node2Pubkey, - ) - if err != nil { - return err - } - - info, err := getAndBuildEdgeInfo( - ctx, db, s.cfg.ChainHash, row.GraphChannel, - node1, node2, - ) - if err != nil { - return err - } - - closedChans = append(closedChans, info) - chansToDelete = append( - chansToDelete, row.GraphChannel.ID, - ) + channelRows = append(channelRows, row) return nil } @@ -2362,6 +2343,32 @@ func (s *SQLStore) PruneGraph(spentOutputs []*wire.OutPoint, "outpoints: %w", err) } + if len(channelRows) == 0 { + // There are no channels to prune. So we can exit early + // after updating the prune log. + err = db.UpsertPruneLogEntry( + ctx, sqlc.UpsertPruneLogEntryParams{ + BlockHash: blockHash[:], + BlockHeight: int64(blockHeight), + }, + ) + if err != nil { + return fmt.Errorf("unable to insert prune log "+ + "entry: %w", err) + } + + return nil + } + + // Batch build all channel edges for pruning. + var chansToDelete []int64 + closedChans, chansToDelete, err = batchBuildChannelInfo( + ctx, s.cfg, db, channelRows, + ) + if err != nil { + return err + } + err = s.deleteChannels(ctx, db, chansToDelete) if err != nil { return fmt.Errorf("unable to delete channels: %w", err) diff --git a/sqldb/sqlc/db_custom.go b/sqldb/sqlc/db_custom.go index 823002817..2b378d4a8 100644 --- a/sqldb/sqlc/db_custom.go +++ b/sqldb/sqlc/db_custom.go @@ -105,3 +105,24 @@ func (r GetChannelsBySCIDWithPoliciesRow) Node1Pub() []byte { func (r GetChannelsBySCIDWithPoliciesRow) Node2Pub() []byte { return r.GraphNode_2.PubKey } + +// Channel returns the GraphChannel associated with this interface. +// +// NOTE: This method is part of the ChannelAndNodeIDs interface. +func (r GetChannelsByOutpointsRow) Channel() GraphChannel { + return r.GraphChannel +} + +// Node1Pub returns the public key of the first node as a byte slice. +// +// NOTE: This method is part of the ChannelAndNodeIDs interface. +func (r GetChannelsByOutpointsRow) Node1Pub() []byte { + return r.Node1Pubkey +} + +// Node2Pub returns the public key of the second node as a byte slice. +// +// NOTE: This method is part of the ChannelAndNodeIDs interface. +func (r GetChannelsByOutpointsRow) Node2Pub() []byte { + return r.Node2Pubkey +} From 594c842aebccaa46f524fd167bb4834263450c72 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 5 Aug 2025 14:09:43 +0200 Subject: [PATCH 7/9] graph/db: batch loading for DisconnectBlockAtHeight --- graph/db/sql_store.go | 38 ++++++++++++++++++++------------------ sqldb/sqlc/db_custom.go | 21 +++++++++++++++++++++ 2 files changed, 41 insertions(+), 18 deletions(-) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index f1b34b60e..33e3e1c02 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -2627,27 +2627,29 @@ func (s *SQLStore) DisconnectBlockAtHeight(height uint32) ( return fmt.Errorf("unable to fetch channels: %w", err) } - chanIDsToDelete := make([]int64, len(rows)) - for i, row := range rows { - node1, node2, err := buildNodeVertices( - row.Node1PubKey, row.Node2PubKey, + if len(rows) == 0 { + // No channels to disconnect, but still clean up prune + // log. + return db.DeletePruneLogEntriesInRange( + ctx, sqlc.DeletePruneLogEntriesInRangeParams{ + StartHeight: int64(height), + EndHeight: int64( + endShortChanID.BlockHeight, + ), + }, ) - if err != nil { - return err - } - - channel, err := getAndBuildEdgeInfo( - ctx, db, s.cfg.ChainHash, row.GraphChannel, - node1, node2, - ) - if err != nil { - return err - } - - chanIDsToDelete[i] = row.GraphChannel.ID - removedChans = append(removedChans, channel) } + // Batch build all channel edges for disconnection. + channelEdges, chanIDsToDelete, err := batchBuildChannelInfo( + ctx, s.cfg, db, rows, + ) + if err != nil { + return err + } + + removedChans = channelEdges + err = s.deleteChannels(ctx, db, chanIDsToDelete) if err != nil { return fmt.Errorf("unable to delete channels: %w", err) diff --git a/sqldb/sqlc/db_custom.go b/sqldb/sqlc/db_custom.go index 2b378d4a8..7c9825e78 100644 --- a/sqldb/sqlc/db_custom.go +++ b/sqldb/sqlc/db_custom.go @@ -126,3 +126,24 @@ func (r GetChannelsByOutpointsRow) Node1Pub() []byte { func (r GetChannelsByOutpointsRow) Node2Pub() []byte { return r.Node2Pubkey } + +// Channel returns the GraphChannel associated with this interface. +// +// NOTE: This method is part of the ChannelAndNodeIDs interface. +func (r GetChannelsBySCIDRangeRow) Channel() GraphChannel { + return r.GraphChannel +} + +// Node1Pub returns the public key of the first node as a byte slice. +// +// NOTE: This method is part of the ChannelAndNodeIDs interface. +func (r GetChannelsBySCIDRangeRow) Node1Pub() []byte { + return r.Node1PubKey +} + +// Node2Pub returns the public key of the second node as a byte slice. +// +// NOTE: This method is part of the ChannelAndNodeIDs interface. +func (r GetChannelsBySCIDRangeRow) Node2Pub() []byte { + return r.Node2PubKey +} From 8de33fa601d5567db39db0e3d3f8d6fd43448578 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 5 Aug 2025 14:16:22 +0200 Subject: [PATCH 8/9] graph/db: batch fetching for FetchChanInfos --- graph/db/sql_store.go | 86 +++++++++++++---------------------------- sqldb/sqlc/db_custom.go | 14 +++++++ 2 files changed, 40 insertions(+), 60 deletions(-) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 33e3e1c02..3276f67d7 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -2055,55 +2055,40 @@ func (s *SQLStore) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) { edges = make(map[uint64]ChannelEdge) ) err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + // First, collect all channel rows. + var channelRows []sqlc.GetChannelsBySCIDWithPoliciesRow chanCallBack := func(ctx context.Context, row sqlc.GetChannelsBySCIDWithPoliciesRow) error { - node1, node2, err := buildNodes( - ctx, db, row.GraphNode, row.GraphNode_2, - ) - if err != nil { - return fmt.Errorf("unable to fetch nodes: %w", - err) - } - - edge, err := getAndBuildEdgeInfo( - ctx, db, s.cfg.ChainHash, row.GraphChannel, - node1.PubKeyBytes, node2.PubKeyBytes, - ) - if err != nil { - return fmt.Errorf("unable to build "+ - "channel info: %w", err) - } - - dbPol1, dbPol2, err := extractChannelPolicies(row) - if err != nil { - return fmt.Errorf("unable to extract channel "+ - "policies: %w", err) - } - - p1, p2, err := getAndBuildChanPolicies( - ctx, db, dbPol1, dbPol2, edge.ChannelID, - node1.PubKeyBytes, node2.PubKeyBytes, - ) - if err != nil { - return fmt.Errorf("unable to build channel "+ - "policies: %w", err) - } - - edges[edge.ChannelID] = ChannelEdge{ - Info: edge, - Policy1: p1, - Policy2: p2, - Node1: node1, - Node2: node2, - } - + channelRows = append(channelRows, row) return nil } - return s.forEachChanWithPoliciesInSCIDList( + err := s.forEachChanWithPoliciesInSCIDList( ctx, db, chanCallBack, chanIDs, ) + if err != nil { + return err + } + + if len(channelRows) == 0 { + return nil + } + + // Batch build all channel edges. + chans, err := batchBuildChannelEdges( + ctx, s.cfg, db, channelRows, + ) + if err != nil { + return fmt.Errorf("unable to build channel edges: %w", + err) + } + + for _, c := range chans { + edges[c.Info.ChannelID] = c + } + + return err }, func() { clear(edges) }) @@ -4199,25 +4184,6 @@ func buildChanPolicy(dbPolicy sqlc.GraphChannelPolicy, channelID uint64, }, nil } -// buildNodes builds the models.LightningNode instances for the -// given row which is expected to be a sqlc type that contains node information. -func buildNodes(ctx context.Context, db SQLQueries, dbNode1, - dbNode2 sqlc.GraphNode) (*models.LightningNode, *models.LightningNode, - error) { - - node1, err := buildNode(ctx, db, dbNode1) - if err != nil { - return nil, nil, err - } - - node2, err := buildNode(ctx, db, dbNode2) - if err != nil { - return nil, nil, err - } - - return node1, node2, nil -} - // extractChannelPolicies extracts the sqlc.GraphChannelPolicy records from the give // row which is expected to be a sqlc type that contains channel policy // information. It returns two policies, which may be nil if the policy diff --git a/sqldb/sqlc/db_custom.go b/sqldb/sqlc/db_custom.go index 7c9825e78..f7bc49918 100644 --- a/sqldb/sqlc/db_custom.go +++ b/sqldb/sqlc/db_custom.go @@ -106,6 +106,20 @@ func (r GetChannelsBySCIDWithPoliciesRow) Node2Pub() []byte { return r.GraphNode_2.PubKey } +// Node1 returns the first GraphNode associated with this channel. +// +// NOTE: This method is part of the ChannelAndNodes interface. +func (r GetChannelsBySCIDWithPoliciesRow) Node1() GraphNode { + return r.GraphNode +} + +// Node2 returns the second GraphNode associated with this channel. +// +// NOTE: This method is part of the ChannelAndNodes interface. +func (r GetChannelsBySCIDWithPoliciesRow) Node2() GraphNode { + return r.GraphNode_2 +} + // Channel returns the GraphChannel associated with this interface. // // NOTE: This method is part of the ChannelAndNodeIDs interface. From 522e200c0e91883cb1483da63dfcbb6ac767fcd6 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 5 Aug 2025 14:33:54 +0200 Subject: [PATCH 9/9] graph/db: add missing counter increment --- graph/db/sql_migration.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index 8a529765b..0b06e7cac 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -410,6 +410,8 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend, } channelCount++ + chunk++ + err = migrateSingleChannel( ctx, sqlDB, channel, policy1, policy2, migChanPolicy, )