diff --git a/chanbackup/pubsub.go b/chanbackup/pubsub.go index 43c368aaa..6304f0cb3 100644 --- a/chanbackup/pubsub.go +++ b/chanbackup/pubsub.go @@ -7,6 +7,7 @@ import ( "net" "os" "sync" + "sync/atomic" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" @@ -95,7 +96,12 @@ type ChannelNotifier interface { // be used to implement a system that always keeps the multi-chan backup file // on disk in a consistent state for safety purposes. type SubSwapper struct { + // started tracks whether the SubSwapper has been started and ensures + // it can only be started once. started sync.Once + + // stopped tracks whether the SubSwapper has been stopped and ensures + // it can only be stopped once. stopped sync.Once // backupState are the set of SCBs for all open channels we know of. @@ -115,6 +121,11 @@ type SubSwapper struct { quit chan struct{} wg sync.WaitGroup + + // isActive tracks whether the SubSwapper is active and ready to receive + // messages. It is used to prevent manual updates from being sent to the + // SubSwapper after it has been stopped or not yet started. + isActive atomic.Bool } // NewSubSwapper creates a new instance of the SubSwapper given the starting @@ -165,6 +176,7 @@ func (s *SubSwapper) Start() error { if err := s.updateBackupFile(); err != nil { startErr = fmt.Errorf("unable to refresh backup "+ "file: %v", err) + return } @@ -181,9 +193,13 @@ func (s *SubSwapper) Stop() error { log.Infof("chanbackup.SubSwapper shutting down...") defer log.Debug("chanbackup.SubSwapper shutdown complete") + // Mark the SubSwapper as not running. + s.isActive.Store(false) + close(s.quit) s.wg.Wait() }) + return nil } @@ -191,6 +207,11 @@ func (s *SubSwapper) Stop() error { // are processed in another goroutine. The method waits for the updates to be // fully processed and the file to be updated on-disk before returning. func (s *SubSwapper) ManualUpdate(singles []Single) error { + if !s.isActive.Load() { + return fmt.Errorf("swapper is not active, cannot perform " + + "manual update") + } + // Create the channel to send an error back. If the update handling // and the subsequent file updating succeeds, nil is sent. // The channel must have capacity of 1 to prevent Swapper blocking. @@ -300,7 +321,8 @@ func (s *SubSwapper) updateBackupFile(closedChans ...wire.OutPoint) error { // backupFileUpdater is the primary goroutine of the SubSwapper which is // responsible for listening for changes to the channel, and updating the // persistent multi backup state with a new packed multi of the latest channel -// state. +// state. Once active, it will process subscription updates and manual updates +// until the SubSwapper is stopped. func (s *SubSwapper) backupUpdater() { // Ensure that once we exit, we'll cancel our active channel // subscription. @@ -309,6 +331,9 @@ func (s *SubSwapper) backupUpdater() { log.Debugf("SubSwapper's backupUpdater is active!") + // Mark the SubSwapper as active. + s.isActive.Store(true) + for { select { // The channel state has been modified! We'll evaluate all