This is the fourth post in a series I am writing. All posts are here:
- Speed Up your Algorithms Part 1 — PyTorch
- Speed Up your Algorithms Part 2 — Numba
- Speed Up your Algorithms Part 3 — Parallelization
- Speed Up your Algorithms Part 4 — Dask
And these go with Jupyter Notebooks, available here:
This post's Jupyter Notebook is available in my repo on GitHub: [SpeedUpYourAlgorithms-Dask]
and on Kaggle:
1. Introduction ^
With the increasing need to parallelize Machine Learning algorithms, driven by the exponential growth in data sizes and even model sizes, it would be really helpful to have a tool that could parallelize our Pandas DataFrame handling, parallelize our NumPy computations, and even parallelize our Machine Learning algorithms (maybe algorithms from TensorFlow), efficiently, effectively and without much hassle.
Such a library does exist, and its name is Dask.
Dask is a parallel computing library which doesn't just help parallelize existing Machine Learning tools such as Pandas and NumPy [i.e. using High Level Collections], but also helps parallelize low-level tasks/functions, and it can handle complex interactions between these functions by building a task graph [i.e. using Low Level Schedulers]. This is similar to Python's Threading or multiprocessing modules.
They also have a separate Machine Learning library, dask-ml, which integrates with existing libraries such as sklearn, xgboost and TensorFlow.
Dask parallelizes the tasks you give it by making a graph of interactions between those tasks. It is really helpful to visualize what you are doing with the .visualize() method, which is available on all of its data types and on any complex chain of tasks you compute. This method outputs a graph of your tasks, and if your tasks have many nodes at each level (i.e. your task chain has many independent tasks at many levels, such as a parallelizable task on chunks of data), then Dask will be able to parallelize them.
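For instance, a minimal sketch of building a small task chain and rendering its graph (it assumes the graphviz package is installed for rendering the image):
import dask.array as da
x = da.ones((1000, 1000), chunks=(250, 250))  # 16 independent chunks
y = (x + x.T).sum(axis=0)                     # a lazy chain of tasks on those chunks
y.visualize(filename="task_graph.png")        # writes the task graph as an image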
2. Data Types ^
Each data type in Dask is a distributed version of an existing data type, such as DataFrame from Pandas, ndarray from NumPy, and list from Python. These data types can be larger than your memory; Dask will run computations on your data in parallel, in a blocked manner. Blocked in the sense that large computations are performed as many small computations, i.e. in blocks, where the number of blocks is the total number of chunks.
Many Numpy arrays in a grid as Dask Array
Dask Array operates on very large arrays by dividing them into chunks and executing those blocks in parallel. It has many of NumPy's methods available, which you can use to get a speedup, but some of them are not implemented.
Dask Array can read from any array-like structure, given that it supports NumPy-like slicing and has a .shape property, by using the dask.array.from_array method. It can also read data stored in formats such as HDF5 (e.g. through an h5py dataset).
import dask.array as da
import numpy as np
arr = np.random.randint(1, 1000, (10000, 10000))
darr = da.from_array(arr, chunks=(1000, 1000))
# It will make chunks, each of size (1000, 1000)
It can be used when your arrays are really heavy (i.e. they won't fit into memory) and NumPy won't be able to do anything about that. So, Dask divides them into chunks and operates on those chunks in parallel for you.
Dask evaluates every method lazily. So, to actually compute the value of a function, you have to use the .compute() method. It will compute the result in parallel, in blocks, parallelizing every independent task at that time.
result = darr.compute()
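Most NumPy-style operations stay lazy and run block by block; for example, continuing with darr from above:
col_means = darr.mean(axis=0)       # still lazy
top = (darr + 100).max()            # still lazy
col_means.compute(), top.compute()  # runs the blocked computations in parallel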
Five Pandas DataFrames, each holding monthly data (possibly from different files), in one Dask DataFrame
Dask DataFrames parallelize computation on very large data files, which won't fit in memory, by dividing the files into chunks and applying functions to those blocks in parallel.
import dask.dataframe as dd
df = dd.read_csv("BigFile(s).csv", blocksize="64MB")
# blocksize: the number of bytes per partition (chunk)
Now you can use most of the functions available in the pandas library here as well.
agg = df.groupby(["column"]).aggregate(["sum", "mean", "max", "min"])
agg.columns = new_column_names # see in notebook
df_new = df.merge(agg.reset_index(), on="column", how="left")
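As with Dask Arrays, these DataFrame operations are lazy; nothing is read or computed until you ask for a result:
preview = df_new.head()     # computes only what it needs to show a few rows
# or df_new.compute(), if the final result is small enough to fit in memory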
Dask Bags parallelize computation on list-like objects containing elements of many data types. They are useful when you are trying to process semi-structured data like JSON blobs or log files.
import dask.bag as db
b = db.read_text("BigSemiStructuredData.txt")
Dask Bag reads the data line by line, and the .take method outputs a tuple with the specified number of lines.
Dask Bag implements operations like map, filter, fold and groupby on such collections of Python objects. It does this in parallel with a small memory footprint using Python iterators. It is similar to a parallel version of PyToolz or a Pythonic version of the PySpark RDD.
# a lambda cannot contain an assignment, so return an updated copy of the record instead
filtered = b.filter(lambda x: x["Name"] == "James")\
            .map(lambda x: {**x, "Address": "New_Address"})
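Nothing runs until you ask for results with .take() or .compute(). A small sketch, assuming the file holds one JSON record per line and that "Name" and "Address" are hypothetical field names:
import json
records = db.read_text("BigSemiStructuredData.txt").map(json.loads)  # parse each line into a dict
james = records.filter(lambda x: x["Name"] == "James")
print(james.take(2))                          # materializes only the first two matching records
addresses = james.pluck("Address").compute()  # pulls one field from every match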
3. Delayed ^
If your task is a little simpler and you are not able to, or don't want to, express it with these High Level Collections, then you can use the Low Level Schedulers, which help you parallelize your code/algorithm using dask.delayed. dask.delayed also does lazy computation.
from dask import delayed as delay

@delay
def sq(x): return x ** 2

@delay
def add(x, y): return x + y

@delay
def sum(arr):           # shadows the builtin, as in the original example
    s = 0
    for i in range(len(arr)): s += arr[i]
    return s
You can add complex interactions between these functions according to your needs, using results from previous tasks as arguments to the next ones. Dask will not compute these functions right away; rather, it will build a graph of your tasks, effectively incorporating the interactions between the functions that you use.
inputs = list(np.arange(1, 11))  # we will apply the delayed functions to this list

temp = []
for i in range(len(inputs)):
    temp.append(sq(inputs[i]))   # compute sq of inputs and save
                                 # delayed objects in the list
inputs = temp; temp = []
for i in range(0, len(inputs) - 1, 2):
    temp.append(add(inputs[i], inputs[i + 1]))  # add two consecutive
                                                # results from prev step
inputs = temp
result = sum(inputs)             # sum all results from prev step
result.compute()                 # nothing actually runs until this call
You can add delay to any parallelizable code that has many possible small blocks and get a speedup. It can be many functions you want to compute, as in the example above, or maybe reading a number of files in parallel, as sketched below.
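For instance, a minimal sketch of reading several CSV files in parallel with dask.delayed (the file names are hypothetical):
import pandas as pd
from dask import delayed
files = ["jan.csv", "feb.csv", "mar.csv"]         # hypothetical file names
parts = [delayed(pd.read_csv)(f) for f in files]  # builds the task graph lazily
df = delayed(pd.concat)(parts).compute()          # reads and concatenates in parallel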
4. Distributed ^
Firstly, until now we have been using Dask's default scheduler for computing the results of our tasks. But you can change it according to your needs from the options Dask provides.
Dask comes with four available schedulers:
- "threaded": a scheduler backed by a thread pool
- "processes": a scheduler backed by a process pool
- "single-threaded" (aka "sync"): a synchronous scheduler, good for debugging
- "distributed": a distributed scheduler for executing graphs on multiple machines
result.compute(scheduler="single-threaded") # for debugging
NOTE: (from official page here)
Threaded tasks will work well when the functions called release the GIL, whereas multiprocessing will always have a slower start-up time and suffer where a lot of communication is required between tasks.
# And you can get the scheduler with one of these commands:
dask.threaded.get, dask.multiprocessing.get, dask.local.get_sync
# the last one is for "single-threaded"
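You can also pick a scheduler globally, or just for a block of code, with dask.config.set; a small sketch, continuing with the darr array from section 2:
import dask
dask.config.set(scheduler="processes")      # use the process pool for every .compute()
with dask.config.set(scheduler="threads"):  # or override it only inside this block
    result = darr.sum().compute()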
Dask has one more scheduler, dask.distributed, and it can be preferred for the following reasons:
- It provides access to an asynchronous API, notably Futures,
- It provides a diagnostic dashboard that can provide valuable insight on performance and progress, and
- It handles data locality with more sophistication, and so can be more efficient than the multiprocessing scheduler on workloads that require multiple processes.
You can create a dask.distributed scheduler by importing and creating a Client:
from dask.distributed import Client
client = Client() # Set up a local cluster
# You can navigate to http://localhost:8787/status to see the
# diagnostic dashboard if you have Bokeh installed.
Now you can submit your tasks to this cluster by using the client.submit method, giving the function and its arguments as parameters. And then we can gather our result either by using client.gather or by calling the .result() method.
sent = client.submit(sq, 4) # sq: square function
result = client.gather(sent) # Or sent.result()
You can also watch the progress of your task in the current cell only by using dask.distributed.progress. And you can also explicitly opt to wait for a task to complete by using dask.distributed.wait.
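A small sketch of both, continuing with the client and sq from the example above:
from dask.distributed import progress, wait
fut = client.submit(sq, 4)
progress(fut)   # shows a progress bar for this task in the current cell
wait(fut)       # blocks until the task has finished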
For more information look here.
5. Machine Learning ^
Dask also has a library, dask-ml, which helps in running the most popular Machine Learning libraries in parallel, like sklearn and xgboost.
In Machine Learning there are a couple of distinct scaling problems you might face. The scaling strategy depends on which problem you’re facing:
- Large Models: Data fits in RAM, but training takes too long. Many hyperparameter combinations, a large ensemble of many models, etc.
- Large Datasets: Data is larger than RAM, and sampling isn’t an option.
So, you should:
- For in-memory fit-able problems, just use scikit-learn (or your favorite ML library);
- For large models, use dask_ml.joblib and your favorite scikit-learn estimator; and
- For large datasets, use dask_ml estimators.
a) Preprocessing:
dask_ml.preprocessing contains some of the functions from sklearn, like RobustScaler, PolynomialFeatures etc., and some of its own such as Categorizer and DummyEncoder.
You can use them just as you have been using them with sklearn:
import dask.dataframe as dd
from dask_ml.preprocessing import RobustScaler

df = dd.read_csv("BigFile.csv", blocksize="64MB")
rsc = RobustScaler()
df["column"] = rsc.fit_transform(df[["column"]])["column"]
And you can build a pipeline with the make_pipeline method, using Dask's preprocessing methods as steps; a sketch follows below.
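A sketch of such a pipeline, assuming sklearn's make_pipeline and dask-ml's LinearRegression (both follow the scikit-learn API):
from sklearn.pipeline import make_pipeline
from dask_ml.preprocessing import RobustScaler
from dask_ml.linear_model import LinearRegression
pipe = make_pipeline(RobustScaler(), LinearRegression())
pipe.fit(X, y)   # X, y can be dask arrays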
b) Hyper Parameter Search:
Dask has counterparts of sklearn's methods for hyperparameter search, such as GridSearchCV, RandomizedSearchCV etc.:
from dask_ml.datasets import make_regression
from dask_ml.model_selection import train_test_split, GridSearchCV

X, y = make_regression(chunks=50000)
xtr, xval, ytr, yval = train_test_split(X, y)

# estimator and param_grid defined as usual (see notebook)
gsearch = GridSearchCV(estimator, param_grid, cv=10)
gsearch.fit(xtr, ytr)
And if you are using partial_fit with your estimators, you can use dask-ml's IncrementalSearchCV.
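A minimal sketch, assuming an estimator that supports partial_fit (here sklearn's SGDRegressor) and a running dask.distributed Client, which IncrementalSearchCV requires:
from sklearn.linear_model import SGDRegressor
from dask_ml.model_selection import IncrementalSearchCV
params = {"alpha": [1e-4, 1e-3, 1e-2], "penalty": ["l1", "l2"]}
search = IncrementalSearchCV(SGDRegressor(), params)
search.fit(X, y)   # X, y: dask arrays; trained batch by batch via partial_fit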
NOTE: (from Dask)
If you want to use post-fit tasks like scoring and prediction, the underlying estimator's scoring method is used. If your estimator, possibly from sklearn, is not able to handle a large dataset, then wrap your estimator in "dask_ml.wrappers.ParallelPostFit". It can parallelize methods like "predict", "predict_proba", "transform" etc.
c) Models:
Dask has some of the Linear Models (LinearRegression, LogisticRegression etc.), some Clustering Models (KMeans, SpectralClustering etc.), a method to operate with TensorFlow clusters, and methods to train XGBoost models using Dask.
You can use sklearn's models with Dask if your training data is small, maybe with the ParallelPostFit wrapper (if your test data is large).
from sklearn.linear_model import ElasticNet
from dask_ml.wrappers import ParallelPostFit

el = ParallelPostFit(estimator=ElasticNet())
el.fit(Xtrain, ytrain)     # Xtrain, ytrain: small, in-memory training data
preds = el.predict(Xtest)  # Xtest: large dask array; prediction runs block-wise in parallel
If your dataset is not large but your model is big, then you can use Dask's joblib backend. Many of sklearn's algorithms were written for parallel execution (you might have used the n_jobs=-1 argument) using joblib, which makes use of threads and processes to parallelize the workload. To use Dask for the parallelization, you create a Client (you have to) and then wrap your code in a joblib.parallel_backend('dask') context:
import joblib   # sklearn.externals.joblib is deprecated in newer sklearn versions
from dask.distributed import Client

client = Client()   # create / connect to a local Dask cluster
with joblib.parallel_backend("dask"):
    # your scikit-learn code here, e.g. estimator.fit(X, y)
    pass
Note that the Dask joblib backend is useful for scaling out CPU-bound workloads; workloads with datasets that fit in RAM, but have many individual operations that can be done in parallel. To scale out to RAM-bound workloads (larger-than-memory datasets) you should use Dask's inbuilt models and methods.
And if your training data is too big to fit into memory, then you should use Dask's inbuilt estimators to get a speedup. You can also use Dask's wrappers.Incremental, which uses the underlying estimator's partial_fit method to train on the whole dataset, but it is sequential in nature.
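A minimal sketch of the Incremental wrapper, assuming an estimator with partial_fit (here sklearn's SGDClassifier) and dask arrays X, y:
from sklearn.linear_model import SGDClassifier
from dask_ml.wrappers import Incremental
inc = Incremental(SGDClassifier(), scoring="accuracy")
inc.fit(X, y, classes=[0, 1])   # calls partial_fit on one block after another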
Dask's inbuilt estimators scale well to large datasets, with a variety of optimization algorithms like admm, lbfgs, gradient_descent etc. and with regularizers like L1, L2, ElasticNet etc.
from dask_ml.linear_model import LogisticRegression

lr = LogisticRegression(solver="lbfgs")  # the solver is chosen at construction time
lr.fit(X, y)                             # X, y: dask arrays
6. Further Reading ^
7. References ^
Suggestions and reviews are welcome.
Thank you for reading!
Comment! Share! Applaud!