Issue 47: Query expression planner now recognises GSIs (#48)
The query expression planner now recognises global secondary indexes (GSIs), and will use one if the expression can be executed as a query over that index.
This commit is contained in:
parent
7caf905c82
commit
733e59ec95
15 changed files with 190 additions and 163 deletions
|
|
@ -100,6 +100,24 @@ func TestTableReadController_Query(t *testing.T) {
|
|||
}, ""))
|
||||
})
|
||||
|
||||
t.Run("should run query on index with filter based on user query", func(t *testing.T) {
|
||||
srv := newService(t, serviceConfig{tableName: "bravo-table"})
|
||||
|
||||
tempFile := tempFile(t)
|
||||
|
||||
invokeCommand(t, srv.readController.Init())
|
||||
invokeCommandWithPrompts(t, srv.readController.PromptForQuery(), `alpha = "This is some value"`)
|
||||
invokeCommand(t, srv.exportController.ExportCSV(tempFile))
|
||||
|
||||
bts, err := os.ReadFile(tempFile)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, string(bts), strings.Join([]string{
|
||||
"pk,sk,alpha\n",
|
||||
"foo,bar,This is some value\n",
|
||||
}, ""))
|
||||
})
|
||||
|
||||
t.Run("should return error if result set is not set", func(t *testing.T) {
|
||||
srv := newService(t, serviceConfig{tableName: "non-existant-table"})
|
||||
|
||||
|
|
@ -250,6 +268,7 @@ var testData = []testdynamo.TestData{
|
|||
},
|
||||
{
|
||||
TableName: "bravo-table",
|
||||
Index: []string{"alpha"},
|
||||
Data: []map[string]interface{}{
|
||||
{
|
||||
"pk": "foo",
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
// QueryExecutionPlan describes how a parsed query expression is to be run
// against a table.
type QueryExecutionPlan struct {
|
||||
// CanQuery is set to true by the planner when the expression can be run as
// a query; the service layer uses it to choose between QueryItems and
// ScanItems.
CanQuery bool
|
||||
// IndexName is the name of the index the query should run over. The planner
// leaves it empty when the table's own keys matched, and sets it to the GSI
// name when a GSI's keys matched.
IndexName string
|
||||
// Expression is the built expression; for a query plan it carries the key
// condition.
Expression expression.Expression
|
||||
}
|
||||
|
||||
|
|
@ -18,6 +19,9 @@ func (qep QueryExecutionPlan) Describe(dp DescribingPrinter) {
|
|||
dp.Println(" execute as: scan")
|
||||
}
|
||||
|
||||
if qep.IndexName != "" {
|
||||
dp.Printf(" index: %v", qep.IndexName)
|
||||
}
|
||||
if keyCond := aws.ToString(qep.Expression.KeyCondition()); keyCond != "" {
|
||||
dp.Printf(" key condition: %v", keyCond)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"github.com/alecthomas/participle/v2/lexer"
|
||||
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||
"github.com/lmika/audax/internal/common/sliceutils"
|
||||
"github.com/lmika/audax/internal/dynamo-browse/models"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
|
@ -121,30 +122,43 @@ func Parse(expr string) (*QueryExpr, error) {
|
|||
}
|
||||
|
||||
func (a *astExpr) calcQuery(ctx *evalContext, info *models.TableInfo) (*models.QueryExecutionPlan, error) {
|
||||
type queryTestAttempt struct {
|
||||
index string
|
||||
keysUnderTest models.KeyAttribute
|
||||
}
|
||||
queryTestAttempts := append(
|
||||
[]queryTestAttempt{{keysUnderTest: info.Keys}},
|
||||
sliceutils.Map(info.GSIs, func(gsi models.TableGSI) queryTestAttempt {
|
||||
return queryTestAttempt{index: gsi.Name, keysUnderTest: gsi.Keys}
|
||||
})...)
|
||||
|
||||
ir, err := a.evalToIR(ctx, info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var qci queryCalcInfo
|
||||
if canExecuteAsQuery(ir, info, &qci) {
|
||||
ke, err := ir.(queryableIRAtom).calcQueryForQuery(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
for _, attempt := range queryTestAttempts {
|
||||
var qci = queryCalcInfo{keysUnderTest: attempt.keysUnderTest}
|
||||
if canExecuteAsQuery(ir, &qci) {
|
||||
ke, err := ir.(queryableIRAtom).calcQueryForQuery()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
builder := expression.NewBuilder()
|
||||
builder = builder.WithKeyCondition(ke)
|
||||
|
||||
expr, err := builder.Build()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &models.QueryExecutionPlan{
|
||||
CanQuery: true,
|
||||
IndexName: attempt.index,
|
||||
Expression: expr,
|
||||
}, nil
|
||||
}
|
||||
|
||||
builder := expression.NewBuilder()
|
||||
builder = builder.WithKeyCondition(ke)
|
||||
|
||||
expr, err := builder.Build()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &models.QueryExecutionPlan{
|
||||
CanQuery: true,
|
||||
Expression: expr,
|
||||
}, nil
|
||||
}
|
||||
|
||||
cb, err := ir.calcQueryForScan(info)
|
||||
|
|
|
|||
|
|
@ -128,14 +128,14 @@ type irKeyFieldCmp struct {
|
|||
cmpType int
|
||||
}
|
||||
|
||||
func (a irKeyFieldCmp) canBeExecutedAsQuery(info *models.TableInfo, qci *queryCalcInfo) bool {
|
||||
func (a irKeyFieldCmp) canBeExecutedAsQuery(qci *queryCalcInfo) bool {
|
||||
keyName := a.name.keyName()
|
||||
if keyName == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
if keyName == info.Keys.SortKey {
|
||||
return qci.addKey(info, keyName)
|
||||
if keyName == qci.keysUnderTest.SortKey {
|
||||
return qci.addKey(keyName)
|
||||
}
|
||||
|
||||
return false
|
||||
|
|
@ -158,7 +158,7 @@ func (a irKeyFieldCmp) calcQueryForScan(info *models.TableInfo) (expression.Cond
|
|||
return expression.ConditionBuilder{}, errors.New("unsupported cmp type")
|
||||
}
|
||||
|
||||
func (a irKeyFieldCmp) calcQueryForQuery(info *models.TableInfo) (expression.KeyConditionBuilder, error) {
|
||||
func (a irKeyFieldCmp) calcQueryForQuery() (expression.KeyConditionBuilder, error) {
|
||||
keyName := a.name.keyName()
|
||||
vb := a.value.goValue()
|
||||
|
||||
|
|
|
|||
|
|
@ -100,31 +100,31 @@ type irDualConjunction struct {
|
|||
leftIsPK bool
|
||||
}
|
||||
|
||||
func (i *irDualConjunction) canBeExecutedAsQuery(info *models.TableInfo, qci *queryCalcInfo) bool {
|
||||
func (i *irDualConjunction) canBeExecutedAsQuery(qci *queryCalcInfo) bool {
|
||||
qciCopy := qci.clone()
|
||||
|
||||
leftCanExecuteAsQuery := canExecuteAsQuery(i.left, info, qci)
|
||||
leftCanExecuteAsQuery := canExecuteAsQuery(i.left, qci)
|
||||
if leftCanExecuteAsQuery {
|
||||
i.leftIsPK = qci.hasSeenPrimaryKey(info)
|
||||
return canExecuteAsQuery(i.right, info, qci)
|
||||
i.leftIsPK = qci.hasSeenPrimaryKey()
|
||||
return canExecuteAsQuery(i.right, qci)
|
||||
}
|
||||
|
||||
// Might be that the right is the partition key, so test again with them swapped
|
||||
rightCanExecuteAsQuery := canExecuteAsQuery(i.right, info, qciCopy)
|
||||
rightCanExecuteAsQuery := canExecuteAsQuery(i.right, qciCopy)
|
||||
if rightCanExecuteAsQuery {
|
||||
return canExecuteAsQuery(i.left, info, qciCopy)
|
||||
return canExecuteAsQuery(i.left, qciCopy)
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (i *irDualConjunction) calcQueryForQuery(info *models.TableInfo) (expression.KeyConditionBuilder, error) {
|
||||
left, err := i.left.(queryableIRAtom).calcQueryForQuery(info)
|
||||
func (i *irDualConjunction) calcQueryForQuery() (expression.KeyConditionBuilder, error) {
|
||||
left, err := i.left.(queryableIRAtom).calcQueryForQuery()
|
||||
if err != nil {
|
||||
return expression.KeyConditionBuilder{}, err
|
||||
}
|
||||
|
||||
right, err := i.right.(queryableIRAtom).calcQueryForQuery(info)
|
||||
right, err := i.right.(queryableIRAtom).calcQueryForQuery()
|
||||
if err != nil {
|
||||
return expression.KeyConditionBuilder{}, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -136,15 +136,15 @@ type irKeyFieldEq struct {
|
|||
value valueIRAtom
|
||||
}
|
||||
|
||||
func (a irKeyFieldEq) canBeExecutedAsQuery(info *models.TableInfo, qci *queryCalcInfo) bool {
|
||||
func (a irKeyFieldEq) canBeExecutedAsQuery(qci *queryCalcInfo) bool {
|
||||
keyName := a.name.keyName()
|
||||
if keyName == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
if keyName == info.Keys.PartitionKey ||
|
||||
(keyName == info.Keys.SortKey && qci.hasSeenPrimaryKey(info)) {
|
||||
return qci.addKey(info, keyName)
|
||||
if keyName == qci.keysUnderTest.PartitionKey ||
|
||||
(keyName == qci.keysUnderTest.SortKey && qci.hasSeenPrimaryKey()) {
|
||||
return qci.addKey(keyName)
|
||||
}
|
||||
|
||||
return false
|
||||
|
|
@ -156,7 +156,7 @@ func (a irKeyFieldEq) calcQueryForScan(info *models.TableInfo) (expression.Condi
|
|||
return nb.Equal(vb), nil
|
||||
}
|
||||
|
||||
func (a irKeyFieldEq) calcQueryForQuery(info *models.TableInfo) (expression.KeyConditionBuilder, error) {
|
||||
func (a irKeyFieldEq) calcQueryForQuery() (expression.KeyConditionBuilder, error) {
|
||||
vb := a.value.goValue()
|
||||
return expression.Key(a.name.keyName()).Equal(expression.Value(vb)), nil
|
||||
}
|
||||
|
|
@ -188,14 +188,14 @@ type irFieldBeginsWith struct {
|
|||
value irValue
|
||||
}
|
||||
|
||||
func (a irFieldBeginsWith) canBeExecutedAsQuery(info *models.TableInfo, qci *queryCalcInfo) bool {
|
||||
func (a irFieldBeginsWith) canBeExecutedAsQuery(qci *queryCalcInfo) bool {
|
||||
keyName := a.name.keyName()
|
||||
if keyName == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
if keyName == info.Keys.SortKey && qci.hasSeenPrimaryKey(info) {
|
||||
return qci.addKey(info, a.name.keyName())
|
||||
if keyName == qci.keysUnderTest.SortKey && qci.hasSeenPrimaryKey() {
|
||||
return qci.addKey(a.name.keyName())
|
||||
}
|
||||
|
||||
return false
|
||||
|
|
@ -212,7 +212,7 @@ func (a irFieldBeginsWith) calcQueryForScan(info *models.TableInfo) (expression.
|
|||
return nb.BeginsWith(strValue), nil
|
||||
}
|
||||
|
||||
func (a irFieldBeginsWith) calcQueryForQuery(info *models.TableInfo) (expression.KeyConditionBuilder, error) {
|
||||
func (a irFieldBeginsWith) calcQueryForQuery() (expression.KeyConditionBuilder, error) {
|
||||
vb := a.value.goValue()
|
||||
strValue, isStrValue := vb.(string)
|
||||
if !isStrValue {
|
||||
|
|
|
|||
|
|
@ -199,7 +199,8 @@ func (a *astExpr) String() string {
|
|||
}
|
||||
|
||||
type queryCalcInfo struct {
|
||||
seenKeys map[string]struct{}
|
||||
keysUnderTest models.KeyAttribute
|
||||
seenKeys map[string]struct{}
|
||||
}
|
||||
|
||||
func (qc *queryCalcInfo) clone() *queryCalcInfo {
|
||||
|
|
@ -207,16 +208,16 @@ func (qc *queryCalcInfo) clone() *queryCalcInfo {
|
|||
for k, v := range qc.seenKeys {
|
||||
newKeys[k] = v
|
||||
}
|
||||
return &queryCalcInfo{seenKeys: newKeys}
|
||||
return &queryCalcInfo{keysUnderTest: qc.keysUnderTest, seenKeys: newKeys}
|
||||
}
|
||||
|
||||
func (qc *queryCalcInfo) hasSeenPrimaryKey(tableInfo *models.TableInfo) bool {
|
||||
_, hasKey := qc.seenKeys[tableInfo.Keys.PartitionKey]
|
||||
func (qc *queryCalcInfo) hasSeenPrimaryKey() bool {
|
||||
_, hasKey := qc.seenKeys[qc.keysUnderTest.PartitionKey]
|
||||
return hasKey
|
||||
}
|
||||
|
||||
func (qc *queryCalcInfo) addKey(tableInfo *models.TableInfo, key string) bool {
|
||||
if tableInfo.Keys.PartitionKey != key && tableInfo.Keys.SortKey != key {
|
||||
func (qc *queryCalcInfo) addKey(key string) bool {
|
||||
if qc.keysUnderTest.PartitionKey != key && qc.keysUnderTest.SortKey != key {
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,6 +19,22 @@ func TestModExpr_Query(t *testing.T) {
|
|||
PartitionKey: "pk",
|
||||
SortKey: "sk",
|
||||
},
|
||||
GSIs: []models.TableGSI{
|
||||
{
|
||||
Name: "with-color",
|
||||
Keys: models.KeyAttribute{
|
||||
PartitionKey: "color",
|
||||
SortKey: "shade",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "with-apples",
|
||||
Keys: models.KeyAttribute{
|
||||
PartitionKey: "apples",
|
||||
SortKey: "sk",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("as queries", func(t *testing.T) {
|
||||
|
|
@ -113,10 +129,25 @@ func TestModExpr_Query(t *testing.T) {
|
|||
),
|
||||
|
||||
// Querying the index
|
||||
scanCase("when request pk is fixed",
|
||||
`pk="prefix"`,
|
||||
scanCase("querying the index with the index pk",
|
||||
`color="blue"`,
|
||||
`#0 = :0`,
|
||||
exprNameIsString(0, 0, "pk", "prefix"),
|
||||
indexName("with-color"),
|
||||
exprNameIsString(0, 0, "color", "blue"),
|
||||
),
|
||||
scanCase("querying the index with the index pk and index sk",
|
||||
`color="red" and shade="gray"`,
|
||||
`(#0 = :0) AND (#1 = :1)`,
|
||||
indexName("with-color"),
|
||||
exprNameIsString(0, 0, "color", "red"),
|
||||
exprNameIsString(1, 1, "shade", "gray"),
|
||||
),
|
||||
scanCase("querying the index with the index pk and begins with index sk",
|
||||
`color="yellow" and shade ^= "dark"`,
|
||||
`(#0 = :0) AND (begins_with (#1, :1))`,
|
||||
indexName("with-color"),
|
||||
exprNameIsString(0, 0, "color", "yellow"),
|
||||
exprNameIsString(1, 1, "shade", "dark"),
|
||||
),
|
||||
}
|
||||
|
||||
|
|
@ -131,6 +162,7 @@ func TestModExpr_Query(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
|
||||
assert.True(t, plan.CanQuery)
|
||||
assert.Equal(t, scenario.indexName, plan.IndexName)
|
||||
assert.Equal(t, scenario.expectedFilter, aws.ToString(plan.Expression.KeyCondition()))
|
||||
for k, v := range scenario.expectedNames {
|
||||
assert.Equal(t, v, plan.Expression.Names()[k])
|
||||
|
|
@ -824,6 +856,7 @@ type scanScenario struct {
|
|||
description string
|
||||
expression string
|
||||
expectedFilter string
|
||||
indexName string
|
||||
expectedNames map[string]string
|
||||
expectedValues map[string]types.AttributeValue
|
||||
placeholderNames map[string]string
|
||||
|
|
@ -844,6 +877,12 @@ func scanCase(description, expression, expectedFilter string, options ...func(ss
|
|||
return ss
|
||||
}
|
||||
|
||||
// indexName returns a scanScenario option that records the index name the
// scenario expects; the query tests compare it against plan.IndexName.
func indexName(indexName string) func(ss *scanScenario) {
|
||||
return func(ss *scanScenario) {
|
||||
ss.indexName = indexName
|
||||
}
|
||||
}
|
||||
|
||||
func placeholderNames(placeholderNames map[string]string) func(ss *scanScenario) {
|
||||
return func(ss *scanScenario) {
|
||||
ss.placeholderNames = placeholderNames
|
||||
|
|
|
|||
|
|
@ -16,10 +16,10 @@ type queryableIRAtom interface {
|
|||
irAtom
|
||||
|
||||
// canBeExecutedAsQuery returns true if the atom is capable of being executed as a query
|
||||
canBeExecutedAsQuery(info *models.TableInfo, qci *queryCalcInfo) bool
|
||||
canBeExecutedAsQuery(qci *queryCalcInfo) bool
|
||||
|
||||
// calcQueryForQuery returns a key condition builder for this atom to include in a query
|
||||
calcQueryForQuery(info *models.TableInfo) (expression.KeyConditionBuilder, error)
|
||||
calcQueryForQuery() (expression.KeyConditionBuilder, error)
|
||||
}
|
||||
|
||||
type oprIRAtom interface {
|
||||
|
|
@ -43,10 +43,10 @@ type multiValueIRAtom interface {
|
|||
calcGoValues(info *models.TableInfo) ([]any, error)
|
||||
}
|
||||
|
||||
func canExecuteAsQuery(ir irAtom, info *models.TableInfo, qci *queryCalcInfo) bool {
|
||||
func canExecuteAsQuery(ir irAtom, qci *queryCalcInfo) bool {
|
||||
queryable, isQuearyable := ir.(queryableIRAtom)
|
||||
if !isQuearyable {
|
||||
return false
|
||||
}
|
||||
return queryable.canBeExecutedAsQuery(info, qci)
|
||||
return queryable.canBeExecutedAsQuery(qci)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -41,12 +41,13 @@ func (p *Provider) DescribeTable(ctx context.Context, tableName string) (*models
|
|||
|
||||
var tableInfo models.TableInfo
|
||||
tableInfo.Name = aws.ToString(out.Table.TableName)
|
||||
tableInfo.Keys = p.keySchemaToKeyAttributes(out.Table.KeySchema)
|
||||
|
||||
for _, keySchema := range out.Table.KeySchema {
|
||||
if keySchema.KeyType == types.KeyTypeHash {
|
||||
tableInfo.Keys.PartitionKey = aws.ToString(keySchema.AttributeName)
|
||||
} else if keySchema.KeyType == types.KeyTypeRange {
|
||||
tableInfo.Keys.SortKey = aws.ToString(keySchema.AttributeName)
|
||||
tableInfo.GSIs = make([]models.TableGSI, len(out.Table.GlobalSecondaryIndexes))
|
||||
for i, gsiIndex := range out.Table.GlobalSecondaryIndexes {
|
||||
tableInfo.GSIs[i] = models.TableGSI{
|
||||
Name: aws.ToString(gsiIndex.IndexName),
|
||||
Keys: p.keySchemaToKeyAttributes(gsiIndex.KeySchema),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -57,6 +58,17 @@ func (p *Provider) DescribeTable(ctx context.Context, tableName string) (*models
|
|||
return &tableInfo, nil
|
||||
}
|
||||
|
||||
// keySchemaToKeyAttributes converts a list of key schema elements into a
// models.KeyAttribute. The attribute name of a hash-type element becomes the
// partition key and that of a range-type element becomes the sort key.
// Elements of any other key type are ignored, and a key with no matching
// element is left as the zero value.
func (p *Provider) keySchemaToKeyAttributes(keySchemaElements []types.KeySchemaElement) (keyAttribute models.KeyAttribute) {
|
||||
for _, keySchema := range keySchemaElements {
|
||||
if keySchema.KeyType == types.KeyTypeHash {
|
||||
keyAttribute.PartitionKey = aws.ToString(keySchema.AttributeName)
|
||||
} else if keySchema.KeyType == types.KeyTypeRange {
|
||||
keyAttribute.SortKey = aws.ToString(keySchema.AttributeName)
|
||||
}
|
||||
}
|
||||
return keyAttribute
|
||||
}
|
||||
|
||||
func (p *Provider) PutItem(ctx context.Context, name string, item models.Item) error {
|
||||
_, err := p.client.PutItem(ctx, &dynamodb.PutItemInput{
|
||||
TableName: aws.String(name),
|
||||
|
|
@ -170,6 +182,7 @@ func (p *Provider) ScanItems(
|
|||
func (p *Provider) QueryItems(
|
||||
ctx context.Context,
|
||||
tableName string,
|
||||
indexName string,
|
||||
filterExpr *expression.Expression,
|
||||
exclusiveStartKey map[string]types.AttributeValue,
|
||||
maxItems int,
|
||||
|
|
@ -179,6 +192,9 @@ func (p *Provider) QueryItems(
|
|||
input := &dynamodb.QueryInput{
|
||||
TableName: aws.String(tableName),
|
||||
}
|
||||
if indexName != "" {
|
||||
input.IndexName = aws.String(indexName)
|
||||
}
|
||||
if filterExpr != nil {
|
||||
input.KeyConditionExpression = filterExpr.KeyCondition()
|
||||
input.FilterExpression = filterExpr.Filter()
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ type TableProvider interface {
|
|||
QueryItems(
|
||||
ctx context.Context,
|
||||
tableName string,
|
||||
indexName string,
|
||||
filterExpr *expression.Expression,
|
||||
exclusiveStartKey map[string]types.AttributeValue,
|
||||
maxItems int,
|
||||
|
|
|
|||
|
|
@ -49,6 +49,7 @@ func (s *Service) doScan(
|
|||
var (
|
||||
filterExpr *expression.Expression
|
||||
runAsQuery bool
|
||||
index string
|
||||
err error
|
||||
)
|
||||
if expr != nil {
|
||||
|
|
@ -58,6 +59,7 @@ func (s *Service) doScan(
|
|||
}
|
||||
|
||||
runAsQuery = plan.CanQuery
|
||||
index = plan.IndexName
|
||||
filterExpr = &plan.Expression
|
||||
|
||||
log.Printf("Running query over '%v'", tableInfo.Name)
|
||||
|
|
@ -69,7 +71,7 @@ func (s *Service) doScan(
|
|||
var results []models.Item
|
||||
var lastEvalKey map[string]types.AttributeValue
|
||||
if runAsQuery {
|
||||
results, lastEvalKey, err = s.provider.QueryItems(ctx, tableInfo.Name, filterExpr, exclusiveStartKey, limit)
|
||||
results, lastEvalKey, err = s.provider.QueryItems(ctx, tableInfo.Name, index, filterExpr, exclusiveStartKey, limit)
|
||||
} else {
|
||||
results, lastEvalKey, err = s.provider.ScanItems(ctx, tableInfo.Name, filterExpr, exclusiveStartKey, limit)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue