Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 99CF4200CA4 for ; Wed, 7 Jun 2017 11:39:11 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 985CE160BE5; Wed, 7 Jun 2017 09:39:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DFE23160BE2 for ; Wed, 7 Jun 2017 11:39:09 +0200 (CEST) Received: (qmail 52525 invoked by uid 500); 7 Jun 2017 09:39:07 -0000 Mailing-List: contact commits-help@apex.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.apache.org Delivered-To: mailing list commits@apex.apache.org Received: (qmail 52516 invoked by uid 99); 7 Jun 2017 09:39:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jun 2017 09:39:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6D636DFE22; Wed, 7 Jun 2017 09:39:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bhupesh@apache.org To: commits@apex.apache.org Date: Wed, 07 Jun 2017 09:39:07 -0000 Message-Id: <679ebf2c7db34627b6388cdffef92215@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] apex-malhar git commit: APEXMALHAR-2366 #resolve #comment Apply BloomFilter to Bucket, use internal BloomFilter archived-at: Wed, 07 Jun 2017 09:39:11 -0000 Repository: apex-malhar Updated Branches: refs/heads/master 349be9d9d -> 07869c829 APEXMALHAR-2366 #resolve #comment Apply BloomFilter to Bucket, use internal BloomFilter Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3c3a0177 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3c3a0177 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3c3a0177 Branch: refs/heads/master Commit: 3c3a01777329252aaa46a39e52ab9a190dbfb74f Parents: 349be9d Author: brightchen Authored: Mon Dec 5 11:34:48 2016 -0800 Committer: Pramod Immaneni Committed: Wed Jun 7 00:54:18 2017 -0700 ---------------------------------------------------------------------- .../apex/malhar/lib/state/managed/Bucket.java | 100 +++- .../lib/state/managed/SliceBloomFilter.java | 459 +++++++++++++++++++ .../lib/state/managed/DefaultBucketTest.java | 35 ++ .../state/managed/ManagedTimeStateImplTest.java | 3 + .../ManagedTimeUnifiedStateImplTest.java | 6 + .../lib/state/managed/SliceBloomFilterTest.java | 191 ++++++++ 6 files changed, 793 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3c3a0177/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java index 6292fe2..8e1112e 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java @@ -232,6 +232,12 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide protected ConcurrentLinkedQueue windowsForFreeMemory = new ConcurrentLinkedQueue<>(); + private static boolean disableBloomFilterByDefault = false; + private boolean disableBloomFilter = disableBloomFilterByDefault; + private static int bloomFilterDefaultBitSize = 1000000; + private SliceBloomFilter bloomFilter = null; + private int bloomFilterBitSize = bloomFilterDefaultBitSize; + private DefaultBucket() { //for kryo @@ -247,6 +253,9 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide public void setup(@NotNull ManagedStateContext managedStateContext) { this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context"); + if (!disableBloomFilter && bloomFilter == null) { + bloomFilter = new SliceBloomFilter(bloomFilterBitSize, 0.99); + } } @Override @@ -353,6 +362,33 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide } } + + private int filteredCount = 0; + private int unfilteredCount = 0; + private static final int STATISTICS_BARRIER_NUM = 10000; + /** + * Test the result of using bloom filter and remove it if it not helpful for improving the performance. + * + * @param mightContain true if collection might contain the value + */ + private void verifyBloomFilter(boolean mightContain) + { + if (!mightContain) { + filteredCount++; + } else { + unfilteredCount++; + } + + if (unfilteredCount + filteredCount > STATISTICS_BARRIER_NUM && unfilteredCount > filteredCount * 4) { + unloadBloomFilter(); + } + } + + private void unloadBloomFilter() + { + bloomFilter = null; + } + /** * Returns the value for the key from a valid time-bucket reader. Here, valid means the time bucket which is not purgeable. * If the timebucketAssigner is of type MovingBoundaryTimeBucketAssigner and the time bucket is purgeable, then return null. @@ -362,11 +398,21 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide */ private BucketedValue getValueFromTimeBucketReader(Slice key, long timeBucket) { - if (managedStateContext.getTimeBucketAssigner() instanceof MovingBoundaryTimeBucketAssigner && timeBucket <= ((MovingBoundaryTimeBucketAssigner)managedStateContext.getTimeBucketAssigner()).getLowestPurgeableTimeBucket()) { return null; } + + if (bloomFilter != null) { + boolean mightContain = bloomFilter.mightContain(key); + + verifyBloomFilter(mightContain); + + if (!mightContain) { + return null; + } + } + FileAccess.FileReader fileReader = readers.get(timeBucket); if (fileReader != null) { return readValue(fileReader, key, timeBucket); @@ -417,6 +463,7 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide { // This call is lightweight releaseMemory(); + key = SliceUtils.toBufferSlice(key); value = SliceUtils.toBufferSlice(value); @@ -460,6 +507,14 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide entryIter.remove(); for (Map.Entry entry : windowData.entrySet()) { + /** + * The data still in memory and reachable before the memory released + * So put key into bloom filter here + */ + if (bloomFilter != null) { + bloomFilter.put(entry.getKey()); + } + memoryFreed += entry.getKey().length + entry.getValue().getSize(); } } @@ -511,6 +566,7 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide public Map checkpoint(long windowId) { releaseMemory(); + try { //transferring the data from flash to check-pointed state in finally block and re-initializing the flash. return flash; @@ -635,6 +691,48 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide return valueStream; } + public static int getBloomFilterDefaultBitSize() + { + return bloomFilterDefaultBitSize; + } + + public static void setBloomFilterDefaultBitSize(int bloomFilterDefaultBitSize) + { + DefaultBucket.bloomFilterDefaultBitSize = bloomFilterDefaultBitSize; + } + + public int getBloomFilterBitSize() + { + return bloomFilterBitSize; + } + + public void setBloomFilterBitSize(int bloomFilterBitSize) + { + this.bloomFilterBitSize = bloomFilterBitSize; + } + + public static boolean isDisableBloomFilterByDefault() + { + return disableBloomFilterByDefault; + } + + public static void setDisableBloomFilterByDefault(boolean disableBloomFilterByDefault) + { + DefaultBucket.disableBloomFilterByDefault = disableBloomFilterByDefault; + } + + public boolean isDisableBloomFilter() + { + return disableBloomFilter; + } + + public void setDisableBloomFilter(boolean disableBloomFilter) + { + this.disableBloomFilter = disableBloomFilter; + this.unloadBloomFilter(); + } + + private static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3c3a0177/library/src/main/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilter.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilter.java new file mode 100644 index 0000000..4fd9e02 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilter.java @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +import java.util.BitSet; + +import com.datatorrent.netlet.util.Slice; + +/** + * This class implemented BloomFilter algorithm, the key is Slice + * + */ +public class SliceBloomFilter +{ + private BitSet bitset; + private int bitSetSize; + private int expectedNumberOfFilterElements; // expected (maximum) number of elements to be added + private int numberOfAddedElements; // number of elements actually added to the Bloom filter + private int numberOfHashes; // number of hash functions + protected transient HashFunction hasher = new HashFunction(); + + /** + * Set the attributes to the empty Bloom filter. The total length of the Bloom + * filter will be bitsPerElement*expectedNumberOfFilterElements. + * + * @param bitsPerElement + * is the number of bits used per element. + * @param expectedNumberOfFilterElements + * is the expected number of elements the filter will contain. + * @param numberOfHashes + * is the number of hash functions used. + */ + private void SetAttributes(double bitsPerElement, int expectedNumberOfFilterElements, int numberOfHashes) + { + this.expectedNumberOfFilterElements = expectedNumberOfFilterElements; + this.numberOfHashes = numberOfHashes; + this.bitSetSize = (int)Math.ceil(bitsPerElement * expectedNumberOfFilterElements); + numberOfAddedElements = 0; + this.bitset = new BitSet(bitSetSize); + } + + private SliceBloomFilter() + { + //for kyro + } + + /** + * Constructs an empty Bloom filter with a given false positive probability. + * + * @param expectedNumberOfElements + * is the expected number of elements the filter will contain. + * @param falsePositiveProbability + * is the desired false positive probability. + */ + public SliceBloomFilter(int expectedNumberOfElements, double falsePositiveProbability) + { + if (this.bitset == null) { + SetAttributes(Math.ceil(-(Math.log(falsePositiveProbability) / Math.log(2))) / Math.log(2), // c = k / ln(2) + expectedNumberOfElements, (int)Math.ceil(-(Math.log(falsePositiveProbability) / Math.log(2)))); + } + } + + /** + * Generate integer array based on the hash function till the number of + * hashes. + * + * @param slice + * specifies input slice. + * @return array of int-sized hashes + */ + private int[] createHashes(Slice slice) + { + int[] result = new int[numberOfHashes]; + long hash64 = hasher.hash(slice); + // apply the less hashing technique + int hash1 = (int)hash64; + int hash2 = (int)(hash64 >>> 32); + for (int i = 1; i <= numberOfHashes; i++) { + int nextHash = hash1 + i * hash2; + if (nextHash < 0) { + nextHash = ~nextHash; + } + result[i - 1] = nextHash; + } + return result; + } + + /** + * Calculates the expected probability of false positives based on the number + * of expected filter elements and the size of the Bloom filter. + * + * @return expected probability of false positives. + */ + public double expectedFalsePositiveProbability() + { + return getFalsePositiveProbability(expectedNumberOfFilterElements); + } + + /** + * Calculate the probability of a false positive given the specified number of + * inserted elements. + * + * @param numberOfElements + * number of inserted elements. + * @return probability of a false positive. + */ + public double getFalsePositiveProbability(double numberOfElements) + { + // (1 - e^(-k * n / m)) ^ k + return Math.pow((1 - Math.exp(-numberOfHashes * numberOfElements / bitSetSize)), numberOfHashes); + } + + /** + * Get the current probability of a false positive. The probability is + * calculated from the size of the Bloom filter and the current number of + * elements added to it. + * + * @return probability of false positives. + */ + public double getFalsePositiveProbability() + { + return getFalsePositiveProbability(numberOfAddedElements); + } + + /** + * Returns the value chosen for numberOfHashes.
+ *
+ * numberOfHashes is the optimal number of hash functions based on the size of + * the Bloom filter and the expected number of inserted elements. + * + * @return optimal numberOfHashes. + */ + public int getNumberOfHashes() + { + return numberOfHashes; + } + + /** + * Sets all bits to false in the Bloom filter. + */ + public void clear() + { + bitset.clear(); + numberOfAddedElements = 0; + } + + /** + * Adds a slice of byte array to the Bloom filter. + * + * @param slice + * slice of byte array to add to the Bloom filter. + */ + public void put(Slice slice) + { + int[] hashes = createHashes(slice); + for (int hash : hashes) { + bitset.set(Math.abs(hash % bitSetSize), true); + } + numberOfAddedElements++; + } + + /** + * Returns true if the slice of byte array could have been inserted into the Bloom + * filter. Use getFalsePositiveProbability() to calculate the probability of + * this being correct. + * + * @param slice + * slice of byte array to check. + * @return true if the array could have been inserted into the Bloom filter. + */ + public boolean mightContain(Slice slice) + { + int[] hashes = createHashes(slice); + for (int hash : hashes) { + if (!bitset.get(Math.abs(hash % bitSetSize))) { + return false; + } + } + return true; + } + + /** + * Read a single bit from the Bloom filter. + * + * @param bit + * the bit to read. + * @return true if the bit is set, false if it is not. + */ + public boolean getBit(int bit) + { + return bitset.get(bit); + } + + /** + * Set a single bit in the Bloom filter. + * + * @param bit + * is the bit to set. + * @param value + * If true, the bit is set. If false, the bit is cleared. + */ + public void setBit(int bit, boolean value) + { + bitset.set(bit, value); + } + + /** + * Return the bit set used to store the Bloom filter. + * + * @return bit set representing the Bloom filter. + */ + public BitSet getBitSet() + { + return bitset; + } + + public void setBitSet(BitSet bitset) + { + this.bitset = bitset; + } + + /** + * Returns the number of bits in the Bloom filter. Use count() to retrieve the + * number of inserted elements. + * + * @return the size of the bitset used by the Bloom filter. + */ + public int size() + { + return this.bitSetSize; + } + + /** + * Returns the number of elements added to the Bloom filter after it was + * constructed or after clear() was called. + * + * @return number of elements added to the Bloom filter. + */ + public int count() + { + return this.numberOfAddedElements; + } + + /** + * Returns the expected number of elements to be inserted into the filter. + * This value is the same value as the one passed to the constructor. + * + * @return expected number of elements. + */ + public int getExpectedNumberOfElements() + { + return expectedNumberOfFilterElements; + } + + /** + * Get actual number of bits per element based on the number of elements that + * have currently been inserted and the length of the Bloom filter. See also + * getExpectedBitsPerElement(). + * + * @return number of bits per element. + */ + public double getBitsPerElement() + { + return this.bitSetSize / (double)numberOfAddedElements; + } + + /** + * Set the hasher in the Bloom filter. + * + * @param hasher + * is the hash function to set. + */ + + public void setHasher(HashFunction hasher) + { + this.hasher = hasher; + } + + public static final class HashFunction + { + private static final long SEED = 0x7f3a21eaL; + private static long X64_128_C1 = 0x87c37b91114253d5L; + private static long X64_128_C2 = 0x4cf5ad432745937fL; + /** + * Helps convert a byte into its unsigned value + */ + public static final int UNSIGNED_MASK = 0xff; + + /** + * Return the hash of the bytes as long. + * + * @param bytes + * the bytes to be hashed + * + * @return the generated hash value + */ + public long hash(Slice slice) + { + long h1 = SEED; + long h2 = SEED; + + //the offset related to the begin of slice + int relativeOffset = 0; + while (slice.length - relativeOffset >= 16) { + long k1 = getLong(slice, slice.offset + relativeOffset); + relativeOffset += 8; //size of long count by int + long k2 = getLong(slice, slice.offset + relativeOffset); + relativeOffset += 8; + + h1 ^= mixK1(k1); + + h1 = Long.rotateLeft(h1, 27); + h1 += h2; + h1 = h1 * 5 + 0x52dce729; + + h2 ^= mixK2(k2); + + h2 = Long.rotateLeft(h2, 31); + h2 += h1; + h2 = h2 * 5 + 0x38495ab5; + } + + if (slice.length > relativeOffset) { + long k1 = 0; + long k2 = 0; + int absoluteOffset = slice.offset + relativeOffset; + switch (slice.length - relativeOffset) { + case 15: + k2 ^= (long)(slice.buffer[absoluteOffset + 14] & UNSIGNED_MASK) << 48; // fall through + + case 14: + k2 ^= (long)(slice.buffer[absoluteOffset + 13] & UNSIGNED_MASK) << 40; // fall through + + case 13: + k2 ^= (long)(slice.buffer[absoluteOffset + 12] & UNSIGNED_MASK) << 32; // fall through + + case 12: + k2 ^= (long)(slice.buffer[absoluteOffset + 11] & UNSIGNED_MASK) << 24; // fall through + + case 11: + k2 ^= (long)(slice.buffer[absoluteOffset + 10] & UNSIGNED_MASK) << 16; // fall through + + case 10: + k2 ^= (long)(slice.buffer[absoluteOffset + 9] & UNSIGNED_MASK) << 8; // fall through + + case 9: + k2 ^= slice.buffer[absoluteOffset + 8] & UNSIGNED_MASK; // fall through + + case 8: + k1 ^= getLong(slice, absoluteOffset); + break; + + case 7: + k1 ^= (long)(slice.buffer[absoluteOffset + 6] & UNSIGNED_MASK) << 48; // fall through + + case 6: + k1 ^= (long)(slice.buffer[absoluteOffset + 5] & UNSIGNED_MASK) << 40; // fall through + + case 5: + k1 ^= (long)(slice.buffer[absoluteOffset + 4] & UNSIGNED_MASK) << 32; // fall through + + case 4: + k1 ^= (long)(slice.buffer[absoluteOffset + 3] & UNSIGNED_MASK) << 24; // fall through + + case 3: + k1 ^= (long)(slice.buffer[absoluteOffset + 2] & UNSIGNED_MASK) << 16; // fall through + + case 2: + k1 ^= (long)(slice.buffer[absoluteOffset + 1] & UNSIGNED_MASK) << 8; // fall through + + case 1: + k1 ^= slice.buffer[absoluteOffset] & UNSIGNED_MASK; + break; + + default: + throw new AssertionError("Code should not reach here!"); + } + + // mix + h1 ^= mixK1(k1); + h2 ^= mixK2(k2); + } + + // ---------- + // finalization + + h1 ^= slice.length; + h2 ^= slice.length; + + h1 += h2; + h2 += h1; + + h1 = fmix64(h1); + h2 = fmix64(h2); + + h1 += h2; + h2 += h1; + + return h1; + } + + private static long getLong(Slice slice, int absoluteOffset) + { + return ((((long)slice.buffer[absoluteOffset++]) << 56) | + (((long)slice.buffer[absoluteOffset++] & 0xff) << 48) | + (((long)slice.buffer[absoluteOffset++] & 0xff) << 40) | + (((long)slice.buffer[absoluteOffset++] & 0xff) << 32) | + (((long)slice.buffer[absoluteOffset++] & 0xff) << 24) | + (((long)slice.buffer[absoluteOffset++] & 0xff) << 16) | + (((long)slice.buffer[absoluteOffset++] & 0xff) << 8) | + (((long)slice.buffer[absoluteOffset++] & 0xff) )); + } + + private static long mixK1(long k1) + { + k1 *= X64_128_C1; + k1 = Long.rotateLeft(k1, 31); + k1 *= X64_128_C2; + + return k1; + } + + private static long mixK2(long k2) + { + k2 *= X64_128_C2; + k2 = Long.rotateLeft(k2, 33); + k2 *= X64_128_C1; + + return k2; + } + + private static long fmix64(long k) + { + k ^= k >>> 33; + k *= 0xff51afd7ed558ccdL; + k ^= k >>> 33; + k *= 0xc4ceb9fe1a85ec53L; + k ^= k >>> 33; + + return k; + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3c3a0177/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java index 8a63f5a..8bbc3dc 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java @@ -28,6 +28,7 @@ import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import org.apache.apex.malhar.lib.state.managed.Bucket.DefaultBucket; import org.apache.apex.malhar.lib.state.managed.Bucket.ReadSource; import org.apache.apex.malhar.lib.utils.serde.AffixSerde; import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer; @@ -52,6 +53,9 @@ public class DefaultBucketTest @Override protected void starting(Description description) { + //lots of test case get around the normal workflow and directly write to file. So should disable bloom filter + DefaultBucket.setDisableBloomFilterByDefault(true); + TestUtils.deleteTargetTestClassFolder(description); managedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9)); applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); @@ -193,6 +197,8 @@ public class DefaultBucketTest @Test public void testFreeMemory() throws IOException { + DefaultBucket.setDisableBloomFilterByDefault(false); + testMeta.defaultBucket.setup(testMeta.managedStateContext); testGetFromReader(); long initSize = testMeta.defaultBucket.getSizeInBytes(); @@ -227,4 +233,33 @@ public class DefaultBucketTest testMeta.defaultBucket.teardown(); } + + @Test + public void testBloomFilter() throws IOException + { + testMeta.defaultBucket.setDisableBloomFilter(false); + testMeta.defaultBucket.setup(testMeta.managedStateContext); + final int itemSize = 1000; + final int bucketId = 1; + for (int i = 0; i < itemSize; i += 2) { + //put only even value + Slice keyAndValue = ManagedStateTestUtils.getSliceFor(String.valueOf(i)); + testMeta.defaultBucket.put(keyAndValue, bucketId, keyAndValue); + } + + testMeta.defaultBucket.freeMemory(Long.MAX_VALUE); + + for (int i = 0; i < itemSize; ++i) { + //put only even value + Slice key = ManagedStateTestUtils.getSliceFor(String.valueOf(i)); + Slice value = testMeta.defaultBucket.get(key, bucketId, ReadSource.ALL); + if ((i & 0x01) == 0) { + Assert.assertEquals(key, value); + } else { + Assert.assertTrue(value == null); + } + } + + testMeta.defaultBucket.teardown(); + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3c3a0177/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java index 2882828..d5f4856 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java @@ -30,6 +30,8 @@ import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import org.apache.apex.malhar.lib.state.managed.Bucket.DefaultBucket; + import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; @@ -83,6 +85,7 @@ public class ManagedTimeStateImplTest Slice zero = ManagedStateTestUtils.getSliceFor("0"); long time = System.currentTimeMillis(); + DefaultBucket.setDisableBloomFilterByDefault(true); testMeta.managedState.setup(testMeta.operatorContext); Map unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, time); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3c3a0177/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java index 42ab187..faa6f71 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java @@ -30,6 +30,8 @@ import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import org.apache.apex.malhar.lib.state.managed.Bucket.DefaultBucket; + import com.datatorrent.api.Context; import com.datatorrent.lib.fileaccess.FileAccessFSImpl; import com.datatorrent.lib.util.TestUtils; @@ -116,6 +118,8 @@ public class ManagedTimeUnifiedStateImplTest @Test public void testSyncGetFromFiles() throws IOException, ExecutionException, InterruptedException { + DefaultBucket.setDisableBloomFilterByDefault(true); + Slice zero = ManagedStateTestUtils.getSliceFor("0"); long time = System.currentTimeMillis(); @@ -138,6 +142,8 @@ public class ManagedTimeUnifiedStateImplTest @Test public void testAsyncSyncGetFromFiles() throws IOException, ExecutionException, InterruptedException { + DefaultBucket.setDisableBloomFilterByDefault(true); + Slice zero = ManagedStateTestUtils.getSliceFor("0"); long time = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3c3a0177/library/src/test/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilterTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilterTest.java new file mode 100644 index 0000000..9fce3a5 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilterTest.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.netlet.util.Slice; + +public class SliceBloomFilterTest +{ + private int loop = 100000; + + @Test + public void testBloomFilterForBytes() + { + final int maxSliceLength = 1000; + Random random = new Random(); + final byte[] bytes = new byte[loop + maxSliceLength]; + random.nextBytes(bytes); + + long beginTime = System.currentTimeMillis(); + SliceBloomFilter bloomFilter = new SliceBloomFilter(100000, 0.99); + for (int i = 0; i < loop; i++) { + bloomFilter.put(new Slice(bytes, i, i % maxSliceLength + 1)); + } + + for (int i = 0; i < loop; i++) { + Assert.assertTrue(bloomFilter.mightContain(new Slice(bytes, i, i % maxSliceLength + 1))); + } + } + + @Test + public void testBloomFilterForInt() + { + testBloomFilterForInt(2); + testBloomFilterForInt(3); + testBloomFilterForInt(5); + testBloomFilterForInt(7); + } + + public void testBloomFilterForInt(int span) + { + double expectedFalseProbability = 0.3; + SerializationBuffer buffer = SerializationBuffer.READ_BUFFER; + + SliceBloomFilter bloomFilter = new SliceBloomFilter(loop, expectedFalseProbability); + + for (int i = 0; i < loop; i++) { + if (i % span == 0) { + buffer.writeInt(i); + bloomFilter.put(buffer.toSlice()); + } + } + buffer.getWindowedBlockStream().releaseAllFreeMemory(); + + int falsePositive = 0; + for (int i = 0; i < loop; i++) { + buffer.writeInt(i); + if (!bloomFilter.mightContain(buffer.toSlice())) { + Assert.assertTrue(i % span != 0); + } else { + // BF says its present + if (i % 2 != 0) { + // But was not there + falsePositive++; + } + } + } + buffer.getWindowedBlockStream().releaseAllFreeMemory(); + // Verify false positive prob + double falsePositiveProb = falsePositive; + falsePositiveProb /= loop; + Assert.assertTrue(falsePositiveProb <= expectedFalseProbability); + + for (int i = 0; i < loop; i++) { + if (i % span == 0) { + buffer.writeInt(i); + Assert.assertTrue(bloomFilter.mightContain(buffer.toSlice())); + } + } + buffer.getWindowedBlockStream().releaseAllFreeMemory(); + } + + private static class FilterOperator extends BaseOperator + { + private SliceBloomFilter bloomFilter = new SliceBloomFilter(10000, 0.99); + private SerializationBuffer buffer = SerializationBuffer.READ_BUFFER; + + public final transient DefaultInputPort input = new DefaultInputPort() + { + @Override + public void process(String tuple) + { + processTuple(tuple); + } + }; + + @Override + public void setup(Context.OperatorContext context) + { + } + + private int count = 0; + + public void processTuple(String tuple) + { + buffer.writeString(tuple); + bloomFilter.mightContain(buffer.toSlice()); + buffer.reset(); + } + } + + private static class TestInputOperator extends BaseOperator implements InputOperator + { + public final transient DefaultOutputPort data = new DefaultOutputPort(); + private int current = 0; + + @Override + public void emitTuples() + { + data.emit("" + current++); + } + } + + /** + * Just test SliceBloomFilter can be used by operator. such as it is serializable etc + * @throws Exception + */ + @Test + public void testBloomFilterForApplication() throws Exception + { + Configuration conf = new Configuration(false); + + LocalMode lma = LocalMode.newInstance(); + DAG dag = lma.getDAG(); + + TestInputOperator generator = new TestInputOperator(); + dag.addOperator("Generator", generator); + + FilterOperator filterOperator = new FilterOperator(); + dag.addOperator("filterOperator", filterOperator); + dag.addStream("Data", generator.data, filterOperator.input).setLocality(Locality.CONTAINER_LOCAL); + + StreamingApplication app = new StreamingApplication() + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + } + }; + + lma.prepareDAG(app, conf); + + // Create local cluster + final LocalMode.Controller lc = lma.getController(); + lc.run(3000); + + lc.shutdown(); + } +}