Merge pull request #7330 from yyforyongyu/routing-rm-mtx

routing: remove the mutex lock inside `ChannelRouter`
This commit is contained in:
Oliver Gugger 2023-01-19 16:15:02 +01:00 committed by GitHub
commit 0eb07c944c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 322 additions and 54 deletions

View File

@ -18,10 +18,10 @@ import (
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnutils"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/multimutex"
@ -1185,7 +1185,7 @@ func (d *AuthenticatedGossiper) sendBatch(annBatch []msgWithSenders,
// skip the filter and dedup logic below, and just send the
// announcements out to all our coonnected peers.
if isLocal {
msgsToSend := fn.Map(
msgsToSend := lnutils.Map(
annBatch, func(m msgWithSenders) lnwire.Message {
return m.msg
},

View File

@ -222,6 +222,9 @@ data.
* [Fix race condition in
`TestUpdatePaymentState`](https://github.com/lightningnetwork/lnd/pull/7336)
* [Decreased the mutex lock
scope](https://github.com/lightningnetwork/lnd/pull/7330) inside `ChannelRouter`.
## `lncli`
* [Add an `insecure` flag to skip tls auth as well as a `metadata` string slice

View File

@ -10,6 +10,7 @@ import (
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/lntemp/rpc"
"github.com/lightningnetwork/lnd/lnutils"
)
type (
@ -153,18 +154,18 @@ type State struct {
// openChans records each opened channel and how many times it has
// heard the announcements from its graph subscription.
openChans *SyncMap[wire.OutPoint, []*OpenChannelUpdate]
openChans *lnutils.SyncMap[wire.OutPoint, []*OpenChannelUpdate]
// closedChans records each closed channel and its close channel update
// message received from its graph subscription.
closedChans *SyncMap[wire.OutPoint, *lnrpc.ClosedChannelUpdate]
closedChans *lnutils.SyncMap[wire.OutPoint, *lnrpc.ClosedChannelUpdate]
// numChanUpdates records the number of channel updates seen by each
// channel.
numChanUpdates *SyncMap[wire.OutPoint, int]
numChanUpdates *lnutils.SyncMap[wire.OutPoint, int]
// nodeUpdates records the node announcements seen by each node.
nodeUpdates *SyncMap[string, []*lnrpc.NodeUpdate]
nodeUpdates *lnutils.SyncMap[string, []*lnrpc.NodeUpdate]
// policyUpdates defines a type to store channel policy updates. It has
// the format,
@ -179,21 +180,23 @@ type State struct {
// },
// "chanPoint2": ...
// }
policyUpdates *SyncMap[wire.OutPoint, PolicyUpdate]
policyUpdates *lnutils.SyncMap[wire.OutPoint, PolicyUpdate]
}
// newState initialize a new state with every field being set to its zero
// value.
func newState(rpc *rpc.HarnessRPC) *State {
return &State{
rpc: rpc,
openChans: &SyncMap[wire.OutPoint, []*OpenChannelUpdate]{},
closedChans: &SyncMap[
rpc: rpc,
openChans: &lnutils.SyncMap[
wire.OutPoint, []*OpenChannelUpdate,
]{},
closedChans: &lnutils.SyncMap[
wire.OutPoint, *lnrpc.ClosedChannelUpdate,
]{},
numChanUpdates: &SyncMap[wire.OutPoint, int]{},
nodeUpdates: &SyncMap[string, []*lnrpc.NodeUpdate]{},
policyUpdates: &SyncMap[wire.OutPoint, PolicyUpdate]{},
numChanUpdates: &lnutils.SyncMap[wire.OutPoint, int]{},
nodeUpdates: &lnutils.SyncMap[string, []*lnrpc.NodeUpdate]{},
policyUpdates: &lnutils.SyncMap[wire.OutPoint, PolicyUpdate]{},
}
}
@ -352,9 +355,11 @@ func (s *State) resetEphermalStates(rpc *rpc.HarnessRPC) {
// Reset ephermal states which are used to record info from finished
// tests.
s.openChans = &SyncMap[wire.OutPoint, []*OpenChannelUpdate]{}
s.closedChans = &SyncMap[wire.OutPoint, *lnrpc.ClosedChannelUpdate]{}
s.numChanUpdates = &SyncMap[wire.OutPoint, int]{}
s.nodeUpdates = &SyncMap[string, []*lnrpc.NodeUpdate]{}
s.policyUpdates = &SyncMap[wire.OutPoint, PolicyUpdate]{}
s.openChans = &lnutils.SyncMap[wire.OutPoint, []*OpenChannelUpdate]{}
s.closedChans = &lnutils.SyncMap[
wire.OutPoint, *lnrpc.ClosedChannelUpdate,
]{}
s.numChanUpdates = &lnutils.SyncMap[wire.OutPoint, int]{}
s.nodeUpdates = &lnutils.SyncMap[string, []*lnrpc.NodeUpdate]{}
s.policyUpdates = &lnutils.SyncMap[wire.OutPoint, PolicyUpdate]{}
}

View File

@ -14,6 +14,7 @@ import (
"github.com/lightningnetwork/lnd/lntemp/rpc"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lnutils"
)
type chanWatchType uint8
@ -68,8 +69,8 @@ type nodeWatcher struct {
// of edges seen for that channel within the network. When this number
// reaches 2, then it means that both edge advertisements has
// propagated through the network.
openChanWatchers *SyncMap[wire.OutPoint, []chan struct{}]
closeChanWatchers *SyncMap[wire.OutPoint, []chan struct{}]
openChanWatchers *lnutils.SyncMap[wire.OutPoint, []chan struct{}]
closeChanWatchers *lnutils.SyncMap[wire.OutPoint, []chan struct{}]
wg sync.WaitGroup
}
@ -79,8 +80,12 @@ func newNodeWatcher(rpc *rpc.HarnessRPC, state *State) *nodeWatcher {
rpc: rpc,
state: state,
chanWatchRequests: make(chan *chanWatchRequest, 100),
openChanWatchers: &SyncMap[wire.OutPoint, []chan struct{}]{},
closeChanWatchers: &SyncMap[wire.OutPoint, []chan struct{}]{},
openChanWatchers: &lnutils.SyncMap[
wire.OutPoint, []chan struct{},
]{},
closeChanWatchers: &lnutils.SyncMap[
wire.OutPoint, []chan struct{},
]{},
}
}

4
lnutils/README.md Normal file
View File

@ -0,0 +1,4 @@
# `lnutils`
This package provides fundamental types and utility functions that can be used
by all other packages.

View File

@ -1,4 +1,4 @@
package fn
package lnutils
// Map takes an input slice, and applies the function f to each element,
// yielding a new slice.

View File

@ -1,4 +1,4 @@
package node
package lnutils
import "sync"

View File

@ -0,0 +1,255 @@
package lnutils_test
import (
"sync"
"sync/atomic"
"testing"
"github.com/lightningnetwork/lnd/lnutils"
)
func BenchmarkReadMutexMap(b *testing.B) {
// Create a map with a mutex.
m := make(map[int64]struct{})
// k is the unique key for each goroutine.
k := int64(0)
// Create a general mutex.
var mu sync.Mutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Perform a lock read.
mu.Lock()
_ = m[k]
mu.Unlock()
}
})
}
func BenchmarkReadRWMutexMap(b *testing.B) {
// Create a map with a mutex.
m := make(map[int64]struct{})
// k is the unique key for each goroutine.
k := int64(0)
// Create a read write mutex.
var mu sync.RWMutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Perform a lock read.
mu.RLock()
_ = m[k]
mu.RUnlock()
}
})
}
func BenchmarkReadSyncMap(b *testing.B) {
// Create a sync.Map.
syncMap := &sync.Map{}
// k is the unique key for each goroutine.
k := int64(0)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Read the value.
syncMap.Load(k)
}
})
}
func BenchmarkReadLndSyncMap(b *testing.B) {
// Create a sync.Map.
syncMap := &lnutils.SyncMap[int64, struct{}]{}
// k is the unique key for each goroutine.
k := int64(0)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Read the value.
syncMap.Load(k)
}
})
}
func BenchmarkWriteMutexMap(b *testing.B) {
// Create a map with a mutex.
m := make(map[int64]struct{})
// k is the unique key for each goroutine.
k := int64(0)
// Create a general mutex.
var mu sync.Mutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Perform a lock write.
mu.Lock()
m[k] = struct{}{}
mu.Unlock()
}
})
}
func BenchmarkWriteRWMutexMap(b *testing.B) {
// Create a map with a mutex.
m := make(map[int64]struct{})
// k is the unique key for each goroutine.
k := int64(0)
// Create a read write mutex.
var mu sync.RWMutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Perform a lock write.
mu.Lock()
m[k] = struct{}{}
mu.Unlock()
}
})
}
func BenchmarkWriteSyncMap(b *testing.B) {
// Create a sync.Map.
syncMap := &sync.Map{}
// k is the unique key for each goroutine.
k := int64(0)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Write the value.
syncMap.Store(k, struct{}{})
}
})
}
func BenchmarkWriteLndSyncMap(b *testing.B) {
// Create a sync.Map.
syncMap := &lnutils.SyncMap[int64, struct{}]{}
// k is the unique key for each goroutine.
k := int64(0)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Write the value.
syncMap.Store(k, struct{}{})
}
})
}
func BenchmarkDeleteMutexMap(b *testing.B) {
// Create a map with a mutex.
m := make(map[int64]struct{})
// k is the unique key for each goroutine.
k := int64(0)
// Create a general mutex.
var mu sync.Mutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Perform a lock delete.
mu.Lock()
delete(m, k)
mu.Unlock()
}
})
}
func BenchmarkDeleteRWMutexMap(b *testing.B) {
// Create a map with a mutex.
m := make(map[int64]struct{})
// k is the unique key for each goroutine.
k := int64(0)
// Create a read write mutex.
var mu sync.RWMutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Perform a lock delete.
mu.Lock()
delete(m, k)
mu.Unlock()
}
})
}
func BenchmarkDeleteSyncMap(b *testing.B) {
// Create a sync.Map.
syncMap := &sync.Map{}
// k is the unique key for each goroutine.
k := int64(0)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Delete the value.
syncMap.Delete(k)
}
})
}
func BenchmarkDeleteLndSyncMap(b *testing.B) {
// Create a sync.Map.
syncMap := &lnutils.SyncMap[int64, struct{}]{}
// k is the unique key for each goroutine.
k := int64(0)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Delete the value.
syncMap.Delete(k)
}
})
}

View File

@ -10,7 +10,6 @@ import (
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
@ -117,23 +116,19 @@ type topologyClient struct {
// notifyTopologyChange notifies all registered clients of a new change in
// graph topology in a non-blocking.
func (r *ChannelRouter) notifyTopologyChange(topologyDiff *TopologyChange) {
r.RLock()
defer r.RUnlock()
numClients := len(r.topologyClients)
if numClients == 0 {
return
}
log.Tracef("Sending topology notification to %v clients %v",
numClients, newLogClosure(func() string {
return spew.Sdump(topologyDiff)
}),
)
for _, client := range r.topologyClients {
// notifyClient is a helper closure that will send topology updates to
// the given client.
notifyClient := func(clientID uint64, client *topologyClient) bool {
client.wg.Add(1)
log.Tracef("Sending topology notification to client=%v, "+
"NodeUpdates=%v, ChannelEdgeUpdates=%v, "+
"ClosedChannels=%v", clientID,
len(topologyDiff.NodeUpdates),
len(topologyDiff.ChannelEdgeUpdates),
len(topologyDiff.ClosedChannels))
go func(c *topologyClient) {
defer c.wg.Done()
@ -153,7 +148,15 @@ func (r *ChannelRouter) notifyTopologyChange(topologyDiff *TopologyChange) {
}
}(client)
// Always return true here so the following Range will iterate
// all clients.
return true
}
// Range over the set of active clients, and attempt to send the
// topology updates.
r.topologyClients.Range(notifyClient)
}
// TopologyChange represents a new set of modifications to the channel graph.

View File

@ -25,6 +25,7 @@ import (
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnutils"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/btcwallet"
"github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
@ -425,7 +426,7 @@ type ChannelRouter struct {
// topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch
// channel.
topologyClients map[uint64]*topologyClient
topologyClients *lnutils.SyncMap[uint64, *topologyClient]
// ntfnClientUpdates is a channel that's used to send new updates to
// topology notification clients to the ChannelRouter. Updates either
@ -446,8 +447,6 @@ type ChannelRouter struct {
// announcements over a window of defaultStatInterval.
stats *routerStats
sync.RWMutex
quit chan struct{}
wg sync.WaitGroup
}
@ -474,7 +473,7 @@ func New(cfg Config) (*ChannelRouter, error) {
source: selfNode.PubKeyBytes,
},
networkUpdates: make(chan *routingMsg),
topologyClients: make(map[uint64]*topologyClient),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex(),
selfNode: selfNode,
@ -1203,14 +1202,10 @@ func (r *ChannelRouter) networkHandler() {
clientID := ntfnUpdate.clientID
if ntfnUpdate.cancel {
r.RLock()
client, ok := r.topologyClients[ntfnUpdate.clientID]
r.RUnlock()
client, ok := r.topologyClients.LoadAndDelete(
clientID,
)
if ok {
r.Lock()
delete(r.topologyClients, clientID)
r.Unlock()
close(client.exit)
client.wg.Wait()
@ -1220,12 +1215,10 @@ func (r *ChannelRouter) networkHandler() {
continue
}
r.Lock()
r.topologyClients[ntfnUpdate.clientID] = &topologyClient{
r.topologyClients.Store(clientID, &topologyClient{
ntfnChan: ntfnUpdate.ntfnChan,
exit: make(chan struct{}),
}
r.Unlock()
})
// The graph prune ticker has ticked, so we'll examine the
// state of the known graph to filter out any zombie channels