itest: use run context when making rpc requests

yyforyongyu
2021-09-15 01:41:24 +08:00
parent bd521dd2f0
commit 403fdaa716

@@ -962,23 +962,19 @@ func saveProfilesPage(node *HarnessNode) error {
 // waitForTxInMempool blocks until the target txid is seen in the mempool. If
 // the transaction isn't seen within the network before the passed timeout,
 // then an error is returned.
-func (n *NetworkHarness) waitForTxInMempool(ctx context.Context,
-	txid chainhash.Hash) error {
+func (n *NetworkHarness) waitForTxInMempool(txid chainhash.Hash) error {
+	// Return immediately if harness has been torn down.
+	select {
+	case <-n.runCtx.Done():
+		return fmt.Errorf("NetworkHarness has been torn down")
+	default:
+	}
 
 	ticker := time.NewTicker(50 * time.Millisecond)
 	defer ticker.Stop()
 
+	ctxt, cancel := context.WithTimeout(n.runCtx, DefaultTimeout)
+	defer cancel()
+
 	var mempool []*chainhash.Hash
 	for {
 		select {
-		case <-ctx.Done():
+		case <-n.runCtx.Done():
+			return fmt.Errorf("NetworkHarness has been torn down")
+		case <-ctxt.Done():
 			return fmt.Errorf("wanted %v, found %v txs "+
 				"in mempool: %v", txid, len(mempool), mempool)
@@ -1044,11 +1040,6 @@ type OpenChannelParams struct {
 func (n *NetworkHarness) OpenChannel(srcNode, destNode *HarnessNode,
 	p OpenChannelParams) (lnrpc.Lightning_OpenChannelClient, error) {
 
-	// The cancel is intentionally left out here because the returned
-	// item(open channel client) relies on the context being active. This
-	// will be fixed once we finish refactoring the NetworkHarness.
-	ctx, _ := context.WithTimeout(n.runCtx, ChannelOpenTimeout) // nolint: govet
-
 	// Wait until srcNode and destNode have the latest chain synced.
 	// Otherwise, we may run into a check within the funding manager that
 	// prevents any funding workflows from being kicked off if the chain
@@ -1079,7 +1070,9 @@ func (n *NetworkHarness) OpenChannel(srcNode, destNode *HarnessNode,
 		CommitmentType: p.CommitmentType,
 	}
 
-	respStream, err := srcNode.OpenChannel(ctx, openReq)
+	// We need to use n.runCtx here to keep the response stream alive after
+	// the function is returned.
+	respStream, err := srcNode.OpenChannel(n.runCtx, openReq)
 	if err != nil {
 		return nil, fmt.Errorf("unable to open channel between "+
 			"alice and bob: %v", err)
@@ -1088,17 +1081,18 @@ func (n *NetworkHarness) OpenChannel(srcNode, destNode *HarnessNode,
 	chanOpen := make(chan struct{})
 	errChan := make(chan error)
 	go func() {
-		// Consume the "channel pending" update. This waits until the node
-		// notifies us that the final message in the channel funding workflow
-		// has been sent to the remote node.
+		// Consume the "channel pending" update. This waits until the
+		// node notifies us that the final message in the channel
+		// funding workflow has been sent to the remote node.
 		resp, err := respStream.Recv()
 		if err != nil {
 			errChan <- err
 			return
 		}
-		if _, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending); !ok {
-			errChan <- fmt.Errorf("expected channel pending update, "+
-				"instead got %v", resp)
+		_, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending)
+		if !ok {
+			errChan <- fmt.Errorf("expected channel pending: "+
+				"update, instead got %v", resp)
 			return
 		}
@@ -1106,7 +1100,7 @@ func (n *NetworkHarness) OpenChannel(srcNode, destNode *HarnessNode,
 	}()
 
 	select {
-	case <-ctx.Done():
+	case <-time.After(ChannelOpenTimeout):
 		return nil, fmt.Errorf("timeout reached before chan pending "+
 			"update sent: %v", err)
 	case err := <-errChan:
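
The OpenChannel hunks pair two changes that have to move together: the response stream is opened with the long-lived n.runCtx, because cancelling a gRPC stream's context tears the stream down, and the deadline that the removed WithTimeout context used to provide is re-imposed with time.After in the select. A hedged sketch of the same shape with a stand-in stream type; updateStream, open, and openWithRunCtx are illustrative, not lnd's API:

	package harness

	import (
		"context"
		"fmt"
		"time"
	)

	// updateStream stands in for a gRPC client stream such as
	// lnrpc.Lightning_OpenChannelClient; only the Recv method matters.
	type updateStream interface {
		Recv() (string, error)
	}

	// openWithRunCtx opens the stream with the long-lived run context
	// and bounds only the wait for the first update. Creating the stream
	// from a WithTimeout context and deferring its cancel would kill the
	// stream as soon as this function returned.
	func openWithRunCtx(runCtx context.Context, timeout time.Duration,
		open func(context.Context) (updateStream, error)) (updateStream, error) {

		stream, err := open(runCtx)
		if err != nil {
			return nil, err
		}

		errChan := make(chan error)
		pending := make(chan struct{})

		go func() {
			// Consume the first update, mirroring the "channel
			// pending" consumption above.
			if _, err := stream.Recv(); err != nil {
				errChan <- err
				return
			}
			pending <- struct{}{}
		}()

		select {
		case <-time.After(timeout):
			return nil, fmt.Errorf("timeout before first update")
		case err := <-errChan:
			return nil, err
		case <-pending:
			return stream, nil
		}
	}
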
@@ -1116,17 +1110,14 @@ func (n *NetworkHarness) OpenChannel(srcNode, destNode *HarnessNode,
 	}
 }
 
-// OpenPendingChannel attempts to open a channel between srcNode and destNode with the
-// passed channel funding parameters. If the passed context has a timeout, then
-// if the timeout is reached before the channel pending notification is
-// received, an error is returned.
+// OpenPendingChannel attempts to open a channel between srcNode and destNode
+// with the passed channel funding parameters. If the passed context has a
+// timeout, then if the timeout is reached before the channel pending
+// notification is received, an error is returned.
 func (n *NetworkHarness) OpenPendingChannel(srcNode, destNode *HarnessNode,
 	amt btcutil.Amount,
 	pushAmt btcutil.Amount) (*lnrpc.PendingUpdate, error) {
 
-	ctx, cancel := context.WithTimeout(n.runCtx, ChannelOpenTimeout)
-	defer cancel()
-
 	// Wait until srcNode and destNode have blockchain synced
 	if err := srcNode.WaitForBlockchainSync(); err != nil {
 		return nil, fmt.Errorf("unable to sync srcNode chain: %v", err)
@@ -1142,7 +1133,9 @@ func (n *NetworkHarness) OpenPendingChannel(srcNode, destNode *HarnessNode,
 		Private: false,
 	}
 
-	respStream, err := srcNode.OpenChannel(ctx, openReq)
+	// We need to use n.runCtx here to keep the response stream alive after
+	// the function is returned.
+	respStream, err := srcNode.OpenChannel(n.runCtx, openReq)
 	if err != nil {
 		return nil, fmt.Errorf("unable to open channel between "+
 			"alice and bob: %v", err)
@@ -1151,9 +1144,9 @@ func (n *NetworkHarness) OpenPendingChannel(srcNode, destNode *HarnessNode,
 	chanPending := make(chan *lnrpc.PendingUpdate)
 	errChan := make(chan error)
 	go func() {
-		// Consume the "channel pending" update. This waits until the node
-		// notifies us that the final message in the channel funding workflow
-		// has been sent to the remote node.
+		// Consume the "channel pending" update. This waits until the
+		// node notifies us that the final message in the channel
+		// funding workflow has been sent to the remote node.
 		resp, err := respStream.Recv()
 		if err != nil {
 			errChan <- err
@@ -1161,8 +1154,8 @@ func (n *NetworkHarness) OpenPendingChannel(srcNode, destNode *HarnessNode,
 		}
 		pendingResp, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending)
 		if !ok {
-			errChan <- fmt.Errorf("expected channel pending update, "+
-				"instead got %v", resp)
+			errChan <- fmt.Errorf("expected channel pending "+
+				"update, instead got %v", resp)
 			return
 		}
@@ -1170,7 +1163,7 @@ func (n *NetworkHarness) OpenPendingChannel(srcNode, destNode *HarnessNode,
 	}()
 
 	select {
-	case <-ctx.Done():
+	case <-time.After(ChannelOpenTimeout):
 		return nil, fmt.Errorf("timeout reached before chan pending " +
 			"update sent")
 	case err := <-errChan:
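
One subtlety of trading `case <-ctx.Done():` for `case <-time.After(...):` in these selects: the consuming goroutine sends on unbuffered channels, so if the timeout branch wins, that goroutine stays parked on its send. Giving each channel a buffer of one lets it finish and exit; a hedged sketch of that variant (recvOnce is illustrative, not a change this commit makes):

	package harness

	import (
		"fmt"
		"time"
	)

	// recvOnce consumes a single update in the background. The buffer
	// of one on each channel lets the goroutine complete its send and
	// exit even when the caller has already taken the timeout branch;
	// with unbuffered channels that goroutine would block forever.
	func recvOnce(recv func() (string, error),
		timeout time.Duration) (string, error) {

		updates := make(chan string, 1)
		errs := make(chan error, 1)

		go func() {
			update, err := recv()
			if err != nil {
				errs <- err
				return
			}
			updates <- update
		}()

		select {
		case <-time.After(timeout):
			return "", fmt.Errorf("timeout before update received")
		case err := <-errs:
			return "", err
		case update := <-updates:
			return update, nil
		}
	}
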
@@ -1185,7 +1178,8 @@ func (n *NetworkHarness) OpenPendingChannel(srcNode, destNode *HarnessNode,
 // has a timeout, then if the timeout is reached before the channel has been
 // opened, then an error is returned.
 func (n *NetworkHarness) WaitForChannelOpen(
-	openChanStream lnrpc.Lightning_OpenChannelClient) (*lnrpc.ChannelPoint, error) {
+	openChanStream lnrpc.Lightning_OpenChannelClient) (
+	*lnrpc.ChannelPoint, error) {
 
 	ctx, cancel := context.WithTimeout(n.runCtx, ChannelOpenTimeout)
 	defer cancel()
@@ -1224,13 +1218,14 @@ func (n *NetworkHarness) WaitForChannelOpen(
 // has a timeout, an error is returned if that timeout is reached before the
 // channel close is pending.
 func (n *NetworkHarness) CloseChannel(lnNode *HarnessNode,
-	cp *lnrpc.ChannelPoint,
-	force bool) (lnrpc.Lightning_CloseChannelClient, *chainhash.Hash, error) {
+	cp *lnrpc.ChannelPoint, force bool) (lnrpc.Lightning_CloseChannelClient,
+	*chainhash.Hash, error) {
 
 	// The cancel is intentionally left out here because the returned
 	// item(close channel client) relies on the context being active. This
 	// will be fixed once we finish refactoring the NetworkHarness.
-	ctx, _ := context.WithTimeout(n.runCtx, ChannelCloseTimeout) // nolint: govet
+	ctxt, cancel := context.WithTimeout(n.runCtx, ChannelCloseTimeout)
+	defer cancel()
 
 	// Create a channel outpoint that we can use to compare to channels
 	// from the ListChannelsResponse.
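
The dropped `// nolint: govet` was suppressing vet's lostcancel check, which flags a discarded context.CancelFunc. Restoring and deferring the cancel is safe here because, after this commit, the close stream is opened with n.runCtx (see the later hunk) and the timeout context only guards the ListChannels polling, so nothing returned to the caller depends on it; WaitForChannelOpen above keeps its WithTimeout/cancel pair for the same reason. A small sketch of the corrected form, assuming any parent context; withDeadline and call are illustrative names:

	package harness

	import (
		"context"
		"time"
	)

	func withDeadline(parent context.Context, timeout time.Duration,
		call func(context.Context) error) error {

		// Discarding the CancelFunc (the old `ctx, _ :=` form) trips
		// go vet's lostcancel check and keeps the timer alive until
		// the parent context is cancelled. Keeping and deferring the
		// cancel is correct whenever nothing returned to the caller
		// still depends on ctxt.
		ctxt, cancel := context.WithTimeout(parent, timeout)
		defer cancel()

		return call(ctxt)
	}
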
@@ -1258,7 +1253,7 @@ func (n *NetworkHarness) CloseChannel(lnNode *HarnessNode,
 	// not.
 	filterChannel := func(node *HarnessNode,
 		op wire.OutPoint) (*lnrpc.Channel, error) {
-		listResp, err := node.ListChannels(ctx, listReq)
+		listResp, err := node.ListChannels(ctxt, listReq)
 		if err != nil {
 			return nil, err
 		}
@@ -1283,7 +1278,8 @@ func (n *NetworkHarness) CloseChannel(lnNode *HarnessNode,
 		}
 
 		// Next, we'll fetch the target channel in order to get the
-		// harness node that will be receiving the channel close request.
+		// harness node that will be receiving the channel close
+		// request.
 		targetChan, err := filterChannel(lnNode, chanPoint)
 		if err != nil {
 			return nil, nil, err
@@ -1300,7 +1296,9 @@ func (n *NetworkHarness) CloseChannel(lnNode *HarnessNode,
 			return nil, nil, fmt.Errorf("channel of closing " +
 				"node not active in time")
 		}
-		err = wait.Predicate(activeChanPredicate(receivingNode), timeout)
+		err = wait.Predicate(
+			activeChanPredicate(receivingNode), timeout,
+		)
 		if err != nil {
 			return nil, nil, fmt.Errorf("channel of receiving " +
 				"node not active in time")
@@ -1316,7 +1314,9 @@ func (n *NetworkHarness) CloseChannel(lnNode *HarnessNode,
 		closeReq := &lnrpc.CloseChannelRequest{
 			ChannelPoint: cp, Force: force,
 		}
-		closeRespStream, err = lnNode.CloseChannel(ctx, closeReq)
+		// We need to use n.runCtx to keep the client stream alive
+		// after the function has returned.
+		closeRespStream, err = lnNode.CloseChannel(n.runCtx, closeReq)
 		if err != nil {
 			return fmt.Errorf("unable to close channel: %v", err)
 		}
@@ -1342,7 +1342,7 @@ func (n *NetworkHarness) CloseChannel(lnNode *HarnessNode,
 			return fmt.Errorf("unable to decode closeTxid: "+
 				"%v", err)
 		}
-		if err := n.waitForTxInMempool(ctx, *closeTxid); err != nil {
+		if err := n.waitForTxInMempool(*closeTxid); err != nil {
 			return fmt.Errorf("error while waiting for "+
 				"broadcast tx: %v", err)
 		}
@@ -1360,10 +1360,8 @@ func (n *NetworkHarness) CloseChannel(lnNode *HarnessNode,
 // passed context has a timeout, then if the timeout is reached before the
 // notification is received then an error is returned.
 func (n *NetworkHarness) WaitForChannelClose(
-	closeChanStream lnrpc.Lightning_CloseChannelClient) (*chainhash.Hash, error) {
-
-	ctx, cancel := context.WithTimeout(n.runCtx, ChannelCloseTimeout)
-	defer cancel()
+	closeChanStream lnrpc.Lightning_CloseChannelClient) (
+	*chainhash.Hash, error) {
 
 	errChan := make(chan error)
 	updateChan := make(chan *lnrpc.CloseStatusUpdate_ChanClose)
@@ -1387,7 +1385,7 @@ func (n *NetworkHarness) WaitForChannelClose(
 	// Wait until either the deadline for the context expires, an error
 	// occurs, or the channel close update is received.
 	select {
-	case <-ctx.Done():
+	case <-time.After(ChannelCloseTimeout):
 		return nil, fmt.Errorf("timeout reached before update sent")
 	case err := <-errChan:
 		return nil, err