diff --git a/batch/batch.go b/batch/batch.go index e2d8f1daa..e31701439 100644 --- a/batch/batch.go +++ b/batch/batch.go @@ -14,6 +14,19 @@ var errSolo = errors.New( "batch function returned an error and should be re-run solo", ) +// txOpts implements the sqldb.TxOptions interface. It is used to indicate that +// the transaction can be read-only or not transaction. +type txOpts struct { + readOnly bool +} + +// ReadOnly returns true if the transaction should be read only. +// +// NOTE: This is part of the sqldb.TxOptions interface. +func (t *txOpts) ReadOnly() bool { + return t.readOnly +} + type request[Q any] struct { *Request[Q] errChan chan error @@ -25,6 +38,7 @@ type batch[Q any] struct { reqs []*request[Q] clear func(b *batch[Q]) locker sync.Locker + txOpts txOpts } // trigger is the entry point for the batch and ensures that run is started at @@ -52,12 +66,11 @@ func (b *batch[Q]) run(ctx context.Context) { // Apply the batch until a subset succeeds or all of them fail. Requests // that fail will be retried individually. - var writeTx writeOpts for len(b.reqs) > 0 { var failIdx = -1 - err := b.db.ExecTx(ctx, &writeTx, func(tx Q) error { + err := b.db.ExecTx(ctx, &b.txOpts, func(tx Q) error { for i, req := range b.reqs { - err := req.Update(tx) + err := req.Do(tx) if err != nil { // If we get a serialization error, we // want the underlying SQL retry diff --git a/batch/batch_test.go b/batch/batch_test.go index e9dd7074e..dd9a5fb4f 100644 --- a/batch/batch_test.go +++ b/batch/batch_test.go @@ -58,7 +58,7 @@ func TestRetry(t *testing.T) { // execute it non-lazily. It should still return the error the second // time. req := &Request[kvdb.RwTx]{ - Update: func(tx kvdb.RwTx) error { + Do: func(tx kvdb.RwTx) error { called++ return errors.New("test") @@ -79,7 +79,7 @@ func TestRetry(t *testing.T) { // transaction to retry. Since we aren't using postgres, this will // cause the transaction to not be retried at all. req = &Request[kvdb.RwTx]{ - Update: func(tx kvdb.RwTx) error { + Do: func(tx kvdb.RwTx) error { called++ return errors.New("could not serialize access") @@ -95,6 +95,265 @@ func TestRetry(t *testing.T) { require.ErrorContains(t, err, "could not serialize access") } +// TestReadOnly just ensures that nothing breaks if we specify a read-only tx +// and then continue to add a write transaction to the same batch. +func TestReadOnly(t *testing.T) { + t.Parallel() + ctx := context.Background() + + t.Run("bbolt-ReadWrite", func(t *testing.T) { + db, err := walletdb.Create( + "bdb", filepath.Join(t.TempDir(), "weks.db"), true, + kvdb.DefaultDBTimeout, false, + ) + require.NoError(t, err) + if err != nil { + t.Fatalf("unable to create walletdb: %v", err) + } + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + // Create a bbolt read-write scheduler. + rwSche := NewTimeScheduler[kvdb.RwTx]( + NewBoltBackend[kvdb.RwTx](db), nil, time.Second, + ) + + // Call it without a read-only option. + var called bool + req := &Request[kvdb.RwTx]{ + Do: func(tx kvdb.RwTx) error { + called = true + return nil + }, + } + require.NoError(t, rwSche.Execute(ctx, req)) + require.True(t, called) + + // Call it with a read-only option. + called = false + req = &Request[kvdb.RwTx]{ + Opts: NewSchedulerOptions(ReadOnly()), + Do: func(tx kvdb.RwTx) error { + called = true + return nil + }, + } + require.NoError(t, rwSche.Execute(ctx, req)) + require.True(t, called) + + // Now, spin off a bunch of reads and writes at the same time + // so that we can simulate the upgrade from read-only to + // read-write. + var ( + wg sync.WaitGroup + reads = 0 + readsMu sync.Mutex + writes = 0 + writesMu sync.Mutex + ) + for i := 0; i < 100; i++ { + // Spin off the reads. + wg.Add(1) + go func() { + defer wg.Done() + + req := &Request[kvdb.RwTx]{ + Opts: NewSchedulerOptions(ReadOnly()), + Do: func(tx kvdb.RwTx) error { + readsMu.Lock() + reads++ + readsMu.Unlock() + + return nil + }, + } + require.NoError(t, rwSche.Execute(ctx, req)) + }() + + // Spin off the writes. + wg.Add(1) + go func() { + defer wg.Done() + + req := &Request[kvdb.RwTx]{ + Do: func(tx kvdb.RwTx) error { + writesMu.Lock() + writes++ + writesMu.Unlock() + + return nil + }, + } + require.NoError(t, rwSche.Execute(ctx, req)) + }() + } + + wg.Wait() + require.Equal(t, 100, reads) + require.Equal(t, 100, writes) + }) + + // Note that if the scheduler is initialized with a read-only bbolt tx, + // then the ReadOnly option does nothing as it will be read-only + // regardless. + t.Run("bbolt-ReadOnly", func(t *testing.T) { + db, err := walletdb.Create( + "bdb", filepath.Join(t.TempDir(), "weks.db"), true, + kvdb.DefaultDBTimeout, false, + ) + require.NoError(t, err) + if err != nil { + t.Fatalf("unable to create walletdb: %v", err) + } + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + // Create a bbolt read only scheduler. + rwSche := NewTimeScheduler[kvdb.RTx]( + NewBoltBackend[kvdb.RTx](db), nil, time.Second, + ) + + // Call it without a read-only option. + var called bool + req := &Request[kvdb.RTx]{ + Do: func(tx kvdb.RTx) error { + called = true + return nil + }, + } + require.NoError(t, rwSche.Execute(ctx, req)) + require.True(t, called) + + // Call it with a read-only option. + called = false + req = &Request[kvdb.RTx]{ + Opts: NewSchedulerOptions(ReadOnly()), + Do: func(tx kvdb.RTx) error { + called = true + return nil + }, + } + require.NoError(t, rwSche.Execute(ctx, req)) + require.True(t, called) + }) + + t.Run("sql", func(t *testing.T) { + base := sqldb.NewTestSqliteDB(t).BaseDB + db := sqldb.NewTransactionExecutor( + base, func(tx *sql.Tx) *sqlc.Queries { + return base.WithTx(tx) + }, + ) + + // Create a SQL scheduler with a long batch interval. + scheduler := NewTimeScheduler[*sqlc.Queries]( + db, nil, time.Second, + ) + + // writeRecord is a helper that adds a single new invoice to the + // database. It uses the 'i' argument to create a unique hash + // for the invoice. + writeRecord := func(t *testing.T, tx *sqlc.Queries, i int64) { + var hash [8]byte + binary.BigEndian.PutUint64(hash[:], uint64(i)) + + _, err := tx.InsertInvoice( + ctx, sqlc.InsertInvoiceParams{ + Hash: hash[:], + PaymentAddr: hash[:], + PaymentRequestHash: hash[:], + Expiry: -123, + }, + ) + require.NoError(t, err) + } + + // readRecord is a helper that reads a single invoice from the + // database. It uses the 'i' argument to create a unique hash + // for the invoice. + readRecord := func(t *testing.T, tx *sqlc.Queries, + i int) error { + + var hash [8]byte + binary.BigEndian.PutUint64(hash[:], uint64(i)) + + _, err := tx.GetInvoiceByHash(ctx, hash[:]) + + return err + } + + // Execute a bunch of read-only requests in parallel. These + // should be batched together and kept as read only. + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + req := &Request[*sqlc.Queries]{ + Opts: NewSchedulerOptions(ReadOnly()), + Do: func(tx *sqlc.Queries) error { + err := readRecord(t, tx, i) + require.ErrorIs( + t, err, sql.ErrNoRows, + ) + + return nil + }, + } + require.NoError(t, scheduler.Execute(ctx, req)) + }(i) + } + wg.Wait() + + // Now, execute reads and writes in parallel. These should be + // batched together and the tx should be updated to read-write. + // We just simulate this scenario. Write transactions succeeding + // are how we know that the tx was upgraded to read-write. + for i := 0; i < 100; i++ { + // Spin off the writes. + wg.Add(1) + go func(i int) { + defer wg.Done() + + req := &Request[*sqlc.Queries]{ + Do: func(tx *sqlc.Queries) error { + writeRecord(t, tx, int64(i)) + + return nil + }, + } + require.NoError(t, scheduler.Execute(ctx, req)) + }(i) + + // Spin off the reads. + wg.Add(1) + go func(i int) { + defer wg.Done() + + errExpected := func(err error) { + noRows := errors.Is(err, sql.ErrNoRows) + require.True(t, err == nil || noRows) + } + + req := &Request[*sqlc.Queries]{ + Opts: NewSchedulerOptions(ReadOnly()), + Do: func(tx *sqlc.Queries) error { + err := readRecord(t, tx, i) + errExpected(err) + + return nil + }, + } + require.NoError(t, scheduler.Execute(ctx, req)) + }(i) + } + wg.Wait() + }) +} + // BenchmarkBoltBatching benchmarks the performance of the batch scheduler // against the bolt backend. func BenchmarkBoltBatching(b *testing.B) { @@ -221,7 +480,7 @@ func BenchmarkBoltBatching(b *testing.B) { Opts: NewSchedulerOptions( opts..., ), - Update: func(tx kvdb.RwTx) error { + Do: func(tx kvdb.RwTx) error { writeRecord(b, tx) return nil }, @@ -291,7 +550,7 @@ func benchmarkSQLBatching(b *testing.B, sqlite bool) { } ctx := context.Background() - var opts writeOpts + var opts txOpts // writeRecord is a helper that adds a single new invoice to the // database. It uses the 'i' argument to create a unique hash for the @@ -320,7 +579,7 @@ func benchmarkSQLBatching(b *testing.B, sqlite bool) { binary.BigEndian.PutUint64(hash[:], uint64(N-1)) err := tx.ExecTx( - ctx, &writeOpts{}, func(queries *sqlc.Queries) error { + ctx, &txOpts{}, func(queries *sqlc.Queries) error { _, err := queries.GetInvoiceByHash(ctx, hash[:]) require.NoError(b, err) @@ -406,7 +665,7 @@ func benchmarkSQLBatching(b *testing.B, sqlite bool) { Opts: NewSchedulerOptions( opts..., ), - Update: func(tx *sqlc.Queries) error { + Do: func(tx *sqlc.Queries) error { writeRecord(b, tx, int64(j)) return nil }, diff --git a/batch/interface.go b/batch/interface.go index aad02c8c3..adb447ca7 100644 --- a/batch/interface.go +++ b/batch/interface.go @@ -16,10 +16,10 @@ type Request[Q any] struct { // NOTE: This field is optional. Reset func() - // Update is applied alongside other operations in the batch. + // Do is applied alongside other operations in the batch. // // NOTE: This method MUST NOT acquire any mutexes. - Update func(tx Q) error + Do func(tx Q) error // OnCommit is called if the batch or a subset of the batch including // this request all succeeded without failure. The passed error should @@ -36,12 +36,17 @@ type SchedulerOptions struct { // request when it comes in. This means that it can be scheduled later, // allowing larger batches. Lazy bool + + // ReadOnly should be true if the request is read-only. By default, + // this is false. + ReadOnly bool } // NewDefaultSchedulerOpts returns a new SchedulerOptions with default values. func NewDefaultSchedulerOpts() *SchedulerOptions { return &SchedulerOptions{ - Lazy: false, + Lazy: false, + ReadOnly: false, } } @@ -68,6 +73,16 @@ func LazyAdd() SchedulerOption { } } +// ReadOnly will mark the request as read-only. This means that the +// transaction will be executed in read-only mode, and no changes will be +// made to the database. If any requests in the same batch are not read-only, +// then the entire batch will be executed in read-write mode. +func ReadOnly() SchedulerOption { + return func(opts *SchedulerOptions) { + opts.ReadOnly = true + } +} + // Scheduler abstracts a generic batching engine that accumulates an incoming // set of Requests, executes them, and returns the error from the operation. type Scheduler[Q any] interface { diff --git a/batch/kvdb.go b/batch/kvdb.go index d9b969b78..2e3ea10a9 100644 --- a/batch/kvdb.go +++ b/batch/kvdb.go @@ -26,7 +26,16 @@ func (t *BoltBatcher[Q]) ExecTx(_ context.Context, opts sqldb.TxOptions, txBody func(Q) error, reset func()) error { if opts.ReadOnly() { - return fmt.Errorf("read-only transactions not supported") + return kvdb.View(t.db, func(tx kvdb.RTx) error { + q, ok := any(tx).(Q) + if !ok { + return fmt.Errorf("unable to cast tx(%T) "+ + "into the type expected by the "+ + "BoltBatcher(%T)", tx, t) + } + + return txBody(q) + }, reset) } return kvdb.Update(t.db, func(tx kvdb.RwTx) error { diff --git a/batch/scheduler.go b/batch/scheduler.go index af7a68a4b..dbd319f30 100644 --- a/batch/scheduler.go +++ b/batch/scheduler.go @@ -37,12 +37,6 @@ func NewTimeScheduler[Q any](db sqldb.BatchedTx[Q], locker sync.Locker, } } -type writeOpts struct{} - -func (*writeOpts) ReadOnly() bool { - return false -} - // Execute schedules the provided request for batch execution along with other // concurrent requests. The request will be executed within a fixed horizon, // parameterizeed by the duration of the scheduler. The error from the @@ -67,6 +61,13 @@ func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error { db: s.db, clear: s.clear, locker: s.locker, + + // By default, we assume that the batch is read-only, + // and we only upgrade it to read-write if a request + // is added that is not read-only. + txOpts: txOpts{ + readOnly: true, + }, } trigger := s.b.trigger time.AfterFunc(s.duration, func() { @@ -75,11 +76,22 @@ func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error { } s.b.reqs = append(s.b.reqs, &req) + // We only upgrade the batch to read-write if the new request is not + // read-only. If it is already read-write, we don't need to do anything. + if s.b.txOpts.readOnly && !r.Opts.ReadOnly { + s.b.txOpts.readOnly = false + } + // If this is a non-lazy request, we'll execute the batch immediately. if !r.Opts.Lazy { go s.b.trigger(ctx) } + // We need to grab a reference to the batch's txOpts so that we can + // pass it before we unlock the scheduler's mutex since the batch may + // be set to nil before we access the txOpts below. + txOpts := s.b.txOpts + s.mu.Unlock() // Wait for the batch to process the request. If the batch didn't @@ -97,9 +109,8 @@ func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error { } // Otherwise, run the request on its own. - var writeTx writeOpts - commitErr := s.db.ExecTx(ctx, &writeTx, func(tx Q) error { - return req.Update(tx) + commitErr := s.db.ExecTx(ctx, &txOpts, func(tx Q) error { + return req.Do(tx) }, func() { if req.Reset != nil { req.Reset() diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index 05ca6d435..3e1e009e2 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -861,7 +861,7 @@ func (c *KVStore) AddLightningNode(node *models.LightningNode, r := &batch.Request[kvdb.RwTx]{ Opts: batch.NewSchedulerOptions(opts...), - Update: func(tx kvdb.RwTx) error { + Do: func(tx kvdb.RwTx) error { return addLightningNode(tx, node) }, } @@ -1002,7 +1002,7 @@ func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo, Reset: func() { alreadyExists = false }, - Update: func(tx kvdb.RwTx) error { + Do: func(tx kvdb.RwTx) error { err := c.addChannelEdge(tx, edge) // Silence ErrEdgeAlreadyExist so that the batch can @@ -2712,7 +2712,7 @@ func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, isUpdate1 = false edgeNotFound = false }, - Update: func(tx kvdb.RwTx) error { + Do: func(tx kvdb.RwTx) error { var err error from, to, isUpdate1, err = updateEdgePolicy(tx, edge) if err != nil {