diff --git a/server.go b/server.go index 4425fb721..8e2f3ec0a 100644 --- a/server.go +++ b/server.go @@ -818,9 +818,7 @@ func (s *server) establishPersistentConnections() error { // messages to all peers other than the one specified by the `skip` parameter. // // NOTE: This function is safe for concurrent access. -func (s *server) BroadcastMessage(skip *btcec.PublicKey, - msgs ...lnwire.Message) error { - +func (s *server) BroadcastMessage(skip *btcec.PublicKey, msgs ...lnwire.Message) error { s.mu.Lock() defer s.mu.Unlock() @@ -831,9 +829,7 @@ func (s *server) BroadcastMessage(skip *btcec.PublicKey, // peers except the one specified by `skip`. // // NOTE: This method MUST be called while the server's mutex is locked. -func (s *server) broadcastMessages( - skip *btcec.PublicKey, - msgs []lnwire.Message) error { +func (s *server) broadcastMessages(skip *btcec.PublicKey, msgs []lnwire.Message) error { srvrLog.Debugf("Broadcasting %v messages", len(msgs)) @@ -841,23 +837,15 @@ func (s *server) broadcastMessages( // all messages to each of peers. We synchronize access to peersByPub // throughout this process to ensure we deliver messages to exact set // of peers present at the time of invocation. - var wg sync.WaitGroup for pubStr, sPeer := range s.peersByPub { if skip != nil && sPeer.addr.IdentityKey.IsEqual(skip) { srvrLog.Debugf("Skipping %v in broadcast", pubStr) continue } - // Dispatch a go routine to enqueue all messages to this peer. - wg.Add(1) - s.wg.Add(1) - go s.sendPeerMessages(sPeer, msgs, &wg) + go s.sendPeerMessages(sPeer, msgs) } - // Wait for all messages to have been dispatched before returning to - // caller. - wg.Wait() - return nil } @@ -866,9 +854,7 @@ func (s *server) broadcastMessages( // method will return a non-nil error. // // NOTE: This function is safe for concurrent access. -func (s *server) SendToPeer(target *btcec.PublicKey, - msgs ...lnwire.Message) error { - +func (s *server) SendToPeer(target *btcec.PublicKey, msgs ...lnwire.Message) error { s.mu.Lock() defer s.mu.Unlock() @@ -879,8 +865,7 @@ func (s *server) SendToPeer(target *btcec.PublicKey, // particular peer comes online. // // NOTE: This function is safe for concurrent access. -func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, - connectedChan chan<- struct{}) { +func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, connectedChan chan<- struct{}) { s.mu.Lock() defer s.mu.Unlock() @@ -904,8 +889,7 @@ func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, // sendToPeer is an internal method that delivers messages to the specified // `target` peer. -func (s *server) sendToPeer(target *btcec.PublicKey, - msgs []lnwire.Message) error { +func (s *server) sendToPeer(target *btcec.PublicKey, msgs []lnwire.Message) error { // Compute the target peer's identifier. targetPubBytes := target.SerializeCompressed() @@ -924,37 +908,14 @@ func (s *server) sendToPeer(target *btcec.PublicKey, return err } - s.sendPeerMessages(targetPeer, msgs, nil) + s.sendPeerMessages(targetPeer, msgs) return nil } // sendPeerMessages enqueues a list of messages into the outgoingQueue of the -// `targetPeer`. This method supports additional broadcast-level -// synchronization by using the additional `wg` to coordinate a particular -// broadcast. -// -// NOTE: This method must be invoked with a non-nil `wg` if it is spawned as a -// go routine--both `wg` and the server's WaitGroup should be incremented -// beforehand. If this method is not spawned as a go routine, the provided -// `wg` should be nil, and the server's WaitGroup should not be tracking this -// invocation. -func (s *server) sendPeerMessages( - targetPeer *peer, - msgs []lnwire.Message, - wg *sync.WaitGroup) { - - // If a WaitGroup is provided, we assume that this method was spawned - // as a go routine, and that it is being tracked by both the server's - // WaitGroup, as well as the broadcast-level WaitGroup `wg`. In this - // event, we defer a call to Done on both WaitGroups to 1) ensure that - // server will be able to shutdown after its go routines exit, and 2) - // so the server can return to the caller of BroadcastMessage. - if wg != nil { - defer s.wg.Done() - defer wg.Done() - } - +// `targetPeer`. +func (s *server) sendPeerMessages(targetPeer *peer, msgs []lnwire.Message) { for _, msg := range msgs { targetPeer.queueMsg(msg, nil) }