death and gravity: ProcessThreadPoolExecutor: when I/O becomes CPU-bound
So, you're doing some I/O bound stuff, in parallel.
Maybe you're scraping some websites – alotof websites.
Maybe you're updating or deleting millions of DynamoDB items.
You've got yourThreadPoolExecutor, you've increased the number of threads and tuned connection limits... but after some point,it's just not getting any faster. You look at your Python process, and you see CPU utilization hovers above 100%.
Youcouldsplit the work into batches and have aProcessPoolExecutorrun your original code in separate processes. But that requires yet more code, and a bunch of changes, which is no fun. And maybe your input is not that easy to split into batches.
If only we had an executor thatworked seamlessly across processes and threads.
Well, you're in luck, since that's exactly what we're building today!
And even better, in a couple years you won't even need it anymore.
Establishing a baseline #
To measure things, we'll use a mock that pretends to domostly I/O, with a sprinkling ofCPU-boundwork thrown in – a stand-in for something like a database connection, a Requestssession, or aDynamoDB client.
Wesleep()for the I/O, and do some math in a loop for the CPU stuff; it doesn't matter exactly how long each takes, as long I/O time dominates.
Real multi-threaded clients are usually backed by a shared connection pool, which allows for connection reuse (so you don't pay the cost of a new connection on each request) and multiplexing (so you can use the same connection for multiple concurrent requests, possible with protocols like HTTP/2 or newer). We could simulate this with asemaphore, but limiting connections is not relevant here – we're assuming the connection pool is effectively unbounded.
Since we'll use our client from multiple processes, we write an initializer function to set up a global, per-process client instance (remember, we want to share potential connection pools between threads); we can then pass the initializer to theexecutorconstructor, along with any arguments we want to pass to the client. Similarly, we do the work through a function that uses this global client.
Finally, we make a simple timing context manager:
...and put everything together in a function that measures how long it takes to do a bunch of work using aconcurrent.futuresexecutor:
Threads #
So, aThreadPoolExecutorshould suffice here, since we're mostly doing I/O, right?
More threads!
Twice the threads, twice as fast. More!
Good, it's still scaling linearly. MORE!
...more?
Problem: CPU becomes a bottleneck #
It's time we take a closer look at what our process is doing. I'd normally use thetopcommand for this, but since the flags and output vary with the operating system, we'll implement our own using the excellentpsutillibrary.
And because it's a context manager, we can use it as a timer:
So, what happens if we increase the number of threads?
With more threads, the compute part of our I/O bound workload increases, eventually becoming high enough to saturate one CPU – and due to theglobal interpreter lock,one CPU is all we can use, regardless of the number of threads.1
Processes? #
I know, let's use aProcessPoolExecutorinstead!
Hmmm... I guess itisa little bit better.
More? More!
OK, it's better, but with diminishing returns – there's no improvement after 80 processes, and even then, it's only2.2xfaster than the best time with threads, when, in theory, it should be able to make full use of all 4 CPUs.
Also, we're not making best use ofconnection pools(since we now have 80 of them, one per process), nor multiplexing (since we now have 80 connections, one per pool).
Problem: more processes, more memory #
But it gets worse!
13.8 MiB * 80 ~= 1 GiB ... that isa lotof memory.
Now, there's some nuance to be had here.
First, on most operating systems that have virtual memory,code segmentpages are shared between processes – there's no point in having 80 copies of libc or the Python interpreter in memory.
Theunique set sizeis probably a better measurement than theresident set size, since it excludes memory shared between processes.2So, for the macOS output above,3the actual usage is more like 8.5 MiB * 80 = 680 MiB.
Second, if you use theforkorforkserverstart methods, processes also share memory allocated before thefork()viacopy-on-write; for Python, this includes module code and variables. On Linux, the actual usage is 1.7 MiB * 80 = 136 MiB:
However, it's important to note that's just a lower bound; memory allocated afterfork()is not shared, and most real work will unavoidably allocate more memory.
Why not both? #
One reasonable way of dealing with this would be to split the input into batches, one per CPU, and pass them to aProcessPoolExecutor, which in turn runs the batch items using aThreadPoolExecutor.4
But that would mean we need to change our code, and that's no fun.
If only we had an executor thatworked seamlessly across processes and threads.
A minimal plausible solution #
In keeping with what hasbecometraditionby now, we'll take an iterative,problem-solutionapproach; since we'renot sure what to do yet, we start withthe simplest thing that could possibly work.
We know we want a process pool executor that starts one thread pool executor per process, so let's deal with that first.
By subclassing ProcessPoolExecutor, we get themap()implementation for free, since the original is implemented in terms ofsubmit().5By going with the defaultmax_workers, we get one process per CPU (which is what we want); we can add more arguments later if needed.
In a custom process initializer, we set up a global thread pool executor,6and then call the process initializer provided by the user:
Likewise,submit()passes the work along to the thread pool executor:
OK, that looks good enough; let's use it and see if it works:
Wait, we got it on the first try?!
Let's measure that:
Hmmm... that's unexpectedly slow... almost as if:
Ah, because_submit()waits for theresult()in the main thread of the worker process, this is just a ProcessPoolExecutor with extra steps.
But what if we send back thefutureobject instead?
Alas:
The immediate cause of the error is that the futurehas a conditionthathas a lockthat can't bepickled, becausethreadinglocks only make sense within the same process.
The deeper cause is that thefutureis not just data, but encapsulates state owned by the thread pool executor, andsharing state between processesrequires extra work.
It may not seem like it, but this is a partial success: the work happens, we just can't get the results back. Not surprising, to be honest, it couldn't have beenthateasy.
Getting results #
If you look carefully at the traceback, you'll find a hint of howProcessPoolExecutorgets its own results back from workers – a queue; themodule docstringeven has a neat data-flow diagram:
|======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ | | => | Work Ids | | | | Call Q | | Process | | | +----------+ | | +-----------+ | Pool | | | | ... | | | | ... | +---------+ | | | 6 | => | | => | 5, call() | => | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | | Executor | | Thread | | | | | +----------- + | | +-----------+ | | | | <=> | Work Items | <=> | | <= | Result Q | <= | | | | +------------+ | | +-----------+ | | | | | 6: call() | | | | ... | | | | | | future | | | | 4, result | | | | | | ... | | | | 3, except | | | +----------+ +------------+ +--------+ +-----------+ +---------+
Now, we could probably use the same queue somehow, but it would involve touching a lot of (private) internals.7Instead, let's use a separate queue:
On the worker side, we make it globally accessible:
...so we can use it from a task callback registered by_submit():
Back in the main process, we handle the results in a thread:
Finally, to stop the handler, we use None as asentinelon executor shutdown:
Let's see if it works:
Yay, the results are making it to the handler!
The error happens because instead of returning aFuture, oursubmit()returns the result of_submit(), which is always None.
Fine, we'll make our own futures #
Butsubmit()mustreturn a future, so we make our own:
In order to map results to their futures, we can use a unique identifier; theid()of the outer future should do, since it is unique for the object's lifetime.
We pass the id to_submit(), then to_put_result()as an attribute on the future, and finally back in the queue with the result:
Back in the result handler, we find the maching future, and set the result accordingly:
And it works:
I mean, itreallyworks:
3.3xis notquitethe 4 CPUs my laptop has, but it's pretty close, and much better than the 2.2x we got from processes alone.
Death becomes a problem #
I wonder what happens when a worker process dies.
For example, the initializer can fail:
...or a worker can die some time later, which we can help along with a customtimer:8
Now let's seeourexecutor:
If the dead worker is not around to send back results, its futures never get completed, andmap()keeps waiting until the end of time, when the expected behavior is to detect when this happens, and fail all pending tasks withBrokenProcessPool.
Before we do that, though, let's address a more specific issue.
Ifmap()hasn't finished submitting tasks when the worker dies,innerfails withBrokenProcessPool, which right now we're ignoring entirely. While we don't need to do anything about it in particular because it gets covered by handling the general case, we should still propagateallerrors to theoutertask anyway.
This fixes the case where a worker dies almost instantly:
For the general case, we need to check if the executor is broken – but how? We've already decided we don't want to depend on internals, so we can't useProcess
Pool
Executor.
_broken. Maybe we can submit a dummy task and see if it fails instead:
Using it is a bit involved, but not completely awful:
When there's a steady stream of results coming in, we don't want to check too often, so we enforce a minimum delay between checks. When there arenoresults coming in, we want to check regularly, so we use theQueue.get()timeout to avoid waiting forever. If the check fails, we break out of the loop and fail the pending tasks. Like so:
So, yeah, I think we're done. Here's the finalexecutorandbenchmarkcode.
Some features left as an exercise for the reader:
- providing a ThreadPoolExecutor initializer
- using otherstart methods
- shutdown()'s
cancel_futures
Learned something new today?Share this with others, it really helps!PyCoder's WeeklyHNRedditlinkedinTwitter
Want to know when new articles come out?Subscribe hereto get new stuff straight to your inbox!
Bonus: free threading #
You may have heard people being excited about the experimentalfree threadingsupport added in Python 3.13, which allows running Python code on multiple CPUs.
And for good reason:
3.6xover to theGILversion, with none of the shenanigans in this article!
Alas, packages with extensions need to be updated to support it:
...but the ecosystem isslowly catching up.
At least, all we can use for pure-Python code. I/O always releases theglobal interpreter lock, and so do some extension modules.[return]
The psutil documentation formemory_full_info()explains the difference quite nicely and links to further resources, becausegood libraries educate.[return]
You may have to run Python as root to get the USS of child processes.[return]
And no,asynciois not a solution, since the event loop runs in a single thread, so you'd still need to run one event loop per CPU in dedicated processes.[return]
We could have usedcompositioninstead, but then we'd have to implement the fullExecutorinterface, defining each method explicitly to delegate to the inner process pool executor, and keep things up to date when the interface gets new methods (and we'd have no way to trick the inner executor'smap()to use oursubmit(), so we'd have to implement it from scratch).
Yet another option would be to use both inheritanceandcomposition – inherit theExecutorbase class directly for thecommon methods(assuming they're defined there and not in subclasses), and delegate to the inner executor only where needed (likely justmap()andshutdown()). But, the only difference from the current code would be that it'd sayself._innerinstead ofsuper()in a few places, so it's not really worth it, in my opinion.[return]
A previous version of this code attempted toshutdown()the thread pool executor usingatexit, but sinceatexitfunctions runafternon-daemon threads finish, it wasn't actually doing anything. Not shutting it down seems to work for now, but we may still need do it to supportshutdown(properly.[return]
cancel_futures=
True)
Check outnilp0inter/threadedprocessfor an idea of what that looks like.[return]
pkill -fn '[Pp]ython'would've done it too, but it gets tedious if you do it a lot, and it's a different command on Windows.[return]