"""Check that a background task can hand an error to the main task and back off.

A background coroutine (``coordination``) repeatedly fails, publishes the
failure through ``global_future`` and then sleeps for a short backoff.  The
main task awaits ``global_future``, observes the error, and asks the
background task to stop by resolving ``closing``.
"""
import asyncio
import unittest

# Create the loop explicitly: calling asyncio.get_event_loop() without a
# running loop is deprecated.  It is still installed as the current loop so
# code relying on that side effect keeps working.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Carries the first background error to whoever awaits it.
global_future = loop.create_future()
# Resolved by the main task to request that the background task stops.
closing = loop.create_future()


async def push_errors_to_main_task(exc, backoff=0.2):
    """Publish *exc* on ``global_future`` and then back off.

    Args:
        exc: The exception instance to deliver to the main task.
        backoff: Seconds to sleep after publishing the error.

    Raises:
        BaseException: Whatever interrupts the backoff sleep (for example
            ``asyncio.CancelledError``) is reported and re-raised.
    """
    # A future can only be completed once; without this guard a second error
    # would raise InvalidStateError and kill the background task.  Later
    # errors are therefore not delivered, but the backoff still applies.
    if not global_future.done():
        global_future.set_exception(exc)
    print("Error pushed to main task. Backoff for some time...")
    try:
        # asyncio.sleep() no longer accepts a ``loop`` argument; it always
        # uses the running loop.
        await asyncio.sleep(backoff)
    except BaseException as sleep_exc:
        # Distinct name so the ``exc`` parameter is not shadowed and unbound.
        print("Sleep failed", repr(sleep_exc))
        raise


async def coordination(interval=1):
    """Run the background loop until ``closing`` is resolved.

    Each iteration sleeps for *interval* seconds, simulates a failure by
    raising ``ValueError`` and forwards it via ``push_errors_to_main_task``.

    Args:
        interval: Seconds to sleep at the start of every iteration.
    """
    while not closing.done():
        print("Coordinate")
        await asyncio.sleep(interval)
        try:
            # Simulated failure of one unit of background work.
            raise ValueError()
        except Exception as exc:
            print("Error occured")
            await push_errors_to_main_task(exc)


class TestOne(unittest.TestCase):
    """Drive the scenario on the module-level event loop."""

    async def main(self):
        """Start the background task, expect its error, then stop it."""
        # Create background task that is suspendable in case of error
        task = loop.create_task(coordination())

        with self.assertRaises(ValueError):
            await global_future

        # Graceful teardown: signal the task to stop and wait for it to
        # finish its current backoff.
        print("Graceful stop")
        closing.set_result(None)
        await task
        print("Finished")

    def test_one(self):
        """Run the asynchronous scenario to completion."""
        loop.run_until_complete(self.main())


if __name__ == "__main__":
    unittest.main()