From 006905d57fb9153349e93515c0800ee00fa590e8 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 16 Jul 2025 08:27:44 +0200 Subject: [PATCH] sqldb: add ExecutePagedQuery helper Along with a test for it. This helper will allow us to easily create a pagination wrapper for queries that will make use of the new /*SLICE:*/ directive. The next commit will add a test showing this. --- sqldb/paginate.go | 76 +++++++++++++ sqldb/paginate_test.go | 236 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 312 insertions(+) create mode 100644 sqldb/paginate.go create mode 100644 sqldb/paginate_test.go diff --git a/sqldb/paginate.go b/sqldb/paginate.go new file mode 100644 index 000000000..143b456d5 --- /dev/null +++ b/sqldb/paginate.go @@ -0,0 +1,76 @@ +package sqldb + +import ( + "context" + "fmt" +) + +// PagedQueryFunc represents a function that takes a slice of converted items +// and returns results. +type PagedQueryFunc[T any, R any] func(context.Context, []T) ([]R, error) + +// ItemCallbackFunc represents a function that processes individual results. +type ItemCallbackFunc[R any] func(context.Context, R) error + +// ConvertFunc represents a function that converts from input type to query type +type ConvertFunc[I any, T any] func(I) T + +// PagedQueryConfig holds configuration values for calls to ExecutePagedQuery. +type PagedQueryConfig struct { + PageSize int +} + +// DefaultPagedQueryConfig returns a default configuration +func DefaultPagedQueryConfig() *PagedQueryConfig { + return &PagedQueryConfig{ + PageSize: 1000, + } +} + +// ExecutePagedQuery executes a paginated query over a slice of input items. +// It converts the input items to a query type using the provided convertFunc, +// executes the query using the provided queryFunc, and applies the callback +// to each result. +func ExecutePagedQuery[I any, T any, R any](ctx context.Context, + cfg *PagedQueryConfig, inputItems []I, convertFunc ConvertFunc[I, T], + queryFunc PagedQueryFunc[T, R], callback ItemCallbackFunc[R]) error { + + if len(inputItems) == 0 { + return nil + } + + // Process items in pages. + for i := 0; i < len(inputItems); i += cfg.PageSize { + // Calculate the end index for this page. + end := i + cfg.PageSize + if end > len(inputItems) { + end = len(inputItems) + } + + // Get the page slice of input items. + inputPage := inputItems[i:end] + + // Convert only the items needed for this page. + convertedPage := make([]T, len(inputPage)) + for j, inputItem := range inputPage { + convertedPage[j] = convertFunc(inputItem) + } + + // Execute the query for this page. + results, err := queryFunc(ctx, convertedPage) + if err != nil { + return fmt.Errorf("query failed for page "+ + "starting at %d: %w", i, err) + } + + // Apply the callback to each result. + for _, result := range results { + if err := callback(ctx, result); err != nil { + return fmt.Errorf("callback failed for "+ + "result: %w", err) + } + } + } + + return nil +} diff --git a/sqldb/paginate_test.go b/sqldb/paginate_test.go new file mode 100644 index 000000000..33b9cd93e --- /dev/null +++ b/sqldb/paginate_test.go @@ -0,0 +1,236 @@ +package sqldb + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestExecutePagedQuery tests the ExecutePagedQuery function which processes +// items in pages, allowing for efficient querying and processing of large +// datasets. +func TestExecutePagedQuery(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("empty input returns nil", func(t *testing.T) { + var ( + cfg = DefaultPagedQueryConfig() + inputItems []int + ) + + convertFunc := func(i int) string { + return fmt.Sprintf("%d", i) + } + + queryFunc := func(ctx context.Context, items []string) ( + []string, error) { + + require.Fail(t, "queryFunc should not be called "+ + "with empty input") + return nil, nil + } + callback := func(ctx context.Context, result string) error { + require.Fail(t, "callback should not be called with "+ + "empty input") + + return nil + } + + err := ExecutePagedQuery( + ctx, cfg, inputItems, convertFunc, queryFunc, callback, + ) + require.NoError(t, err) + }) + + t.Run("single page processes all items", func(t *testing.T) { + var ( + convertedItems []string + callbackResults []string + inputItems = []int{1, 2, 3, 4, 5} + cfg = &PagedQueryConfig{ + PageSize: 10, + } + ) + + convertFunc := func(i int) string { + return fmt.Sprintf("converted_%d", i) + } + + queryFunc := func(ctx context.Context, + items []string) ([]string, error) { + + convertedItems = append(convertedItems, items...) + results := make([]string, len(items)) + for i, item := range items { + results[i] = fmt.Sprintf("result_%s", item) + } + + return results, nil + } + + callback := func(ctx context.Context, result string) error { + callbackResults = append(callbackResults, result) + return nil + } + + err := ExecutePagedQuery( + ctx, cfg, inputItems, convertFunc, queryFunc, callback, + ) + require.NoError(t, err) + + require.Equal(t, []string{ + "converted_1", "converted_2", "converted_3", + "converted_4", "converted_5", + }, convertedItems) + + require.Equal(t, []string{ + "result_converted_1", "result_converted_2", + "result_converted_3", "result_converted_4", + "result_converted_5", + }, callbackResults) + }) + + t.Run("multiple pages process correctly", func(t *testing.T) { + var ( + queryCallCount int + pageSizes []int + allResults []string + inputItems = []int{1, 2, 3, 4, 5, 6, 7, 8} + cfg = &PagedQueryConfig{ + PageSize: 3, + } + ) + + convertFunc := func(i int) string { + return fmt.Sprintf("item_%d", i) + } + + queryFunc := func(ctx context.Context, + items []string) ([]string, error) { + + queryCallCount++ + pageSizes = append(pageSizes, len(items)) + results := make([]string, len(items)) + for i, item := range items { + results[i] = fmt.Sprintf("result_%s", item) + } + + return results, nil + } + + callback := func(ctx context.Context, result string) error { + allResults = append(allResults, result) + return nil + } + + err := ExecutePagedQuery( + ctx, cfg, inputItems, convertFunc, queryFunc, callback, + ) + require.NoError(t, err) + + // Should have 3 pages: [1,2,3], [4,5,6], [7,8] + require.Equal(t, 3, queryCallCount) + require.Equal(t, []int{3, 3, 2}, pageSizes) + require.Len(t, allResults, 8) + }) + + t.Run("query function error is propagated", func(t *testing.T) { + var ( + cfg = DefaultPagedQueryConfig() + inputItems = []int{1, 2, 3} + ) + + convertFunc := func(i int) string { + return fmt.Sprintf("%d", i) + } + + queryFunc := func(ctx context.Context, + items []string) ([]string, error) { + + return nil, errors.New("query failed") + } + + callback := func(ctx context.Context, result string) error { + require.Fail(t, "callback should not be called when "+ + "query fails") + + return nil + } + + err := ExecutePagedQuery( + ctx, cfg, inputItems, convertFunc, queryFunc, callback, + ) + require.ErrorContains(t, err, "query failed for page "+ + "starting at 0: query failed") + }) + + t.Run("callback error is propagated", func(t *testing.T) { + var ( + cfg = DefaultPagedQueryConfig() + inputItems = []int{1, 2, 3} + ) + + convertFunc := func(i int) string { + return fmt.Sprintf("%d", i) + } + + queryFunc := func(ctx context.Context, + items []string) ([]string, error) { + + return items, nil + } + + callback := func(ctx context.Context, result string) error { + if result == "2" { + return errors.New("callback failed") + } + return nil + } + + err := ExecutePagedQuery( + ctx, cfg, inputItems, convertFunc, queryFunc, callback, + ) + require.ErrorContains(t, err, "callback failed for result: "+ + "callback failed") + }) + + t.Run("query error in second page is propagated", func(t *testing.T) { + var ( + inputItems = []int{1, 2, 3, 4} + cfg = &PagedQueryConfig{ + PageSize: 2, + } + queryCallCount int + ) + + convertFunc := func(i int) string { + return fmt.Sprintf("%d", i) + } + + queryFunc := func(ctx context.Context, + items []string) ([]string, error) { + + queryCallCount++ + if queryCallCount == 2 { + return nil, fmt.Errorf("second page failed") + } + + return items, nil + } + + callback := func(ctx context.Context, result string) error { + return nil + } + + err := ExecutePagedQuery( + ctx, cfg, inputItems, convertFunc, queryFunc, callback, + ) + require.ErrorContains(t, err, "query failed for page "+ + "starting at 2: second page failed") + }) +}