mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-06-30 02:21:08 +02:00
multi: move channel acceptor logic out of rpcserver
This commit moves and partially refactors the channel acceptor logic
added in c2a6c86e
into the channel acceptor package. This allows us to
use the same logic in our unit tests as the rpcserver, rather than
needing to replicate it in unit tests.
Two changes are made to the existing implementation:
- Rather than having the Accept function run a closure, the closure
originally used in the rpcserver is moved directly into Accept
- The done channel used to signal client exit is moved into the acceptor
because the rpc server does not need knowledge of this detail (in
addition to other fields required for mocking the actual rpc).
Crediting orginal committer as co-author:
Co-authored-by: Crypt-iQ
This commit is contained in:
161
rpcserver.go
161
rpcserver.go
@ -6399,14 +6399,6 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
|
||||
}
|
||||
}
|
||||
|
||||
// chanAcceptInfo is used in the ChannelAcceptor bidirectional stream and
|
||||
// encapsulates the request information sent from the RPCAcceptor to the
|
||||
// RPCServer.
|
||||
type chanAcceptInfo struct {
|
||||
chanReq *chanacceptor.ChannelAcceptRequest
|
||||
responseChan chan bool
|
||||
}
|
||||
|
||||
// ChannelAcceptor dispatches a bi-directional streaming RPC in which
|
||||
// OpenChannel requests are sent to the client and the client responds with
|
||||
// a boolean that tells LND whether or not to accept the channel. This allows
|
||||
@ -6415,153 +6407,22 @@ type chanAcceptInfo struct {
|
||||
func (r *rpcServer) ChannelAcceptor(stream lnrpc.Lightning_ChannelAcceptorServer) error {
|
||||
chainedAcceptor := r.chanPredicate
|
||||
|
||||
// Create two channels to handle requests and responses respectively.
|
||||
newRequests := make(chan *chanAcceptInfo)
|
||||
responses := make(chan lnrpc.ChannelAcceptResponse)
|
||||
|
||||
// Define a quit channel that will be used to signal to the RPCAcceptor's
|
||||
// closure whether the stream still exists.
|
||||
quit := make(chan struct{})
|
||||
defer close(quit)
|
||||
|
||||
// demultiplexReq is a closure that will be passed to the RPCAcceptor and
|
||||
// acts as an intermediary between the RPCAcceptor and the RPCServer.
|
||||
demultiplexReq := func(req *chanacceptor.ChannelAcceptRequest) bool {
|
||||
respChan := make(chan bool, 1)
|
||||
|
||||
newRequest := &chanAcceptInfo{
|
||||
chanReq: req,
|
||||
responseChan: respChan,
|
||||
}
|
||||
|
||||
// timeout is the time after which ChannelAcceptRequests expire.
|
||||
timeout := time.After(r.cfg.AcceptorTimeout)
|
||||
|
||||
// Send the request to the newRequests channel.
|
||||
select {
|
||||
case newRequests <- newRequest:
|
||||
case <-timeout:
|
||||
rpcsLog.Errorf("RPCAcceptor returned false - reached timeout of %d",
|
||||
r.cfg.AcceptorTimeout)
|
||||
return false
|
||||
case <-quit:
|
||||
return false
|
||||
case <-r.quit:
|
||||
return false
|
||||
}
|
||||
|
||||
// Receive the response and return it. If no response has been received
|
||||
// in AcceptorTimeout, then return false.
|
||||
select {
|
||||
case resp := <-respChan:
|
||||
return resp
|
||||
case <-timeout:
|
||||
rpcsLog.Errorf("RPCAcceptor returned false - reached timeout of %d",
|
||||
r.cfg.AcceptorTimeout)
|
||||
return false
|
||||
case <-quit:
|
||||
return false
|
||||
case <-r.quit:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Create a new RPCAcceptor via the NewRPCAcceptor method.
|
||||
rpcAcceptor := chanacceptor.NewRPCAcceptor(demultiplexReq)
|
||||
// Create a new RPCAcceptor which will send requests into the
|
||||
// newRequests channel when it receives them.
|
||||
rpcAcceptor := chanacceptor.NewRPCAcceptor(
|
||||
stream.Recv, stream.Send, r.cfg.AcceptorTimeout, r.quit,
|
||||
)
|
||||
|
||||
// Add the RPCAcceptor to the ChainedAcceptor and defer its removal.
|
||||
id := chainedAcceptor.AddAcceptor(rpcAcceptor)
|
||||
defer chainedAcceptor.RemoveAcceptor(id)
|
||||
|
||||
// errChan is used by the receive loop to signal any errors that occur
|
||||
// during reading from the stream. This is primarily used to shutdown the
|
||||
// send loop in the case of an RPC client disconnecting.
|
||||
errChan := make(chan error, 1)
|
||||
|
||||
// We need to have the stream.Recv() in a goroutine since the call is
|
||||
// blocking and would prevent us from sending more ChannelAcceptRequests to
|
||||
// the RPC client.
|
||||
go func() {
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
|
||||
var pendingID [32]byte
|
||||
copy(pendingID[:], resp.PendingChanId)
|
||||
|
||||
openChanResp := lnrpc.ChannelAcceptResponse{
|
||||
Accept: resp.Accept,
|
||||
PendingChanId: pendingID[:],
|
||||
}
|
||||
|
||||
// Now that we have the response from the RPC client, send it to
|
||||
// the responses chan.
|
||||
select {
|
||||
case responses <- openChanResp:
|
||||
case <-quit:
|
||||
return
|
||||
case <-r.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
acceptRequests := make(map[[32]byte]chan bool)
|
||||
|
||||
for {
|
||||
select {
|
||||
case newRequest := <-newRequests:
|
||||
|
||||
req := newRequest.chanReq
|
||||
pendingChanID := req.OpenChanMsg.PendingChannelID
|
||||
|
||||
acceptRequests[pendingChanID] = newRequest.responseChan
|
||||
|
||||
// A ChannelAcceptRequest has been received, send it to the client.
|
||||
chanAcceptReq := &lnrpc.ChannelAcceptRequest{
|
||||
NodePubkey: req.Node.SerializeCompressed(),
|
||||
ChainHash: req.OpenChanMsg.ChainHash[:],
|
||||
PendingChanId: req.OpenChanMsg.PendingChannelID[:],
|
||||
FundingAmt: uint64(req.OpenChanMsg.FundingAmount),
|
||||
PushAmt: uint64(req.OpenChanMsg.PushAmount),
|
||||
DustLimit: uint64(req.OpenChanMsg.DustLimit),
|
||||
MaxValueInFlight: uint64(req.OpenChanMsg.MaxValueInFlight),
|
||||
ChannelReserve: uint64(req.OpenChanMsg.ChannelReserve),
|
||||
MinHtlc: uint64(req.OpenChanMsg.HtlcMinimum),
|
||||
FeePerKw: uint64(req.OpenChanMsg.FeePerKiloWeight),
|
||||
CsvDelay: uint32(req.OpenChanMsg.CsvDelay),
|
||||
MaxAcceptedHtlcs: uint32(req.OpenChanMsg.MaxAcceptedHTLCs),
|
||||
ChannelFlags: uint32(req.OpenChanMsg.ChannelFlags),
|
||||
}
|
||||
|
||||
if err := stream.Send(chanAcceptReq); err != nil {
|
||||
return err
|
||||
}
|
||||
case resp := <-responses:
|
||||
// Look up the appropriate channel to send on given the pending ID.
|
||||
// If a channel is found, send the response over it.
|
||||
var pendingID [32]byte
|
||||
copy(pendingID[:], resp.PendingChanId)
|
||||
respChan, ok := acceptRequests[pendingID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Send the response boolean over the buffered response channel.
|
||||
respChan <- resp.Accept
|
||||
|
||||
// Delete the channel from the acceptRequests map.
|
||||
delete(acceptRequests, pendingID)
|
||||
case err := <-errChan:
|
||||
rpcsLog.Errorf("Received an error: %v, shutting down", err)
|
||||
return err
|
||||
case <-r.quit:
|
||||
return fmt.Errorf("RPC server is shutting down")
|
||||
}
|
||||
}
|
||||
// Run the rpc acceptor, which will accept requests for channel
|
||||
// acceptance decisions from our chained acceptor, send them to the
|
||||
// channel acceptor and listen for and report responses. This function
|
||||
// blocks, and will exit if the rpcserver receives the instruction to
|
||||
// shutdown, or the client cancels.
|
||||
return rpcAcceptor.Run()
|
||||
}
|
||||
|
||||
// BakeMacaroon allows the creation of a new macaroon with custom read and write
|
||||
|
Reference in New Issue
Block a user