Re-arranged invoke with stream

This commit is contained in:
Leon Mika 2024-04-11 20:47:59 +10:00
parent 3b320206e9
commit 1506692198
5 changed files with 48 additions and 40 deletions

View file

@ -25,10 +25,10 @@ func echoBuiltin(ctx context.Context, args invocationArgs) (object, error) {
return asStream(line.String()), nil return asStream(line.String()), nil
} }
func toUpperBuiltin(ctx context.Context, args invocationArgs) (object, error) { func toUpperBuiltin(ctx context.Context, inStream stream, args invocationArgs) (object, error) {
// Handle args // Handle args
return mapFilterStream{ return mapFilterStream{
in: args.inStream, in: inStream,
mapFn: func(x object) (object, bool) { mapFn: func(x object) (object, bool) {
s, ok := x.(string) s, ok := x.(string)
if !ok { if !ok {
@ -86,8 +86,8 @@ func (f *fileLinesStream) close() error {
return nil return nil
} }
func errorTestBuiltin(ctx context.Context, args invocationArgs) (object, error) { func errorTestBuiltin(ctx context.Context, inStream stream, args invocationArgs) (object, error) {
return &timeBombStream{args.inStream, 2}, nil return &timeBombStream{inStream, 2}, nil
} }
type timeBombStream struct { type timeBombStream struct {
@ -104,9 +104,5 @@ func (ms *timeBombStream) next() (object, error) {
} }
func (ms *timeBombStream) close() error { func (ms *timeBombStream) close() error {
closable, ok := ms.in.(closableStream) return ms.in.close()
if ok {
return closable.close()
}
return nil
} }

View file

@ -45,10 +45,17 @@ func (e evaluator) evalCmd(ctx context.Context, ec *evalCtx, ast *astCmd) (objec
return nil, err return nil, err
} }
return cmd.invoke(ctx, invocationArgs{ if ec.currentStream != nil {
args: args, if si, ok := cmd.(streamInvokable); ok {
inStream: ec.currentStream, return si.invokeWithStream(ctx, ec.currentStream, invocationArgs{args: args})
}) } else {
if err := ec.currentStream.close(); err != nil {
return nil, err
}
}
}
return cmd.invoke(ctx, invocationArgs{args: args})
} }
func (e evaluator) evalArg(ctx context.Context, ec *evalCtx, n astCmdArg) (object, error) { func (e evaluator) evalArg(ctx context.Context, ec *evalCtx, n astCmdArg) (object, error) {

View file

@ -13,10 +13,10 @@ type Inst struct {
func New() *Inst { func New() *Inst {
rootEC := evalCtx{} rootEC := evalCtx{}
rootEC.addCmd("echo", invokableFunc(echoBuiltin)) rootEC.addCmd("echo", invokableFunc(echoBuiltin))
rootEC.addCmd("toUpper", invokableFunc(toUpperBuiltin)) rootEC.addCmd("toUpper", invokableStreamFunc(toUpperBuiltin))
rootEC.addCmd("cat", invokableFunc(catBuiltin)) rootEC.addCmd("cat", invokableFunc(catBuiltin))
rootEC.addCmd("testTimebomb", invokableFunc(errorTestBuiltin)) rootEC.addCmd("testTimebomb", invokableStreamFunc(errorTestBuiltin))
return &Inst{ return &Inst{
rootEC: &rootEC, rootEC: &rootEC,

View file

@ -18,7 +18,6 @@ func (s strObject) String() string {
type invocationArgs struct { type invocationArgs struct {
args []object args []object
inStream stream
} }
func (ia invocationArgs) expectArgn(x int) error { func (ia invocationArgs) expectArgn(x int) error {
@ -44,8 +43,23 @@ type invokable interface {
invoke(ctx context.Context, args invocationArgs) (object, error) invoke(ctx context.Context, args invocationArgs) (object, error)
} }
type streamInvokable interface {
invokable
invokeWithStream(context.Context, stream, invocationArgs) (object, error)
}
type invokableFunc func(ctx context.Context, args invocationArgs) (object, error) type invokableFunc func(ctx context.Context, args invocationArgs) (object, error)
func (i invokableFunc) invoke(ctx context.Context, args invocationArgs) (object, error) { func (i invokableFunc) invoke(ctx context.Context, args invocationArgs) (object, error) {
return i(ctx, args) return i(ctx, args)
} }
type invokableStreamFunc func(ctx context.Context, inStream stream, args invocationArgs) (object, error)
func (i invokableStreamFunc) invoke(ctx context.Context, args invocationArgs) (object, error) {
return i(ctx, nil, args)
}
func (i invokableStreamFunc) invokeWithStream(ctx context.Context, inStream stream, args invocationArgs) (object, error) {
return i(ctx, inStream, args)
}

View file

@ -7,21 +7,25 @@ import (
// stream is an object which returns a collection of objects from a source. // stream is an object which returns a collection of objects from a source.
// These are used to create pipelines // These are used to create pipelines
//
// The stream implementation can expect close to be called if at least one next() call is made. Otherwise
// closableStream cannot assume that close will be called (the pipe may be left unconsumed, for example).
//
// It is the job of the final iterator to call close. Any steam that consumes from another stream must
// implement this, and call close on the parent stream.
type stream interface { type stream interface {
// next pulls the next object from the stream. If an object is available, the result is the // next pulls the next object from the stream. If an object is available, the result is the
// object and a nil error. If no more objects are available, error returns io.EOF. // object and a nil error. If no more objects are available, error returns io.EOF.
// Otherwise, an error is returned. // Otherwise, an error is returned.
next() (object, error) next() (object, error)
close() error
} }
// forEach will iterate over all the items of a stream. The iterating function can return an error, which will // forEach will iterate over all the items of a stream. The iterating function can return an error, which will
// be returned as is. A stream that has consumed every item will return nil. The stream will automatically be closed. // be returned as is. A stream that has consumed every item will return nil. The stream will automatically be closed.
func forEach(s stream, f func(object) error) (err error) { func forEach(s stream, f func(object) error) (err error) {
defer func() { defer s.close()
if c, ok := s.(closableStream); ok {
c.close()
}
}()
var sv object var sv object
for sv, err = s.next(); err == nil; sv, err = s.next() { for sv, err = s.next(); err == nil; sv, err = s.next() {
@ -35,19 +39,6 @@ func forEach(s stream, f func(object) error) (err error) {
return nil return nil
} }
// closableStream is a stream that has opened resources that must be closed when the stream is
// consumed. The stream implementation can expect close to be called if at least one next() call is made. Otherwise
// closableStream cannot assume that close will be called (the pipe may be left unconsumed, for example).
//
// It is the job of the final iterator to call close. Any steam that consumes from another stream must
// implement this, and call close on the parent stream.
type closableStream interface {
stream
// close closes the stream
close() error
}
// asStream converts an object to a stream. If t is already a stream, it's returned as is. // asStream converts an object to a stream. If t is already a stream, it's returned as is.
// Otherwise, a singleton stream is returned. // Otherwise, a singleton stream is returned.
func asStream(v object) stream { func asStream(v object) stream {
@ -63,6 +54,8 @@ func (s emptyStream) next() (object, error) {
return nil, io.EOF return nil, io.EOF
} }
func (s emptyStream) close() error { return nil }
type singletonStream struct { type singletonStream struct {
t any t any
consumed bool consumed bool
@ -76,6 +69,8 @@ func (s *singletonStream) next() (object, error) {
return s.t, nil return s.t, nil
} }
func (s *singletonStream) close() error { return nil }
type mapFilterStream struct { type mapFilterStream struct {
in stream in stream
mapFn func(x object) (object, bool) mapFn func(x object) (object, bool)
@ -96,9 +91,5 @@ func (ms mapFilterStream) next() (object, error) {
} }
func (ms mapFilterStream) close() error { func (ms mapFilterStream) close() error {
closable, ok := ms.in.(closableStream) return ms.in.close()
if ok {
return closable.close()
}
return nil
} }