graph/db+sqldb: implement AddChannelEdge on SQLStore

In this commit, the `AddChannelEdge` method of the SQLStore is
implemented. Like the KVStore implementation, it makes use of the
available channel `batch.Scheduler` and also updates the reject and
channel caches.

This then lets us convert the following 2 unit tests to run against the
SQL backends:
- TestPartialNode
- TestAddChannelEdgeShellNodes
This commit is contained in:
Elle Mouton
2025-05-24 15:09:37 +02:00
parent c5f159f485
commit d93d104a66
5 changed files with 383 additions and 2 deletions

View File

@ -261,7 +261,7 @@ func TestNodeInsertionAndDeletion(t *testing.T) {
func TestPartialNode(t *testing.T) {
t.Parallel()
graph := MakeTestGraph(t)
graph := MakeTestGraphNew(t)
// To insert a partial node, we need to add a channel edge that has
// node keys for nodes we are not yet aware
@ -3332,7 +3332,7 @@ func TestPruneGraphNodes(t *testing.T) {
func TestAddChannelEdgeShellNodes(t *testing.T) {
t.Parallel()
graph := MakeTestGraph(t)
graph := MakeTestGraphNew(t)
// To start, we'll create two nodes, and only add one of them to the
// channel graph.

View File

@ -69,6 +69,15 @@ type SQLQueries interface {
*/
AddSourceNode(ctx context.Context, nodeID int64) error
GetSourceNodesByVersion(ctx context.Context, version int16) ([]sqlc.GetSourceNodesByVersionRow, error)
/*
Channel queries.
*/
CreateChannel(ctx context.Context, arg sqlc.CreateChannelParams) (int64, error)
GetChannelBySCID(ctx context.Context, arg sqlc.GetChannelBySCIDParams) (sqlc.Channel, error)
CreateChannelExtraType(ctx context.Context, arg sqlc.CreateChannelExtraTypeParams) error
InsertChannelFeature(ctx context.Context, arg sqlc.InsertChannelFeatureParams) error
}
// BatchedSQLQueries is a version of SQLQueries that's capable of batched
@ -455,6 +464,54 @@ func (s *SQLStore) NodeUpdatesInHorizon(startTime,
return nodes, nil
}
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
// undirected edge from the two target nodes are created. The information stored
// denotes the static attributes of the channel, such as the channelID, the keys
// involved in creation of the channel, and the set of features that the channel
// supports. The chanPoint and chanID are used to uniquely identify the edge
// globally within the database.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) AddChannelEdge(edge *models.ChannelEdgeInfo,
opts ...batch.SchedulerOption) error {
ctx := context.TODO()
var alreadyExists bool
r := &batch.Request[SQLQueries]{
Opts: batch.NewSchedulerOptions(opts...),
Reset: func() {
alreadyExists = false
},
Do: func(tx SQLQueries) error {
err := insertChannel(ctx, tx, edge)
// Silence ErrEdgeAlreadyExist so that the batch can
// succeed, but propagate the error via local state.
if errors.Is(err, ErrEdgeAlreadyExist) {
alreadyExists = true
return nil
}
return err
},
OnCommit: func(err error) error {
switch {
case err != nil:
return err
case alreadyExists:
return ErrEdgeAlreadyExist
default:
s.rejectCache.remove(edge.ChannelID)
s.chanCache.remove(edge.ChannelID)
return nil
}
},
}
return s.chanScheduler.Execute(ctx, r)
}
// getNodeByPubKey attempts to look up a target node by its public key.
func getNodeByPubKey(ctx context.Context, db SQLQueries,
pubKey route.Vertex) (int64, *models.LightningNode, error) {
@ -1022,3 +1079,151 @@ func marshalExtraOpaqueData(data []byte) (map[uint64][]byte, error) {
return records, nil
}
// insertChannel inserts a new channel record into the database.
func insertChannel(ctx context.Context, db SQLQueries,
edge *models.ChannelEdgeInfo) error {
var chanIDB [8]byte
byteOrder.PutUint64(chanIDB[:], edge.ChannelID)
// Make sure that the channel doesn't already exist. We do this
// explicitly instead of relying on catching a unique constraint error
// because relying on SQL to throw that error would abort the entire
// batch of transactions.
_, err := db.GetChannelBySCID(
ctx, sqlc.GetChannelBySCIDParams{
Scid: chanIDB[:],
Version: int16(ProtocolV1),
},
)
if err == nil {
return ErrEdgeAlreadyExist
} else if !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("unable to fetch channel: %w", err)
}
// Make sure that at least a "shell" entry for each node is present in
// the nodes table.
node1DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey1Bytes)
if err != nil {
return fmt.Errorf("unable to create shell node: %w", err)
}
node2DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey2Bytes)
if err != nil {
return fmt.Errorf("unable to create shell node: %w", err)
}
var capacity sql.NullInt64
if edge.Capacity != 0 {
capacity = sqldb.SQLInt64(int64(edge.Capacity))
}
createParams := sqlc.CreateChannelParams{
Version: int16(ProtocolV1),
Scid: chanIDB[:],
NodeID1: node1DBID,
NodeID2: node2DBID,
Outpoint: edge.ChannelPoint.String(),
Capacity: capacity,
BitcoinKey1: edge.BitcoinKey1Bytes[:],
BitcoinKey2: edge.BitcoinKey2Bytes[:],
}
if edge.AuthProof != nil {
proof := edge.AuthProof
createParams.Node1Signature = proof.NodeSig1Bytes
createParams.Node2Signature = proof.NodeSig2Bytes
createParams.Bitcoin1Signature = proof.BitcoinSig1Bytes
createParams.Bitcoin2Signature = proof.BitcoinSig2Bytes
}
// Insert the new channel record.
dbChanID, err := db.CreateChannel(ctx, createParams)
if err != nil {
return err
}
// Insert any channel features.
if len(edge.Features) != 0 {
chanFeatures := lnwire.NewRawFeatureVector()
err := chanFeatures.Decode(bytes.NewReader(edge.Features))
if err != nil {
return err
}
fv := lnwire.NewFeatureVector(chanFeatures, lnwire.Features)
for feature := range fv.Features() {
err = db.InsertChannelFeature(
ctx, sqlc.InsertChannelFeatureParams{
ChannelID: dbChanID,
FeatureBit: int32(feature),
},
)
if err != nil {
return fmt.Errorf("unable to insert "+
"channel(%d) feature(%v): %w", dbChanID,
feature, err)
}
}
}
// Finally, insert any extra TLV fields in the channel announcement.
extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
if err != nil {
return fmt.Errorf("unable to marshal extra opaque data: %w",
err)
}
for tlvType, value := range extra {
err := db.CreateChannelExtraType(
ctx, sqlc.CreateChannelExtraTypeParams{
ChannelID: dbChanID,
Type: int64(tlvType),
Value: value,
},
)
if err != nil {
return fmt.Errorf("unable to upsert channel(%d) extra "+
"signed field(%v): %w", edge.ChannelID,
tlvType, err)
}
}
return nil
}
// maybeCreateShellNode checks if a shell node entry exists for the
// given public key. If it does not exist, then a new shell node entry is
// created. The ID of the node is returned. A shell node only has a protocol
// version and public key persisted.
func maybeCreateShellNode(ctx context.Context, db SQLQueries,
pubKey route.Vertex) (int64, error) {
dbNode, err := db.GetNodeByPubKey(
ctx, sqlc.GetNodeByPubKeyParams{
PubKey: pubKey[:],
Version: int16(ProtocolV1),
},
)
// The node exists. Return the ID.
if err == nil {
return dbNode.ID, nil
} else if !errors.Is(err, sql.ErrNoRows) {
return 0, err
}
// Otherwise, the node does not exist, so we create a shell entry for
// it.
id, err := db.UpsertNode(ctx, sqlc.UpsertNodeParams{
Version: int16(ProtocolV1),
PubKey: pubKey[:],
})
if err != nil {
return 0, fmt.Errorf("unable to create shell node: %w", err)
}
return id, nil
}

View File

@ -26,6 +26,81 @@ func (q *Queries) AddSourceNode(ctx context.Context, nodeID int64) error {
return err
}
const createChannel = `-- name: CreateChannel :one
/* ─────────────────────────────────────────────
channels table queries
─────────────────────────────────────────────
*/
INSERT INTO channels (
version, scid, node_id_1, node_id_2,
outpoint, capacity, bitcoin_key_1, bitcoin_key_2,
node_1_signature, node_2_signature, bitcoin_1_signature,
bitcoin_2_signature
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12
)
RETURNING id
`
type CreateChannelParams struct {
Version int16
Scid []byte
NodeID1 int64
NodeID2 int64
Outpoint string
Capacity sql.NullInt64
BitcoinKey1 []byte
BitcoinKey2 []byte
Node1Signature []byte
Node2Signature []byte
Bitcoin1Signature []byte
Bitcoin2Signature []byte
}
func (q *Queries) CreateChannel(ctx context.Context, arg CreateChannelParams) (int64, error) {
row := q.db.QueryRowContext(ctx, createChannel,
arg.Version,
arg.Scid,
arg.NodeID1,
arg.NodeID2,
arg.Outpoint,
arg.Capacity,
arg.BitcoinKey1,
arg.BitcoinKey2,
arg.Node1Signature,
arg.Node2Signature,
arg.Bitcoin1Signature,
arg.Bitcoin2Signature,
)
var id int64
err := row.Scan(&id)
return id, err
}
const createChannelExtraType = `-- name: CreateChannelExtraType :exec
/* ─────────────────────────────────────────────
channel_extra_types table queries
─────────────────────────────────────────────
*/
INSERT INTO channel_extra_types (
channel_id, type, value
)
VALUES ($1, $2, $3)
`
type CreateChannelExtraTypeParams struct {
ChannelID int64
Type int64
Value []byte
}
func (q *Queries) CreateChannelExtraType(ctx context.Context, arg CreateChannelExtraTypeParams) error {
_, err := q.db.ExecContext(ctx, createChannelExtraType, arg.ChannelID, arg.Type, arg.Value)
return err
}
const deleteExtraNodeType = `-- name: DeleteExtraNodeType :exec
DELETE FROM node_extra_types
WHERE node_id = $1
@ -83,6 +158,37 @@ func (q *Queries) DeleteNodeFeature(ctx context.Context, arg DeleteNodeFeaturePa
return err
}
const getChannelBySCID = `-- name: GetChannelBySCID :one
SELECT id, version, scid, node_id_1, node_id_2, outpoint, capacity, bitcoin_key_1, bitcoin_key_2, node_1_signature, node_2_signature, bitcoin_1_signature, bitcoin_2_signature FROM channels
WHERE scid = $1 AND version = $2
`
type GetChannelBySCIDParams struct {
Scid []byte
Version int16
}
func (q *Queries) GetChannelBySCID(ctx context.Context, arg GetChannelBySCIDParams) (Channel, error) {
row := q.db.QueryRowContext(ctx, getChannelBySCID, arg.Scid, arg.Version)
var i Channel
err := row.Scan(
&i.ID,
&i.Version,
&i.Scid,
&i.NodeID1,
&i.NodeID2,
&i.Outpoint,
&i.Capacity,
&i.BitcoinKey1,
&i.BitcoinKey2,
&i.Node1Signature,
&i.Node2Signature,
&i.Bitcoin1Signature,
&i.Bitcoin2Signature,
)
return i, err
}
const getExtraNodeTypes = `-- name: GetExtraNodeTypes :many
SELECT node_id, type, value
FROM node_extra_types
@ -323,6 +429,29 @@ func (q *Queries) GetSourceNodesByVersion(ctx context.Context, version int16) ([
return items, nil
}
const insertChannelFeature = `-- name: InsertChannelFeature :exec
/* ─────────────────────────────────────────────
channel_features table queries
─────────────────────────────────────────────
*/
INSERT INTO channel_features (
channel_id, feature_bit
) VALUES (
$1, $2
)
`
type InsertChannelFeatureParams struct {
ChannelID int64
FeatureBit int32
}
func (q *Queries) InsertChannelFeature(ctx context.Context, arg InsertChannelFeatureParams) error {
_, err := q.db.ExecContext(ctx, insertChannelFeature, arg.ChannelID, arg.FeatureBit)
return err
}
const insertNodeAddress = `-- name: InsertNodeAddress :exec
/* ─────────────────────────────────────────────
node_addresses table queries

View File

@ -13,6 +13,8 @@ import (
type Querier interface {
AddSourceNode(ctx context.Context, nodeID int64) error
ClearKVInvoiceHashIndex(ctx context.Context) error
CreateChannel(ctx context.Context, arg CreateChannelParams) (int64, error)
CreateChannelExtraType(ctx context.Context, arg CreateChannelExtraTypeParams) error
DeleteCanceledInvoices(ctx context.Context) (sql.Result, error)
DeleteExtraNodeType(ctx context.Context, arg DeleteExtraNodeTypeParams) error
DeleteInvoice(ctx context.Context, arg DeleteInvoiceParams) (sql.Result, error)
@ -24,6 +26,7 @@ type Querier interface {
FetchSettledAMPSubInvoices(ctx context.Context, arg FetchSettledAMPSubInvoicesParams) ([]FetchSettledAMPSubInvoicesRow, error)
FilterInvoices(ctx context.Context, arg FilterInvoicesParams) ([]Invoice, error)
GetAMPInvoiceID(ctx context.Context, setID []byte) (int64, error)
GetChannelBySCID(ctx context.Context, arg GetChannelBySCIDParams) (Channel, error)
GetDatabaseVersion(ctx context.Context) (int32, error)
GetExtraNodeTypes(ctx context.Context, nodeID int64) ([]NodeExtraType, error)
// This method may return more than one invoice if filter using multiple fields
@ -45,6 +48,7 @@ type Querier interface {
GetSourceNodesByVersion(ctx context.Context, version int16) ([]GetSourceNodesByVersionRow, error)
InsertAMPSubInvoice(ctx context.Context, arg InsertAMPSubInvoiceParams) error
InsertAMPSubInvoiceHTLC(ctx context.Context, arg InsertAMPSubInvoiceHTLCParams) error
InsertChannelFeature(ctx context.Context, arg InsertChannelFeatureParams) error
InsertInvoice(ctx context.Context, arg InsertInvoiceParams) (int64, error)
InsertInvoiceFeature(ctx context.Context, arg InsertInvoiceFeatureParams) error
InsertInvoiceHTLC(ctx context.Context, arg InsertInvoiceHTLCParams) (int64, error)

View File

@ -132,3 +132,46 @@ SELECT sn.node_id, n.pub_key
FROM source_nodes sn
JOIN nodes n ON sn.node_id = n.id
WHERE n.version = $1;
/* ─────────────────────────────────────────────
channels table queries
─────────────────────────────────────────────
*/
-- name: CreateChannel :one
INSERT INTO channels (
version, scid, node_id_1, node_id_2,
outpoint, capacity, bitcoin_key_1, bitcoin_key_2,
node_1_signature, node_2_signature, bitcoin_1_signature,
bitcoin_2_signature
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12
)
RETURNING id;
-- name: GetChannelBySCID :one
SELECT * FROM channels
WHERE scid = $1 AND version = $2;
/* ─────────────────────────────────────────────
channel_features table queries
─────────────────────────────────────────────
*/
-- name: InsertChannelFeature :exec
INSERT INTO channel_features (
channel_id, feature_bit
) VALUES (
$1, $2
);
/* ─────────────────────────────────────────────
channel_extra_types table queries
─────────────────────────────────────────────
*/
-- name: CreateChannelExtraType :exec
INSERT INTO channel_extra_types (
channel_id, type, value
)
VALUES ($1, $2, $3);