From 09e22b329b6ec44d022fa7d1b923853bc457d49b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 2 Mar 2025 09:10:33 -0700 Subject: [PATCH 1/8] Update README --- README.md | 112 +++++++++++++++++-------------------------- docs/contributing.md | 87 +++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+), 68 deletions(-) create mode 100644 docs/contributing.md diff --git a/README.md b/README.md index e497ca5..3a47226 100644 --- a/README.md +++ b/README.md @@ -19,98 +19,74 @@ # DataFusion on Ray -> This was originally a research project donated from [ray-sql] to evaluate performing distributed SQL queries from -> Python, using [Ray] and [Apache DataFusion] +## Overview -[ray-sql]: https://github.com/datafusion-contrib/ray-sql +DataFusion Ray is a distributed execution framework that enables DataFusion DataFrame and SQL queries to run on a +Ray cluster. This integration allows users to leverage Ray's dynamic scheduling capabilities while executing +queries in a distributed fashion. -DataFusion Ray is a distributed Python DataFrame and SQL query engine powered by the Rust implementation -of [Apache Arrow], [Apache DataFusion], and [Ray]. +## Execution Modes -[Ray]: https://www.ray.io/ -[Apache Arrow]: https://arrow.apache.org/ -[Apache DataFusion]: https://datafusion.apache.org/ +DataFusion Ray supports two execution modes: -## Comparison to other DataFusion projects +1. Greedy Execution Mode -### Comparison to DataFusion Ballista +This mode mimics the default execution strategy of DataFusion. Each operator in the query plan starts executing +as soon as its inputs are available, leading to a more pipelined execution model. -- Unlike [DataFusion Ballista], DataFusion Ray does not provide its own distributed scheduler and instead relies on - Ray for this functionality. As a result of this design choice, DataFusion Ray is a much smaller and simpler project. -- DataFusion Ray is Python-first, and DataFusion Ballista is Rust-first +**Pros:** -[DataFusion Ballista]: https://github.com/apache/datafusion-ballista +- Lower latency for streaming-like queries where intermediate results can be processed as they arrive. +- Efficient for smaller queries that fit within a single Ray task's memory. +- Good for interactive workloads. -### Comparison to DataFusion Python +**Cons:** -- [DataFusion Python] provides a Python DataFrame and SQL API for in-process execution. DataFusion Ray extends - DataFusion Python to provide scalability across multiple nodes. +- Can lead to high memory pressure as intermediate results are held in memory instead of being written to disk. +- More difficult to scale efficiently for large queries since tasks must hold data until downstream consumers are ready. -[DataFusion Python]: https://github.com/apache/datafusion-python +2. Spark-like Execution Mode -## Building +In this mode, execution follows a staged model similar to Apache Spark. Each query stage runs to completion, producing intermediate shuffle files that are persisted and used as input for the next stage. -To build DataFusion Ray, you will need rust installed, as well as [https://github.com/PyO3/maturin](maturin). +**Pros:** -Install maturin in your current python environment (a virtual environment is recommended), with +- Better memory management since intermediate results are persisted instead of held in memory. +- Improved scalability for large queries due to controlled task execution and shuffle handling. +- More predictable execution patterns suitable for batch processing. 
-```bash -pip install maturin -``` - -Then build the project with the following command: - -```bash -maturin develop # --release for a release build -``` - -## Example - -- In the `examples` directory, run - -```bash -RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tips.py --data-dir=$(pwd)/../testdata/tips/ -``` +**Cons:** -- In the `tpch` directory, use `make_data.py` to create a TPCH dataset at a provided scale factor, then +- Higher query latency due to the overhead of writing and reading shuffle files. +- May not be as efficient for interactive or low-latency queries. -```bash -RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --qnum 2 -``` - -To execute the TPCH query #2. To execute an arbitrary query against the TPCH dataset, provide it with `--query` instead of `--qnum`. This is useful for validating plans that DataFusion Ray will create. - -For example, to execute the following query: - -```bash -RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --query 'select c.c_name, sum(o.o_totalprice) as total from orders o inner join customer c on o.o_custkey = c.c_custkey group by c_name limit 1' -``` - -To further parallelize execution, you can choose how many partitions will be served by each Stage with `--partitions-per-worker`. If this number is less than `--concurrency` Then multiple Actors will host portions of the stage. For example, if there are 10 stages calculated for a query, `concurrency=16` and `partitions-per-worker=4`, then `40` `RayStage` Actors will be created. If `partitions-per-worker=16` or is absent, then `10` `RayStage` Actors will be created. +## Getting Started -To validate the output against non-ray single node datafusion, add `--validate` which will ensure that both systems produce the same output. +To use DataFusion Ray, install the required dependencies and configure your Ray cluster. Then, you can run queries +using DataFusion's familiar API while leveraging the distributed execution capabilities of Ray. -To run the entire TPCH benchmark use +```python +import ray +from datafusion_ray import DataFusionRaySession -```bash -RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 [--partitions-per-worker=] [--validate] +ray.init() +session = DataFusionRaySession() +df = session.sql("SELECT * FROM my_table WHERE value > 100") +df.show() ``` -This will output a json file in the current directory with query timings. - -## Logging - -DataFusion Ray's logging output is determined by the `DATAFUSION_RAY_LOG_LEVEL` environment variable. The default log level is `WARN`. To change the log level, set the environment variable to one of the following values: `ERROR`, `WARN`, `INFO`, `DEBUG`, or `TRACE`. +## Use Cases -DataFusion Ray outputs logs from both python and rust, and in order to handle this consistently, the python logger for `datafusion_ray` is routed to rust for logging. The `RUST_LOG` environment variable can be used to control other rust log output other than `datafusion_ray`. +Greedy Execution Mode is ideal for interactive queries and real-time analytics with lower latency requirements. -## Status +Spark-like Execution Mode is better suited for batch processing, large-scale queries, and workloads where memory +constraints are a concern. 
-- DataFusion Ray can execute all TPCH queries. Tested up to SF100.
+## Contributing

-## Known Issues
+Contributions are welcome! Please open an issue or submit a pull request if you would like to contribute. See the
+[contributor guide] for more information.

-- We are waiting to upgrade to a DataFusion version where the parquet options are serialized into substrait in order to send them correctly in a plan. Currently, we
- manually add back `table_parquet_options.pushdown_filters=true` after deserialization to compensate. This will be refactored in the future.
+## License

-see
+DataFusion Ray is licensed under Apache 2.0.
diff --git a/docs/contributing.md b/docs/contributing.md
new file mode 100644
index 0000000..e93447c
--- /dev/null
+++ b/docs/contributing.md
@@ -0,0 +1,87 @@
+
+
+# DataFusion Ray Contributor Guide
+
+## Building
+
+To build DataFusion Ray, you will need Rust installed, as well as [maturin](https://github.com/PyO3/maturin).
+
+Install maturin in your current Python environment (a virtual environment is recommended), with
+
+```bash
+pip install maturin
+```
+
+Then build the project with the following command:
+
+```bash
+maturin develop # --release for a release build
+```
+
+## Example
+
+- In the `examples` directory, run
+
+```bash
+RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tips.py --data-dir=$(pwd)/../testdata/tips/
+```
+
+- In the `tpch` directory, use `make_data.py` to create a TPCH dataset at a provided scale factor, then
+
+```bash
+RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --qnum 2
+```
+
+to execute TPCH query #2. To execute an arbitrary query against the TPCH dataset, provide it with `--query` instead of `--qnum`. This is useful for validating plans that DataFusion Ray will create.
+
+For example, to execute the following query:
+
+```bash
+RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --query 'select c.c_name, sum(o.o_totalprice) as total from orders o inner join customer c on o.o_custkey = c.c_custkey group by c_name limit 1'
+```
+
+To further parallelize execution, you can choose how many partitions will be served by each Stage with `--partitions-per-worker`. If this number is less than `--concurrency`, then multiple Actors will host portions of the stage. For example, if there are 10 stages calculated for a query, `concurrency=16` and `partitions-per-worker=4`, then `40` `RayStage` Actors will be created. If `partitions-per-worker=16`, or the option is absent, then `10` `RayStage` Actors will be created.
+
+To validate the output against non-Ray, single-node DataFusion, add `--validate`, which will ensure that both systems produce the same output.
+
+To run the entire TPCH benchmark, use
+
+```bash
+RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 [--partitions-per-worker=] [--validate]
+```
+
+This will output a JSON file in the current directory with query timings.
+
+## Logging
+
+DataFusion Ray's logging output is determined by the `DATAFUSION_RAY_LOG_LEVEL` environment variable. The default log level is `WARN`. To change the log level, set the environment variable to one of the following values: `ERROR`, `WARN`, `INFO`, `DEBUG`, or `TRACE`.
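+
+For example, one way to run the tips example with more verbose DataFusion Ray logging (an illustrative combination of the variable above with the example command shown earlier, not a required invocation) is:
+
+```bash
+DATAFUSION_RAY_LOG_LEVEL=DEBUG python tips.py --data-dir=$(pwd)/../testdata/tips/
+```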
+
+DataFusion Ray outputs logs from both Python and Rust. To handle this consistently, the Python logger for `datafusion_ray` is routed to Rust for logging. The `RUST_LOG` environment variable can be used to control Rust log output from sources other than `datafusion_ray`.
+
+## Status
+
+- DataFusion Ray can execute all TPCH queries. Tested up to SF100.
+
+## Known Issues
+
+- We are waiting to upgrade to a DataFusion version where the Parquet options are serialized into Substrait in order to send them correctly in a plan. Currently, we
+ manually add back `table_parquet_options.pushdown_filters=true` after deserialization to compensate. This will be refactored in the future.
+
+see
From 69ee30113d1a2dc51c2186462b11b0dcd8f6ca14 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 2 Mar 2025 09:13:03 -0700
Subject: [PATCH 2/8] update

---
README.md | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 3a47226..91434fe 100644
--- a/README.md
+++ b/README.md
@@ -62,8 +62,10 @@ In this mode, execution follows a staged model similar to Apache Spark. Each que

## Getting Started

-To use DataFusion Ray, install the required dependencies and configure your Ray cluster. Then, you can run queries
-using DataFusion's familiar API while leveraging the distributed execution capabilities of Ray.
+See the [contributor guide] for instructions on building DataFusion Ray.
+
+Once instralled, you can run queries using DataFusion's familiar API while leveraging the distributed execution
+capabilities of Ray.

```python
import ray
@@ -90,3 +92,6 @@ Contributions are welcome! Please open an issue or submit a pull request if you
## License

DataFusion Ray is licensed under Apache 2.0.
+
+
+[contributor guide]: docs/contributing.md
\ No newline at end of file
From fbeb8d156b8224c2b6b9720bdea9aa70f192a059 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 2 Mar 2025 09:14:30 -0700
Subject: [PATCH 3/8] update

---
README.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 91434fe..d4b84fc 100644
--- a/README.md
+++ b/README.md
@@ -29,7 +29,7 @@ queries in a distributed fashion.

DataFusion Ray supports two execution modes:

-1. Greedy Execution Mode
+### Greedy Execution Mode

This mode mimics the default execution strategy of DataFusion. Each operator in the query plan starts executing
as soon as its inputs are available, leading to a more pipelined execution model.
@@ -45,7 +45,7 @@ as soon as its inputs are available, leading to a more pipelined execution model

-2. Spark-like Execution Mode
+### Spark-like Execution Mode

In this mode, execution follows a staged model similar to Apache Spark. Each query stage runs to completion, producing intermediate shuffle files that are persisted and used as input for the next stage.
From 322a6a30a60f4142d3538fee3c69e8fe07582d56 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 2 Mar 2025 09:25:16 -0700
Subject: [PATCH 4/8] use terms batch vs streaming

---
README.md | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index d4b84fc..2b96256 100644
--- a/README.md
+++ b/README.md
@@ -29,7 +29,7 @@ queries in a distributed fashion.

DataFusion Ray supports two execution modes: -### Greedy Execution Mode +### Streaming Execution This mode mimics the default execution strategy of DataFusion. Each operator in the query plan starts executing as soon as its inputs are available, leading to a more pipelined execution model. @@ -45,9 +45,10 @@ as soon as its inputs are available, leading to a more pipelined execution model - Can lead to high memory pressure as intermediate results are held in memory instead of being written to disk. - More difficult to scale efficiently for large queries since tasks must hold data until downstream consumers are ready. -### Spark-like Execution Mode +### Batch Execution (not implemented yet) -In this mode, execution follows a staged model similar to Apache Spark. Each query stage runs to completion, producing intermediate shuffle files that are persisted and used as input for the next stage. +In this mode, execution follows a staged model similar to Apache Spark. Each query stage runs to completion, producing +intermediate shuffle files that are persisted and used as input for the next stage. **Pros:** @@ -64,7 +65,7 @@ In this mode, execution follows a staged model similar to Apache Spark. Each que See the [contributor guide] for instructions on building DataFusion Ray. -Once instralled, you can run queries using DataFusion's familiar API while leveraging the distributed execution +Once installed, you can run queries using DataFusion's familiar API while leveraging the distributed execution capabilities of Ray. ```python From b131d5a8e34e63ac82c85b4a6b9461df48d8bbe8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 2 Mar 2025 09:27:13 -0700 Subject: [PATCH 5/8] update code sample --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 2b96256..3676a42 100644 --- a/README.md +++ b/README.md @@ -70,10 +70,10 @@ capabilities of Ray. ```python import ray -from datafusion_ray import DataFusionRaySession +from datafusion_ray import DFRayContext ray.init() -session = DataFusionRaySession() +session = DFRayContext() df = session.sql("SELECT * FROM my_table WHERE value > 100") df.show() ``` From da2359349e1d26bb919688f672cee45c219ffb9b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 2 Mar 2025 09:28:17 -0700 Subject: [PATCH 6/8] add note that batch execution is not yet implemented --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 3676a42..d79fbd9 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,9 @@ as soon as its inputs are available, leading to a more pipelined execution model - Can lead to high memory pressure as intermediate results are held in memory instead of being written to disk. - More difficult to scale efficiently for large queries since tasks must hold data until downstream consumers are ready. -### Batch Execution (not implemented yet) +### Batch Execution + +_Note: Batch Execution is not implemented yet. Tracking issue: https://github.com/apache/datafusion-ray/issues/69_ In this mode, execution follows a staged model similar to Apache Spark. Each query stage runs to completion, producing intermediate shuffle files that are persisted and used as input for the next stage. 
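The staged model described in the README changes above can be illustrated with a small, self-contained sketch. This is a conceptual illustration only and does not use the DataFusion Ray API: it contrasts pipelined (streaming) execution, where a downstream operator consumes batches as soon as they are produced, with staged (batch) execution, where the upstream stage runs to completion and persists its output before the next stage starts. The `scan` and `keep_large` functions are stand-ins invented for this sketch.

```python
import os
import pickle
import tempfile

def scan():
    # Stand-in for a table scan that yields record batches.
    yield from ([1, 2, 3], [4, 5, 6], [7, 8, 9])

def keep_large(batches):
    # Stand-in for a filter operator applied to each batch.
    for batch in batches:
        yield [value for value in batch if value > 2]

# Streaming: the filter pulls batches from the scan as soon as they are produced.
streamed = [value for batch in keep_large(scan()) for value in batch]

# Staged ("batch") execution: the scan stage runs to completion and persists its
# output (akin to shuffle files); only then does the filter stage read that input.
stage_dir = tempfile.mkdtemp()
for i, batch in enumerate(scan()):
    with open(os.path.join(stage_dir, f"shuffle-{i}.pkl"), "wb") as f:
        pickle.dump(batch, f)

staged = []
for name in sorted(os.listdir(stage_dir)):
    with open(os.path.join(stage_dir, name), "rb") as f:
        staged.extend(value for value in pickle.load(f) if value > 2)

assert streamed == staged  # both strategies compute the same result
```

In DataFusion Ray's planned batch mode, the persisted intermediate results in this sketch would correspond to the shuffle files described above.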
From aab64fdf4f3e7ea19efa55b4c50bb68927ba6e89 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 2 Mar 2025 09:36:04 -0700 Subject: [PATCH 7/8] remove trade offs --- README.md | 29 ----------------------------- 1 file changed, 29 deletions(-) diff --git a/README.md b/README.md index d79fbd9..15c0f2c 100644 --- a/README.md +++ b/README.md @@ -34,17 +34,6 @@ DataFusion Ray supports two execution modes: This mode mimics the default execution strategy of DataFusion. Each operator in the query plan starts executing as soon as its inputs are available, leading to a more pipelined execution model. -**Pros:** - -- Lower latency for streaming-like queries where intermediate results can be processed as they arrive. -- Efficient for smaller queries that fit within a single Ray task's memory. -- Good for interactive workloads. - -**Cons:** - -- Can lead to high memory pressure as intermediate results are held in memory instead of being written to disk. -- More difficult to scale efficiently for large queries since tasks must hold data until downstream consumers are ready. - ### Batch Execution _Note: Batch Execution is not implemented yet. Tracking issue: https://github.com/apache/datafusion-ray/issues/69_ @@ -52,17 +41,6 @@ _Note: Batch Execution is not implemented yet. Tracking issue: https://github.co In this mode, execution follows a staged model similar to Apache Spark. Each query stage runs to completion, producing intermediate shuffle files that are persisted and used as input for the next stage. -**Pros:** - -- Better memory management since intermediate results are persisted instead of held in memory. -- Improved scalability for large queries due to controlled task execution and shuffle handling. -- More predictable execution patterns suitable for batch processing. - -**Cons:** - -- Higher query latency due to the overhead of writing and reading shuffle files. -- May not be as efficient for interactive or low-latency queries. - ## Getting Started See the [contributor guide] for instructions on building DataFusion Ray. @@ -80,13 +58,6 @@ df = session.sql("SELECT * FROM my_table WHERE value > 100") df.show() ``` -## Use Cases - -Greedy Execution Mode is ideal for interactive queries and real-time analytics with lower latency requirements. - -Spark-like Execution Mode is better suited for batch processing, large-scale queries, and workloads where memory -constraints are a concern. - ## Contributing Contributions are welcome! Please open an issue or submit a pull request if you would like to contribute. See the From a7ddca503f2a9de2a62908cc041a1041cf5ed736 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 2 Mar 2025 10:14:43 -0700 Subject: [PATCH 8/8] address feedback --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 15c0f2c..11951f7 100644 --- a/README.md +++ b/README.md @@ -50,9 +50,9 @@ capabilities of Ray. ```python import ray -from datafusion_ray import DFRayContext +from datafusion_ray import DFRayContext, df_ray_runtime_env -ray.init() +ray.init(runtime_env=df_ray_runtime_env) session = DFRayContext() df = session.sql("SELECT * FROM my_table WHERE value > 100") df.show()
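For reference, a slightly fuller end-to-end sketch of the final README example is shown below. It is an illustration only: it assumes (not shown in the patches above) that `DFRayContext` exposes a `register_parquet(name, path)` method analogous to DataFusion Python's `SessionContext`, and the local Parquet path used here is hypothetical.

```python
import ray
from datafusion_ray import DFRayContext, df_ray_runtime_env

ray.init(runtime_env=df_ray_runtime_env)

session = DFRayContext()
# Hypothetical: register a local Parquet file as the table queried below.
session.register_parquet("my_table", "testdata/my_table.parquet")

df = session.sql("SELECT * FROM my_table WHERE value > 100")
df.show()

ray.shutdown()
```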