mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-10-11 10:33:12 +02:00
This adds a new query (GetChannelsByOutpoints) which makes use of the /*SLICE:outpoints*/ directive and the accompanying workaround. The query is then used in a test to demonstrate how the ExecutePagedQuery helper can wrap a query like this so that calls are made in pages. The new query will also be used by live code paths in an upcoming commit.
316 lines
7.4 KiB
Go
316 lines
7.4 KiB
Go
//go:build test_db_postgres || test_db_sqlite
|
|
|
|
package sqldb
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"testing"
|
|
|
|
"github.com/lightningnetwork/lnd/sqldb/sqlc"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// TestExecutePagedQuery tests the ExecutePagedQuery function which processes
|
|
// items in pages, allowing for efficient querying and processing of large
|
|
// datasets.
|
|
func TestExecutePagedQuery(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := context.Background()
|
|
|
|
t.Run("empty input returns nil", func(t *testing.T) {
|
|
var (
|
|
cfg = DefaultPagedQueryConfig()
|
|
inputItems []int
|
|
)
|
|
|
|
convertFunc := func(i int) string {
|
|
return fmt.Sprintf("%d", i)
|
|
}
|
|
|
|
queryFunc := func(ctx context.Context, items []string) (
|
|
[]string, error) {
|
|
|
|
require.Fail(t, "queryFunc should not be called "+
|
|
"with empty input")
|
|
return nil, nil
|
|
}
|
|
callback := func(ctx context.Context, result string) error {
|
|
require.Fail(t, "callback should not be called with "+
|
|
"empty input")
|
|
|
|
return nil
|
|
}
|
|
|
|
err := ExecutePagedQuery(
|
|
ctx, cfg, inputItems, convertFunc, queryFunc, callback,
|
|
)
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("single page processes all items", func(t *testing.T) {
|
|
var (
|
|
convertedItems []string
|
|
callbackResults []string
|
|
inputItems = []int{1, 2, 3, 4, 5}
|
|
cfg = &PagedQueryConfig{
|
|
PageSize: 10,
|
|
}
|
|
)
|
|
|
|
convertFunc := func(i int) string {
|
|
return fmt.Sprintf("converted_%d", i)
|
|
}
|
|
|
|
queryFunc := func(ctx context.Context,
|
|
items []string) ([]string, error) {
|
|
|
|
convertedItems = append(convertedItems, items...)
|
|
results := make([]string, len(items))
|
|
for i, item := range items {
|
|
results[i] = fmt.Sprintf("result_%s", item)
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
callback := func(ctx context.Context, result string) error {
|
|
callbackResults = append(callbackResults, result)
|
|
return nil
|
|
}
|
|
|
|
err := ExecutePagedQuery(
|
|
ctx, cfg, inputItems, convertFunc, queryFunc, callback,
|
|
)
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, []string{
|
|
"converted_1", "converted_2", "converted_3",
|
|
"converted_4", "converted_5",
|
|
}, convertedItems)
|
|
|
|
require.Equal(t, []string{
|
|
"result_converted_1", "result_converted_2",
|
|
"result_converted_3", "result_converted_4",
|
|
"result_converted_5",
|
|
}, callbackResults)
|
|
})
|
|
|
|
t.Run("multiple pages process correctly", func(t *testing.T) {
|
|
var (
|
|
queryCallCount int
|
|
pageSizes []int
|
|
allResults []string
|
|
inputItems = []int{1, 2, 3, 4, 5, 6, 7, 8}
|
|
cfg = &PagedQueryConfig{
|
|
PageSize: 3,
|
|
}
|
|
)
|
|
|
|
convertFunc := func(i int) string {
|
|
return fmt.Sprintf("item_%d", i)
|
|
}
|
|
|
|
queryFunc := func(ctx context.Context,
|
|
items []string) ([]string, error) {
|
|
|
|
queryCallCount++
|
|
pageSizes = append(pageSizes, len(items))
|
|
results := make([]string, len(items))
|
|
for i, item := range items {
|
|
results[i] = fmt.Sprintf("result_%s", item)
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
callback := func(ctx context.Context, result string) error {
|
|
allResults = append(allResults, result)
|
|
return nil
|
|
}
|
|
|
|
err := ExecutePagedQuery(
|
|
ctx, cfg, inputItems, convertFunc, queryFunc, callback,
|
|
)
|
|
require.NoError(t, err)
|
|
|
|
// Should have 3 pages: [1,2,3], [4,5,6], [7,8]
|
|
require.Equal(t, 3, queryCallCount)
|
|
require.Equal(t, []int{3, 3, 2}, pageSizes)
|
|
require.Len(t, allResults, 8)
|
|
})
|
|
|
|
t.Run("query function error is propagated", func(t *testing.T) {
|
|
var (
|
|
cfg = DefaultPagedQueryConfig()
|
|
inputItems = []int{1, 2, 3}
|
|
)
|
|
|
|
convertFunc := func(i int) string {
|
|
return fmt.Sprintf("%d", i)
|
|
}
|
|
|
|
queryFunc := func(ctx context.Context,
|
|
items []string) ([]string, error) {
|
|
|
|
return nil, errors.New("query failed")
|
|
}
|
|
|
|
callback := func(ctx context.Context, result string) error {
|
|
require.Fail(t, "callback should not be called when "+
|
|
"query fails")
|
|
|
|
return nil
|
|
}
|
|
|
|
err := ExecutePagedQuery(
|
|
ctx, cfg, inputItems, convertFunc, queryFunc, callback,
|
|
)
|
|
require.ErrorContains(t, err, "query failed for page "+
|
|
"starting at 0: query failed")
|
|
})
|
|
|
|
t.Run("callback error is propagated", func(t *testing.T) {
|
|
var (
|
|
cfg = DefaultPagedQueryConfig()
|
|
inputItems = []int{1, 2, 3}
|
|
)
|
|
|
|
convertFunc := func(i int) string {
|
|
return fmt.Sprintf("%d", i)
|
|
}
|
|
|
|
queryFunc := func(ctx context.Context,
|
|
items []string) ([]string, error) {
|
|
|
|
return items, nil
|
|
}
|
|
|
|
callback := func(ctx context.Context, result string) error {
|
|
if result == "2" {
|
|
return errors.New("callback failed")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
err := ExecutePagedQuery(
|
|
ctx, cfg, inputItems, convertFunc, queryFunc, callback,
|
|
)
|
|
require.ErrorContains(t, err, "callback failed for result: "+
|
|
"callback failed")
|
|
})
|
|
|
|
t.Run("query error in second page is propagated", func(t *testing.T) {
|
|
var (
|
|
inputItems = []int{1, 2, 3, 4}
|
|
cfg = &PagedQueryConfig{
|
|
PageSize: 2,
|
|
}
|
|
queryCallCount int
|
|
)
|
|
|
|
convertFunc := func(i int) string {
|
|
return fmt.Sprintf("%d", i)
|
|
}
|
|
|
|
queryFunc := func(ctx context.Context,
|
|
items []string) ([]string, error) {
|
|
|
|
queryCallCount++
|
|
if queryCallCount == 2 {
|
|
return nil, fmt.Errorf("second page failed")
|
|
}
|
|
|
|
return items, nil
|
|
}
|
|
|
|
callback := func(ctx context.Context, result string) error {
|
|
return nil
|
|
}
|
|
|
|
err := ExecutePagedQuery(
|
|
ctx, cfg, inputItems, convertFunc, queryFunc, callback,
|
|
)
|
|
require.ErrorContains(t, err, "query failed for page "+
|
|
"starting at 2: second page failed")
|
|
})
|
|
}
|
|
|
|
// TestSQLSliceQueries tests ExecutePageQuery helper by first showing that a
|
|
// query the /*SLICE:<field_name>*/ directive has a maximum number of
|
|
// parameters it can handle, and then showing that the paginated version which
|
|
// uses ExecutePagedQuery instead of a raw query can handle more parameters by
|
|
// executing the query in pages.
|
|
func TestSQLSliceQueries(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := context.Background()
|
|
|
|
db := NewTestDB(t)
|
|
|
|
// Increase the number of query strings by an order of magnitude each
|
|
// iteration until we hit the limit of the backing DB.
|
|
//
|
|
// NOTE: from testing, the following limits have been noted:
|
|
// - for Postgres, the limit is 65535 parameters.
|
|
// - for SQLite, the limit is 32766 parameters.
|
|
x := 10
|
|
var queryParams []string
|
|
for {
|
|
for len(queryParams) < x {
|
|
queryParams = append(
|
|
queryParams,
|
|
fmt.Sprintf("%d", len(queryParams)),
|
|
)
|
|
}
|
|
|
|
_, err := db.GetChannelsByOutpoints(ctx, queryParams)
|
|
if err != nil {
|
|
if isSQLite {
|
|
require.ErrorContains(
|
|
t, err, "SQL logic error: too many "+
|
|
"SQL variables",
|
|
)
|
|
} else {
|
|
require.ErrorContains(
|
|
t, err, "extended protocol limited "+
|
|
"to 65535 parameters",
|
|
)
|
|
}
|
|
break
|
|
}
|
|
|
|
x *= 10
|
|
|
|
// Just to make sure that the test doesn't carry on too long,
|
|
// we assert that we don't exceed a reasonable limit.
|
|
require.LessOrEqual(t, x, 100000)
|
|
}
|
|
|
|
// Now that we have found the limit that the raw query can handle, we
|
|
// switch to the wrapped version which will perform the query in pages
|
|
// so that the limit is not hit. We use the same number of query params
|
|
// that caused the error above.
|
|
queryWrapper := func(ctx context.Context,
|
|
pageOutpoints []string) ([]sqlc.GetChannelsByOutpointsRow,
|
|
error) {
|
|
|
|
return db.GetChannelsByOutpoints(ctx, pageOutpoints)
|
|
}
|
|
|
|
err := ExecutePagedQuery(
|
|
ctx,
|
|
DefaultPagedQueryConfig(),
|
|
queryParams,
|
|
func(s string) string {
|
|
return s
|
|
},
|
|
queryWrapper,
|
|
func(context.Context, sqlc.GetChannelsByOutpointsRow) error {
|
|
return nil
|
|
},
|
|
)
|
|
require.NoError(t, err)
|
|
}
|