mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-06-30 18:43:42 +02:00
Merge pull request #4683 from carlaKC/4471-acceptorupfrontshutdown
channelacceptor: add new fields to acceptor response
This commit is contained in:
185
rpcserver.go
185
rpcserver.go
@ -56,6 +56,7 @@ import (
|
||||
"github.com/lightningnetwork/lnd/lnwallet"
|
||||
"github.com/lightningnetwork/lnd/lnwallet/btcwallet"
|
||||
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
|
||||
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
|
||||
"github.com/lightningnetwork/lnd/lnwallet/chanfunding"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/macaroons"
|
||||
@ -1878,7 +1879,7 @@ func (r *rpcServer) parseOpenChannelReq(in *lnrpc.OpenChannelRequest,
|
||||
rpcsLog.Debugf("[openchannel]: using fee of %v sat/kw for funding tx",
|
||||
int64(feeRate))
|
||||
|
||||
script, err := parseUpfrontShutdownAddress(
|
||||
script, err := chancloser.ParseUpfrontShutdownAddress(
|
||||
in.CloseAddress, r.cfg.ActiveNetParams.Params,
|
||||
)
|
||||
if err != nil {
|
||||
@ -2053,26 +2054,6 @@ func (r *rpcServer) OpenChannelSync(ctx context.Context,
|
||||
}
|
||||
}
|
||||
|
||||
// parseUpfrontShutdownScript attempts to parse an upfront shutdown address.
|
||||
// If the address is empty, it returns nil. If it successfully decoded the
|
||||
// address, it returns a script that pays out to the address.
|
||||
func parseUpfrontShutdownAddress(address string,
|
||||
params *chaincfg.Params) (lnwire.DeliveryAddress, error) {
|
||||
|
||||
if len(address) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
addr, err := btcutil.DecodeAddress(
|
||||
address, params,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid address: %v", err)
|
||||
}
|
||||
|
||||
return txscript.PayToAddrScript(addr)
|
||||
}
|
||||
|
||||
// GetChanPointFundingTxid returns the given channel point's funding txid in
|
||||
// raw bytes.
|
||||
func GetChanPointFundingTxid(chanPoint *lnrpc.ChannelPoint) (*chainhash.Hash, error) {
|
||||
@ -6401,14 +6382,6 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
|
||||
}
|
||||
}
|
||||
|
||||
// chanAcceptInfo is used in the ChannelAcceptor bidirectional stream and
|
||||
// encapsulates the request information sent from the RPCAcceptor to the
|
||||
// RPCServer.
|
||||
type chanAcceptInfo struct {
|
||||
chanReq *chanacceptor.ChannelAcceptRequest
|
||||
responseChan chan bool
|
||||
}
|
||||
|
||||
// ChannelAcceptor dispatches a bi-directional streaming RPC in which
|
||||
// OpenChannel requests are sent to the client and the client responds with
|
||||
// a boolean that tells LND whether or not to accept the channel. This allows
|
||||
@ -6417,153 +6390,23 @@ type chanAcceptInfo struct {
|
||||
func (r *rpcServer) ChannelAcceptor(stream lnrpc.Lightning_ChannelAcceptorServer) error {
|
||||
chainedAcceptor := r.chanPredicate
|
||||
|
||||
// Create two channels to handle requests and responses respectively.
|
||||
newRequests := make(chan *chanAcceptInfo)
|
||||
responses := make(chan lnrpc.ChannelAcceptResponse)
|
||||
|
||||
// Define a quit channel that will be used to signal to the RPCAcceptor's
|
||||
// closure whether the stream still exists.
|
||||
quit := make(chan struct{})
|
||||
defer close(quit)
|
||||
|
||||
// demultiplexReq is a closure that will be passed to the RPCAcceptor and
|
||||
// acts as an intermediary between the RPCAcceptor and the RPCServer.
|
||||
demultiplexReq := func(req *chanacceptor.ChannelAcceptRequest) bool {
|
||||
respChan := make(chan bool, 1)
|
||||
|
||||
newRequest := &chanAcceptInfo{
|
||||
chanReq: req,
|
||||
responseChan: respChan,
|
||||
}
|
||||
|
||||
// timeout is the time after which ChannelAcceptRequests expire.
|
||||
timeout := time.After(r.cfg.AcceptorTimeout)
|
||||
|
||||
// Send the request to the newRequests channel.
|
||||
select {
|
||||
case newRequests <- newRequest:
|
||||
case <-timeout:
|
||||
rpcsLog.Errorf("RPCAcceptor returned false - reached timeout of %d",
|
||||
r.cfg.AcceptorTimeout)
|
||||
return false
|
||||
case <-quit:
|
||||
return false
|
||||
case <-r.quit:
|
||||
return false
|
||||
}
|
||||
|
||||
// Receive the response and return it. If no response has been received
|
||||
// in AcceptorTimeout, then return false.
|
||||
select {
|
||||
case resp := <-respChan:
|
||||
return resp
|
||||
case <-timeout:
|
||||
rpcsLog.Errorf("RPCAcceptor returned false - reached timeout of %d",
|
||||
r.cfg.AcceptorTimeout)
|
||||
return false
|
||||
case <-quit:
|
||||
return false
|
||||
case <-r.quit:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Create a new RPCAcceptor via the NewRPCAcceptor method.
|
||||
rpcAcceptor := chanacceptor.NewRPCAcceptor(demultiplexReq)
|
||||
// Create a new RPCAcceptor which will send requests into the
|
||||
// newRequests channel when it receives them.
|
||||
rpcAcceptor := chanacceptor.NewRPCAcceptor(
|
||||
stream.Recv, stream.Send, r.cfg.AcceptorTimeout,
|
||||
r.cfg.ActiveNetParams.Params, r.quit,
|
||||
)
|
||||
|
||||
// Add the RPCAcceptor to the ChainedAcceptor and defer its removal.
|
||||
id := chainedAcceptor.AddAcceptor(rpcAcceptor)
|
||||
defer chainedAcceptor.RemoveAcceptor(id)
|
||||
|
||||
// errChan is used by the receive loop to signal any errors that occur
|
||||
// during reading from the stream. This is primarily used to shutdown the
|
||||
// send loop in the case of an RPC client disconnecting.
|
||||
errChan := make(chan error, 1)
|
||||
|
||||
// We need to have the stream.Recv() in a goroutine since the call is
|
||||
// blocking and would prevent us from sending more ChannelAcceptRequests to
|
||||
// the RPC client.
|
||||
go func() {
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
|
||||
var pendingID [32]byte
|
||||
copy(pendingID[:], resp.PendingChanId)
|
||||
|
||||
openChanResp := lnrpc.ChannelAcceptResponse{
|
||||
Accept: resp.Accept,
|
||||
PendingChanId: pendingID[:],
|
||||
}
|
||||
|
||||
// Now that we have the response from the RPC client, send it to
|
||||
// the responses chan.
|
||||
select {
|
||||
case responses <- openChanResp:
|
||||
case <-quit:
|
||||
return
|
||||
case <-r.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
acceptRequests := make(map[[32]byte]chan bool)
|
||||
|
||||
for {
|
||||
select {
|
||||
case newRequest := <-newRequests:
|
||||
|
||||
req := newRequest.chanReq
|
||||
pendingChanID := req.OpenChanMsg.PendingChannelID
|
||||
|
||||
acceptRequests[pendingChanID] = newRequest.responseChan
|
||||
|
||||
// A ChannelAcceptRequest has been received, send it to the client.
|
||||
chanAcceptReq := &lnrpc.ChannelAcceptRequest{
|
||||
NodePubkey: req.Node.SerializeCompressed(),
|
||||
ChainHash: req.OpenChanMsg.ChainHash[:],
|
||||
PendingChanId: req.OpenChanMsg.PendingChannelID[:],
|
||||
FundingAmt: uint64(req.OpenChanMsg.FundingAmount),
|
||||
PushAmt: uint64(req.OpenChanMsg.PushAmount),
|
||||
DustLimit: uint64(req.OpenChanMsg.DustLimit),
|
||||
MaxValueInFlight: uint64(req.OpenChanMsg.MaxValueInFlight),
|
||||
ChannelReserve: uint64(req.OpenChanMsg.ChannelReserve),
|
||||
MinHtlc: uint64(req.OpenChanMsg.HtlcMinimum),
|
||||
FeePerKw: uint64(req.OpenChanMsg.FeePerKiloWeight),
|
||||
CsvDelay: uint32(req.OpenChanMsg.CsvDelay),
|
||||
MaxAcceptedHtlcs: uint32(req.OpenChanMsg.MaxAcceptedHTLCs),
|
||||
ChannelFlags: uint32(req.OpenChanMsg.ChannelFlags),
|
||||
}
|
||||
|
||||
if err := stream.Send(chanAcceptReq); err != nil {
|
||||
return err
|
||||
}
|
||||
case resp := <-responses:
|
||||
// Look up the appropriate channel to send on given the pending ID.
|
||||
// If a channel is found, send the response over it.
|
||||
var pendingID [32]byte
|
||||
copy(pendingID[:], resp.PendingChanId)
|
||||
respChan, ok := acceptRequests[pendingID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Send the response boolean over the buffered response channel.
|
||||
respChan <- resp.Accept
|
||||
|
||||
// Delete the channel from the acceptRequests map.
|
||||
delete(acceptRequests, pendingID)
|
||||
case err := <-errChan:
|
||||
rpcsLog.Errorf("Received an error: %v, shutting down", err)
|
||||
return err
|
||||
case <-r.quit:
|
||||
return fmt.Errorf("RPC server is shutting down")
|
||||
}
|
||||
}
|
||||
// Run the rpc acceptor, which will accept requests for channel
|
||||
// acceptance decisions from our chained acceptor, send them to the
|
||||
// channel acceptor and listen for and report responses. This function
|
||||
// blocks, and will exit if the rpcserver receives the instruction to
|
||||
// shutdown, or the client cancels.
|
||||
return rpcAcceptor.Run()
|
||||
}
|
||||
|
||||
// BakeMacaroon allows the creation of a new macaroon with custom read and write
|
||||
|
Reference in New Issue
Block a user