From 6dc923b6ae8f73463d8f64d23b5ff904438fb119 Mon Sep 17 00:00:00 2001 From: Leon Mika Date: Mon, 12 May 2025 19:58:45 +1000 Subject: [PATCH] Brought up to date and added an unsubscribe option --- bus.go | 37 ++++++++++++++---- bus_test.go | 110 ++++++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 12 ++++-- go.sum | 14 +------ 4 files changed, 149 insertions(+), 24 deletions(-) diff --git a/bus.go b/bus.go index 7b5eb27..a6dca30 100644 --- a/bus.go +++ b/bus.go @@ -8,7 +8,7 @@ func New() *Bus { return &Bus{topics: make(map[string]*topic)} } -func (d *Bus) On(event string, receiver interface{}) { +func (d *Bus) On(event string, receiver interface{}) *Subscription { // TODO: make thread safe t, hasTopic := d.topics[event] if !hasTopic { @@ -22,6 +22,7 @@ func (d *Bus) On(event string, receiver interface{}) { } t.addSubscriber(sub) + return sub } func (d *Bus) Fire(event string, args ...interface{}) { @@ -39,7 +40,22 @@ func (d *Bus) TryFire(event string, args ...interface{}) error { var errs []error + var lastSub *Subscription = nil for sub := topic.head; sub != nil; sub = sub.next { + // Remove unsubscribers + if sub.remove { + if lastSub == nil { + topic.head = sub.next + } else { + lastSub.next = sub.next + } + if topic.tail == sub { + topic.tail = lastSub + } + continue + } + lastSub = sub + err := sub.handler.invoke(preparedArgs) if err != nil { errs = append(errs, err) @@ -56,11 +72,11 @@ func (d *Bus) TryFire(event string, args ...interface{}) error { } type topic struct { - head *subscription - tail *subscription + head *Subscription + tail *Subscription } -func (t *topic) addSubscriber(sub *subscription) { +func (t *topic) addSubscriber(sub *Subscription) { if t.head == nil { t.head = sub } @@ -70,15 +86,20 @@ func (t *topic) addSubscriber(sub *subscription) { t.tail = sub } -type subscription struct { +type Subscription struct { + remove bool handler receiptHandler - next *subscription + next *Subscription } -func newSubscriptionFromFunc(fn interface{}) (*subscription, error) { +func newSubscriptionFromFunc(fn interface{}) (*Subscription, error) { handler, err := newReceiptHandler(fn) if err != nil { return nil, err } - return &subscription{handler: handler, next: nil}, nil + return &Subscription{handler: handler, next: nil}, nil +} + +func (s *Subscription) Unsubscribe() { + s.remove = true } diff --git a/bus_test.go b/bus_test.go index 808fa05..23dedc4 100644 --- a/bus_test.go +++ b/bus_test.go @@ -52,6 +52,116 @@ func TestFire(t *testing.T) { }) } +func TestUnsubscribe(t *testing.T) { + t.Run("should remove subscription 1", func(t *testing.T) { + fired := make([]int, 3) + + d := New() + s1 := d.On("event", func() { fired[0]++ }) + s2 := d.On("event", func() { fired[1]++ }) + s3 := d.On("event", func() { fired[2]++ }) + + d.Fire("event") + assert.Equal(t, []int{1, 1, 1}, fired) + + s1.Unsubscribe() + d.Fire("event") + assert.Equal(t, []int{1, 2, 2}, fired) + + s2.Unsubscribe() + d.Fire("event") + assert.Equal(t, []int{1, 2, 3}, fired) + + s3.Unsubscribe() + d.Fire("event") + assert.Equal(t, []int{1, 2, 3}, fired) + }) + + t.Run("should remove subscription 2", func(t *testing.T) { + fired := make([]int, 3) + + d := New() + s1 := d.On("event", func() { fired[0]++ }) + s2 := d.On("event", func() { fired[1]++ }) + s3 := d.On("event", func() { fired[2]++ }) + + d.Fire("event") + assert.Equal(t, []int{1, 1, 1}, fired) + + s3.Unsubscribe() + d.Fire("event") + assert.Equal(t, []int{2, 2, 1}, fired) + + s2.Unsubscribe() + d.Fire("event") + assert.Equal(t, []int{3, 2, 1}, fired) + + s1.Unsubscribe() + d.Fire("event") + assert.Equal(t, []int{3, 2, 1}, fired) + }) + + t.Run("should remove subscription 3", func(t *testing.T) { + fired := make([]int, 3) + + d := New() + s1 := d.On("event", func() { fired[0]++ }) + s2 := d.On("event", func() { fired[1]++ }) + s3 := d.On("event", func() { fired[2]++ }) + + d.Fire("event") + assert.Equal(t, []int{1, 1, 1}, fired) + + s2.Unsubscribe() + d.Fire("event") + assert.Equal(t, []int{2, 1, 2}, fired) + + s1.Unsubscribe() + d.Fire("event") + assert.Equal(t, []int{2, 1, 3}, fired) + + s3.Unsubscribe() + d.Fire("event") + assert.Equal(t, []int{2, 1, 3}, fired) + }) + + t.Run("should support new subscribers subscription", func(t *testing.T) { + fired := make([]int, 3) + + d := New() + d.On("event", func() { fired[0]++ }) + s2 := d.On("event", func() { fired[1]++ }) + + d.Fire("event") + assert.Equal(t, []int{1, 1, 0}, fired) + + s2.Unsubscribe() + d.Fire("event") + assert.Equal(t, []int{2, 1, 0}, fired) + + s3 := d.On("event", func() { fired[2]++ }) + s2.Unsubscribe() + d.Fire("event") + assert.Equal(t, []int{3, 1, 1}, fired) + + s3.Unsubscribe() + d.Fire("event") + assert.Equal(t, []int{4, 1, 1}, fired) + }) + + t.Run("should nop if no events", func(t *testing.T) { + fired := false + + d := New() + s1 := d.On("event", func() { fired = true }) + s1.Unsubscribe() + s1.Unsubscribe() + + d.Fire("event") + assert.False(t, fired) + }) +} + func TestTryFire(t *testing.T) { errVal := errors.New("bang") diff --git a/go.mod b/go.mod index e6c9285..6b2ff50 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,11 @@ -module github.com/lmika/events +module lmika.dev/pkg/events -go 1.14 +go 1.24 -require github.com/stretchr/testify v1.9.0 // indirect +require github.com/stretchr/testify v1.9.0 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index f62753e..60ce688 100644 --- a/go.sum +++ b/go.sum @@ -1,22 +1,10 @@ -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=