From d3403306b642b13db4dcc3a4c8593789055866f5 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 1 May 2018 16:30:30 -0700 Subject: [PATCH] htlcswitch/switch: segment pending links from live links --- htlcswitch/switch.go | 217 ++++++++++++++++++++++++------------------- 1 file changed, 123 insertions(+), 94 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 515b22541..94c424a2f 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -175,17 +175,20 @@ type Switch struct { // forward the settle/fail htlc updates back to the add htlc initiator. circuits CircuitMap - // mailMtx is a read/write mutex that protects the mailboxes map. - mailMtx sync.RWMutex - - // mailboxes is a map of channel id to mailboxes, which allows the - // switch to buffer messages for peers that have not come back online. - mailboxes map[lnwire.ShortChannelID]MailBox + // mailOrchestrator manages the lifecycle of mailboxes used throughout + // the switch, and facilitates delayed delivery of packets to links that + // later come online. + mailOrchestrator *mailOrchestrator // indexMtx is a read/write mutex that protects the set of indexes // below. indexMtx sync.RWMutex + // pendingLinkIndex holds links that have not had their final, live + // short_chan_id assigned. These links can be transitioned into the + // primary linkIndex by using UpdateShortChanID to load their live id. + pendingLinkIndex map[lnwire.ChannelID]ChannelLink + // links is a map of channel id and channel link which manages // this channel. linkIndex map[lnwire.ChannelID]ChannelLink @@ -248,9 +251,10 @@ func New(cfg Config) (*Switch, error) { circuits: circuitMap, paymentSequencer: sequencer, linkIndex: make(map[lnwire.ChannelID]ChannelLink), - mailboxes: make(map[lnwire.ShortChannelID]MailBox), + mailOrchestrator: newMailOrchestrator(), forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink), interfaceIndex: make(map[[33]byte]map[ChannelLink]struct{}), + pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink), pendingPayments: make(map[uint64]*pendingPayment), htlcPlex: make(chan *plexPacket), chanCloseRequests: make(chan *ChanClose), @@ -1089,8 +1093,7 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { // Check to see that the source link is online before removing // the circuit. - sourceMailbox := s.getOrCreateMailBox(packet.incomingChanID) - return sourceMailbox.AddPacket(packet) + return s.mailOrchestrator.Deliver(packet.incomingChanID, packet) default: return errors.New("wrong update type") @@ -1116,16 +1119,18 @@ func (s *Switch) failAddPacket(packet *htlcPacket, log.Error(failErr) - // Route a fail packet back to the source link. - sourceMailbox := s.getOrCreateMailBox(packet.incomingChanID) - if err = sourceMailbox.AddPacket(&htlcPacket{ + failPkt := &htlcPacket{ incomingChanID: packet.incomingChanID, incomingHTLCID: packet.incomingHTLCID, circuit: packet.circuit, htlc: &lnwire.UpdateFailHTLC{ Reason: reason, }, - }); err != nil { + } + + // Route a fail packet back to the source link. + err = s.mailOrchestrator.Deliver(failPkt.incomingChanID, failPkt) + if err != nil { err = errors.Errorf("source chanid=%v unable to "+ "handle switch packet: %v", packet.incomingChanID, err) @@ -1343,6 +1348,12 @@ func (s *Switch) htlcForwarder() { "channel link on stop: %v", err) } } + for _, link := range s.pendingLinkIndex { + if err := s.removeLink(link.ChanID()); err != nil { + log.Errorf("unable to remove pending "+ + "channel link on stop: %v", err) + } + } s.indexMtx.Unlock() // Before we exit fully, we'll attempt to flush out any @@ -1699,11 +1710,7 @@ func (s *Switch) Stop() error { // Wait until all active goroutines have finished exiting before // stopping the mailboxes, otherwise the mailbox map could still be // accessed and modified. - for _, mailBox := range s.mailboxes { - mailBox.Stop() - } - for _, mailBox := range s.pendingMailboxes { - mailBox.Stop() + s.mailOrchestrator.Stop() return nil } @@ -1714,64 +1721,67 @@ func (s *Switch) AddLink(link ChannelLink) error { s.indexMtx.Lock() defer s.indexMtx.Unlock() - // First we'll add the link to the linkIndex which lets us quickly look - // up a channel when we need to close or register it, and the - // forwarding index which'll be used when forwarding HTLC's in the - // multi-hop setting. + chanID := link.ChanID() + + // Get and attach the mailbox for this link, which buffers packets in + // case there packets that we tried to deliver while this link was + // offline. + mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID) + link.AttachMailBox(mailbox) + + if err := link.Start(); err != nil { + s.removeLink(chanID) + return err + } + + shortChanID := link.ShortChanID() + if shortChanID == sourceHop { + log.Infof("Adding pending link chan_id=%v, short_chan_id=%v", + chanID, shortChanID) + + s.pendingLinkIndex[chanID] = link + } else { + log.Infof("Adding live link chan_id=%v, short_chan_id=%v", + chanID, shortChanID) + + s.addLiveLink(link) + s.mailOrchestrator.BindLiveShortChanID( + mailbox, chanID, shortChanID, + ) + } + + return nil +} + +// addLiveLink adds a link to all associated forwarding index, this makes it a +// candidate for forwarding HTLCs. +func (s *Switch) addLiveLink(link ChannelLink) { + // We'll add the link to the linkIndex which lets us quickly + // look up a channel when we need to close or register it, and + // the forwarding index which'll be used when forwarding HTLC's + // in the multi-hop setting. s.linkIndex[link.ChanID()] = link s.forwardingIndex[link.ShortChanID()] = link - // Next we'll add the link to the interface index so we can quickly - // look up all the channels for a particular node. + // Next we'll add the link to the interface index so we can + // quickly look up all the channels for a particular node. peerPub := link.Peer().PubKey() if _, ok := s.interfaceIndex[peerPub]; !ok { s.interfaceIndex[peerPub] = make(map[ChannelLink]struct{}) } s.interfaceIndex[peerPub][link] = struct{}{} - - // Get the mailbox for this link, which buffers packets in case there - // packets that we tried to deliver while this link was offline. - mailbox := s.getOrCreateMailBox(link.ShortChanID()) - - // Give the link its mailbox, we only need to start the mailbox if it - // wasn't previously found. - link.AttachMailBox(mailbox) - - if err := link.Start(); err != nil { - s.removeLink(link.ChanID()) - return err - } - - log.Infof("Added channel link with chan_id=%v, short_chan_id=(%v)", - link.ChanID(), spew.Sdump(link.ShortChanID())) - - return nil } -// getOrCreateMailBox returns the known mailbox for a particular short channel -// id, or creates one if the link has no existing mailbox. -func (s *Switch) getOrCreateMailBox(chanID lnwire.ShortChannelID) MailBox { - // Check to see if we have a mailbox already populated for this link. - s.mailMtx.RLock() - mailbox, ok := s.mailboxes[chanID] - if ok { - s.mailMtx.RUnlock() - return mailbox - } - s.mailMtx.RUnlock() +// removeLiveLink removes a link from all associated forwarding indexes, this +// prevents it from being a candidate in forwarding. +func (s *Switch) removeLiveLink(link ChannelLink) { + // Remove the channel from live link indexes. + delete(s.linkIndex, link.ChanID()) + delete(s.forwardingIndex, link.ShortChanID()) - // Otherwise, we will make a new one only if the mailbox still is not - // present after the exclusive mutex is acquired. - s.mailMtx.Lock() - mailbox, ok = s.mailboxes[chanID] - if !ok { - mailbox = newMemoryMailBox() - mailbox.Start() - s.mailboxes[chanID] = mailbox - } - s.mailMtx.Unlock() - - return mailbox + // Remove the channel from channel index. + peerPub := link.Peer().PubKey() + delete(s.interfaceIndex, peerPub) } // GetLink is used to initiate the handling of the get link command. The @@ -1782,7 +1792,10 @@ func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelLink, error) { link, ok := s.linkIndex[chanID] if !ok { - return nil, ErrChannelLinkNotFound + link, ok = s.pendingLinkIndex[chanID] + if !ok { + return nil, ErrChannelLinkNotFound + } } return link, nil @@ -1817,52 +1830,68 @@ func (s *Switch) removeLink(chanID lnwire.ChannelID) error { log.Infof("Removing channel link with ChannelID(%v)", chanID) link, ok := s.linkIndex[chanID] - if !ok { - return ErrChannelLinkNotFound + if ok { + s.removeLiveLink(link) + link.Stop() + + return nil } - // Remove the channel from channel map. - delete(s.linkIndex, chanID) - delete(s.forwardingIndex, link.ShortChanID()) + link, ok = s.pendingLinkIndex[chanID] + if ok { + delete(s.pendingLinkIndex, chanID) + link.Stop() - // Remove the channel from channel index. - peerPub := link.Peer().PubKey() - delete(s.interfaceIndex, peerPub) + return nil + } - link.Stop() - - return nil + return ErrChannelLinkNotFound } // UpdateShortChanID updates the short chan ID for an existing channel. This is // required in the case of a re-org and re-confirmation or a channel, or in the // case that a link was added to the switch before its short chan ID was known. -func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID, - shortChanID lnwire.ShortChannelID) error { - +func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error { s.indexMtx.Lock() + defer s.indexMtx.Unlock() - // First, we'll extract the current link as is from the link - // index. If the link isn't even in the index, then we'll return an - // error. - link, ok := s.linkIndex[chanID] + // Locate the target link in the pending link index. If no such link + // exists, then we will ignore the request. + link, ok := s.pendingLinkIndex[chanID] if !ok { - s.indexMtx.Unlock() - return fmt.Errorf("link %v not found", chanID) } - log.Infof("Updating short_chan_id for ChannelLink(%v): old=%v, new=%v", - chanID, link.ShortChanID(), shortChanID) + oldShortChanID := link.ShortChanID() - // At this point the link is actually active, so we'll update the - // forwarding index with the next short channel ID. - s.forwardingIndex[shortChanID] = link + // Try to update the link's short channel ID, returning early if this + // update failed. + shortChanID, err := link.UpdateShortChanID() + if err != nil { + return err + } - s.indexMtx.Unlock() + // Reject any blank short channel ids. + if shortChanID == sourceHop { + return fmt.Errorf("refusing trivial short_chan_id for chan_id=%v"+ + "live link", chanID) + } - // Finally, we'll notify the link of its new short channel ID. - link.UpdateShortChanID(shortChanID) + log.Infof("Updated short_chan_id for ChannelLink(%v): old=%v, new=%v", + chanID, oldShortChanID, shortChanID) + + // Since the link was in the pending state before, we will remove it + // from the pending link index and add it to the live link index so that + // it can be available in forwarding. + delete(s.pendingLinkIndex, chanID) + s.addLiveLink(link) + + // Finally, alert the mail orchestrator to the change of short channel + // ID, and deliver any unclaimed packets to the link. + mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID) + s.mailOrchestrator.BindLiveShortChanID( + mailbox, chanID, shortChanID, + ) return nil }