diff --git a/cmdlang/builtins.go b/cmdlang/builtins.go index 46fd5ba..f98dc4c 100644 --- a/cmdlang/builtins.go +++ b/cmdlang/builtins.go @@ -25,10 +25,10 @@ func echoBuiltin(ctx context.Context, args invocationArgs) (object, error) { return asStream(line.String()), nil } -func toUpperBuiltin(ctx context.Context, args invocationArgs) (object, error) { +func toUpperBuiltin(ctx context.Context, inStream stream, args invocationArgs) (object, error) { // Handle args return mapFilterStream{ - in: args.inStream, + in: inStream, mapFn: func(x object) (object, bool) { s, ok := x.(string) if !ok { @@ -86,8 +86,8 @@ func (f *fileLinesStream) close() error { return nil } -func errorTestBuiltin(ctx context.Context, args invocationArgs) (object, error) { - return &timeBombStream{args.inStream, 2}, nil +func errorTestBuiltin(ctx context.Context, inStream stream, args invocationArgs) (object, error) { + return &timeBombStream{inStream, 2}, nil } type timeBombStream struct { @@ -104,9 +104,5 @@ func (ms *timeBombStream) next() (object, error) { } func (ms *timeBombStream) close() error { - closable, ok := ms.in.(closableStream) - if ok { - return closable.close() - } - return nil + return ms.in.close() } diff --git a/cmdlang/eval.go b/cmdlang/eval.go index 2785872..b00026e 100644 --- a/cmdlang/eval.go +++ b/cmdlang/eval.go @@ -45,10 +45,17 @@ func (e evaluator) evalCmd(ctx context.Context, ec *evalCtx, ast *astCmd) (objec return nil, err } - return cmd.invoke(ctx, invocationArgs{ - args: args, - inStream: ec.currentStream, - }) + if ec.currentStream != nil { + if si, ok := cmd.(streamInvokable); ok { + return si.invokeWithStream(ctx, ec.currentStream, invocationArgs{args: args}) + } else { + if err := ec.currentStream.close(); err != nil { + return nil, err + } + } + } + + return cmd.invoke(ctx, invocationArgs{args: args}) } func (e evaluator) evalArg(ctx context.Context, ec *evalCtx, n astCmdArg) (object, error) { diff --git a/cmdlang/inst.go b/cmdlang/inst.go index d94e3d9..88c9d1f 100644 --- a/cmdlang/inst.go +++ b/cmdlang/inst.go @@ -13,10 +13,10 @@ type Inst struct { func New() *Inst { rootEC := evalCtx{} rootEC.addCmd("echo", invokableFunc(echoBuiltin)) - rootEC.addCmd("toUpper", invokableFunc(toUpperBuiltin)) + rootEC.addCmd("toUpper", invokableStreamFunc(toUpperBuiltin)) rootEC.addCmd("cat", invokableFunc(catBuiltin)) - rootEC.addCmd("testTimebomb", invokableFunc(errorTestBuiltin)) + rootEC.addCmd("testTimebomb", invokableStreamFunc(errorTestBuiltin)) return &Inst{ rootEC: &rootEC, diff --git a/cmdlang/objs.go b/cmdlang/objs.go index 934da5a..8f372e7 100644 --- a/cmdlang/objs.go +++ b/cmdlang/objs.go @@ -17,8 +17,7 @@ func (s strObject) String() string { } type invocationArgs struct { - args []object - inStream stream + args []object } func (ia invocationArgs) expectArgn(x int) error { @@ -44,8 +43,23 @@ type invokable interface { invoke(ctx context.Context, args invocationArgs) (object, error) } +type streamInvokable interface { + invokable + invokeWithStream(context.Context, stream, invocationArgs) (object, error) +} + type invokableFunc func(ctx context.Context, args invocationArgs) (object, error) func (i invokableFunc) invoke(ctx context.Context, args invocationArgs) (object, error) { return i(ctx, args) } + +type invokableStreamFunc func(ctx context.Context, inStream stream, args invocationArgs) (object, error) + +func (i invokableStreamFunc) invoke(ctx context.Context, args invocationArgs) (object, error) { + return i(ctx, nil, args) +} + +func (i invokableStreamFunc) invokeWithStream(ctx context.Context, inStream stream, args invocationArgs) (object, error) { + return i(ctx, inStream, args) +} diff --git a/cmdlang/streams.go b/cmdlang/streams.go index e8d7637..b7f4e5c 100644 --- a/cmdlang/streams.go +++ b/cmdlang/streams.go @@ -7,21 +7,25 @@ import ( // stream is an object which returns a collection of objects from a source. // These are used to create pipelines +// +// The stream implementation can expect close to be called if at least one next() call is made. Otherwise +// closableStream cannot assume that close will be called (the pipe may be left unconsumed, for example). +// +// It is the job of the final iterator to call close. Any steam that consumes from another stream must +// implement this, and call close on the parent stream. type stream interface { // next pulls the next object from the stream. If an object is available, the result is the // object and a nil error. If no more objects are available, error returns io.EOF. // Otherwise, an error is returned. next() (object, error) + + close() error } // forEach will iterate over all the items of a stream. The iterating function can return an error, which will // be returned as is. A stream that has consumed every item will return nil. The stream will automatically be closed. func forEach(s stream, f func(object) error) (err error) { - defer func() { - if c, ok := s.(closableStream); ok { - c.close() - } - }() + defer s.close() var sv object for sv, err = s.next(); err == nil; sv, err = s.next() { @@ -35,19 +39,6 @@ func forEach(s stream, f func(object) error) (err error) { return nil } -// closableStream is a stream that has opened resources that must be closed when the stream is -// consumed. The stream implementation can expect close to be called if at least one next() call is made. Otherwise -// closableStream cannot assume that close will be called (the pipe may be left unconsumed, for example). -// -// It is the job of the final iterator to call close. Any steam that consumes from another stream must -// implement this, and call close on the parent stream. -type closableStream interface { - stream - - // close closes the stream - close() error -} - // asStream converts an object to a stream. If t is already a stream, it's returned as is. // Otherwise, a singleton stream is returned. func asStream(v object) stream { @@ -63,6 +54,8 @@ func (s emptyStream) next() (object, error) { return nil, io.EOF } +func (s emptyStream) close() error { return nil } + type singletonStream struct { t any consumed bool @@ -76,6 +69,8 @@ func (s *singletonStream) next() (object, error) { return s.t, nil } +func (s *singletonStream) close() error { return nil } + type mapFilterStream struct { in stream mapFn func(x object) (object, bool) @@ -96,9 +91,5 @@ func (ms mapFilterStream) next() (object, error) { } func (ms mapFilterStream) close() error { - closable, ok := ms.in.(closableStream) - if ok { - return closable.close() - } - return nil + return ms.in.close() }