lnrpc+sweep: make sure public interface takes public types as params

This commit exports and renames the following variable names:
- `PendingInput` is now `PendingInputResponse` as it's responding to a
  request.
- `pendingInput` is now renamed and exported as `SweeperInput`.
- `pendingInputs` is now renamed and exported as `InputsMap`.

This commit is first made from running:
```
gofmt -d -w -r 'PendingInput -> PendingInputResponse' .
gofmt -d -w -r 'pendingInput -> SweeperInput' .
gofmt -d -w -r 'pendingInputs -> InputsMap' .
```
And followed by some docs and variable names fixes.
This commit is contained in:
yyforyongyu
2024-03-18 03:20:37 +08:00
parent 9e7d4b7e0b
commit 28df2d7327
9 changed files with 207 additions and 206 deletions

View File

@@ -179,10 +179,10 @@ type RBFInfo struct {
Fee btcutil.Amount
}
// pendingInput is created when an input reaches the main loop for the first
// SweeperInput is created when an input reaches the main loop for the first
// time. It wraps the input and tracks all relevant state that is needed for
// sweeping.
type pendingInput struct {
type SweeperInput struct {
input.Input
// state tracks the current state of the input.
@@ -212,20 +212,20 @@ type pendingInput struct {
}
// String returns a human readable interpretation of the pending input.
func (p *pendingInput) String() string {
func (p *SweeperInput) String() string {
return fmt.Sprintf("%v (%v)", p.Input.OutPoint(), p.Input.WitnessType())
}
// parameters returns the sweep parameters for this input.
//
// NOTE: Part of the txInput interface.
func (p *pendingInput) parameters() Params {
func (p *SweeperInput) parameters() Params {
return p.params
}
// terminated returns a boolean indicating whether the input has reached a
// final state.
func (p *pendingInput) terminated() bool {
func (p *SweeperInput) terminated() bool {
switch p.state {
// If the input has reached a final state, that it's either
// been swept, or failed, or excluded, we will remove it from
@@ -238,20 +238,20 @@ func (p *pendingInput) terminated() bool {
}
}
// pendingInputs is a type alias for a set of pending inputs.
type pendingInputs = map[wire.OutPoint]*pendingInput
// InputsMap is a type alias for a set of pending inputs.
type InputsMap = map[wire.OutPoint]*SweeperInput
// pendingSweepsReq is an internal message we'll use to represent an external
// caller's intent to retrieve all of the pending inputs the UtxoSweeper is
// attempting to sweep.
type pendingSweepsReq struct {
respChan chan map[wire.OutPoint]*PendingInput
respChan chan map[wire.OutPoint]*PendingInputResponse
errChan chan error
}
// PendingInput contains information about an input that is currently being
// swept by the UtxoSweeper.
type PendingInput struct {
// PendingInputResponse contains information about an input that is currently
// being swept by the UtxoSweeper.
type PendingInputResponse struct {
// OutPoint is the identify outpoint of the input being swept.
OutPoint wire.OutPoint
@@ -309,7 +309,7 @@ type UtxoSweeper struct {
// inputs is the total set of inputs the UtxoSweeper has been requested
// to sweep.
inputs pendingInputs
inputs InputsMap
currentOutputScript []byte
@@ -408,7 +408,7 @@ func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
updateReqs: make(chan *updateReq),
pendingSweepsReqs: make(chan *pendingSweepsReq),
quit: make(chan struct{}),
inputs: make(pendingInputs),
inputs: make(InputsMap),
bumpResultChan: make(chan *BumpResult, 100),
}
}
@@ -768,7 +768,7 @@ func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
// signalResult notifies the listeners of the final result of the input sweep.
// It also cancels any pending spend notification.
func (s *UtxoSweeper) signalResult(pi *pendingInput, result Result) {
func (s *UtxoSweeper) signalResult(pi *SweeperInput, result Result) {
op := pi.OutPoint()
listeners := pi.listeners
@@ -1012,8 +1012,10 @@ func (s *UtxoSweeper) monitorSpend(outpoint wire.OutPoint,
// PendingInputs returns the set of inputs that the UtxoSweeper is currently
// attempting to sweep.
func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) {
respChan := make(chan map[wire.OutPoint]*PendingInput, 1)
func (s *UtxoSweeper) PendingInputs() (
map[wire.OutPoint]*PendingInputResponse, error) {
respChan := make(chan map[wire.OutPoint]*PendingInputResponse, 1)
errChan := make(chan error, 1)
select {
case s.pendingSweepsReqs <- &pendingSweepsReq{
@@ -1037,26 +1039,26 @@ func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) {
// handlePendingSweepsReq handles a request to retrieve all pending inputs the
// UtxoSweeper is attempting to sweep.
func (s *UtxoSweeper) handlePendingSweepsReq(
req *pendingSweepsReq) map[wire.OutPoint]*PendingInput {
req *pendingSweepsReq) map[wire.OutPoint]*PendingInputResponse {
pendingInputs := make(map[wire.OutPoint]*PendingInput, len(s.inputs))
for _, pendingInput := range s.inputs {
resps := make(map[wire.OutPoint]*PendingInputResponse, len(s.inputs))
for _, inp := range s.inputs {
// Only the exported fields are set, as we expect the response
// to only be consumed externally.
op := *pendingInput.OutPoint()
pendingInputs[op] = &PendingInput{
op := *inp.OutPoint()
resps[op] = &PendingInputResponse{
OutPoint: op,
WitnessType: pendingInput.WitnessType(),
WitnessType: inp.WitnessType(),
Amount: btcutil.Amount(
pendingInput.SignDesc().Output.Value,
inp.SignDesc().Output.Value,
),
LastFeeRate: pendingInput.lastFeeRate,
BroadcastAttempts: pendingInput.publishAttempts,
Params: pendingInput.params,
LastFeeRate: inp.lastFeeRate,
BroadcastAttempts: inp.publishAttempts,
Params: inp.params,
}
}
return pendingInputs
return resps
}
// UpdateParams allows updating the sweep parameters of a pending input in the
@@ -1117,30 +1119,30 @@ func (s *UtxoSweeper) handleUpdateReq(req *updateReq) (
// batched with others which also have a similar fee rate, creating a
// higher fee rate transaction that replaces the original input's
// sweeping transaction.
pendingInput, ok := s.inputs[req.input]
sweeperInput, ok := s.inputs[req.input]
if !ok {
return nil, lnwallet.ErrNotMine
}
// Create the updated parameters struct. Leave the exclusive group
// unchanged.
newParams := pendingInput.params
newParams := sweeperInput.params
newParams.Fee = req.params.Fee
newParams.Force = req.params.Force
log.Debugf("Updating parameters for %v(state=%v) from (%v) to (%v)",
req.input, pendingInput.state, pendingInput.params, newParams)
req.input, sweeperInput.state, sweeperInput.params, newParams)
pendingInput.params = newParams
sweeperInput.params = newParams
// We need to reset the state so this input will be attempted again by
// our sweeper.
//
// TODO(yy): a dedicated state?
pendingInput.state = Init
sweeperInput.state = Init
resultChan := make(chan Result, 1)
pendingInput.listeners = append(pendingInput.listeners, resultChan)
sweeperInput.listeners = append(sweeperInput.listeners, resultChan)
return resultChan, nil
}
@@ -1229,7 +1231,7 @@ func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) {
// Create a new pendingInput and initialize the listeners slice with
// the passed in result channel. If this input is offered for sweep
// again, the result channel will be appended to this slice.
pi = &pendingInput{
pi = &SweeperInput{
state: state,
listeners: []chan Result{input.resultChan},
Input: input.input,
@@ -1294,8 +1296,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
// If the tx is not found in the store, it means it's not broadcast by
// us, hence we can't find the fee info. This is fine as, later on when
// this tx is confirmed, we will remove the input from our
// pendingInputs.
// this tx is confirmed, we will remove the input from our inputs.
if errors.Is(err, ErrTxNotFound) {
log.Warnf("Spending tx %v not found in sweeper store", txid)
return Published, fn.None[RBFInfo]()
@@ -1322,7 +1323,7 @@ func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
// handleExistingInput processes an input that is already known to the sweeper.
// It will overwrite the params of the old input with the new ones.
func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
oldInput *pendingInput) {
oldInput *SweeperInput) {
// Before updating the input details, check if an exclusive group was
// set. In case the same input is registered again without an exclusive
@@ -1412,9 +1413,9 @@ func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) {
outpoint := txIn.PreviousOutPoint
// Check if this input is known to us. It could probably be
// unknown if we canceled the registration, deleted from
// pendingInputs but the ntfn was in-flight already. Or this
// could be not one of our inputs.
// unknown if we canceled the registration, deleted from inputs
// map but the ntfn was in-flight already. Or this could be not
// one of our inputs.
input, ok := s.inputs[outpoint]
if !ok {
// It's very likely that a spending tx contains inputs
@@ -1460,7 +1461,7 @@ func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) {
// markInputFailed marks the given input as failed and won't be retried. It
// will also notify all the subscribers of this input.
func (s *UtxoSweeper) markInputFailed(pi *pendingInput, err error) {
func (s *UtxoSweeper) markInputFailed(pi *SweeperInput, err error) {
log.Errorf("Failed to sweep input: %v, error: %v", pi, err)
pi.state = Failed
@@ -1476,16 +1477,16 @@ func (s *UtxoSweeper) markInputFailed(pi *pendingInput, err error) {
// updateSweeperInputs updates the sweeper's internal state and returns a map
// of inputs to be swept. It will remove the inputs that are in final states,
// and returns a map of inputs that have either state Init or PublishFailed.
func (s *UtxoSweeper) updateSweeperInputs() pendingInputs {
func (s *UtxoSweeper) updateSweeperInputs() InputsMap {
// Create a map of inputs to be swept.
inputs := make(pendingInputs)
inputs := make(InputsMap)
// Iterate the pending inputs and update the sweeper's state.
//
// TODO(yy): sweeper is made to communicate via go channels, so no
// locks are needed to access the map. However, it'd be safer if we
// turn this pendingInputs into a SyncMap in case we wanna add
// concurrent access to the map in the future.
// turn this inputs map into a SyncMap in case we wanna add concurrent
// access to the map in the future.
for op, input := range s.inputs {
// If the input has reached a final state, that it's either
// been swept, or failed, or excluded, we will remove it from
@@ -1524,7 +1525,7 @@ func (s *UtxoSweeper) updateSweeperInputs() pendingInputs {
// sweepPendingInputs is called when the ticker fires. It will create clusters
// and attempt to create and publish the sweeping transactions.
func (s *UtxoSweeper) sweepPendingInputs(inputs pendingInputs) {
func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) {
// Cluster all of our inputs based on the specific Aggregator.
sets := s.cfg.Aggregator.ClusterInputs(inputs)