peer+server+test_utlils: use new ChanStatusManager

This commit hooks up the new netann.ChanStatusManager,
replacing the prior method which used the
watchChannelStatus goroutine.
This commit is contained in:
Conner Fromknecht
2019-02-14 17:13:44 -08:00
parent 8b185e6301
commit de282172a1
3 changed files with 149 additions and 237 deletions

228
server.go
View File

@@ -89,6 +89,8 @@ type server struct {
// that's backed by the identity private key of the running lnd node.
nodeSigner *netann.NodeSigner
chanStatusMgr *netann.ChanStatusManager
// listenAddrs is the list of addresses the server is currently
// listening on.
listenAddrs []net.Addr
@@ -179,11 +181,6 @@ type server struct {
// changed since last start.
currentNodeAnn *lnwire.NodeAnnouncement
// sendDisabled is used to keep track of the disabled flag of the last
// sent ChannelUpdate from announceChanStatus.
sentDisabled map[wire.OutPoint]bool
sentDisabledMtx sync.Mutex
quit chan struct{}
wg sync.WaitGroup
@@ -300,7 +297,6 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
outboundPeers: make(map[string]*peer),
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
peerDisconnectedListeners: make(map[string][]chan<- struct{}),
sentDisabled: make(map[wire.OutPoint]bool),
globalFeatures: lnwire.NewFeatureVector(globalFeatures,
lnwire.GlobalFeatures),
@@ -369,6 +365,24 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
return nil, err
}
chanStatusMgrCfg := &netann.ChanStatusConfig{
ChanStatusSampleInterval: cfg.ChanStatusSampleInterval,
ChanEnableTimeout: cfg.ChanEnableTimeout,
ChanDisableTimeout: cfg.ChanDisableTimeout,
OurPubKey: privKey.PubKey(),
MessageSigner: s.nodeSigner,
IsChannelActive: s.htlcSwitch.HasActiveLink,
ApplyChannelUpdate: s.applyChannelUpdate,
DB: chanDB,
Graph: chanDB.ChannelGraph(),
}
chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg)
if err != nil {
return nil, err
}
s.chanStatusMgr = chanStatusMgr
// If enabled, use either UPnP or NAT-PMP to automatically configure
// port forwarding for users behind a NAT.
if cfg.NAT {
@@ -739,9 +753,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
return ErrServerShuttingDown
}
},
DisableChannel: func(op wire.OutPoint) error {
return s.announceChanStatus(op, true)
},
DisableChannel: s.chanStatusMgr.RequestDisable,
Sweeper: s.sweeper,
SettleInvoice: s.invoices.SettleInvoice,
NotifyClosedChannel: s.channelNotifier.NotifyClosedChannelEvent,
@@ -1030,6 +1042,9 @@ func (s *server) Start() error {
if err := s.invoices.Start(); err != nil {
return err
}
if err := s.chanStatusMgr.Start(); err != nil {
return err
}
// With all the relevant sub-systems started, we'll now attempt to
// establish persistent connections to our direct channel collaborators
@@ -1060,11 +1075,6 @@ func (s *server) Start() error {
srvrLog.Infof("Auto peer bootstrapping is disabled")
}
// Start a goroutine that will periodically send out ChannelUpdates
// based on a channel's status.
s.wg.Add(1)
go s.watchChannelStatus()
return nil
}
@@ -1085,6 +1095,7 @@ func (s *server) Stop() error {
}
// Shutdown the wallet, funding manager, and the rpc server.
s.chanStatusMgr.Stop()
s.sigPool.Stop()
s.cc.chainNotifier.Stop()
s.chanRouter.Stop()
@@ -2414,7 +2425,10 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
// Now that we've established a connection, create a peer, and it to
// the set of currently active peers.
p, err := newPeer(conn, connReq, s, peerAddr, inbound, localFeatures)
p, err := newPeer(
conn, connReq, s, peerAddr, inbound, localFeatures,
cfg.ChanEnableTimeout,
)
if err != nil {
srvrLog.Errorf("unable to create peer %v", err)
return
@@ -2995,68 +3009,6 @@ func (s *server) fetchNodeAdvertisedAddr(pub *btcec.PublicKey) (net.Addr, error)
return node.Addresses[0], nil
}
// announceChanStatus disables a channel if disabled=true, otherwise activates
// it. This is done by sending a new channel update across the network with the
// disabled flag set accordingly. The result of disabling the channel is it not
// being able to forward payments.
func (s *server) announceChanStatus(op wire.OutPoint, disabled bool) error {
s.sentDisabledMtx.Lock()
defer s.sentDisabledMtx.Unlock()
// If we have already sent out an update reflecting the current status,
// skip this channel.
alreadyDisabled, ok := s.sentDisabled[op]
if ok && alreadyDisabled == disabled {
return nil
}
// Retrieve the latest update for this channel. We'll use this
// as our starting point to send the new update.
chanUpdate, err := s.fetchLastChanUpdateByOutPoint(op)
if err != nil {
return err
}
// Now, sign a new update toggling the disable bit.
err = netann.SignChannelUpdate(
s.nodeSigner, s.identityPriv.PubKey(), chanUpdate,
netann.ChannelUpdateSetDisable(disabled),
)
if err != nil {
return err
}
srvrLog.Debugf("Announcing channel(%v) disabled=%v", op, disabled)
// Once signed, we'll send the new update to all of our peers.
if err := s.applyChannelUpdate(chanUpdate); err != nil {
return err
}
// We'll keep track of the status set in the last update we sent, to
// avoid sending updates if nothing has changed.
s.sentDisabled[op] = disabled
return nil
}
// fetchLastChanUpdateByOutPoint fetches the latest policy for our direction of
// a channel, and crafts a new ChannelUpdate with this policy. Returns an error
// in case our ChannelEdgePolicy is not found in the database.
func (s *server) fetchLastChanUpdateByOutPoint(op wire.OutPoint) (
*lnwire.ChannelUpdate, error) {
// Get the edge info and policies for this channel from the graph.
graph := s.chanDB.ChannelGraph()
info, edge1, edge2, err := graph.FetchChannelEdgesByOutpoint(&op)
if err != nil {
return nil, err
}
pubKey := s.identityPriv.PubKey().SerializeCompressed()
return netann.ExtractChannelUpdate(pubKey, info, edge1, edge2)
}
// fetchLastChanUpdate returns a function which is able to retrieve our latest
// channel update for a target channel.
func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) (
@@ -3068,6 +3020,7 @@ func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) (
if err != nil {
return nil, err
}
return netann.ExtractChannelUpdate(
ourPubKey[:], info, edge1, edge2,
)
@@ -3086,124 +3039,3 @@ func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate) error {
return ErrServerShuttingDown
}
}
// watchChannelStatus periodically queries the Switch for the status of the
// open channels, and sends out ChannelUpdates to the network indicating their
// active status. Currently we'll send out either a Disabled or Active update
// if the channel has been in the same status over a given amount of time.
//
// NOTE: This MUST be run as a goroutine.
func (s *server) watchChannelStatus() {
defer s.wg.Done()
// A map with values activeStatus is used to keep track of the first
// time we saw a link changing to the current active status.
type activeStatus struct {
active bool
time time.Time
}
status := make(map[wire.OutPoint]activeStatus)
// We'll check in on the channel statuses every 1/4 of the timeout.
unchangedTimeout := cfg.ChanDisableTimeout
tickerTimeout := unchangedTimeout / 4
if unchangedTimeout == 0 || tickerTimeout == 0 {
srvrLog.Debugf("Won't watch channel statuses")
return
}
ticker := time.NewTicker(tickerTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
channels, err := s.chanDB.FetchAllOpenChannels()
if err != nil {
srvrLog.Errorf("Unable to fetch open "+
"channels: %v", err)
continue
}
// For each open channel, update the status. We'll copy
// the updated statuses to a new map, to avoid keeping
// the status of closed channels around.
newStatus := make(map[wire.OutPoint]activeStatus)
for _, c := range channels {
// We'll skip any private channels, as they
// aren't used for routing within the network by
// other nodes.
if c.ChannelFlags&lnwire.FFAnnounceChannel == 0 {
continue
}
chanID := lnwire.NewChanIDFromOutPoint(
&c.FundingOutpoint)
// Get the current active stauts from the
// Switch.
active := s.htlcSwitch.HasActiveLink(chanID)
var currentStatus activeStatus
// If this link is not in the map, or the
// status has changed, set an updated active
// status.
st, ok := status[c.FundingOutpoint]
if !ok || st.active != active {
currentStatus = activeStatus{
active: active,
time: time.Now(),
}
} else {
// The status is unchanged, we'll keep
// it as is.
currentStatus = st
}
newStatus[c.FundingOutpoint] = currentStatus
}
// Set the status map to the map of new statuses.
status = newStatus
// If no change in status has happened during the last
// interval, we'll send out an update. Note that we add
// the negative of the timeout to set our limit in the
// past.
limit := time.Now().Add(-unchangedTimeout)
// We'll send out an update for all channels that have
// had their status unchanged for longer than the limit.
// NOTE: We also make sure to activate any channel when
// we connect to a peer, to make them available for
// path finding immediately.
for op, st := range status {
disable := !st.active
if st.time.Before(limit) {
// Before we attempt to announce the
// status of the channel, we remove it
// from the status map such that it
// will need a full unchaged interval
// before we attempt to announce its
// status again.
delete(status, op)
err = s.announceChanStatus(op, disable)
if err != nil &&
err != channeldb.ErrEdgeNotFound {
srvrLog.Errorf("Unable to "+
"disable channel %v: %v",
op, err)
}
}
}
case <-s.quit:
return
}
}
}