by Bjoern Soergel, Institute of Astronomy, Cambridge, UK

CPython (the standard Python implementation written in C that we all use) has a feature called the *Global Interpreter Lock* (GIL). In a nutshell it means that *only one Python thread can execute bytecode at a time* (because the memory management is *not* thread-safe).
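One visible knob of this machinery is the *switch interval*: CPython lets each thread run bytecode for a short slice before offering the GIL to another thread. A minimal sketch for inspecting it (the 5 ms value is CPython's documented default, assumed unchanged here):

```python
import sys

# CPython hands the GIL from thread to thread roughly every
# `switch interval` seconds; within each slice, only one thread
# executes Python bytecode.
print(sys.getswitchinterval())  # 0.005 by default
```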

">>> import that"

The Unwritten Rules of Python

- You do not talk about the GIL.
- You do NOT talk about the GIL.
- Don't even mention the GIL. No seriously.

(C) David Beazley

We'll show examples of both approaches: *multithreading* and *multiprocessing*. In both cases, we'll look at *embarrassingly parallel* problems, i.e. cases where we can split the computation into completely separate tasks that do not depend on each other.

Multithreading via the *threading* module is best suited to I/O-limited problems: pure-Python worker code does not release the GIL, so it provides no speed-up for CPU-bound problems.
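A sketch of why threads *do* help for I/O: blocking calls such as `time.sleep` release the GIL, so the waits can overlap (`time.sleep` here is a stand-in for real I/O, not part of the original examples):

```python
import threading
import time

def fake_io(seconds):
    # time.sleep releases the GIL, so other threads run during the wait
    time.sleep(seconds)

start = time.perf_counter()
threads = [threading.Thread(target=fake_io, args=(0.2,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start
# the four 0.2 s waits overlap: total wall time is ~0.2 s, not 0.8 s
print(elapsed)
```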

If we want to execute CPU-limited code in parallel, we need to circumvent the GIL (as in the Cython parallel example above). Even without Cython, the multiprocessing module provides a way of doing this.
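As a quick sketch of what "circumventing the GIL" means here: each worker in a multiprocessing pool runs in its own OS process with its own interpreter and its own GIL. This example (not from the notebook's timings) just reports the worker PIDs:

```python
from multiprocessing import Pool
import os

def worker(_):
    # runs in a separate process with its own interpreter and GIL
    return os.getpid()

if __name__ == "__main__":
    with Pool(2) as pool:
        pids = set(pool.map(worker, range(8)))
    # the workers' PIDs differ from the parent's PID
    print(os.getpid() in pids)  # False
```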


Let's begin with a simple example (partially based on http://roman-kh.github.io/numba-2/).

In [1]:

```
import numpy as np
import threading
from math import ceil

def testfunc_threading(x, S, N, res):
    """
    All threads work on the same x and write to the same res,
    but each on a different chunk.
    """
    for i in range(S, min(S + N, len(x))):
        print(x[i])
        res[i] = x[i]**2

# number of threads
T = 2
# data size
N = 8
# data
x = np.arange(N)
# array for results
r = np.zeros(N, dtype=int)
# data size for each thread
chunk_N = ceil(float(N)/T)
# starting index for each thread
chunks = [i * chunk_N for i in range(T)]
# create threads
threads = [threading.Thread(target=testfunc_threading, args=(x, chunks[i], chunk_N, r))
           for i in range(T)]
# start them
for thread in threads:
    thread.start()
# wait for them to finish
for thread in threads:
    thread.join()
```

In [2]:

```
r
```

Out[2]:

```
array([ 0,  1,  4,  9, 16, 25, 36, 49])
```

Let's do this on a larger example to show that we are not making anything faster.

In [3]:

```
def testfunc_threading2(x, S, N, res):
    for i in range(S, min(S + N, len(x))):
        res[i] = x[i]**2

# number of threads
T = 2
# data size
N = int(1e7)
# data
x = np.arange(N)
# array for results
r = np.zeros(N, dtype=int)
# data size for each thread
chunk_N = ceil(float(N)/T)
# starting index for each thread
chunks = [i * chunk_N for i in range(T)]
```

In [4]:

```
%%time
# create threads
threads = [threading.Thread(target=testfunc_threading2, args=(x, chunks[i], chunk_N, r))
           for i in range(T)]
# start them
for thread in threads:
    thread.start()
# wait for them to finish
for thread in threads:
    thread.join()
```

CPU time and Wall time are identical, indicating that we have not made anything faster.

A few things worth noting:

- If the task is I/O bound instead of CPU bound, there can indeed be a speed-up.
- If we can release the GIL when calling the worker function, it is possible to run true multi-threaded code. Both numba and Cython allow this. For Cython, see our previous exercise. For numba, see e.g. http://roman-kh.github.io/numba-2/
- There is an easier interface to *multithreading* that works exactly like the *multiprocessing* interface we show below; see e.g. http://chriskiehl.com/article/parallelism-in-one-line/
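One such interface is `multiprocessing.dummy`, which exposes the `Pool` API backed by threads rather than processes. A minimal sketch (not taken from the article above):

```python
from multiprocessing.dummy import Pool as ThreadPool

def square(a):
    return a * a

# same Pool.map interface as multiprocessing, but the workers are threads,
# so nothing needs to be pickled
with ThreadPool(4) as pool:
    res = pool.map(square, range(8))
print(res)  # [0, 1, 4, 9, 16, 25, 36, 49]
```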

Python's *multiprocessing* module spawns multiple subprocesses. All the data required by the subprocesses is serialized ('pickled') and passed to the individual subprocesses. Each subprocess works on its task independently, thus circumventing the GIL.

For easy problems this is by far the quickest way of making your code parallel. But it comes with a few disadvantages:

- Everything needs to be serialized. In some cases (e.g. class instances) this can be a problem.
- Every subprocess has its own memory and needs a copy of all required data in it. This can lead to excessive memory requirements. Therefore:
*Don't just blindly use Pool.map(), especially on shared systems like the IoA clusters!*
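One way to keep the pickling and memory overhead in check is to stream tasks through the pool instead of materializing everything at once. A sketch using `Pool.imap` with an explicit `chunksize` (the parameter values here are illustrative, not tuned):

```python
from multiprocessing import Pool

def square(a):
    return a * a

if __name__ == "__main__":
    with Pool(2) as pool:
        # imap is lazy: results come back as an iterator, and chunksize
        # controls how many items are pickled and shipped per task
        total = sum(pool.imap(square, range(1000), chunksize=100))
    print(total)  # 332833500, the sum of squares 0..999
```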

In [5]:

```
# this is now actual multiprocessing
from multiprocessing import Pool

def testfunc(a):
    print(a)
    return a*a

# create a pool of 4 subprocesses
p = Pool(4)
# send tasks to the subprocesses
res = p.map(testfunc, np.arange(8))
# clean-up
p.close()
print(res)
```

Let's revisit the squaring example from above.

In [6]:

```
def testfunc_mp2(x):
    """
    Same computation, but different I/O:
    each call gets its own chunk and returns its own result array.
    """
    res = np.zeros(len(x))
    for i in range(len(x)):
        res[i] = x[i]**2
    return res

def split(a, n):
    """
    Split an array or list along the first dimension into n roughly equal parts.
    """
    k, m = divmod(len(a), n)
    splitlist = [a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]
    return splitlist

# number of processes
T = 2
# data size
N = int(1e7)
# data
x = np.arange(N)
x_split = split(x, T)
# check the split
len(x_split), x_split[0].size
```

Out[6]:

```
(2, 5000000)
```
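As an aside, NumPy ships an equivalent of the hand-written `split` helper: `np.array_split` produces the same kind of roughly equal chunks:

```python
import numpy as np

x = np.arange(10)
# 10 elements into 3 chunks: lengths 4, 3, 3
chunks = np.array_split(x, 3)
print([c.tolist() for c in chunks])  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```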

In [7]:

```
%%time
#Note the elegant interface!
pool = Pool(T)
results = pool.map(testfunc_mp2, x_split)
pool.close()
```

This is around 2 times faster than the multi-threaded version. Now we are actually running in parallel!

(NB: the CPU timings are of limited use here, because time spent in the sub-processes is apparently not counted.)

*NB: This is not the easiest example to apply multi-processing to, because (unlike the simple example above) we are interested in the indices of the objects.*

In [8]:

```
# same as in other notebook
def angdistcut_python_naive(vec_obj, vec_ps, cos_maxsep):
    nobj = vec_obj.shape[0]
    nps = vec_ps.shape[0]
    dim = vec_obj.shape[1]
    # objects to be deleted
    out = []
    for i in range(nobj):
        for j in range(nps):
            cos = 0.
            # compute dot product
            for k in range(dim):
                cos += vec_obj[i,k] * vec_ps[j,k]
            # stop once we have found one contaminant
            if cos > cos_maxsep:
                out.append(i)
                break
    return np.array(out)

# helper function to generate points
def gen_points_sphere(n):
    """
    Generate random points uniformly distributed on the sphere.
    """
    # angles
    costheta = 2*np.random.rand(n) - 1
    theta = np.arccos(costheta)
    phi = 2*np.pi*np.random.rand(n)
    # unit vectors on the sphere
    x = np.sin(theta)*np.cos(phi)
    y = np.sin(theta)*np.sin(phi)
    z = np.cos(theta)
    vec = np.array([x,y,z]).T
    return vec, theta, phi

# generate populations for example
# objects (obj)
vec_obj, theta_obj, phi_obj = gen_points_sphere(5000)  # ~500,000 in my real use case
# contaminants (ps)
vec_ps, theta_ps, phi_ps = gen_points_sphere(500)  # ~50,000 in my real use case
print(vec_obj.shape, vec_ps.shape)
# maximum separation:
maxsep_deg = 1.
cos_maxsep = np.cos(np.deg2rad(maxsep_deg))
```
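Before parallelizing, note that the triple loop above is really a matrix of dot products, which NumPy can compute in one go. A vectorized sketch for comparison (not the notebook's method, and it builds the full `(nobj, nps)` matrix, so for the ~500,000 × 50,000 real use case it would itself need chunking):

```python
import numpy as np

def angdistcut_numpy(vec_obj, vec_ps, cos_maxsep):
    # all pairwise dot products at once: shape (nobj, nps)
    cos = vec_obj @ vec_ps.T
    # indices of objects closer than maxsep to ANY contaminant
    return np.nonzero((cos > cos_maxsep).any(axis=1))[0]
```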

In [9]:

```
# Every process needs the full list of contaminants.
# The easiest way to achieve this is to create a new function
# in which these fixed arguments are already filled in.
from functools import partial
angdistcut_python_partial = partial(angdistcut_python_naive, vec_ps=vec_ps, cos_maxsep=cos_maxsep)
# split the vec_obj array along its first dimension into roughly equal-sized chunks
ncores = 2
vec_obj_split = split(vec_obj, ncores)
vec_obj.shape, len(vec_obj_split), vec_obj_split[0].shape
```

Out[9]:

```
((5000, 3), 2, (2500, 3))
```


In [10]:

```
%%time
p = Pool(ncores)
#sends task to subprocesses
reslist = p.map(angdistcut_python_partial, vec_obj_split)
#clean-up
p.close()
```

In [11]:

```
# bring the results back into the correct format:
# the indices returned by each subprocess are relative to its chunk,
# so shift them by the chunk's starting offset to recover the original indices
counter = 0
result_python_mp = []
for res, s in zip(reslist, vec_obj_split):
    result_python_mp.append(np.array(res) + counter)
    counter += len(s)
result_python_mp = np.array([item for sublist in result_python_mp for item in sublist])
```
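The same offset bookkeeping can also be written with `np.concatenate`. A small self-contained sketch with made-up chunk results (the `reslist` and `chunk_lengths` values here are purely illustrative):

```python
import numpy as np

# hypothetical per-chunk results: local indices returned by two workers
reslist = [np.array([0, 2]), np.array([1, 3])]
chunk_lengths = [4, 4]
# starting offset of each chunk in the original array
offsets = np.concatenate([[0], np.cumsum(chunk_lengths)[:-1]])
# shift local indices to global ones, then join everything
combined = np.concatenate([r + off for r, off in zip(reslist, offsets)])
print(combined.tolist())  # [0, 2, 5, 7]
```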

In [12]:

```
%%time
result_python_naive = angdistcut_python_partial(vec_obj)
```

In [13]:

```
assert np.array_equal(result_python_mp,result_python_naive)
```
