Skip to content

Commit d20bdef

Browse files
committed
fix: report subscriber location when ack panics
eg. ``` ~/…/types/foo $ go run . master panic: ack timeout for /Users/aat/dev/types/foo/main.go:12: main.main goroutine 3 [running]: github.com/alecthomas/types/pubsub.(*Topic[...]).run(0x10010b8a0) /Users/aat/dev/types/pubsub/pubsub.go:206 +0x42c created by github.com/alecthomas/types/pubsub.New[...] in goroutine 1 /Users/aat/dev/types/pubsub/pubsub.go:71 +0x10c exit status 2 ```
1 parent a9d561e commit d20bdef

File tree

1 file changed

+17
-8
lines changed

1 file changed

+17
-8
lines changed

pubsub/pubsub.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package pubsub
66
import (
77
"errors"
88
"fmt"
9+
"runtime"
910
"sync"
1011
"time"
1112
)
@@ -33,7 +34,10 @@ func (a *Message[T]) Nack(err error) {
3334
// Control messages for the topic.
3435
type control[T any] interface{ control() }
3536

36-
type subscribe[T any] chan Message[T]
37+
type subscribe[T any] struct {
38+
subscriber string
39+
msg chan Message[T]
40+
}
3741

3842
func (subscribe[T]) control() {}
3943

@@ -93,6 +97,11 @@ func (s *Topic[T]) PublishSync(t T) error {
9397
}
9498
}
9599

100+
func getSubscriber() string {
101+
pc, file, line, _ := runtime.Caller(2)
102+
return fmt.Sprintf("%s:%d: %s", file, line, runtime.FuncForPC(pc).Name())
103+
}
104+
96105
// Subscribe a channel to the topic.
97106
//
98107
// The channel will be closed when the topic is closed.
@@ -111,7 +120,7 @@ func (s *Topic[T]) Subscribe(c chan T) chan T {
111120
close(c)
112121
}()
113122
s.rawChannelMap.Store(c, forward)
114-
s.control <- subscribe[T](forward)
123+
s.control <- subscribe[T]{msg: forward, subscriber: getSubscriber()}
115124
return c
116125
}
117126

@@ -128,7 +137,7 @@ func (s *Topic[T]) SubscribeSync(c chan Message[T]) chan Message[T] {
128137
if c == nil {
129138
c = make(chan Message[T], 16)
130139
}
131-
s.control <- subscribe[T](c)
140+
s.control <- subscribe[T]{msg: c, subscriber: getSubscriber()}
132141
return c
133142
}
134143

@@ -155,13 +164,13 @@ func (s *Topic[T]) Close() error {
155164
}
156165

157166
func (s *Topic[T]) run() {
158-
subscriptions := map[chan Message[T]]struct{}{}
167+
subscriptions := map[chan Message[T]]subscribe[T]{}
159168
for {
160169
select {
161170
case msg := <-s.control:
162171
switch msg := msg.(type) {
163172
case subscribe[T]:
164-
subscriptions[msg] = struct{}{}
173+
subscriptions[msg.msg] = msg
165174

166175
case unsubscribe[T]:
167176
delete(subscriptions, msg)
@@ -173,7 +182,7 @@ func (s *Topic[T]) run() {
173182
}
174183
close(s.control)
175184
close(s.publish)
176-
s.rawChannelMap.Range(func(k, v interface{}) bool {
185+
s.rawChannelMap.Range(func(k, v any) bool {
177186
s.rawChannelMap.Delete(k)
178187
return true
179188
})
@@ -186,15 +195,15 @@ func (s *Topic[T]) run() {
186195

187196
case msg := <-s.publish:
188197
errs := []error{}
189-
for ch := range subscriptions {
198+
for ch, sub := range subscriptions {
190199
smsg := Message[T]{Msg: msg.Msg, ack: make(chan error, 1)}
191200
ch <- smsg
192201
timer := time.NewTimer(AckTimeout)
193202
select {
194203
case err := <-smsg.ack:
195204
errs = append(errs, err)
196205
case <-timer.C:
197-
panic("ack timeout")
206+
panic("ack timeout for " + sub.subscriber)
198207
}
199208
timer.Stop()
200209
}

0 commit comments

Comments
 (0)