Dask for High Energy Physics

Dask: Flexible parallel execution library for analytic computing

Martin Durant, Anaconda Inc.

Introducing Dask

  • easy
  • efficient
  • scalable
  • familiar
  • low-latency

The HPC gulf

Local machine                  | HPC cluster
few GB of RAM, ~TB storage     | TB RAM, PB storage
<10 cores                      | >10k cores
Python                         | compiled languages
simple programming             | dedicated parallelism framework
interactive/exploratory work   | scheduling system

Dask: How to scale up with a minimum of hassle

Run Dask on your laptop or on a large cluster: the code stays the same, you just specify the scheduler address.

In [1]:
import dask.distributed
client = dask.distributed.Client('dask-scheduler:8786')
client
Out[1]:

Client

Cluster

  • Workers: 8
  • Cores: 16
  • Memory: 54.19 GB
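The choice of scheduler is separate from the code that defines the work. As a minimal sketch, assuming only the core dask package is installed (no cluster), the same computation can be run on two of Dask's local schedulers through the `scheduler=` keyword of `compute`. The functions `square` and `total` are hypothetical stand-ins, not part of the original notebook.

```python
import dask

# Hypothetical stand-in functions, wrapped so that calls build a task graph
@dask.delayed
def square(x):
    return x * x

@dask.delayed
def total(values):
    return sum(values)

# Build the graph once; nothing is executed yet
graph = total([square(i) for i in range(5)])

# Run the same graph on two local schedulers
serial = graph.compute(scheduler="synchronous")  # single thread, handy for debugging
threaded = graph.compute(scheduler="threads")    # local thread pool

# A dask.distributed.Client, once created, registers itself as the default
# scheduler, so the same compute() call would then run on the cluster.
print(serial, threaded)
```

Only the `scheduler=` argument changes between the two runs; the task definitions are untouched.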
import dask

@dask.delayed
def f(x, y):
    # Placeholder body: replace with the real work done on the inputs
    output = (x, y)
    return output
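A concrete instance of the pattern above may help. This is a minimal sketch with two hypothetical functions, `inc` and `add`, chosen only for illustration: calling a delayed function returns a `Delayed` object, and `compute()` runs the whole chain.

```python
import dask

@dask.delayed
def inc(x):
    # Stand-in for real work on one input
    return x + 1

@dask.delayed
def add(x, y):
    # Stand-in for real work combining two inputs
    return x + y

a = inc(1)     # Delayed, not yet evaluated
b = inc(2)     # Delayed, independent of a, so it may run in parallel
c = add(a, b)  # Delayed that depends on both a and b

result = c.compute()  # execute the graph and return a plain Python value
print(result)
```

Passing `a` and `b` into `add` is what records the dependencies; no explicit graph construction is needed.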
In [2]:
%%writefile work.py
import dask
import time
import random

@dask.delayed
def load(fn):
    time.sleep(random.random())
    return fn

@dask.delayed
def load_from_sql(fn):
    time.sleep(random.random() * 3)
    return fn

@dask.delayed
def normalize(in1, in2):
    time.sleep(random.random())
    return in1

@dask.delayed
def roll(in1, in2, in3):
    time.sleep(random.random())
    return in1

@dask.delayed
def compare(in1, in2):
    time.sleep(1)
    return in1

@dask.delayed
def reduction(inlist):
    return True
Overwriting work.py
In [4]:
# Normal make-work functions annotated with "delayed"
import random
from work import load, load_from_sql, normalize, roll, compare, reduction

# One lazy load task per input file
filenames = ['mydata-%d.dat' % i for i in range(30)]
data = [load(fn) for fn in filenames]

# Every dataset is normalized against the same reference table
reference = load_from_sql('sql://mytable')
processed = [normalize(d, reference) for d in data]

# Sliding window over three consecutive processed datasets
rolled = []
for i in range(len(processed) - 2):
    r = roll(processed[i], processed[i + 1], processed[i + 2])
    rolled.append(r)

# Compare 20 randomly chosen pairs of rolled results
compared = []
for i in range(20):
    a = random.choice(rolled)
    b = random.choice(rolled)
    c = compare(a, b)
    compared.append(c)

# Final reduction over all comparisons; still lazy at this point
out = reduction(compared)
out
Out[4]:
Delayed('reduction-59fbe054-4c7b-4c7a-96b9-950c36b9c8ce')
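The output above is a `Delayed` handle, not a result: no function has run yet. The sketch below illustrates this with a hypothetical `record` function that logs each call, using the single-threaded `"synchronous"` scheduler so the log is easy to reason about. The names `record`, `join` and `calls` are assumptions made for this illustration.

```python
import dask

calls = []  # log of which tasks have actually executed

@dask.delayed
def record(name):
    calls.append(name)
    return name

@dask.delayed
def join(parts):
    return "+".join(parts)

# Building the graph does not call record() or join()
lazy = join([record("a"), record("b")])
calls_before = list(calls)

# compute() triggers execution of every task the result depends on
value = lazy.compute(scheduler="synchronous")
calls_after = sorted(calls)

print(calls_before, value, calls_after)
```

For the notebook's own graph, the equivalent step would be `out.compute()`.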
In [6]:
out.visualize()
Out[6]:
[Figure: rendered task graph for the reduction]
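`visualize()` needs Graphviz to render the graph. When that is not available, the graph can still be inspected through the Dask collection protocol. The following is a small sketch, assuming hypothetical `inc` and `add` functions rather than the notebook's `work.py`, that lists the task keys of a delayed result.

```python
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def add(x, y):
    return x + y

out = add(inc(1), inc(2))

# __dask_graph__() returns a mapping from task keys to task definitions
graph = out.__dask_graph__()
task_keys = sorted(str(k) for k in graph)
n_tasks = len(task_keys)

# Each key combines the function name with a unique token
for key in task_keys:
    print(key)
print("final key:", out.key)
```

The key printed last is the one that appears inside `Delayed('...')` when the object is displayed.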