by Bjoern Soergel, Institute of Astronomy, Cambridge, UK
CPython (the standard Python implementation, written in C, that we all use) has a feature called the Global Interpreter Lock (GIL). In a nutshell, it means that only one Python thread can execute bytecode at a time (because CPython's memory management is not thread-safe).
>>> import that
The Unwritten Rules of Python
(C) David Beazley
We'll show examples for both of them. In both cases, we'll look at embarrassingly parallel problems, i.e. cases where we can split the computation into completely separate tasks that do not depend on each other.
Threading is best suited to I/O-limited problems, because blocking I/O calls release the GIL. Pure-Python CPU-bound code never releases the GIL, so threading provides no speed-up for CPU-bound problems.
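To illustrate the I/O case (a small sketch, not part of the original notebook): blocking calls such as time.sleep release the GIL, so two waiting threads overlap their idle time instead of waiting one after the other.

```python
import threading
import time

def wait_a_bit():
    # time.sleep blocks without holding the GIL,
    # standing in for a blocking I/O call
    time.sleep(0.2)

t0 = time.perf_counter()
threads = [threading.Thread(target=wait_a_bit) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - t0
# both sleeps overlap: total is close to 0.2 s, not 0.4 s
print(f"two 0.2 s sleeps finished in {elapsed:.2f} s")
```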
If we want to execute CPU-limited code in parallel, we need to circumvent the GIL (as in the Cython parallel example above). Even without Cython, the multiprocessing module provides a way of doing this.
import numpy as np
import threading
from math import ceil

def testfunc_threading(x, S, N, res):
    """ all threads work on the same x and write to the same res,
    but all on different chunks """
    for i in range(S, min(S+N, len(x))):
        print(x[i])
        res[i] = x[i]**2

# number of threads
T = 2
# data size
N = 8
# data
x = np.arange(N)
# array for results
r = np.zeros(N, dtype=int)
# data size for each thread
chunk_N = ceil(float(N)/T)
# starting index for each thread
chunks = [i * chunk_N for i in range(T)]
# create threads
threads = [threading.Thread(target=testfunc_threading, args=(x, chunks[i], chunk_N, r))
           for i in range(T)]
# start them
for thread in threads:
    thread.start()
# wait for them to finish
for thread in threads:
    thread.join()
0
4
5
1
6
2
7
3
array([ 0, 1, 4, 9, 16, 25, 36, 49])
Let's do this on a larger example to show that we are not making anything faster.
def testfunc_threading2(x, S, N, res):
    for i in range(S, min(S+N, len(x))):
        res[i] = x[i]**2

# number of threads
T = 2
# data size
N = int(1e7)
# data
x = np.arange(N)
# array for results
r = np.zeros(N, dtype=int)
# data size for each thread
chunk_N = ceil(float(N)/T)
# starting index for each thread
chunks = [i * chunk_N for i in range(T)]
%%time
# create threads
threads = [threading.Thread(target=testfunc_threading2, args=(x, chunks[i], chunk_N, r))
           for i in range(T)]
# start them
for thread in threads:
    thread.start()
# wait for them to finish
for thread in threads:
    thread.join()
CPU times: user 5.5 s, sys: 35 ms, total: 5.53 s
Wall time: 5.5 s
CPU time and wall time are essentially identical: the two threads took turns holding the GIL on a single core, so we have not made anything faster.
A few things worth noting:
Python's multiprocessing module spawns multiple subprocesses. All the data required by the subprocesses is serialized ('pickled') and passed to them. Each subprocess works on its task independently, thus circumventing the GIL.
For easy problems this is by far the quickest way of making your code parallel. But it comes with a few disadvantages: all inputs and outputs must be picklable, the data is copied to each subprocess (costing memory and serialization time), and creating the pool itself adds overhead.
# this is now actual multiprocessing
from multiprocessing import Pool

def testfunc(a):
    print(a)
    return a*a

# create a pool of 4 subprocesses
p = Pool(4)
# send tasks to the subprocesses
res = p.map(testfunc, np.arange(8))
# clean-up
p.close()
print(res)
0
1
2
3
5
4
6
7
[0, 1, 4, 9, 16, 25, 36, 49]
Let's revisit the larger example from above.
def testfunc_mp2(x):
    """ same computation but different I/O """
    res = np.zeros(len(x))
    for i in range(len(x)):
        res[i] = x[i]**2
    return res

def split(a, n):
    """ split array or list along first dimension, into roughly equal sizes """
    k, m = divmod(len(a), n)
    splitlist = [a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]
    return splitlist

# number of processes
T = 2
# data size
N = int(1e7)
# data
x = np.arange(N)
x_split = split(x, T)
# number of chunks and size of the first chunk
# (x_split is a list, so we index into it before taking .size)
len(x_split), x_split[0].size
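A quick sanity check of the split() helper (added here for illustration, with a tiny toy array): the chunks are roughly equal in size, and concatenating them restores the original array.

```python
import numpy as np

def split(a, n):
    # split along the first dimension into n roughly equal chunks
    k, m = divmod(len(a), n)
    return [a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]

parts = split(np.arange(10), 3)
print([len(p) for p in parts])        # [4, 3, 3]
print(np.concatenate(parts))          # [0 1 2 3 4 5 6 7 8 9]
```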
%%time
# Note the elegant interface!
pool = Pool(T)
results = pool.map(testfunc_mp2, x_split)
pool.close()
CPU times: user 79 ms, sys: 62 ms, total: 141 ms
Wall time: 1.27 s
This is several times faster in wall time than the multi-threaded version (1.3 s vs. 5.5 s). Now we are actually running in parallel!
(NB: The CPU timings are of limited use here, because CPU time spent in the subprocesses is apparently not counted towards the parent process.)
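A small sketch (not from the original notebook) of why the parent's CPU timings miss the children's work: time.process_time only counts CPU time of the current process, while time.perf_counter measures wall-clock time, so CPU work done in the pool's children shows up only in the wall time.

```python
import time
from multiprocessing import Pool

def burn(n):
    # CPU-bound work that will run in a child process
    s = 0
    for i in range(n):
        s += i * i
    return s

if __name__ == "__main__":
    t0_cpu = time.process_time()     # CPU time of *this* process only
    t0_wall = time.perf_counter()
    with Pool(2) as p:
        p.map(burn, [2_000_000, 2_000_000])
    cpu = time.process_time() - t0_cpu
    wall = time.perf_counter() - t0_wall
    # the children's CPU time is not attributed to the parent
    print(f"parent CPU time: {cpu:.2f} s, wall time: {wall:.2f} s")
```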
NB: This is not the easiest example to apply multiprocessing to, because (unlike the simple example above) we are interested in the indices of the objects, and each chunk only knows its own local indices.
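The bookkeeping this requires can be sketched on hypothetical data: each chunk reports indices relative to its own start, so we add the running offset of the preceding chunks to recover global indices.

```python
import numpy as np

# hypothetical per-chunk results: local indices of flagged objects
chunk_results = [np.array([1, 3]), np.array([0, 2])]
# sizes of the corresponding input chunks
chunk_lengths = [4, 4]

offset = 0
global_idx = []
for res, n in zip(chunk_results, chunk_lengths):
    global_idx.append(res + offset)   # shift local indices by the chunk's start
    offset += n
global_idx = np.concatenate(global_idx)
print(global_idx)   # [1 3 4 6]
```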
# same as in other notebook
def angdistcut_python_naive(vec_obj, vec_ps, cos_maxsep):
    nobj = vec_obj.shape[0]
    nps = vec_ps.shape[0]
    dim = vec_obj.shape[1]
    # objects to be deleted
    out = []
    for i in range(nobj):
        for j in range(nps):
            cos = 0.
            # compute dot product
            for k in range(dim):
                cos += vec_obj[i,k] * vec_ps[j,k]
            # stop once we have found one contaminant
            if cos > cos_maxsep:
                out.append(i)
                break
    return np.array(out)

# helper function to generate points
def gen_points_sphere(n):
    """ generate random points on sphere """
    # angles
    costheta = 2*np.random.rand(n) - 1
    theta = np.arccos(costheta)
    phi = 2*np.pi*np.random.rand(n)
    # unit vectors on the sphere
    x = np.sin(theta)*np.cos(phi)
    y = np.sin(theta)*np.sin(phi)
    z = np.cos(theta)
    vec = np.array([x, y, z]).T
    return vec, theta, phi

# generate populations for example
# objects (obj)
vec_obj, theta_obj, phi_obj = gen_points_sphere(5000)  # ~500,000 in my real use case
# contaminants (ps)
vec_ps, theta_ps, phi_ps = gen_points_sphere(500)      # ~50,000 in my real use case
print(vec_obj.shape, vec_ps.shape)

# maximum separation:
maxsep_deg = 1.
cos_maxsep = np.cos(np.deg2rad(maxsep_deg))
(5000, 3) (500, 3)
# Every process needs the full list of contaminants.
# The easiest way to get this is to create a new function
# where these fixed arguments are already filled in.
from functools import partial
angdistcut_python_partial = partial(angdistcut_python_naive, vec_ps=vec_ps, cos_maxsep=cos_maxsep)

# split vec_obj array along first dimension into roughly equal sized chunks
ncores = 2
vec_obj_split = split(vec_obj, ncores)
vec_obj.shape, len(vec_obj_split), vec_obj_split[0].shape
((5000, 3), 2, (2500, 3))
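In isolation, functools.partial simply returns a new callable with some of the original function's arguments fixed (a toy example, not from the notebook):

```python
from functools import partial

def power(base, exp):
    return base ** exp

# fix exp=2; the result behaves like a one-argument squaring function
square = partial(power, exp=2)
print(square(5))   # 25
```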
%%time
p = Pool(ncores)
# send tasks to the subprocesses
reslist = p.map(angdistcut_python_partial, vec_obj_split)
# clean-up
p.close()
CPU times: user 3 ms, sys: 7 ms, total: 10 ms
Wall time: 1.49 s
# bring results into the correct format again
counter = 0
result_python_mp = []
# make sure we get the original indices again
for res, s in zip(reslist, vec_obj_split):
    result_python_mp.append(np.array(res) + counter)
    counter += len(s)
result_python_mp = np.array([item for sublist in result_python_mp for item in sublist])
%%time
result_python_naive = angdistcut_python_partial(vec_obj)
CPU times: user 2.85 s, sys: 0 ns, total: 2.85 s
Wall time: 2.85 s
Because there is some overhead associated with serializing the data and creating the subprocesses, we get only a factor ~1.9 speed-up (2.85 s vs. 1.49 s wall time) on 2 cores. For larger computations, the relative impact of this overhead is smaller.