From e0dc0a41d5d7f3fef613eac02f2862988e93415d Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Fri, 2 Jan 2026 17:01:33 -0500 Subject: [PATCH 1/9] Report Beam Lineage from Parquet reads --- .../scio/parquet/LineageReportingDoFn.scala | 55 +++++++++++++++++++ .../scio/parquet/avro/ParquetAvroIO.scala | 14 +++-- .../scio/parquet/read/ParquetReadFn.scala | 3 + .../parquet/tensorflow/ParquetExampleIO.scala | 14 ++++- .../scio/parquet/types/ParquetTypeIO.scala | 21 +++++-- 5 files changed, 95 insertions(+), 12 deletions(-) create mode 100644 scio-parquet/src/main/scala/com/spotify/scio/parquet/LineageReportingDoFn.scala diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/LineageReportingDoFn.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/LineageReportingDoFn.scala new file mode 100644 index 0000000000..74fc0b4b91 --- /dev/null +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/LineageReportingDoFn.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2019 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.parquet + +import org.apache.beam.sdk.io.FileSystems +import org.apache.beam.sdk.transforms.DoFn +import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, Setup} +import org.slf4j.LoggerFactory + +/** + * DoFn that reports directory-level source lineage for legacy Parquet reads. + * + * @param filePattern + * The file pattern or path to report lineage for + */ +private[parquet] class LineageReportingDoFn[T](filePattern: String) extends DoFn[T, T] { + + @transient + private lazy val logger = LoggerFactory.getLogger(classOf[LineageReportingDoFn[T]]) + + @Setup + def setup(): Unit = { + try { + val isDirectory = filePattern.endsWith("/") + val resourceId = FileSystems.matchNewResource(filePattern, isDirectory) + val directory = resourceId.getCurrentDirectory + FileSystems.reportSourceLineage(directory) + } catch { + case e: Exception => + logger.warn( + s"Error when reporting lineage for pattern: $filePattern", + e + ) + } + } + + @ProcessElement + def processElement(@DoFn.Element element: T, out: DoFn.OutputReceiver[T]): Unit = + out.output(element) +} diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala index 12520c9c0e..76d8348236 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala @@ -22,7 +22,7 @@ import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} -import com.spotify.scio.parquet.{GcsConnectorUtil, ParquetConfiguration} +import com.spotify.scio.parquet.{GcsConnectorUtil, LineageReportingDoFn, ParquetConfiguration} import com.spotify.scio.testing.TestDataManager import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil} import com.spotify.scio.values.SCollection @@ -240,6 +240,7 @@ object ParquetAvroIO { ): SCollection[T] = { val job = Job.getInstance(confOrDefault) val filePattern = ScioUtil.filePattern(path, suffix) + GcsConnectorUtil.setInputPaths(sc, job, filePattern) job.setInputFormatClass(classOf[AvroParquetInputFormat[T]]) job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void]) @@ -259,12 +260,17 @@ object ParquetAvroIO { // `SCollection#map` might throw NPE on incomplete Avro objects when the runner tries // to serialized them. Lifting the mapping function here fixes the problem. override def apply(input: A): T = g(input) - override def getInputTypeDescriptor = TypeDescriptor.of(aCls) - override def getOutputTypeDescriptor = TypeDescriptor.of(oCls) + override def getInputTypeDescriptor: TypeDescriptor[A] = TypeDescriptor.of(aCls) + override def getOutputTypeDescriptor: TypeDescriptor[T] = TypeDescriptor.of(oCls) }) .withConfiguration(job.getConfiguration) - sc.applyTransform(transform).map(_.getValue) + // Report lineage during execution (not construction time) + sc.applyTransform(transform) + .applyTransform( + org.apache.beam.sdk.transforms.ParDo.of(new LineageReportingDoFn[org.apache.beam.sdk.values.KV[JBoolean, T]](filePattern)) + ) + .map(_.getValue) } } diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetReadFn.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetReadFn.scala index f0fa14fd4e..e724a748d0 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetReadFn.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetReadFn.scala @@ -19,6 +19,7 @@ package com.spotify.scio.parquet.read import com.spotify.scio.parquet.BeamInputFile import com.spotify.scio.parquet.read.ParquetReadFn._ import org.apache.beam.sdk.io.FileIO.ReadableFile +import org.apache.beam.sdk.io.FileSystems import org.apache.beam.sdk.io.hadoop.SerializableConfiguration import org.apache.beam.sdk.io.range.OffsetRange import org.apache.beam.sdk.transforms.DoFn._ @@ -167,6 +168,8 @@ class ParquetReadFn[T, R]( tracker.currentRestriction.getFrom, if (splitGranularity == SplitGranularity.File) "end" else tracker.currentRestriction().getTo ) + FileSystems.reportSourceLineage(file.getMetadata.resourceId()) + val reader = parquetFileReader(file) try { val filter = options.getRecordFilter diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala index 3efca5477d..2381480b4b 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala @@ -29,7 +29,7 @@ import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} import com.spotify.scio.parquet.ParquetConfiguration import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} -import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil} +import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil, LineageReportingDoFn} import com.spotify.scio.testing.TestDataManager import com.spotify.scio.util.ScioUtil import com.spotify.scio.util.FilenamePolicySupplier @@ -100,7 +100,9 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] { params: ReadP ): SCollection[Example] = { val job = Job.getInstance(conf) - GcsConnectorUtil.setInputPaths(sc, job, path) + val filePattern = ScioUtil.filePattern(path, params.suffix) + + GcsConnectorUtil.setInputPaths(sc, job, filePattern) job.setInputFormatClass(classOf[TensorflowExampleParquetInputFormat]) job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void]) job.getConfiguration.setClass("value.class", classOf[Example], classOf[Example]) @@ -122,7 +124,13 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] { override def apply(input: Void): JBoolean = true }) .withConfiguration(job.getConfiguration) - sc.applyTransform(source).map(_.getValue) + + // Report lineage during execution (not construction time) + sc.applyTransform(source) + .applyTransform( + org.apache.beam.sdk.transforms.ParDo.of(new LineageReportingDoFn[org.apache.beam.sdk.values.KV[JBoolean, Example]](filePattern)) + ) + .map(_.getValue) } override protected def readTest(sc: ScioContext, params: ReadP): SCollection[Example] = { diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala index 7acbcf8154..b17a3357fa 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala @@ -22,19 +22,23 @@ import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} -import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil, ParquetConfiguration} +import com.spotify.scio.parquet.{ + BeamInputFile, + GcsConnectorUtil, + LineageReportingDoFn, + ParquetConfiguration +} import com.spotify.scio.util.ScioUtil import com.spotify.scio.util.FilenamePolicySupplier import com.spotify.scio.values.SCollection import magnolify.parquet.ParquetType import org.apache.beam.sdk.io.fs.ResourceId -import org.apache.beam.sdk.transforms.SerializableFunctions -import org.apache.beam.sdk.transforms.SimpleFunction +import org.apache.beam.sdk.transforms.{ParDo, SerializableFunctions, SimpleFunction} import org.apache.beam.sdk.io.hadoop.SerializableConfiguration import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO import org.apache.beam.sdk.io.{DynamicFileDestinations, FileSystems, WriteFiles} import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider -import org.apache.beam.sdk.values.TypeDescriptor +import org.apache.beam.sdk.values.{KV, TypeDescriptor} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.parquet.filter2.predicate.FilterPredicate @@ -90,6 +94,7 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType]( val cls = ScioUtil.classOf[T] val job = Job.getInstance(conf) val filePattern = ScioUtil.filePattern(path, params.suffix) + GcsConnectorUtil.setInputPaths(sc, job, filePattern) tpe.setupInput(job) job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void]) @@ -119,7 +124,13 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType]( true ) ) - sc.applyTransform(source).map(_.getValue) + + // Report lineage during execution (not construction time) + sc.applyTransform(source) + .applyTransform( + ParDo.of(new LineageReportingDoFn[KV[JBoolean, T]](filePattern)) + ) + .map(_.getValue) } private def parquetOut( From 88cc9672cae6d29634b6c34b1687d148477d7328 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Fri, 2 Jan 2026 17:08:46 -0500 Subject: [PATCH 2/9] Fix issues --- .../spotify/scio/parquet/LineageReportingDoFn.scala | 2 +- .../com/spotify/scio/parquet/avro/ParquetAvroIO.scala | 9 ++++----- .../scio/parquet/tensorflow/ParquetExampleIO.scala | 10 +++++----- .../com/spotify/scio/parquet/types/ParquetTypeIO.scala | 2 -- 4 files changed, 10 insertions(+), 13 deletions(-) diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/LineageReportingDoFn.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/LineageReportingDoFn.scala index 74fc0b4b91..f39e089467 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/LineageReportingDoFn.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/LineageReportingDoFn.scala @@ -1,5 +1,5 @@ /* - * Copyright 2019 Spotify AB. + * Copyright 2026 Spotify AB. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala index 76d8348236..8de602e98e 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala @@ -240,7 +240,6 @@ object ParquetAvroIO { ): SCollection[T] = { val job = Job.getInstance(confOrDefault) val filePattern = ScioUtil.filePattern(path, suffix) - GcsConnectorUtil.setInputPaths(sc, job, filePattern) job.setInputFormatClass(classOf[AvroParquetInputFormat[T]]) job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void]) @@ -260,15 +259,15 @@ object ParquetAvroIO { // `SCollection#map` might throw NPE on incomplete Avro objects when the runner tries // to serialized them. Lifting the mapping function here fixes the problem. override def apply(input: A): T = g(input) - override def getInputTypeDescriptor: TypeDescriptor[A] = TypeDescriptor.of(aCls) - override def getOutputTypeDescriptor: TypeDescriptor[T] = TypeDescriptor.of(oCls) + override def getInputTypeDescriptor = TypeDescriptor.of(aCls) + override def getOutputTypeDescriptor = TypeDescriptor.of(oCls) }) .withConfiguration(job.getConfiguration) - // Report lineage during execution (not construction time) sc.applyTransform(transform) .applyTransform( - org.apache.beam.sdk.transforms.ParDo.of(new LineageReportingDoFn[org.apache.beam.sdk.values.KV[JBoolean, T]](filePattern)) + org.apache.beam.sdk.transforms.ParDo + .of(new LineageReportingDoFn[org.apache.beam.sdk.values.KV[JBoolean, T]](filePattern)) ) .map(_.getValue) } diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala index 2381480b4b..5a65dfcf4b 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala @@ -39,8 +39,8 @@ import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO import org.apache.beam.sdk.io._ import org.apache.beam.sdk.io.fs.ResourceId import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider -import org.apache.beam.sdk.transforms.SerializableFunctions -import org.apache.beam.sdk.transforms.SimpleFunction +import org.apache.beam.sdk.transforms.{ParDo, SerializableFunctions, SimpleFunction} +import org.apache.beam.sdk.values.KV import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.parquet.filter2.predicate.FilterPredicate @@ -101,7 +101,6 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] { ): SCollection[Example] = { val job = Job.getInstance(conf) val filePattern = ScioUtil.filePattern(path, params.suffix) - GcsConnectorUtil.setInputPaths(sc, job, filePattern) job.setInputFormatClass(classOf[TensorflowExampleParquetInputFormat]) job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void]) @@ -125,10 +124,11 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] { }) .withConfiguration(job.getConfiguration) - // Report lineage during execution (not construction time) sc.applyTransform(source) .applyTransform( - org.apache.beam.sdk.transforms.ParDo.of(new LineageReportingDoFn[org.apache.beam.sdk.values.KV[JBoolean, Example]](filePattern)) + ParDo.of( + new LineageReportingDoFn[KV[JBoolean, Example]](filePattern) + ) ) .map(_.getValue) } diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala index b17a3357fa..bf32271520 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala @@ -94,7 +94,6 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType]( val cls = ScioUtil.classOf[T] val job = Job.getInstance(conf) val filePattern = ScioUtil.filePattern(path, params.suffix) - GcsConnectorUtil.setInputPaths(sc, job, filePattern) tpe.setupInput(job) job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void]) @@ -125,7 +124,6 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType]( ) ) - // Report lineage during execution (not construction time) sc.applyTransform(source) .applyTransform( ParDo.of(new LineageReportingDoFn[KV[JBoolean, T]](filePattern)) From d65110849228e7db13ae14df4b6defb2b16b57c5 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Mon, 5 Jan 2026 16:35:45 -0500 Subject: [PATCH 3/9] refactor --- .../spotify/scio/parquet/HadoopParquet.scala | 139 ++++++++++++++++++ .../scio/parquet/LineageReportingDoFn.scala | 55 ------- .../scio/parquet/avro/ParquetAvroIO.scala | 37 +---- .../parquet/tensorflow/ParquetExampleIO.scala | 44 +++--- .../scio/parquet/types/ParquetTypeIO.scala | 45 ++---- 5 files changed, 179 insertions(+), 141 deletions(-) create mode 100644 scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala delete mode 100644 scio-parquet/src/main/scala/com/spotify/scio/parquet/LineageReportingDoFn.scala diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala new file mode 100644 index 0000000000..0d8a9312ca --- /dev/null +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala @@ -0,0 +1,139 @@ +/* + * Copyright 2026 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.parquet + +import com.spotify.scio.ScioContext +import com.spotify.scio.coders.{Coder, CoderMaterializer} +import com.spotify.scio.util.ScioUtil +import com.spotify.scio.values.SCollection +import com.twitter.chill.ClosureCleaner +import org.apache.beam.sdk.io.FileSystems +import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO +import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, Setup} +import org.apache.beam.sdk.transforms.{DoFn, SimpleFunction} +import org.apache.beam.sdk.values.TypeDescriptor +import org.apache.hadoop.conf.Configuration +import org.slf4j.LoggerFactory + +import scala.reflect.ClassTag + +private[parquet] object HadoopParquet { + + /** + * Read data from Hadoop format using HadoopFormatIO. + * + * This method provides low-level access to Hadoop format reading with optional projection and + * value cloning configuration. + * + * @param sc + * ScioContext for the pipeline + * @param conf + * Hadoop Configuration for the input format + * @param projectionFn + * Optional projection function to transform records from type A to type T. This is applied + * during the read to work around issues with incomplete Avro objects that might cause NPE + * during serialization if mapped after the read. + * @param skipValueClone + * Optional flag to skip cloning values during read + * @tparam A + * The input record type read from Hadoop format + * @tparam T + * The output record type after optional projection + * @return + * SCollection of records of type T + */ + def readHadoopFormatIO[A: ClassTag, T: ClassTag]( + sc: ScioContext, + conf: Configuration, + projectionFn: Option[A => T], + skipValueClone: Option[Boolean] + )(implicit coder: Coder[T]): SCollection[T] = { + val inputType = ScioUtil.classOf[A] + val outputType = ScioUtil.classOf[T] + val bcoder = CoderMaterializer.beam(sc, Coder[T]) + + val hadoop = HadoopFormatIO + .read[java.lang.Boolean, T]() + // Hadoop input always emit key-value, and `Void` causes NPE in Beam coder + .withKeyTranslation(new SimpleFunction[Void, java.lang.Boolean]() { + override def apply(input: Void): java.lang.Boolean = true + }) + + val withSkipClone = skipValueClone.fold(hadoop)(skip => hadoop.withSkipValueClone(skip)) + + val withValueTranslation = projectionFn.fold { + withSkipClone.withValueTranslation( + new SimpleFunction[T, T]() { + override def apply(input: T): T = input + override def getInputTypeDescriptor: TypeDescriptor[T] = TypeDescriptor.of(outputType) + override def getOutputTypeDescriptor = TypeDescriptor.of(outputType) + }, + bcoder + ) + } { fn => + val g = ClosureCleaner.clean(fn) // defeat closure + withSkipClone.withValueTranslation( + new SimpleFunction[A, T]() { + // Workaround for incomplete Avro objects + // `SCollection#map` might throw NPE on incomplete Avro objects when the runner tries + // to serialized them. Lifting the mapping function here fixes the problem. + override def apply(input: A): T = g(input) + override def getInputTypeDescriptor = TypeDescriptor.of(inputType) + override def getOutputTypeDescriptor = TypeDescriptor.of(outputType) + }, + bcoder + ) + } + + sc.applyTransform(withValueTranslation.withConfiguration(conf)).map(_.getValue) + } + + /** + * DoFn that reports directory-level source lineage for legacy Parquet reads. + * + * @param filePattern + * The file pattern or path to report lineage for + */ + def reportLineage[T](filePattern: String): DoFn[T, T] = new LineageReportDoFn(filePattern) + + class LineageReportDoFn[T](filePattern: String) extends DoFn[T, T] { + + @transient + private lazy val logger = LoggerFactory.getLogger(this.getClass) + + @Setup + def setup(): Unit = { + try { + val isDirectory = filePattern.endsWith("/") + val resourceId = FileSystems.matchNewResource(filePattern, isDirectory) + val directory = resourceId.getCurrentDirectory + FileSystems.reportSourceLineage(directory) + } catch { + case e: Exception => + logger.warn( + s"Error when reporting lineage for pattern: $filePattern", + e + ) + } + } + + @ProcessElement + def processElement(@DoFn.Element element: T, out: DoFn.OutputReceiver[T]): Unit = + out.output(element) + } +} diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/LineageReportingDoFn.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/LineageReportingDoFn.scala deleted file mode 100644 index f39e089467..0000000000 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/LineageReportingDoFn.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2026 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.spotify.scio.parquet - -import org.apache.beam.sdk.io.FileSystems -import org.apache.beam.sdk.transforms.DoFn -import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, Setup} -import org.slf4j.LoggerFactory - -/** - * DoFn that reports directory-level source lineage for legacy Parquet reads. - * - * @param filePattern - * The file pattern or path to report lineage for - */ -private[parquet] class LineageReportingDoFn[T](filePattern: String) extends DoFn[T, T] { - - @transient - private lazy val logger = LoggerFactory.getLogger(classOf[LineageReportingDoFn[T]]) - - @Setup - def setup(): Unit = { - try { - val isDirectory = filePattern.endsWith("/") - val resourceId = FileSystems.matchNewResource(filePattern, isDirectory) - val directory = resourceId.getCurrentDirectory - FileSystems.reportSourceLineage(directory) - } catch { - case e: Exception => - logger.warn( - s"Error when reporting lineage for pattern: $filePattern", - e - ) - } - } - - @ProcessElement - def processElement(@DoFn.Element element: T, out: DoFn.OutputReceiver[T]): Unit = - out.output(element) -} diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala index 8de602e98e..d764517a93 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala @@ -17,12 +17,11 @@ package com.spotify.scio.parquet.avro -import java.lang.{Boolean => JBoolean} import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} -import com.spotify.scio.parquet.{GcsConnectorUtil, LineageReportingDoFn, ParquetConfiguration} +import com.spotify.scio.parquet.{GcsConnectorUtil, HadoopParquet, ParquetConfiguration} import com.spotify.scio.testing.TestDataManager import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil} import com.spotify.scio.values.SCollection @@ -32,12 +31,9 @@ import org.apache.avro.reflect.ReflectData import org.apache.avro.specific.SpecificRecord import org.apache.beam.sdk.io._ import org.apache.beam.sdk.transforms.SerializableFunctions -import org.apache.beam.sdk.transforms.SimpleFunction import org.apache.beam.sdk.io.fs.ResourceId import org.apache.beam.sdk.io.hadoop.SerializableConfiguration -import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider -import org.apache.beam.sdk.values.TypeDescriptor import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.parquet.avro.{ @@ -245,31 +241,14 @@ object ParquetAvroIO { job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void]) job.getConfiguration.setClass("value.class", avroClass, avroClass) - val g = ClosureCleaner.clean(projectionFn) // defeat closure - val aCls = avroClass - val oCls = ScioUtil.classOf[T] - val transform = HadoopFormatIO - .read[JBoolean, T]() - // Hadoop input always emit key-value, and `Void` causes NPE in Beam coder - .withKeyTranslation(new SimpleFunction[Void, JBoolean]() { - override def apply(input: Void): JBoolean = true - }) - .withValueTranslation(new SimpleFunction[A, T]() { - // Workaround for incomplete Avro objects - // `SCollection#map` might throw NPE on incomplete Avro objects when the runner tries - // to serialized them. Lifting the mapping function here fixes the problem. - override def apply(input: A): T = g(input) - override def getInputTypeDescriptor = TypeDescriptor.of(aCls) - override def getOutputTypeDescriptor = TypeDescriptor.of(oCls) - }) - .withConfiguration(job.getConfiguration) - - sc.applyTransform(transform) - .applyTransform( - org.apache.beam.sdk.transforms.ParDo - .of(new LineageReportingDoFn[org.apache.beam.sdk.values.KV[JBoolean, T]](filePattern)) + HadoopParquet + .readHadoopFormatIO[A, T]( + sc, + job.getConfiguration, + Some(projectionFn), + None ) - .map(_.getValue) + .parDo(HadoopParquet.reportLineage(filePattern)) } } diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala index 5a65dfcf4b..c261d315b5 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala @@ -22,32 +22,31 @@ import com.spotify.parquet.tensorflow.{ TensorflowExampleParquetReader, TensorflowExampleReadSupport } - -import java.lang.{Boolean => JBoolean} import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} -import com.spotify.scio.parquet.ParquetConfiguration import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} -import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil, LineageReportingDoFn} +import com.spotify.scio.parquet.{ + BeamInputFile, + GcsConnectorUtil, + HadoopParquet, + ParquetConfiguration +} import com.spotify.scio.testing.TestDataManager -import com.spotify.scio.util.ScioUtil -import com.spotify.scio.util.FilenamePolicySupplier +import com.spotify.scio.util.{FilenamePolicySupplier, ScioUtil} import com.spotify.scio.values.SCollection -import org.apache.beam.sdk.io.hadoop.SerializableConfiguration -import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO import org.apache.beam.sdk.io._ import org.apache.beam.sdk.io.fs.ResourceId +import org.apache.beam.sdk.io.hadoop.SerializableConfiguration import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider -import org.apache.beam.sdk.transforms.{ParDo, SerializableFunctions, SimpleFunction} -import org.apache.beam.sdk.values.KV +import org.apache.beam.sdk.transforms.SerializableFunctions import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.parquet.filter2.predicate.FilterPredicate -import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetReader} import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.tensorflow.proto.{Example, Features} +import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetReader} import org.tensorflow.metadata.v0.Schema +import org.tensorflow.proto.{Example, Features} import scala.jdk.CollectionConverters._ @@ -116,21 +115,14 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] { ParquetInputFormat.setFilterPredicate(job.getConfiguration, predicate) } - val source = HadoopFormatIO - .read[JBoolean, Example]() - // Hadoop input always emit key-value, and `Void` causes NPE in Beam coder - .withKeyTranslation(new SimpleFunction[Void, JBoolean]() { - override def apply(input: Void): JBoolean = true - }) - .withConfiguration(job.getConfiguration) - - sc.applyTransform(source) - .applyTransform( - ParDo.of( - new LineageReportingDoFn[KV[JBoolean, Example]](filePattern) - ) + HadoopParquet + .readHadoopFormatIO[Example, Example]( + sc, + job.getConfiguration, + projectionFn = None, + skipValueClone = None ) - .map(_.getValue) + .parDo(HadoopParquet.reportLineage(filePattern)) } override protected def readTest(sc: ScioContext, params: ReadP): SCollection[Example] = { diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala index bf32271520..fb2707a205 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala @@ -17,7 +17,6 @@ package com.spotify.scio.parquet.types -import java.lang.{Boolean => JBoolean} import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} @@ -25,20 +24,17 @@ import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, Rea import com.spotify.scio.parquet.{ BeamInputFile, GcsConnectorUtil, - LineageReportingDoFn, + HadoopParquet, ParquetConfiguration } -import com.spotify.scio.util.ScioUtil -import com.spotify.scio.util.FilenamePolicySupplier +import com.spotify.scio.util.{FilenamePolicySupplier, ScioUtil} import com.spotify.scio.values.SCollection import magnolify.parquet.ParquetType import org.apache.beam.sdk.io.fs.ResourceId -import org.apache.beam.sdk.transforms.{ParDo, SerializableFunctions, SimpleFunction} import org.apache.beam.sdk.io.hadoop.SerializableConfiguration -import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO import org.apache.beam.sdk.io.{DynamicFileDestinations, FileSystems, WriteFiles} import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider -import org.apache.beam.sdk.values.{KV, TypeDescriptor} +import org.apache.beam.sdk.transforms.SerializableFunctions import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.parquet.filter2.predicate.FilterPredicate @@ -103,32 +99,19 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType]( ParquetInputFormat.setFilterPredicate(job.getConfiguration, params.predicate) } - val source = HadoopFormatIO - .read[JBoolean, T] - // Hadoop input always emit key-value, and `Void` causes NPE in Beam coder - .withKeyTranslation(new SimpleFunction[Void, JBoolean]() { - override def apply(input: Void): JBoolean = true - }) - .withValueTranslation( - new SimpleFunction[T, T]() { - override def apply(input: T): T = input - override def getInputTypeDescriptor: TypeDescriptor[T] = TypeDescriptor.of(cls) - }, - CoderMaterializer.beam(sc, Coder[T]) - ) - .withConfiguration(job.getConfiguration) - .withSkipValueClone( - job.getConfiguration.getBoolean( - ParquetReadConfiguration.SkipClone: @nowarn("cat=deprecation"), - true + HadoopParquet + .readHadoopFormatIO[T, T]( + sc, + job.getConfiguration, + projectionFn = None, + skipValueClone = Some( + job.getConfiguration.getBoolean( + ParquetReadConfiguration.SkipClone: @nowarn("cat=deprecation"), + true + ) ) ) - - sc.applyTransform(source) - .applyTransform( - ParDo.of(new LineageReportingDoFn[KV[JBoolean, T]](filePattern)) - ) - .map(_.getValue) + .parDo(HadoopParquet.reportLineage(filePattern)) } private def parquetOut( From 785ae4a0d0244761e149ba87daaac951de947d31 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Mon, 5 Jan 2026 17:17:14 -0500 Subject: [PATCH 4/9] simplify docs --- .../spotify/scio/parquet/HadoopParquet.scala | 24 +------------------ 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala index 0d8a9312ca..c484112c44 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala @@ -34,29 +34,7 @@ import scala.reflect.ClassTag private[parquet] object HadoopParquet { - /** - * Read data from Hadoop format using HadoopFormatIO. - * - * This method provides low-level access to Hadoop format reading with optional projection and - * value cloning configuration. - * - * @param sc - * ScioContext for the pipeline - * @param conf - * Hadoop Configuration for the input format - * @param projectionFn - * Optional projection function to transform records from type A to type T. This is applied - * during the read to work around issues with incomplete Avro objects that might cause NPE - * during serialization if mapped after the read. - * @param skipValueClone - * Optional flag to skip cloning values during read - * @tparam A - * The input record type read from Hadoop format - * @tparam T - * The output record type after optional projection - * @return - * SCollection of records of type T - */ + /** Read data from Hadoop format using HadoopFormatIO. */ def readHadoopFormatIO[A: ClassTag, T: ClassTag]( sc: ScioContext, conf: Configuration, From 4ea69232f600942b8bdea1d5a9e443db67ce0015 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 6 Jan 2026 17:07:33 -0500 Subject: [PATCH 5/9] Refactor a lil bit --- .../scala/com/spotify/scio/parquet/HadoopParquet.scala | 9 ++++++++- .../com/spotify/scio/parquet/avro/ParquetAvroIO.scala | 4 ++-- .../scio/parquet/tensorflow/ParquetExampleIO.scala | 3 ++- .../com/spotify/scio/parquet/types/ParquetTypeIO.scala | 3 ++- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala index c484112c44..9b16195b27 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala @@ -58,7 +58,9 @@ private[parquet] object HadoopParquet { withSkipClone.withValueTranslation( new SimpleFunction[T, T]() { override def apply(input: T): T = input + override def getInputTypeDescriptor: TypeDescriptor[T] = TypeDescriptor.of(outputType) + override def getOutputTypeDescriptor = TypeDescriptor.of(outputType) }, bcoder @@ -71,7 +73,9 @@ private[parquet] object HadoopParquet { // `SCollection#map` might throw NPE on incomplete Avro objects when the runner tries // to serialized them. Lifting the mapping function here fixes the problem. override def apply(input: A): T = g(input) + override def getInputTypeDescriptor = TypeDescriptor.of(inputType) + override def getOutputTypeDescriptor = TypeDescriptor.of(outputType) }, bcoder @@ -80,6 +84,9 @@ private[parquet] object HadoopParquet { sc.applyTransform(withValueTranslation.withConfiguration(conf)).map(_.getValue) } +} + +private[parquet] object LineageUtil { /** * DoFn that reports directory-level source lineage for legacy Parquet reads. @@ -87,7 +94,7 @@ private[parquet] object HadoopParquet { * @param filePattern * The file pattern or path to report lineage for */ - def reportLineage[T](filePattern: String): DoFn[T, T] = new LineageReportDoFn(filePattern) + def reportSourceDoFn[T](filePattern: String): DoFn[T, T] = new LineageReportDoFn(filePattern) class LineageReportDoFn[T](filePattern: String) extends DoFn[T, T] { diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala index d764517a93..68f341ad26 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala @@ -21,7 +21,7 @@ import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} -import com.spotify.scio.parquet.{GcsConnectorUtil, HadoopParquet, ParquetConfiguration} +import com.spotify.scio.parquet.{GcsConnectorUtil, HadoopParquet, LineageUtil, ParquetConfiguration} import com.spotify.scio.testing.TestDataManager import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil} import com.spotify.scio.values.SCollection @@ -248,7 +248,7 @@ object ParquetAvroIO { Some(projectionFn), None ) - .parDo(HadoopParquet.reportLineage(filePattern)) + .parDo(LineageUtil.reportSourceDoFn(filePattern)) } } diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala index c261d315b5..64322671ec 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala @@ -30,6 +30,7 @@ import com.spotify.scio.parquet.{ BeamInputFile, GcsConnectorUtil, HadoopParquet, + LineageUtil, ParquetConfiguration } import com.spotify.scio.testing.TestDataManager @@ -122,7 +123,7 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] { projectionFn = None, skipValueClone = None ) - .parDo(HadoopParquet.reportLineage(filePattern)) + .parDo(LineageUtil.reportSourceDoFn(filePattern)) } override protected def readTest(sc: ScioContext, params: ReadP): SCollection[Example] = { diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala index fb2707a205..36b0f33160 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala @@ -25,6 +25,7 @@ import com.spotify.scio.parquet.{ BeamInputFile, GcsConnectorUtil, HadoopParquet, + LineageUtil, ParquetConfiguration } import com.spotify.scio.util.{FilenamePolicySupplier, ScioUtil} @@ -111,7 +112,7 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType]( ) ) ) - .parDo(HadoopParquet.reportLineage(filePattern)) + .parDo(LineageUtil.reportSourceDoFn(filePattern)) } private def parquetOut( From 4a650a77a919ed4e5fb99165cf0372d36893ed44 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 6 Jan 2026 17:30:19 -0500 Subject: [PATCH 6/9] Move LineageDoFn outside of the object --- .../spotify/scio/parquet/HadoopParquet.scala | 61 +++++++++---------- .../scio/parquet/avro/ParquetAvroIO.scala | 9 ++- .../parquet/tensorflow/ParquetExampleIO.scala | 4 +- .../scio/parquet/types/ParquetTypeIO.scala | 4 +- 4 files changed, 39 insertions(+), 39 deletions(-) diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala index 9b16195b27..4aafb70cee 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala @@ -86,39 +86,34 @@ private[parquet] object HadoopParquet { } } -private[parquet] object LineageUtil { - - /** - * DoFn that reports directory-level source lineage for legacy Parquet reads. - * - * @param filePattern - * The file pattern or path to report lineage for - */ - def reportSourceDoFn[T](filePattern: String): DoFn[T, T] = new LineageReportDoFn(filePattern) - - class LineageReportDoFn[T](filePattern: String) extends DoFn[T, T] { - - @transient - private lazy val logger = LoggerFactory.getLogger(this.getClass) - - @Setup - def setup(): Unit = { - try { - val isDirectory = filePattern.endsWith("/") - val resourceId = FileSystems.matchNewResource(filePattern, isDirectory) - val directory = resourceId.getCurrentDirectory - FileSystems.reportSourceLineage(directory) - } catch { - case e: Exception => - logger.warn( - s"Error when reporting lineage for pattern: $filePattern", - e - ) - } +/** + * DoFn that reports directory-level source lineage for legacy Parquet reads. + * + * @param filePattern + * The file pattern or path to report lineage for + */ +private[parquet] class LineageReportDoFn[T](filePattern: String) extends DoFn[T, T] { + + @transient + private lazy val logger = LoggerFactory.getLogger(this.getClass) + + @Setup + def setup(): Unit = { + try { + val isDirectory = filePattern.endsWith("/") + val resourceId = FileSystems.matchNewResource(filePattern, isDirectory) + val directory = resourceId.getCurrentDirectory + FileSystems.reportSourceLineage(directory) + } catch { + case e: Exception => + logger.warn( + s"Error when reporting lineage for pattern: $filePattern", + e + ) } - - @ProcessElement - def processElement(@DoFn.Element element: T, out: DoFn.OutputReceiver[T]): Unit = - out.output(element) } + + @ProcessElement + def processElement(@DoFn.Element element: T, out: DoFn.OutputReceiver[T]): Unit = + out.output(element) } diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala index 68f341ad26..bf10161bf8 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala @@ -21,7 +21,12 @@ import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT} import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory} -import com.spotify.scio.parquet.{GcsConnectorUtil, HadoopParquet, LineageUtil, ParquetConfiguration} +import com.spotify.scio.parquet.{ + GcsConnectorUtil, + HadoopParquet, + LineageReportDoFn, + ParquetConfiguration +} import com.spotify.scio.testing.TestDataManager import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil} import com.spotify.scio.values.SCollection @@ -248,7 +253,7 @@ object ParquetAvroIO { Some(projectionFn), None ) - .parDo(LineageUtil.reportSourceDoFn(filePattern)) + .parDo(new LineageReportDoFn(filePattern)) } } diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala index 64322671ec..8e615d8bf8 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/tensorflow/ParquetExampleIO.scala @@ -30,7 +30,7 @@ import com.spotify.scio.parquet.{ BeamInputFile, GcsConnectorUtil, HadoopParquet, - LineageUtil, + LineageReportDoFn, ParquetConfiguration } import com.spotify.scio.testing.TestDataManager @@ -123,7 +123,7 @@ final case class ParquetExampleIO(path: String) extends ScioIO[Example] { projectionFn = None, skipValueClone = None ) - .parDo(LineageUtil.reportSourceDoFn(filePattern)) + .parDo(new LineageReportDoFn(filePattern)) } override protected def readTest(sc: ScioContext, params: ReadP): SCollection[Example] = { diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala index 36b0f33160..807c172428 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/types/ParquetTypeIO.scala @@ -25,7 +25,7 @@ import com.spotify.scio.parquet.{ BeamInputFile, GcsConnectorUtil, HadoopParquet, - LineageUtil, + LineageReportDoFn, ParquetConfiguration } import com.spotify.scio.util.{FilenamePolicySupplier, ScioUtil} @@ -112,7 +112,7 @@ final case class ParquetTypeIO[T: ClassTag: Coder: ParquetType]( ) ) ) - .parDo(LineageUtil.reportSourceDoFn(filePattern)) + .parDo(new LineageReportDoFn(filePattern)) } private def parquetOut( From fe790e7efee9308348646cecd880506da3f91a87 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Wed, 7 Jan 2026 17:34:43 -0500 Subject: [PATCH 7/9] move metrics report to processElement --- .../spotify/scio/parquet/HadoopParquet.scala | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala index 4aafb70cee..93b21b9e8d 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala @@ -24,12 +24,13 @@ import com.spotify.scio.values.SCollection import com.twitter.chill.ClosureCleaner import org.apache.beam.sdk.io.FileSystems import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO -import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, Setup} +import org.apache.beam.sdk.transforms.DoFn.ProcessElement import org.apache.beam.sdk.transforms.{DoFn, SimpleFunction} import org.apache.beam.sdk.values.TypeDescriptor import org.apache.hadoop.conf.Configuration import org.slf4j.LoggerFactory +import java.util.concurrent.atomic.AtomicBoolean import scala.reflect.ClassTag private[parquet] object HadoopParquet { @@ -86,6 +87,11 @@ private[parquet] object HadoopParquet { } } +private[parquet] object LineageReportDoFn { + // Atomic flag to ensure lineage is reported only once per JVM + private val lineageReported = new AtomicBoolean(false) +} + /** * DoFn that reports directory-level source lineage for legacy Parquet reads. * @@ -97,23 +103,22 @@ private[parquet] class LineageReportDoFn[T](filePattern: String) extends DoFn[T, @transient private lazy val logger = LoggerFactory.getLogger(this.getClass) - @Setup - def setup(): Unit = { - try { - val isDirectory = filePattern.endsWith("/") - val resourceId = FileSystems.matchNewResource(filePattern, isDirectory) - val directory = resourceId.getCurrentDirectory - FileSystems.reportSourceLineage(directory) - } catch { - case e: Exception => - logger.warn( - s"Error when reporting lineage for pattern: $filePattern", - e - ) - } - } - @ProcessElement - def processElement(@DoFn.Element element: T, out: DoFn.OutputReceiver[T]): Unit = + def processElement(@DoFn.Element element: T, out: DoFn.OutputReceiver[T]): Unit = { + if (LineageReportDoFn.lineageReported.compareAndSet(false, true)) { + try { + val isDirectory = filePattern.endsWith("/") + val resourceId = FileSystems.matchNewResource(filePattern, isDirectory) + val directory = resourceId.getCurrentDirectory + FileSystems.reportSourceLineage(directory) + } catch { + case e: Exception => + logger.warn( + s"Error when reporting lineage for pattern: $filePattern", + e + ) + } + } out.output(element) + } } From 4c920e39a46e8d2bb5aab866fe424714b36a8276 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Thu, 8 Jan 2026 09:21:41 -0500 Subject: [PATCH 8/9] Update scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala --- .../src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala index 93b21b9e8d..7450aa3114 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala @@ -44,7 +44,7 @@ private[parquet] object HadoopParquet { )(implicit coder: Coder[T]): SCollection[T] = { val inputType = ScioUtil.classOf[A] val outputType = ScioUtil.classOf[T] - val bcoder = CoderMaterializer.beam(sc, Coder[T]) + val bcoder = CoderMaterializer.beam(sc, coder) val hadoop = HadoopFormatIO .read[java.lang.Boolean, T]() From b0a3741b84a12995d61bae11ce0a327b08efb82e Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Thu, 8 Jan 2026 09:21:47 -0500 Subject: [PATCH 9/9] Update scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala --- .../src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala index 7450aa3114..0ca9e34b87 100644 --- a/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala +++ b/scio-parquet/src/main/scala/com/spotify/scio/parquet/HadoopParquet.scala @@ -60,7 +60,7 @@ private[parquet] object HadoopParquet { new SimpleFunction[T, T]() { override def apply(input: T): T = input - override def getInputTypeDescriptor: TypeDescriptor[T] = TypeDescriptor.of(outputType) + override def getInputTypeDescriptor = TypeDescriptor.of(outputType) override def getOutputTypeDescriptor = TypeDescriptor.of(outputType) },