rpcserver: split PendingChannels into three sections

This commit refactors the method PendingChannels for maintenance
purpose.
This commit is contained in:
yyforyongyu 2022-01-07 16:38:51 +08:00
parent 61bffa70f9
commit 3c1c640612
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868

View File

@ -3190,38 +3190,27 @@ func (r *rpcServer) ChannelBalance(ctx context.Context,
}, nil
}
// PendingChannels returns a list of all the channels that are currently
// considered "pending". A channel is pending if it has finished the funding
// workflow and is waiting for confirmations for the funding txn, or is in the
// process of closure, either initiated cooperatively or non-cooperatively.
func (r *rpcServer) PendingChannels(ctx context.Context,
in *lnrpc.PendingChannelsRequest) (*lnrpc.PendingChannelsResponse, error) {
rpcsLog.Debugf("[pendingchannels]")
resp := &lnrpc.PendingChannelsResponse{}
// rpcInitiator returns the correct lnrpc initiator for channels where
// we have a record of the opening channel.
rpcInitiator := func(isInitiator bool) lnrpc.Initiator {
if isInitiator {
return lnrpc.Initiator_INITIATOR_LOCAL
}
return lnrpc.Initiator_INITIATOR_REMOTE
}
type (
pendingOpenChannels []*lnrpc.PendingChannelsResponse_PendingOpenChannel
pendingForceClose []*lnrpc.PendingChannelsResponse_ForceClosedChannel
waitingCloseChannels []*lnrpc.PendingChannelsResponse_WaitingCloseChannel
)
// fetchPendingOpenChannels queries the database for a list of channels that
// have pending open state. The returned result is used in the response of the
// PendingChannels RPC.
func (r *rpcServer) fetchPendingOpenChannels() (pendingOpenChannels, error) {
// First, we'll populate the response with all the channels that are
// soon to be opened. We can easily fetch this data from the database
// and map the db struct to the proto response.
pendingOpenChannels, err := r.server.chanStateDB.FetchPendingChannels()
channels, err := r.server.chanStateDB.FetchPendingChannels()
if err != nil {
rpcsLog.Errorf("unable to fetch pending channels: %v", err)
return nil, err
}
resp.PendingOpenChannels = make([]*lnrpc.PendingChannelsResponse_PendingOpenChannel,
len(pendingOpenChannels))
for i, pendingChan := range pendingOpenChannels {
result := make(pendingOpenChannels, len(channels))
for i, pendingChan := range channels {
pub := pendingChan.IdentityPub.SerializeCompressed()
// As this is required for display purposes, we'll calculate
@ -3236,7 +3225,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
commitBaseWeight := blockchain.GetTransactionWeight(utx)
commitWeight := commitBaseWeight + input.WitnessCommitmentTxWeight
resp.PendingOpenChannels[i] = &lnrpc.PendingChannelsResponse_PendingOpenChannel{
result[i] = &lnrpc.PendingChannelsResponse_PendingOpenChannel{
Channel: &lnrpc.PendingChannelsResponse_PendingChannel{
RemoteNodePub: hex.EncodeToString(pub),
ChannelPoint: pendingChan.FundingOutpoint.String(),
@ -3255,19 +3244,32 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
}
}
return result, nil
}
// fetchPendingForceCloseChannels queries the database for a list of channels
// that have their closing transactions confirmed but not fully resolved yet.
// The returned result is used in the response of the PendingChannels RPC.
func (r *rpcServer) fetchPendingForceCloseChannels() (pendingForceClose,
int64, error) {
_, currentHeight, err := r.server.cc.ChainIO.GetBestBlock()
if err != nil {
return nil, err
return nil, 0, err
}
// Next, we'll examine the channels that are soon to be closed so we
// can populate these fields within the response.
pendingCloseChannels, err := r.server.chanStateDB.FetchClosedChannels(true)
channels, err := r.server.chanStateDB.FetchClosedChannels(true)
if err != nil {
rpcsLog.Errorf("unable to fetch closed channels: %v", err)
return nil, err
return nil, 0, err
}
for _, pendingClose := range pendingCloseChannels {
result := make(pendingForceClose, 0)
limboBalance := int64(0)
for _, pendingClose := range channels {
// First construct the channel struct itself, this will be
// needed regardless of how this channel was closed.
pub := pendingClose.RemotePub.SerializeCompressed()
@ -3313,7 +3315,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
rpcsLog.Errorf("unable to load forwarding "+
"packages for channel:%s, %v",
historical.ShortChannelID, err)
return nil, err
return nil, 0, err
}
channel.NumForwardingPackages = int64(len(fwdPkgs))
@ -3324,7 +3326,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
// If the error is non-nil, and not due to older versions of lnd
// not persisting historical channels, return it.
default:
return nil, err
return nil, 0, err
}
closeTXID := pendingClose.ClosingTXID.String()
@ -3359,36 +3361,44 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
&chanPoint, currentHeight, forceClose,
)
if err != nil {
return nil, err
return nil, 0, err
}
err = r.arbitratorPopulateForceCloseResp(
&chanPoint, currentHeight, forceClose,
)
if err != nil {
return nil, err
return nil, 0, err
}
resp.TotalLimboBalance += forceClose.LimboBalance
resp.PendingForceClosingChannels = append(
resp.PendingForceClosingChannels,
forceClose,
)
limboBalance += forceClose.LimboBalance
result = append(result, forceClose)
}
}
return result, limboBalance, nil
}
// fetchWaitingCloseChannels queries the database for a list of channels
// that have their closing transactions broadcast but not confirmed yet.
// The returned result is used in the response of the PendingChannels RPC.
func (r *rpcServer) fetchWaitingCloseChannels() (waitingCloseChannels,
int64, error) {
// We'll also fetch all channels that are open, but have had their
// commitment broadcasted, meaning they are waiting for the closing
// transaction to confirm.
waitingCloseChans, err := r.server.chanStateDB.FetchWaitingCloseChannels()
channels, err := r.server.chanStateDB.FetchWaitingCloseChannels()
if err != nil {
rpcsLog.Errorf("unable to fetch channels waiting close: %v",
err)
return nil, err
return nil, 0, err
}
for _, waitingClose := range waitingCloseChans {
result := make(waitingCloseChannels, 0)
limboBalance := int64(0)
for _, waitingClose := range channels {
pub := waitingClose.IdentityPub.SerializeCompressed()
chanPoint := waitingClose.FundingOutpoint
@ -3426,7 +3436,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
// An unexpected error occurred.
case err != nil:
return nil, err
return nil, 0, err
// There is a pending remote commit. Set its hash in the
// response.
@ -3443,7 +3453,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
rpcsLog.Errorf("unable to load forwarding packages "+
"for channel:%s, %v",
waitingClose.ShortChannelID, err)
return nil, err
return nil, 0, err
}
channel := &lnrpc.PendingChannelsResponse_PendingChannel{
@ -3468,13 +3478,53 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
// A close tx has been broadcasted, all our balance will be in
// limbo until it confirms.
resp.WaitingCloseChannels = append(
resp.WaitingCloseChannels, waitingCloseResp,
)
resp.TotalLimboBalance += channel.LocalBalance
result = append(result, waitingCloseResp)
limboBalance += channel.LocalBalance
}
return result, limboBalance, nil
}
// PendingChannels returns a list of all the channels that are currently
// considered "pending". A channel is pending if it has finished the funding
// workflow and is waiting for confirmations for the funding txn, or is in the
// process of closure, either initiated cooperatively or non-cooperatively.
func (r *rpcServer) PendingChannels(ctx context.Context,
in *lnrpc.PendingChannelsRequest) (
*lnrpc.PendingChannelsResponse, error) {
rpcsLog.Debugf("[pendingchannels]")
resp := &lnrpc.PendingChannelsResponse{}
// First, we find all the channels that will soon be opened.
pendingOpenChannels, err := r.fetchPendingOpenChannels()
if err != nil {
return nil, err
}
resp.PendingOpenChannels = pendingOpenChannels
// Second, we fetch all channels that considered pending force closing.
// This means the channels here have their closing transactions
// confirmed but not considered fully resolved yet. For instance, they
// may have a second level HTLCs to be resolved onchain.
pendingCloseChannels, limbo, err := r.fetchPendingForceCloseChannels()
if err != nil {
return nil, err
}
resp.PendingForceClosingChannels = pendingCloseChannels
resp.TotalLimboBalance = limbo
// Third, we fetch all channels that are open, but have had their
// commitment broadcasted, meaning they are waiting for the closing
// transaction to confirm.
waitingCloseChannels, limbo, err := r.fetchWaitingCloseChannels()
if err != nil {
return nil, err
}
resp.WaitingCloseChannels = waitingCloseChannels
resp.TotalLimboBalance += limbo
return resp, nil
}
@ -7422,3 +7472,13 @@ func (r *rpcServer) SubscribeCustomMessages(req *lnrpc.SubscribeCustomMessagesRe
}
}
}
// rpcInitiator returns the correct lnrpc initiator for channels where we have
// a record of the opening channel.
func rpcInitiator(isInitiator bool) lnrpc.Initiator {
if isInitiator {
return lnrpc.Initiator_INITIATOR_LOCAL
}
return lnrpc.Initiator_INITIATOR_REMOTE
}