diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 2c8bdeb41..b8a5c6ecf 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -2,12 +2,14 @@ package discovery import ( "bytes" + "encoding/binary" "fmt" "runtime" "sync" "sync/atomic" "time" + "github.com/boltdb/bolt" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" @@ -20,6 +22,15 @@ import ( "github.com/roasbeef/btcd/wire" ) +var ( + // messageStoreKey is a key used to create a top level bucket in + // the gossiper database, used for storing messages that are to + // be sent to peers. Currently this is used for reliably sending + // AnnounceSignatures messages, by persisting them until a send + // operation has succeeded. + messageStoreKey = []byte("message-store") +) + // networkMsg couples a routing related wire message with the peer that // originally sent it. type networkMsg struct { @@ -78,6 +89,11 @@ type Config struct { // messages to a particular peer identified by the target public key. SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error + // NotifyWhenOnline is a function that allows the gossiper to be + // notified when a certain peer comes online, allowing it to + // retry sending a peer message. + NotifyWhenOnline func(peer *btcec.PublicKey, connectedChan chan<- struct{}) + // ProofMatureDelta the number of confirmations which is needed before // exchange the channel announcement proofs. ProofMatureDelta uint32 @@ -327,6 +343,14 @@ func (d *AuthenticatedGossiper) Start() error { } d.bestHeight = height + // In case we had an AnnounceSignatures ready to be sent when the + // gossiper was last shut down, we must continue on our quest to + // deliver this message to our peer such that they can craft the + // full channel proof. 
+	if err := d.resendAnnounceSignatures(); err != nil {
+		return err
+	}
+
 	d.wg.Add(1)
 	go d.networkHandler()
 
@@ -526,6 +550,136 @@ func (d *deDupedAnnouncements) Emit() []lnwire.Message {
 	return announcements
 }
 
+// resendAnnounceSignatures will inspect the messageStore database
+// bucket for AnnounceSignatures messages that we recently tried
+// to send to a peer. If the associated channels still do not have
+// the full channel proofs assembled, we will try to resend them.
+// If we have the full proof, or the channel can no longer be found
+// in the graph, the message is deleted from the messageStore. The
+// first error encountered is returned.
+func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
+	type msgTuple struct {
+		peer  *btcec.PublicKey
+		msg   *lnwire.AnnounceSignatures
+		dbKey []byte
+	}
+
+	// Fetch all the AnnounceSignatures messages that were added
+	// to the database.
+	// TODO(halseth): database access should be abstracted
+	// behind interface.
+	var msgsResend []msgTuple
+	if err := d.cfg.DB.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(messageStoreKey)
+		if bucket == nil {
+			return nil
+		}
+
+		// Iterate over each message added to the database.
+		return bucket.ForEach(func(k, v []byte) error {
+			// The database value represents the encoded
+			// AnnounceSignatures message.
+			r := bytes.NewReader(v)
+			msg := &lnwire.AnnounceSignatures{}
+			if err := msg.Decode(r, 0); err != nil {
+				return err
+			}
+
+			// The first 33 bytes of the database key is
+			// the peer's public key. Reject shorter keys
+			// rather than panicking on the slice below.
+			if len(k) < 33 {
+				return fmt.Errorf("invalid messageStore "+
+					"key length: %d", len(k))
+			}
+			peer, err := btcec.ParsePubKey(k[:33], btcec.S256())
+			if err != nil {
+				return err
+			}
+
+			// The key slice is only valid for the life of
+			// the bolt transaction, so copy it for the
+			// deletion that happens after the transaction.
+			dbKey := make([]byte, len(k))
+			copy(dbKey, k)
+
+			// Collect the message, such that we can resend
+			// it after the database transaction is over.
+			t := msgTuple{peer, msg, dbKey}
+			msgsResend = append(msgsResend, t)
+			return nil
+		})
+	}); err != nil {
+		return err
+	}
+
+	// deleteMsg removes the message associated with the passed
+	// msgTuple from the messageStore.
+	deleteMsg := func(t msgTuple) error {
+		log.Debugf("Deleting message for chanID=%v from "+
+			"messageStore", t.msg.ChannelID)
+		if err := d.cfg.DB.Update(func(tx *bolt.Tx) error {
+			bucket := tx.Bucket(messageStoreKey)
+			if bucket == nil {
+				return fmt.Errorf("bucket " +
+					"unexpectedly did not exist")
+			}
+
+			return bucket.Delete(t.dbKey)
+		}); err != nil {
+			return fmt.Errorf("Failed deleting message "+
+				"from database: %v", err)
+		}
+		return nil
+	}
+
+	// We now iterate over these messages, resending those that we
+	// don't have the full proof for, deleting the rest.
+	for _, t := range msgsResend {
+		// Check if the full channel proof exists in our graph.
+		chanInfo, _, _, err := d.cfg.Router.GetChannelByID(
+			t.msg.ShortChannelID)
+		if err != nil {
+			// If the channel cannot be found, it is most likely
+			// a leftover message for a channel that was closed.
+			// In this case we delete it from the message store.
+			log.Warnf("unable to fetch channel info for "+
+				"chanID=%v from graph: %v. Will delete local "+
+				"proof from database",
+				t.msg.ChannelID, err)
+			if err := deleteMsg(t); err != nil {
+				return err
+			}
+			continue
+		}
+
+		// 1. If the full proof does not exist in the graph,
+		// it means that we haven't received the remote proof
+		// yet (or that we crashed before being able to assemble
+		// the full proof). Since the remote node might think they
+		// have delivered their proof to us, we will resend
+		// _our_ proof to trigger a resend on their part:
+		// they will then be able to assemble and send us the
+		// full proof.
+		if chanInfo.AuthProof == nil {
+			err := d.sendAnnSigReliably(t.msg, t.peer)
+			if err != nil {
+				return err
+			}
+			continue
+		}
+
+		// 2. If the proof does exist in the graph, we have
+		// successfully received the remote proof and assembled
+		// the full proof, so the local proof can safely be
+		// deleted from the database (deleteMsg logs this).
+		if err := deleteMsg(t); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // networkHandler is the primary goroutine that drives this service. The roles
 // of this goroutine includes answering queries related to the state of the
 // network, syncing up newly connected peers, and also periodically
@@ -1010,8 +1164,78 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
 			if routing.IsError(err, routing.ErrOutdated,
 				routing.ErrIgnored) {
 
+				// The edge will get rejected if we already
+				// added the same edge without AuthProof to the
+				// graph. If the received announcement contains
+				// a proof, we can add this proof to our edge.
+				// We can end up in this situatation in the case
+				// where we create a channel, but for some
+				// reason fail to receive the remote peer's
+				// proof, while the remote peer is able to fully
+				// assemble the proof and craft the
+				// ChannelAnnouncement.
+				// TODO(halseth): the following chunk of code
+				// should be moved into own method, indentation
+				// and readability is not exactly on point.
+				chanInfo, e1, e2, err2 := d.cfg.Router.GetChannelByID(
+					msg.ShortChannelID)
+				if err2 != nil {
+					log.Errorf("Failed fetching channel "+
+						"edge: %v", err2)
+					nMsg.err <- err2
+					return nil
+				}
+
+				// If the edge already exists in the graph, but
+				// has no proof attached, we can add that now.
+				if chanInfo.AuthProof == nil && proof != nil {
+					chanAnn, e1Ann, e2Ann :=
+						createChanAnnouncement(proof,
+							chanInfo, e1, e2)
+
+					// Validate the assembled proof.
+ err := ValidateChannelAnn(chanAnn) + if err != nil { + err := errors.Errorf("assembled"+ + "channel announcement "+ + "proof for shortChanID=%v"+ + " isn't valid: %v", + msg.ShortChannelID, err) + + log.Error(err) + nMsg.err <- err + return nil + } + err = d.cfg.Router.AddProof( + msg.ShortChannelID, proof) + if err != nil { + err := errors.Errorf("unable "+ + "add proof to "+ + "shortChanID=%v: %v", + msg.ShortChannelID, err) + log.Error(err) + nMsg.err <- err + return nil + } + announcements = append(announcements, + chanAnn) + if e1Ann != nil { + announcements = append( + announcements, e1Ann) + } + if e2Ann != nil { + announcements = append( + announcements, e2Ann) + } + + nMsg.err <- nil + return announcements + } + + // If not, this was just an outdated edge. log.Debugf("Router rejected channel edge: %v", err) + } else { log.Errorf("Router rejected channel edge: %v", err) @@ -1283,7 +1507,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l // Ensure that we know of a channel with the target channel ID // before proceeding further. - chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) + chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID( + msg.ShortChannelID) if err != nil { // TODO(andrew.shvv) this is dangerous because remote // node might rewrite the waiting proof. @@ -1320,6 +1545,72 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l return nil } + // If proof was sent by a local sub-system, then we'll + // send the announcement signature to the remote node + // so they can also reconstruct the full channel + // announcement. + if !nMsg.isRemote { + var remotePeer *btcec.PublicKey + if isFirstNode { + remotePeer = chanInfo.NodeKey2 + } else { + remotePeer = chanInfo.NodeKey1 + } + // Since the remote peer might not be online + // we'll call a method that will attempt to + // deliver the proof when it comes online. 
+ if err := d.sendAnnSigReliably(msg, remotePeer); err != nil { + err := errors.Errorf("unable to send reliably "+ + "to remote for short_chan_id=%v: %v", + shortChanID, err) + log.Error(err) + nMsg.err <- err + return nil + } + } + + // Check if we already have the full proof for this channel. + if chanInfo.AuthProof != nil { + // If we already have the fully assembled proof, then + // the peer sending us their proof has probably not + // received our local proof yet. So be kind and send + // them the full proof. + if nMsg.isRemote { + peerID := nMsg.peer.SerializeCompressed() + log.Debugf("Got AnnounceSignatures for " + + "channel with full proof.") + + d.wg.Add(1) + go func() { + defer d.wg.Done() + log.Debugf("Received half proof for "+ + "channel %v with existing "+ + "full proof. Sending full "+ + "proof to peer=%x", + msg.ChannelID, + peerID) + + chanAnn, _, _ := createChanAnnouncement( + chanInfo.AuthProof, chanInfo, e1, e2) + err := d.cfg.SendToPeer(nMsg.peer, chanAnn) + if err != nil { + log.Errorf("Failed sending "+ + "full proof to "+ + "peer=%x: %v", + peerID, err) + return + } + log.Debugf("Full proof sent to peer=%x"+ + " for chanID=%v", peerID, msg.ChannelID) + }() + } + + log.Debugf("Already have proof for channel "+ + "with chanID=%v", msg.ChannelID) + nMsg.err <- nil + return nil + } + // Check that we received the opposite proof. If so, then we're // now able to construct the full proof, and create the channel // announcement. If we didn't receive the opposite half of the @@ -1346,33 +1637,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l return nil } - // If proof was sent by a local sub-system, then we'll - // send the announcement signature to the remote node - // so they can also reconstruct the full channel - // announcement. - if !nMsg.isRemote { - // Check that first node of the channel info - // corresponds to us. 
- var remotePeer *btcec.PublicKey - if isFirstNode { - remotePeer = chanInfo.NodeKey2 - } else { - remotePeer = chanInfo.NodeKey1 - } - - err := d.cfg.SendToPeer(remotePeer, msg) - if err != nil { - log.Errorf("unable to send "+ - "announcement message to peer: %x", - remotePeer.SerializeCompressed()) - } - - log.Infof("Sent channel announcement proof "+ - "for short_chan_id=%v to remote peer: "+ - "%x", shortChanID, - remotePeer.SerializeCompressed()) - } - log.Infof("1/2 of channel ann proof received for "+ "short_chan_id=%v, waiting for other half", shortChanID) @@ -1381,9 +1645,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l return nil } - // If we now have both halves of the channel announcement - // proof, then we'll reconstruct the initial announcement so we - // can validate it shortly below. + // We now have both halves of the channel announcement proof, + // then we'll reconstruct the initial announcement so we can + // validate it shortly below. var dbProof channeldb.ChannelAuthProof if isFirstNode { dbProof.NodeSig1 = msg.NodeSignature @@ -1450,26 +1714,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l announcements = append(announcements, e2Ann) } - // If this a local announcement, then we'll send it to the - // remote side so they can reconstruct the full channel - // announcement proof. 
-		if !nMsg.isRemote {
-			var remotePeer *btcec.PublicKey
-			if isFirstNode {
-				remotePeer = chanInfo.NodeKey2
-			} else {
-				remotePeer = chanInfo.NodeKey1
-			}
-
-			log.Debugf("Sending local AnnounceSignatures message "+
-				"to peer(%x)", remotePeer.SerializeCompressed())
-			if err = d.cfg.SendToPeer(remotePeer, msg); err != nil {
-				log.Errorf("unable to send announcement "+
-					"message to peer: %x",
-					remotePeer.SerializeCompressed())
-			}
-		}
-
 		nMsg.err <- nil
 		return announcements
 
@@ -1479,6 +1723,90 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
 	}
 }
 
+// sendAnnSigReliably will try to send the provided local AnnounceSignatures
+// to the remote peer, waiting for it to come online if necessary. This
+// method returns after adding the message to persistent storage and
+// launching the goroutine that performs the actual send attempts.
+func (d *AuthenticatedGossiper) sendAnnSigReliably(
+	msg *lnwire.AnnounceSignatures, remotePeer *btcec.PublicKey) error {
+
+	// We first add this message to the database, such that in case
+	// we do not succeed in sending it to the peer, we'll fetch it
+	// from the DB next time we start, and retry. We use the peer ID
+	// + shortChannelID as key, as there possibly is more than one
+	// channel opening in progress to the same peer.
+	peerPub := remotePeer.SerializeCompressed()
+	var key [41]byte
+	copy(key[:33], peerPub)
+	binary.BigEndian.PutUint64(key[33:], msg.ShortChannelID.ToUint64())
+
+	err := d.cfg.DB.Update(func(tx *bolt.Tx) error {
+		bucket, err := tx.CreateBucketIfNotExists(messageStoreKey)
+		if err != nil {
+			return err
+		}
+
+		// Encode the AnnounceSignatures message.
+		var b bytes.Buffer
+		if err := msg.Encode(&b, 0); err != nil {
+			return err
+		}
+
+		// Add the encoded message to the database using the peer
+		// + shortChanID as key.
+		return bucket.Put(key[:], b.Bytes())
+	})
+	if err != nil {
+		return err
+	}
+
+	// We have succeeded adding the message to the database. We now launch
+	// a goroutine that will keep on trying sending the message to the
+	// remote peer until it succeeds, or the gossiper shuts down. Note that
+	// the message is not removed from the database here; deletion of
+	// stored messages is done by resendAnnounceSignatures.
+	d.wg.Add(1)
+	go func() {
+		defer d.wg.Done()
+		for {
+			log.Debugf("Sending AnnounceSignatures for channel "+
+				"%v to remote peer %x", msg.ChannelID, peerPub)
+			err := d.cfg.SendToPeer(remotePeer, msg)
+			if err == nil {
+				// Sending succeeded, we can
+				// continue the flow.
+				break
+			}
+
+			log.Errorf("unable to send AnnounceSignatures message "+
+				"to peer(%x): %v. Will retry when online.",
+				peerPub, err)
+
+			connected := make(chan struct{})
+			d.cfg.NotifyWhenOnline(remotePeer, connected)
+
+			select {
+			case <-connected:
+				// Retry sending.
+				log.Infof("peer %x reconnected. Retry sending"+
+					" AnnounceSignatures.", peerPub)
+			case <-d.quit:
+				log.Infof("Gossiper shutting down, did not send" +
+					" AnnounceSignatures.")
+				return
+			}
+		}
+
+		log.Infof("Sent channel announcement proof to remote peer: %x",
+			peerPub)
+	}()
+
+	// This method returns after the message has been added to the database,
+	// such that the caller doesn't have to wait until the message is
+	// actually delivered.
+	return nil
+}
+
 // updateChannel creates a new fully signed update for the channel, and updates
 // the underlying graph with the new state.
 func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,