graph/db: use new batch helpers for edge loading

Use the new batch helpers to replace the existing logic for loading an
edge.
This commit is contained in:
Elle Mouton
2025-07-29 12:59:11 +02:00
parent 8ad5f633bc
commit f39edae6e3
4 changed files with 76 additions and 239 deletions

View File

@@ -102,7 +102,6 @@ type SQLQueries interface {
GetChannelBySCIDWithPolicies(ctx context.Context, arg sqlc.GetChannelBySCIDWithPoliciesParams) (sqlc.GetChannelBySCIDWithPoliciesRow, error)
GetChannelsBySCIDWithPolicies(ctx context.Context, arg sqlc.GetChannelsBySCIDWithPoliciesParams) ([]sqlc.GetChannelsBySCIDWithPoliciesRow, error)
GetChannelAndNodesBySCID(ctx context.Context, arg sqlc.GetChannelAndNodesBySCIDParams) (sqlc.GetChannelAndNodesBySCIDRow, error)
GetChannelFeaturesAndExtras(ctx context.Context, channelID int64) ([]sqlc.GetChannelFeaturesAndExtrasRow, error)
HighestSCID(ctx context.Context, version int16) ([]byte, error)
ListChannelsByNodeID(ctx context.Context, arg sqlc.ListChannelsByNodeIDParams) ([]sqlc.ListChannelsByNodeIDRow, error)
ListChannelsWithPoliciesPaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesPaginatedParams) ([]sqlc.ListChannelsWithPoliciesPaginatedRow, error)
@@ -127,7 +126,6 @@ type SQLQueries interface {
GetV1DisabledSCIDs(ctx context.Context) ([][]byte, error)
InsertChanPolicyExtraType(ctx context.Context, arg sqlc.InsertChanPolicyExtraTypeParams) error
GetChannelPolicyExtraTypes(ctx context.Context, arg sqlc.GetChannelPolicyExtraTypesParams) ([]sqlc.GetChannelPolicyExtraTypesRow, error)
GetChannelPolicyExtraTypesBatch(ctx context.Context, policyIds []int64) ([]sqlc.GetChannelPolicyExtraTypesBatchRow, error)
DeleteChannelPolicyExtraTypes(ctx context.Context, channelPolicyID int64) error
@@ -4091,16 +4089,44 @@ func getAndBuildEdgeInfo(ctx context.Context, db SQLQueries,
chain chainhash.Hash, dbChanID int64, dbChan sqlc.GraphChannel, node1,
node2 route.Vertex) (*models.ChannelEdgeInfo, error) {
// NOTE: getAndBuildEdgeInfo is only used to load the data for a single
// edge, and so no paged queries will be performed. This means that
// it's ok to used pass in default config values here.
cfg := sqldb.DefaultPagedQueryConfig()
data, err := batchLoadChannelData(ctx, cfg, db, []int64{dbChan.ID}, nil)
if err != nil {
return nil, fmt.Errorf("unable to batch load channel data: %w",
err)
}
return buildEdgeInfoWithBatchData(chain, dbChan, node1, node2, data)
}
// buildEdgeInfoWithBatchData builds edge info using pre-loaded batch data.
func buildEdgeInfoWithBatchData(chain chainhash.Hash,
dbChan sqlc.GraphChannel, node1, node2 route.Vertex,
batchData *batchChannelData) (*models.ChannelEdgeInfo, error) {
if dbChan.Version != int16(ProtocolV1) {
return nil, fmt.Errorf("unsupported channel version: %d",
dbChan.Version)
}
fv, extras, err := getChanFeaturesAndExtras(
ctx, db, dbChanID,
)
if err != nil {
return nil, err
// Use pre-loaded features and extras types.
fv := lnwire.EmptyFeatureVector()
if features, exists := batchData.chanfeatures[dbChan.ID]; exists {
for _, bit := range features {
fv.Set(lnwire.FeatureBit(bit))
}
}
var extras map[uint64][]byte
channelExtras, exists := batchData.chanExtraTypes[dbChan.ID]
if exists {
extras = channelExtras
} else {
extras = make(map[uint64][]byte)
}
op, err := wire.NewOutPointFromString(dbChan.Outpoint)
@@ -4169,46 +4195,6 @@ func buildNodeVertices(node1Pub, node2Pub []byte) (route.Vertex,
return node1Vertex, node2Vertex, nil
}
// getChanFeaturesAndExtras fetches the channel features and extra TLV types
// for a channel with the given ID.
func getChanFeaturesAndExtras(ctx context.Context, db SQLQueries,
id int64) (*lnwire.FeatureVector, map[uint64][]byte, error) {
rows, err := db.GetChannelFeaturesAndExtras(ctx, id)
if err != nil {
return nil, nil, fmt.Errorf("unable to fetch channel "+
"features and extras: %w", err)
}
var (
fv = lnwire.EmptyFeatureVector()
extras = make(map[uint64][]byte)
)
for _, row := range rows {
if row.IsFeature {
fv.Set(lnwire.FeatureBit(row.FeatureBit))
continue
}
tlvType, ok := row.ExtraKey.(int64)
if !ok {
return nil, nil, fmt.Errorf("unexpected type for "+
"TLV type: %T", row.ExtraKey)
}
valueBytes, ok := row.Value.([]byte)
if !ok {
return nil, nil, fmt.Errorf("unexpected type for "+
"Value: %T", row.Value)
}
extras[uint64(tlvType)] = valueBytes
}
return fv, extras, nil
}
// getAndBuildChanPolicies uses the given sqlc.GraphChannelPolicy and also
// retrieves all the extra info required to build the complete
// models.ChannelEdgePolicy types. It returns two policies, which may be nil if
@@ -4222,58 +4208,38 @@ func getAndBuildChanPolicies(ctx context.Context, db SQLQueries,
return nil, nil, nil
}
var (
policy1ID int64
policy2ID int64
)
var policyIDs = make([]int64, 0, 2)
if dbPol1 != nil {
policy1ID = dbPol1.ID
policyIDs = append(policyIDs, dbPol1.ID)
}
if dbPol2 != nil {
policy2ID = dbPol2.ID
policyIDs = append(policyIDs, dbPol2.ID)
}
rows, err := db.GetChannelPolicyExtraTypes(
ctx, sqlc.GetChannelPolicyExtraTypesParams{
ID: policy1ID,
ID_2: policy2ID,
},
// NOTE: getAndBuildChanPolicies is only used to load the data for
// a maximum of two policies, and so no paged queries will be
// performed (unless the page size is one). So it's ok to use
// the default config values here.
cfg := sqldb.DefaultPagedQueryConfig()
batchData, err := batchLoadChannelData(ctx, cfg, db, nil, policyIDs)
if err != nil {
return nil, nil, fmt.Errorf("unable to batch load channel "+
"data: %w", err)
}
pol1, err := buildChanPolicyWithBatchData(
dbPol1, channelID, node2, batchData,
)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("unable to build policy1: %w", err)
}
var (
dbPol1Extras = make(map[uint64][]byte)
dbPol2Extras = make(map[uint64][]byte)
pol2, err := buildChanPolicyWithBatchData(
dbPol2, channelID, node1, batchData,
)
for _, row := range rows {
switch row.PolicyID {
case policy1ID:
dbPol1Extras[uint64(row.Type)] = row.Value
case policy2ID:
dbPol2Extras[uint64(row.Type)] = row.Value
default:
return nil, nil, fmt.Errorf("unexpected policy ID %d "+
"in row: %v", row.PolicyID, row)
}
}
var pol1, pol2 *models.ChannelEdgePolicy
if dbPol1 != nil {
pol1, err = buildChanPolicy(
*dbPol1, channelID, dbPol1Extras, node2,
)
if err != nil {
return nil, nil, err
}
}
if dbPol2 != nil {
pol2, err = buildChanPolicy(
*dbPol2, channelID, dbPol2Extras, node1,
)
if err != nil {
return nil, nil, err
}
if err != nil {
return nil, nil, fmt.Errorf("unable to build policy2: %w", err)
}
return pol1, pol2, nil
@@ -4905,6 +4871,26 @@ func batchLoadNodeExtraTypesHelper(ctx context.Context,
)
}
// buildChanPolicyWithBatchData builds a models.ChannelEdgePolicy instance from
// the provided sqlc.GraphChannelPolicy and the provided batchChannelData.
func buildChanPolicyWithBatchData(dbPol *sqlc.GraphChannelPolicy,
channelID uint64, toNode route.Vertex,
batchData *batchChannelData) (*models.ChannelEdgePolicy, error) {
if dbPol == nil {
return nil, nil
}
var dbPol1Extras map[uint64][]byte
if extras, exists := batchData.policyExtras[dbPol.ID]; exists {
dbPol1Extras = extras
} else {
dbPol1Extras = make(map[uint64][]byte)
}
return buildChanPolicy(*dbPol, channelID, dbPol1Extras, toNode)
}
// batchChannelData holds all the related data for a batch of channels.
type batchChannelData struct {
// chanFeatures is a map from DB channel ID to a slice of feature bits.