Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ jobs:
org.apache.comet.CometFuzzTestSuite
org.apache.comet.CometFuzzIcebergSuite
org.apache.comet.DataGeneratorSuite
org.apache.comet.CometDeltaReadSuite
- name: "shuffle"
value: |
org.apache.comet.exec.CometShuffleSuite
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ jobs:
org.apache.comet.CometFuzzTestSuite
org.apache.comet.CometFuzzIcebergSuite
org.apache.comet.DataGeneratorSuite
org.apache.comet.CometDeltaReadSuite
- name: "shuffle"
value: |
org.apache.comet.exec.CometShuffleSuite
Expand Down
9 changes: 9 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ under the License.
<spark.version>4.1.2</spark.version>
<spark.version.short>4.1</spark.version.short>
<spark.maven.scope>provided</spark.maven.scope>
<!-- Delta Lake version used only by Comet's Delta scan tests. Overridden per Spark profile.
Delta >= 3.0 publishes "delta-spark"; Delta < 3.0 (Spark 3.4) publishes "delta-core". -->
<delta.version>4.1.0</delta.version>
<delta.artifact.name>delta-spark</delta.artifact.name>
<protobuf.version>3.25.5</protobuf.version>
<parquet.version>1.16.0</parquet.version>
<parquet.maven.scope>provided</parquet.maven.scope>
Expand Down Expand Up @@ -654,6 +658,8 @@ under the License.
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.4.3</spark.version>
<spark.version.short>3.4</spark.version.short>
<delta.version>2.4.0</delta.version>
<delta.artifact.name>delta-core</delta.artifact.name>
<parquet.version>1.13.1</parquet.version>
<semanticdb.version>4.8.8</semanticdb.version>
<slf4j.version>2.0.6</slf4j.version>
Expand All @@ -672,6 +678,7 @@ under the License.
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.5.8</spark.version>
<spark.version.short>3.5</spark.version.short>
<delta.version>3.3.2</delta.version>
<parquet.version>1.13.1</parquet.version>
<semanticdb.version>4.8.8</semanticdb.version>
<slf4j.version>2.0.7</slf4j.version>
Expand All @@ -690,6 +697,7 @@ under the License.
<scala.binary.version>2.13</scala.binary.version>
<spark.version>4.0.2</spark.version>
<spark.version.short>4.0</spark.version.short>
<delta.version>4.0.0</delta.version>
<parquet.version>1.15.2</parquet.version>
<semanticdb.version>4.13.6</semanticdb.version>
<slf4j.version>2.0.16</slf4j.version>
Expand Down Expand Up @@ -732,6 +740,7 @@ under the License.
<scala.binary.version>2.13</scala.binary.version>
<spark.version>4.2.0-preview4</spark.version>
<spark.version.short>4.2</spark.version.short>
<delta.version>4.2.0</delta.version>
<parquet.version>1.17.0</parquet.version>
<semanticdb.version>4.13.6</semanticdb.version>
<slf4j.version>2.0.17</slf4j.version>
Expand Down
17 changes: 17 additions & 0 deletions spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,23 @@ under the License.
<artifactId>junit-4-13_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<!-- Used by CometDeltaReadSuite to exercise Comet's native scan of Delta Lake tables.
Version/artifact are set per Spark profile in the root pom (delta-spark for Delta >= 3.0,
delta-core for Delta < 3.0). The suite self-cancels when the resolved Delta version cannot
support Tier 0 detection. -->
<dependency>
<groupId>io.delta</groupId>
<artifactId>${delta.artifact.name}_${scala.binary.version}</artifactId>
<version>${delta.version}</version>
<scope>test</scope>
</dependency>
<!-- Guava's failureaccess is needed on the test classpath when the Delta catalog initializes. -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>failureaccess</artifactId>
<version>1.0.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
Expand Down
10 changes: 10 additions & 0 deletions spark/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ object CometConf extends ShimCometConf {
.checkValue(v => v > 0, "Data file concurrency limit must be positive")
.createWithDefault(1)

/**
 * Opt-in switch (`spark.comet.scan.delta.enabled`) for Comet's native scan of Delta Lake tables.
 * Registered under the scan category and disabled by default.
 */
val COMET_DELTA_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.delta.enabled")
.category(CATEGORY_SCAN)
.doc(
"When enabled, Comet will natively scan Delta Lake tables that store plain Parquet, " +
"reusing Comet's native Parquet reader. Delta tables that use deletion vectors or " +
"column mapping are not supported and fall back to Spark. Experimental.")
.booleanConf
.createWithDefault(false)

val COMET_CSV_V2_NATIVE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.csv.v2.enabled")
.category(CATEGORY_TESTING)
Expand Down
50 changes: 50 additions & 0 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ case class CometScanRule(session: SparkSession)
if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
return withFallbackReason(scanExec, s"Unsupported file format ${r.fileFormat}")
}
if (isDeltaFileFormat(r)) {
val deltaReason = deltaScanUnsupportedReason(r, scanExec)
if (deltaReason.isDefined) {
return withFallbackReason(scanExec, deltaReason.get)
}
}
val hadoopConf = r.sparkSession.sessionState.newHadoopConfWithOptions(r.options)

// TODO is this restriction valid for all native scan types?
Expand All @@ -192,6 +198,50 @@ case class CometScanRule(session: SparkSession)
}
}

/**
 * Tells whether the relation is backed by Delta's Parquet file format. The check compares the
 * runtime class name of the file format against the name held by CometScanExec, so no Delta
 * class is referenced at compile time.
 */
private def isDeltaFileFormat(r: HadoopFsRelation): Boolean = {
  val formatClassName = r.fileFormat.getClass.getName
  formatClassName == CometScanExec.DELTA_PARQUET_FILE_FORMAT
}

/**
 * Decides whether a Delta scan must be handed back to Spark. Tier 0 native Delta support only
 * covers plain Parquet tables, so two Delta features are rejected here to avoid incorrect
 * results:
 *
 *   - column mapping, which is checked first via the file format's columnMappingMode;
 *   - deletion vectors, recognised by synthetic `__delta_internal*` columns in the scan's
 *     required schema or output.
 *
 * @param r
 *   the relation whose file format is inspected for column mapping
 * @param scanExec
 *   the scan whose required schema and output are inspected for Delta-internal columns
 * @return
 *   the fallback reason, or None when neither feature is detected
 */
private def deltaScanUnsupportedReason(
    r: HadoopFsRelation,
    scanExec: FileSourceScanExec): Option[String] = {
  val internalPrefix = "__delta_internal"

  // With deletion vectors, Delta injects synthetic internal columns together with a row filter.
  // The native scan would not apply that filter, so deleted rows could leak through. Declared
  // as a def so it is evaluated only when the column-mapping check has passed.
  def referencesInternalColumns: Boolean =
    scanExec.requiredSchema.fieldNames.exists(_.startsWith(internalPrefix)) ||
      scanExec.output.exists(_.name.startsWith(internalPrefix))

  // Column mapping renames logical columns to physical Parquet names. Delta hides the mapping
  // from the schema it exposes to Spark and applies it inside DeltaParquetFileFormat, so the
  // file format's columnMappingMode is the only reliable signal.
  if (deltaColumnMappingEnabled(r)) {
    Some("Comet native Delta scan does not support column mapping")
  } else if (referencesInternalColumns) {
    Some("Comet native Delta scan does not support deletion vectors")
  } else {
    None
  }
}

/**
 * Reads DeltaParquetFileFormat.columnMappingMode reflectively (no compile-time dependency on
 * the Delta artifact). The mode's name is "none", "name", or "id".
 *
 * Any reflection failure is treated as "column mapping enabled" so that the caller
 * conservatively falls back to Spark. That covers a missing method, an exception thrown by the
 * invoked accessor, a null or non-String result, and a linkage error from a mismatched Delta
 * jar. Truly fatal conditions (for example OutOfMemoryError or thread interruption) are
 * deliberately NOT swallowed and propagate to the caller.
 *
 * @param r
 *   the relation whose file format is inspected
 * @return
 *   false only when the mode's name is positively read as "none" (case-insensitive); true
 *   otherwise
 */
private def deltaColumnMappingEnabled(r: HadoopFsRelation): Boolean = {
  import scala.util.control.NonFatal
  try {
    val ff = r.fileFormat
    val mode = ff.getClass.getMethod("columnMappingMode").invoke(ff)
    mode.getClass.getMethod("name").invoke(mode) match {
      case name: String => !name.equalsIgnoreCase("none")
      // null or an unexpected type: we cannot prove the table is unmapped, so fall back.
      case _ => true
    }
  } catch {
    // NonFatal excludes LinkageError, but a Delta version/classpath mismatch is exactly the
    // kind of reflection failure that should trigger the conservative fallback.
    case NonFatal(_) | _: LinkageError => true
  }
}

private def nativeScan(
plan: SparkPlan,
session: SparkSession,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.collection._

import org.apache.comet.CometConf

/**
* Comet physical scan node for DataSource V1. This node is created by CometScanRule as a planning
* intermediate and is always replaced before execution: CometExecRule converts it to a
Expand Down Expand Up @@ -415,9 +417,18 @@ object CometScanExec {
}

/**
 * Tells whether Comet can consider scanning the given file format.
 *
 * Spark's built-in Parquet format is always accepted, using an exact class match so that
 * arbitrary subclasses are not picked up. Delta's Parquet format (a ParquetFileFormat subclass,
 * recognised by class name) is accepted only when native Delta scan is explicitly enabled
 * through [[CometConf.COMET_DELTA_NATIVE_SCAN_ENABLED]].
 *
 * @param fileFormat
 *   the file format of the relation being planned
 * @return
 *   true when the format is eligible for a Comet scan
 */
def isFileFormatSupported(fileFormat: FileFormat): Boolean = {
  val formatClass = fileFormat.getClass
  // CometScanRule applies further guards to fall back for deletion vectors / column mapping.
  formatClass.equals(classOf[ParquetFileFormat]) ||
    (CometConf.COMET_DELTA_NATIVE_SCAN_ENABLED.get() &&
      formatClass.getName == DELTA_PARQUET_FILE_FORMAT)
}

/**
 * Fully qualified class name of Delta's Parquet file format. File formats are compared against
 * this string by class name rather than by class reference, which avoids a compile-time
 * dependency on the Delta artifact.
 */
val DELTA_PARQUET_FILE_FORMAT: String = "org.apache.spark.sql.delta.DeltaParquetFileFormat"

}
Loading