From 81c54611c1d102d555c688e87d079997df5b7c58 Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Wed, 13 Aug 2025 13:37:25 +0200
Subject: [PATCH] graph/db+sqldb: use batch fetching during channel&policy
 migration

Restructure the `migrateChannelsAndPolicies` function so that it does
the validation of migrated channels and policies in batches. Instead of
fetching each channel and its policies individually after migrating it,
we wait for a minimum batch size to be reached and then validate the
whole batch together. This lets us make far fewer DB round trips.
---
 graph/db/sql_migration.go    | 260 ++++++++++++++++++++++++-----------
 graph/db/sql_store.go        |  46 +++++++
 sqldb/sqlc/graph.sql.go      | 173 +++++++++++++++++++++++
 sqldb/sqlc/querier.go        |   1 +
 sqldb/sqlc/queries/graph.sql |  53 +++++++
 5 files changed, 452 insertions(+), 81 deletions(-)

diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go
index 72947bd3f..dcf664a1e 100644
--- a/graph/db/sql_migration.go
+++ b/graph/db/sql_migration.go
@@ -394,6 +394,24 @@ func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
 	return nil
 }
 
+// migChanInfo holds the information about a channel and its policies.
+type migChanInfo struct {
+	// edge is the channel object as read from the KVDB source.
+	edge *models.ChannelEdgeInfo
+
+	// policy1 is the first channel policy for the channel as read from
+	// the KVDB source.
+	policy1 *models.ChannelEdgePolicy
+
+	// policy2 is the second channel policy for the channel as read
+	// from the KVDB source.
+	policy2 *models.ChannelEdgePolicy
+
+	// dbInfo holds location info (in the form of DB IDs) of the channel
+	// and its policies in the native-SQL destination.
+	dbInfo *dbChanInfo
+}
+
 // migrateChannelsAndPolicies migrates all channels and their policies
 // from the KV backend to the SQL database.
 func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
@@ -434,6 +452,11 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
 		return nil
 	}
 
+	// batch is used to collect migrated channel info that we will
+	// batch-validate. Each entry is indexed by the DB ID of the channel
+	// in the SQL database.
+	batch := make(map[int64]*migChanInfo, cfg.QueryCfg.MaxBatchSize)
+
 	// Iterate over each channel in the KV store and migrate it and its
 	// policies to the SQL database.
 	err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
@@ -489,14 +512,47 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
 		channelCount++
 		chunk++
 
-		err = migrateSingleChannel(
-			ctx, cfg, sqlDB, channel, policy1, policy2,
-			migChanPolicy,
-		)
+		// Migrate the channel info along with its policies.
+		dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
 		if err != nil {
-			return fmt.Errorf("could not migrate channel %d: %w",
+			return fmt.Errorf("could not insert record for "+
+				"channel %d in SQL store: %w", scid, err)
+		}
+
+		// Now, migrate the two channel policies for the channel.
+		err = migChanPolicy(policy1)
+		if err != nil {
+			return fmt.Errorf("could not migrate policy1(%d): %w",
 				scid, err)
 		}
+		err = migChanPolicy(policy2)
+		if err != nil {
+			return fmt.Errorf("could not migrate policy2(%d): %w",
+				scid, err)
+		}
+
+		// Collect the migrated channel info and policies in a batch for
+		// later validation.
+		batch[dbChanInfo.channelID] = &migChanInfo{
+			edge:    channel,
+			policy1: policy1,
+			policy2: policy2,
+			dbInfo:  dbChanInfo,
+		}
+
+		if len(batch) >= cfg.QueryCfg.MaxBatchSize {
+			// Do batch validation.
+			err := validateMigratedChannels(ctx, cfg, sqlDB, batch)
+			if err != nil {
+				return fmt.Errorf("could not validate "+
+					"channel batch: %w", err)
+			}
+
+			batch = make(
+				map[int64]*migChanInfo,
+				cfg.QueryCfg.MaxBatchSize,
+			)
+		}
 
 		s.Do(func() {
 			elapsed := time.Since(t0).Seconds()
@@ -518,6 +574,17 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
 			err)
 	}
 
+	if len(batch) > 0 {
+		// Do a final batch validation for any remaining channels.
+		err := validateMigratedChannels(ctx, cfg, sqlDB, batch)
+		if err != nil {
+			return fmt.Errorf("could not validate final channel "+
+				"batch: %w", err)
+		}
+
+		batch = make(map[int64]*migChanInfo, cfg.QueryCfg.MaxBatchSize)
+	}
+
 	log.Infof("Migrated %d channels and %d policies from KV to SQL "+
 		"(skipped %d channels and %d policies due to invalid TLV "+
 		"streams)", channelCount, policyCount, skippedChanCount,
@@ -526,70 +593,142 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
 	return nil
 }
 
-func migrateSingleChannel(ctx context.Context, cfg *SQLStoreConfig,
-	sqlDB SQLQueries, channel *models.ChannelEdgeInfo,
-	policy1, policy2 *models.ChannelEdgePolicy,
-	migChanPolicy func(*models.ChannelEdgePolicy) error) error {
+// validateMigratedChannels validates the channels in the batch after they have
+// been migrated to the SQL database. It batch fetches all channels by their IDs
+// and compares the migrated channels and their policies with the original ones
+// to ensure they match using batch construction patterns.
+func validateMigratedChannels(ctx context.Context, cfg *SQLStoreConfig,
+	sqlDB SQLQueries, batch map[int64]*migChanInfo) error {
 
-	scid := channel.ChannelID
-
-	// First, migrate the channel info along with its policies.
-	dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
-	if err != nil {
-		return fmt.Errorf("could not insert record for channel %d "+
-			"in SQL store: %w", scid, err)
+	// Convert batch keys (DB IDs) to an int slice for the batch query.
+	dbChanIDs := make([]int64, 0, len(batch))
+	for id := range batch {
+		dbChanIDs = append(dbChanIDs, id)
 	}
 
-	// Now, migrate the two channel policies.
-	err = migChanPolicy(policy1)
+	// Batch fetch all channels with their policies.
+	rows, err := sqlDB.GetChannelsByIDs(ctx, dbChanIDs)
 	if err != nil {
-		return fmt.Errorf("could not migrate policy1(%d): %w", scid,
-			err)
-	}
-	err = migChanPolicy(policy2)
-	if err != nil {
-		return fmt.Errorf("could not migrate policy2(%d): %w", scid,
+		return fmt.Errorf("could not batch get channels by IDs: %w",
 			err)
 	}
 
-	// Now, fetch the channel and its policies from the SQL DB.
-	row, err := sqlDB.GetChannelBySCIDWithPolicies(
-		ctx, sqlc.GetChannelBySCIDWithPoliciesParams{
-			Scid:    channelIDToBytes(scid),
-			Version: int16(ProtocolV1),
-		},
+	// Sanity check that the same number of channels were returned
+	// as requested.
+	if len(rows) != len(dbChanIDs) {
+		return fmt.Errorf("expected to fetch %d channels, "+
+			"but got %d", len(dbChanIDs), len(rows))
+	}
+
+	// Collect all policy IDs needed for batch data loading.
+	dbPolicyIDs := make([]int64, 0, len(dbChanIDs)*2)
+
+	for _, row := range rows {
+		scid := byteOrder.Uint64(row.GraphChannel.Scid)
+
+		dbPol1, dbPol2, err := extractChannelPolicies(row)
+		if err != nil {
+			return fmt.Errorf("could not extract channel policies"+
+				" for SCID %d: %w", scid, err)
+		}
+		if dbPol1 != nil {
+			dbPolicyIDs = append(dbPolicyIDs, dbPol1.ID)
+		}
+		if dbPol2 != nil {
+			dbPolicyIDs = append(dbPolicyIDs, dbPol2.ID)
+		}
+	}
+
+	// Batch load all channel and policy data (features, extras).
+	batchData, err := batchLoadChannelData(
+		ctx, cfg.QueryCfg, sqlDB, dbChanIDs, dbPolicyIDs,
 	)
 	if err != nil {
-		return fmt.Errorf("could not get channel by SCID(%d): %w", scid,
-			err)
+		return fmt.Errorf("could not batch load channel and policy "+
+			"data: %w", err)
 	}
 
+	// Validate each channel in the batch using pre-loaded data.
+	for _, row := range rows {
+		kvdbChan, ok := batch[row.GraphChannel.ID]
+		if !ok {
+			return fmt.Errorf("channel with ID %d not found "+
+				"in batch", row.GraphChannel.ID)
+		}
+
+		scid := byteOrder.Uint64(row.GraphChannel.Scid)
+
+		err = validateMigratedChannelWithBatchData(
+			cfg, scid, kvdbChan, row, batchData,
+		)
+		if err != nil {
+			return fmt.Errorf("channel %d validation failed "+
+				"after migration: %w", scid, err)
+		}
+	}
+
+	return nil
+}
+
+// validateMigratedChannelWithBatchData validates a single migrated channel
+// using pre-fetched batch data for optimal performance.
+func validateMigratedChannelWithBatchData(cfg *SQLStoreConfig,
+	scid uint64, info *migChanInfo, row sqlc.GetChannelsByIDsRow,
+	batchData *batchChannelData) error {
+
+	dbChanInfo := info.dbInfo
+	channel := info.edge
+
 	// Assert that the DB IDs for the channel and nodes are as expected
 	// given the inserted channel info.
-	err = sqldb.CompareRecords(
+	err := sqldb.CompareRecords(
 		dbChanInfo.channelID, row.GraphChannel.ID, "channel DB ID",
 	)
 	if err != nil {
 		return err
 	}
 	err = sqldb.CompareRecords(
-		dbChanInfo.node1ID, row.GraphNode.ID, "node1 DB ID",
+		dbChanInfo.node1ID, row.Node1ID, "node1 DB ID",
 	)
 	if err != nil {
 		return err
 	}
 	err = sqldb.CompareRecords(
-		dbChanInfo.node2ID, row.GraphNode_2.ID, "node2 DB ID",
+		dbChanInfo.node2ID, row.Node2ID, "node2 DB ID",
 	)
 	if err != nil {
 		return err
 	}
 
-	migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
-		ctx, cfg, sqlDB, row,
+	// Build node vertices from the row data.
+	node1, node2, err := buildNodeVertices(
+		row.Node1PubKey, row.Node2PubKey,
 	)
 	if err != nil {
-		return fmt.Errorf("could not build migrated channel and "+
+		return err
+	}
+
+	// Build channel info using batch data.
+	migChan, err := buildEdgeInfoWithBatchData(
+		cfg.ChainHash, row.GraphChannel, node1, node2, batchData,
+	)
+	if err != nil {
+		return fmt.Errorf("could not build migrated channel info: %w",
+			err)
+	}
+
+	// Extract channel policies from the row.
+	dbPol1, dbPol2, err := extractChannelPolicies(row)
+	if err != nil {
+		return fmt.Errorf("could not extract channel policies: %w", err)
+	}
+
+	// Build channel policies using batch data.
+	migPol1, migPol2, err := buildChanPoliciesWithBatchData(
+		dbPol1, dbPol2, scid, node1, node2, batchData,
+	)
+	if err != nil {
+		return fmt.Errorf("could not build migrated channel "+
 			"policies: %w", err)
 	}
 
@@ -639,13 +778,13 @@ func migrateSingleChannel(ctx context.Context, cfg *SQLStoreConfig,
 		)
 	}
 
-	err = checkPolicy(policy1, migPol1)
+	err = checkPolicy(info.policy1, migPol1)
 	if err != nil {
 		return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
 			err)
 	}
 
-	err = checkPolicy(policy2, migPol2)
+	err = checkPolicy(info.policy2, migPol2)
 	if err != nil {
 		return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
 			err)
@@ -778,47 +917,6 @@ func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
 	return nil
 }
 
-// getAndBuildChanAndPolicies is a helper that builds the channel edge info
-// and policies from the given row returned by the SQL query
-// GetChannelBySCIDWithPolicies.
-func getAndBuildChanAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
-	db SQLQueries,
-	row sqlc.GetChannelBySCIDWithPoliciesRow) (*models.ChannelEdgeInfo,
-	*models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {
-
-	node1, node2, err := buildNodeVertices(
-		row.GraphNode.PubKey, row.GraphNode_2.PubKey,
-	)
-	if err != nil {
-		return nil, nil, nil, err
-	}
-
-	edge, err := getAndBuildEdgeInfo(
-		ctx, cfg, db, row.GraphChannel, node1, node2,
-	)
-	if err != nil {
-		return nil, nil, nil, fmt.Errorf("unable to build channel "+
-			"info: %w", err)
-	}
-
-	dbPol1, dbPol2, err := extractChannelPolicies(row)
-	if err != nil {
-		return nil, nil, nil, fmt.Errorf("unable to extract channel "+
-			"policies: %w", err)
-	}
-
-	policy1, policy2, err := getAndBuildChanPolicies(
-		ctx, cfg.QueryCfg, db, dbPol1, dbPol2, edge.ChannelID, node1,
-		node2,
-	)
-	if err != nil {
-		return nil, nil, nil, fmt.Errorf("unable to build channel "+
-			"policies: %w", err)
-	}
-
-	return edge, policy1, policy2, nil
-}
-
 // forEachPruneLogEntry iterates over each prune log entry in the KV
 // backend and calls the provided callback function for each entry.
 func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go
index de6c150b3..c80d32a16 100644
--- a/graph/db/sql_store.go
+++ b/graph/db/sql_store.go
@@ -98,6 +98,7 @@ type SQLQueries interface {
 	GetChannelsBySCIDRange(ctx context.Context, arg sqlc.GetChannelsBySCIDRangeParams) ([]sqlc.GetChannelsBySCIDRangeRow, error)
 	GetChannelBySCIDWithPolicies(ctx context.Context, arg sqlc.GetChannelBySCIDWithPoliciesParams) (sqlc.GetChannelBySCIDWithPoliciesRow, error)
 	GetChannelsBySCIDWithPolicies(ctx context.Context, arg sqlc.GetChannelsBySCIDWithPoliciesParams) ([]sqlc.GetChannelsBySCIDWithPoliciesRow, error)
+	GetChannelsByIDs(ctx context.Context, ids []int64) ([]sqlc.GetChannelsByIDsRow, error)
 	GetChannelAndNodesBySCID(ctx context.Context, arg sqlc.GetChannelAndNodesBySCIDParams) (sqlc.GetChannelAndNodesBySCIDRow, error)
 	HighestSCID(ctx context.Context, version int16) ([]byte, error)
 	ListChannelsByNodeID(ctx context.Context, arg sqlc.ListChannelsByNodeIDParams) ([]sqlc.ListChannelsByNodeIDRow, error)
@@ -4519,6 +4520,51 @@ func extractChannelPolicies(row any) (*sqlc.GraphChannelPolicy,
 		}
 
 		return policy1, policy2, nil
+
+	case sqlc.GetChannelsByIDsRow:
+		if r.Policy1ID.Valid {
+			policy1 = &sqlc.GraphChannelPolicy{
+				ID:                      r.Policy1ID.Int64,
+				Version:                 r.Policy1Version.Int16,
+				ChannelID:               r.GraphChannel.ID,
+				NodeID:                  r.Policy1NodeID.Int64,
+				Timelock:                r.Policy1Timelock.Int32,
+				FeePpm:                  r.Policy1FeePpm.Int64,
+				BaseFeeMsat:             r.Policy1BaseFeeMsat.Int64,
+				MinHtlcMsat:             r.Policy1MinHtlcMsat.Int64,
+				MaxHtlcMsat:             r.Policy1MaxHtlcMsat,
+				LastUpdate:              r.Policy1LastUpdate,
+				InboundBaseFeeMsat:      r.Policy1InboundBaseFeeMsat,
+				InboundFeeRateMilliMsat: r.Policy1InboundFeeRateMilliMsat,
+				Disabled:                r.Policy1Disabled,
+				MessageFlags:            r.Policy1MessageFlags,
+				ChannelFlags:            r.Policy1ChannelFlags,
+				Signature:               r.Policy1Signature,
+			}
+		}
+		if r.Policy2ID.Valid {
+			policy2 = &sqlc.GraphChannelPolicy{
+				ID:                      r.Policy2ID.Int64,
+				Version:                 r.Policy2Version.Int16,
+				ChannelID:               r.GraphChannel.ID,
+				NodeID:                  r.Policy2NodeID.Int64,
+				Timelock:                r.Policy2Timelock.Int32,
+				FeePpm:                  r.Policy2FeePpm.Int64,
+				BaseFeeMsat:             r.Policy2BaseFeeMsat.Int64,
+				MinHtlcMsat:             r.Policy2MinHtlcMsat.Int64,
+				MaxHtlcMsat:             r.Policy2MaxHtlcMsat,
+				LastUpdate:              r.Policy2LastUpdate,
+				InboundBaseFeeMsat:      r.Policy2InboundBaseFeeMsat,
+				InboundFeeRateMilliMsat: r.Policy2InboundFeeRateMilliMsat,
+				Disabled:                r.Policy2Disabled,
+				MessageFlags:            r.Policy2MessageFlags,
+				ChannelFlags:            r.Policy2ChannelFlags,
+				Signature:               r.Policy2Signature,
+			}
+		}
+
+		return policy1, policy2, nil
+
 	default:
 		return nil, nil, fmt.Errorf("unexpected row type in "+
 			"extractChannelPolicies: %T", r)
diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go
index 5a81e5217..eb5eecf9c 100644
--- a/sqldb/sqlc/graph.sql.go
+++ b/sqldb/sqlc/graph.sql.go
@@ -873,6 +873,179 @@ func (q *Queries) GetChannelPolicyExtraTypesBatch(ctx context.Context, policyIds
 	return items, nil
 }
 
+const getChannelsByIDs = `-- name: GetChannelsByIDs :many
+SELECT
+    c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature,
+
+    -- Minimal node data.
+    n1.id AS node1_id,
+    n1.pub_key AS node1_pub_key,
+    n2.id AS node2_id,
+    n2.pub_key AS node2_pub_key,
+
+    -- Policy 1
+    cp1.id AS policy1_id,
+    cp1.node_id AS policy1_node_id,
+    cp1.version AS policy1_version,
+    cp1.timelock AS policy1_timelock,
+    cp1.fee_ppm AS policy1_fee_ppm,
+    cp1.base_fee_msat AS policy1_base_fee_msat,
+    cp1.min_htlc_msat AS policy1_min_htlc_msat,
+    cp1.max_htlc_msat AS policy1_max_htlc_msat,
+    cp1.last_update AS policy1_last_update,
+    cp1.disabled AS policy1_disabled,
+    cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat,
+    cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat,
+    cp1.message_flags AS policy1_message_flags,
+    cp1.channel_flags AS policy1_channel_flags,
+    cp1.signature AS policy1_signature,
+
+    -- Policy 2
+    cp2.id AS policy2_id,
+    cp2.node_id AS policy2_node_id,
+    cp2.version AS policy2_version,
+    cp2.timelock AS policy2_timelock,
+    cp2.fee_ppm AS policy2_fee_ppm,
+    cp2.base_fee_msat AS policy2_base_fee_msat,
+    cp2.min_htlc_msat AS policy2_min_htlc_msat,
+    cp2.max_htlc_msat AS policy2_max_htlc_msat,
+    cp2.last_update AS policy2_last_update,
+    cp2.disabled AS policy2_disabled,
+    cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat,
+    cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat,
+    cp2.message_flags AS policy2_message_flags,
+    cp2.channel_flags AS policy2_channel_flags,
+    cp2.signature AS policy2_signature
+
+FROM graph_channels c
+    JOIN graph_nodes n1 ON c.node_id_1 = n1.id
+    JOIN graph_nodes n2 ON c.node_id_2 = n2.id
+    LEFT JOIN graph_channel_policies cp1
+        ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version
+    LEFT JOIN graph_channel_policies cp2
+        ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version
+WHERE c.id IN (/*SLICE:ids*/?)
+`
+
+type GetChannelsByIDsRow struct {
+	GraphChannel                   GraphChannel
+	Node1ID                        int64
+	Node1PubKey                    []byte
+	Node2ID                        int64
+	Node2PubKey                    []byte
+	Policy1ID                      sql.NullInt64
+	Policy1NodeID                  sql.NullInt64
+	Policy1Version                 sql.NullInt16
+	Policy1Timelock                sql.NullInt32
+	Policy1FeePpm                  sql.NullInt64
+	Policy1BaseFeeMsat             sql.NullInt64
+	Policy1MinHtlcMsat             sql.NullInt64
+	Policy1MaxHtlcMsat             sql.NullInt64
+	Policy1LastUpdate              sql.NullInt64
+	Policy1Disabled                sql.NullBool
+	Policy1InboundBaseFeeMsat      sql.NullInt64
+	Policy1InboundFeeRateMilliMsat sql.NullInt64
+	Policy1MessageFlags            sql.NullInt16
+	Policy1ChannelFlags            sql.NullInt16
+	Policy1Signature               []byte
+	Policy2ID                      sql.NullInt64
+	Policy2NodeID                  sql.NullInt64
+	Policy2Version                 sql.NullInt16
+	Policy2Timelock                sql.NullInt32
+	Policy2FeePpm                  sql.NullInt64
+	Policy2BaseFeeMsat             sql.NullInt64
+	Policy2MinHtlcMsat             sql.NullInt64
+	Policy2MaxHtlcMsat             sql.NullInt64
+	Policy2LastUpdate              sql.NullInt64
+	Policy2Disabled                sql.NullBool
+	Policy2InboundBaseFeeMsat      sql.NullInt64
+	Policy2InboundFeeRateMilliMsat sql.NullInt64
+	Policy2MessageFlags            sql.NullInt16
+	Policy2ChannelFlags            sql.NullInt16
+	Policy2Signature               []byte
+}
+
+func (q *Queries) GetChannelsByIDs(ctx context.Context, ids []int64) ([]GetChannelsByIDsRow, error) {
+	query := getChannelsByIDs
+	var queryParams []interface{}
+	if len(ids) > 0 {
+		for _, v := range ids {
+			queryParams = append(queryParams, v)
+		}
+		query = strings.Replace(query, "/*SLICE:ids*/?", makeQueryParams(len(queryParams), len(ids)), 1)
+	} else {
+		query = strings.Replace(query, "/*SLICE:ids*/?", "NULL", 1)
+	}
+	rows, err := q.db.QueryContext(ctx, query, queryParams...)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	var items []GetChannelsByIDsRow
+	for rows.Next() {
+		var i GetChannelsByIDsRow
+		if err := rows.Scan(
+			&i.GraphChannel.ID,
+			&i.GraphChannel.Version,
+			&i.GraphChannel.Scid,
+			&i.GraphChannel.NodeID1,
+			&i.GraphChannel.NodeID2,
+			&i.GraphChannel.Outpoint,
+			&i.GraphChannel.Capacity,
+			&i.GraphChannel.BitcoinKey1,
+			&i.GraphChannel.BitcoinKey2,
+			&i.GraphChannel.Node1Signature,
+			&i.GraphChannel.Node2Signature,
+			&i.GraphChannel.Bitcoin1Signature,
+			&i.GraphChannel.Bitcoin2Signature,
+			&i.Node1ID,
+			&i.Node1PubKey,
+			&i.Node2ID,
+			&i.Node2PubKey,
+			&i.Policy1ID,
+			&i.Policy1NodeID,
+			&i.Policy1Version,
+			&i.Policy1Timelock,
+			&i.Policy1FeePpm,
+			&i.Policy1BaseFeeMsat,
+			&i.Policy1MinHtlcMsat,
+			&i.Policy1MaxHtlcMsat,
+			&i.Policy1LastUpdate,
+			&i.Policy1Disabled,
+			&i.Policy1InboundBaseFeeMsat,
+			&i.Policy1InboundFeeRateMilliMsat,
+			&i.Policy1MessageFlags,
+			&i.Policy1ChannelFlags,
+			&i.Policy1Signature,
+			&i.Policy2ID,
+			&i.Policy2NodeID,
+			&i.Policy2Version,
+			&i.Policy2Timelock,
+			&i.Policy2FeePpm,
+			&i.Policy2BaseFeeMsat,
+			&i.Policy2MinHtlcMsat,
+			&i.Policy2MaxHtlcMsat,
+			&i.Policy2LastUpdate,
+			&i.Policy2Disabled,
+			&i.Policy2InboundBaseFeeMsat,
+			&i.Policy2InboundFeeRateMilliMsat,
+			&i.Policy2MessageFlags,
+			&i.Policy2ChannelFlags,
+			&i.Policy2Signature,
+		); err != nil {
+			return nil, err
+		}
+		items = append(items, i)
+	}
+	if err := rows.Close(); err != nil {
+		return nil, err
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return items, nil
+}
+
 const getChannelsByOutpoints = `-- name: GetChannelsByOutpoints :many
 SELECT
     c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature,
diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go
index 7f1ad9abb..4e1e26a02 100644
--- a/sqldb/sqlc/querier.go
+++ b/sqldb/sqlc/querier.go
@@ -42,6 +42,7 @@ type Querier interface {
 	GetChannelFeaturesBatch(ctx context.Context, chanIds []int64) ([]GraphChannelFeature, error)
 	GetChannelPolicyByChannelAndNode(ctx context.Context, arg GetChannelPolicyByChannelAndNodeParams) (GraphChannelPolicy, error)
 	GetChannelPolicyExtraTypesBatch(ctx context.Context, policyIds []int64) ([]GetChannelPolicyExtraTypesBatchRow, error)
+	GetChannelsByIDs(ctx context.Context, ids []int64) ([]GetChannelsByIDsRow, error)
 	GetChannelsByOutpoints(ctx context.Context, outpoints []string) ([]GetChannelsByOutpointsRow, error)
 	GetChannelsByPolicyLastUpdateRange(ctx context.Context, arg GetChannelsByPolicyLastUpdateRangeParams) ([]GetChannelsByPolicyLastUpdateRangeRow, error)
 	GetChannelsBySCIDRange(ctx context.Context, arg GetChannelsBySCIDRangeParams) ([]GetChannelsBySCIDRangeRow, error)
diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql
index 55629a9ac..c0b890131 100644
--- a/sqldb/sqlc/queries/graph.sql
+++ b/sqldb/sqlc/queries/graph.sql
@@ -335,6 +335,59 @@
 WHERE c.version = @version
   AND c.scid IN (sqlc.slice('scids')/*SLICE:scids*/);
 
+-- name: GetChannelsByIDs :many
+SELECT
+    sqlc.embed(c),
+
+    -- Minimal node data.
+    n1.id AS node1_id,
+    n1.pub_key AS node1_pub_key,
+    n2.id AS node2_id,
+    n2.pub_key AS node2_pub_key,
+
+    -- Policy 1
+    cp1.id AS policy1_id,
+    cp1.node_id AS policy1_node_id,
+    cp1.version AS policy1_version,
+    cp1.timelock AS policy1_timelock,
+    cp1.fee_ppm AS policy1_fee_ppm,
+    cp1.base_fee_msat AS policy1_base_fee_msat,
+    cp1.min_htlc_msat AS policy1_min_htlc_msat,
+    cp1.max_htlc_msat AS policy1_max_htlc_msat,
+    cp1.last_update AS policy1_last_update,
+    cp1.disabled AS policy1_disabled,
+    cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat,
+    cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat,
+    cp1.message_flags AS policy1_message_flags,
+    cp1.channel_flags AS policy1_channel_flags,
+    cp1.signature AS policy1_signature,
+
+    -- Policy 2
+    cp2.id AS policy2_id,
+    cp2.node_id AS policy2_node_id,
+    cp2.version AS policy2_version,
+    cp2.timelock AS policy2_timelock,
+    cp2.fee_ppm AS policy2_fee_ppm,
+    cp2.base_fee_msat AS policy2_base_fee_msat,
+    cp2.min_htlc_msat AS policy2_min_htlc_msat,
+    cp2.max_htlc_msat AS policy2_max_htlc_msat,
+    cp2.last_update AS policy2_last_update,
+    cp2.disabled AS policy2_disabled,
+    cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat,
+    cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat,
+    cp2.message_flags AS policy2_message_flags,
+    cp2.channel_flags AS policy2_channel_flags,
+    cp2.signature AS policy2_signature
+
+FROM graph_channels c
+    JOIN graph_nodes n1 ON c.node_id_1 = n1.id
+    JOIN graph_nodes n2 ON c.node_id_2 = n2.id
+    LEFT JOIN graph_channel_policies cp1
+        ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version
+    LEFT JOIN graph_channel_policies cp2
+        ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version
+WHERE c.id IN (sqlc.slice('ids')/*SLICE:ids*/);
+
 -- name: GetChannelsByPolicyLastUpdateRange :many
 SELECT
     sqlc.embed(c),
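
For illustration, a minimal, self-contained Go sketch of the accumulate-then-flush pattern this patch introduces (the type and helper names below are hypothetical stand-ins, not lnd's actual API): migrated channels are keyed by their SQL DB ID, validated together in one round trip once the configured batch size is reached, and any remainder is validated in a final flush.

package main

import "fmt"

// chanInfo stands in for the data the real migration collects per channel
// (the KVDB edge, its two policies, and the SQL DB IDs).
type chanInfo struct {
	scid uint64
}

// validateBatch stands in for validateMigratedChannels: the real code
// batch-fetches the channels by DB ID (GetChannelsByIDs) and compares them
// against the KVDB originals in a single round trip.
func validateBatch(batch map[int64]*chanInfo) error {
	fmt.Printf("validating %d channels in one round trip\n", len(batch))
	return nil
}

func main() {
	const maxBatchSize = 3 // stands in for cfg.QueryCfg.MaxBatchSize

	// batch accumulates migrated channels keyed by their SQL DB ID.
	batch := make(map[int64]*chanInfo, maxBatchSize)

	for dbID := int64(1); dbID <= 10; dbID++ {
		batch[dbID] = &chanInfo{scid: uint64(dbID) * 100}

		// Flush once the minimum batch size is reached.
		if len(batch) >= maxBatchSize {
			if err := validateBatch(batch); err != nil {
				panic(err)
			}
			batch = make(map[int64]*chanInfo, maxBatchSize)
		}
	}

	// Validate any remaining channels that never filled a full batch.
	if len(batch) > 0 {
		if err := validateBatch(batch); err != nil {
			panic(err)
		}
	}
}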