diff --git a/config.go b/config.go index ab872c587..4c8c3e311 100644 --- a/config.go +++ b/config.go @@ -252,6 +252,8 @@ type config struct { net tor.Net Routing *routing.Conf `group:"routing" namespace:"routing"` + + Workers *lncfg.Workers `group:"workers" namespace:"workers"` } // loadConfig initializes and parses the config using a config file and command @@ -334,6 +336,11 @@ func loadConfig() (*config, error) { Control: defaultTorControl, }, net: &tor.ClearNet{}, + Workers: &lncfg.Workers{ + Read: lncfg.DefaultReadWorkers, + Write: lncfg.DefaultWriteWorkers, + Sig: lncfg.DefaultSigWorkers, + }, } // Pre-parse the command line options to pick up an alternative config @@ -968,6 +975,12 @@ func loadConfig() (*config, error) { "minbackoff") } + // Assert that all worker pools will have a positive number of + // workers, otherwise the pools will rendered useless. + if err := cfg.Workers.Validate(); err != nil { + return nil, err + } + // Finally, ensure that the user's color is correctly formatted, // otherwise the server will not be able to start after the unlocking // the wallet. diff --git a/lncfg/workers.go b/lncfg/workers.go new file mode 100644 index 000000000..fec57ddda --- /dev/null +++ b/lncfg/workers.go @@ -0,0 +1,49 @@ +package lncfg + +import "fmt" + +const ( + // DefaultReadWorkers is the default maximum number of concurrent + // workers used by the daemon's read pool. + DefaultReadWorkers = 16 + + // DefaultWriteWorkers is the default maximum number of concurrent + // workers used by the daemon's write pool. + DefaultWriteWorkers = 16 + + // DefaultSigWorkers is the default maximum number of concurrent workers + // used by the daemon's sig pool. + DefaultSigWorkers = 8 +) + +// Workers exposes CLI configuration for turning resources consumed by worker +// pools. +type Workers struct { + // Read is the maximum number of concurrent read pool workers. + Read int `long:"read" description:"Maximum number of concurrent read pool workers."` + + // Write is the maximum number of concurrent write pool workers. + Write int `long:"write" description:"Maximum number of concurrent write pool workers."` + + // Sig is the maximum number of concurrent sig pool workers. + Sig int `long:"sig" description:"Maximum number of concurrent sig pool workers."` +} + +// Validate checks the Workers configuration to ensure that the input values are +// sane. +func (w *Workers) Validate() error { + if w.Read <= 0 { + return fmt.Errorf("number of read workers (%d) must be "+ + "positive", w.Read) + } + if w.Write <= 0 { + return fmt.Errorf("number of write workers (%d) must be "+ + "positive", w.Write) + } + if w.Sig <= 0 { + return fmt.Errorf("number of sig workers (%d) must be "+ + "positive", w.Sig) + } + + return nil +} diff --git a/lncfg/workers_test.go b/lncfg/workers_test.go new file mode 100644 index 000000000..cc32202d3 --- /dev/null +++ b/lncfg/workers_test.go @@ -0,0 +1,102 @@ +package lncfg_test + +import ( + "testing" + + "github.com/lightningnetwork/lnd/lncfg" +) + +const ( + maxUint = ^uint(0) + maxInt = int(maxUint >> 1) + minInt = -maxInt - 1 +) + +// TestValidateWorkers asserts that validating the Workers config only succeeds +// if all fields specify a positive number of workers. +func TestValidateWorkers(t *testing.T) { + tests := []struct { + name string + cfg *lncfg.Workers + valid bool + }{ + { + name: "min valid", + cfg: &lncfg.Workers{ + Read: 1, + Write: 1, + Sig: 1, + }, + valid: true, + }, + { + name: "max valid", + cfg: &lncfg.Workers{ + Read: maxInt, + Write: maxInt, + Sig: maxInt, + }, + valid: true, + }, + { + name: "read max invalid", + cfg: &lncfg.Workers{ + Read: 0, + Write: 1, + Sig: 1, + }, + }, + { + name: "write max invalid", + cfg: &lncfg.Workers{ + Read: 1, + Write: 0, + Sig: 1, + }, + }, + { + name: "sig max invalid", + cfg: &lncfg.Workers{ + Read: 1, + Write: 1, + Sig: 0, + }, + }, + { + name: "read min invalid", + cfg: &lncfg.Workers{ + Read: minInt, + Write: 1, + Sig: 1, + }, + }, + { + name: "write min invalid", + cfg: &lncfg.Workers{ + Read: 1, + Write: minInt, + Sig: 1, + }, + }, + { + name: "sig min invalid", + cfg: &lncfg.Workers{ + Read: 1, + Write: 1, + Sig: minInt, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + err := test.cfg.Validate() + switch { + case test.valid && err != nil: + t.Fatalf("valid config was invalid: %v", err) + case !test.valid && err == nil: + t.Fatalf("invalid config was valid") + } + }) + } +} diff --git a/server.go b/server.go index 77cd8d044..d86f340cd 100644 --- a/server.go +++ b/server.go @@ -11,7 +11,6 @@ import ( "net" "path/filepath" "regexp" - "runtime" "strconv" "sync" "sync/atomic" @@ -273,7 +272,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, ) writePool := pool.NewWrite( - writeBufferPool, runtime.NumCPU(), pool.DefaultWorkerTimeout, + writeBufferPool, cfg.Workers.Write, pool.DefaultWorkerTimeout, ) readBufferPool := pool.NewReadBuffer( @@ -282,7 +281,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, ) readPool := pool.NewRead( - readBufferPool, runtime.NumCPU(), pool.DefaultWorkerTimeout, + readBufferPool, cfg.Workers.Read, pool.DefaultWorkerTimeout, ) decodeFinalCltvExpiry := func(payReq string) (uint32, error) { @@ -296,7 +295,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, s := &server{ chanDB: chanDB, cc: cc, - sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer), + sigPool: lnwallet.NewSigPool(cfg.Workers.Sig, cc.signer), writePool: writePool, readPool: readPool,