Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -56,6 +57,13 @@
* <ol>
* <li>Compatibility checks ignore schema name, unless schema is held inside
* a union</li>
* <li>Memoization does not depend on schema instance identity. Avro keys its memo on
* identity, assuming each subschema occurrence is a distinct instance; that assumption
* does not hold for {@link HoodieSchema}, whose navigation returns fresh (or, in the
* future, canonical interned) wrappers. Instead, recursion is detected with an
* in-progress set keyed on value-equal schema pairs, and only compatible results are
* memoized so every occurrence of an incompatible subschema reports its own field
* path.</li>
* </ol>
*/
@NoArgsConstructor(access = AccessLevel.PACKAGE)
Expand Down Expand Up @@ -136,8 +144,9 @@ public static HoodieSchemaField lookupWriterField(final HoodieSchema writerSchem
/**
* Reader/writer schema pair that can be used as a key in a hash map.
* <p>
* This reader/writer pair differentiates Schema objects based on their system
* hash code.
* Schemas are compared by value equality, never by instance identity: {@link HoodieSchema}
* wrappers may be freshly allocated per navigation call or interned to canonical instances,
* so instance identity carries no meaning for the checker.
*/
@RequiredArgsConstructor
private static final class ReaderWriter {
Expand All @@ -149,7 +158,7 @@ private static final class ReaderWriter {
*/
@Override
public int hashCode() {
return System.identityHashCode(mReader) ^ System.identityHashCode(mWriter);
return Objects.hash(mReader, mWriter);
}

/**
Expand All @@ -161,8 +170,7 @@ public boolean equals(Object obj) {
return false;
}
final ReaderWriter that = (ReaderWriter) obj;
// Use pointer comparison here:
return (this.mReader == that.mReader) && (this.mWriter == that.mWriter);
return this.mReader.equals(that.mReader) && this.mWriter.equals(that.mWriter);
}

/**
Expand All @@ -182,7 +190,12 @@ public String toString() {
* </p>
*/
private static final class ReaderWriterCompatibilityChecker {
private final Map<ReaderWriter, SchemaCompatibilityResult> mMemoizeMap = new HashMap<>();
// Compatible results only: they carry no location state, so they are safe to reuse
// when the same value pair recurs at a different field path.
private final Map<ReaderWriter, SchemaCompatibilityResult> memoizedCompatibleResults = new HashMap<>();

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Values in this map are always value-equal to compatible() (mergedWith only yields COMPATIBLE from two COMPATIBLE inputs whose lists are empty), so this can be a Set<ReaderWriter> with return SchemaCompatibilityResult.compatible() on hit -- less garbage, and the compatible-only invariant becomes structural instead of comment-enforced.

// Value pairs currently being computed; re-entering one means a named type was reached
// through itself, i.e. genuine recursion.
private final Set<ReaderWriter> inProgressPairs = new HashSet<>();
private final boolean checkNaming;

public ReaderWriterCompatibilityChecker(boolean checkNaming) {
Expand Down Expand Up @@ -210,7 +223,7 @@ public SchemaCompatibilityResult getCompatibility(final HoodieSchema reader, fin
/**
* Reports the compatibility of a reader/writer schema pair.
* <p>
* Memorizes the compatibility results.
* Memoizes compatible results and breaks recursion on value-equal pairs.
* </p>
*
* @param reader Reader schema to test.
Expand All @@ -224,18 +237,21 @@ private SchemaCompatibilityResult getCompatibility(final HoodieSchema reader,
final Deque<LocationInfo> locations) {
log.debug("Checking compatibility of reader {} with writer {}", reader, writer);
final ReaderWriter pair = new ReaderWriter(reader, writer);
SchemaCompatibilityResult result = mMemoizeMap.get(pair);
if (result != null) {
if (result.getCompatibilityType() == SchemaCompatibilityType.RECURSION_IN_PROGRESS) {
// Break the recursion here.
// schemas are compatible unless proven incompatible:
result = SchemaCompatibilityResult.compatible();
}
} else {
// Mark this reader/writer pair as "in progress":
mMemoizeMap.put(pair, SchemaCompatibilityResult.recursionInProgress());
result = calculateCompatibility(reader, writer, locations);
mMemoizeMap.put(pair, result);
final SchemaCompatibilityResult memoized = memoizedCompatibleResults.get(pair);
if (memoized != null) {
return memoized;
}
if (!inProgressPairs.add(pair)) {
// Break the recursion here.
// schemas are compatible unless proven incompatible:
return SchemaCompatibilityResult.compatible();
}
SchemaCompatibilityResult result = calculateCompatibility(reader, writer, locations);
inProgressPairs.remove(pair);
if (result.getCompatibilityType() == SchemaCompatibilityType.COMPATIBLE) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Since incompatible results are intentionally not memoized (to report each occurrence path), a subschema that is shared across many field paths gets fully recomputed at every path. For a schema where an incompatible named type is reachable through nested diamond-shaped sharing, the number of recomputations (and reported incompatibility entries) can grow multiplicatively with nesting depth. Non-recursive shared types don't hit inProgressPairs, so nothing bounds this. Is that a realistic concern for the schemas this runs on, or is depth small enough in practice that it's fine? A Set of already-reported (pair, locationPath) keys, or capping duplicate paths, could bound it if needed.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The recomputation is output-bound: the contract is to report every occurrence path, and in a diamond the path count is itself 2^d, so anything that doesn't under-report does at least that much work. Compatible children of a shared subtree still memo-hit, so a revisit only walks the incompatible spine. Upstream Avro bounds this corner only by under-reporting (first path wins) -- the bug the four-path test pins. Pre-#18967 code had the same profile as this PR: fresh wrappers meant the identity memo never hit either.

Measured the corner (named type shared at 2 fields per level, incompatible leaf):

depth=10  paths=1024    reported=1024    9ms
depth=15  paths=32768   reported=32768   67ms
depth=18  paths=262144  reported=262144  262ms

Linear in output, ~1us per reported path, and this runs once per schema validation, not per record. Real schemas share a type at 2-3 fields, a level or two deep.

A (pair, locationPath) set wouldn't bound it: every (pair, path) in a diamond is distinct, so it never dedups. If this corner ever got real, the fix is memoizing incompatible results with subtree-relative locations re-anchored on reuse (noted on #19143) -- bounds the walk, still emits every path.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incompatible pairs recompute per occurrence, so a shared named type referenced k times per level over d levels costs k^d subtree re-checks and accumulates k^d Incompatibility objects (measured: k=2, d=22 -> 3.7s / 4.2M objects, OOM near d=25; k=4 hits the wall around d=12). To be fair this exposure predates the branch -- the old identity memo never hit across occurrences either -- and the memo now makes the compatible case linear. But Spark's deduceWriterSchema runs this unconditionally per write, so worth fixing while here: memoize a location-free INCOMPATIBLE verdict for pruning and re-derive paths on reuse, or cap accumulation.

// Incompatible results embed the field path they were found at, so they are
// recomputed per occurrence; memoizing them would report only the first path.
memoizedCompatibleResults.put(pair, result);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memoizing COMPATIBLE results keyed only on the value pair (reader, writer) can mask a union-scope name mismatch and flip the verdict to COMPATIBLE. checkSchemaNames only compares fully-qualified names when the pair is top-level or directly inside a UNION, so a named record/enum/fixed's verdict is path-dependent. If such a type is first reached at a plain (non-union) field path the name check is skipped and the pair is memoized COMPATIBLE; a later occurrence of the same pair inside a union then hits this memo and returns COMPATIBLE without ever running the union-scope name check, dropping a genuine NAME_MISMATCH. Reachable from the checkNaming=true entry points (isSchemaCompatible / checkSchemaCompatible). The old identity-keyed memo over fresh navigation wrappers never hit across occurrences, so each path recomputed and this could not happen. Consider folding the name-checking context into the memo key.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, confirmed -- reproduced it in a regression test (plain-field occurrence memoized COMPATIBLE first, union occurrence of the same pair then skipped the name check). Fixed as you suggested: the memo key now carries whether names are validated at that location, computed by the same predicate checkSchemaNames uses, so a verdict is only reusable within the same naming context. Union-first order was already safe since only COMPATIBLE results are memoized.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verified issue: a COMPATIBLE result computed while an ancestor pair is still in inProgressPairs rests on the optimistic recursion-break compatible(), but is memoized here unconditionally; a later occurrence of the pair outside the cycle reuses the tainted verdict. Repro (executed): reader Top{g: union[G{p:X}, G2{}], s: Z} with X{bad:int, z:Z}, Z{x: array<X>} vs writer with X.bad=string -> full check returns COMPATIBLE while the s subtree alone is INCOMPATIBLE at /s/x/bad (the in-progress (X,X') INCOMPATIBLE is discarded by the reader-union branch probe, the one place failures are dropped). Without the union it degrades to silently dropping /s/x/bad from the report. Fix direction: skip memoizing results whose computation overlapped a recursion break on a still-in-progress pair.

}
return result;
}
Expand Down Expand Up @@ -590,12 +606,7 @@ private static List<String> asList(Deque<LocationInfo> deque) {
* Identifies the type of schema compatibility result.
*/
public enum SchemaCompatibilityType {
COMPATIBLE, INCOMPATIBLE,

/**
* Used internally to tag a reader/writer schema pair and prevent recursion.
*/
RECURSION_IN_PROGRESS
COMPATIBLE, INCOMPATIBLE
}

public enum SchemaIncompatibilityType {
Expand Down Expand Up @@ -634,11 +645,9 @@ public SchemaCompatibilityResult mergedWith(SchemaCompatibilityResult toMerge) {
SchemaCompatibilityType compatibilityType;
// the below fields are only valid if INCOMPATIBLE
List<Incompatibility> incompatibilities;
// cached objects for stateless details
// cached object for stateless details
private static final SchemaCompatibilityResult COMPATIBLE = new SchemaCompatibilityResult(
SchemaCompatibilityType.COMPATIBLE, Collections.emptyList());
private static final SchemaCompatibilityResult RECURSION_IN_PROGRESS = new SchemaCompatibilityResult(
SchemaCompatibilityType.RECURSION_IN_PROGRESS, Collections.emptyList());

/**
* Returns a details object representing a compatible schema pair.
Expand All @@ -650,17 +659,6 @@ public static SchemaCompatibilityResult compatible() {
return COMPATIBLE;
}

/**
* Returns a details object representing a state indicating that recursion is in
* progress.
*
* @return a SchemaCompatibilityDetails object with RECURSION_IN_PROGRESS
* SchemaCompatibilityType, and no other state.
*/
public static SchemaCompatibilityResult recursionInProgress() {
return RECURSION_IN_PROGRESS;
}

/**
* Returns a details object representing an incompatible schema pair, including
* error details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.schema.TestHoodieSchemaUtils.EVOLVED_SCHEMA;
Expand Down Expand Up @@ -950,4 +952,58 @@ public void testVariantNestedInRecordCompatible() {
assertTrue(HoodieSchemaCompatibility.isSchemaCompatible(writerRecord, readerRecord, true, false));
assertTrue(HoodieSchemaCompatibility.isSchemaCompatible(readerRecord, writerRecord, true, false));
}

// Checker memoization must not depend on schema instance identity: recursion is detected on
// value-equal pairs, and incompatibilities are reported once per occurrence path even when
// subschema instances are shared.

/**
 * Avro JSON for a self-referential record named {@code Node}: an int field {@code value}
 * plus a field {@code next} typed as a union of null and {@code Node} itself, defaulting
 * to null.
 */
private static final String RECURSIVE_NODE_SCHEMA = "{\"type\": \"record\", \"name\": \"Node\", \"fields\": ["
+ "{\"name\": \"value\", \"type\": \"int\"},"
+ "{\"name\": \"next\", \"type\": [\"null\", \"Node\"], \"default\": null}]}";

/**
 * A self-referential record schema must be reported as compatible with a separately parsed
 * copy of itself.
 */
@Test
public void testRecursiveSchemaCompatibility() {
  // The JSON is parsed twice so that reader and writer come from independent parses; the
  // recursion therefore has to be recognized by value rather than by instance identity.
  final HoodieSchema readerSchema =
      HoodieSchema.fromAvroSchema(new Schema.Parser().parse(RECURSIVE_NODE_SCHEMA));
  final HoodieSchema writerSchema =
      HoodieSchema.fromAvroSchema(new Schema.Parser().parse(RECURSIVE_NODE_SCHEMA));

  final HoodieSchemaCompatibilityChecker.SchemaPairCompatibility outcome =
      HoodieSchemaCompatibilityChecker.checkReaderWriterCompatibility(readerSchema, writerSchema, true);

  assertEquals(
      HoodieSchemaCompatibilityChecker.SchemaCompatibilityType.COMPATIBLE,
      outcome.getType());
}

/**
 * When the writer's copy of the recursive schema declares {@code value} as a string instead
 * of an int, the pair must be reported incompatible with exactly one location,
 * {@code /value}.
 */
@Test
public void testRecursiveSchemaIncompatibilityReported() {
  // Writer JSON is the reader JSON with the int field type swapped for string.
  final String writerJson =
      RECURSIVE_NODE_SCHEMA.replace("\"type\": \"int\"", "\"type\": \"string\"");

  final HoodieSchema readerSchema =
      HoodieSchema.fromAvroSchema(new Schema.Parser().parse(RECURSIVE_NODE_SCHEMA));
  final HoodieSchema writerSchema =
      HoodieSchema.fromAvroSchema(new Schema.Parser().parse(writerJson));

  final HoodieSchemaCompatibilityChecker.SchemaPairCompatibility outcome =
      HoodieSchemaCompatibilityChecker.checkReaderWriterCompatibility(readerSchema, writerSchema, true);

  assertEquals(
      HoodieSchemaCompatibilityChecker.SchemaCompatibilityType.INCOMPATIBLE,
      outcome.getType());
  assertEquals(Collections.singletonList("/value"), incompatibilityLocations(outcome));
}

/**
 * A named type referenced from two fields must yield one incompatibility per field path,
 * not just one for the first path visited.
 */
@Test
public void testSharedSubschemaIncompatibilityReportedAtEveryPath() {
  // Inner is defined inline at f1 and referenced by name at f2, so the parsed Avro schema
  // uses a single Schema instance for both fields. Every path still has to report its own
  // incompatibility, even if subschema instances (or future interned wrappers) collide.
  final String readerDefinition = "{\"type\": \"record\", \"name\": \"rec\", \"fields\": ["
      + "{\"name\": \"f1\", \"type\": {\"type\": \"record\", \"name\": \"Inner\", \"fields\": ["
      + "{\"name\": \"leaf\", \"type\": \"int\"}]}},"
      + "{\"name\": \"f2\", \"type\": \"Inner\"}]}";
  // Same shape on the writer side, except that the leaf is a string.
  final String writerDefinition =
      readerDefinition.replace("\"type\": \"int\"", "\"type\": \"string\"");

  final HoodieSchema readerSchema =
      HoodieSchema.fromAvroSchema(new Schema.Parser().parse(readerDefinition));
  final HoodieSchema writerSchema =
      HoodieSchema.fromAvroSchema(new Schema.Parser().parse(writerDefinition));

  final HoodieSchemaCompatibilityChecker.SchemaPairCompatibility outcome =
      HoodieSchemaCompatibilityChecker.checkReaderWriterCompatibility(readerSchema, writerSchema, true);

  assertEquals(
      HoodieSchemaCompatibilityChecker.SchemaCompatibilityType.INCOMPATIBLE,
      outcome.getType());
  assertEquals(Arrays.asList("/f1/leaf", "/f2/leaf"), incompatibilityLocations(outcome));
}

/**
 * Collects the location of every incompatibility in the given result, in the order the
 * result lists them.
 *
 * @param result compatibility outcome to inspect
 * @return the locations of the reported incompatibilities
 */
private static List<String> incompatibilityLocations(HoodieSchemaCompatibilityChecker.SchemaPairCompatibility result) {
  return result.getResult()
      .getIncompatibilities()
      .stream()
      .map(incompatibility -> incompatibility.getLocation())
      .collect(Collectors.toList());
}
}
Loading