Parallel Computing with Python¶
Rodrigo Nemmen, IAG USP
This IPython notebook illustrates a few simple ways of doing parallel computing.
Practical examples included:
- Parallel function mapping to a list of arguments (multiprocess module)
- Parallel execution of an array function (scatter/gather) + parallel execution of scripts
- Easy parallel Monte Carlo (parallel magics)
import numpy as np
1. Mapping a model to a grid of parameters¶
Uses the multiprocess module. Note: we use multiprocess instead of multiprocessing because, even though the latter ships with Python by default, it is known to be problematic with Jupyter.
Idea: you have a function $f(x,y)$ of two parameters (e.g., $f$ may represent your model). Given the arrays $\mathbf{x}$ and $\mathbf{y}$ of parameter values, you want to compute $f$ for each pair $(x_i, y_i)$. Let's assume for simplicity that each evaluation is independent of its neighbours. This is an embarrassingly parallel problem.
import multiprocess as mp
Time-wasting function that depends on two parameters. Here, I generate $10^5$ random numbers from a normal distribution and then sum them. The two parameters are $\mu$ and $\sigma$.
def f(z):
    x = np.random.normal(z[0], z[1], 100000)
    return x.sum()
Arrays of input parameters. You could easily modify this to take as input a matrix, not two arrays.
n=3000
X=np.linspace(-1,1,n) # mean
Y=np.linspace(0.1,1,n) # std. dev.
# creates list of arguments [Xi, Yi]
pargs=[] # this is a list of lists!
for i in range(X.size):
    pargs.append([X[i], Y[i]])
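As an aside, the loop above can be written more compactly with zip; this sketch is equivalent to the explicit loop:

```python
import numpy as np

n = 3000
X = np.linspace(-1, 1, n)    # mean
Y = np.linspace(0.1, 1, n)   # std. dev.

# pair the i-th mean with the i-th std. dev.
pargs = [list(pair) for pair in zip(X, Y)]
```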
Parallel execution. Check out all the cores being used with a tool like htop.
ncores=mp.cpu_count() # number of cores
pool = mp.Pool(processes=ncores) # initializes parallel engine
%%time
t=pool.map(f, pargs) # parallel function map
CPU times: user 2.45 s, sys: 28.6 ms, total: 2.48 s Wall time: 29.5 s
pool.close() # close the parallel engine
Serial execution. Careful with the timing below: in Python 3, map returns a lazy iterator, so this only measures the time to create the iterator, not to evaluate f. To time the actual work, use t=list(map(f, pargs)).
%time t=map(f, pargs)
CPU times: user 636 µs, sys: 3 µs, total: 639 µs Wall time: 645 µs
If you want to convert the list to an array, use y=np.array(t). Notes: (1) ipyparallel also provides a map method. (2) the example I picked is not the best one, because numpy's random-number generation is already highly optimized; the communication overhead between processes kills the parallel performance in this case.
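To illustrate point (2), the whole parameter grid can be evaluated in a single vectorized numpy call via broadcasting, with no parallelism at all. A sketch, using smaller sizes than in the cells above so the intermediate array fits comfortably in memory:

```python
import numpy as np

n, nsamples = 300, 1000  # smaller than the 3000 x 100000 case above
X = np.linspace(-1, 1, n)     # means
Y = np.linspace(0.1, 1, n)    # std. devs.

# broadcasting: row i holds nsamples draws from Normal(X[i], Y[i])
samples = np.random.normal(X[:, None], Y[:, None], size=(n, nsamples))
t = samples.sum(axis=1)  # one sum per (mean, std. dev.) pair
```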
2. Parallel execution of array function¶
Uses ipyparallel. Consider a function $f(x)$ which takes an array $x$ containing the grid of input parameters. We want to split the function calls ("split the array") to the different cores in our machine:

Start parallel engine¶
Make sure you have ipyparallel installed:
conda install ipyparallel
Now, start the parallel engines. Open a terminal and use the command:
ipcluster start
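Optionally, you can choose how many engines to launch with the -n flag (check ipcluster start --help for the options available in your version):

```shell
# launch 4 engines; run `ipcluster stop` when done
ipcluster start -n 4
```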
Function to be parallelized¶
Our time-wasting function $f(x)$, which can be applied to an array of integers
# test if n is prime (naive trial division; note we must start at 2
# and handle n < 2, otherwise even numbers are wrongly reported as prime)
def isprime(n):
    if n < 2:
        return False
    for i in range(2, n):
        if n % i == 0:
            return False
    return True

# tests whether each element of an array is prime
def f(x):
    return list(map(isprime, x))
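As an aside, trial division only needs to check divisors up to $\sqrt{n}$, which makes the time-waster much faster; a sketch of my own (not the version timed in this notebook):

```python
import math

def isprime_fast(n):
    """Trial division up to sqrt(n), skipping even divisors."""
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True
```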
Generates a big array (10,000 elements) of random integers between 0 and 100000
x=np.random.randint(low=0, high=100000, size=10000)
Tests¶
Serial execution
%time y=f(x)
CPU times: user 8.17 s, sys: 32.4 ms, total: 8.2 s Wall time: 9.74 s
Now let's discuss how ipyparallel works (at this point I show a slide). See the documentation listed at the end of the notebook for details.
import ipyparallel as ipp
cluster = ipp.Cluster(n=ncores)
cluster.start_cluster_sync()
Starting 8 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
<Cluster(cluster_id='1683573801-aucc', profile='default', controller=<running>, engine_sets=['1683573802'])>
We are going to use the direct view, which means that commands always run on all engines. This is as opposed to a load-balanced view, which asynchronously executes code on whichever engines are idle. In addition, we are going to turn blocking on: each call will block further execution until all engines have finished.
rc = cluster.connect_client_sync()
rc.wait_for_engines()
direct = rc[:] # use all engines
direct.block=True
Splits the input array $x$ between the cores
direct.scatter('x',x)
Verify that the array was indeed divided equally
direct['x.size']
[1250, 1250, 1250, 1250, 1250, 1250, 1250, 1250]
direct['x']
[array([94305, 72839, 17104, ..., 29346, 73755, 29269]), array([31625, 37515, 37053, ..., 76381, 32938, 13199]), array([44846, 30440, 38205, ..., 83728, 5019, 84130]), array([29578, 88280, 80813, ..., 32620, 52857, 27595])]
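Conceptually, scatter splits the array much like numpy's array_split, and gather (used later) concatenates the pieces back together. A local sketch of what the engines are holding:

```python
import numpy as np

x = np.random.randint(low=0, high=100000, size=10000)
chunks = np.array_split(x, 8)   # what scatter does, conceptually
y = np.concatenate(chunks)      # what gather does, conceptually
```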
Let's try to apply the function in each different core
%%px
y=f(x)
[2:execute] NameError: name 'f' is not defined
[3:execute] NameError: name 'f' is not defined
... (the same NameError is raised on all 8 engines)
8 errors
Why the errors above? Because each engine has its own separate namespace: it does not see the variables and functions defined in your local session. The engines work as separate machines, and you have to load all variables and modules separately on each one. Fortunately, that's easy.
%%file myscript.py
# test if n is prime (naive trial division; starts at 2 and handles n < 2)
def isprime(n):
    if n < 2:
        return False
    for i in range(2, n):
        if n % i == 0:
            return False
    return True

# tests whether each element of an array is prime
def f(x):
    return list(map(isprime, x))
Overwriting myscript.py
Execute code which defines the methods on the different engines
direct.run("myscript.py")
<AsyncResult(execute): finished>
Now compute the "model grid" correctly
%%time
%px y=f(x)
%px: 0%| | 0/8 [00:00<?, ?tasks/s]
CPU times: user 107 ms, sys: 47.7 ms, total: 155 ms Wall time: 2.62 s
Alternatively to the command above, you could use
direct.apply(f,x)
or
direct.execute('y=f(x)')
Now each engine holds its own piece of the result array $y$. How do we get it back to the local session?
%%px
import numpy
numpy.size(y)
Out[0:5]: 1250
Out[1:5]: 1250
Out[3:5]: 1250
Out[2:5]: 1250
Out[4:5]: 1250
Out[7:5]: 1250
Out[5:5]: 1250
Out[6:5]: 1250
y=direct.gather('y')
We have the array magically reassembled back in the local engine. :)
3. Easy parallel Monte Carlo¶
Suppose you need to do 200k Monte Carlo simulations. Wouldn't it be great if you could easily split them among your (hopefully many) cores?
In this example, I will perform 200k bootstrap resamplings of a dataset, computing the Pearson correlation coefficient of each resample.
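For reference, nmmn.lsd.bootstrap (the author's helper, used below) resamples the data with replacement. A minimal bootstrap of the Pearson coefficient can be sketched with plain numpy; here I use np.corrcoef instead of scipy.stats.pearsonr, and made-up correlated data standing in for the dataset loaded below:

```python
import numpy as np

rng = np.random.default_rng(42)
# made-up correlated dataset (true Pearson r = 0.8)
xdata = rng.normal(size=500)
ydata = 0.8 * xdata + 0.6 * rng.normal(size=500)

nboot = 1000
r = np.zeros(nboot)
for i in range(nboot):
    # resample indices with replacement (what a bootstrap does)
    idx = rng.integers(0, xdata.size, size=xdata.size)
    r[i] = np.corrcoef(xdata[idx], ydata[idx])[0, 1]  # Pearson coefficient
```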
import astropy.io.ascii as ascii
Load dataset
data=ascii.read('/Users/nemmen/Dropbox/science/projects/finished/jetpower/data/allrb.dat')
xdata=data['pb']
ydata=data['pj']
# number of desired bootstraps
nboot=200000
# number of bootstraps that will go to each core
n=int(nboot/np.size(rc.ids))
Passes variables to the engines
direct.push(dict(n=n,xdata=xdata,ydata=ydata))
%%px
import numpy as np
import nmmn.lsd, scipy
Now everything below is executed in parallel! (IPython magic)
%%time
%%px
r=np.zeros(n)
for i in range(n):
    [xsim,ysim]=nmmn.lsd.bootstrap([xdata,ydata]) # data bootstrapping
    r[i]=scipy.stats.pearsonr(xsim,ysim)[0] # just compute simple Pearson coefficient
%px: 0%| | 0/8 [00:00<?, ?tasks/s]
CPU times: user 118 ms, sys: 30.4 ms, total: 148 ms Wall time: 8.21 s
Assemble result from cores
r=direct.gather('r')
For comparison, how long does it take to do the same simulation in serial mode?
%%time
for i in range(nboot):
    [xsim,ysim]=nmmn.lsd.bootstrap([xdata,ydata])
    t=scipy.stats.pearsonr(xsim,ysim)[0]
CPU times: user 44.8 s, sys: 2.42 s, total: 47.2 s Wall time: 56.6 s