mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-01 18:27:43 +02:00
funding: add batch funding function
This commit is contained in:
529
funding/batch.go
Normal file
529
funding/batch.go
Normal file
@@ -0,0 +1,529 @@
|
|||||||
|
package funding
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"encoding/base64"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/chaincfg"
|
||||||
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||||
|
"github.com/btcsuite/btcd/wire"
|
||||||
|
"github.com/btcsuite/btcutil"
|
||||||
|
"github.com/btcsuite/btcutil/psbt"
|
||||||
|
"github.com/lightningnetwork/lnd/labels"
|
||||||
|
"github.com/lightningnetwork/lnd/lnrpc"
|
||||||
|
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
|
||||||
|
"github.com/lightningnetwork/lnd/lnwallet/chanfunding"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// errShuttingDown is the error that is returned if a signal on the
|
||||||
|
// quit channel is received which means the whole server is shutting
|
||||||
|
// down.
|
||||||
|
errShuttingDown = errors.New("shutting down")
|
||||||
|
|
||||||
|
// emptyChannelID is a channel ID that consists of all zeros.
|
||||||
|
emptyChannelID = [32]byte{}
|
||||||
|
)
|
||||||
|
|
||||||
|
// batchChannel is a struct that keeps track of a single channel's state within
|
||||||
|
// the batch funding process.
|
||||||
|
type batchChannel struct {
|
||||||
|
fundingReq *InitFundingMsg
|
||||||
|
pendingChanID [32]byte
|
||||||
|
updateChan chan *lnrpc.OpenStatusUpdate
|
||||||
|
errChan chan error
|
||||||
|
fundingAddr string
|
||||||
|
chanPoint *wire.OutPoint
|
||||||
|
isPending bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// processPsbtUpdate processes the first channel update message that is sent
|
||||||
|
// once the initial part of the negotiation has completed and the funding output
|
||||||
|
// (and therefore address) is known.
|
||||||
|
func (c *batchChannel) processPsbtUpdate(u *lnrpc.OpenStatusUpdate) error {
|
||||||
|
psbtUpdate := u.GetPsbtFund()
|
||||||
|
if psbtUpdate == nil {
|
||||||
|
return fmt.Errorf("got unexpected channel update %v", u.Update)
|
||||||
|
}
|
||||||
|
|
||||||
|
if psbtUpdate.FundingAmount != int64(c.fundingReq.LocalFundingAmt) {
|
||||||
|
return fmt.Errorf("got unexpected funding amount %d, wanted "+
|
||||||
|
"%d", psbtUpdate.FundingAmount,
|
||||||
|
c.fundingReq.LocalFundingAmt)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.fundingAddr = psbtUpdate.FundingAddress
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// processPendingUpdate is the second channel update message that is sent once
|
||||||
|
// the negotiation with the peer has completed and the channel is now pending.
|
||||||
|
func (c *batchChannel) processPendingUpdate(u *lnrpc.OpenStatusUpdate) error {
|
||||||
|
pendingUpd := u.GetChanPending()
|
||||||
|
if pendingUpd == nil {
|
||||||
|
return fmt.Errorf("got unexpected channel update %v", u.Update)
|
||||||
|
}
|
||||||
|
|
||||||
|
hash, err := chainhash.NewHash(pendingUpd.Txid)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not parse outpoint TX hash: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.chanPoint = &wire.OutPoint{
|
||||||
|
Index: pendingUpd.OutputIndex,
|
||||||
|
Hash: *hash,
|
||||||
|
}
|
||||||
|
c.isPending = true
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RequestParser is a function that parses an incoming RPC request into the
|
||||||
|
// internal funding initialization message.
|
||||||
|
type RequestParser func(*lnrpc.OpenChannelRequest) (*InitFundingMsg, error)
|
||||||
|
|
||||||
|
// ChannelOpener is a function that kicks off the initial channel open
|
||||||
|
// negotiation with the peer.
|
||||||
|
type ChannelOpener func(*InitFundingMsg) (chan *lnrpc.OpenStatusUpdate,
|
||||||
|
chan error)
|
||||||
|
|
||||||
|
// ChannelAbandoner is a function that can abandon a channel in the local
|
||||||
|
// database, graph and arbitrator state.
|
||||||
|
type ChannelAbandoner func(*wire.OutPoint) error
|
||||||
|
|
||||||
|
// WalletKitServer is a local interface that abstracts away the methods we need
|
||||||
|
// from the wallet kit sub server instance.
|
||||||
|
type WalletKitServer interface {
|
||||||
|
// FundPsbt creates a fully populated PSBT that contains enough inputs
|
||||||
|
// to fund the outputs specified in the template.
|
||||||
|
FundPsbt(context.Context,
|
||||||
|
*walletrpc.FundPsbtRequest) (*walletrpc.FundPsbtResponse, error)
|
||||||
|
|
||||||
|
// FinalizePsbt expects a partial transaction with all inputs and
|
||||||
|
// outputs fully declared and tries to sign all inputs that belong to
|
||||||
|
// the wallet.
|
||||||
|
FinalizePsbt(context.Context,
|
||||||
|
*walletrpc.FinalizePsbtRequest) (*walletrpc.FinalizePsbtResponse,
|
||||||
|
error)
|
||||||
|
|
||||||
|
// ReleaseOutput unlocks an output, allowing it to be available for coin
|
||||||
|
// selection if it remains unspent. The ID should match the one used to
|
||||||
|
// originally lock the output.
|
||||||
|
ReleaseOutput(context.Context,
|
||||||
|
*walletrpc.ReleaseOutputRequest) (*walletrpc.ReleaseOutputResponse,
|
||||||
|
error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wallet is a local interface that abstracts away the methods we need from the
|
||||||
|
// internal lightning wallet instance.
|
||||||
|
type Wallet interface {
|
||||||
|
// PsbtFundingVerify looks up a previously registered funding intent by
|
||||||
|
// its pending channel ID and tries to advance the state machine by
|
||||||
|
// verifying the passed PSBT.
|
||||||
|
PsbtFundingVerify([32]byte, *psbt.Packet) error
|
||||||
|
|
||||||
|
// PsbtFundingFinalize looks up a previously registered funding intent
|
||||||
|
// by its pending channel ID and tries to advance the state machine by
|
||||||
|
// finalizing the passed PSBT.
|
||||||
|
PsbtFundingFinalize([32]byte, *psbt.Packet, *wire.MsgTx) error
|
||||||
|
|
||||||
|
// PublishTransaction performs cursory validation (dust checks, etc),
|
||||||
|
// then finally broadcasts the passed transaction to the Bitcoin network.
|
||||||
|
PublishTransaction(*wire.MsgTx, string) error
|
||||||
|
|
||||||
|
// CancelFundingIntent allows a caller to cancel a previously registered
|
||||||
|
// funding intent. If no intent was found, then an error will be
|
||||||
|
// returned.
|
||||||
|
CancelFundingIntent([32]byte) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchConfig is the configuration for executing a single batch transaction for
|
||||||
|
// opening multiple channels atomically.
|
||||||
|
type BatchConfig struct {
|
||||||
|
// RequestParser is the function that parses an incoming RPC request
|
||||||
|
// into the internal funding initialization message.
|
||||||
|
RequestParser RequestParser
|
||||||
|
|
||||||
|
// ChannelOpener is the function that kicks off the initial channel open
|
||||||
|
// negotiation with the peer.
|
||||||
|
ChannelOpener ChannelOpener
|
||||||
|
|
||||||
|
// ChannelAbandoner is the function that can abandon a channel in the
|
||||||
|
// local database, graph and arbitrator state.
|
||||||
|
ChannelAbandoner ChannelAbandoner
|
||||||
|
|
||||||
|
// WalletKitServer is an instance of the wallet kit sub server that can
|
||||||
|
// handle PSBT funding and finalization.
|
||||||
|
WalletKitServer WalletKitServer
|
||||||
|
|
||||||
|
// Wallet is an instance of the internal lightning wallet.
|
||||||
|
Wallet Wallet
|
||||||
|
|
||||||
|
// NetParams contains the current bitcoin network parameters.
|
||||||
|
NetParams *chaincfg.Params
|
||||||
|
|
||||||
|
// Quit is the channel that is selected on to recognize if the main
|
||||||
|
// server is shutting down.
|
||||||
|
Quit chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Batcher is a type that can be used to perform an atomic funding of multiple
|
||||||
|
// channels within a single on-chain transaction.
|
||||||
|
type Batcher struct {
|
||||||
|
cfg *BatchConfig
|
||||||
|
|
||||||
|
channels []*batchChannel
|
||||||
|
lockedUTXOs []*walletrpc.UtxoLease
|
||||||
|
|
||||||
|
didPublish bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBatcher returns a new batch channel funding helper.
|
||||||
|
func NewBatcher(cfg *BatchConfig) *Batcher {
|
||||||
|
return &Batcher{
|
||||||
|
cfg: cfg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchFund starts the atomic batch channel funding process.
|
||||||
|
//
|
||||||
|
// NOTE: This method should only be called once per instance.
|
||||||
|
func (b *Batcher) BatchFund(ctx context.Context,
|
||||||
|
req *lnrpc.BatchOpenChannelRequest) ([]*lnrpc.PendingUpdate, error) {
|
||||||
|
|
||||||
|
label, err := labels.ValidateAPI(req.Label)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse and validate each individual channel.
|
||||||
|
b.channels = make([]*batchChannel, 0, len(req.Channels))
|
||||||
|
for idx, rpcChannel := range req.Channels {
|
||||||
|
// If the user specifies a channel ID, it must be exactly 32
|
||||||
|
// bytes long.
|
||||||
|
if len(rpcChannel.PendingChanId) > 0 &&
|
||||||
|
len(rpcChannel.PendingChanId) != 32 {
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("invalid temp chan ID %x",
|
||||||
|
rpcChannel.PendingChanId)
|
||||||
|
}
|
||||||
|
|
||||||
|
var pendingChanID [32]byte
|
||||||
|
if len(rpcChannel.PendingChanId) == 32 {
|
||||||
|
copy(pendingChanID[:], rpcChannel.PendingChanId)
|
||||||
|
|
||||||
|
// Don't allow the user to be clever by just setting an
|
||||||
|
// all zero channel ID, we need a "real" value here.
|
||||||
|
if pendingChanID == emptyChannelID {
|
||||||
|
return nil, fmt.Errorf("invalid empty temp " +
|
||||||
|
"chan ID")
|
||||||
|
}
|
||||||
|
} else if _, err := rand.Read(pendingChanID[:]); err != nil {
|
||||||
|
return nil, fmt.Errorf("error making temp chan ID: %v",
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fundingReq, err := b.cfg.RequestParser(&lnrpc.OpenChannelRequest{
|
||||||
|
SatPerVbyte: uint64(req.SatPerVbyte),
|
||||||
|
NodePubkey: rpcChannel.NodePubkey,
|
||||||
|
LocalFundingAmount: rpcChannel.LocalFundingAmount,
|
||||||
|
PushSat: rpcChannel.PushSat,
|
||||||
|
TargetConf: req.TargetConf,
|
||||||
|
Private: rpcChannel.Private,
|
||||||
|
MinHtlcMsat: rpcChannel.MinHtlcMsat,
|
||||||
|
RemoteCsvDelay: rpcChannel.RemoteCsvDelay,
|
||||||
|
MinConfs: req.MinConfs,
|
||||||
|
SpendUnconfirmed: req.SpendUnconfirmed,
|
||||||
|
CloseAddress: rpcChannel.CloseAddress,
|
||||||
|
CommitmentType: rpcChannel.CommitmentType,
|
||||||
|
FundingShim: &lnrpc.FundingShim{
|
||||||
|
Shim: &lnrpc.FundingShim_PsbtShim{
|
||||||
|
PsbtShim: &lnrpc.PsbtShim{
|
||||||
|
PendingChanId: pendingChanID[:],
|
||||||
|
NoPublish: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error parsing channel %d: %v",
|
||||||
|
idx, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare the stuff that we'll need for the internal PSBT
|
||||||
|
// funding.
|
||||||
|
fundingReq.PendingChanID = pendingChanID
|
||||||
|
fundingReq.ChanFunder = chanfunding.NewPsbtAssembler(
|
||||||
|
btcutil.Amount(rpcChannel.LocalFundingAmount), nil,
|
||||||
|
b.cfg.NetParams, false,
|
||||||
|
)
|
||||||
|
|
||||||
|
b.channels = append(b.channels, &batchChannel{
|
||||||
|
pendingChanID: pendingChanID,
|
||||||
|
fundingReq: fundingReq,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// From this point on we can fail for any of the channels and for any
|
||||||
|
// number of reasons. This deferred function makes sure that the full
|
||||||
|
// operation is actually atomic: We either succeed and publish a
|
||||||
|
// transaction for the full batch or we clean up everything.
|
||||||
|
defer b.cleanup(ctx)
|
||||||
|
|
||||||
|
// Now that we know the user input is sane, we need to kick off the
|
||||||
|
// channel funding negotiation with the peers. Because we specified a
|
||||||
|
// PSBT assembler, we'll get a special response in the channel once the
|
||||||
|
// funding output script is known (which we need to craft the TX).
|
||||||
|
eg := &errgroup.Group{}
|
||||||
|
for _, channel := range b.channels {
|
||||||
|
channel.updateChan, channel.errChan = b.cfg.ChannelOpener(
|
||||||
|
channel.fundingReq,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Launch a goroutine that waits for the initial response on
|
||||||
|
// either the update or error chan.
|
||||||
|
channel := channel
|
||||||
|
eg.Go(func() error {
|
||||||
|
return b.waitForUpdate(channel, true)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all goroutines to report back. Any error at this stage means
|
||||||
|
// we need to abort.
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
return nil, fmt.Errorf("error batch opening channel, initial "+
|
||||||
|
"negotiation failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We can now assemble all outputs that we're going to give to the PSBT
|
||||||
|
// funding method of the wallet kit server.
|
||||||
|
txTemplate := &walletrpc.TxTemplate{
|
||||||
|
Outputs: make(map[string]uint64),
|
||||||
|
}
|
||||||
|
for _, channel := range b.channels {
|
||||||
|
txTemplate.Outputs[channel.fundingAddr] = uint64(
|
||||||
|
channel.fundingReq.LocalFundingAmt,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Great, we've now started the channel negotiation successfully with
|
||||||
|
// all peers. This means we know the channel outputs for all channels
|
||||||
|
// and can craft our PSBT now. We take the fee rate and min conf
|
||||||
|
// settings from the first request as all of them should be equal
|
||||||
|
// anyway.
|
||||||
|
firstReq := b.channels[0].fundingReq
|
||||||
|
feeRateSatPerKVByte := firstReq.FundingFeePerKw.FeePerKVByte()
|
||||||
|
fundPsbtReq := &walletrpc.FundPsbtRequest{
|
||||||
|
Template: &walletrpc.FundPsbtRequest_Raw{
|
||||||
|
Raw: txTemplate,
|
||||||
|
},
|
||||||
|
Fees: &walletrpc.FundPsbtRequest_SatPerVbyte{
|
||||||
|
SatPerVbyte: uint64(feeRateSatPerKVByte) / 1000,
|
||||||
|
},
|
||||||
|
MinConfs: firstReq.MinConfs,
|
||||||
|
SpendUnconfirmed: firstReq.MinConfs == 0,
|
||||||
|
}
|
||||||
|
fundPsbtResp, err := b.cfg.WalletKitServer.FundPsbt(ctx, fundPsbtReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error funding PSBT for batch channel "+
|
||||||
|
"open: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Funding was successful. This means there are some UTXOs that are now
|
||||||
|
// locked for us. We need to make sure we release them if we don't
|
||||||
|
// complete the publish process.
|
||||||
|
b.lockedUTXOs = fundPsbtResp.LockedUtxos
|
||||||
|
|
||||||
|
// Parse and log the funded PSBT for debugging purposes.
|
||||||
|
unsignedPacket, err := psbt.NewFromRawBytes(
|
||||||
|
bytes.NewReader(fundPsbtResp.FundedPsbt), false,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error parsing funded PSBT for batch "+
|
||||||
|
"channel open: %v", err)
|
||||||
|
}
|
||||||
|
log.Tracef("[batchopenchannel] funded PSBT: %s",
|
||||||
|
base64.StdEncoding.EncodeToString(fundPsbtResp.FundedPsbt))
|
||||||
|
|
||||||
|
// With the funded PSBT we can now advance the funding state machine of
|
||||||
|
// each of the channels.
|
||||||
|
for _, channel := range b.channels {
|
||||||
|
err = b.cfg.Wallet.PsbtFundingVerify(
|
||||||
|
channel.pendingChanID, unsignedPacket,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error verifying PSBT: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The funded PSBT was accepted by each of the assemblers, let's now
|
||||||
|
// sign/finalize it.
|
||||||
|
finalizePsbtResp, err := b.cfg.WalletKitServer.FinalizePsbt(
|
||||||
|
ctx, &walletrpc.FinalizePsbtRequest{
|
||||||
|
FundedPsbt: fundPsbtResp.FundedPsbt,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error finalizing PSBT for batch "+
|
||||||
|
"channel open: %v", err)
|
||||||
|
}
|
||||||
|
finalTx := &wire.MsgTx{}
|
||||||
|
txReader := bytes.NewReader(finalizePsbtResp.RawFinalTx)
|
||||||
|
if err := finalTx.Deserialize(txReader); err != nil {
|
||||||
|
return nil, fmt.Errorf("error parsing signed raw TX: %v", err)
|
||||||
|
}
|
||||||
|
log.Tracef("[batchopenchannel] signed PSBT: %s",
|
||||||
|
base64.StdEncoding.EncodeToString(finalizePsbtResp.SignedPsbt))
|
||||||
|
|
||||||
|
// Advance the funding state machine of each of the channels a last time
|
||||||
|
// to complete the negotiation with the now signed funding TX.
|
||||||
|
for _, channel := range b.channels {
|
||||||
|
err = b.cfg.Wallet.PsbtFundingFinalize(
|
||||||
|
channel.pendingChanID, nil, finalTx,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error finalizing PSBT: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now every channel should be ready for the funding transaction to be
|
||||||
|
// broadcast. Let's wait for the updates that actually confirm this
|
||||||
|
// state.
|
||||||
|
eg = &errgroup.Group{}
|
||||||
|
for _, channel := range b.channels {
|
||||||
|
// Launch another goroutine that waits for the channel pending
|
||||||
|
// response on the update chan.
|
||||||
|
channel := channel
|
||||||
|
eg.Go(func() error {
|
||||||
|
return b.waitForUpdate(channel, false)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all updates and make sure we're still good to proceed.
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
return nil, fmt.Errorf("error batch opening channel, final "+
|
||||||
|
"negotiation failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Great, we're now finally ready to publish the transaction.
|
||||||
|
err = b.cfg.Wallet.PublishTransaction(finalTx, label)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error publishing final batch "+
|
||||||
|
"transaction: %v", err)
|
||||||
|
}
|
||||||
|
b.didPublish = true
|
||||||
|
|
||||||
|
rpcPoints := make([]*lnrpc.PendingUpdate, len(b.channels))
|
||||||
|
for idx, channel := range b.channels {
|
||||||
|
rpcPoints[idx] = &lnrpc.PendingUpdate{
|
||||||
|
Txid: channel.chanPoint.Hash.CloneBytes(),
|
||||||
|
OutputIndex: channel.chanPoint.Index,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rpcPoints, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitForUpdate waits for an incoming channel update (or error) for a single
|
||||||
|
// channel.
|
||||||
|
//
|
||||||
|
// NOTE: Must be called in a goroutine as this blocks until an update or error
|
||||||
|
// is received.
|
||||||
|
func (b *Batcher) waitForUpdate(channel *batchChannel, firstUpdate bool) error {
|
||||||
|
select {
|
||||||
|
// If an error occurs then immediately return the error to the client.
|
||||||
|
case err := <-channel.errChan:
|
||||||
|
log.Errorf("unable to open channel to NodeKey(%x): %v",
|
||||||
|
channel.fundingReq.TargetPubkey.SerializeCompressed(),
|
||||||
|
err)
|
||||||
|
return err
|
||||||
|
|
||||||
|
// Otherwise, wait for the next channel update. The first update sent
|
||||||
|
// must be the signal to start the PSBT funding in our case since we
|
||||||
|
// specified a PSBT shim. The second update will be the signal that the
|
||||||
|
// channel is now pending.
|
||||||
|
case fundingUpdate := <-channel.updateChan:
|
||||||
|
log.Tracef("[batchopenchannel] received update: %v",
|
||||||
|
fundingUpdate)
|
||||||
|
|
||||||
|
// Depending on what update we were waiting for the batch
|
||||||
|
// channel knows what to do with it.
|
||||||
|
if firstUpdate {
|
||||||
|
return channel.processPsbtUpdate(fundingUpdate)
|
||||||
|
}
|
||||||
|
|
||||||
|
return channel.processPendingUpdate(fundingUpdate)
|
||||||
|
|
||||||
|
case <-b.cfg.Quit:
|
||||||
|
return errShuttingDown
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanup tries to remove any pending state or UTXO locks in case we had to
|
||||||
|
// abort before finalizing and publishing the funding transaction.
|
||||||
|
func (b *Batcher) cleanup(ctx context.Context) {
|
||||||
|
// Did we publish a transaction? Then there's nothing to clean up since
|
||||||
|
// we succeeded.
|
||||||
|
if b.didPublish {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the error message doesn't sound too scary. These might be
|
||||||
|
// logged quite frequently depending on where exactly things were
|
||||||
|
// aborted. We could just not log any cleanup errors though it might be
|
||||||
|
// helpful to debug things if something doesn't go as expected.
|
||||||
|
const errMsgTpl = "Attempted to clean up after failed batch channel " +
|
||||||
|
"open but could not %s: %v"
|
||||||
|
|
||||||
|
// If we failed, we clean up in reverse order. First, let's unlock the
|
||||||
|
// leased outputs.
|
||||||
|
for _, lockedUTXO := range b.lockedUTXOs {
|
||||||
|
rpcOP := &lnrpc.OutPoint{
|
||||||
|
OutputIndex: lockedUTXO.Outpoint.OutputIndex,
|
||||||
|
TxidBytes: lockedUTXO.Outpoint.TxidBytes,
|
||||||
|
}
|
||||||
|
_, err := b.cfg.WalletKitServer.ReleaseOutput(
|
||||||
|
ctx, &walletrpc.ReleaseOutputRequest{
|
||||||
|
Id: lockedUTXO.Id,
|
||||||
|
Outpoint: rpcOP,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Debugf(errMsgTpl, "release locked output "+
|
||||||
|
lockedUTXO.Outpoint.String(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then go through all channels that ever got into a pending state and
|
||||||
|
// remove the pending channel by abandoning them.
|
||||||
|
for _, channel := range b.channels {
|
||||||
|
if !channel.isPending {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err := b.cfg.ChannelAbandoner(channel.chanPoint)
|
||||||
|
if err != nil {
|
||||||
|
log.Debugf(errMsgTpl, "abandon pending open channel",
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// And finally clean up the funding shim for each channel that didn't
|
||||||
|
// make it into a pending state.
|
||||||
|
for _, channel := range b.channels {
|
||||||
|
if channel.isPending {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err := b.cfg.Wallet.CancelFundingIntent(channel.pendingChanID)
|
||||||
|
if err != nil {
|
||||||
|
log.Debugf(errMsgTpl, "cancel funding shim", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
422
funding/batch_test.go
Normal file
422
funding/batch_test.go
Normal file
@@ -0,0 +1,422 @@
|
|||||||
|
package funding
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/btcec"
|
||||||
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||||
|
"github.com/btcsuite/btcd/wire"
|
||||||
|
"github.com/btcsuite/btcutil"
|
||||||
|
"github.com/btcsuite/btcutil/psbt"
|
||||||
|
"github.com/lightningnetwork/lnd/lnrpc"
|
||||||
|
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
|
||||||
|
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
|
||||||
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
errFundingFailed = errors.New("funding failed")
|
||||||
|
|
||||||
|
testPubKey1Hex = "02e1ce77dfdda9fd1cf5e9d796faf57d1cedef9803aec84a6d7" +
|
||||||
|
"f8487d32781341e"
|
||||||
|
testPubKey1Bytes, _ = hex.DecodeString(testPubKey1Hex)
|
||||||
|
|
||||||
|
testPubKey2Hex = "039ddfc912035417b24aefe8da155267d71c3cf9e35405fc390" +
|
||||||
|
"df8357c5da7a5eb"
|
||||||
|
testPubKey2Bytes, _ = hex.DecodeString(testPubKey2Hex)
|
||||||
|
|
||||||
|
testOutPoint = wire.OutPoint{
|
||||||
|
Hash: [32]byte{1, 2, 3},
|
||||||
|
Index: 2,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
type fundingIntent struct {
|
||||||
|
chanIndex uint32
|
||||||
|
updateChan chan *lnrpc.OpenStatusUpdate
|
||||||
|
errChan chan error
|
||||||
|
}
|
||||||
|
|
||||||
|
type testHarness struct {
|
||||||
|
t *testing.T
|
||||||
|
batcher *Batcher
|
||||||
|
|
||||||
|
failUpdate1 bool
|
||||||
|
failUpdate2 bool
|
||||||
|
failPublish bool
|
||||||
|
|
||||||
|
intentsCreated map[[32]byte]*fundingIntent
|
||||||
|
intentsCanceled map[[32]byte]struct{}
|
||||||
|
abandonedChannels map[wire.OutPoint]struct{}
|
||||||
|
releasedUTXOs map[wire.OutPoint]struct{}
|
||||||
|
|
||||||
|
pendingPacket *psbt.Packet
|
||||||
|
pendingTx *wire.MsgTx
|
||||||
|
|
||||||
|
txPublished bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestHarness(t *testing.T, failUpdate1, failUpdate2,
|
||||||
|
failPublish bool) *testHarness {
|
||||||
|
|
||||||
|
h := &testHarness{
|
||||||
|
t: t,
|
||||||
|
failUpdate1: failUpdate1,
|
||||||
|
failUpdate2: failUpdate2,
|
||||||
|
failPublish: failPublish,
|
||||||
|
intentsCreated: make(map[[32]byte]*fundingIntent),
|
||||||
|
intentsCanceled: make(map[[32]byte]struct{}),
|
||||||
|
abandonedChannels: make(map[wire.OutPoint]struct{}),
|
||||||
|
releasedUTXOs: make(map[wire.OutPoint]struct{}),
|
||||||
|
pendingTx: &wire.MsgTx{
|
||||||
|
Version: 2,
|
||||||
|
TxIn: []*wire.TxIn{{
|
||||||
|
// Our one input that pays for everything.
|
||||||
|
PreviousOutPoint: testOutPoint,
|
||||||
|
}},
|
||||||
|
TxOut: []*wire.TxOut{{
|
||||||
|
// Our static change output.
|
||||||
|
PkScript: []byte{1, 2, 3},
|
||||||
|
Value: 99,
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
h.batcher = NewBatcher(&BatchConfig{
|
||||||
|
RequestParser: h.parseRequest,
|
||||||
|
ChannelOpener: h.openChannel,
|
||||||
|
ChannelAbandoner: h.abandonChannel,
|
||||||
|
WalletKitServer: h,
|
||||||
|
Wallet: h,
|
||||||
|
Quit: make(chan struct{}),
|
||||||
|
})
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *testHarness) parseRequest(
|
||||||
|
in *lnrpc.OpenChannelRequest) (*InitFundingMsg, error) {
|
||||||
|
|
||||||
|
pubKey, err := btcec.ParsePubKey(in.NodePubkey, btcec.S256())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &InitFundingMsg{
|
||||||
|
TargetPubkey: pubKey,
|
||||||
|
LocalFundingAmt: btcutil.Amount(in.LocalFundingAmount),
|
||||||
|
PushAmt: lnwire.NewMSatFromSatoshis(
|
||||||
|
btcutil.Amount(in.PushSat),
|
||||||
|
),
|
||||||
|
FundingFeePerKw: chainfee.SatPerKVByte(
|
||||||
|
in.SatPerVbyte * 1000,
|
||||||
|
).FeePerKWeight(),
|
||||||
|
Private: in.Private,
|
||||||
|
RemoteCsvDelay: uint16(in.RemoteCsvDelay),
|
||||||
|
MinConfs: in.MinConfs,
|
||||||
|
MaxLocalCsv: uint16(in.MaxLocalCsv),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *testHarness) openChannel(
|
||||||
|
req *InitFundingMsg) (chan *lnrpc.OpenStatusUpdate, chan error) {
|
||||||
|
|
||||||
|
updateChan := make(chan *lnrpc.OpenStatusUpdate, 2)
|
||||||
|
errChan := make(chan error, 1)
|
||||||
|
|
||||||
|
// The change output is always index 0.
|
||||||
|
chanIndex := uint32(len(h.intentsCreated) + 1)
|
||||||
|
|
||||||
|
h.intentsCreated[req.PendingChanID] = &fundingIntent{
|
||||||
|
chanIndex: chanIndex,
|
||||||
|
updateChan: updateChan,
|
||||||
|
errChan: errChan,
|
||||||
|
}
|
||||||
|
h.pendingTx.TxOut = append(h.pendingTx.TxOut, &wire.TxOut{
|
||||||
|
PkScript: []byte{1, 2, 3, byte(chanIndex)},
|
||||||
|
Value: int64(req.LocalFundingAmt),
|
||||||
|
})
|
||||||
|
|
||||||
|
if h.failUpdate1 {
|
||||||
|
errChan <- errFundingFailed
|
||||||
|
|
||||||
|
// Once we fail we don't send any more updates.
|
||||||
|
return updateChan, errChan
|
||||||
|
}
|
||||||
|
|
||||||
|
updateChan <- &lnrpc.OpenStatusUpdate{
|
||||||
|
PendingChanId: req.PendingChanID[:],
|
||||||
|
Update: &lnrpc.OpenStatusUpdate_PsbtFund{
|
||||||
|
PsbtFund: &lnrpc.ReadyForPsbtFunding{
|
||||||
|
FundingAmount: int64(
|
||||||
|
req.LocalFundingAmt,
|
||||||
|
),
|
||||||
|
FundingAddress: fmt.Sprintf("foo%d", chanIndex),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return updateChan, errChan
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *testHarness) abandonChannel(op *wire.OutPoint) error {
|
||||||
|
h.abandonedChannels[*op] = struct{}{}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *testHarness) FundPsbt(context.Context,
|
||||||
|
*walletrpc.FundPsbtRequest) (*walletrpc.FundPsbtResponse, error) {
|
||||||
|
|
||||||
|
packet, err := psbt.NewFromUnsignedTx(h.pendingTx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
h.pendingPacket = packet
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
if err := packet.Serialize(&buf); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &walletrpc.FundPsbtResponse{
|
||||||
|
FundedPsbt: buf.Bytes(),
|
||||||
|
LockedUtxos: []*walletrpc.UtxoLease{{
|
||||||
|
Id: []byte{1, 2, 3},
|
||||||
|
Outpoint: &lnrpc.OutPoint{
|
||||||
|
TxidBytes: testOutPoint.Hash[:],
|
||||||
|
OutputIndex: testOutPoint.Index,
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *testHarness) FinalizePsbt(context.Context,
|
||||||
|
*walletrpc.FinalizePsbtRequest) (*walletrpc.FinalizePsbtResponse,
|
||||||
|
error) {
|
||||||
|
|
||||||
|
var psbtBuf bytes.Buffer
|
||||||
|
if err := h.pendingPacket.Serialize(&psbtBuf); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var txBuf bytes.Buffer
|
||||||
|
if err := h.pendingTx.Serialize(&txBuf); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &walletrpc.FinalizePsbtResponse{
|
||||||
|
SignedPsbt: psbtBuf.Bytes(),
|
||||||
|
RawFinalTx: txBuf.Bytes(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *testHarness) ReleaseOutput(_ context.Context,
|
||||||
|
r *walletrpc.ReleaseOutputRequest) (*walletrpc.ReleaseOutputResponse,
|
||||||
|
error) {
|
||||||
|
|
||||||
|
hash, err := chainhash.NewHash(r.Outpoint.TxidBytes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
op := wire.OutPoint{
|
||||||
|
Hash: *hash,
|
||||||
|
Index: r.Outpoint.OutputIndex,
|
||||||
|
}
|
||||||
|
|
||||||
|
h.releasedUTXOs[op] = struct{}{}
|
||||||
|
|
||||||
|
return &walletrpc.ReleaseOutputResponse{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *testHarness) PsbtFundingVerify([32]byte, *psbt.Packet) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *testHarness) PsbtFundingFinalize(pid [32]byte, _ *psbt.Packet,
|
||||||
|
_ *wire.MsgTx) error {
|
||||||
|
|
||||||
|
// During the finalize phase we can now prepare the next update to send.
|
||||||
|
// For this we first need to find the intent that has the channels we
|
||||||
|
// need to send on.
|
||||||
|
intent, ok := h.intentsCreated[pid]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("intent %x not found", pid)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We should now also have the final TX, let's get its hash.
|
||||||
|
hash := h.pendingTx.TxHash()
|
||||||
|
|
||||||
|
// For the second update we fail on the second channel only so the first
|
||||||
|
// is actually pending.
|
||||||
|
if h.failUpdate2 && intent.chanIndex == 2 {
|
||||||
|
intent.errChan <- errFundingFailed
|
||||||
|
} else {
|
||||||
|
intent.updateChan <- &lnrpc.OpenStatusUpdate{
|
||||||
|
PendingChanId: pid[:],
|
||||||
|
Update: &lnrpc.OpenStatusUpdate_ChanPending{
|
||||||
|
ChanPending: &lnrpc.PendingUpdate{
|
||||||
|
Txid: hash[:],
|
||||||
|
OutputIndex: intent.chanIndex,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *testHarness) PublishTransaction(*wire.MsgTx, string) error {
|
||||||
|
if h.failPublish {
|
||||||
|
return errFundingFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
h.txPublished = true
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *testHarness) CancelFundingIntent(pid [32]byte) error {
|
||||||
|
h.intentsCanceled[pid] = struct{}{}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestBatchFund tests different success and error scenarios of the atomic batch
|
||||||
|
// channel funding.
|
||||||
|
func TestBatchFund(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
failUpdate1 bool
|
||||||
|
failUpdate2 bool
|
||||||
|
failPublish bool
|
||||||
|
channels []*lnrpc.BatchOpenChannel
|
||||||
|
expectedErr string
|
||||||
|
}{{
|
||||||
|
name: "happy path",
|
||||||
|
channels: []*lnrpc.BatchOpenChannel{{
|
||||||
|
NodePubkey: testPubKey1Bytes,
|
||||||
|
LocalFundingAmount: 1234,
|
||||||
|
}, {
|
||||||
|
NodePubkey: testPubKey2Bytes,
|
||||||
|
LocalFundingAmount: 4321,
|
||||||
|
}},
|
||||||
|
}, {
|
||||||
|
name: "initial negotiation failure",
|
||||||
|
failUpdate1: true,
|
||||||
|
channels: []*lnrpc.BatchOpenChannel{{
|
||||||
|
NodePubkey: testPubKey1Bytes,
|
||||||
|
LocalFundingAmount: 1234,
|
||||||
|
}, {
|
||||||
|
NodePubkey: testPubKey2Bytes,
|
||||||
|
LocalFundingAmount: 4321,
|
||||||
|
}},
|
||||||
|
expectedErr: "initial negotiation failed",
|
||||||
|
}, {
|
||||||
|
name: "final negotiation failure",
|
||||||
|
failUpdate2: true,
|
||||||
|
channels: []*lnrpc.BatchOpenChannel{{
|
||||||
|
NodePubkey: testPubKey1Bytes,
|
||||||
|
LocalFundingAmount: 1234,
|
||||||
|
}, {
|
||||||
|
NodePubkey: testPubKey2Bytes,
|
||||||
|
LocalFundingAmount: 4321,
|
||||||
|
}},
|
||||||
|
expectedErr: "final negotiation failed",
|
||||||
|
}, {
|
||||||
|
name: "publish failure",
|
||||||
|
failPublish: true,
|
||||||
|
channels: []*lnrpc.BatchOpenChannel{{
|
||||||
|
NodePubkey: testPubKey1Bytes,
|
||||||
|
LocalFundingAmount: 1234,
|
||||||
|
}, {
|
||||||
|
NodePubkey: testPubKey2Bytes,
|
||||||
|
LocalFundingAmount: 4321,
|
||||||
|
}},
|
||||||
|
expectedErr: "error publishing final batch transaction",
|
||||||
|
}}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
tc := tc
|
||||||
|
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
h := newTestHarness(
|
||||||
|
t, tc.failUpdate1, tc.failUpdate2,
|
||||||
|
tc.failPublish,
|
||||||
|
)
|
||||||
|
|
||||||
|
req := &lnrpc.BatchOpenChannelRequest{
|
||||||
|
Channels: tc.channels,
|
||||||
|
SatPerVbyte: 5,
|
||||||
|
MinConfs: 1,
|
||||||
|
}
|
||||||
|
updates, err := h.batcher.BatchFund(
|
||||||
|
context.Background(), req,
|
||||||
|
)
|
||||||
|
|
||||||
|
if tc.failUpdate1 || tc.failUpdate2 || tc.failPublish {
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Contains(t, err.Error(), tc.expectedErr)
|
||||||
|
} else {
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, updates, len(tc.channels))
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.failUpdate1 {
|
||||||
|
require.Len(t, h.releasedUTXOs, 0)
|
||||||
|
require.Len(t, h.intentsCreated, 2)
|
||||||
|
for pid := range h.intentsCreated {
|
||||||
|
require.Contains(
|
||||||
|
t, h.intentsCanceled, pid,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
hash := h.pendingTx.TxHash()
|
||||||
|
if tc.failUpdate2 {
|
||||||
|
require.Len(t, h.releasedUTXOs, 1)
|
||||||
|
require.Len(t, h.intentsCreated, 2)
|
||||||
|
|
||||||
|
// If we fail on update 2 we do so on the second
|
||||||
|
// channel so one will be pending and one not
|
||||||
|
// yet.
|
||||||
|
require.Len(t, h.intentsCanceled, 1)
|
||||||
|
require.Len(t, h.abandonedChannels, 1)
|
||||||
|
require.Contains(
|
||||||
|
t, h.abandonedChannels, wire.OutPoint{
|
||||||
|
Hash: hash,
|
||||||
|
Index: 1,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.failPublish {
|
||||||
|
require.Len(t, h.releasedUTXOs, 1)
|
||||||
|
require.Len(t, h.intentsCreated, 2)
|
||||||
|
|
||||||
|
require.Len(t, h.intentsCanceled, 0)
|
||||||
|
require.Len(t, h.abandonedChannels, 2)
|
||||||
|
require.Contains(
|
||||||
|
t, h.abandonedChannels, wire.OutPoint{
|
||||||
|
Hash: hash,
|
||||||
|
Index: 1,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
require.Contains(
|
||||||
|
t, h.abandonedChannels, wire.OutPoint{
|
||||||
|
Hash: hash,
|
||||||
|
Index: 2,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
1
go.mod
1
go.mod
@@ -57,6 +57,7 @@ require (
|
|||||||
go.etcd.io/etcd/client/v3 v3.5.0
|
go.etcd.io/etcd/client/v3 v3.5.0
|
||||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
|
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
|
||||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
|
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
|
||||||
|
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
||||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
||||||
google.golang.org/grpc v1.38.0
|
google.golang.org/grpc v1.38.0
|
||||||
google.golang.org/protobuf v1.26.0
|
google.golang.org/protobuf v1.26.0
|
||||||
|
Reference in New Issue
Block a user