diff --git a/autopilot/agent.go b/autopilot/agent.go index 6d888b356..be6480c4e 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -1,9 +1,13 @@ package autopilot import ( + "bytes" + "fmt" + "math/rand" "net" "sync" "sync/atomic" + "time" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcutil" @@ -51,11 +55,9 @@ type Config struct { // within the graph. Graph ChannelGraph - // MaxPendingOpens is the maximum number of pending channel - // establishment goroutines that can be lingering. We cap this value in - // order to control the level of parallelism caused by the autopilot - // agent. - MaxPendingOpens uint16 + // Constraints is the set of constraints the autopilot must adhere to + // when opening channels. + Constraints *HeuristicConstraints // TODO(roasbeef): add additional signals from fee rates and revenue of // currently opened channels @@ -145,6 +147,22 @@ type Agent struct { // when the agent receives external balance update signals. totalBalance btcutil.Amount + // failedNodes lists nodes that we've previously attempted to initiate + // channels with, but didn't succeed. + failedNodes map[NodeID]struct{} + + // pendingConns tracks the nodes that we are attempting to make + // connections to. This prevents us from making duplicate connection + // requests to the same node. + pendingConns map[NodeID]struct{} + + // pendingOpens tracks the channels that we've requested to be + // initiated, but haven't yet been confirmed as being fully opened. + // This state is required as otherwise, we may go over our allotted + // channel limit, or open multiple channels to the same node. + pendingOpens map[NodeID]Channel + pendingMtx sync.Mutex + quit chan struct{} wg sync.WaitGroup } @@ -163,6 +181,9 @@ func New(cfg Config, initialState []Channel) (*Agent, error) { nodeUpdates: make(chan *nodeUpdates, 1), chanOpenFailures: make(chan *chanOpenFailureUpdate, 1), pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1), + failedNodes: make(map[NodeID]struct{}), + pendingConns: make(map[NodeID]struct{}), + pendingOpens: make(map[NodeID]Channel), } for _, c := range initialState { @@ -179,6 +200,7 @@ func (a *Agent) Start() error { return nil } + rand.Seed(time.Now().Unix()) log.Infof("Autopilot Agent starting") a.wg.Add(1) @@ -344,6 +366,73 @@ func mergeChanState(pendingChans map[NodeID]Channel, return totalChans } +// weightedChoice draws a random index from the map of channel candidates, with +// a probability propotional to their score. +func weightedChoice(s map[NodeID]*AttachmentDirective) (NodeID, error) { + // Calculate the sum of scores found in the map. + var sum float64 + for _, v := range s { + sum += v.Score + } + + if sum <= 0 { + return NodeID{}, fmt.Errorf("non-positive sum") + } + + // Create a map of normalized scores such, that they sum to 1.0. + norm := make(map[NodeID]float64) + for k, v := range s { + norm[k] = v.Score / sum + } + + // Pick a random number in the range [0.0, 1.0), and iterate the map + // until the number goes below 0. This means that each index is picked + // with a probablity equal to their normalized score. + // + // Example: + // Items with scores [1, 5, 2, 2] + // Normalized scores [0.1, 0.5, 0.2, 0.2] + // Imagine they each occupy a "range" equal to their normalized score + // in [0, 1.0]: + // [|-0.1-||-----0.5-----||--0.2--||--0.2--|] + // The following loop is now equivalent to "hitting" the intervals. + r := rand.Float64() + for k, v := range norm { + r -= v + if r <= 0 { + return k, nil + } + } + return NodeID{}, fmt.Errorf("no choice made") +} + +// chooseN picks at random min[n, len(s)] nodes if from the +// AttachmentDirectives map, with a probability weighted by their score. +func chooseN(n int, s map[NodeID]*AttachmentDirective) ( + map[NodeID]*AttachmentDirective, error) { + + // Keep a map of nodes not yet choosen. + rem := make(map[NodeID]*AttachmentDirective) + for k, v := range s { + rem[k] = v + } + + // Pick a weighted choice from the remaining nodes as long as there are + // nodes left, and we haven't already picked n. + chosen := make(map[NodeID]*AttachmentDirective) + for len(chosen) < n && len(rem) > 0 { + choice, err := weightedChoice(rem) + if err != nil { + return nil, err + } + + chosen[choice] = rem[choice] + delete(rem, choice) + } + + return chosen, nil +} + // controller implements the closed-loop control system of the Agent. The // controller will make a decision w.r.t channel placement within the graph // based on: its current internal state of the set of active channels open, @@ -359,23 +448,6 @@ func (a *Agent) controller() { // TODO(roasbeef): do we in fact need to maintain order? // * use sync.Cond if so - - // failedNodes lists nodes that we've previously attempted to initiate - // channels with, but didn't succeed. - failedNodes := make(map[NodeID]struct{}) - - // pendingConns tracks the nodes that we are attempting to make - // connections to. This prevents us from making duplicate connection - // requests to the same node. - pendingConns := make(map[NodeID]struct{}) - - // pendingOpens tracks the channels that we've requested to be - // initiated, but haven't yet been confirmed as being fully opened. - // This state is required as otherwise, we may go over our allotted - // channel limit, or open multiple channels to the same node. - pendingOpens := make(map[NodeID]Channel) - var pendingMtx sync.Mutex - updateBalance := func() { newBalance, err := a.cfg.WalletBalance() if err != nil { @@ -407,9 +479,9 @@ func (a *Agent) controller() { newChan := update.newChan a.chanState[newChan.ChanID] = newChan - pendingMtx.Lock() - delete(pendingOpens, newChan.Node) - pendingMtx.Unlock() + a.pendingMtx.Lock() + delete(a.pendingOpens, newChan.Node) + a.pendingMtx.Unlock() updateBalance() // A channel has been closed, this may free up an @@ -460,17 +532,17 @@ func (a *Agent) controller() { return } - pendingMtx.Lock() - log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens)) - pendingMtx.Unlock() + a.pendingMtx.Lock() + log.Debugf("Pending channels: %v", spew.Sdump(a.pendingOpens)) + a.pendingMtx.Unlock() // With all the updates applied, we'll obtain a set of the // current active channels (confirmed channels), and also // factor in our set of unconfirmed channels. confirmedChans := a.chanState - pendingMtx.Lock() - totalChans := mergeChanState(pendingOpens, confirmedChans) - pendingMtx.Unlock() + a.pendingMtx.Lock() + totalChans := mergeChanState(a.pendingOpens, confirmedChans) + a.pendingMtx.Unlock() // Now that we've updated our internal state, we'll consult our // channel attachment heuristic to determine if we should open @@ -485,211 +557,230 @@ func (a *Agent) controller() { log.Infof("Triggering attachment directive dispatch, "+ "total_funds=%v", a.totalBalance) - // We're to attempt an attachment so we'll obtain the set of - // nodes that we currently have channels with so we avoid - // duplicate edges. - connectedNodes := a.chanState.ConnectedNodes() - pendingMtx.Lock() - nodesToSkip := mergeNodeMaps(pendingOpens, - pendingConns, connectedNodes, failedNodes, - ) - pendingMtx.Unlock() - - // If we reach this point, then according to our heuristic we - // should modify our channel state to tend towards what it - // determines to the optimal state. So we'll call Select to get - // a fresh batch of attachment directives, passing in the - // amount of funds available for us to use. - chanCandidates, err := a.cfg.Heuristic.Select( - a.cfg.Self, a.cfg.Graph, availableFunds, - numChans, nodesToSkip, - ) + err := a.openChans(availableFunds, numChans, totalChans) if err != nil { - log.Errorf("Unable to select candidates for "+ - "attachment: %v", err) - continue + log.Errorf("Unable to open channels: %v", err) } - - if len(chanCandidates) == 0 { - log.Infof("No eligible candidates to connect to") - continue - } - - log.Infof("Attempting to execute channel attachment "+ - "directives: %v", spew.Sdump(chanCandidates)) - - // Before proceeding, check to see if we have any slots - // available to open channels. If there are any, we will attempt - // to dispatch the retrieved directives since we can't be - // certain which ones may actually succeed. If too many - // connections succeed, we will they will be ignored and made - // available to future heuristic selections. - pendingMtx.Lock() - if uint16(len(pendingOpens)) >= a.cfg.MaxPendingOpens { - pendingMtx.Unlock() - log.Debugf("Reached cap of %v pending "+ - "channel opens, will retry "+ - "after success/failure", - a.cfg.MaxPendingOpens) - continue - } - - // For each recommended attachment directive, we'll launch a - // new goroutine to attempt to carry out the directive. If any - // of these succeed, then we'll receive a new state update, - // taking us back to the top of our controller loop. - for _, chanCandidate := range chanCandidates { - // Skip candidates which we are already trying - // to establish a connection with. - nodeID := chanCandidate.NodeID - if _, ok := pendingConns[nodeID]; ok { - continue - } - pendingConns[nodeID] = struct{}{} - - go func(directive AttachmentDirective) { - // We'll start out by attempting to connect to - // the peer in order to begin the funding - // workflow. - pub := directive.NodeKey - alreadyConnected, err := a.cfg.ConnectToPeer( - pub, directive.Addrs, - ) - if err != nil { - log.Warnf("Unable to connect "+ - "to %x: %v", - pub.SerializeCompressed(), - err) - - // Since we failed to connect to them, - // we'll mark them as failed so that we - // don't attempt to connect to them - // again. - nodeID := NewNodeID(pub) - pendingMtx.Lock() - delete(pendingConns, nodeID) - failedNodes[nodeID] = struct{}{} - pendingMtx.Unlock() - - // Finally, we'll trigger the agent to - // select new peers to connect to. - a.OnChannelOpenFailure() - - return - } - - // The connection was successful, though before - // progressing we must check that we have not - // already met our quota for max pending open - // channels. This can happen if multiple - // directives were spawned but fewer slots were - // available, and other successful attempts - // finished first. - pendingMtx.Lock() - if uint16(len(pendingOpens)) >= - a.cfg.MaxPendingOpens { - // Since we've reached our max number of - // pending opens, we'll disconnect this - // peer and exit. However, if we were - // previously connected to them, then - // we'll make sure to maintain the - // connection alive. - if alreadyConnected { - // Since we succeeded in - // connecting, we won't add this - // peer to the failed nodes map, - // but we will remove it from - // pendingConns so that it can - // be retried in the future. - delete(pendingConns, nodeID) - pendingMtx.Unlock() - return - } - - err = a.cfg.DisconnectPeer( - pub, - ) - if err != nil { - log.Warnf("Unable to "+ - "disconnect peer "+ - "%x: %v", - pub.SerializeCompressed(), - err) - } - - // Now that we have disconnected, we can - // remove this node from our pending - // conns map, permitting subsequent - // connection attempts. - delete(pendingConns, nodeID) - pendingMtx.Unlock() - return - } - - // If we were successful, we'll track this peer - // in our set of pending opens. We do this here - // to ensure we don't stall on selecting new - // peers if the connection attempt happens to - // take too long. - nodeID := directive.NodeID - delete(pendingConns, nodeID) - pendingOpens[nodeID] = Channel{ - Capacity: directive.ChanAmt, - Node: nodeID, - } - pendingMtx.Unlock() - - // We can then begin the funding workflow with - // this peer. - err = a.cfg.ChanController.OpenChannel( - pub, directive.ChanAmt, - ) - if err != nil { - log.Warnf("Unable to open "+ - "channel to %x of %v: %v", - pub.SerializeCompressed(), - directive.ChanAmt, err) - - // As the attempt failed, we'll clear - // the peer from the set of pending - // opens and mark them as failed so we - // don't attempt to open a channel to - // them again. - pendingMtx.Lock() - delete(pendingOpens, nodeID) - failedNodes[nodeID] = struct{}{} - pendingMtx.Unlock() - - // Trigger the agent to re-evaluate - // everything and possibly retry with a - // different node. - a.OnChannelOpenFailure() - - // Finally, we should also disconnect - // the peer if we weren't already - // connected to them beforehand by an - // external subsystem. - if alreadyConnected { - return - } - - err = a.cfg.DisconnectPeer(pub) - if err != nil { - log.Warnf("Unable to "+ - "disconnect peer "+ - "%x: %v", - pub.SerializeCompressed(), - err) - } - } - - // Since the channel open was successful and is - // currently pending, we'll trigger the - // autopilot agent to query for more peers. - a.OnChannelPendingOpen() - }(chanCandidate) - } - pendingMtx.Unlock() - } } + +// openChans queries the agent's heuristic for a set of channel candidates, and +// attempts to open channels to them. +func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32, + totalChans []Channel) error { + + // We're to attempt an attachment so we'll obtain the set of + // nodes that we currently have channels with so we avoid + // duplicate edges. + connectedNodes := a.chanState.ConnectedNodes() + a.pendingMtx.Lock() + nodesToSkip := mergeNodeMaps(a.pendingOpens, + a.pendingConns, connectedNodes, a.failedNodes, + ) + a.pendingMtx.Unlock() + + // Gather the set of all nodes in the graph, except those we + // want to skip. + selfPubBytes := a.cfg.Self.SerializeCompressed() + nodes := make(map[NodeID]struct{}) + if err := a.cfg.Graph.ForEachNode(func(node Node) error { + nID := NodeID(node.PubKey()) + + // If we come across ourselves, them we'll continue in + // order to avoid attempting to make a channel with + // ourselves. + if bytes.Equal(nID[:], selfPubBytes) { + return nil + } + + // Additionally, if this node is in the blacklist, then + // we'll skip it. + if _, ok := nodesToSkip[nID]; ok { + return nil + } + + nodes[nID] = struct{}{} + return nil + }); err != nil { + return fmt.Errorf("unable to get graph nodes: %v", err) + } + + // Use the heuristic to calculate a score for each node in the + // graph. + scores, err := a.cfg.Heuristic.NodeScores( + a.cfg.Graph, totalChans, availableFunds, nodes, + ) + if err != nil { + return fmt.Errorf("unable to calculate node scores : %v", err) + } + + log.Debugf("Got scores for %d nodes", len(scores)) + + // Now use the score to make a weighted choice which + // nodes to attempt to open channels to. + chanCandidates, err := chooseN(int(numChans), scores) + if err != nil { + return fmt.Errorf("Unable to make weighted choice: %v", + err) + } + + if len(chanCandidates) == 0 { + log.Infof("No eligible candidates to connect to") + return nil + } + + log.Infof("Attempting to execute channel attachment "+ + "directives: %v", spew.Sdump(chanCandidates)) + + // Before proceeding, check to see if we have any slots + // available to open channels. If there are any, we will attempt + // to dispatch the retrieved directives since we can't be + // certain which ones may actually succeed. If too many + // connections succeed, we will they will be ignored and made + // available to future heuristic selections. + a.pendingMtx.Lock() + defer a.pendingMtx.Unlock() + if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens { + log.Debugf("Reached cap of %v pending "+ + "channel opens, will retry "+ + "after success/failure", + a.cfg.Constraints.MaxPendingOpens) + return nil + } + + // For each recommended attachment directive, we'll launch a + // new goroutine to attempt to carry out the directive. If any + // of these succeed, then we'll receive a new state update, + // taking us back to the top of our controller loop. + for _, chanCandidate := range chanCandidates { + // Skip candidates which we are already trying + // to establish a connection with. + nodeID := chanCandidate.NodeID + if _, ok := a.pendingConns[nodeID]; ok { + continue + } + a.pendingConns[nodeID] = struct{}{} + + a.wg.Add(1) + go a.executeDirective(*chanCandidate) + } + return nil +} + +// executeDirective attempts to connect to the channel candidate specified by +// the given attachment directive, and open a channel of the given size. +// +// NOTE: MUST be run as a goroutine. +func (a *Agent) executeDirective(directive AttachmentDirective) { + defer a.wg.Done() + + // We'll start out by attempting to connect to the peer in order to + // begin the funding workflow. + nodeID := directive.NodeID + pub, err := btcec.ParsePubKey(nodeID[:], btcec.S256()) + if err != nil { + log.Errorf("Unable to parse pubkey %x: %v", nodeID, err) + return + } + + alreadyConnected, err := a.cfg.ConnectToPeer(pub, directive.Addrs) + if err != nil { + log.Warnf("Unable to connect to %x: %v", + pub.SerializeCompressed(), err) + + // Since we failed to connect to them, we'll mark them as + // failed so that we don't attempt to connect to them again. + a.pendingMtx.Lock() + delete(a.pendingConns, nodeID) + a.failedNodes[nodeID] = struct{}{} + a.pendingMtx.Unlock() + + // Finally, we'll trigger the agent to select new peers to + // connect to. + a.OnChannelOpenFailure() + + return + } + + // The connection was successful, though before progressing we must + // check that we have not already met our quota for max pending open + // channels. This can happen if multiple directives were spawned but + // fewer slots were available, and other successful attempts finished + // first. + a.pendingMtx.Lock() + if uint16(len(a.pendingOpens)) >= + a.cfg.Constraints.MaxPendingOpens { + // Since we've reached our max number of pending opens, we'll + // disconnect this peer and exit. However, if we were + // previously connected to them, then we'll make sure to + // maintain the connection alive. + if alreadyConnected { + // Since we succeeded in connecting, we won't add this + // peer to the failed nodes map, but we will remove it + // from a.pendingConns so that it can be retried in the + // future. + delete(a.pendingConns, nodeID) + a.pendingMtx.Unlock() + return + } + + err = a.cfg.DisconnectPeer(pub) + if err != nil { + log.Warnf("Unable to disconnect peer %x: %v", + pub.SerializeCompressed(), err) + } + + // Now that we have disconnected, we can remove this node from + // our pending conns map, permitting subsequent connection + // attempts. + delete(a.pendingConns, nodeID) + a.pendingMtx.Unlock() + return + } + + // If we were successful, we'll track this peer in our set of pending + // opens. We do this here to ensure we don't stall on selecting new + // peers if the connection attempt happens to take too long. + delete(a.pendingConns, nodeID) + a.pendingOpens[nodeID] = Channel{ + Capacity: directive.ChanAmt, + Node: nodeID, + } + a.pendingMtx.Unlock() + + // We can then begin the funding workflow with this peer. + err = a.cfg.ChanController.OpenChannel(pub, directive.ChanAmt) + if err != nil { + log.Warnf("Unable to open channel to %x of %v: %v", + pub.SerializeCompressed(), directive.ChanAmt, err) + + // As the attempt failed, we'll clear the peer from the set of + // pending opens and mark them as failed so we don't attempt to + // open a channel to them again. + a.pendingMtx.Lock() + delete(a.pendingOpens, nodeID) + a.failedNodes[nodeID] = struct{}{} + a.pendingMtx.Unlock() + + // Trigger the agent to re-evaluate everything and possibly + // retry with a different node. + a.OnChannelOpenFailure() + + // Finally, we should also disconnect the peer if we weren't + // already connected to them beforehand by an external + // subsystem. + if alreadyConnected { + return + } + + err = a.cfg.DisconnectPeer(pub) + if err != nil { + log.Warnf("Unable to disconnect peer %x: %v", + pub.SerializeCompressed(), err) + } + } + + // Since the channel open was successful and is currently pending, + // we'll trigger the autopilot agent to query for more peers. + a.OnChannelPendingOpen() +} diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 16099eeb3..e3aee2642 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -29,8 +29,8 @@ type mockHeuristic struct { moreChansResps chan moreChansResp moreChanArgs chan moreChanArg - directiveResps chan []AttachmentDirective - directiveArgs chan directiveArg + nodeScoresResps chan map[NodeID]*AttachmentDirective + nodeScoresArgs chan directiveArg quit chan struct{} } @@ -60,33 +60,33 @@ func (m *mockHeuristic) NeedMoreChans(chans []Channel, } type directiveArg struct { - self *btcec.PublicKey graph ChannelGraph amt btcutil.Amount - skip map[NodeID]struct{} + chans []Channel + nodes map[NodeID]struct{} } -func (m *mockHeuristic) Select(self *btcec.PublicKey, graph ChannelGraph, - amtToUse btcutil.Amount, numChans uint32, - skipChans map[NodeID]struct{}) ([]AttachmentDirective, error) { +func (m *mockHeuristic) NodeScores(g ChannelGraph, chans []Channel, + fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) ( + map[NodeID]*AttachmentDirective, error) { - if m.directiveArgs != nil { + if m.nodeScoresArgs != nil { directive := directiveArg{ - self: self, - graph: graph, - amt: amtToUse, - skip: skipChans, + graph: g, + amt: fundsAvailable, + chans: chans, + nodes: nodes, } select { - case m.directiveArgs <- directive: + case m.nodeScoresArgs <- directive: case <-m.quit: return nil, errors.New("exiting") } } select { - case resp := <-m.directiveResps: + case resp := <-m.nodeScoresResps: return resp, nil case <-m.quit: return nil, errors.New("exiting") @@ -144,8 +144,8 @@ func TestAgentChannelOpenSignal(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent, 10), @@ -167,8 +167,10 @@ func TestAgentChannelOpenSignal(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -224,7 +226,7 @@ func TestAgentChannelOpenSignal(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -267,8 +269,8 @@ func TestAgentChannelFailureSignal(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockFailingChanController{} memGraph, _, _ := newMemChanGraph() @@ -288,8 +290,10 @@ func TestAgentChannelFailureSignal(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } initialChans := []Channel{} @@ -320,8 +324,7 @@ func TestAgentChannelFailureSignal(t *testing.T) { // At this point, the agent should now be querying the heuristic to // request attachment directives, return a fake so the agent will // attempt to open a channel. - var fakeDirective = AttachmentDirective{ - NodeKey: self, + var fakeDirective = &AttachmentDirective{ NodeID: NewNodeID(self), ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -329,10 +332,13 @@ func TestAgentChannelFailureSignal(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } select { - case heuristic.directiveResps <- []AttachmentDirective{fakeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + NewNodeID(self): fakeDirective, + }: case <-time.After(time.Second * 10): t.Fatal("heuristic wasn't queried in time") } @@ -347,7 +353,7 @@ func TestAgentChannelFailureSignal(t *testing.T) { } select { - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: case <-time.After(time.Second * 10): t.Fatal("heuristic wasn't queried in time") } @@ -366,8 +372,8 @@ func TestAgentChannelCloseSignal(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -389,8 +395,10 @@ func TestAgentChannelCloseSignal(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } // We'll start the agent with two channels already being active. @@ -453,7 +461,7 @@ func TestAgentChannelCloseSignal(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -474,8 +482,8 @@ func TestAgentBalanceUpdate(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -503,8 +511,10 @@ func TestAgentBalanceUpdate(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -562,7 +572,7 @@ func TestAgentBalanceUpdate(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -582,8 +592,8 @@ func TestAgentImmediateAttach(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -608,8 +618,10 @@ func TestAgentImmediateAttach(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -650,7 +662,7 @@ func TestAgentImmediateAttach(t *testing.T) { // At this point, the agent should now be querying the heuristic to // requests attachment directives. We'll generate 5 mock directives so // it can progress within its loop. - directives := make([]AttachmentDirective, numChans) + directives := make(map[NodeID]*AttachmentDirective) nodeKeys := make(map[NodeID]struct{}) for i := 0; i < numChans; i++ { pub, err := randKey() @@ -658,8 +670,7 @@ func TestAgentImmediateAttach(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } nodeID := NewNodeID(pub) - directives[i] = AttachmentDirective{ - NodeKey: pub, + directives[nodeID] = &AttachmentDirective{ NodeID: nodeID, ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -667,6 +678,7 @@ func TestAgentImmediateAttach(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } nodeKeys[nodeID] = struct{}{} } @@ -674,7 +686,7 @@ func TestAgentImmediateAttach(t *testing.T) { // With our fake directives created, we'll now send then to the agent // as a return value for the Select function. select { - case heuristic.directiveResps <- directives: + case heuristic.nodeScoresResps <- directives: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } @@ -696,6 +708,7 @@ func TestAgentImmediateAttach(t *testing.T) { nodeID) } delete(nodeKeys, nodeID) + case <-time.After(time.Second * 10): t.Fatalf("channel not opened in time") } @@ -714,8 +727,8 @@ func TestAgentPrivateChannels(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } // The chanController should be initialized such that all of its open // channel requests are for private channels. @@ -743,8 +756,10 @@ func TestAgentPrivateChannels(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } agent, err := New(cfg, nil) if err != nil { @@ -783,14 +798,13 @@ func TestAgentPrivateChannels(t *testing.T) { // At this point, the agent should now be querying the heuristic to // requests attachment directives. We'll generate 5 mock directives so // it can progress within its loop. - directives := make([]AttachmentDirective, numChans) + directives := make(map[NodeID]*AttachmentDirective) for i := 0; i < numChans; i++ { pub, err := randKey() if err != nil { t.Fatalf("unable to generate key: %v", err) } - directives[i] = AttachmentDirective{ - NodeKey: pub, + directives[NewNodeID(pub)] = &AttachmentDirective{ NodeID: NewNodeID(pub), ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -798,13 +812,14 @@ func TestAgentPrivateChannels(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } } // With our fake directives created, we'll now send then to the agent // as a return value for the Select function. select { - case heuristic.directiveResps <- directives: + case heuristic.nodeScoresResps <- directives: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } @@ -836,8 +851,8 @@ func TestAgentPendingChannelState(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -866,8 +881,10 @@ func TestAgentPendingChannelState(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -910,8 +927,7 @@ func TestAgentPendingChannelState(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } nodeID := NewNodeID(nodeKey) - nodeDirective := AttachmentDirective{ - NodeKey: nodeKey, + nodeDirective := &AttachmentDirective{ NodeID: nodeID, ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -919,14 +935,18 @@ func TestAgentPendingChannelState(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } + select { - case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + nodeID: nodeDirective, + }: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } - heuristic.directiveArgs = make(chan directiveArg) + heuristic.nodeScoresArgs = make(chan directiveArg) // A request to open the channel should've also been sent. select { @@ -989,12 +1009,12 @@ func TestAgentPendingChannelState(t *testing.T) { // Select method. The arguments passed should reflect the fact that the // node we have a pending channel to, should be ignored. select { - case req := <-heuristic.directiveArgs: - if len(req.skip) == 0 { + case req := <-heuristic.nodeScoresArgs: + if len(req.chans) == 0 { t.Fatalf("expected to skip %v nodes, instead "+ - "skipping %v", 1, len(req.skip)) + "skipping %v", 1, len(req.chans)) } - if _, ok := req.skip[nodeID]; !ok { + if req.chans[0].Node != nodeID { t.Fatalf("pending node not included in skip arguments") } case <-time.After(time.Second * 10): @@ -1015,8 +1035,8 @@ func TestAgentPendingOpenChannel(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -1035,8 +1055,10 @@ func TestAgentPendingOpenChannel(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return walletBalance, nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } agent, err := New(cfg, nil) if err != nil { @@ -1077,7 +1099,7 @@ func TestAgentPendingOpenChannel(t *testing.T) { // There shouldn't be a call to the Select method as we've returned // "false" for NeedMoreChans above. select { - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: t.Fatalf("Select was called but shouldn't have been") default: } @@ -1098,8 +1120,8 @@ func TestAgentOnNodeUpdates(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -1118,8 +1140,10 @@ func TestAgentOnNodeUpdates(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return walletBalance, nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } agent, err := New(cfg, nil) if err != nil { @@ -1153,7 +1177,7 @@ func TestAgentOnNodeUpdates(t *testing.T) { // Send over an empty list of attachment directives, which should cause // the agent to return to waiting on a new signal. select { - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: case <-time.After(time.Second * 10): t.Fatalf("Select was not called but should have been") } @@ -1179,7 +1203,7 @@ func TestAgentOnNodeUpdates(t *testing.T) { // It's not important that this list is also empty, so long as the node // updates signal is causing the agent to make this attempt. select { - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: case <-time.After(time.Second * 10): t.Fatalf("Select was not called but should have been") } @@ -1201,8 +1225,8 @@ func TestAgentSkipPendingConns(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -1213,6 +1237,7 @@ func TestAgentSkipPendingConns(t *testing.T) { const walletBalance = btcutil.SatoshiPerBitcoin * 6 connect := make(chan chan error) + quit := make(chan struct{}) // With the dependencies we created, we can now create the initial // agent itself. @@ -1225,15 +1250,27 @@ func TestAgentSkipPendingConns(t *testing.T) { }, ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { errChan := make(chan error) - connect <- errChan - err := <-errChan - return false, err + + select { + case connect <- errChan: + case <-quit: + return false, errors.New("quit") + } + + select { + case err := <-errChan: + return false, err + case <-quit: + return false, errors.New("quit") + } }, DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -1252,6 +1289,11 @@ func TestAgentSkipPendingConns(t *testing.T) { } defer agent.Stop() + // We must defer the closing of quit after the defer agent.Stop(), to + // make sure ConnectToPeer won't block preventing the agent from + // exiting. + defer close(quit) + // We'll send an initial "yes" response to advance the agent past its // initial check. This will cause it to try to get directives from the // graph. @@ -1272,8 +1314,7 @@ func TestAgentSkipPendingConns(t *testing.T) { if err != nil { t.Fatalf("unable to generate key: %v", err) } - nodeDirective := AttachmentDirective{ - NodeKey: nodeKey, + nodeDirective := &AttachmentDirective{ NodeID: NewNodeID(nodeKey), ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -1281,9 +1322,13 @@ func TestAgentSkipPendingConns(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } + select { - case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + NewNodeID(nodeKey): nodeDirective, + }: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } @@ -1311,7 +1356,9 @@ func TestAgentSkipPendingConns(t *testing.T) { // Send a directive for the same node, which already has a pending conn. select { - case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + NewNodeID(nodeKey): nodeDirective, + }: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } @@ -1348,7 +1395,9 @@ func TestAgentSkipPendingConns(t *testing.T) { // Send a directive for the same node, which already has a pending conn. select { - case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + NewNodeID(nodeKey): nodeDirective, + }: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } diff --git a/autopilot/heuristic_constraints.go b/autopilot/heuristic_constraints.go new file mode 100644 index 000000000..916f85071 --- /dev/null +++ b/autopilot/heuristic_constraints.go @@ -0,0 +1,76 @@ +package autopilot + +import ( + "github.com/btcsuite/btcutil" +) + +// HeuristicConstraints is a struct that indicate the constraints an autopilot +// heuristic must adhere to when opening channels. +type HeuristicConstraints struct { + // MinChanSize is the smallest channel that the autopilot agent should + // create. + MinChanSize btcutil.Amount + + // MaxChanSize the largest channel that the autopilot agent should + // create. + MaxChanSize btcutil.Amount + + // ChanLimit the maximum number of channels that should be created. + ChanLimit uint16 + + // Allocation the percentage of total funds that should be committed to + // automatic channel establishment. + Allocation float64 + + // MaxPendingOpens is the maximum number of pending channel + // establishment goroutines that can be lingering. We cap this value in + // order to control the level of parallelism caused by the autopilot + // agent. + MaxPendingOpens uint16 +} + +// availableChans returns the funds and number of channels slots the autopilot +// has available towards new channels, and still be within the set constraints. +func (h *HeuristicConstraints) availableChans(channels []Channel, + funds btcutil.Amount) (btcutil.Amount, uint32) { + + // If we're already over our maximum allowed number of channels, then + // we'll instruct the controller not to create any more channels. + if len(channels) >= int(h.ChanLimit) { + return 0, 0 + } + + // The number of additional channels that should be opened is the + // difference between the channel limit, and the number of channels we + // already have open. + numAdditionalChans := uint32(h.ChanLimit) - uint32(len(channels)) + + // First, we'll tally up the total amount of funds that are currently + // present within the set of active channels. + var totalChanAllocation btcutil.Amount + for _, channel := range channels { + totalChanAllocation += channel.Capacity + } + + // With this value known, we'll now compute the total amount of fund + // allocated across regular utxo's and channel utxo's. + totalFunds := funds + totalChanAllocation + + // Once the total amount has been computed, we then calculate the + // fraction of funds currently allocated to channels. + fundsFraction := float64(totalChanAllocation) / float64(totalFunds) + + // If this fraction is below our threshold, then we'll return true, to + // indicate the controller should call Select to obtain a candidate set + // of channels to attempt to open. + needMore := fundsFraction < h.Allocation + if !needMore { + return 0, 0 + } + + // Now that we know we need more funds, we'll compute the amount of + // additional funds we should allocate towards channels. + targetAllocation := btcutil.Amount(float64(totalFunds) * h.Allocation) + fundsAvailable := targetAllocation - totalChanAllocation + return fundsAvailable, numAdditionalChans +} diff --git a/autopilot/interface.go b/autopilot/interface.go index efa992985..e0d0cb90d 100644 --- a/autopilot/interface.go +++ b/autopilot/interface.go @@ -85,12 +85,10 @@ type ChannelGraph interface { // AttachmentHeuristic. It details to which node a channel should be created // to, and also the parameters which should be used in the channel creation. type AttachmentDirective struct { - // NodeKey is the target node for this attachment directive. It can be - // identified by its public key, and therefore can be used along with - // a ChannelOpener implementation to execute the directive. - NodeKey *btcec.PublicKey - - // NodeID is the serialized compressed pubkey of the target node. + // NodeID is the serialized compressed pubkey of the target node for + // this attachment directive. It can be identified by its public key, + // and therefore can be used along with a ChannelOpener implementation + // to execute the directive. NodeID NodeID // ChanAmt is the size of the channel that should be opened, expressed @@ -100,6 +98,10 @@ type AttachmentDirective struct { // Addrs is a list of addresses that the target peer may be reachable // at. Addrs []net.Addr + + // Score is the score given by the heuristic for opening a channel of + // the given size to this node. + Score float64 } // AttachmentHeuristic is one of the primary interfaces within this package. @@ -119,16 +121,23 @@ type AttachmentHeuristic interface { // ideal state. NeedMoreChans(chans []Channel, balance btcutil.Amount) (btcutil.Amount, uint32, bool) - // Select is a method that given the current state of the channel - // graph, a set of nodes to ignore, and an amount of available funds, - // should return a set of attachment directives which describe which - // additional channels should be opened within the graph to push the - // heuristic back towards its equilibrium state. The numNewChans - // argument represents the additional number of channels that should be - // open. - Select(self *btcec.PublicKey, graph ChannelGraph, - amtToUse btcutil.Amount, numNewChans uint32, - skipNodes map[NodeID]struct{}) ([]AttachmentDirective, error) + // NodeScores is a method that given the current channel graph, current + // set of local channels and funds available, scores the given nodes + // according to the preference of opening a channel with them. The + // returned channel candidates maps the NodeID to an attachemnt + // directive containing a score and a channel size. + // + // The scores will be in the range [0, M], where 0 indicates no + // improvement in connectivity if a channel is opened to this node, + // while M is the maximum possible improvement in connectivity. The + // size of M is up to the implementation of this interface, so scores + // must be normalized if compared against other implementations. + // + // NOTE: A NodeID not found in the returned map is implicitly given a + // score of 0. + NodeScores(g ChannelGraph, chans []Channel, + fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) ( + map[NodeID]*AttachmentDirective, error) } // ChannelController is a simple interface that allows an auto-pilot agent to diff --git a/autopilot/prefattach.go b/autopilot/prefattach.go index 44c428fe5..d90437b89 100644 --- a/autopilot/prefattach.go +++ b/autopilot/prefattach.go @@ -1,9 +1,8 @@ package autopilot import ( - "bytes" - "fmt" prand "math/rand" + "net" "time" "github.com/btcsuite/btcd/btcec" @@ -22,28 +21,20 @@ import ( // // TODO(roasbeef): BA, with k=-3 type ConstrainedPrefAttachment struct { - minChanSize btcutil.Amount - maxChanSize btcutil.Amount - - chanLimit uint16 - - threshold float64 + constraints *HeuristicConstraints } // NewConstrainedPrefAttachment creates a new instance of a // ConstrainedPrefAttachment heuristics given bounds on allowed channel sizes, // and an allocation amount which is interpreted as a percentage of funds that // is to be committed to channels at all times. -func NewConstrainedPrefAttachment(minChanSize, maxChanSize btcutil.Amount, - chanLimit uint16, allocation float64) *ConstrainedPrefAttachment { +func NewConstrainedPrefAttachment( + cfg *HeuristicConstraints) *ConstrainedPrefAttachment { prand.Seed(time.Now().Unix()) return &ConstrainedPrefAttachment{ - minChanSize: minChanSize, - chanLimit: chanLimit, - maxChanSize: maxChanSize, - threshold: allocation, + constraints: cfg, } } @@ -61,45 +52,11 @@ var _ AttachmentHeuristic = (*ConstrainedPrefAttachment)(nil) func (p *ConstrainedPrefAttachment) NeedMoreChans(channels []Channel, funds btcutil.Amount) (btcutil.Amount, uint32, bool) { - // If we're already over our maximum allowed number of channels, then - // we'll instruct the controller not to create any more channels. - if len(channels) >= int(p.chanLimit) { - return 0, 0, false - } - - // The number of additional channels that should be opened is the - // difference between the channel limit, and the number of channels we - // already have open. - numAdditionalChans := uint32(p.chanLimit) - uint32(len(channels)) - - // First, we'll tally up the total amount of funds that are currently - // present within the set of active channels. - var totalChanAllocation btcutil.Amount - for _, channel := range channels { - totalChanAllocation += channel.Capacity - } - - // With this value known, we'll now compute the total amount of fund - // allocated across regular utxo's and channel utxo's. - totalFunds := funds + totalChanAllocation - - // Once the total amount has been computed, we then calculate the - // fraction of funds currently allocated to channels. - fundsFraction := float64(totalChanAllocation) / float64(totalFunds) - - // If this fraction is below our threshold, then we'll return true, to - // indicate the controller should call Select to obtain a candidate set - // of channels to attempt to open. - needMore := fundsFraction < p.threshold - if !needMore { - return 0, 0, false - } - - // Now that we know we need more funds, we'll compute the amount of - // additional funds we should allocate towards channels. - targetAllocation := btcutil.Amount(float64(totalFunds) * p.threshold) - fundsAvailable := targetAllocation - totalChanAllocation - return fundsAvailable, numAdditionalChans, true + // We'll try to open more channels as long as the constraints allow it. + availableFunds, availableChans := p.constraints.availableChans( + channels, funds, + ) + return availableFunds, availableChans, availableChans > 0 } // NodeID is a simple type that holds an EC public key serialized in compressed @@ -113,189 +70,107 @@ func NewNodeID(pub *btcec.PublicKey) NodeID { return n } -// shuffleCandidates shuffles the set of candidate nodes for preferential -// attachment in order to break any ordering already enforced by the sorted -// order of the public key for each node. To shuffle the set of candidates, we -// use a version of the Fisher–Yates shuffle algorithm. -func shuffleCandidates(candidates []Node) []Node { - shuffledNodes := make([]Node, len(candidates)) - perm := prand.Perm(len(candidates)) - - for i, v := range perm { - shuffledNodes[v] = candidates[i] - } - - return shuffledNodes -} - -// Select returns a candidate set of attachment directives that should be -// executed based on the current internal state, the state of the channel -// graph, the set of nodes we should exclude, and the amount of funds -// available. The heuristic employed by this method is one that attempts to -// promote a scale-free network globally, via local attachment preferences for -// new nodes joining the network with an amount of available funds to be -// allocated to channels. Specifically, we consider the degree of each node -// (and the flow in/out of the node available via its open channels) and -// utilize the Barabási–Albert model to drive our recommended attachment -// heuristics. If implemented globally for each new participant, this results -// in a channel graph that is scale-free and follows a power law distribution -// with k=-3. +// NodeScores is a method that given the current channel graph, current set of +// local channels and funds available, scores the given nodes according the the +// preference of opening a channel with them. +// +// The heuristic employed by this method is one that attempts to promote a +// scale-free network globally, via local attachment preferences for new nodes +// joining the network with an amount of available funds to be allocated to +// channels. Specifically, we consider the degree of each node (and the flow +// in/out of the node available via its open channels) and utilize the +// Barabási–Albert model to drive our recommended attachment heuristics. If +// implemented globally for each new participant, this results in a channel +// graph that is scale-free and follows a power law distribution with k=-3. +// +// The returned scores will be in the range [0.0, 1.0], where higher scores are +// given to nodes already having high connectivity in the graph. // // NOTE: This is a part of the AttachmentHeuristic interface. -func (p *ConstrainedPrefAttachment) Select(self *btcec.PublicKey, g ChannelGraph, - fundsAvailable btcutil.Amount, numNewChans uint32, - skipNodes map[NodeID]struct{}) ([]AttachmentDirective, error) { - - // TODO(roasbeef): rename? - - var directives []AttachmentDirective - - if fundsAvailable < p.minChanSize { - return directives, nil - } - - selfPubBytes := self.SerializeCompressed() - - // We'll continue our attachment loop until we've exhausted the current - // amount of available funds. - visited := make(map[NodeID]struct{}) - for i := uint32(0); i < numNewChans; i++ { - // selectionSlice will be used to randomly select a node - // according to a power law distribution. For each connected - // edge, we'll add an instance of the node to this slice. Thus, - // for a given node, the probability that we'll attach to it - // is: k_i / sum(k_j), where k_i is the degree of the target - // node, and k_j is the degree of all other nodes i != j. This - // implements the classic Barabási–Albert model for - // preferential attachment. - var selectionSlice []Node - - // For each node, and each channel that the node has, we'll add - // an instance of that node to the selection slice above. - // This'll slice where the frequency of each node is equivalent - // to the number of channels that connect to it. - // - // TODO(roasbeef): add noise to make adversarially resistant? - if err := g.ForEachNode(func(node Node) error { - nID := NodeID(node.PubKey()) - - // Once a node has already been attached to, we'll - // ensure that it isn't factored into any further - // decisions within this round. - if _, ok := visited[nID]; ok { - return nil - } - - // If we come across ourselves, them we'll continue in - // order to avoid attempting to make a channel with - // ourselves. - if bytes.Equal(nID[:], selfPubBytes) { - return nil - } - - // Additionally, if this node is in the blacklist, then - // we'll skip it. - if _, ok := skipNodes[nID]; ok { - return nil - } - - // For initial bootstrap purposes, if a node doesn't - // have any channels, then we'll ensure that it has at - // least one item in the selection slice. - // - // TODO(roasbeef): make conditional? - selectionSlice = append(selectionSlice, node) - - // For each active channel the node has, we'll add an - // additional channel to the selection slice to - // increase their weight. - if err := node.ForEachChannel(func(channel ChannelEdge) error { - selectionSlice = append(selectionSlice, node) - return nil - }); err != nil { - return err - } +func (p *ConstrainedPrefAttachment) NodeScores(g ChannelGraph, chans []Channel, + fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) ( + map[NodeID]*AttachmentDirective, error) { + // Count the number of channels in the graph. We'll also count the + // number of channels as we go for the nodes we are interested in, and + // record their addresses found in the db. + var graphChans int + nodeChanNum := make(map[NodeID]int) + addresses := make(map[NodeID][]net.Addr) + if err := g.ForEachNode(func(n Node) error { + var nodeChans int + err := n.ForEachChannel(func(_ ChannelEdge) error { + nodeChans++ + graphChans++ return nil - }); err != nil { - return nil, err - } - - // If no nodes at all were accumulated, then we'll exit early - // as there are no eligible candidates. - if len(selectionSlice) == 0 { - break - } - - // Given our selection slice, we'll now generate a random index - // into this slice. The node we select will be recommended by - // us to create a channel to. - candidates := shuffleCandidates(selectionSlice) - selectedIndex := prand.Int31n(int32(len(candidates))) - selectedNode := candidates[selectedIndex] - - // TODO(roasbeef): cap on num channels to same participant? - - // With the node selected, we'll add this (node, amount) tuple - // to out set of recommended directives. - pubBytes := selectedNode.PubKey() - pub, err := btcec.ParsePubKey(pubBytes[:], btcec.S256()) - if err != nil { - return nil, err - } - directives = append(directives, AttachmentDirective{ - // TODO(roasbeef): need curve? - NodeKey: &btcec.PublicKey{ - X: pub.X, - Y: pub.Y, - }, - NodeID: NewNodeID(pub), - Addrs: selectedNode.Addrs(), }) - - // With the node selected, we'll add it to the set of visited - // nodes to avoid attaching to it again. - visited[NodeID(pubBytes)] = struct{}{} - } - - numSelectedNodes := int64(len(directives)) - switch { - // If we have enough available funds to distribute the maximum channel - // size for each of the selected peers to attach to, then we'll - // allocate the maximum amount to each peer. - case int64(fundsAvailable) >= numSelectedNodes*int64(p.maxChanSize): - for i := 0; i < int(numSelectedNodes); i++ { - directives[i].ChanAmt = p.maxChanSize + if err != nil { + return err } - return directives, nil - - // Otherwise, we'll greedily allocate our funds to the channels - // successively until we run out of available funds, or can't create a - // channel above the min channel size. - case int64(fundsAvailable) < numSelectedNodes*int64(p.maxChanSize): - i := 0 - for fundsAvailable > p.minChanSize { - // We'll attempt to allocate the max channel size - // initially. If we don't have enough funds to do this, - // then we'll allocate the remainder of the funds - // available to the channel. - delta := p.maxChanSize - if fundsAvailable-delta < 0 { - delta = fundsAvailable - } - - directives[i].ChanAmt = delta - - fundsAvailable -= delta - i++ + // If this node is not among our nodes to score, we can return + // early. + nID := NodeID(n.PubKey()) + if _, ok := nodes[nID]; !ok { + return nil } - // We'll slice the initial set of directives to properly - // reflect the amount of funds we were able to allocate. - return directives[:i:i], nil + // Otherwise we'll record the number of channels, and also + // populate the address in our channel candidates map. + nodeChanNum[nID] = nodeChans + addresses[nID] = n.Addrs() - default: - return nil, fmt.Errorf("err") + return nil + }); err != nil { + return nil, err } + + // If there are no channels in the graph we cannot determine any + // preferences, so we return, indicating all candidates get a score of + // zero. + if graphChans == 0 { + return nil, nil + } + + existingPeers := make(map[NodeID]struct{}) + for _, c := range chans { + existingPeers[c.Node] = struct{}{} + } + + // For each node in the set of nodes, count their fraction of channels + // in the graph, and use that as the score. + candidates := make(map[NodeID]*AttachmentDirective) + for nID, nodeChans := range nodeChanNum { + // As channel size we'll use the maximum channel size available. + chanSize := p.constraints.MaxChanSize + if fundsAvailable-chanSize < 0 { + chanSize = fundsAvailable + } + + _, ok := existingPeers[nID] + + switch { + + // If the node is among or existing channel peers, we don't + // need another channel. + case ok: + continue + + // If the amount is too small, we don't want to attempt opening + // another channel. + case chanSize == 0 || chanSize < p.constraints.MinChanSize: + continue + } + + // Otherwise we score the node according to its fraction of + // channels in the graph. + score := float64(nodeChans) / float64(graphChans) + candidates[nID] = &AttachmentDirective{ + NodeID: nID, + ChanAmt: chanSize, + Score: score, + } + } + + return candidates, nil } diff --git a/autopilot/prefattach_test.go b/autopilot/prefattach_test.go index d34402776..adaee08b7 100644 --- a/autopilot/prefattach_test.go +++ b/autopilot/prefattach_test.go @@ -29,6 +29,13 @@ func TestConstrainedPrefAttachmentNeedMoreChan(t *testing.T) { threshold = 0.5 ) + constraints := &HeuristicConstraints{ + MinChanSize: minChanSize, + MaxChanSize: maxChanSize, + ChanLimit: chanLimit, + Allocation: threshold, + } + randChanID := func() lnwire.ShortChannelID { return lnwire.NewShortChanIDFromInt(uint64(prand.Int63())) } @@ -146,8 +153,7 @@ func TestConstrainedPrefAttachmentNeedMoreChan(t *testing.T) { }, } - prefAttach := NewConstrainedPrefAttachment(minChanSize, maxChanSize, - chanLimit, threshold) + prefAttach := NewConstrainedPrefAttachment(constraints) for i, testCase := range testCases { amtToAllocate, numMore, needMore := prefAttach.NeedMoreChans( @@ -224,10 +230,8 @@ var chanGraphs = []struct { }, } -// TestConstrainedPrefAttachmentSelectEmptyGraph ensures that when passed en -// empty graph, the Select function always detects the state, and returns nil. -// Otherwise, it would be possible for the main Select loop to entire an -// infinite loop. +// TestConstrainedPrefAttachmentSelectEmptyGraph ensures that when passed an +// empty graph, the NodeSores function always returns a score of 0. func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) { const ( minChanSize = 0 @@ -236,16 +240,25 @@ func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) { threshold = 0.5 ) - // First, we'll generate a random key that represents "us", and create - // a new instance of the heuristic with our set parameters. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate self key: %v", err) + constraints := &HeuristicConstraints{ + MinChanSize: minChanSize, + MaxChanSize: maxChanSize, + ChanLimit: chanLimit, + Allocation: threshold, + } + + prefAttach := NewConstrainedPrefAttachment(constraints) + + // Create a random public key, which we will query to get a score for. + pub, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + + nodes := map[NodeID]struct{}{ + NewNodeID(pub): {}, } - prefAttach := NewConstrainedPrefAttachment(minChanSize, maxChanSize, - chanLimit, threshold) - skipNodes := make(map[NodeID]struct{}) for _, graph := range chanGraphs { success := t.Run(graph.name, func(t1 *testing.T) { graph, cleanup, err := graph.genFunc() @@ -256,23 +269,21 @@ func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) { defer cleanup() } - // With the necessary state initialized, we'll not - // attempt to select a set of candidates channel for - // creation given the current state of the graph. + // With the necessary state initialized, we'll now + // attempt to get the score for this one node. const walletFunds = btcutil.SatoshiPerBitcoin - directives, err := prefAttach.Select(self, graph, - walletFunds, 5, skipNodes) + scores, err := prefAttach.NodeScores(graph, nil, + walletFunds, nodes) if err != nil { t1.Fatalf("unable to select attachment "+ "directives: %v", err) } - // We shouldn't have selected any new directives as we - // started with an empty graph. - if len(directives) != 0 { - t1.Fatalf("zero attachment directives "+ - "should have been returned instead %v were", - len(directives)) + // Since the graph is empty, we expect the score to be + // 0, giving an empty return map. + if len(scores) != 0 { + t1.Fatalf("expected empty score map, "+ + "instead got %v ", len(scores)) } }) if !success { @@ -281,9 +292,50 @@ func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) { } } +// completeGraph is a helper method that adds numNodes fully connected nodes to +// the graph. +func completeGraph(t *testing.T, g testGraph, numNodes int) { + const chanCapacity = btcutil.SatoshiPerBitcoin + nodes := make(map[int]*btcec.PublicKey) + for i := 0; i < numNodes; i++ { + for j := i + 1; j < numNodes; j++ { + + node1 := nodes[i] + node2 := nodes[j] + edge1, edge2, err := g.addRandChannel( + node1, node2, chanCapacity) + if err != nil { + t.Fatalf("unable to generate channel: %v", err) + } + + if node1 == nil { + pubKeyBytes := edge1.Peer.PubKey() + nodes[i], err = btcec.ParsePubKey( + pubKeyBytes[:], btcec.S256(), + ) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", + err) + } + } + + if node2 == nil { + pubKeyBytes := edge2.Peer.PubKey() + nodes[j], err = btcec.ParsePubKey( + pubKeyBytes[:], btcec.S256(), + ) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", + err) + } + } + } + } +} + // TestConstrainedPrefAttachmentSelectTwoVertexes ensures that when passed a -// graph with only two eligible vertexes, then both are selected (without any -// repeats), and the funds are appropriately allocated across each peer. +// graph with only two eligible vertexes, then both are given the same score, +// and the funds are appropriately allocated across each peer. func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) { t.Parallel() @@ -296,7 +348,12 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) { threshold = 0.5 ) - skipNodes := make(map[NodeID]struct{}) + constraints := &HeuristicConstraints{ + MinChanSize: minChanSize, + MaxChanSize: maxChanSize, + ChanLimit: chanLimit, + Allocation: threshold, + } for _, graph := range chanGraphs { success := t.Run(graph.name, func(t1 *testing.T) { graph, cleanup, err := graph.genFunc() @@ -307,15 +364,7 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) { defer cleanup() } - // First, we'll generate a random key that represents - // "us", and create a new instance of the heuristic - // with our set parameters. - self, err := randKey() - if err != nil { - t1.Fatalf("unable to generate self key: %v", err) - } - prefAttach := NewConstrainedPrefAttachment(minChanSize, maxChanSize, - chanLimit, threshold) + prefAttach := NewConstrainedPrefAttachment(constraints) // For this set, we'll load the memory graph with two // nodes, and a random channel connecting them. @@ -325,43 +374,67 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) { t1.Fatalf("unable to generate channel: %v", err) } - // With the necessary state initialized, we'll not - // attempt to select a set of candidates channel for - // creation given the current state of the graph. + // Get the score for all nodes found in the graph at + // this point. + nodes := make(map[NodeID]struct{}) + if err := graph.ForEachNode(func(n Node) error { + nodes[n.PubKey()] = struct{}{} + return nil + }); err != nil { + t1.Fatalf("unable to traverse graph: %v", err) + } + + if len(nodes) != 2 { + t1.Fatalf("expected 2 nodes, found %d", len(nodes)) + } + + // With the necessary state initialized, we'll now + // attempt to get our candidates channel score given + // the current state of the graph. const walletFunds = btcutil.SatoshiPerBitcoin * 10 - directives, err := prefAttach.Select(self, graph, - walletFunds, 2, skipNodes) + candidates, err := prefAttach.NodeScores(graph, nil, + walletFunds, nodes) if err != nil { - t1.Fatalf("unable to select attachment directives: %v", err) + t1.Fatalf("unable to select attachment "+ + "directives: %v", err) } - // Two new directives should have been selected, one - // for each node already present within the graph. - if len(directives) != 2 { - t1.Fatalf("two attachment directives should have been "+ - "returned instead %v were", len(directives)) + if len(candidates) != len(nodes) { + t1.Fatalf("all nodes should be scored, "+ + "instead %v were", len(candidates)) } - // The node attached to should be amongst the two edges + // The candidates should be amongst the two edges // created above. - for _, directive := range directives { + for nodeID, candidate := range candidates { edge1Pub := edge1.Peer.PubKey() edge2Pub := edge2.Peer.PubKey() switch { - case bytes.Equal(directive.NodeKey.SerializeCompressed(), edge1Pub[:]): - case bytes.Equal(directive.NodeKey.SerializeCompressed(), edge2Pub[:]): + case bytes.Equal(nodeID[:], edge1Pub[:]): + case bytes.Equal(nodeID[:], edge2Pub[:]): default: t1.Fatalf("attached to unknown node: %x", - directive.NodeKey.SerializeCompressed()) + nodeID[:]) } // As the number of funds available exceed the // max channel size, both edges should consume // the maximum channel size. - if directive.ChanAmt != maxChanSize { - t1.Fatalf("max channel size should be allocated, "+ - "instead %v was: ", maxChanSize) + if candidate.ChanAmt != maxChanSize { + t1.Fatalf("max channel size should be "+ + "allocated, instead %v was: ", + maxChanSize) + } + + // Since each of the nodes has 1 channel, out + // of only one channel in the graph, we expect + // their score to be 0.5. + expScore := float64(0.5) + if candidate.Score != expScore { + t1.Fatalf("expected candidate score "+ + "to be %v, instead was %v", + expScore, candidate.Score) } } }) @@ -386,7 +459,13 @@ func TestConstrainedPrefAttachmentSelectInsufficientFunds(t *testing.T) { threshold = 0.5 ) - skipNodes := make(map[NodeID]struct{}) + constraints := &HeuristicConstraints{ + MinChanSize: minChanSize, + MaxChanSize: maxChanSize, + ChanLimit: chanLimit, + Allocation: threshold, + } + for _, graph := range chanGraphs { success := t.Run(graph.name, func(t1 *testing.T) { graph, cleanup, err := graph.genFunc() @@ -397,30 +476,36 @@ func TestConstrainedPrefAttachmentSelectInsufficientFunds(t *testing.T) { defer cleanup() } - // First, we'll generate a random key that represents - // "us", and create a new instance of the heuristic - // with our set parameters. - self, err := randKey() - if err != nil { - t1.Fatalf("unable to generate self key: %v", err) - } - prefAttach := NewConstrainedPrefAttachment( - minChanSize, maxChanSize, chanLimit, threshold, - ) + // Add 10 nodes to the graph, with channels between + // them. + completeGraph(t, graph, 10) - // Next, we'll attempt to select a set of candidates, + prefAttach := NewConstrainedPrefAttachment(constraints) + + nodes := make(map[NodeID]struct{}) + if err := graph.ForEachNode(func(n Node) error { + nodes[n.PubKey()] = struct{}{} + return nil + }); err != nil { + t1.Fatalf("unable to traverse graph: %v", err) + } + + // With the necessary state initialized, we'll now + // attempt to get the score for our list of nodes, // passing zero for the amount of wallet funds. This - // should return an empty slice of directives. - directives, err := prefAttach.Select(self, graph, 0, - 0, skipNodes) + // should return an all-zero score set. + scores, err := prefAttach.NodeScores(graph, nil, + 0, nodes) if err != nil { t1.Fatalf("unable to select attachment "+ "directives: %v", err) } - if len(directives) != 0 { - t1.Fatalf("zero attachment directives "+ - "should have been returned instead %v were", - len(directives)) + + // Since all should be given a score of 0, the map + // should be empty. + if len(scores) != 0 { + t1.Fatalf("expected empty score map, "+ + "instead got %v ", len(scores)) } }) if !success { @@ -430,9 +515,8 @@ func TestConstrainedPrefAttachmentSelectInsufficientFunds(t *testing.T) { } // TestConstrainedPrefAttachmentSelectGreedyAllocation tests that if upon -// deciding a set of candidates, we're unable to evenly split our funds, then -// we attempt to greedily allocate all funds to each selected vertex (up to the -// max channel size). +// returning node scores, the NodeScores method will attempt to greedily +// allocate all funds to each vertex (up to the max channel size). func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { t.Parallel() @@ -445,7 +529,13 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { threshold = 0.5 ) - skipNodes := make(map[NodeID]struct{}) + constraints := &HeuristicConstraints{ + MinChanSize: minChanSize, + MaxChanSize: maxChanSize, + ChanLimit: chanLimit, + Allocation: threshold, + } + for _, graph := range chanGraphs { success := t.Run(graph.name, func(t1 *testing.T) { graph, cleanup, err := graph.genFunc() @@ -456,16 +546,7 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { defer cleanup() } - // First, we'll generate a random key that represents - // "us", and create a new instance of the heuristic - // with our set parameters. - self, err := randKey() - if err != nil { - t1.Fatalf("unable to generate self key: %v", err) - } - prefAttach := NewConstrainedPrefAttachment( - minChanSize, maxChanSize, chanLimit, threshold, - ) + prefAttach := NewConstrainedPrefAttachment(constraints) const chanCapacity = btcutil.SatoshiPerBitcoin @@ -494,9 +575,10 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { // graph, with node node having two edges. numNodes := 0 twoChans := false + nodes := make(map[NodeID]struct{}) if err := graph.ForEachNode(func(n Node) error { numNodes++ - + nodes[n.PubKey()] = struct{}{} numChans := 0 err := n.ForEachChannel(func(c ChannelEdge) error { numChans++ @@ -526,38 +608,61 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { // result, the heuristic should try to greedily // allocate funds to channels. const availableBalance = btcutil.SatoshiPerBitcoin * 2.5 - directives, err := prefAttach.Select(self, graph, - availableBalance, 5, skipNodes) + scores, err := prefAttach.NodeScores(graph, nil, + availableBalance, nodes) if err != nil { t1.Fatalf("unable to select attachment "+ "directives: %v", err) } - // Three directives should have been returned. - if len(directives) != 3 { - t1.Fatalf("expected 3 directives, instead "+ - "got: %v", len(directives)) + if len(scores) != len(nodes) { + t1.Fatalf("all nodes should be scored, "+ + "instead %v were", len(scores)) } - // The two directive should have the max channel amount - // allocated. - if directives[0].ChanAmt != maxChanSize { - t1.Fatalf("expected recommendation of %v, "+ - "instead got %v", maxChanSize, - directives[0].ChanAmt) - } - if directives[1].ChanAmt != maxChanSize { - t1.Fatalf("expected recommendation of %v, "+ - "instead got %v", maxChanSize, - directives[1].ChanAmt) + // The candidates should have a non-zero score, and + // have the max chan size funds recommended channel + // size. + for _, candidate := range scores { + if candidate.Score == 0 { + t1.Fatalf("Expected non-zero score") + } + + if candidate.ChanAmt != maxChanSize { + t1.Fatalf("expected recommendation "+ + "of %v, instead got %v", + maxChanSize, candidate.ChanAmt) + } } - // The third channel should have been allocated the - // remainder, or 0.5 BTC. - if directives[2].ChanAmt != (btcutil.SatoshiPerBitcoin * 0.5) { - t1.Fatalf("expected recommendation of %v, "+ - "instead got %v", maxChanSize, - directives[2].ChanAmt) + // Imagine a few channels are being opened, and there's + // only 0.5 BTC left. That should leave us with channel + // candidates of that size. + const remBalance = btcutil.SatoshiPerBitcoin * 0.5 + scores, err = prefAttach.NodeScores(graph, nil, + remBalance, nodes) + if err != nil { + t1.Fatalf("unable to select attachment "+ + "directives: %v", err) + } + + if len(scores) != len(nodes) { + t1.Fatalf("all nodes should be scored, "+ + "instead %v were", len(scores)) + } + + // Check that the recommended channel sizes are now the + // remaining channel balance. + for _, candidate := range scores { + if candidate.Score == 0 { + t1.Fatalf("Expected non-zero score") + } + + if candidate.ChanAmt != remBalance { + t1.Fatalf("expected recommendation "+ + "of %v, instead got %v", + remBalance, candidate.ChanAmt) + } } }) if !success { @@ -567,8 +672,8 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { } // TestConstrainedPrefAttachmentSelectSkipNodes ensures that if a node was -// already select for attachment, then that node is excluded from the set of -// candidate nodes. +// already selected as a channel counterparty, then that node will get a score +// of zero during scoring. func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) { t.Parallel() @@ -581,10 +686,15 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) { threshold = 0.5 ) + constraints := &HeuristicConstraints{ + MinChanSize: minChanSize, + MaxChanSize: maxChanSize, + ChanLimit: chanLimit, + Allocation: threshold, + } + for _, graph := range chanGraphs { success := t.Run(graph.name, func(t1 *testing.T) { - skipNodes := make(map[NodeID]struct{}) - graph, cleanup, err := graph.genFunc() if err != nil { t1.Fatalf("unable to create graph: %v", err) @@ -593,16 +703,7 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) { defer cleanup() } - // First, we'll generate a random key that represents - // "us", and create a new instance of the heuristic - // with our set parameters. - self, err := randKey() - if err != nil { - t1.Fatalf("unable to generate self key: %v", err) - } - prefAttach := NewConstrainedPrefAttachment( - minChanSize, maxChanSize, chanLimit, threshold, - ) + prefAttach := NewConstrainedPrefAttachment(constraints) // Next, we'll create a simple topology of two nodes, // with a single channel connecting them. @@ -613,44 +714,74 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) { t1.Fatalf("unable to create channel: %v", err) } - // With our graph created, we'll now execute the Select - // function to recommend potential attachment - // candidates. + nodes := make(map[NodeID]struct{}) + if err := graph.ForEachNode(func(n Node) error { + nodes[n.PubKey()] = struct{}{} + return nil + }); err != nil { + t1.Fatalf("unable to traverse graph: %v", err) + } + + if len(nodes) != 2 { + t1.Fatalf("expected 2 nodes, found %d", len(nodes)) + } + + // With our graph created, we'll now get the scores for + // all nodes in the graph. const availableBalance = btcutil.SatoshiPerBitcoin * 2.5 - directives, err := prefAttach.Select(self, graph, - availableBalance, 2, skipNodes) + scores, err := prefAttach.NodeScores(graph, nil, + availableBalance, nodes) if err != nil { t1.Fatalf("unable to select attachment "+ "directives: %v", err) } - // As the channel limit is three, and two nodes are - // present in the graph, both should be selected. - if len(directives) != 2 { - t1.Fatalf("expected two directives, instead "+ - "got %v", len(directives)) + if len(scores) != len(nodes) { + t1.Fatalf("all nodes should be scored, "+ + "instead %v were", len(scores)) + } + + // THey should all have a score, and a maxChanSize + // channel size recommendation. + for _, candidate := range scores { + if candidate.Score == 0 { + t1.Fatalf("Expected non-zero score") + } + + if candidate.ChanAmt != maxChanSize { + t1.Fatalf("expected recommendation "+ + "of %v, instead got %v", + maxChanSize, candidate.ChanAmt) + } } // We'll simulate a channel update by adding the nodes - // we just establish channel with the to set of nodes - // to be skipped. - skipNodes[NewNodeID(directives[0].NodeKey)] = struct{}{} - skipNodes[NewNodeID(directives[1].NodeKey)] = struct{}{} + // to our set of channels. + var chans []Channel + for _, candidate := range scores { + chans = append(chans, + Channel{ + Node: candidate.NodeID, + }, + ) + } - // If we attempt to make a call to the Select function, - // without providing any new information, then we - // should get no new directives as both nodes has - // already been attached to. - directives, err = prefAttach.Select(self, graph, - availableBalance, 2, skipNodes) + // If we attempt to make a call to the NodeScores + // function, without providing any new information, + // then all nodes should have a score of zero, since we + // already got channels to them. + scores, err = prefAttach.NodeScores(graph, chans, + availableBalance, nodes) if err != nil { t1.Fatalf("unable to select attachment "+ "directives: %v", err) } - if len(directives) != 0 { - t1.Fatalf("zero new directives should have been "+ - "selected, but %v were", len(directives)) + // Since all should be given a score of 0, the map + // should be empty. + if len(scores) != 0 { + t1.Fatalf("expected empty score map, "+ + "instead got %v ", len(scores)) } }) if !success { diff --git a/pilot.go b/pilot.go index c745082c2..174452d72 100644 --- a/pilot.go +++ b/pilot.go @@ -85,12 +85,19 @@ var _ autopilot.ChannelController = (*chanController)(nil) func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) { atplLog.Infof("Instantiating autopilot with cfg: %v", spew.Sdump(cfg)) + // Set up the constraints the autopilot heuristics must adhere to. + atplConstraints := &autopilot.HeuristicConstraints{ + MinChanSize: btcutil.Amount(cfg.MinChannelSize), + MaxChanSize: btcutil.Amount(cfg.MaxChannelSize), + ChanLimit: uint16(cfg.MaxChannels), + Allocation: cfg.Allocation, + MaxPendingOpens: 10, + } + // First, we'll create the preferential attachment heuristic, // initialized with the passed auto pilot configuration parameters. prefAttachment := autopilot.NewConstrainedPrefAttachment( - btcutil.Amount(cfg.MinChannelSize), - btcutil.Amount(cfg.MaxChannelSize), - uint16(cfg.MaxChannels), cfg.Allocation, + atplConstraints, ) // With the heuristic itself created, we can now populate the remainder @@ -107,8 +114,8 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) WalletBalance: func() (btcutil.Amount, error) { return svr.cc.wallet.ConfirmedBalance(cfg.MinConfs) }, - Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()), - MaxPendingOpens: 10, + Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()), + Constraints: atplConstraints, ConnectToPeer: func(target *btcec.PublicKey, addrs []net.Addr) (bool, error) { // First, we'll check if we're already connected to the // target peer. If we are, we can exit early. Otherwise,