Merge pull request #2916 from cfromknecht/split-syncer-query-reply

discovery: make gossip replies synchronous
This commit is contained in:
Olaoluwa Osuntokun
2019-04-29 17:40:13 -07:00
committed by GitHub
5 changed files with 350 additions and 564 deletions

View File

@@ -386,6 +386,9 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
sendToPeer: func(msgs ...lnwire.Message) error { sendToPeer: func(msgs ...lnwire.Message) error {
return peer.SendMessageLazy(false, msgs...) return peer.SendMessageLazy(false, msgs...)
}, },
sendToPeerSync: func(msgs ...lnwire.Message) error {
return peer.SendMessageLazy(true, msgs...)
},
}) })
// Gossip syncers are initialized by default in a PassiveSync type // Gossip syncers are initialized by default in a PassiveSync type

View File

@@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"golang.org/x/time/rate" "golang.org/x/time/rate"
) )
@@ -206,11 +207,16 @@ type gossipSyncerCfg struct {
// the remote node in a single QueryShortChanIDs request. // the remote node in a single QueryShortChanIDs request.
batchSize int32 batchSize int32
// sendToPeer is a function closure that should send the set of // sendToPeer sends a variadic number of messages to the remote peer.
// targeted messages to the peer we've been assigned to sync the graph // This method should not block while waiting for sends to be written
// state from. // to the wire.
sendToPeer func(...lnwire.Message) error sendToPeer func(...lnwire.Message) error
// sendToPeerSync sends a variadic number of messages to the remote
// peer, blocking until all messages have been sent successfully or a
// write error is encountered.
sendToPeerSync func(...lnwire.Message) error
// maxUndelayedQueryReplies specifies how many gossip queries we will // maxUndelayedQueryReplies specifies how many gossip queries we will
// respond to immediately before starting to delay responses. // respond to immediately before starting to delay responses.
maxUndelayedQueryReplies int maxUndelayedQueryReplies int
@@ -219,6 +225,16 @@ type gossipSyncerCfg struct {
// responding to gossip queries after replying to // responding to gossip queries after replying to
// maxUndelayedQueryReplies queries. // maxUndelayedQueryReplies queries.
delayedQueryReplyInterval time.Duration delayedQueryReplyInterval time.Duration
// noSyncChannels will prevent the GossipSyncer from spawning a
// channelGraphSyncer, meaning we will not try to reconcile unknown
// channels with the remote peer.
noSyncChannels bool
// noReplyQueries will prevent the GossipSyncer from spawning a
// replyHandler, meaning we will not reply to queries from our remote
// peer.
noReplyQueries bool
} }
// GossipSyncer is a struct that handles synchronizing the channel graph state // GossipSyncer is a struct that handles synchronizing the channel graph state
@@ -271,10 +287,15 @@ type GossipSyncer struct {
// PassiveSync to ActiveSync. // PassiveSync to ActiveSync.
genHistoricalChanRangeQuery bool genHistoricalChanRangeQuery bool
// gossipMsgs is a channel that all messages from the target peer will // gossipMsgs is a channel that all responses to our queries from the
// be sent over. // target peer will be sent over, these will be read by the
// channelGraphSyncer.
gossipMsgs chan lnwire.Message gossipMsgs chan lnwire.Message
// queryMsgs is a channel that all queries from the remote peer will be
// received over, these will be read by the replyHandler.
queryMsgs chan lnwire.Message
// bufferedChanRangeReplies is used in the waitingQueryChanReply to // bufferedChanRangeReplies is used in the waitingQueryChanReply to
// buffer all the chunked response to our query. // buffer all the chunked response to our query.
bufferedChanRangeReplies []lnwire.ShortChannelID bufferedChanRangeReplies []lnwire.ShortChannelID
@@ -332,6 +353,7 @@ func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer {
syncTransitionReqs: make(chan *syncTransitionReq), syncTransitionReqs: make(chan *syncTransitionReq),
historicalSyncReqs: make(chan *historicalSyncReq), historicalSyncReqs: make(chan *historicalSyncReq),
gossipMsgs: make(chan lnwire.Message, 100), gossipMsgs: make(chan lnwire.Message, 100),
queryMsgs: make(chan lnwire.Message, 100),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
} }
@@ -342,8 +364,17 @@ func (g *GossipSyncer) Start() {
g.started.Do(func() { g.started.Do(func() {
log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:]) log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])
// TODO(conner): only spawn channelGraphSyncer if remote
// supports gossip queries, and only spawn replyHandler if we
// advertise support
if !g.cfg.noSyncChannels {
g.wg.Add(1) g.wg.Add(1)
go g.channelGraphSyncer() go g.channelGraphSyncer()
}
if !g.cfg.noReplyQueries {
g.wg.Add(1)
go g.replyHandler()
}
}) })
} }
@@ -369,7 +400,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
log.Debugf("GossipSyncer(%x): state=%v, type=%v", log.Debugf("GossipSyncer(%x): state=%v, type=%v",
g.cfg.peerPub[:], state, syncType) g.cfg.peerPub[:], state, syncType)
switch syncerState(state) { switch state {
// When we're in this state, we're trying to synchronize our // When we're in this state, we're trying to synchronize our
// view of the network with the remote peer. We'll kick off // view of the network with the remote peer. We'll kick off
// this sync by asking them for the set of channels they // this sync by asking them for the set of channels they
@@ -382,14 +413,14 @@ func (g *GossipSyncer) channelGraphSyncer() {
g.genHistoricalChanRangeQuery, g.genHistoricalChanRangeQuery,
) )
if err != nil { if err != nil {
log.Errorf("unable to gen chan range "+ log.Errorf("Unable to gen chan range "+
"query: %v", err) "query: %v", err)
return return
} }
err = g.cfg.sendToPeer(queryRangeMsg) err = g.cfg.sendToPeer(queryRangeMsg)
if err != nil { if err != nil {
log.Errorf("unable to send chan range "+ log.Errorf("Unable to send chan range "+
"query: %v", err) "query: %v", err)
return return
} }
@@ -417,22 +448,16 @@ func (g *GossipSyncer) channelGraphSyncer() {
if ok { if ok {
err := g.processChanRangeReply(queryReply) err := g.processChanRangeReply(queryReply)
if err != nil { if err != nil {
log.Errorf("unable to "+ log.Errorf("Unable to "+
"process chan range "+ "process chan range "+
"query: %v", err) "query: %v", err)
return return
} }
continue continue
} }
// Otherwise, it's the remote peer performing a log.Warnf("Unexpected message: %T in state=%v",
// query, which we'll attempt to reply to. msg, state)
err := g.replyPeerQueries(msg)
if err != nil && err != ErrGossipSyncerExiting {
log.Errorf("unable to reply to peer "+
"query: %v", err)
}
case <-g.quit: case <-g.quit:
return return
@@ -447,7 +472,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
// query chunk. // query chunk.
done, err := g.synchronizeChanIDs() done, err := g.synchronizeChanIDs()
if err != nil { if err != nil {
log.Errorf("unable to sync chan IDs: %v", err) log.Errorf("Unable to sync chan IDs: %v", err)
} }
// If this wasn't our last query, then we'll need to // If this wasn't our last query, then we'll need to
@@ -480,13 +505,8 @@ func (g *GossipSyncer) channelGraphSyncer() {
continue continue
} }
// Otherwise, it's the remote peer performing a log.Warnf("Unexpected message: %T in state=%v",
// query, which we'll attempt to deploy to. msg, state)
err := g.replyPeerQueries(msg)
if err != nil && err != ErrGossipSyncerExiting {
log.Errorf("unable to reply to peer "+
"query: %v", err)
}
case <-g.quit: case <-g.quit:
return return
@@ -520,13 +540,6 @@ func (g *GossipSyncer) channelGraphSyncer() {
// messages or process any state transitions and exit if // messages or process any state transitions and exit if
// needed. // needed.
select { select {
case msg := <-g.gossipMsgs:
err := g.replyPeerQueries(msg)
if err != nil && err != ErrGossipSyncerExiting {
log.Errorf("unable to reply to peer "+
"query: %v", err)
}
case req := <-g.syncTransitionReqs: case req := <-g.syncTransitionReqs:
req.errChan <- g.handleSyncTransition(req) req.errChan <- g.handleSyncTransition(req)
@@ -540,6 +553,38 @@ func (g *GossipSyncer) channelGraphSyncer() {
} }
} }
// replyHandler runs a loop dedicated to answering the remote peer's queries.
// Queries produced by the remote party's channelGraphSyncer are served here,
// and our own channelGraphSyncer is served by the remote party's replyHandler
// in turn. Because each side's channelGraphSyncer drives the other side's
// replyHandler, this loop runs independently of the sync state machine kept on
// this node.
//
// The loop exits when the quit channel fires, or when replying fails with
// either ErrGossipSyncerExiting or lnpeer.ErrPeerExiting. Any other reply
// error is logged and the loop keeps serving queries.
//
// NOTE: This method MUST be run as a goroutine.
func (g *GossipSyncer) replyHandler() {
	defer g.wg.Done()

	for {
		select {
		case <-g.quit:
			return

		case query := <-g.queryMsgs:
			err := g.replyPeerQueries(query)
			if err == nil {
				continue
			}

			// Either the syncer or the peer is shutting down, so
			// there is nobody left to reply to.
			if err == ErrGossipSyncerExiting ||
				err == lnpeer.ErrPeerExiting {

				return
			}

			log.Errorf("Unable to reply to peer "+
				"query: %v", err)
		}
	}
}
// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the // sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
// syncer and sends it to the remote peer. // syncer and sends it to the remote peer.
func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time, func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time,
@@ -818,7 +863,7 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
if isFinalChunk { if isFinalChunk {
replyChunk.Complete = 1 replyChunk.Complete = 1
} }
if err := g.cfg.sendToPeer(&replyChunk); err != nil { if err := g.cfg.sendToPeerSync(&replyChunk); err != nil {
return err return err
} }
@@ -846,7 +891,7 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error
"chain=%v, we're on chain=%v", g.cfg.chainHash, "chain=%v, we're on chain=%v", g.cfg.chainHash,
query.ChainHash) query.ChainHash)
return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{ return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash, ChainHash: query.ChainHash,
Complete: 0, Complete: 0,
}) })
@@ -873,23 +918,22 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error
query.ShortChanIDs[0].ToUint64(), err) query.ShortChanIDs[0].ToUint64(), err)
} }
// If we didn't find any messages related to those channel ID's, then // Reply with any messages related to those channel ID's, we'll write
// we'll send over a reply marking the end of our response, and exit // each one individually and synchronously to throttle the sends and
// early. // perform buffering of responses in the syncer as opposed to the peer.
if len(replyMsgs) == 0 { for _, msg := range replyMsgs {
return g.cfg.sendToPeer(&lnwire.ReplyShortChanIDsEnd{ err := g.cfg.sendToPeerSync(msg)
ChainHash: query.ChainHash, if err != nil {
Complete: 1, return err
}) }
} }
// Otherwise, we'll send over our set of messages responding to the // Regardless of whether we had any messages to reply with, send over
// query, with the ending message appended to it. // the sentinel message to signal that the stream has terminated.
replyMsgs = append(replyMsgs, &lnwire.ReplyShortChanIDsEnd{ return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash, ChainHash: query.ChainHash,
Complete: 1, Complete: 1,
}) })
return g.cfg.sendToPeer(replyMsgs...)
} }
// ApplyGossipFilter applies a gossiper filter sent by the remote node to the // ApplyGossipFilter applies a gossiper filter sent by the remote node to the
@@ -930,9 +974,19 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er
go func() { go func() {
defer g.wg.Done() defer g.wg.Done()
if err := g.cfg.sendToPeer(newUpdatestoSend...); err != nil { for _, msg := range newUpdatestoSend {
log.Errorf("unable to send messages for peer catch "+ err := g.cfg.sendToPeerSync(msg)
"up: %v", err) switch {
case err == ErrGossipSyncerExiting:
return
case err == lnpeer.ErrPeerExiting:
return
case err != nil:
log.Errorf("Unable to send message for "+
"peer catch up: %v", err)
}
} }
}() }()
@@ -1065,8 +1119,16 @@ func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
// ProcessQueryMsg is used by outside callers to pass new channel time series // ProcessQueryMsg is used by outside callers to pass new channel time series
// queries to the internal processing goroutine. // queries to the internal processing goroutine.
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) { func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) {
var msgChan chan lnwire.Message
switch msg.(type) {
case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs:
msgChan = g.queryMsgs
default:
msgChan = g.gossipMsgs
}
select { select {
case g.gossipMsgs <- msg: case msgChan <- msg:
case <-peerQuit: case <-peerQuit:
case <-g.quit: case <-g.quit:
} }

View File

@@ -116,9 +116,28 @@ func (m *mockChannelGraphTimeSeries) FetchChanUpdates(chain chainhash.Hash,
var _ ChannelGraphTimeSeries = (*mockChannelGraphTimeSeries)(nil) var _ ChannelGraphTimeSeries = (*mockChannelGraphTimeSeries)(nil)
// newTestSyncer creates a new test instance of a GossipSyncer. A buffered
// message channel is returned for intercepting messages sent from the syncer,
// in addition to a mock channel series which allows the test to control which
// messages the syncer knows of or wishes to filter out. The variadic flags are
// treated as positional arguments where the first index signals that the syncer
// should spawn a channelGraphSyncer and second index signals that the syncer
// should spawn a replyHandler. Any flags beyond the first two are currently
// ignored. If no flags are provided, both a channelGraphSyncer and replyHandler
// will be spawned by default.
func newTestSyncer(hID lnwire.ShortChannelID, func newTestSyncer(hID lnwire.ShortChannelID,
encodingType lnwire.ShortChanIDEncoding, chunkSize int32, encodingType lnwire.ShortChanIDEncoding, chunkSize int32,
) (chan []lnwire.Message, *GossipSyncer, *mockChannelGraphTimeSeries) { flags ...bool) (chan []lnwire.Message,
*GossipSyncer, *mockChannelGraphTimeSeries) {
syncChannels := true
replyQueries := true
if len(flags) > 0 {
syncChannels = flags[0]
}
if len(flags) > 1 {
replyQueries = flags[1]
}
msgChan := make(chan []lnwire.Message, 20) msgChan := make(chan []lnwire.Message, 20)
cfg := gossipSyncerCfg{ cfg := gossipSyncerCfg{
@@ -126,10 +145,16 @@ func newTestSyncer(hID lnwire.ShortChannelID,
encodingType: encodingType, encodingType: encodingType,
chunkSize: chunkSize, chunkSize: chunkSize,
batchSize: chunkSize, batchSize: chunkSize,
noSyncChannels: !syncChannels,
noReplyQueries: !replyQueries,
sendToPeer: func(msgs ...lnwire.Message) error { sendToPeer: func(msgs ...lnwire.Message) error {
msgChan <- msgs msgChan <- msgs
return nil return nil
}, },
sendToPeerSync: func(msgs ...lnwire.Message) error {
msgChan <- msgs
return nil
},
delayedQueryReplyInterval: 2 * time.Second, delayedQueryReplyInterval: 2 * time.Second,
} }
syncer := newGossipSyncer(cfg) syncer := newGossipSyncer(cfg)
@@ -519,32 +544,39 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) {
t.Fatalf("unable to query for chan IDs: %v", err) t.Fatalf("unable to query for chan IDs: %v", err)
} }
for i := 0; i < len(queryReply)+1; i++ {
select { select {
case <-time.After(time.Second * 15): case <-time.After(time.Second * 15):
t.Fatalf("no msgs received") t.Fatalf("no msgs received")
// We should get back exactly 4 messages. The first 3 are the same // We should get back exactly 4 messages. The first 3 are the
// messages we sent above, and the query end message. // same messages we sent above, and the query end message.
case msgs := <-msgChan: case msgs := <-msgChan:
if len(msgs) != 4 { if len(msgs) != 1 {
t.Fatalf("wrong messages: expected %v, got %v", t.Fatalf("wrong number of messages: "+
4, len(msgs)) "expected %v, got %v", 1, len(msgs))
} }
if !reflect.DeepEqual(queryReply, msgs[:3]) { isQueryReply := i < len(queryReply)
t.Fatalf("wrong set of messages: expected %v, got %v", finalMsg, ok := msgs[0].(*lnwire.ReplyShortChanIDsEnd)
spew.Sdump(queryReply), spew.Sdump(msgs[:3]))
}
finalMsg, ok := msgs[3].(*lnwire.ReplyShortChanIDsEnd) switch {
if !ok { case isQueryReply &&
t.Fatalf("expected lnwire.ReplyShortChanIDsEnd "+ !reflect.DeepEqual(queryReply[i], msgs[0]):
"instead got %T", msgs[3])
} t.Fatalf("wrong message: expected %v, got %v",
if finalMsg.Complete != 1 { spew.Sdump(queryReply[i]),
spew.Sdump(msgs[0]))
case !isQueryReply && !ok:
t.Fatalf("expected lnwire.ReplyShortChanIDsEnd"+
" instead got %T", msgs[3])
case !isQueryReply && finalMsg.Complete != 1:
t.Fatalf("complete wasn't set") t.Fatalf("complete wasn't set")
} }
} }
}
} }
// TestGossipSyncerReplyChanRangeQuery tests that if we receive a // TestGossipSyncerReplyChanRangeQuery tests that if we receive a
@@ -1020,13 +1052,13 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
BlockHeight: 1144, BlockHeight: 1144,
} }
msgChan1, syncer1, chanSeries1 := newTestSyncer( msgChan1, syncer1, chanSeries1 := newTestSyncer(
startHeight, defaultEncoding, chunkSize, startHeight, defaultEncoding, chunkSize, true, false,
) )
syncer1.Start() syncer1.Start()
defer syncer1.Stop() defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer( msgChan2, syncer2, chanSeries2 := newTestSyncer(
startHeight, defaultEncoding, chunkSize, startHeight, defaultEncoding, chunkSize, false, true,
) )
syncer2.Start() syncer2.Start()
defer syncer2.Stop() defer syncer2.Stop()
@@ -1038,34 +1070,31 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
numUndelayedQueries := syncer1.cfg.maxUndelayedQueryReplies numUndelayedQueries := syncer1.cfg.maxUndelayedQueryReplies
// We will send enough queries to exhaust the undelayed responses, and // We will send enough queries to exhaust the undelayed responses, and
// then send two more queries which should be delayed. // then send two more queries which should be delayed. An additional one
numQueryResponses := numUndelayedQueries + numDelayedQueries // is subtracted from the total since an undelayed message will be consumed
// by the initial QueryChannelRange.
numQueryResponses := numUndelayedQueries + numDelayedQueries - 1
// The total number of responses must include the initial reply each // The total number of responses must include the initial reply each
// syner will make to QueryChannelRange. // syncer will make to QueryChannelRange.
numTotalQueries := 1 + numQueryResponses numTotalQueries := 1 + numQueryResponses
// The total number of channels each syncer needs to request must be // The total number of channels each syncer needs to request must be
// scaled by the chunk size being used. // scaled by the chunk size being used.
numTotalChans := numQueryResponses * chunkSize numTotalChans := numQueryResponses * chunkSize
// Although both nodes are at the same height, they'll have a // Construct enough channels so that all of the queries will have enough
// completely disjoint set of chan ID's that they know of. // channels. Since syncer1 won't know of any channels, their sets are
var syncer1Chans []lnwire.ShortChannelID // inherently disjoint.
for i := 0; i < numTotalChans; i++ {
syncer1Chans = append(
syncer1Chans, lnwire.NewShortChanIDFromInt(uint64(i)),
)
}
var syncer2Chans []lnwire.ShortChannelID var syncer2Chans []lnwire.ShortChannelID
for i := numTotalChans; i < numTotalChans+numTotalChans; i++ { for i := 0; i < numTotalChans; i++ {
syncer2Chans = append( syncer2Chans = append(
syncer2Chans, lnwire.NewShortChanIDFromInt(uint64(i)), syncer2Chans, lnwire.NewShortChanIDFromInt(uint64(i)),
) )
} }
// We'll kick off the test by passing over the QueryChannelRange // We'll kick off the test by asserting syncer1 sends over the
// messages from one node to the other. // QueryChannelRange message to the other node.
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1") t.Fatalf("didn't get msg from syncer1")
@@ -1083,46 +1112,16 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg") t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg: case syncer2.queryMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a QueryChannelRange message.
_, ok := msg.(*lnwire.QueryChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
} }
} }
} }
// At this point, we'll need to send responses to both nodes from their // At this point, we'll need a response from syncer2's channel
// respective channel series. Both nodes will simply request the entire // series. This will cause syncer1 to simply request the entire set of
// set of channels from the other. This will count as the first // channels from syncer2. This will count as the first undelayed
// undelayed response for each syncer. // response for syncer2.
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries1.filterRangeReqs:
// We'll send all the channels that it should know of.
chanSeries1.filterRangeResp <- syncer1Chans
}
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("no query recvd") t.Fatalf("no query recvd")
@@ -1132,31 +1131,9 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
chanSeries2.filterRangeResp <- syncer2Chans chanSeries2.filterRangeResp <- syncer2Chans
} }
// At this point, we'll forward the ReplyChannelRange messages to both // At this point, we'll assert that the ReplyChannelRange message is
// parties. After receiving the set of channels known to the remote peer // sent by syncer2.
for i := 0; i < numQueryResponses; i++ { for i := 0; i < numQueryResponses; i++ {
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a ReplyChannelRange message.
_, ok := msg.(*lnwire.ReplyChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2") t.Fatalf("didn't get msg from syncer2")
@@ -1180,8 +1157,7 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
} }
} }
// We'll now send back a chunked response for both parties of the known // We'll now have syncer1 process the received sids from syncer2.
// short chan ID's.
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("no query recvd") t.Fatalf("no query recvd")
@@ -1189,24 +1165,51 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
case <-chanSeries1.filterReq: case <-chanSeries1.filterReq:
chanSeries1.filterResp <- syncer2Chans chanSeries1.filterResp <- syncer2Chans
} }
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries2.filterReq: // At this point, syncer1 should start to send out initial requests to
chanSeries2.filterResp <- syncer1Chans // query the chan IDs of the remote party. We'll keep track of the
} // number of queries made using the iterated value, which starts at one
// due to the initial contribution of the QueryChannelRange msgs.
// At this point, both parties should start to send out initial
// requests to query the chan IDs of the remote party. We'll keep track
// of the number of queries made using the iterated value, which starts
// at one due the initial contribution of the QueryChannelRange msgs.
for i := 1; i < numTotalQueries; i++ { for i := 1; i < numTotalQueries; i++ {
// Both parties should now have sent out the initial requests expDelayResponse := i >= numUndelayedQueries
// to query the chan IDs of the other party. queryBatch(t,
msgChan1, msgChan2,
syncer1, syncer2,
chanSeries2,
expDelayResponse,
delayedQueryInterval,
delayTolerance,
)
}
}
// queryBatch is a helper method that will query for a single batch of channels
// from a peer and assert the responses. The method can also be used to assert
// the same transition happens, but is delayed by the remote peer's DOS
// rate-limiting. The provided chanSeries should belong to syncer2.
//
// The state transition performed is the following:
// syncer1 -- QueryShortChanIDs --> syncer2
// chanSeries.FetchChanAnns()
// syncer1 <-- ReplyShortChanIDsEnd -- syncer2
//
// If expDelayResponse is true, this method will assert that the call to
// FetchChanAnns happens between:
// [delayedQueryInterval-delayTolerance, delayedQueryInterval+delayTolerance].
func queryBatch(t *testing.T,
msgChan1, msgChan2 chan []lnwire.Message,
syncer1, syncer2 *GossipSyncer,
chanSeries *mockChannelGraphTimeSeries,
expDelayResponse bool,
delayedQueryInterval, delayTolerance time.Duration) {
t.Helper()
// First, we'll assert that syncer1 sends a QueryShortChanIDs message to
// the remote peer.
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1") t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan1: case msgs := <-msgChan1:
for _, msg := range msgs { for _, msg := range msgs {
@@ -1221,46 +1224,22 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg") t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg: case syncer2.queryMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a QueryShortChanIDs message.
_, ok := msg.(*lnwire.QueryShortChanIDs)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryShortChanIDs for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
} }
} }
} }
// We'll then respond to both parties with an empty set of // We'll then respond with an empty set of replies (as it doesn't
// replies (as it doesn't affect the test). // affect the test).
switch { switch {
// If this query has surpassed the undelayed query threshold, we // If this query has surpassed the undelayed query threshold, we will
// will impose stricter timing constraints on the response // impose stricter timing constraints on the response times. We'll first
// times. We'll first test that the peers don't immediately // test that syncer2's chanSeries doesn't immediately receive a query,
// receive a query, and then check that both queries haven't // and then check that the query hasn't gone unanswered entirely.
// gone unanswered entirely. case expDelayResponse:
case i >= numUndelayedQueries:
// Create a before and after timeout to test, our test // Create a before and after timeout to test, our test
// will ensure the messages are delivered to the peers // will ensure the messages are delivered to the peer
// in this timeframe. // in this timeframe.
before := time.After( before := time.After(
delayedQueryInterval - delayTolerance, delayedQueryInterval - delayTolerance,
@@ -1269,107 +1248,39 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
delayedQueryInterval + delayTolerance, delayedQueryInterval + delayTolerance,
) )
// First, ensure neither peer tries to respond up until // First, ensure syncer2 doesn't try to respond up until the
// the before time fires. // before time fires.
select { select {
case <-before: case <-before:
// Queries are delayed, proceed. // Query is delayed, proceed.
case <-chanSeries1.annReq: case <-chanSeries.annReq:
t.Fatalf("DOSy query was not delayed")
case <-chanSeries2.annReq:
t.Fatalf("DOSy query was not delayed") t.Fatalf("DOSy query was not delayed")
} }
// Next, we'll need to test that both queries are // If syncer2 doesn't attempt a response within the allowed
// received before the after timer expires. To account // interval, then the messages are probably lost.
// for ordering, we will try to pull a message from both
// peers, and then test that the opposite peer also
// receives the message promptly.
var (
firstChanSeries *mockChannelGraphTimeSeries
laterChanSeries *mockChannelGraphTimeSeries
)
// If neither peer attempts a response within the
// allowed interval, then the messages are probably
// lost. Otherwise, process the message and record the
// induced ordering.
select { select {
case <-after: case <-after:
t.Fatalf("no delayed query received") t.Fatalf("no delayed query received")
case <-chanSeries1.annReq: case <-chanSeries.annReq:
chanSeries1.annResp <- []lnwire.Message{} chanSeries.annResp <- []lnwire.Message{}
firstChanSeries = chanSeries1
laterChanSeries = chanSeries2
case <-chanSeries2.annReq:
chanSeries2.annResp <- []lnwire.Message{}
firstChanSeries = chanSeries2
laterChanSeries = chanSeries1
} }
// Finally, using the same interval timeout as before, // Otherwise, syncer2 should query its chanSeries promptly.
// ensure the later peer also responds promptly. We also
// assert that the first peer doesn't attempt another
// response.
select {
case <-after:
t.Fatalf("no delayed query received")
case <-firstChanSeries.annReq:
t.Fatalf("spurious undelayed response")
case <-laterChanSeries.annReq:
laterChanSeries.annResp <- []lnwire.Message{}
}
// Otherwise, we still haven't exceeded our undelayed query
// limit. Assert that both peers promptly attempt a response to
// the queries.
default: default:
select { select {
case <-time.After(50 * time.Millisecond): case <-time.After(50 * time.Millisecond):
t.Fatalf("no query recvd") t.Fatalf("no query recvd")
case <-chanSeries1.annReq: case <-chanSeries.annReq:
chanSeries1.annResp <- []lnwire.Message{} chanSeries.annResp <- []lnwire.Message{}
}
select {
case <-time.After(50 * time.Millisecond):
t.Fatalf("no query recvd")
case <-chanSeries2.annReq:
chanSeries2.annResp <- []lnwire.Message{}
} }
} }
// Finally, both sides should then receive a // Finally, assert that syncer2 replies to syncer1 with a
// ReplyShortChanIDsEnd as the first chunk has been replied to. // ReplyShortChanIDsEnd.
select {
case <-time.After(50 * time.Millisecond):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a ReplyShortChanIDsEnd message.
_, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
select { select {
case <-time.After(50 * time.Millisecond): case <-time.After(50 * time.Millisecond):
t.Fatalf("didn't get msg from syncer2") t.Fatalf("didn't get msg from syncer2")
@@ -1388,8 +1299,6 @@ func TestGossipSyncerDelayDOS(t *testing.T) {
t.Fatalf("node 2 didn't read msg") t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg: case syncer1.gossipMsgs <- msg:
}
} }
} }
} }
@@ -1413,24 +1322,19 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
BlockHeight: 1144, BlockHeight: 1144,
} }
msgChan1, syncer1, chanSeries1 := newTestSyncer( msgChan1, syncer1, chanSeries1 := newTestSyncer(
startHeight, defaultEncoding, chunkSize, startHeight, defaultEncoding, chunkSize, true, false,
) )
syncer1.Start() syncer1.Start()
defer syncer1.Stop() defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer( msgChan2, syncer2, chanSeries2 := newTestSyncer(
startHeight, defaultEncoding, chunkSize, startHeight, defaultEncoding, chunkSize, false, true,
) )
syncer2.Start() syncer2.Start()
defer syncer2.Stop() defer syncer2.Stop()
// Although both nodes are at the same height, they'll have a // Although both nodes are at the same height, syncer2 will have 3 chan
// completely disjoint set of 3 chan ID's that they know of. // ID's that syncer1 doesn't know of.
syncer1Chans := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(1),
lnwire.NewShortChanIDFromInt(2),
lnwire.NewShortChanIDFromInt(3),
}
syncer2Chans := []lnwire.ShortChannelID{ syncer2Chans := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(4), lnwire.NewShortChanIDFromInt(4),
lnwire.NewShortChanIDFromInt(5), lnwire.NewShortChanIDFromInt(5),
@@ -1438,7 +1342,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
} }
// We'll kick off the test by passing over the QueryChannelRange // We'll kick off the test by passing over the QueryChannelRange
// messages from one node to the other. // messages from syncer1 to syncer2.
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1") t.Fatalf("didn't get msg from syncer1")
@@ -1456,45 +1360,15 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg") t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg: case syncer2.queryMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a QueryChannelRange message.
_, ok := msg.(*lnwire.QueryChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
} }
} }
} }
// At this point, we'll need to send responses to both nodes from their // At this point, we'll need to send a response from syncer2 to syncer1
// respective channel series. Both nodes will simply request the entire // using syncer2's channels. This will cause syncer1 to simply request
// set of channels from the other. // the entire set of channels from the other.
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries1.filterRangeReqs:
// We'll send all the channels that it should know of.
chanSeries1.filterRangeResp <- syncer1Chans
}
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("no query recvd") t.Fatalf("no query recvd")
@@ -1504,32 +1378,9 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
chanSeries2.filterRangeResp <- syncer2Chans chanSeries2.filterRangeResp <- syncer2Chans
} }
// At this point, we'll forward the ReplyChannelRange messages to both // At this point, we'll assert that syncer2 replies with the
// parties. Two replies are expected since the chunk size is 2, and we // ReplyChannelRange messages. Two replies are expected since the chunk
// need to query for 3 channels. // size is 2, and we need to query for 3 channels.
for i := 0; i < chunkSize; i++ {
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a ReplyChannelRange message.
_, ok := msg.(*lnwire.ReplyChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
}
for i := 0; i < chunkSize; i++ { for i := 0; i < chunkSize; i++ {
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
@@ -1554,8 +1405,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
} }
} }
// We'll now send back a chunked response for both parties of the known // We'll now send back a chunked response from syncer2 back to syncer1.
// short chan ID's.
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("no query recvd") t.Fatalf("no query recvd")
@@ -1563,133 +1413,21 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
case <-chanSeries1.filterReq: case <-chanSeries1.filterReq:
chanSeries1.filterResp <- syncer2Chans chanSeries1.filterResp <- syncer2Chans
} }
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries2.filterReq: // At this point, syncer1 should start to send out initial requests to
chanSeries2.filterResp <- syncer1Chans // query the chan IDs of the remote party. As the chunk size is 2,
} // they'll need 2 rounds in order to fully reconcile the state.
// At this point, both parties should start to send out initial
// requests to query the chan IDs of the remote party. As the chunk
// size is 2, they'll need 2 rounds in order to fully reconcile the
// state.
for i := 0; i < chunkSize; i++ { for i := 0; i < chunkSize; i++ {
// Both parties should now have sent out the initial requests queryBatch(t,
// to query the chan IDs of the other party. msgChan1, msgChan2,
select { syncer1, syncer2,
case <-time.After(time.Second * 2): chanSeries2,
t.Fatalf("didn't get msg from syncer1") false, 0, 0,
)
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a QueryShortChanIDs message.
_, ok := msg.(*lnwire.QueryShortChanIDs)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryShortChanIDs for %T", msg)
} }
select { // At this stage syncer1 should now be sending over its initial
case <-time.After(time.Second * 2): // GossipTimestampRange messages as it should be fully synced.
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a QueryShortChanIDs message.
_, ok := msg.(*lnwire.QueryShortChanIDs)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryShortChanIDs for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
}
}
}
// We'll then respond to both parties with an empty set of replies (as
// it doesn't affect the test).
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries1.annReq:
chanSeries1.annResp <- []lnwire.Message{}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries2.annReq:
chanSeries2.annResp <- []lnwire.Message{}
}
// Both sides should then receive a ReplyShortChanIDsEnd as the first
// chunk has been replied to.
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a ReplyShortChanIDsEnd message.
_, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a ReplyShortChanIDsEnd message.
_, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
if !ok {
t.Fatalf("wrong message: expected "+
"ReplyShortChanIDsEnd for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
}
}
}
}
// At this stage both parties should now be sending over their initial
// GossipTimestampRange messages as they should both be fully synced.
select { select {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1") t.Fatalf("didn't get msg from syncer1")
@@ -1709,28 +1447,6 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
case syncer2.gossipMsgs <- msg: case syncer2.gossipMsgs <- msg:
}
}
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a GossipTimestampRange message.
_, ok := msg.(*lnwire.GossipTimestampRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
} }
} }
} }
@@ -1796,7 +1512,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg") t.Fatalf("node 2 didn't read msg")
case syncer2.gossipMsgs <- msg: case syncer2.queryMsgs <- msg:
} }
} }
@@ -1818,7 +1534,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg") t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg: case syncer1.queryMsgs <- msg:
} }
} }

8
lnpeer/errors.go Normal file
View File

@@ -0,0 +1,8 @@
package lnpeer
import "fmt"
var (
// ErrPeerExiting signals that the peer received a disconnect request.
ErrPeerExiting = fmt.Errorf("peer exiting")
)

15
peer.go
View File

@@ -32,9 +32,6 @@ import (
var ( var (
numNodes int32 numNodes int32
// ErrPeerExiting signals that the peer received a disconnect request.
ErrPeerExiting = fmt.Errorf("peer exiting")
) )
const ( const (
@@ -1411,7 +1408,7 @@ func (p *peer) logWireMessage(msg lnwire.Message, read bool) {
func (p *peer) writeMessage(msg lnwire.Message) error { func (p *peer) writeMessage(msg lnwire.Message) error {
// Simply exit if we're shutting down. // Simply exit if we're shutting down.
if atomic.LoadInt32(&p.disconnect) != 0 { if atomic.LoadInt32(&p.disconnect) != 0 {
return ErrPeerExiting return lnpeer.ErrPeerExiting
} }
// Only log the message on the first attempt. // Only log the message on the first attempt.
@@ -1559,7 +1556,7 @@ out:
} }
case <-p.quit: case <-p.quit:
exitErr = ErrPeerExiting exitErr = lnpeer.ErrPeerExiting
break out break out
} }
} }
@@ -1691,7 +1688,7 @@ func (p *peer) queue(priority bool, msg lnwire.Message, errChan chan error) {
case <-p.quit: case <-p.quit:
peerLog.Tracef("Peer shutting down, could not enqueue msg.") peerLog.Tracef("Peer shutting down, could not enqueue msg.")
if errChan != nil { if errChan != nil {
errChan <- ErrPeerExiting errChan <- lnpeer.ErrPeerExiting
} }
} }
} }
@@ -2504,7 +2501,7 @@ func (p *peer) sendMessage(sync, priority bool, msgs ...lnwire.Message) error {
case err := <-errChan: case err := <-errChan:
return err return err
case <-p.quit: case <-p.quit:
return ErrPeerExiting return lnpeer.ErrPeerExiting
} }
} }
@@ -2550,7 +2547,7 @@ func (p *peer) AddNewChannel(channel *channeldb.OpenChannel,
case <-cancel: case <-cancel:
return errors.New("canceled adding new channel") return errors.New("canceled adding new channel")
case <-p.quit: case <-p.quit:
return ErrPeerExiting return lnpeer.ErrPeerExiting
} }
// We pause here to wait for the peer to recognize the new channel // We pause here to wait for the peer to recognize the new channel
@@ -2559,7 +2556,7 @@ func (p *peer) AddNewChannel(channel *channeldb.OpenChannel,
case err := <-errChan: case err := <-errChan:
return err return err
case <-p.quit: case <-p.quit:
return ErrPeerExiting return lnpeer.ErrPeerExiting
} }
} }