ucl/cmdlang/streams.go

153 lines
3.2 KiB
Go
Raw Normal View History

package cmdlang
import (
"errors"
2024-04-11 12:05:05 +00:00
"fmt"
"io"
)
// stream is an object which returns a collection of objects from a source.
// These are used to create pipelines
2024-04-11 10:47:59 +00:00
//
// The stream implementation can expect close to be called if at least one next() call is made. Otherwise
// closableStream cannot assume that close will be called (the pipe may be left unconsumed, for example).
//
// It is the job of the final iterator to call close. Any steam that consumes from another stream must
// implement this, and call close on the parent stream.
type stream interface {
2024-04-11 12:05:05 +00:00
object
// next pulls the next object from the stream. If an object is available, the result is the
// object and a nil error. If no more objects are available, error returns io.EOF.
// Otherwise, an error is returned.
next() (object, error)
2024-04-11 10:47:59 +00:00
close() error
}
// forEach will iterate over all the items of a stream. The iterating function can return an error, which will
// be returned as is. A stream that has consumed every item will return nil. The stream will automatically be closed.
2024-04-11 12:05:05 +00:00
func forEach(s stream, f func(object, int) error) (err error) {
2024-04-11 10:47:59 +00:00
defer s.close()
var sv object
2024-04-11 12:05:05 +00:00
i := 0
for sv, err = s.next(); err == nil; sv, err = s.next() {
2024-04-11 12:05:05 +00:00
if err := f(sv, i); err != nil {
return err
}
2024-04-11 12:05:05 +00:00
i += 1
}
if !errors.Is(err, io.EOF) {
return err
}
return nil
}
// asStream converts an object to a stream. If t is already a stream, it's returned as is.
// Otherwise, a singleton stream is returned.
func asStream(v object) stream {
switch s := v.(type) {
case stream:
return s
case listObject:
return &listIterStream{list: s}
}
return &singletonStream{t: v}
}
type emptyStream struct{}
2024-04-11 12:05:05 +00:00
func (s *emptyStream) String() string {
return "(nil)"
}
func (s emptyStream) next() (object, error) {
return nil, io.EOF
}
2024-04-11 10:47:59 +00:00
func (s emptyStream) close() error { return nil }
type singletonStream struct {
2024-04-11 12:05:05 +00:00
t object
consumed bool
}
2024-04-11 12:05:05 +00:00
func (s *singletonStream) String() string {
return s.t.String()
}
2024-04-13 11:46:50 +00:00
func (s *singletonStream) Truthy() bool {
return !s.consumed
}
func (s *singletonStream) next() (object, error) {
if s.consumed {
return nil, io.EOF
}
s.consumed = true
return s.t, nil
}
2024-04-11 10:47:59 +00:00
func (s *singletonStream) close() error { return nil }
2024-04-11 12:05:05 +00:00
type listIterStream struct {
list []object
cusr int
}
func (s *listIterStream) String() string {
return fmt.Sprintf("listIterStream{list: %v}", s.list)
}
2024-04-13 11:46:50 +00:00
func (s *listIterStream) Truthy() bool {
return len(s.list) > s.cusr
}
2024-04-11 12:05:05 +00:00
func (s *listIterStream) next() (o object, err error) {
if s.cusr >= len(s.list) {
return nil, io.EOF
}
o = s.list[s.cusr]
s.cusr += 1
return o, nil
}
func (s *listIterStream) close() error { return nil }
type mapFilterStream struct {
in stream
2024-04-18 10:53:25 +00:00
mapFn func(x object) (object, bool, error)
}
2024-04-11 12:05:05 +00:00
func (ms mapFilterStream) String() string {
return fmt.Sprintf("mapFilterStream{in: %v}", ms.in)
}
2024-04-13 11:46:50 +00:00
func (ms mapFilterStream) Truthy() bool {
return true // ???
}
func (ms mapFilterStream) next() (object, error) {
for {
u, err := ms.in.next()
if err != nil {
return nil, err
}
2024-04-18 10:53:25 +00:00
t, ok, err := ms.mapFn(u)
if err != nil {
return nil, err
} else if ok {
return t, nil
}
}
}
func (ms mapFilterStream) close() error {
2024-04-11 10:47:59 +00:00
return ms.in.close()
}