Merge pull request #7707 from ellemouton/wtclientFixes

watchtower: miscellaneous fixes
This commit is contained in:
Oliver Gugger 2023-05-19 12:22:04 +02:00 committed by GitHub
commit 0a2d6b61aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 9 deletions

View File

@ -67,6 +67,20 @@ const (
TypeRewardCommit = Type(FlagCommitOutputs | FlagReward)
)
// Identifier returns a unique, stable string identifier for the blob Type.
func (t Type) Identifier() (string, error) {
switch t {
case TypeAltruistCommit:
return "legacy", nil
case TypeAltruistAnchorCommit:
return "anchor", nil
case TypeRewardCommit:
return "reward", nil
default:
return "", fmt.Errorf("unknown blob type: %v", t)
}
}
// Has returns true if the Type has the passed flag enabled.
func (t Type) Has(flag Flag) bool {
return Flag(t)&flag == flag

View File

@ -350,10 +350,12 @@ func New(config *Config) (*TowerClient, error) {
cfg.WriteTimeout = DefaultWriteTimeout
}
prefix := "(legacy)"
if cfg.Policy.IsAnchorChannel() {
prefix = "(anchor)"
identifier, err := cfg.Policy.BlobType.Identifier()
if err != nil {
return nil, err
}
prefix := fmt.Sprintf("(%s)", identifier)
plog := build.NewPrefixLog(prefix, log)
// Load the sweep pkscripts that have been generated for all previously
@ -363,10 +365,7 @@ func New(config *Config) (*TowerClient, error) {
return nil, err
}
var (
policy = cfg.Policy.BlobType.String()
queueDB = cfg.DB.GetDBQueue([]byte(policy))
)
queueDB := cfg.DB.GetDBQueue([]byte(identifier))
queue, err := NewDiskOverflowQueue[*wtdb.BackupID](
queueDB, cfg.MaxTasksInMemQueue, plog,
)
@ -678,7 +677,11 @@ func (c *TowerClient) Start() error {
// Start the task pipeline to which new backup tasks will be
// submitted from active links.
c.pipeline.Start()
err = c.pipeline.Start()
if err != nil {
returnErr = err
return
}
c.wg.Add(1)
go c.backupDispatcher()
@ -727,7 +730,10 @@ func (c *TowerClient) Stop() error {
// 4. Since all valid tasks have been assigned to session
// queues, we no longer need to negotiate sessions.
c.negotiator.Stop()
err = c.negotiator.Stop()
if err != nil {
returnErr = err
}
c.log.Debugf("Waiting for active session queues to finish "+
"draining, stats: %s", c.stats)