Dask: Scaling Python

Information about Dask: Scaling Python

Published on September 27, 2017

Author: MatthewRocklin

Source: slideshare.net

Content

1. © 2017 Anaconda, Inc. - Confidential & Proprietary Dask: Scaling Python Matthew Rocklin @mrocklin

2. © 2017 Anaconda, Inc. - Confidential & Proprietary Python is large and growing

3. © 2017 Anaconda, Inc. - Confidential & Proprietary https://stackoverflow.blog/2017/09/06/incredible-growth-python/ https://stackoverflow.blog/2017/09/14/python-growing-quickly/

4. Python’s Scientific Stack

5. Python’s Scientific Stack

6. Bokeh Python’s Scientific Stack

7. Bokeh Python’s Scientific Stack

8. Python’s Scientific Ecosystem (and many, many more) Bokeh

9. (and many, many more) Bokeh

10. © 2017 Anaconda, Inc. - Confidential & Proprietary Numeric Python’s virtues and vices • Fast: Native code with C/C++/CUDA • Intuitive: Long history with scientists and analysts • Established: Trusted and well understood • Broad: Packages for everything, community supported • But wasn’t designed to scale: • Limited to a single thread • Limited to in-memory data

11. © 2017 Anaconda, Inc. - Confidential & Proprietary How do we scale an ecosystem? From a parallel computing perspective

12. © 2017 Anaconda, Inc. - Confidential & Proprietary • Designed to parallelize the Python ecosystem • Flexible parallel computing paradigm • Familiar APIs for Python users • Co-developed with Pandas/SKLearn/Jupyter teams • Scales • Scales from multicore to 1000-node clusters • Resilience, responsive, and real-time

13. © 2017 Anaconda, Inc. - Confidential & Proprietary • High Level: Parallel NumPy, Pandas, ML • Satisfies subset of these APIs • Uses these libraries internally • Co-developed with these teams • Low Level: Task scheduling for arbitrary execution • Parallelize existing code • Build novel real-time systems • Arbitrary task graphs with data dependencies • Same scalability

14. © 2017 Anaconda, Inc. - Confidential & Proprietary demo • High level: Scaling Pandas • Same Pandas look and feel • Uses Pandas under the hood • Scales nicely onto many machines • Low level: Arbitrary task scheduling • Parallelize normal Python code • Build custom algorithms • React real-time • Demo deployed with • dask-kubernetes Google Compute Engine • github.com/dask/dask-kubernetes • Youtube link • https://www.youtube.com/watch?v=o ds97a5Pzw0&

15. © 2017 Anaconda, Inc. - Confidential & Proprietary What makes Dask different?

16. © 2017 Anaconda, Inc. - Confidential & Proprietary Most Parallel Frameworks Follow the following architecture 1. High level user-facing API like the SQL language, or Linear Algebra 2. Medium level query plan For databases/Spark: Big data map-steps, shuffle-steps, and aggregation-steps For arrays: Matrix multiplies, transposes, slicing 3. Low-level task graph Read 100MB chunk of data, run black-box function on it 4. Execution system Run task 9352 on worker 32, move data x-123 to worker 26 Flow from higher to lower level abstractions

17. © 2017 Anaconda, Inc. - Confidential & Proprietary Most Parallel Framework Architectures User API High Level Representation Logical Plan Low Level Representation Physical Plan Task scheduler for execution

18. © 2017 Anaconda, Inc. - Confidential & Proprietary SQL Database Architecture SELECT avg(value) FROM accounts INNER JOIN customers ON … WHERE name == ‘Alice’

19. © 2017 Anaconda, Inc. - Confidential & Proprietary SQL Database Architecture SELECT avg(value) FROM accounts WHERE name == ‘Alice’ INNER JOIN customers ON … Optimize

20. © 2017 Anaconda, Inc. - Confidential & Proprietary Spark Architecture df.join(df2, …) .select(…) .filter(…) Optimize

21. © 2017 Anaconda, Inc. - Confidential & Proprietary Large Matrix Architecture (A’ * A) A’ * b Optimize

22. © 2017 Anaconda, Inc. - Confidential & Proprietary Dask Architecture accts=dd.read_parquet(…) accts=accts[accts.name == ‘Alice’] df=dd.merge(accts, customers) df.value.mean().compute() Dask doesn’t have a high-level abstraction Dask can’t optimize But Dask is general to many domains

23. © 2017 Anaconda, Inc. - Confidential & Proprietary Dask Architecture u, s, v = da.linalg.svd(X) Y = u.dot(da.diag(s)).dot(v.T) da.linalg.norm(X - y)

24. © 2017 Anaconda, Inc. - Confidential & Proprietary Dask Architecture for i in range(256): x = dask.delayed(f)(i) y = dask.delayed(g)(x) z = dask.delayed(add)(x, y

25. © 2017 Anaconda, Inc. - Confidential & Proprietary Dask Architecture async def func(): client = await Client() futures = client.map(…) async for f in as_completed(…): result = await f

26. © 2017 Anaconda, Inc. - Confidential & Proprietary Dask Architecture Your own system here

27. © 2017 Anaconda, Inc. - Confidential & Proprietary High-level representations are powerful But they also box you in

28. © 2017 Anaconda, Inc. - Confidential & Proprietary Spark Map stage Shuffle stage Reduce stage Dask

29. © 2017 Anaconda, Inc. - Confidential & Proprietary DaskSpark Map stage Shuffle stage Reduce stage

30. © 2017 Anaconda, Inc. - Confidential & Proprietary By dropping the high level representation Costs • Lose specialization • Lose opportunities for high level optimization Benefits • Become generalists • More flexibility for new domains and algorithms • Access to smarter algorithms • Better task scheduling Resource constraints, GPUs, multiple clients, async-real-time, etc..

31. © 2017 Anaconda, Inc. - Confidential & Proprietary Ten Reasons People Choose Dask

32. © 2017 Anaconda, Inc. - Confidential & Proprietary 1. Scalable Pandas DataFrames • Same API import dask.dataframe as dd df = dd.read_parquet(‘s3://bucket/accounts/2017') df.groupby(df.name).value.mean().compute() • Efficient Timeseries Operations # Use the pandas index for efficient operations df.loc[‘2017-01-01’] df.value.rolling(10).std() df.value.resample(‘10m’).mean() • Co-developed with Pandas and by the Pandas developer community

33. © 2017 Anaconda, Inc. - Confidential & Proprietary 2. Scalable NumPy Arrays • Same API import dask.array as da x = da.from_array(my_hdf5_file) y = x.dot(x.T) • Applications • Atmospheric science • Satellite imagery • Biomedical imagery • Optimization algorithms check out dask-glm

34. © 2017 Anaconda, Inc. - Confidential & Proprietary 3. Parallelize Scikit-Learn/Joblib • Scikit-Learn parallelizes with Joblib estimator = RandomForest(…) estimator.fit(train_data, train_labels, njobs=8) • Joblib can use Dask from sklearn.externals.joblib import parallel_backend with parallel_backend('dask', scheduler=‘…’): estimator.fit(train_data, train_labels) https://pythonhosted.org/joblib/ http://distributed.readthedocs.io/en/latest/joblib.html Joblib Thread pool

35. © 2017 Anaconda, Inc. - Confidential & Proprietary 3. Parallelize Scikit-Learn/Joblib • Scikit-Learn parallelizes with Joblib estimator = RandomForest(…) estimator.fit(train_data, train_labels, njobs=8) • Joblib can use Dask from sklearn.externals.joblib import parallel_backend with parallel_backend('dask', scheduler=‘…’): estimator.fit(train_data, train_labels) https://pythonhosted.org/joblib/ http://distributed.readthedocs.io/en/latest/joblib.html Joblib Dask

36. © 2017 Anaconda, Inc. - Confidential & Proprietary 4. Parallelize Existing Codebases • Parallelize custom code with minimal intrusion results = {} for x in X: for y in Y: if x < y: result = f(x, y) else: result = g(x, y) results.append(result) • Good for algorithm researchers • Good for enterprises with entrenched business logic M Tepper, G Sapiro “Compressed nonnegative matrix factorization is fast and accurate”, IEEE Transactions on Signal Processing, 2016

37. © 2017 Anaconda, Inc. - Confidential & Proprietary 4. Parallelize Existing Codebases • Parallelize custom code with minimal intrusion f = dask.delayed(f) g = dask.delayed(g) results = {} for x in X: for y in Y: if x < y: result = f(x, y) else: result = g(x, y) results.append(result) result = dask.compute(results) • Good for algorithm researchers • Good for enterprises with entrenched business logic M Tepper, G Sapiro “Compressed nonnegative matrix factorization is fast and accurate”, IEEE Transactions on Signal Processing, 2016

38. © 2017 Anaconda, Inc. - Confidential & Proprietary 5. Many Other Libraries in Anaconda • Scikit-Image uses Dask to break down images and accelerate algorithms with overlapping regions • Geopandas can scale with Dask • Spatial partitioning • Accelerate spatial joins • (new work)

39. © 2017 Anaconda, Inc. - Confidential & Proprietary 6. Dask Scales Up • Thousand node clusters • Cloud computing • Super computers • Gigabyte/s bandwidth • 200 microsecond task overhead Dask Scales Down (the median cluster size is one) • Can run in a single Python thread pool • Almost no performance penalty (microseconds) • Lightweight • Few dependencies • Easy install

40. © 2017 Anaconda, Inc. - Confidential & Proprietary 7. Parallelize Web Backends • Web servers process thousands of small computations asynchronously for web pages or REST endpoints • Dask provides dynamic, heterogenous computation • Supports small data • 10ms roundtrip times • Dynamic scaling for different loads • Supports asynchronous Python (like GoLang) async def serve(request): future = dask_client.submit(process, request) result = await future return result

41. © 2017 Anaconda, Inc. - Confidential & Proprietary 8. Debugging support • Clean Python tracebacks when user code breaks • Connect to remote workers with IPython sessions for advanced debugging

42. © 2017 Anaconda, Inc. - Confidential & Proprietary 9. Resource constraints • Define limited hardware resources for workers • Specify resource constraints when submitting tasks $ dask-worker … —resources GPU=2 $ dask-worker … —resources GPU=2 $ dask-worker … —resources special-db=1 dask.compute(…, resources={ x: {’GPU’: 1}, read: {‘special-db’: 1}) • Used for GPUs, big-memory machines, special hardware, database connections, I/O machines, etc..

43. © 2017 Anaconda, Inc. - Confidential & Proprietary 10. Beautiful Diagnostic Dashboards • Fast responsive dashboards • Provide users performance insight • Powered by Bokeh Bokeh

44. © 2017 Anaconda, Inc. - Confidential & Proprietary Some Reasons not to Choose Dask

45. © 2017 Anaconda, Inc. - Confidential & Proprietary • Dask is not a SQL database. Does Pandas well, but won’t optimize complex queries • Dask is not a JVM technology It’s a Python library (although Julia bindings are available) • Dask is not a monolithic framework You’ll have to install Pandas, SKLearn and others as well Dask is small, designed to complement existing systems • Parallelism is not always necessary Use simple solutions if feasible Dask’s limitations

46. © 2017 Anaconda, Inc. - Confidential & Proprietary Why do people choose Dask? • Familiar with Python: • Drop-in NumPy/Pandas/SKLearn APIs • Native memory environment • Easy debugging and diagnostics • Have complex problems: • Parallelize existing code without expensive rewrites • Sophisticated algorithms and systems • Real-time response to small-data • Scales up and down: • Scales to 1000-node clusters • Also runs cheaply on a laptop #import pandas as pd import dask.dataframe as dd

47. © 2017 Anaconda, Inc. - Confidential & Proprietary Thank you for your time Questions?

48. © 2017 Anaconda, Inc. - Confidential & Proprietary dask.pydata.org conda install dask

49. © 2017 Anaconda, Inc. - Confidential & Proprietary

#import presentations

Introduction to ml
18. 06. 2020
0 views

Introduction to ml

Python OpenCV Real Time projects
31. 05. 2020
0 views

Python OpenCV Real Time projects

Related presentations