diff --git a/peer.go b/peer.go index f8cbd3e18..1139e1506 100644 --- a/peer.go +++ b/peer.go @@ -543,18 +543,19 @@ func (p *peer) writeMessage(msg lnwire.Message) error { // // NOTE: This method MUST be run as a goroutine. func (p *peer) writeHandler() { -out: + defer func() { + p.wg.Done() + peerLog.Tracef("writeHandler for peer %v done", p) + }() + for { select { case outMsg := <-p.sendQueue: - if err := p.writeMessage(outMsg.msg); err != nil { - peerLog.Errorf("unable to write message: %v", - err) - p.Disconnect() - break out - } - switch outMsg.msg.(type) { + // If we're about to send a ping message, then log the + // exact time in which we send the message so we can + // use the delay as a rough estimate of latency to the + // remote peer. case *lnwire.Ping: // TODO(roasbeef): do this before the write? // possibly account for processing within func? @@ -562,33 +563,26 @@ out: atomic.StoreInt64(&p.pingLastSend, now) } - // Synchronize with the writeHandler. - p.sendQueueSync <- struct{}{} - case <-p.quit: - break out - } - } - - // Wait for the queueHandler to finish so we can empty out all pending - // messages avoiding a possible deadlock somewhere. - <-p.queueQuit - - // Drain any lingering messages that we're meant to be sent. But since - // we're shutting down, just ignore them. -fin: - for { - select { - case msg := <-p.sendQueue: - if msg.sentChan != nil { - msg.sentChan <- struct{}{} + // Write out the message to the socket, closing the + // 'sentChan' if it's non-nil, The 'sentChan' allows + // callers to optionally synchronize sends with the + // writeHandler. + err := p.writeMessage(outMsg.msg) + if outMsg.sentChan != nil { + close(outMsg.sentChan) } - default: - break fin + + if err != nil { + peerLog.Errorf("unable to write message: %v", + err) + p.Disconnect() + return + } + + case <-p.quit: + return } } - - p.wg.Done() - peerLog.Tracef("writeHandler for peer %v done", p) } // queueHandler is responsible for accepting messages from outside subsystems @@ -596,40 +590,43 @@ fin: // // NOTE: This method MUST be run as a goroutine. func (p *peer) queueHandler() { - waitOnSync := false + defer p.wg.Done() + pendingMsgs := list.New() -out: for { - select { - case msg := <-p.outgoingQueue: - if !waitOnSync { - p.sendQueue <- msg - } else { - pendingMsgs.PushBack(msg) - } - waitOnSync = true - case <-p.sendQueueSync: - // If there aren't any more remaining messages in the - // queue, then we're no longer waiting to synchronize - // with the writeHandler. - next := pendingMsgs.Front() - if next == nil { - waitOnSync = false - continue + // Before add a queue'd message our pending message queue, + // we'll first try to aggressively empty out our pending list of + // messaging. + for { + // Examine the front of the queue. If this message is + // nil, then we've emptied out the queue and can accept + // new messages from outside sub-systems. + elem := pendingMsgs.Front() + if elem == nil { + break } - // Notify the writeHandler about the next item to - // asynchronously send. - val := pendingMsgs.Remove(next) - p.sendQueue <- val.(outgoinMsg) - // TODO(roasbeef): other sync stuffs - case <-p.quit: - break out + select { + case p.sendQueue <- elem.Value.(outgoinMsg): + pendingMsgs.Remove(elem) + case <-p.quit: + return + default: + break + } } - } - close(p.queueQuit) - p.wg.Done() + // If there weren't any messages to send, or the writehandler + // is still blocked, then we'll accept a new message into the + // queue from outside sub-systems. + select { + case <-p.quit: + return + case msg := <-p.outgoingQueue: + pendingMsgs.PushBack(msg) + } + + } } // pingHandler is responsible for periodically sending ping messages to the