Skip to content

Commit 7e60a01

Browse files
committed
queue: make SimpleQueue thread-safe
This uses _PyMutex and the PyParkingLot functions to implement a thread-safe unbouneded MPMC queue.
1 parent 4450445 commit 7e60a01

1 file changed

Lines changed: 177 additions & 108 deletions

File tree

‎Modules/_queuemodule.c‎

Lines changed: 177 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "Python.h"
66
#include "pycore_moduleobject.h" // _PyModule_GetState()
77
#include "structmember.h" // PyMemberDef
8+
#include "parking_lot.h"
89
#include <stddef.h> // offsetof()
910

1011
typedef struct {
@@ -25,10 +26,19 @@ static struct PyModuleDef queuemodule;
2526

2627
typedef struct {
2728
PyObject_HEAD
28-
PyThread_type_lock lock;
29-
int locked;
30-
PyObject *lst;
31-
Py_ssize_t lst_pos;
29+
/* protects all operations on queue */
30+
_PyMutex mutex;
31+
/* number of items in queue */
32+
Py_ssize_t count;
33+
/* offset of where to put next item */
34+
Py_ssize_t put_index;
35+
/* offset of where to take next item */
36+
Py_ssize_t get_index;
37+
/* size of data buffer */
38+
Py_ssize_t buffer_size;
39+
/* array of items with length buffer_size */
40+
PyObject **data;
41+
uintptr_t waiting;
3242
PyObject *weakreflist;
3343
} simplequeueobject;
3444

@@ -41,7 +51,27 @@ class _queue.SimpleQueue "simplequeueobject *" "simplequeue_get_state_by_type(ty
4151
static int
4252
simplequeue_clear(simplequeueobject *self)
4353
{
44-
Py_CLEAR(self->lst);
54+
if (self->data) {
55+
PyObject **data = self->data;
56+
Py_ssize_t n = self->count;
57+
Py_ssize_t idx = self->get_index;
58+
Py_ssize_t buffer_size = self->buffer_size;
59+
60+
self->data = NULL;
61+
self->count = 0;
62+
self->put_index = 0;
63+
self->get_index = 0;
64+
self->buffer_size = 0;
65+
66+
for (; n != 0 ; n--) {
67+
Py_DECREF(data[idx]);
68+
idx++;
69+
if (idx == buffer_size) {
70+
idx = 0;
71+
}
72+
}
73+
PyMem_Free(self->data);
74+
}
4575
return 0;
4676
}
4777

@@ -51,11 +81,8 @@ simplequeue_dealloc(simplequeueobject *self)
5181
PyTypeObject *tp = Py_TYPE(self);
5282

5383
PyObject_GC_UnTrack(self);
54-
if (self->lock != NULL) {
55-
/* Unlock the lock so it's safe to free it */
56-
if (self->locked > 0)
57-
PyThread_release_lock(self->lock);
58-
PyThread_free_lock(self->lock);
84+
if (_PyMutex_is_locked(&self->mutex)) {
85+
Py_FatalError("SimpleQueue: dealloc with locked queue");
5986
}
6087
(void)simplequeue_clear(self);
6188
if (self->weakreflist != NULL)
@@ -67,7 +94,16 @@ simplequeue_dealloc(simplequeueobject *self)
6794
static int
6895
simplequeue_traverse(simplequeueobject *self, visitproc visit, void *arg)
6996
{
70-
Py_VISIT(self->lst);
97+
PyObject **data = self->data;
98+
Py_ssize_t n = self->count;
99+
Py_ssize_t idx = self->get_index;
100+
for (; n != 0 ; n--) {
101+
Py_VISIT(data[idx]);
102+
idx++;
103+
if (idx == self->buffer_size) {
104+
idx = 0;
105+
}
106+
}
71107
Py_VISIT(Py_TYPE(self));
72108
return 0;
73109
}
@@ -86,25 +122,51 @@ simplequeue_new_impl(PyTypeObject *type)
86122
simplequeueobject *self;
87123

88124
self = (simplequeueobject *) type->tp_alloc(type, 0);
89-
if (self != NULL) {
90-
self->weakreflist = NULL;
91-
self->lst = PyList_New(0);
92-
self->lock = PyThread_allocate_lock();
93-
self->lst_pos = 0;
94-
if (self->lock == NULL) {
95-
Py_DECREF(self);
96-
PyErr_SetString(PyExc_MemoryError, "can't allocate lock");
97-
return NULL;
98-
}
99-
if (self->lst == NULL) {
100-
Py_DECREF(self);
101-
return NULL;
102-
}
125+
if (!self) {
126+
return NULL;
103127
}
104128

129+
self->weakreflist = NULL;
130+
self->buffer_size = 8;
131+
self->data = PyMem_Malloc(self->buffer_size * sizeof(PyObject*));
132+
if (self->data == NULL) {
133+
Py_DECREF(self);
134+
return NULL;
135+
}
136+
memset(self->data, 0, self->buffer_size * sizeof(PyObject*));
105137
return (PyObject *) self;
106138
}
107139

140+
static int
141+
simplequeue_grow(simplequeueobject *self)
142+
{
143+
Py_ssize_t new_buffer_size = Py_MAX(8, self->buffer_size * 2);
144+
PyObject **new_data = PyMem_Malloc(new_buffer_size * sizeof(PyObject*));
145+
if (!new_data) {
146+
return -1;
147+
}
148+
149+
/* Copy the contiguous "tail" of the old buffer to the beginning
150+
* of the new buffer. */
151+
Py_ssize_t tail_size = self->buffer_size - self->get_index;
152+
if (tail_size > 0) {
153+
memcpy(new_data, self->data + self->get_index, tail_size * sizeof(PyObject*));
154+
}
155+
156+
/* Next copy any elements that wrapped around the old buffer */
157+
Py_ssize_t remaining = self->count - tail_size;
158+
if (remaining > 0) {
159+
memcpy(new_data + tail_size, self->data, remaining * sizeof(PyObject*));
160+
}
161+
162+
PyMem_Free(self->data);
163+
self->data = new_data;
164+
self->buffer_size = new_buffer_size;
165+
self->get_index = 0;
166+
self->put_index = self->count;
167+
return 0;
168+
}
169+
108170
/*[clinic input]
109171
_queue.SimpleQueue.put
110172
item: object
@@ -123,15 +185,41 @@ _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
123185
int block, PyObject *timeout)
124186
/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/
125187
{
126-
/* BEGIN GIL-protected critical section */
127-
if (PyList_Append(self->lst, item) < 0)
128-
return NULL;
129-
if (self->locked) {
130-
/* A get() may be waiting, wake it up */
131-
self->locked = 0;
132-
PyThread_release_lock(self->lock);
188+
_PyMutex_lock(&self->mutex);
189+
190+
int handoff = 0;
191+
if (self->waiting) {
192+
int more_waiters;
193+
struct wait_entry *waiter;
194+
PyObject **objptr;
195+
196+
/* If there is a waiter, handoff the item directly */
197+
objptr = _PyParkingLot_BeginUnpark(&self->waiting, &waiter, &more_waiters);
198+
if (objptr) {
199+
Py_INCREF(item);
200+
*objptr = item;
201+
handoff = 1;
202+
}
203+
self->waiting = more_waiters;
204+
_PyParkingLot_FinishUnpark(&self->waiting, waiter);
133205
}
134-
/* END GIL-protected critical section */
206+
207+
if (!handoff) {
208+
/* If we didn't handoff the item, add it to the queue */
209+
if (self->count == self->buffer_size && simplequeue_grow(self) < 0) {
210+
_PyMutex_unlock(&self->mutex);
211+
return NULL;
212+
}
213+
Py_INCREF(item);
214+
self->data[self->put_index] = item;
215+
self->put_index++;
216+
self->count++;
217+
if (self->put_index == self->buffer_size) {
218+
self->put_index = 0;
219+
}
220+
}
221+
222+
_PyMutex_unlock(&self->mutex);
135223
Py_RETURN_NONE;
136224
}
137225

@@ -154,30 +242,12 @@ _queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
154242
}
155243

156244
static PyObject *
157-
simplequeue_pop_item(simplequeueobject *self)
245+
empty_error(PyTypeObject *cls)
158246
{
159-
Py_ssize_t count, n;
160-
PyObject *item;
161-
162-
n = PyList_GET_SIZE(self->lst);
163-
assert(self->lst_pos < n);
164-
165-
item = PyList_GET_ITEM(self->lst, self->lst_pos);
166-
Py_INCREF(Py_None);
167-
PyList_SET_ITEM(self->lst, self->lst_pos, Py_None);
168-
self->lst_pos += 1;
169-
count = n - self->lst_pos;
170-
if (self->lst_pos > count) {
171-
/* The list is more than 50% empty, reclaim space at the beginning */
172-
if (PyList_SetSlice(self->lst, 0, self->lst_pos, NULL)) {
173-
/* Undo pop */
174-
self->lst_pos -= 1;
175-
PyList_SET_ITEM(self->lst, self->lst_pos, item);
176-
return NULL;
177-
}
178-
self->lst_pos = 0;
179-
}
180-
return item;
247+
PyObject *module = PyType_GetModule(cls);
248+
simplequeue_state *state = simplequeue_get_state(module);
249+
PyErr_SetNone(state->EmptyError);
250+
return NULL;
181251
}
182252

183253
/*[clinic input]
@@ -206,16 +276,8 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
206276
/*[clinic end generated code: output=5c2cca914cd1e55b input=5b4047bfbc645ec1]*/
207277
{
208278
_PyTime_t endtime = 0;
209-
_PyTime_t timeout;
210-
PyObject *item;
211-
PyLockStatus r;
212-
PY_TIMEOUT_T microseconds;
213-
214-
if (block == 0) {
215-
/* Non-blocking */
216-
microseconds = 0;
217-
}
218-
else if (timeout_obj != Py_None) {
279+
if (block != 0 && timeout_obj != Py_None) {
280+
_PyTime_t timeout;
219281
/* With timeout */
220282
if (_PyTime_FromSecondsObject(&timeout,
221283
timeout_obj, _PyTime_ROUND_CEILING) < 0) {
@@ -226,7 +288,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
226288
"'timeout' must be a non-negative number");
227289
return NULL;
228290
}
229-
microseconds = _PyTime_AsMicroseconds(timeout,
291+
PY_TIMEOUT_T microseconds = _PyTime_AsMicroseconds(timeout,
230292
_PyTime_ROUND_CEILING);
231293
if (microseconds > PY_TIMEOUT_MAX) {
232294
PyErr_SetString(PyExc_OverflowError,
@@ -235,54 +297,55 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
235297
}
236298
endtime = _PyDeadline_Init(timeout);
237299
}
238-
else {
239-
/* Infinitely blocking */
240-
microseconds = -1;
241-
}
242300

243-
/* put() signals the queue to be non-empty by releasing the lock.
244-
* So we simply try to acquire the lock in a loop, until the condition
245-
* (queue non-empty) becomes true.
246-
*/
247-
while (self->lst_pos == PyList_GET_SIZE(self->lst)) {
248-
/* First a simple non-blocking try without releasing the GIL */
249-
r = PyThread_acquire_lock_timed(self->lock, 0, 0);
250-
if (r == PY_LOCK_FAILURE && microseconds != 0) {
251-
Py_BEGIN_ALLOW_THREADS
252-
r = PyThread_acquire_lock_timed(self->lock, microseconds, 1);
253-
Py_END_ALLOW_THREADS
301+
for (;;) {
302+
PyObject *item = NULL;
303+
304+
_PyMutex_lock(&self->mutex);
305+
if (self->count > 0) {
306+
item = self->data[self->get_index];
307+
self->data[self->get_index] = NULL;
308+
309+
self->count--;
310+
self->get_index++;
311+
if (self->get_index == self->buffer_size) {
312+
self->get_index = 0;
313+
}
314+
}
315+
else {
316+
_Py_atomic_store_uintptr_relaxed(&self->waiting, 1);
254317
}
318+
_PyMutex_unlock(&self->mutex);
255319

256-
if (r == PY_LOCK_INTR && Py_MakePendingCalls() < 0) {
257-
return NULL;
320+
if (item) {
321+
return item;
258322
}
259-
if (r == PY_LOCK_FAILURE) {
260-
PyObject *module = PyType_GetModule(cls);
261-
simplequeue_state *state = simplequeue_get_state(module);
262-
/* Timed out */
263-
PyErr_SetNone(state->EmptyError);
264-
return NULL;
323+
324+
if (!block) {
325+
return empty_error(cls);
265326
}
266-
self->locked = 1;
267327

268-
/* Adjust timeout for next iteration (if any) */
269-
if (microseconds > 0) {
270-
timeout = _PyDeadline_Get(endtime);
271-
microseconds = _PyTime_AsMicroseconds(timeout,
272-
_PyTime_ROUND_CEILING);
328+
int64_t timeout_ns = -1;
329+
if (endtime != 0) {
330+
timeout_ns = _PyDeadline_Get(endtime);
331+
if (timeout_ns < 0) {
332+
return empty_error(cls);
333+
}
273334
}
274-
}
275335

276-
/* BEGIN GIL-protected critical section */
277-
assert(self->lst_pos < PyList_GET_SIZE(self->lst));
278-
item = simplequeue_pop_item(self);
279-
if (self->locked) {
280-
PyThread_release_lock(self->lock);
281-
self->locked = 0;
336+
int ret = _PyParkingLot_Park(&self->waiting, 1, &item, timeout_ns);
337+
if (ret == PY_PARK_OK) {
338+
assert(item);
339+
return item;
340+
}
341+
else if (ret == PY_PARK_INTR && Py_MakePendingCalls() < 0) {
342+
/* interrupted */
343+
return NULL;
344+
}
345+
else if (ret == PY_PARK_TIMEOUT) {
346+
return empty_error(cls);
347+
}
282348
}
283-
/* END GIL-protected critical section */
284-
285-
return item;
286349
}
287350

288351
/*[clinic input]
@@ -315,7 +378,10 @@ static int
315378
_queue_SimpleQueue_empty_impl(simplequeueobject *self)
316379
/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/
317380
{
318-
return self->lst_pos == PyList_GET_SIZE(self->lst);
381+
_PyMutex_lock(&self->mutex);
382+
int empty = self->count == 0;
383+
_PyMutex_unlock(&self->mutex);
384+
return empty;
319385
}
320386

321387
/*[clinic input]
@@ -328,7 +394,10 @@ static Py_ssize_t
328394
_queue_SimpleQueue_qsize_impl(simplequeueobject *self)
329395
/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/
330396
{
331-
return PyList_GET_SIZE(self->lst) - self->lst_pos;
397+
_PyMutex_lock(&self->mutex);
398+
Py_ssize_t qsize = self->count;
399+
_PyMutex_unlock(&self->mutex);
400+
return qsize;
332401
}
333402

334403
static int

0 commit comments

Comments
 (0)