diff --git a/chanbackup/pubsub.go b/chanbackup/pubsub.go index 8fa1d5f34..d1860eb4d 100644 --- a/chanbackup/pubsub.go +++ b/chanbackup/pubsub.go @@ -6,6 +6,7 @@ import ( "net" "os" "sync" + "sync/atomic" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" @@ -93,7 +94,12 @@ type ChannelNotifier interface { // be used to implement a system that always keeps the multi-chan backup file // on disk in a consistent state for safety purposes. type SubSwapper struct { + // started tracks whether the SubSwapper has been started and ensures + // it can only be started once. started sync.Once + + // stopped tracks whether the SubSwapper has been stopped and ensures + // it can only be stopped once. stopped sync.Once // backupState are the set of SCBs for all open channels we know of. @@ -113,6 +119,11 @@ type SubSwapper struct { quit chan struct{} wg sync.WaitGroup + + // isActive tracks whether the SubSwapper is active and ready to receive + // messages. It is used to prevent manual updates from being sent to the + // SubSwapper after it has been stopped or not yet started. + isActive atomic.Bool } // NewSubSwapper creates a new instance of the SubSwapper given the starting @@ -162,6 +173,7 @@ func (s *SubSwapper) Start() error { if err := s.updateBackupFile(); err != nil { startErr = fmt.Errorf("unable to refresh backup "+ "file: %v", err) + return } @@ -178,9 +190,13 @@ func (s *SubSwapper) Stop() error { log.Infof("chanbackup.SubSwapper shutting down...") defer log.Debug("chanbackup.SubSwapper shutdown complete") + // Mark the SubSwapper as not running. + s.isActive.Store(false) + close(s.quit) s.wg.Wait() }) + return nil } @@ -188,6 +204,11 @@ func (s *SubSwapper) Stop() error { // are processed in another goroutine. The method waits for the updates to be // fully processed and the file to be updated on-disk before returning. func (s *SubSwapper) ManualUpdate(singles []Single) error { + if !s.isActive.Load() { + return fmt.Errorf("swapper is not active, cannot perform " + + "manual update") + } + // Create the channel to send an error back. If the update handling // and the subsequent file updating succeeds, nil is sent. // The channel must have capacity of 1 to prevent Swapper blocking. @@ -297,7 +318,8 @@ func (s *SubSwapper) updateBackupFile(closedChans ...wire.OutPoint) error { // backupFileUpdater is the primary goroutine of the SubSwapper which is // responsible for listening for changes to the channel, and updating the // persistent multi backup state with a new packed multi of the latest channel -// state. +// state. Once active, it will process subscription updates and manual updates +// until the SubSwapper is stopped. func (s *SubSwapper) backupUpdater() { // Ensure that once we exit, we'll cancel our active channel // subscription. @@ -306,6 +328,9 @@ func (s *SubSwapper) backupUpdater() { log.Debugf("SubSwapper's backupUpdater is active!") + // Mark the SubSwapper as active. + s.isActive.Store(true) + for { select { // The channel state has been modified! We'll evaluate all