graph: updated builder to use atomic ints

Instead of relying on devs to remember that they must only be accessed
atomically.
This commit is contained in:
Elle Mouton
2024-07-15 15:00:10 +02:00
parent fe34d62eb1
commit 90dff730ce
3 changed files with 12 additions and 16 deletions

View File

@@ -116,8 +116,8 @@ type Builder struct {
started atomic.Bool started atomic.Bool
stopped atomic.Bool stopped atomic.Bool
ntfnClientCounter uint64 // To be used atomically. ntfnClientCounter atomic.Uint64
bestHeight uint32 // To be used atomically. bestHeight atomic.Uint32
cfg *Config cfg *Config
@@ -278,7 +278,7 @@ func (b *Builder) Start() error {
if err != nil { if err != nil {
return err return err
} }
b.bestHeight = uint32(bestHeight) b.bestHeight.Store(uint32(bestHeight))
// Before we begin normal operation of the router, we first need // Before we begin normal operation of the router, we first need
// to synchronize the channel graph to the latest state of the // to synchronize the channel graph to the latest state of the
@@ -340,7 +340,7 @@ func (b *Builder) syncGraphWithChain() error {
if err != nil { if err != nil {
return err return err
} }
b.bestHeight = uint32(bestHeight) b.bestHeight.Store(uint32(bestHeight))
pruneHash, pruneHeight, err := b.cfg.Graph.PruneTip() pruneHash, pruneHeight, err := b.cfg.Graph.PruneTip()
if err != nil { if err != nil {
@@ -806,7 +806,7 @@ func (b *Builder) networkHandler() {
// Since this block is stale, we update our best height // Since this block is stale, we update our best height
// to the previous block. // to the previous block.
blockHeight := chainUpdate.Height blockHeight := chainUpdate.Height
atomic.StoreUint32(&b.bestHeight, blockHeight-1) b.bestHeight.Store(blockHeight - 1)
// Update the channel graph to reflect that this block // Update the channel graph to reflect that this block
// was disconnected. // was disconnected.
@@ -834,7 +834,7 @@ func (b *Builder) networkHandler() {
// directly to the end of our main chain. If not, then // directly to the end of our main chain. If not, then
// we've somehow missed some blocks. Here we'll catch // we've somehow missed some blocks. Here we'll catch
// up the chain with the latest blocks. // up the chain with the latest blocks.
currentHeight := atomic.LoadUint32(&b.bestHeight) currentHeight := b.bestHeight.Load()
switch { switch {
case chainUpdate.Height == currentHeight+1: case chainUpdate.Height == currentHeight+1:
err := b.updateGraphWithClosedChannels( err := b.updateGraphWithClosedChannels(
@@ -991,7 +991,7 @@ func (b *Builder) updateGraphWithClosedChannels(
// of the chain tip. // of the chain tip.
blockHeight := chainUpdate.Height blockHeight := chainUpdate.Height
atomic.StoreUint32(&b.bestHeight, blockHeight) b.bestHeight.Store(blockHeight)
log.Infof("Pruning channel graph using block %v (height=%v)", log.Infof("Pruning channel graph using block %v (height=%v)",
chainUpdate.Hash, blockHeight) chainUpdate.Hash, blockHeight)
@@ -1342,7 +1342,7 @@ func (b *Builder) processUpdate(msg interface{},
}, },
} }
err = b.cfg.ChainView.UpdateFilter( err = b.cfg.ChainView.UpdateFilter(
filterUpdate, atomic.LoadUint32(&b.bestHeight), filterUpdate, b.bestHeight.Load(),
) )
if err != nil { if err != nil {
return errors.Errorf("unable to update chain "+ return errors.Errorf("unable to update chain "+
@@ -1658,7 +1658,7 @@ func (b *Builder) CurrentBlockHeight() (uint32, error) {
// is synced to. This can differ from the above chain height if the goroutine // is synced to. This can differ from the above chain height if the goroutine
// responsible for processing the blocks isn't yet up to speed. // responsible for processing the blocks isn't yet up to speed.
func (b *Builder) SyncedHeight() uint32 { func (b *Builder) SyncedHeight() uint32 {
return atomic.LoadUint32(&b.bestHeight) return b.bestHeight.Load()
} }
// GetChannelByID return the channel by the channel id. // GetChannelByID return the channel by the channel id.

View File

@@ -11,7 +11,6 @@ import (
"net" "net"
"os" "os"
"strings" "strings"
"sync/atomic"
"testing" "testing"
"time" "time"
@@ -1392,12 +1391,10 @@ func TestBlockDifferenceFix(t *testing.T) {
err := wait.NoError(func() error { err := wait.NoError(func() error {
// Then router height should be updated to the latest block. // Then router height should be updated to the latest block.
if atomic.LoadUint32(&ctx.builder.bestHeight) != if ctx.builder.bestHeight.Load() != newBlockHeight {
newBlockHeight {
return fmt.Errorf("height should have been updated "+ return fmt.Errorf("height should have been updated "+
"to %v, instead got %v", newBlockHeight, "to %v, instead got %v", newBlockHeight,
ctx.builder.bestHeight) ctx.builder.bestHeight.Load())
} }
return nil return nil

View File

@@ -5,7 +5,6 @@ import (
"image/color" "image/color"
"net" "net"
"sync" "sync"
"sync/atomic"
"github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/btcutil"
@@ -65,7 +64,7 @@ func (b *Builder) SubscribeTopology() (*TopologyClient, error) {
// We'll first atomically obtain the next ID for this client from the // We'll first atomically obtain the next ID for this client from the
// incrementing client ID counter. // incrementing client ID counter.
clientID := atomic.AddUint64(&b.ntfnClientCounter, 1) clientID := b.ntfnClientCounter.Add(1)
log.Debugf("New graph topology client subscription, client %v", log.Debugf("New graph topology client subscription, client %v",
clientID) clientID)