Skip to content
This repository was archived by the owner on Nov 23, 2017. It is now read-only.

Conversation

@gvanrossum
Copy link
Member

This fixes both #265 and #268 (I have tests that repro each).

The implementation no longer uses the getter and putter futures to pass items around -- instead they are used to implement a simple condition variable, without the locking (which isn't needed because asyncio guarantees that no other tasks run until you use yield from).

When a new item is added to the queue, all getters are woken up, and they all just check whether there is an item for them. Similar for putters (the code is entirely symmetrical except for the unfinished tasks management).

This may look inefficient when there are many getters (or many putters) but there is much less code, and it is easier to understand its correctness. I think that is more important given the history of this code.

I had to fix one test, because when there are multiple concurrent getters, we don't guarantee the order in which they get items. That seems reasonable.

Guido van Rossum added 2 commits September 26, 2015 07:34
Move one of the new tests to the correct class.
Fixed an old order-dependent test.
@gvanrossum
Copy link
Member Author

So here's a quick discussion of performance.

The worst-case scenario is if there are e.g. 1000 fast consumers (each just getting items and throwing them away in a loop) and one slow producer (a loop that puts one item and then yields). Here each time the producer puts an item, all consumer tasks are woken up, but only one of them gets the item. All the others find the queue empty, create another future to add to the deque of getters, and the story continues. So we would have 1000 futures created for each item that travels through the queue.

Because the code is so utterly symmetrical, a similar scenario exists with many fast producers and one slow consumer as well (using a queue size limit).

We could fix this as follows: instead of waking up all getters, we only wake up one (skipping the cancelled ones):

        while self._getters:
            getter = self._getters.popleft()
            if not getter.done():
                getter.set_result(None)
                break   # <---------- Add this

However, there is the scenario where the awoken getter is cancelled after its future was marked ready, but before the task gets to run. The yield in get() will then raise CancelledError (or TimeoutError) but its future is not marked cancelled. In this case, if there is another getter waiting, it should be woken up. We can do this by adding a copy of the above loop to get() in case yield raises:

        while self.empty():
            getter = futures.Future(loop=self._loop)
            self._getters.append(getter)
            try:
                yield from getter
            except:
                self._wake_up_one_getter()
                raise

Where _wake_up_one_getter() would be a helper method containing the same loop quoted above. Now it is possible that getter was in fact cancelled, and in that case we would wake up another getter needlessly (but harmlessly). We could prevent that as follows:

            except Exception:
                if not getter.cancelled():
                    self._wake_up_one_getter()
                raise

Is the added complexity worth it? I don't know. What are people doing with queues?

Note: you might think that this could be solved more elegantly using an actual condition variable (asyncio.Condition). It's notify() method only wakes up one waiter. Internally it still has a deque of futures containing a future for each waiter; and it doesn't seem to have easy access to the cancelled() state of the waiter's future. It also adds a lock which we don't need.

In fact, I suspect that async.Condition has the same issue we are dealing with here (and have been for the history the Queue class): it is possible that a waiter is cancelled before it can do anything useful with the information. But the promises of condition variables are a little less strict than for queues, so it probably doesn't count as a bug.

Looking over the locks code, I also noticed that async.Semaphore (which can be thought of as a queue without data) uses the approach I sketched above (waking up only one waiter at a time), but without reawakening another waiter when a waiter is cancelled after its future was marked done. So I expect it can be shown to have the same bug.

@manipopopo
Copy link

What are people doing with queues?

I use the queues as task containers. The tasks from http requests will be put into a queue, and there are a few consumers which can process at most 5 tasks from a queue at a time. There will be many pending http requests since consumers are slow.

Awaking one putter (or getter) at a time seems to be a charming implementation.

@manipopopo
Copy link

  • Is it necessary to call putter.cancel() in put (and getter.cancel() in get) ? I can't figure out under what circumstances awoken futures have _state == _PENDING.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not quite understand why the and not putter.cancelled() part. Surely, if the putter was just cancelled in the line above, then not putter.cancelled() must always be true?...

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _state of an awoken putter is _FINISHED. If the task containing the awoken putter is cancelled, putter.cancelled() will still return False.

I don't understand why putter.cancelled() needs to be checked.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry, I should have added a comment, because this is complicated (and we've had bugs around this issue before). The except clause is mainly there to catch CancelledException.

There are two separate scenarios: (1) the putter has not been awoken by a getter, and the putter is cancelled by its caller (perhaps because of a timeout). In this case the putter.cancel() call is a no-op and the following if does nothing. So the whole except clause is a no-op (and the exception is re-raised of course).

But there is also scenario (2) which is more complicated:

  • The putter's future is made ready by a getter (a slot freed up)
  • Before the putter's task can run in the event loop, it is cancelled or times out
  • Once the putter's task runs, it receives a CancelledError

In this case, if there is another putter waiting, we need to awaken it so it can use the slot (because we only wake up the first putter, it hasn't been woken up yet). The putter.cancel() call is still a no-op, but the following if triggers and wakes up the next putter. (And then the exception is re-raised.)

So what is the putter.cancel() call for? It's because I don't trust that I've analyzed all possible scenarios. There might be a (rare) case where we get an exception at this point and the future is not marked done. If that were to happen, the putter would still be in the self._putters deque, and just to make sure that the queue doesn't get stuck I think we should cancel the putter here so it will be ignored when a getter tries to wake up the next putter. In this case we don't need to wake up another putter, and indeed the if clause skips that (because putter is cancelled).

I'll try to think of a brief comment to add to the code.

@gjcarneiro
Copy link

Overall this is a vast simplification. +1!

@gjcarneiro
Copy link

FWIW, my use case is actually just one producer and one consumer.

@gvanrossum
Copy link
Member Author

@manipopopo -- I saw an email from you asking:

It seems that each pending waiter will be awoken only one time. Would it be more intuitive if the while loops in put and get are replaced with if statements?

The answer is that the while loop is still needed, because there may still be spurious wake-ups. One scenario: a getter's future is marked done, but before its task is run by the event loop, some other task consumes a queued item directly using get_nowait(). Then the getter that was woken up will find that the queue is empty again.

@manipopopo
Copy link

Thank you for your clear and helpful explanation!

  • Now I understand why putter.cancelled is needed to be checked!
  • I checked the code of Future and found that it registers a callback function to wake up the Task wrapping the pending future. And the callback function will be put into the ready queue of the event loop by Future._schedule_callbacks. Future._schedule_callbacks is called by cancel, set_result and set_exception, and all of these methods set Future._state to the state other than _PENDING. So I suspected that putter.cancel() might be a no-op. Maybe I should spend more time thinking over this issue more deeply.
  • Now I understand why the while loop is needed. If there are many fast consumers and a slow producer, will some consumers starve? Do we need a counter to record how many awoken waiters are there and give higher privileges to the waiters?

@gvanrossum
Copy link
Member Author

Re your second bullet: the worry is not that the task gets awoken prematurely, the worry is that some exception gets thrown into the coroutine (perhaps by Task._step()) while the future it is waiting for did not get cancelled (nor completed in another way).

Re starvation: there is no guarantee that the queue is fair. If you need to evenly or fairly distribute items across consumers you will have to do much more work. In general there is no scheduling fairness in asyncio, except that after loop.call_soon(f); loop.call_soon(g), f() will be called before g(). But there are many situations where that promise doesn't help (or not enough), and you shouldn't write code that makes such assumptions.

@gvanrossum
Copy link
Member Author

I'm just going to commit this.

@gvanrossum
Copy link
Member Author

Manually merged (squashed into a single commit).

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants