Skip to content

Commit bd465b5

Browse files
authored
sync: use SpinLock for channel (fix #24680) (#24802)
1 parent 2cfeb6d commit bd465b5

2 files changed

Lines changed: 118 additions & 47 deletions

File tree

‎vlib/sync/bench/run_bench.v‎

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Usage: v run run_bench.v
2+
// It will generate a `result.md` in current dir.
3+
import os
4+
import arrays
5+
import strings
6+
7+
const compilers = [
8+
'tcc',
9+
'clang',
10+
'gcc',
11+
]
12+
13+
const run_iterations = 10
14+
15+
const nobj = 10000000
16+
const run_settings = [
17+
[1, 1, 0],
18+
[1, 1, 100],
19+
[4, 4, 0],
20+
[4, 4, 100],
21+
]
22+
23+
fn get_perf_from_result(result string) !f32 {
24+
lines := result.split_into_lines()
25+
for l in lines {
26+
if l.contains('objects') && l.contains('(') && l.contains(')') {
27+
f := l.find_between('(', ')').all_before('objs/µs').trim_space().f32()
28+
return f
29+
}
30+
}
31+
return error('run fail?')
32+
}
33+
34+
fn main() {
35+
mut perf_result := []f32{}
36+
37+
for cc in compilers {
38+
// 1. compile
39+
compile_cmd := 'v channel_bench_v.v -cc ${cc}'
40+
println('compile_cmd: ${compile_cmd}')
41+
compile_result := os.execute(compile_cmd)
42+
if compile_result.exit_code != 0 {
43+
panic('compile fail with "${compile_cmd}"')
44+
}
45+
46+
// 2. run
47+
for s in run_settings {
48+
run_cmd := './channel_bench_v ${s[0]:-3} ${s[1]:-3} ${s[2]:-3} ${nobj}'
49+
println('-----------------------------------------------------------')
50+
mut iteration_result := []f32{}
51+
for i in 0 .. run_iterations {
52+
print('${i:3}: ${run_cmd}')
53+
run_result := os.execute(run_cmd)
54+
f := get_perf_from_result(run_result.output)!
55+
iteration_result << f
56+
println(' => ${f:.2} objs/µs')
57+
}
58+
avg := arrays.sum(iteration_result)! / run_iterations
59+
perf_result << avg
60+
}
61+
}
62+
63+
// 3. output result
64+
mut sb := strings.new_builder(8192)
65+
sb.write_string('\n| nsend | nrec | buflen |')
66+
for cc in compilers {
67+
sb.write_string(' **V (${cc:-5})** |')
68+
}
69+
sb.writeln('')
70+
sb.write_string('| :---: | :---:| :---: |')
71+
for _ in 0 .. compilers.len {
72+
sb.write_string(' :---: |')
73+
}
74+
sb.writeln('')
75+
for i, s in run_settings {
76+
sb.write_string('| ${s[0]:-3} | ${s[1]:-3} | ${s[2]:-3} |')
77+
for j in 0 .. compilers.len {
78+
sb.write_string(' ${perf_result[j * run_settings.len + i]:-5.2} |')
79+
}
80+
sb.writeln('')
81+
}
82+
sb.writeln('')
83+
println('***********************************************************')
84+
println('writing result to `result.md`...')
85+
println('***********************************************************')
86+
os.write_file('result.md', sb.str())!
87+
println(sb.str())
88+
os.rm('./channel_bench_v')!
89+
}

‎vlib/sync/channels.c.v‎

Lines changed: 29 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ mut:
5050
// for select
5151
write_subscriber &Subscription = unsafe { nil }
5252
read_subscriber &Subscription = unsafe { nil }
53-
write_sub_mtx u16
54-
read_sub_mtx u16
53+
write_sub_mtx &SpinLock
54+
read_sub_mtx &SpinLock
5555
closed u16
5656
pub:
5757
cap u32 // queue length in #objects
@@ -80,6 +80,8 @@ fn new_channel_st(n u32, st u32) &Channel {
8080
statusbuf: sbuf
8181
write_subscriber: unsafe { nil }
8282
read_subscriber: unsafe { nil }
83+
write_sub_mtx: new_spin_lock()
84+
read_sub_mtx: new_spin_lock()
8385
}
8486
ch.writesem.init(wsem)
8587
ch.readsem.init(rsem)
@@ -103,6 +105,8 @@ fn new_channel_st_noscan(n u32, st u32) &Channel {
103105
statusbuf: sbuf
104106
write_subscriber: unsafe { nil }
105107
read_subscriber: unsafe { nil }
108+
write_sub_mtx: new_spin_lock()
109+
read_sub_mtx: new_spin_lock()
106110
}
107111
ch.writesem.init(wsem)
108112
ch.readsem.init(rsem)
@@ -130,27 +134,24 @@ pub fn (mut ch Channel) close() {
130134
}
131135
ch.readsem_im.post()
132136
ch.readsem.post()
133-
mut null16 := u16(0)
134-
for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
135-
null16 = u16(0)
136-
}
137+
ch.read_sub_mtx.lock()
137138
if ch.read_subscriber != unsafe { nil } {
138139
ch.read_subscriber.sem.post()
139140
}
140-
C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
141-
null16 = u16(0)
142-
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
143-
null16 = u16(0)
144-
}
141+
ch.read_sub_mtx.unlock()
142+
ch.write_sub_mtx.lock()
145143
if ch.write_subscriber != unsafe { nil } {
146144
ch.write_subscriber.sem.post()
147145
}
148-
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
146+
ch.write_sub_mtx.unlock()
149147
ch.writesem.post()
150148
if ch.cap == 0 {
151149
C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, unsafe { nil })
152150
}
153151
ch.writesem_im.post()
152+
153+
// Do not destroy `read_sub_mtx` and `write_sub_mtx` here,
154+
// because we can read from a closed channel later.
154155
}
155156

156157
@[inline]
@@ -236,15 +237,11 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
236237
}
237238
}
238239
if !read_in_progress {
239-
mut null16 := u16(0)
240-
for !C.atomic_compare_exchange_weak_u16(voidptr(&ch.read_sub_mtx), &null16,
241-
u16(1)) {
242-
null16 = u16(0)
243-
}
240+
ch.read_sub_mtx.lock()
244241
if ch.read_subscriber != unsafe { nil } {
245242
ch.read_subscriber.sem.post()
246243
}
247-
C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
244+
ch.read_sub_mtx.unlock()
248245
}
249246
mut src2 := src
250247
for sp := u32(0); sp < spinloops_ || read_in_progress; sp++ {
@@ -335,14 +332,11 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
335332
C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.written))
336333
C.atomic_fetch_add_u32(voidptr(&ch.read_avail), 1)
337334
ch.readsem.post()
338-
mut null16 := u16(0)
339-
for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
340-
null16 = u16(0)
341-
}
335+
ch.read_sub_mtx.lock()
342336
if ch.read_subscriber != unsafe { nil } {
343337
ch.read_subscriber.sem.post()
344338
}
345-
C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
339+
ch.read_sub_mtx.unlock()
346340
return .success
347341
} else {
348342
if no_block {
@@ -456,14 +450,11 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
456450
C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.unused))
457451
C.atomic_fetch_add_u32(voidptr(&ch.write_free), 1)
458452
ch.writesem.post()
459-
mut null16 := u16(0)
460-
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
461-
null16 = u16(0)
462-
}
453+
ch.write_sub_mtx.lock()
463454
if ch.write_subscriber != unsafe { nil } {
464455
ch.write_subscriber.sem.post()
465456
}
466-
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
457+
ch.write_sub_mtx.unlock()
467458
return .success
468459
}
469460
}
@@ -484,14 +475,11 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
484475
}
485476
}
486477
if ch.cap == 0 && !write_in_progress {
487-
mut null16 := u16(0)
488-
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
489-
null16 = u16(0)
490-
}
478+
ch.write_sub_mtx.lock()
491479
if ch.write_subscriber != unsafe { nil } {
492480
ch.write_subscriber.sem.post()
493481
}
494-
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
482+
ch.write_sub_mtx.unlock()
495483
}
496484
mut dest2 := dest
497485
for sp := u32(0); sp < spinloops_ || write_in_progress; sp++ {
@@ -556,14 +544,11 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
556544
for i, ch in channels {
557545
subscr[i].sem = unsafe { &sem }
558546
sub_mtx, subscriber := if dir[i] == .push {
559-
&ch.write_sub_mtx, &ch.write_subscriber
547+
ch.write_sub_mtx, &ch.write_subscriber
560548
} else {
561-
&ch.read_sub_mtx, &ch.read_subscriber
562-
}
563-
mut null16 := u16(0)
564-
for !C.atomic_compare_exchange_weak_u16(sub_mtx, &null16, u16(1)) {
565-
null16 = u16(0)
549+
ch.read_sub_mtx, &ch.read_subscriber
566550
}
551+
sub_mtx.lock()
567552
subscr[i].prev = unsafe { subscriber }
568553
unsafe {
569554
subscr[i].nxt = &Subscription(C.atomic_exchange_ptr(&voidptr(subscriber),
@@ -572,7 +557,7 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
572557
if voidptr(subscr[i].nxt) != unsafe { nil } {
573558
subscr[i].nxt.prev = unsafe { &subscr[i].nxt }
574559
}
575-
C.atomic_store_u16(sub_mtx, u16(0))
560+
sub_mtx.unlock()
576561
}
577562
stopwatch := if timeout == time.infinite || timeout <= 0 {
578563
time.StopWatch{}
@@ -620,22 +605,19 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
620605
// reset subscribers
621606
for i, ch in channels {
622607
sub_mtx := if dir[i] == .push {
623-
&ch.write_sub_mtx
608+
ch.write_sub_mtx
624609
} else {
625-
&ch.read_sub_mtx
626-
}
627-
mut null16 := u16(0)
628-
for !C.atomic_compare_exchange_weak_u16(sub_mtx, &null16, u16(1)) {
629-
null16 = u16(0)
610+
ch.read_sub_mtx
630611
}
612+
sub_mtx.lock()
631613
unsafe {
632614
*subscr[i].prev = subscr[i].nxt
633615
}
634616
if unsafe { subscr[i].nxt != 0 } {
635617
subscr[i].nxt.prev = subscr[i].prev
636618
subscr[i].nxt.sem.post()
637619
}
638-
C.atomic_store_u16(sub_mtx, u16(0))
620+
sub_mtx.unlock()
639621
}
640622
sem.destroy()
641623
return event_idx

0 commit comments

Comments
 (0)