etcd: redesign commit queue to make it more robust and scalable

This commit builds on the ideas of @cfromknecht in lnd/5153. The
addition is that the design is now simpler and more robust by queueing
up everything, but allowing maximal parallelism where txns don't block.
Furthermore the commit makes CommitQueue.Done() private essentially
removing the need to understand the queue externally.
This commit is contained in:
Andras Banki-Horvath 2021-06-24 22:10:40 +02:00
parent 02aa77261d
commit b29ae94e10
No known key found for this signature in database
GPG Key ID: 80E5375C094198D8
5 changed files with 136 additions and 62 deletions

View File

@ -3,14 +3,11 @@
package etcd
import (
"container/list"
"context"
"sync"
)
// commitQueueSize is the maximum number of commits we let to queue up. All
// remaining commits will block on commitQueue.Add().
const commitQueueSize = 100
// commitQueue is a simple execution queue to manage conflicts for transactions
// and thereby reduce the number of times conflicting transactions need to be
// retried. When a new transaction is added to the queue, we first upgrade the
@ -25,9 +22,18 @@ type commitQueue struct {
readerMap map[string]int
writerMap map[string]int
commitMutex sync.RWMutex
queue chan (func())
wg sync.WaitGroup
queue *list.List
queueMx sync.Mutex
queueCond *sync.Cond
shutdown chan struct{}
}
type commitQueueTxn struct {
commitLoop func()
blocked bool
rset []string
wset []string
}
// NewCommitQueue creates a new commit queue, with the passed abort context.
@ -36,19 +42,24 @@ func NewCommitQueue(ctx context.Context) *commitQueue {
ctx: ctx,
readerMap: make(map[string]int),
writerMap: make(map[string]int),
queue: make(chan func(), commitQueueSize),
queue: list.New(),
shutdown: make(chan struct{}),
}
q.queueCond = sync.NewCond(&q.queueMx)
// Start the queue consumer loop.
q.wg.Add(1)
go q.mainLoop()
return q
}
// Wait waits for the queue to stop (after the queue context has been canceled).
func (c *commitQueue) Wait() {
c.wg.Wait()
// Stop signals the queue to stop after the queue context has been canceled and
// waits until the has stopped.
func (c *commitQueue) Stop() {
// Signal the queue's condition variable to ensure the mainLoop reliably
// unblocks to check for the exit condition.
c.queueCond.Signal()
<-c.shutdown
}
// Add increases lock counts and queues up tx commit closure for execution.
@ -82,33 +93,22 @@ func (c *commitQueue) Add(commitLoop func(), rset []string, wset []string) {
c.readerMap[key] += 1
}
if blocked {
// Add the transaction to the queue if conflicts with an already
// queued one.
c.mx.Unlock()
c.queueCond.L.Lock()
c.queue.PushBack(&commitQueueTxn{
commitLoop: commitLoop,
blocked: blocked,
rset: rset,
wset: wset,
})
c.queueCond.L.Unlock()
select {
case c.queue <- commitLoop:
case <-c.ctx.Done():
}
} else {
// To make sure we don't add a new tx to the queue that depends
// on this "unblocked" tx, grab the commitMutex before lifting
// the mutex guarding the lock maps.
c.commitMutex.RLock()
c.mx.Unlock()
c.mx.Unlock()
// At this point we're safe to execute the "unblocked" tx, as
// we cannot execute blocked tx that may have been read from the
// queue until the commitMutex is held.
commitLoop()
c.commitMutex.RUnlock()
}
c.queueCond.Signal()
}
// Done decreases lock counts of the keys in the read/write sets.
func (c *commitQueue) Done(rset []string, wset []string) {
// done decreases lock counts of the keys in the read/write sets.
func (c *commitQueue) done(rset []string, wset []string) {
c.mx.Lock()
defer c.mx.Unlock()
@ -131,20 +131,82 @@ func (c *commitQueue) Done(rset []string, wset []string) {
// dependencies. The queue ensures that the top element doesn't conflict with
// any other transactions and therefore can be executed freely.
func (c *commitQueue) mainLoop() {
defer c.wg.Done()
defer close(c.shutdown)
for {
select {
case top := <-c.queue:
// Execute the next blocked transaction. As it is
// the top element in the queue it means that it doesn't
// depend on any other transactions anymore.
c.commitMutex.Lock()
top()
c.commitMutex.Unlock()
// Wait until there are no unblocked transactions being
// executed, and for there to be at least one blocked
// transaction in our queue.
c.queueCond.L.Lock()
for c.queue.Front() == nil {
c.queueCond.Wait()
// Check the exit condition before looping again.
select {
case <-c.ctx.Done():
c.queueCond.L.Unlock()
return
default:
}
}
// Now collect all txns until we find the next blocking one.
// These shouldn't conflict (if the precollected read/write
// keys sets don't grow), meaning we can safely commit them
// in parallel.
work := make([]*commitQueueTxn, 1)
e := c.queue.Front()
work[0] = c.queue.Remove(e).(*commitQueueTxn)
for {
e := c.queue.Front()
if e == nil {
break
}
next := e.Value.(*commitQueueTxn)
if !next.blocked {
work = append(work, next)
c.queue.Remove(e)
} else {
// We found the next blocking txn which means
// the block of work needs to be cut here.
break
}
}
c.queueCond.L.Unlock()
// Check if we need to exit before continuing.
select {
case <-c.ctx.Done():
return
default:
}
var wg sync.WaitGroup
wg.Add(len(work))
// Fire up N goroutines where each will run its commit loop
// and then clean up the reader/writer maps.
for _, txn := range work {
go func(txn *commitQueueTxn) {
defer wg.Done()
txn.commitLoop()
// We can safely cleanup here as done only
// holds the main mutex.
c.done(txn.rset, txn.wset)
}(txn)
}
wg.Wait()
// Check if we need to exit before continuing.
select {
case <-c.ctx.Done():
return
default:
}
}
}

View File

@ -17,7 +17,7 @@ import (
func TestCommitQueue(t *testing.T) {
// The duration of each commit.
const commitDuration = time.Millisecond * 500
const numCommits = 4
const numCommits = 5
var wg sync.WaitGroup
commits := make([]string, numCommits)
@ -30,25 +30,25 @@ func TestCommitQueue(t *testing.T) {
// Update our log of commit order. Avoid blocking
// by preallocating the commit log and increasing
// the log index atomically.
i := atomic.AddInt32(&idx, 1)
commits[i] = tag
if sleep {
time.Sleep(commitDuration)
}
i := atomic.AddInt32(&idx, 1)
commits[i] = tag
}
}
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
q := NewCommitQueue(ctx)
defer q.Wait()
defer q.Stop()
defer cancel()
wg.Add(numCommits)
t1 := time.Now()
// Tx1: reads: key1, key2, writes: key3, conflict: none
// Tx1 (long): reads: key1, key2, writes: key3, conflict: none
q.Add(
commit("free", true),
[]string{"key1", "key2"},
@ -60,12 +60,18 @@ func TestCommitQueue(t *testing.T) {
[]string{"key1", "key2"},
[]string{"key3"},
)
// Tx3: reads: key1, writes: key4, conflict: none
// Tx3 (long): reads: key1, writes: key4, conflict: none
q.Add(
commit("free", true),
[]string{"key1", "key2"},
[]string{"key4"},
)
// Tx4 (long): reads: key1, writes: none, conflict: none
q.Add(
commit("free", true),
[]string{"key1", "key2"},
[]string{},
)
// Tx4: reads: key2, writes: key4 conflict: Tx3
q.Add(
commit("blocked2", false),
@ -87,7 +93,7 @@ func TestCommitQueue(t *testing.T) {
// before the blocking ones, and the blocking ones are executed in
// the order of addition.
require.Equal(t,
[]string{"free", "free", "blocked1", "blocked2"},
[]string{"free", "blocked1", "free", "free", "blocked2"},
commits,
)
}

View File

@ -122,6 +122,7 @@ func (c *commitStatsCollector) callback(succ bool, stats CommitStats) {
type db struct {
cfg Config
ctx context.Context
cancel func()
cli *clientv3.Client
commitStatsCollector *commitStatsCollector
txQueue *commitQueue
@ -135,7 +136,6 @@ var _ walletdb.DB = (*db)(nil)
// config. If etcd connection cannot be established, then returns error.
func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) {
clientCfg := clientv3.Config{
Context: ctx,
Endpoints: []string{cfg.Host},
DialTimeout: etcdConnectionTimeout,
Username: cfg.User,
@ -158,8 +158,11 @@ func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) {
clientCfg.TLS = tlsConfig
}
ctx, cancel := context.WithCancel(ctx)
clientCfg.Context = ctx
cli, err := clientv3.New(clientCfg)
if err != nil {
cancel()
return nil, err
}
@ -171,6 +174,7 @@ func newEtcdBackend(ctx context.Context, cfg Config) (*db, error) {
backend := &db{
cfg: cfg,
ctx: ctx,
cancel: cancel,
cli: cli,
txQueue: NewCommitQueue(ctx),
}
@ -296,5 +300,8 @@ func (db *db) Copy(w io.Writer) error {
// Close cleanly shuts down the database and syncs all data.
// This function is part of the walletdb.Db interface implementation.
func (db *db) Close() error {
return db.cli.Close()
err := db.cli.Close()
db.cancel()
db.txQueue.Stop()
return err
}

View File

@ -283,7 +283,7 @@ func runSTM(s *stm, apply func(STM) error) error {
// Make a copy of the read/write set keys here. The reason why we need
// to do this is because subsequent applies may change (shrink) these
// sets and so when we decrease reference counts in the commit queue in
// Done(...) we'd potentially miss removing references which would
// done(...) we'd potentially miss removing references which would
// result in queueing up transactions and contending DB access.
// Copying these strings is cheap due to Go's immutable string which is
// always a reference.
@ -309,10 +309,9 @@ func runSTM(s *stm, apply func(STM) error) error {
select {
case <-done:
case <-s.options.ctx.Done():
return context.Canceled
}
s.txQueue.Done(rkeys, wkeys)
if s.options.commitStatsCallback != nil {
stats.Retries = retries
s.options.commitStatsCallback(executeErr == nil, stats)

View File

@ -28,7 +28,7 @@ func TestPutToEmpty(t *testing.T) {
defer func() {
cancel()
f.Cleanup()
txQueue.Wait()
txQueue.Stop()
}()
db, err := newEtcdBackend(ctx, f.BackendConfig())
@ -55,7 +55,7 @@ func TestGetPutDel(t *testing.T) {
defer func() {
cancel()
f.Cleanup()
txQueue.Wait()
txQueue.Stop()
}()
testKeyValues := []KV{
@ -141,7 +141,7 @@ func TestFirstLastNextPrev(t *testing.T) {
defer func() {
cancel()
f.Cleanup()
txQueue.Wait()
txQueue.Stop()
}()
testKeyValues := []KV{
@ -299,7 +299,7 @@ func TestCommitError(t *testing.T) {
defer func() {
cancel()
f.Cleanup()
txQueue.Wait()
txQueue.Stop()
}()
db, err := newEtcdBackend(ctx, f.BackendConfig())
@ -347,7 +347,7 @@ func TestManualTxError(t *testing.T) {
defer func() {
cancel()
f.Cleanup()
txQueue.Wait()
txQueue.Stop()
}()
db, err := newEtcdBackend(ctx, f.BackendConfig())