package events type Bus struct { topics map[string]*topic } func New() *Bus { return &Bus{topics: make(map[string]*topic)} } func (d *Bus) On(event string, receiver interface{}) *Subscription { // TODO: make thread safe t, hasTopic := d.topics[event] if !hasTopic { t = &topic{} d.topics[event] = t } sub, err := newSubscriptionFromFunc(receiver) if err != nil { panic(err) } t.addSubscriber(sub) return sub } func (d *Bus) Fire(event string, args ...interface{}) { _ = d.TryFire(event, args...) } func (d *Bus) TryFire(event string, args ...interface{}) error { // TODO: make thead safe topic, hasTopic := d.topics[event] if !hasTopic { return nil } preparedArgs := prepareArgs(args) var errs []error var lastSub *Subscription = nil for sub := topic.head; sub != nil; sub = sub.next { // Remove unsubscribers if sub.remove { if lastSub == nil { topic.head = sub.next } else { lastSub.next = sub.next } if topic.tail == sub { topic.tail = lastSub } continue } lastSub = sub err := sub.handler.invoke(preparedArgs) if err != nil { errs = append(errs, err) } } switch len(errs) { case 0: return nil case 1: return errs[0] } return HandlerError{topic: event, errs: errs} } type topic struct { head *Subscription tail *Subscription } func (t *topic) addSubscriber(sub *Subscription) { if t.head == nil { t.head = sub } if t.tail != nil { t.tail.next = sub } t.tail = sub } type Subscription struct { remove bool handler receiptHandler next *Subscription } func newSubscriptionFromFunc(fn interface{}) (*Subscription, error) { handler, err := newReceiptHandler(fn) if err != nil { return nil, err } return &Subscription{handler: handler, next: nil}, nil } func (s *Subscription) Unsubscribe() { s.remove = true }