119 lines
2.1 KiB
Go
119 lines
2.1 KiB
Go
package events
|
|
|
|
import "sync"
|
|
|
|
type Bus struct {
|
|
topics map[string]*topic
|
|
}
|
|
|
|
func New() *Bus {
|
|
return &Bus{topics: make(map[string]*topic)}
|
|
}
|
|
|
|
func (d *Bus) On(event string, receiver interface{}) *Subscription {
|
|
// TODO: make thread safe
|
|
t, hasTopic := d.topics[event]
|
|
if !hasTopic {
|
|
t = &topic{mutex: sync.Mutex{}, head: nil, tail: nil}
|
|
d.topics[event] = t
|
|
}
|
|
|
|
sub, err := newSubscriptionFromFunc(receiver)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
t.addSubscriber(sub)
|
|
return sub
|
|
}
|
|
|
|
func (d *Bus) Fire(event string, args ...interface{}) {
|
|
_ = d.TryFire(event, args...)
|
|
}
|
|
|
|
func (d *Bus) TryFire(event string, args ...interface{}) error {
|
|
// TODO: make thead safe
|
|
topic, hasTopic := d.topics[event]
|
|
if !hasTopic {
|
|
return nil
|
|
}
|
|
|
|
preparedArgs := prepareArgs(args)
|
|
|
|
topic.mutex.Lock()
|
|
var head = topic.head
|
|
topic.mutex.Unlock()
|
|
|
|
var errs []error
|
|
|
|
var lastSub *Subscription = nil
|
|
for sub := head; sub != nil; sub = sub.next {
|
|
// Remove unsubscribers
|
|
if sub.remove {
|
|
topic.mutex.Lock()
|
|
if lastSub == nil {
|
|
topic.head = sub.next
|
|
} else {
|
|
lastSub.next = sub.next
|
|
}
|
|
if topic.tail == sub {
|
|
topic.tail = lastSub
|
|
}
|
|
topic.mutex.Unlock()
|
|
continue
|
|
}
|
|
lastSub = sub
|
|
|
|
err := sub.handler.invoke(preparedArgs)
|
|
if err != nil {
|
|
errs = append(errs, err)
|
|
}
|
|
}
|
|
|
|
switch len(errs) {
|
|
case 0:
|
|
return nil
|
|
case 1:
|
|
return errs[0]
|
|
}
|
|
return HandlerError{topic: event, errs: errs}
|
|
}
|
|
|
|
type topic struct {
|
|
// mutex protects writes to the head and tail
|
|
mutex sync.Mutex
|
|
head *Subscription
|
|
tail *Subscription
|
|
}
|
|
|
|
func (t *topic) addSubscriber(sub *Subscription) {
|
|
t.mutex.Lock()
|
|
defer t.mutex.Unlock()
|
|
|
|
if t.head == nil {
|
|
t.head = sub
|
|
}
|
|
if t.tail != nil {
|
|
t.tail.next = sub
|
|
}
|
|
t.tail = sub
|
|
}
|
|
|
|
type Subscription struct {
|
|
remove bool
|
|
handler receiptHandler
|
|
next *Subscription
|
|
}
|
|
|
|
func newSubscriptionFromFunc(fn interface{}) (*Subscription, error) {
|
|
handler, err := newReceiptHandler(fn)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Subscription{handler: handler, next: nil}, nil
|
|
}
|
|
|
|
func (s *Subscription) Unsubscribe() {
|
|
s.remove = true
|
|
}
|