Tom Dupré la Tour

Tutorial: How to use dask-distributed to manage a pool of workers on multiple machines, and use them in joblib

In parallel computing, an embarrassingly parallel problem is one which is obviously decomposable into many identical but separate subtasks. For such tasks, joblib is a very easy-to-use Python package, which allows to distribute work on multiple procesors. It is used for instance internally in scikit-learn for parallel grid search and cross-validation. joblib makes parallel computing ridiculously easy (see the doc):

from joblib import Parallel, delayed
from math import sqrt
result = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
# result = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

However, joblib is limited to local processors, which means it is easy to make full use of an 80-core cluster, but it is more complicated to distribute tasks on (for example) the 40 desktops of an university lab room, on which you might have a direct SSH access.

But recently, dask-distributed implemented a joblib backend, which makes it very easy to use if you are familiar with joblib. The cool part is that your python script will (almost) not change. Here are the details.

Install

First of all, you will need to install the following packages:

conda install dask distributed -c conda-forge
conda install bokeh
pip install paramiko joblib

How to deal with passwords in dask-ssh

In short, dask-ssh is the command you need, and it is available after installing dask-distributed. However, the connection to the servers may require a password, which you don’t want to type every time you start your script, and definetely not for each one of the servers. Here is one way to handle this issue:

  1. First, you need a ssh key. Check if there is already a ssh key (called e.g. id_rsa and id_rsa.pub) in your machine:
  ls -a ~/.ssh
  1. If there is a ssh key and you know its pass-phrase, use it. Otherwise, create a ssh key with:
  ssh-keygen
  1. Then add your public key to all your distant servers (e.g. for me lame10 and lame11 with my username tdupre):
  ssh-copy-id tdupre@lame10
  ssh-copy-id tdupre@lame11
  1. Start an ssh-agent in the background:
  eval "$(ssh-agent -s)"
  1. Add your private key to the ssh-agent:
  ssh-add ~/.ssh/id_rsa
  1. Test that the connection to your server is now password-free:
  ssh tdupre@lame10
  1. Tadaaa ! You will have to repete steps 4 and 5 in every terminal in which you want to run dask-ssh.

How to create the scheduler and the workers in each server

The scheduler is the process which receives the work from joblib, and dispatches it to the workers. The workers are the processes on the distant servers which are going to perform the tasks. To create the scheduler and the workers, all you need is dask-ssh:

dask-ssh \
    --scheduler localhost \
    --nprocs 1 \
    --nthreads 1 \
    --ssh-username tdupre \
    --ssh-private-key ~/.ssh/id_rsa \
    lame10 lame11

Remarks:

lame10
lame11

How to have a nice overview of your workers

You can connect to a webpage to have a nice overview of your workers:

http://localhost:8787/status

By the way, this is why you need to install bokeh.

How to use this scheduler with joblib

Now the cool part is that you barely have to update your joblib scripts ! Here are some examples.

Minimal API example:

import distributed.joblib  # noqa
from joblib import parallel_backend

with parallel_backend('dask.distributed',
                      scheduler_host='localhost:8786'):
    pass  # your script using joblib

Example with sklearn:

import distributed.joblib  # noqa
# scikit-learn bundles joblib, so you need to import from
# `sklearn.externals.joblib` instead of `joblib` directly
from sklearn.externals.joblib import parallel_backend
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC
import numpy as np

digits = load_digits()

param_space = {
    'C': np.logspace(-6, 6, 13),
    'gamma': np.logspace(-8, 8, 17),
    'tol': np.logspace(-4, -1, 4),
    'class_weight': [None, 'balanced'],
}

model = SVC(kernel='rbf')
search = RandomizedSearchCV(model, param_space, cv=3, n_iter=150, verbose=10, n_jobs=-1)

with parallel_backend('dask.distributed', scheduler_host='localhost:8786'):
    search.fit(digits.data, digits.target)

Remarks:

Other pure joblib example:

import time
import numpy as np
import distributed.joblib  # noqa
from joblib import parallel_backend, Parallel, delayed

def run(duration):
    time.sleep(duration)
    return duration

delayed_run = delayed(run)

with parallel_backend('dask.distributed',
                      scheduler_host='localhost:8786'):
    results = Parallel()(delayed_run(duration)
                         for duration in np.arange(1, 5, 0.1))
    print(np.array(results))

See more details in the dask-distributed doc

How to generate figures in a distant worker

As paramiko does not handle easily X11-forwarding (like in shh -X), we can’t display a figure in a distant worker. However, with matplotlib, we can create a figure in a non-interactive backend, and save the figure with fig.savefig('save_name.png').

To use a non-interactive backend, use this command before importing matplotlib.pyplot:

import matplotlib
matplotlib.use('agg')

Again, this command works only before importing matplotlib.pyplot. However, for some obscure reasons, this may fail. You may have more luck with this command instead:

import matplotlib.pyplot as plt
plt.switch_backend('agg')

Advanced: How to create a different number of worker in each server

In the bash command dask-ssh, the number of processes (--nprocs 1) is identical in all servers. To have a different number of processes in each server, we need to customize dask-ssh. The command dask-ssh is just a shortcut to a python script, dask-ssh.py, so let’s copy it and customize it.

For instance, let’s assume we want to give the servers as a list of hostnames and integers.

localhost 3
lame10 2
lame11 10

Each line corresponds to a server and the number of processes we want in this server. We will call the script giving the list in a file: --hostfile list_of_server.txt.

In dask-ssh.py, the server list is given in the parameter hostnames, so we first modify the parsing to keep the lines intact:

if hostfile:
    with open(hostfile) as f:
        hosts = f.readlines()
    hostnames.extend([h.split() for h in hosts])

Then, we give an empty list of servers to SSHCluster, and we start the workers manually with start_worker:

c = SSHCluster(scheduler, scheduler_port, [], nthreads, nprocs,
               ssh_username, ssh_port, ssh_private_key, nohost,
               log_directory)

# start the workers, giving a specific number of processes if provided
for hostname in hostnames:
    if len(hostname) == 1:
        address = hostname[0]
        nprocs = c.nprocs
    else:
        address = hostname[0]
        try:
            nprocs = int(hostname[1])
        except:
            raise ValueError('Invalid hostname and number of processes %s'
                             % (hostname, ))
    c.workers.append(start_worker(c.logdir, c.scheduler_addr,
                                  c.scheduler_port, address,
                                  c.nthreads, nprocs,
                                  c.ssh_username, c.ssh_port,
                                  c.ssh_private_key, c.nohost))

Then we simply call the script with python my_dask_ssh.py instead of dask-ssh.

The full script is given below (click on Details).

```python from __future__ import print_function, division, absolute_import from distributed.deploy.ssh import SSHCluster, start_worker import click @click.command( help="""Launch a distributed cluster over SSH. A 'dask-scheduler' process will run on the first host specified in [HOSTNAMES] or in the hostfile (unless --scheduler is specified explicitly). One or more 'dask-worker' processes will be run each host in [HOSTNAMES] or in the hostfile. Use command line flags to adjust how many dask-worker process are run on each host (--nprocs) and how many cpus are used by each dask-worker process (--nthreads).""") @click.option('--scheduler', default=None, type=str, help="Specify scheduler node. Defaults to first address.") @click.option('--scheduler-port', default=8786, type=int, help="Specify scheduler port number. Defaults to port 8786.") @click.option('--nthreads', default=0, type=int, help="Number of threads per worker process. Defaults to number " "of cores divided by the number of processes per host.") @click.option('--nprocs', default=1, type=int, help="Number of worker processes per host. Defaults to one.") @click.argument('hostnames', nargs=-1, type=str) @click.option('--hostfile', default=None, type=click.Path(exists=True), help="Textfile with hostnames/IP addresses") @click.option('--ssh-username', default=None, type=str, help="Username to use when establishing SSH connections.") @click.option('--ssh-port', default=22, type=int, help="Port to use for SSH connections.") @click.option('--ssh-private-key', default=None, type=str, help="Private key file to use for SSH connections.") @click.option('--nohost', is_flag=True, help="Do not pass the hostname to the worker.") @click.option('--log-directory', default=None, type=click.Path(exists=True), help="Directory to use on all cluster nodes for the output of " "dask-scheduler and dask-worker commands.") @click.pass_context def main(ctx, scheduler, scheduler_port, hostnames, hostfile, nthreads, nprocs, ssh_username, ssh_port, ssh_private_key, nohost, log_directory): try: hostnames = list(hostnames) if hostfile: with open(hostfile) as f: hosts = f.readlines() hostnames.extend([h.split() for h in hosts]) if not scheduler: scheduler = hostnames[0] except IndexError: print(ctx.get_help()) exit(1) c = SSHCluster(scheduler, scheduler_port, [], nthreads, nprocs, ssh_username, ssh_port, ssh_private_key, nohost, log_directory) # start the workers, giving a specific number of processes if provided for hostname in hostnames: if len(hostname) == 1: address = hostname[0] nprocs = c.nprocs else: address = hostname[0] try: nprocs = int(hostname[1]) except: raise ValueError('Invalid hostname and number of processes %s' % (hostname, )) c.workers.append(start_worker(c.logdir, c.scheduler_addr, c.scheduler_port, address, c.nthreads, nprocs, c.ssh_username, c.ssh_port, c.ssh_private_key, c.nohost)) import distributed print('\n---------------------------------------------------------------') print(' Dask.distributed v{version}\n'.format( version=distributed.__version__)) print('Worker nodes:'.format(n=len(hostnames))) for i, host in enumerate(hostnames): print(' {num}: {host}'.format(num=i, host=host)) print('\nscheduler node: {addr}:{port}'.format(addr=scheduler, port=scheduler_port)) print( '---------------------------------------------------------------\n\n') # Monitor the output of remote processes. # This blocks until the user issues a KeyboardInterrupt. c.monitor_remote_processes() # Close down the remote processes and exit. print("\n[ dask-ssh ]: Shutting down remote processes" " (this may take a moment).") c.shutdown() print("[ dask-ssh ]: Remote processes have been terminated. Exiting.") if __name__ == '__main__': main() ```