From: vincentpoon@apache.org
To: commits@phoenix.apache.org
Subject: phoenix git commit: PHOENIX-4724 Efficient Equi-Depth histogram for streaming data
Date: Fri, 18 May 2018 17:44:18 +0000 (UTC)

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.1 04af584ec -> 933dd0b80

PHOENIX-4724 Efficient Equi-Depth histogram for streaming data

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/933dd0b8
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/933dd0b8
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/933dd0b8

Branch: refs/heads/4.x-HBase-1.1
Commit: 933dd0b8048db56b33e7d750a69503ed857d85c9
Parents: 04af584
Author: Vincent Poon
Authored: Thu May 3 17:07:27 2018 -0700
Committer: Vincent Poon
Committed: Fri May 18 10:44:11 2018 -0700

----------------------------------------------------------------------
 .../phoenix/util/EquiDepthStreamHistogram.java | 453 +++++++++++++++++++
 .../util/EquiDepthStreamHistogramTest.java     | 303 +++++++++++++
 2 files changed, 756 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/933dd0b8/phoenix-core/src/main/java/org/apache/phoenix/util/EquiDepthStreamHistogram.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/EquiDepthStreamHistogram.java b/phoenix-core/src/main/java/org/apache/phoenix/util/EquiDepthStreamHistogram.java
new file mode 100644
index 0000000..7649933
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/EquiDepthStreamHistogram.java
@@ -0,0 +1,453 @@
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +/** + * Equi-Depth histogram based on http://web.cs.ucla.edu/~zaniolo/papers/Histogram-EDBT2011-CamReady.pdf, + * but without the sliding window - we assume a single window over the entire data set. + * + * Used to generate the bucket boundaries of a histogram where each bucket has the same # of items. + * This is useful, for example, for pre-splitting an index table, by feeding in data from the indexed column. + * Works on streaming data - the histogram is dynamically updated for each new value. + * + * Add values by calling addValue(), then at the end computeBuckets() can be called to get + * the buckets with their bounds. + * + * Average time complexity: O(log(B x p) + (B x p)/T) = nearly constant + * B = number of buckets, p = expansion factor constant, T = # of values + * + * Space complexity: different from paper since here we keep the blocked bars but don't have expiration, + * comes out to basically O(log(T)) + */ +public class EquiDepthStreamHistogram { + private static final Log LOG = LogFactory.getLog(EquiDepthStreamHistogram.class); + + // used in maxSize calculation for each bar + private static final double MAX_COEF = 1.7; + // higher expansion factor = better accuracy and worse performance + private static final short DEFAULT_EXPANSION_FACTOR = 7; + private int numBuckets; + private int maxBars; + @VisibleForTesting + long totalCount; // number of values - i.e. count across all bars + @VisibleForTesting + List bars; + + /** + * Create a new histogram + * @param numBuckets number of buckets, which can be used to get the splits + */ + public EquiDepthStreamHistogram(int numBuckets) { + this(numBuckets, DEFAULT_EXPANSION_FACTOR); + } + + /** + * @param numBuckets number of buckets + * @param expansionFactor number of bars = expansionFactor * numBuckets + * The more bars, the better the accuracy, at the cost of worse performance + */ + public EquiDepthStreamHistogram(int numBuckets, int expansionFactor) { + this.numBuckets = numBuckets; + this.maxBars = numBuckets * expansionFactor; + this.bars = new ArrayList<>(maxBars); + } + + /** + * Add a new value to the histogram, updating the count for the appropriate bucket + * @param value + */ + public void addValue(byte[] value) { + Bar bar = getBar(value); + bar.incrementCount(); + totalCount++; + // split the bar if necessary + if (bar.getSize() > getMaxBarSize()) { + splitBar(bar); + } + } + + /** + * Compute the buckets, which have the boundaries and estimated counts. + * Note that the right bound for the very last bucket is inclusive. + * The left and right bounds can be equivalent, for single value buckets. 
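+ * For example, with a totalCount of 100 and numBuckets of 4, each bucket's
+ * estimated count comes out close to the ideal bucket size of ceil(100 / 4) = 25.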
+ * @return + */ + public List computeBuckets() { + Preconditions.checkState(bars.size() >= numBuckets, "Not enough data points to compute buckets"); + List buckets = new ArrayList<>(); + long idealBuckSize = (long) Math.ceil(totalCount / (double) numBuckets); + long currCount = 0; + int barsIdx = 0; + byte[] prevBound = bars.get(0).leftBoundInclusive; + Bar currBar = null; + for (int i = 0; i < numBuckets; i++) { + while (currCount <= idealBuckSize && barsIdx < bars.size()) { + currBar = bars.get(barsIdx++); + currCount += currBar.getSize(); + } + long surplus = Math.max(currCount - idealBuckSize, 0); + // deviate a bit from the paper here + // to estimate the bound, we split the range into 8 splits for a total of 10 including start/end + // then we calculate the % of the currBar's count we've used, and round down to the closest split + int closestSplitIdx = (int) ((1 - ((double) surplus / currBar.getSize())) * 9); + byte[][] splits = Bytes.split(currBar.leftBoundInclusive, currBar.rightBoundExclusive, 8); + Bucket bucket = new Bucket(prevBound, splits[closestSplitIdx]); + bucket.incrementCountEstimate(currCount - surplus); + prevBound = splits[closestSplitIdx]; + buckets.add(bucket); + currCount = surplus; + } + return buckets; + } + + /** + * @return total number of values added to this histogram + */ + public long getTotalCount() { + return totalCount; + } + + // attempts to split the given bar into two new bars + @VisibleForTesting + void splitBar(Bar origBar) { + // short circuit - don't split a bar of length 1 + if (Bytes.compareTo(origBar.leftBoundInclusive, origBar.rightBoundExclusive) == 0) { + return; + } + if (bars.size() == maxBars) { // max bars hit, need to merge two existing bars first + boolean mergeSuccessful = mergeBars(); + if (!mergeSuccessful) return; // don't split if we couldn't merge + } + byte[] mid = Bytes.split(origBar.getLeftBoundInclusive(), origBar.getRightBoundExclusive(), 1)[1]; + Bar newLeft = new Bar(origBar.getLeftBoundInclusive(), mid); + Bar newRight = new Bar(mid, origBar.getRightBoundExclusive()); + // distribute blocked bars between the new bars + long leftSize = 0; + long bbAggCount = origBar.getBlockedBarsSize(); + for (Bar bb : origBar.getBlockedBars()) { + long bbSize = bb.getSize(); + if (leftSize + bbSize < bbAggCount/2) { + leftSize += bbSize; + newLeft.addBlockedBar(bb); + } else { + newRight.addBlockedBar(bb); + } + } + // at this point the two new bars may have different counts, + // distribute the rest of origBar's count to make them as close as possible + long countToDistribute = origBar.getSize() - bbAggCount; + long rightSize = newRight.getSize(); + long sizeDiff = Math.abs(leftSize - rightSize); + Bar smallerBar = leftSize <= rightSize ? newLeft : newRight; + if (sizeDiff <= countToDistribute) { + smallerBar.incrementCount(sizeDiff); + countToDistribute -= sizeDiff; + long halfDistrib = countToDistribute / 2; + newLeft.incrementCount(halfDistrib); + newRight.incrementCount(countToDistribute - halfDistrib); + } else { + smallerBar.incrementCount(countToDistribute); + } + if (LOG.isTraceEnabled()) { + LOG.trace(String.format("Split orig=%s , newLeft=%s , newRight=%s", origBar, newLeft, newRight)); + } + bars.remove(origBar); + bars.add(newLeft); + bars.add(newRight); + // technically don't need to sort here, as we can get the index from getBar, + // and put the new bars in the same index. 
But we'd have to handle merge as well, + // doable but not worth the more complicated code since bars.size is fixed and generally small + Collections.sort(bars); + } + + //Merges the two adjacent bars with the lowest summed count + @VisibleForTesting + boolean mergeBars() { + Preconditions.checkState(bars.size() > 1, "Need at least two bars to merge"); + // pairwise search for the two bars with the smallest summed count + int currIdx = 0; + Bar currBar = bars.get(currIdx); + Bar nextBar = bars.get(currIdx + 1); + long currMinSum = Long.MAX_VALUE; + int currMinIdx = currIdx; // keep this for fast removal from ArrayList later + Pair minBars = new Pair<>(currBar, nextBar); + while (nextBar != null) { + long sum = currBar.getSize() + nextBar.getSize(); + if (sum < currMinSum) { + currMinSum = sum; + minBars = new Pair<>(currBar, nextBar); + currMinIdx = currIdx; + } + currBar = nextBar; + nextBar = ++currIdx < bars.size() - 1 ? bars.get(currIdx+1) : null; + } + // don't want to merge bars into one that will just need an immediate split again + if (currMinSum >= getMaxBarSize()) { + return false; + } + // do the merge + Bar leftBar = minBars.getFirst(); + Bar rightBar = minBars.getSecond(); + Bar newBar = new Bar(leftBar.getLeftBoundInclusive(), rightBar.getRightBoundExclusive()); + if (leftBar.getSize() >= rightBar.getSize()) { + newBar.incrementCount(rightBar.getCount()); // count of rightBar without its blocked bars + // this just adds the leftBar without its blocked bars, as we don't want nested blocked bars + // the leftBar's blocked bars are added later below + newBar.addBlockedBar(new Bar(leftBar)); + } else { + newBar.incrementCount(leftBar.getCount()); + newBar.addBlockedBar(new Bar(rightBar)); + } + newBar.addBlockedBars(leftBar.getBlockedBars()); + newBar.addBlockedBars(rightBar.getBlockedBars()); + bars.subList(currMinIdx, currMinIdx + 2).clear(); // remove minBars + bars.add(newBar); + Collections.sort(bars); + if (LOG.isTraceEnabled()) { + LOG.trace(String.format("Merged left=%s , right=%s , newBar=%s", leftBar, rightBar, newBar)); + } + return true; + } + + /** + * Get the appropriate bar for the value, extending existing bar bounds to accommodate if necessary + * @param value value to add + * @return the bar for the value + */ + @VisibleForTesting + Bar getBar(byte[] value) { + Bar searchKey = new Bar(value, value); + int searchIdx = Collections.binarySearch(this.bars, searchKey); + if (searchIdx < 0) { + // copy value so later changes by caller don't affect histogram results + byte[] newBound = Bytes.copy(value); + if (this.bars.size() == 0) { + Bar firstBar = new Bar(newBound, newBound); + bars.add(firstBar); + return firstBar; + } + int expectedIndex = Math.abs(searchIdx + 1); // jdk binary search index + if (expectedIndex == bars.size()) { // no bars >= value, need to extend rightBound of last bar + Bar lastBar = bars.get(expectedIndex - 1); + lastBar.setRightBoundExclusive(newBound); // actually inclusive for last bar + return lastBar; + } else { // extend leftBound of next greatest bar + Bar nextBar = bars.get(expectedIndex); + nextBar.setLeftBoundInclusive(newBound); + return nextBar; + } + } else { + return bars.get(searchIdx); + } + } + + private long getMaxBarSize() { + // from the paper, 1.7 has been "determined empirically" + // interpretation: We don't want a given bar to deviate more than 70% from its ideal target size + return (long) (MAX_COEF * (totalCount / maxBars)); + } + + public static class Bucket { + protected long count = 0; + protected byte[] 
leftBoundInclusive; + protected byte[] rightBoundExclusive; + + public Bucket(byte[] leftBoundInclusive, byte[] rightBoundExclusive) { + this.leftBoundInclusive = leftBoundInclusive; + this.rightBoundExclusive = rightBoundExclusive; + } + + public byte[] getLeftBoundInclusive() { + return leftBoundInclusive; + } + + public void setLeftBoundInclusive(byte[] leftBoundInclusive) { + this.leftBoundInclusive = leftBoundInclusive; + } + + public byte[] getRightBoundExclusive() { + return rightBoundExclusive; + } + + public void setRightBoundExclusive(byte[] rightBoundExclusive) { + this.rightBoundExclusive = rightBoundExclusive; + } + + public long getCountEstimate() { + return count; + } + + public void incrementCountEstimate(long count) { + this.count += count; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(leftBoundInclusive); + result = prime * result + Arrays.hashCode(rightBoundExclusive); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + Bucket other = (Bucket) obj; + if (!Arrays.equals(leftBoundInclusive, other.leftBoundInclusive)) return false; + if (!Arrays.equals(rightBoundExclusive, other.rightBoundExclusive)) return false; + return true; + } + + @Override + public String toString() { + return "Bucket [count=" + count + ", leftBoundInclusive=" + + Bytes.toString(leftBoundInclusive) + ", rightBoundExclusive=" + + Bytes.toString(rightBoundExclusive) + "]"; + } + } + + // Used internally to further subdivide each bucket + @VisibleForTesting + static class Bar extends Bucket implements Comparable { + private List blockedBars = new ArrayList<>(); // populated through a merge + + /** + * Create a new bar. Single value buckets can have leftBound = rightBound + * @param leftBoundInclusive + * @param rightBoundExclusive + */ + public Bar(byte[] leftBoundInclusive, byte[] rightBoundExclusive) { + super(leftBoundInclusive, rightBoundExclusive); + } + + /** + * Creates a copy of the passed in bar, but without any blocked bars + * @param bar + */ + public Bar(Bar bar) { + super(bar.leftBoundInclusive, bar.rightBoundExclusive); + this.count = bar.count; + } + + // Used to keep the bars sorted by bounds + @Override + public int compareTo(Bar other) { + // if one bar fully contains the other, they are considered the same. 
For binary search + int leftComp = Bytes.compareTo(this.leftBoundInclusive, other.leftBoundInclusive); + int rightComp = Bytes.compareTo(this.rightBoundExclusive, other.rightBoundExclusive); + if ((leftComp >= 0 && rightComp < 0) || (leftComp <= 0 && rightComp > 0) + || (leftComp == 0 && rightComp == 0)) { + return 0; + } + if (Bytes.compareTo(this.leftBoundInclusive, other.rightBoundExclusive) >= 0) { + return 1; + } + if (Bytes.compareTo(this.rightBoundExclusive, other.leftBoundInclusive) <= 0) { + return -1; + } + throw new AssertionError("Cannot not have overlapping bars"); + } + + /** + * @return The aggregate count of this bar and its blocked bars' counts + */ + public long getSize() { + long blockedBarSum = getBlockedBarsSize(); + return count + blockedBarSum; + } + + /** + * @return The sum of the counts of all the blocked bars + */ + public long getBlockedBarsSize() { + long blockedBarSum = 0; + for (Bar bb : blockedBars) { + blockedBarSum += bb.getSize(); + } + return blockedBarSum; + } + + public void addBlockedBar(Bar bar) { + blockedBars.add(bar); + } + + public void addBlockedBars(List bars) { + blockedBars.addAll(bars); + } + + public List getBlockedBars() { + return blockedBars; + } + + public long getCount() { + return this.count; + } + + public void incrementCount() { + count++; + } + + public void incrementCount(long increment) { + count += increment; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((blockedBars == null) ? 0 : blockedBars.hashCode()); + result = prime * result + (int) (count ^ (count >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + Bar other = (Bar) obj; + if (blockedBars == null) { + if (other.blockedBars != null) return false; + } else if (!blockedBars.equals(other.blockedBars)) return false; + if (count != other.count) return false; + return true; + } + + @Override + public String toString() { + return "Bar[count=" + count + ", blockedBars=" + blockedBars + ", leftBoundInclusive=" + + Bytes.toString(leftBoundInclusive) + ", rightBoundExclusive=" + + Bytes.toString(rightBoundExclusive) + "]"; + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/933dd0b8/phoenix-core/src/test/java/org/apache/phoenix/util/EquiDepthStreamHistogramTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/EquiDepthStreamHistogramTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/EquiDepthStreamHistogramTest.java new file mode 100644 index 0000000..9cf1f43 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/EquiDepthStreamHistogramTest.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.util.EquiDepthStreamHistogram.Bar; +import org.apache.phoenix.util.EquiDepthStreamHistogram.Bucket; +import org.junit.Before; +import org.junit.Test; + +public class EquiDepthStreamHistogramTest { + byte[] bytesA = Bytes.toBytes("a"); + byte[] bytesB = Bytes.toBytes("b"); + byte[] bytesC = Bytes.toBytes("c"); + byte[] bytesD = Bytes.toBytes("d"); + byte[] bytesE = Bytes.toBytes("e"); + Bar a_b; + Bar b_c; + Bar c_d; + Bar d_e; + + @Before + public void resetBars() { + a_b = new Bar(bytesA, bytesB); + b_c = new Bar(bytesB, bytesC); + c_d = new Bar(bytesC, bytesD); + d_e = new Bar(bytesD, bytesE); + } + + @Test + public void testComparator() { + // test ordering + List barList = new ArrayList<>(); + barList.add(b_c); + barList.add(c_d); + barList.add(a_b); + + Collections.sort(barList); + assertEquals(a_b, barList.get(0)); + assertEquals(b_c, barList.get(1)); + assertEquals(c_d, barList.get(2)); + + // test when a bar fully contains another + Bar a_a = new Bar(bytesA, bytesA); + assertEquals(0, a_b.compareTo(a_a)); + assertEquals(0, a_a.compareTo(a_b)); + assertEquals(1, b_c.compareTo(a_a)); + assertEquals(-1, a_a.compareTo(b_c)); + assertEquals(0, Collections.binarySearch(barList, a_a)); + assertEquals(1, Collections.binarySearch(barList, new Bar(bytesB, bytesB))); + assertEquals(-4, Collections.binarySearch(barList, new Bar(Bytes.toBytes("e"), Bytes.toBytes("e")))); + assertEquals(0, a_a.compareTo(a_a)); + } + + @Test + public void testGetBar() { + EquiDepthStreamHistogram histo = new EquiDepthStreamHistogram(10); + Bar bar = histo.getBar(bytesB); + assertTrue(Arrays.equals(bytesB, bar.getLeftBoundInclusive())); + assertEquals(1, histo.bars.size()); + assertTrue(bar == histo.getBar(bytesB)); + assertTrue(bar == histo.getBar(bytesA)); + assertTrue(bar == histo.getBar(bytesC)); + assertEquals(1, histo.bars.size()); + assertArrayEquals(bytesA, bar.getLeftBoundInclusive()); + assertArrayEquals(bytesC, bar.getRightBoundExclusive()); + + histo.bars = new ArrayList(); + histo.bars.add(b_c); + histo.bars.add(c_d); + assertEquals(b_c, histo.getBar(bytesB)); + assertEquals(c_d, histo.getBar(bytesC)); + + assertTrue(histo.getBar(bytesA) == b_c); + assertTrue(histo.getBar(bytesE) == c_d); + assertArrayEquals(bytesA, b_c.getLeftBoundInclusive()); + assertArrayEquals(bytesE, c_d.getRightBoundExclusive()); + } + + @Test + public void testMergeBars() { + EquiDepthStreamHistogram histo = new EquiDepthStreamHistogram(2, 1); + // test merge of two bars + histo.bars.add(a_b); + histo.bars.add(b_c); + histo.bars.add(c_d); + histo.bars.add(d_e); + histo.totalCount = 20; // maxBarCount of 1.7 * (10/2) = 17 + a_b.incrementCount(3); + b_c.incrementCount(2); + c_d.incrementCount(10); + 
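+ // pairwise sums once d_e is set: a_b + b_c = 5, b_c + c_d = 12, c_d + d_e = 15;
+ // the smallest sum (5) is below the max bar size of 1.7 * (20 / 2) = 17,
+ // so mergeBars() should combine a_b and b_c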
d_e.incrementCount(5); + histo.mergeBars(); + assertEquals(3, histo.bars.size()); + Bar mergedBar = histo.bars.get(0); + assertEquals(5, mergedBar.getSize()); + assertArrayEquals(bytesA, mergedBar.getLeftBoundInclusive()); + assertArrayEquals(bytesC, mergedBar.getRightBoundExclusive()); + + // merge again a_c=5 c_d=10 d_e=5 + histo.mergeBars(); + assertEquals(2, histo.bars.size()); + mergedBar = histo.bars.get(0); + assertEquals(15, mergedBar.getSize()); + assertArrayEquals(bytesA, mergedBar.getLeftBoundInclusive()); + assertArrayEquals(bytesD, mergedBar.getRightBoundExclusive()); + + // a_d=15 d_e=5 , 20 > 17 so merge shouldn't happen + histo.mergeBars(); + assertEquals(2, histo.bars.size()); + } + + @Test + public void testSplitBar() { + EquiDepthStreamHistogram histo = new EquiDepthStreamHistogram(10); + Bar targetBar = new Bar(bytesA, bytesC); + targetBar.incrementCount(31); + histo.bars.add(targetBar); + histo.splitBar(targetBar); + assertEquals(2, histo.bars.size()); + Bar newLeft = histo.bars.get(0); + assertArrayEquals(bytesA, newLeft.getLeftBoundInclusive()); + assertArrayEquals(bytesB, newLeft.getRightBoundExclusive()); + assertEquals(15, newLeft.getSize()); + Bar newRight = histo.bars.get(1); + assertArrayEquals(bytesB, newRight.getLeftBoundInclusive()); + assertArrayEquals(bytesC, newRight.getRightBoundExclusive()); + assertEquals(16, newRight.getSize()); + + // test blocked bars are distributed correctly + histo.bars.clear(); + targetBar = new Bar(bytesA, bytesE); + targetBar.incrementCount(10); + a_b.incrementCount(3); + targetBar.addBlockedBar(a_b); + b_c.incrementCount(4); + targetBar.addBlockedBar(b_c); + c_d.incrementCount(2); + targetBar.addBlockedBar(c_d); + d_e.incrementCount(1); + targetBar.addBlockedBar(d_e); + histo.bars.add(targetBar); + histo.splitBar(targetBar); + newLeft = histo.bars.get(0); + newRight = histo.bars.get(1); + assertEquals(10, newLeft.getSize()); + assertEquals(a_b, newLeft.getBlockedBars().get(0)); + assertEquals(d_e, newLeft.getBlockedBars().get(1)); + assertEquals(10, newRight.getSize()); + assertEquals(b_c, newRight.getBlockedBars().get(0)); + assertEquals(c_d, newRight.getBlockedBars().get(1)); + } + + @Test + public void testAddValues() { + EquiDepthStreamHistogram histo = new EquiDepthStreamHistogram(3); + for (int i = 0; i < 100; i++) { + histo.addValue(Bytes.toBytes(i + "")); + } + // (expansion factor 7) * (3 buckets) + assertEquals(21, histo.bars.size()); + long total = 0; + for (Bar b : histo.bars) { + total += b.getSize(); + } + assertEquals(100, total); + } + + @Test + public void testComputeBuckets() { + EquiDepthStreamHistogram histo = new EquiDepthStreamHistogram(3); + histo.addValue(bytesA); + histo.addValue(bytesB); + histo.addValue(bytesC); + histo.addValue(bytesD); + histo.addValue(bytesE); + List buckets = histo.computeBuckets(); + assertEquals(3, buckets.size()); + Bucket bucket = buckets.get(0); + assertEquals(2, bucket.getCountEstimate()); + assertInBucket(bucket, bytesA); + assertInBucket(bucket, bytesB); + bucket = buckets.get(1); + assertEquals(2, bucket.getCountEstimate()); + assertInBucket(bucket, bytesC); + assertInBucket(bucket, bytesD); + bucket = buckets.get(2); + assertEquals(1, bucket.getCountEstimate()); + assertInBucketInclusive(bucket, bytesE); + + // test closestSplitIdx - total count is currently 5, idealBuckSize=2 + histo.bars.clear(); + a_b.incrementCount(); + histo.bars.add(a_b); + Bar b_d = new Bar(bytesB, bytesD); + b_d.incrementCount(3); // use 1/3 of this bar's count for first bucket + 
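+ // with all three bars added: idealBuckSize = ceil(5 / 3) = 2, so the first bucket takes
+ // a_b (count 1) plus 1 of b_d's 3, leaving a surplus of 2; its right bound should land at
+ // splits[3] of the 10 boundaries returned by Bytes.split(bytesB, bytesD, 8)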
histo.bars.add(b_d); + histo.bars.add(d_e); + buckets = histo.computeBuckets(); + bucket = buckets.get(0); + // bound should be 1/3 of [bytesB, bytesD), + // since we used 1/3 of b_d's count for first bucket + byte[][] splits = Bytes.split(bytesB, bytesD, 8); + assertArrayEquals(splits[3], bucket.getRightBoundExclusive()); + bucket = buckets.get(1); + assertArrayEquals(splits[3], bucket.leftBoundInclusive); + } + + // check if the value lies in the bucket range + private void assertInBucket(Bucket bucket, byte[] value) { + assertTrue(Bytes.compareTo(value, bucket.getLeftBoundInclusive()) >= 0); + assertTrue(Bytes.compareTo(value, bucket.getRightBoundExclusive()) < 0); + } + + // right bound is inclusive + private void assertInBucketInclusive(Bucket bucket, byte[] value) { + assertTrue(Bytes.compareTo(value, bucket.getLeftBoundInclusive()) >= 0); + assertTrue(Bytes.compareTo(value, bucket.getRightBoundExclusive()) <= 0); + } + + /** + * Stream of data is has uniformly distributed values + */ + @Test + public void testUniformDistribution() { + EquiDepthStreamHistogram histo = new EquiDepthStreamHistogram(4); + for (int i = 0; i < 100000; i++) { + histo.addValue(Bytes.toBytes((i % 8) + "")); + } + Iterator buckets = histo.computeBuckets().iterator(); + Bucket bucket = buckets.next(); + assertEquals(25000, bucket.getCountEstimate()); + assertInBucket(bucket, Bytes.toBytes("0")); + assertInBucket(bucket, Bytes.toBytes("1")); + bucket = buckets.next(); + assertEquals(25000, bucket.getCountEstimate()); + assertInBucket(bucket, Bytes.toBytes("2")); + assertInBucket(bucket, Bytes.toBytes("3")); + bucket = buckets.next(); + assertEquals(25000, bucket.getCountEstimate()); + assertInBucket(bucket, Bytes.toBytes("4")); + assertInBucket(bucket, Bytes.toBytes("5")); + bucket = buckets.next(); + assertEquals(25000, bucket.getCountEstimate()); + assertInBucket(bucket, Bytes.toBytes("6")); + assertInBucket(bucket, Bytes.toBytes("7")); + } + + /** + * Stream of data is skewed Gaussian distribution with mean of 100 and standard deviation of 25 + */ + @Test + public void testSkewedDistribution() { + Random random = new Random(); + EquiDepthStreamHistogram histo = new EquiDepthStreamHistogram(5); + for (int i = 0; i < 100000; i++) { + int value = (int) Math.round(random.nextGaussian() * 25 + 100); + histo.addValue(Bytes.toBytes(value)); + } + // our middle bucket should have a smaller length than the end buckets, + // since we have more values clustered in the middle + List buckets = histo.computeBuckets(); + Bucket first = buckets.get(0); + int firstLength = getLength(first); + Bucket last = buckets.get(4); + int lastLength = getLength(last); + Bucket middle = buckets.get(2); + int middleLength = getLength(middle); + assertTrue(firstLength - middleLength > 25); + assertTrue(lastLength - middleLength > 25); + } + + private int getLength(Bucket last) { + return Math.abs( + Bytes.toInt(last.getLeftBoundInclusive()) - Bytes.toInt(last.getRightBoundExclusive())); + } +}
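----------------------------------------------------------------------

Usage sketch (not part of the patch): how the new histogram might be driven end to end to derive split points for pre-splitting an index table, as the class comment suggests. It assumes phoenix-core and hbase-common are on the classpath; the class name EquiDepthHistogramExample, the "val-%04d" key pattern, and the value counts are illustrative only.

    import java.util.List;

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.util.EquiDepthStreamHistogram;
    import org.apache.phoenix.util.EquiDepthStreamHistogram.Bucket;

    public class EquiDepthHistogramExample {
        public static void main(String[] args) {
            // 4 buckets of roughly equal depth over a stream of indexed-column values
            EquiDepthStreamHistogram histo = new EquiDepthStreamHistogram(4);
            for (int i = 0; i < 100000; i++) {
                histo.addValue(Bytes.toBytes(String.format("val-%04d", i % 1000)));
            }
            List<Bucket> buckets = histo.computeBuckets();
            for (Bucket b : buckets) {
                System.out.println(Bytes.toString(b.getLeftBoundInclusive()) + " .. "
                    + Bytes.toString(b.getRightBoundExclusive())
                    + " ~" + b.getCountEstimate() + " values");
            }
            // The interior boundaries (right bounds of all but the last bucket) could then be
            // used as region split keys, e.g. with Admin.createTable(descriptor, splitKeys).
            byte[][] splitKeys = new byte[buckets.size() - 1][];
            for (int i = 0; i < buckets.size() - 1; i++) {
                splitKeys[i] = buckets.get(i).getRightBoundExclusive();
            }
            System.out.println(splitKeys.length + " split keys");
        }
    }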