diff --git a/lntemp/rpc/lnd.go b/lntemp/rpc/lnd.go index a9ec94701..25eb94625 100644 --- a/lntemp/rpc/lnd.go +++ b/lntemp/rpc/lnd.go @@ -2,6 +2,7 @@ package rpc import ( "context" + "strings" "github.com/lightningnetwork/lnd/lnrpc" "github.com/stretchr/testify/require" @@ -132,6 +133,29 @@ func (h *HarnessRPC) PendingChannels() *lnrpc.PendingChannelsResponse { pendingChansRequest := &lnrpc.PendingChannelsRequest{} resp, err := h.LN.PendingChannels(ctxt, pendingChansRequest) + + // TODO(yy): We may get a `unable to find arbitrator` error from the + // rpc point, due to a timing issue in rpcserver, + // 1. `r.server.chanStateDB.FetchClosedChannels` fetches + // the pending force close channel. + // 2. `r.arbitratorPopulateForceCloseResp` relies on the + // channel arbitrator to get the report, and, + // 3. the arbitrator may be deleted due to the force close + // channel being resolved. + // Somewhere along the line is missing a lock to keep the data + // consistent. + // + // Return if there's no error. + if err == nil { + return resp + } + + // Otherwise, give it a second shot if it's the arbitrator error. + if strings.Contains(err.Error(), "unable to find arbitrator") { + resp, err = h.LN.PendingChannels(ctxt, pendingChansRequest) + } + + // It's very unlikely we'd get the arbitrator not found error again. h.NoError(err, "PendingChannels") return resp @@ -517,3 +541,30 @@ func (h *HarnessRPC) SubscribeInvoices( return client } + +type BackupSubscriber lnrpc.Lightning_SubscribeChannelBackupsClient + +// SubscribeChannelBackups creates a client to listen to channel backup stream. +func (h *HarnessRPC) SubscribeChannelBackups() BackupSubscriber { + // Use runCtx here instead of timeout context to keep the stream client + // alive. + backupStream, err := h.LN.SubscribeChannelBackups( + h.runCtx, &lnrpc.ChannelBackupSubscription{}, + ) + require.NoErrorf(h, err, "unable to create backup stream") + + return backupStream +} + +// VerifyChanBackup makes a RPC call to node's VerifyChanBackup and asserts. +func (h *HarnessRPC) VerifyChanBackup( + ss *lnrpc.ChanBackupSnapshot) *lnrpc.VerifyChanBackupResponse { + + ctxt, cancel := context.WithTimeout(h.runCtx, DefaultTimeout) + defer cancel() + + resp, err := h.LN.VerifyChanBackup(ctxt, ss) + require.NoErrorf(h, err, "unable to verify backup") + + return resp +} diff --git a/lntest/itest/list_on_test.go b/lntest/itest/list_on_test.go index 6e8698033..07afeeee5 100644 --- a/lntest/itest/list_on_test.go +++ b/lntest/itest/list_on_test.go @@ -191,4 +191,8 @@ var allTestCasesTemp = []*lntemp.TestCase{ Name: "invoice update subscription", TestFunc: testInvoiceSubscriptions, }, + { + Name: "streaming channel backup update", + TestFunc: testChannelBackupUpdates, + }, } diff --git a/lntest/itest/lnd_channel_backup_test.go b/lntest/itest/lnd_channel_backup_test.go index dc539ae03..09c9a14e7 100644 --- a/lntest/itest/lnd_channel_backup_test.go +++ b/lntest/itest/lnd_channel_backup_test.go @@ -782,12 +782,12 @@ func runChanRestoreScenarioForceClose(ht *lntemp.HarnessTest, zeroConf bool) { // testChannelBackupUpdates tests that both the streaming channel update RPC, // and the on-disk channel.backup are updated each time a channel is // opened/closed. -func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) { - ctxb := context.Background() +func testChannelBackupUpdates(ht *lntemp.HarnessTest) { + alice := ht.Alice // First, we'll make a temp directory that we'll use to store our // backup file, so we can check in on it during the test easily. - backupDir := t.t.TempDir() + backupDir := ht.T.TempDir() // First, we'll create a new node, Carol. We'll also create a temporary // file that Carol will use to store her channel backups. @@ -795,17 +795,11 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) { backupDir, chanbackup.DefaultBackupFileName, ) carolArgs := fmt.Sprintf("--backupfilepath=%v", backupFilePath) - carol := net.NewNode(t.t, "carol", []string{carolArgs}) - defer shutdownAndAssert(net, t, carol) + carol := ht.NewNode("carol", []string{carolArgs}) // Next, we'll register for streaming notifications for changes to the // backup file. - backupStream, err := carol.SubscribeChannelBackups( - ctxb, &lnrpc.ChannelBackupSubscription{}, - ) - if err != nil { - t.Fatalf("unable to create backup stream: %v", err) - } + backupStream := carol.RPC.SubscribeChannelBackups() // We'll use this goroutine to proxy any updates to a channel we can // easily use below. @@ -838,18 +832,16 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) { // With Carol up, we'll now connect her to Alice, and open a channel // between them. - net.ConnectNodes(t.t, carol, net.Alice) + ht.ConnectNodes(carol, alice) // Next, we'll open two channels between Alice and Carol back to back. var chanPoints []*lnrpc.ChannelPoint numChans := 2 chanAmt := btcutil.Amount(1000000) for i := 0; i < numChans; i++ { - chanPoint := openChannelAndAssert( - t, net, net.Alice, carol, - lntest.OpenChannelParams{Amt: chanAmt}, + chanPoint := ht.OpenChannel( + alice, carol, lntemp.OpenChannelParams{Amt: chanAmt}, ) - chanPoints = append(chanPoints, chanPoint) } @@ -860,12 +852,14 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) { for i := 0; i < numNtfns; i++ { select { case err := <-streamErr: - t.Fatalf("error with backup stream: %v", err) + require.Failf(ht, "stream err", + "error with backup stream: %v", err) case currentBackup = <-backupUpdates: case <-time.After(time.Second * 5): - t.Fatalf("didn't receive channel backup "+ + require.Failf(ht, "timeout", "didn't "+ + "receive channel backup "+ "notification %v", i+1) } } @@ -885,32 +879,29 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) { // nonce, we can't compare them directly, so instead // we'll compare the length which is a proxy for the // number of channels that the multi-backup contains. - rawBackup := currentBackup.MultiChanBackup.MultiChanBackup - if len(rawBackup) != len(packedBackup) { + backup := currentBackup.MultiChanBackup.MultiChanBackup + if len(backup) != len(packedBackup) { return fmt.Errorf("backup files don't match: "+ - "expected %x got %x", rawBackup, packedBackup) + "expected %x got %x", backup, + packedBackup) } // Additionally, we'll assert that both backups up // returned are valid. - for i, backup := range [][]byte{rawBackup, packedBackup} { + for _, backup := range [][]byte{backup, packedBackup} { snapshot := &lnrpc.ChanBackupSnapshot{ MultiChanBackup: &lnrpc.MultiChanBackup{ MultiChanBackup: backup, }, } - _, err := carol.VerifyChanBackup(ctxb, snapshot) - if err != nil { - return fmt.Errorf("unable to verify "+ - "backup #%d: %v", i, err) - } + + carol.RPC.VerifyChanBackup(snapshot) } return nil }, defaultTimeout) - if err != nil { - t.Fatalf("backup state invalid: %v", err) - } + require.NoError(ht, err, "timeout while checking "+ + "backup state: %v", err) } // As these two channels were just opened, we should've got two times @@ -931,11 +922,11 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) { chanPoint := chanPoints[i] - closeChannelAndAssert(t, net, net.Alice, chanPoint, forceClose) - // If we force closed the channel, then we'll mine enough // blocks to ensure all outputs have been swept. if forceClose { + ht.ForceCloseChannel(alice, chanPoint) + // A local force closed channel will trigger a // notification once the commitment TX confirms on // chain. But that won't remove the channel from the @@ -943,13 +934,12 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) { // locked contract was fully resolved on chain. assertBackupNtfns(1) - cleanupForceClose(t, net, net.Alice, chanPoint) - - // Now that the channel's been fully resolved, we expect - // another notification. + // Now that the channel's been fully resolved, we + // expect another notification. assertBackupNtfns(1) assertBackupFileState() } else { + ht.CloseChannel(alice, chanPoint) // We should get a single notification after closing, // and the on-disk state should match this latest // notifications. diff --git a/lntest/itest/lnd_test_list_on_test.go b/lntest/itest/lnd_test_list_on_test.go index 39e2d5d3d..771b73416 100644 --- a/lntest/itest/lnd_test_list_on_test.go +++ b/lntest/itest/lnd_test_list_on_test.go @@ -126,10 +126,6 @@ var allTestCases = []*testCase{ name: "route fee cutoff", test: testRouteFeeCutoff, }, - { - name: "streaming channel backup update", - test: testChannelBackupUpdates, - }, { name: "export channel backup", test: testExportChannelBackup,