Merge pull request from wpaulino/send-channel-update-reliably

discovery/gossiper: reliably send channel update msg to remote peer
This commit is contained in:
Olaoluwa Osuntokun 2019-02-19 21:09:04 -08:00 committed by GitHub
commit 2bf22617d4
12 changed files with 2127 additions and 729 deletions

@ -90,6 +90,13 @@ var (
number: 7,
migration: migrateOptionalChannelCloseSummaryFields,
},
{
// The DB version that changes the gossiper's message
// store keys to account for the message's type and
// ShortChannelID.
number: 8,
migration: migrateGossipMessageStoreKeys,
},
}
// Big endian is the preferred byte order, due to cursor scans over

@ -50,7 +50,7 @@ func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB),
if err == nil && shouldFail {
t.Fatal("error wasn't received on migration stage")
} else if err != nil && !shouldFail {
t.Fatal("error was received on migration stage")
t.Fatalf("error was received on migration stage: %v", err)
}
// afterMigration usually used for checking the database state and

@ -7,6 +7,7 @@ import (
"fmt"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/lnwire"
)
// migrateNodeAndEdgeUpdateIndex is a migration function that will update the
@ -610,3 +611,75 @@ func migrateOptionalChannelCloseSummaryFields(tx *bbolt.Tx) error {
return nil
}
var messageStoreBucket = []byte("message-store")
// migrateGossipMessageStoreKeys migrates the key format for gossip messages
// found in the message store to a new one that takes into consideration the
// type of the message being stored.
func migrateGossipMessageStoreKeys(tx *bbolt.Tx) error {
// We'll start by retrieving the bucket in which these messages are
// stored within. If there isn't one, there's nothing left for us to do
// so we can avoid the migration.
messageStore := tx.Bucket(messageStoreBucket)
if messageStore == nil {
return nil
}
log.Info("Migrating to the gossip message store new key format")
// Otherwise we'll proceed with the migration. We'll start by coalescing
// all the current messages within the store, which are indexed by the
// public key of the peer they should be sent to, followed by the short
// channel ID of the channel the message belongs to. We should only
// expect to find channel announcement signatures, as that was the only
// supported message type previously.
msgs := make(map[[33 + 8]byte]*lnwire.AnnounceSignatures)
err := messageStore.ForEach(func(k, v []byte) error {
var msgKey [33 + 8]byte
copy(msgKey[:], k)
msg := &lnwire.AnnounceSignatures{}
if err := msg.Decode(bytes.NewReader(v), 0); err != nil {
return err
}
msgs[msgKey] = msg
return nil
})
if err != nil {
return err
}
// Then, we'll go over all of our messages, remove their previous entry,
// and add another with the new key format. Once we've done this for
// every message, we can consider the migration complete.
for oldMsgKey, msg := range msgs {
if err := messageStore.Delete(oldMsgKey[:]); err != nil {
return err
}
// Construct the new key under which we'll find this message in
// the store. It'll be the same as the old one, but we'll also
// include the message type.
var msgType [2]byte
binary.BigEndian.PutUint16(msgType[:], uint16(msg.MsgType()))
newMsgKey := append(oldMsgKey[:], msgType[:]...)
// Serialize the message with its wire encoding.
var b bytes.Buffer
if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil {
return err
}
if err := messageStore.Put(newMsgKey, b.Bytes()); err != nil {
return err
}
}
log.Info("Migration to the gossip message store new key format complete!")
return nil
}
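To make the key change concrete, the following standalone sketch (not part of this change; all names are illustrative) builds the old 41-byte key and the new 43-byte key side by side.

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/lightningnetwork/lnd/lnwire"
)

func main() {
	// The peer's compressed public key and the channel the message refers to.
	var peerPubKey [33]byte
	chanID := lnwire.ShortChannelID{BlockHeight: 10}

	// Old key format: peerPubKey || shortChanID (33 + 8 = 41 bytes).
	var oldKey [41]byte
	copy(oldKey[:33], peerPubKey[:])
	binary.BigEndian.PutUint64(oldKey[33:41], chanID.ToUint64())

	// New key format: peerPubKey || shortChanID || msgType (33 + 8 + 2 = 43
	// bytes), which lets multiple message types coexist for the same channel.
	var newKey [43]byte
	copy(newKey[:41], oldKey[:])
	binary.BigEndian.PutUint16(newKey[41:43], uint16(lnwire.MsgAnnounceSignatures))

	fmt.Printf("old key: %d bytes, new key: %d bytes\n", len(oldKey), len(newKey))
}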

@ -12,6 +12,7 @@ import (
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/lnwire"
)
// TestPaymentStatusesMigration checks that already completed payments will have
@ -468,3 +469,98 @@ func TestMigrateOptionalChannelCloseSummaryFields(t *testing.T) {
false)
}
}
// TestMigrateGossipMessageStoreKeys ensures that the migration to the new
// gossip message store key format is successful/unsuccessful under various
// scenarios.
func TestMigrateGossipMessageStoreKeys(t *testing.T) {
t.Parallel()
// Construct the message which we'll use to test the migration, along
// with its old and new key formats.
shortChanID := lnwire.ShortChannelID{BlockHeight: 10}
msg := &lnwire.AnnounceSignatures{ShortChannelID: shortChanID}
var oldMsgKey [33 + 8]byte
copy(oldMsgKey[:33], pubKey.SerializeCompressed())
binary.BigEndian.PutUint64(oldMsgKey[33:41], shortChanID.ToUint64())
var newMsgKey [33 + 8 + 2]byte
copy(newMsgKey[:41], oldMsgKey[:])
binary.BigEndian.PutUint16(newMsgKey[41:43], uint16(msg.MsgType()))
// Before the migration, we'll create the bucket where the messages
// should live and insert them.
beforeMigration := func(db *DB) {
var b bytes.Buffer
if err := msg.Encode(&b, 0); err != nil {
t.Fatalf("unable to serialize message: %v", err)
}
err := db.Update(func(tx *bbolt.Tx) error {
messageStore, err := tx.CreateBucketIfNotExists(
messageStoreBucket,
)
if err != nil {
return err
}
return messageStore.Put(oldMsgKey[:], b.Bytes())
})
if err != nil {
t.Fatal(err)
}
}
// After the migration, we'll make sure that:
// 1. We cannot find the message under its old key.
// 2. We can find the message under its new key.
// 3. The message matches the original.
afterMigration := func(db *DB) {
meta, err := db.FetchMeta(nil)
if err != nil {
t.Fatalf("unable to fetch db version: %v", err)
}
if meta.DbVersionNumber != 1 {
t.Fatalf("migration should have succeeded but didn't")
}
var rawMsg []byte
err = db.View(func(tx *bbolt.Tx) error {
messageStore := tx.Bucket(messageStoreBucket)
if messageStore == nil {
return errors.New("message store bucket not " +
"found")
}
rawMsg = messageStore.Get(oldMsgKey[:])
if rawMsg != nil {
t.Fatal("expected to not find message under " +
"old key, but did")
}
rawMsg = messageStore.Get(newMsgKey[:])
if rawMsg == nil {
return fmt.Errorf("expected to find message " +
"under new key, but didn't")
}
return nil
})
if err != nil {
t.Fatal(err)
}
gotMsg, err := lnwire.ReadMessage(bytes.NewReader(rawMsg), 0)
if err != nil {
t.Fatalf("unable to deserialize raw message: %v", err)
}
if !reflect.DeepEqual(msg, gotMsg) {
t.Fatalf("expected message: %v\ngot message: %v",
spew.Sdump(msg), spew.Sdump(gotMsg))
}
}
applyMigration(
t, beforeMigration, afterMigration,
migrateGossipMessageStoreKeys, false,
)
}

@ -2,7 +2,6 @@ package discovery
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"runtime"
@ -13,7 +12,6 @@ import (
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
@ -25,13 +23,6 @@ import (
)
var (
// messageStoreKey is a key used to create a top level bucket in
// the gossiper database, used for storing messages that are to
// be sent to peers. Currently this is used for reliably sending
// AnnounceSignatures messages, by persisting them until a send
// operation has succeeded.
messageStoreKey = []byte("message-store")
// ErrGossiperShuttingDown is an error that is returned if the gossiper
// is in the process of being shut down.
ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
@ -104,22 +95,20 @@ type Config struct {
Broadcast func(skips map[routing.Vertex]struct{},
msg ...lnwire.Message) error
// SendToPeer is a function which allows the service to send a set of
// messages to a particular peer identified by the target public key.
SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error
// FindPeer returns the actively registered peer for a given remote
// public key. An error is returned if the peer was not found or a
// shutdown has been requested.
FindPeer func(identityKey *btcec.PublicKey) (lnpeer.Peer, error)
// NotifyWhenOnline is a function that allows the gossiper to be
// notified when a certain peer comes online, allowing it to
// retry sending a peer message.
//
// NOTE: The peerChan channel must be buffered.
//
// TODO(wilmer): use [33]byte to avoid unnecessary serializations.
NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer)
// NotifyWhenOffline is a function that allows the gossiper to be
// notified when a certain peer disconnects, allowing it to request a
// notification for when it reconnects.
NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
// ProofMatureDelta is the number of confirmations needed before
// exchanging the channel announcement proofs.
ProofMatureDelta uint32
@ -133,9 +122,18 @@ type Config struct {
// should check if we need re-broadcast any of our personal channels.
RetransmitDelay time.Duration
// DB is a global boltdb instance which is needed to pass it in waiting
// proof storage to make waiting proofs persistent.
DB *channeldb.DB
// WaitingProofStore is a persistent storage of partial channel proof
// announcement messages. We use it to buffer half of the material
// needed to reconstruct a full authenticated channel announcement.
// Once we receive the other half of the channel proof, we'll be able to
// properly validate it and re-broadcast it out to the network.
//
// TODO(wilmer): make interface to prevent channeldb dependency.
WaitingProofStore *channeldb.WaitingProofStore
// MessageStore is a persistent storage of gossip messages which we will
// use to determine which messages need to be resent for a given peer.
MessageStore GossipMessageStore
// AnnSigner is an instance of the MessageSigner interface which will
// be used to manually sign any outgoing channel updates. The signer
@ -193,13 +191,6 @@ type AuthenticatedGossiper struct {
prematureChannelUpdates map[uint64][]*networkMsg
pChanUpdMtx sync.Mutex
// waitingProofs is a persistent storage of partial channel proof
// announcement messages. We use it to buffer half of the material
// needed to reconstruct a full authenticated channel announcement.
// Once we receive the other half the channel proof, we'll be able to
// properly validate it and re-broadcast it out to the network.
waitingProofs *channeldb.WaitingProofStore
// networkMsgs is a channel that carries new network broadcasted
// message from outside the gossiper service to be processed by the
// networkHandler.
@ -229,18 +220,17 @@ type AuthenticatedGossiper struct {
syncerMtx sync.RWMutex
peerSyncers map[routing.Vertex]*gossipSyncer
// reliableSender is a subsystem responsible for handling reliable
// message send requests to peers.
reliableSender *reliableSender
sync.Mutex
}
// New creates a new AuthenticatedGossiper instance, initialized with the
// passed configuration parameters.
func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) {
storage, err := channeldb.NewWaitingProofStore(cfg.DB)
if err != nil {
return nil, err
}
return &AuthenticatedGossiper{
func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
gossiper := &AuthenticatedGossiper{
selfKey: selfKey,
cfg: &cfg,
networkMsgs: make(chan *networkMsg),
@ -248,11 +238,19 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) {
chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
prematureAnnouncements: make(map[uint32][]*networkMsg),
prematureChannelUpdates: make(map[uint64][]*networkMsg),
waitingProofs: storage,
channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}),
peerSyncers: make(map[routing.Vertex]*gossipSyncer),
}, nil
}
gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
NotifyWhenOnline: cfg.NotifyWhenOnline,
NotifyWhenOffline: cfg.NotifyWhenOffline,
MessageStore: cfg.MessageStore,
IsMsgStale: gossiper.isMsgStale,
})
return gossiper
}
// SynchronizeNode sends a message to the service indicating it should
@ -411,11 +409,10 @@ func (d *AuthenticatedGossiper) Start() error {
}
d.bestHeight = height
// In case we had an AnnounceSignatures ready to be sent when the
// gossiper was last shut down, we must continue on our quest to
// deliver this message to our peer such that they can craft the
// full channel proof.
if err := d.resendAnnounceSignatures(); err != nil {
// Start the reliable sender. In case we had any pending messages ready
// to be sent when the gossiper was last shut down, we must continue on
// our quest to deliver them to their respective peers.
if err := d.reliableSender.Start(); err != nil {
return err
}
@ -443,6 +440,10 @@ func (d *AuthenticatedGossiper) Stop() {
close(d.quit)
d.wg.Wait()
// We'll stop our reliable sender after all of the gossiper's goroutines
// have exited to ensure nothing can cause it to continue executing.
d.reliableSender.Stop()
}
// TODO(roasbeef): need method to get current gossip timestamp?
@ -808,138 +809,6 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders {
return msgs
}
// resendAnnounceSignatures will inspect the messageStore database bucket for
// AnnounceSignatures messages that we recently tried to send to a peer. If the
// associated channels still not have the full channel proofs assembled, we
// will try to resend them. If we have the full proof, we can safely delete the
// message from the messageStore.
func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
type msgTuple struct {
peer *btcec.PublicKey
msg *lnwire.AnnounceSignatures
dbKey []byte
}
// Fetch all the AnnounceSignatures messages that was added to the
// database.
//
// TODO(halseth): database access should be abstracted
// behind interface.
var msgsResend []msgTuple
if err := d.cfg.DB.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(messageStoreKey)
if bucket == nil {
return nil
}
if err := bucket.ForEach(func(k, v []byte) error {
// The database value represents the encoded
// AnnounceSignatures message.
r := bytes.NewReader(v)
msg := &lnwire.AnnounceSignatures{}
if err := msg.Decode(r, 0); err != nil {
return err
}
// The first 33 bytes of the database key is the peer's
// public key.
peer, err := btcec.ParsePubKey(k[:33], btcec.S256())
if err != nil {
return err
}
// Make a copy of the database key corresponding to
// these AnnounceSignatures.
dbKey := make([]byte, len(k))
copy(dbKey, k)
t := msgTuple{peer, msg, dbKey}
// Add the message to the slice, such that we can
// resend it after the database transaction is over.
msgsResend = append(msgsResend, t)
return nil
}); err != nil {
return err
}
return nil
}); err != nil {
return err
}
// deleteMsg removes the message associated with the passed msgTuple
// from the messageStore.
deleteMsg := func(t msgTuple) error {
log.Debugf("Deleting message for chanID=%v from "+
"messageStore", t.msg.ChannelID)
if err := d.cfg.DB.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(messageStoreKey)
if bucket == nil {
return fmt.Errorf("bucket " +
"unexpectedly did not exist")
}
return bucket.Delete(t.dbKey[:])
}); err != nil {
return fmt.Errorf("Failed deleting message "+
"from database: %v", err)
}
return nil
}
// We now iterate over these messages, resending those that we don't
// have the full proof for, deleting the rest.
for _, t := range msgsResend {
// Check if the full channel proof exists in our graph.
chanInfo, _, _, err := d.cfg.Router.GetChannelByID(
t.msg.ShortChannelID)
if err != nil {
// If the channel cannot be found, it is most likely a
// leftover message for a channel that was closed. In
// this case we delete it from the message store.
log.Warnf("unable to fetch channel info for "+
"chanID=%v from graph: %v. Will delete local"+
"proof from database",
t.msg.ChannelID, err)
if err := deleteMsg(t); err != nil {
return err
}
continue
}
// 1. If the full proof does not exist in the graph, it means
// that we haven't received the remote proof yet (or that we
// crashed before able to assemble the full proof). Since the
// remote node might think they have delivered their proof to
// us, we will resend _our_ proof to trigger a resend on their
// part: they will then be able to assemble and send us the
// full proof.
if chanInfo.AuthProof == nil {
err := d.sendAnnSigReliably(t.msg, t.peer)
if err != nil {
return err
}
continue
}
// 2. If the proof does exist in the graph, we have
// successfully received the remote proof and assembled the
// full proof. In this case we can safely delete the local
// proof from the database. In case the remote hasn't been able
// to assemble the full proof yet (maybe because of a crash),
// we will send them the full proof if we notice that they
// retry sending their half proof.
if chanInfo.AuthProof != nil {
log.Debugf("Deleting message for chanID=%v from "+
"messageStore", t.msg.ChannelID)
if err := deleteMsg(t); err != nil {
return err
}
}
}
return nil
}
// findGossipSyncer is a utility method used by the gossiper to locate the
// gossip syncer for an inbound message so we can properly dispatch the
// incoming message. If a gossip syncer isn't found, then one will be created
@ -2053,30 +1922,26 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// so we'll try sending the update directly to the remote peer.
if !nMsg.isRemote && chanInfo.AuthProof == nil {
// Get our peer's public key.
var remotePub *btcec.PublicKey
var remotePubKey [33]byte
switch {
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0:
remotePub, _ = chanInfo.NodeKey2()
remotePubKey = chanInfo.NodeKey2Bytes
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1:
remotePub, _ = chanInfo.NodeKey1()
remotePubKey = chanInfo.NodeKey1Bytes
}
sPeer, err := d.cfg.FindPeer(remotePub)
// Now, we'll attempt to send the channel update message
// reliably to the remote peer in the background, so
// that we don't block if the peer happens to be offline
// at the moment.
err := d.reliableSender.sendMessage(msg, remotePubKey)
if err != nil {
log.Errorf("unable to send channel update -- "+
"could not find peer %x: %v",
remotePub.SerializeCompressed(),
err)
} else {
// Send ChannelUpdate directly to remotePeer.
// TODO(halseth): make reliable send?
err = sPeer.SendMessage(false, msg)
if err != nil {
log.Errorf("unable to send channel "+
"update message to peer %x: %v",
remotePub.SerializeCompressed(),
err)
}
err := fmt.Errorf("unable to reliably send %v "+
"for channel=%v to peer=%x: %v",
msg.MsgType(), msg.ShortChannelID,
remotePubKey, err)
nMsg.err <- err
return nil
}
}
@ -2146,7 +2011,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// TODO(andrew.shvv) this is dangerous because remote
// node might rewrite the waiting proof.
proof := channeldb.NewWaitingProof(nMsg.isRemote, msg)
if err := d.waitingProofs.Add(proof); err != nil {
err := d.cfg.WaitingProofStore.Add(proof)
if err != nil {
err := fmt.Errorf("unable to store "+
"the proof for short_chan_id=%v: %v",
shortChanID, err)
@ -2182,21 +2048,21 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// so they can also reconstruct the full channel
// announcement.
if !nMsg.isRemote {
var remotePeer *btcec.PublicKey
var remotePubKey [33]byte
if isFirstNode {
remotePeer, _ = chanInfo.NodeKey2()
remotePubKey = chanInfo.NodeKey2Bytes
} else {
remotePeer, _ = chanInfo.NodeKey1()
remotePubKey = chanInfo.NodeKey1Bytes
}
// Since the remote peer might not be online
// we'll call a method that will attempt to
// deliver the proof when it comes online.
err := d.sendAnnSigReliably(msg, remotePeer)
err := d.reliableSender.sendMessage(msg, remotePubKey)
if err != nil {
err := fmt.Errorf("unable to send reliably "+
"to remote for short_chan_id=%v: %v",
shortChanID, err)
log.Error(err)
err := fmt.Errorf("unable to reliably send %v "+
"for channel=%v to peer=%x: %v",
msg.MsgType(), msg.ShortChannelID,
remotePubKey, err)
nMsg.err <- err
return nil
}
@ -2260,7 +2126,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// proof than we should store it this one, and wait for
// opposite to be received.
proof := channeldb.NewWaitingProof(nMsg.isRemote, msg)
oppositeProof, err := d.waitingProofs.Get(proof.OppositeKey())
oppositeProof, err := d.cfg.WaitingProofStore.Get(
proof.OppositeKey(),
)
if err != nil && err != channeldb.ErrWaitingProofNotFound {
err := fmt.Errorf("unable to get "+
"the opposite proof for short_chan_id=%v: %v",
@ -2271,7 +2139,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
}
if err == channeldb.ErrWaitingProofNotFound {
if err := d.waitingProofs.Add(proof); err != nil {
err := d.cfg.WaitingProofStore.Add(proof)
if err != nil {
err := fmt.Errorf("unable to store "+
"the proof for short_chan_id=%v: %v",
shortChanID, err)
@ -2340,7 +2209,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil
}
err = d.waitingProofs.Remove(proof.OppositeKey())
err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
if err != nil {
err := fmt.Errorf("unable remove opposite proof "+
"for the channel with chanID=%v: %v",
@ -2425,91 +2294,49 @@ func (d *AuthenticatedGossiper) fetchNodeAnn(
return node.NodeAnnouncement(true)
}
// sendAnnSigReliably will try to send the provided local AnnounceSignatures
// to the remote peer, waiting for it to come online if necessary. This
// method returns after adding the message to persistent storage, such
// that the caller knows that the message will be delivered at one point.
func (d *AuthenticatedGossiper) sendAnnSigReliably(
msg *lnwire.AnnounceSignatures, remotePeer *btcec.PublicKey) error {
// isMsgStale determines whether a message retrieved from the backing
// MessageStore is seen as stale by the current graph.
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
switch msg := msg.(type) {
case *lnwire.AnnounceSignatures:
chanInfo, _, _, err := d.cfg.Router.GetChannelByID(
msg.ShortChannelID,
)
// We first add this message to the database, such that in case
// we do not succeed in sending it to the peer, we'll fetch it
// from the DB next time we start, and retry. We use the peer ID
// + shortChannelID as key, as there possibly is more than one
// channel opening in progress to the same peer.
var key [41]byte
copy(key[:33], remotePeer.SerializeCompressed())
binary.BigEndian.PutUint64(key[33:], msg.ShortChannelID.ToUint64())
err := d.cfg.DB.Update(func(tx *bbolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(messageStoreKey)
// If the channel cannot be found, it is most likely a leftover
// message for a channel that was closed, so we can consider it
// stale.
if err == channeldb.ErrEdgeNotFound {
return true
}
if err != nil {
return err
log.Debugf("Unable to retrieve channel=%v from graph: "+
"%v", err)
return false
}
// Encode the AnnounceSignatures message.
var b bytes.Buffer
if err := msg.Encode(&b, 0); err != nil {
return err
// If the proof exists in the graph, then we have successfully
// received the remote proof and assembled the full proof, so we
// can safely delete the local proof from the database.
return chanInfo.AuthProof != nil
case *lnwire.ChannelUpdate:
// The MessageStore will always store the latest ChannelUpdate
// as it is not aware of its timestamp (by design), so it will
// never be stale. We should still however check if the channel
// is part of our graph. If it's not, we can mark it as stale.
_, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
if err != nil && err != channeldb.ErrEdgeNotFound {
log.Debugf("Unable to retrieve channel=%v from graph: "+
"%v", err)
}
return err == channeldb.ErrEdgeNotFound
// Add the encoded message to the database using the peer
// + shortChanID as key.
return bucket.Put(key[:], b.Bytes())
})
if err != nil {
return err
default:
// We'll make sure to not mark any unsupported messages as stale
// to ensure they are not removed.
return false
}
// We have succeeded adding the message to the database. We now launch
// a goroutine that will keep on trying sending the message to the
// remote peer until it succeeds, or the gossiper shuts down. In case
// of success, the message will be removed from the database.
d.wg.Add(1)
go func() {
defer d.wg.Done()
for {
log.Debugf("Sending AnnounceSignatures for channel "+
"%v to remote peer %x", msg.ChannelID,
remotePeer.SerializeCompressed())
err := d.cfg.SendToPeer(remotePeer, msg)
if err == nil {
// Sending succeeded, we can
// continue the flow.
break
}
log.Errorf("unable to send AnnounceSignatures message "+
"to peer(%x): %v. Will retry when online.",
remotePeer.SerializeCompressed(), err)
peerChan := make(chan lnpeer.Peer, 1)
d.cfg.NotifyWhenOnline(remotePeer, peerChan)
select {
case <-peerChan:
// Retry sending.
log.Infof("Peer %x reconnected. Retry sending"+
" AnnounceSignatures.",
remotePeer.SerializeCompressed())
case <-d.quit:
log.Infof("Gossiper shutting down, did not " +
"send AnnounceSignatures.")
return
}
}
log.Infof("Sent channel announcement proof to remote peer: %x",
remotePeer.SerializeCompressed())
}()
// This method returns after the message has been added to the database,
// such that the caller don't have to wait until the message is actually
// delivered, but can be assured that it will be delivered eventually
// when this method returns.
return nil
}
// updateChannel creates a new fully signed update for the channel, and updates
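Since discovery.New no longer accepts the DB or opens the waiting proof store itself, callers are now expected to construct the stores and hand them in through the Config. Below is a hypothetical wiring sketch, not part of this change; the helper name and its surroundings are assumptions.

package lndsketch

import (
	"github.com/btcsuite/btcd/btcec"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/discovery"
)

// newGossiper is a hypothetical helper illustrating the new construction flow.
func newGossiper(db *channeldb.DB, selfKey *btcec.PublicKey,
	cfg discovery.Config) (*discovery.AuthenticatedGossiper, error) {

	// The caller now creates the gossip message store used for reliable
	// sends...
	msgStore, err := discovery.NewMessageStore(db)
	if err != nil {
		return nil, err
	}

	// ...as well as the waiting proof store, which was previously created
	// inside discovery.New.
	waitingProofStore, err := channeldb.NewWaitingProofStore(db)
	if err != nil {
		return nil, err
	}

	cfg.MessageStore = msgStore
	cfg.WaitingProofStore = waitingProofStore

	// New no longer returns an error since it performs no database access
	// of its own.
	return discovery.New(cfg, selfKey), nil
}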

File diff suppressed because it is too large

discovery/message_store.go (new file, 294 lines)

@ -0,0 +1,294 @@
package discovery
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
// messageStoreBucket is a key used to create a top level bucket in the
// gossiper database, used for storing messages that are to be sent to
// peers. Upon restarts, these messages will be read and resent to their
// respective peers.
//
// maps:
// pubKey (33 bytes) + msgShortChanID (8 bytes) + msgType (2 bytes) -> msg
messageStoreBucket = []byte("message-store")
// ErrUnsupportedMessage is an error returned when we attempt to add a
// message to the store that is not supported.
ErrUnsupportedMessage = errors.New("unsupported message type")
// ErrCorruptedMessageStore indicates that the on-disk bucketing
// structure has been altered since the gossip message store instance
// was initialized.
ErrCorruptedMessageStore = errors.New("gossip message store has been " +
"corrupted")
)
// GossipMessageStore is a store responsible for storing gossip messages which
// we should reliably send to our peers.
type GossipMessageStore interface {
// AddMessage adds a message to the store for this peer.
AddMessage(lnwire.Message, [33]byte) error
// DeleteMessage deletes a message from the store for this peer.
DeleteMessage(lnwire.Message, [33]byte) error
// Messages returns the total set of messages that exist within the
// store for all peers.
Messages() (map[[33]byte][]lnwire.Message, error)
// Peers returns the public key of all peers with messages within the
// store.
Peers() (map[[33]byte]struct{}, error)
// MessagesForPeer returns the set of messages that exists within the
// store for the given peer.
MessagesForPeer([33]byte) ([]lnwire.Message, error)
}
// MessageStore is an implementation of the GossipMessageStore interface backed
// by a channeldb instance. By design, this store will only keep the latest
// version of a message (like in the case of multiple ChannelUpdates) for a
// channel with a peer.
type MessageStore struct {
db *channeldb.DB
}
// A compile-time assertion to ensure MessageStore implements the
// GossipMessageStore interface.
var _ GossipMessageStore = (*MessageStore)(nil)
// NewMessageStore creates a new message store backed by a channeldb instance.
func NewMessageStore(db *channeldb.DB) (*MessageStore, error) {
err := db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(messageStoreBucket)
return err
})
if err != nil {
return nil, fmt.Errorf("unable to create required buckets: %v",
err)
}
return &MessageStore{db}, nil
}
// msgShortChanID retrieves the short channel ID of the message.
func msgShortChanID(msg lnwire.Message) (lnwire.ShortChannelID, error) {
var shortChanID lnwire.ShortChannelID
switch msg := msg.(type) {
case *lnwire.AnnounceSignatures:
shortChanID = msg.ShortChannelID
case *lnwire.ChannelUpdate:
shortChanID = msg.ShortChannelID
default:
return shortChanID, ErrUnsupportedMessage
}
return shortChanID, nil
}
// messageStoreKey constructs the database key for the message to be stored.
func messageStoreKey(msg lnwire.Message, peerPubKey [33]byte) ([]byte, error) {
shortChanID, err := msgShortChanID(msg)
if err != nil {
return nil, err
}
var k [33 + 8 + 2]byte
copy(k[:33], peerPubKey[:])
binary.BigEndian.PutUint64(k[33:41], shortChanID.ToUint64())
binary.BigEndian.PutUint16(k[41:43], uint16(msg.MsgType()))
return k[:], nil
}
// AddMessage adds a message to the store for this peer.
func (s *MessageStore) AddMessage(msg lnwire.Message, peerPubKey [33]byte) error {
// Construct the key under which we'll find this message in the store.
msgKey, err := messageStoreKey(msg, peerPubKey)
if err != nil {
return err
}
// Serialize the message with its wire encoding.
var b bytes.Buffer
if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil {
return err
}
return s.db.Batch(func(tx *bbolt.Tx) error {
messageStore := tx.Bucket(messageStoreBucket)
if messageStore == nil {
return ErrCorruptedMessageStore
}
return messageStore.Put(msgKey, b.Bytes())
})
}
// DeleteMessage deletes a message from the store for this peer.
func (s *MessageStore) DeleteMessage(msg lnwire.Message,
peerPubKey [33]byte) error {
// Construct the key under which we'll find this message in the
// store.
msgKey, err := messageStoreKey(msg, peerPubKey)
if err != nil {
return err
}
return s.db.Batch(func(tx *bbolt.Tx) error {
messageStore := tx.Bucket(messageStoreBucket)
if messageStore == nil {
return ErrCorruptedMessageStore
}
// In the event that we're attempting to delete a ChannelUpdate
// from the store, we'll make sure that we're actually deleting
// the correct one as it can be overwritten.
if msg, ok := msg.(*lnwire.ChannelUpdate); ok {
// Deleting a value from a bucket that doesn't exist
// acts as a NOP, so we'll return if a message doesn't
// exist under this key.
v := messageStore.Get(msgKey)
if v == nil {
return nil
}
dbMsg, err := lnwire.ReadMessage(bytes.NewReader(v), 0)
if err != nil {
return err
}
// If the timestamps don't match, then the update stored
// should be the latest one, so we'll avoid deleting it.
if msg.Timestamp != dbMsg.(*lnwire.ChannelUpdate).Timestamp {
return nil
}
}
return messageStore.Delete(msgKey)
})
}
// readMessage reads a message from its serialized form and ensures it's
// supported by the current version of the message store.
func readMessage(msgBytes []byte) (lnwire.Message, error) {
msg, err := lnwire.ReadMessage(bytes.NewReader(msgBytes), 0)
if err != nil {
return nil, err
}
// Check if the message is supported by the store. We can reuse the
// check for ShortChannelID, as it's a requirement for all stored messages.
if _, err := msgShortChanID(msg); err != nil {
return nil, err
}
return msg, nil
}
// Messages returns the total set of messages that exist within the store for
// all peers.
func (s *MessageStore) Messages() (map[[33]byte][]lnwire.Message, error) {
msgs := make(map[[33]byte][]lnwire.Message)
err := s.db.View(func(tx *bbolt.Tx) error {
messageStore := tx.Bucket(messageStoreBucket)
if messageStore == nil {
return ErrCorruptedMessageStore
}
return messageStore.ForEach(func(k, v []byte) error {
var pubKey [33]byte
copy(pubKey[:], k[:33])
// Deserialize the message from its raw bytes and filter
// out any which are not currently supported by the
// store.
msg, err := readMessage(v)
if err == ErrUnsupportedMessage {
return nil
}
if err != nil {
return err
}
msgs[pubKey] = append(msgs[pubKey], msg)
return nil
})
})
if err != nil {
return nil, err
}
return msgs, nil
}
// MessagesForPeer returns the set of messages that exists within the store for
// the given peer.
func (s *MessageStore) MessagesForPeer(
peerPubKey [33]byte) ([]lnwire.Message, error) {
var msgs []lnwire.Message
err := s.db.View(func(tx *bbolt.Tx) error {
messageStore := tx.Bucket(messageStoreBucket)
if messageStore == nil {
return ErrCorruptedMessageStore
}
c := messageStore.Cursor()
k, v := c.Seek(peerPubKey[:])
for ; bytes.HasPrefix(k, peerPubKey[:]); k, v = c.Next() {
// Deserialize the message from its raw bytes and filter
// out any which are not currently supported by the
// store.
msg, err := readMessage(v)
if err == ErrUnsupportedMessage {
continue
}
if err != nil {
return err
}
msgs = append(msgs, msg)
}
return nil
})
if err != nil {
return nil, err
}
return msgs, nil
}
// Peers returns the public key of all peers with messages within the store.
func (s *MessageStore) Peers() (map[[33]byte]struct{}, error) {
peers := make(map[[33]byte]struct{})
err := s.db.View(func(tx *bbolt.Tx) error {
messageStore := tx.Bucket(messageStoreBucket)
if messageStore == nil {
return ErrCorruptedMessageStore
}
return messageStore.ForEach(func(k, _ []byte) error {
var pubKey [33]byte
copy(pubKey[:], k[:33])
peers[pubKey] = struct{}{}
return nil
})
})
if err != nil {
return nil, err
}
return peers, nil
}
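As a quick reference for the API above, here is a hedged usage sketch from outside the package; storeAndReplay is a hypothetical function and not part of this change.

package lndsketch

import (
	"log"

	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/discovery"
	"github.com/lightningnetwork/lnd/lnwire"
)

// storeAndReplay is a hypothetical example: persist a ChannelUpdate destined
// for a peer, read back everything pending for that peer, and delete each
// message once it has been handled.
func storeAndReplay(db *channeldb.DB, peer [33]byte,
	update *lnwire.ChannelUpdate) error {

	store, err := discovery.NewMessageStore(db)
	if err != nil {
		return err
	}

	// Only AnnounceSignatures and ChannelUpdate messages are supported;
	// anything else yields ErrUnsupportedMessage.
	if err := store.AddMessage(update, peer); err != nil {
		return err
	}

	pending, err := store.MessagesForPeer(peer)
	if err != nil {
		return err
	}
	for _, msg := range pending {
		log.Printf("pending %v message for peer %x", msg.MsgType(), peer)

		// For ChannelUpdates, only the exact version stored is deleted,
		// so a newer update for the same channel is never clobbered.
		if err := store.DeleteMessage(msg, peer); err != nil {
			return err
		}
	}
	return nil
}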

@ -0,0 +1,351 @@
package discovery
import (
"bytes"
"io/ioutil"
"math/rand"
"os"
"reflect"
"testing"
"github.com/btcsuite/btcd/btcec"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
)
func createTestMessageStore(t *testing.T) (*MessageStore, func()) {
t.Helper()
tempDir, err := ioutil.TempDir("", "channeldb")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
db, err := channeldb.Open(tempDir)
if err != nil {
os.RemoveAll(tempDir)
t.Fatalf("unable to open db: %v", err)
}
cleanUp := func() {
db.Close()
os.RemoveAll(tempDir)
}
store, err := NewMessageStore(db)
if err != nil {
cleanUp()
t.Fatalf("unable to initialize message store: %v", err)
}
return store, cleanUp
}
func randPubKey(t *testing.T) *btcec.PublicKey {
priv, err := btcec.NewPrivateKey(btcec.S256())
if err != nil {
t.Fatalf("unable to create private key: %v", err)
}
return priv.PubKey()
}
func randCompressedPubKey(t *testing.T) [33]byte {
t.Helper()
pubKey := randPubKey(t)
var compressedPubKey [33]byte
copy(compressedPubKey[:], pubKey.SerializeCompressed())
return compressedPubKey
}
func randAnnounceSignatures() *lnwire.AnnounceSignatures {
return &lnwire.AnnounceSignatures{
ShortChannelID: lnwire.NewShortChanIDFromInt(rand.Uint64()),
}
}
func randChannelUpdate() *lnwire.ChannelUpdate {
return &lnwire.ChannelUpdate{
ShortChannelID: lnwire.NewShortChanIDFromInt(rand.Uint64()),
}
}
// TestMessageStoreMessages ensures that messages can be properly queried from
// the store.
func TestMessageStoreMessages(t *testing.T) {
t.Parallel()
// We'll start by creating our test message store.
msgStore, cleanUp := createTestMessageStore(t)
defer cleanUp()
// We'll then create some test messages for two test peers, and none for
// an additional test peer.
channelUpdate1 := randChannelUpdate()
announceSignatures1 := randAnnounceSignatures()
peer1 := randCompressedPubKey(t)
if err := msgStore.AddMessage(channelUpdate1, peer1); err != nil {
t.Fatalf("unable to add message: %v", err)
}
if err := msgStore.AddMessage(announceSignatures1, peer1); err != nil {
t.Fatalf("unable to add message: %v", err)
}
expectedPeerMsgs1 := map[uint64]lnwire.MessageType{
channelUpdate1.ShortChannelID.ToUint64(): channelUpdate1.MsgType(),
announceSignatures1.ShortChannelID.ToUint64(): announceSignatures1.MsgType(),
}
channelUpdate2 := randChannelUpdate()
peer2 := randCompressedPubKey(t)
if err := msgStore.AddMessage(channelUpdate2, peer2); err != nil {
t.Fatalf("unable to add message: %v", err)
}
expectedPeerMsgs2 := map[uint64]lnwire.MessageType{
channelUpdate2.ShortChannelID.ToUint64(): channelUpdate2.MsgType(),
}
peer3 := randCompressedPubKey(t)
expectedPeerMsgs3 := map[uint64]lnwire.MessageType{}
// assertPeerMsgs is a helper closure that we'll use to ensure we
// retrieve the correct set of messages for a given peer.
assertPeerMsgs := func(peerMsgs []lnwire.Message,
expected map[uint64]lnwire.MessageType) {
t.Helper()
if len(peerMsgs) != len(expected) {
t.Fatalf("expected %d pending messages, got %d",
len(expected), len(peerMsgs))
}
for _, msg := range peerMsgs {
var shortChanID uint64
switch msg := msg.(type) {
case *lnwire.AnnounceSignatures:
shortChanID = msg.ShortChannelID.ToUint64()
case *lnwire.ChannelUpdate:
shortChanID = msg.ShortChannelID.ToUint64()
default:
t.Fatalf("found unexpected message type %T", msg)
}
msgType, ok := expected[shortChanID]
if !ok {
t.Fatalf("retrieved message with unexpected ID "+
"%d from store", shortChanID)
}
if msgType != msg.MsgType() {
t.Fatalf("expected message of type %v, got %v",
msgType, msg.MsgType())
}
}
}
// Then, we'll query the store for the set of messages for each peer and
// ensure it matches what we expect.
peers := [][33]byte{peer1, peer2, peer3}
expectedPeerMsgs := []map[uint64]lnwire.MessageType{
expectedPeerMsgs1, expectedPeerMsgs2, expectedPeerMsgs3,
}
for i, peer := range peers {
peerMsgs, err := msgStore.MessagesForPeer(peer)
if err != nil {
t.Fatalf("unable to retrieve messages: %v", err)
}
assertPeerMsgs(peerMsgs, expectedPeerMsgs[i])
}
// Finally, we'll query the store for all of its messages of every peer.
// Again, each peer should have a set of messages that match what we
// expect.
//
// We'll construct the expected response. Only the first two peers will
// have messages.
totalPeerMsgs := make(map[[33]byte]map[uint64]lnwire.MessageType, 2)
for i := 0; i < 2; i++ {
totalPeerMsgs[peers[i]] = expectedPeerMsgs[i]
}
msgs, err := msgStore.Messages()
if err != nil {
t.Fatalf("unable to retrieve all peers with pending messages: "+
"%v", err)
}
if len(msgs) != len(totalPeerMsgs) {
t.Fatalf("expected %d peers with messages, got %d",
len(totalPeerMsgs), len(msgs))
}
for peer, peerMsgs := range msgs {
expected, ok := totalPeerMsgs[peer]
if !ok {
t.Fatalf("expected to find pending messages for peer %x",
peer)
}
assertPeerMsgs(peerMsgs, expected)
}
peerPubKeys, err := msgStore.Peers()
if err != nil {
t.Fatalf("unable to retrieve all peers with pending messages: "+
"%v", err)
}
if len(peerPubKeys) != len(totalPeerMsgs) {
t.Fatalf("expected %d peers with messages, got %d",
len(totalPeerMsgs), len(peerPubKeys))
}
for peerPubKey := range peerPubKeys {
if _, ok := totalPeerMsgs[peerPubKey]; !ok {
t.Fatalf("expected to find peer %x", peerPubKey)
}
}
}
// TestMessageStoreUnsupportedMessage ensures that we are not able to add a
// message which is unsupported, and if a message is found to be unsupported by
// the current version of the store, that it is properly filtered out from the
// response.
func TestMessageStoreUnsupportedMessage(t *testing.T) {
t.Parallel()
// We'll start by creating our test message store.
msgStore, cleanUp := createTestMessageStore(t)
defer cleanUp()
// Create a message that is known to not be supported by the store.
peer := randCompressedPubKey(t)
unsupportedMsg := &lnwire.Error{}
// Attempting to add it to the store should result in
// ErrUnsupportedMessage.
err := msgStore.AddMessage(unsupportedMsg, peer)
if err != ErrUnsupportedMessage {
t.Fatalf("expected ErrUnsupportedMessage, got %v", err)
}
// We'll now pretend that the message is actually supported in a future
// version of the store, so it's able to be added successfully. To
// replicate this, we'll add the message manually rather than through
// the existing AddMessage method.
msgKey := peer[:]
var rawMsg bytes.Buffer
if _, err := lnwire.WriteMessage(&rawMsg, unsupportedMsg, 0); err != nil {
t.Fatalf("unable to serialize message: %v", err)
}
err = msgStore.db.Update(func(tx *bbolt.Tx) error {
messageStore := tx.Bucket(messageStoreBucket)
return messageStore.Put(msgKey, rawMsg.Bytes())
})
if err != nil {
t.Fatalf("unable to add unsupported message to store: %v", err)
}
// Finally, we'll check that the store can properly filter out messages
// that are currently unknown to it. We'll make sure this is done for
// both Messages and MessagesForPeer.
totalMsgs, err := msgStore.Messages()
if err != nil {
t.Fatalf("unable to retrieve messages: %v", err)
}
if len(totalMsgs) != 0 {
t.Fatalf("expected to filter out unsupported message")
}
peerMsgs, err := msgStore.MessagesForPeer(peer)
if err != nil {
t.Fatalf("unable to retrieve peer messages: %v", err)
}
if len(peerMsgs) != 0 {
t.Fatalf("expected to filter out unsupported message")
}
}
// TestMessageStoreDeleteMessage ensures that we can properly delete messages
// from the store.
func TestMessageStoreDeleteMessage(t *testing.T) {
t.Parallel()
msgStore, cleanUp := createTestMessageStore(t)
defer cleanUp()
// assertMsg is a helper closure we'll use to ensure a message
// does/doesn't exist within the store.
assertMsg := func(msg lnwire.Message, peer [33]byte, exists bool) {
t.Helper()
storeMsgs, err := msgStore.MessagesForPeer(peer)
if err != nil {
t.Fatalf("unable to retrieve messages: %v", err)
}
found := false
for _, storeMsg := range storeMsgs {
if reflect.DeepEqual(msg, storeMsg) {
found = true
}
}
if found != exists {
str := "find"
if !exists {
str = "not find"
}
t.Fatalf("expected to %v message %v", str,
spew.Sdump(msg))
}
}
// An AnnounceSignatures message should exist within the store after
// adding it, and should no longer exist after deleting it.
peer := randCompressedPubKey(t)
annSig := randAnnounceSignatures()
if err := msgStore.AddMessage(annSig, peer); err != nil {
t.Fatalf("unable to add message: %v", err)
}
assertMsg(annSig, peer, true)
if err := msgStore.DeleteMessage(annSig, peer); err != nil {
t.Fatalf("unable to delete message: %v", err)
}
assertMsg(annSig, peer, false)
// The store allows overwriting ChannelUpdates, since there can be
// multiple versions, so we'll test things slightly different.
//
// The ChannelUpdate message should exist within the store after adding
// it.
chanUpdate := randChannelUpdate()
if err := msgStore.AddMessage(chanUpdate, peer); err != nil {
t.Fatalf("unable to add message: %v", err)
}
assertMsg(chanUpdate, peer, true)
// Now, we'll create a new version for the same ChannelUpdate message.
// Adding this one to the store will overwrite the previous one, so only
// the new one should exist.
newChanUpdate := randChannelUpdate()
newChanUpdate.ShortChannelID = chanUpdate.ShortChannelID
newChanUpdate.Timestamp = chanUpdate.Timestamp + 1
if err := msgStore.AddMessage(newChanUpdate, peer); err != nil {
t.Fatalf("unable to add message: %v", err)
}
assertMsg(chanUpdate, peer, false)
assertMsg(newChanUpdate, peer, true)
// Deleting the older message should act as a NOP and should NOT delete
// the newer version as the older no longer exists.
if err := msgStore.DeleteMessage(chanUpdate, peer); err != nil {
t.Fatalf("unable to delete message: %v", err)
}
assertMsg(chanUpdate, peer, false)
assertMsg(newChanUpdate, peer, true)
// The newer version should no longer exist within the store after
// deleting it.
if err := msgStore.DeleteMessage(newChanUpdate, peer); err != nil {
t.Fatalf("unable to delete message: %v", err)
}
assertMsg(newChanUpdate, peer, false)
}

discovery/mock_test.go (new file, 133 lines)

@ -0,0 +1,133 @@
package discovery
import (
"net"
"sync"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
)
// mockPeer implements the lnpeer.Peer interface and is used to test the
// gossiper's interaction with peers.
type mockPeer struct {
pk *btcec.PublicKey
sentMsgs chan lnwire.Message
quit chan struct{}
}
var _ lnpeer.Peer = (*mockPeer)(nil)
func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error {
if p.sentMsgs == nil && p.quit == nil {
return nil
}
for _, msg := range msgs {
select {
case p.sentMsgs <- msg:
case <-p.quit:
}
}
return nil
}
func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error {
return nil
}
func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil }
func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk }
func (p *mockPeer) PubKey() [33]byte {
var pubkey [33]byte
copy(pubkey[:], p.pk.SerializeCompressed())
return pubkey
}
func (p *mockPeer) Address() net.Addr { return nil }
func (p *mockPeer) QuitSignal() <-chan struct{} {
return p.quit
}
// mockMessageStore is an in-memory implementation of the GossipMessageStore
// interface used for the gossiper's unit tests.
type mockMessageStore struct {
sync.Mutex
messages map[[33]byte]map[lnwire.Message]struct{}
}
func newMockMessageStore() *mockMessageStore {
return &mockMessageStore{
messages: make(map[[33]byte]map[lnwire.Message]struct{}),
}
}
var _ GossipMessageStore = (*mockMessageStore)(nil)
func (s *mockMessageStore) AddMessage(msg lnwire.Message, pubKey [33]byte) error {
s.Lock()
defer s.Unlock()
if _, ok := s.messages[pubKey]; !ok {
s.messages[pubKey] = make(map[lnwire.Message]struct{})
}
s.messages[pubKey][msg] = struct{}{}
return nil
}
func (s *mockMessageStore) DeleteMessage(msg lnwire.Message, pubKey [33]byte) error {
s.Lock()
defer s.Unlock()
peerMsgs, ok := s.messages[pubKey]
if !ok {
return nil
}
delete(peerMsgs, msg)
return nil
}
func (s *mockMessageStore) Messages() (map[[33]byte][]lnwire.Message, error) {
s.Lock()
defer s.Unlock()
msgs := make(map[[33]byte][]lnwire.Message, len(s.messages))
for peer, peerMsgs := range s.messages {
for msg := range peerMsgs {
msgs[peer] = append(msgs[peer], msg)
}
}
return msgs, nil
}
func (s *mockMessageStore) Peers() (map[[33]byte]struct{}, error) {
s.Lock()
defer s.Unlock()
peers := make(map[[33]byte]struct{}, len(s.messages))
for peer := range s.messages {
peers[peer] = struct{}{}
}
return peers, nil
}
func (s *mockMessageStore) MessagesForPeer(pubKey [33]byte) ([]lnwire.Message, error) {
s.Lock()
defer s.Unlock()
peerMsgs, ok := s.messages[pubKey]
if !ok {
return nil, nil
}
msgs := make([]lnwire.Message, 0, len(peerMsgs))
for msg := range peerMsgs {
msgs = append(msgs, msg)
}
return msgs, nil
}

@ -0,0 +1,316 @@
package discovery
import (
"sync"
"github.com/btcsuite/btcd/btcec"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
)
// reliableSenderCfg contains all of the necessary items for the reliableSender
// to carry out its duties.
type reliableSenderCfg struct {
// NotifyWhenOnline is a function that allows the gossiper to be
// notified when a certain peer comes online, allowing it to
// retry sending a peer message.
//
// NOTE: The peerChan channel must be buffered.
//
// TODO(wilmer): use [33]byte to avoid unnecessary serializations.
NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer)
// NotifyWhenOffline is a function that allows the gossiper to be
// notified when a certain peer disconnects, allowing it to request a
// notification for when it reconnects.
NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
// MessageStore is a persistent storage of gossip messages which we will
// use to determine which messages need to be resent for a given peer.
MessageStore GossipMessageStore
// IsMsgStale determines whether a message retrieved from the backing
// MessageStore is seen as stale by the current graph.
IsMsgStale func(lnwire.Message) bool
}
// peerManager contains the set of channels required for the peerHandler to
// properly carry out its duties.
type peerManager struct {
// msgs is the channel through which messages will be streamed to the
// handler in order to send the message to the peer while they're
// online.
msgs chan lnwire.Message
// done is a channel that will be closed to signal that the handler for
// the given peer has been torn down for whatever reason.
done chan struct{}
}
// reliableSender is a small subsystem of the gossiper used to reliably send
// gossip messages to peers.
type reliableSender struct {
start sync.Once
stop sync.Once
cfg reliableSenderCfg
// activePeers keeps track of whether a peerHandler exists for a given
// peer. A peerHandler is tasked with handling requests for messages
// that should be reliably sent to peers while also taking into account
// the peer's connection lifecycle.
activePeers map[[33]byte]peerManager
activePeersMtx sync.Mutex
wg sync.WaitGroup
quit chan struct{}
}
// newReliableSender returns a new reliableSender backed by the given config.
func newReliableSender(cfg *reliableSenderCfg) *reliableSender {
return &reliableSender{
cfg: *cfg,
activePeers: make(map[[33]byte]peerManager),
quit: make(chan struct{}),
}
}
// Start spawns message handlers for any peers with pending messages.
func (s *reliableSender) Start() error {
var err error
s.start.Do(func() {
err = s.resendPendingMsgs()
})
return err
}
// Stop halts the reliable sender from sending messages to peers.
func (s *reliableSender) Stop() {
s.stop.Do(func() {
close(s.quit)
s.wg.Wait()
})
}
// sendMessage constructs a request to send a message reliably to a peer. In the
// event that the peer is currently offline, this will only write the message to
// disk. Once the peer reconnects, this message, along with any others pending,
// will be sent to the peer.
func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) error {
// We'll start by persisting the message to disk. This allows us to
// resend the message upon restarts and peer reconnections.
if err := s.cfg.MessageStore.AddMessage(msg, peerPubKey); err != nil {
return err
}
// Then, we'll spawn a peerHandler for this peer to handle resending its
// pending messages while taking into account its connection lifecycle.
spawnHandler:
msgHandler, ok := s.spawnPeerHandler(peerPubKey)
// If the handler wasn't previously active, we can exit now as we know
// that the message will be sent once the peer online notification is
// received. This prevents us from potentially sending the message
// twice.
if !ok {
return nil
}
// Otherwise, we'll attempt to stream the message to the handler.
// There's a subtle race condition where the handler can be torn down
// due to all of the messages sent being stale, so we'll handle this
// gracefully by spawning another one to prevent blocking.
select {
case msgHandler.msgs <- msg:
case <-msgHandler.done:
goto spawnHandler
case <-s.quit:
return ErrGossiperShuttingDown
}
return nil
}
// spawnPeerHandler spawns a peerHandler for the given peer if there isn't
// one already active. The boolean returned signals whether there was already
// one active or not.
func (s *reliableSender) spawnPeerHandler(peerPubKey [33]byte) (peerManager, bool) {
s.activePeersMtx.Lock()
defer s.activePeersMtx.Unlock()
msgHandler, ok := s.activePeers[peerPubKey]
if !ok {
msgHandler = peerManager{
msgs: make(chan lnwire.Message),
done: make(chan struct{}),
}
s.activePeers[peerPubKey] = msgHandler
s.wg.Add(1)
go s.peerHandler(msgHandler, peerPubKey)
}
return msgHandler, ok
}
// peerHandler is responsible for handling our reliable message send requests
// for a given peer while also taking into account the peer's connection
// lifecycle. Any messages that are attempted to be sent while the peer is
// offline will be queued and sent once the peer reconnects.
//
// NOTE: This must be run as a goroutine.
func (s *reliableSender) peerHandler(peerMgr peerManager, peerPubKey [33]byte) {
defer s.wg.Done()
// We'll start by requesting a notification for when the peer
// reconnects.
pubKey, _ := btcec.ParsePubKey(peerPubKey[:], btcec.S256())
peerChan := make(chan lnpeer.Peer, 1)
waitUntilOnline:
log.Debugf("Requesting online notification for peer=%x", peerPubKey)
s.cfg.NotifyWhenOnline(pubKey, peerChan)
var peer lnpeer.Peer
out:
for {
select {
// While we're waiting, we'll also consume any messages that
// must be sent to prevent blocking the caller. These can be
// ignored for now since the peer is currently offline. Once
// they reconnect, the messages will be sent since they should
// have been persisted to disk.
case <-peerMgr.msgs:
case peer = <-peerChan:
break out
case <-s.quit:
return
}
}
log.Debugf("Peer=%x is now online, proceeding to send pending messages",
peerPubKey)
// Once we detect the peer has reconnected, we'll also request a
// notification for when they disconnect. We'll use this to make sure
// they haven't disconnected (in the case of a flappy peer, etc.) by the
// time we attempt to send them the pending messages.
log.Debugf("Requesting offline notification for peer=%x", peerPubKey)
offlineChan := s.cfg.NotifyWhenOffline(peerPubKey)
pendingMsgs, err := s.cfg.MessageStore.MessagesForPeer(peerPubKey)
if err != nil {
log.Errorf("Unable to retrieve pending messages for peer %x: %v",
peerPubKey, err)
return
}
// With the peer online, we can now proceed to send our pending messages
// for them.
for _, msg := range pendingMsgs {
// Retrieve the short channel ID this message applies to, for
// logging purposes. The error can be ignored as the store can
// only contain messages which have a ShortChannelID field.
shortChanID, _ := msgShortChanID(msg)
if err := peer.SendMessage(false, msg); err != nil {
log.Errorf("Unable to send %v message for channel=%v "+
"to %x: %v", msg.MsgType(), shortChanID,
peerPubKey, err)
goto waitUntilOnline
}
log.Debugf("Successfully sent %v message for channel=%v with "+
"peer=%x upon reconnection", msg.MsgType(), shortChanID,
peerPubKey)
// Now that the message has at least been sent once, we can
// check whether it's stale. This guarantees that
// AnnounceSignatures are sent at least once if we happen to
// already have signatures for both parties.
if s.cfg.IsMsgStale(msg) {
err := s.cfg.MessageStore.DeleteMessage(msg, peerPubKey)
if err != nil {
log.Errorf("Unable to remove stale %v message "+
"for channel=%v with peer %x: %v",
msg.MsgType(), shortChanID, peerPubKey,
err)
continue
}
log.Debugf("Removed stale %v message for channel=%v "+
"with peer=%x", msg.MsgType(), shortChanID,
peerPubKey)
}
}
// If all of our messages were stale, then there's no need for this
// handler to continue running, so we can exit now.
pendingMsgs, err = s.cfg.MessageStore.MessagesForPeer(peerPubKey)
if err != nil {
log.Errorf("Unable to retrieve pending messages for peer %x: %v",
peerPubKey, err)
return
}
if len(pendingMsgs) == 0 {
log.Debugf("No pending messages left for peer=%x", peerPubKey)
s.activePeersMtx.Lock()
delete(s.activePeers, peerPubKey)
s.activePeersMtx.Unlock()
close(peerMgr.done)
return
}
// Once the pending messages are sent, we can continue to send any
// future messages while the peer remains connected.
for {
select {
case msg := <-peerMgr.msgs:
// Retrieve the short channel ID this message applies to,
// for logging purposes. The error can be ignored as the
// store can only contain messages which have a
// ShortChannelID field.
shortChanID, _ := msgShortChanID(msg)
if err := peer.SendMessage(false, msg); err != nil {
log.Errorf("Unable to send %v message for "+
"channel=%v to %x: %v", msg.MsgType(),
shortChanID, peerPubKey, err)
}
log.Debugf("Successfully sent %v message for "+
"channel=%v with peer=%x", msg.MsgType(),
shortChanID, peerPubKey)
case <-offlineChan:
goto waitUntilOnline
case <-s.quit:
return
}
}
}
// resendPendingMsgs retrieves and sends all of the messages within the message
// store that should be reliably sent to their respective peers.
func (s *reliableSender) resendPendingMsgs() error {
// Fetch all of the peers for which we have pending messages and spawn
// a peerHandler for each. Once the peer is seen as online, all of the
// pending messages will be sent.
peers, err := s.cfg.MessageStore.Peers()
if err != nil {
return err
}
for peer := range peers {
s.spawnPeerHandler(peer)
}
return nil
}
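To summarize how the pieces above fit together, the following is a hypothetical condensed illustration (assumed to live in the same package, since reliableSender is unexported) of the lifecycle the gossiper drives: Start on startup, sendMessage per request, Stop on shutdown. It is not part of the change itself.

package discovery

import "github.com/lightningnetwork/lnd/lnwire"

// reliableSendLifecycle is a hypothetical sketch, not production code.
func reliableSendLifecycle(s *reliableSender, msg lnwire.Message,
	peerPubKey [33]byte) error {

	// Start replays any messages already persisted in the MessageStore by
	// spawning a peerHandler for each peer with pending messages.
	if err := s.Start(); err != nil {
		return err
	}
	defer s.Stop()

	// sendMessage persists the message and hands it off to the peer's
	// handler, returning without waiting for the peer to come online. The
	// message is only removed from the store after it has been sent at
	// least once and IsMsgStale reports it as stale.
	return s.sendMessage(msg, peerPubKey)
}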

@ -0,0 +1,312 @@
package discovery
import (
"fmt"
"testing"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
)
// newTestReliableSender creates a new reliable sender instance used for
// testing.
func newTestReliableSender(t *testing.T) *reliableSender {
t.Helper()
cfg := &reliableSenderCfg{
NotifyWhenOnline: func(pubKey *btcec.PublicKey,
peerChan chan<- lnpeer.Peer) {
peerChan <- &mockPeer{pk: pubKey}
},
NotifyWhenOffline: func(_ [33]byte) <-chan struct{} {
c := make(chan struct{}, 1)
return c
},
MessageStore: newMockMessageStore(),
IsMsgStale: func(lnwire.Message) bool {
return false
},
}
return newReliableSender(cfg)
}
// assertMsgsSent ensures that the given messages can be read from a mock peer's
// msgChan.
func assertMsgsSent(t *testing.T, msgChan chan lnwire.Message,
msgs ...lnwire.Message) {
t.Helper()
m := make(map[lnwire.Message]struct{}, len(msgs))
for _, msg := range msgs {
m[msg] = struct{}{}
}
for i := 0; i < len(msgs); i++ {
select {
case msg := <-msgChan:
if _, ok := m[msg]; !ok {
t.Fatalf("found unexpected message sent: %v",
spew.Sdump(msg))
}
case <-time.After(time.Second):
t.Fatal("reliable sender did not send message to peer")
}
}
}
// waitPredicate is a helper test function that will wait up to the given
// timeout for the passed predicate to return true.
func waitPredicate(t *testing.T, timeout time.Duration, pred func() bool) {
t.Helper()
const pollInterval = 20 * time.Millisecond
exitTimer := time.After(timeout)
for {
<-time.After(pollInterval)
select {
case <-exitTimer:
t.Fatalf("predicate not satisfied after timeout")
default:
}
if pred() {
return
}
}
}
// TestReliableSenderFlow ensures that the flow for sending messages reliably to
// a peer while taking into account its connection lifecycle works as expected.
func TestReliableSenderFlow(t *testing.T) {
t.Parallel()
reliableSender := newTestReliableSender(t)
// Create a mock peer to send the messages to.
pubKey := randPubKey(t)
msgsSent := make(chan lnwire.Message)
peer := &mockPeer{pubKey, msgsSent, reliableSender.quit}
// Override NotifyWhenOnline and NotifyWhenOffline to provide the
// notification channels so that we can control when notifications get
// dispatched.
notifyOnline := make(chan chan<- lnpeer.Peer, 2)
notifyOffline := make(chan chan struct{}, 1)
reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey,
peerChan chan<- lnpeer.Peer) {
notifyOnline <- peerChan
}
reliableSender.cfg.NotifyWhenOffline = func(_ [33]byte) <-chan struct{} {
c := make(chan struct{}, 1)
notifyOffline <- c
return c
}
// We'll start by creating our first message which we should reliably
// send to our peer.
msg1 := randChannelUpdate()
var peerPubKey [33]byte
copy(peerPubKey[:], pubKey.SerializeCompressed())
if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil {
t.Fatalf("unable to reliably send message: %v", err)
}
// Since there isn't a peerHandler for this peer currently active due to
// this being the first message being sent reliably, we should expect to
// see a notification request for when the peer is online.
var peerChan chan<- lnpeer.Peer
select {
case peerChan = <-notifyOnline:
case <-time.After(time.Second):
t.Fatal("reliable sender did not request online notification")
}
	// We'll then attempt to send an additional message reliably.
msg2 := randAnnounceSignatures()
if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil {
t.Fatalf("unable to reliably send message: %v", err)
}
	// This should not, however, request another peer online notification,
	// as the peerHandler has already been started and is waiting for the
	// notification to be dispatched.
select {
case <-notifyOnline:
t.Fatal("reliable sender should not request online notification")
case <-time.After(time.Second):
}
// We'll go ahead and notify the peer.
peerChan <- peer
// By doing so, we should expect to see a notification request for when
// the peer is offline.
var offlineChan chan struct{}
select {
case offlineChan = <-notifyOffline:
case <-time.After(time.Second):
t.Fatal("reliable sender did not request offline notification")
}
	// We should also see the messages arrive at the peer since it is now
	// seen as online.
assertMsgsSent(t, peer.sentMsgs, msg1, msg2)
// Then, we'll send one more message reliably.
msg3 := randChannelUpdate()
if err := reliableSender.sendMessage(msg3, peerPubKey); err != nil {
t.Fatalf("unable to reliably send message: %v", err)
}
	// Again, this should not request another peer online notification
	// since we are currently waiting for the peer to go offline.
select {
case <-notifyOnline:
t.Fatal("reliable sender should not request online notification")
case <-time.After(time.Second):
}
// The expected message should be sent to the peer.
assertMsgsSent(t, peer.sentMsgs, msg3)
// We'll then notify that the peer is offline.
close(offlineChan)
// This should cause an online notification to be requested.
select {
case peerChan = <-notifyOnline:
case <-time.After(time.Second):
t.Fatal("reliable sender did not request online notification")
}
	// Once we dispatch it, we should expect the messages to be resent to
	// the peer, as they are not stale.
peerChan <- peer
select {
case <-notifyOffline:
case <-time.After(5 * time.Second):
t.Fatal("reliable sender did not request offline notification")
}
assertMsgsSent(t, peer.sentMsgs, msg1, msg2, msg3)
}
// TestReliableSenderStaleMessages ensures that the reliable sender tears down
// its handler for a peer once all of that peer's pending messages have been
// sent and deemed stale.
func TestReliableSenderStaleMessages(t *testing.T) {
t.Parallel()
reliableSender := newTestReliableSender(t)
// Create a mock peer to send the messages to.
pubKey := randPubKey(t)
msgsSent := make(chan lnwire.Message)
peer := &mockPeer{pubKey, msgsSent, reliableSender.quit}
// Override NotifyWhenOnline to provide the notification channel so that
// we can control when notifications get dispatched.
notifyOnline := make(chan chan<- lnpeer.Peer, 1)
reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey,
peerChan chan<- lnpeer.Peer) {
notifyOnline <- peerChan
}
// We'll also override IsMsgStale to mark all messages as stale as we're
// interested in testing the stale message behavior.
reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool {
return true
}
// We'll start by creating our first message which we should reliably
// send to our peer, but will be seen as stale.
msg1 := randAnnounceSignatures()
var peerPubKey [33]byte
copy(peerPubKey[:], pubKey.SerializeCompressed())
if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil {
t.Fatalf("unable to reliably send message: %v", err)
}
// Since there isn't a peerHandler for this peer currently active due to
// this being the first message being sent reliably, we should expect to
// see a notification request for when the peer is online.
var peerChan chan<- lnpeer.Peer
select {
case peerChan = <-notifyOnline:
case <-time.After(time.Second):
t.Fatal("reliable sender did not request online notification")
}
// We'll go ahead and notify the peer.
peerChan <- peer
	// This should cause the message to be sent to the peer since it is now
	// seen as online. The message will be sent at least once to ensure it
	// can propagate before we decide whether it is stale or not.
assertMsgsSent(t, peer.sentMsgs, msg1)
// We'll create another message which we'll send reliably. This one
// won't be seen as stale.
msg2 := randChannelUpdate()
// We'll then wait for the message to be removed from the backing
// message store since it is seen as stale and has been sent at least
// once. Once the message is removed, the peerHandler should be torn
// down as there are no longer any pending messages within the store.
var predErr error
waitPredicate(t, time.Second, func() bool {
msgs, err := reliableSender.cfg.MessageStore.MessagesForPeer(
peerPubKey,
)
if err != nil {
predErr = fmt.Errorf("unable to retrieve messages for "+
"peer: %v", err)
return false
}
if len(msgs) != 0 {
predErr = fmt.Errorf("expected to not find any "+
"messages for peer, found %d", len(msgs))
return false
}
predErr = nil
return true
})
if predErr != nil {
t.Fatal(predErr)
}
// Override IsMsgStale to no longer mark messages as stale.
reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool {
return false
}
// We'll request the message to be sent reliably.
if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil {
t.Fatalf("unable to reliably send message: %v", err)
}
// We should see an online notification request indicating that a new
// peerHandler has been spawned since it was previously torn down.
select {
case peerChan = <-notifyOnline:
case <-time.After(time.Second):
t.Fatal("reliable sender did not request online notification")
}
	// Finally, notifying that the peer is online should prompt the message
	// to be sent. Only the ChannelUpdate will be sent in this case since
	// the AnnounceSignatures message above was deemed stale.
peerChan <- peer
assertMsgsSent(t, peer.sentMsgs, msg2)
}
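// These tests exercise the reliable sender in isolation and would typically
// be run with the standard Go tooling, e.g.
// `go test -run TestReliableSender ./discovery` (invocation shown for
// illustration only).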

@ -599,28 +599,32 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
s.chanDB.ChannelGraph(),
)
s.authGossiper, err = discovery.New(discovery.Config{
Router: s.chanRouter,
Notifier: s.cc.chainNotifier,
ChainHash: *activeNetParams.GenesisHash,
Broadcast: s.BroadcastMessage,
ChanSeries: chanSeries,
SendToPeer: s.SendToPeer,
FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) {
return s.FindPeer(pub)
},
NotifyWhenOnline: s.NotifyWhenOnline,
ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay),
RetransmitDelay: time.Minute * 30,
DB: chanDB,
AnnSigner: s.nodeSigner,
},
s.identityPriv.PubKey(),
)
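	// With reliable delivery now handled by the gossiper itself, construct
	// the persistent stores that back it: the gossip message store for
	// messages that must be delivered reliably across restarts, and the
	// waiting proof store for partial announcement proofs.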
gossipMessageStore, err := discovery.NewMessageStore(s.chanDB)
if err != nil {
return nil, err
}
waitingProofStore, err := channeldb.NewWaitingProofStore(s.chanDB)
if err != nil {
return nil, err
}
s.authGossiper = discovery.New(discovery.Config{
Router: s.chanRouter,
Notifier: s.cc.chainNotifier,
ChainHash: *activeNetParams.GenesisHash,
Broadcast: s.BroadcastMessage,
ChanSeries: chanSeries,
NotifyWhenOnline: s.NotifyWhenOnline,
NotifyWhenOffline: s.NotifyWhenOffline,
ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay),
RetransmitDelay: time.Minute * 30,
WaitingProofStore: waitingProofStore,
MessageStore: gossipMessageStore,
AnnSigner: s.nodeSigner,
},
s.identityPriv.PubKey(),
)
utxnStore, err := newNurseryStore(activeNetParams.GenesisHash, chanDB)
if err != nil {