dynamo-query: started working on queries
This commit is contained in:
parent
41af399215
commit
54fab1b1c3
15 changed files with 305 additions and 16 deletions
|
|
@ -10,4 +10,5 @@ type TableReadService interface {
|
|||
Describe(ctx context.Context, table string) (*models.TableInfo, error)
|
||||
Scan(ctx context.Context, tableInfo *models.TableInfo) (*models.ResultSet, error)
|
||||
Filter(resultSet *models.ResultSet, filter string) *models.ResultSet
|
||||
ScanOrQuery(ctx context.Context, tableInfo *models.TableInfo, queryExpr string) (*models.ResultSet, error)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -75,6 +75,29 @@ func (c *TableReadController) ScanTable(name string) tea.Cmd {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *TableReadController) PromptForQuery() tea.Cmd {
|
||||
return func() tea.Msg {
|
||||
return events.PromptForInputMsg{
|
||||
Prompt: "query: ",
|
||||
OnDone: func(value string) tea.Cmd {
|
||||
if value == "" {
|
||||
return c.Rescan()
|
||||
}
|
||||
|
||||
return func() tea.Msg {
|
||||
resultSet := c.state.ResultSet()
|
||||
newResultSet, err := c.tableService.ScanOrQuery(context.Background(), resultSet.TableInfo, value)
|
||||
if err != nil {
|
||||
return events.Error(err)
|
||||
}
|
||||
|
||||
return c.setResultSetAndFilter(newResultSet, "")
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TableReadController) Rescan() tea.Cmd {
|
||||
return func() tea.Msg {
|
||||
return c.doScan(context.Background(), c.state.ResultSet())
|
||||
|
|
|
|||
|
|
@ -100,6 +100,40 @@ func TestTableReadController_ExportCSV(t *testing.T) {
|
|||
// Hidden items?
|
||||
}
|
||||
|
||||
func TestTableReadController_Query(t *testing.T) {
|
||||
client, cleanupFn := testdynamo.SetupTestTable(t, testData)
|
||||
defer cleanupFn()
|
||||
|
||||
provider := dynamo.NewProvider(client)
|
||||
service := tables.NewService(provider)
|
||||
readController := controllers.NewTableReadController(controllers.NewState(), service, "alpha-table")
|
||||
|
||||
t.Run("should run scan with filter based on user query", func(t *testing.T) {
|
||||
tempFile := tempFile(t)
|
||||
|
||||
invokeCommand(t, readController.Init())
|
||||
invokeCommandWithPrompts(t, readController.PromptForQuery(), `pk ^= "abc"`)
|
||||
invokeCommand(t, readController.ExportCSV(tempFile))
|
||||
|
||||
bts, err := os.ReadFile(tempFile)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, string(bts), strings.Join([]string{
|
||||
"pk,sk,alpha,beta\n",
|
||||
"abc,111,This is some value,\n",
|
||||
"abc,222,This is another some value,1231\n",
|
||||
}, ""))
|
||||
})
|
||||
|
||||
t.Run("should return error if result set is not set", func(t *testing.T) {
|
||||
tempFile := tempFile(t)
|
||||
readController := controllers.NewTableReadController(controllers.NewState(), service, "non-existant-table")
|
||||
|
||||
invokeCommandExpectingError(t, readController.Init())
|
||||
invokeCommandExpectingError(t, readController.ExportCSV(tempFile))
|
||||
})
|
||||
}
|
||||
|
||||
func tempFile(t *testing.T) string {
|
||||
t.Helper()
|
||||
|
||||
|
|
|
|||
8
internal/dynamo-browse/models/query.go
Normal file
8
internal/dynamo-browse/models/query.go
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
package models
|
||||
|
||||
import "github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
|
||||
|
||||
type QueryExecutionPlan struct {
|
||||
CanQuery bool
|
||||
Expression expression.Expression
|
||||
}
|
||||
32
internal/dynamo-browse/models/queryexpr/ast.go
Normal file
32
internal/dynamo-browse/models/queryexpr/ast.go
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
package queryexpr
|
||||
|
||||
import (
|
||||
"github.com/alecthomas/participle/v2"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type astExpr struct {
|
||||
Equality *astBinOp `parser:"@@"`
|
||||
}
|
||||
|
||||
type astBinOp struct {
|
||||
Name string `parser:"@Ident"`
|
||||
Op string `parser:"@('^' '=' | '=')"`
|
||||
Value *astLiteralValue `parser:"@@"`
|
||||
}
|
||||
|
||||
type astLiteralValue struct {
|
||||
String string `parser:"@String"`
|
||||
}
|
||||
|
||||
var parser = participle.MustBuild(&astExpr{})
|
||||
|
||||
func Parse(expr string) (*QueryExpr, error) {
|
||||
var ast astExpr
|
||||
|
||||
if err := parser.ParseString("expr", expr, &ast); err != nil {
|
||||
return nil, errors.Wrapf(err, "cannot parse expression: '%v'", expr)
|
||||
}
|
||||
|
||||
return &QueryExpr{ast: &ast}, nil
|
||||
}
|
||||
52
internal/dynamo-browse/models/queryexpr/astquery.go
Normal file
52
internal/dynamo-browse/models/queryexpr/astquery.go
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
package queryexpr
|
||||
|
||||
import (
|
||||
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
|
||||
"github.com/lmika/awstools/internal/dynamo-browse/models"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func (a *astExpr) calcQuery(tableInfo *models.TableInfo) (*models.QueryExecutionPlan, error) {
|
||||
return a.Equality.calcQuery(tableInfo)
|
||||
}
|
||||
|
||||
func (a *astBinOp) calcQuery(info *models.TableInfo) (*models.QueryExecutionPlan, error) {
|
||||
// TODO: check if can be a query
|
||||
cb, err := a.calcQueryForScan(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
builder := expression.NewBuilder()
|
||||
builder = builder.WithFilter(cb)
|
||||
|
||||
expr, err := builder.Build()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &models.QueryExecutionPlan{
|
||||
CanQuery: false,
|
||||
Expression: expr,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (a *astBinOp) calcQueryForScan(info *models.TableInfo) (expression.ConditionBuilder, error) {
|
||||
v, err := a.Value.goValue()
|
||||
if err != nil {
|
||||
return expression.ConditionBuilder{}, err
|
||||
}
|
||||
|
||||
switch a.Op {
|
||||
case "=":
|
||||
return expression.Name(a.Name).Equal(expression.Value(v)), nil
|
||||
case "^=":
|
||||
strValue, isStrValue := v.(string)
|
||||
if !isStrValue {
|
||||
return expression.ConditionBuilder{}, errors.New("operand '^=' must be string")
|
||||
}
|
||||
return expression.Name(a.Name).BeginsWith(strValue), nil
|
||||
}
|
||||
|
||||
return expression.ConditionBuilder{}, errors.Errorf("unrecognised operator: %v", a.Op)
|
||||
}
|
||||
11
internal/dynamo-browse/models/queryexpr/expr.go
Normal file
11
internal/dynamo-browse/models/queryexpr/expr.go
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
package queryexpr
|
||||
|
||||
import "github.com/lmika/awstools/internal/dynamo-browse/models"
|
||||
|
||||
type QueryExpr struct {
|
||||
ast *astExpr
|
||||
}
|
||||
|
||||
func (md *QueryExpr) BuildQuery(tableInfo *models.TableInfo) (*models.QueryExecutionPlan, error) {
|
||||
return md.ast.calcQuery(tableInfo)
|
||||
}
|
||||
47
internal/dynamo-browse/models/queryexpr/expr_test.go
Normal file
47
internal/dynamo-browse/models/queryexpr/expr_test.go
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
package queryexpr_test
|
||||
|
||||
import (
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||
"github.com/lmika/awstools/internal/dynamo-browse/models/queryexpr"
|
||||
"testing"
|
||||
|
||||
"github.com/lmika/awstools/internal/dynamo-browse/models"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestModExpr_Query(t *testing.T) {
|
||||
tableInfo := &models.TableInfo{
|
||||
Name: "test",
|
||||
Keys: models.KeyAttribute{
|
||||
PartitionKey: "pk",
|
||||
SortKey: "sk",
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("perform query when request pk is fixed", func(t *testing.T) {
|
||||
modExpr, err := queryexpr.Parse(`pk="prefix"`)
|
||||
assert.NoError(t, err)
|
||||
|
||||
plan, err := modExpr.BuildQuery(tableInfo)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.False(t, plan.CanQuery)
|
||||
assert.Equal(t, "#0 = :0", aws.ToString(plan.Expression.Filter()))
|
||||
assert.Equal(t, "pk", plan.Expression.Names()["#0"])
|
||||
assert.Equal(t, "prefix", plan.Expression.Values()[":0"].(*types.AttributeValueMemberS).Value)
|
||||
})
|
||||
|
||||
t.Run("perform scan when request pk prefix", func(t *testing.T) {
|
||||
modExpr, err := queryexpr.Parse(`pk^="prefix"`) // TODO: fix this so that '^ =' is invalid
|
||||
assert.NoError(t, err)
|
||||
|
||||
plan, err := modExpr.BuildQuery(tableInfo)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.False(t, plan.CanQuery)
|
||||
assert.Equal(t, "begins_with (#0, :0)", aws.ToString(plan.Expression.Filter()))
|
||||
assert.Equal(t, "pk", plan.Expression.Names()["#0"])
|
||||
assert.Equal(t, "prefix", plan.Expression.Values()[":0"].(*types.AttributeValueMemberS).Value)
|
||||
})
|
||||
}
|
||||
24
internal/dynamo-browse/models/queryexpr/values.go
Normal file
24
internal/dynamo-browse/models/queryexpr/values.go
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
package queryexpr
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func (a *astLiteralValue) dynamoValue() (types.AttributeValue, error) {
|
||||
s, err := strconv.Unquote(a.String)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot unquote string")
|
||||
}
|
||||
return &types.AttributeValueMemberS{Value: s}, nil
|
||||
}
|
||||
|
||||
func (a *astLiteralValue) goValue() (any, error) {
|
||||
s, err := strconv.Unquote(a.String)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot unquote string")
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
|
@ -2,8 +2,8 @@ package dynamo
|
|||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||
"github.com/lmika/awstools/internal/dynamo-browse/models"
|
||||
|
|
@ -64,11 +64,18 @@ func NewProvider(client *dynamodb.Client) *Provider {
|
|||
return &Provider{client: client}
|
||||
}
|
||||
|
||||
func (p *Provider) ScanItems(ctx context.Context, tableName string, maxItems int) ([]models.Item, error) {
|
||||
paginator := dynamodb.NewScanPaginator(p.client, &dynamodb.ScanInput{
|
||||
func (p *Provider) ScanItems(ctx context.Context, tableName string, filterExpr *expression.Expression, maxItems int) ([]models.Item, error) {
|
||||
input := &dynamodb.ScanInput{
|
||||
TableName: aws.String(tableName),
|
||||
Limit: aws.Int32(int32(maxItems)),
|
||||
})
|
||||
}
|
||||
if filterExpr != nil {
|
||||
input.FilterExpression = filterExpr.Filter()
|
||||
input.ExpressionAttributeNames = filterExpr.Names()
|
||||
input.ExpressionAttributeValues = filterExpr.Values()
|
||||
}
|
||||
|
||||
paginator := dynamodb.NewScanPaginator(p.client, input)
|
||||
|
||||
items := make([]models.Item, 0)
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package tables
|
|||
|
||||
import (
|
||||
"context"
|
||||
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||
"github.com/lmika/awstools/internal/dynamo-browse/models"
|
||||
|
|
@ -10,7 +11,7 @@ import (
|
|||
type TableProvider interface {
|
||||
ListTables(ctx context.Context) ([]string, error)
|
||||
DescribeTable(ctx context.Context, tableName string) (*models.TableInfo, error)
|
||||
ScanItems(ctx context.Context, tableName string, maxItems int) ([]models.Item, error)
|
||||
ScanItems(ctx context.Context, tableName string, filterExpr *expression.Expression, maxItems int) ([]models.Item, error)
|
||||
DeleteItem(ctx context.Context, tableName string, key map[string]types.AttributeValue) error
|
||||
PutItem(ctx context.Context, name string, item models.Item) error
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,8 @@ package tables
|
|||
|
||||
import (
|
||||
"context"
|
||||
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
|
||||
"github.com/lmika/awstools/internal/dynamo-browse/models/queryexpr"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
|
|
@ -28,7 +30,11 @@ func (s *Service) Describe(ctx context.Context, table string) (*models.TableInfo
|
|||
}
|
||||
|
||||
func (s *Service) Scan(ctx context.Context, tableInfo *models.TableInfo) (*models.ResultSet, error) {
|
||||
results, err := s.provider.ScanItems(ctx, tableInfo.Name, 1000)
|
||||
return s.doScan(ctx, tableInfo, nil)
|
||||
}
|
||||
|
||||
func (s *Service) doScan(ctx context.Context, tableInfo *models.TableInfo, filterExpr *expression.Expression) (*models.ResultSet, error) {
|
||||
results, err := s.provider.ScanItems(ctx, tableInfo.Name, filterExpr, 1000)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "unable to scan table %v", tableInfo.Name)
|
||||
}
|
||||
|
|
@ -101,6 +107,25 @@ func (s *Service) Delete(ctx context.Context, tableInfo *models.TableInfo, items
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) ScanOrQuery(ctx context.Context, tableInfo *models.TableInfo, queryExpr string) (*models.ResultSet, error) {
|
||||
expr, err := queryexpr.Parse(queryExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
plan, err := expr.BuildQuery(tableInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TEMP
|
||||
if plan.CanQuery {
|
||||
return nil, errors.Errorf("queries not yet supported")
|
||||
}
|
||||
|
||||
return s.doScan(ctx, tableInfo, &plan.Expression)
|
||||
}
|
||||
|
||||
// TODO: move into a new service
|
||||
func (s *Service) Filter(resultSet *models.ResultSet, filter string) *models.ResultSet {
|
||||
for i, item := range resultSet.Items() {
|
||||
|
|
|
|||
|
|
@ -98,8 +98,10 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
|
|||
if idx := m.tableView.SelectedItemIndex(); idx >= 0 {
|
||||
return m, m.tableWriteController.ToggleMark(idx)
|
||||
}
|
||||
case "r":
|
||||
case "R":
|
||||
return m, m.tableReadController.Rescan()
|
||||
case "?":
|
||||
return m, m.tableReadController.PromptForQuery()
|
||||
case "/":
|
||||
return m, m.tableReadController.Filter()
|
||||
case ":":
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue