diff --git a/channelnotifier/channelnotifier.go b/channelnotifier/channelnotifier.go
index e6a1fada7..5d67fd51c 100644
--- a/channelnotifier/channelnotifier.go
+++ b/channelnotifier/channelnotifier.go
@@ -40,6 +40,13 @@ type OpenChannelEvent struct {
 	Channel *channeldb.OpenChannel
 }
 
+// ActiveLinkEvent represents a new event where the link becomes active in the
+// switch. This happens before the ActiveChannelEvent.
+type ActiveLinkEvent struct {
+	// ChannelPoint is the channel point for the newly active channel.
+	ChannelPoint *wire.OutPoint
+}
+
 // ActiveChannelEvent represents a new event where a channel becomes active.
 type ActiveChannelEvent struct {
 	// ChannelPoint is the channelpoint for the newly active channel.
@@ -146,6 +153,15 @@ func (c *ChannelNotifier) NotifyClosedChannelEvent(chanPoint wire.OutPoint) {
 	}
 }
 
+// NotifyActiveLinkEvent notifies the channelEventNotifier goroutine that a
+// link has been added to the switch.
+func (c *ChannelNotifier) NotifyActiveLinkEvent(chanPoint wire.OutPoint) {
+	event := ActiveLinkEvent{ChannelPoint: &chanPoint}
+	if err := c.ntfnServer.SendUpdate(event); err != nil {
+		log.Warnf("Unable to send active link update: %v", err)
+	}
+}
+
 // NotifyActiveChannelEvent notifies the channelEventNotifier goroutine that a
 // channel is active.
 func (c *ChannelNotifier) NotifyActiveChannelEvent(chanPoint wire.OutPoint) {
diff --git a/htlcswitch/link.go b/htlcswitch/link.go
index ff631a59d..d91bc8568 100644
--- a/htlcswitch/link.go
+++ b/htlcswitch/link.go
@@ -265,6 +265,10 @@ type ChannelLinkConfig struct {
 	// initiator of the channel.
 	MaxFeeAllocation float64
 
+	// NotifyActiveLink allows the link to tell the ChannelNotifier when a
+	// link is first started.
+	NotifyActiveLink func(wire.OutPoint)
+
 	// NotifyActiveChannel allows the link to tell the ChannelNotifier when
 	// channels becomes active.
 	NotifyActiveChannel func(wire.OutPoint)
@@ -870,6 +874,10 @@ func (l *channelLink) htlcManager() {
 	l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())
 
+	// Notify any clients that the link is now in the switch via an
+	// ActiveLinkEvent.
+	l.cfg.NotifyActiveLink(*l.ChannelPoint())
+
 	// TODO(roasbeef): need to call wipe chan whenever D/C?
 
 	// If this isn't the first time that this channel link has been
diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go
index d7f95df16..d150ddc55 100644
--- a/htlcswitch/link_test.go
+++ b/htlcswitch/link_test.go
@@ -1748,6 +1748,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
 		MaxFeeUpdateTimeout:     40 * time.Minute,
 		MaxOutgoingCltvExpiry:   DefaultMaxOutgoingCltvExpiry,
 		MaxFeeAllocation:        DefaultMaxLinkFeeAllocation,
+		NotifyActiveLink:        func(wire.OutPoint) {},
 		NotifyActiveChannel:     func(wire.OutPoint) {},
 		NotifyInactiveChannel:   func(wire.OutPoint) {},
 		HtlcNotifier:            aliceSwitch.cfg.HtlcNotifier,
@@ -4474,6 +4475,7 @@ func (h *persistentLinkHarness) restartLink(
 		HodlMask:                hodl.MaskFromFlags(hodlFlags...),
 		MaxOutgoingCltvExpiry:   DefaultMaxOutgoingCltvExpiry,
 		MaxFeeAllocation:        DefaultMaxLinkFeeAllocation,
+		NotifyActiveLink:        func(wire.OutPoint) {},
 		NotifyActiveChannel:     func(wire.OutPoint) {},
 		NotifyInactiveChannel:   func(wire.OutPoint) {},
 		HtlcNotifier:            aliceSwitch.cfg.HtlcNotifier,
diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go
index f628f280b..f302ff01c 100644
--- a/htlcswitch/test_utils.go
+++ b/htlcswitch/test_utils.go
@@ -1173,6 +1173,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
 		OutgoingCltvRejectDelta: 3,
 		MaxOutgoingCltvExpiry:   DefaultMaxOutgoingCltvExpiry,
 		MaxFeeAllocation:        DefaultMaxLinkFeeAllocation,
+		NotifyActiveLink:        func(wire.OutPoint) {},
 		NotifyActiveChannel:     func(wire.OutPoint) {},
 		NotifyInactiveChannel:   func(wire.OutPoint) {},
 		HtlcNotifier:            server.htlcSwitch.cfg.HtlcNotifier,
diff --git a/peer.go b/peer.go
index 2236e66c3..df77c4fd6 100644
--- a/peer.go
+++ b/peer.go
@@ -21,6 +21,7 @@ import (
 	"github.com/lightningnetwork/lnd/buffer"
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/lightningnetwork/lnd/channelnotifier"
 	"github.com/lightningnetwork/lnd/contractcourt"
 	"github.com/lightningnetwork/lnd/feature"
 	"github.com/lightningnetwork/lnd/htlcswitch"
@@ -159,6 +160,13 @@ type peer struct {
 	// activeChannels is a map which stores the state machines of all
 	// active channels. Channels are indexed into the map by the txid of
 	// the funding transaction which opened the channel.
+	//
+	// NOTE: On startup, pending channels are stored as nil in this map.
+	// Confirmed channels have channel data populated in the map. This means
+	// that accesses to this map should nil-check the LightningChannel to
+	// see if this is a pending channel or not. The tradeoff here is either
+	// having two maps everywhere (one for pending, one for confirmed chans)
+	// or having an extra nil-check per access.
 	activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel
 
 	// addedChannels tracks any new channels opened during this peer's
@@ -563,9 +571,21 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) (
 		peerLog.Tracef("Using link policy of: %v",
 			spew.Sdump(forwardingPolicy))
 
-		// Register this new channel link with the HTLC Switch. This is
-		// necessary to properly route multi-hop payments, and forward
-		// new payments triggered by RPC clients.
+		// If the channel is pending, set the value to nil in the
+		// activeChannels map. This is done to signify that the channel is
+		// pending. We don't add the link to the switch here - it's the funding
+		// manager's responsibility to spin up pending channels. Adding them
+		// here would just be extra work as we'll tear them down when creating
+		// + adding the final link.
+		if lnChan.IsPending() {
+			p.activeChanMtx.Lock()
+			p.activeChannels[chanID] = nil
+			p.activeChanMtx.Unlock()
+
+			continue
+		}
+
+		// Subscribe to the set of on-chain events for this channel.
 		chainEvents, err := p.server.chainArb.SubscribeChannelEvents(
 			*chanPoint,
 		)
@@ -573,7 +593,6 @@
 			return nil, err
 		}
 
-		// Create the link and add it to the switch.
 		err = p.addLink(
 			chanPoint, lnChan, forwardingPolicy, chainEvents,
 			currentHeight, true,
@@ -649,6 +668,7 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
 		TowerClient:             p.server.towerClient,
 		MaxOutgoingCltvExpiry:   cfg.MaxOutgoingCltvExpiry,
 		MaxFeeAllocation:        cfg.MaxChannelFeeAllocation,
+		NotifyActiveLink:        p.server.channelNotifier.NotifyActiveLinkEvent,
 		NotifyActiveChannel:     p.server.channelNotifier.NotifyActiveChannelEvent,
 		NotifyInactiveChannel:   p.server.channelNotifier.NotifyInactiveChannelEvent,
 		HtlcNotifier:            p.server.htlcNotifier,
@@ -925,6 +945,69 @@ func (ms *msgStream) AddMsg(msg lnwire.Message) {
 	ms.msgCond.Signal()
 }
 
+// waitUntilLinkActive waits until the target link is active and returns a
+// ChannelLink to pass messages to. It accomplishes this by subscribing to
+// an ActiveLinkEvent which is emitted by the link when it first starts up.
+func waitUntilLinkActive(p *peer, cid lnwire.ChannelID) htlcswitch.ChannelLink {
+	// Subscribe to receive channel events.
+	//
+	// NOTE: If the link is already active by SubscribeChannelEvents, then
+	// GetLink will retrieve the link and we can send messages. If the link
+	// becomes active between SubscribeChannelEvents and GetLink, then GetLink
+	// will retrieve the link. If the link becomes active after GetLink, then
+	// we will get an ActiveLinkEvent notification and retrieve the link. If
+	// the call to GetLink is before SubscribeChannelEvents, however, there
+	// will be a race condition.
+	sub, err := p.server.channelNotifier.SubscribeChannelEvents()
+	if err != nil {
+		// If we have a non-nil error, then the server is shutting down and we
+		// can exit here and return nil. This means no message will be delivered
+		// to the link.
+		return nil
+	}
+	defer sub.Cancel()
+
+	// The link may already be active by this point, and we may have missed the
+	// ActiveLinkEvent. Check if the link exists.
+	link, _ := p.server.htlcSwitch.GetLink(cid)
+	if link != nil {
+		return link
+	}
+
+	// If the link is nil, we must wait for it to be active.
+	for {
+		select {
+		// A new event has been sent by the ChannelNotifier. We first check
+		// whether the event is an ActiveLinkEvent. If it is, we'll check
+		// that the event is for this channel. Otherwise, we discard the
+		// message.
+		case e := <-sub.Updates():
+			event, ok := e.(channelnotifier.ActiveLinkEvent)
+			if !ok {
+				// Ignore this notification.
+				continue
+			}
+
+			chanPoint := event.ChannelPoint
+
+			// Check whether the retrieved chanPoint matches the target
+			// channel id.
+			if !cid.IsChanPoint(chanPoint) {
+				continue
+			}
+
+			// The link shouldn't be nil as we received an
+			// ActiveLinkEvent. If it is nil, we return nil and the
+			// calling function should catch it.
+			link, _ = p.server.htlcSwitch.GetLink(cid)
+			return link
+
+		case <-p.quit:
+			return nil
+		}
+	}
+}
+
 // newChanMsgStream is used to create a msgStream between the peer and
 // particular channel link in the htlcswitch. We utilize additional
 // synchronization with the fundingManager to ensure we don't attempt to
@@ -940,51 +1023,17 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream {
 		fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]),
 		1000,
 		func(msg lnwire.Message) {
-			_, isChanSyncMsg := msg.(*lnwire.ChannelReestablish)
-
-			// If this is the chanSync message, then we'll deliver
-			// it immediately to the active link.
-			if !isChanSyncMsg {
-				// We'll send a message to the funding manager
-				// and wait iff an active funding process for
-				// this channel hasn't yet completed. We do
-				// this in order to account for the following
-				// scenario: we send the funding locked message
-				// to the other side, they immediately send a
-				// channel update message, but we haven't yet
-				// sent the channel to the channelManager.
-				err := p.server.fundingMgr.waitUntilChannelOpen(
-					cid, p.quit,
-				)
-				if err != nil {
-					// If we have a non-nil error, then the
-					// funding manager is shutting down, s
-					// we can exit here without attempting
-					// to deliver the message.
-					return
-				}
-			}
-
-			// In order to avoid unnecessarily delivering message
-			// as the peer is exiting, we'll check quickly to see
-			// if we need to exit.
-			select {
-			case <-p.quit:
-				return
-			default:
-			}
-
-			// Dispatch the commitment update message to the proper
-			// active goroutine dedicated to this channel.
+			// This check is fine because if the link no longer exists, it will
+			// be removed from the activeChannels map and subsequent messages
+			// shouldn't reach the chan msg stream.
 			if chanLink == nil {
-				link, err := p.server.htlcSwitch.GetLink(cid)
-				if err != nil {
-					peerLog.Errorf("recv'd update for "+
-						"unknown channel %v from %v: "+
-						"%v", cid, p, err)
+				chanLink = waitUntilLinkActive(p, cid)
+
+				// If the link is still not active and the calling function
+				// errored out, just return.
+				if chanLink == nil {
 					return
 				}
-				chanLink = link
 			}
 
 			// In order to avoid unnecessarily delivering message
@@ -1227,13 +1276,23 @@ func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool {
 // channel with the peer to mitigate dos attack vectors where a peer costlessly
 // connects to us and spams us with errors.
 func (p *peer) storeError(err error) {
+	var haveChannels bool
+
 	p.activeChanMtx.RLock()
-	channelCount := len(p.activeChannels)
+	for _, channel := range p.activeChannels {
+		// Pending channels will be nil in the activeChannels map.
+		if channel == nil {
+			continue
+		}
+
+		haveChannels = true
+		break
+	}
 	p.activeChanMtx.RUnlock()
 
 	// If we do not have any active channels with the peer, we do not store
 	// errors as a dos mitigation.
-	if channelCount == 0 {
+	if !haveChannels {
 		peerLog.Tracef("no channels with peer: %v, not storing err", p)
 		return
 	}
@@ -1775,6 +1834,11 @@ func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot {
 	snapshots := make([]*channeldb.ChannelSnapshot, 0, len(p.activeChannels))
 	for _, activeChan := range p.activeChannels {
+		// If the activeChan is nil, then we skip it as the channel is pending.
+		if activeChan == nil {
+			continue
+		}
+
 		// We'll only return a snapshot for channels that are
 		// *immedately* available for routing payments over.
 		if activeChan.RemoteNextRevocation() == nil {
@@ -1827,9 +1891,12 @@ out:
 			chanPoint := &newChan.FundingOutpoint
 			chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
 
-			// Make sure this channel is not already active.
+			// Only update RemoteNextRevocation if the channel is in the
+			// activeChannels map and if we added the link to the switch.
+			// Only active channels will be added to the switch.
 			p.activeChanMtx.Lock()
-			if currentChan, ok := p.activeChannels[chanID]; ok {
+			currentChan, ok := p.activeChannels[chanID]
+			if ok && currentChan != nil {
 				peerLog.Infof("Already have ChannelPoint(%v), "+
 					"ignoring.", chanPoint)
@@ -1875,6 +1942,8 @@ out:
 				continue
 			}
 
+			// This refreshes the activeChannels entry if the link was not in
+			// the switch, and also populates it for new entries.
 			p.activeChannels[chanID] = lnChan
 			p.addedChannels[chanID] = struct{}{}
 			p.activeChanMtx.Unlock()
@@ -1923,10 +1992,17 @@ out:
 				TimeLockDelta: defaultPolicy.TimeLockDelta,
 			}
 
+			// If we've reached this point, there are two possible scenarios.
+			// If the channel was in the active channels map as nil, then it
+			// was loaded from disk and we need to send reestablish. Else,
+			// it was not loaded from disk and we don't need to send
+			// reestablish as this is a fresh channel.
+			shouldReestablish := ok
+
 			// Create the link and add it to the switch.
 			err = p.addLink(
 				chanPoint, lnChan, forwardingPolicy,
-				chainEvents, currentHeight, false,
+				chainEvents, currentHeight, shouldReestablish,
 			)
 			if err != nil {
 				err := fmt.Errorf("can't register new channel "+
@@ -2045,6 +2121,11 @@ out:
 			// our active channel back to their default state.
 			p.activeChanMtx.Lock()
 			for _, channel := range p.activeChannels {
+				// If the channel is nil, continue as it's a pending channel.
+				if channel == nil {
+					continue
+				}
+
 				channel.ResetState()
 			}
 			p.activeChanMtx.Unlock()
@@ -2064,6 +2145,11 @@ func (p *peer) reenableActiveChannels() {
 	var activePublicChans []wire.OutPoint
 	p.activeChanMtx.RLock()
 	for chanID, lnChan := range p.activeChannels {
+		// If the lnChan is nil, continue as this is a pending channel.
+		if lnChan == nil {
+			continue
+		}
+
 		dbChan := lnChan.State()
 		isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0
 		if !isPublic || dbChan.IsPending {
@@ -2107,7 +2193,10 @@ func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) (*channelCloser, e
 	p.activeChanMtx.RLock()
 	channel, ok := p.activeChannels[chanID]
 	p.activeChanMtx.RUnlock()
-	if !ok {
+
+	// If the channel isn't in the map or the channel is nil, return
+	// ErrChannelNotFound as the channel is pending.
+	if !ok || channel == nil {
 		return nil, ErrChannelNotFound
 	}
 
@@ -2216,7 +2305,10 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) {
 	p.activeChanMtx.RLock()
 	channel, ok := p.activeChannels[chanID]
 	p.activeChanMtx.RUnlock()
-	if !ok {
+
+	// Though this function can't be called for pending channels, we still
+	// check whether the channel is nil for safety.
+	if !ok || channel == nil {
 		err := fmt.Errorf("unable to close channel, ChannelID(%v) is "+
 			"unknown", chanID)
 		peerLog.Errorf(err.Error())
diff --git a/rpcserver.go b/rpcserver.go
index 5cb8ade2b..f2ab704a8 100644
--- a/rpcserver.go
+++ b/rpcserver.go
@@ -3527,6 +3527,11 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
 				},
 			}
 
+		// Completely ignore ActiveLinkEvent as this is explicitly not
+		// exposed to the RPC.
+		case channelnotifier.ActiveLinkEvent:
+			continue
+
 		default:
 			return fmt.Errorf("unexpected channel event update: %v", event)
 		}
@@ -5959,6 +5964,8 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
 			continue
 		case channelnotifier.InactiveChannelEvent:
 			continue
+		case channelnotifier.ActiveLinkEvent:
+			continue
 		}
 
 		// Now that we know the channel state has changed,
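
Note on the subscribe-then-check ordering used by waitUntilLinkActive above: the subscription to the ChannelNotifier is taken out before GetLink is consulted, so a link that becomes active between those two steps is still observed through the subscription rather than silently missed. The standalone Go sketch below models only that ordering with toy stand-in types; Notifier, Switch, Event and waitUntilActive are names invented for this illustration and are not lnd APIs, and the sketch is a simplified model of the pattern, not the actual implementation.

package main

import (
	"fmt"
	"sync"
)

// Event stands in for channelnotifier.ActiveLinkEvent.
type Event struct{ ID string }

// Notifier is a toy stand-in for the ChannelNotifier: Subscribe returns a
// channel on which every event sent after the subscription is delivered.
type Notifier struct {
	mu   sync.Mutex
	subs []chan Event
}

func (n *Notifier) Subscribe() <-chan Event {
	n.mu.Lock()
	defer n.mu.Unlock()

	c := make(chan Event, 1)
	n.subs = append(n.subs, c)
	return c
}

func (n *Notifier) Notify(e Event) {
	n.mu.Lock()
	defer n.mu.Unlock()

	for _, c := range n.subs {
		c <- e
	}
}

// Switch is a toy stand-in for the htlcswitch: GetLink reports whether a
// link for the given channel id is currently registered.
type Switch struct {
	mu    sync.Mutex
	links map[string]bool
}

func (s *Switch) AddLink(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.links[id] = true
}

func (s *Switch) GetLink(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.links[id]
}

// waitUntilActive mirrors the ordering in waitUntilLinkActive: subscribe
// BEFORE checking GetLink, so a link that becomes active between the two
// steps is still seen via the subscription instead of being missed.
func waitUntilActive(n *Notifier, sw *Switch, id string, quit <-chan struct{}) bool {
	updates := n.Subscribe()

	// The link may already be active and its event already delivered
	// before we subscribed, so check the current state explicitly.
	if sw.GetLink(id) {
		return true
	}

	for {
		select {
		case e := <-updates:
			if e.ID != id {
				// Event for some other channel; keep waiting.
				continue
			}
			return sw.GetLink(id)

		case <-quit:
			return false
		}
	}
}

func main() {
	n := &Notifier{}
	sw := &Switch{links: make(map[string]bool)}
	quit := make(chan struct{})

	// Simulate the link being added to the switch (and the event being
	// emitted) after the waiter has subscribed.
	go func() {
		sw.AddLink("chan-1")
		n.Notify(Event{ID: "chan-1"})
	}()

	fmt.Println("link active:", waitUntilActive(n, sw, "chan-1", quit))
}

Either interleaving works with this ordering: if the link is registered before the subscriber checks, GetLink returns it directly; if it is registered afterwards, the event arrives on the already-open subscription.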
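
The nil-value convention for pending entries in activeChannels (see the NOTE on the struct field above) means every lookup has three possible outcomes: absent, pending (present but nil), and active (present and non-nil). A minimal, self-contained sketch of that access pattern follows, using a toy channel type rather than *lnwallet.LightningChannel; it only illustrates the nil-check the diff adds before each use of a map value.

package main

import "fmt"

// channel is a toy stand-in for *lnwallet.LightningChannel.
type channel struct{ name string }

// lookup distinguishes the three states an entry can be in, mirroring the
// nil-check the diff adds before every use of an activeChannels value.
func lookup(active map[string]*channel, id string) string {
	ch, ok := active[id]
	switch {
	case !ok:
		return "unknown channel"
	case ch == nil:
		return "pending channel (loaded from disk, no link yet)"
	default:
		return "active channel: " + ch.name
	}
}

func main() {
	active := map[string]*channel{
		"pending-1": nil,                // pending: stored as nil on startup
		"active-1":  {name: "active-1"}, // confirmed: fully populated
	}

	for _, id := range []string{"pending-1", "active-1", "missing-1"} {
		fmt.Printf("%s -> %s\n", id, lookup(active, id))
	}
}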