package cmdlang import ( "errors" "fmt" "io" ) // stream is an object which returns a collection of objects from a source. // These are used to create pipelines // // The stream implementation can expect close to be called if at least one next() call is made. Otherwise // closableStream cannot assume that close will be called (the pipe may be left unconsumed, for example). // // It is the job of the final iterator to call close. Any steam that consumes from another stream must // implement this, and call close on the parent stream. type stream interface { object // next pulls the next object from the stream. If an object is available, the result is the // object and a nil error. If no more objects are available, error returns io.EOF. // Otherwise, an error is returned. next() (object, error) close() error } // forEach will iterate over all the items of a stream. The iterating function can return an error, which will // be returned as is. A stream that has consumed every item will return nil. The stream will automatically be closed. func forEach(s stream, f func(object, int) error) (err error) { defer s.close() var sv object i := 0 for sv, err = s.next(); err == nil; sv, err = s.next() { if err := f(sv, i); err != nil { return err } i += 1 } if !errors.Is(err, io.EOF) { return err } return nil } // asStream converts an object to a stream. If t is already a stream, it's returned as is. // Otherwise, a singleton stream is returned. func asStream(v object) stream { if s, ok := v.(stream); ok { return s } return &singletonStream{t: v} } type emptyStream struct{} func (s *emptyStream) String() string { return "(nil)" } func (s emptyStream) next() (object, error) { return nil, io.EOF } func (s emptyStream) close() error { return nil } type singletonStream struct { t object consumed bool } func (s *singletonStream) String() string { return s.t.String() } func (s *singletonStream) next() (object, error) { if s.consumed { return nil, io.EOF } s.consumed = true return s.t, nil } func (s *singletonStream) close() error { return nil } type listIterStream struct { list []object cusr int } func (s *listIterStream) String() string { return fmt.Sprintf("listIterStream{list: %v}", s.list) } func (s *listIterStream) next() (o object, err error) { if s.cusr >= len(s.list) { return nil, io.EOF } o = s.list[s.cusr] s.cusr += 1 return o, nil } func (s *listIterStream) close() error { return nil } type mapFilterStream struct { in stream mapFn func(x object) (object, bool) } func (ms mapFilterStream) String() string { return fmt.Sprintf("mapFilterStream{in: %v}", ms.in) } func (ms mapFilterStream) next() (object, error) { for { u, err := ms.in.next() if err != nil { return nil, err } t, ok := ms.mapFn(u) if ok { return t, nil } } } func (ms mapFilterStream) close() error { return ms.in.close() }