diff --git a/peer/brontide.go b/peer/brontide.go index 67531f39b..229013580 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -42,6 +42,7 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/netann" "github.com/lightningnetwork/lnd/pool" + "github.com/lightningnetwork/lnd/protofsm" "github.com/lightningnetwork/lnd/queue" "github.com/lightningnetwork/lnd/subscribe" "github.com/lightningnetwork/lnd/ticker" @@ -493,6 +494,10 @@ type Brontide struct { // potentially holding lots of un-consumed events. channelEventClient *subscribe.Client + // msgRouter is an instance of the MsgRouter which is used to send off + // new wire messages for handing. + msgRouter fn.Option[protofsm.MsgRouter] + startReady chan struct{} quit chan struct{} wg sync.WaitGroup @@ -530,6 +535,9 @@ func NewBrontide(cfg Config) *Brontide { startReady: make(chan struct{}), quit: make(chan struct{}), log: build.NewPrefixLog(logPrefix, peerLog), + msgRouter: fn.Some[protofsm.MsgRouter]( + protofsm.NewMultiMsgRouter(), + ), } var ( @@ -704,6 +712,12 @@ func (p *Brontide) Start() error { return err } + // Register the message router now as we may need to register some + // endpoints while loading the channels below. + p.msgRouter.WhenSome(func(router protofsm.MsgRouter) { + router.Start() + }) + msgs, err := p.loadActiveChannels(activeChans) if err != nil { return fmt.Errorf("unable to load channels: %w", err) @@ -882,7 +896,8 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) ( p.cfg.Signer, dbChan, p.cfg.SigPool, chanOpts..., ) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to create channel "+ + "state machine: %w", err) } chanPoint := dbChan.FundingOutpoint @@ -1270,6 +1285,10 @@ func (p *Brontide) Disconnect(reason error) { p.log.Errorf("couldn't stop pingManager during disconnect: %v", err) } + + p.msgRouter.WhenSome(func(router protofsm.MsgRouter) { + router.Stop() + }) } // String returns the string representation of this peer. @@ -1709,6 +1728,24 @@ out: } } + // If a message router is active, then we'll try to have it + // handle this message. If it can, then we're able to skip the + // rest of the message handling logic. + err = fn.MapOptionZ( + p.msgRouter, func(r protofsm.MsgRouter) error { + return r.RouteMsg(protofsm.PeerMsg{ + PeerPub: *p.IdentityKey(), + Message: nextMsg, + }) + }, + ) + + // No error occurred, and the message was handled by the + // router. + if err == nil { + continue + } + var ( targetChan lnwire.ChannelID isLinkUpdate bool