This commit is contained in:
parent
c11560e6cd
commit
08a3c162a2
12 changed files with 270 additions and 32 deletions
96
internal/common/ui/commandctrl/cmdpacks/modasync.go
Normal file
96
internal/common/ui/commandctrl/cmdpacks/modasync.go
Normal file
|
|
@ -0,0 +1,96 @@
|
|||
package cmdpacks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
"lmika.dev/cmd/dynamo-browse/internal/common/ui/commandctrl"
|
||||
"lmika.dev/cmd/dynamo-browse/internal/dynamo-browse/controllers"
|
||||
"lmika.dev/cmd/dynamo-browse/internal/dynamo-browse/services/tables"
|
||||
"ucl.lmika.dev/ucl"
|
||||
)
|
||||
|
||||
type asyncModule struct {
|
||||
tableService *tables.Service
|
||||
state *controllers.State
|
||||
}
|
||||
|
||||
func (m asyncModule) asyncDo(ctx context.Context, args ucl.CallArgs) (any, error) {
|
||||
var block ucl.Invokable
|
||||
if err := args.Bind(&block); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, commandctrl.ScheduleTask(ctx, func(ctx context.Context) error {
|
||||
_, err := block.Invoke(ctx)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (m asyncModule) asyncIn(ctx context.Context, args ucl.CallArgs) (any, error) {
|
||||
var (
|
||||
duration int
|
||||
block ucl.Invokable
|
||||
)
|
||||
if err := args.Bind(&duration, &block); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err := commandctrl.CronScheduler(ctx).NewJob(
|
||||
gocron.OneTimeJob(
|
||||
gocron.OneTimeJobStartDateTime(time.Now().Add(time.Duration(duration)*time.Second)),
|
||||
),
|
||||
gocron.NewTask(func(ctx context.Context) {
|
||||
commandctrl.ScheduleTask(ctx, func(ctx context.Context) error {
|
||||
_, err := block.Invoke(ctx)
|
||||
return err
|
||||
})
|
||||
}),
|
||||
gocron.WithContext(ctx),
|
||||
)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (m asyncModule) asyncQuery(ctx context.Context, args ucl.CallArgs) (any, error) {
|
||||
var (
|
||||
block ucl.Invokable
|
||||
)
|
||||
|
||||
args, q, tableInfo, err := parseQuery(ctx, args, m.state.ResultSet(), m.tableService, 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := args.Bind(&block); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, commandctrl.ScheduleAuxTask(ctx, "query: "+q.String(), func(ctx context.Context) error {
|
||||
newResultSet, err := m.tableService.ScanOrQuery(context.Background(), tableInfo, q, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return commandctrl.ScheduleTask(ctx, func(ctx context.Context) error {
|
||||
_, err := block.Invoke(ctx, newResultSetProxy(newResultSet))
|
||||
return err
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func moduleAsync(tableService *tables.Service, state *controllers.State) ucl.Module {
|
||||
m := asyncModule{
|
||||
state: state,
|
||||
tableService: tableService,
|
||||
}
|
||||
|
||||
return ucl.Module{
|
||||
Name: "async",
|
||||
Builtins: map[string]ucl.BuiltinHandler{
|
||||
"do": m.asyncDo,
|
||||
"in": m.asyncIn,
|
||||
"query": m.asyncQuery,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -2,6 +2,7 @@ package cmdpacks
|
|||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||
"ucl.lmika.dev/ucl"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -2,6 +2,8 @@ package cmdpacks
|
|||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||
"github.com/pkg/errors"
|
||||
"lmika.dev/cmd/dynamo-browse/internal/common/ui/commandctrl"
|
||||
|
|
@ -9,7 +11,6 @@ import (
|
|||
"lmika.dev/cmd/dynamo-browse/internal/dynamo-browse/models"
|
||||
"lmika.dev/cmd/dynamo-browse/internal/dynamo-browse/models/queryexpr"
|
||||
"lmika.dev/cmd/dynamo-browse/internal/dynamo-browse/services/tables"
|
||||
"time"
|
||||
"ucl.lmika.dev/repl"
|
||||
"ucl.lmika.dev/ucl"
|
||||
)
|
||||
|
|
@ -71,21 +72,22 @@ func parseQuery(
|
|||
args ucl.CallArgs,
|
||||
currentRS *models.ResultSet,
|
||||
tablesService *tables.Service,
|
||||
) (*queryexpr.QueryExpr, *models.TableInfo, error) {
|
||||
extraArgs int,
|
||||
) (ucl.CallArgs, *queryexpr.QueryExpr, *models.TableInfo, error) {
|
||||
var expr string
|
||||
if err := args.Bind(&expr); err != nil {
|
||||
return nil, nil, err
|
||||
return args, nil, nil, err
|
||||
}
|
||||
|
||||
q, err := queryexpr.Parse(expr)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return args, nil, nil, err
|
||||
}
|
||||
|
||||
if args.NArgs() > 0 {
|
||||
if args.NArgs() > extraArgs {
|
||||
var queryArgs ucl.Hashable
|
||||
if err := args.Bind(&queryArgs); err != nil {
|
||||
return nil, nil, err
|
||||
return args, nil, nil, err
|
||||
}
|
||||
|
||||
queryNames := map[string]string{}
|
||||
|
|
@ -97,12 +99,15 @@ func parseQuery(
|
|||
|
||||
queryNames[k] = v.String()
|
||||
|
||||
switch v.(type) {
|
||||
switch t := v.(type) {
|
||||
case ucl.StringObject:
|
||||
queryValues[k] = &types.AttributeValueMemberS{Value: v.String()}
|
||||
case ucl.IntObject:
|
||||
queryValues[k] = &types.AttributeValueMemberN{Value: v.String()}
|
||||
// TODO: other types
|
||||
case ucl.BoolObject:
|
||||
queryValues[k] = &types.AttributeValueMemberBOOL{Value: t.Truthy()}
|
||||
case attributeValueProxy:
|
||||
queryValues[k] = t.value
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
|
@ -114,24 +119,24 @@ func parseQuery(
|
|||
if args.HasSwitch("table") {
|
||||
var tblName string
|
||||
if err := args.BindSwitch("table", &tblName); err != nil {
|
||||
return nil, nil, err
|
||||
return args, nil, nil, err
|
||||
}
|
||||
|
||||
tableInfo, err = tablesService.Describe(ctx, tblName)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return args, nil, nil, err
|
||||
}
|
||||
} else if currentRS != nil && currentRS.TableInfo != nil {
|
||||
tableInfo = currentRS.TableInfo
|
||||
} else {
|
||||
return nil, nil, errors.New("no table specified")
|
||||
return args, nil, nil, errors.New("no table specified")
|
||||
}
|
||||
|
||||
return q, tableInfo, nil
|
||||
return args, q, tableInfo, nil
|
||||
}
|
||||
|
||||
func (rs *rsModule) rsQuery(ctx context.Context, args ucl.CallArgs) (any, error) {
|
||||
q, tableInfo, err := parseQuery(ctx, args, rs.state.ResultSet(), rs.tableService)
|
||||
_, q, tableInfo, err := parseQuery(ctx, args, rs.state.ResultSet(), rs.tableService, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -214,6 +214,14 @@ func TestModRS_First(t *testing.T) {
|
|||
rs = rs:query 'pk="zzz"' -table service-test-data
|
||||
assert (eq $rs.First ()) "expected First to be nil"
|
||||
`,
|
||||
}, {
|
||||
descr: "returns the first item using placeholders",
|
||||
cmd: `
|
||||
rs = rs:query 'pk=$v and sk=$u' [v:"abc" u:"222"] -table service-test-data
|
||||
assert (eq $rs.First.pk "abc") "expected First.pk == abc"
|
||||
assert (eq $rs.First.sk "222") "expected First.sk == 222"
|
||||
assert (eq $rs.First.beta 1231) "expected First.beta == 1231"
|
||||
`,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
|
|
|
|||
|
|
@ -175,7 +175,7 @@ func (m *uiModule) uiBind(ctx context.Context, args ucl.CallArgs) (any, error) {
|
|||
}
|
||||
|
||||
func (m *uiModule) uiQuery(ctx context.Context, args ucl.CallArgs) (any, error) {
|
||||
q, tableInfo, err := parseQuery(ctx, args, m.state.ResultSet(), m.tableService)
|
||||
_, q, tableInfo, err := parseQuery(ctx, args, m.state.ResultSet(), m.tableService, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -210,6 +210,7 @@ func (m *uiModule) uiSetItemAnnotator(ctx context.Context, args ucl.CallArgs) (a
|
|||
}
|
||||
return fmt.Sprint(v)
|
||||
}))
|
||||
commandctrl.QueueRefresh(ctx)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -404,6 +404,7 @@ func (sc StandardCommands) InstOptions() []ucl.InstOption {
|
|||
ucl.WithModule(modulePB(sc.PBProvider)),
|
||||
ucl.WithModule(moduleOpt(sc.SettingsController)),
|
||||
ucl.WithModule(moduleAttrValue()),
|
||||
ucl.WithModule(moduleAsync(sc.TableService, sc.State)),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -453,4 +454,30 @@ ui:bind "view.toggle-marked-items" "M" {
|
|||
mark all
|
||||
}
|
||||
}
|
||||
|
||||
proc _prep_officeCount {
|
||||
openedOffices = 0
|
||||
closedOffices = 0
|
||||
|
||||
async:query 'officeOpened=$v' [v:(av:true)] { |rs|
|
||||
openedOffices = len $rs
|
||||
async:query 'officeOpened=$u' [u:(av:false)] { |rs|
|
||||
closedOffices = len $rs
|
||||
|
||||
ui:set-item-annotator { |rs item path|
|
||||
if (eq $path.(-1) "officeOpened") {
|
||||
if $item.officeOpened {
|
||||
"Count = ${openedOffices}"
|
||||
} else {
|
||||
"Count = ${closedOffices}"
|
||||
}
|
||||
} else {
|
||||
""
|
||||
}
|
||||
}
|
||||
} -table business-addresses
|
||||
} -table business-addresses
|
||||
}
|
||||
|
||||
_prep_officeCount
|
||||
`
|
||||
|
|
|
|||
|
|
@ -4,12 +4,14 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
tea "github.com/charmbracelet/bubbletea"
|
||||
"github.com/pkg/errors"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
tea "github.com/charmbracelet/bubbletea"
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
"github.com/pkg/errors"
|
||||
"ucl.lmika.dev/ucl"
|
||||
"ucl.lmika.dev/ucl/builtins"
|
||||
|
||||
|
|
@ -17,12 +19,22 @@ import (
|
|||
"lmika.dev/cmd/dynamo-browse/internal/common/ui/events"
|
||||
)
|
||||
|
||||
const commandsCategory = "commands"
|
||||
const (
|
||||
commandsCategory = "commands"
|
||||
pendingTaskBuffer = 50
|
||||
pendingAuxTaskBuffer = 50
|
||||
auxWorkers = 4
|
||||
)
|
||||
|
||||
type cmdMessage struct {
|
||||
cmd string
|
||||
}
|
||||
|
||||
type pendingTask struct {
|
||||
descr string
|
||||
task func(ctx context.Context) error
|
||||
}
|
||||
|
||||
type CommandController struct {
|
||||
uclInst *ucl.Inst
|
||||
historyProvider IterProvider
|
||||
|
|
@ -30,19 +42,29 @@ type CommandController struct {
|
|||
lookupExtensions []CommandLookupExtension
|
||||
completionProvider CommandCompletionProvider
|
||||
uiStateProvider UIStateProvider
|
||||
cronScheduler gocron.Scheduler
|
||||
cmdChan chan cmdMessage
|
||||
pendingTaskChan chan pendingTask
|
||||
pendingAuxTaskChan chan pendingTask
|
||||
msgChan chan tea.Msg
|
||||
interactive bool
|
||||
}
|
||||
|
||||
func NewCommandController(historyProvider IterProvider, pkgs ...CommandPack) (*CommandController, error) {
|
||||
sched, err := gocron.NewScheduler()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cc := &CommandController{
|
||||
historyProvider: historyProvider,
|
||||
commandList: nil,
|
||||
lookupExtensions: nil,
|
||||
cmdChan: make(chan cmdMessage),
|
||||
msgChan: make(chan tea.Msg),
|
||||
interactive: true,
|
||||
historyProvider: historyProvider,
|
||||
commandList: nil,
|
||||
lookupExtensions: nil,
|
||||
cronScheduler: sched,
|
||||
cmdChan: make(chan cmdMessage),
|
||||
pendingTaskChan: make(chan pendingTask, pendingTaskBuffer),
|
||||
pendingAuxTaskChan: make(chan pendingTask, pendingAuxTaskBuffer),
|
||||
msgChan: make(chan tea.Msg),
|
||||
interactive: true,
|
||||
}
|
||||
|
||||
options := []ucl.InstOption{
|
||||
|
|
@ -75,6 +97,8 @@ func NewCommandController(historyProvider IterProvider, pkgs ...CommandPack) (*C
|
|||
}
|
||||
|
||||
go cc.cmdLooper()
|
||||
go cc.auxCmdLooper()
|
||||
sched.Start()
|
||||
|
||||
return cc, nil
|
||||
}
|
||||
|
|
@ -172,12 +196,13 @@ func (c *CommandController) Invoke(invokable ucl.Invokable, args []any) (msg tea
|
|||
}
|
||||
|
||||
func (c *CommandController) cmdLooper() {
|
||||
execCtx := execContext{ctrl: c}
|
||||
ctx := context.WithValue(context.Background(), commandCtlKey, &execCtx)
|
||||
|
||||
ctx := context.Background()
|
||||
for {
|
||||
select {
|
||||
case cmdChan := <-c.cmdChan:
|
||||
execCtx := execContext{ctrl: c}
|
||||
ctx := context.WithValue(ctx, commandCtlKey, &execCtx)
|
||||
|
||||
res, err := c.ExecuteAndWait(ctx, cmdChan.cmd)
|
||||
if err != nil {
|
||||
c.postMessage(events.Error(err))
|
||||
|
|
@ -187,6 +212,16 @@ func (c *CommandController) cmdLooper() {
|
|||
if execCtx.requestRefresh {
|
||||
c.postMessage(events.ResultSetUpdated{})
|
||||
}
|
||||
case task := <-c.pendingTaskChan:
|
||||
execCtx := execContext{ctrl: c}
|
||||
ctx := context.WithValue(ctx, commandCtlKey, &execCtx)
|
||||
|
||||
if err := task.task(ctx); err != nil {
|
||||
c.postMessage(events.Error(err))
|
||||
}
|
||||
if execCtx.requestRefresh {
|
||||
c.postMessage(events.ResultSetUpdated{})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -305,15 +340,13 @@ func (c *CommandController) cmdInvoker(ctx context.Context, name string, args uc
|
|||
}
|
||||
|
||||
func (c *CommandController) printLine(s string) {
|
||||
log.Println(s)
|
||||
if c.msgChan == nil || !c.interactive {
|
||||
log.Println(s)
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case c.msgChan <- events.StatusMsg(s):
|
||||
default:
|
||||
log.Println(s)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -325,6 +358,21 @@ func (c *CommandController) postMessage(msg tea.Msg) {
|
|||
c.msgChan <- msg
|
||||
}
|
||||
|
||||
func (c *CommandController) auxCmdLooper() {
|
||||
ctx := context.WithValue(context.Background(), commandCtlKey, &execContext{ctrl: c})
|
||||
|
||||
for i := 0; i < auxWorkers; i++ {
|
||||
go func() {
|
||||
for auxTask := range c.pendingAuxTaskChan {
|
||||
log.Printf("running aux task: %v", auxTask.descr)
|
||||
if err := auxTask.task(ctx); err != nil {
|
||||
log.Printf("aux task error: %v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
type teaMsgWrapper struct {
|
||||
msg tea.Msg
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,10 @@ package commandctrl
|
|||
|
||||
import (
|
||||
"context"
|
||||
|
||||
tea "github.com/charmbracelet/bubbletea"
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
"github.com/pkg/errors"
|
||||
"ucl.lmika.dev/ucl"
|
||||
)
|
||||
|
||||
|
|
@ -57,6 +60,40 @@ func QueueRefresh(ctx context.Context) {
|
|||
cmdCtl.requestRefresh = true
|
||||
}
|
||||
|
||||
func CronScheduler(ctx context.Context) gocron.Scheduler {
|
||||
cmdCtl, ok := ctx.Value(commandCtlKey).(*execContext)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return cmdCtl.ctrl.cronScheduler
|
||||
}
|
||||
|
||||
func ScheduleTask(ctx context.Context, task func(ctx context.Context) error) error {
|
||||
cmdCtl, ok := ctx.Value(commandCtlKey).(*execContext)
|
||||
if !ok {
|
||||
return errors.New("no command controller")
|
||||
}
|
||||
select {
|
||||
case cmdCtl.ctrl.pendingTaskChan <- pendingTask{task: task}:
|
||||
return nil
|
||||
default:
|
||||
return errors.New("task queue is full")
|
||||
}
|
||||
}
|
||||
|
||||
func ScheduleAuxTask(ctx context.Context, descr string, task func(ctx context.Context) error) error {
|
||||
cmdCtl, ok := ctx.Value(commandCtlKey).(*execContext)
|
||||
if !ok {
|
||||
return errors.New("no command controller")
|
||||
}
|
||||
select {
|
||||
case cmdCtl.ctrl.pendingAuxTaskChan <- pendingTask{descr: descr, task: task}:
|
||||
return nil
|
||||
default:
|
||||
return errors.New("aux task queue is full")
|
||||
}
|
||||
}
|
||||
|
||||
type Invoker interface {
|
||||
Invoke(invokable ucl.Invokable, args []any) tea.Msg
|
||||
Inst() *ucl.Inst
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue