diff --git a/discovery/syncer.go b/discovery/syncer.go index 6a5c3cb1e..7ef81e838 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -87,17 +87,6 @@ const ( chansSynced ) -const ( - // DefaultMaxUndelayedQueryReplies specifies how many gossip queries we - // will respond to immediately before starting to delay responses. - DefaultMaxUndelayedQueryReplies = 10 - - // DefaultDelayedQueryReplyInterval is the length of time we will wait - // before responding to gossip queries after replying to - // maxUndelayedQueryReplies queries. - DefaultDelayedQueryReplyInterval = 5 * time.Second -) - // String returns a human readable string describing the target syncerState. func (s syncerState) String() string { switch s { @@ -121,6 +110,26 @@ func (s syncerState) String() string { } } +const ( + // DefaultMaxUndelayedQueryReplies specifies how many gossip queries we + // will respond to immediately before starting to delay responses. + DefaultMaxUndelayedQueryReplies = 10 + + // DefaultDelayedQueryReplyInterval is the length of time we will wait + // before responding to gossip queries after replying to + // maxUndelayedQueryReplies queries. + DefaultDelayedQueryReplyInterval = 5 * time.Second + + // chanRangeQueryBuffer is the number of blocks back that we'll go when + // asking the remote peer for their any channels they know of beyond + // our highest known channel ID. + chanRangeQueryBuffer = 144 + + // syncTransitionTimeout is the default timeout in which we'll wait up + // to when attempting to perform a sync transition. + syncTransitionTimeout = 5 * time.Second +) + var ( // encodingTypeToChunkSize maps an encoding type, to the max number of // short chan ID's using the encoding type that we can fit into a @@ -131,14 +140,22 @@ var ( // ErrGossipSyncerExiting signals that the syncer has been killed. ErrGossipSyncerExiting = errors.New("gossip syncer exiting") + + // ErrSyncTransitionTimeout is an error returned when we've timed out + // attempting to perform a sync transition. + ErrSyncTransitionTimeout = errors.New("timed out attempting to " + + "transition sync type") + + // zeroTimestamp is the timestamp we'll use when we want to indicate to + // peers that we do not want to receive any new graph updates. + zeroTimestamp time.Time ) -const ( - // chanRangeQueryBuffer is the number of blocks back that we'll go when - // asking the remote peer for their any channels they know of beyond - // our highest known channel ID. - chanRangeQueryBuffer = 144 -) +// syncTransitionReq encapsulates a request for a gossip syncer sync transition. +type syncTransitionReq struct { + newSyncType SyncerType + errChan chan error +} // gossipSyncerCfg is a struct that packages all the information a GossipSyncer // needs to carry out its duties. @@ -213,6 +230,12 @@ type GossipSyncer struct { // determine if we've already sent out our update. localUpdateHorizon *lnwire.GossipTimestampRange + // syncTransitions is a channel through which new sync type transition + // requests will be sent through. These requests should only be handled + // when the gossip syncer is in a chansSynced state to ensure its state + // machine behaves as expected. + syncTransitionReqs chan *syncTransitionReq + // gossipMsgs is a channel that all messages from the target peer will // be sent over. gossipMsgs chan lnwire.Message @@ -265,10 +288,11 @@ func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer { ) return &GossipSyncer{ - cfg: cfg, - rateLimiter: rateLimiter, - gossipMsgs: make(chan lnwire.Message, 100), - quit: make(chan struct{}), + cfg: cfg, + rateLimiter: rateLimiter, + syncTransitionReqs: make(chan *syncTransitionReq), + gossipMsgs: make(chan lnwire.Message, 100), + quit: make(chan struct{}), } } @@ -298,10 +322,6 @@ func (g *GossipSyncer) Stop() { func (g *GossipSyncer) channelGraphSyncer() { defer g.wg.Done() - // TODO(roasbeef): also add ability to force transition back to syncing - // chans - // * needed if we want to sync chan state very few blocks? - for { state := g.syncState() syncType := g.SyncType() @@ -437,25 +457,19 @@ func (g *GossipSyncer) channelGraphSyncer() { // we want to receive real-time channel updates, we'll // do so now. if g.localUpdateHorizon == nil && syncType == ActiveSync { - updateHorizon := time.Now() - log.Infof("GossipSyncer(%x): applying "+ - "gossipFilter(start=%v)", - g.cfg.peerPub[:], updateHorizon) - - g.localUpdateHorizon = &lnwire.GossipTimestampRange{ - ChainHash: g.cfg.chainHash, - FirstTimestamp: uint32(updateHorizon.Unix()), - TimestampRange: math.MaxUint32, - } - err := g.cfg.sendToPeer(g.localUpdateHorizon) + err := g.sendGossipTimestampRange( + time.Now(), math.MaxUint32, + ) if err != nil { - log.Errorf("unable to send update "+ - "horizon: %v", err) + log.Errorf("Unable to send update "+ + "horizon to %x: %v", + g.cfg.peerPub, err) } } // With our horizon set, we'll simply reply to any new - // message and exit if needed. + // messages or process any state transitions and exit if + // needed. select { case msg := <-g.gossipMsgs: err := g.replyPeerQueries(msg) @@ -464,6 +478,9 @@ func (g *GossipSyncer) channelGraphSyncer() { "query: %v", err) } + case req := <-g.syncTransitionReqs: + req.errChan <- g.handleSyncTransition(req) + case <-g.quit: return } @@ -471,6 +488,37 @@ func (g *GossipSyncer) channelGraphSyncer() { } } +// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the +// syncer and sends it to the remote peer. +func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time, + timestampRange uint32) error { + + endTimestamp := firstTimestamp.Add( + time.Duration(timestampRange) * time.Second, + ) + + log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)", + g.cfg.peerPub[:], firstTimestamp, endTimestamp) + + localUpdateHorizon := &lnwire.GossipTimestampRange{ + ChainHash: g.cfg.chainHash, + FirstTimestamp: uint32(firstTimestamp.Unix()), + TimestampRange: timestampRange, + } + + if err := g.cfg.sendToPeer(localUpdateHorizon); err != nil { + return err + } + + if firstTimestamp == zeroTimestamp && timestampRange == 0 { + g.localUpdateHorizon = nil + } else { + g.localUpdateHorizon = localUpdateHorizon + } + + return nil +} + // synchronizeChanIDs is called by the channelGraphSyncer when we need to query // the remote peer for its known set of channel IDs within a particular block // range. This method will be called continually until the entire range has @@ -978,6 +1026,85 @@ func (g *GossipSyncer) syncState() syncerState { return syncerState(atomic.LoadUint32(&g.state)) } +// ProcessSyncTransition sends a request to the gossip syncer to transition its +// sync type to a new one. +// +// NOTE: This can only be done once the gossip syncer has reached its final +// chansSynced state. +func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error { + errChan := make(chan error, 1) + select { + case g.syncTransitionReqs <- &syncTransitionReq{ + newSyncType: newSyncType, + errChan: errChan, + }: + case <-time.After(syncTransitionTimeout): + return ErrSyncTransitionTimeout + case <-g.quit: + return ErrGossipSyncerExiting + } + + select { + case err := <-errChan: + return err + case <-g.quit: + return ErrGossipSyncerExiting + } +} + +// handleSyncTransition handles a new sync type transition request. +// +// NOTE: The gossip syncer might have another sync state as a result of this +// transition. +func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error { + // Return early from any NOP sync transitions. + syncType := g.SyncType() + if syncType == req.newSyncType { + return nil + } + + log.Debugf("GossipSyncer(%x): transitioning from %v to %v", + g.cfg.peerPub, syncType, req.newSyncType) + + var ( + firstTimestamp time.Time + timestampRange uint32 + newState syncerState + ) + + switch req.newSyncType { + // If an active sync has been requested, then we should resume receiving + // new graph updates from the remote peer. + case ActiveSync: + firstTimestamp = time.Now() + timestampRange = math.MaxUint32 + newState = syncingChans + + // If a PassiveSync transition has been requested, then we should no + // longer receive any new updates from the remote peer. We can do this + // by setting our update horizon to a range in the past ensuring no + // graph updates match the timestamp range. + case PassiveSync: + firstTimestamp = zeroTimestamp + timestampRange = 0 + newState = chansSynced + + default: + return fmt.Errorf("unhandled sync transition %v", + req.newSyncType) + } + + err := g.sendGossipTimestampRange(firstTimestamp, timestampRange) + if err != nil { + return fmt.Errorf("unable to send local update horizon: %v", err) + } + + g.setSyncState(newState) + g.setSyncType(req.newSyncType) + + return nil +} + // setSyncType sets the gossip syncer's sync type to the given type. func (g *GossipSyncer) setSyncType(syncType SyncerType) { atomic.StoreUint32(&g.syncType, uint32(syncType)) diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 3f01c8b06..f0c6f59ba 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -13,7 +13,9 @@ import ( ) const ( - defaultEncoding = lnwire.EncodingSortedPlain + defaultEncoding = lnwire.EncodingSortedPlain + latestKnownHeight = 1337 + startHeight = latestKnownHeight - chanRangeQueryBuffer ) var ( @@ -1940,3 +1942,143 @@ func TestGossipSyncerAlreadySynced(t *testing.T) { } } } + +// TestGossipSyncerSyncTransitions ensures that the gossip syncer properly +// carries out its duties when accepting a new sync transition request. +func TestGossipSyncerSyncTransitions(t *testing.T) { + t.Parallel() + + assertMsgSent := func(t *testing.T, msgChan chan []lnwire.Message, + msg lnwire.Message) { + + t.Helper() + + var msgSent lnwire.Message + select { + case msgs := <-msgChan: + if len(msgs) != 1 { + t.Fatal("expected to send a single message at "+ + "a time, got %d", len(msgs)) + } + msgSent = msgs[0] + case <-time.After(time.Second): + t.Fatalf("expected to send %T message", msg) + } + + if !reflect.DeepEqual(msgSent, msg) { + t.Fatalf("expected to send message: %v\ngot: %v", + spew.Sdump(msg), spew.Sdump(msgSent)) + } + } + + tests := []struct { + name string + entrySyncType SyncerType + finalSyncType SyncerType + assert func(t *testing.T, msgChan chan []lnwire.Message, + syncer *GossipSyncer) + }{ + { + name: "active to passive", + entrySyncType: ActiveSync, + finalSyncType: PassiveSync, + assert: func(t *testing.T, msgChan chan []lnwire.Message, + g *GossipSyncer) { + + // When transitioning from active to passive, we + // should expect to see a new local update + // horizon sent to the remote peer indicating + // that it would not like to receive any future + // updates. + assertMsgSent(t, msgChan, &lnwire.GossipTimestampRange{ + FirstTimestamp: uint32(zeroTimestamp.Unix()), + TimestampRange: 0, + }) + + syncState := g.syncState() + if syncState != chansSynced { + t.Fatalf("expected syncerState %v, "+ + "got %v", chansSynced, + syncState) + } + }, + }, + { + name: "passive to active", + entrySyncType: PassiveSync, + finalSyncType: ActiveSync, + assert: func(t *testing.T, msgChan chan []lnwire.Message, + g *GossipSyncer) { + + // When transitioning from historical to active, + // we should expect to see a new local update + // horizon sent to the remote peer indicating + // that it would like to receive any future + // updates. + firstTimestamp := uint32(time.Now().Unix()) + assertMsgSent(t, msgChan, &lnwire.GossipTimestampRange{ + FirstTimestamp: firstTimestamp, + TimestampRange: math.MaxUint32, + }) + + // The local update horizon should be followed + // by a QueryChannelRange message sent to the + // remote peer requesting all channels it + // knows of from the highest height the syncer + // knows of. + assertMsgSent(t, msgChan, &lnwire.QueryChannelRange{ + FirstBlockHeight: startHeight, + NumBlocks: math.MaxUint32 - startHeight, + }) + + syncState := g.syncState() + if syncState != waitingQueryRangeReply { + t.Fatalf("expected syncerState %v, "+ + "got %v", waitingQueryRangeReply, + syncState) + } + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + // We'll start each test by creating our syncer. We'll + // initialize it with a state of chansSynced, as that's + // the only time when it can process sync transitions. + msgChan, syncer, _ := newTestSyncer( + lnwire.ShortChannelID{ + BlockHeight: latestKnownHeight, + }, + defaultEncoding, defaultChunkSize, + ) + syncer.setSyncState(chansSynced) + + // We'll set the initial syncType to what the test + // demands. + syncer.setSyncType(test.entrySyncType) + + // We'll then start the syncer in order to process the + // request. + syncer.Start() + defer syncer.Stop() + + syncer.ProcessSyncTransition(test.finalSyncType) + + // The syncer should now have the expected final + // SyncerType that the test expects. + syncType := syncer.SyncType() + if syncType != test.finalSyncType { + t.Fatalf("expected syncType %v, got %v", + test.finalSyncType, syncType) + } + + // Finally, we'll run a set of assertions for each test + // to ensure the syncer performed its expected duties + // after processing its sync transition. + test.assert(t, msgChan, syncer) + }) + } +}