Merge pull request #5513 from bhandras/commit_queue_fix

etcd: fix dereferencing issue in commit queue causing contention and change design to be more scalable
András Bánki-Horváth 2021-08-06 09:26:40 +02:00 committed by GitHub
commit 254d9be6ff
6 changed files with 177 additions and 98 deletions


@ -107,6 +107,10 @@ you.
* [Optimized payment sequence generation](https://github.com/lightningnetwork/lnd/pull/5514/)
to improve lnd's payment throughput (and latency) when using etcd.
* [More robust commit queue design](https://github.com/lightningnetwork/lnd/pull/5513)
to make it less likely that we retry etcd transactions and make the commit
queue more scalable.
## Performance improvements
* [Update MC store in blocks](https://github.com/lightningnetwork/lnd/pull/5515)
@ -119,6 +123,7 @@ current DNS seeds when in SigNet
mode](https://github.com/lightningnetwork/lnd/pull/5564).
# Contributors (Alphabetical Order)
* Andras Banki-Horvath
* ErikEk
* Martin Habovstiak
* Zero-1729
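
To make the reworked design concrete, here is a minimal, hypothetical usage sketch of the new queue API (not part of this diff), written as package-internal Go code in the style of the test file further down. It relies only on the signatures introduced below: NewCommitQueue(ctx), Add(commitLoop, rset, wset) taking plain []string key sets, and Stop(); the key names and the exampleUsage function are invented for illustration.

```go
package etcd

import (
	"context"
	"sync"
)

// exampleUsage is a hypothetical, package-internal illustration of the new
// commit queue API. It is not part of the change itself.
func exampleUsage() {
	ctx, cancel := context.WithCancel(context.Background())

	q := NewCommitQueue(ctx)

	// Stop only returns once the queue's main loop has exited, so the
	// abort context must be canceled first (deferred calls run in
	// reverse order).
	defer q.Stop()
	defer cancel()

	var wg sync.WaitGroup
	wg.Add(2)

	// Tx A: reads key1, writes key2.
	q.Add(
		func() { defer wg.Done() /* commit closure of tx A */ },
		[]string{"key1"},
		[]string{"key2"},
	)

	// Tx B also writes key2. If tx A is still pending, B is marked as
	// blocked and executes only after A has released its key counts.
	q.Add(
		func() { defer wg.Done() /* commit closure of tx B */ },
		nil,
		[]string{"key2"},
	)

	// Wait for both commit closures to run before shutting down.
	wg.Wait()
}
```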


@ -3,14 +3,11 @@
package etcd
import (
"container/list"
"context"
"sync"
)
// commitQueueSize is the maximum number of commits we allow to queue up. All
// remaining commits will block on commitQueue.Add().
const commitQueueSize = 100
// commitQueue is a simple execution queue to manage conflicts for transactions
// and thereby reduce the number of times conflicting transactions need to be
// retried. When a new transaction is added to the queue, we first upgrade the
@ -25,9 +22,18 @@ type commitQueue struct {
readerMap map[string]int
writerMap map[string]int
commitMutex sync.RWMutex
queue chan (func())
wg sync.WaitGroup
queue *list.List
queueMx sync.Mutex
queueCond *sync.Cond
shutdown chan struct{}
}
type commitQueueTxn struct {
commitLoop func()
blocked bool
rset []string
wset []string
}
// NewCommitQueue creates a new commit queue, with the passed abort context.
@ -36,32 +42,37 @@ func NewCommitQueue(ctx context.Context) *commitQueue {
ctx: ctx,
readerMap: make(map[string]int),
writerMap: make(map[string]int),
queue: make(chan func(), commitQueueSize),
queue: list.New(),
shutdown: make(chan struct{}),
}
q.queueCond = sync.NewCond(&q.queueMx)
// Start the queue consumer loop.
q.wg.Add(1)
go q.mainLoop()
return q
}
// Wait waits for the queue to stop (after the queue context has been canceled).
func (c *commitQueue) Wait() {
c.wg.Wait()
// Stop signals the queue to stop after the queue context has been canceled and
// waits until it has stopped.
func (c *commitQueue) Stop() {
// Signal the queue's condition variable to ensure the mainLoop reliably
// unblocks to check for the exit condition.
c.queueCond.Signal()
<-c.shutdown
}
// Add increases lock counts and queues up tx commit closure for execution.
// Transactions that don't have any conflicts are executed immediately by
// "downgrading" the count mutex to allow concurrency.
func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) {
func (c *commitQueue) Add(commitLoop func(), rset []string, wset []string) {
c.mx.Lock()
blocked := false
// Mark as blocked if there's any writer changing any of the keys in
// the read set. Do not increment the reader counts yet as we'll need to
// use the original reader counts when scanning through the write set.
for key := range rset {
for _, key := range rset {
if c.writerMap[key] > 0 {
blocked = true
break
@ -70,7 +81,7 @@ func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) {
// Mark as blocked if there's any writer or reader for any of the keys
// in the write set.
for key := range wset {
for _, key := range wset {
blocked = blocked || c.readerMap[key] > 0 || c.writerMap[key] > 0
// Increment the writer count.
@ -78,48 +89,37 @@ func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) {
}
// Finally we can increment the reader counts for keys in the read set.
for key := range rset {
for _, key := range rset {
c.readerMap[key] += 1
}
if blocked {
// Add the transaction to the queue if it conflicts with an
// already queued one.
c.mx.Unlock()
c.queueCond.L.Lock()
c.queue.PushBack(&commitQueueTxn{
commitLoop: commitLoop,
blocked: blocked,
rset: rset,
wset: wset,
})
c.queueCond.L.Unlock()
select {
case c.queue <- commitLoop:
case <-c.ctx.Done():
}
} else {
// To make sure we don't add a new tx to the queue that depends
// on this "unblocked" tx, grab the commitMutex before lifting
// the mutex guarding the lock maps.
c.commitMutex.RLock()
c.mx.Unlock()
c.mx.Unlock()
// At this point we're safe to execute the "unblocked" tx, as
// we cannot execute a blocked tx that may have been read from the
// queue until the commitMutex is held.
commitLoop()
c.commitMutex.RUnlock()
}
c.queueCond.Signal()
}
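
The blocked flag computed in Add above encodes the queue's conflict rule: a transaction is queued behind earlier work if any key in its read set has a pending writer, or any key in its write set has a pending reader or writer. Here is a standalone restatement of that rule, using a hypothetical wouldBlock helper that is not part of the package:

```go
// wouldBlock is an illustrative restatement of the conflict check performed
// in Add; it is not part of the package. The maps hold the pending reader
// and writer counts per key, just like commitQueue.readerMap/writerMap.
func wouldBlock(readerMap, writerMap map[string]int, rset, wset []string) bool {
	// A pending writer on any key we read forces us behind it.
	for _, key := range rset {
		if writerMap[key] > 0 {
			return true
		}
	}

	// A pending reader or writer on any key we write forces us behind it.
	for _, key := range wset {
		if readerMap[key] > 0 || writerMap[key] > 0 {
			return true
		}
	}

	return false
}
```
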
// Done decreases lock counts of the keys in the read/write sets.
func (c *commitQueue) Done(rset readSet, wset writeSet) {
// done decreases lock counts of the keys in the read/write sets.
func (c *commitQueue) done(rset []string, wset []string) {
c.mx.Lock()
defer c.mx.Unlock()
for key := range rset {
for _, key := range rset {
c.readerMap[key] -= 1
if c.readerMap[key] == 0 {
delete(c.readerMap, key)
}
}
for key := range wset {
for _, key := range wset {
c.writerMap[key] -= 1
if c.writerMap[key] == 0 {
delete(c.writerMap, key)
@ -131,20 +131,82 @@ func (c *commitQueue) Done(rset readSet, wset writeSet) {
// dependencies. The queue ensures that the top element doesn't conflict with
// any other transactions and therefore can be executed freely.
func (c *commitQueue) mainLoop() {
defer c.wg.Done()
defer close(c.shutdown)
for {
select {
case top := <-c.queue:
// Execute the next blocked transaction. As it is
// the top element in the queue it means that it doesn't
// depend on any other transactions anymore.
c.commitMutex.Lock()
top()
c.commitMutex.Unlock()
// Wait until there are no unblocked transactions being
// executed, and for there to be at least one blocked
// transaction in our queue.
c.queueCond.L.Lock()
for c.queue.Front() == nil {
c.queueCond.Wait()
// Check the exit condition before looping again.
select {
case <-c.ctx.Done():
c.queueCond.L.Unlock()
return
default:
}
}
// Now collect all txns until we find the next blocking one.
// These shouldn't conflict (if the precollected read/write
// key sets don't grow), meaning we can safely commit them
// in parallel.
work := make([]*commitQueueTxn, 1)
e := c.queue.Front()
work[0] = c.queue.Remove(e).(*commitQueueTxn)
for {
e := c.queue.Front()
if e == nil {
break
}
next := e.Value.(*commitQueueTxn)
if !next.blocked {
work = append(work, next)
c.queue.Remove(e)
} else {
// We found the next blocking txn which means
// the block of work needs to be cut here.
break
}
}
c.queueCond.L.Unlock()
// Check if we need to exit before continuing.
select {
case <-c.ctx.Done():
return
default:
}
var wg sync.WaitGroup
wg.Add(len(work))
// Fire up N goroutines where each will run its commit loop
// and then clean up the reader/writer maps.
for _, txn := range work {
go func(txn *commitQueueTxn) {
defer wg.Done()
txn.commitLoop()
// We can safely clean up here as done only
// holds the main mutex.
c.done(txn.rset, txn.wset)
}(txn)
}
wg.Wait()
// Check if we need to exit before continuing.
select {
case <-c.ctx.Done():
return
default:
}
}
}
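
To spell out the batching rule used by the loop above: the head of the queue is always taken, whether or not it was marked blocked, and the batch then extends over the following entries up to, but not including, the next blocked one. A tiny illustrative helper (not part of the package) that reduces this to the blocked flags of the queued txns:

```go
// batchLen mirrors mainLoop's batch cutting on the blocked flags alone: the
// head entry is always taken, and the batch grows until the next blocked
// entry is found. Illustrative only, not part of the package.
func batchLen(blockedFlags []bool) int {
	if len(blockedFlags) == 0 {
		return 0
	}

	n := 1
	for _, blocked := range blockedFlags[1:] {
		if blocked {
			break
		}
		n++
	}

	return n
}
```

For a queue carrying the flags [true, false, false, true], the first batch has length three and its commit loops run in parallel, while the fourth entry starts the next batch.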


@ -17,7 +17,7 @@ import (
func TestCommitQueue(t *testing.T) {
// The duration of each commit.
const commitDuration = time.Millisecond * 500
const numCommits = 4
const numCommits = 5
var wg sync.WaitGroup
commits := make([]string, numCommits)
@ -30,69 +30,53 @@ func TestCommitQueue(t *testing.T) {
// Update our log of commit order. Avoid blocking
// by preallocating the commit log and increasing
// the log index atomically.
i := atomic.AddInt32(&idx, 1)
commits[i] = tag
if sleep {
time.Sleep(commitDuration)
}
i := atomic.AddInt32(&idx, 1)
commits[i] = tag
}
}
// Helper function to create a read set from the passed keys.
makeReadSet := func(keys []string) readSet {
rs := make(map[string]stmGet)
for _, key := range keys {
rs[key] = stmGet{}
}
return rs
}
// Helper function to create a write set from the passed keys.
makeWriteSet := func(keys []string) writeSet {
ws := make(map[string]stmPut)
for _, key := range keys {
ws[key] = stmPut{}
}
return ws
}
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
q := NewCommitQueue(ctx)
defer q.Wait()
defer q.Stop()
defer cancel()
wg.Add(numCommits)
t1 := time.Now()
// Tx1: reads: key1, key2, writes: key3, conflict: none
// Tx1 (long): reads: key1, key2, writes: key3, conflict: none
q.Add(
commit("free", true),
makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key3"}),
[]string{"key1", "key2"},
[]string{"key3"},
)
// Tx2: reads: key1, key2, writes: key3, conflict: Tx1
q.Add(
commit("blocked1", false),
makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key3"}),
[]string{"key1", "key2"},
[]string{"key3"},
)
// Tx3: reads: key1, writes: key4, conflict: none
// Tx3 (long): reads: key1, key2, writes: key4, conflict: none
q.Add(
commit("free", true),
makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key4"}),
[]string{"key1", "key2"},
[]string{"key4"},
)
// Tx4 (long): reads: key1, key2, writes: none, conflict: none
q.Add(
commit("free", true),
[]string{"key1", "key2"},
[]string{},
)
// Tx5: reads: key2, writes: key4, conflict: Tx3
q.Add(
commit("blocked2", false),
makeReadSet([]string{"key2"}),
makeWriteSet([]string{"key4"}),
[]string{"key2"},
[]string{"key4"},
)
// Wait for all commits.
@ -109,7 +93,7 @@ func TestCommitQueue(t *testing.T) {
// before the blocking ones, and the blocking ones are executed in
// the order of addition.
require.Equal(t,
[]string{"free", "free", "blocked1", "blocked2"},
[]string{"free", "blocked1", "free", "free", "blocked2"},
commits,
)
}


@ -122,6 +122,7 @@ func (c *commitStatsCollector) callback(succ bool, stats CommitStats) {
type db struct {
cfg Config
ctx context.Context
cancel func()
cli *clientv3.Client
commitStatsCollector *commitStatsCollector
txQueue *commitQueue
@ -135,7 +136,6 @@ var _ walletdb.DB = (*db)(nil)
// config. If etcd connection cannot be established, then returns error.
func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) {
clientCfg := clientv3.Config{
Context: ctx,
Endpoints: []string{cfg.Host},
DialTimeout: etcdConnectionTimeout,
Username: cfg.User,
@ -158,8 +158,11 @@ func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) {
clientCfg.TLS = tlsConfig
}
ctx, cancel := context.WithCancel(ctx)
clientCfg.Context = ctx
cli, err := clientv3.New(clientCfg)
if err != nil {
cancel()
return nil, err
}
@ -171,6 +174,7 @@ func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) {
backend := &db{
cfg: cfg,
ctx: ctx,
cancel: cancel,
cli: cli,
txQueue: NewCommitQueue(ctx),
}
@ -296,5 +300,8 @@ func (db *db) Copy(w io.Writer) error {
// Close cleanly shuts down the database and syncs all data.
// This function is part of the walletdb.Db interface implementation.
func (db *db) Close() error {
return db.cli.Close()
err := db.cli.Close()
db.cancel()
db.txQueue.Stop()
return err
}
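
A minimal, hypothetical lifecycle sketch for the backend with this change in place, as package-internal code; it only uses newEtcdBackend and Close as shown in this diff, and assumes cfg points at a reachable etcd instance:

```go
// openAndClose is an illustrative helper, not part of the change: with the
// cancel func and tx queue now owned by db, callers only need Close for a
// clean shutdown.
func openAndClose(ctx context.Context, cfg Config) error {
	db, err := newEtcdBackend(ctx, cfg)
	if err != nil {
		return err
	}

	// ... use db through the walletdb.DB interface ...

	// Close shuts down the etcd client, cancels the derived context and
	// stops the commit queue.
	return db.Close()
}
```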


@ -280,17 +280,38 @@ func runSTM(s *stm, apply func(STM) error) error {
return preApplyErr
}
// Make a copy of the read/write set keys here. We need to do this
// because subsequent applies may change (shrink) these sets, and if we
// then decreased the reference counts in the commit queue in done(...)
// we could miss removing references, which would needlessly queue up
// transactions and increase contention on DB access.
// Copying these strings is cheap since Go strings are immutable: only
// the string headers are copied, not the underlying data.
rkeys := make([]string, len(s.rset))
wkeys := make([]string, len(s.wset))
i := 0
for key := range s.rset {
rkeys[i] = key
i++
}
i = 0
for key := range s.wset {
wkeys[i] = key
i++
}
// Queue up the transaction for execution.
s.txQueue.Add(execute, s.rset, s.wset)
s.txQueue.Add(execute, rkeys, wkeys)
// Wait for the transaction to execute, or break if aborted.
select {
case <-done:
case <-s.options.ctx.Done():
return context.Canceled
}
s.txQueue.Done(s.rset, s.wset)
if s.options.commitStatsCallback != nil {
stats.Retries = retries
s.options.commitStatsCallback(executeErr == nil, stats)

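The comment above is the core of the dereferencing fix in runSTM: the commit queue must get its own snapshot of the keys, because the live rset/wset maps can shrink on subsequent applies. A small, self-contained illustration of why the snapshot matters (hypothetical code, using plain struct{} values instead of the package's stmGet/stmPut types):

```go
package main

import "fmt"

func main() {
	// The live read set of a transaction, keyed by key name.
	rset := map[string]struct{}{"key1": {}, "key2": {}}

	// Snapshot the keys before queueing, as runSTM now does. Copying a
	// string only copies its header, so this is cheap.
	keys := make([]string, 0, len(rset))
	for key := range rset {
		keys = append(keys, key)
	}

	// A later apply shrinks the live set...
	delete(rset, "key2")

	// ...but the snapshot still names both keys, so the reference count
	// for key2 can be released as well. Releasing counts based on the
	// live map would leak the key2 reference and keep later transactions
	// touching key2 queued unnecessarily.
	fmt.Println(len(rset), len(keys)) // prints: 1 2
}
```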

@ -28,7 +28,7 @@ func TestPutToEmpty(t *testing.T) {
defer func() {
cancel()
f.Cleanup()
txQueue.Wait()
txQueue.Stop()
}()
db, err := newEtcdBackend(ctx, f.BackendConfig())
@ -55,7 +55,7 @@ func TestGetPutDel(t *testing.T) {
defer func() {
cancel()
f.Cleanup()
txQueue.Wait()
txQueue.Stop()
}()
testKeyValues := []KV{
@ -141,7 +141,7 @@ func TestFirstLastNextPrev(t *testing.T) {
defer func() {
cancel()
f.Cleanup()
txQueue.Wait()
txQueue.Stop()
}()
testKeyValues := []KV{
@ -299,7 +299,7 @@ func TestCommitError(t *testing.T) {
defer func() {
cancel()
f.Cleanup()
txQueue.Wait()
txQueue.Stop()
}()
db, err := newEtcdBackend(ctx, f.BackendConfig())
@ -347,7 +347,7 @@ func TestManualTxError(t *testing.T) {
defer func() {
cancel()
f.Cleanup()
txQueue.Wait()
txQueue.Stop()
}()
db, err := newEtcdBackend(ctx, f.BackendConfig())