105 lines
2.4 KiB
Go
105 lines
2.4 KiB
Go
package cmdlang
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
)
|
|
|
|
// stream is an object which returns a collection of objects from a source.
|
|
// These are used to create pipelines
|
|
type stream interface {
|
|
// next pulls the next object from the stream. If an object is available, the result is the
|
|
// object and a nil error. If no more objects are available, error returns io.EOF.
|
|
// Otherwise, an error is returned.
|
|
next() (object, error)
|
|
}
|
|
|
|
// forEach will iterate over all the items of a stream. The iterating function can return an error, which will
|
|
// be returned as is. A stream that has consumed every item will return nil. The stream will automatically be closed.
|
|
func forEach(s stream, f func(object) error) (err error) {
|
|
defer func() {
|
|
if c, ok := s.(closableStream); ok {
|
|
c.close()
|
|
}
|
|
}()
|
|
|
|
var sv object
|
|
for sv, err = s.next(); err == nil; sv, err = s.next() {
|
|
if err := f(sv); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if !errors.Is(err, io.EOF) {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// closableStream is a stream that has opened resources that must be closed when the stream is
|
|
// consumed. The stream implementation can expect close to be called if at least one next() call is made. Otherwise
|
|
// closableStream cannot assume that close will be called (the pipe may be left unconsumed, for example).
|
|
//
|
|
// It is the job of the final iterator to call close. Any steam that consumes from another stream must
|
|
// implement this, and call close on the parent stream.
|
|
type closableStream interface {
|
|
stream
|
|
|
|
// close closes the stream
|
|
close() error
|
|
}
|
|
|
|
// asStream converts an object to a stream. If t is already a stream, it's returned as is.
|
|
// Otherwise, a singleton stream is returned.
|
|
func asStream(v object) stream {
|
|
if s, ok := v.(stream); ok {
|
|
return s
|
|
}
|
|
return &singletonStream{t: v}
|
|
}
|
|
|
|
type emptyStream struct{}
|
|
|
|
func (s emptyStream) next() (object, error) {
|
|
return nil, io.EOF
|
|
}
|
|
|
|
type singletonStream struct {
|
|
t any
|
|
consumed bool
|
|
}
|
|
|
|
func (s *singletonStream) next() (object, error) {
|
|
if s.consumed {
|
|
return nil, io.EOF
|
|
}
|
|
s.consumed = true
|
|
return s.t, nil
|
|
}
|
|
|
|
type mapFilterStream struct {
|
|
in stream
|
|
mapFn func(x object) (object, bool)
|
|
}
|
|
|
|
func (ms mapFilterStream) next() (object, error) {
|
|
for {
|
|
u, err := ms.in.next()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
t, ok := ms.mapFn(u)
|
|
if ok {
|
|
return t, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ms mapFilterStream) close() error {
|
|
closable, ok := ms.in.(closableStream)
|
|
if ok {
|
|
return closable.close()
|
|
}
|
|
return nil
|
|
}
|