Skip to content

Commit d6a2a5e

Browse files
authored
sync: add condition support (#24574)
1 parent 2432286 commit d6a2a5e

2 files changed

Lines changed: 278 additions & 0 deletions

File tree

‎vlib/sync/cond.v‎

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
module sync
2+
3+
@[heap]
4+
pub struct Cond {
5+
mut:
6+
// Externally provided mutex for shared resource protection
7+
mutex &Mutex
8+
// Internal lock for protecting wait queue access
9+
inner_mutex Mutex
10+
// Queue of waiting channels
11+
waiters []chan bool
12+
}
13+
14+
// new_cond creates new condition variable associated with given mutex
15+
pub fn new_cond(m &Mutex) &Cond {
16+
return &Cond{
17+
mutex: m
18+
inner_mutex: new_mutex()
19+
waiters: []chan bool{}
20+
}
21+
}
22+
23+
// wait waits for condition notification
24+
// NOTE: Spurious wakeups are possible; always use in a loop:
25+
// mutex.lock()
26+
// for !condition {
27+
// cond.wait()
28+
// }
29+
// mutex.unlock()
30+
@[direct_array_access]
31+
pub fn (mut c Cond) wait() {
32+
// Create a channel for this waiting operation with capacity 1
33+
ch := chan bool{cap: 1}
34+
defer {
35+
ch.close()
36+
}
37+
38+
// Add this channel to the waiters queue
39+
c.inner_mutex.lock()
40+
c.waiters << ch
41+
c.inner_mutex.unlock()
42+
43+
// Release external lock and suspend
44+
c.mutex.unlock()
45+
_ := <-ch // Block until signaled
46+
47+
c.inner_mutex.lock()
48+
for i := c.waiters.len - 1; i >= 0; i-- {
49+
if c.waiters[i] == ch {
50+
c.waiters.delete(i)
51+
break
52+
}
53+
}
54+
c.inner_mutex.unlock()
55+
// Re-acquire external lock before returning
56+
c.mutex.lock()
57+
}
58+
59+
// signal wakes one waiting thread
60+
@[direct_array_access]
61+
pub fn (mut c Cond) signal() {
62+
c.inner_mutex.lock()
63+
defer { c.inner_mutex.unlock() }
64+
if c.waiters.len > 0 {
65+
// Remove first waiter from queue
66+
mut waiter := c.waiters[0]
67+
c.waiters.delete(0)
68+
if !waiter.closed {
69+
waiter <- true // Wake up the thread
70+
}
71+
}
72+
}
73+
74+
// broadcast wakes all waiting threads
75+
@[direct_array_access]
76+
pub fn (mut c Cond) broadcast() {
77+
c.inner_mutex.lock()
78+
defer { c.inner_mutex.unlock() }
79+
// Release all waiting ch
80+
for i in 0 .. c.waiters.len {
81+
mut waiter := c.waiters[i]
82+
if !waiter.closed {
83+
waiter <- true // Wake up the thread
84+
}
85+
}
86+
c.waiters.clear()
87+
}

‎vlib/sync/cond_test.v‎

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
import sync
2+
import sync.stdatomic { new_atomic }
3+
4+
// Test single thread wake-up scenario for condition variable
5+
fn test_single_thread_wakeup() {
6+
mut mutex := sync.new_mutex()
7+
mut cond := sync.new_cond(mutex)
8+
mut done := new_atomic(false)
9+
mut wake_count := new_atomic(0)
10+
ready_ch := chan bool{cap: 1}
11+
done_ch := chan bool{cap: 1}
12+
defer {
13+
ready_ch.close()
14+
done_ch.close()
15+
}
16+
17+
// Spawn waiting thread
18+
spawn fn [mut done, mut wake_count, mut cond, mut mutex, ready_ch, done_ch] () {
19+
mutex.lock()
20+
defer {
21+
mutex.unlock()
22+
}
23+
ready_ch <- true // Notify main thread of readiness
24+
25+
// Wait loop for conditional signals
26+
for !done.load() {
27+
cond.wait()
28+
wake_count.add(1)
29+
}
30+
done_ch <- true // Notify completion
31+
}()
32+
33+
// Wait for the worker to enter waiting state
34+
_ := <-ready_ch
35+
36+
// Trigger signaling sequence
37+
mutex.lock()
38+
cond.signal() // Wake the waiting thread
39+
done.store(true) // Terminate worker loop
40+
cond.signal() // Extra signal for loop exit
41+
mutex.unlock()
42+
43+
// Verify result
44+
_ := <-done_ch
45+
assert wake_count.load() == 1, 'Should wake exactly 1 thread'
46+
}
47+
48+
// Test broadcast wake-up of multiple waiting threads
49+
fn test_broadcast_wakeup() {
50+
mut mutex := sync.new_mutex()
51+
mut cond := sync.new_cond(mutex)
52+
num_threads := 5
53+
mut wake_counter := new_atomic(0)
54+
ready_ch := chan bool{cap: num_threads}
55+
done_ch := chan bool{cap: num_threads}
56+
defer {
57+
ready_ch.close()
58+
done_ch.close()
59+
}
60+
61+
// Spawn multiple waiting threads
62+
for _ in 0 .. num_threads {
63+
spawn fn [mut wake_counter, mut cond, mut mutex, ready_ch, done_ch] () {
64+
mutex.lock()
65+
defer {
66+
mutex.unlock()
67+
}
68+
ready_ch <- true // Notify readiness
69+
cond.wait() // Wait for broadcast
70+
wake_counter.add(1)
71+
done_ch <- true // Notify completion
72+
}()
73+
}
74+
75+
// Wait for all threads to enter waiting state
76+
for _ in 0 .. num_threads {
77+
_ := <-ready_ch
78+
}
79+
80+
// Trigger broadcast wake-up
81+
mutex.lock()
82+
cond.broadcast()
83+
mutex.unlock()
84+
85+
// Verify all threads completed
86+
for _ in 0 .. num_threads {
87+
_ := <-done_ch
88+
}
89+
assert wake_counter.load() == num_threads, 'Should wake all threads'
90+
}
91+
92+
// Test consecutive signal delivery sequencing
93+
fn test_multiple_signals() {
94+
mut mutex := sync.new_mutex()
95+
mut cond := sync.new_cond(mutex)
96+
mut counter := new_atomic(0)
97+
num_signals := 3
98+
ready_ch := chan bool{cap: 1}
99+
wait_sync_ch := chan bool{cap: 1} // Synchronization for wait-sequence tracking
100+
done_ch := chan bool{cap: 1}
101+
defer {
102+
ready_ch.close()
103+
wait_sync_ch.close()
104+
done_ch.close()
105+
}
106+
107+
spawn fn [num_signals, mut counter, mut cond, mut mutex, ready_ch, wait_sync_ch, done_ch] () {
108+
mutex.lock()
109+
defer {
110+
mutex.unlock()
111+
}
112+
113+
ready_ch <- true // Initial readiness notification
114+
115+
// Process multiple signals sequentially
116+
for _ in 0 .. num_signals {
117+
cond.wait()
118+
counter.add(1)
119+
wait_sync_ch <- true // Signal processing complete
120+
}
121+
done_ch <- true
122+
}()
123+
124+
// Wait for initial setup
125+
_ := <-ready_ch
126+
127+
// Send first signal
128+
mutex.lock()
129+
cond.signal()
130+
mutex.unlock()
131+
132+
// Send subsequent signals with synchronization
133+
for _ in 1 .. num_signals {
134+
_ := <-wait_sync_ch // Wait for previous signal processing
135+
mutex.lock()
136+
cond.signal()
137+
mutex.unlock()
138+
}
139+
140+
_ := <-done_ch
141+
assert counter.load() == num_signals, 'Signal count should match counter value'
142+
}
143+
144+
// Test lock reacquisition mechanics after wait()
145+
fn test_lock_reacquire() {
146+
mut mutex := sync.new_mutex()
147+
mut cond := sync.new_cond(mutex)
148+
mut lock_held := new_atomic(false)
149+
ready_ch := chan bool{cap: 1}
150+
done_ch := chan bool{cap: 1}
151+
defer {
152+
ready_ch.close()
153+
done_ch.close()
154+
}
155+
156+
spawn fn [mut lock_held, mut cond, mut mutex, ready_ch, done_ch] () {
157+
mutex.lock()
158+
defer {
159+
mutex.unlock()
160+
}
161+
ready_ch <- true
162+
163+
cond.wait()
164+
// Test lock state after wakeup
165+
lock_held.store(!mutex.try_lock()) // Should fail -> store true
166+
done_ch <- true
167+
}()
168+
169+
_ := <-ready_ch
170+
171+
mutex.lock()
172+
cond.signal()
173+
mutex.unlock()
174+
175+
_ := <-done_ch
176+
assert lock_held.load(), 'Mutex should be reacquired automatically after wait()'
177+
}
178+
179+
// Test empty signal/broadcast scenario
180+
fn test_signal_without_waiters() {
181+
mut mutex := sync.new_mutex()
182+
mut cond := sync.new_cond(mutex)
183+
184+
// Verify no panic occurs
185+
mutex.lock()
186+
cond.signal() // No-op with no waiters
187+
cond.broadcast() // No-op with no waiters
188+
mutex.unlock()
189+
190+
assert true, 'Should handle empty signal operations safely'
191+
}

0 commit comments

Comments
 (0)