From 2415675c3d5eab6112d1a883e6cd51128c6edbc7 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 31 Aug 2018 14:48:58 -0700 Subject: [PATCH 1/4] server: move gossip dispatch to peer See next commit msg for more detail. --- server.go | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/server.go b/server.go index 3ed068387..2378010bc 100644 --- a/server.go +++ b/server.go @@ -2364,33 +2364,6 @@ func (s *server) peerInitializer(p *peer) { // was successful, and to begin watching the peer's wait group. close(ready) - switch { - // If the remote peer knows of the new gossip queries feature, then - // we'll create a new gossipSyncer in the AuthenticatedGossiper for it. - case p.remoteLocalFeatures.HasFeature(lnwire.GossipQueriesOptional): - srvrLog.Infof("Negotiated chan series queries with %x", - p.pubKeyBytes[:]) - - // We'll only request channel updates from the remote peer if - // its enabled in the config, or we're already getting updates - // from enough peers. - // - // TODO(roasbeef): craft s.t. we only get updates from a few - // peers - recvUpdates := !cfg.NoChanUpdates - go s.authGossiper.InitSyncState(p, recvUpdates) - - // If the remote peer has the initial sync feature bit set, then we'll - // being the synchronization protocol to exchange authenticated channel - // graph edges/vertexes, but only if they don't know of the new gossip - // queries. - case p.remoteLocalFeatures.HasFeature(lnwire.InitialRoutingSync): - srvrLog.Infof("Requesting full table sync with %x", - p.pubKeyBytes[:]) - - go s.authGossiper.SynchronizeNode(p) - } - pubStr := string(p.addr.IdentityKey.SerializeCompressed()) s.mu.Lock() From adf6b8619f8de4bae35fd06116a01047167d030e Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 3 Sep 2018 21:58:48 -0700 Subject: [PATCH 2/4] peer: dispatch gossip sync in peer start This commit moves the gossip sync dispatch such that it is more tightly coupled to the life cycle of the peer. In testing, I noticed that the gossip syncer needs to be dispatched before the first gossip messages come across the wire. The prior spawn location in the server happens after starting all of the peer's goroutines, which could permit an ordering where the gossip syncer has not yet been registered. The new location registers the gossip syncer within the read handler such that the call is blocks before any messages are read. --- peer.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/peer.go b/peer.go index c1afa0ed1..a45263265 100644 --- a/peer.go +++ b/peer.go @@ -320,6 +320,43 @@ func (p *peer) Start() error { return nil } +// initGossipSync initializes either a gossip syncer or an initial routing +// dump, depending on the negotiated synchronization method. +func (p *peer) initGossipSync() { + switch { + + // If the remote peer knows of the new gossip queries feature, then + // we'll create a new gossipSyncer in the AuthenticatedGossiper for it. + case p.remoteLocalFeatures.HasFeature(lnwire.GossipQueriesOptional): + srvrLog.Infof("Negotiated chan series queries with %x", + p.pubKeyBytes[:]) + + // We'll only request channel updates from the remote peer if + // its enabled in the config, or we're already getting updates + // from enough peers. + // + // TODO(roasbeef): craft s.t. we only get updates from a few + // peers + recvUpdates := !cfg.NoChanUpdates + + // Register the this peer's for gossip syncer with the gossiper. + // This is blocks synchronously to ensure the gossip syncer is + // registered with the gossiper before attempting to read + // messages from the remote peer. + p.server.authGossiper.InitSyncState(p, recvUpdates) + + // If the remote peer has the initial sync feature bit set, then we'll + // being the synchronization protocol to exchange authenticated channel + // graph edges/vertexes, but only if they don't know of the new gossip + // queries. + case p.remoteLocalFeatures.HasFeature(lnwire.InitialRoutingSync): + srvrLog.Infof("Requesting full table sync with %x", + p.pubKeyBytes[:]) + + go p.server.authGossiper.SynchronizeNode(p) + } +} + // QuitSignal is a method that should return a channel which will be sent upon // or closed once the backing peer exits. This allows callers using the // interface to cancel any processing in the event the backing implementation @@ -877,6 +914,14 @@ func (p *peer) readHandler() { p.Disconnect(err) }) + // Initialize our negotiated gossip sync method before reading + // messages off the wire. When using gossip queries, this ensures + // a gossip syncer is active by the time query messages arrive. + // + // TODO(conner): have peer store gossip syncer directly and bypass + // gossiper? + p.initGossipSync() + discStream := newDiscMsgStream(p) discStream.Start() defer discStream.Stop() From 51090a41b5c9aee8b43a552013983acf49e44f7b Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 31 Aug 2018 14:54:35 -0700 Subject: [PATCH 3/4] peer: log disconnect to info, remove go-errors pkg --- peer.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/peer.go b/peer.go index a45263265..de8e54b87 100644 --- a/peer.go +++ b/peer.go @@ -3,6 +3,7 @@ package main import ( "bytes" "container/list" + "errors" "fmt" "net" "sync" @@ -15,7 +16,6 @@ import ( "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" - "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/brontide" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" @@ -32,7 +32,7 @@ var ( numNodes int32 // ErrPeerExiting signals that the peer received a disconnect request. - ErrPeerExiting = errors.Errorf("peer exiting") + ErrPeerExiting = fmt.Errorf("peer exiting") ) const ( @@ -113,7 +113,7 @@ type peer struct { pubKeyBytes [33]byte // startTime is the time this peer connection was successfully - // established. It will be zero for peers that did not successfuly + // established. It will be zero for peers that did not successfully // Start(). startTime time.Time @@ -239,7 +239,7 @@ func (p *peer) Start() error { return nil } - peerLog.Tracef("peer %v starting", p) + peerLog.Tracef("Peer %v starting", p) // Exchange local and global features, the init message should be very // first between two nodes. @@ -606,7 +606,7 @@ func (p *peer) Disconnect(reason error) { return } - peerLog.Debugf("Disconnecting %s, reason: %v", p, reason) + peerLog.Infof("Disconnecting %s, reason: %v", p, reason) // Ensure that the TCP connection is properly closed before continuing. p.conn.Close() @@ -1361,7 +1361,8 @@ out: } if err != nil { - exitErr = errors.Errorf("unable to write message: %v", err) + exitErr = fmt.Errorf("unable to write "+ + "message: %v", err) break out } @@ -2093,17 +2094,15 @@ func (p *peer) handleInitMsg(msg *lnwire.Init) error { unknownLocalFeatures := p.remoteLocalFeatures.UnknownRequiredFeatures() if len(unknownLocalFeatures) > 0 { - err := errors.Errorf("Peer set unknown local feature bits: %v", + err := fmt.Errorf("Peer set unknown local feature bits: %v", unknownLocalFeatures) - peerLog.Error(err) return err } unknownGlobalFeatures := p.remoteGlobalFeatures.UnknownRequiredFeatures() if len(unknownGlobalFeatures) > 0 { - err := errors.Errorf("Peer set unknown global feature bits: %v", + err := fmt.Errorf("Peer set unknown global feature bits: %v", unknownGlobalFeatures) - peerLog.Error(err) return err } From 8e94e5583994e93855b0d4e85b34be79dab9153e Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 4 Sep 2018 17:31:57 -0700 Subject: [PATCH 4/4] discovery/gossiper: require explict gossip syncer init This commit removes the fallback in fetchGossipSyncer that creates a gossip syncer if one is not registered w/in the gossiper. Now that we register gossip syncers explicitly before reading any gossip query messages, this should not longer be required. The fallback also did not honor the cfg.NoChanUpdates flag, which may have led to inconsistencies between configuration and actual behavior. --- discovery/gossiper.go | 51 ++++++++++--------------------------------- 1 file changed, 12 insertions(+), 39 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 5f3b89273..1dad460ac 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -35,6 +35,11 @@ var ( // ErrGossiperShuttingDown is an error that is returned if the gossiper // is in the process of being shut down. ErrGossiperShuttingDown = errors.New("gossiper is shutting down") + + // ErrGossipSyncerNotFound signals that we were unable to find an active + // gossip syncer corresponding to a gossip query message received from + // the remote peer. + ErrGossipSyncerNotFound = errors.New("gossip syncer not found") ) // networkMsg couples a routing related wire message with the peer that @@ -919,45 +924,7 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) ( return syncer, nil } - // A known gossip syncer doesn't exist, so we may have to create one - // from scratch. To do so, we'll query for a reference directly to the - // active peer. - syncPeer, err := d.cfg.FindPeer(pub) - if err != nil { - log.Debugf("unable to find gossip peer %v: %v", - pub.SerializeCompressed(), err) - return nil, err - } - - // Finally, we'll obtain the exclusive mutex, then check again if a - // gossiper was added after we dropped the read mutex. - d.syncerMtx.Lock() - syncer, ok = d.peerSyncers[target] - if ok { - d.syncerMtx.Unlock() - return syncer, nil - } - - // At this point, a syncer doesn't yet exist, so we'll create a new one - // for the peer and return it to the caller. - encoding := lnwire.EncodingSortedPlain - syncer = newGossiperSyncer(gossipSyncerCfg{ - chainHash: d.cfg.ChainHash, - syncChanUpdates: true, - channelSeries: d.cfg.ChanSeries, - encodingType: encoding, - chunkSize: encodingTypeToChunkSize[encoding], - sendToPeer: func(msgs ...lnwire.Message) error { - return syncPeer.SendMessage(false, msgs...) - }, - }) - copy(syncer.peerPub[:], pub.SerializeCompressed()) - d.peerSyncers[target] = syncer - syncer.Start() - - d.syncerMtx.Unlock() - - return syncer, nil + return nil, ErrGossipSyncerNotFound } // networkHandler is the primary goroutine that drives this service. The roles @@ -1041,6 +1008,9 @@ func (d *AuthenticatedGossiper) networkHandler() { announcement.source, ) if err != nil { + log.Warnf("Unable to find gossip "+ + "syncer for peer=%x: %v", + announcement.peer.PubKey(), err) continue } @@ -1066,6 +1036,9 @@ func (d *AuthenticatedGossiper) networkHandler() { announcement.source, ) if err != nil { + log.Warnf("Unable to find gossip "+ + "syncer for peer=%x: %v", + announcement.source, err) continue }