Speeding up numerical Python code

A guide with astronomical examples

by Bjoern Soergel, Institute of Astronomy, Cambridge, UK

Part 2: General ways of making Python code run parallel


Background:

CPython (the standard Python implementation written in C that we all use) has a feature called the Global Interpreter Lock (GIL). In a nutshell, it means that only one Python thread can execute bytecode at a time (because CPython's memory management is not thread-safe).

>>> import that

The Unwritten Rules of Python

  1. You do not talk about the GIL.
  2. You do NOT talk about the GIL.
  3. Don't even mention the GIL. No seriously.

(C) David Beazley

There are two ways of parallelizing code in Python.

We'll show examples for both of them. In both cases, we'll look at embarrassingly parallel problems, i.e. cases where we can split up the computation into completely separate tasks that do not depend on each other.

1) Multi-threading (shared memory)

This is best suited for I/O-limited problems: blocking I/O calls release the GIL, so threads can make progress while others wait (see the sketch after this list). Pure-Python CPU-bound code, on the other hand, does not release the GIL, so multi-threading provides no speed-up there.

2) Multi-processing (separate memory)

If we want to execute CPU-limited code in parallel, we need to circumvent the GIL (as in the Cython parallel example above). Even without Cython, the multiprocessing module provides a way of doing this.
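As a quick illustration of point 1, here is a minimal sketch (not part of the original notebook; the helper name fake_io_task is just for illustration) that uses time.sleep as a stand-in for a blocking I/O call. Sleeping releases the GIL, so two threads waiting at the same time finish in roughly half the wall time of the serial version.

In [ ]:
import threading
import time

def fake_io_task(seconds):
    """stand-in for a blocking I/O call; time.sleep releases the GIL"""
    time.sleep(seconds)

# serial: roughly 2 s
start = time.perf_counter()
for _ in range(2):
    fake_io_task(1.0)
print("serial:   %.2f s" % (time.perf_counter() - start))

# two threads: roughly 1 s, because the two waits overlap
start = time.perf_counter()
threads = [threading.Thread(target=fake_io_task, args=(1.0,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("threaded: %.2f s" % (time.perf_counter() - start))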


Parallel Python 1: Multi-threading

Let's begin with a simple example (partially based on http://roman-kh.github.io/numba-2/).

In [1]:
import numpy as np
import threading
from math import ceil

def testfunc_threading(x,S,N,res):
    """
    all threads work on the same x and write to the same res, but all on different chunks
    """
    for i in range(S,min(S+N, len(x))):
        print(x[i])
        res[i] = x[i]**2

# number of threads
T = 2
# data size
N = 8
# data
x = np.arange(N)
# array for results
r = np.zeros(N,dtype=int)

# data size for each thread
chunk_N = ceil(float(N)/T)
# starting index for each thread
chunks = [i * chunk_N for i in range(T)]

#create threads
threads = [threading.Thread(target=testfunc_threading, args=(x,chunks[i],chunk_N,r)) for i in range(T)]  
#start them
for thread in threads:
    thread.start()
#wait for them to finish
for thread in threads:
    thread.join()
(interleaved print output from the two threads: one thread prints the values 0–3, the other 4–7; the exact interleaving varies from run to run)
In [2]:
r
Out[2]:
array([ 0,  1,  4,  9, 16, 25, 36, 49])

Let's do this on a larger example to show that we are not making anything faster.

In [3]:
def testfunc_threading2(x,S,N,res):
    for i in range(S,min(S+N, len(x))):
        res[i] = x[i]**2
        
# number of threads
T = 2
# data size
N = int(1e7)
# data
x = np.arange(N)
# array for results
r = np.zeros(N,dtype=int)
# data size for each thread
chunk_N = ceil(float(N)/T)
# starting index for each thread
chunks = [i * chunk_N for i in range(T)]
In [4]:
%%time
#create threads
threads = [threading.Thread(target=testfunc_threading2, args=(x,chunks[i],chunk_N,r)) for i in range(T)]  
#start them
for thread in threads:
    thread.start()
#wait for them to finish
for thread in threads:
    thread.join()
CPU times: user 5.5 s, sys: 35 ms, total: 5.53 s
Wall time: 5.5 s

CPU time and wall time are identical, and nothing got faster: because of the GIL, the two threads took turns executing rather than running simultaneously.

A few things worth noting:

  • If the task is I/O bound instead of CPU bound, there can indeed be a speed-up.
  • If we can release the GIL when calling the worker function, it is possible to run true multi-threaded code. Both numba and Cython allow this. For Cython, see our previous exercise. For numba, see e.g. http://roman-kh.github.io/numba-2/
  • There is an easier interface to multi-threading that works exactly like the multiprocessing interface we show below; see e.g. http://chriskiehl.com/article/parallelism-in-one-line/ and the short sketch below.
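One such interface from the standard library is multiprocessing.dummy: it provides a thread-based Pool with the same map interface as the process-based Pool we use below. A minimal sketch (remember that for CPU-bound pure-Python code this is still limited by the GIL):

In [ ]:
from multiprocessing.dummy import Pool as ThreadPool   # thread-based Pool, same API as multiprocessing.Pool

def square(a):
    return a * a

pool = ThreadPool(4)                # 4 threads, not 4 processes
res = pool.map(square, range(8))
pool.close()
pool.join()
print(res)                          # [0, 1, 4, 9, 16, 25, 36, 49]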

Parallel Python 2: Multiprocessing

Python's multiprocessing module spawns multiple subprocesses. All the data required by the subprocesses is serialized ('pickled') and passed to them. Every subprocess then works on its task independently, thus circumventing the GIL.

For easy problems this is by far the quickest way of making your code parallel. But it comes with a few disadvantages:

  • Everything needs to be serialized. In some cases (e.g. lambdas or class instances) this can be a problem; a short illustration follows after this list.
  • Every subprocess has its own memory and needs a copy of all required data in it. This can lead to excessive memory requirements. Therefore: Don't just blindly use Pool.map(), especially on shared systems like the IoA clusters!
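To make the serialization point concrete: Pool.map pickles both the function and its arguments before sending them to the workers, so objects that cannot be pickled (lambdas, locally defined functions, open file handles, some class instances) cannot be used. A small sketch:

In [ ]:
import pickle

# Pool.map has to pickle the function it is given;
# a lambda cannot be pickled, so it cannot be sent to a worker process
# (the exact exception type depends on the Python version)
try:
    pickle.dumps(lambda x: x * x)
except Exception as err:
    print(type(err).__name__, err)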
In [5]:
#this is now actual multiprocessing
from multiprocessing import Pool

def testfunc(a):
    print(a)
    return a*a

#create a pool of 4 subprocesses
p = Pool(4)
#sends task to subprocesses
res = p.map(testfunc, np.arange(8))
#clean-up
p.close()

print(res)
0
1
2
3
5
4
6
7
[0, 1, 4, 9, 16, 25, 36, 49]
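As an aside (not shown in the original notebook): since Python 3.3 the Pool can also be used as a context manager, which takes care of the clean-up automatically.

In [ ]:
# reusing testfunc from above; the with-block shuts the pool down on exit
with Pool(4) as p:
    res = p.map(testfunc, np.arange(8))
print(res)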

Let's revisit the example from above.

In [6]:
def testfunc_mp2(x):
    """
    same computation as before, but the result array is created inside the function and returned (subprocesses do not share memory with the parent)
    """
    res = np.zeros(len(x))
    for i in range(len(x)):
        res[i] = x[i]**2
    return res

def split(a, n):
    """
    split array or list along first dimension, into roughly equal sizes
    """
    k, m = divmod(len(a), n)
    splitlist = [ a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n) ] 
    return splitlist

# number of threads
T = 2
# data size
N = int(1e7)
# data
x = np.arange(N)
x_split = split(x,T)
# array for results
len(x_split),x_split[0].size
Out[6]:
(2, 5000000)
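As an aside, NumPy ships with np.array_split, which does essentially the same job as the split helper above (equal-sized chunks where possible, with the remainder spread over the first chunks):

In [ ]:
# equivalent chunking with NumPy
x_split_np = np.array_split(x, T)
print(len(x_split_np), x_split_np[0].size)   # 2 5000000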
In [7]:
%%time
#Note the elegant interface!
pool = Pool(T) 
results = pool.map(testfunc_mp2, x_split)
pool.close() 
CPU times: user 79 ms, sys: 62 ms, total: 141 ms
Wall time: 1.27 s

This is much faster than the multi-threaded version (more than a factor of 4 with the timings above). Now we are actually running in parallel!

(NB: The CPU timings reported by %%time are not very meaningful here, because CPU time spent in the subprocesses is not counted towards the parent process.)
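If the CPU time used by the worker processes is of interest, one option on Unix systems is the resource module, which reports the accumulated CPU time of child processes that have already terminated and been waited for (e.g. after pool.close() followed by pool.join()). A minimal sketch:

In [ ]:
import resource

# CPU time accumulated by terminated child processes (Unix only)
child = resource.getrusage(resource.RUSAGE_CHILDREN)
print("children: user %.2f s, sys %.2f s" % (child.ru_utime, child.ru_stime))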

Let's apply the same to our naive Python implementation of the contaminant removal problem.

NB: This is not the easiest example to apply multi-processing to, because (unlike the simple example above) we are interested in the indices of the objects.

In [8]:
#same as in other notebook
def angdistcut_python_naive(vec_obj,vec_ps,cos_maxsep):
    nobj = vec_obj.shape[0]
    nps = vec_ps.shape[0]
    dim = vec_obj.shape[1]
    #objects to be deleted
    out = []
    for i in range(nobj):
        for j in range(nps):
            cos = 0.
            #compute dot product
            for k in range(dim):
                cos += vec_obj[i,k] * vec_ps[j,k]
            #stop once we have found one contaminant
            if cos > cos_maxsep:
                out.append(i)
                break   
    return np.array(out)

#helper function to generate points
def gen_points_sphere(n):
    """
    generate random points on sphere
    """
    #angles
    costheta = 2*np.random.rand(n)-1
    theta = np.arccos(costheta)
    phi = 2*np.pi*np.random.rand(n)
    #unit vectors on the sphere
    x = np.sin(theta)*np.cos(phi)
    y = np.sin(theta)*np.sin(phi)
    z = np.cos(theta)
    vec = np.array([x,y,z]).T
    return vec,theta,phi

#generate populations for example
# objects (obj)
vec_obj,theta_obj,phi_obj = gen_points_sphere(5000) # ~500,000 in my real use case
# contaminants (ps)
vec_ps,theta_ps,phi_ps = gen_points_sphere(500) # ~50,000 in my real use case
print(vec_obj.shape,vec_ps.shape)
# maximum separation: 
maxsep_deg = 1.
cos_maxsep = np.cos(np.deg2rad(maxsep_deg))
(5000, 3) (500, 3)
In [9]:
#Every process needs the full list of contaminants. 
#The easiest way to get this is to create a new function where these fixed arguments are already filled in.
from functools import partial
angdistcut_python_partial = partial(angdistcut_python_naive,vec_ps=vec_ps,cos_maxsep=cos_maxsep)

#split vec_obj array along first dimension into roughly equal sized chunks
ncores = 2
vec_obj_split = split(vec_obj,ncores)
vec_obj.shape,len(vec_obj_split),vec_obj_split[0].shape
Out[9]:
((5000, 3), 2, (2500, 3))
In [10]:
%%time
p = Pool(ncores)
#sends task to subprocesses
reslist = p.map(angdistcut_python_partial, vec_obj_split)
#clean-up
p.close()
CPU times: user 3 ms, sys: 7 ms, total: 10 ms
Wall time: 1.49 s
In [11]:
#bring results in the correct format again
counter = 0
result_python_mp = []
#make sure we get the original indices again
for res,s in zip(reslist,vec_obj_split):
    result_python_mp.append(np.array(res)+counter)
    counter += len(s)
result_python_mp = np.array([item for sublist in result_python_mp for item in sublist])
In [12]:
%%time
result_python_naive = angdistcut_python_partial(vec_obj)
CPU times: user 2.85 s, sys: 0 ns, total: 2.85 s
Wall time: 2.85 s
In [13]:
assert np.array_equal(result_python_mp,result_python_naive)

Because there is some overhead associated with serializing the data and creating the subprocesses, we get a bit less than the ideal factor-of-2 speedup on 2 cores (about 1.9 with the timings shown here). For larger computations, the impact of this overhead is smaller.
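The split / map / shift-indices pattern above is generic enough to be wrapped into a small helper function. The following sketch (the name parallel_map_chunks is ours, not from any library) reuses the split helper and the partial function defined earlier:

In [ ]:
def parallel_map_chunks(func, arr, ncores):
    """
    Apply func to roughly equal chunks of arr (split along the first axis)
    in separate processes. func is expected to return indices into its chunk;
    these are shifted back to indices into the full array and concatenated.
    """
    chunks = split(arr, ncores)                   # split() as defined above
    with Pool(ncores) as p:
        reslist = p.map(func, chunks)
    out = []
    offset = 0
    for res, chunk in zip(reslist, chunks):
        out.append(np.asarray(res) + offset)      # chunk-local -> global indices
        offset += len(chunk)
    return np.concatenate(out)

# reproduces the result from above
result_helper = parallel_map_chunks(angdistcut_python_partial, vec_obj, 2)
assert np.array_equal(result_helper, result_python_naive)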
