mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-03-26 01:33:02 +01:00
peer: update readHandler to dispatch to msgRouter if set
Over time with this, we should be able to significantly reduce the size of the peer.Brontide struct as we only need all those deps as the peer needs to recognize and handle each incoming wire message itself.
This commit is contained in:
parent
a888573fcc
commit
e257188017
@ -42,6 +42,7 @@ import (
|
||||
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
|
||||
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/msgmux"
|
||||
"github.com/lightningnetwork/lnd/netann"
|
||||
"github.com/lightningnetwork/lnd/pool"
|
||||
"github.com/lightningnetwork/lnd/queue"
|
||||
@ -522,6 +523,10 @@ type Brontide struct {
|
||||
// potentially holding lots of un-consumed events.
|
||||
channelEventClient *subscribe.Client
|
||||
|
||||
// msgRouter is an instance of the msgmux.Router which is used to send
|
||||
// off new wire messages for handing.
|
||||
msgRouter fn.Option[msgmux.Router]
|
||||
|
||||
startReady chan struct{}
|
||||
quit chan struct{}
|
||||
wg sync.WaitGroup
|
||||
@ -559,6 +564,9 @@ func NewBrontide(cfg Config) *Brontide {
|
||||
startReady: make(chan struct{}),
|
||||
quit: make(chan struct{}),
|
||||
log: build.NewPrefixLog(logPrefix, peerLog),
|
||||
msgRouter: fn.Some[msgmux.Router](
|
||||
msgmux.NewMultiMsgRouter(),
|
||||
),
|
||||
}
|
||||
|
||||
if cfg.Conn != nil && cfg.Conn.RemoteAddr() != nil {
|
||||
@ -738,6 +746,12 @@ func (p *Brontide) Start() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Register the message router now as we may need to register some
|
||||
// endpoints while loading the channels below.
|
||||
p.msgRouter.WhenSome(func(router msgmux.Router) {
|
||||
router.Start()
|
||||
})
|
||||
|
||||
msgs, err := p.loadActiveChannels(activeChans)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to load channels: %w", err)
|
||||
@ -913,7 +927,8 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
|
||||
p.cfg.Signer, dbChan, p.cfg.SigPool,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("unable to create channel "+
|
||||
"state machine: %w", err)
|
||||
}
|
||||
|
||||
chanPoint := dbChan.FundingOutpoint
|
||||
@ -1368,6 +1383,10 @@ func (p *Brontide) Disconnect(reason error) {
|
||||
p.cfg.Conn.Close()
|
||||
|
||||
close(p.quit)
|
||||
|
||||
p.msgRouter.WhenSome(func(router msgmux.Router) {
|
||||
router.Stop()
|
||||
})
|
||||
}
|
||||
|
||||
// String returns the string representation of this peer.
|
||||
@ -1809,6 +1828,22 @@ out:
|
||||
}
|
||||
}
|
||||
|
||||
// If a message router is active, then we'll try to have it
|
||||
// handle this message. If it can, then we're able to skip the
|
||||
// rest of the message handling logic.
|
||||
err = fn.MapOptionZ(p.msgRouter, func(r msgmux.Router) error {
|
||||
return r.RouteMsg(msgmux.PeerMsg{
|
||||
PeerPub: *p.IdentityKey(),
|
||||
Message: nextMsg,
|
||||
})
|
||||
})
|
||||
|
||||
// No error occurred, and the message was handled by the
|
||||
// router.
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var (
|
||||
targetChan lnwire.ChannelID
|
||||
isLinkUpdate bool
|
||||
|
Loading…
x
Reference in New Issue
Block a user