From a9340d22c4cb6960168df947fdc963365d975644 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 4 Jun 2018 16:43:53 -0700 Subject: [PATCH 1/3] discovery: ensure stopping/starting a gossipSyncer is idempotent --- discovery/syncer.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/discovery/syncer.go b/discovery/syncer.go index 85e356bca..a187c7ef2 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -175,6 +175,9 @@ type gossipSyncerCfg struct { // // TODO(roasbeef): modify to only sync from one peer at a time? type gossipSyncer struct { + started uint32 + stopped uint32 + // remoteUpdateHorizon is the update horizon of the remote peer. We'll // use this to properly filter out any messages. remoteUpdateHorizon *lnwire.GossipTimestampRange @@ -226,6 +229,10 @@ func newGossiperSyncer(cfg gossipSyncerCfg) *gossipSyncer { // Start starts the gossipSyncer and any goroutines that it needs to carry out // its duties. func (g *gossipSyncer) Start() error { + if !atomic.CompareAndSwapUint32(&g.started, 0, 1) { + return nil + } + log.Debugf("Starting gossipSyncer(%x)", g.peerPub[:]) g.wg.Add(1) @@ -237,6 +244,10 @@ func (g *gossipSyncer) Start() error { // Stop signals the gossipSyncer for a graceful exit, then waits until it has // exited. func (g *gossipSyncer) Stop() error { + if !atomic.CompareAndSwapUint32(&g.stopped, 0, 1) { + return nil + } + close(g.quit) g.wg.Wait() From 0ec4a06e6b2e409427ac78e3b2fd95fb3f3a9e30 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 4 Jun 2018 16:50:04 -0700 Subject: [PATCH 2/3] discovery: fix deadlock by ensure we don't hold sync mutex during send In this commit, we fix an existing deadlock in the gossiper->server->peer pipeline by ensuring that we're not holding the syncer mutex while we attempt to have a syncer filter out the rest of gossip messages. --- discovery/gossiper.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 878a9cd30..84af3ab5b 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -525,7 +525,7 @@ type msgWithSenders struct { // with peers that we have an active gossipSyncer with. We do this to ensure // that we don't broadcast messages to any peers that we have active gossip // syncers for. -func (m *msgWithSenders) mergeSyncerMap(syncers map[routing.Vertex]struct{}) { +func (m *msgWithSenders) mergeSyncerMap(syncers map[routing.Vertex]*gossipSyncer) { for peerPub := range syncers { m.senders[peerPub] = struct{}{} } @@ -1130,9 +1130,9 @@ func (d *AuthenticatedGossiper) networkHandler() { // syncers, we'll collect their pubkeys so we can avoid // sending them the full message blast below. d.syncerMtx.RLock() - syncerPeers := map[routing.Vertex]struct{}{} - for peerPub := range d.peerSyncers { - syncerPeers[peerPub] = struct{}{} + syncerPeers := make(map[routing.Vertex]*gossipSyncer) + for peerPub, syncer := range d.peerSyncers { + syncerPeers[peerPub] = syncer } d.syncerMtx.RUnlock() @@ -1142,11 +1142,9 @@ func (d *AuthenticatedGossiper) networkHandler() { // We'll first attempt to filter out this new message // for all peers that have active gossip syncers // active. - d.syncerMtx.RLock() - for _, syncer := range d.peerSyncers { + for _, syncer := range syncerPeers { syncer.FilterGossipMsgs(announcementBatch...) } - d.syncerMtx.RUnlock() // Next, If we have new things to announce then // broadcast them to all our immediately connected @@ -1234,8 +1232,7 @@ func (d *AuthenticatedGossiper) PruneSyncState(peer *btcec.PublicKey) { peer.SerializeCompressed()) vertex := routing.NewVertex(peer) - - syncer, ok := d.peerSyncers[routing.NewVertex(peer)] + syncer, ok := d.peerSyncers[vertex] if !ok { return } From e6d46f681b54d53b9cddab59bf77c6cac53b35e8 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 4 Jun 2018 17:18:48 -0700 Subject: [PATCH 3/3] discovery: if gossipSyncer is shutting down, don't filter messages --- discovery/syncer.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/discovery/syncer.go b/discovery/syncer.go index a187c7ef2..d115c7234 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -805,6 +805,12 @@ func (g *gossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) { return } + // If we've been signalled to exit, or are exiting, then we'll stop + // short. + if atomic.LoadUint32(&g.stopped) == 1 { + return + } + // TODO(roasbeef): need to ensure that peer still online...send msg to // gossiper on peer termination to signal peer disconnect?