|
|
|
@ -585,8 +585,11 @@ type Brontide struct {
|
|
|
|
|
globalMsgRouter bool
|
|
|
|
|
|
|
|
|
|
startReady chan struct{}
|
|
|
|
|
quit chan struct{}
|
|
|
|
|
wg sync.WaitGroup
|
|
|
|
|
|
|
|
|
|
// cg is a helper that encapsulates a wait group and quit channel and
|
|
|
|
|
// allows contexts that either block or cancel on those depending on
|
|
|
|
|
// the use case.
|
|
|
|
|
cg *fn.ContextGuard
|
|
|
|
|
|
|
|
|
|
// log is a peer-specific logging instance.
|
|
|
|
|
log btclog.Logger
|
|
|
|
@ -630,10 +633,10 @@ func NewBrontide(cfg Config) *Brontide {
|
|
|
|
|
chanCloseMsgs: make(chan *closeMsg),
|
|
|
|
|
resentChanSyncMsg: make(map[lnwire.ChannelID]struct{}),
|
|
|
|
|
startReady: make(chan struct{}),
|
|
|
|
|
quit: make(chan struct{}),
|
|
|
|
|
log: peerLog.WithPrefix(logPrefix),
|
|
|
|
|
msgRouter: msgRouter,
|
|
|
|
|
globalMsgRouter: globalMsgRouter,
|
|
|
|
|
cg: fn.NewContextGuard(),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if cfg.Conn != nil && cfg.Conn.RemoteAddr() != nil {
|
|
|
|
@ -757,9 +760,9 @@ func (p *Brontide) Start() error {
|
|
|
|
|
// message MUST be sent before any other message.
|
|
|
|
|
readErr := make(chan error, 1)
|
|
|
|
|
msgChan := make(chan lnwire.Message, 1)
|
|
|
|
|
p.wg.Add(1)
|
|
|
|
|
p.cg.WgAdd(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer p.wg.Done()
|
|
|
|
|
defer p.cg.WgDone()
|
|
|
|
|
|
|
|
|
|
msg, err := p.readNextMessage()
|
|
|
|
|
if err != nil {
|
|
|
|
@ -848,7 +851,7 @@ func (p *Brontide) Start() error {
|
|
|
|
|
return fmt.Errorf("could not start ping manager %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
p.wg.Add(4)
|
|
|
|
|
p.cg.WgAdd(4)
|
|
|
|
|
go p.queueHandler()
|
|
|
|
|
go p.writeHandler()
|
|
|
|
|
go p.channelManager()
|
|
|
|
@ -868,7 +871,7 @@ func (p *Brontide) Start() error {
|
|
|
|
|
//
|
|
|
|
|
// TODO(wilmer): Remove this once we're able to query for node
|
|
|
|
|
// announcements through their timestamps.
|
|
|
|
|
p.wg.Add(2)
|
|
|
|
|
p.cg.WgAdd(2)
|
|
|
|
|
go p.maybeSendNodeAnn(activeChans)
|
|
|
|
|
go p.maybeSendChannelUpdates()
|
|
|
|
|
|
|
|
|
@ -917,7 +920,7 @@ func (p *Brontide) taprootShutdownAllowed() bool {
|
|
|
|
|
//
|
|
|
|
|
// NOTE: Part of the lnpeer.Peer interface.
|
|
|
|
|
func (p *Brontide) QuitSignal() <-chan struct{} {
|
|
|
|
|
return p.quit
|
|
|
|
|
return p.cg.Done()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// addrWithInternalKey takes a delivery script, then attempts to supplement it
|
|
|
|
@ -1279,7 +1282,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case p.linkFailures <- failure:
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
case <-p.cfg.Quit:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1356,7 +1359,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
|
|
|
|
|
// maybeSendNodeAnn sends our node announcement to the remote peer if at least
|
|
|
|
|
// one confirmed public channel exists with them.
|
|
|
|
|
func (p *Brontide) maybeSendNodeAnn(channels []*channeldb.OpenChannel) {
|
|
|
|
|
defer p.wg.Done()
|
|
|
|
|
defer p.cg.WgDone()
|
|
|
|
|
|
|
|
|
|
hasConfirmedPublicChan := false
|
|
|
|
|
for _, channel := range channels {
|
|
|
|
@ -1388,7 +1391,7 @@ func (p *Brontide) maybeSendNodeAnn(channels []*channeldb.OpenChannel) {
|
|
|
|
|
// maybeSendChannelUpdates sends our channel updates to the remote peer if we
|
|
|
|
|
// have any active channels with them.
|
|
|
|
|
func (p *Brontide) maybeSendChannelUpdates() {
|
|
|
|
|
defer p.wg.Done()
|
|
|
|
|
defer p.cg.WgDone()
|
|
|
|
|
|
|
|
|
|
// If we don't have any active channels, then we can exit early.
|
|
|
|
|
if p.activeChannels.Len() == 0 {
|
|
|
|
@ -1464,16 +1467,16 @@ func (p *Brontide) WaitForDisconnect(ready chan struct{}) {
|
|
|
|
|
// set of goroutines are already active.
|
|
|
|
|
select {
|
|
|
|
|
case <-p.startReady:
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-ready:
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
p.wg.Wait()
|
|
|
|
|
p.cg.WgWait()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Disconnect terminates the connection with the remote peer. Additionally, a
|
|
|
|
@ -1495,7 +1498,7 @@ func (p *Brontide) Disconnect(reason error) {
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-p.startReady:
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1511,7 +1514,7 @@ func (p *Brontide) Disconnect(reason error) {
|
|
|
|
|
// Ensure that the TCP connection is properly closed before continuing.
|
|
|
|
|
p.cfg.Conn.Close()
|
|
|
|
|
|
|
|
|
|
close(p.quit)
|
|
|
|
|
p.cg.Quit()
|
|
|
|
|
|
|
|
|
|
// If our msg router isn't global (local to this instance), then we'll
|
|
|
|
|
// stop it. Otherwise, we'll leave it running.
|
|
|
|
@ -1692,7 +1695,7 @@ func (ms *msgStream) msgConsumer() {
|
|
|
|
|
// Otherwise, we'll check the message queue for any new
|
|
|
|
|
// items.
|
|
|
|
|
select {
|
|
|
|
|
case <-ms.peer.quit:
|
|
|
|
|
case <-ms.peer.cg.Done():
|
|
|
|
|
ms.msgCond.L.Unlock()
|
|
|
|
|
return
|
|
|
|
|
case <-ms.quit:
|
|
|
|
@ -1719,7 +1722,7 @@ func (ms *msgStream) msgConsumer() {
|
|
|
|
|
// grow indefinitely.
|
|
|
|
|
select {
|
|
|
|
|
case ms.producerSema <- struct{}{}:
|
|
|
|
|
case <-ms.peer.quit:
|
|
|
|
|
case <-ms.peer.cg.Done():
|
|
|
|
|
return
|
|
|
|
|
case <-ms.quit:
|
|
|
|
|
return
|
|
|
|
@ -1737,7 +1740,7 @@ func (ms *msgStream) AddMsg(msg lnwire.Message) {
|
|
|
|
|
// we're signalled to quit, or a slot is freed up.
|
|
|
|
|
select {
|
|
|
|
|
case <-ms.producerSema:
|
|
|
|
|
case <-ms.peer.quit:
|
|
|
|
|
case <-ms.peer.cg.Done():
|
|
|
|
|
return
|
|
|
|
|
case <-ms.quit:
|
|
|
|
|
return
|
|
|
|
@ -1814,7 +1817,7 @@ func waitUntilLinkActive(p *Brontide,
|
|
|
|
|
// calling function should catch it.
|
|
|
|
|
return p.fetchLinkFromKeyAndCid(cid)
|
|
|
|
|
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1848,7 +1851,7 @@ func newChanMsgStream(p *Brontide, cid lnwire.ChannelID) *msgStream {
|
|
|
|
|
// as the peer is exiting, we'll check quickly to see
|
|
|
|
|
// if we need to exit.
|
|
|
|
|
select {
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
return
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
@ -1888,7 +1891,7 @@ func newDiscMsgStream(p *Brontide) *msgStream {
|
|
|
|
|
//
|
|
|
|
|
// NOTE: This method MUST be run as a goroutine.
|
|
|
|
|
func (p *Brontide) readHandler() {
|
|
|
|
|
defer p.wg.Done()
|
|
|
|
|
defer p.cg.WgDone()
|
|
|
|
|
|
|
|
|
|
// We'll stop the timer after a new messages is received, and also
|
|
|
|
|
// reset it after we process the next message.
|
|
|
|
@ -2009,13 +2012,13 @@ out:
|
|
|
|
|
case *lnwire.Shutdown:
|
|
|
|
|
select {
|
|
|
|
|
case p.chanCloseMsgs <- &closeMsg{msg.ChannelID, msg}:
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
break out
|
|
|
|
|
}
|
|
|
|
|
case *lnwire.ClosingSigned:
|
|
|
|
|
select {
|
|
|
|
|
case p.chanCloseMsgs <- &closeMsg{msg.ChannelID, msg}:
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
break out
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -2587,7 +2590,7 @@ out:
|
|
|
|
|
break out
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
exitErr = lnpeer.ErrPeerExiting
|
|
|
|
|
break out
|
|
|
|
|
}
|
|
|
|
@ -2595,7 +2598,7 @@ out:
|
|
|
|
|
|
|
|
|
|
// Avoid an exit deadlock by ensuring WaitGroups are decremented before
|
|
|
|
|
// disconnect.
|
|
|
|
|
p.wg.Done()
|
|
|
|
|
p.cg.WgDone()
|
|
|
|
|
|
|
|
|
|
p.Disconnect(exitErr)
|
|
|
|
|
|
|
|
|
@ -2607,7 +2610,7 @@ out:
|
|
|
|
|
//
|
|
|
|
|
// NOTE: This method MUST be run as a goroutine.
|
|
|
|
|
func (p *Brontide) queueHandler() {
|
|
|
|
|
defer p.wg.Done()
|
|
|
|
|
defer p.cg.WgDone()
|
|
|
|
|
|
|
|
|
|
// priorityMsgs holds an in order list of messages deemed high-priority
|
|
|
|
|
// to be added to the sendQueue. This predominately includes messages
|
|
|
|
@ -2648,7 +2651,7 @@ func (p *Brontide) queueHandler() {
|
|
|
|
|
} else {
|
|
|
|
|
lazyMsgs.PushBack(msg)
|
|
|
|
|
}
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
@ -2662,7 +2665,7 @@ func (p *Brontide) queueHandler() {
|
|
|
|
|
} else {
|
|
|
|
|
lazyMsgs.PushBack(msg)
|
|
|
|
|
}
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -2696,7 +2699,7 @@ func (p *Brontide) queue(priority bool, msg lnwire.Message,
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case p.outgoingQueue <- outgoingMsg{priority, msg, errChan}:
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
p.log.Tracef("Peer shutting down, could not enqueue msg: %v.",
|
|
|
|
|
spew.Sdump(msg))
|
|
|
|
|
if errChan != nil {
|
|
|
|
@ -2764,7 +2767,7 @@ func (p *Brontide) genDeliveryScript() ([]byte, error) {
|
|
|
|
|
//
|
|
|
|
|
// NOTE: This method MUST be run as a goroutine.
|
|
|
|
|
func (p *Brontide) channelManager() {
|
|
|
|
|
defer p.wg.Done()
|
|
|
|
|
defer p.cg.WgDone()
|
|
|
|
|
|
|
|
|
|
// reenableTimeout will fire once after the configured channel status
|
|
|
|
|
// interval has elapsed. This will trigger us to sign new channel
|
|
|
|
@ -2840,7 +2843,7 @@ out:
|
|
|
|
|
p.channelEventClient.Cancel()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
// As, we've been signalled to exit, we'll reset all
|
|
|
|
|
// our active channel back to their default state.
|
|
|
|
|
p.activeChannels.ForEach(func(_ lnwire.ChannelID,
|
|
|
|
@ -3123,7 +3126,7 @@ func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{}) {
|
|
|
|
|
p.log.Warnf("Re-enable channel %v failed, received "+
|
|
|
|
|
"inactive link event", chanPoint)
|
|
|
|
|
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
p.log.Debugf("Peer shutdown during retry enabling")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
@ -3294,7 +3297,6 @@ func (p *Brontide) createChanCloser(channel *lnwallet.LightningChannel,
|
|
|
|
|
return p.cfg.DisconnectPeer(p.IdentityKey())
|
|
|
|
|
},
|
|
|
|
|
ChainParams: &p.cfg.Wallet.Cfg.NetParams,
|
|
|
|
|
Quit: p.quit,
|
|
|
|
|
},
|
|
|
|
|
*deliveryScript,
|
|
|
|
|
fee,
|
|
|
|
@ -3859,7 +3861,7 @@ func (p *Brontide) sendMessage(sync, priority bool, msgs ...lnwire.Message) erro
|
|
|
|
|
select {
|
|
|
|
|
case err := <-errChan:
|
|
|
|
|
return err
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
return lnpeer.ErrPeerExiting
|
|
|
|
|
case <-p.cfg.Quit:
|
|
|
|
|
return lnpeer.ErrPeerExiting
|
|
|
|
@ -3907,7 +3909,7 @@ func (p *Brontide) AddNewChannel(newChan *lnpeer.NewChannel,
|
|
|
|
|
case p.newActiveChannel <- newChanMsg:
|
|
|
|
|
case <-cancel:
|
|
|
|
|
return errors.New("canceled adding new channel")
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
return lnpeer.ErrPeerExiting
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -3916,7 +3918,7 @@ func (p *Brontide) AddNewChannel(newChan *lnpeer.NewChannel,
|
|
|
|
|
select {
|
|
|
|
|
case err := <-errChan:
|
|
|
|
|
return err
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
return lnpeer.ErrPeerExiting
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -3940,7 +3942,7 @@ func (p *Brontide) AddPendingChannel(cid lnwire.ChannelID,
|
|
|
|
|
case <-cancel:
|
|
|
|
|
return errors.New("canceled adding pending channel")
|
|
|
|
|
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
return lnpeer.ErrPeerExiting
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -3954,7 +3956,7 @@ func (p *Brontide) AddPendingChannel(cid lnwire.ChannelID,
|
|
|
|
|
case <-cancel:
|
|
|
|
|
return errors.New("canceled adding pending channel")
|
|
|
|
|
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
return lnpeer.ErrPeerExiting
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -3971,7 +3973,7 @@ func (p *Brontide) RemovePendingChannel(cid lnwire.ChannelID) error {
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case p.removePendingChannel <- newChanMsg:
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
return lnpeer.ErrPeerExiting
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -3982,7 +3984,7 @@ func (p *Brontide) RemovePendingChannel(cid lnwire.ChannelID) error {
|
|
|
|
|
case err := <-errChan:
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
return lnpeer.ErrPeerExiting
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -4134,7 +4136,7 @@ func (p *Brontide) HandleLocalCloseChanReqs(req *htlcswitch.ChanClose) {
|
|
|
|
|
case p.localCloseChanReqs <- req:
|
|
|
|
|
p.log.Info("Local close channel request is going to be " +
|
|
|
|
|
"delivered to the peer")
|
|
|
|
|
case <-p.quit:
|
|
|
|
|
case <-p.cg.Done():
|
|
|
|
|
p.log.Info("Unable to deliver local close channel request " +
|
|
|
|
|
"to peer")
|
|
|
|
|
}
|
|
|
|
@ -4464,7 +4466,7 @@ func (p *Brontide) sendLinkUpdateMsg(cid lnwire.ChannelID, msg lnwire.Message) {
|
|
|
|
|
|
|
|
|
|
// Stop the stream when quit.
|
|
|
|
|
go func() {
|
|
|
|
|
<-p.quit
|
|
|
|
|
<-p.cg.Done()
|
|
|
|
|
chanStream.Stop()
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|