diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index 2c928f27a..dade94ed5 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -1366,6 +1366,266 @@ func testCatchUpClientOnMissedBlocks(miner *rpctest.Harness, } } +// testCatchUpOnMissedBlocks the case of multiple registered clients receiving +// historical block epoch notifications due to the notifier's best known block +// being out of date. +func testCatchUpOnMissedBlocks(miner *rpctest.Harness, + notifier chainntnfs.TestChainNotifier, t *testing.T) { + + const numBlocks = 10 + const numClients = 5 + var wg sync.WaitGroup + + _, bestHeight, err := miner.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + // This function is used by UnsafeStart to ensure all notifications + // are fully drained before clients register for notifications. + generateBlocks := func() error { + _, err = miner.Node.Generate(numBlocks) + return err + } + + // Next, start the notifier with outdated best block information. + if err := notifier.UnsafeStart(bestHeight, + nil, bestHeight+numBlocks, generateBlocks); err != nil { + + t.Fatalf("Unable to unsafe start the notifier: %v", err) + } + + // Create numClients clients who will listen for block notifications. + clients := make([]*chainntnfs.BlockEpochEvent, 0, numClients) + for i := 0; i < numClients; i++ { + epochClient, err := notifier.RegisterBlockEpochNtfn(nil) + if err != nil { + t.Fatalf("unable to register for epoch notification: %v", err) + } + clients = append(clients, epochClient) + } + + // Generate a single block to trigger the backlog of historical + // notifications for the previously mined blocks. + if _, err := miner.Node.Generate(1); err != nil { + t.Fatalf("unable to generate blocks: %v", err) + } + + // We expect each client to receive numBlocks + 1 notifications, 1 for + // each block that the notifier has missed out on. + for expectedHeight := bestHeight + 1; expectedHeight <= + bestHeight+numBlocks+1; expectedHeight++ { + + for _, epochClient := range clients { + select { + case block := <-epochClient.Epochs: + if block.Height != expectedHeight { + t.Fatalf("received block of height: %d, "+ + "expected: %d", block.Height, + expectedHeight) + } + case <-time.After(20 * time.Second): + t.Fatalf("did not receive historical notification "+ + "for height %d", expectedHeight) + } + } + } + + // Finally, ensure that an extra block notification wasn't received. + anyExtras := make(chan struct{}, len(clients)) + for _, epochClient := range clients { + wg.Add(1) + go func(epochClient *chainntnfs.BlockEpochEvent) { + defer wg.Done() + select { + case <-epochClient.Epochs: + anyExtras <- struct{}{} + case <-time.After(5 * time.Second): + } + }(epochClient) + } + + wg.Wait() + close(anyExtras) + + var extraCount int + for range anyExtras { + extraCount++ + } + + if extraCount > 0 { + t.Fatalf("received %d unexpected block notification", extraCount) + } +} + +// testCatchUpOnMissedBlocks tests that a client will still receive all valid +// block notifications in the case where a notifier's best block has been reorged +// out of the chain. +func testCatchUpOnMissedBlocksWithReorg(miner1 *rpctest.Harness, + notifier chainntnfs.TestChainNotifier, t *testing.T) { + + const numBlocks = 10 + const numClients = 5 + var wg sync.WaitGroup + + // Set up a new miner that we can use to cause a reorg. + miner2, err := rpctest.New(netParams, nil, nil) + if err != nil { + t.Fatalf("unable to create mining node: %v", err) + } + if err := miner2.SetUp(false, 0); err != nil { + t.Fatalf("unable to set up mining node: %v", err) + } + defer miner2.TearDown() + + // We start by connecting the new miner to our original miner, + // such that it will sync to our original chain. + if err := rpctest.ConnectNode(miner1, miner2); err != nil { + t.Fatalf("unable to connect harnesses: %v", err) + } + nodeSlice := []*rpctest.Harness{miner1, miner2} + if err := rpctest.JoinNodes(nodeSlice, rpctest.Blocks); err != nil { + t.Fatalf("unable to join node on blocks: %v", err) + } + + // The two should be on the same blockheight. + _, nodeHeight1, err := miner1.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + _, nodeHeight2, err := miner2.Node.GetBestBlock() + if err != nil { + t.Fatalf("unable to get current blockheight %v", err) + } + + if nodeHeight1 != nodeHeight2 { + t.Fatalf("expected both miners to be on the same height: %v vs %v", + nodeHeight1, nodeHeight2) + } + + // We disconnect the two nodes, such that we can start mining on them + // individually without the other one learning about the new blocks. + err = miner1.Node.AddNode(miner2.P2PAddress(), rpcclient.ANRemove) + if err != nil { + t.Fatalf("unable to remove node: %v", err) + } + + // Now mine on each chain separately + blocks, err := miner1.Node.Generate(numBlocks) + if err != nil { + t.Fatalf("unable to generate single block: %v", err) + } + + // We generate an extra block on miner 2's chain to ensure it is the + // longer chain. + _, err = miner2.Node.Generate(numBlocks + 1) + if err != nil { + t.Fatalf("unable to generate single block: %v", err) + } + + // Sync the two chains to ensure they will sync to miner2's chain. + if err := rpctest.ConnectNode(miner1, miner2); err != nil { + t.Fatalf("unable to connect harnesses: %v", err) + } + nodeSlice = []*rpctest.Harness{miner1, miner2} + if err := rpctest.JoinNodes(nodeSlice, rpctest.Blocks); err != nil { + t.Fatalf("unable to join node on blocks: %v", err) + } + + // Next, start the notifier with outdated best block information. + // We set the notifier's best block to be the last block mined on the + // shorter chain, to test that the notifier correctly rewinds to + // the common ancestor between the two chains. + syncHeight := nodeHeight1 + numBlocks + 1 + if err := notifier.UnsafeStart(nodeHeight1+numBlocks, + blocks[numBlocks-1], syncHeight, nil); err != nil { + + t.Fatalf("Unable to unsafe start the notifier: %v", err) + } + + // Create numClients clients who will listen for block notifications. + clients := make([]*chainntnfs.BlockEpochEvent, 0, numClients) + for i := 0; i < numClients; i++ { + epochClient, err := notifier.RegisterBlockEpochNtfn(nil) + if err != nil { + t.Fatalf("unable to register for epoch notification: %v", err) + } + clients = append(clients, epochClient) + } + + // Generate a single block, which should trigger the notifier to rewind + // to the common ancestor and dispatch notifications from there. + _, err = miner2.Node.Generate(1) + if err != nil { + t.Fatalf("unable to generate single block: %v", err) + } + + // If the chain backend to the notifier stores information about reorged + // blocks, the notifier is able to rewind the chain to the common + // ancestor between the chain tip and its outdated best known block. + // In this case, the client is expected to receive numBlocks + 2 + // notifications, 1 for each block the notifier has missed out on from + // the longer chain. + // + // If the chain backend does not store information about reorged blocks, + // the notifier has no way of knowing where to rewind to and therefore + // the client is only expected to receive notifications for blocks + // whose height is greater than the notifier's best known height: 2 + // notifications, in this case. + var startingHeight int32 + switch notifier.(type) { + case *neutrinonotify.NeutrinoNotifier: + startingHeight = nodeHeight1 + numBlocks + 1 + default: + startingHeight = nodeHeight1 + 1 + } + + for expectedHeight := startingHeight; expectedHeight <= + nodeHeight1+numBlocks+2; expectedHeight++ { + + for _, epochClient := range clients { + select { + case block := <-epochClient.Epochs: + if block.Height != expectedHeight { + t.Fatalf("received block of height: %d, "+ + "expected: %d", block.Height, + expectedHeight) + } + case <-time.After(20 * time.Second): + t.Fatalf("did not receive historical notification "+ + "for height %d", expectedHeight) + } + } + } + + // Finally, ensure that an extra block notification wasn't received. + anyExtras := make(chan struct{}, len(clients)) + for _, epochClient := range clients { + wg.Add(1) + go func(epochClient *chainntnfs.BlockEpochEvent) { + defer wg.Done() + select { + case <-epochClient.Epochs: + anyExtras <- struct{}{} + case <-time.After(5 * time.Second): + } + }(epochClient) + } + + wg.Wait() + close(anyExtras) + + var extraCount int + for range anyExtras { + extraCount++ + } + + if extraCount > 0 { + t.Fatalf("received %d unexpected block notification", extraCount) + } +} + type testCase struct { name string @@ -1435,6 +1695,14 @@ var blockCatchupTests = []blockCatchupTestCase{ name: "catch up client on historical block epoch ntfns", test: testCatchUpClientOnMissedBlocks, }, + { + name: "test catch up on missed blocks", + test: testCatchUpOnMissedBlocks, + }, + { + name: "test catch up on missed blocks w/ reorged best block", + test: testCatchUpOnMissedBlocksWithReorg, + }, } // TestInterfaces tests all registered interfaces with a unified set of tests @@ -1626,6 +1894,7 @@ func TestInterfaces(t *testing.T) { t.Fatalf("unable to create %v notifier: %v", notifierType, err) } + testName := fmt.Sprintf("%v: %v", notifierType, blockCatchupTest.name)