Merge pull request #1809 from wpaulino/autopilot-balance-update

autopilot/agent: use updateBalance rather than tracking balance explicitly
This commit is contained in:
Olaoluwa Osuntokun 2018-09-03 19:18:03 -07:00 committed by GitHub
commit 99a5fd9672
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 39 deletions

View File

@ -154,13 +154,8 @@ func (a *Agent) Start() error {
log.Infof("Autopilot Agent starting") log.Infof("Autopilot Agent starting")
startingBalance, err := a.cfg.WalletBalance()
if err != nil {
return err
}
a.wg.Add(1) a.wg.Add(1)
go a.controller(startingBalance) go a.controller()
return nil return nil
} }
@ -183,7 +178,6 @@ func (a *Agent) Stop() error {
// balanceUpdate is a type of external state update that reflects an // balanceUpdate is a type of external state update that reflects an
// increase/decrease in the funds currently available to the wallet. // increase/decrease in the funds currently available to the wallet.
type balanceUpdate struct { type balanceUpdate struct {
balanceDelta btcutil.Amount
} }
// nodeUpdates is a type of external state update that reflects an addition or // nodeUpdates is a type of external state update that reflects an addition or
@ -214,13 +208,13 @@ type chanCloseUpdate struct {
// OnBalanceChange is a callback that should be executed each time the balance of // OnBalanceChange is a callback that should be executed each time the balance of
// the backing wallet changes. // the backing wallet changes.
func (a *Agent) OnBalanceChange(delta btcutil.Amount) { func (a *Agent) OnBalanceChange() {
a.wg.Add(1) a.wg.Add(1)
go func() { go func() {
defer a.wg.Done() defer a.wg.Done()
select { select {
case a.stateUpdates <- &balanceUpdate{balanceDelta: delta}: case a.stateUpdates <- &balanceUpdate{}:
case <-a.quit: case <-a.quit:
} }
}() }()
@ -342,12 +336,12 @@ func mergeChanState(pendingChans map[NodeID]Channel,
// and external state changes as a result of decisions it makes w.r.t channel // and external state changes as a result of decisions it makes w.r.t channel
// allocation, or attributes affecting its control loop being updated by the // allocation, or attributes affecting its control loop being updated by the
// backing Lightning Node. // backing Lightning Node.
func (a *Agent) controller(startingBalance btcutil.Amount) { func (a *Agent) controller() {
defer a.wg.Done() defer a.wg.Done()
// We'll start off by assigning our starting balance, and injecting // We'll start off by assigning our starting balance, and injecting
// that amount as an initial wake up to the main controller goroutine. // that amount as an initial wake up to the main controller goroutine.
a.OnBalanceChange(startingBalance) a.OnBalanceChange()
// TODO(roasbeef): do we in fact need to maintain order? // TODO(roasbeef): do we in fact need to maintain order?
// * use sync.Cond if so // * use sync.Cond if so
@ -388,15 +382,16 @@ func (a *Agent) controller(startingBalance btcutil.Amount) {
// up an additional channel, or splice in funds to an // up an additional channel, or splice in funds to an
// existing one. // existing one.
case *balanceUpdate: case *balanceUpdate:
log.Debugf("Applying external balance state "+ log.Debug("Applying external balance state " +
"update of: %v", update.balanceDelta) "update")
a.totalBalance += update.balanceDelta updateBalance()
// The channel we tried to open previously failed for // The channel we tried to open previously failed for
// whatever reason. // whatever reason.
case *chanOpenFailureUpdate: case *chanOpenFailureUpdate:
log.Debug("Retrying after previous channel open failure.") log.Debug("Retrying after previous channel " +
"open failure.")
updateBalance() updateBalance()

View File

@ -527,7 +527,8 @@ func TestAgentBalanceUpdate(t *testing.T) {
memGraph, _, _ := newMemChanGraph() memGraph, _, _ := newMemChanGraph()
// The wallet will start with 2 BTC available. // The wallet will start with 2 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 2 var walletBalanceMtx sync.Mutex
walletBalance := btcutil.Amount(btcutil.SatoshiPerBitcoin * 2)
// With the dependencies we created, we can now create the initial // With the dependencies we created, we can now create the initial
// agent itself. // agent itself.
@ -536,6 +537,8 @@ func TestAgentBalanceUpdate(t *testing.T) {
Heuristic: heuristic, Heuristic: heuristic,
ChanController: chanController, ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
walletBalanceMtx.Lock()
defer walletBalanceMtx.Unlock()
return walletBalance, nil return walletBalance, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
@ -583,8 +586,11 @@ func TestAgentBalanceUpdate(t *testing.T) {
// Next we'll send a new balance update signal to the agent, adding 5 // Next we'll send a new balance update signal to the agent, adding 5
// BTC to the amount of available funds. // BTC to the amount of available funds.
const balanceDelta = btcutil.SatoshiPerBitcoin * 5 walletBalanceMtx.Lock()
agent.OnBalanceChange(balanceDelta) walletBalance += btcutil.SatoshiPerBitcoin * 5
walletBalanceMtx.Unlock()
agent.OnBalanceChange()
wg = sync.WaitGroup{} wg = sync.WaitGroup{}
@ -597,11 +603,10 @@ func TestAgentBalanceUpdate(t *testing.T) {
// At this point, the local state of the agent should // At this point, the local state of the agent should
// have also been updated to reflect that the LN node // have also been updated to reflect that the LN node
// now has an additional 5BTC available. // now has an additional 5BTC available.
const expectedAmt = walletBalance + balanceDelta if agent.totalBalance != walletBalance {
if agent.totalBalance != expectedAmt {
t.Fatalf("expected %v wallet balance "+ t.Fatalf("expected %v wallet balance "+
"instead have %v", agent.totalBalance, "instead have %v", agent.totalBalance,
expectedAmt) walletBalance)
} }
// With all of our assertions passed, we'll signal the // With all of our assertions passed, we'll signal the
@ -932,7 +937,8 @@ func TestAgentPendingChannelState(t *testing.T) {
memGraph, _, _ := newMemChanGraph() memGraph, _, _ := newMemChanGraph()
// The wallet will start with 6 BTC available. // The wallet will start with 6 BTC available.
const walletBalance = btcutil.SatoshiPerBitcoin * 6 var walletBalanceMtx sync.Mutex
walletBalance := btcutil.Amount(btcutil.SatoshiPerBitcoin * 6)
// With the dependencies we created, we can now create the initial // With the dependencies we created, we can now create the initial
// agent itself. // agent itself.
@ -941,6 +947,9 @@ func TestAgentPendingChannelState(t *testing.T) {
Heuristic: heuristic, Heuristic: heuristic,
ChanController: chanController, ChanController: chanController,
WalletBalance: func() (btcutil.Amount, error) { WalletBalance: func() (btcutil.Amount, error) {
walletBalanceMtx.Lock()
defer walletBalanceMtx.Unlock()
return walletBalance, nil return walletBalance, nil
}, },
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
@ -1039,7 +1048,11 @@ func TestAgentPendingChannelState(t *testing.T) {
// Now, in order to test that the pending state was properly updated, // Now, in order to test that the pending state was properly updated,
// we'll trigger a balance update in order to trigger a query to the // we'll trigger a balance update in order to trigger a query to the
// heuristic. // heuristic.
agent.OnBalanceChange(0.4 * btcutil.SatoshiPerBitcoin) walletBalanceMtx.Lock()
walletBalance += 0.4 * btcutil.SatoshiPerBitcoin
walletBalanceMtx.Unlock()
agent.OnBalanceChange()
// The heuristic should be queried, and the argument for the set of // The heuristic should be queried, and the argument for the set of
// channels passed in should include the pending channels that // channels passed in should include the pending channels that

View File

@ -55,18 +55,6 @@ func (c *chanController) OpenChannel(target *btcec.PublicKey,
updateStream, errChan := c.server.OpenChannel(req) updateStream, errChan := c.server.OpenChannel(req)
select { select {
case err := <-errChan: case err := <-errChan:
// If we were not able to actually open a channel to the peer
// for whatever reason, then we'll disconnect from the peer to
// ensure we don't accumulate a bunch of unnecessary
// connections.
if err != nil {
dcErr := c.server.DisconnectPeer(target)
if dcErr != nil {
atplLog.Errorf("Unable to disconnect from peer %v",
target.SerializeCompressed())
}
}
return err return err
case <-updateStream: case <-updateStream:
return nil return nil
@ -172,8 +160,8 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error)
// If we weren't able to establish a connection at all, // If we weren't able to establish a connection at all,
// then we'll error out. // then we'll error out.
if !connected { if !connected {
return false, fmt.Errorf("unable to connect "+ return false, errors.New("exhausted all " +
"to %x", target.SerializeCompressed()) "advertised addresses")
} }
return false, nil return false, nil
@ -224,8 +212,8 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error)
for { for {
select { select {
case txnUpdate := <-txnSubscription.ConfirmedTransactions(): case <-txnSubscription.ConfirmedTransactions():
pilot.OnBalanceChange(txnUpdate.Value) pilot.OnBalanceChange()
case <-svr.quit: case <-svr.quit:
return return
} }