From 89b0e25e2c6084025c3c6e88237a54aa8766fd9c Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 19 Jan 2023 06:31:17 +0800 Subject: [PATCH 1/4] multi: add `lnutils` to host fundamental utility functions We also move the `fn/stream.go` into the package `lnutils`. Eventually we will put all the [utility functions](https://github.com/lightninglabs/taro/tree/main/chanutils) into this package. --- discovery/gossiper.go | 4 ++-- lnutils/README.md | 4 ++++ {fn => lnutils}/stream.go | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 lnutils/README.md rename {fn => lnutils}/stream.go (93%) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 9bb6959e5..eb13d77ea 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -18,10 +18,10 @@ import ( "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" - "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lnutils" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/multimutex" @@ -1185,7 +1185,7 @@ func (d *AuthenticatedGossiper) sendBatch(annBatch []msgWithSenders, // skip the filter and dedup logic below, and just send the // announcements out to all our coonnected peers. if isLocal { - msgsToSend := fn.Map( + msgsToSend := lnutils.Map( annBatch, func(m msgWithSenders) lnwire.Message { return m.msg }, diff --git a/lnutils/README.md b/lnutils/README.md new file mode 100644 index 000000000..fe2fafa7c --- /dev/null +++ b/lnutils/README.md @@ -0,0 +1,4 @@ +# `lnutils` + +This package provides fundamental types and utility functions that can be used +by all other packages. diff --git a/fn/stream.go b/lnutils/stream.go similarity index 93% rename from fn/stream.go rename to lnutils/stream.go index 9974eb467..ed04fa2ee 100644 --- a/fn/stream.go +++ b/lnutils/stream.go @@ -1,4 +1,4 @@ -package fn +package lnutils // Map takes an input slice, and applies the function f to each element, // yielding a new slice. From 692cd4bc4f6787dc14ed93ee24f1f996e30e4b07 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 19 Jan 2023 06:37:57 +0800 Subject: [PATCH 2/4] lnutils+lntemp: move `SyncMap` to `lnutils` This commit moves the `SyncMap` from `lntemp/node` into `lnutils` so it can be used by other packages. --- lntemp/node/state.go | 37 ++-- lntemp/node/watcher.go | 13 +- {lntemp/node => lnutils}/sync_map.go | 2 +- lnutils/sync_map_bench_test.go | 255 +++++++++++++++++++++++++++ 4 files changed, 286 insertions(+), 21 deletions(-) rename {lntemp/node => lnutils}/sync_map.go (98%) create mode 100644 lnutils/sync_map_bench_test.go diff --git a/lntemp/node/state.go b/lntemp/node/state.go index e852bf9ab..2c3c45f02 100644 --- a/lntemp/node/state.go +++ b/lntemp/node/state.go @@ -10,6 +10,7 @@ import ( "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lntemp/rpc" + "github.com/lightningnetwork/lnd/lnutils" ) type ( @@ -153,18 +154,18 @@ type State struct { // openChans records each opened channel and how many times it has // heard the announcements from its graph subscription. - openChans *SyncMap[wire.OutPoint, []*OpenChannelUpdate] + openChans *lnutils.SyncMap[wire.OutPoint, []*OpenChannelUpdate] // closedChans records each closed channel and its close channel update // message received from its graph subscription. - closedChans *SyncMap[wire.OutPoint, *lnrpc.ClosedChannelUpdate] + closedChans *lnutils.SyncMap[wire.OutPoint, *lnrpc.ClosedChannelUpdate] // numChanUpdates records the number of channel updates seen by each // channel. - numChanUpdates *SyncMap[wire.OutPoint, int] + numChanUpdates *lnutils.SyncMap[wire.OutPoint, int] // nodeUpdates records the node announcements seen by each node. - nodeUpdates *SyncMap[string, []*lnrpc.NodeUpdate] + nodeUpdates *lnutils.SyncMap[string, []*lnrpc.NodeUpdate] // policyUpdates defines a type to store channel policy updates. It has // the format, @@ -179,21 +180,23 @@ type State struct { // }, // "chanPoint2": ... // } - policyUpdates *SyncMap[wire.OutPoint, PolicyUpdate] + policyUpdates *lnutils.SyncMap[wire.OutPoint, PolicyUpdate] } // newState initialize a new state with every field being set to its zero // value. func newState(rpc *rpc.HarnessRPC) *State { return &State{ - rpc: rpc, - openChans: &SyncMap[wire.OutPoint, []*OpenChannelUpdate]{}, - closedChans: &SyncMap[ + rpc: rpc, + openChans: &lnutils.SyncMap[ + wire.OutPoint, []*OpenChannelUpdate, + ]{}, + closedChans: &lnutils.SyncMap[ wire.OutPoint, *lnrpc.ClosedChannelUpdate, ]{}, - numChanUpdates: &SyncMap[wire.OutPoint, int]{}, - nodeUpdates: &SyncMap[string, []*lnrpc.NodeUpdate]{}, - policyUpdates: &SyncMap[wire.OutPoint, PolicyUpdate]{}, + numChanUpdates: &lnutils.SyncMap[wire.OutPoint, int]{}, + nodeUpdates: &lnutils.SyncMap[string, []*lnrpc.NodeUpdate]{}, + policyUpdates: &lnutils.SyncMap[wire.OutPoint, PolicyUpdate]{}, } } @@ -352,9 +355,11 @@ func (s *State) resetEphermalStates(rpc *rpc.HarnessRPC) { // Reset ephermal states which are used to record info from finished // tests. - s.openChans = &SyncMap[wire.OutPoint, []*OpenChannelUpdate]{} - s.closedChans = &SyncMap[wire.OutPoint, *lnrpc.ClosedChannelUpdate]{} - s.numChanUpdates = &SyncMap[wire.OutPoint, int]{} - s.nodeUpdates = &SyncMap[string, []*lnrpc.NodeUpdate]{} - s.policyUpdates = &SyncMap[wire.OutPoint, PolicyUpdate]{} + s.openChans = &lnutils.SyncMap[wire.OutPoint, []*OpenChannelUpdate]{} + s.closedChans = &lnutils.SyncMap[ + wire.OutPoint, *lnrpc.ClosedChannelUpdate, + ]{} + s.numChanUpdates = &lnutils.SyncMap[wire.OutPoint, int]{} + s.nodeUpdates = &lnutils.SyncMap[string, []*lnrpc.NodeUpdate]{} + s.policyUpdates = &lnutils.SyncMap[wire.OutPoint, PolicyUpdate]{} } diff --git a/lntemp/node/watcher.go b/lntemp/node/watcher.go index c081daaad..af0bf66cb 100644 --- a/lntemp/node/watcher.go +++ b/lntemp/node/watcher.go @@ -14,6 +14,7 @@ import ( "github.com/lightningnetwork/lnd/lntemp/rpc" "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lntest/wait" + "github.com/lightningnetwork/lnd/lnutils" ) type chanWatchType uint8 @@ -68,8 +69,8 @@ type nodeWatcher struct { // of edges seen for that channel within the network. When this number // reaches 2, then it means that both edge advertisements has // propagated through the network. - openChanWatchers *SyncMap[wire.OutPoint, []chan struct{}] - closeChanWatchers *SyncMap[wire.OutPoint, []chan struct{}] + openChanWatchers *lnutils.SyncMap[wire.OutPoint, []chan struct{}] + closeChanWatchers *lnutils.SyncMap[wire.OutPoint, []chan struct{}] wg sync.WaitGroup } @@ -79,8 +80,12 @@ func newNodeWatcher(rpc *rpc.HarnessRPC, state *State) *nodeWatcher { rpc: rpc, state: state, chanWatchRequests: make(chan *chanWatchRequest, 100), - openChanWatchers: &SyncMap[wire.OutPoint, []chan struct{}]{}, - closeChanWatchers: &SyncMap[wire.OutPoint, []chan struct{}]{}, + openChanWatchers: &lnutils.SyncMap[ + wire.OutPoint, []chan struct{}, + ]{}, + closeChanWatchers: &lnutils.SyncMap[ + wire.OutPoint, []chan struct{}, + ]{}, } } diff --git a/lntemp/node/sync_map.go b/lnutils/sync_map.go similarity index 98% rename from lntemp/node/sync_map.go rename to lnutils/sync_map.go index 1ef157331..83789d7b5 100644 --- a/lntemp/node/sync_map.go +++ b/lnutils/sync_map.go @@ -1,4 +1,4 @@ -package node +package lnutils import "sync" diff --git a/lnutils/sync_map_bench_test.go b/lnutils/sync_map_bench_test.go new file mode 100644 index 000000000..e1e0cbc8f --- /dev/null +++ b/lnutils/sync_map_bench_test.go @@ -0,0 +1,255 @@ +package lnutils_test + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/lightningnetwork/lnd/lnutils" +) + +func BenchmarkReadMutexMap(b *testing.B) { + // Create a map with a mutex. + m := make(map[int64]struct{}) + + // k is the unique key for each goroutine. + k := int64(0) + + // Create a general mutex. + var mu sync.Mutex + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Perform a lock read. + mu.Lock() + _ = m[k] + mu.Unlock() + } + }) +} + +func BenchmarkReadRWMutexMap(b *testing.B) { + // Create a map with a mutex. + m := make(map[int64]struct{}) + + // k is the unique key for each goroutine. + k := int64(0) + + // Create a read write mutex. + var mu sync.RWMutex + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Perform a lock read. + mu.RLock() + _ = m[k] + mu.RUnlock() + } + }) +} + +func BenchmarkReadSyncMap(b *testing.B) { + // Create a sync.Map. + syncMap := &sync.Map{} + + // k is the unique key for each goroutine. + k := int64(0) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Read the value. + syncMap.Load(k) + } + }) +} + +func BenchmarkReadLndSyncMap(b *testing.B) { + // Create a sync.Map. + syncMap := &lnutils.SyncMap[int64, struct{}]{} + + // k is the unique key for each goroutine. + k := int64(0) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Read the value. + syncMap.Load(k) + } + }) +} + +func BenchmarkWriteMutexMap(b *testing.B) { + // Create a map with a mutex. + m := make(map[int64]struct{}) + + // k is the unique key for each goroutine. + k := int64(0) + + // Create a general mutex. + var mu sync.Mutex + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Perform a lock write. + mu.Lock() + m[k] = struct{}{} + mu.Unlock() + } + }) +} + +func BenchmarkWriteRWMutexMap(b *testing.B) { + // Create a map with a mutex. + m := make(map[int64]struct{}) + + // k is the unique key for each goroutine. + k := int64(0) + + // Create a read write mutex. + var mu sync.RWMutex + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Perform a lock write. + mu.Lock() + m[k] = struct{}{} + mu.Unlock() + } + }) +} + +func BenchmarkWriteSyncMap(b *testing.B) { + // Create a sync.Map. + syncMap := &sync.Map{} + + // k is the unique key for each goroutine. + k := int64(0) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Write the value. + syncMap.Store(k, struct{}{}) + } + }) +} + +func BenchmarkWriteLndSyncMap(b *testing.B) { + // Create a sync.Map. + syncMap := &lnutils.SyncMap[int64, struct{}]{} + + // k is the unique key for each goroutine. + k := int64(0) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Write the value. + syncMap.Store(k, struct{}{}) + } + }) +} + +func BenchmarkDeleteMutexMap(b *testing.B) { + // Create a map with a mutex. + m := make(map[int64]struct{}) + + // k is the unique key for each goroutine. + k := int64(0) + + // Create a general mutex. + var mu sync.Mutex + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Perform a lock delete. + mu.Lock() + delete(m, k) + mu.Unlock() + } + }) +} + +func BenchmarkDeleteRWMutexMap(b *testing.B) { + // Create a map with a mutex. + m := make(map[int64]struct{}) + + // k is the unique key for each goroutine. + k := int64(0) + + // Create a read write mutex. + var mu sync.RWMutex + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Perform a lock delete. + mu.Lock() + delete(m, k) + mu.Unlock() + } + }) +} + +func BenchmarkDeleteSyncMap(b *testing.B) { + // Create a sync.Map. + syncMap := &sync.Map{} + + // k is the unique key for each goroutine. + k := int64(0) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Delete the value. + syncMap.Delete(k) + } + }) +} + +func BenchmarkDeleteLndSyncMap(b *testing.B) { + // Create a sync.Map. + syncMap := &lnutils.SyncMap[int64, struct{}]{} + + // k is the unique key for each goroutine. + k := int64(0) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Delete the value. + syncMap.Delete(k) + } + }) +} From fa2c766866da783cb44457616a6fa04b428dc2f9 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 19 Jan 2023 07:06:34 +0800 Subject: [PATCH 3/4] routing: remove the mutex and use `SyncMap` instead This commit removes the lock used to protect the map `topologyClients` and replace the map with a `SyncMap`. --- routing/notifications.go | 33 ++++++++++++++++++--------------- routing/router.go | 23 ++++++++--------------- 2 files changed, 26 insertions(+), 30 deletions(-) diff --git a/routing/notifications.go b/routing/notifications.go index fe1b3c236..387bd9126 100644 --- a/routing/notifications.go +++ b/routing/notifications.go @@ -10,7 +10,6 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/wire" - "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" @@ -117,23 +116,19 @@ type topologyClient struct { // notifyTopologyChange notifies all registered clients of a new change in // graph topology in a non-blocking. func (r *ChannelRouter) notifyTopologyChange(topologyDiff *TopologyChange) { - r.RLock() - defer r.RUnlock() - numClients := len(r.topologyClients) - if numClients == 0 { - return - } - - log.Tracef("Sending topology notification to %v clients %v", - numClients, newLogClosure(func() string { - return spew.Sdump(topologyDiff) - }), - ) - - for _, client := range r.topologyClients { + // notifyClient is a helper closure that will send topology updates to + // the given client. + notifyClient := func(clientID uint64, client *topologyClient) bool { client.wg.Add(1) + log.Tracef("Sending topology notification to client=%v, "+ + "NodeUpdates=%v, ChannelEdgeUpdates=%v, "+ + "ClosedChannels=%v", clientID, + len(topologyDiff.NodeUpdates), + len(topologyDiff.ChannelEdgeUpdates), + len(topologyDiff.ClosedChannels)) + go func(c *topologyClient) { defer c.wg.Done() @@ -153,7 +148,15 @@ func (r *ChannelRouter) notifyTopologyChange(topologyDiff *TopologyChange) { } }(client) + + // Always return true here so the following Range will iterate + // all clients. + return true } + + // Range over the set of active clients, and attempt to send the + // topology updates. + r.topologyClients.Range(notifyClient) } // TopologyChange represents a new set of modifications to the channel graph. diff --git a/routing/router.go b/routing/router.go index 378f45187..eea5189d9 100644 --- a/routing/router.go +++ b/routing/router.go @@ -25,6 +25,7 @@ import ( "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lntypes" + "github.com/lightningnetwork/lnd/lnutils" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/btcwallet" "github.com/lightningnetwork/lnd/lnwallet/chanvalidate" @@ -425,7 +426,7 @@ type ChannelRouter struct { // topologyClients maps a client's unique notification ID to a // topologyClient client that contains its notification dispatch // channel. - topologyClients map[uint64]*topologyClient + topologyClients *lnutils.SyncMap[uint64, *topologyClient] // ntfnClientUpdates is a channel that's used to send new updates to // topology notification clients to the ChannelRouter. Updates either @@ -446,8 +447,6 @@ type ChannelRouter struct { // announcements over a window of defaultStatInterval. stats *routerStats - sync.RWMutex - quit chan struct{} wg sync.WaitGroup } @@ -474,7 +473,7 @@ func New(cfg Config) (*ChannelRouter, error) { source: selfNode.PubKeyBytes, }, networkUpdates: make(chan *routingMsg), - topologyClients: make(map[uint64]*topologyClient), + topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{}, ntfnClientUpdates: make(chan *topologyClientUpdate), channelEdgeMtx: multimutex.NewMutex(), selfNode: selfNode, @@ -1203,14 +1202,10 @@ func (r *ChannelRouter) networkHandler() { clientID := ntfnUpdate.clientID if ntfnUpdate.cancel { - r.RLock() - client, ok := r.topologyClients[ntfnUpdate.clientID] - r.RUnlock() + client, ok := r.topologyClients.LoadAndDelete( + clientID, + ) if ok { - r.Lock() - delete(r.topologyClients, clientID) - r.Unlock() - close(client.exit) client.wg.Wait() @@ -1220,12 +1215,10 @@ func (r *ChannelRouter) networkHandler() { continue } - r.Lock() - r.topologyClients[ntfnUpdate.clientID] = &topologyClient{ + r.topologyClients.Store(clientID, &topologyClient{ ntfnChan: ntfnUpdate.ntfnChan, exit: make(chan struct{}), - } - r.Unlock() + }) // The graph prune ticker has ticked, so we'll examine the // state of the known graph to filter out any zombie channels From 1a1fe37ad4ab7df7c899f26f52b05eb27502534b Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 17 Jan 2023 11:33:27 +0800 Subject: [PATCH 4/4] docs: update release notes --- docs/release-notes/release-notes-0.16.0.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/release-notes/release-notes-0.16.0.md b/docs/release-notes/release-notes-0.16.0.md index 92338cd9f..0ff661e87 100644 --- a/docs/release-notes/release-notes-0.16.0.md +++ b/docs/release-notes/release-notes-0.16.0.md @@ -222,6 +222,9 @@ data. * [Fix race condition in `TestUpdatePaymentState`](https://github.com/lightningnetwork/lnd/pull/7336) +* [Decreased the mutex lock + scope](https://github.com/lightningnetwork/lnd/pull/7330) inside `ChannelRouter`. + ## `lncli` * [Add an `insecure` flag to skip tls auth as well as a `metadata` string slice