diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index bd4a2d56f..026fedbdc 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -261,7 +261,7 @@ func TestNodeInsertionAndDeletion(t *testing.T) { func TestPartialNode(t *testing.T) { t.Parallel() - graph := MakeTestGraph(t) + graph := MakeTestGraphNew(t) // To insert a partial node, we need to add a channel edge that has // node keys for nodes we are not yet aware @@ -3332,7 +3332,7 @@ func TestPruneGraphNodes(t *testing.T) { func TestAddChannelEdgeShellNodes(t *testing.T) { t.Parallel() - graph := MakeTestGraph(t) + graph := MakeTestGraphNew(t) // To start, we'll create two nodes, and only add one of them to the // channel graph. diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 1ca00442a..83749c81c 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -69,6 +69,15 @@ type SQLQueries interface { */ AddSourceNode(ctx context.Context, nodeID int64) error GetSourceNodesByVersion(ctx context.Context, version int16) ([]sqlc.GetSourceNodesByVersionRow, error) + + /* + Channel queries. + */ + CreateChannel(ctx context.Context, arg sqlc.CreateChannelParams) (int64, error) + GetChannelBySCID(ctx context.Context, arg sqlc.GetChannelBySCIDParams) (sqlc.Channel, error) + + CreateChannelExtraType(ctx context.Context, arg sqlc.CreateChannelExtraTypeParams) error + InsertChannelFeature(ctx context.Context, arg sqlc.InsertChannelFeatureParams) error } // BatchedSQLQueries is a version of SQLQueries that's capable of batched @@ -455,6 +464,54 @@ func (s *SQLStore) NodeUpdatesInHorizon(startTime, return nodes, nil } +// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An +// undirected edge from the two target nodes are created. The information stored +// denotes the static attributes of the channel, such as the channelID, the keys +// involved in creation of the channel, and the set of features that the channel +// supports. The chanPoint and chanID are used to uniquely identify the edge +// globally within the database. +// +// NOTE: part of the V1Store interface. +func (s *SQLStore) AddChannelEdge(edge *models.ChannelEdgeInfo, + opts ...batch.SchedulerOption) error { + + ctx := context.TODO() + + var alreadyExists bool + r := &batch.Request[SQLQueries]{ + Opts: batch.NewSchedulerOptions(opts...), + Reset: func() { + alreadyExists = false + }, + Do: func(tx SQLQueries) error { + err := insertChannel(ctx, tx, edge) + + // Silence ErrEdgeAlreadyExist so that the batch can + // succeed, but propagate the error via local state. + if errors.Is(err, ErrEdgeAlreadyExist) { + alreadyExists = true + return nil + } + + return err + }, + OnCommit: func(err error) error { + switch { + case err != nil: + return err + case alreadyExists: + return ErrEdgeAlreadyExist + default: + s.rejectCache.remove(edge.ChannelID) + s.chanCache.remove(edge.ChannelID) + return nil + } + }, + } + + return s.chanScheduler.Execute(ctx, r) +} + // getNodeByPubKey attempts to look up a target node by its public key. func getNodeByPubKey(ctx context.Context, db SQLQueries, pubKey route.Vertex) (int64, *models.LightningNode, error) { @@ -1022,3 +1079,151 @@ func marshalExtraOpaqueData(data []byte) (map[uint64][]byte, error) { return records, nil } + +// insertChannel inserts a new channel record into the database. +func insertChannel(ctx context.Context, db SQLQueries, + edge *models.ChannelEdgeInfo) error { + + var chanIDB [8]byte + byteOrder.PutUint64(chanIDB[:], edge.ChannelID) + + // Make sure that the channel doesn't already exist. We do this + // explicitly instead of relying on catching a unique constraint error + // because relying on SQL to throw that error would abort the entire + // batch of transactions. + _, err := db.GetChannelBySCID( + ctx, sqlc.GetChannelBySCIDParams{ + Scid: chanIDB[:], + Version: int16(ProtocolV1), + }, + ) + if err == nil { + return ErrEdgeAlreadyExist + } else if !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("unable to fetch channel: %w", err) + } + + // Make sure that at least a "shell" entry for each node is present in + // the nodes table. + node1DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey1Bytes) + if err != nil { + return fmt.Errorf("unable to create shell node: %w", err) + } + + node2DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey2Bytes) + if err != nil { + return fmt.Errorf("unable to create shell node: %w", err) + } + + var capacity sql.NullInt64 + if edge.Capacity != 0 { + capacity = sqldb.SQLInt64(int64(edge.Capacity)) + } + + createParams := sqlc.CreateChannelParams{ + Version: int16(ProtocolV1), + Scid: chanIDB[:], + NodeID1: node1DBID, + NodeID2: node2DBID, + Outpoint: edge.ChannelPoint.String(), + Capacity: capacity, + BitcoinKey1: edge.BitcoinKey1Bytes[:], + BitcoinKey2: edge.BitcoinKey2Bytes[:], + } + + if edge.AuthProof != nil { + proof := edge.AuthProof + + createParams.Node1Signature = proof.NodeSig1Bytes + createParams.Node2Signature = proof.NodeSig2Bytes + createParams.Bitcoin1Signature = proof.BitcoinSig1Bytes + createParams.Bitcoin2Signature = proof.BitcoinSig2Bytes + } + + // Insert the new channel record. + dbChanID, err := db.CreateChannel(ctx, createParams) + if err != nil { + return err + } + + // Insert any channel features. + if len(edge.Features) != 0 { + chanFeatures := lnwire.NewRawFeatureVector() + err := chanFeatures.Decode(bytes.NewReader(edge.Features)) + if err != nil { + return err + } + + fv := lnwire.NewFeatureVector(chanFeatures, lnwire.Features) + for feature := range fv.Features() { + err = db.InsertChannelFeature( + ctx, sqlc.InsertChannelFeatureParams{ + ChannelID: dbChanID, + FeatureBit: int32(feature), + }, + ) + if err != nil { + return fmt.Errorf("unable to insert "+ + "channel(%d) feature(%v): %w", dbChanID, + feature, err) + } + } + } + + // Finally, insert any extra TLV fields in the channel announcement. + extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData) + if err != nil { + return fmt.Errorf("unable to marshal extra opaque data: %w", + err) + } + + for tlvType, value := range extra { + err := db.CreateChannelExtraType( + ctx, sqlc.CreateChannelExtraTypeParams{ + ChannelID: dbChanID, + Type: int64(tlvType), + Value: value, + }, + ) + if err != nil { + return fmt.Errorf("unable to upsert channel(%d) extra "+ + "signed field(%v): %w", edge.ChannelID, + tlvType, err) + } + } + + return nil +} + +// maybeCreateShellNode checks if a shell node entry exists for the +// given public key. If it does not exist, then a new shell node entry is +// created. The ID of the node is returned. A shell node only has a protocol +// version and public key persisted. +func maybeCreateShellNode(ctx context.Context, db SQLQueries, + pubKey route.Vertex) (int64, error) { + + dbNode, err := db.GetNodeByPubKey( + ctx, sqlc.GetNodeByPubKeyParams{ + PubKey: pubKey[:], + Version: int16(ProtocolV1), + }, + ) + // The node exists. Return the ID. + if err == nil { + return dbNode.ID, nil + } else if !errors.Is(err, sql.ErrNoRows) { + return 0, err + } + + // Otherwise, the node does not exist, so we create a shell entry for + // it. + id, err := db.UpsertNode(ctx, sqlc.UpsertNodeParams{ + Version: int16(ProtocolV1), + PubKey: pubKey[:], + }) + if err != nil { + return 0, fmt.Errorf("unable to create shell node: %w", err) + } + + return id, nil +} diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index dcdd79053..ec730818f 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -26,6 +26,81 @@ func (q *Queries) AddSourceNode(ctx context.Context, nodeID int64) error { return err } +const createChannel = `-- name: CreateChannel :one +/* ───────────────────────────────────────────── + channels table queries + ───────────────────────────────────────────── +*/ + +INSERT INTO channels ( + version, scid, node_id_1, node_id_2, + outpoint, capacity, bitcoin_key_1, bitcoin_key_2, + node_1_signature, node_2_signature, bitcoin_1_signature, + bitcoin_2_signature +) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12 +) +RETURNING id +` + +type CreateChannelParams struct { + Version int16 + Scid []byte + NodeID1 int64 + NodeID2 int64 + Outpoint string + Capacity sql.NullInt64 + BitcoinKey1 []byte + BitcoinKey2 []byte + Node1Signature []byte + Node2Signature []byte + Bitcoin1Signature []byte + Bitcoin2Signature []byte +} + +func (q *Queries) CreateChannel(ctx context.Context, arg CreateChannelParams) (int64, error) { + row := q.db.QueryRowContext(ctx, createChannel, + arg.Version, + arg.Scid, + arg.NodeID1, + arg.NodeID2, + arg.Outpoint, + arg.Capacity, + arg.BitcoinKey1, + arg.BitcoinKey2, + arg.Node1Signature, + arg.Node2Signature, + arg.Bitcoin1Signature, + arg.Bitcoin2Signature, + ) + var id int64 + err := row.Scan(&id) + return id, err +} + +const createChannelExtraType = `-- name: CreateChannelExtraType :exec +/* ───────────────────────────────────────────── + channel_extra_types table queries + ───────────────────────────────────────────── +*/ + +INSERT INTO channel_extra_types ( + channel_id, type, value +) +VALUES ($1, $2, $3) +` + +type CreateChannelExtraTypeParams struct { + ChannelID int64 + Type int64 + Value []byte +} + +func (q *Queries) CreateChannelExtraType(ctx context.Context, arg CreateChannelExtraTypeParams) error { + _, err := q.db.ExecContext(ctx, createChannelExtraType, arg.ChannelID, arg.Type, arg.Value) + return err +} + const deleteExtraNodeType = `-- name: DeleteExtraNodeType :exec DELETE FROM node_extra_types WHERE node_id = $1 @@ -83,6 +158,37 @@ func (q *Queries) DeleteNodeFeature(ctx context.Context, arg DeleteNodeFeaturePa return err } +const getChannelBySCID = `-- name: GetChannelBySCID :one +SELECT id, version, scid, node_id_1, node_id_2, outpoint, capacity, bitcoin_key_1, bitcoin_key_2, node_1_signature, node_2_signature, bitcoin_1_signature, bitcoin_2_signature FROM channels +WHERE scid = $1 AND version = $2 +` + +type GetChannelBySCIDParams struct { + Scid []byte + Version int16 +} + +func (q *Queries) GetChannelBySCID(ctx context.Context, arg GetChannelBySCIDParams) (Channel, error) { + row := q.db.QueryRowContext(ctx, getChannelBySCID, arg.Scid, arg.Version) + var i Channel + err := row.Scan( + &i.ID, + &i.Version, + &i.Scid, + &i.NodeID1, + &i.NodeID2, + &i.Outpoint, + &i.Capacity, + &i.BitcoinKey1, + &i.BitcoinKey2, + &i.Node1Signature, + &i.Node2Signature, + &i.Bitcoin1Signature, + &i.Bitcoin2Signature, + ) + return i, err +} + const getExtraNodeTypes = `-- name: GetExtraNodeTypes :many SELECT node_id, type, value FROM node_extra_types @@ -323,6 +429,29 @@ func (q *Queries) GetSourceNodesByVersion(ctx context.Context, version int16) ([ return items, nil } +const insertChannelFeature = `-- name: InsertChannelFeature :exec +/* ───────────────────────────────────────────── + channel_features table queries + ───────────────────────────────────────────── +*/ + +INSERT INTO channel_features ( + channel_id, feature_bit +) VALUES ( + $1, $2 +) +` + +type InsertChannelFeatureParams struct { + ChannelID int64 + FeatureBit int32 +} + +func (q *Queries) InsertChannelFeature(ctx context.Context, arg InsertChannelFeatureParams) error { + _, err := q.db.ExecContext(ctx, insertChannelFeature, arg.ChannelID, arg.FeatureBit) + return err +} + const insertNodeAddress = `-- name: InsertNodeAddress :exec /* ───────────────────────────────────────────── node_addresses table queries diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 7492a21e0..e3482d524 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -13,6 +13,8 @@ import ( type Querier interface { AddSourceNode(ctx context.Context, nodeID int64) error ClearKVInvoiceHashIndex(ctx context.Context) error + CreateChannel(ctx context.Context, arg CreateChannelParams) (int64, error) + CreateChannelExtraType(ctx context.Context, arg CreateChannelExtraTypeParams) error DeleteCanceledInvoices(ctx context.Context) (sql.Result, error) DeleteExtraNodeType(ctx context.Context, arg DeleteExtraNodeTypeParams) error DeleteInvoice(ctx context.Context, arg DeleteInvoiceParams) (sql.Result, error) @@ -24,6 +26,7 @@ type Querier interface { FetchSettledAMPSubInvoices(ctx context.Context, arg FetchSettledAMPSubInvoicesParams) ([]FetchSettledAMPSubInvoicesRow, error) FilterInvoices(ctx context.Context, arg FilterInvoicesParams) ([]Invoice, error) GetAMPInvoiceID(ctx context.Context, setID []byte) (int64, error) + GetChannelBySCID(ctx context.Context, arg GetChannelBySCIDParams) (Channel, error) GetDatabaseVersion(ctx context.Context) (int32, error) GetExtraNodeTypes(ctx context.Context, nodeID int64) ([]NodeExtraType, error) // This method may return more than one invoice if filter using multiple fields @@ -45,6 +48,7 @@ type Querier interface { GetSourceNodesByVersion(ctx context.Context, version int16) ([]GetSourceNodesByVersionRow, error) InsertAMPSubInvoice(ctx context.Context, arg InsertAMPSubInvoiceParams) error InsertAMPSubInvoiceHTLC(ctx context.Context, arg InsertAMPSubInvoiceHTLCParams) error + InsertChannelFeature(ctx context.Context, arg InsertChannelFeatureParams) error InsertInvoice(ctx context.Context, arg InsertInvoiceParams) (int64, error) InsertInvoiceFeature(ctx context.Context, arg InsertInvoiceFeatureParams) error InsertInvoiceHTLC(ctx context.Context, arg InsertInvoiceHTLCParams) (int64, error) diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index a6745227e..b66aaaa75 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -132,3 +132,46 @@ SELECT sn.node_id, n.pub_key FROM source_nodes sn JOIN nodes n ON sn.node_id = n.id WHERE n.version = $1; + +/* ───────────────────────────────────────────── + channels table queries + ───────────────────────────────────────────── +*/ + +-- name: CreateChannel :one +INSERT INTO channels ( + version, scid, node_id_1, node_id_2, + outpoint, capacity, bitcoin_key_1, bitcoin_key_2, + node_1_signature, node_2_signature, bitcoin_1_signature, + bitcoin_2_signature +) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12 +) +RETURNING id; + +-- name: GetChannelBySCID :one +SELECT * FROM channels +WHERE scid = $1 AND version = $2; + +/* ───────────────────────────────────────────── + channel_features table queries + ───────────────────────────────────────────── +*/ + +-- name: InsertChannelFeature :exec +INSERT INTO channel_features ( + channel_id, feature_bit +) VALUES ( + $1, $2 +); + +/* ───────────────────────────────────────────── + channel_extra_types table queries + ───────────────────────────────────────────── +*/ + +-- name: CreateChannelExtraType :exec +INSERT INTO channel_extra_types ( + channel_id, type, value +) +VALUES ($1, $2, $3);