itest: remove extra graph topology subscription

This commit is contained in:
yyforyongyu 2021-08-08 16:57:24 +08:00
parent 92cd6657c5
commit a58543d1c7
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
2 changed files with 99 additions and 70 deletions

View File

@ -1055,50 +1055,12 @@ func (hn *HarnessNode) initLightningClient(conn *grpc.ClientConn) error {
return err
}
// Due to a race condition between the ChannelRouter starting and us
// making the subscription request, it's possible for our graph
// subscription to fail. To ensure we don't start listening for updates
// until then, we'll create a dummy subscription to ensure we can do so
// successfully before proceeding. We use a dummy subscription in order
// to not consume an update from the real one.
err := wait.NoError(func() error {
req := &lnrpc.GraphTopologySubscription{}
ctx, cancelFunc := context.WithCancel(context.Background())
topologyClient, err := hn.SubscribeChannelGraph(ctx, req)
if err != nil {
return err
}
// We'll wait to receive an error back within a one second
// timeout. This is needed since creating the client's stream is
// independent of the graph subscription being created. The
// stream is closed from the server's side upon an error.
errChan := make(chan error, 1)
go func() {
if _, err := topologyClient.Recv(); err != nil {
errChan <- err
}
}()
select {
case err = <-errChan:
case <-time.After(time.Second):
}
cancelFunc()
return err
}, DefaultTimeout)
if err != nil {
return err
}
// Launch the watcher that will hook into graph related topology change
// from the PoV of this node.
hn.wg.Add(1)
subscribed := make(chan error)
go hn.lightningNetworkWatcher(subscribed)
go hn.lightningNetworkWatcher()
return <-subscribed
return nil
}
// FetchNodeInfo queries an unlocked node to retrieve its public key.
@ -1394,40 +1356,19 @@ func checkChanPointInGraph(ctx context.Context,
// closed or opened within the network. In order to dispatch these
// notifications, the GraphTopologySubscription client exposed as part of the
// gRPC interface is used.
func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) {
func (hn *HarnessNode) lightningNetworkWatcher() {
defer hn.wg.Done()
graphUpdates := make(chan *lnrpc.GraphTopologyUpdate)
// Start a goroutine to receive graph updates.
hn.wg.Add(1)
go func() {
defer hn.wg.Done()
req := &lnrpc.GraphTopologySubscription{}
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
topologyClient, err := hn.SubscribeChannelGraph(ctx, req)
err := hn.receiveTopologyClientStream(graphUpdates)
if err != nil {
msg := fmt.Sprintf("%s(%d): unable to create topology "+
"client: %v (%s)", hn.Name(), hn.NodeID, err,
time.Now().String())
subscribed <- fmt.Errorf(msg)
return
}
close(subscribed)
for {
update, err := topologyClient.Recv()
if err == io.EOF {
return
} else if err != nil {
return
}
select {
case graphUpdates <- update:
case <-hn.quit:
return
}
hn.PrintErr("receive topology client stream "+
"got err:%v", err)
}
}()
@ -1713,3 +1654,90 @@ func (hn *HarnessNode) handleCloseChannelWatchRequest(req *chanWatchRequest) {
req.eventChan,
)
}
type topologyClient lnrpc.Lightning_SubscribeChannelGraphClient
// newTopologyClient creates a topology client.
func (hn *HarnessNode) newTopologyClient(
ctx context.Context) (topologyClient, error) {
req := &lnrpc.GraphTopologySubscription{}
client, err := hn.SubscribeChannelGraph(ctx, req)
if err != nil {
return nil, fmt.Errorf("%s(%d): unable to create topology "+
"client: %v (%s)", hn.Name(), hn.NodeID, err,
time.Now().String())
}
return client, nil
}
// receiveTopologyClientStream initializes a topologyClient to subscribe
// topology update events. Due to a race condition between the ChannelRouter
// starting and us making the subscription request, it's possible for our graph
// subscription to fail. In that case, we will retry the subscription until it
// succeeds or fail after 10 seconds.
//
// NOTE: must be run as a goroutine.
func (hn *HarnessNode) receiveTopologyClientStream(
receiver chan *lnrpc.GraphTopologyUpdate) error {
ctxb := context.Background()
// Create a topology client to receive graph updates.
client, err := hn.newTopologyClient(ctxb)
if err != nil {
return fmt.Errorf("create topologyClient failed: %v", err)
}
// We use the context to time out when retrying graph subscription.
ctxt, cancel := context.WithTimeout(ctxb, DefaultTimeout)
defer cancel()
for {
update, err := client.Recv()
switch {
case err == nil:
// Good case. We will send the update to the receiver.
case strings.Contains(err.Error(), "router not started"):
// If the router hasn't been started, we will retry
// every 200 ms until it has been started or fail
// after the ctxt is timed out.
select {
case <-ctxt.Done():
return fmt.Errorf("graph subscription: " +
"router not started before timeout")
case <-time.After(wait.PollInterval):
case <-hn.quit:
return nil
}
// Re-create the topology client.
client, err = hn.newTopologyClient(ctxb)
if err != nil {
return fmt.Errorf("create topologyClient "+
"failed: %v", err)
}
continue
case strings.Contains(err.Error(), "EOF"):
// End of subscription stream. Do nothing and quit.
return nil
default:
// An expected error is returned, return and leave it
// to be handled by the caller.
return fmt.Errorf("graph subscription err: %v", err)
}
// Send the update or quit.
select {
case receiver <- update:
case <-hn.quit:
return nil
}
}
}

View File

@ -5,17 +5,18 @@ import (
"time"
)
// PollInterval is a constant specifying a 200 ms interval.
const PollInterval = 200 * time.Millisecond
// Predicate is a helper test function that will wait for a timeout period of
// time until the passed predicate returns true. This function is helpful as
// timing doesn't always line up well when running integration tests with
// several running lnd nodes. This function gives callers a way to assert that
// some property is upheld within a particular time frame.
func Predicate(pred func() bool, timeout time.Duration) error {
const pollInterval = 200 * time.Millisecond
exitTimer := time.After(timeout)
for {
<-time.After(pollInterval)
<-time.After(PollInterval)
select {
case <-exitTimer: