diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 7e124c02b..cc2907040 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1292,6 +1292,26 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate( return nil, err } + // We'll avoid broadcasting any updates for private channels to + // avoid directly giving away their existence. Instead, we'll + // send the update directly to the remote party. + if edgeInfo.info.AuthProof == nil { + remotePubKey := remotePubFromChanInfo( + edgeInfo.info, chanUpdate.ChannelFlags, + ) + err := d.reliableSender.sendMessage( + chanUpdate, remotePubKey, + ) + if err != nil { + log.Errorf("Unable to reliably send %v for "+ + "channel=%v to peer=%x: %v", + chanUpdate.MsgType(), + chanUpdate.ShortChannelID, + remotePubKey, err) + } + continue + } + // We set ourselves as the source of this message to indicate // that we shouldn't skip any peers when sending this message. chanUpdates = append(chanUpdates, networkMsg{ @@ -1922,13 +1942,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // so we'll try sending the update directly to the remote peer. if !nMsg.isRemote && chanInfo.AuthProof == nil { // Get our peer's public key. - var remotePubKey [33]byte - switch { - case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0: - remotePubKey = chanInfo.NodeKey2Bytes - case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1: - remotePubKey = chanInfo.NodeKey1Bytes - } + remotePubKey := remotePubFromChanInfo( + chanInfo, msg.ChannelFlags, + ) // Now, we'll attempt to send the channel update message // reliably to the remote peer in the background, so diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 066d76e2e..02c26f4d0 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -202,6 +202,30 @@ func (r *mockGraphSource) ForEachNode(func(node *channeldb.LightningNode) error) func (r *mockGraphSource) ForAllOutgoingChannels(cb func(i *channeldb.ChannelEdgeInfo, c *channeldb.ChannelEdgePolicy) error) error { + + r.mu.Lock() + defer r.mu.Unlock() + + chans := make(map[uint64]channeldb.ChannelEdge) + for _, info := range r.infos { + info := info + + edgeInfo := chans[info.ChannelID] + edgeInfo.Info = &info + chans[info.ChannelID] = edgeInfo + } + for _, edges := range r.edges { + edges := edges + + edge := chans[edges[0].ChannelID] + edge.Policy1 = &edges[0] + chans[edges[0].ChannelID] = edge + } + + for _, channel := range chans { + cb(channel.Info, channel.Policy1) + } + return nil } @@ -471,12 +495,10 @@ func createAnnouncements(blockHeight uint32) (*annBatch, error) { return nil, err } - batch.localProofAnn = &lnwire.AnnounceSignatures{ - NodeSignature: batch.remoteChanAnn.NodeSig1, - BitcoinSignature: batch.remoteChanAnn.BitcoinSig1, - } - batch.remoteProofAnn = &lnwire.AnnounceSignatures{ + ShortChannelID: lnwire.ShortChannelID{ + BlockHeight: blockHeight, + }, NodeSignature: batch.remoteChanAnn.NodeSig2, BitcoinSignature: batch.remoteChanAnn.BitcoinSig2, } @@ -486,6 +508,14 @@ func createAnnouncements(blockHeight uint32) (*annBatch, error) { return nil, err } + batch.localProofAnn = &lnwire.AnnounceSignatures{ + ShortChannelID: lnwire.ShortChannelID{ + BlockHeight: blockHeight, + }, + NodeSignature: batch.localChanAnn.NodeSig1, + BitcoinSignature: batch.localChanAnn.BitcoinSig1, + } + batch.chanUpdAnn1, err = createUpdateAnnouncement( blockHeight, 0, nodeKeyPriv1, timestamp, ) @@ -724,6 +754,7 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout), NumActiveSyncers: 3, + AnnSigner: &mockSigner{nodeKeyPriv1}, }, nodeKeyPub1) if err := gossiper.Start(); err != nil { @@ -3235,6 +3266,222 @@ func TestSendChannelUpdateReliably(t *testing.T) { } } +func sendLocalMsg(t *testing.T, ctx *testCtx, msg lnwire.Message, + localPub *btcec.PublicKey) { + + t.Helper() + + select { + case err := <-ctx.gossiper.ProcessLocalAnnouncement(msg, localPub): + if err != nil { + t.Fatalf("unable to process channel msg: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } +} + +func sendRemoteMsg(t *testing.T, ctx *testCtx, msg lnwire.Message, + remotePeer lnpeer.Peer) { + + t.Helper() + + select { + case err := <-ctx.gossiper.ProcessRemoteAnnouncement(msg, remotePeer): + if err != nil { + t.Fatalf("unable to process channel msg: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } +} + +func assertBroadcastMsg(t *testing.T, ctx *testCtx, + predicate func(lnwire.Message) error) { + + t.Helper() + + // We don't care about the order of the broadcast, only that our target + // predicate returns true for any of the messages, so we'll continue to + // retry until either we hit our timeout, or it returns with no error + // (message found). + err := lntest.WaitNoError(func() error { + select { + case msg := <-ctx.broadcastedMessage: + return predicate(msg.msg) + case <-time.After(2 * trickleDelay): + return fmt.Errorf("no message broadcast") + } + }, time.Second*5) + if err != nil { + t.Fatal(err) + } +} + +// TestPropagateChanPolicyUpdate tests that we're able to issue requests to +// update policies for all channels and also select target channels. +// Additionally, we ensure that we don't propagate updates for any private +// channels. +func TestPropagateChanPolicyUpdate(t *testing.T) { + t.Parallel() + + // First, we'll make out test context and add 3 random channels to the + // graph. + startingHeight := uint32(10) + ctx, cleanup, err := createTestCtx(startingHeight) + if err != nil { + t.Fatalf("unable to create test context: %v", err) + } + defer cleanup() + + const numChannels = 3 + channelsToAnnounce := make([]*annBatch, 0, numChannels) + for i := 0; i < numChannels; i++ { + newChan, err := createAnnouncements(uint32(i + 1)) + if err != nil { + t.Fatalf("unable to make new channel ann: %v", err) + } + + channelsToAnnounce = append(channelsToAnnounce, newChan) + } + + localKey := nodeKeyPriv1.PubKey() + remoteKey := nodeKeyPriv2.PubKey() + + sentMsgs := make(chan lnwire.Message, 10) + remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} + + // The forced code path for sending the private ChannelUpdate to the + // remote peer will be hit, forcing it to request a notification that + // the remote peer is active. We'll ensure that it targets the proper + // pubkey, and hand it our mock peer above. + notifyErr := make(chan error, 1) + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func( + targetPub *btcec.PublicKey, peerChan chan<- lnpeer.Peer) { + + if !targetPub.IsEqual(remoteKey) { + notifyErr <- fmt.Errorf("reliableSender attempted to send the "+ + "message to the wrong peer: expected %x got %x", + remoteKey.SerializeCompressed(), + targetPub.SerializeCompressed()) + } + + peerChan <- remotePeer + } + + // With our channel announcements created, we'll now send them all to + // the gossiper in order for it to process. However, we'll hold back + // the channel ann proof from the first channel in order to have it be + // marked as private channel. + firstChanID := channelsToAnnounce[0].localChanAnn.ShortChannelID + for _, batch := range channelsToAnnounce { + sendLocalMsg(t, ctx, batch.localChanAnn, localKey) + sendLocalMsg(t, ctx, batch.chanUpdAnn1, localKey) + sendLocalMsg(t, ctx, batch.nodeAnn1, localKey) + + sendRemoteMsg(t, ctx, batch.chanUpdAnn2, remotePeer) + sendRemoteMsg(t, ctx, batch.nodeAnn2, remotePeer) + + // We'll skip sending the auth proofs from the first channel to + // ensure that it's seen as a private channel. + if batch.localChanAnn.ShortChannelID == firstChanID { + continue + } + + sendLocalMsg(t, ctx, batch.localProofAnn, localKey) + sendRemoteMsg(t, ctx, batch.remoteProofAnn, remotePeer) + } + + // Drain out any broadcast or direct messages we might not have read up + // to this point. We'll also check out notifyErr to detect if the + // reliable sender had an issue sending to the remote peer. +out: + for { + select { + case <-ctx.broadcastedMessage: + case <-sentMsgs: + case err := <-notifyErr: + t.Fatal(err) + default: + break out + } + } + + // Now that all of our channels are loaded, we'll attempt to update the + // policy of all of them. + const newTimeLockDelta = 100 + newPolicy := routing.ChannelPolicy{ + TimeLockDelta: newTimeLockDelta, + } + err = ctx.gossiper.PropagateChanPolicyUpdate(newPolicy) + if err != nil { + t.Fatalf("unable to chan policies: %v", err) + } + + // Two channel updates should now be broadcast, with neither of them + // being the channel our first private channel. + for i := 0; i < numChannels-1; i++ { + assertBroadcastMsg(t, ctx, func(msg lnwire.Message) error { + upd, ok := msg.(*lnwire.ChannelUpdate) + if !ok { + return fmt.Errorf("channel update not "+ + "broadcast, instead %T was", msg) + } + + if upd.ShortChannelID == firstChanID { + return fmt.Errorf("private channel upd " + + "broadcast") + } + if upd.TimeLockDelta != newTimeLockDelta { + return fmt.Errorf("wrong delta: expected %v, "+ + "got %v", newTimeLockDelta, + upd.TimeLockDelta) + } + + return nil + }) + } + + // Finally the ChannelUpdate should have been sent directly to the + // remote peer via the reliable sender. + select { + case msg := <-sentMsgs: + upd, ok := msg.(*lnwire.ChannelUpdate) + if !ok { + t.Fatalf("channel update not "+ + "broadcast, instead %T was", msg) + } + if upd.TimeLockDelta != newTimeLockDelta { + t.Fatalf("wrong delta: expected %v, "+ + "got %v", newTimeLockDelta, + upd.TimeLockDelta) + } + if upd.ShortChannelID != firstChanID { + t.Fatalf("private channel upd " + + "broadcast") + } + case <-time.After(time.Second * 5): + t.Fatalf("message not sent directly to peer") + } + + // At this point, no other ChannelUpdate messages should be broadcast + // as we sent the two public ones to the network, and the private one + // was sent directly to the peer. + for { + select { + case msg := <-ctx.broadcastedMessage: + if upd, ok := msg.msg.(*lnwire.ChannelUpdate); ok { + if upd.ShortChannelID == firstChanID { + t.Fatalf("chan update msg received: %v", + spew.Sdump(msg)) + } + } + default: + return + } + } +} + func assertMessage(t *testing.T, expected, got lnwire.Message) { t.Helper() diff --git a/discovery/utils.go b/discovery/utils.go index 3c0fdc17a..148e667b8 100644 --- a/discovery/utils.go +++ b/discovery/utils.go @@ -146,3 +146,19 @@ func SignAnnouncement(signer lnwallet.MessageSigner, pubKey *btcec.PublicKey, return signer.SignMessage(pubKey, data) } + +// remotePubFromChanInfo returns the public key of the remote peer given a +// ChannelEdgeInfo that describe a channel we have with them. +func remotePubFromChanInfo(chanInfo *channeldb.ChannelEdgeInfo, + chanFlags lnwire.ChanUpdateChanFlags) [33]byte { + + var remotePubKey [33]byte + switch { + case chanFlags&lnwire.ChanUpdateDirection == 0: + remotePubKey = chanInfo.NodeKey2Bytes + case chanFlags&lnwire.ChanUpdateDirection == 1: + remotePubKey = chanInfo.NodeKey1Bytes + } + + return remotePubKey +}