From ad1a77a2576f0c66ee07b2e546dfdbffb58750ab Mon Sep 17 00:00:00 2001 From: Leon Mika Date: Mon, 23 Jan 2023 21:23:46 +1100 Subject: [PATCH] Added paging and the ability to specify the table in query script method (#44) * paging: added controller for paging through results * paging: added command and key binding for going to the next page * paging: added the ability to specify the table in the query script method * paging: have got exclusive start key written to backstack --- internal/dynamo-browse/controllers/iface.go | 4 +- internal/dynamo-browse/controllers/scripts.go | 33 +++++- .../dynamo-browse/controllers/scripts_test.go | 19 ++++ .../dynamo-browse/controllers/tableread.go | 78 ++++++++++--- .../controllers/tableread_test.go | 69 +++++++++++- .../controllers/tablewrite_test.go | 22 +++- .../dynamo-browse/models/attrcodec/utils.go | 30 +++++ internal/dynamo-browse/models/models.go | 22 +++- .../models/serialisable/viewsnapshot.go | 10 +- .../providers/dynamo/provider.go | 105 ++++++++++++------ .../providers/dynamo/provider_test.go | 18 ++- .../services/scriptmanager/iface.go | 1 + .../services/scriptmanager/modsession.go | 5 + .../services/scriptmanager/modsession_test.go | 31 ++++++ .../dynamo-browse/services/tables/iface.go | 18 ++- .../dynamo-browse/services/tables/service.go | 36 ++++-- .../dynamo-browse/ui/keybindings/defaults.go | 1 + .../ui/keybindings/keybindings.go | 1 + internal/dynamo-browse/ui/model.go | 6 + test/cmd/load-test-table/main.go | 30 ++++- 20 files changed, 444 insertions(+), 95 deletions(-) create mode 100644 internal/dynamo-browse/models/attrcodec/utils.go diff --git a/internal/dynamo-browse/controllers/iface.go b/internal/dynamo-browse/controllers/iface.go index 2228605..a604192 100644 --- a/internal/dynamo-browse/controllers/iface.go +++ b/internal/dynamo-browse/controllers/iface.go @@ -2,6 +2,7 @@ package controllers import ( "context" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/lmika/audax/internal/dynamo-browse/models" "io/fs" ) @@ -11,7 +12,8 @@ type TableReadService interface { Describe(ctx context.Context, table string) (*models.TableInfo, error) Scan(ctx context.Context, tableInfo *models.TableInfo) (*models.ResultSet, error) Filter(resultSet *models.ResultSet, filter string) *models.ResultSet - ScanOrQuery(ctx context.Context, tableInfo *models.TableInfo, query models.Queryable) (*models.ResultSet, error) + ScanOrQuery(ctx context.Context, tableInfo *models.TableInfo, query models.Queryable, exclusiveStartKey map[string]types.AttributeValue) (*models.ResultSet, error) + NextPage(ctx context.Context, resultSet *models.ResultSet) (*models.ResultSet, error) } type SettingsProvider interface { diff --git a/internal/dynamo-browse/controllers/scripts.go b/internal/dynamo-browse/controllers/scripts.go index f279cb8..73c9dbf 100644 --- a/internal/dynamo-browse/controllers/scripts.go +++ b/internal/dynamo-browse/controllers/scripts.go @@ -169,12 +169,8 @@ func (s *sessionImpl) SetResultSet(ctx context.Context, newResultSet *models.Res } func (s *sessionImpl) Query(ctx context.Context, query string, opts scriptmanager.QueryOptions) (*models.ResultSet, error) { - currentResultSet := s.sc.tableReadController.state.ResultSet() - if currentResultSet == nil { - // TODO: this should only be used if there's no current table - return nil, errors.New("no table selected") - } + // Parse the query expr, err := queryexpr.Parse(query) if err != nil { return nil, err @@ -187,7 +183,32 @@ func (s *sessionImpl) Query(ctx context.Context, query string, opts scriptmanage expr = expr.WithValueParams(opts.ValuePlaceholders) } - newResultSet, err := s.sc.tableReadController.tableService.ScanOrQuery(context.Background(), currentResultSet.TableInfo, expr) + // Get the table info + var tableInfo *models.TableInfo + + tableName := opts.TableName + currentResultSet := s.sc.tableReadController.state.ResultSet() + + if tableName != "" { + // Table specified. If it's the same as the current table, then use the existing table info + if currentResultSet != nil && currentResultSet.TableInfo.Name == tableName { + tableInfo = currentResultSet.TableInfo + } + + // Otherwise, describe the table + tableInfo, err = s.sc.tableReadController.tableService.Describe(ctx, tableName) + if err != nil { + return nil, errors.Wrapf(err, "cannot describe table '%v'", tableName) + } + } else { + // Table not specified. Use the existing table, if any + if currentResultSet == nil { + return nil, errors.New("no table currently selected") + } + tableInfo = currentResultSet.TableInfo + } + + newResultSet, err := s.sc.tableReadController.tableService.ScanOrQuery(ctx, tableInfo, expr, nil) if err != nil { return nil, err } diff --git a/internal/dynamo-browse/controllers/scripts_test.go b/internal/dynamo-browse/controllers/scripts_test.go index 64026a1..06ee419 100644 --- a/internal/dynamo-browse/controllers/scripts_test.go +++ b/internal/dynamo-browse/controllers/scripts_test.go @@ -67,6 +67,25 @@ func TestScriptController_RunScript(t *testing.T) { assert.Len(t, srv.msgSender.msgs, 1) assert.Equal(t, events.StatusMsg("2"), srv.msgSender.msgs[0]) }) + + t.Run("should run query against another table", func(t *testing.T) { + srv := newService(t, serviceConfig{ + tableName: "alpha-table", + scriptFS: testScriptFile(t, "test.tm", ` + rs := session.query('pk!="abc"', { table: "count-to-30" }).unwrap() + ui.print(rs.length) + `), + }) + + invokeCommand(t, srv.readController.Init()) + msg := srv.scriptController.RunScript("test.tm") + assert.Nil(t, msg) + + srv.msgSender.waitForAtLeastOneMessages(t, 5*time.Second) + + assert.Len(t, srv.msgSender.msgs, 1) + assert.Equal(t, events.StatusMsg("30"), srv.msgSender.msgs[0]) + }) }) t.Run("session.set_result_set", func(t *testing.T) { diff --git a/internal/dynamo-browse/controllers/tableread.go b/internal/dynamo-browse/controllers/tableread.go index ac9875b..1aabff5 100644 --- a/internal/dynamo-browse/controllers/tableread.go +++ b/internal/dynamo-browse/controllers/tableread.go @@ -4,9 +4,11 @@ import ( "bytes" "context" "fmt" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" tea "github.com/charmbracelet/bubbletea" "github.com/lmika/audax/internal/common/ui/events" "github.com/lmika/audax/internal/dynamo-browse/models" + "github.com/lmika/audax/internal/dynamo-browse/models/attrcodec" "github.com/lmika/audax/internal/dynamo-browse/models/queryexpr" "github.com/lmika/audax/internal/dynamo-browse/models/serialisable" "github.com/lmika/audax/internal/dynamo-browse/services/itemrenderer" @@ -28,6 +30,7 @@ const ( resultSetUpdateSnapshotRestore resultSetUpdateRescan resultSetUpdateTouch + resultSetUpdateNextPage resultSetUpdateScript ) @@ -128,7 +131,7 @@ func (c *TableReadController) ScanTable(name string) tea.Msg { } return resultSet, err - }).OnEither(c.handleResultSetFromJobResult(c.state.Filter(), true, resultSetUpdateInit)).Submit() + }).OnEither(c.handleResultSetFromJobResult(c.state.Filter(), true, false, resultSetUpdateInit)).Submit() } func (c *TableReadController) PromptForQuery() tea.Msg { @@ -149,33 +152,39 @@ func (c *TableReadController) PromptForQuery() tea.Msg { } } - return c.runQuery(resultSet.TableInfo, q, "", true) + return c.runQuery(resultSet.TableInfo, q, "", true, nil) }, } } -func (c *TableReadController) runQuery(tableInfo *models.TableInfo, query *queryexpr.QueryExpr, newFilter string, pushSnapshot bool) tea.Msg { +func (c *TableReadController) runQuery( + tableInfo *models.TableInfo, + query *queryexpr.QueryExpr, + newFilter string, + pushSnapshot bool, + exclusiveStartKey map[string]types.AttributeValue, +) tea.Msg { if query == nil { return NewJob(c.jobController, "Scanning…", func(ctx context.Context) (*models.ResultSet, error) { - newResultSet, err := c.tableService.ScanOrQuery(context.Background(), tableInfo, nil) + newResultSet, err := c.tableService.ScanOrQuery(context.Background(), tableInfo, nil, exclusiveStartKey) if newResultSet != nil && newFilter != "" { newResultSet = c.tableService.Filter(newResultSet, newFilter) } return newResultSet, err - }).OnEither(c.handleResultSetFromJobResult(newFilter, pushSnapshot, resultSetUpdateQuery)).Submit() + }).OnEither(c.handleResultSetFromJobResult(newFilter, pushSnapshot, false, resultSetUpdateQuery)).Submit() } return c.doIfNoneDirty(func() tea.Msg { return NewJob(c.jobController, "Running query…", func(ctx context.Context) (*models.ResultSet, error) { - newResultSet, err := c.tableService.ScanOrQuery(context.Background(), tableInfo, query) + newResultSet, err := c.tableService.ScanOrQuery(context.Background(), tableInfo, query, exclusiveStartKey) if newFilter != "" && newResultSet != nil { newResultSet = c.tableService.Filter(newResultSet, newFilter) } return newResultSet, err - }).OnEither(c.handleResultSetFromJobResult(newFilter, pushSnapshot, resultSetUpdateQuery)).Submit() + }).OnEither(c.handleResultSetFromJobResult(newFilter, pushSnapshot, false, resultSetUpdateQuery)).Submit() }) } @@ -210,13 +219,13 @@ func (c *TableReadController) Rescan() tea.Msg { func (c *TableReadController) doScan(resultSet *models.ResultSet, query models.Queryable, pushBackstack bool, op resultSetUpdateOp) tea.Msg { return NewJob(c.jobController, "Rescan…", func(ctx context.Context) (*models.ResultSet, error) { - newResultSet, err := c.tableService.ScanOrQuery(ctx, resultSet.TableInfo, query) + newResultSet, err := c.tableService.ScanOrQuery(ctx, resultSet.TableInfo, query, resultSet.LastEvaluatedKey) if newResultSet != nil { newResultSet = c.tableService.Filter(newResultSet, c.state.Filter()) } return newResultSet, err - }).OnEither(c.handleResultSetFromJobResult(c.state.Filter(), pushBackstack, op)).Submit() + }).OnEither(c.handleResultSetFromJobResult(c.state.Filter(), pushBackstack, false, op)).Submit() } func (c *TableReadController) setResultSetAndFilter(resultSet *models.ResultSet, filter string, pushBackstack bool, op resultSetUpdateOp) tea.Msg { @@ -235,7 +244,16 @@ func (c *TableReadController) setResultSetAndFilter(resultSet *models.ResultSet, } } - log.Printf("pushing to backstack: table = %v, filter = %v, query_hash = %v", details.TableName, details.Filter, details.QueryHash) + if len(resultSet.ExclusiveStartKey) > 0 { + var err error + details.ExclusiveStartKey, err = attrcodec.SerializeMapToBytes(resultSet.ExclusiveStartKey) + if err != nil { + log.Printf("cannot serialize last evaluated key to byte: %v", err) + } + } + + log.Printf("pushing to backstack: table = %v, filter = %v, query_hash = %v", + details.TableName, details.Filter, details.QueryHash) if err := c.workspaceService.PushSnapshot(details); err != nil { log.Printf("cannot push snapshot: %v", err) } @@ -280,14 +298,22 @@ func (c *TableReadController) Filter() tea.Msg { return NewJob(c.jobController, "Applying Filter…", func(ctx context.Context) (*models.ResultSet, error) { newResultSet := c.tableService.Filter(resultSet, value) return newResultSet, nil - }).OnEither(c.handleResultSetFromJobResult(value, true, resultSetUpdateFilter)).Submit() + }).OnEither(c.handleResultSetFromJobResult(value, true, false, resultSetUpdateFilter)).Submit() }, } } -func (c *TableReadController) handleResultSetFromJobResult(filter string, pushbackStack bool, op resultSetUpdateOp) func(newResultSet *models.ResultSet, err error) tea.Msg { +func (c *TableReadController) handleResultSetFromJobResult( + filter string, + pushbackStack, errIfEmpty bool, + op resultSetUpdateOp, +) func(newResultSet *models.ResultSet, err error) tea.Msg { return func(newResultSet *models.ResultSet, err error) tea.Msg { if err == nil { + if errIfEmpty && newResultSet.NoResults() { + return events.StatusMsg("No more results") + } + return c.setResultSetAndFilter(newResultSet, filter, pushbackStack, op) } @@ -336,6 +362,20 @@ func (c *TableReadController) ViewForward() tea.Msg { return c.updateViewToSnapshot(viewSnapshot) } +func (c *TableReadController) NextPage() tea.Msg { + resultSet := c.state.ResultSet() + if resultSet == nil { + return events.StatusMsg("Result-set is nil") + } else if resultSet.LastEvaluatedKey == nil { + return events.StatusMsg("No more results") + } + currentFilter := c.state.filter + + return NewJob(c.jobController, "Fetching next page…", func(ctx context.Context) (*models.ResultSet, error) { + return c.tableService.NextPage(ctx, resultSet) + }).OnEither(c.handleResultSetFromJobResult(currentFilter, true, true, resultSetUpdateNextPage)).Submit() +} + func (c *TableReadController) updateViewToSnapshot(viewSnapshot *serialisable.ViewSnapshot) tea.Msg { var err error currentResultSet := c.state.ResultSet() @@ -348,6 +388,14 @@ func (c *TableReadController) updateViewToSnapshot(viewSnapshot *serialisable.Vi } } + var exclusiveStartKey map[string]types.AttributeValue + if len(viewSnapshot.Details.ExclusiveStartKey) > 0 { + exclusiveStartKey, err = attrcodec.DeseralizedMapFromBytes(viewSnapshot.Details.ExclusiveStartKey) + if err != nil { + return err + } + } + if currentResultSet == nil { return NewJob(c.jobController, "Fetching table info…", func(ctx context.Context) (*models.TableInfo, error) { tableInfo, err := c.tableService.Describe(context.Background(), viewSnapshot.Details.TableName) @@ -356,7 +404,7 @@ func (c *TableReadController) updateViewToSnapshot(viewSnapshot *serialisable.Vi } return tableInfo, nil }).OnDone(func(tableInfo *models.TableInfo) tea.Msg { - return c.runQuery(tableInfo, query, viewSnapshot.Details.Filter, false) + return c.runQuery(tableInfo, query, viewSnapshot.Details.Filter, false, exclusiveStartKey) }).Submit() } @@ -368,7 +416,7 @@ func (c *TableReadController) updateViewToSnapshot(viewSnapshot *serialisable.Vi if viewSnapshot.Details.TableName == currentResultSet.TableInfo.Name && queryEqualsCurrentQuery { return NewJob(c.jobController, "Applying filter…", func(ctx context.Context) (*models.ResultSet, error) { return c.tableService.Filter(currentResultSet, viewSnapshot.Details.Filter), nil - }).OnEither(c.handleResultSetFromJobResult(viewSnapshot.Details.Filter, false, resultSetUpdateSnapshotRestore)).Submit() + }).OnEither(c.handleResultSetFromJobResult(viewSnapshot.Details.Filter, false, false, resultSetUpdateSnapshotRestore)).Submit() } return NewJob(c.jobController, "Running query…", func(ctx context.Context) (tea.Msg, error) { @@ -380,7 +428,7 @@ func (c *TableReadController) updateViewToSnapshot(viewSnapshot *serialisable.Vi } } - return c.runQuery(tableInfo, query, viewSnapshot.Details.Filter, false), nil + return c.runQuery(tableInfo, query, viewSnapshot.Details.Filter, false, exclusiveStartKey), nil }).OnDone(func(m tea.Msg) tea.Msg { return m }).Submit() diff --git a/internal/dynamo-browse/controllers/tableread_test.go b/internal/dynamo-browse/controllers/tableread_test.go index 268ce48..153b06e 100644 --- a/internal/dynamo-browse/controllers/tableread_test.go +++ b/internal/dynamo-browse/controllers/tableread_test.go @@ -2,6 +2,7 @@ package controllers_test import ( "fmt" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" tea "github.com/charmbracelet/bubbletea" "github.com/lmika/audax/internal/common/ui/events" "github.com/lmika/audax/internal/dynamo-browse/controllers" @@ -36,7 +37,7 @@ func TestTableReadController_ListTables(t *testing.T) { event := srv.readController.ListTables(false).(controllers.PromptForTableMsg) - assert.Equal(t, []string{"alpha-table", "bravo-table"}, event.Tables) + assert.Equal(t, []string{"alpha-table", "bravo-table", "count-to-30"}, event.Tables) selectedEvent := event.OnSelected("alpha-table") @@ -109,6 +110,54 @@ func TestTableReadController_Query(t *testing.T) { }) } +func TestTableReadController_NextPage(t *testing.T) { + t.Run("should return successive pages of results", func(t *testing.T) { + scenarios := []struct { + pageLimit int + expectedPageSizes []int + }{ + {pageLimit: 10, expectedPageSizes: []int{10, 10, 10}}, + {pageLimit: 5, expectedPageSizes: []int{5, 5, 5, 5, 5, 5}}, + {pageLimit: 13, expectedPageSizes: []int{13, 13, 4}}, + {pageLimit: 7, expectedPageSizes: []int{7, 7, 7, 7, 2}}, + {pageLimit: 3, expectedPageSizes: []int{3, 3, 3, 3, 3, 3, 3, 3, 3, 3}}, + {pageLimit: 50, expectedPageSizes: []int{30}}, + {pageLimit: 100, expectedPageSizes: []int{30}}, + } + + for _, scenario := range scenarios { + t.Run(fmt.Sprintf("page size = %v", scenario.pageLimit), func(t *testing.T) { + srv := newService(t, serviceConfig{tableName: "count-to-30", defaultLimit: scenario.pageLimit}) + + invokeCommand(t, srv.readController.Init()) + + var currentCount = 1 + + // Go through each page and confirm that the items are correct + for i, pageSize := range scenario.expectedPageSizes { + if i > 0 { + invokeCommand(t, srv.readController.NextPage()) + } + + rs := srv.state.ResultSet() + assert.Len(t, rs.Items(), pageSize) + for _, item := range rs.Items() { + assert.Equal(t, fmt.Sprintf("NUM#%02d", currentCount), item["sk"].(*types.AttributeValueMemberS).Value) + currentCount += 1 + } + } + + // Attempt to get the last page of results, but no more results. This should not + // clear the current page of results + invokeCommand(t, srv.readController.NextPage()) + + rs := srv.state.ResultSet() + assert.Len(t, rs.Items(), scenario.expectedPageSizes[len(scenario.expectedPageSizes)-1]) + }) + } + }) +} + func tempFile(t *testing.T) string { t.Helper() @@ -221,4 +270,22 @@ var testData = []testdynamo.TestData{ }, }, }, + { + TableName: "count-to-30", + Data: sequenceToN(1, 30, func(n int) map[string]any { + return map[string]any{ + "pk": "NUM", + "sk": fmt.Sprintf("NUM#%02d", n), + "num": n, + } + }), + }, +} + +func sequenceToN[T any](from int, to int, fn func(n int) T) []T { + ns := make([]T, 0, to-from+1) + for i := from; i <= to; i++ { + ns = append(ns, fn(i)) + } + return ns } diff --git a/internal/dynamo-browse/controllers/tablewrite_test.go b/internal/dynamo-browse/controllers/tablewrite_test.go index 58b76e6..8faea23 100644 --- a/internal/dynamo-browse/controllers/tablewrite_test.go +++ b/internal/dynamo-browse/controllers/tablewrite_test.go @@ -589,9 +589,10 @@ type services struct { } type serviceConfig struct { - tableName string - isReadOnly bool - scriptFS fs.FS + tableName string + isReadOnly bool + defaultLimit int + scriptFS fs.FS } func newService(t *testing.T, cfg serviceConfig) *services { @@ -626,9 +627,15 @@ func newService(t *testing.T, cfg serviceConfig) *services { t.Errorf("cannot set ro: %v", err) } } + if cfg.defaultLimit != 0 { + if err := settingStore.SetDefaultLimit(cfg.defaultLimit); err != nil { + t.Errorf("cannot set default limit: %v", err) + } + } msgSender := &msgSender{} scriptController.Init() + jobsController.SetMessageSender(msgSender.send) scriptController.SetMessageSender(msgSender.send) // Initting will setup the default script lookup paths, so revert them to the test ones @@ -676,6 +683,15 @@ func (s *msgSender) send(msg tea.Msg) { } } +func (s *msgSender) drain() []tea.Msg { + s.mutex.Lock() + defer s.mutex.Unlock() + + msgs := s.msgs + s.msgs = nil + return msgs +} + func (s *msgSender) waitForAtLeastOneMessages(t *testing.T, d time.Duration) { t.Helper() diff --git a/internal/dynamo-browse/models/attrcodec/utils.go b/internal/dynamo-browse/models/attrcodec/utils.go new file mode 100644 index 0000000..ce89ad0 --- /dev/null +++ b/internal/dynamo-browse/models/attrcodec/utils.go @@ -0,0 +1,30 @@ +package attrcodec + +import ( + "bytes" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/pkg/errors" +) + +func SerializeMapToBytes(ms map[string]types.AttributeValue) ([]byte, error) { + bs := new(bytes.Buffer) + if err := NewEncoder(bs).Encode(&types.AttributeValueMemberM{ + Value: ms, + }); err != nil { + return nil, err + } + return bs.Bytes(), nil +} + +func DeseralizedMapFromBytes(bs []byte) (map[string]types.AttributeValue, error) { + attr, err := NewDecoder(bytes.NewReader(bs)).Decode() + if err != nil { + return nil, err + } + + mapAttr, isMapAttr := attr.(*types.AttributeValueMemberM) + if !isMapAttr { + return nil, errors.New("expected attribute value to be a map") + } + return mapAttr.Value, nil +} diff --git a/internal/dynamo-browse/models/models.go b/internal/dynamo-browse/models/models.go index 258b247..1e9bf16 100644 --- a/internal/dynamo-browse/models/models.go +++ b/internal/dynamo-browse/models/models.go @@ -1,12 +1,20 @@ package models -import "sort" +import ( + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "sort" +) type ResultSet struct { - TableInfo *TableInfo - Query Queryable - items []Item - attributes []ItemAttribute + // Query information + TableInfo *TableInfo + Query Queryable + ExclusiveStartKey map[string]types.AttributeValue + + // Result information + LastEvaluatedKey map[string]types.AttributeValue + items []Item + attributes []ItemAttribute columns []string } @@ -25,6 +33,10 @@ type ItemAttribute struct { New bool } +func (rs *ResultSet) NoResults() bool { + return len(rs.items) == 0 && rs.LastEvaluatedKey == nil +} + func (rs *ResultSet) Items() []Item { return rs.items } diff --git a/internal/dynamo-browse/models/serialisable/viewsnapshot.go b/internal/dynamo-browse/models/serialisable/viewsnapshot.go index a2ea038..610e695 100644 --- a/internal/dynamo-browse/models/serialisable/viewsnapshot.go +++ b/internal/dynamo-browse/models/serialisable/viewsnapshot.go @@ -15,15 +15,17 @@ type ViewSnapshot struct { } type ViewSnapshotDetails struct { - TableName string - Query []byte - QueryHash uint64 - Filter string + TableName string + Query []byte + QueryHash uint64 + Filter string + ExclusiveStartKey []byte } func (d ViewSnapshotDetails) Equals(other ViewSnapshotDetails, compareHashesOnly bool) bool { return d.TableName == other.TableName && d.Filter == other.Filter && + bytes.Equal(d.ExclusiveStartKey, d.ExclusiveStartKey) && d.compareQueries(other, compareHashesOnly) } diff --git a/internal/dynamo-browse/providers/dynamo/provider.go b/internal/dynamo-browse/providers/dynamo/provider.go index ce71c90..f6958b1 100644 --- a/internal/dynamo-browse/providers/dynamo/provider.go +++ b/internal/dynamo-browse/providers/dynamo/provider.go @@ -104,10 +104,20 @@ func (p *Provider) batchPutItems(ctx context.Context, name string, items []model return nil } -func (p *Provider) ScanItems(ctx context.Context, tableName string, filterExpr *expression.Expression, maxItems int) ([]models.Item, error) { +func (p *Provider) ScanItems( + ctx context.Context, + tableName string, + filterExpr *expression.Expression, + exclusiveStartKey map[string]types.AttributeValue, + maxItems int, +) ([]models.Item, map[string]types.AttributeValue, error) { + const maxItemsPerPage = 100 + input := &dynamodb.ScanInput{ TableName: aws.String(tableName), - Limit: aws.Int32(int32(maxItems)), + //Limit: aws.Int32(int32(maxItems)), + //Limit: aws.Int32(100), + //ExclusiveStartKey: exclusiveStartKey, } if filterExpr != nil { input.FilterExpression = filterExpr.Filter() @@ -115,44 +125,59 @@ func (p *Provider) ScanItems(ctx context.Context, tableName string, filterExpr * input.ExpressionAttributeValues = filterExpr.Values() } - paginator := dynamodb.NewScanPaginator(p.client, input, func(opt *dynamodb.ScanPaginatorOptions) { - opt.Limit = 100 - }) + var ( + items = make([]models.Item, 0) + nextUpdate = time.Now().Add(1 * time.Second) + lastEvalKey = exclusiveStartKey + ) - items := make([]models.Item, 0) + for len(items) < maxItems { + remainingItemsToFetch := maxItems - len(items) + if remainingItemsToFetch > maxItemsPerPage { + input.Limit = aws.Int32(maxItemsPerPage) + } else { + input.Limit = aws.Int32(int32(remainingItemsToFetch)) + } + input.ExclusiveStartKey = lastEvalKey - nextUpdate := time.Now().Add(1 * time.Second) - -outer: - for paginator.HasMorePages() { - res, err := paginator.NextPage(ctx) + out, err := p.client.Scan(ctx, input) if err != nil { if ctx.Err() != nil { - return items, models.NewPartialResultsError(ctx.Err()) + return items, nil, models.NewPartialResultsError(ctx.Err()) } - return nil, errors.Wrapf(err, "cannot execute scan on table %v", tableName) + return nil, nil, errors.Wrapf(err, "cannot execute scan on table %v", tableName) } - for _, itm := range res.Items { + for _, itm := range out.Items { items = append(items, itm) - if len(items) >= maxItems { - break outer - } } if time.Now().After(nextUpdate) { jobs.PostUpdate(ctx, fmt.Sprintf("found %d items", len(items))) nextUpdate = time.Now().Add(1 * time.Second) } + + lastEvalKey = out.LastEvaluatedKey + if lastEvalKey == nil { + // We've reached the last page + break + } } - return items, nil + return items, lastEvalKey, nil } -func (p *Provider) QueryItems(ctx context.Context, tableName string, filterExpr *expression.Expression, maxItems int) ([]models.Item, error) { +func (p *Provider) QueryItems( + ctx context.Context, + tableName string, + filterExpr *expression.Expression, + exclusiveStartKey map[string]types.AttributeValue, + maxItems int, +) ([]models.Item, map[string]types.AttributeValue, error) { + const maxItemsPerPage = 100 + input := &dynamodb.QueryInput{ TableName: aws.String(tableName), - Limit: aws.Int32(int32(maxItems)), } if filterExpr != nil { input.KeyConditionExpression = filterExpr.KeyCondition() @@ -161,38 +186,46 @@ func (p *Provider) QueryItems(ctx context.Context, tableName string, filterExpr input.ExpressionAttributeValues = filterExpr.Values() } - paginator := dynamodb.NewQueryPaginator(p.client, input, func(opt *dynamodb.QueryPaginatorOptions) { - opt.Limit = 100 - }) + var ( + items = make([]models.Item, 0) + nextUpdate = time.Now().Add(1 * time.Second) + lastEvalKey = exclusiveStartKey + ) - items := make([]models.Item, 0) + for len(items) < maxItems { + remainingItemsToFetch := maxItems - len(items) + if remainingItemsToFetch > maxItemsPerPage { + input.Limit = aws.Int32(maxItemsPerPage) + } else { + input.Limit = aws.Int32(int32(remainingItemsToFetch)) + } + input.ExclusiveStartKey = lastEvalKey - nextUpdate := time.Now().Add(1 * time.Second) - -outer: - for paginator.HasMorePages() { - res, err := paginator.NextPage(ctx) + out, err := p.client.Query(ctx, input) if err != nil { if ctx.Err() != nil { - return items, models.NewPartialResultsError(ctx.Err()) + return items, nil, models.NewPartialResultsError(ctx.Err()) } - return nil, errors.Wrapf(err, "cannot execute query on table %v", tableName) + return nil, nil, errors.Wrapf(err, "cannot execute scan on table %v", tableName) } - for _, itm := range res.Items { + for _, itm := range out.Items { items = append(items, itm) - if len(items) >= maxItems { - break outer - } } if time.Now().After(nextUpdate) { jobs.PostUpdate(ctx, fmt.Sprintf("found %d items", len(items))) nextUpdate = time.Now().Add(1 * time.Second) } + + lastEvalKey = out.LastEvaluatedKey + if lastEvalKey == nil { + // We've reached the last page + break + } } - return items, nil + return items, lastEvalKey, nil } func (p *Provider) DeleteItem(ctx context.Context, tableName string, key map[string]types.AttributeValue) error { diff --git a/internal/dynamo-browse/providers/dynamo/provider_test.go b/internal/dynamo-browse/providers/dynamo/provider_test.go index 6e044d0..625895d 100644 --- a/internal/dynamo-browse/providers/dynamo/provider_test.go +++ b/internal/dynamo-browse/providers/dynamo/provider_test.go @@ -21,8 +21,9 @@ func TestProvider_ScanItems(t *testing.T) { t.Run("should return scanned items from the table", func(t *testing.T) { ctx := context.Background() - items, err := provider.ScanItems(ctx, tableName, nil, 100) + items, lev, err := provider.ScanItems(ctx, tableName, nil, nil, 100) assert.NoError(t, err) + assert.Nil(t, lev) assert.Len(t, items, 3) assert.Contains(t, items, testdynamo.TestRecordAsItem(t, testData[0].Data[0])) @@ -33,8 +34,9 @@ func TestProvider_ScanItems(t *testing.T) { t.Run("should return error if table name does not exist", func(t *testing.T) { ctx := context.Background() - items, err := provider.ScanItems(ctx, "does-not-exist", nil, 100) + items, lev, err := provider.ScanItems(ctx, "does-not-exist", nil, nil, 100) assert.Error(t, err) + assert.Nil(t, lev) assert.Nil(t, items) }) } @@ -79,8 +81,9 @@ func TestProvider_PutItems(t *testing.T) { assert.NoError(t, err) // Verify the data - readItems, err := provider.ScanItems(ctx, tableName, nil, scenario.maxItems+5) + readItems, lev, err := provider.ScanItems(ctx, tableName, nil, nil, scenario.maxItems+5) assert.NoError(t, err) + assert.Nil(t, lev) assert.Len(t, readItems, scenario.maxItems) for i := 0; i < scenario.maxItems; i++ { @@ -104,8 +107,9 @@ func TestProvider_DeleteItem(t *testing.T) { "sk": &types.AttributeValueMemberS{Value: "222"}, }) - items, err := provider.ScanItems(ctx, tableName, nil, 100) + items, lev, err := provider.ScanItems(ctx, tableName, nil, nil, 100) assert.NoError(t, err) + assert.Nil(t, lev) assert.Len(t, items, 2) assert.Contains(t, items, testdynamo.TestRecordAsItem(t, testData[0].Data[0])) @@ -125,8 +129,9 @@ func TestProvider_DeleteItem(t *testing.T) { "sk": &types.AttributeValueMemberS{Value: "999"}, }) - items, err := provider.ScanItems(ctx, tableName, nil, 100) + items, lev, err := provider.ScanItems(ctx, tableName, nil, nil, 100) assert.NoError(t, err) + assert.Nil(t, lev) assert.Len(t, items, 3) assert.Contains(t, items, testdynamo.TestRecordAsItem(t, testData[0].Data[0])) @@ -140,8 +145,9 @@ func TestProvider_DeleteItem(t *testing.T) { ctx := context.Background() - items, err := provider.ScanItems(ctx, "does-not-exist", nil, 100) + items, lev, err := provider.ScanItems(ctx, "does-not-exist", nil, nil, 100) assert.Error(t, err) + assert.Nil(t, lev) assert.Nil(t, items) }) } diff --git a/internal/dynamo-browse/services/scriptmanager/iface.go b/internal/dynamo-browse/services/scriptmanager/iface.go index 9ab84da..a717057 100644 --- a/internal/dynamo-browse/services/scriptmanager/iface.go +++ b/internal/dynamo-browse/services/scriptmanager/iface.go @@ -31,6 +31,7 @@ type SessionService interface { } type QueryOptions struct { + TableName string NamePlaceholders map[string]string ValuePlaceholders map[string]types.AttributeValue } diff --git a/internal/dynamo-browse/services/scriptmanager/modsession.go b/internal/dynamo-browse/services/scriptmanager/modsession.go index 004d3cb..510946c 100644 --- a/internal/dynamo-browse/services/scriptmanager/modsession.go +++ b/internal/dynamo-browse/services/scriptmanager/modsession.go @@ -32,6 +32,11 @@ func (um *sessionModule) query(ctx context.Context, args ...object.Object) objec return objErr } + // Table name + if val, isVal := objMap.Get("table").(*object.String); isVal && val.Value() != "" { + options.TableName = val.Value() + } + // Placeholders if argsVal, isArgsValMap := objMap.Get("args").(*object.Map); isArgsValMap { options.NamePlaceholders = make(map[string]string) diff --git a/internal/dynamo-browse/services/scriptmanager/modsession_test.go b/internal/dynamo-browse/services/scriptmanager/modsession_test.go index 3e3bcfb..254e800 100644 --- a/internal/dynamo-browse/services/scriptmanager/modsession_test.go +++ b/internal/dynamo-browse/services/scriptmanager/modsession_test.go @@ -79,6 +79,37 @@ func TestModSession_Query(t *testing.T) { mockedSessionService.AssertExpectations(t) }) + t.Run("should successfully specify table name", func(t *testing.T) { + rs := &models.ResultSet{} + + mockedSessionService := mocks.NewSessionService(t) + mockedSessionService.EXPECT().Query(mock.Anything, "some expr", scriptmanager.QueryOptions{ + TableName: "some-table", + }).Return(rs, nil) + + mockedUIService := mocks.NewUIService(t) + + testFS := testScriptFile(t, "test.tm", ` + res := session.query("some expr", { + table: "some-table", + }) + assert(!res.is_err()) + `) + + srv := scriptmanager.New(scriptmanager.WithFS(testFS)) + srv.SetIFaces(scriptmanager.Ifaces{ + UI: mockedUIService, + Session: mockedSessionService, + }) + + ctx := context.Background() + err := <-srv.RunAdHocScript(ctx, "test.tm") + assert.NoError(t, err) + + mockedUIService.AssertExpectations(t) + mockedSessionService.AssertExpectations(t) + }) + t.Run("should set placeholder values", func(t *testing.T) { rs := &models.ResultSet{} diff --git a/internal/dynamo-browse/services/tables/iface.go b/internal/dynamo-browse/services/tables/iface.go index ef44826..0f28797 100644 --- a/internal/dynamo-browse/services/tables/iface.go +++ b/internal/dynamo-browse/services/tables/iface.go @@ -3,7 +3,6 @@ package tables import ( "context" "github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression" - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/lmika/audax/internal/dynamo-browse/models" ) @@ -11,11 +10,24 @@ import ( type TableProvider interface { ListTables(ctx context.Context) ([]string, error) DescribeTable(ctx context.Context, tableName string) (*models.TableInfo, error) - QueryItems(ctx context.Context, tableName string, filterExpr *expression.Expression, maxItems int) ([]models.Item, error) - ScanItems(ctx context.Context, tableName string, filterExpr *expression.Expression, maxItems int) ([]models.Item, error) DeleteItem(ctx context.Context, tableName string, key map[string]types.AttributeValue) error PutItem(ctx context.Context, name string, item models.Item) error PutItems(ctx context.Context, name string, items []models.Item) error + + QueryItems( + ctx context.Context, + tableName string, + filterExpr *expression.Expression, + exclusiveStartKey map[string]types.AttributeValue, + maxItems int, + ) (items []models.Item, lastEvaluatedKey map[string]types.AttributeValue, err error) + ScanItems( + ctx context.Context, + tableName string, + filterExpr *expression.Expression, + exclusiveStartKey map[string]types.AttributeValue, + maxItems int, + ) (item []models.Item, lastEvaluatedKey map[string]types.AttributeValue, err error) } type ConfigProvider interface { diff --git a/internal/dynamo-browse/services/tables/service.go b/internal/dynamo-browse/services/tables/service.go index 1c8c090..154a5ad 100644 --- a/internal/dynamo-browse/services/tables/service.go +++ b/internal/dynamo-browse/services/tables/service.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/lmika/audax/internal/common/sliceutils" "github.com/lmika/audax/internal/dynamo-browse/services/jobs" "log" @@ -35,10 +36,16 @@ func (s *Service) Describe(ctx context.Context, table string) (*models.TableInfo } func (s *Service) Scan(ctx context.Context, tableInfo *models.TableInfo) (*models.ResultSet, error) { - return s.doScan(ctx, tableInfo, nil, s.configProvider.DefaultLimit()) + return s.doScan(ctx, tableInfo, nil, nil, s.configProvider.DefaultLimit()) } -func (s *Service) doScan(ctx context.Context, tableInfo *models.TableInfo, expr models.Queryable, limit int) (*models.ResultSet, error) { +func (s *Service) doScan( + ctx context.Context, + tableInfo *models.TableInfo, + expr models.Queryable, + exclusiveStartKey map[string]types.AttributeValue, + limit int, +) (*models.ResultSet, error) { var ( filterExpr *expression.Expression runAsQuery bool @@ -55,26 +62,31 @@ func (s *Service) doScan(ctx context.Context, tableInfo *models.TableInfo, expr } var results []models.Item + var lastEvalKey map[string]types.AttributeValue if runAsQuery { log.Printf("executing query") - results, err = s.provider.QueryItems(ctx, tableInfo.Name, filterExpr, limit) + results, lastEvalKey, err = s.provider.QueryItems(ctx, tableInfo.Name, filterExpr, exclusiveStartKey, limit) } else { log.Printf("executing scan") - results, err = s.provider.ScanItems(ctx, tableInfo.Name, filterExpr, limit) + results, lastEvalKey, err = s.provider.ScanItems(ctx, tableInfo.Name, filterExpr, exclusiveStartKey, limit) } if err != nil && len(results) == 0 { return &models.ResultSet{ - TableInfo: tableInfo, - Query: expr, + TableInfo: tableInfo, + Query: expr, + ExclusiveStartKey: exclusiveStartKey, + LastEvaluatedKey: lastEvalKey, }, errors.Wrapf(err, "unable to scan table %v", tableInfo.Name) } models.Sort(results, tableInfo) resultSet := &models.ResultSet{ - TableInfo: tableInfo, - Query: expr, + TableInfo: tableInfo, + Query: expr, + ExclusiveStartKey: exclusiveStartKey, + LastEvaluatedKey: lastEvalKey, } resultSet.SetItems(results) resultSet.RefreshColumns() @@ -147,8 +159,12 @@ func (s *Service) Delete(ctx context.Context, tableInfo *models.TableInfo, items return nil } -func (s *Service) ScanOrQuery(ctx context.Context, tableInfo *models.TableInfo, expr models.Queryable) (*models.ResultSet, error) { - return s.doScan(ctx, tableInfo, expr, s.configProvider.DefaultLimit()) +func (s *Service) ScanOrQuery(ctx context.Context, tableInfo *models.TableInfo, expr models.Queryable, exclusiveStartKey map[string]types.AttributeValue) (*models.ResultSet, error) { + return s.doScan(ctx, tableInfo, expr, exclusiveStartKey, s.configProvider.DefaultLimit()) +} + +func (s *Service) NextPage(ctx context.Context, resultSet *models.ResultSet) (*models.ResultSet, error) { + return s.doScan(ctx, resultSet.TableInfo, resultSet.Query, resultSet.LastEvaluatedKey, s.configProvider.DefaultLimit()) } func (s *Service) assertReadWrite() error { diff --git a/internal/dynamo-browse/ui/keybindings/defaults.go b/internal/dynamo-browse/ui/keybindings/defaults.go index 912bee2..639a369 100644 --- a/internal/dynamo-browse/ui/keybindings/defaults.go +++ b/internal/dynamo-browse/ui/keybindings/defaults.go @@ -29,6 +29,7 @@ func Default() *KeyBindings { Rescan: key.NewBinding(key.WithKeys("R"), key.WithHelp("R", "rescan")), PromptForQuery: key.NewBinding(key.WithKeys("?"), key.WithHelp("?", "prompt for query")), PromptForFilter: key.NewBinding(key.WithKeys("/"), key.WithHelp("/", "filter")), + FetchNextPage: key.NewBinding(key.WithKeys(">"), key.WithHelp(">", "fetch next page")), ViewBack: key.NewBinding(key.WithKeys("backspace"), key.WithHelp("backspace", "go back")), ViewForward: key.NewBinding(key.WithKeys("\\"), key.WithHelp("\\", "go forward")), CycleLayoutForward: key.NewBinding(key.WithKeys("w"), key.WithHelp("w", "cycle layout forward")), diff --git a/internal/dynamo-browse/ui/keybindings/keybindings.go b/internal/dynamo-browse/ui/keybindings/keybindings.go index a4ca6f9..26178fd 100644 --- a/internal/dynamo-browse/ui/keybindings/keybindings.go +++ b/internal/dynamo-browse/ui/keybindings/keybindings.go @@ -36,6 +36,7 @@ type ViewKeyBindings struct { PromptForQuery key.Binding `keymap:"prompt-for-query"` PromptForFilter key.Binding `keymap:"prompt-for-filter"` PromptForTable key.Binding `keymap:"prompt-for-table"` + FetchNextPage key.Binding `keymap:"fetch-next-page"` ViewBack key.Binding `keymap:"view-back"` ViewForward key.Binding `keymap:"view-forward"` CycleLayoutForward key.Binding `keymap:"cycle-layout-forward"` diff --git a/internal/dynamo-browse/ui/model.go b/internal/dynamo-browse/ui/model.go index 3fffe9b..2e7a9ab 100644 --- a/internal/dynamo-browse/ui/model.go +++ b/internal/dynamo-browse/ui/model.go @@ -120,6 +120,9 @@ func NewModel( return rc.Mark(markOp) }, + "next-page": func(ctx commandctrl.ExecContext, args []string) tea.Msg { + return rc.NextPage() + }, "delete": commandctrl.NoArgCommand(wc.DeleteMarked), // TEMP @@ -205,6 +208,7 @@ func NewModel( "unmark": cc.Alias("mark", []string{"none"}), "sa": cc.Alias("set-attr", nil), "da": cc.Alias("del-attr", nil), + "np": cc.Alias("next-page", nil), "w": cc.Alias("put", nil), "q": cc.Alias("quit", nil), }, @@ -258,6 +262,8 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { return m, m.tableReadController.PromptForQuery case key.Matches(msg, m.keyMap.PromptForFilter): return m, m.tableReadController.Filter + case key.Matches(msg, m.keyMap.FetchNextPage): + return m, m.tableReadController.NextPage case key.Matches(msg, m.keyMap.ViewBack): return m, m.tableReadController.ViewBack case key.Matches(msg, m.keyMap.ViewForward): diff --git a/test/cmd/load-test-table/main.go b/test/cmd/load-test-table/main.go index 31435fe..80b4272 100644 --- a/test/cmd/load-test-table/main.go +++ b/test/cmd/load-test-table/main.go @@ -35,15 +35,15 @@ func main() { dynamodb.WithEndpointResolver(dynamodb.EndpointResolverFromURL("http://localhost:4566"))) // Other tables - if err := createTable(ctx, dynamoClient, "user-accounts"); err != nil { + if err := createTable(ctx, dynamoClient, "user-accounts", false); err != nil { log.Fatal(err) } - if err := createTable(ctx, dynamoClient, "inventory"); err != nil { + if err := createTable(ctx, dynamoClient, "inventory", true); err != nil { log.Fatal(err) } - if err := createTable(ctx, dynamoClient, tableName); err != nil { + if err := createTable(ctx, dynamoClient, tableName, false); err != nil { log.Fatal(err) } @@ -51,6 +51,10 @@ func main() { Name: tableName, Keys: models.KeyAttribute{PartitionKey: "pk", SortKey: "sk"}, } + inventoryTableInfo := &models.TableInfo{ + Name: "inventory", + Keys: models.KeyAttribute{PartitionKey: "pk", SortKey: "sk"}, + } dynamoProvider := dynamo.NewProvider(dynamoClient) tableService := tables.NewService(dynamoProvider, notROService{}) @@ -87,16 +91,32 @@ func main() { } } + key := gofakeit.UUID() + for i := 0; i < totalItems; i++ { + if err := tableService.Put(ctx, inventoryTableInfo, models.Item{ + "pk": &types.AttributeValueMemberS{Value: key}, + "sk": &types.AttributeValueMemberN{Value: fmt.Sprint(i)}, + "uuid": &types.AttributeValueMemberS{Value: gofakeit.UUID()}, + }); err != nil { + log.Fatalln(err) + } + } + log.Printf("table '%v' created with %v items", tableName, totalItems) } -func createTable(ctx context.Context, dynamoClient *dynamodb.Client, tableName string) error { +func createTable(ctx context.Context, dynamoClient *dynamodb.Client, tableName string, skNumber bool) error { if _, err := dynamoClient.DeleteTable(ctx, &dynamodb.DeleteTableInput{ TableName: aws.String(tableName), }); err != nil { log.Printf("warn: cannot delete table: %v: %v", tableName, err) } + var skType = types.ScalarAttributeTypeS + if skNumber { + skType = types.ScalarAttributeTypeN + } + if _, err := dynamoClient.CreateTable(ctx, &dynamodb.CreateTableInput{ TableName: aws.String(tableName), KeySchema: []types.KeySchemaElement{ @@ -105,7 +125,7 @@ func createTable(ctx context.Context, dynamoClient *dynamodb.Client, tableName s }, AttributeDefinitions: []types.AttributeDefinition{ {AttributeName: aws.String("pk"), AttributeType: types.ScalarAttributeTypeS}, - {AttributeName: aws.String("sk"), AttributeType: types.ScalarAttributeTypeS}, + {AttributeName: aws.String("sk"), AttributeType: skType}, }, ProvisionedThroughput: &types.ProvisionedThroughput{ ReadCapacityUnits: aws.Int64(100),