routing: remove the mutex and use SyncMap instead

This commit removes the lock used to protect the map `topologyClients`
and replace the map with a `SyncMap`.
This commit is contained in:
yyforyongyu
2023-01-19 07:06:34 +08:00
parent 692cd4bc4f
commit fa2c766866
2 changed files with 26 additions and 30 deletions

View File

@@ -10,7 +10,6 @@ import (
"github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors" "github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
@@ -117,23 +116,19 @@ type topologyClient struct {
// notifyTopologyChange notifies all registered clients of a new change in // notifyTopologyChange notifies all registered clients of a new change in
// graph topology in a non-blocking. // graph topology in a non-blocking.
func (r *ChannelRouter) notifyTopologyChange(topologyDiff *TopologyChange) { func (r *ChannelRouter) notifyTopologyChange(topologyDiff *TopologyChange) {
r.RLock()
defer r.RUnlock()
numClients := len(r.topologyClients) // notifyClient is a helper closure that will send topology updates to
if numClients == 0 { // the given client.
return notifyClient := func(clientID uint64, client *topologyClient) bool {
}
log.Tracef("Sending topology notification to %v clients %v",
numClients, newLogClosure(func() string {
return spew.Sdump(topologyDiff)
}),
)
for _, client := range r.topologyClients {
client.wg.Add(1) client.wg.Add(1)
log.Tracef("Sending topology notification to client=%v, "+
"NodeUpdates=%v, ChannelEdgeUpdates=%v, "+
"ClosedChannels=%v", clientID,
len(topologyDiff.NodeUpdates),
len(topologyDiff.ChannelEdgeUpdates),
len(topologyDiff.ClosedChannels))
go func(c *topologyClient) { go func(c *topologyClient) {
defer c.wg.Done() defer c.wg.Done()
@@ -153,7 +148,15 @@ func (r *ChannelRouter) notifyTopologyChange(topologyDiff *TopologyChange) {
} }
}(client) }(client)
// Always return true here so the following Range will iterate
// all clients.
return true
} }
// Range over the set of active clients, and attempt to send the
// topology updates.
r.topologyClients.Range(notifyClient)
} }
// TopologyChange represents a new set of modifications to the channel graph. // TopologyChange represents a new set of modifications to the channel graph.

View File

@@ -25,6 +25,7 @@ import (
"github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnutils"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/btcwallet" "github.com/lightningnetwork/lnd/lnwallet/btcwallet"
"github.com/lightningnetwork/lnd/lnwallet/chanvalidate" "github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
@@ -425,7 +426,7 @@ type ChannelRouter struct {
// topologyClients maps a client's unique notification ID to a // topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch // topologyClient client that contains its notification dispatch
// channel. // channel.
topologyClients map[uint64]*topologyClient topologyClients *lnutils.SyncMap[uint64, *topologyClient]
// ntfnClientUpdates is a channel that's used to send new updates to // ntfnClientUpdates is a channel that's used to send new updates to
// topology notification clients to the ChannelRouter. Updates either // topology notification clients to the ChannelRouter. Updates either
@@ -446,8 +447,6 @@ type ChannelRouter struct {
// announcements over a window of defaultStatInterval. // announcements over a window of defaultStatInterval.
stats *routerStats stats *routerStats
sync.RWMutex
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
} }
@@ -474,7 +473,7 @@ func New(cfg Config) (*ChannelRouter, error) {
source: selfNode.PubKeyBytes, source: selfNode.PubKeyBytes,
}, },
networkUpdates: make(chan *routingMsg), networkUpdates: make(chan *routingMsg),
topologyClients: make(map[uint64]*topologyClient), topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate), ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex(), channelEdgeMtx: multimutex.NewMutex(),
selfNode: selfNode, selfNode: selfNode,
@@ -1203,14 +1202,10 @@ func (r *ChannelRouter) networkHandler() {
clientID := ntfnUpdate.clientID clientID := ntfnUpdate.clientID
if ntfnUpdate.cancel { if ntfnUpdate.cancel {
r.RLock() client, ok := r.topologyClients.LoadAndDelete(
client, ok := r.topologyClients[ntfnUpdate.clientID] clientID,
r.RUnlock() )
if ok { if ok {
r.Lock()
delete(r.topologyClients, clientID)
r.Unlock()
close(client.exit) close(client.exit)
client.wg.Wait() client.wg.Wait()
@@ -1220,12 +1215,10 @@ func (r *ChannelRouter) networkHandler() {
continue continue
} }
r.Lock() r.topologyClients.Store(clientID, &topologyClient{
r.topologyClients[ntfnUpdate.clientID] = &topologyClient{
ntfnChan: ntfnUpdate.ntfnChan, ntfnChan: ntfnUpdate.ntfnChan,
exit: make(chan struct{}), exit: make(chan struct{}),
} })
r.Unlock()
// The graph prune ticker has ticked, so we'll examine the // The graph prune ticker has ticked, so we'll examine the
// state of the known graph to filter out any zombie channels // state of the known graph to filter out any zombie channels