"""Producer/reader stress script for ``asyncio.Queue``.

A producer puts consecutive integers on a bounded queue with a slowly
growing delay, while a reader pulls them with a short per-get timeout and
reports any gap in the sequence it receives.  Both run for at most
``GLOBAL_TIMEOUT`` seconds.
"""
import asyncio
import random

GLOBAL_TIMEOUT = 2.0  # seconds each top-level task may run
PUT_DELAY = 0.5       # initial pause between two puts, in seconds
GET_TIMEOUT = 0.5     # seconds the reader waits for a single item

# Next value the producer will enqueue; shared module state.
counter = 1


async def producer(queue):
    """Put consecutive integers on *queue* forever.

    Each value is taken from the module-level ``counter``, which is
    incremented after every put.  The pause between puts starts at
    ``PUT_DELAY`` and grows by 10% per iteration.  The coroutine never
    returns on its own; it ends only when cancelled.
    """
    global counter
    delay = PUT_DELAY
    while True:
        print(f"put {counter}")
        await queue.put(counter)
        counter += 1
        await asyncio.sleep(delay)
        delay *= 1.1


async def reader(queue):
    """Consume items from *queue* forever and report gaps in the sequence.

    Every get is limited to ``GET_TIMEOUT`` seconds; on timeout the attempt
    is abandoned and a new one is started.  If a received item is not
    exactly one greater than the previously received item, the size of the
    gap is printed.  The coroutine ends only when cancelled.
    """
    previous = None
    while True:
        # The get is wrapped in its own task and the timeout in another one,
        # mirroring the task layout of the original script.
        getter = asyncio.ensure_future(queue.get())
        waiter = asyncio.ensure_future(asyncio.wait_for(getter, GET_TIMEOUT))
        try:
            item = await waiter
        except asyncio.TimeoutError:
            # The waiter has already finished at this point, so this cancel
            # is a no-op safeguard carried over from the original.
            waiter.cancel()
            continue

        if previous is not None and item != previous + 1:
            print(f"**** lost {item - (previous + 1)} items!!! ***")
        previous = item
        print(f"get {item}")


async def main():
    """Run producer and reader concurrently, each capped at GLOBAL_TIMEOUT."""
    queue = asyncio.Queue(maxsize=10)
    tasks = [
        asyncio.ensure_future(asyncio.wait_for(coro, timeout=GLOBAL_TIMEOUT))
        for coro in (producer(queue), reader(queue))
    ]
    await asyncio.wait(tasks)
    for task in tasks:
        # Fetch (and discard) each task's exception so it counts as
        # retrieved; exception() returns it rather than raising it.
        task.exception()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except asyncio.TimeoutError:
        # Kept from the original driver.  main() retrieves the tasks'
        # timeout errors instead of re-raising them, so this branch is
        # normally not reached.
        print("exit")