From 3c1c640612244abd1be5a5c867b3880731995ef6 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 7 Jan 2022 16:38:51 +0800 Subject: [PATCH] rpcserver: split PendingChannels into three sections This commit refactors the method PendingChannels for maintenance purpose. --- rpcserver.go | 158 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 109 insertions(+), 49 deletions(-) diff --git a/rpcserver.go b/rpcserver.go index 717f35ba3..5b27117e7 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3190,38 +3190,27 @@ func (r *rpcServer) ChannelBalance(ctx context.Context, }, nil } -// PendingChannels returns a list of all the channels that are currently -// considered "pending". A channel is pending if it has finished the funding -// workflow and is waiting for confirmations for the funding txn, or is in the -// process of closure, either initiated cooperatively or non-cooperatively. -func (r *rpcServer) PendingChannels(ctx context.Context, - in *lnrpc.PendingChannelsRequest) (*lnrpc.PendingChannelsResponse, error) { - - rpcsLog.Debugf("[pendingchannels]") - - resp := &lnrpc.PendingChannelsResponse{} - - // rpcInitiator returns the correct lnrpc initiator for channels where - // we have a record of the opening channel. - rpcInitiator := func(isInitiator bool) lnrpc.Initiator { - if isInitiator { - return lnrpc.Initiator_INITIATOR_LOCAL - } - - return lnrpc.Initiator_INITIATOR_REMOTE - } +type ( + pendingOpenChannels []*lnrpc.PendingChannelsResponse_PendingOpenChannel + pendingForceClose []*lnrpc.PendingChannelsResponse_ForceClosedChannel + waitingCloseChannels []*lnrpc.PendingChannelsResponse_WaitingCloseChannel +) +// fetchPendingOpenChannels queries the database for a list of channels that +// have pending open state. The returned result is used in the response of the +// PendingChannels RPC. +func (r *rpcServer) fetchPendingOpenChannels() (pendingOpenChannels, error) { // First, we'll populate the response with all the channels that are // soon to be opened. We can easily fetch this data from the database // and map the db struct to the proto response. - pendingOpenChannels, err := r.server.chanStateDB.FetchPendingChannels() + channels, err := r.server.chanStateDB.FetchPendingChannels() if err != nil { rpcsLog.Errorf("unable to fetch pending channels: %v", err) return nil, err } - resp.PendingOpenChannels = make([]*lnrpc.PendingChannelsResponse_PendingOpenChannel, - len(pendingOpenChannels)) - for i, pendingChan := range pendingOpenChannels { + + result := make(pendingOpenChannels, len(channels)) + for i, pendingChan := range channels { pub := pendingChan.IdentityPub.SerializeCompressed() // As this is required for display purposes, we'll calculate @@ -3236,7 +3225,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context, commitBaseWeight := blockchain.GetTransactionWeight(utx) commitWeight := commitBaseWeight + input.WitnessCommitmentTxWeight - resp.PendingOpenChannels[i] = &lnrpc.PendingChannelsResponse_PendingOpenChannel{ + result[i] = &lnrpc.PendingChannelsResponse_PendingOpenChannel{ Channel: &lnrpc.PendingChannelsResponse_PendingChannel{ RemoteNodePub: hex.EncodeToString(pub), ChannelPoint: pendingChan.FundingOutpoint.String(), @@ -3255,19 +3244,32 @@ func (r *rpcServer) PendingChannels(ctx context.Context, } } + return result, nil +} + +// fetchPendingForceCloseChannels queries the database for a list of channels +// that have their closing transactions confirmed but not fully resolved yet. +// The returned result is used in the response of the PendingChannels RPC. +func (r *rpcServer) fetchPendingForceCloseChannels() (pendingForceClose, + int64, error) { + _, currentHeight, err := r.server.cc.ChainIO.GetBestBlock() if err != nil { - return nil, err + return nil, 0, err } // Next, we'll examine the channels that are soon to be closed so we // can populate these fields within the response. - pendingCloseChannels, err := r.server.chanStateDB.FetchClosedChannels(true) + channels, err := r.server.chanStateDB.FetchClosedChannels(true) if err != nil { rpcsLog.Errorf("unable to fetch closed channels: %v", err) - return nil, err + return nil, 0, err } - for _, pendingClose := range pendingCloseChannels { + + result := make(pendingForceClose, 0) + limboBalance := int64(0) + + for _, pendingClose := range channels { // First construct the channel struct itself, this will be // needed regardless of how this channel was closed. pub := pendingClose.RemotePub.SerializeCompressed() @@ -3313,7 +3315,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context, rpcsLog.Errorf("unable to load forwarding "+ "packages for channel:%s, %v", historical.ShortChannelID, err) - return nil, err + return nil, 0, err } channel.NumForwardingPackages = int64(len(fwdPkgs)) @@ -3324,7 +3326,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context, // If the error is non-nil, and not due to older versions of lnd // not persisting historical channels, return it. default: - return nil, err + return nil, 0, err } closeTXID := pendingClose.ClosingTXID.String() @@ -3359,36 +3361,44 @@ func (r *rpcServer) PendingChannels(ctx context.Context, &chanPoint, currentHeight, forceClose, ) if err != nil { - return nil, err + return nil, 0, err } err = r.arbitratorPopulateForceCloseResp( &chanPoint, currentHeight, forceClose, ) if err != nil { - return nil, err + return nil, 0, err } - resp.TotalLimboBalance += forceClose.LimboBalance - - resp.PendingForceClosingChannels = append( - resp.PendingForceClosingChannels, - forceClose, - ) + limboBalance += forceClose.LimboBalance + result = append(result, forceClose) } } + return result, limboBalance, nil +} + +// fetchWaitingCloseChannels queries the database for a list of channels +// that have their closing transactions broadcast but not confirmed yet. +// The returned result is used in the response of the PendingChannels RPC. +func (r *rpcServer) fetchWaitingCloseChannels() (waitingCloseChannels, + int64, error) { + // We'll also fetch all channels that are open, but have had their // commitment broadcasted, meaning they are waiting for the closing // transaction to confirm. - waitingCloseChans, err := r.server.chanStateDB.FetchWaitingCloseChannels() + channels, err := r.server.chanStateDB.FetchWaitingCloseChannels() if err != nil { rpcsLog.Errorf("unable to fetch channels waiting close: %v", err) - return nil, err + return nil, 0, err } - for _, waitingClose := range waitingCloseChans { + result := make(waitingCloseChannels, 0) + limboBalance := int64(0) + + for _, waitingClose := range channels { pub := waitingClose.IdentityPub.SerializeCompressed() chanPoint := waitingClose.FundingOutpoint @@ -3426,7 +3436,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context, // An unexpected error occurred. case err != nil: - return nil, err + return nil, 0, err // There is a pending remote commit. Set its hash in the // response. @@ -3443,7 +3453,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context, rpcsLog.Errorf("unable to load forwarding packages "+ "for channel:%s, %v", waitingClose.ShortChannelID, err) - return nil, err + return nil, 0, err } channel := &lnrpc.PendingChannelsResponse_PendingChannel{ @@ -3468,13 +3478,53 @@ func (r *rpcServer) PendingChannels(ctx context.Context, // A close tx has been broadcasted, all our balance will be in // limbo until it confirms. - resp.WaitingCloseChannels = append( - resp.WaitingCloseChannels, waitingCloseResp, - ) - - resp.TotalLimboBalance += channel.LocalBalance + result = append(result, waitingCloseResp) + limboBalance += channel.LocalBalance } + return result, limboBalance, nil +} + +// PendingChannels returns a list of all the channels that are currently +// considered "pending". A channel is pending if it has finished the funding +// workflow and is waiting for confirmations for the funding txn, or is in the +// process of closure, either initiated cooperatively or non-cooperatively. +func (r *rpcServer) PendingChannels(ctx context.Context, + in *lnrpc.PendingChannelsRequest) ( + *lnrpc.PendingChannelsResponse, error) { + + rpcsLog.Debugf("[pendingchannels]") + + resp := &lnrpc.PendingChannelsResponse{} + + // First, we find all the channels that will soon be opened. + pendingOpenChannels, err := r.fetchPendingOpenChannels() + if err != nil { + return nil, err + } + resp.PendingOpenChannels = pendingOpenChannels + + // Second, we fetch all channels that considered pending force closing. + // This means the channels here have their closing transactions + // confirmed but not considered fully resolved yet. For instance, they + // may have a second level HTLCs to be resolved onchain. + pendingCloseChannels, limbo, err := r.fetchPendingForceCloseChannels() + if err != nil { + return nil, err + } + resp.PendingForceClosingChannels = pendingCloseChannels + resp.TotalLimboBalance = limbo + + // Third, we fetch all channels that are open, but have had their + // commitment broadcasted, meaning they are waiting for the closing + // transaction to confirm. + waitingCloseChannels, limbo, err := r.fetchWaitingCloseChannels() + if err != nil { + return nil, err + } + resp.WaitingCloseChannels = waitingCloseChannels + resp.TotalLimboBalance += limbo + return resp, nil } @@ -7422,3 +7472,13 @@ func (r *rpcServer) SubscribeCustomMessages(req *lnrpc.SubscribeCustomMessagesRe } } } + +// rpcInitiator returns the correct lnrpc initiator for channels where we have +// a record of the opening channel. +func rpcInitiator(isInitiator bool) lnrpc.Initiator { + if isInitiator { + return lnrpc.Initiator_INITIATOR_LOCAL + } + + return lnrpc.Initiator_INITIATOR_REMOTE +}