Files
lnd/sqldb/paginate_test.go
Boris Nagaev dee8ad3754 multi: context.Background() -> t.Context()
Use the new feature of Go 1.24, fix linter warnings.

This change was produced by:
 - running golangci-lint run --fix
 - sed 's/context.Background/t.Context/' -i `git grep -l context.Background | grep test.go`
 - manually fixing broken tests
 - itest, lntest: use ht.Context() where ht or hn is available
 - in HarnessNode.Stop() we keep using context.Background(), because it is
   called from a cleanup handler in which t.Context() is canceled already.
2025-08-30 14:13:44 -03:00

1097 lines
24 KiB
Go

//go:build test_db_postgres || test_db_sqlite
package sqldb
import (
"context"
"errors"
"fmt"
"testing"
"github.com/lightningnetwork/lnd/sqldb/sqlc"
"github.com/stretchr/testify/require"
)
// TestExecuteBatchQuery tests the ExecuteBatchQuery function which processes
// items in pages, allowing for efficient querying and processing of large
// datasets.
func TestExecuteBatchQuery(t *testing.T) {
t.Parallel()
ctx := t.Context()
t.Run("empty input returns nil", func(t *testing.T) {
var (
cfg = DefaultSQLiteConfig()
inputItems []int
)
convertFunc := func(i int) string {
return fmt.Sprintf("%d", i)
}
queryFunc := func(ctx context.Context, items []string) (
[]string, error) {
require.Fail(t, "queryFunc should not be called "+
"with empty input")
return nil, nil
}
callback := func(ctx context.Context, result string) error {
require.Fail(t, "callback should not be called with "+
"empty input")
return nil
}
err := ExecuteBatchQuery(
ctx, cfg, inputItems, convertFunc, queryFunc, callback,
)
require.NoError(t, err)
})
t.Run("single page processes all items", func(t *testing.T) {
var (
convertedItems []string
callbackResults []string
inputItems = []int{1, 2, 3, 4, 5}
cfg = &QueryConfig{
MaxBatchSize: 10,
}
)
convertFunc := func(i int) string {
return fmt.Sprintf("converted_%d", i)
}
queryFunc := func(ctx context.Context,
items []string) ([]string, error) {
convertedItems = append(convertedItems, items...)
results := make([]string, len(items))
for i, item := range items {
results[i] = fmt.Sprintf("result_%s", item)
}
return results, nil
}
callback := func(ctx context.Context, result string) error {
callbackResults = append(callbackResults, result)
return nil
}
err := ExecuteBatchQuery(
ctx, cfg, inputItems, convertFunc, queryFunc, callback,
)
require.NoError(t, err)
require.Equal(t, []string{
"converted_1", "converted_2", "converted_3",
"converted_4", "converted_5",
}, convertedItems)
require.Equal(t, []string{
"result_converted_1", "result_converted_2",
"result_converted_3", "result_converted_4",
"result_converted_5",
}, callbackResults)
})
t.Run("multiple pages process correctly", func(t *testing.T) {
var (
queryCallCount int
pageSizes []int
allResults []string
inputItems = []int{1, 2, 3, 4, 5, 6, 7, 8}
cfg = &QueryConfig{
MaxBatchSize: 3,
}
)
convertFunc := func(i int) string {
return fmt.Sprintf("item_%d", i)
}
queryFunc := func(ctx context.Context,
items []string) ([]string, error) {
queryCallCount++
pageSizes = append(pageSizes, len(items))
results := make([]string, len(items))
for i, item := range items {
results[i] = fmt.Sprintf("result_%s", item)
}
return results, nil
}
callback := func(ctx context.Context, result string) error {
allResults = append(allResults, result)
return nil
}
err := ExecuteBatchQuery(
ctx, cfg, inputItems, convertFunc, queryFunc, callback,
)
require.NoError(t, err)
// Should have 3 pages: [1,2,3], [4,5,6], [7,8]
require.Equal(t, 3, queryCallCount)
require.Equal(t, []int{3, 3, 2}, pageSizes)
require.Len(t, allResults, 8)
})
t.Run("query function error is propagated", func(t *testing.T) {
var (
cfg = DefaultSQLiteConfig()
inputItems = []int{1, 2, 3}
)
convertFunc := func(i int) string {
return fmt.Sprintf("%d", i)
}
queryFunc := func(ctx context.Context,
items []string) ([]string, error) {
return nil, errors.New("query failed")
}
callback := func(ctx context.Context, result string) error {
require.Fail(t, "callback should not be called when "+
"query fails")
return nil
}
err := ExecuteBatchQuery(
ctx, cfg, inputItems, convertFunc, queryFunc, callback,
)
require.ErrorContains(t, err, "query failed for page "+
"starting at 0: query failed")
})
t.Run("callback error is propagated", func(t *testing.T) {
var (
cfg = DefaultSQLiteConfig()
inputItems = []int{1, 2, 3}
)
convertFunc := func(i int) string {
return fmt.Sprintf("%d", i)
}
queryFunc := func(ctx context.Context,
items []string) ([]string, error) {
return items, nil
}
callback := func(ctx context.Context, result string) error {
if result == "2" {
return errors.New("callback failed")
}
return nil
}
err := ExecuteBatchQuery(
ctx, cfg, inputItems, convertFunc, queryFunc, callback,
)
require.ErrorContains(t, err, "callback failed for result: "+
"callback failed")
})
t.Run("query error in second page is propagated", func(t *testing.T) {
var (
inputItems = []int{1, 2, 3, 4}
cfg = &QueryConfig{
MaxBatchSize: 2,
}
queryCallCount int
)
convertFunc := func(i int) string {
return fmt.Sprintf("%d", i)
}
queryFunc := func(ctx context.Context,
items []string) ([]string, error) {
queryCallCount++
if queryCallCount == 2 {
return nil, fmt.Errorf("second page failed")
}
return items, nil
}
callback := func(ctx context.Context, result string) error {
return nil
}
err := ExecuteBatchQuery(
ctx, cfg, inputItems, convertFunc, queryFunc, callback,
)
require.ErrorContains(t, err, "query failed for page "+
"starting at 2: second page failed")
})
}
// TestSQLSliceQueries tests the ExecuteBatchQuery helper. It first shows that
// a raw query using the /*SLICE:<field_name>*/ directive can only take a
// limited number of parameters, and then shows that running the same
// parameters through ExecuteBatchQuery succeeds because the query is executed
// in pages.
func TestSQLSliceQueries(t *testing.T) {
	t.Parallel()

	ctx := t.Context()
	db := NewTestDB(t)

	// Error text the backend under test is expected to report once a
	// single statement carries too many parameters.
	tooManyParams := "extended protocol limited to 65535 parameters"
	if isSQLite {
		tooManyParams = "SQL logic error: too many SQL variables"
	}

	// Grow the parameter list by an order of magnitude per round until
	// the raw query is rejected by the backing DB.
	//
	// NOTE: from testing, the following limits have been noted:
	// - for Postgres, the limit is 65535 parameters.
	// - for SQLite, the limit is 32766 parameters.
	var outpoints []string
	for numParams := 10; ; numParams *= 10 {
		// Top the list up to this round's size. Each entry is simply
		// its own index rendered as a string.
		for i := len(outpoints); i < numParams; i++ {
			outpoints = append(outpoints, fmt.Sprintf("%d", i))
		}

		_, err := db.GetChannelsByOutpoints(ctx, outpoints)
		if err != nil {
			require.ErrorContains(t, err, tooManyParams)

			break
		}

		// The raw query went through, so this round must still be
		// within the maximum expected for the backend.
		switch {
		case isSQLite:
			require.LessOrEqual(
				t, numParams, maxSQLiteBatchSize,
				"SQLite should not exceed 32766 parameters",
			)

		default:
			require.LessOrEqual(
				t, numParams, maxPostgresBatchSize,
				"Postgres should not exceed 65535 parameters",
			)
		}
	}

	// outpoints now holds exactly the number of parameters that made the
	// raw query fail. Feed the same list through ExecuteBatchQuery, which
	// issues the query page by page so the limit is not hit.
	fetchPage := func(ctx context.Context,
		page []string) ([]sqlc.GetChannelsByOutpointsRow, error) {

		return db.GetChannelsByOutpoints(ctx, page)
	}
	identity := func(s string) string {
		return s
	}
	discard := func(context.Context,
		sqlc.GetChannelsByOutpointsRow) error {

		return nil
	}

	err := ExecuteBatchQuery(
		ctx, DefaultSQLiteConfig(), outpoints, identity, fetchPage,
		discard,
	)
	require.NoError(t, err)
}
// TestExecutePaginatedQuery tests the ExecutePaginatedQuery function which
// processes items in pages, allowing for efficient querying and processing of
// large datasets. It simulates a cursor-based pagination system where items
// are fetched in pages, processed, and the cursor is updated for the next
// page until all items are processed or an error occurs.
func TestExecutePaginatedQuery(t *testing.T) {
t.Parallel()
ctx := t.Context()
type testItem struct {
id int64
name string
}
type testResult struct {
itemID int64
value string
}
tests := []struct {
name string
pageSize uint32
allItems []testItem
initialCursor int64
queryError error
// Which call number to return error on (0 = never).
queryErrorOnCall int
processError error
// Which item ID to fail processing on (0 = never).
processErrorOnID int64
expectedError string
expectedResults []testResult
expectedPages int
}{
{
name: "happy path multiple pages",
pageSize: 2,
allItems: []testItem{
{id: 1, name: "Item1"},
{id: 2, name: "Item2"},
{id: 3, name: "Item3"},
{id: 4, name: "Item4"},
{id: 5, name: "Item5"}},
initialCursor: 0,
expectedResults: []testResult{
{itemID: 1, value: "Processed-Item1"},
{itemID: 2, value: "Processed-Item2"},
{itemID: 3, value: "Processed-Item3"},
{itemID: 4, value: "Processed-Item4"},
{itemID: 5, value: "Processed-Item5"},
},
expectedPages: 3, // 2+2+1 items across 3 pages.
},
{
name: "empty results",
pageSize: 10,
allItems: []testItem{},
initialCursor: 0,
expectedPages: 1, // One call that returns empty.
},
{
name: "single page",
pageSize: 10,
allItems: []testItem{
{id: 1, name: "OnlyItem"},
},
initialCursor: 0,
expectedResults: []testResult{
{itemID: 1, value: "Processed-OnlyItem"},
},
// The first page returns less than the max size,
// indicating no more items to fetch after that.
expectedPages: 1,
},
{
name: "query error first call",
pageSize: 2,
allItems: []testItem{
{id: 1, name: "Item1"},
},
initialCursor: 0,
queryError: errors.New(
"database connection failed",
),
queryErrorOnCall: 1,
expectedError: "failed to fetch page with cursor 0",
expectedPages: 1,
},
{
name: "query error second call",
pageSize: 1,
allItems: []testItem{
{id: 1, name: "Item1"},
{id: 2, name: "Item2"},
},
initialCursor: 0,
queryError: errors.New(
"database error on second page",
),
queryErrorOnCall: 2,
expectedError: "failed to fetch page with cursor 1",
// First item processed before error.
expectedResults: []testResult{
{itemID: 1, value: "Processed-Item1"},
},
expectedPages: 2,
},
{
name: "process error first item",
pageSize: 10,
allItems: []testItem{
{id: 1, name: "Item1"}, {id: 2, name: "Item2"},
},
initialCursor: 0,
processError: errors.New("processing failed"),
processErrorOnID: 1,
expectedError: "failed to process item",
// No results since first item failed.
expectedPages: 1,
},
{
name: "process error second item",
pageSize: 10,
allItems: []testItem{
{id: 1, name: "Item1"}, {id: 2, name: "Item2"},
},
initialCursor: 0,
processError: errors.New("processing failed"),
processErrorOnID: 2,
expectedError: "failed to process item",
// First item processed before error.
expectedResults: []testResult{
{itemID: 1, value: "Processed-Item1"},
},
expectedPages: 1,
},
{
name: "different initial cursor",
pageSize: 2,
allItems: []testItem{
{id: 1, name: "Item1"},
{id: 2, name: "Item2"},
{id: 3, name: "Item3"},
},
// Start from ID > 1.
initialCursor: 1,
expectedResults: []testResult{
{itemID: 2, value: "Processed-Item2"},
{itemID: 3, value: "Processed-Item3"},
},
// 2+0 items across 2 pages.
expectedPages: 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
var (
processedResults []testResult
queryCallCount int
cfg = &QueryConfig{
MaxPageSize: tt.pageSize,
}
)
queryFunc := func(ctx context.Context, cursor int64,
limit int32) ([]testItem, error) {
queryCallCount++
// Return error on specific call if configured.
if tt.queryErrorOnCall > 0 &&
queryCallCount == tt.queryErrorOnCall {
return nil, tt.queryError
}
// Simulate cursor-based pagination
var items []testItem
for _, item := range tt.allItems {
if item.id > cursor &&
len(items) < int(limit) {
items = append(items, item)
}
}
return items, nil
}
extractCursor := func(item testItem) int64 {
return item.id
}
processItem := func(ctx context.Context,
item testItem) error {
// Return error on specific item if configured.
if tt.processErrorOnID > 0 &&
item.id == tt.processErrorOnID {
return tt.processError
}
processedResults = append(
processedResults, testResult{
itemID: item.id,
value: fmt.Sprintf(
"Processed-%s",
item.name,
),
},
)
return nil
}
err := ExecutePaginatedQuery(
ctx, cfg, tt.initialCursor, queryFunc,
extractCursor, processItem,
)
// Check error expectations
if tt.expectedError != "" {
require.ErrorContains(t, err, tt.expectedError)
if tt.queryError != nil {
require.ErrorIs(t, err, tt.queryError)
}
if tt.processError != nil {
require.ErrorIs(t, err, tt.processError)
}
} else {
require.NoError(t, err)
}
// Check processed results.
require.Equal(t, tt.expectedResults, processedResults)
// Check number of query calls.
require.Equal(t, tt.expectedPages, queryCallCount)
})
}
}
// TestExecuteCollectAndBatchWithSharedDataQuery tests the
// ExecuteCollectAndBatchWithSharedDataQuery function which processes items in
// pages, allowing for efficient querying and processing of large datasets with
// shared data across batches.
func TestExecuteCollectAndBatchWithSharedDataQuery(t *testing.T) {
t.Parallel()
type channelRow struct {
id int64
name string
policyIDs []int64
}
type channelBatchData struct {
lookupTable map[int64]string
sharedInfo string
}
type processedChannel struct {
id int64
name string
sharedInfo string
lookupData string
}
tests := []struct {
name string
maxPageSize uint32
allRows []channelRow
initialCursor int64
pageQueryError error
pageQueryErrorOnCall int
batchDataError error
batchDataErrorOnBatch int
processError error
processErrorOnID int64
earlyTerminationOnPage int
expectedError string
expectedProcessedItems []processedChannel
expectedPageCalls int
expectedBatchCalls int
}{
{
name: "multiple pages multiple batches",
maxPageSize: 2,
allRows: []channelRow{
{
id: 1,
name: "Chan1",
policyIDs: []int64{10, 11},
},
{
id: 2,
name: "Chan2",
policyIDs: []int64{20},
},
{
id: 3,
name: "Chan3",
policyIDs: []int64{30, 31},
},
{
id: 4,
name: "Chan4",
policyIDs: []int64{40},
},
{
id: 5,
name: "Chan5",
policyIDs: []int64{50},
},
},
initialCursor: 0,
expectedProcessedItems: []processedChannel{
{
id: 1,
name: "Chan1",
sharedInfo: "batch-shared",
lookupData: "lookup-1",
},
{
id: 2,
name: "Chan2",
sharedInfo: "batch-shared",
lookupData: "lookup-2",
},
{
id: 3,
name: "Chan3",
sharedInfo: "batch-shared",
lookupData: "lookup-3",
},
{
id: 4,
name: "Chan4",
sharedInfo: "batch-shared",
lookupData: "lookup-4",
},
{
id: 5,
name: "Chan5",
sharedInfo: "batch-shared",
lookupData: "lookup-5",
},
},
// Pages: [1,2], [3,4], [5].
expectedPageCalls: 3,
// One batch call per page with data: [1,2], [3,4], [5].
expectedBatchCalls: 3,
},
{
name: "empty results",
maxPageSize: 10,
allRows: []channelRow{},
initialCursor: 0,
// One call that returns empty.
expectedPageCalls: 1,
// No batches since no items.
expectedBatchCalls: 0,
},
{
name: "single page single batch",
maxPageSize: 10,
allRows: []channelRow{
{
id: 1,
name: "Chan1",
policyIDs: []int64{10},
},
{
id: 2,
name: "Chan2",
policyIDs: []int64{20},
},
},
initialCursor: 0,
expectedProcessedItems: []processedChannel{
{
id: 1,
name: "Chan1",
sharedInfo: "batch-shared",
lookupData: "lookup-1",
},
{
id: 2,
name: "Chan2",
sharedInfo: "batch-shared",
lookupData: "lookup-2",
},
},
// One page with all items.
expectedPageCalls: 1,
// One batch call for the single page.
expectedBatchCalls: 1,
},
{
name: "page query error first call",
maxPageSize: 5,
allRows: []channelRow{
{
id: 1,
name: "Chan1",
},
},
initialCursor: 0,
pageQueryError: errors.New(
"database connection failed",
),
pageQueryErrorOnCall: 1,
expectedError: "failed to fetch page with " +
"cursor 0",
expectedPageCalls: 1,
expectedBatchCalls: 0,
},
{
name: "page query error second call",
maxPageSize: 1,
allRows: []channelRow{
{
id: 1,
name: "Chan1",
},
{
id: 2,
name: "Chan2",
},
},
initialCursor: 0,
pageQueryError: errors.New("database error on " +
"second page"),
pageQueryErrorOnCall: 2,
expectedError: "failed to fetch page with " +
"cursor 1",
expectedProcessedItems: []processedChannel{
{
id: 1,
name: "Chan1",
sharedInfo: "batch-shared",
lookupData: "lookup-1",
},
},
expectedPageCalls: 2,
expectedBatchCalls: 1,
},
{
name: "batch data error first batch",
maxPageSize: 10,
allRows: []channelRow{
{
id: 1,
name: "Chan1",
},
{
id: 2,
name: "Chan2",
},
},
initialCursor: 0,
batchDataError: errors.New("batch loading " +
"failed"),
batchDataErrorOnBatch: 1,
expectedError: "failed to load batch data " +
"for page",
expectedPageCalls: 1,
expectedBatchCalls: 1,
},
{
name: "batch data error second page",
maxPageSize: 1,
allRows: []channelRow{
{
id: 1,
name: "Chan1",
},
{
id: 2,
name: "Chan2",
},
},
initialCursor: 0,
batchDataError: errors.New("batch loading " +
"failed on second page"),
batchDataErrorOnBatch: 2,
expectedError: "failed to load batch data " +
"for page",
expectedProcessedItems: []processedChannel{
{
id: 1,
name: "Chan1",
sharedInfo: "batch-shared",
lookupData: "lookup-1",
},
},
expectedPageCalls: 2,
expectedBatchCalls: 2,
},
{
name: "process error first item",
maxPageSize: 10,
allRows: []channelRow{
{
id: 1,
name: "Chan1",
},
{
id: 2,
name: "Chan2",
},
},
initialCursor: 0,
processError: errors.New("processing failed"),
processErrorOnID: 1,
expectedError: "failed to process item with " +
"batch data",
expectedPageCalls: 1,
expectedBatchCalls: 1,
},
{
name: "process error second item",
maxPageSize: 10,
allRows: []channelRow{
{
id: 1,
name: "Chan1",
},
{
id: 2,
name: "Chan2",
},
},
initialCursor: 0,
processError: errors.New("processing failed"),
processErrorOnID: 2,
expectedError: "failed to process item with batch " +
"data",
expectedProcessedItems: []processedChannel{
{
id: 1,
name: "Chan1",
sharedInfo: "batch-shared",
lookupData: "lookup-1",
},
},
expectedPageCalls: 1,
expectedBatchCalls: 1,
},
{
name: "early termination partial page",
maxPageSize: 3,
allRows: []channelRow{
{
id: 1,
name: "Chan1",
},
{
id: 2,
name: "Chan2",
},
},
initialCursor: 0,
earlyTerminationOnPage: 1,
expectedProcessedItems: []processedChannel{
{
id: 1,
name: "Chan1",
sharedInfo: "batch-shared",
lookupData: "lookup-1",
},
{
id: 2,
name: "Chan2",
sharedInfo: "batch-shared",
lookupData: "lookup-2",
},
},
expectedPageCalls: 1,
expectedBatchCalls: 1,
},
{
name: "different initial cursor",
maxPageSize: 2,
allRows: []channelRow{
{
id: 1,
name: "Chan1",
},
{
id: 2,
name: "Chan2",
},
{
id: 3,
name: "Chan3",
},
},
initialCursor: 1,
expectedProcessedItems: []processedChannel{
{
id: 2,
name: "Chan2",
sharedInfo: "batch-shared",
lookupData: "lookup-2",
},
{
id: 3,
name: "Chan3",
sharedInfo: "batch-shared",
lookupData: "lookup-3",
},
},
// [2,3], [].
expectedPageCalls: 2,
// One batch call for the page with [2,3].
expectedBatchCalls: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := t.Context()
cfg := &QueryConfig{
MaxPageSize: tt.maxPageSize,
}
var (
processedItems []processedChannel
pageCallCount int
batchCallCount int
)
pageQueryFunc := func(ctx context.Context, cursor int64,
limit int32) ([]channelRow, error) {
pageCallCount++
// Return error on specific call if configured.
//nolint:ll
if tt.pageQueryErrorOnCall > 0 &&
pageCallCount == tt.pageQueryErrorOnCall {
return nil, tt.pageQueryError
}
// Simulate cursor-based pagination.
var items []channelRow
for _, row := range tt.allRows {
if row.id > cursor &&
len(items) < int(limit) {
items = append(items, row)
}
}
// Handle early termination test case.
//nolint:ll
if tt.earlyTerminationOnPage > 0 &&
pageCallCount == tt.earlyTerminationOnPage {
// Return fewer items than maxPageSize
// to trigger termination
if len(items) >= int(tt.maxPageSize) {
items = items[:tt.maxPageSize-1]
}
}
return items, nil
}
extractPageCursor := func(row channelRow) int64 {
return row.id
}
collectFunc := func(row channelRow) (int64, error) {
return row.id, nil
}
batchDataFunc := func(ctx context.Context,
ids []int64) (*channelBatchData, error) {
batchCallCount++
// Return error on specific batch if configured.
//nolint:ll
if tt.batchDataErrorOnBatch > 0 &&
batchCallCount == tt.batchDataErrorOnBatch {
return nil, tt.batchDataError
}
// Create mock batch data.
lookupTable := make(map[int64]string)
for _, id := range ids {
lookupTable[id] =
fmt.Sprintf("lookup-%d", id)
}
return &channelBatchData{
lookupTable: lookupTable,
sharedInfo: "batch-shared",
}, nil
}
processItem := func(ctx context.Context, row channelRow,
batchData *channelBatchData) error {
// Return error on specific item if configured.
if tt.processErrorOnID > 0 &&
row.id == tt.processErrorOnID {
return tt.processError
}
processedChan := processedChannel{
id: row.id,
name: row.name,
sharedInfo: batchData.sharedInfo,
lookupData: batchData.
lookupTable[row.id],
}
processedItems = append(
processedItems, processedChan,
)
return nil
}
err := ExecuteCollectAndBatchWithSharedDataQuery(
ctx, cfg, tt.initialCursor,
pageQueryFunc, extractPageCursor, collectFunc,
batchDataFunc, processItem,
)
// Check error expectations.
if tt.expectedError != "" {
require.ErrorContains(t, err, tt.expectedError)
if tt.pageQueryError != nil {
require.ErrorIs(
t, err, tt.pageQueryError,
)
}
if tt.batchDataError != nil {
require.ErrorIs(
t, err, tt.batchDataError,
)
}
if tt.processError != nil {
require.ErrorIs(
t, err, tt.processError,
)
}
} else {
require.NoError(t, err)
}
// Check processed results.
require.Equal(
t, tt.expectedProcessedItems, processedItems,
)
// Check call counts.
require.Equal(
t, tt.expectedPageCalls, pageCallCount,
)
require.Equal(
t, tt.expectedBatchCalls, batchCallCount,
)
})
}
}