diff --git a/rpcserver.go b/rpcserver.go index 7dd1ef5e7..35eec9521 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3680,18 +3680,23 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { htlcSema <- struct{}{} } + // We keep track of the running goroutines and set up a quit signal we + // can use to request them to exit if the method returns because of an + // encountered error. + var wg sync.WaitGroup + reqQuit := make(chan struct{}) + defer close(reqQuit) + // Launch a new goroutine to handle reading new payment requests from // the client. This way we can handle errors independently of blocking // and waiting for the next payment request to come through. - reqQuit := make(chan struct{}) - defer func() { - close(reqQuit) - }() - // TODO(joostjager): Callers expect result to come in in the same order // as the request were sent, but this is far from guarantueed in the // code below. + wg.Add(1) go func() { + defer wg.Done() + for { select { case <-reqQuit: @@ -3778,11 +3783,18 @@ sendLoop: // We launch a new goroutine to execute the current // payment so we can continue to serve requests while // this payment is being dispatched. + wg.Add(1) go func() { + defer wg.Done() + // Attempt to grab a free semaphore slot, using // a defer to eventually release the slot // regardless of payment success. - <-htlcSema + select { + case <-htlcSema: + case <-reqQuit: + return + } defer func() { htlcSema <- struct{}{} }() @@ -3853,6 +3865,10 @@ sendLoop: }() } } + + // Wait for all goroutines to finish before closing the stream. + wg.Wait() + return nil } // SendPaymentSync is the synchronous non-streaming version of SendPayment.