Published on September 27, 2017
Author: MatthewRocklin
Source: slideshare.net
1. © 2017 Anaconda, Inc. - Confidential & Proprietary Dask: Scaling Python Matthew Rocklin @mrocklin
2. Python is large and growing
3. https://stackoverflow.blog/2017/09/06/incredible-growth-python/ https://stackoverflow.blog/2017/09/14/python-growing-quickly/
4. Python’s Scientific Stack
6. Bokeh Python’s Scientific Stack
8. Python’s Scientific Ecosystem (and many, many more) Bokeh
9. (and many, many more) Bokeh
10. Numeric Python’s virtues and vices
• Fast: Native code with C/C++/CUDA
• Intuitive: Long history with scientists and analysts
• Established: Trusted and well understood
• Broad: Packages for everything, community supported
• But wasn’t designed to scale:
  • Limited to a single thread
  • Limited to in-memory data
11. How do we scale an ecosystem? From a parallel computing perspective
12. • Designed to parallelize the Python ecosystem
  • Flexible parallel computing paradigm
  • Familiar APIs for Python users
  • Co-developed with Pandas/SKLearn/Jupyter teams
• Scales
  • Scales from multicore to 1000-node clusters
  • Resilient, responsive, and real-time
13. • High Level: Parallel NumPy, Pandas, ML
  • Satisfies subset of these APIs
  • Uses these libraries internally
  • Co-developed with these teams
• Low Level: Task scheduling for arbitrary execution
  • Parallelize existing code
  • Build novel real-time systems
  • Arbitrary task graphs with data dependencies
  • Same scalability
14. Demo
• High level: Scaling Pandas
  • Same Pandas look and feel
  • Uses Pandas under the hood
  • Scales nicely onto many machines
• Low level: Arbitrary task scheduling
  • Parallelize normal Python code
  • Build custom algorithms
  • React in real time
• Demo deployed with
  • dask-kubernetes on Google Compute Engine
  • github.com/dask/dask-kubernetes
• YouTube link
  • https://www.youtube.com/watch?v=ods97a5Pzw0
15. What makes Dask different?
16. Most parallel frameworks follow this architecture
   1. High-level user-facing API, like the SQL language or linear algebra
   2. Medium-level query plan
      For databases/Spark: big data map-steps, shuffle-steps, and aggregation-steps
      For arrays: matrix multiplies, transposes, slicing
   3. Low-level task graph: read a 100MB chunk of data, run a black-box function on it
   4. Execution system: run task 9352 on worker 32, move data x-123 to worker 26
Flow from higher-level to lower-level abstractions
17. Most Parallel Framework Architectures: User API → High-Level Representation (Logical Plan) → Low-Level Representation (Physical Plan) → Task scheduler for execution
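The two lowest layers, a task graph and something that executes it, can be sketched in a few lines. The example below is a toy and not Dask's scheduler: it assumes a graph stored as a plain dictionary that maps each key either to a literal value or to a `(function, *argument_keys)` tuple, and it resolves dependencies recursively in a single thread. The names `graph` and `execute` are made up for illustration.

```python
from operator import add, mul


def execute(graph, key, cache=None):
    """Compute `key` by first computing the keys it depends on."""
    if cache is None:
        cache = {}
    if key in cache:
        return cache[key]
    task = graph[key]
    if isinstance(task, tuple) and callable(task[0]):
        # A task: the first item is the function, the rest are keys of its inputs.
        func, *arg_keys = task
        args = [execute(graph, k, cache) for k in arg_keys]
        result = func(*args)
    else:
        # A literal value stored directly in the graph.
        result = task
    cache[key] = result
    return result


# Hypothetical graph: "product" depends on "sum", which depends on "x" and "y".
graph = {
    "x": 1,
    "y": 2,
    "sum": (add, "x", "y"),
    "product": (mul, "sum", "y"),
}

result = execute(graph, "product")
print(result)
```

A real execution system would also decide which worker runs each task and when data moves between workers; this sketch only shows how data dependencies determine what must run first.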
18. SQL Database Architecture
    SELECT avg(value)
    FROM accounts
    INNER JOIN customers ON …
    WHERE name = 'Alice'
19. SQL Database Architecture
    SELECT avg(value)
    FROM accounts
    WHERE name = 'Alice'
    INNER JOIN customers ON …
Optimize (shown in plan order, not as valid SQL: the filter now runs before the join)
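Why the reordering pays off can be seen with a toy example. The sketch below is not how any database implements its optimizer; it assumes two small in-memory tables and a join on `name` (the slide elides the real join condition), and it compares "join, then filter" against "filter, then join". The table contents and the helper names `join` and `mean` are hypothetical.

```python
# Hypothetical tables; the join column is an assumption for illustration.
accounts = [
    {"name": "Alice", "value": 10.0},
    {"name": "Alice", "value": 30.0},
    {"name": "Bob", "value": 50.0},
    {"name": "Carol", "value": 70.0},
]
customers = [
    {"name": "Alice", "region": "EU"},
    {"name": "Bob", "region": "US"},
    {"name": "Carol", "region": "US"},
]


def join(left, right):
    """Nested-loop inner join on the shared `name` column."""
    return [{**a, **c} for a in left for c in right if a["name"] == c["name"]]


def mean(values):
    values = list(values)
    return sum(values) / len(values)


# Naive plan: join every row, then filter the joined result.
joined = join(accounts, customers)
naive = mean(row["value"] for row in joined if row["name"] == "Alice")

# Optimized plan: filter accounts first, then join the smaller input.
alice_accounts = [a for a in accounts if a["name"] == "Alice"]
small_join = join(alice_accounts, customers)
optimized = mean(row["value"] for row in small_join)

print(naive, optimized, len(joined), len(small_join))
```

Both plans return the same average, but the optimized plan feeds fewer rows into the join. A system can only make this rewrite if it understands what "filter" and "join" mean, which is what a high-level representation provides.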
20. Spark Architecture
    df.join(df2, …).select(…).filter(…)
Optimize
21. Large Matrix Architecture (A’ * A) A’ * b Optimize
22. Dask Architecture
    accts = dd.read_parquet(…)
    accts = accts[accts.name == 'Alice']
    df = dd.merge(accts, customers)
    df.value.mean().compute()
Dask doesn’t have a high-level abstraction
Dask can’t perform high-level optimization
But Dask is general to many domains
23. Dask Architecture
    u, s, v = da.linalg.svd(X)
    Y = u.dot(da.diag(s)).dot(v)  # v is returned already transposed, as with numpy.linalg.svd
    da.linalg.norm(X - Y)
24. Dask Architecture
    for i in range(256):
        x = dask.delayed(f)(i)
        y = dask.delayed(g)(x)
        z = dask.delayed(add)(x, y)
25. Dask Architecture
    async def func():
        client = await Client(asynchronous=True)
        futures = client.map(…)
        async for f in as_completed(…):
            result = await f
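The pattern on this slide, submitting many pieces of work and handling each result as soon as it finishes, can be tried without a cluster. The sketch below swaps in the standard library's `asyncio.as_completed` for Dask's asynchronous client, so it shows only the control flow and not Dask's API. The coroutine `process` is a hypothetical stand-in for remote work.

```python
import asyncio
import random


async def process(i):
    """Stand-in for remote work: sleep a short, varying time, then return."""
    await asyncio.sleep(random.uniform(0.0, 0.01))
    return i * i


async def main():
    # Start all the work up front, like mapping a function over inputs.
    tasks = [asyncio.create_task(process(i)) for i in range(8)]
    results = []
    # Handle results in completion order rather than submission order.
    for finished in asyncio.as_completed(tasks):
        results.append(await finished)
    return results


results = asyncio.run(main())
print(results)
```

The order of `results` varies from run to run because each task sleeps for a random time; only the set of values is fixed.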
26. Dask Architecture: Your own system here
27. High-level representations are powerful, but they also box you in
28. Figure: Spark (map stage, shuffle stage, reduce stage) next to Dask
29. Figure: Dask next to Spark (map stage, shuffle stage, reduce stage)
30. By dropping the high-level representation
Costs
• Lose specialization
• Lose opportunities for high-level optimization
Benefits
• Become generalists
• More flexibility for new domains and algorithms
• Access to smarter algorithms
• Better task scheduling: resource constraints, GPUs, multiple clients, async real-time, etc.
31. Ten Reasons People Choose Dask
32. 1. Scalable Pandas DataFrames
• Same API
    import dask.dataframe as dd
    df = dd.read_parquet('s3://bucket/accounts/2017')
    df.groupby(df.name).value.mean().compute()
• Efficient Timeseries Operations
    # Use the pandas index for efficient operations
    df.loc['2017-01-01']
    df.value.rolling(10).std()
    df.value.resample('10m').mean()
• Co-developed with Pandas and by the Pandas developer community
33. 2. Scalable NumPy Arrays
• Same API
    import dask.array as da
    x = da.from_array(my_hdf5_file)
    y = x.dot(x.T)
• Applications
  • Atmospheric science
  • Satellite imagery
  • Biomedical imagery
  • Optimization algorithms (check out dask-glm)
34. 3. Parallelize Scikit-Learn/Joblib
• Scikit-Learn parallelizes with Joblib
    # n_jobs is set on the estimator, not passed to fit
    estimator = RandomForestClassifier(…, n_jobs=8)
    estimator.fit(train_data, train_labels)
• Joblib can use Dask
    from sklearn.externals.joblib import parallel_backend
    with parallel_backend('dask', scheduler='…'):
        estimator.fit(train_data, train_labels)
https://pythonhosted.org/joblib/
http://distributed.readthedocs.io/en/latest/joblib.html
Diagram: Joblib on a thread pool
35. 3. Parallelize Scikit-Learn/Joblib
• Scikit-Learn parallelizes with Joblib
    # n_jobs is set on the estimator, not passed to fit
    estimator = RandomForestClassifier(…, n_jobs=8)
    estimator.fit(train_data, train_labels)
• Joblib can use Dask
    from sklearn.externals.joblib import parallel_backend
    with parallel_backend('dask', scheduler='…'):
        estimator.fit(train_data, train_labels)
https://pythonhosted.org/joblib/
http://distributed.readthedocs.io/en/latest/joblib.html
Diagram: Joblib on Dask
36. 4. Parallelize Existing Codebases
• Parallelize custom code with minimal intrusion
    results = []
    for x in X:
        for y in Y:
            if x < y:
                result = f(x, y)
            else:
                result = g(x, y)
            results.append(result)
• Good for algorithm researchers
• Good for enterprises with entrenched business logic
M Tepper, G Sapiro, “Compressed nonnegative matrix factorization is fast and accurate”, IEEE Transactions on Signal Processing, 2016
37. 4. Parallelize Existing Codebases
• Parallelize custom code with minimal intrusion
    import dask

    f = dask.delayed(f)
    g = dask.delayed(g)
    results = []
    for x in X:
        for y in Y:
            if x < y:
                result = f(x, y)   # lazy: records the call
            else:
                result = g(x, y)
            results.append(result)
    results = dask.compute(*results)  # run everything in parallel
• Good for algorithm researchers
• Good for enterprises with entrenched business logic
M Tepper, G Sapiro, “Compressed nonnegative matrix factorization is fast and accurate”, IEEE Transactions on Signal Processing, 2016
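What wrapping `f` and `g` changes can be seen with a toy stand-in. The sketch below is not Dask's implementation and runs serially; it assumes a minimal `Lazy` class and `delayed` wrapper (both hypothetical) that record calls instead of running them, which is the property that lets a scheduler run them in parallel later. The bodies of `f` and `g` and the inputs `X` and `Y` are made up.

```python
class Lazy:
    """Records a function call instead of running it."""

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def compute(self):
        # Evaluate any lazy arguments first, then call the function.
        args = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*args)


def delayed(func):
    """Wrap `func` so that calling it returns a Lazy node."""
    def wrapper(*args):
        return Lazy(func, args)
    return wrapper


def f(x, y):
    return x + y


def g(x, y):
    return x - y


lazy_f = delayed(f)
lazy_g = delayed(g)

X = [1, 2, 3]
Y = [2, 3]

# Same loop as on the slide; the body only records work.
pending = []
for x in X:
    for y in Y:
        if x < y:
            pending.append(lazy_f(x, y))
        else:
            pending.append(lazy_g(x, y))

# Nothing has run until compute() is called on each node.
results = [node.compute() for node in pending]
print(results)
```

Because the loop body is unchanged apart from the wrapped functions, the original control flow, including the `if`/`else`, stays in ordinary Python.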
38. 5. Many Other Libraries in Anaconda
• Scikit-Image uses Dask to break down images and accelerate algorithms with overlapping regions
• Geopandas can scale with Dask
  • Spatial partitioning
  • Accelerate spatial joins
  • (new work)
39. 6. Dask Scales Up
• Thousand-node clusters
• Cloud computing
• Supercomputers
• Gigabyte/s bandwidth
• 200 microsecond task overhead
Dask Scales Down (the median cluster size is one)
• Can run in a single Python thread pool
• Almost no performance penalty (microseconds)
• Lightweight
• Few dependencies
• Easy install
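To get a feel for what a 200 microsecond per-task overhead means, here is a back-of-the-envelope calculation. It assumes a deliberately simple model in which the overhead is added to every task and never overlaps with useful work; real schedulers behave in more complicated ways. The function name `efficiency` and the sample task durations are chosen for illustration.

```python
OVERHEAD_S = 200e-6  # per-task overhead quoted on the slide, in seconds


def efficiency(task_seconds, overhead_seconds=OVERHEAD_S):
    """Fraction of time spent on useful work under an additive-overhead model."""
    return task_seconds / (task_seconds + overhead_seconds)


# Hypothetical task durations: 0.1 ms, 1 ms, and 100 ms.
for task_seconds in (100e-6, 1e-3, 100e-3):
    share = efficiency(task_seconds)
    print(f"{task_seconds * 1e3:8.1f} ms task: {share:.1%} useful work")
```

Under this model a task that takes as long as the overhead spends half its time on scheduling, while a 100 ms task loses about 0.2%, so the overhead mostly matters for very small tasks.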
40. 7. Parallelize Web Backends
• Web servers process thousands of small computations asynchronously for web pages or REST endpoints
• Dask provides dynamic, heterogeneous computation
  • Supports small data
  • 10ms roundtrip times
  • Dynamic scaling for different loads
• Supports asynchronous Python (like GoLang)
    async def serve(request):
        future = dask_client.submit(process, request)
        result = await future
        return result
41. 8. Debugging support
• Clean Python tracebacks when user code breaks
• Connect to remote workers with IPython sessions for advanced debugging
42. 9. Resource constraints
• Define limited hardware resources for workers
• Specify resource constraints when submitting tasks
    $ dask-worker … --resources GPU=2
    $ dask-worker … --resources GPU=2
    $ dask-worker … --resources special-db=1
    dask.compute(…, resources={x: {'GPU': 1}, read: {'special-db': 1}})
• Used for GPUs, big-memory machines, special hardware, database connections, I/O machines, etc.
43. 10. Beautiful Diagnostic Dashboards
• Fast responsive dashboards
• Provide users performance insight
• Powered by Bokeh
44. Some Reasons not to Choose Dask
45. Dask’s limitations
• Dask is not a SQL database. It does Pandas well, but won’t optimize complex queries
• Dask is not a JVM technology. It’s a Python library (although Julia bindings are available)
• Dask is not a monolithic framework. You’ll have to install Pandas, SKLearn and others as well. Dask is small, designed to complement existing systems
• Parallelism is not always necessary. Use simple solutions if feasible
46. Why do people choose Dask?
• Familiar with Python:
  • Drop-in NumPy/Pandas/SKLearn APIs
  • Native memory environment
  • Easy debugging and diagnostics
• Have complex problems:
  • Parallelize existing code without expensive rewrites
  • Sophisticated algorithms and systems
  • Real-time response to small-data
• Scales up and down:
  • Scales to 1000-node clusters
  • Also runs cheaply on a laptop
    #import pandas as pd
    import dask.dataframe as dd
47. Thank you for your time. Questions?
48. dask.pydata.org conda install dask