5050 // for select
5151 write_subscriber & Subscription = unsafe { nil }
5252 read_subscriber & Subscription = unsafe { nil }
53- write_sub_mtx u16
54- read_sub_mtx u16
53+ write_sub_mtx & SpinLock
54+ read_sub_mtx & SpinLock
5555 closed u16
5656pub :
5757 cap u32 // queue length in #objects
@@ -80,6 +80,8 @@ fn new_channel_st(n u32, st u32) &Channel {
8080 statusbuf: sbuf
8181 write_subscriber: unsafe { nil }
8282 read_subscriber: unsafe { nil }
83+ write_sub_mtx: new_spin_lock ()
84+ read_sub_mtx: new_spin_lock ()
8385 }
8486 ch.writesem.init (wsem)
8587 ch.readsem.init (rsem)
@@ -103,6 +105,8 @@ fn new_channel_st_noscan(n u32, st u32) &Channel {
103105 statusbuf: sbuf
104106 write_subscriber: unsafe { nil }
105107 read_subscriber: unsafe { nil }
108+ write_sub_mtx: new_spin_lock ()
109+ read_sub_mtx: new_spin_lock ()
106110 }
107111 ch.writesem.init (wsem)
108112 ch.readsem.init (rsem)
@@ -130,27 +134,24 @@ pub fn (mut ch Channel) close() {
130134 }
131135 ch.readsem_im.post ()
132136 ch.readsem.post ()
133- mut null16 := u16 (0 )
134- for ! C.atomic_compare_exchange_weak_u16 (& ch.read_sub_mtx, & null16 , u16 (1 )) {
135- null16 = u16 (0 )
136- }
137+ ch.read_sub_mtx.lock ()
137138 if ch.read_subscriber != unsafe { nil } {
138139 ch.read_subscriber.sem.post ()
139140 }
140- C.atomic_store_u16 (& ch.read_sub_mtx, u16 (0 ))
141- null16 = u16 (0 )
142- for ! C.atomic_compare_exchange_weak_u16 (& ch.write_sub_mtx, & null16 , u16 (1 )) {
143- null16 = u16 (0 )
144- }
141+ ch.read_sub_mtx.unlock ()
142+ ch.write_sub_mtx.lock ()
145143 if ch.write_subscriber != unsafe { nil } {
146144 ch.write_subscriber.sem.post ()
147145 }
148- C. atomic_store_u16 ( & ch.write_sub_mtx, u16 ( 0 ) )
146+ ch.write_sub_mtx. unlock ( )
149147 ch.writesem.post ()
150148 if ch.cap == 0 {
151149 C.atomic_store_ptr (unsafe { & voidptr (& ch.read_adr) }, unsafe { nil })
152150 }
153151 ch.writesem_im.post ()
152+
153+ // Do not destroy `read_sub_mtx` and `write_sub_mtx` here,
154+ // because we can read from a closed channel later.
154155}
155156
156157@[inline]
@@ -236,15 +237,11 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
236237 }
237238 }
238239 if ! read_in_progress {
239- mut null16 := u16 (0 )
240- for ! C.atomic_compare_exchange_weak_u16 (voidptr (& ch.read_sub_mtx), & null16 ,
241- u16 (1 )) {
242- null16 = u16 (0 )
243- }
240+ ch.read_sub_mtx.lock ()
244241 if ch.read_subscriber != unsafe { nil } {
245242 ch.read_subscriber.sem.post ()
246243 }
247- C. atomic_store_u16 ( & ch.read_sub_mtx, u16 ( 0 ) )
244+ ch.read_sub_mtx. unlock ( )
248245 }
249246 mut src2 := src
250247 for sp := u32 (0 ); sp < spinloops_ || read_in_progress; sp++ {
@@ -335,14 +332,11 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
335332 C.atomic_store_u16 (unsafe { & u16 (status_adr) }, u16 (BufferElemStat.written))
336333 C.atomic_fetch_add_u32 (voidptr (& ch.read_avail), 1 )
337334 ch.readsem.post ()
338- mut null16 := u16 (0 )
339- for ! C.atomic_compare_exchange_weak_u16 (& ch.read_sub_mtx, & null16 , u16 (1 )) {
340- null16 = u16 (0 )
341- }
335+ ch.read_sub_mtx.lock ()
342336 if ch.read_subscriber != unsafe { nil } {
343337 ch.read_subscriber.sem.post ()
344338 }
345- C. atomic_store_u16 ( & ch.read_sub_mtx, u16 ( 0 ) )
339+ ch.read_sub_mtx. unlock ( )
346340 return .success
347341 } else {
348342 if no_block {
@@ -456,14 +450,11 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
456450 C.atomic_store_u16 (unsafe { & u16 (status_adr) }, u16 (BufferElemStat.unused))
457451 C.atomic_fetch_add_u32 (voidptr (& ch.write_free), 1 )
458452 ch.writesem.post ()
459- mut null16 := u16 (0 )
460- for ! C.atomic_compare_exchange_weak_u16 (& ch.write_sub_mtx, & null16 , u16 (1 )) {
461- null16 = u16 (0 )
462- }
453+ ch.write_sub_mtx.lock ()
463454 if ch.write_subscriber != unsafe { nil } {
464455 ch.write_subscriber.sem.post ()
465456 }
466- C. atomic_store_u16 ( & ch.write_sub_mtx, u16 ( 0 ) )
457+ ch.write_sub_mtx. unlock ( )
467458 return .success
468459 }
469460 }
@@ -484,14 +475,11 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
484475 }
485476 }
486477 if ch.cap == 0 && ! write_in_progress {
487- mut null16 := u16 (0 )
488- for ! C.atomic_compare_exchange_weak_u16 (& ch.write_sub_mtx, & null16 , u16 (1 )) {
489- null16 = u16 (0 )
490- }
478+ ch.write_sub_mtx.lock ()
491479 if ch.write_subscriber != unsafe { nil } {
492480 ch.write_subscriber.sem.post ()
493481 }
494- C. atomic_store_u16 ( & ch.write_sub_mtx, u16 ( 0 ) )
482+ ch.write_sub_mtx. unlock ( )
495483 }
496484 mut dest2 := dest
497485 for sp := u32 (0 ); sp < spinloops_ || write_in_progress; sp++ {
@@ -556,14 +544,11 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
556544 for i, ch in channels {
557545 subscr[i].sem = unsafe { & sem }
558546 sub_mtx , subscriber := if dir[i] == .push {
559- & ch.write_sub_mtx, & ch.write_subscriber
547+ ch.write_sub_mtx, & ch.write_subscriber
560548 } else {
561- & ch.read_sub_mtx, & ch.read_subscriber
562- }
563- mut null16 := u16 (0 )
564- for ! C.atomic_compare_exchange_weak_u16 (sub_mtx, & null16 , u16 (1 )) {
565- null16 = u16 (0 )
549+ ch.read_sub_mtx, & ch.read_subscriber
566550 }
551+ sub_mtx.lock ()
567552 subscr[i].prev = unsafe { subscriber }
568553 unsafe {
569554 subscr[i].nxt = & Subscription (C.atomic_exchange_ptr (& voidptr (subscriber),
@@ -572,7 +557,7 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
572557 if voidptr (subscr[i].nxt) != unsafe { nil } {
573558 subscr[i].nxt.prev = unsafe { & subscr[i].nxt }
574559 }
575- C. atomic_store_u16 (sub_mtx, u16 ( 0 ) )
560+ sub_mtx. unlock ( )
576561 }
577562 stopwatch := if timeout == time.infinite || timeout < = 0 {
578563 time.StopWatch{}
@@ -620,22 +605,19 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
620605 // reset subscribers
621606 for i, ch in channels {
622607 sub_mtx := if dir[i] == .push {
623- & ch.write_sub_mtx
608+ ch.write_sub_mtx
624609 } else {
625- & ch.read_sub_mtx
626- }
627- mut null16 := u16 (0 )
628- for ! C.atomic_compare_exchange_weak_u16 (sub_mtx, & null16 , u16 (1 )) {
629- null16 = u16 (0 )
610+ ch.read_sub_mtx
630611 }
612+ sub_mtx.lock ()
631613 unsafe {
632614 * subscr[i].prev = subscr[i].nxt
633615 }
634616 if unsafe { subscr[i].nxt != 0 } {
635617 subscr[i].nxt.prev = subscr[i].prev
636618 subscr[i].nxt.sem.post ()
637619 }
638- C. atomic_store_u16 (sub_mtx, u16 ( 0 ) )
620+ sub_mtx. unlock ( )
639621 }
640622 sem.destroy ()
641623 return event_idx
0 commit comments