diff --git a/autopilot/graph.go b/autopilot/graph.go index 0b062a535..89e03cbdd 100644 --- a/autopilot/graph.go +++ b/autopilot/graph.go @@ -53,6 +53,8 @@ func ChannelGraphFromDatabase(db *channeldb.ChannelGraph) ChannelGraph { // channeldb.LightningNode. The wrapper method implement the autopilot.Node // interface. type dbNode struct { + db *channeldb.ChannelGraph + tx kvdb.RTx node *channeldb.LightningNode @@ -86,31 +88,36 @@ func (d dbNode) Addrs() []net.Addr { // // NOTE: Part of the autopilot.Node interface. func (d dbNode) ForEachChannel(cb func(ChannelEdge) error) error { - return d.node.ForEachChannel(d.tx, func(tx kvdb.RTx, - ei *channeldb.ChannelEdgeInfo, ep, _ *channeldb.ChannelEdgePolicy) error { + return d.db.ForEachNodeChannel(d.tx, d.node.PubKeyBytes, + func(tx kvdb.RTx, ei *channeldb.ChannelEdgeInfo, ep, + _ *channeldb.ChannelEdgePolicy) error { - // Skip channels for which no outgoing edge policy is available. - // - // TODO(joostjager): Ideally the case where channels have a nil - // policy should be supported, as autopilot is not looking at - // the policies. For now, it is not easily possible to get a - // reference to the other end LightningNode object without - // retrieving the policy. - if ep == nil { - return nil - } + // Skip channels for which no outgoing edge policy is + // available. + // + // TODO(joostjager): Ideally the case where channels + // have a nil policy should be supported, as autopilot + // is not looking at the policies. For now, it is not + // easily possible to get a reference to the other end + // LightningNode object without retrieving the policy. + if ep == nil { + return nil + } - edge := ChannelEdge{ - ChanID: lnwire.NewShortChanIDFromInt(ep.ChannelID), - Capacity: ei.Capacity, - Peer: dbNode{ - tx: tx, - node: ep.Node, - }, - } + edge := ChannelEdge{ + ChanID: lnwire.NewShortChanIDFromInt( + ep.ChannelID, + ), + Capacity: ei.Capacity, + Peer: dbNode{ + tx: tx, + db: d.db, + node: ep.Node, + }, + } - return cb(edge) - }) + return cb(edge) + }) } // ForEachNode is a higher-order function that should be called once for each @@ -128,6 +135,7 @@ func (d *databaseChannelGraph) ForEachNode(cb func(Node) error) error { } node := dbNode{ + db: d.db, tx: tx, node: n, } @@ -266,6 +274,7 @@ func (d *databaseChannelGraph) addRandChannel(node1, node2 *btcec.PublicKey, ChanID: chanID, Capacity: capacity, Peer: dbNode{ + db: d.db, node: vertex1, }, }, @@ -273,6 +282,7 @@ func (d *databaseChannelGraph) addRandChannel(node1, node2 *btcec.PublicKey, ChanID: chanID, Capacity: capacity, Peer: dbNode{ + db: d.db, node: vertex2, }, }, diff --git a/channeldb/graph.go b/channeldb/graph.go index 8367aaf68..00be88d18 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -459,16 +459,14 @@ func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo, }, func() {}) } -// ForEachNodeChannel iterates through all channels of a given node, executing the -// passed callback with an edge info structure and the policies of each end -// of the channel. The first edge policy is the outgoing edge *to* the -// connecting node, while the second is the incoming edge *from* the -// connecting node. If the callback returns an error, then the iteration is -// halted with the error propagated back up to the caller. +// ForEachNodeDirectedChannel iterates through all channels of a given node, +// executing the passed callback on the directed edge representing the channel +// and its incoming policy. If the callback returns an error, then the iteration +// is halted with the error propagated back up to the caller. // // Unknown policies are passed into the callback as nil values. -func (c *ChannelGraph) ForEachNodeChannel(tx kvdb.RTx, node route.Vertex, - cb func(channel *DirectedChannel) error) error { +func (c *ChannelGraph) ForEachNodeDirectedChannel(tx kvdb.RTx, + node route.Vertex, cb func(channel *DirectedChannel) error) error { if c.graphCache != nil { return c.graphCache.ForEachChannel(node, cb) @@ -557,44 +555,49 @@ func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex, return c.ForEachNode(func(tx kvdb.RTx, node *LightningNode) error { channels := make(map[uint64]*DirectedChannel) - err := node.ForEachChannel(tx, func(tx kvdb.RTx, - e *ChannelEdgeInfo, p1 *ChannelEdgePolicy, - p2 *ChannelEdgePolicy) error { + err := c.ForEachNodeChannel(tx, node.PubKeyBytes, + func(tx kvdb.RTx, e *ChannelEdgeInfo, + p1 *ChannelEdgePolicy, + p2 *ChannelEdgePolicy) error { - toNodeCallback := func() route.Vertex { - return node.PubKeyBytes - } - toNodeFeatures, err := c.FetchNodeFeatures( - node.PubKeyBytes, - ) - if err != nil { - return err - } + toNodeCallback := func() route.Vertex { + return node.PubKeyBytes + } + toNodeFeatures, err := c.FetchNodeFeatures( + node.PubKeyBytes, + ) + if err != nil { + return err + } - var cachedInPolicy *CachedEdgePolicy - if p2 != nil { - cachedInPolicy := NewCachedPolicy(p2) - cachedInPolicy.ToNodePubKey = toNodeCallback - cachedInPolicy.ToNodeFeatures = toNodeFeatures - } + var cachedInPolicy *CachedEdgePolicy + if p2 != nil { + cachedInPolicy := NewCachedPolicy(p2) + cachedInPolicy.ToNodePubKey = + toNodeCallback + cachedInPolicy.ToNodeFeatures = + toNodeFeatures + } - directedChannel := &DirectedChannel{ - ChannelID: e.ChannelID, - IsNode1: node.PubKeyBytes == e.NodeKey1Bytes, - OtherNode: e.NodeKey2Bytes, - Capacity: e.Capacity, - OutPolicySet: p1 != nil, - InPolicy: cachedInPolicy, - } + directedChannel := &DirectedChannel{ + ChannelID: e.ChannelID, + IsNode1: node.PubKeyBytes == + e.NodeKey1Bytes, + OtherNode: e.NodeKey2Bytes, + Capacity: e.Capacity, + OutPolicySet: p1 != nil, + InPolicy: cachedInPolicy, + } - if node.PubKeyBytes == e.NodeKey2Bytes { - directedChannel.OtherNode = e.NodeKey1Bytes - } + if node.PubKeyBytes == e.NodeKey2Bytes { + directedChannel.OtherNode = + e.NodeKey1Bytes + } - channels[e.ChannelID] = directedChannel + channels[e.ChannelID] = directedChannel - return nil - }) + return nil + }) if err != nil { return err } @@ -2740,15 +2743,18 @@ func (l *LightningNode) NodeAnnouncement(signed bool) (*lnwire.NodeAnnouncement, // isPublic determines whether the node is seen as public within the graph from // the source node's point of view. An existing database transaction can also be // specified. -func (l *LightningNode) isPublic(tx kvdb.RTx, sourcePubKey []byte) (bool, error) { +func (c *ChannelGraph) isPublic(tx kvdb.RTx, nodePub route.Vertex, + sourcePubKey []byte) (bool, error) { + // In order to determine whether this node is publicly advertised within // the graph, we'll need to look at all of its edges and check whether // they extend to any other node than the source node. errDone will be // used to terminate the check early. nodeIsPublic := false errDone := errors.New("done") - err := l.ForEachChannel(tx, func(_ kvdb.RTx, info *ChannelEdgeInfo, - _, _ *ChannelEdgePolicy) error { + err := c.ForEachNodeChannel(tx, nodePub, func(tx kvdb.RTx, + info *ChannelEdgeInfo, _ *ChannelEdgePolicy, + _ *ChannelEdgePolicy) error { // If this edge doesn't extend to the source node, we'll // terminate our search as we can now conclude that the node is @@ -3003,10 +3009,10 @@ func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend, return traversal(tx) } -// ForEachChannel iterates through all channels of this node, executing the -// passed callback with an edge info structure and the policies of each end -// of the channel. The first edge policy is the outgoing edge *to* the -// connecting node, while the second is the incoming edge *from* the +// ForEachNodeChannel iterates through all channels of the given node, +// executing the passed callback with an edge info structure and the policies +// of each end of the channel. The first edge policy is the outgoing edge *to* +// the connecting node, while the second is the incoming edge *from* the // connecting node. If the callback returns an error, then the iteration is // halted with the error propagated back up to the caller. // @@ -3016,14 +3022,11 @@ func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend, // should be passed as the first argument. Otherwise the first argument should // be nil and a fresh transaction will be created to execute the graph // traversal. -func (l *LightningNode) ForEachChannel(tx kvdb.RTx, +func (c *ChannelGraph) ForEachNodeChannel(tx kvdb.RTx, nodePub route.Vertex, cb func(kvdb.RTx, *ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy) error) error { - nodePub := l.PubKeyBytes[:] - db := l.db - - return nodeTraversal(tx, nodePub, db, cb) + return nodeTraversal(tx, nodePub[:], c.db, cb) } // ChannelEdgeInfo represents a fully authenticated channel along with all its @@ -3718,7 +3721,7 @@ func (c *ChannelGraph) IsPublicNode(pubKey [33]byte) (bool, error) { return err } - nodeIsPublic, err = node.isPublic(tx, ourPubKey) + nodeIsPublic, err = c.isPublic(tx, node.PubKeyBytes, ourPubKey) return err }, func() { nodeIsPublic = false diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 0afc9ea19..1455763a7 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -1056,30 +1056,39 @@ func TestGraphTraversal(t *testing.T) { // outgoing channels for a particular node. numNodeChans := 0 firstNode, secondNode := nodeList[0], nodeList[1] - err = firstNode.ForEachChannel(nil, func(_ kvdb.RTx, _ *ChannelEdgeInfo, - outEdge, inEdge *ChannelEdgePolicy) error { + err = graph.ForEachNodeChannel(nil, firstNode.PubKeyBytes, + func(_ kvdb.RTx, _ *ChannelEdgeInfo, outEdge, + inEdge *ChannelEdgePolicy) error { - // All channels between first and second node should have fully - // (both sides) specified policies. - if inEdge == nil || outEdge == nil { - return fmt.Errorf("channel policy not present") - } + // All channels between first and second node should + // have fully (both sides) specified policies. + if inEdge == nil || outEdge == nil { + return fmt.Errorf("channel policy not present") + } - // Each should indicate that it's outgoing (pointed - // towards the second node). - if !bytes.Equal(outEdge.Node.PubKeyBytes[:], secondNode.PubKeyBytes[:]) { - return fmt.Errorf("wrong outgoing edge") - } + // Each should indicate that it's outgoing (pointed + // towards the second node). + if !bytes.Equal( + outEdge.Node.PubKeyBytes[:], + secondNode.PubKeyBytes[:], + ) { - // The incoming edge should also indicate that it's pointing to - // the origin node. - if !bytes.Equal(inEdge.Node.PubKeyBytes[:], firstNode.PubKeyBytes[:]) { - return fmt.Errorf("wrong outgoing edge") - } + return fmt.Errorf("wrong outgoing edge") + } - numNodeChans++ - return nil - }) + // The incoming edge should also indicate that it's + // pointing to the origin node. + if !bytes.Equal( + inEdge.Node.PubKeyBytes[:], + firstNode.PubKeyBytes[:], + ) { + + return fmt.Errorf("wrong outgoing edge") + } + + numNodeChans++ + return nil + }) require.NoError(t, err) require.Equal(t, numChannels, numNodeChans) } @@ -2280,29 +2289,30 @@ func TestIncompleteChannelPolicies(t *testing.T) { // Ensure that channel is reported with unknown policies. checkPolicies := func(node *LightningNode, expectedIn, expectedOut bool) { calls := 0 - err := node.ForEachChannel(nil, func(_ kvdb.RTx, _ *ChannelEdgeInfo, - outEdge, inEdge *ChannelEdgePolicy) error { + err := graph.ForEachNodeChannel(nil, node.PubKeyBytes, + func(_ kvdb.RTx, _ *ChannelEdgeInfo, outEdge, + inEdge *ChannelEdgePolicy) error { - if !expectedOut && outEdge != nil { - t.Fatalf("Expected no outgoing policy") - } + if !expectedOut && outEdge != nil { + t.Fatalf("Expected no outgoing policy") + } - if expectedOut && outEdge == nil { - t.Fatalf("Expected an outgoing policy") - } + if expectedOut && outEdge == nil { + t.Fatalf("Expected an outgoing policy") + } - if !expectedIn && inEdge != nil { - t.Fatalf("Expected no incoming policy") - } + if !expectedIn && inEdge != nil { + t.Fatalf("Expected no incoming policy") + } - if expectedIn && inEdge == nil { - t.Fatalf("Expected an incoming policy") - } + if expectedIn && inEdge == nil { + t.Fatalf("Expected an incoming policy") + } - calls++ + calls++ - return nil - }) + return nil + }) if err != nil { t.Fatalf("unable to scan channels: %v", err) } @@ -3470,8 +3480,8 @@ func BenchmarkForEachChannel(b *testing.B) { } } -// TestGraphCacheForEachNodeChannel tests that the ForEachNodeChannel method -// works as expected, and is able to handle nil self edges. +// TestGraphCacheForEachNodeChannel tests that the ForEachNodeDirectedChannel +// method works as expected, and is able to handle nil self edges. func TestGraphCacheForEachNodeChannel(t *testing.T) { graph, err := MakeTestGraph(t) require.NoError(t, err) @@ -3498,11 +3508,13 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) { // We should be able to accumulate the single channel added, even // though we have a nil edge policy here. var numChans int - err = graph.ForEachNodeChannel(nil, node1.PubKeyBytes, - func(channel *DirectedChannel) error { + err = graph.ForEachNodeDirectedChannel(nil, node1.PubKeyBytes, + func(_ *DirectedChannel) error { numChans++ + return nil - }) + }, + ) require.NoError(t, err) require.Equal(t, numChans, 1) diff --git a/routing/graph.go b/routing/graph.go index 23bdd41e2..7f344f54a 100644 --- a/routing/graph.go +++ b/routing/graph.go @@ -75,7 +75,7 @@ func (g *CachedGraph) Close() error { func (g *CachedGraph) forEachNodeChannel(nodePub route.Vertex, cb func(channel *channeldb.DirectedChannel) error) error { - return g.graph.ForEachNodeChannel(g.tx, nodePub, cb) + return g.graph.ForEachNodeDirectedChannel(g.tx, nodePub, cb) } // sourceNode returns the source node of the graph. diff --git a/routing/router.go b/routing/router.go index 827e21864..1eca002dd 100644 --- a/routing/router.go +++ b/routing/router.go @@ -2824,16 +2824,19 @@ func (r *ChannelRouter) ForEachNode( func (r *ChannelRouter) ForAllOutgoingChannels(cb func(kvdb.RTx, *channeldb.ChannelEdgeInfo, *channeldb.ChannelEdgePolicy) error) error { - return r.selfNode.ForEachChannel(nil, func(tx kvdb.RTx, - c *channeldb.ChannelEdgeInfo, - e, _ *channeldb.ChannelEdgePolicy) error { + return r.cfg.Graph.ForEachNodeChannel(nil, r.selfNode.PubKeyBytes, + func(tx kvdb.RTx, c *channeldb.ChannelEdgeInfo, + e *channeldb.ChannelEdgePolicy, + _ *channeldb.ChannelEdgePolicy) error { - if e == nil { - return fmt.Errorf("channel from self node has no policy") - } + if e == nil { + return fmt.Errorf("channel from self node " + + "has no policy") + } - return cb(tx, c, e) - }) + return cb(tx, c, e) + }, + ) } // AddProof updates the channel edge info with proof which is needed to diff --git a/rpcserver.go b/rpcserver.go index 85a156cf9..67337e08d 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -6151,30 +6151,33 @@ func (r *rpcServer) GetNodeInfo(ctx context.Context, channels []*lnrpc.ChannelEdge ) - if err := node.ForEachChannel(nil, func(_ kvdb.RTx, - edge *channeldb.ChannelEdgeInfo, - c1, c2 *channeldb.ChannelEdgePolicy) error { + err = graph.ForEachNodeChannel(nil, node.PubKeyBytes, + func(_ kvdb.RTx, edge *channeldb.ChannelEdgeInfo, + c1, c2 *channeldb.ChannelEdgePolicy) error { - numChannels++ - totalCapacity += edge.Capacity + numChannels++ + totalCapacity += edge.Capacity - // Only populate the node's channels if the user requested them. - if in.IncludeChannels { - // Do not include unannounced channels - private - // channels or public channels whose authentication - // proof were not confirmed yet. - if edge.AuthProof == nil { - return nil + // Only populate the node's channels if the user + // requested them. + if in.IncludeChannels { + // Do not include unannounced channels - private + // channels or public channels whose + // authentication proof were not confirmed yet. + if edge.AuthProof == nil { + return nil + } + + // Convert the database's edge format into the + // network/RPC edge format. + channelEdge := marshalDbEdge(edge, c1, c2) + channels = append(channels, channelEdge) } - // Convert the database's edge format into the - // network/RPC edge format. - channelEdge := marshalDbEdge(edge, c1, c2) - channels = append(channels, channelEdge) - } - - return nil - }); err != nil { + return nil + }, + ) + if err != nil { return nil, err } @@ -6763,34 +6766,39 @@ func (r *rpcServer) FeeReport(ctx context.Context, } var feeReports []*lnrpc.ChannelFeeReport - err = selfNode.ForEachChannel(nil, func(_ kvdb.RTx, chanInfo *channeldb.ChannelEdgeInfo, - edgePolicy, _ *channeldb.ChannelEdgePolicy) error { + err = channelGraph.ForEachNodeChannel(nil, selfNode.PubKeyBytes, + func(_ kvdb.RTx, chanInfo *channeldb.ChannelEdgeInfo, + edgePolicy, _ *channeldb.ChannelEdgePolicy) error { - // Self node should always have policies for its channels. - if edgePolicy == nil { - return fmt.Errorf("no policy for outgoing channel %v ", - chanInfo.ChannelID) - } + // Self node should always have policies for its + // channels. + if edgePolicy == nil { + return fmt.Errorf("no policy for outgoing "+ + "channel %v ", chanInfo.ChannelID) + } - // We'll compute the effective fee rate by converting from a - // fixed point fee rate to a floating point fee rate. The fee - // rate field in the database the amount of mSAT charged per - // 1mil mSAT sent, so will divide by this to get the proper fee - // rate. - feeRateFixedPoint := edgePolicy.FeeProportionalMillionths - feeRate := float64(feeRateFixedPoint) / feeBase + // We'll compute the effective fee rate by converting + // from a fixed point fee rate to a floating point fee + // rate. The fee rate field in the database the amount + // of mSAT charged per 1mil mSAT sent, so will divide by + // this to get the proper fee rate. + feeRateFixedPoint := + edgePolicy.FeeProportionalMillionths + feeRate := float64(feeRateFixedPoint) / feeBase - // TODO(roasbeef): also add stats for revenue for each channel - feeReports = append(feeReports, &lnrpc.ChannelFeeReport{ - ChanId: chanInfo.ChannelID, - ChannelPoint: chanInfo.ChannelPoint.String(), - BaseFeeMsat: int64(edgePolicy.FeeBaseMSat), - FeePerMil: int64(feeRateFixedPoint), - FeeRate: feeRate, - }) + // TODO(roasbeef): also add stats for revenue for each + // channel + feeReports = append(feeReports, &lnrpc.ChannelFeeReport{ + ChanId: chanInfo.ChannelID, + ChannelPoint: chanInfo.ChannelPoint.String(), + BaseFeeMsat: int64(edgePolicy.FeeBaseMSat), + FeePerMil: int64(feeRateFixedPoint), + FeeRate: feeRate, + }) - return nil - }) + return nil + }, + ) if err != nil { return nil, err } diff --git a/server.go b/server.go index f3419a446..5d3eea6b0 100644 --- a/server.go +++ b/server.go @@ -3092,7 +3092,7 @@ func (s *server) establishPersistentConnections() error { // TODO(roasbeef): instead iterate over link nodes and query graph for // each of the nodes. selfPub := s.identityECDH.PubKey().SerializeCompressed() - err = sourceNode.ForEachChannel(nil, func( + err = s.graphDB.ForEachNodeChannel(nil, sourceNode.PubKeyBytes, func( tx kvdb.RTx, chanInfo *channeldb.ChannelEdgeInfo, policy, _ *channeldb.ChannelEdgePolicy) error {