routing: fix bug when sending topology ntfns to multiple clients

This commit fixes a bug which was originally introduced when the
topology notifications were added to the channel router. The issue was
that a pointer to the loop-scope range variable was being passed into
the goroutine which dispatches the notification rather than the value
itself. It seems that the memory location is re-used between range
iterations causing the same client to receive _all_ the notifications.

This bug is fixed by passing a copy of the client struct rather than a
pointer to the range variable.

In the process, we also add some additional debug logging messages, and
remove the Curve parameter from any public keys involved in a
notification so the pretty print properly.
This commit is contained in:
Olaoluwa Osuntokun
2017-03-14 20:02:33 -07:00
parent a179a3adbb
commit 8ed79ae497

View File

@@ -6,6 +6,7 @@ import (
"net" "net"
"sync/atomic" "sync/atomic"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/btcec"
@@ -54,11 +55,14 @@ type topologyClientUpdate struct {
// nodes appearing, node updating their attributes, new channels, channels // nodes appearing, node updating their attributes, new channels, channels
// closing, and updates in the routing policies of a channel's directed edges. // closing, and updates in the routing policies of a channel's directed edges.
func (r *ChannelRouter) SubscribeTopology() (*TopologyClient, error) { func (r *ChannelRouter) SubscribeTopology() (*TopologyClient, error) {
// We'll first atomitcally obtain the next ID for this client from the // We'll first atomically obtain the next ID for this client from the
// incrementing client ID counter. // incrementing client ID counter.
clientID := atomic.AddUint64(&r.ntfnClientCounter, 1) clientID := atomic.AddUint64(&r.ntfnClientCounter, 1)
ntfnChan := make(chan *TopologyChange) log.Debugf("New graph topology client subscription, client %v",
clientID)
ntfnChan := make(chan *TopologyChange, 10)
select { select {
case r.ntfnClientUpdates <- &topologyClientUpdate{ case r.ntfnClientUpdates <- &topologyClientUpdate{
@@ -103,8 +107,15 @@ type topologyClient struct {
// notifyTopologyChange notifies all registered clients of a new change in // notifyTopologyChange notifies all registered clients of a new change in
// graph topology in a non-blocking. // graph topology in a non-blocking.
func (r *ChannelRouter) notifyTopologyChange(topologyDiff *TopologyChange) { func (r *ChannelRouter) notifyTopologyChange(topologyDiff *TopologyChange) {
log.Tracef("Sending topology notification to %v clients %v",
len(r.topologyClients),
newLogClosure(func() string {
return spew.Sdump(topologyDiff)
}),
)
for _, client := range r.topologyClients { for _, client := range r.topologyClients {
go func(c *topologyClient) { go func(c topologyClient) {
select { select {
// In this case we'll try to send the notification // In this case we'll try to send the notification
@@ -120,7 +131,7 @@ func (r *ChannelRouter) notifyTopologyChange(topologyDiff *TopologyChange) {
case <-r.quit: case <-r.quit:
} }
}(&client) }(client)
} }
} }
@@ -271,11 +282,14 @@ func addToTopologyChange(graph *channeldb.ChannelGraph, update *TopologyChange,
// Any node announcement maps directly to a NetworkNodeUpdate struct. // Any node announcement maps directly to a NetworkNodeUpdate struct.
// No further data munging or db queries are required. // No further data munging or db queries are required.
case *lnwire.NodeAnnouncement: case *lnwire.NodeAnnouncement:
update.NodeUpdates = append(update.NodeUpdates, &NetworkNodeUpdate{ nodeUpdate := &NetworkNodeUpdate{
Addresses: []net.Addr{m.Address}, Addresses: []net.Addr{m.Address},
IdentityKey: m.NodeID, IdentityKey: m.NodeID,
Alias: m.Alias.String(), Alias: m.Alias.String(),
}) }
nodeUpdate.IdentityKey.Curve = nil
update.NodeUpdates = append(update.NodeUpdates, nodeUpdate)
return nil return nil
// We ignore initial channel announcements as we'll only send out // We ignore initial channel announcements as we'll only send out
@@ -315,6 +329,8 @@ func addToTopologyChange(graph *channeldb.ChannelGraph, update *TopologyChange,
AdvertisingNode: sourceNode, AdvertisingNode: sourceNode,
ConnectingNode: connectingNode, ConnectingNode: connectingNode,
} }
edgeUpdate.AdvertisingNode.Curve = nil
edgeUpdate.ConnectingNode.Curve = nil
// TODO(roasbeef): add bit to toggle // TODO(roasbeef): add bit to toggle
update.ChannelEdgeUpdates = append(update.ChannelEdgeUpdates, update.ChannelEdgeUpdates = append(update.ChannelEdgeUpdates,