Skip to content

Commit 8b37d7f

Browse files
committed
fix: drain channel on unsubscribe
1 parent d20bdef commit 8b37d7f

File tree

2 files changed

+36
-0
lines changed

2 files changed

+36
-0
lines changed

pubsub/pubsub.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ import (
1616
// This is a last-ditch effort to avoid deadlocks.
1717
const AckTimeout = time.Second * 30
1818

19+
// PublishTimeout is the time to wait for a publish before panicking.
20+
const PublishTimeout = time.Second * 10
21+
1922
// Message is a message that must be acknowledge by the receiver.
2023
type Message[T any] struct {
2124
Msg T
@@ -148,6 +151,11 @@ func (s *Topic[T]) Unsubscribe(c chan T) {
148151
panic("channel not subscribed")
149152
}
150153
s.rawChannelMap.Delete(c)
154+
// Drain the subscription channel
155+
go func() {
156+
for range c {
157+
}
158+
}()
151159
s.control <- unsubscribe[T](ackch.(chan Message[T]))
152160
}
153161

@@ -203,6 +211,7 @@ func (s *Topic[T]) run() {
203211
case err := <-smsg.ack:
204212
errs = append(errs, err)
205213
case <-timer.C:
214+
// Print all goroutines
206215
panic("ack timeout for " + sub.subscriber)
207216
}
208217
timer.Stop()

pubsub/pubsub_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pubsub_test
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"testing"
@@ -107,3 +108,29 @@ func TestSyncPubSub(t *testing.T) {
107108
}
108109
assert.Equal(t, []string{"received", "published", "received", "published"}, actual)
109110
}
111+
112+
func TestPubSubPanicAfterUnsubscribe(t *testing.T) {
113+
t.Skip("This test is slow")
114+
topic := New[string]()
115+
for range 100 {
116+
go func() {
117+
foo := topic.Subscribe(make(chan string, 1))
118+
<-time.After(time.Second)
119+
topic.Unsubscribe(foo)
120+
}()
121+
}
122+
ctx, cancel := context.WithCancel(context.TODO())
123+
defer cancel()
124+
go func() {
125+
for {
126+
select {
127+
case <-time.After(time.Millisecond * 100):
128+
topic.Publish("foo")
129+
130+
case <-ctx.Done():
131+
return
132+
}
133+
}
134+
}()
135+
<-time.After(time.Minute)
136+
}

0 commit comments

Comments
 (0)