55#include "Python.h"
66#include "pycore_moduleobject.h" // _PyModule_GetState()
77#include "structmember.h" // PyMemberDef
8+ #include "parking_lot.h"
89#include <stddef.h> // offsetof()
910
1011typedef struct {
@@ -25,10 +26,19 @@ static struct PyModuleDef queuemodule;
2526
2627typedef struct {
2728 PyObject_HEAD
28- PyThread_type_lock lock ;
29- int locked ;
30- PyObject * lst ;
31- Py_ssize_t lst_pos ;
29+ /* protects all operations on queue */
30+ _PyMutex mutex ;
31+ /* number of items in queue */
32+ Py_ssize_t count ;
33+ /* offset of where to put next item */
34+ Py_ssize_t put_index ;
35+ /* offset of where to take next item */
36+ Py_ssize_t get_index ;
37+ /* size of data buffer */
38+ Py_ssize_t buffer_size ;
39+ /* array of items with length buffer_size */
40+ PyObject * * data ;
41+ uintptr_t waiting ;
3242 PyObject * weakreflist ;
3343} simplequeueobject ;
3444
@@ -41,7 +51,27 @@ class _queue.SimpleQueue "simplequeueobject *" "simplequeue_get_state_by_type(ty
4151static int
4252simplequeue_clear (simplequeueobject * self )
4353{
44- Py_CLEAR (self -> lst );
54+ if (self -> data ) {
55+ PyObject * * data = self -> data ;
56+ Py_ssize_t n = self -> count ;
57+ Py_ssize_t idx = self -> get_index ;
58+ Py_ssize_t buffer_size = self -> buffer_size ;
59+
60+ self -> data = NULL ;
61+ self -> count = 0 ;
62+ self -> put_index = 0 ;
63+ self -> get_index = 0 ;
64+ self -> buffer_size = 0 ;
65+
66+ for (; n != 0 ; n -- ) {
67+ Py_DECREF (data [idx ]);
68+ idx ++ ;
69+ if (idx == buffer_size ) {
70+ idx = 0 ;
71+ }
72+ }
73+ PyMem_Free (self -> data );
74+ }
4575 return 0 ;
4676}
4777
@@ -51,11 +81,8 @@ simplequeue_dealloc(simplequeueobject *self)
5181 PyTypeObject * tp = Py_TYPE (self );
5282
5383 PyObject_GC_UnTrack (self );
54- if (self -> lock != NULL ) {
55- /* Unlock the lock so it's safe to free it */
56- if (self -> locked > 0 )
57- PyThread_release_lock (self -> lock );
58- PyThread_free_lock (self -> lock );
84+ if (_PyMutex_is_locked (& self -> mutex )) {
85+ Py_FatalError ("SimpleQueue: dealloc with locked queue" );
5986 }
6087 (void )simplequeue_clear (self );
6188 if (self -> weakreflist != NULL )
@@ -67,7 +94,16 @@ simplequeue_dealloc(simplequeueobject *self)
6794static int
6895simplequeue_traverse (simplequeueobject * self , visitproc visit , void * arg )
6996{
70- Py_VISIT (self -> lst );
97+ PyObject * * data = self -> data ;
98+ Py_ssize_t n = self -> count ;
99+ Py_ssize_t idx = self -> get_index ;
100+ for (; n != 0 ; n -- ) {
101+ Py_VISIT (data [idx ]);
102+ idx ++ ;
103+ if (idx == self -> buffer_size ) {
104+ idx = 0 ;
105+ }
106+ }
71107 Py_VISIT (Py_TYPE (self ));
72108 return 0 ;
73109}
@@ -86,25 +122,51 @@ simplequeue_new_impl(PyTypeObject *type)
86122 simplequeueobject * self ;
87123
88124 self = (simplequeueobject * ) type -> tp_alloc (type , 0 );
89- if (self != NULL ) {
90- self -> weakreflist = NULL ;
91- self -> lst = PyList_New (0 );
92- self -> lock = PyThread_allocate_lock ();
93- self -> lst_pos = 0 ;
94- if (self -> lock == NULL ) {
95- Py_DECREF (self );
96- PyErr_SetString (PyExc_MemoryError , "can't allocate lock" );
97- return NULL ;
98- }
99- if (self -> lst == NULL ) {
100- Py_DECREF (self );
101- return NULL ;
102- }
125+ if (!self ) {
126+ return NULL ;
103127 }
104128
129+ self -> weakreflist = NULL ;
130+ self -> buffer_size = 8 ;
131+ self -> data = PyMem_Malloc (self -> buffer_size * sizeof (PyObject * ));
132+ if (self -> data == NULL ) {
133+ Py_DECREF (self );
134+ return NULL ;
135+ }
136+ memset (self -> data , 0 , self -> buffer_size * sizeof (PyObject * ));
105137 return (PyObject * ) self ;
106138}
107139
140+ static int
141+ simplequeue_grow (simplequeueobject * self )
142+ {
143+ Py_ssize_t new_buffer_size = Py_MAX (8 , self -> buffer_size * 2 );
144+ PyObject * * new_data = PyMem_Malloc (new_buffer_size * sizeof (PyObject * ));
145+ if (!new_data ) {
146+ return -1 ;
147+ }
148+
149+ /* Copy the contiguous "tail" of the old buffer to the beginning
150+ * of the new buffer. */
151+ Py_ssize_t tail_size = self -> buffer_size - self -> get_index ;
152+ if (tail_size > 0 ) {
153+ memcpy (new_data , self -> data + self -> get_index , tail_size * sizeof (PyObject * ));
154+ }
155+
156+ /* Next copy any elements that wrapped around the old buffer */
157+ Py_ssize_t remaining = self -> count - tail_size ;
158+ if (remaining > 0 ) {
159+ memcpy (new_data + tail_size , self -> data , remaining * sizeof (PyObject * ));
160+ }
161+
162+ PyMem_Free (self -> data );
163+ self -> data = new_data ;
164+ self -> buffer_size = new_buffer_size ;
165+ self -> get_index = 0 ;
166+ self -> put_index = self -> count ;
167+ return 0 ;
168+ }
169+
108170/*[clinic input]
109171_queue.SimpleQueue.put
110172 item: object
@@ -123,15 +185,41 @@ _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
123185 int block , PyObject * timeout )
124186/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/
125187{
126- /* BEGIN GIL-protected critical section */
127- if (PyList_Append (self -> lst , item ) < 0 )
128- return NULL ;
129- if (self -> locked ) {
130- /* A get() may be waiting, wake it up */
131- self -> locked = 0 ;
132- PyThread_release_lock (self -> lock );
188+ _PyMutex_lock (& self -> mutex );
189+
190+ int handoff = 0 ;
191+ if (self -> waiting ) {
192+ int more_waiters ;
193+ struct wait_entry * waiter ;
194+ PyObject * * objptr ;
195+
196+ /* If there is a waiter, handoff the item directly */
197+ objptr = _PyParkingLot_BeginUnpark (& self -> waiting , & waiter , & more_waiters );
198+ if (objptr ) {
199+ Py_INCREF (item );
200+ * objptr = item ;
201+ handoff = 1 ;
202+ }
203+ self -> waiting = more_waiters ;
204+ _PyParkingLot_FinishUnpark (& self -> waiting , waiter );
133205 }
134- /* END GIL-protected critical section */
206+
207+ if (!handoff ) {
208+ /* If we didn't handoff the item, add it to the queue */
209+ if (self -> count == self -> buffer_size && simplequeue_grow (self ) < 0 ) {
210+ _PyMutex_unlock (& self -> mutex );
211+ return NULL ;
212+ }
213+ Py_INCREF (item );
214+ self -> data [self -> put_index ] = item ;
215+ self -> put_index ++ ;
216+ self -> count ++ ;
217+ if (self -> put_index == self -> buffer_size ) {
218+ self -> put_index = 0 ;
219+ }
220+ }
221+
222+ _PyMutex_unlock (& self -> mutex );
135223 Py_RETURN_NONE ;
136224}
137225
@@ -154,30 +242,12 @@ _queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
154242}
155243
156244static PyObject *
157- simplequeue_pop_item ( simplequeueobject * self )
245+ empty_error ( PyTypeObject * cls )
158246{
159- Py_ssize_t count , n ;
160- PyObject * item ;
161-
162- n = PyList_GET_SIZE (self -> lst );
163- assert (self -> lst_pos < n );
164-
165- item = PyList_GET_ITEM (self -> lst , self -> lst_pos );
166- Py_INCREF (Py_None );
167- PyList_SET_ITEM (self -> lst , self -> lst_pos , Py_None );
168- self -> lst_pos += 1 ;
169- count = n - self -> lst_pos ;
170- if (self -> lst_pos > count ) {
171- /* The list is more than 50% empty, reclaim space at the beginning */
172- if (PyList_SetSlice (self -> lst , 0 , self -> lst_pos , NULL )) {
173- /* Undo pop */
174- self -> lst_pos -= 1 ;
175- PyList_SET_ITEM (self -> lst , self -> lst_pos , item );
176- return NULL ;
177- }
178- self -> lst_pos = 0 ;
179- }
180- return item ;
247+ PyObject * module = PyType_GetModule (cls );
248+ simplequeue_state * state = simplequeue_get_state (module );
249+ PyErr_SetNone (state -> EmptyError );
250+ return NULL ;
181251}
182252
183253/*[clinic input]
@@ -206,16 +276,8 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
206276/*[clinic end generated code: output=5c2cca914cd1e55b input=5b4047bfbc645ec1]*/
207277{
208278 _PyTime_t endtime = 0 ;
209- _PyTime_t timeout ;
210- PyObject * item ;
211- PyLockStatus r ;
212- PY_TIMEOUT_T microseconds ;
213-
214- if (block == 0 ) {
215- /* Non-blocking */
216- microseconds = 0 ;
217- }
218- else if (timeout_obj != Py_None ) {
279+ if (block != 0 && timeout_obj != Py_None ) {
280+ _PyTime_t timeout ;
219281 /* With timeout */
220282 if (_PyTime_FromSecondsObject (& timeout ,
221283 timeout_obj , _PyTime_ROUND_CEILING ) < 0 ) {
@@ -226,7 +288,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
226288 "'timeout' must be a non-negative number" );
227289 return NULL ;
228290 }
229- microseconds = _PyTime_AsMicroseconds (timeout ,
291+ PY_TIMEOUT_T microseconds = _PyTime_AsMicroseconds (timeout ,
230292 _PyTime_ROUND_CEILING );
231293 if (microseconds > PY_TIMEOUT_MAX ) {
232294 PyErr_SetString (PyExc_OverflowError ,
@@ -235,54 +297,55 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
235297 }
236298 endtime = _PyDeadline_Init (timeout );
237299 }
238- else {
239- /* Infinitely blocking */
240- microseconds = -1 ;
241- }
242300
243- /* put() signals the queue to be non-empty by releasing the lock.
244- * So we simply try to acquire the lock in a loop, until the condition
245- * (queue non-empty) becomes true.
246- */
247- while (self -> lst_pos == PyList_GET_SIZE (self -> lst )) {
248- /* First a simple non-blocking try without releasing the GIL */
249- r = PyThread_acquire_lock_timed (self -> lock , 0 , 0 );
250- if (r == PY_LOCK_FAILURE && microseconds != 0 ) {
251- Py_BEGIN_ALLOW_THREADS
252- r = PyThread_acquire_lock_timed (self -> lock , microseconds , 1 );
253- Py_END_ALLOW_THREADS
301+ for (;;) {
302+ PyObject * item = NULL ;
303+
304+ _PyMutex_lock (& self -> mutex );
305+ if (self -> count > 0 ) {
306+ item = self -> data [self -> get_index ];
307+ self -> data [self -> get_index ] = NULL ;
308+
309+ self -> count -- ;
310+ self -> get_index ++ ;
311+ if (self -> get_index == self -> buffer_size ) {
312+ self -> get_index = 0 ;
313+ }
314+ }
315+ else {
316+ _Py_atomic_store_uintptr_relaxed (& self -> waiting , 1 );
254317 }
318+ _PyMutex_unlock (& self -> mutex );
255319
256- if (r == PY_LOCK_INTR && Py_MakePendingCalls () < 0 ) {
257- return NULL ;
320+ if (item ) {
321+ return item ;
258322 }
259- if (r == PY_LOCK_FAILURE ) {
260- PyObject * module = PyType_GetModule (cls );
261- simplequeue_state * state = simplequeue_get_state (module );
262- /* Timed out */
263- PyErr_SetNone (state -> EmptyError );
264- return NULL ;
323+
324+ if (!block ) {
325+ return empty_error (cls );
265326 }
266- self -> locked = 1 ;
267327
268- /* Adjust timeout for next iteration (if any) */
269- if (microseconds > 0 ) {
270- timeout = _PyDeadline_Get (endtime );
271- microseconds = _PyTime_AsMicroseconds (timeout ,
272- _PyTime_ROUND_CEILING );
328+ int64_t timeout_ns = -1 ;
329+ if (endtime != 0 ) {
330+ timeout_ns = _PyDeadline_Get (endtime );
331+ if (timeout_ns < 0 ) {
332+ return empty_error (cls );
333+ }
273334 }
274- }
275335
276- /* BEGIN GIL-protected critical section */
277- assert (self -> lst_pos < PyList_GET_SIZE (self -> lst ));
278- item = simplequeue_pop_item (self );
279- if (self -> locked ) {
280- PyThread_release_lock (self -> lock );
281- self -> locked = 0 ;
336+ int ret = _PyParkingLot_Park (& self -> waiting , 1 , & item , timeout_ns );
337+ if (ret == PY_PARK_OK ) {
338+ assert (item );
339+ return item ;
340+ }
341+ else if (ret == PY_PARK_INTR && Py_MakePendingCalls () < 0 ) {
342+ /* interrupted */
343+ return NULL ;
344+ }
345+ else if (ret == PY_PARK_TIMEOUT ) {
346+ return empty_error (cls );
347+ }
282348 }
283- /* END GIL-protected critical section */
284-
285- return item ;
286349}
287350
288351/*[clinic input]
@@ -315,7 +378,10 @@ static int
315378_queue_SimpleQueue_empty_impl (simplequeueobject * self )
316379/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/
317380{
318- return self -> lst_pos == PyList_GET_SIZE (self -> lst );
381+ _PyMutex_lock (& self -> mutex );
382+ int empty = self -> count == 0 ;
383+ _PyMutex_unlock (& self -> mutex );
384+ return empty ;
319385}
320386
321387/*[clinic input]
@@ -328,7 +394,10 @@ static Py_ssize_t
328394_queue_SimpleQueue_qsize_impl (simplequeueobject * self )
329395/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/
330396{
331- return PyList_GET_SIZE (self -> lst ) - self -> lst_pos ;
397+ _PyMutex_lock (& self -> mutex );
398+ Py_ssize_t qsize = self -> count ;
399+ _PyMutex_unlock (& self -> mutex );
400+ return qsize ;
332401}
333402
334403static int
0 commit comments