mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-11-09 13:55:17 +01:00
peer+protofsm: move MsgRouter to new protofsm package
Without this, any other sub-system that wants to use the interface may run into an import cycle.
This commit is contained in:
committed by
Oliver Gugger
parent
b9786e1f20
commit
c9d8adc83c
@@ -44,7 +44,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/lnwallet/btcwallet"
|
"github.com/lightningnetwork/lnd/lnwallet/btcwallet"
|
||||||
"github.com/lightningnetwork/lnd/lnwallet/rpcwallet"
|
"github.com/lightningnetwork/lnd/lnwallet/rpcwallet"
|
||||||
"github.com/lightningnetwork/lnd/macaroons"
|
"github.com/lightningnetwork/lnd/macaroons"
|
||||||
"github.com/lightningnetwork/lnd/peer"
|
"github.com/lightningnetwork/lnd/protofsm"
|
||||||
"github.com/lightningnetwork/lnd/rpcperms"
|
"github.com/lightningnetwork/lnd/rpcperms"
|
||||||
"github.com/lightningnetwork/lnd/signal"
|
"github.com/lightningnetwork/lnd/signal"
|
||||||
"github.com/lightningnetwork/lnd/sqldb"
|
"github.com/lightningnetwork/lnd/sqldb"
|
||||||
@@ -161,7 +161,7 @@ type AuxComponents struct {
|
|||||||
|
|
||||||
// MsgRouter is an optional message router that if set will be used in
|
// MsgRouter is an optional message router that if set will be used in
|
||||||
// place of a new balnk default message router.
|
// place of a new balnk default message router.
|
||||||
MsgRouter fn.Option[peer.MsgRouter]
|
MsgRouter fn.Option[protofsm.MsgRouter]
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultWalletImpl is the default implementation of our normal, btcwallet
|
// DefaultWalletImpl is the default implementation of our normal, btcwallet
|
||||||
|
|||||||
@@ -42,6 +42,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
"github.com/lightningnetwork/lnd/netann"
|
"github.com/lightningnetwork/lnd/netann"
|
||||||
"github.com/lightningnetwork/lnd/pool"
|
"github.com/lightningnetwork/lnd/pool"
|
||||||
|
"github.com/lightningnetwork/lnd/protofsm"
|
||||||
"github.com/lightningnetwork/lnd/queue"
|
"github.com/lightningnetwork/lnd/queue"
|
||||||
"github.com/lightningnetwork/lnd/subscribe"
|
"github.com/lightningnetwork/lnd/subscribe"
|
||||||
"github.com/lightningnetwork/lnd/ticker"
|
"github.com/lightningnetwork/lnd/ticker"
|
||||||
@@ -377,7 +378,7 @@ type Config struct {
|
|||||||
// MsgRouter is an optional instance of the main message router that
|
// MsgRouter is an optional instance of the main message router that
|
||||||
// the peer will use. If None, then a new default version will be used
|
// the peer will use. If None, then a new default version will be used
|
||||||
// in place.
|
// in place.
|
||||||
MsgRouter fn.Option[MsgRouter]
|
MsgRouter fn.Option[protofsm.MsgRouter]
|
||||||
|
|
||||||
// Quit is the server's quit channel. If this is closed, we halt operation.
|
// Quit is the server's quit channel. If this is closed, we halt operation.
|
||||||
Quit chan struct{}
|
Quit chan struct{}
|
||||||
@@ -500,7 +501,7 @@ type Brontide struct {
|
|||||||
|
|
||||||
// msgRouter is an instance of the MsgRouter which is used to send off
|
// msgRouter is an instance of the MsgRouter which is used to send off
|
||||||
// new wire messages for handing.
|
// new wire messages for handing.
|
||||||
msgRouter fn.Option[MsgRouter]
|
msgRouter fn.Option[protofsm.MsgRouter]
|
||||||
|
|
||||||
startReady chan struct{}
|
startReady chan struct{}
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
@@ -522,7 +523,7 @@ func NewBrontide(cfg Config) *Brontide {
|
|||||||
//
|
//
|
||||||
// TODO(roasbeef): extend w/ source peer info?
|
// TODO(roasbeef): extend w/ source peer info?
|
||||||
msgRouter := cfg.MsgRouter.Alt(
|
msgRouter := cfg.MsgRouter.Alt(
|
||||||
fn.Some[MsgRouter](NewMultiMsgRouter()),
|
fn.Some[protofsm.MsgRouter](protofsm.NewMultiMsgRouter()),
|
||||||
)
|
)
|
||||||
|
|
||||||
p := &Brontide{
|
p := &Brontide{
|
||||||
@@ -724,7 +725,7 @@ func (p *Brontide) Start() error {
|
|||||||
|
|
||||||
// Register the message router now as we may need to register some
|
// Register the message router now as we may need to register some
|
||||||
// endpoints while loading the channels below.
|
// endpoints while loading the channels below.
|
||||||
p.msgRouter.WhenSome(func(router MsgRouter) {
|
p.msgRouter.WhenSome(func(router protofsm.MsgRouter) {
|
||||||
router.Start()
|
router.Start()
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -1294,7 +1295,7 @@ func (p *Brontide) Disconnect(reason error) {
|
|||||||
err)
|
err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.msgRouter.WhenSome(func(router MsgRouter) {
|
p.msgRouter.WhenSome(func(router protofsm.MsgRouter) {
|
||||||
router.Stop()
|
router.Stop()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -1739,7 +1740,7 @@ out:
|
|||||||
// If a message router is active, then we'll try to have it
|
// If a message router is active, then we'll try to have it
|
||||||
// handle this message. If it can, then we're able to skip the
|
// handle this message. If it can, then we're able to skip the
|
||||||
// rest of the message handling logic.
|
// rest of the message handling logic.
|
||||||
ok := fn.MapOptionZ(p.msgRouter, func(r MsgRouter) error {
|
ok := fn.MapOptionZ(p.msgRouter, func(r protofsm.MsgRouter) error {
|
||||||
return r.RouteMsg(nextMsg)
|
return r.RouteMsg(nextMsg)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
45
protofsm/log.go
Normal file
45
protofsm/log.go
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
package protofsm
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/btcsuite/btclog"
|
||||||
|
"github.com/lightningnetwork/lnd/build"
|
||||||
|
)
|
||||||
|
|
||||||
|
// log is a logger that is initialized with no output filters. This
|
||||||
|
// means the package will not perform any logging by default until the caller
|
||||||
|
// requests it.
|
||||||
|
var log btclog.Logger
|
||||||
|
|
||||||
|
// The default amount of logging is none.
|
||||||
|
func init() {
|
||||||
|
UseLogger(build.NewSubLogger("PFSM", nil))
|
||||||
|
}
|
||||||
|
|
||||||
|
// DisableLog disables all library log output. Logging output is disabled
|
||||||
|
// by default until UseLogger is called.
|
||||||
|
func DisableLog() {
|
||||||
|
UseLogger(btclog.Disabled)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UseLogger uses a specified Logger to output package logging info.
|
||||||
|
// This should be used in preference to SetLogWriter if the caller is also
|
||||||
|
// using btclog.
|
||||||
|
func UseLogger(logger btclog.Logger) {
|
||||||
|
log = logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// logClosure is used to provide a closure over expensive logging operations
|
||||||
|
// so they aren't performed when the logging level doesn't warrant it.
|
||||||
|
type logClosure func() string
|
||||||
|
|
||||||
|
// String invokes the underlying function and returns the result.
|
||||||
|
func (c logClosure) String() string {
|
||||||
|
return c()
|
||||||
|
}
|
||||||
|
|
||||||
|
// newLogClosure returns a new closure over a function that returns a string
|
||||||
|
// which itself provides a Stringer interface so that it can be used with the
|
||||||
|
// logging system.
|
||||||
|
func newLogClosure(c func() string) logClosure {
|
||||||
|
return logClosure(c)
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package peer
|
package protofsm
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -152,7 +152,7 @@ func NewMultiMsgRouter() *MultiMsgRouter {
|
|||||||
|
|
||||||
// Start starts the peer message router.
|
// Start starts the peer message router.
|
||||||
func (p *MultiMsgRouter) Start() {
|
func (p *MultiMsgRouter) Start() {
|
||||||
peerLog.Infof("Starting MsgRouter")
|
log.Infof("Starting MsgRouter")
|
||||||
|
|
||||||
p.startOnce.Do(func() {
|
p.startOnce.Do(func() {
|
||||||
p.wg.Add(1)
|
p.wg.Add(1)
|
||||||
@@ -162,7 +162,7 @@ func (p *MultiMsgRouter) Start() {
|
|||||||
|
|
||||||
// Stop stops the peer message router.
|
// Stop stops the peer message router.
|
||||||
func (p *MultiMsgRouter) Stop() {
|
func (p *MultiMsgRouter) Stop() {
|
||||||
peerLog.Infof("Stopping MsgRouter")
|
log.Infof("Stopping MsgRouter")
|
||||||
|
|
||||||
p.stopOnce.Do(func() {
|
p.stopOnce.Do(func() {
|
||||||
close(p.quit)
|
close(p.quit)
|
||||||
@@ -214,13 +214,13 @@ func (p *MultiMsgRouter) msgRouter() {
|
|||||||
case newEndpointMsg := <-p.registerChan:
|
case newEndpointMsg := <-p.registerChan:
|
||||||
endpoint := newEndpointMsg.query
|
endpoint := newEndpointMsg.query
|
||||||
|
|
||||||
peerLog.Infof("MsgRouter: registering new "+
|
log.Infof("MsgRouter: registering new "+
|
||||||
"MsgEndpoint(%s)", endpoint.Name())
|
"MsgEndpoint(%s)", endpoint.Name())
|
||||||
|
|
||||||
// If this endpoint already exists, then we'll return
|
// If this endpoint already exists, then we'll return
|
||||||
// an error as we require unique names.
|
// an error as we require unique names.
|
||||||
if _, ok := endpoints[endpoint.Name()]; ok {
|
if _, ok := endpoints[endpoint.Name()]; ok {
|
||||||
peerLog.Errorf("MsgRouter: rejecting "+
|
log.Errorf("MsgRouter: rejecting "+
|
||||||
"duplicate endpoint: %v",
|
"duplicate endpoint: %v",
|
||||||
endpoint.Name())
|
endpoint.Name())
|
||||||
|
|
||||||
@@ -243,7 +243,7 @@ func (p *MultiMsgRouter) msgRouter() {
|
|||||||
case endpointName := <-p.unregisterChan:
|
case endpointName := <-p.unregisterChan:
|
||||||
delete(endpoints, endpointName.query)
|
delete(endpoints, endpointName.query)
|
||||||
|
|
||||||
peerLog.Infof("MsgRouter: unregistering "+
|
log.Infof("MsgRouter: unregistering "+
|
||||||
"MsgEndpoint(%s)", endpointName.query)
|
"MsgEndpoint(%s)", endpointName.query)
|
||||||
|
|
||||||
endpointName.respChan <- fn.NewRight[error, error](
|
endpointName.respChan <- fn.NewRight[error, error](
|
||||||
@@ -260,7 +260,7 @@ func (p *MultiMsgRouter) msgRouter() {
|
|||||||
var couldSend bool
|
var couldSend bool
|
||||||
for _, endpoint := range endpoints {
|
for _, endpoint := range endpoints {
|
||||||
if endpoint.CanHandle(msg) {
|
if endpoint.CanHandle(msg) {
|
||||||
peerLog.Tracef("MsgRouter: sending "+
|
log.Tracef("MsgRouter: sending "+
|
||||||
"msg %T to endpoint %s", msg,
|
"msg %T to endpoint %s", msg,
|
||||||
endpoint.Name())
|
endpoint.Name())
|
||||||
|
|
||||||
@@ -271,7 +271,7 @@ func (p *MultiMsgRouter) msgRouter() {
|
|||||||
|
|
||||||
var err error
|
var err error
|
||||||
if !couldSend {
|
if !couldSend {
|
||||||
peerLog.Tracef("MsgRouter: unable to route "+
|
log.Tracef("MsgRouter: unable to route "+
|
||||||
"msg %T", msg)
|
"msg %T", msg)
|
||||||
|
|
||||||
err = ErrUnableToRouteMsg
|
err = ErrUnableToRouteMsg
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package peer
|
package protofsm
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
Reference in New Issue
Block a user