Skip to content
This repository was archived by the owner on Nov 23, 2017. It is now read-only.

Commit 1dd213e

Browse files
committed
Merge PR #256: fix issue23812 of queues loosing items on cancellation
Patch by @gjcarneiro.
1 parent 5d71b68 commit 1dd213e

File tree

2 files changed

+98
-10
lines changed

2 files changed

+98
-10
lines changed

‎asyncio/queues.py‎

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def __init__(self, maxsize=0, *, loop=None):
4747

4848
# Futures.
4949
self._getters = collections.deque()
50-
# Pairs of (item, Future).
50+
# Futures
5151
self._putters = collections.deque()
5252
self._unfinished_tasks = 0
5353
self._finished = locks.Event(loop=self._loop)
@@ -98,7 +98,7 @@ def _consume_done_getters(self):
9898

9999
def _consume_done_putters(self):
100100
# Delete waiters at the head of the put() queue who've timed out.
101-
while self._putters and self._putters[0][1].done():
101+
while self._putters and self._putters[0].done():
102102
self._putters.popleft()
103103

104104
def qsize(self):
@@ -148,8 +148,9 @@ def put(self, item):
148148
elif self._maxsize > 0 and self._maxsize <= self.qsize():
149149
waiter = futures.Future(loop=self._loop)
150150

151-
self._putters.append((item, waiter))
151+
self._putters.append(waiter)
152152
yield from waiter
153+
self._put(item)
153154

154155
else:
155156
self.__put_internal(item)
@@ -186,8 +187,7 @@ def get(self):
186187
self._consume_done_putters()
187188
if self._putters:
188189
assert self.full(), 'queue not full, why are putters waiting?'
189-
item, putter = self._putters.popleft()
190-
self.__put_internal(item)
190+
putter = self._putters.popleft()
191191

192192
# When a getter runs and frees up a slot so this putter can
193193
# run, we need to defer the put for a tick to ensure that
@@ -201,9 +201,39 @@ def get(self):
201201
return self._get()
202202
else:
203203
waiter = futures.Future(loop=self._loop)
204-
205204
self._getters.append(waiter)
206-
return (yield from waiter)
205+
try:
206+
return (yield from waiter)
207+
except futures.CancelledError:
208+
# if we get CancelledError, it means someone cancelled this
209+
# get() coroutine. But there is a chance that the waiter
210+
# already is ready and contains an item that has just been
211+
# removed from the queue. In this case, we need to put the item
212+
# back into the front of the queue. This get() must either
213+
# succeed without fault or, if it gets cancelled, it must be as
214+
# if it never happened.
215+
if waiter.done():
216+
self._put_it_back(waiter.result())
217+
raise
218+
219+
def _put_it_back(self, item):
220+
"""
221+
This is called when we have a waiter to get() an item and this waiter
222+
gets cancelled. In this case, we put the item back: wake up another
223+
waiter or put it in the _queue.
224+
"""
225+
self._consume_done_getters()
226+
if self._getters:
227+
assert not self._queue, (
228+
'queue non-empty, why are getters waiting?')
229+
230+
getter = self._getters.popleft()
231+
self._put_internal(item)
232+
233+
# getter cannot be cancelled, we just removed done getters
234+
getter.set_result(item)
235+
else:
236+
self._queue.appendleft(item)
207237

208238
def get_nowait(self):
209239
"""Remove and return an item from the queue.
@@ -213,8 +243,7 @@ def get_nowait(self):
213243
self._consume_done_putters()
214244
if self._putters:
215245
assert self.full(), 'queue not full, why are putters waiting?'
216-
item, putter = self._putters.popleft()
217-
self.__put_internal(item)
246+
putter = self._putters.popleft()
218247
# Wake putter on next tick.
219248

220249
# getter cannot be cancelled, we just removed done putters

‎tests/test_queues.py‎

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ def test_get_with_putters(self):
171171
q.put_nowait(1)
172172

173173
waiter = asyncio.Future(loop=self.loop)
174-
q._putters.append((2, waiter))
174+
q._putters.append(waiter)
175175

176176
res = self.loop.run_until_complete(q.get())
177177
self.assertEqual(1, res)
@@ -322,6 +322,64 @@ def test_nonblocking_put(self):
322322
q.put_nowait(1)
323323
self.assertEqual(1, q.get_nowait())
324324

325+
def test_get_cancel_drop(self):
326+
def gen():
327+
yield 0.01
328+
yield 0.1
329+
330+
loop = self.new_test_loop(gen)
331+
332+
q = asyncio.Queue(loop=loop)
333+
334+
reader = loop.create_task(q.get())
335+
336+
loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
337+
338+
q.put_nowait(1)
339+
q.put_nowait(2)
340+
reader.cancel()
341+
342+
try:
343+
loop.run_until_complete(reader)
344+
except asyncio.CancelledError:
345+
# try again
346+
reader = loop.create_task(q.get())
347+
loop.run_until_complete(reader)
348+
349+
result = reader.result()
350+
# if we get 2, it means 1 got dropped!
351+
self.assertEqual(1, result)
352+
353+
def test_put_cancel_drop(self):
354+
355+
def gen():
356+
yield 0.01
357+
yield 0.1
358+
359+
loop = self.new_test_loop(gen)
360+
q = asyncio.Queue(1, loop=loop)
361+
362+
q.put_nowait(1)
363+
364+
# putting a second item in the queue has to block (qsize=1)
365+
writer = loop.create_task(q.put(2))
366+
loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
367+
368+
value1 = q.get_nowait()
369+
self.assertEqual(value1, 1)
370+
371+
writer.cancel()
372+
try:
373+
loop.run_until_complete(writer)
374+
except asyncio.CancelledError:
375+
# try again
376+
writer = loop.create_task(q.put(2))
377+
loop.run_until_complete(writer)
378+
379+
value2 = q.get_nowait()
380+
self.assertEqual(value2, 2)
381+
self.assertEqual(q.qsize(), 0)
382+
325383
def test_nonblocking_put_exception(self):
326384
q = asyncio.Queue(maxsize=1, loop=self.loop)
327385
q.put_nowait(1)
@@ -374,6 +432,7 @@ def test_put_cancelled_race(self):
374432
test_utils.run_briefly(self.loop)
375433
self.assertTrue(put_c.done())
376434
self.assertEqual(q.get_nowait(), 'a')
435+
test_utils.run_briefly(self.loop)
377436
self.assertEqual(q.get_nowait(), 'b')
378437

379438
self.loop.run_until_complete(put_b)

0 commit comments

Comments
 (0)