Added paging and the ability to specify the table in query script method (#44)
* paging: added controller for paging through results * paging: added command and key binding for going to the next page * paging: added the ability to specify the table in the query script method * paging: have got exclusive start key written to backstack
This commit is contained in:
parent
9e658b8619
commit
ad1a77a257
|
@ -2,6 +2,7 @@ package controllers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||||
"github.com/lmika/audax/internal/dynamo-browse/models"
|
"github.com/lmika/audax/internal/dynamo-browse/models"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
)
|
)
|
||||||
|
@ -11,7 +12,8 @@ type TableReadService interface {
|
||||||
Describe(ctx context.Context, table string) (*models.TableInfo, error)
|
Describe(ctx context.Context, table string) (*models.TableInfo, error)
|
||||||
Scan(ctx context.Context, tableInfo *models.TableInfo) (*models.ResultSet, error)
|
Scan(ctx context.Context, tableInfo *models.TableInfo) (*models.ResultSet, error)
|
||||||
Filter(resultSet *models.ResultSet, filter string) *models.ResultSet
|
Filter(resultSet *models.ResultSet, filter string) *models.ResultSet
|
||||||
ScanOrQuery(ctx context.Context, tableInfo *models.TableInfo, query models.Queryable) (*models.ResultSet, error)
|
ScanOrQuery(ctx context.Context, tableInfo *models.TableInfo, query models.Queryable, exclusiveStartKey map[string]types.AttributeValue) (*models.ResultSet, error)
|
||||||
|
NextPage(ctx context.Context, resultSet *models.ResultSet) (*models.ResultSet, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type SettingsProvider interface {
|
type SettingsProvider interface {
|
||||||
|
|
|
@ -169,12 +169,8 @@ func (s *sessionImpl) SetResultSet(ctx context.Context, newResultSet *models.Res
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sessionImpl) Query(ctx context.Context, query string, opts scriptmanager.QueryOptions) (*models.ResultSet, error) {
|
func (s *sessionImpl) Query(ctx context.Context, query string, opts scriptmanager.QueryOptions) (*models.ResultSet, error) {
|
||||||
currentResultSet := s.sc.tableReadController.state.ResultSet()
|
|
||||||
if currentResultSet == nil {
|
|
||||||
// TODO: this should only be used if there's no current table
|
|
||||||
return nil, errors.New("no table selected")
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Parse the query
|
||||||
expr, err := queryexpr.Parse(query)
|
expr, err := queryexpr.Parse(query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -187,7 +183,32 @@ func (s *sessionImpl) Query(ctx context.Context, query string, opts scriptmanage
|
||||||
expr = expr.WithValueParams(opts.ValuePlaceholders)
|
expr = expr.WithValueParams(opts.ValuePlaceholders)
|
||||||
}
|
}
|
||||||
|
|
||||||
newResultSet, err := s.sc.tableReadController.tableService.ScanOrQuery(context.Background(), currentResultSet.TableInfo, expr)
|
// Get the table info
|
||||||
|
var tableInfo *models.TableInfo
|
||||||
|
|
||||||
|
tableName := opts.TableName
|
||||||
|
currentResultSet := s.sc.tableReadController.state.ResultSet()
|
||||||
|
|
||||||
|
if tableName != "" {
|
||||||
|
// Table specified. If it's the same as the current table, then use the existing table info
|
||||||
|
if currentResultSet != nil && currentResultSet.TableInfo.Name == tableName {
|
||||||
|
tableInfo = currentResultSet.TableInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, describe the table
|
||||||
|
tableInfo, err = s.sc.tableReadController.tableService.Describe(ctx, tableName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "cannot describe table '%v'", tableName)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Table not specified. Use the existing table, if any
|
||||||
|
if currentResultSet == nil {
|
||||||
|
return nil, errors.New("no table currently selected")
|
||||||
|
}
|
||||||
|
tableInfo = currentResultSet.TableInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
newResultSet, err := s.sc.tableReadController.tableService.ScanOrQuery(ctx, tableInfo, expr, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,6 +67,25 @@ func TestScriptController_RunScript(t *testing.T) {
|
||||||
assert.Len(t, srv.msgSender.msgs, 1)
|
assert.Len(t, srv.msgSender.msgs, 1)
|
||||||
assert.Equal(t, events.StatusMsg("2"), srv.msgSender.msgs[0])
|
assert.Equal(t, events.StatusMsg("2"), srv.msgSender.msgs[0])
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("should run query against another table", func(t *testing.T) {
|
||||||
|
srv := newService(t, serviceConfig{
|
||||||
|
tableName: "alpha-table",
|
||||||
|
scriptFS: testScriptFile(t, "test.tm", `
|
||||||
|
rs := session.query('pk!="abc"', { table: "count-to-30" }).unwrap()
|
||||||
|
ui.print(rs.length)
|
||||||
|
`),
|
||||||
|
})
|
||||||
|
|
||||||
|
invokeCommand(t, srv.readController.Init())
|
||||||
|
msg := srv.scriptController.RunScript("test.tm")
|
||||||
|
assert.Nil(t, msg)
|
||||||
|
|
||||||
|
srv.msgSender.waitForAtLeastOneMessages(t, 5*time.Second)
|
||||||
|
|
||||||
|
assert.Len(t, srv.msgSender.msgs, 1)
|
||||||
|
assert.Equal(t, events.StatusMsg("30"), srv.msgSender.msgs[0])
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("session.set_result_set", func(t *testing.T) {
|
t.Run("session.set_result_set", func(t *testing.T) {
|
||||||
|
|
|
@ -4,9 +4,11 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||||
tea "github.com/charmbracelet/bubbletea"
|
tea "github.com/charmbracelet/bubbletea"
|
||||||
"github.com/lmika/audax/internal/common/ui/events"
|
"github.com/lmika/audax/internal/common/ui/events"
|
||||||
"github.com/lmika/audax/internal/dynamo-browse/models"
|
"github.com/lmika/audax/internal/dynamo-browse/models"
|
||||||
|
"github.com/lmika/audax/internal/dynamo-browse/models/attrcodec"
|
||||||
"github.com/lmika/audax/internal/dynamo-browse/models/queryexpr"
|
"github.com/lmika/audax/internal/dynamo-browse/models/queryexpr"
|
||||||
"github.com/lmika/audax/internal/dynamo-browse/models/serialisable"
|
"github.com/lmika/audax/internal/dynamo-browse/models/serialisable"
|
||||||
"github.com/lmika/audax/internal/dynamo-browse/services/itemrenderer"
|
"github.com/lmika/audax/internal/dynamo-browse/services/itemrenderer"
|
||||||
|
@ -28,6 +30,7 @@ const (
|
||||||
resultSetUpdateSnapshotRestore
|
resultSetUpdateSnapshotRestore
|
||||||
resultSetUpdateRescan
|
resultSetUpdateRescan
|
||||||
resultSetUpdateTouch
|
resultSetUpdateTouch
|
||||||
|
resultSetUpdateNextPage
|
||||||
resultSetUpdateScript
|
resultSetUpdateScript
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -128,7 +131,7 @@ func (c *TableReadController) ScanTable(name string) tea.Msg {
|
||||||
}
|
}
|
||||||
|
|
||||||
return resultSet, err
|
return resultSet, err
|
||||||
}).OnEither(c.handleResultSetFromJobResult(c.state.Filter(), true, resultSetUpdateInit)).Submit()
|
}).OnEither(c.handleResultSetFromJobResult(c.state.Filter(), true, false, resultSetUpdateInit)).Submit()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TableReadController) PromptForQuery() tea.Msg {
|
func (c *TableReadController) PromptForQuery() tea.Msg {
|
||||||
|
@ -149,33 +152,39 @@ func (c *TableReadController) PromptForQuery() tea.Msg {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.runQuery(resultSet.TableInfo, q, "", true)
|
return c.runQuery(resultSet.TableInfo, q, "", true, nil)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TableReadController) runQuery(tableInfo *models.TableInfo, query *queryexpr.QueryExpr, newFilter string, pushSnapshot bool) tea.Msg {
|
func (c *TableReadController) runQuery(
|
||||||
|
tableInfo *models.TableInfo,
|
||||||
|
query *queryexpr.QueryExpr,
|
||||||
|
newFilter string,
|
||||||
|
pushSnapshot bool,
|
||||||
|
exclusiveStartKey map[string]types.AttributeValue,
|
||||||
|
) tea.Msg {
|
||||||
if query == nil {
|
if query == nil {
|
||||||
return NewJob(c.jobController, "Scanning…", func(ctx context.Context) (*models.ResultSet, error) {
|
return NewJob(c.jobController, "Scanning…", func(ctx context.Context) (*models.ResultSet, error) {
|
||||||
newResultSet, err := c.tableService.ScanOrQuery(context.Background(), tableInfo, nil)
|
newResultSet, err := c.tableService.ScanOrQuery(context.Background(), tableInfo, nil, exclusiveStartKey)
|
||||||
|
|
||||||
if newResultSet != nil && newFilter != "" {
|
if newResultSet != nil && newFilter != "" {
|
||||||
newResultSet = c.tableService.Filter(newResultSet, newFilter)
|
newResultSet = c.tableService.Filter(newResultSet, newFilter)
|
||||||
}
|
}
|
||||||
|
|
||||||
return newResultSet, err
|
return newResultSet, err
|
||||||
}).OnEither(c.handleResultSetFromJobResult(newFilter, pushSnapshot, resultSetUpdateQuery)).Submit()
|
}).OnEither(c.handleResultSetFromJobResult(newFilter, pushSnapshot, false, resultSetUpdateQuery)).Submit()
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.doIfNoneDirty(func() tea.Msg {
|
return c.doIfNoneDirty(func() tea.Msg {
|
||||||
return NewJob(c.jobController, "Running query…", func(ctx context.Context) (*models.ResultSet, error) {
|
return NewJob(c.jobController, "Running query…", func(ctx context.Context) (*models.ResultSet, error) {
|
||||||
newResultSet, err := c.tableService.ScanOrQuery(context.Background(), tableInfo, query)
|
newResultSet, err := c.tableService.ScanOrQuery(context.Background(), tableInfo, query, exclusiveStartKey)
|
||||||
|
|
||||||
if newFilter != "" && newResultSet != nil {
|
if newFilter != "" && newResultSet != nil {
|
||||||
newResultSet = c.tableService.Filter(newResultSet, newFilter)
|
newResultSet = c.tableService.Filter(newResultSet, newFilter)
|
||||||
}
|
}
|
||||||
return newResultSet, err
|
return newResultSet, err
|
||||||
}).OnEither(c.handleResultSetFromJobResult(newFilter, pushSnapshot, resultSetUpdateQuery)).Submit()
|
}).OnEither(c.handleResultSetFromJobResult(newFilter, pushSnapshot, false, resultSetUpdateQuery)).Submit()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -210,13 +219,13 @@ func (c *TableReadController) Rescan() tea.Msg {
|
||||||
|
|
||||||
func (c *TableReadController) doScan(resultSet *models.ResultSet, query models.Queryable, pushBackstack bool, op resultSetUpdateOp) tea.Msg {
|
func (c *TableReadController) doScan(resultSet *models.ResultSet, query models.Queryable, pushBackstack bool, op resultSetUpdateOp) tea.Msg {
|
||||||
return NewJob(c.jobController, "Rescan…", func(ctx context.Context) (*models.ResultSet, error) {
|
return NewJob(c.jobController, "Rescan…", func(ctx context.Context) (*models.ResultSet, error) {
|
||||||
newResultSet, err := c.tableService.ScanOrQuery(ctx, resultSet.TableInfo, query)
|
newResultSet, err := c.tableService.ScanOrQuery(ctx, resultSet.TableInfo, query, resultSet.LastEvaluatedKey)
|
||||||
if newResultSet != nil {
|
if newResultSet != nil {
|
||||||
newResultSet = c.tableService.Filter(newResultSet, c.state.Filter())
|
newResultSet = c.tableService.Filter(newResultSet, c.state.Filter())
|
||||||
}
|
}
|
||||||
|
|
||||||
return newResultSet, err
|
return newResultSet, err
|
||||||
}).OnEither(c.handleResultSetFromJobResult(c.state.Filter(), pushBackstack, op)).Submit()
|
}).OnEither(c.handleResultSetFromJobResult(c.state.Filter(), pushBackstack, false, op)).Submit()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TableReadController) setResultSetAndFilter(resultSet *models.ResultSet, filter string, pushBackstack bool, op resultSetUpdateOp) tea.Msg {
|
func (c *TableReadController) setResultSetAndFilter(resultSet *models.ResultSet, filter string, pushBackstack bool, op resultSetUpdateOp) tea.Msg {
|
||||||
|
@ -235,7 +244,16 @@ func (c *TableReadController) setResultSetAndFilter(resultSet *models.ResultSet,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("pushing to backstack: table = %v, filter = %v, query_hash = %v", details.TableName, details.Filter, details.QueryHash)
|
if len(resultSet.ExclusiveStartKey) > 0 {
|
||||||
|
var err error
|
||||||
|
details.ExclusiveStartKey, err = attrcodec.SerializeMapToBytes(resultSet.ExclusiveStartKey)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("cannot serialize last evaluated key to byte: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("pushing to backstack: table = %v, filter = %v, query_hash = %v",
|
||||||
|
details.TableName, details.Filter, details.QueryHash)
|
||||||
if err := c.workspaceService.PushSnapshot(details); err != nil {
|
if err := c.workspaceService.PushSnapshot(details); err != nil {
|
||||||
log.Printf("cannot push snapshot: %v", err)
|
log.Printf("cannot push snapshot: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -280,14 +298,22 @@ func (c *TableReadController) Filter() tea.Msg {
|
||||||
return NewJob(c.jobController, "Applying Filter…", func(ctx context.Context) (*models.ResultSet, error) {
|
return NewJob(c.jobController, "Applying Filter…", func(ctx context.Context) (*models.ResultSet, error) {
|
||||||
newResultSet := c.tableService.Filter(resultSet, value)
|
newResultSet := c.tableService.Filter(resultSet, value)
|
||||||
return newResultSet, nil
|
return newResultSet, nil
|
||||||
}).OnEither(c.handleResultSetFromJobResult(value, true, resultSetUpdateFilter)).Submit()
|
}).OnEither(c.handleResultSetFromJobResult(value, true, false, resultSetUpdateFilter)).Submit()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TableReadController) handleResultSetFromJobResult(filter string, pushbackStack bool, op resultSetUpdateOp) func(newResultSet *models.ResultSet, err error) tea.Msg {
|
func (c *TableReadController) handleResultSetFromJobResult(
|
||||||
|
filter string,
|
||||||
|
pushbackStack, errIfEmpty bool,
|
||||||
|
op resultSetUpdateOp,
|
||||||
|
) func(newResultSet *models.ResultSet, err error) tea.Msg {
|
||||||
return func(newResultSet *models.ResultSet, err error) tea.Msg {
|
return func(newResultSet *models.ResultSet, err error) tea.Msg {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
if errIfEmpty && newResultSet.NoResults() {
|
||||||
|
return events.StatusMsg("No more results")
|
||||||
|
}
|
||||||
|
|
||||||
return c.setResultSetAndFilter(newResultSet, filter, pushbackStack, op)
|
return c.setResultSetAndFilter(newResultSet, filter, pushbackStack, op)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -336,6 +362,20 @@ func (c *TableReadController) ViewForward() tea.Msg {
|
||||||
return c.updateViewToSnapshot(viewSnapshot)
|
return c.updateViewToSnapshot(viewSnapshot)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *TableReadController) NextPage() tea.Msg {
|
||||||
|
resultSet := c.state.ResultSet()
|
||||||
|
if resultSet == nil {
|
||||||
|
return events.StatusMsg("Result-set is nil")
|
||||||
|
} else if resultSet.LastEvaluatedKey == nil {
|
||||||
|
return events.StatusMsg("No more results")
|
||||||
|
}
|
||||||
|
currentFilter := c.state.filter
|
||||||
|
|
||||||
|
return NewJob(c.jobController, "Fetching next page…", func(ctx context.Context) (*models.ResultSet, error) {
|
||||||
|
return c.tableService.NextPage(ctx, resultSet)
|
||||||
|
}).OnEither(c.handleResultSetFromJobResult(currentFilter, true, true, resultSetUpdateNextPage)).Submit()
|
||||||
|
}
|
||||||
|
|
||||||
func (c *TableReadController) updateViewToSnapshot(viewSnapshot *serialisable.ViewSnapshot) tea.Msg {
|
func (c *TableReadController) updateViewToSnapshot(viewSnapshot *serialisable.ViewSnapshot) tea.Msg {
|
||||||
var err error
|
var err error
|
||||||
currentResultSet := c.state.ResultSet()
|
currentResultSet := c.state.ResultSet()
|
||||||
|
@ -348,6 +388,14 @@ func (c *TableReadController) updateViewToSnapshot(viewSnapshot *serialisable.Vi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var exclusiveStartKey map[string]types.AttributeValue
|
||||||
|
if len(viewSnapshot.Details.ExclusiveStartKey) > 0 {
|
||||||
|
exclusiveStartKey, err = attrcodec.DeseralizedMapFromBytes(viewSnapshot.Details.ExclusiveStartKey)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if currentResultSet == nil {
|
if currentResultSet == nil {
|
||||||
return NewJob(c.jobController, "Fetching table info…", func(ctx context.Context) (*models.TableInfo, error) {
|
return NewJob(c.jobController, "Fetching table info…", func(ctx context.Context) (*models.TableInfo, error) {
|
||||||
tableInfo, err := c.tableService.Describe(context.Background(), viewSnapshot.Details.TableName)
|
tableInfo, err := c.tableService.Describe(context.Background(), viewSnapshot.Details.TableName)
|
||||||
|
@ -356,7 +404,7 @@ func (c *TableReadController) updateViewToSnapshot(viewSnapshot *serialisable.Vi
|
||||||
}
|
}
|
||||||
return tableInfo, nil
|
return tableInfo, nil
|
||||||
}).OnDone(func(tableInfo *models.TableInfo) tea.Msg {
|
}).OnDone(func(tableInfo *models.TableInfo) tea.Msg {
|
||||||
return c.runQuery(tableInfo, query, viewSnapshot.Details.Filter, false)
|
return c.runQuery(tableInfo, query, viewSnapshot.Details.Filter, false, exclusiveStartKey)
|
||||||
}).Submit()
|
}).Submit()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -368,7 +416,7 @@ func (c *TableReadController) updateViewToSnapshot(viewSnapshot *serialisable.Vi
|
||||||
if viewSnapshot.Details.TableName == currentResultSet.TableInfo.Name && queryEqualsCurrentQuery {
|
if viewSnapshot.Details.TableName == currentResultSet.TableInfo.Name && queryEqualsCurrentQuery {
|
||||||
return NewJob(c.jobController, "Applying filter…", func(ctx context.Context) (*models.ResultSet, error) {
|
return NewJob(c.jobController, "Applying filter…", func(ctx context.Context) (*models.ResultSet, error) {
|
||||||
return c.tableService.Filter(currentResultSet, viewSnapshot.Details.Filter), nil
|
return c.tableService.Filter(currentResultSet, viewSnapshot.Details.Filter), nil
|
||||||
}).OnEither(c.handleResultSetFromJobResult(viewSnapshot.Details.Filter, false, resultSetUpdateSnapshotRestore)).Submit()
|
}).OnEither(c.handleResultSetFromJobResult(viewSnapshot.Details.Filter, false, false, resultSetUpdateSnapshotRestore)).Submit()
|
||||||
}
|
}
|
||||||
|
|
||||||
return NewJob(c.jobController, "Running query…", func(ctx context.Context) (tea.Msg, error) {
|
return NewJob(c.jobController, "Running query…", func(ctx context.Context) (tea.Msg, error) {
|
||||||
|
@ -380,7 +428,7 @@ func (c *TableReadController) updateViewToSnapshot(viewSnapshot *serialisable.Vi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.runQuery(tableInfo, query, viewSnapshot.Details.Filter, false), nil
|
return c.runQuery(tableInfo, query, viewSnapshot.Details.Filter, false, exclusiveStartKey), nil
|
||||||
}).OnDone(func(m tea.Msg) tea.Msg {
|
}).OnDone(func(m tea.Msg) tea.Msg {
|
||||||
return m
|
return m
|
||||||
}).Submit()
|
}).Submit()
|
||||||
|
|
|
@ -2,6 +2,7 @@ package controllers_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||||
tea "github.com/charmbracelet/bubbletea"
|
tea "github.com/charmbracelet/bubbletea"
|
||||||
"github.com/lmika/audax/internal/common/ui/events"
|
"github.com/lmika/audax/internal/common/ui/events"
|
||||||
"github.com/lmika/audax/internal/dynamo-browse/controllers"
|
"github.com/lmika/audax/internal/dynamo-browse/controllers"
|
||||||
|
@ -36,7 +37,7 @@ func TestTableReadController_ListTables(t *testing.T) {
|
||||||
|
|
||||||
event := srv.readController.ListTables(false).(controllers.PromptForTableMsg)
|
event := srv.readController.ListTables(false).(controllers.PromptForTableMsg)
|
||||||
|
|
||||||
assert.Equal(t, []string{"alpha-table", "bravo-table"}, event.Tables)
|
assert.Equal(t, []string{"alpha-table", "bravo-table", "count-to-30"}, event.Tables)
|
||||||
|
|
||||||
selectedEvent := event.OnSelected("alpha-table")
|
selectedEvent := event.OnSelected("alpha-table")
|
||||||
|
|
||||||
|
@ -109,6 +110,54 @@ func TestTableReadController_Query(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTableReadController_NextPage(t *testing.T) {
|
||||||
|
t.Run("should return successive pages of results", func(t *testing.T) {
|
||||||
|
scenarios := []struct {
|
||||||
|
pageLimit int
|
||||||
|
expectedPageSizes []int
|
||||||
|
}{
|
||||||
|
{pageLimit: 10, expectedPageSizes: []int{10, 10, 10}},
|
||||||
|
{pageLimit: 5, expectedPageSizes: []int{5, 5, 5, 5, 5, 5}},
|
||||||
|
{pageLimit: 13, expectedPageSizes: []int{13, 13, 4}},
|
||||||
|
{pageLimit: 7, expectedPageSizes: []int{7, 7, 7, 7, 2}},
|
||||||
|
{pageLimit: 3, expectedPageSizes: []int{3, 3, 3, 3, 3, 3, 3, 3, 3, 3}},
|
||||||
|
{pageLimit: 50, expectedPageSizes: []int{30}},
|
||||||
|
{pageLimit: 100, expectedPageSizes: []int{30}},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, scenario := range scenarios {
|
||||||
|
t.Run(fmt.Sprintf("page size = %v", scenario.pageLimit), func(t *testing.T) {
|
||||||
|
srv := newService(t, serviceConfig{tableName: "count-to-30", defaultLimit: scenario.pageLimit})
|
||||||
|
|
||||||
|
invokeCommand(t, srv.readController.Init())
|
||||||
|
|
||||||
|
var currentCount = 1
|
||||||
|
|
||||||
|
// Go through each page and confirm that the items are correct
|
||||||
|
for i, pageSize := range scenario.expectedPageSizes {
|
||||||
|
if i > 0 {
|
||||||
|
invokeCommand(t, srv.readController.NextPage())
|
||||||
|
}
|
||||||
|
|
||||||
|
rs := srv.state.ResultSet()
|
||||||
|
assert.Len(t, rs.Items(), pageSize)
|
||||||
|
for _, item := range rs.Items() {
|
||||||
|
assert.Equal(t, fmt.Sprintf("NUM#%02d", currentCount), item["sk"].(*types.AttributeValueMemberS).Value)
|
||||||
|
currentCount += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to get the last page of results, but no more results. This should not
|
||||||
|
// clear the current page of results
|
||||||
|
invokeCommand(t, srv.readController.NextPage())
|
||||||
|
|
||||||
|
rs := srv.state.ResultSet()
|
||||||
|
assert.Len(t, rs.Items(), scenario.expectedPageSizes[len(scenario.expectedPageSizes)-1])
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func tempFile(t *testing.T) string {
|
func tempFile(t *testing.T) string {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
|
@ -221,4 +270,22 @@ var testData = []testdynamo.TestData{
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
TableName: "count-to-30",
|
||||||
|
Data: sequenceToN(1, 30, func(n int) map[string]any {
|
||||||
|
return map[string]any{
|
||||||
|
"pk": "NUM",
|
||||||
|
"sk": fmt.Sprintf("NUM#%02d", n),
|
||||||
|
"num": n,
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
func sequenceToN[T any](from int, to int, fn func(n int) T) []T {
|
||||||
|
ns := make([]T, 0, to-from+1)
|
||||||
|
for i := from; i <= to; i++ {
|
||||||
|
ns = append(ns, fn(i))
|
||||||
|
}
|
||||||
|
return ns
|
||||||
}
|
}
|
||||||
|
|
|
@ -591,6 +591,7 @@ type services struct {
|
||||||
type serviceConfig struct {
|
type serviceConfig struct {
|
||||||
tableName string
|
tableName string
|
||||||
isReadOnly bool
|
isReadOnly bool
|
||||||
|
defaultLimit int
|
||||||
scriptFS fs.FS
|
scriptFS fs.FS
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -626,9 +627,15 @@ func newService(t *testing.T, cfg serviceConfig) *services {
|
||||||
t.Errorf("cannot set ro: %v", err)
|
t.Errorf("cannot set ro: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if cfg.defaultLimit != 0 {
|
||||||
|
if err := settingStore.SetDefaultLimit(cfg.defaultLimit); err != nil {
|
||||||
|
t.Errorf("cannot set default limit: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
msgSender := &msgSender{}
|
msgSender := &msgSender{}
|
||||||
scriptController.Init()
|
scriptController.Init()
|
||||||
|
jobsController.SetMessageSender(msgSender.send)
|
||||||
scriptController.SetMessageSender(msgSender.send)
|
scriptController.SetMessageSender(msgSender.send)
|
||||||
|
|
||||||
// Initting will setup the default script lookup paths, so revert them to the test ones
|
// Initting will setup the default script lookup paths, so revert them to the test ones
|
||||||
|
@ -676,6 +683,15 @@ func (s *msgSender) send(msg tea.Msg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *msgSender) drain() []tea.Msg {
|
||||||
|
s.mutex.Lock()
|
||||||
|
defer s.mutex.Unlock()
|
||||||
|
|
||||||
|
msgs := s.msgs
|
||||||
|
s.msgs = nil
|
||||||
|
return msgs
|
||||||
|
}
|
||||||
|
|
||||||
func (s *msgSender) waitForAtLeastOneMessages(t *testing.T, d time.Duration) {
|
func (s *msgSender) waitForAtLeastOneMessages(t *testing.T, d time.Duration) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
|
|
30
internal/dynamo-browse/models/attrcodec/utils.go
Normal file
30
internal/dynamo-browse/models/attrcodec/utils.go
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
package attrcodec
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func SerializeMapToBytes(ms map[string]types.AttributeValue) ([]byte, error) {
|
||||||
|
bs := new(bytes.Buffer)
|
||||||
|
if err := NewEncoder(bs).Encode(&types.AttributeValueMemberM{
|
||||||
|
Value: ms,
|
||||||
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return bs.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func DeseralizedMapFromBytes(bs []byte) (map[string]types.AttributeValue, error) {
|
||||||
|
attr, err := NewDecoder(bytes.NewReader(bs)).Decode()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
mapAttr, isMapAttr := attr.(*types.AttributeValueMemberM)
|
||||||
|
if !isMapAttr {
|
||||||
|
return nil, errors.New("expected attribute value to be a map")
|
||||||
|
}
|
||||||
|
return mapAttr.Value, nil
|
||||||
|
}
|
|
@ -1,10 +1,18 @@
|
||||||
package models
|
package models
|
||||||
|
|
||||||
import "sort"
|
import (
|
||||||
|
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||||
|
"sort"
|
||||||
|
)
|
||||||
|
|
||||||
type ResultSet struct {
|
type ResultSet struct {
|
||||||
|
// Query information
|
||||||
TableInfo *TableInfo
|
TableInfo *TableInfo
|
||||||
Query Queryable
|
Query Queryable
|
||||||
|
ExclusiveStartKey map[string]types.AttributeValue
|
||||||
|
|
||||||
|
// Result information
|
||||||
|
LastEvaluatedKey map[string]types.AttributeValue
|
||||||
items []Item
|
items []Item
|
||||||
attributes []ItemAttribute
|
attributes []ItemAttribute
|
||||||
|
|
||||||
|
@ -25,6 +33,10 @@ type ItemAttribute struct {
|
||||||
New bool
|
New bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rs *ResultSet) NoResults() bool {
|
||||||
|
return len(rs.items) == 0 && rs.LastEvaluatedKey == nil
|
||||||
|
}
|
||||||
|
|
||||||
func (rs *ResultSet) Items() []Item {
|
func (rs *ResultSet) Items() []Item {
|
||||||
return rs.items
|
return rs.items
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,11 +19,13 @@ type ViewSnapshotDetails struct {
|
||||||
Query []byte
|
Query []byte
|
||||||
QueryHash uint64
|
QueryHash uint64
|
||||||
Filter string
|
Filter string
|
||||||
|
ExclusiveStartKey []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d ViewSnapshotDetails) Equals(other ViewSnapshotDetails, compareHashesOnly bool) bool {
|
func (d ViewSnapshotDetails) Equals(other ViewSnapshotDetails, compareHashesOnly bool) bool {
|
||||||
return d.TableName == other.TableName &&
|
return d.TableName == other.TableName &&
|
||||||
d.Filter == other.Filter &&
|
d.Filter == other.Filter &&
|
||||||
|
bytes.Equal(d.ExclusiveStartKey, d.ExclusiveStartKey) &&
|
||||||
d.compareQueries(other, compareHashesOnly)
|
d.compareQueries(other, compareHashesOnly)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -104,10 +104,20 @@ func (p *Provider) batchPutItems(ctx context.Context, name string, items []model
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) ScanItems(ctx context.Context, tableName string, filterExpr *expression.Expression, maxItems int) ([]models.Item, error) {
|
func (p *Provider) ScanItems(
|
||||||
|
ctx context.Context,
|
||||||
|
tableName string,
|
||||||
|
filterExpr *expression.Expression,
|
||||||
|
exclusiveStartKey map[string]types.AttributeValue,
|
||||||
|
maxItems int,
|
||||||
|
) ([]models.Item, map[string]types.AttributeValue, error) {
|
||||||
|
const maxItemsPerPage = 100
|
||||||
|
|
||||||
input := &dynamodb.ScanInput{
|
input := &dynamodb.ScanInput{
|
||||||
TableName: aws.String(tableName),
|
TableName: aws.String(tableName),
|
||||||
Limit: aws.Int32(int32(maxItems)),
|
//Limit: aws.Int32(int32(maxItems)),
|
||||||
|
//Limit: aws.Int32(100),
|
||||||
|
//ExclusiveStartKey: exclusiveStartKey,
|
||||||
}
|
}
|
||||||
if filterExpr != nil {
|
if filterExpr != nil {
|
||||||
input.FilterExpression = filterExpr.Filter()
|
input.FilterExpression = filterExpr.Filter()
|
||||||
|
@ -115,44 +125,59 @@ func (p *Provider) ScanItems(ctx context.Context, tableName string, filterExpr *
|
||||||
input.ExpressionAttributeValues = filterExpr.Values()
|
input.ExpressionAttributeValues = filterExpr.Values()
|
||||||
}
|
}
|
||||||
|
|
||||||
paginator := dynamodb.NewScanPaginator(p.client, input, func(opt *dynamodb.ScanPaginatorOptions) {
|
var (
|
||||||
opt.Limit = 100
|
items = make([]models.Item, 0)
|
||||||
})
|
nextUpdate = time.Now().Add(1 * time.Second)
|
||||||
|
lastEvalKey = exclusiveStartKey
|
||||||
|
)
|
||||||
|
|
||||||
items := make([]models.Item, 0)
|
for len(items) < maxItems {
|
||||||
|
remainingItemsToFetch := maxItems - len(items)
|
||||||
|
if remainingItemsToFetch > maxItemsPerPage {
|
||||||
|
input.Limit = aws.Int32(maxItemsPerPage)
|
||||||
|
} else {
|
||||||
|
input.Limit = aws.Int32(int32(remainingItemsToFetch))
|
||||||
|
}
|
||||||
|
input.ExclusiveStartKey = lastEvalKey
|
||||||
|
|
||||||
nextUpdate := time.Now().Add(1 * time.Second)
|
out, err := p.client.Scan(ctx, input)
|
||||||
|
|
||||||
outer:
|
|
||||||
for paginator.HasMorePages() {
|
|
||||||
res, err := paginator.NextPage(ctx)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
return items, models.NewPartialResultsError(ctx.Err())
|
return items, nil, models.NewPartialResultsError(ctx.Err())
|
||||||
}
|
}
|
||||||
return nil, errors.Wrapf(err, "cannot execute scan on table %v", tableName)
|
return nil, nil, errors.Wrapf(err, "cannot execute scan on table %v", tableName)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, itm := range res.Items {
|
for _, itm := range out.Items {
|
||||||
items = append(items, itm)
|
items = append(items, itm)
|
||||||
if len(items) >= maxItems {
|
|
||||||
break outer
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if time.Now().After(nextUpdate) {
|
if time.Now().After(nextUpdate) {
|
||||||
jobs.PostUpdate(ctx, fmt.Sprintf("found %d items", len(items)))
|
jobs.PostUpdate(ctx, fmt.Sprintf("found %d items", len(items)))
|
||||||
nextUpdate = time.Now().Add(1 * time.Second)
|
nextUpdate = time.Now().Add(1 * time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lastEvalKey = out.LastEvaluatedKey
|
||||||
|
if lastEvalKey == nil {
|
||||||
|
// We've reached the last page
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return items, nil
|
return items, lastEvalKey, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) QueryItems(ctx context.Context, tableName string, filterExpr *expression.Expression, maxItems int) ([]models.Item, error) {
|
func (p *Provider) QueryItems(
|
||||||
|
ctx context.Context,
|
||||||
|
tableName string,
|
||||||
|
filterExpr *expression.Expression,
|
||||||
|
exclusiveStartKey map[string]types.AttributeValue,
|
||||||
|
maxItems int,
|
||||||
|
) ([]models.Item, map[string]types.AttributeValue, error) {
|
||||||
|
const maxItemsPerPage = 100
|
||||||
|
|
||||||
input := &dynamodb.QueryInput{
|
input := &dynamodb.QueryInput{
|
||||||
TableName: aws.String(tableName),
|
TableName: aws.String(tableName),
|
||||||
Limit: aws.Int32(int32(maxItems)),
|
|
||||||
}
|
}
|
||||||
if filterExpr != nil {
|
if filterExpr != nil {
|
||||||
input.KeyConditionExpression = filterExpr.KeyCondition()
|
input.KeyConditionExpression = filterExpr.KeyCondition()
|
||||||
|
@ -161,38 +186,46 @@ func (p *Provider) QueryItems(ctx context.Context, tableName string, filterExpr
|
||||||
input.ExpressionAttributeValues = filterExpr.Values()
|
input.ExpressionAttributeValues = filterExpr.Values()
|
||||||
}
|
}
|
||||||
|
|
||||||
paginator := dynamodb.NewQueryPaginator(p.client, input, func(opt *dynamodb.QueryPaginatorOptions) {
|
var (
|
||||||
opt.Limit = 100
|
items = make([]models.Item, 0)
|
||||||
})
|
nextUpdate = time.Now().Add(1 * time.Second)
|
||||||
|
lastEvalKey = exclusiveStartKey
|
||||||
|
)
|
||||||
|
|
||||||
items := make([]models.Item, 0)
|
for len(items) < maxItems {
|
||||||
|
remainingItemsToFetch := maxItems - len(items)
|
||||||
|
if remainingItemsToFetch > maxItemsPerPage {
|
||||||
|
input.Limit = aws.Int32(maxItemsPerPage)
|
||||||
|
} else {
|
||||||
|
input.Limit = aws.Int32(int32(remainingItemsToFetch))
|
||||||
|
}
|
||||||
|
input.ExclusiveStartKey = lastEvalKey
|
||||||
|
|
||||||
nextUpdate := time.Now().Add(1 * time.Second)
|
out, err := p.client.Query(ctx, input)
|
||||||
|
|
||||||
outer:
|
|
||||||
for paginator.HasMorePages() {
|
|
||||||
res, err := paginator.NextPage(ctx)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
return items, models.NewPartialResultsError(ctx.Err())
|
return items, nil, models.NewPartialResultsError(ctx.Err())
|
||||||
}
|
}
|
||||||
return nil, errors.Wrapf(err, "cannot execute query on table %v", tableName)
|
return nil, nil, errors.Wrapf(err, "cannot execute scan on table %v", tableName)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, itm := range res.Items {
|
for _, itm := range out.Items {
|
||||||
items = append(items, itm)
|
items = append(items, itm)
|
||||||
if len(items) >= maxItems {
|
|
||||||
break outer
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if time.Now().After(nextUpdate) {
|
if time.Now().After(nextUpdate) {
|
||||||
jobs.PostUpdate(ctx, fmt.Sprintf("found %d items", len(items)))
|
jobs.PostUpdate(ctx, fmt.Sprintf("found %d items", len(items)))
|
||||||
nextUpdate = time.Now().Add(1 * time.Second)
|
nextUpdate = time.Now().Add(1 * time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lastEvalKey = out.LastEvaluatedKey
|
||||||
|
if lastEvalKey == nil {
|
||||||
|
// We've reached the last page
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return items, nil
|
return items, lastEvalKey, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) DeleteItem(ctx context.Context, tableName string, key map[string]types.AttributeValue) error {
|
func (p *Provider) DeleteItem(ctx context.Context, tableName string, key map[string]types.AttributeValue) error {
|
||||||
|
|
|
@ -21,8 +21,9 @@ func TestProvider_ScanItems(t *testing.T) {
|
||||||
t.Run("should return scanned items from the table", func(t *testing.T) {
|
t.Run("should return scanned items from the table", func(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
items, err := provider.ScanItems(ctx, tableName, nil, 100)
|
items, lev, err := provider.ScanItems(ctx, tableName, nil, nil, 100)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
assert.Nil(t, lev)
|
||||||
assert.Len(t, items, 3)
|
assert.Len(t, items, 3)
|
||||||
|
|
||||||
assert.Contains(t, items, testdynamo.TestRecordAsItem(t, testData[0].Data[0]))
|
assert.Contains(t, items, testdynamo.TestRecordAsItem(t, testData[0].Data[0]))
|
||||||
|
@ -33,8 +34,9 @@ func TestProvider_ScanItems(t *testing.T) {
|
||||||
t.Run("should return error if table name does not exist", func(t *testing.T) {
|
t.Run("should return error if table name does not exist", func(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
items, err := provider.ScanItems(ctx, "does-not-exist", nil, 100)
|
items, lev, err := provider.ScanItems(ctx, "does-not-exist", nil, nil, 100)
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
|
assert.Nil(t, lev)
|
||||||
assert.Nil(t, items)
|
assert.Nil(t, items)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -79,8 +81,9 @@ func TestProvider_PutItems(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Verify the data
|
// Verify the data
|
||||||
readItems, err := provider.ScanItems(ctx, tableName, nil, scenario.maxItems+5)
|
readItems, lev, err := provider.ScanItems(ctx, tableName, nil, nil, scenario.maxItems+5)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
assert.Nil(t, lev)
|
||||||
assert.Len(t, readItems, scenario.maxItems)
|
assert.Len(t, readItems, scenario.maxItems)
|
||||||
|
|
||||||
for i := 0; i < scenario.maxItems; i++ {
|
for i := 0; i < scenario.maxItems; i++ {
|
||||||
|
@ -104,8 +107,9 @@ func TestProvider_DeleteItem(t *testing.T) {
|
||||||
"sk": &types.AttributeValueMemberS{Value: "222"},
|
"sk": &types.AttributeValueMemberS{Value: "222"},
|
||||||
})
|
})
|
||||||
|
|
||||||
items, err := provider.ScanItems(ctx, tableName, nil, 100)
|
items, lev, err := provider.ScanItems(ctx, tableName, nil, nil, 100)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
assert.Nil(t, lev)
|
||||||
assert.Len(t, items, 2)
|
assert.Len(t, items, 2)
|
||||||
|
|
||||||
assert.Contains(t, items, testdynamo.TestRecordAsItem(t, testData[0].Data[0]))
|
assert.Contains(t, items, testdynamo.TestRecordAsItem(t, testData[0].Data[0]))
|
||||||
|
@ -125,8 +129,9 @@ func TestProvider_DeleteItem(t *testing.T) {
|
||||||
"sk": &types.AttributeValueMemberS{Value: "999"},
|
"sk": &types.AttributeValueMemberS{Value: "999"},
|
||||||
})
|
})
|
||||||
|
|
||||||
items, err := provider.ScanItems(ctx, tableName, nil, 100)
|
items, lev, err := provider.ScanItems(ctx, tableName, nil, nil, 100)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
assert.Nil(t, lev)
|
||||||
assert.Len(t, items, 3)
|
assert.Len(t, items, 3)
|
||||||
|
|
||||||
assert.Contains(t, items, testdynamo.TestRecordAsItem(t, testData[0].Data[0]))
|
assert.Contains(t, items, testdynamo.TestRecordAsItem(t, testData[0].Data[0]))
|
||||||
|
@ -140,8 +145,9 @@ func TestProvider_DeleteItem(t *testing.T) {
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
items, err := provider.ScanItems(ctx, "does-not-exist", nil, 100)
|
items, lev, err := provider.ScanItems(ctx, "does-not-exist", nil, nil, 100)
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
|
assert.Nil(t, lev)
|
||||||
assert.Nil(t, items)
|
assert.Nil(t, items)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ type SessionService interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type QueryOptions struct {
|
type QueryOptions struct {
|
||||||
|
TableName string
|
||||||
NamePlaceholders map[string]string
|
NamePlaceholders map[string]string
|
||||||
ValuePlaceholders map[string]types.AttributeValue
|
ValuePlaceholders map[string]types.AttributeValue
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,11 @@ func (um *sessionModule) query(ctx context.Context, args ...object.Object) objec
|
||||||
return objErr
|
return objErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Table name
|
||||||
|
if val, isVal := objMap.Get("table").(*object.String); isVal && val.Value() != "" {
|
||||||
|
options.TableName = val.Value()
|
||||||
|
}
|
||||||
|
|
||||||
// Placeholders
|
// Placeholders
|
||||||
if argsVal, isArgsValMap := objMap.Get("args").(*object.Map); isArgsValMap {
|
if argsVal, isArgsValMap := objMap.Get("args").(*object.Map); isArgsValMap {
|
||||||
options.NamePlaceholders = make(map[string]string)
|
options.NamePlaceholders = make(map[string]string)
|
||||||
|
|
|
@ -79,6 +79,37 @@ func TestModSession_Query(t *testing.T) {
|
||||||
mockedSessionService.AssertExpectations(t)
|
mockedSessionService.AssertExpectations(t)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("should successfully specify table name", func(t *testing.T) {
|
||||||
|
rs := &models.ResultSet{}
|
||||||
|
|
||||||
|
mockedSessionService := mocks.NewSessionService(t)
|
||||||
|
mockedSessionService.EXPECT().Query(mock.Anything, "some expr", scriptmanager.QueryOptions{
|
||||||
|
TableName: "some-table",
|
||||||
|
}).Return(rs, nil)
|
||||||
|
|
||||||
|
mockedUIService := mocks.NewUIService(t)
|
||||||
|
|
||||||
|
testFS := testScriptFile(t, "test.tm", `
|
||||||
|
res := session.query("some expr", {
|
||||||
|
table: "some-table",
|
||||||
|
})
|
||||||
|
assert(!res.is_err())
|
||||||
|
`)
|
||||||
|
|
||||||
|
srv := scriptmanager.New(scriptmanager.WithFS(testFS))
|
||||||
|
srv.SetIFaces(scriptmanager.Ifaces{
|
||||||
|
UI: mockedUIService,
|
||||||
|
Session: mockedSessionService,
|
||||||
|
})
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
err := <-srv.RunAdHocScript(ctx, "test.tm")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
mockedUIService.AssertExpectations(t)
|
||||||
|
mockedSessionService.AssertExpectations(t)
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("should set placeholder values", func(t *testing.T) {
|
t.Run("should set placeholder values", func(t *testing.T) {
|
||||||
rs := &models.ResultSet{}
|
rs := &models.ResultSet{}
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,6 @@ package tables
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
|
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||||
"github.com/lmika/audax/internal/dynamo-browse/models"
|
"github.com/lmika/audax/internal/dynamo-browse/models"
|
||||||
)
|
)
|
||||||
|
@ -11,11 +10,24 @@ import (
|
||||||
type TableProvider interface {
|
type TableProvider interface {
|
||||||
ListTables(ctx context.Context) ([]string, error)
|
ListTables(ctx context.Context) ([]string, error)
|
||||||
DescribeTable(ctx context.Context, tableName string) (*models.TableInfo, error)
|
DescribeTable(ctx context.Context, tableName string) (*models.TableInfo, error)
|
||||||
QueryItems(ctx context.Context, tableName string, filterExpr *expression.Expression, maxItems int) ([]models.Item, error)
|
|
||||||
ScanItems(ctx context.Context, tableName string, filterExpr *expression.Expression, maxItems int) ([]models.Item, error)
|
|
||||||
DeleteItem(ctx context.Context, tableName string, key map[string]types.AttributeValue) error
|
DeleteItem(ctx context.Context, tableName string, key map[string]types.AttributeValue) error
|
||||||
PutItem(ctx context.Context, name string, item models.Item) error
|
PutItem(ctx context.Context, name string, item models.Item) error
|
||||||
PutItems(ctx context.Context, name string, items []models.Item) error
|
PutItems(ctx context.Context, name string, items []models.Item) error
|
||||||
|
|
||||||
|
QueryItems(
|
||||||
|
ctx context.Context,
|
||||||
|
tableName string,
|
||||||
|
filterExpr *expression.Expression,
|
||||||
|
exclusiveStartKey map[string]types.AttributeValue,
|
||||||
|
maxItems int,
|
||||||
|
) (items []models.Item, lastEvaluatedKey map[string]types.AttributeValue, err error)
|
||||||
|
ScanItems(
|
||||||
|
ctx context.Context,
|
||||||
|
tableName string,
|
||||||
|
filterExpr *expression.Expression,
|
||||||
|
exclusiveStartKey map[string]types.AttributeValue,
|
||||||
|
maxItems int,
|
||||||
|
) (item []models.Item, lastEvaluatedKey map[string]types.AttributeValue, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ConfigProvider interface {
|
type ConfigProvider interface {
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
|
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
|
||||||
|
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||||
"github.com/lmika/audax/internal/common/sliceutils"
|
"github.com/lmika/audax/internal/common/sliceutils"
|
||||||
"github.com/lmika/audax/internal/dynamo-browse/services/jobs"
|
"github.com/lmika/audax/internal/dynamo-browse/services/jobs"
|
||||||
"log"
|
"log"
|
||||||
|
@ -35,10 +36,16 @@ func (s *Service) Describe(ctx context.Context, table string) (*models.TableInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Scan(ctx context.Context, tableInfo *models.TableInfo) (*models.ResultSet, error) {
|
func (s *Service) Scan(ctx context.Context, tableInfo *models.TableInfo) (*models.ResultSet, error) {
|
||||||
return s.doScan(ctx, tableInfo, nil, s.configProvider.DefaultLimit())
|
return s.doScan(ctx, tableInfo, nil, nil, s.configProvider.DefaultLimit())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) doScan(ctx context.Context, tableInfo *models.TableInfo, expr models.Queryable, limit int) (*models.ResultSet, error) {
|
func (s *Service) doScan(
|
||||||
|
ctx context.Context,
|
||||||
|
tableInfo *models.TableInfo,
|
||||||
|
expr models.Queryable,
|
||||||
|
exclusiveStartKey map[string]types.AttributeValue,
|
||||||
|
limit int,
|
||||||
|
) (*models.ResultSet, error) {
|
||||||
var (
|
var (
|
||||||
filterExpr *expression.Expression
|
filterExpr *expression.Expression
|
||||||
runAsQuery bool
|
runAsQuery bool
|
||||||
|
@ -55,18 +62,21 @@ func (s *Service) doScan(ctx context.Context, tableInfo *models.TableInfo, expr
|
||||||
}
|
}
|
||||||
|
|
||||||
var results []models.Item
|
var results []models.Item
|
||||||
|
var lastEvalKey map[string]types.AttributeValue
|
||||||
if runAsQuery {
|
if runAsQuery {
|
||||||
log.Printf("executing query")
|
log.Printf("executing query")
|
||||||
results, err = s.provider.QueryItems(ctx, tableInfo.Name, filterExpr, limit)
|
results, lastEvalKey, err = s.provider.QueryItems(ctx, tableInfo.Name, filterExpr, exclusiveStartKey, limit)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("executing scan")
|
log.Printf("executing scan")
|
||||||
results, err = s.provider.ScanItems(ctx, tableInfo.Name, filterExpr, limit)
|
results, lastEvalKey, err = s.provider.ScanItems(ctx, tableInfo.Name, filterExpr, exclusiveStartKey, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil && len(results) == 0 {
|
if err != nil && len(results) == 0 {
|
||||||
return &models.ResultSet{
|
return &models.ResultSet{
|
||||||
TableInfo: tableInfo,
|
TableInfo: tableInfo,
|
||||||
Query: expr,
|
Query: expr,
|
||||||
|
ExclusiveStartKey: exclusiveStartKey,
|
||||||
|
LastEvaluatedKey: lastEvalKey,
|
||||||
}, errors.Wrapf(err, "unable to scan table %v", tableInfo.Name)
|
}, errors.Wrapf(err, "unable to scan table %v", tableInfo.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,6 +85,8 @@ func (s *Service) doScan(ctx context.Context, tableInfo *models.TableInfo, expr
|
||||||
resultSet := &models.ResultSet{
|
resultSet := &models.ResultSet{
|
||||||
TableInfo: tableInfo,
|
TableInfo: tableInfo,
|
||||||
Query: expr,
|
Query: expr,
|
||||||
|
ExclusiveStartKey: exclusiveStartKey,
|
||||||
|
LastEvaluatedKey: lastEvalKey,
|
||||||
}
|
}
|
||||||
resultSet.SetItems(results)
|
resultSet.SetItems(results)
|
||||||
resultSet.RefreshColumns()
|
resultSet.RefreshColumns()
|
||||||
|
@ -147,8 +159,12 @@ func (s *Service) Delete(ctx context.Context, tableInfo *models.TableInfo, items
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) ScanOrQuery(ctx context.Context, tableInfo *models.TableInfo, expr models.Queryable) (*models.ResultSet, error) {
|
func (s *Service) ScanOrQuery(ctx context.Context, tableInfo *models.TableInfo, expr models.Queryable, exclusiveStartKey map[string]types.AttributeValue) (*models.ResultSet, error) {
|
||||||
return s.doScan(ctx, tableInfo, expr, s.configProvider.DefaultLimit())
|
return s.doScan(ctx, tableInfo, expr, exclusiveStartKey, s.configProvider.DefaultLimit())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) NextPage(ctx context.Context, resultSet *models.ResultSet) (*models.ResultSet, error) {
|
||||||
|
return s.doScan(ctx, resultSet.TableInfo, resultSet.Query, resultSet.LastEvaluatedKey, s.configProvider.DefaultLimit())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) assertReadWrite() error {
|
func (s *Service) assertReadWrite() error {
|
||||||
|
|
|
@ -29,6 +29,7 @@ func Default() *KeyBindings {
|
||||||
Rescan: key.NewBinding(key.WithKeys("R"), key.WithHelp("R", "rescan")),
|
Rescan: key.NewBinding(key.WithKeys("R"), key.WithHelp("R", "rescan")),
|
||||||
PromptForQuery: key.NewBinding(key.WithKeys("?"), key.WithHelp("?", "prompt for query")),
|
PromptForQuery: key.NewBinding(key.WithKeys("?"), key.WithHelp("?", "prompt for query")),
|
||||||
PromptForFilter: key.NewBinding(key.WithKeys("/"), key.WithHelp("/", "filter")),
|
PromptForFilter: key.NewBinding(key.WithKeys("/"), key.WithHelp("/", "filter")),
|
||||||
|
FetchNextPage: key.NewBinding(key.WithKeys(">"), key.WithHelp(">", "fetch next page")),
|
||||||
ViewBack: key.NewBinding(key.WithKeys("backspace"), key.WithHelp("backspace", "go back")),
|
ViewBack: key.NewBinding(key.WithKeys("backspace"), key.WithHelp("backspace", "go back")),
|
||||||
ViewForward: key.NewBinding(key.WithKeys("\\"), key.WithHelp("\\", "go forward")),
|
ViewForward: key.NewBinding(key.WithKeys("\\"), key.WithHelp("\\", "go forward")),
|
||||||
CycleLayoutForward: key.NewBinding(key.WithKeys("w"), key.WithHelp("w", "cycle layout forward")),
|
CycleLayoutForward: key.NewBinding(key.WithKeys("w"), key.WithHelp("w", "cycle layout forward")),
|
||||||
|
|
|
@ -36,6 +36,7 @@ type ViewKeyBindings struct {
|
||||||
PromptForQuery key.Binding `keymap:"prompt-for-query"`
|
PromptForQuery key.Binding `keymap:"prompt-for-query"`
|
||||||
PromptForFilter key.Binding `keymap:"prompt-for-filter"`
|
PromptForFilter key.Binding `keymap:"prompt-for-filter"`
|
||||||
PromptForTable key.Binding `keymap:"prompt-for-table"`
|
PromptForTable key.Binding `keymap:"prompt-for-table"`
|
||||||
|
FetchNextPage key.Binding `keymap:"fetch-next-page"`
|
||||||
ViewBack key.Binding `keymap:"view-back"`
|
ViewBack key.Binding `keymap:"view-back"`
|
||||||
ViewForward key.Binding `keymap:"view-forward"`
|
ViewForward key.Binding `keymap:"view-forward"`
|
||||||
CycleLayoutForward key.Binding `keymap:"cycle-layout-forward"`
|
CycleLayoutForward key.Binding `keymap:"cycle-layout-forward"`
|
||||||
|
|
|
@ -120,6 +120,9 @@ func NewModel(
|
||||||
|
|
||||||
return rc.Mark(markOp)
|
return rc.Mark(markOp)
|
||||||
},
|
},
|
||||||
|
"next-page": func(ctx commandctrl.ExecContext, args []string) tea.Msg {
|
||||||
|
return rc.NextPage()
|
||||||
|
},
|
||||||
"delete": commandctrl.NoArgCommand(wc.DeleteMarked),
|
"delete": commandctrl.NoArgCommand(wc.DeleteMarked),
|
||||||
|
|
||||||
// TEMP
|
// TEMP
|
||||||
|
@ -205,6 +208,7 @@ func NewModel(
|
||||||
"unmark": cc.Alias("mark", []string{"none"}),
|
"unmark": cc.Alias("mark", []string{"none"}),
|
||||||
"sa": cc.Alias("set-attr", nil),
|
"sa": cc.Alias("set-attr", nil),
|
||||||
"da": cc.Alias("del-attr", nil),
|
"da": cc.Alias("del-attr", nil),
|
||||||
|
"np": cc.Alias("next-page", nil),
|
||||||
"w": cc.Alias("put", nil),
|
"w": cc.Alias("put", nil),
|
||||||
"q": cc.Alias("quit", nil),
|
"q": cc.Alias("quit", nil),
|
||||||
},
|
},
|
||||||
|
@ -258,6 +262,8 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
|
||||||
return m, m.tableReadController.PromptForQuery
|
return m, m.tableReadController.PromptForQuery
|
||||||
case key.Matches(msg, m.keyMap.PromptForFilter):
|
case key.Matches(msg, m.keyMap.PromptForFilter):
|
||||||
return m, m.tableReadController.Filter
|
return m, m.tableReadController.Filter
|
||||||
|
case key.Matches(msg, m.keyMap.FetchNextPage):
|
||||||
|
return m, m.tableReadController.NextPage
|
||||||
case key.Matches(msg, m.keyMap.ViewBack):
|
case key.Matches(msg, m.keyMap.ViewBack):
|
||||||
return m, m.tableReadController.ViewBack
|
return m, m.tableReadController.ViewBack
|
||||||
case key.Matches(msg, m.keyMap.ViewForward):
|
case key.Matches(msg, m.keyMap.ViewForward):
|
||||||
|
|
|
@ -35,15 +35,15 @@ func main() {
|
||||||
dynamodb.WithEndpointResolver(dynamodb.EndpointResolverFromURL("http://localhost:4566")))
|
dynamodb.WithEndpointResolver(dynamodb.EndpointResolverFromURL("http://localhost:4566")))
|
||||||
|
|
||||||
// Other tables
|
// Other tables
|
||||||
if err := createTable(ctx, dynamoClient, "user-accounts"); err != nil {
|
if err := createTable(ctx, dynamoClient, "user-accounts", false); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := createTable(ctx, dynamoClient, "inventory"); err != nil {
|
if err := createTable(ctx, dynamoClient, "inventory", true); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := createTable(ctx, dynamoClient, tableName); err != nil {
|
if err := createTable(ctx, dynamoClient, tableName, false); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,6 +51,10 @@ func main() {
|
||||||
Name: tableName,
|
Name: tableName,
|
||||||
Keys: models.KeyAttribute{PartitionKey: "pk", SortKey: "sk"},
|
Keys: models.KeyAttribute{PartitionKey: "pk", SortKey: "sk"},
|
||||||
}
|
}
|
||||||
|
inventoryTableInfo := &models.TableInfo{
|
||||||
|
Name: "inventory",
|
||||||
|
Keys: models.KeyAttribute{PartitionKey: "pk", SortKey: "sk"},
|
||||||
|
}
|
||||||
|
|
||||||
dynamoProvider := dynamo.NewProvider(dynamoClient)
|
dynamoProvider := dynamo.NewProvider(dynamoClient)
|
||||||
tableService := tables.NewService(dynamoProvider, notROService{})
|
tableService := tables.NewService(dynamoProvider, notROService{})
|
||||||
|
@ -87,16 +91,32 @@ func main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
key := gofakeit.UUID()
|
||||||
|
for i := 0; i < totalItems; i++ {
|
||||||
|
if err := tableService.Put(ctx, inventoryTableInfo, models.Item{
|
||||||
|
"pk": &types.AttributeValueMemberS{Value: key},
|
||||||
|
"sk": &types.AttributeValueMemberN{Value: fmt.Sprint(i)},
|
||||||
|
"uuid": &types.AttributeValueMemberS{Value: gofakeit.UUID()},
|
||||||
|
}); err != nil {
|
||||||
|
log.Fatalln(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
log.Printf("table '%v' created with %v items", tableName, totalItems)
|
log.Printf("table '%v' created with %v items", tableName, totalItems)
|
||||||
}
|
}
|
||||||
|
|
||||||
func createTable(ctx context.Context, dynamoClient *dynamodb.Client, tableName string) error {
|
func createTable(ctx context.Context, dynamoClient *dynamodb.Client, tableName string, skNumber bool) error {
|
||||||
if _, err := dynamoClient.DeleteTable(ctx, &dynamodb.DeleteTableInput{
|
if _, err := dynamoClient.DeleteTable(ctx, &dynamodb.DeleteTableInput{
|
||||||
TableName: aws.String(tableName),
|
TableName: aws.String(tableName),
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.Printf("warn: cannot delete table: %v: %v", tableName, err)
|
log.Printf("warn: cannot delete table: %v: %v", tableName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var skType = types.ScalarAttributeTypeS
|
||||||
|
if skNumber {
|
||||||
|
skType = types.ScalarAttributeTypeN
|
||||||
|
}
|
||||||
|
|
||||||
if _, err := dynamoClient.CreateTable(ctx, &dynamodb.CreateTableInput{
|
if _, err := dynamoClient.CreateTable(ctx, &dynamodb.CreateTableInput{
|
||||||
TableName: aws.String(tableName),
|
TableName: aws.String(tableName),
|
||||||
KeySchema: []types.KeySchemaElement{
|
KeySchema: []types.KeySchemaElement{
|
||||||
|
@ -105,7 +125,7 @@ func createTable(ctx context.Context, dynamoClient *dynamodb.Client, tableName s
|
||||||
},
|
},
|
||||||
AttributeDefinitions: []types.AttributeDefinition{
|
AttributeDefinitions: []types.AttributeDefinition{
|
||||||
{AttributeName: aws.String("pk"), AttributeType: types.ScalarAttributeTypeS},
|
{AttributeName: aws.String("pk"), AttributeType: types.ScalarAttributeTypeS},
|
||||||
{AttributeName: aws.String("sk"), AttributeType: types.ScalarAttributeTypeS},
|
{AttributeName: aws.String("sk"), AttributeType: skType},
|
||||||
},
|
},
|
||||||
ProvisionedThroughput: &types.ProvisionedThroughput{
|
ProvisionedThroughput: &types.ProvisionedThroughput{
|
||||||
ReadCapacityUnits: aws.Int64(100),
|
ReadCapacityUnits: aws.Int64(100),
|
||||||
|
|
Loading…
Reference in a new issue