Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieAvroSchemaCache;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.table.HoodieTableConfig;
Expand Down Expand Up @@ -126,7 +125,7 @@ public RowData getDeleteRow(String recordKey) {
@Override
public RowData convertAvroRecord(IndexedRecord avroRecord) {
Schema recordSchema = avroRecord.getSchema();
AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataQueryContexts.fromSchema(HoodieAvroSchemaCache.intern(recordSchema), utcTimezone).getAvroToRowDataConverter();
AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataQueryContexts.fromSchema(HoodieSchema.fromAvroSchema(recordSchema), utcTimezone).getAvroToRowDataConverter();
RowData rowData = (RowData) converter.convert(avroRecord);
Schema.Field operationField = recordSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD);
if (operationField != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.hudi

import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.common.engine.RecordContext
import org.apache.hudi.common.schema.{HoodieAvroSchemaCache, HoodieSchema}
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
Expand All @@ -47,7 +47,7 @@ trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContex
* @return An [[InternalRow]].
*/
override def convertAvroRecord(avroRecord: IndexedRecord): InternalRow = {
val schema = HoodieAvroSchemaCache.intern(avroRecord.getSchema)
val schema = HoodieSchema.fromAvroSchema(avroRecord.getSchema)
val structType = HoodieInternalRowUtils.getCachedSchema(schema)
val deserializer = deserializerMap.getOrElseUpdate(schema, {
sparkAdapter.createAvroDeserializer(schema, structType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieAvroSchemaCache;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.table.HoodieTableConfig;
Expand Down Expand Up @@ -71,10 +70,10 @@ public AvroRecordContext() {
public static Object getFieldValueFromIndexedRecord(
IndexedRecord record,
String fieldName) {
// Interning returns the canonical wrapper for this schema, whose lazily built field list and
// field map survive across calls, so the per-record cost is a cache hit instead of an
// O(schema width) wrapper rebuild.
HoodieSchema currentSchema = HoodieAvroSchemaCache.intern(record.getSchema());
// fromAvroSchema returns the canonical wrapper for this schema, whose lazily built field
// list and field map survive across calls, so the per-record cost is a cache hit instead
// of an O(schema width) wrapper rebuild.
HoodieSchema currentSchema = HoodieSchema.fromAvroSchema(record.getSchema());

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pre-existing (since #13646), but since this method is being touched: in the loop below, a dotted path with a null intermediate struct NPEs -- getValue(record, "a.b") with a == null assigns currentRecord = null and the next iteration derefs it. HoodieAvroUtils#getNestedFieldVal guards this case and returns null. Worth adding the null check here plus a TestAvroRecordContext case with address = null (nested ordering/precombine fields reach this path).

IndexedRecord currentRecord = record;
String[] path = fieldName.split("\\.");
for (int i = 0; i < path.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.hudi.avro;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieAvroSchemaCache;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.DateTimeUtils;
Expand Down Expand Up @@ -835,7 +834,7 @@ public static Object[] getRecordColumnValues(HoodieRecord record,
Schema schema,
boolean consistentLogicalTimestampEnabled) {
try {
GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(HoodieAvroSchemaCache.intern(schema), new Properties()).get()).getData();
GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(HoodieSchema.fromAvroSchema(schema), new Properties()).get()).getData();
List<Object> list = new ArrayList<>();
for (String col : columns) {
list.add(HoodieAvroUtils.getNestedFieldVal(genericRecord, col, true, consistentLogicalTimestampEnabled));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.hudi.avro.MercifulJsonConverter;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.schema.HoodieAvroSchemaCache;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.util.FileIOUtils;
Expand Down Expand Up @@ -65,7 +65,7 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRec, Sche
@Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
return Option.of(jsonConverter.convert(getJsonData(), HoodieAvroSchemaCache.intern(schema)));
return Option.of(jsonConverter.convert(getJsonData(), HoodieSchema.fromAvroSchema(schema)));
}

private String getJsonData() throws IOException {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.HoodieSchemaException;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.Getter;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalType;
Expand Down Expand Up @@ -94,6 +96,12 @@
public class HoodieSchema implements Serializable {
private static final long serialVersionUID = 1L;

// Avro-identity fast path onto the value-keyed HoodieSchemaCache, backing fromAvroSchema:
// records of one file share the same live Schema instance, so the per-record hot path is a
// single weak-identity hit with no wrapper allocation or type dispatch.
private static final Cache<Schema, HoodieSchema> AVRO_SCHEMA_CACHE =

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This 1024-cap cache is now populated by every navigation call, not just the old top-level intern sites: one parsed 500-col nullable schema is ~1500 identity-distinct Avro nodes (Avro primitives are not shared singletons), ~500 of which enter on a typical read, so two wide tables exceed the cap. Single-table jobs are fine (TinyLFU keeps the per-record-hot keys), but multi-table long-lived JVMs sustain insert pressure between hot-key accesses. Consider weakKeys with no maximumSize (the usual weak-interner pattern) or a much larger cap.

Caffeine.newBuilder().weakKeys().maximumSize(1024).build();

/**
* Constant representing a null JSON value, equivalent to JsonProperties.NULL_VALUE.
* This provides compatibility with Avro's JsonProperties while maintaining Hudi's API.
Expand Down Expand Up @@ -338,13 +346,42 @@ private HoodieSchema(Schema avroSchema, List<HoodieSchemaField> fields) {
/**
* Factory method to create HoodieSchema from an Avro schema.
*
* <p>Returns the canonical instance for the given schema, converting and interning it on
* first use: distinct Avro schema instances with identical serialized content converge on
* one shared wrapper through {@link HoodieSchemaCache}, with a weak identity fast path for
* the per-record hot path where all records of a file share the same live {@link Schema}
* instance.
*
* <p>Interning is never lossy: the canonicalization key covers the schema's full content,

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this rationale is spelled out in full here, on equals(), and in HoodieSchemaCache's class doc -- keep the full version once in HoodieSchemaCache and a one-liner plus link in the two public javadocs. Also 'never lossy' needs the caveat that non-JSON-serializable schemas are returned uncached on Avro 1.11, so not every call yields the canonical instance.

* including doc strings and aliases, which Avro equality ignores. Schemas that differ only
* in docs or aliases stay distinct wrappers (even though they are {@code equals()}), so
* metadata consumed downstream (e.g. catalog sync column comments, alias-based field
* matching) is always preserved.
*
* <p>Because the canonical wrapper may have been created from a content-identical but
* different Avro schema instance, {@code fromAvroSchema(s).getAvroSchema()} does not
* necessarily return {@code s} itself. Canonical instances are shared: neither the wrapper
* nor its underlying Avro schema may be mutated.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This contract is enforced only by javadoc: addProp/setFields (and HoodieSchemaField.addProp) remain public and unguarded, and parse()'s owned-instance protection is depth-0 -- navigated children are interned, so parse(json).getField(f).schema().addProp(...) mutates a global canonical, and a first-seen child's canonical aliases the owned Avro subtree. Verified mechanically that a mutated canonical keeps being served under its stale pre-mutation content key. Suggest a transient frozen flag set at intern time (pool loader, the AVRO_SCHEMA_CACHE.put winner, and the uncached fallback), with mutators throwing when frozen; parse() and builders stay unfrozen.

*
* @param avroSchema the Avro schema to wrap
* @return new HoodieSchema instance
* @return the canonical HoodieSchema instance, or null if avroSchema is null
*/
public static HoodieSchema fromAvroSchema(Schema avroSchema) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Now that fromAvroSchema returns globally-shared canonical instances for every caller (previously only the ~9 hot-path sites went through interning), the public void setFields(...) at L1187 — which mutates avroSchema.setFields(...) in place — becomes a real footgun: any caller that mutates a fromAvroSchema-derived wrapper would silently corrupt the shared pool and also change a live cache key's hashCode. I couldn't find a current path that does this, so it's latent rather than a live bug, but would it be worth guarding (e.g. making the canonical wrapper immutable, or having setFields defensively copy) rather than relying on the javadoc invariant alone?

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

if (avroSchema == null) {
return null;
}
HoodieSchema canonical = AVRO_SCHEMA_CACHE.getIfPresent(avroSchema);
if (canonical == null) {
// getIfPresent/put rather than a computing get: construction may re-enter fromAvroSchema
// for subschemas (e.g. Variant validation), which a Caffeine loader must not do. A racy
// duplicate build converges on one instance through the value-keyed intern.
canonical = HoodieSchemaCache.intern(buildFromAvroSchema(avroSchema));
AVRO_SCHEMA_CACHE.put(avroSchema, canonical);
}
return canonical;
}

private static HoodieSchema buildFromAvroSchema(Schema avroSchema) {
LogicalType logicalType = avroSchema.getLogicalType();
if (logicalType != null) {
if (logicalType instanceof LogicalTypes.Decimal) {
Expand Down Expand Up @@ -1586,6 +1623,14 @@ public String toString(boolean pretty) {
return avroSchema.toString(pretty);
}

/**
* Equality delegates to Avro {@link Schema#equals}, which IGNORES doc strings and aliases:
* two schemas differing only in that metadata are equal. Consumers that care about docs or
* aliases (e.g. catalog sync comments, alias-based field matching) must not rely on
* equality or value-keyed maps to tell such schemas apart. Canonicalization in
* {@link HoodieSchemaCache} deliberately keys on the full serialized content instead, so
* interning never conflates them.
*/
@Override
public boolean equals(Object obj) {
if (this == obj) {
Expand Down Expand Up @@ -1644,7 +1689,10 @@ public HoodieSchema parse(String jsonSchema) {

try {
Schema avroSchema = avroParser.parse(jsonSchema);
return fromAvroSchema(avroSchema);
// Return a fresh, owned instance rather than the interned canonical one: parsed schemas
// are frequently mutated by callers (e.g. addProp in client-init callbacks), and mutating
// a shared interned instance would corrupt it for every other holder of the same schema.
return buildFromAvroSchema(avroSchema);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parsed schemas
// are frequently mutated by callers (e.g. addProp in client-init callbacks), and mutating
// a shared interned insta

Shouldn't the schema always be immutable once it is built?

} catch (IllegalArgumentException e) {
throw new HoodieAvroSchemaException("Invalid schema string format", e);
} catch (Exception e) {
Expand All @@ -1664,7 +1712,8 @@ public HoodieSchema parse(InputStream inputStream) {

try {
Schema avroSchema = avroParser.parse(inputStream);
return fromAvroSchema(avroSchema);
// See parse(String): return an owned, mutable instance, not the interned canonical one.
return buildFromAvroSchema(avroSchema);
} catch (IOException e) {
throw new HoodieIOException("Failed to parse schema from InputStream", e);
} catch (IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.avro.AvroRuntimeException;

/**
* A global cache for HoodieSchema instances to ensure that there is only one
Expand All @@ -28,21 +29,77 @@
* <p>This is a global cache which works for a JVM lifecycle.
* A collection of schema instances are maintained.
*
* <p>This value-keyed pool is the canonicalization mechanism behind
* {@link HoodieSchema#fromAvroSchema}, and can also be used directly to intern schemas
* produced without an Avro source (builders, converters).
*
* <p>Interning is never lossy: entries are keyed on the schema's full serialized content,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interning should be used on hot code paths, where the schema is mostly used for reads and writes, so losing comments or docs there should be fine; let's not introduce unnecessary complexity.

* NOT on {@link HoodieSchema#equals}. Avro equality (which HoodieSchema equality delegates
* to) ignores doc strings and aliases, so keying on it would collapse schemas that differ
* only in that metadata and silently drop it -- docs drive catalog sync column comments and
* aliases drive schema-evolution field matching. Schemas that differ in docs or aliases
* intern to distinct canonical instances even though they are {@code equals()}.
*
* <p>NOTE: The schema which is used frequently should be cached through this cache.
*/
public class HoodieSchemaCache {

// Ensure that there is only one variable instance of the same schema within an entire JVM lifetime
private static final LoadingCache<HoodieSchema, HoodieSchema> SCHEMA_CACHE =
Caffeine.newBuilder().weakValues().maximumSize(1024).build(k -> k);
private static final LoadingCache<SchemaContentKey, HoodieSchema> SCHEMA_CACHE =
Caffeine.newBuilder().weakValues().maximumSize(1024).build(key -> key.schema);

/**
* Get schema variable from global cache. If not found, put it into the cache and then return it.
*
* <p>Two schemas converge on one canonical instance only when their full serialized form
* (including doc strings and aliases) is identical; see the class javadoc.
*
* <p>A schema that is valid in memory but cannot be serialized to JSON -- e.g. two distinct
* nested records that share a name, as some projection/reader paths produce -- has no content
* key, so it is returned uncached instead of interned. Canonicalization is only a
* de-duplication optimization, so skipping it stays correct.
*
* @param schema schema to get
* @return if found, return the existing schema variable, otherwise return the param itself.
*/
public static HoodieSchema intern(HoodieSchema schema) {
return SCHEMA_CACHE.get(schema);
SchemaContentKey key;
try {
key = new SchemaContentKey(schema);

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now serializes the full schema JSON on every intern() call including hits (~100us for a 500-col schema vs ~2ns for the old cached-hashCode lookup). The direct callers run per log block (HoodieDataBlock read ctor, KeyBased/PositionBasedFileGroupRecordBuffer), and FileGroupRecordBuffer re-interns the schema handler's already-interned requiredSchema -- ~0.2-0.3ms per block on wide tables. Cache contentJson lazily in a transient field on HoodieSchema, or short-circuit already-canonical instances.

} catch (AvroRuntimeException e) {
// Not serializable -> no content key derivable; skip interning rather than fail the caller.
return schema;
}
return SCHEMA_CACHE.get(key);
}

/**
* Content-complete cache key: the serialized JSON form covers doc strings and aliases that
* Avro equality ignores. The wrapper class is part of the key so a logical-type subclass
* (e.g. {@link HoodieSchema.Decimal}) never collapses onto a plain wrapper of equal content,
* which would break downcasts.
*/
private static final class SchemaContentKey {
private final HoodieSchema schema;

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key strongly references the value, so weakValues() never fires (same shape as the old cache), and each entry now also retains the full JSON string (~80KB for a 1000-col schema) until size eviction -- worst case tens of MB of dead strings in long-lived multi-table JVMs. A fixed-size digest of toString() plus the wrapper class keeps the doc/alias sensitivity without retaining the JSON.

private final String contentJson;

SchemaContentKey(HoodieSchema schema) {
this.schema = schema;
this.contentJson = schema.getAvroSchema().toString();

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verified issue (Spark 4 profiles): on Avro 1.12.x toString() does not throw for a second, structurally different nested record sharing a name -- it emits a bare name ref -- so two different schemas produce byte-identical contentJson and intern to one canonical with the wrong structure. Reproduced end-to-end against this branch's classes with avro-1.12.0: two different projections of a schema that reuses a named record (projectSchema builds same-named pruned copies; they reach intern via HoodieSchemaUtils#projectSchema -> fromAvroSchema and the per-record path) -> the second query gets the first query's wrapper and reads the wrong nested fields. The AvroRuntimeException catch only covers the 1.11 throwing behavior. The key needs a structural walk that tracks named types by instance -- or refuse to intern on redefinition -- rather than trusting toString().

}

/**
 * Hashes on the serialized JSON content alone. The wrapper class is compared only in
 * {@link #equals(Object)}, so keys with identical JSON but different wrapper classes share
 * a hash code while remaining unequal.
 */
@Override
public int hashCode() {
  return this.contentJson.hashCode();
}

/**
 * Two keys are equal only when they wrap the same concrete {@link HoodieSchema} class and
 * carry identical serialized JSON content.
 */
@Override
public boolean equals(Object obj) {
  if (obj instanceof SchemaContentKey) {
    SchemaContentKey other = (SchemaContentKey) obj;
    // Compare the wrapper class first: equal JSON under different wrapper types must not match.
    if (schema.getClass() != other.schema.getClass()) {
      return false;
    }
    return contentJson.equals(other.contentJson);
  }
  return false;
}
}
}
Loading
Loading