An update and proposal on continuing development of Dask and yt.
Dask and yt: a pre-YTEP
yt and Dask: an overview
In the past months, I’ve been investigating and working on integrating Dask into the yt codebase. This document provides an overview of my efforts to date, but it is also meant as a preliminary YTEP (or pYTEP?) to solicit feedback from the yt community at an early stage, before getting too far into the weeds of refactoring.
In general, Dask provides a flexible framework for managing computations across chunked objects (stored in serial on a single processor or in parallel across workers). The yt operations that could potentially be simplified are those that rely on the chunking protocol, such as data IO, calculating derived quantities, calculating profiles, sampling data (slices, projections) and more. Furthermore, allowing yt to return a dask.array object to the user would make it easier for users to build their own parallel workflows.
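As a rough illustration of the user-side benefit, the sketch below stands in for a hypothetical daskified yt read that returns a chunked dask array (here simply built with `da.from_array` from random data). The user chains lazy operations and then evaluates several results in one `dask.compute` call, which lets dask share the common chunk tasks between them. Nothing here is an existing yt API.

```python
import dask
import dask.array as da
import numpy as np

# Stand-in for what a daskified yt read *might* return: a 1D array of
# particle masses split into 4 chunks (real chunks would come from yt IO).
rng = np.random.default_rng(0)
masses_np = rng.uniform(1.0, 2.0, size=1_000)
masses = da.from_array(masses_np, chunks=250)

# Build lazy expressions; nothing is computed yet.
total_mass = masses.sum()
heavy_count = (masses > 1.5).sum()
mean_mass = masses.mean()

# Evaluate all three together so dask can share the common chunk tasks.
total, n_heavy, mean = dask.compute(total_mass, heavy_count, mean_mass)
print(total, n_heavy, mean)
```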
Before diving in, it’s worth discussing the interplay between Dask and the existing MPI architecture within yt. Dask itself provides MPI management via the dask-mpi package, so from a user perspective, anyone already using yt with MPI should see minimal disruption to their workflows.
experiments in daskifying yt
Thus far, my efforts have focused on a series of experiments demonstrating yt + Dask integration at different levels within yt: using dask to read data off of disk, constructing a daskified version of a non-trivial, parallel yt calculation (profiles), and an initial prototype for adding dask functionality to unyt arrays. Each of these subjects has a detailed write-up, linked from its section below.
I encourage you to check out the detailed descriptions, but I’ll provide a short summary here before describing some general takeaways and then proposing a plan for moving development into the yt pipeline.
1. (particle) data IO (link)
In this experiment, I rewrote the BaseIOHandler._read_particle_selection() function (in yt.utilities.io_handler) to use dask to read in particle data from a Gadget dataset. The implementation iterates over the dataset chunks to build a list of delayed objects, one per chunk:

```python
delayed_chunks = [dask.delayed(self._read_single_ptype)(ch, this_ptf, selector, ptype_meta[ptype])
                  for ch in chunks]
```
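To make the pattern above concrete, here is a self-contained sketch under stated assumptions: the chunks are plain dicts of NumPy arrays, the selector is a simple mask function, and `read_single_ptype` is a hypothetical stand-in for the IO handler method. It builds one delayed task per chunk, computes them together, and stitches the results into a dict keyed by `(ptype, field)` tuples, the kind of in-memory result yt's particle IO hands back.

```python
import dask
import numpy as np

# Hypothetical stand-ins: each "chunk" is a dict of per-field arrays for one
# particle type, and the "selector" is a plain mask function. In yt these
# would be real chunk objects, a selector object and the IO handler method.
def read_single_ptype(chunk, fields, selector, ptype_meta):
    mask = selector(chunk["particle_position_x"])
    return {field: chunk[field][mask] for field in fields}

def selector(x):
    return x < 0.5  # keep particles in the left half of the domain

rng = np.random.default_rng(42)
chunks = [
    {"particle_position_x": rng.random(100), "particle_mass": rng.random(100)}
    for _ in range(4)
]
fields = ["particle_position_x", "particle_mass"]

# One delayed task per chunk, mirroring the list comprehension above.
delayed_chunks = [
    dask.delayed(read_single_ptype)(ch, fields, selector, None) for ch in chunks
]

# Run all chunk reads, then concatenate into a {(ptype, field): ndarray} dict.
chunk_results = dask.compute(*delayed_chunks)
ptype = "PartType0"
data = {
    (ptype, f): np.concatenate([res[f] for res in chunk_results]) for f in fields
}
```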
The main challenges here were related to dask communication. The first is that dask uses pickle to serialize and distribute objects to different workers, so any arguments to delayed functions must be picklable. To implement this, I had to add some pickling methods for the base selector objects and slightly modify the underlying ParticleContainer class (which gets stored in each chunk) so that the dataset index is not needlessly rebuilt when unpickling.
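A minimal sketch of the two pickling patterns described above, using only the standard library. Both classes are hypothetical stand-ins, not yt's selector or ParticleContainer code: the selector pickles via `__reduce__` by recording its constructor arguments, and the container drops its expensive index in `__getstate__`, so unpickling never triggers a rebuild (the index is only recreated lazily if a worker actually asks for it).

```python
import pickle

class SphereSelectorSketch:
    """Hypothetical selector: pickled by re-calling its constructor."""

    def __init__(self, center, radius):
        self.center = tuple(center)
        self.radius = radius

    def __reduce__(self):
        # (callable, args): unpickling calls SphereSelectorSketch(center, radius)
        return (self.__class__, (self.center, self.radius))


class ContainerSketch:
    """Hypothetical chunk container holding an expensive-to-build index."""

    builds = 0  # counts index builds so we can see when one happens

    def __init__(self, filename):
        self.filename = filename
        self._index = None

    @property
    def index(self):
        if self._index is None:
            ContainerSketch.builds += 1
            self._index = {"built_from": self.filename}  # stands in for real work
        return self._index

    def __getstate__(self):
        # Ship only lightweight state; the index itself is never serialized.
        state = self.__dict__.copy()
        state["_index"] = None
        return state


sel = pickle.loads(pickle.dumps(SphereSelectorSketch((0.5, 0.5, 0.5), 0.1)))
cont = ContainerSketch("snapshot_000.hdf5")
_ = cont.index  # build once on the "client"
clone = pickle.loads(pickle.dumps(cont))  # no rebuild happens here
```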
The second communication-related issue is that when yt pickles a Dataset object, the hash values are stored in an in-memory cache by default, which is not accessible to the various Dask workers when working in parallel. In the IO prototype, I simply switched to using the on-disk hash storage, but it may be worth considering more direct memory management with Dask, perhaps creating a shared dask context to distribute certain objects across workers.
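The failure mode can be reproduced in a single process with the standard library. In this sketch (all names hypothetical, and much simpler than yt's own dataset store), an object pickles itself as just a hash that the receiving process must resolve. Clearing the in-memory registry stands in for a fresh Dask worker, and persisting a hash-to-filename record in a JSON file stands in for on-disk hash storage.

```python
import json
import os
import pickle
import tempfile

_in_memory_store = {}    # hypothetical per-process hash -> dataset cache
_disk_store_path = None  # set to a JSON path to enable on-disk storage

class DatasetSketch:
    """Hypothetical dataset that pickles as nothing more than its hash."""

    def __init__(self, filename):
        self.filename = filename
        self.hash = f"hash-of-{filename}"  # stand-in for a real hash
        _in_memory_store[self.hash] = self
        if _disk_store_path is not None:
            records = {}
            if os.path.exists(_disk_store_path):
                with open(_disk_store_path) as fh:
                    records = json.load(fh)
            records[self.hash] = filename
            with open(_disk_store_path, "w") as fh:
                json.dump(records, fh)

    def __reduce__(self):
        # Only the hash is serialized; the receiving process must resolve it.
        return (_lookup_dataset, (self.hash,))


def _lookup_dataset(ds_hash):
    if ds_hash in _in_memory_store:
        return _in_memory_store[ds_hash]
    if _disk_store_path is not None and os.path.exists(_disk_store_path):
        with open(_disk_store_path) as fh:
            return DatasetSketch(json.load(fh)[ds_hash])
    raise KeyError(f"dataset {ds_hash!r} unknown in this process")


def unpickle_on_fresh_worker(payload):
    """Simulate a new worker process by emptying the in-memory cache first."""
    _in_memory_store.clear()
    try:
        return pickle.loads(payload)
    except KeyError:
        return None


# In-memory only: the simulated worker cannot resolve the hash.
memory_only = unpickle_on_fresh_worker(pickle.dumps(DatasetSketch("snap_000.hdf5")))

# On-disk storage: the hash -> filename record survives the empty cache.
_disk_store_path = os.path.join(tempfile.mkdtemp(), "datasets.json")
on_disk = unpickle_on_fresh_worker(pickle.dumps(DatasetSketch("snap_001.hdf5")))
```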
2. profile calculation (link)
In this experiment, I focused on refactoring a task that leverages chunked data: calculating profiles. I first attempted to write a pure dask version of a binned statistic equivalent to a yt 1D profile, but performance wasn’t great and it wasn’t clear how to generalize the code. So instead I focused on building a delayed workflow that directly uses yt’s optimized 1D binning function, yt.utilities.lib.misc_utilities.new_bin_profile1d. This approach can easily be extended across yt wherever we perform collections and reductions across chunks. The modifications to the code would also be fairly minimal, mostly replacing MPI gathering operations with iterations over delayed dask objects (reminder: you would still be able to use MPI as normal; dask would simply handle the MPI communications behind the scenes).
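The shape of such a delayed reduction can be sketched as follows. As a loudly stated substitution, `np.bincount` stands in for yt's Cython binning routine, and `bin_chunk`/`combine` are hypothetical helpers: each chunk produces partial weighted sums in a delayed task, and a single delayed `combine` step plays the role of the MPI gather before forming the weighted mean per bin.

```python
import dask
import numpy as np

def bin_chunk(x, field, weight, bin_edges):
    """Partial sums for one chunk (np.bincount stands in for yt's binning)."""
    nbins = len(bin_edges) - 1
    idx = np.digitize(x, bin_edges) - 1   # bin index for each element
    keep = (idx >= 0) & (idx < nbins)     # drop values outside the bins
    idx, field, weight = idx[keep], field[keep], weight[keep]
    wsum = np.bincount(idx, weights=weight, minlength=nbins)
    fwsum = np.bincount(idx, weights=field * weight, minlength=nbins)
    return wsum, fwsum

def combine(partials):
    """Reduce per-chunk partial sums, then take the weighted mean per bin."""
    wsum = sum(p[0] for p in partials)
    fwsum = sum(p[1] for p in partials)
    with np.errstate(invalid="ignore", divide="ignore"):
        return fwsum / wsum  # NaN where a bin received no weight

rng = np.random.default_rng(1)
n_chunks, n_per_chunk = 4, 500
x = rng.random(n_chunks * n_per_chunk)
field = 2.0 * x + 1.0
weight = rng.uniform(0.5, 1.5, size=x.size)
bin_edges = np.linspace(0.0, 1.0, 11)

# One delayed binning task per chunk, then one delayed reduction.
chunk_slices = [slice(i * n_per_chunk, (i + 1) * n_per_chunk) for i in range(n_chunks)]
partials = [dask.delayed(bin_chunk)(x[s], field[s], weight[s], bin_edges) for s in chunk_slices]
profile = dask.delayed(combine)(partials).compute()
```

Because only the small per-bin sums travel to the reduction step, the same structure works whether the chunk tasks run on local threads or on distributed workers.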
3. dask-unyt arrays (link)
In order to leverage dask wherever chunks are used, we need to be able to return dask arrays from the IO functions. In yt, however, our base arrays are unyt_array objects. So in this experiment, I built a rough dask-unyt array prototype. The basic approach was to create a new unyt class, subclassed from the base dask Array object (dask.array.core.Array), that behaves as a dask Array but carries units alongside in hidden unyt attributes. Since the initial attempt, I’ve started an improved implementation that does a better job of minimizing code duplication (hopefully a PR to unyt soon).
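The core idea, a lazy dask array that carries units along until `compute()`, can be sketched without unyt. Note the swapped-in technique: this hypothetical `UnitDaskArraySketch` uses composition and a plain units string for brevity, whereas the prototype described above subclasses `dask.array.core.Array` and stores unyt metadata.

```python
import dask.array as da
import numpy as np

class UnitDaskArraySketch:
    """Hypothetical wrapper: a lazy dask array plus a units label."""

    def __init__(self, darr, units):
        self.darr = darr
        self.units = units

    def __mul__(self, other):
        if isinstance(other, UnitDaskArraySketch):
            # Multiplying two quantities combines their units.
            return UnitDaskArraySketch(self.darr * other.darr, f"{self.units}*{other.units}")
        # Multiplying by a plain scalar leaves the units unchanged.
        return UnitDaskArraySketch(self.darr * other, self.units)

    __rmul__ = __mul__

    def sum(self):
        return UnitDaskArraySketch(self.darr.sum(), self.units)  # still lazy

    def compute(self):
        # Materialize the values; units travel alongside as metadata.
        return np.asarray(self.darr.compute()), self.units


mass = UnitDaskArraySketch(da.ones(8, chunks=4), "g")
vel = UnitDaskArraySketch(da.full(8, 3.0, chunks=4), "cm/s")
momentum = (mass * vel).sum()
value, units = momentum.compute()
scaled = 2 * mass
```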
data IO complexity
Finally, it is worth noting that the work here, particularly in the above section on the daskified particle reader, is closely related to Matt Turk’s thoughts on frontend refactoring (Part 1, Part 2, Part 3). While his posts do not mention dask, there are some synergies with the present work. In refactoring to leverage dask, we should consider ways to simplify frontend development.
Now that I’ve worked through some isolated experiments in daskifying parts of yt, it makes sense to get a wider range of folks involved. Towards that end, I’m proposing the following work plan:
- Stage 0: initial input from the yt community (we are here)
- Stage 1: move development to the yt pipeline
- Stage 2: particle dataset IO
- Stage 3: chunk operations on delayed arrays
- Stage 4: non-particle datasets (and more)
Stage 0: initial input from the yt community
This is the current stage. Do you love/hate any/all of this? Send me your ideas, thoughts, fears and hopes for yt + Dask! You can email me (email@example.com) or come and discuss on the yt slack channel.
Stage 1: move development to the yt pipeline (branch logistics)
So far, my development has mainly proceeded as standalone notebooks and modules in the DXL yt-dask-experiments repository. But in order to start fully developing these new features, we need to move development into the yt pipeline. Given that these changes will take some time and will likely break many things temporarily, we need to isolate yt-Dask development from the main yt development. Towards that end, we can create a new dask_yt development branch, after which development would proceed via:
- dask-specific PRs: these are PRs submitted directly to the dask_yt branch. They may introduce breaking changes.
- “neutral” PRs: these are PRs that make non-breaking changes that are independent of dask and are submitted to yt’s master branch as normal PRs.
Occasionally, we merge yt’s master into dask_yt as neutral changes are merged into master (and as normal yt development occurs).
Stage 1 Tasks & Follow Up:
- create the new
Stage 2: particle dataset IO
The simplest place to start the actual refactoring is to implement a modified version of the prototype particle reader within yt proper. While it will use dask to read the chunks, it can simply return the expected in-memory dict of data, so it will not break anything.
Stage 2 Tasks & Follow Up:
- implement/copy the prototype
dask.dataframe usage (at present the prototype uses dask.dataframe for the initial read to avoid having to know the number of particles a priori)
- consider the initial chunk creation: can we use Dask here initially instead of the
Stage 3: chunk operations on delayed arrays
Once we have a daskified particle reader in place, we need to add the option to return the data as delayed dask arrays. With that option available, we can refactor many of the operations that use the chunk iterator object. The main obstacle to this, besides refactoring the operations that use the chunks themselves, is that the arrays returned by _read_particle_fields are converted to unyt_arrays, so the first step in this stage is completing the
Stage 3 Tasks & Follow Up:
- dask-unyt_array class (as an upstream contribution to unyt, in progress here)
- add a return_dask argument to return dask arrays when reading
- refactor the simpler routines that use the chunk iteration (derived quantities and profile calculations) to use the dask arrays (following the profile calculation experiment).
- start conducting performance tests for the new daskified routines, comparing computation times and memory usage to both the serial and MPI-parallel equivalents in yt.
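One way the proposed `return_dask` argument could behave is sketched below. The reader `read_particle_field` and its per-chunk `read_one` helper are hypothetical; only the `return_dask` name comes from the plan above. By default the function keeps today's behavior and returns an in-memory array, while `return_dask=True` returns the lazy dask array with one block per chunk.

```python
import dask
import dask.array as da
import numpy as np

def read_particle_field(chunk_sizes, return_dask=False):
    """Hypothetical reader for one field, read chunk by chunk.

    return_dask=False keeps the current behavior (an in-memory ndarray);
    return_dask=True returns a lazy dask array with one block per chunk.
    """
    def read_one(i, n):
        return np.full(n, float(i))  # stand-in for real per-chunk IO

    blocks = [
        da.from_delayed(dask.delayed(read_one)(i, n), shape=(n,), dtype="f8")
        for i, n in enumerate(chunk_sizes)
    ]
    arr = da.concatenate(blocks)
    return arr if return_dask else arr.compute()

eager = read_particle_field([3, 2, 4])
lazy = read_particle_field([3, 2, 4], return_dask=True)
```

Note that `da.from_delayed` needs each chunk's shape up front; when particle counts are only known after reading, that requirement is what the dask.dataframe route in the prototype sidesteps.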
Stage 4: non-particle datasets (and more)
Once we have working IO for particle datasets, the ability to return dask arrays, and some parallel operations successfully using the dask arrays, the development path becomes broader: work could start on gridded datasets, or some of the other yt operations that leverage chunks could be daskified.
Several small related PRs that would qualify as “neutral changes” in the above context already exist: 2416, 2934 and 2954.
related links and references
- The DXL repository, home to the experiments described above
- RHytHM2020 talk on Leveraging Dask in yt
- An earlier overview of my yt + Dask efforts
- Matt Turk’s thoughts on frontend refactoring: Part 1, Part 2, Part 3.