dask-distributed to manage a pool of workers on multiple machines, and use them in joblib

In parallel computing, an embarrassingly parallel problem is one which is obviously decomposable into many identical but separate subtasks. For such tasks, joblib is a very easy-to-use Python package, which allows you to distribute work on multiple processors. It is used internally in scikit-learn, for instance, for parallel grid search and cross-validation. joblib makes parallel computing ridiculously easy (see the doc):
from joblib import Parallel, delayed
from math import sqrt
result = Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10))
# result = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
However, joblib is limited to the processors of a single machine: it is easy to make full use of an 80-core server, but it is more complicated to distribute tasks on (for example) the 40 desktops of a university lab room, to which you might have direct SSH access.
Recently, however, dask-distributed implemented a joblib backend, which makes distributed computing very easy if you are already familiar with joblib.
The cool part is that your Python script will (almost) not change.
Here are the details.
First of all, you will need to install the following packages:
conda install dask distributed -c conda-forge
conda install bokeh
pip install paramiko joblib
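Before going further, it can help to check that these packages are importable in the Python environment you will use (the servers presumably need at least dask and distributed too, since the workers run there). A minimal sketch using only the standard library; the module names in REQUIRED are my assumption of the import names of the packages above:

```python
import importlib.util

# Assumed import names of the packages installed above
# (the conda package "distributed" is imported as "distributed").
REQUIRED = ["dask", "distributed", "bokeh", "paramiko", "joblib"]


def missing_packages(names):
    """Return the subset of `names` that cannot be found on the import path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_packages(REQUIRED)
    if missing:
        print("Missing packages:", ", ".join(missing))
    else:
        print("All packages are available.")
```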
- dask is a flexible parallel computing library.
- dask.distributed is a lightweight library for distributed computing.
- bokeh is an interactive visualization library.
- paramiko is an implementation of the SSHv2 protocol.
- joblib is a set of tools to provide lightweight pipelining.

dask-ssh

In short, dask-ssh is the command you need, and it is available after installing dask-distributed. However, the connection to the servers may require a password, which you don’t want to type every time you start your script, and definitely not once for each server. Here is one way to handle this issue:
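1. Check whether you already have a pair of SSH keys (id_rsa and id_rsa.pub) on your machine:
ls -a ~/.ssh
2. If not, generate them:
ssh-keygen
3. Copy your public key to each server (here lame10 and lame11, with my username tdupre):
ssh-copy-id tdupre@lame10
ssh-copy-id tdupre@lame11
4. Start an SSH agent and add your private key to it, so that you type its passphrase only once:
eval "$(ssh-agent -s)"
ssh-add ~/.ssh/id_rsa
5. Check that you can now connect without typing a password:
ssh tdupre@lame10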
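To check all servers at once, a small helper (not part of dask; the function names are hypothetical) can call ssh in batch mode. With -o BatchMode=yes, ssh fails instead of prompting for a password, so a non-zero exit code means that key-based authentication does not work yet, that the host is unreachable, or that its host key was never accepted (connect once interactively first):

```python
import subprocess


def ssh_check_command(username, host, timeout=5):
    """Build an ssh command that fails instead of prompting for a password.

    BatchMode=yes disables interactive password/passphrase prompts, so the
    command only succeeds if key-based authentication works.
    """
    return [
        "ssh",
        "-o", "BatchMode=yes",
        "-o", f"ConnectTimeout={timeout}",
        f"{username}@{host}",
        "true",  # run a no-op command on the server and exit
    ]


def check_passwordless(username, hosts):
    """Return a dict mapping each host to True if passwordless SSH works."""
    results = {}
    for host in hosts:
        proc = subprocess.run(ssh_check_command(username, host),
                              stdout=subprocess.DEVNULL,
                              stderr=subprocess.DEVNULL)
        results[host] = proc.returncode == 0
    return results
```

For instance, check_passwordless('tdupre', ['lame10', 'lame11']) should map both servers to True once the keys are set up.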
You are now ready to use dask-ssh.

The scheduler is the process that receives the work from joblib and dispatches it to the workers. The workers are the processes on the remote servers that perform the tasks.
To create the scheduler and the workers, all you need is dask-ssh:
dask-ssh \
--scheduler localhost \
--nprocs 1 \
--nthreads 1 \
--ssh-username tdupre \
--ssh-private-key ~/.ssh/id_rsa \
lame10 lame11
Remarks:
- tdupre is my username, probably not yours.
- localhost can be changed to any IP address, to host the scheduler on another machine.
- lame10 lame11 is my list of servers where I want some workers. You probably need to change it too.
- Instead of listing the servers on the command line, you can use --hostfile list_of_server.txt, where list_of_server.txt contains:
lame10
lame11
You can open the following web page to get a nice overview of your workers:
http://localhost:8787/status
By the way, this is why you need to install bokeh.
joblib

Now the cool part is that you barely have to update your joblib scripts! Here are some examples.
Minimal API example:
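# importing distributed.joblib registers the 'dask.distributed' joblib backend
import distributed.joblib  # noqa
from joblib import parallel_backend

# 8786 is the default port of the dask scheduler
with parallel_backend('dask.distributed',
                      scheduler_host='localhost:8786'):
    pass  # your script using joblib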
Example with sklearn:
import distributed.joblib # noqa
# older scikit-learn versions bundle their own copy of joblib, so you need to
# import from `sklearn.externals.joblib` instead of `joblib` directly
# (sklearn.externals.joblib was removed in scikit-learn 0.23)
from sklearn.externals.joblib import parallel_backend
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC
import numpy as np
digits = load_digits()
param_space = {
    'C': np.logspace(-6, 6, 13),
    'gamma': np.logspace(-8, 8, 17),
    'tol': np.logspace(-4, -1, 4),
    'class_weight': [None, 'balanced'],
}
model = SVC(kernel='rbf')
search = RandomizedSearchCV(model, param_space, cv=3, n_iter=150,
                            verbose=10, n_jobs=-1)
with parallel_backend('dask.distributed', scheduler_host='localhost:8786'):
    search.fit(digits.data, digits.target)
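To get an idea of how much work is sent to the workers: RandomizedSearchCV samples n_iter=150 of the possible parameter combinations and fits each one on cv=3 folds, and each of these fits is an independent joblib task. A quick count in plain Python, with the number of values per parameter copied by hand from param_space (scikit-learn then refits the best model once on the whole dataset, which by default is not part of this parallel loop):

```python
import math

# Number of values per hyperparameter in param_space:
# np.logspace(-6, 6, 13) -> 13, np.logspace(-8, 8, 17) -> 17,
# np.logspace(-4, -1, 4) -> 4, [None, 'balanced'] -> 2
n_values = {"C": 13, "gamma": 17, "tol": 4, "class_weight": 2}

grid_size = math.prod(n_values.values())  # all possible combinations
n_iter, cv = 150, 3                         # settings of RandomizedSearchCV
n_fits = n_iter * cv                        # independent fits sent to joblib

print(grid_size, n_fits)  # 1768 450
```

With verbose=10, scikit-learn should report these 450 fits at the start of the search.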
Remarks:
You can follow the progress of the search at http://localhost:8787/status.

Other pure joblib example:
import time
import numpy as np
import distributed.joblib # noqa
from joblib import parallel_backend, Parallel, delayed
def run(duration):
    time.sleep(duration)
    return duration

delayed_run = delayed(run)
with parallel_backend('dask.distributed',
                      scheduler_host='localhost:8786'):
    results = Parallel()(delayed_run(duration)
                         for duration in np.arange(1, 5, 0.1))
print(np.array(results))
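Since each task only sleeps, this example is a convenient way to check on the dashboard that the tasks are really spread across the workers. As a back-of-the-envelope reference, here is an idealized model in plain Python (an assumption, not how dask actually schedules tasks): each task, in order, goes to the first worker that becomes free, and all overhead is ignored.

```python
import heapq

# Task durations of the example above: np.arange(1, 5, 0.1) gives
# 1.0, 1.1, ..., 4.9 (40 values); rebuilt here without numpy.
durations = [round(1 + 0.1 * i, 1) for i in range(40)]


def ideal_wall_time(durations, n_workers):
    """Wall time if each task, in order, goes to the first worker to be free.

    Idealized list-scheduling model: ignores communication and
    scheduling overhead, so real runs will be somewhat slower.
    """
    finish_times = [0.0] * n_workers  # min-heap of worker finish times
    for d in durations:
        earliest = heapq.heappop(finish_times)
        heapq.heappush(finish_times, earliest + d)
    return max(finish_times)


print(round(sum(durations), 1))                  # 118.0 (seconds, sequential)
print(round(ideal_wall_time(durations, 2), 1))
print(round(ideal_wall_time(durations, 40), 1))  # 4.9 (one task per worker)
```

So with a single process the example takes about two minutes, and each additional worker process should shorten it roughly proportionally, down to the longest single task.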
See the dask-distributed documentation for more details.
As paramiko does not easily handle X11 forwarding (as in ssh -X), we can’t display a figure from a remote worker. However, with matplotlib, we can create the figure with a non-interactive backend and save it with fig.savefig('save_name.png').
To use a non-interactive backend, use this command before importing matplotlib.pyplot:
import matplotlib
matplotlib.use('agg')
Again, this command works only before importing matplotlib.pyplot.
However, for some obscure reason, this may fail.
You may have more luck with this command, which switches the backend even after pyplot has been imported:
import matplotlib.pyplot as plt
plt.switch_backend('agg')
In the bash command dask-ssh, the number of processes (--nprocs 1) is the same on all servers. To have a different number of processes on each server, we need to customize dask-ssh.
The command dask-ssh is just a shortcut to a Python script, dask-ssh.py, so let’s copy it and customize it.
For instance, let’s assume we want to give the servers as a list of hostnames, each followed by its number of processes:
localhost 3
lame10 2
lame11 10
Each line corresponds to a server and the number of processes we want on this server. We will pass this list to the script in a file: --hostfile list_of_server.txt.
In dask-ssh.py, the server list is given in the parameter hostnames, so we first modify the parsing to read the file line by line, keeping each hostname together with its number of processes:
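# wrap hostnames given on the command line, so that every entry is a list:
# [hostname] or [hostname, nprocs]
hostnames = [[h] for h in hostnames]
if hostfile:
    with open(hostfile) as f:
        hosts = f.readlines()
    # one list per non-blank line, e.g. ['lame10', '2']
    hostnames.extend([h.split() for h in hosts if h.strip()])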
Then, we give an empty list of servers to SSHCluster, and we start the workers manually with start_worker:
c = SSHCluster(scheduler, scheduler_port, [], nthreads, nprocs,
               ssh_username, ssh_port, ssh_private_key, nohost,
               log_directory)

# start the workers, giving a specific number of processes if provided
for hostname in hostnames:
    address = hostname[0]
    if len(hostname) == 1:
        # no count given: fall back to the --nprocs value
        nprocs = c.nprocs
    else:
        try:
            nprocs = int(hostname[1])
        except ValueError:
            raise ValueError('Invalid hostname and number of processes %s'
                             % (hostname, ))
    c.workers.append(start_worker(c.logdir, c.scheduler_addr,
                                  c.scheduler_port, address,
                                  c.nthreads, nprocs,
                                  c.ssh_username, c.ssh_port,
                                  c.ssh_private_key, c.nohost))
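The per-host logic can be tested without any server by extracting it into a standalone function. This is a sketch with a hypothetical helper, parse_hosts, that mirrors the parsing and the fallback to --nprocs above, and skips blank lines (such as a trailing newline at the end of the file):

```python
def parse_hosts(lines, default_nprocs):
    """Turn hostfile lines into (address, nprocs) pairs.

    Each non-blank line is "hostname" or "hostname nprocs"; a missing
    count falls back to default_nprocs (the --nprocs value in the script).
    """
    hosts = []
    for line in lines:
        fields = line.split()
        if not fields:  # skip blank lines, e.g. a trailing newline
            continue
        address = fields[0]
        if len(fields) == 1:
            nprocs = default_nprocs
        else:
            try:
                nprocs = int(fields[1])
            except ValueError:
                raise ValueError(
                    f"Invalid hostname and number of processes {fields}"
                ) from None
        hosts.append((address, nprocs))
    return hosts


example = ["localhost 3\n", "lame10 2\n", "lame11 10\n", "\n"]
print(parse_hosts(example, default_nprocs=1))
# [('localhost', 3), ('lame10', 2), ('lame11', 10)]
```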
Then we simply call the script with python my_dask_ssh.py instead of dask-ssh.
The full script is given below.