diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bloom/InternalBloomFilter.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/InternalBloomFilter.java index 7ef766a2a3c5a..af45cb0245c62 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/bloom/InternalBloomFilter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/InternalBloomFilter.java @@ -82,21 +82,15 @@ * @see Space/Time Trade-Offs in Hash Coding with Allowable Errors */ public class InternalBloomFilter extends InternalFilter { - private static final byte[] BIT_VALUES = new byte[] { - (byte) 0x01, - (byte) 0x02, - (byte) 0x04, - (byte) 0x08, - (byte) 0x10, - (byte) 0x20, - (byte) 0x40, - (byte) 0x80 - }; - /** - * The bit vector. + * The bit vector, as little-endian 64-bit words: bit {@code i} lives at + * {@code words[i >> 6]} under mask {@code 1L << (i & 63)}. The serialized layout + * (bit {@code i} at byte {@code i >> 3} under mask {@code 1 << (i & 7)}) is the + * little-endian byte view of this array, so {@link #write} and {@link #readFields} + * translate between the two by byte position alone. Bits at positions greater than + * or equal to {@code vectorSize} are always zero. */ - BitSet bits; + long[] words; /** * Default constructor - use with readFields @@ -116,7 +110,7 @@ public InternalBloomFilter() { public InternalBloomFilter(int vectorSize, int nbHash, int hashType) { super(vectorSize, nbHash, hashType); - bits = new BitSet(this.vectorSize); + words = new long[wordCount(this.vectorSize)]; } /** @@ -134,7 +128,7 @@ public void add(Key key) { hash.clear(); for (int i = 0; i < nbHash; i++) { - bits.set(h[i]); + words[h[i] >>> 6] |= 1L << (h[i] & 63); } } @@ -147,7 +141,10 @@ public void and(InternalFilter filter) { throw new IllegalArgumentException("filters cannot be and-ed"); } - this.bits.and(((InternalBloomFilter) filter).bits); + long[] other = ((InternalBloomFilter) filter).words; + for (int i = 0; i < words.length; i++) { + words[i] &= other[i]; + } } @Override @@ -159,7 +156,7 @@ public boolean membershipTest(Key key) { int[] h = hash.hash(key); hash.clear(); for (int i = 0; i < nbHash; i++) { - if (!bits.get(h[i])) { + if ((words[h[i] >>> 6] & (1L << (h[i] & 63))) == 0) { return false; } } @@ -168,7 +165,10 @@ public boolean membershipTest(Key key) { @Override public void not() { - bits.flip(0, vectorSize); + for (int i = 0; i < words.length; i++) { + words[i] = ~words[i]; + } + clearUnusedBits(); } @Override @@ -179,7 +179,10 @@ public void or(InternalFilter filter) { || filter.nbHash != this.nbHash) { throw new IllegalArgumentException("filters cannot be or-ed"); } - bits.or(((InternalBloomFilter) filter).bits); + long[] other = ((InternalBloomFilter) filter).words; + for (int i = 0; i < words.length; i++) { + words[i] |= other[i]; + } } @Override @@ -190,12 +193,15 @@ public void xor(InternalFilter filter) { || filter.nbHash != this.nbHash) { throw new IllegalArgumentException("filters cannot be xor-ed"); } - bits.xor(((InternalBloomFilter) filter).bits); + long[] other = ((InternalBloomFilter) filter).words; + for (int i = 0; i < words.length; i++) { + words[i] ^= other[i]; + } } @Override public String toString() { - return bits.toString(); + return BitSet.valueOf(words).toString(); } /** @@ -209,17 +215,8 @@ public int getVectorSize() { public void write(DataOutput out) throws IOException { super.write(out); byte[] bytes = new byte[getNBytes()]; - for (int i = 0, byteIndex = 0, bitIndex = 0; i < vectorSize; i++, bitIndex++) { - if (bitIndex == 8) { - bitIndex = 0; - byteIndex++; - } - if (bitIndex == 0) { - bytes[byteIndex] = 0; - } - if (bits.get(i)) { - bytes[byteIndex] |= BIT_VALUES[bitIndex]; - } + for (int byteIndex = 0; byteIndex < bytes.length; byteIndex++) { + bytes[byteIndex] = (byte) (words[byteIndex >>> 3] >>> ((byteIndex & 7) << 3)); } out.write(bytes); } @@ -227,22 +224,32 @@ public void write(DataOutput out) throws IOException { @Override public void readFields(DataInput in) throws IOException { super.readFields(in); - bits = new BitSet(this.vectorSize); + words = new long[wordCount(vectorSize)]; byte[] bytes = new byte[getNBytes()]; in.readFully(bytes); - for (int i = 0, byteIndex = 0, bitIndex = 0; i < vectorSize; i++, bitIndex++) { - if (bitIndex == 8) { - bitIndex = 0; - byteIndex++; - } - if ((bytes[byteIndex] & BIT_VALUES[bitIndex]) != 0) { - bits.set(i); - } + for (int byteIndex = 0; byteIndex < bytes.length; byteIndex++) { + words[byteIndex >>> 3] |= (bytes[byteIndex] & 0xFFL) << ((byteIndex & 7) << 3); } + clearUnusedBits(); } /* @return number of bytes needed to hold bit vector */ private int getNBytes() { return (int) (((long) vectorSize + 7) / 8); } + + private static int wordCount(int vectorSize) { + return (vectorSize + 63) >>> 6; + } + + /** + * Clears bits at positions greater than or equal to {@code vectorSize}, such as the unused + * trailing bits of the last serialized byte, so bitwise ops and serialization stay exact. + */ + private void clearUnusedBits() { + int usedBitsInLastWord = vectorSize & 63; + if (usedBitsInLastWord != 0) { + words[words.length - 1] &= (1L << usedBitsInLastWord) - 1; + } + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/bloom/InternalBloomFilterBenchmark.java b/hudi-common/src/test/java/org/apache/hudi/common/bloom/InternalBloomFilterBenchmark.java new file mode 100644 index 0000000000000..3c3751ee3f21e --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/bloom/InternalBloomFilterBenchmark.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.bloom; + +import org.junit.jupiter.api.Test; + +import java.util.Locale; +import java.util.Random; +import java.util.function.Supplier; + +/** + * Manual microbenchmark for bloom filter hot paths: key adds (the write-path cost paid per + * record by the HFile writer), membership tests, and serialization round trips. + *

+ * The class name intentionally does not match the surefire test patterns, so it never runs + * in CI. Run it explicitly with: + *

+ * mvn test -pl hudi-common -Dtest=InternalBloomFilterBenchmark -Dsurefire.failIfNoSpecifiedTests=false
+ * 
+ */ +public class InternalBloomFilterBenchmark { + + private static final int WARMUP_ROUNDS = 1; + private static final int MEASURED_ROUNDS = 3; + private static final int KEY_POOL_SIZE = 1_000_000; + private static final int MEMBERSHIP_PROBES = 100_000; + + @Test + public void benchmarkBloomFilterHotPaths() { + runScenario("SIMPLE, 1M entries, fpp 1e-3, 1M adds", + () -> BloomFilterFactory.createBloomFilter( + 1_000_000, 0.001, -1, BloomFilterTypeCode.SIMPLE.name()), + 1_000_000); + runScenario("SIMPLE, 10M entries, fpp 1e-9, 10M adds", + () -> BloomFilterFactory.createBloomFilter( + 10_000_000, 0.000000001, -1, BloomFilterTypeCode.SIMPLE.name()), + 10_000_000); + runScenario("DYNAMIC_V0, 60K entries, fpp 1e-9, max 100K, 10M adds", + () -> BloomFilterFactory.createBloomFilter( + 60_000, 0.000000001, 100_000, BloomFilterTypeCode.DYNAMIC_V0.name()), + 10_000_000); + } + + private void runScenario(String name, Supplier filterSupplier, int numAdds) { + String[] keys = generateKeys(KEY_POOL_SIZE, 42); + String[] absentKeys = generateKeys(MEMBERSHIP_PROBES, 4242); + System.out.println("== " + name + " =="); + BloomFilter filter = null; + for (int round = 0; round < WARMUP_ROUNDS + MEASURED_ROUNDS; round++) { + filter = filterSupplier.get(); + long start = System.nanoTime(); + for (int i = 0; i < numAdds; i++) { + filter.add(keys[i % KEY_POOL_SIZE]); + } + long addMs = (System.nanoTime() - start) / 1_000_000; + + start = System.nanoTime(); + int hits = 0; + for (int i = 0; i < MEMBERSHIP_PROBES; i++) { + if (filter.mightContain(keys[i])) { + hits++; + } + } + long hitMs = (System.nanoTime() - start) / 1_000_000; + + start = System.nanoTime(); + int falsePositives = 0; + for (String absentKey : absentKeys) { + if (filter.mightContain(absentKey)) { + falsePositives++; + } + } + long missMs = (System.nanoTime() - start) / 1_000_000; + + String label = round < WARMUP_ROUNDS ? "warmup" : "round" + (round - WARMUP_ROUNDS + 1); + System.out.println(String.format(Locale.ROOT, + "%s: adds(%d)=%d ms, membership hits(%d)=%d ms, misses(%d)=%d ms (hits=%d, falsePositives=%d)", + label, numAdds, addMs, MEMBERSHIP_PROBES, hitMs, absentKeys.length, missMs, hits, falsePositives)); + } + + for (int round = 0; round < WARMUP_ROUNDS + MEASURED_ROUNDS; round++) { + long start = System.nanoTime(); + String serialized = filter.serializeToString(); + long serMs = (System.nanoTime() - start) / 1_000_000; + start = System.nanoTime(); + BloomFilterFactory.fromString(serialized, filter.getBloomFilterTypeCode().name()); + long deserMs = (System.nanoTime() - start) / 1_000_000; + String label = round < WARMUP_ROUNDS ? "warmup" : "round" + (round - WARMUP_ROUNDS + 1); + System.out.println(String.format(Locale.ROOT, + "%s: serialize=%d ms, deserialize=%d ms (serialized length=%d)", + label, serMs, deserMs, serialized.length())); + } + } + + /** Generates fixed-seed 32-character hex keys, mirroring hash-based record keys. */ + private static String[] generateKeys(int count, long seed) { + Random random = new Random(seed); + String[] keys = new String[count]; + byte[] buffer = new byte[16]; + StringBuilder sb = new StringBuilder(32); + for (int i = 0; i < count; i++) { + random.nextBytes(buffer); + sb.setLength(0); + for (byte b : buffer) { + sb.append(Character.forDigit((b >> 4) & 0xF, 16)).append(Character.forDigit(b & 0xF, 16)); + } + keys[i] = sb.toString(); + } + return keys; + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/bloom/TestInternalBloomFilter.java b/hudi-common/src/test/java/org/apache/hudi/common/bloom/TestInternalBloomFilter.java new file mode 100644 index 0000000000000..175fe725ba6c0 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/bloom/TestInternalBloomFilter.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.bloom; + +import org.apache.hudi.common.util.hash.Hash; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Random; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests {@link InternalBloomFilter}, pinning the serialized byte layout (bit {@code i} at + * byte {@code i >> 3} under mask {@code 1 << (i & 7)}, the format shared with Hadoop's + * {@code BloomFilter}) against a {@link BitSet}-based oracle and pre-captured golden strings. + */ +public class TestInternalBloomFilter { + + private static final String SIMPLE_GOLDEN = "/////wAAAAoBAAACz+cFPAf5vcr6feFygnZHoFAOwLY7/WznVO6WN9QF7fMmM1m+zXrzC9ICIvydFz8bNEUfQN/L" + + "vtxjs9bkOxNklqSWK6H6aacyEc1SNA0+iZW9Ae0xTLgQp2k6dg=="; + private static final String DYNAMIC_GOLDEN = "/////wAAAAoBAAABIAAAABQAAAA8AAAAA/////8AAAAKAQAAASBSsn9fX63/7M15X+Rmc+75/96eez/Tdme7//nv" + + "1JuP6WZX/tv/////AAAACgEAAAEg6fC+36p6fdz/Lr98Ppl+/QvlV2rfp5+9/Pm358cj1x/f/Hes/////wAAAAoB" + + "AAABINvt/Ha3Ped32/7//3Wtp+3rLH2o/08eqTXZa/u/j77vpvzdvg=="; + + @Test + public void testSerializedBitsMatchBitSetOracle() throws IOException { + int[][] configs = {{63, 3}, {64, 3}, {65, 3}, {127, 5}, {128, 5}, {1000, 7}, {43133, 30}}; + for (int[] config : configs) { + int vectorSize = config[0]; + int nbHash = config[1]; + InternalBloomFilter filter = new InternalBloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH); + HashFunction hashFunction = new HashFunction(vectorSize, nbHash, Hash.MURMUR_HASH); + BitSet oracle = new BitSet(vectorSize); + Random random = new Random(vectorSize); + for (int i = 0; i < 200; i++) { + Key key = randomKey(random); + filter.add(key); + for (int pos : hashFunction.hash(key)) { + oracle.set(pos); + } + assertTrue(filter.membershipTest(key)); + } + assertArrayEquals(oracleBits(vectorSize, oracle), serializedBits(filter), + "Serialized bits mismatch for vectorSize=" + vectorSize); + for (int i = 0; i < 200; i++) { + Key key = randomKey(random); + boolean oracleContains = Arrays.stream(hashFunction.hash(key)).allMatch(oracle::get); + assertEquals(oracleContains, filter.membershipTest(key), + "Membership mismatch for vectorSize=" + vectorSize); + } + } + } + + @Test + public void testWriteReadFieldsRoundTrip() throws IOException { + for (int vectorSize : new int[] {63, 64, 65, 127, 128, 1000}) { + InternalBloomFilter filter = new InternalBloomFilter(vectorSize, 3, Hash.MURMUR_HASH); + Random random = new Random(vectorSize); + Key[] keys = new Key[100]; + for (int i = 0; i < keys.length; i++) { + keys[i] = randomKey(random); + filter.add(keys[i]); + } + byte[] serialized = serialize(filter); + InternalBloomFilter deserialized = new InternalBloomFilter(); + deserialized.readFields(new DataInputStream(new ByteArrayInputStream(serialized))); + for (Key key : keys) { + assertTrue(deserialized.membershipTest(key)); + } + assertArrayEquals(serialized, serialize(deserialized), + "Round-trip bytes mismatch for vectorSize=" + vectorSize); + } + } + + @Test + public void testReadFieldsIgnoresUnusedTrailingBits() throws IOException { + int vectorSize = 61; + InternalBloomFilter filter = new InternalBloomFilter(vectorSize, 3, Hash.MURMUR_HASH); + Random random = new Random(vectorSize); + Key[] keys = new Key[50]; + for (int i = 0; i < keys.length; i++) { + keys[i] = randomKey(random); + filter.add(keys[i]); + } + byte[] serialized = serialize(filter); + byte[] mutated = Arrays.copyOf(serialized, serialized.length); + // The last byte carries bits 56..60; bits 61..63 are beyond vectorSize and must be ignored. + mutated[mutated.length - 1] |= (byte) 0xE0; + InternalBloomFilter deserialized = new InternalBloomFilter(); + deserialized.readFields(new DataInputStream(new ByteArrayInputStream(mutated))); + for (Key key : keys) { + assertTrue(deserialized.membershipTest(key)); + } + assertArrayEquals(serialized, serialize(deserialized), "Unused trailing bits must not survive a round trip"); + } + + @Test + public void testBitwiseOpsMatchBitSetOracle() throws IOException { + int vectorSize = 127; + int nbHash = 5; + HashFunction hashFunction = new HashFunction(vectorSize, nbHash, Hash.MURMUR_HASH); + Random random = new Random(42); + InternalBloomFilter first = new InternalBloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH); + InternalBloomFilter second = new InternalBloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH); + BitSet firstOracle = new BitSet(vectorSize); + BitSet secondOracle = new BitSet(vectorSize); + for (int i = 0; i < 100; i++) { + Key key = randomKey(random); + first.add(key); + for (int pos : hashFunction.hash(key)) { + firstOracle.set(pos); + } + key = randomKey(random); + second.add(key); + for (int pos : hashFunction.hash(key)) { + secondOracle.set(pos); + } + } + + InternalBloomFilter orFilter = copy(first); + orFilter.or(second); + BitSet orOracle = (BitSet) firstOracle.clone(); + orOracle.or(secondOracle); + assertArrayEquals(oracleBits(vectorSize, orOracle), serializedBits(orFilter)); + + InternalBloomFilter andFilter = copy(first); + andFilter.and(second); + BitSet andOracle = (BitSet) firstOracle.clone(); + andOracle.and(secondOracle); + assertArrayEquals(oracleBits(vectorSize, andOracle), serializedBits(andFilter)); + + InternalBloomFilter xorFilter = copy(first); + xorFilter.xor(second); + BitSet xorOracle = (BitSet) firstOracle.clone(); + xorOracle.xor(secondOracle); + assertArrayEquals(oracleBits(vectorSize, xorOracle), serializedBits(xorFilter)); + + InternalBloomFilter notFilter = copy(first); + notFilter.not(); + BitSet notOracle = (BitSet) firstOracle.clone(); + notOracle.flip(0, vectorSize); + assertArrayEquals(oracleBits(vectorSize, notOracle), serializedBits(notFilter)); + } + + @Test + public void testSerializedStringGoldens() { + BloomFilter simple = BloomFilterFactory.createBloomFilter( + 50, 0.001, -1, BloomFilterTypeCode.SIMPLE.name()); + for (int i = 0; i < 50; i++) { + simple.add(goldenKey(i)); + } + assertEquals(SIMPLE_GOLDEN, simple.serializeToString()); + BloomFilter simpleFromGolden = BloomFilterFactory.fromString(SIMPLE_GOLDEN, BloomFilterTypeCode.SIMPLE.name()); + for (int i = 0; i < 50; i++) { + assertTrue(simpleFromGolden.mightContain(goldenKey(i))); + } + + BloomFilter dynamic = BloomFilterFactory.createBloomFilter( + 20, 0.001, 60, BloomFilterTypeCode.DYNAMIC_V0.name()); + for (int i = 0; i < 100; i++) { + dynamic.add(goldenKey(i)); + } + assertEquals(DYNAMIC_GOLDEN, dynamic.serializeToString()); + BloomFilter dynamicFromGolden = BloomFilterFactory.fromString(DYNAMIC_GOLDEN, BloomFilterTypeCode.DYNAMIC_V0.name()); + for (int i = 0; i < 100; i++) { + assertTrue(dynamicFromGolden.mightContain(goldenKey(i))); + } + } + + private static String goldenKey(int i) { + return String.format("key-%03d", i); + } + + private static Key randomKey(Random random) { + byte[] keyBytes = new byte[1 + random.nextInt(40)]; + random.nextBytes(keyBytes); + return new Key(keyBytes); + } + + /** Serializes the filter and strips the 13-byte header (version, nbHash, hashType, vectorSize). */ + private static byte[] serializedBits(InternalBloomFilter filter) throws IOException { + byte[] serialized = serialize(filter); + return Arrays.copyOfRange(serialized, 13, serialized.length); + } + + private static byte[] serialize(InternalBloomFilter filter) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + filter.write(new DataOutputStream(baos)); + return baos.toByteArray(); + } + + /** Packs the oracle bits with the Hadoop BloomFilter byte layout: bit i at byte i >> 3, mask 1 << (i & 7). */ + private static byte[] oracleBits(int vectorSize, BitSet bits) { + byte[] bytes = new byte[(vectorSize + 7) / 8]; + for (int i = 0; i < vectorSize; i++) { + if (bits.get(i)) { + bytes[i >> 3] |= (byte) (1 << (i & 7)); + } + } + return bytes; + } + + private static InternalBloomFilter copy(InternalBloomFilter filter) throws IOException { + InternalBloomFilter copied = new InternalBloomFilter(); + copied.readFields(new DataInputStream(new ByteArrayInputStream(serialize(filter)))); + return copied; + } +}