diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 2e8879ded1..58aba3fb12 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -318,6 +318,7 @@ jobs: org.apache.comet.CometFuzzTestSuite org.apache.comet.CometFuzzIcebergSuite org.apache.comet.DataGeneratorSuite + org.apache.comet.CometDeltaReadSuite - name: "shuffle" value: | org.apache.comet.exec.CometShuffleSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index a2d22c30f9..2c3cd91c98 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -134,6 +134,7 @@ jobs: org.apache.comet.CometFuzzTestSuite org.apache.comet.CometFuzzIcebergSuite org.apache.comet.DataGeneratorSuite + org.apache.comet.CometDeltaReadSuite - name: "shuffle" value: | org.apache.comet.exec.CometShuffleSuite diff --git a/pom.xml b/pom.xml index f53b153d28..9760a5b065 100644 --- a/pom.xml +++ b/pom.xml @@ -72,6 +72,10 @@ under the License. 4.1.2 4.1 provided + + 4.1.0 + delta-spark 3.25.5 1.16.0 provided @@ -654,6 +658,8 @@ under the License. 2.12 3.4.3 3.4 + 2.4.0 + delta-core 1.13.1 4.8.8 2.0.6 @@ -672,6 +678,7 @@ under the License. 2.12 3.5.8 3.5 + 3.3.2 1.13.1 4.8.8 2.0.7 @@ -690,6 +697,7 @@ under the License. 2.13 4.0.2 4.0 + 4.0.0 1.15.2 4.13.6 2.0.16 @@ -732,6 +740,7 @@ under the License. 2.13 4.2.0-preview4 4.2 + 4.2.0 1.17.0 4.13.6 2.0.17 diff --git a/spark/pom.xml b/spark/pom.xml index 780ecb63aa..c9e2f83853 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -87,6 +87,23 @@ under the License. 
junit-4-13_${scala.binary.version} test + + + io.delta + ${delta.artifact.name}_${scala.binary.version} + ${delta.version} + test + + + + com.google.guava + failureaccess + 1.0.2 + test + org.apache.spark spark-core_${scala.binary.version} diff --git a/spark/src/main/scala/org/apache/comet/CometConf.scala b/spark/src/main/scala/org/apache/comet/CometConf.scala index aabd64b9b3..3bf22665b2 100644 --- a/spark/src/main/scala/org/apache/comet/CometConf.scala +++ b/spark/src/main/scala/org/apache/comet/CometConf.scala @@ -132,6 +132,16 @@ object CometConf extends ShimCometConf { .checkValue(v => v > 0, "Data file concurrency limit must be positive") .createWithDefault(1) + val COMET_DELTA_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = + conf("spark.comet.scan.delta.enabled") + .category(CATEGORY_SCAN) + .doc( + "When enabled, Comet will natively scan Delta Lake tables that store plain Parquet, " + + "reusing Comet's native Parquet reader. Delta tables that use deletion vectors or " + + "column mapping are not supported and fall back to Spark. 
Experimental.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_CSV_V2_NATIVE_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.scan.csv.v2.enabled")
       .category(CATEGORY_TESTING)
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index af68f9530a..7a37470f53 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -168,6 +168,12 @@ case class CometScanRule(session: SparkSession)
         if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
           return withFallbackReason(scanExec, s"Unsupported file format ${r.fileFormat}")
         }
+        if (isDeltaFileFormat(r)) {
+          deltaScanUnsupportedReason(r, scanExec) match {
+            case Some(reason) => return withFallbackReason(scanExec, reason)
+            case None => // plain Parquet Delta table: continue with the regular scan checks
+          }
+        }
         val hadoopConf = r.sparkSession.sessionState.newHadoopConfWithOptions(r.options)
 
         // TODO is this restriction valid for all native scan types?
@@ -192,6 +198,50 @@ case class CometScanRule(session: SparkSession)
     }
   }
 
+  /** True when the relation's file format is Delta's DeltaParquetFileFormat (matched by name). */
+  private def isDeltaFileFormat(r: HadoopFsRelation): Boolean =
+    r.fileFormat.getClass.getName == CometScanExec.DELTA_PARQUET_FILE_FORMAT
+
+  /**
+   * Returns a fallback reason if a Delta scan uses features that Comet's native Parquet reader
+   * cannot reproduce. Tier 0 native Delta support only covers plain Parquet tables; column
+   * mapping and deletion vectors must fall back to Spark to avoid incorrect results.
+   */
+  private def deltaScanUnsupportedReason(
+      r: HadoopFsRelation,
+      scanExec: FileSourceScanExec): Option[String] = {
+    // With deletion vectors, Delta injects synthetic __delta_internal_* columns plus a row filter.
+    // The native scan would not reproduce that filter, so deleted rows could leak through.
+    val hasInternalCols =
+      (scanExec.requiredSchema.fieldNames ++ scanExec.output.map(_.name))
+        .exists(_.startsWith("__delta_internal"))
+    // Column mapping remaps logical names to physical Parquet names. Delta strips the mapping
+    // from the schema it exposes to Spark and applies it inside DeltaParquetFileFormat, so the
+    // only reliable signal is the file format's columnMappingMode.
+    if (deltaColumnMappingEnabled(r)) {
+      Some("Comet native Delta scan does not support column mapping")
+    } else if (hasInternalCols) {
+      Some("Comet native Delta scan does not support deletion vectors")
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Reads DeltaParquetFileFormat.columnMappingMode reflectively (no compile-time dependency on
+   * delta-spark). The mode's name is "none", "name", or "id". Reflection and linkage failures are
+   * treated as column-mapped so we fall back to Spark; fatal JVM errors (e.g. OOM) propagate.
+   */
+  private def deltaColumnMappingEnabled(r: HadoopFsRelation): Boolean =
+    try {
+      val ff = r.fileFormat
+      val mode = ff.getClass.getMethod("columnMappingMode").invoke(ff)
+      val name = mode.getClass.getMethod("name").invoke(mode).asInstanceOf[String]
+      !name.equalsIgnoreCase("none")
+    } catch {
+      case scala.util.control.NonFatal(_) | _: LinkageError => true
+    }
+
   private def nativeScan(
       plan: SparkPlan,
       session: SparkSession,
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index 8dd092fc74..05647b3195 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -40,6 +40,8 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.collection._
 
+import org.apache.comet.CometConf
+
 /**
  * Comet physical scan node for DataSource V1. This node is created by CometScanRule as a planning
  * intermediate and is always replaced before execution: CometExecRule converts it to a
@@ -415,9 +417,18 @@ object CometScanExec {
   }
 
   def isFileFormatSupported(fileFormat: FileFormat): Boolean = {
-    // Only support Spark's built-in Parquet scans, not others such as Delta which use a subclass
-    // of ParquetFileFormat.
-    fileFormat.getClass().equals(classOf[ParquetFileFormat])
+    // Spark's built-in Parquet scan is always supported (exact class match). Delta uses a
+    // subclass of ParquetFileFormat; allow it only when native Delta scan is explicitly enabled.
+    // CometScanRule applies further guards to fall back for deletion vectors / column mapping.
+    fileFormat.getClass().equals(classOf[ParquetFileFormat]) ||
+    (CometConf.COMET_DELTA_NATIVE_SCAN_ENABLED.get() &&
+      fileFormat.getClass.getName == DELTA_PARQUET_FILE_FORMAT)
   }
 
+  /**
+   * Fully qualified class name of Delta's Parquet file format (matched by name to avoid a
+   * compile-time dependency on delta-spark).
+   */
+  val DELTA_PARQUET_FILE_FORMAT: String = "org.apache.spark.sql.delta.DeltaParquetFileFormat"
+
 }
diff --git a/spark/src/test/scala/org/apache/comet/CometDeltaReadSuite.scala b/spark/src/test/scala/org/apache/comet/CometDeltaReadSuite.scala
new file mode 100644
index 0000000000..836f532467
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/CometDeltaReadSuite.scala
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Empirical analysis of how Comet handles reads of Delta Lake tables.
+ *
+ * Native Delta scanning is opt-in via `spark.comet.scan.delta.enabled` (off by default). These
+ * tests pin down what happens to a Delta scan, end to end, on a real Comet + Delta build:
+ *
+ *   1. By default no native Comet scan is used for Delta. Delta reads through a
+ *      `FileSourceScanExec` backed by `DeltaParquetFileFormat`, a subclass of Spark's
+ *      `ParquetFileFormat`, which `CometScanExec.isFileFormatSupported` only accepts when the
+ *      flag is on; even then deletion vectors and column mapping fall back to Spark (Tier 0).
+ *
+ *   2. With `spark.comet.convert.parquet.enabled=true`, `CometExecRule` matches the Delta scan via
+ *      `case _: ParquetFileFormat` (which DOES match the subclass) and wraps Spark's Delta scan in a
+ *      `CometSparkToColumnarExec`. The file bytes are still decoded by Spark, but the rows are
+ *      converted to Arrow so downstream operators (filter, project, aggregate, ...) run natively.
+ *
+ * This suite requires the `delta-spark` test dependency declared in `spark/pom.xml`.
+ */
+class CometDeltaReadSuite extends CometTestBase {
+
+  private val nativeScanNodeNames =
+    Set("CometScanExec", "CometNativeScanExec", "CometBatchScanExec")
+
+  /**
+   * Tier 0 detection relies on DeltaParquetFileFormat exposing `columnMappingMode`. Cancel the
+   * test on Delta versions that do not, so this suite stays green across the Spark/Delta build
+   * matrix without asserting behaviour we cannot guarantee.
+   */
+  private def assumeDeltaTier0(): Unit = {
+    val ok =
+      try {
+        Class
+          .forName("org.apache.spark.sql.delta.DeltaParquetFileFormat")
+          .getMethods
+          .exists(_.getName == "columnMappingMode")
+      } catch {
+        case _: Throwable => false
+      }
+    assume(ok, "Delta version does not expose columnMappingMode; skipping Tier 0 assertions")
+  }
+
+  override protected def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+    conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+    conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+    conf
+  }
+
+  private def writeDeltaTable(path: String, properties: Map[String, String] = Map.empty): Unit = {
+    val writer = spark
+      .range(0, 100)
+      .selectExpr("id", "id % 5 as grp", "cast(id as string) as name")
+      .write
+      .format("delta")
+    properties.foldLeft(writer) { case (w, (k, v)) => w.option(k, v) }.save(path)
+  }
+
+  /**
+   * delta-spark 4.1.0 is built against Spark 4.1.0, but Spark 4.1.2 removed
+   * `org.apache.spark.sql.catalyst.plans.logical.IgnoreCachedData`, which Delta DML commands
+   * (DELETE/UPDATE/ALTER) extend. That skew is unrelated to Comet, so cancel rather than fail.
+   */
+  private def cancelIfDeltaDmlSkew(t: Throwable): Nothing = {
+    val causeChain = Iterator.iterate(t)(_.getCause).takeWhile(_ != null)
+    if (causeChain.exists(_.toString.contains("IgnoreCachedData"))) {
+      cancel(
+        "Skipping: delta-spark 4.1.0 DML is binary-incompatible with this Spark " +
+          "(IgnoreCachedData was removed after 4.1.0); read-path coverage is unaffected.")
+    }
+    throw t
+  }
+
+  /** Force execution, then return every physical-plan node (AQE stripped). */
+  private def planNodes(df: DataFrame): Seq[SparkPlan] = {
+    df.collect()
+    stripAQEPlan(df.queryExecution.executedPlan).collect { case p => p }
+  }
+
+  private def nodeNames(df: DataFrame): Seq[String] =
+    planNodes(df).map(_.getClass.getSimpleName)
+
+  private def fileFormatOf(df: DataFrame): String =
+    planNodes(df)
+      .collectFirst { case f: FileSourceScanExec =>
+        f.relation.fileFormat.getClass.getName
+      }
+      .getOrElse("")
+
+  test("Delta scan is never replaced by a native Comet scan") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "false") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        writeDeltaTable(path)
+        val df = spark.read.format("delta").load(path)
+        val names = nodeNames(df)
+        val fmt = fileFormatOf(df)
+        info(s"convert=off plan nodes: ${names.mkString(", ")}")
+        info(s"file format: $fmt")
+        assert(fmt.toLowerCase.contains("delta"), s"expected a Delta file format, got $fmt")
+        assert(
+          !names.exists(nativeScanNodeNames.contains),
+          s"Comet must NOT natively scan Delta, but found: ${names.mkString(", ")}")
+        assert(
+          !names.contains("CometSparkToColumnarExec"),
+          "with convert.parquet disabled Comet should not touch the Delta scan")
+        checkSparkAnswer(df)
+      }
+    }
+  }
+
+  test("convert.parquet wraps the Spark Delta scan with CometSparkToColumnarExec") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        writeDeltaTable(path)
+        val df = spark.read
+          .format("delta")
+          .load(path)
+          .filter("id > 10")
+          .selectExpr("grp", "id + 1 as id1")
+        val names = nodeNames(df)
+        info(s"convert=on plan nodes: ${names.mkString(", ")}")
+        assert(
+          names.contains("CometSparkToColumnarExec"),
+          s"expected CometSparkToColumnarExec over the Delta scan, got: ${names.mkString(", ")}")
+        assert(
+          !names.exists(nativeScanNodeNames.contains),
+          "file reading must remain in Spark even when converting to Arrow")
+        checkSparkAnswer(df)
+      }
+    }
+  }
+
+  test("convert.parquet lets downstream aggregate run natively over a Delta scan") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        writeDeltaTable(path)
+        val df = spark.read
+          .format("delta")
+          .load(path)
+          .groupBy("grp")
+          .count()
+        val names = nodeNames(df)
+        info(s"aggregate plan nodes: ${names.mkString(", ")}")
+        assert(names.contains("CometSparkToColumnarExec"))
+        assert(
+          names.exists(_.startsWith("CometHashAggregate")),
+          s"expected a native Comet aggregate, got: ${names.mkString(", ")}")
+        checkSparkAnswer(df)
+      }
+    }
+  }
+
+  test("Delta read with deletion vectors: no native scan, correct results") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        writeDeltaTable(path, Map("delta.enableDeletionVectors" -> "true"))
+        try {
+          spark.sql(s"DELETE FROM delta.`$path` WHERE id % 7 = 0")
+        } catch {
+          case e: Throwable => cancelIfDeltaDmlSkew(e)
+        }
+        val df = spark.read.format("delta").load(path)
+        val names = nodeNames(df)
+        info(s"deletion-vectors plan nodes: ${names.mkString(", ")}")
+        assert(
+          !names.exists(nativeScanNodeNames.contains),
+          s"Comet must NOT natively scan Delta with deletion vectors: ${names.mkString(", ")}")
+        checkSparkAnswer(df)
+        assert(df.count() == 100 - (0 until 100).count(_ % 7 == 0))
+      }
+    }
+  }
+
+  test("Delta read with column mapping: no native scan, correct results") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        writeDeltaTable(
+          path,
+          Map(
+            "delta.columnMapping.mode" -> "name",
+            "delta.minReaderVersion" -> "2",
+            "delta.minWriterVersion" -> "5"))
+        val df = spark.read.format("delta").load(path)
+        val names = nodeNames(df)
+        info(s"column-mapping plan nodes: ${names.mkString(", ")}")
+        assert(
+          !names.exists(nativeScanNodeNames.contains),
+          s"Comet must NOT natively scan Delta with column mapping: ${names.mkString(", ")}")
+        checkSparkAnswer(df)
+      }
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Tier 0: native Delta scan for plain Parquet tables, behind
+  // spark.comet.scan.delta.enabled. Deletion vectors and column mapping must
+  // still fall back to Spark.
+  // ---------------------------------------------------------------------------
+
+  test("Tier 0: plain Delta table is scanned natively when enabled") {
+    assumeDeltaTier0()
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      CometConf.COMET_DELTA_NATIVE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "false") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        writeDeltaTable(path)
+        val df = spark.read.format("delta").load(path)
+        val names = nodeNames(df)
+        info(s"tier0 plain plan nodes: ${names.mkString(", ")}")
+        assert(
+          names.exists(nativeScanNodeNames.contains),
+          s"expected a native Comet scan of the Delta table, got: ${names.mkString(", ")}")
+        assert(
+          !names.contains("FileSourceScanExec"),
+          "the Spark Delta FileSourceScanExec should have been replaced by a native Comet scan")
+        checkSparkAnswer(df)
+      }
+    }
+  }
+
+  test("Tier 0: partitioned Delta table is scanned natively when enabled") {
+    assumeDeltaTier0()
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      CometConf.COMET_DELTA_NATIVE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "false") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        spark
+          .range(0, 100)
+          .selectExpr("id", "id % 4 as part", "cast(id as string) as name")
+          .write
+          .format("delta")
+          .partitionBy("part")
+          .save(path)
+        val df = spark.read.format("delta").load(path).filter("part = 1")
+        val names = nodeNames(df)
+        info(s"tier0 partitioned plan nodes: ${names.mkString(", ")}")
+        assert(
+          names.exists(nativeScanNodeNames.contains),
+          s"expected a native Comet scan of the partitioned Delta table: ${names.mkString(", ")}")
+        checkSparkAnswer(df)
+      }
+    }
+  }
+
+  test("Tier 0: deletion vectors still fall back to Spark even when enabled") {
+    assumeDeltaTier0()
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      CometConf.COMET_DELTA_NATIVE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "false") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        writeDeltaTable(path, Map("delta.enableDeletionVectors" -> "true"))
+        try {
+          spark.sql(s"DELETE FROM delta.`$path` WHERE id % 7 = 0")
+        } catch {
+          case e: Throwable => cancelIfDeltaDmlSkew(e)
+        }
+        val df = spark.read.format("delta").load(path)
+        val names = nodeNames(df)
+        info(s"tier0 deletion-vectors plan nodes: ${names.mkString(", ")}")
+        // Cancel only when the Delta log shows the DELETE wrote no deletion vector (copy-on-write
+        // rewrite). A native scan of a table that does carry one must fail the assertion below.
+        val commits = Option(new java.io.File(path, "_delta_log").listFiles()).toList.flatten
+        val wroteDv = commits.filter(_.getName.endsWith(".json")).exists { f =>
+          val json = new String(java.nio.file.Files.readAllBytes(f.toPath), "UTF-8")
+          json.contains("\"deletionVector\"")
+        }
+        if (!wroteDv) cancel("DELETE did not materialize a deletion vector on this Delta version")
+        assert(
+          !names.exists(nativeScanNodeNames.contains),
+          s"Delta tables with deletion vectors must fall back to Spark: ${names.mkString(", ")}")
+        checkSparkAnswer(df)
+        assert(df.count() == 100 - (0 until 100).count(_ % 7 == 0))
+      }
+    }
+  }
+
+  test("Tier 0: column mapping still falls back to Spark even when enabled") {
+    assumeDeltaTier0()
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      CometConf.COMET_DELTA_NATIVE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "false") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        writeDeltaTable(
+          path,
+          Map(
+            "delta.columnMapping.mode" -> "name",
+            "delta.minReaderVersion" -> "2",
+            "delta.minWriterVersion" -> "5"))
+        val df = spark.read.format("delta").load(path)
+        val names = nodeNames(df)
+        info(s"tier0 column-mapping plan nodes: ${names.mkString(", ")}")
+        assert(
+          !names.exists(nativeScanNodeNames.contains),
+          s"Delta tables with column mapping must fall back to Spark: ${names.mkString(", ")}")
+        checkSparkAnswer(df)
+      }
+    }
+  }
+}