diff --git a/autopilot/graph.go b/autopilot/graph.go index 036579d07..61515a790 100644 --- a/autopilot/graph.go +++ b/autopilot/graph.go @@ -85,6 +85,17 @@ func (d dbNode) ForEachChannel(cb func(ChannelEdge) error) error { return d.node.ForEachChannel(d.tx, func(tx *bolt.Tx, ei *channeldb.ChannelEdgeInfo, ep, _ *channeldb.ChannelEdgePolicy) error { + // Skip channels for which no outgoing edge policy is available. + // + // TODO(joostjager): Ideally the case where channels have a nil + // policy should be supported, as auto pilot is not looking at + // the policies. For now, it is not easily possible to get a + // reference to the other end LightningNode object without + // retrieving the policy. + if ep == nil { + return nil + } + pubkey, _ := ep.Node.PubKey() edge := ChannelEdge{ Channel: Channel{ diff --git a/channeldb/db.go b/channeldb/db.go index 5e13f820e..071a3bc85 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -59,6 +59,14 @@ var ( number: 3, migration: migrateInvoiceTimeSeriesOutgoingPayments, }, + { + // The version of the database where every channel + // always has two entries in the edges bucket. If + // a policy is unknown, this will be represented + // by a special byte sequence. + number: 4, + migration: migrateEdgePolicies, + }, } // Big endian is the preferred byte order, due to cursor scans over diff --git a/channeldb/graph.go b/channeldb/graph.go index 5d96949f6..0ce57ad46 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -56,16 +56,22 @@ var ( // edgeBucket is a bucket which houses all of the edge or channel // information within the channel graph. This bucket essentially acts // as an adjacency list, which in conjunction with a range scan, can be - // used to iterate over all the _outgoing_ edges for a particular node. - // Key in the bucket use a prefix scheme which leads with the node's - // public key and sends with the compact edge ID. For each edgeID, - // there will be two entries within the bucket, as the graph is - // directed: nodes may have different policies w.r.t to fees for their - // respective directions. + // used to iterate over all the incoming and outgoing edges for a + // particular node. Key in the bucket use a prefix scheme which leads + // with the node's public key and sends with the compact edge ID. + // For each chanID, there will be two entries within the bucket, as the + // graph is directed: nodes may have different policies w.r.t to fees + // for their respective directions. // - // maps: pubKey || edgeID -> edge policy for node + // maps: pubKey || chanID -> channel edge policy for node edgeBucket = []byte("graph-edge") + // unknownPolicy is represented as an empty slice. It is + // used as the value in edgeBucket for unknown channel edge policies. + // Unknown policies are still stored in the database to enable efficient + // lookup of incoming channel edges. + unknownPolicy = []byte{} + // chanStart is an array of all zero bytes which is used to perform // range scans within the edgeBucket to obtain all of the outgoing // edges for a particular node. @@ -511,6 +517,18 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { return err } + // Mark edge policies for both sides as unknown. This is to + // enable efficient incoming channel lookup for a node. + for _, key := range []*[33]byte{&edge.NodeKey1Bytes, + &edge.NodeKey2Bytes} { + + err := putChanEdgePolicyUnknown(edges, edge.ChannelID, + key[:]) + if err != nil { + return err + } + } + // Finally we add it to the channel index which maps channel // points (outpoints) to the shorter channel ID's. var b bytes.Buffer @@ -1759,12 +1777,14 @@ func (c *ChannelGraph) HasLightningNode(nodePub [33]byte) (time.Time, bool, erro return updateTime, exists, nil } -// ForEachChannel iterates through all the outgoing channel edges from this -// node, executing the passed callback with each edge as its sole argument. The -// first edge policy is the outgoing edge *to* the connecting node, while the -// second is the incoming edge *from* the connecting node. If the callback -// returns an error, then the iteration is halted with the error propagated -// back up to the caller. +// ForEachChannel iterates through all channels of this node, executing the +// passed callback with an edge info structure and the policies of each end +// of the channel. The first edge policy is the outgoing edge *to* the +// the connecting node, while the second is the incoming edge *from* the +// connecting node. If the callback returns an error, then the iteration is +// halted with the error propagated back up to the caller. +// +// Unknown policies are passed into the callback as nil values. // // If the caller wishes to re-use an existing boltdb transaction, then it // should be passed as the first argument. Otherwise the first argument should @@ -1805,44 +1825,32 @@ func (l *LightningNode) ForEachChannel(tx *bolt.Tx, // as its prefix. This indicates that we've stepped over into // another node's edges, so we can terminate our scan. edgeCursor := edges.Cursor() - for nodeEdge, edgeInfo := edgeCursor.Seek(nodeStart[:]); bytes.HasPrefix(nodeEdge, nodePub); nodeEdge, edgeInfo = edgeCursor.Next() { - // If the prefix still matches, then the value is the - // raw edge information. So we can now serialize the - // edge info and fetch the outgoing node in order to - // retrieve the full channel edge. - edgeReader := bytes.NewReader(edgeInfo) - toEdgePolicy, err := deserializeChanEdgePolicy(edgeReader, nodes) - if err != nil { - return err - } - toEdgePolicy.db = l.db - toEdgePolicy.Node.db = l.db - + for nodeEdge, _ := edgeCursor.Seek(nodeStart[:]); bytes.HasPrefix(nodeEdge, nodePub); nodeEdge, _ = edgeCursor.Next() { + // If the prefix still matches, the channel id is + // returned in nodeEdge. Channel id is used to lookup + // the node at the other end of the channel and both + // edge policies. chanID := nodeEdge[33:] edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID) if err != nil { return err } - // We'll also fetch the incoming edge so this - // information can be available to the caller. - incomingNode := toEdgePolicy.Node.PubKeyBytes[:] - fromEdgePolicy, err := fetchChanEdgePolicy( - edges, chanID, incomingNode, nodes, + outgoingPolicy, err := fetchChanEdgePolicy( + edges, chanID, nodePub, nodes, ) - if err != nil && err != ErrEdgeNotFound && - err != ErrGraphNodeNotFound { + + otherNode, err := edgeInfo.OtherNodeKeyBytes(nodePub) + if err != nil { return err } - if fromEdgePolicy != nil { - fromEdgePolicy.db = l.db - if fromEdgePolicy.Node != nil { - fromEdgePolicy.Node.db = l.db - } - } + + incomingPolicy, err := fetchChanEdgePolicy( + edges, chanID, otherNode, nodes, + ) // Finally, we execute the callback. - err = cb(tx, &edgeInfo, toEdgePolicy, fromEdgePolicy) + err = cb(tx, &edgeInfo, outgoingPolicy, incomingPolicy) if err != nil { return err } @@ -2016,6 +2024,21 @@ func (c *ChannelEdgeInfo) BitcoinKey2() (*btcec.PublicKey, error) { return key, nil } +// OtherNodeKeyBytes returns the node key bytes of the other end of +// the channel. +func (c *ChannelEdgeInfo) OtherNodeKeyBytes(thisNodeKey []byte) ( + []byte, error) { + + switch { + case bytes.Equal(c.NodeKey1Bytes[:], thisNodeKey): + return c.NodeKey2Bytes[:], nil + case bytes.Equal(c.NodeKey2Bytes[:], thisNodeKey): + return c.NodeKey1Bytes[:], nil + default: + return nil, fmt.Errorf("Node not participating in this channel") + } +} + // ChannelAuthProof is the authentication proof (the signature portion) for a // channel. Using the four signatures contained in the struct, and some // auxiliary knowledge (the funding script, node identities, and outpoint) nodes @@ -2871,7 +2894,11 @@ func putChanEdgePolicy(edges *bolt.Bucket, edge *ChannelEdgePolicy, from, to []b // If there was already an entry for this edge, then we'll need to // delete the old one to ensure we don't leave around any after-images. - if edgeBytes := edges.Get(edgeKey[:]); edgeBytes != nil { + // An unknown policy value does not have a update time recorded, so + // it also does not need to be removed. + if edgeBytes := edges.Get(edgeKey[:]); edgeBytes != nil && + !bytes.Equal(edgeBytes[:], unknownPolicy) { + // In order to delete the old entry, we'll need to obtain the // *prior* update time in order to delete it. To do this, we'll // create an offset to slice in. Starting backwards, we'll @@ -2899,6 +2926,23 @@ func putChanEdgePolicy(edges *bolt.Bucket, edge *ChannelEdgePolicy, from, to []b return edges.Put(edgeKey[:], b.Bytes()[:]) } +// putChanEdgePolicyUnknown marks the edge policy as unknown +// in the edges bucket. +func putChanEdgePolicyUnknown(edges *bolt.Bucket, channelID uint64, + from []byte) error { + + var edgeKey [33 + 8]byte + copy(edgeKey[:], from) + byteOrder.PutUint64(edgeKey[33:], channelID) + + if edges.Get(edgeKey[:]) != nil { + return fmt.Errorf("Cannot write unknown policy for channel %v "+ + " when there is already a policy present", channelID) + } + + return edges.Put(edgeKey[:], unknownPolicy) +} + func fetchChanEdgePolicy(edges *bolt.Bucket, chanID []byte, nodePub []byte, nodes *bolt.Bucket) (*ChannelEdgePolicy, error) { @@ -2911,6 +2955,11 @@ func fetchChanEdgePolicy(edges *bolt.Bucket, chanID []byte, return nil, ErrEdgeNotFound } + // No need to deserialize unknown policy. + if bytes.Equal(edgeBytes[:], unknownPolicy) { + return nil, nil + } + edgeReader := bytes.NewReader(edgeBytes) return deserializeChanEdgePolicy(edgeReader, nodes) @@ -2930,7 +2979,7 @@ func fetchChanEdgePolicies(edgeIndex *bolt.Bucket, edges *bolt.Bucket, // something other than edge non-existence. node1Pub := edgeInfo[:33] edge1, err := fetchChanEdgePolicy(edges, chanID, node1Pub, nodes) - if err != nil && err != ErrEdgeNotFound { + if err != nil { return nil, nil, err } @@ -2945,7 +2994,7 @@ func fetchChanEdgePolicies(edgeIndex *bolt.Bucket, edges *bolt.Bucket, // half of the edge information. node2Pub := edgeInfo[33:67] edge2, err := fetchChanEdgePolicy(edges, chanID, node2Pub, nodes) - if err != nil && err != ErrEdgeNotFound { + if err != nil { return nil, nil, err } diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 7c8287600..b56847423 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -350,6 +350,15 @@ func TestEdgeInsertionDeletion(t *testing.T) { t.Fatalf("unable to create channel edge: %v", err) } + // Ensure that both policies are returned as unknown (nil). + _, e1, e2, err := graph.FetchChannelEdgesByID(chanID) + if err != nil { + t.Fatalf("unable to fetch channel edge") + } + if e1 != nil || e2 != nil { + t.Fatalf("channel edges not unknown") + } + // Next, attempt to delete the edge from the database, again this // should proceed without any issues. if err := graph.DeleteChannelEdge(&outpoint); err != nil { @@ -918,6 +927,12 @@ func TestGraphTraversal(t *testing.T) { err = firstNode.ForEachChannel(nil, func(_ *bolt.Tx, _ *ChannelEdgeInfo, outEdge, inEdge *ChannelEdgePolicy) error { + // All channels between first and second node should have fully + // (both sides) specified policies. + if inEdge == nil || outEdge == nil { + return fmt.Errorf("channel policy not present") + } + // Each should indicate that it's outgoing (pointed // towards the second node). if !bytes.Equal(outEdge.Node.PubKeyBytes[:], secondNode.PubKeyBytes[:]) { @@ -1941,6 +1956,119 @@ func TestFetchChanInfos(t *testing.T) { } } +// TestIncompleteChannelPolicies tests that a channel that only has a policy +// specified on one end is properly returned in ForEachChannel calls from +// both sides. +func TestIncompleteChannelPolicies(t *testing.T) { + t.Parallel() + + db, cleanUp, err := makeTestDB() + defer cleanUp() + if err != nil { + t.Fatalf("unable to make test database: %v", err) + } + + graph := db.ChannelGraph() + + // Create two nodes. + node1, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + if err := graph.AddLightningNode(node1); err != nil { + t.Fatalf("unable to add node: %v", err) + } + node2, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + if err := graph.AddLightningNode(node2); err != nil { + t.Fatalf("unable to add node: %v", err) + } + + // Create channel between nodes. + txHash := sha256.Sum256([]byte{0}) + op := wire.OutPoint{ + Hash: txHash, + Index: 0, + } + + channel, chanID := createEdge( + uint32(0), 0, 0, 0, node1, node2, + ) + + if err := graph.AddChannelEdge(&channel); err != nil { + t.Fatalf("unable to create channel edge: %v", err) + } + + // Ensure that channel is reported with unknown policies. + + checkPolicies := func(node *LightningNode, expectedIn, expectedOut bool) { + calls := 0 + node.ForEachChannel(nil, func(_ *bolt.Tx, _ *ChannelEdgeInfo, + outEdge, inEdge *ChannelEdgePolicy) error { + + if !expectedOut && outEdge != nil { + t.Fatalf("Expected no outgoing policy") + } + + if expectedOut && outEdge == nil { + t.Fatalf("Expected an outgoing policy") + } + + if !expectedIn && inEdge != nil { + t.Fatalf("Expected no incoming policy") + } + + if expectedIn && inEdge == nil { + t.Fatalf("Expected an incoming policy") + } + + calls++ + + return nil + }) + + if calls != 1 { + t.Fatalf("Expected only one callback call") + } + } + + checkPolicies(node2, false, false) + + // Only create an edge policy for node1 and leave the policy for node2 + // unknown. + updateTime := time.Unix(1234, 0) + + edgePolicy := newEdgePolicy( + chanID.ToUint64(), op, db, updateTime.Unix(), + ) + edgePolicy.Flags = 0 + edgePolicy.Node = node2 + edgePolicy.SigBytes = testSig.Serialize() + if err := graph.UpdateEdgePolicy(edgePolicy); err != nil { + t.Fatalf("unable to update edge: %v", err) + } + + checkPolicies(node1, false, true) + checkPolicies(node2, true, false) + + // Create second policy and assert that both policies are reported + // as present. + edgePolicy = newEdgePolicy( + chanID.ToUint64(), op, db, updateTime.Unix(), + ) + edgePolicy.Flags = 1 + edgePolicy.Node = node1 + edgePolicy.SigBytes = testSig.Serialize() + if err := graph.UpdateEdgePolicy(edgePolicy); err != nil { + t.Fatalf("unable to update edge: %v", err) + } + + checkPolicies(node1, true, true) + checkPolicies(node2, true, true) +} + // TestChannelEdgePruningUpdateIndexDeletion tests that once edges are deleted // from the graph, then their entries within the update index are also cleaned // up. diff --git a/channeldb/migrations.go b/channeldb/migrations.go index d2679b393..e8b658ed8 100644 --- a/channeldb/migrations.go +++ b/channeldb/migrations.go @@ -300,3 +300,76 @@ func migrateInvoiceTimeSeriesOutgoingPayments(tx *bolt.Tx) error { return nil } + +// migrateEdgePolicies is a migration function that will update the edges +// bucket. It ensure that edges with unknown policies will also have an entry +// in the bucket. After the migration, there will be two edge entries for +// every channel, regardless of whether the policies are known. +func migrateEdgePolicies(tx *bolt.Tx) error { + nodes := tx.Bucket(nodeBucket) + if nodes == nil { + return nil + } + + edges := tx.Bucket(edgeBucket) + if edges == nil { + return nil + } + + edgeIndex := edges.Bucket(edgeIndexBucket) + if edgeIndex == nil { + return nil + } + + // checkKey gets the policy from the database with a low-level call + // so that it is still possible to distinguish between unknown and + // not present. + checkKey := func(channelId uint64, keyBytes []byte) error { + var channelID [8]byte + byteOrder.PutUint64(channelID[:], channelId) + + _, err := fetchChanEdgePolicy(edges, + channelID[:], keyBytes, nodes) + + if err == ErrEdgeNotFound { + log.Tracef("Adding unknown edge policy present for node %x, channel %v", + keyBytes, channelId) + + err := putChanEdgePolicyUnknown(edges, channelId, keyBytes) + if err != nil { + return err + } + + return nil + } + + return err + } + + // Iterate over all channels and check both edge policies. + err := edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error { + infoReader := bytes.NewReader(edgeInfoBytes) + edgeInfo, err := deserializeChanEdgeInfo(infoReader) + if err != nil { + return err + } + + for _, key := range [][]byte{edgeInfo.NodeKey1Bytes[:], + edgeInfo.NodeKey2Bytes[:]} { + + if err := checkKey(edgeInfo.ChannelID, key); err != nil { + return err + } + } + + return nil + }) + + if err != nil { + return fmt.Errorf("unable to update edge policies: %v", err) + } + + log.Infof("Migration of edge policies complete!") + + return nil +} diff --git a/routing/router.go b/routing/router.go index 888588654..dd4d4d821 100644 --- a/routing/router.go +++ b/routing/router.go @@ -2136,6 +2136,10 @@ func (r *ChannelRouter) ForAllOutgoingChannels(cb func(*channeldb.ChannelEdgeInf return r.selfNode.ForEachChannel(nil, func(_ *bolt.Tx, c *channeldb.ChannelEdgeInfo, e, _ *channeldb.ChannelEdgePolicy) error { + if e == nil { + return fmt.Errorf("Channel from self node has no policy") + } + return cb(c, e) }) } diff --git a/rpcserver.go b/rpcserver.go index acf58921d..faee7607e 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3733,6 +3733,12 @@ func (r *rpcServer) FeeReport(ctx context.Context, err = selfNode.ForEachChannel(nil, func(_ *bolt.Tx, chanInfo *channeldb.ChannelEdgeInfo, edgePolicy, _ *channeldb.ChannelEdgePolicy) error { + // Self node should always have policies for its channels. + if edgePolicy == nil { + return fmt.Errorf("no policy for outgoing channel %v ", + chanInfo.ChannelID) + } + // We'll compute the effective fee rate by converting from a // fixed point fee rate to a floating point fee rate. The fee // rate field in the database the amount of mSAT charged per