Skip to content

Commit efb7f46

Browse files
committed
fix(mpmc): check CAS result before accessing ring buffer slot
The tail/head CAS result was ignored in EnqueueM, EnqueueMRich, Enqueue, and Dequeue. When two goroutines race for the same slot, the CAS-losing goroutine could use a stale index and overwrite the winner's data. Fixes #9
1 parent ba3ca29 commit efb7f46

1 file changed

Lines changed: 13 additions & 6 deletions

File tree

mpmc/rb2.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ func (rb *orbuf[T]) EnqueueM(item T) (overwrites uint32, err error) { //nolint:r
3434
overwrites++
3535
}
3636

37+
if !atomic.CompareAndSwapUint32(&rb.tail, tail, nt) {
38+
continue // tail CAS failed, retry with fresh values
39+
}
3740
holder = &rb.data[tail]
38-
atomic.CompareAndSwapUint32(&rb.tail, tail, nt)
3941
retry:
4042
if !atomic.CompareAndSwapUint64(&holder.readWrite, 0, 2) { //nolint:gomnd
4143
if !atomic.CompareAndSwapUint64(&holder.readWrite, 1, 2) { //nolint:gomnd
@@ -88,8 +90,10 @@ func (rb *orbuf[T]) EnqueueMRich(item T) (size, overwrites uint32, err error) {
8890
size = rb.qty(head, tail) + 1
8991
}
9092

93+
if !atomic.CompareAndSwapUint32(&rb.tail, tail, nt) {
94+
continue // tail CAS failed, retry with fresh values
95+
}
9196
holder = &rb.data[tail]
92-
atomic.CompareAndSwapUint32(&rb.tail, tail, nt)
9397
retry:
9498
if !atomic.CompareAndSwapUint64(&holder.readWrite, 0, 2) { //nolint:gomnd
9599
if !atomic.CompareAndSwapUint64(&holder.readWrite, 1, 2) { //nolint:gomnd
@@ -142,8 +146,10 @@ func (rb *orbuf[T]) Enqueue(item T) (err error) { //nolint:revive
142146
atomic.CompareAndSwapUint32(&rb.head, head, nh)
143147
}
144148

149+
if !atomic.CompareAndSwapUint32(&rb.tail, tail, nt) {
150+
continue // tail CAS failed, retry with fresh values
151+
}
145152
holder = &rb.data[tail]
146-
atomic.CompareAndSwapUint32(&rb.tail, tail, nt)
147153
retry:
148154
if !atomic.CompareAndSwapUint64(&holder.readWrite, 0, 2) { //nolint:gomnd
149155
if !atomic.CompareAndSwapUint64(&holder.readWrite, 1, 2) { //nolint:gomnd
@@ -197,10 +203,11 @@ func (rb *orbuf[T]) Dequeue() (item T, err error) { //nolint:revive
197203
return
198204
}
199205

200-
holder = &rb.data[head]
201-
202206
nh = (head + 1) & rb.capModMask
203-
atomic.CompareAndSwapUint32(&rb.head, head, nh)
207+
if !atomic.CompareAndSwapUint32(&rb.head, head, nh) {
208+
continue // head CAS failed, retry with fresh values
209+
}
210+
holder = &rb.data[head]
204211
retry:
205212
if !atomic.CompareAndSwapUint64(&holder.readWrite, 1, 3) { //nolint:gomnd
206213
if atomic.LoadUint64(&holder.readWrite) == 1 {

0 commit comments

Comments
 (0)