kvdb: add ForAll

A new method to allow efficient range queries for backends that support
it.
This commit is contained in:
Joost Jager 2021-12-24 11:29:16 +01:00
parent 102a1cbaaa
commit b8408a1484
No known key found for this signature in database
GPG Key ID: A61B9D4C393C59C7
7 changed files with 114 additions and 3 deletions

2
go.mod
View File

@ -46,7 +46,7 @@ require (
github.com/lightningnetwork/lnd/cert v1.1.0
github.com/lightningnetwork/lnd/clock v1.1.0
github.com/lightningnetwork/lnd/healthcheck v1.2.0
github.com/lightningnetwork/lnd/kvdb v1.2.5
github.com/lightningnetwork/lnd/kvdb v1.3.0
github.com/lightningnetwork/lnd/queue v1.1.0
github.com/lightningnetwork/lnd/ticker v1.1.0
github.com/ltcsuite/ltcd v0.0.0-20190101042124-f37f8bf35796

View File

@ -406,3 +406,9 @@ func (b *readWriteBucket) Prefetch(paths ...[]string) {
b.tx.stm.Prefetch(flattenMap(keys), flattenMap(ranges))
}
// ForAll is an optimized version of ForEach with the limitation that no
// additional queries can be executed within the callback.
func (b *readWriteBucket) ForAll(cb func(k, v []byte) error) error {
return b.ForEach(cb)
}

View File

@ -109,6 +109,12 @@ type ExtendedRBucket interface {
// Prefetch will attempt to prefetch all values under a path.
Prefetch(paths ...[]string)
// ForAll is an optimized version of ForEach.
//
// NOTE: ForAll differs from ForEach in that no additional queries can
// be executed within the callback.
ForAll(func(k, v []byte) error) error
}
// Prefetch will attempt to prefetch all values under a path from the passed
@ -119,6 +125,16 @@ func Prefetch(b RBucket, paths ...[]string) {
}
}
// ForAll is an optimized version of ForEach with the limitation that no
// additional queries can be executed within the callback.
func ForAll(b RBucket, cb func(k, v []byte) error) error {
if bucket, ok := b.(ExtendedRBucket); ok {
return bucket.ForAll(cb)
}
return b.ForEach(cb)
}
// RootBucket is a wrapper to ExtendedRTx.RootBucket which does nothing if
// the implementation doesn't have ExtendedRTx.
func RootBucket(t RTx) RBucket {

View File

@ -427,3 +427,36 @@ func (b *readWriteBucket) Sequence() uint64 {
return uint64(seq)
}
// Prefetch will attempt to prefetch all values under a path from the passed
// bucket.
func (b *readWriteBucket) Prefetch(paths ...[]string) {}
// ForAll is an optimized version of ForEach with the limitation that no
// additional queries can be executed within the callback.
func (b *readWriteBucket) ForAll(cb func(k, v []byte) error) error {
rows, cancel, err := b.tx.Query(
"SELECT key, value FROM " + b.table + " WHERE " +
parentSelector(b.id) + " ORDER BY key",
)
if err != nil {
return err
}
defer cancel()
for rows.Next() {
var key, value []byte
err := rows.Scan(&key, &value)
if err != nil {
return err
}
err = cb(key, value)
if err != nil {
return err
}
}
return nil
}

View File

@ -175,6 +175,21 @@ func (tx *readWriteTx) QueryRow(query string, args ...interface{}) (*sql.Row,
return tx.tx.QueryRowContext(ctx, query, args...), cancel
}
// Query executes a multi-row query call with a timeout context.
func (tx *readWriteTx) Query(query string, args ...interface{}) (*sql.Rows,
func(), error) {
ctx, cancel := tx.db.getTimeoutCtx()
rows, err := tx.tx.QueryContext(ctx, query, args...)
if err != nil {
cancel()
return nil, func() {}, err
}
return rows, cancel, nil
}
// Exec executes a Exec call with a timeout context.
func (tx *readWriteTx) Exec(query string, args ...interface{}) (sql.Result,
error) {

View File

@ -84,7 +84,35 @@ func TestPostgres(t *testing.T) {
},
{
name: "bucket for each",
test: testBucketForEach,
test: func(t *testing.T, db walletdb.DB) {
testBucketIterator(t, db, func(bucket walletdb.ReadWriteBucket,
callback func(key, val []byte) error) error {
return bucket.ForEach(callback)
})
},
expectedDb: m{
"test_kv": []m{
{"id": int64(1), "key": "apple", "parent_id": nil, "sequence": nil, "value": nil},
{"id": int64(2), "key": "banana", "parent_id": int64(1), "sequence": nil, "value": nil},
{"id": int64(3), "key": "key1", "parent_id": int64(1), "sequence": nil, "value": "val1"},
{"id": int64(4), "key": "key1", "parent_id": int64(2), "sequence": nil, "value": "val1"},
{"id": int64(5), "key": "key2", "parent_id": int64(1), "sequence": nil, "value": "val2"},
{"id": int64(6), "key": "key2", "parent_id": int64(2), "sequence": nil, "value": "val2"},
{"id": int64(7), "key": "key3", "parent_id": int64(1), "sequence": nil, "value": "val3"},
{"id": int64(8), "key": "key3", "parent_id": int64(2), "sequence": nil, "value": "val3"},
},
},
},
{
name: "bucket for all",
test: func(t *testing.T, db walletdb.DB) {
testBucketIterator(t, db, func(bucket walletdb.ReadWriteBucket,
callback func(key, val []byte) error) error {
return ForAll(bucket, callback)
})
},
expectedDb: m{
"test_kv": []m{
{"id": int64(1), "key": "apple", "parent_id": nil, "sequence": nil, "value": nil},

View File

@ -159,7 +159,20 @@ func testBucketDeletion(t *testing.T, db walletdb.DB) {
require.Nil(t, err)
}
type bucketIterator = func(walletdb.ReadWriteBucket,
func(key, val []byte) error) error
func testBucketForEach(t *testing.T, db walletdb.DB) {
testBucketIterator(t, db, func(bucket walletdb.ReadWriteBucket,
callback func(key, val []byte) error) error {
return bucket.ForEach(callback)
})
}
func testBucketIterator(t *testing.T, db walletdb.DB,
iterator bucketIterator) {
err := Update(db, func(tx walletdb.ReadWriteTx) error {
// "apple"
apple, err := tx.CreateTopLevelBucket([]byte("apple"))
@ -199,7 +212,7 @@ func testBucketForEach(t *testing.T, db walletdb.DB) {
require.Equal(t, expected, got)
got = make(map[string]string)
err = banana.ForEach(func(key, val []byte) error {
err = iterator(banana, func(key, val []byte) error {
got[string(key)] = string(val)
return nil
})