From 56485330dd142a31d3a9009b6b3a5e89e1d901d0 Mon Sep 17 00:00:00 2001 From: Leon Mika Date: Mon, 12 May 2025 20:03:27 +1000 Subject: [PATCH] Added mutexes to writes --- bus.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/bus.go b/bus.go index a6dca30..81aa338 100644 --- a/bus.go +++ b/bus.go @@ -1,5 +1,7 @@ package events +import "sync" + type Bus struct { topics map[string]*topic } @@ -12,7 +14,7 @@ func (d *Bus) On(event string, receiver interface{}) *Subscription { // TODO: make thread safe t, hasTopic := d.topics[event] if !hasTopic { - t = &topic{} + t = &topic{mutex: sync.Mutex{}, head: nil, tail: nil} d.topics[event] = t } @@ -38,12 +40,17 @@ func (d *Bus) TryFire(event string, args ...interface{}) error { preparedArgs := prepareArgs(args) + topic.mutex.Lock() + var head = topic.head + topic.mutex.Unlock() + var errs []error var lastSub *Subscription = nil - for sub := topic.head; sub != nil; sub = sub.next { + for sub := head; sub != nil; sub = sub.next { // Remove unsubscribers if sub.remove { + topic.mutex.Lock() if lastSub == nil { topic.head = sub.next } else { @@ -52,6 +59,7 @@ func (d *Bus) TryFire(event string, args ...interface{}) error { if topic.tail == sub { topic.tail = lastSub } + topic.mutex.Unlock() continue } lastSub = sub @@ -72,11 +80,16 @@ func (d *Bus) TryFire(event string, args ...interface{}) error { } type topic struct { - head *Subscription - tail *Subscription + // mutex protects writes to the head and tail + mutex sync.Mutex + head *Subscription + tail *Subscription } func (t *topic) addSubscriber(sub *Subscription) { + t.mutex.Lock() + defer t.mutex.Unlock() + if t.head == nil { t.head = sub }