Merged
69 changes: 35 additions & 34 deletions docs/source/user-guide/latest/understanding-comet-plans.md
@@ -27,13 +27,14 @@ inspect that behavior.

When Comet is enabled, the `CometSparkSessionExtensions` rules walk the
physical plan bottom-up and replace Spark operators with Comet equivalents
where possible. Consecutive native operators are combined into a single block
that is serialized as protobuf and executed by DataFusion on the executor.
where possible. Consecutive Rust-implemented operators are combined into a
single block that is serialized as protobuf and executed by DataFusion on the
executor.
Operators that Comet does not support remain in their original Spark form.

As a result, a plan can mix three kinds of nodes:

- **`Comet*` nodes** that run natively in Rust (for example `CometProject`,
- **`Comet*` nodes** that are implemented in Rust (for example `CometProject`,
`CometHashAggregate`).
- **`Comet*` nodes that run on the JVM** but are still part of the Comet
pipeline (for example `CometBroadcastExchange`, `CometColumnarExchange`).
@@ -62,9 +63,9 @@ the same plan is shown in the Spark SQL UI. When reading a plan, look for:

## Fallback

A "fallback" happens when Comet cannot translate part of a plan into native
execution. Fallback can be partial (a subtree falls back while the rest stays
native) or full (no Comet nodes appear).
A "fallback" happens when Comet cannot run part of a plan in the Comet

Review comment (Contributor): Should we also mention fallback to columnar?

pipeline. Fallback can be partial (a subtree falls back to Spark while the rest
stays in the Comet pipeline) or full (no Comet nodes appear).
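
The partial/full distinction above can be checked roughly from plan text. The following is a minimal sketch, not part of Comet: it assumes a node is a Comet node when its name starts with `Comet`, and the helper names (`node_names`, `classify_fallback`) and the sample plan are hypothetical, hand-written illustrations rather than real Comet output.

```python
import re

# Assumption: the operator name is the first identifier on each plan line,
# and Comet nodes are those whose name starts with "Comet".
NODE_NAME = re.compile(r"[A-Za-z_]\w*")


def node_names(plan_text: str) -> list[str]:
    """Extract the leading operator name from each non-empty plan line."""
    names = []
    for line in plan_text.splitlines():
        match = NODE_NAME.search(line)
        if match:
            names.append(match.group(0))
    return names


def classify_fallback(plan_text: str) -> str:
    """Return "none", "partial", or "full" for a plan's fallback status."""
    names = node_names(plan_text)
    comet = [name for name in names if name.startswith("Comet")]
    if not comet:
        return "full"      # no Comet nodes appear
    if len(comet) == len(names):
        return "none"      # every node is a Comet node
    return "partial"       # a mix of Spark and Comet nodes


# Hand-written example plan (illustrative only, not real Comet output).
SAMPLE_PLAN = """\
HashAggregate(keys=[k#1], functions=[sum(v#2)])
+- CometColumnarToRow
   +- CometProject [k#1, v#2]
      +- CometNativeScan parquet [k#1, v#2]
"""

if __name__ == "__main__":
    print(classify_fallback(SAMPLE_PLAN))
```

A name-prefix check like this only says whether fallback happened; the reasons come from the logging configs described in this guide.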

Common reasons:

@@ -88,15 +89,15 @@ They serve different purposes and produce output in different places.

| Config | Output destination | What you see |
| ---------------------------------------- | ---------------------------------- | --------------------------------------------------------------------------------------------- |
| `spark.comet.explainFallback.enabled` | Driver log (only when fallback) | A WARN with the list of reasons each query stage could not run natively. |
| `spark.comet.explainFallback.enabled` | Driver log (only when fallback) | A WARN with the list of reasons each query stage could not run in Comet. |
| `spark.comet.logFallbackReasons.enabled` | Driver log | One WARN per fallback reason as it is encountered, without surrounding plan context. |
| `spark.comet.explain.format` | Spark SQL UI (Spark 4.0 and newer) | Annotated plan or fallback-reason list, depending on `verbose` (default) or `fallback` value. |
| `spark.comet.explain.native.enabled` | Executor logs, per task | The DataFusion native plan with metrics, useful for inspecting native execution. |
| `spark.comet.explain.native.enabled` | Executor logs, per task | The DataFusion plan with metrics, useful for inspecting Rust execution. |
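
As a rough illustration of how these configs might be passed to a job, the sketch below turns a dictionary of settings into `spark-submit --conf` arguments. The config keys come from the table above; the values chosen here and the helper name `to_submit_args` are assumptions for illustration, not recommended settings.

```python
# Config keys are taken from the table above. The values are illustrative
# assumptions: the ".enabled" keys are assumed to be booleans, and
# "spark.comet.explain.format" accepts "verbose" (default) or "fallback".
COMET_DIAGNOSTICS = {
    "spark.comet.explainFallback.enabled": "true",
    "spark.comet.logFallbackReasons.enabled": "false",
    "spark.comet.explain.format": "fallback",
    "spark.comet.explain.native.enabled": "false",
}


def to_submit_args(conf: dict[str, str]) -> list[str]:
    """Render each setting as a `--conf key=value` pair for spark-submit."""
    args: list[str] = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    # Prints the flags on one line, ready to paste after `spark-submit`.
    print(" ".join(to_submit_args(COMET_DIAGNOSTICS)))
```

Starting with only `spark.comet.explainFallback.enabled` turned on keeps the driver log quiet unless fallback actually occurs.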

### `spark.comet.explainFallback.enabled`

Logs a single WARN listing the reasons each query stage could not be executed
natively. Nothing is logged when the entire stage runs in Comet. Useful as a
in Comet. Nothing is logged when the entire stage runs in Comet. Useful as a
low-noise check that fallback is or is not happening.

### `spark.comet.logFallbackReasons.enabled`
@@ -131,12 +132,12 @@ the config has no effect there.

### `spark.comet.explain.native.enabled`

When enabled, each executor task logs the DataFusion native plan it executes,
When enabled, each executor task logs the DataFusion plan it executes,
along with metrics. This is verbose because there is one plan per task, but it
is the only way to see the native plan as DataFusion sees it (including how
is the only way to see the plan as DataFusion sees it (including how
operators were arranged after Comet's serialization). See the
[Metrics Guide](metrics.md) for details on the native metrics that appear in
this output.
[Metrics Guide](metrics.md) for details on the DataFusion metrics that appear
in this output.

## Programmatic Access to Fallback Reasons

@@ -193,14 +194,14 @@ by role. Names match what is shown in the plan output.
| Node | Description |
| ------------------------ | --------------------------------------------------------------------------------------------- |
| `CometBatchScan` | DataSource V2 scan, including Iceberg Parquet, that produces Arrow batches consumed by Comet. |
| `CometNativeScan` | Fully native Parquet scan that runs entirely in DataFusion. |
| `CometIcebergNativeScan` | Fully native Iceberg Parquet scan. |
| `CometCsvNativeScan` | Fully native CSV scan (experimental). |
| `CometNativeScan` | Parquet scan that runs entirely in Rust via DataFusion. |
| `CometIcebergNativeScan` | Iceberg Parquet scan that runs entirely in Rust via DataFusion. |
| `CometCsvNativeScan` | CSV scan that runs entirely in Rust via DataFusion (experimental). |

### Native Execution Operators
### Native Rust Operators

These run natively in DataFusion. When several appear consecutively in a plan,
they execute as a single fused native block.
These are implemented in Rust and run in DataFusion. When several appear
consecutively in a plan, they execute as a single fused block.
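
To make the idea of fused blocks concrete, here is a simplified sketch that groups consecutive Rust-implemented nodes in a linear chain of operators. It is not how Comet builds blocks internally: real plans are trees, the `RUST_NODES` set is a small illustrative subset of the names mentioned in this guide, and `fused_blocks` is a hypothetical helper.

```python
from itertools import groupby

# Assumption: an illustrative subset of Rust-implemented node names taken
# from this guide; the real list is longer.
RUST_NODES = {"CometProject", "CometHashAggregate", "CometNativeScan"}


def fused_blocks(chain: list[str]) -> list[list[str]]:
    """Group consecutive Rust-implemented nodes of a linear chain into blocks."""
    blocks = []
    # groupby yields maximal runs of adjacent items that share the same key.
    for is_rust, run in groupby(chain, key=lambda name: name in RUST_NODES):
        if is_rust:
            blocks.append(list(run))
    return blocks


# Hand-written chain, top of the plan first. `CometCoalesce` is JVM-side,
# so in this sketch it separates the two Rust runs.
CHAIN = [
    "CometHashAggregate",
    "CometProject",
    "CometCoalesce",
    "CometProject",
    "CometNativeScan",
]

if __name__ == "__main__":
    for block in fused_blocks(CHAIN):
        print(" -> ".join(block))
```

In this simplified model, any node that is not Rust-implemented ends the current block, which mirrors the description above only for linear plans.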

| Node | Spark equivalent |
| ------------------------------ | ----------------------------------------------- |
@@ -223,13 +224,13 @@ they execute as a single fused native block.

These keep their data on the JVM but participate in the Comet pipeline.

| Node | Notes |
| ------------------------ | ------------------------------------------------------------------------------------- |
| `CometUnion` | JVM-side union of Comet inputs. The native side reads each branch as a separate scan. |
| `CometCoalesce` | JVM-side partition coalesce. |
| `CometCollectLimit` | JVM-side collect limit, equivalent to `CollectLimitExec`. |
| `CometBroadcastExchange` | Broadcast exchange producing serialized Arrow batches that the consumer can decode. |
| `CometSubqueryBroadcast` | Companion to `CometBroadcastExchange` for dynamic partition pruning subqueries. |
| Node | Notes |
| ------------------------ | ----------------------------------------------------------------------------------- |
| `CometUnion` | JVM-side union of Comet inputs. The Rust side reads each branch as a separate scan. |
| `CometCoalesce` | JVM-side partition coalesce. |
| `CometCollectLimit` | JVM-side collect limit, equivalent to `CollectLimitExec`. |
| `CometBroadcastExchange` | Broadcast exchange producing serialized Arrow batches that the consumer can decode. |
| `CometSubqueryBroadcast` | Companion to `CometBroadcastExchange` for dynamic partition pruning subqueries. |

### Shuffle Operators

@@ -239,8 +240,8 @@ use:
- **`CometExchange`** is the **native shuffle** path. The child must already
be a Comet operator producing columnar Arrow batches; the node calls
`executeColumnar()` on its child and the partition, encode, and compress
steps run in native code. Hash and range partitioning **keys** must be
primitive types because native hashing and ordering do not support complex
steps run in Rust. Hash and range partitioning **keys** must be
primitive types because the Rust hashing and ordering do not support complex
types, but the data columns themselves can include `StructType`,
`ArrayType`, and `MapType` since batches are serialized via the Arrow IPC
writer.
@@ -265,12 +266,12 @@ Comet inserts these nodes wherever data has to cross the columnar/row boundary.
Multiple implementations exist because the optimal strategy depends on what
produced the columnar data.

| Node | Direction | Notes |
| ------------------------------ | ------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `CometColumnarToRow` | columnar → row | JVM-based row conversion. A fork of Spark's `ColumnarToRowExec` that includes the SPARK-50235 fix. |
| `CometNativeColumnarToRow` | columnar → row | Native row conversion that decodes broadcast Arrow batches via `NativeColumnarToRowConverter`. Used downstream of `CometBroadcastExchange`. Zero-copy for variable-length types and avoids an extra JVM materialization step. |
| `CometSparkColumnarToColumnar` | columnar → columnar | Converts a Spark columnar input (a non-Comet `ColumnarBatch`) into Comet's Arrow batches. |
| `CometSparkRowToColumnar` | row → columnar | Converts a Spark row input into Comet's Arrow batches. |
| Node | Direction | Notes |
| ------------------------------ | ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `CometColumnarToRow` | columnar → row | JVM-based row conversion. A fork of Spark's `ColumnarToRowExec` that includes the SPARK-50235 fix. |
| `CometNativeColumnarToRow` | columnar → row | Rust-based row conversion that decodes broadcast Arrow batches via `NativeColumnarToRowConverter`. Used downstream of `CometBroadcastExchange`. Zero-copy for variable-length types and avoids an extra JVM materialization step. |
| `CometSparkColumnarToColumnar` | columnar → columnar | Converts a Spark columnar input (a non-Comet `ColumnarBatch`) into Comet's Arrow batches. |
| `CometSparkRowToColumnar` | row → columnar | Converts a Spark row input into Comet's Arrow batches. |

The two `CometSpark*` names come from a single `CometSparkToColumnarExec`
operator that picks the node name based on whether its child supports