accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch 1.7 updated: ACCUMULO-4669 Use windowed statistics in RFile
Date Fri, 01 Dec 2017 02:12:36 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.7
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.7 by this push:
     new c28e11c  ACCUMULO-4669 Use windowed statistics in RFile
c28e11c is described below

commit c28e11c1079d2e1c6d68078b3328016569138fed
Author: Keith Turner <kturner@apache.org>
AuthorDate: Thu Nov 30 20:21:13 2017 -0500

    ACCUMULO-4669 Use windowed statistics in RFile
---
 .../org/apache/accumulo/core/file/rfile/RFile.java |   9 +-
 .../accumulo/core/file/rfile/RollingStats.java     | 113 +++++++++++++
 .../core/file/rfile/RolllingStatsTest.java         | 178 +++++++++++++++++++++
 3 files changed, 296 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index d5779ce..fe1c832 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -65,7 +65,6 @@ import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityG
 import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.accumulo.core.util.MutableByteSequence;
 import org.apache.commons.lang.mutable.MutableLong;
-import org.apache.commons.math.stat.descriptive.SummaryStatistics;
 import org.apache.hadoop.io.Writable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -310,7 +309,8 @@ public class RFile {
 
     private HashSet<ByteSequence> previousColumnFamilies;
 
-    private SummaryStatistics keyLenStats = new SummaryStatistics();
+    // Use windowed stats to fix ACCUMULO-4669
+    private RollingStats keyLenStats = new RollingStats(2017);
     private double avergageKeySize = 0;
 
     public Writer(BlockFileWriter bfw, int blockSize) throws IOException {
@@ -370,8 +370,9 @@ public class RFile {
     }
 
     private boolean isGiantKey(Key k) {
-      // consider a key thats more than 3 standard deviations from previously seen key sizes as giant
-      return k.getSize() > keyLenStats.getMean() + keyLenStats.getStandardDeviation() * 3;
+      double mean = keyLenStats.getMean();
+      double stddev = keyLenStats.getStandardDeviation();
+      return k.getSize() > mean + Math.max(9 * mean, 4 * stddev);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java
new file mode 100644
index 0000000..d223574
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RollingStats.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import org.apache.commons.math.stat.StatUtils;
+
+/**
+ * This class supports efficient window statistics. Apache commons math3 has a class called DescriptiveStatistics that supports windows. DescriptiveStatistics
+ * recomputes the statistics over the entire window each time its requested. In a test over 1,000,000 entries with a window size of 1019 that requested stats
+ * for each entry this class took ~50ms and DescriptiveStatistics took ~6,000ms.
+ *
+ * <p>
+ * This class may not be as accurate as DescriptiveStatistics. In unit test its within 1/1000 of DescriptiveStatistics.
+ */
+class RollingStats {
+  private int position;
+  private double window[];
+
+  private double average;
+  private double variance;
+  private double stddev;
+
+  // indicates if the window is full
+  private boolean windowFull;
+
+  private int recomputeCounter = 0;
+
+  RollingStats(int windowSize) {
+    this.windowFull = false;
+    this.position = 0;
+    this.window = new double[windowSize];
+  }
+
+  /**
+   * @see <a href= "http://jonisalonen.com/2014/efficient-and-accurate-rolling-standard-deviation/">Efficient and accurate rolling standard deviation</a>
+   */
+  private void update(double newValue, double oldValue, int windowSize) {
+    double delta = newValue - oldValue;
+
+    double oldAverage = average;
+    average = average + delta / windowSize;
+    variance += delta * (newValue - average + oldValue - oldAverage) / (windowSize - 1);
+    stddev = Math.sqrt(variance);
+  }
+
+  void addValue(long stat) {
+
+    double old = window[position];
+    window[position] = stat;
+    position++;
+    recomputeCounter++;
+
+    if (windowFull) {
+      update(stat, old, window.length);
+    } else if (position == window.length) {
+      computeStats(window.length);
+      windowFull = true;
+    }
+
+    if (position == window.length) {
+      position = 0;
+    }
+  }
+
+  private void computeStats(int len) {
+    average = StatUtils.mean(window, 0, len);
+    variance = StatUtils.variance(window, average, 0, len);
+    stddev = Math.sqrt(variance);
+    recomputeCounter = 0;
+  }
+
+  private void computeStats() {
+    if (windowFull) {
+      if (variance < 0 || recomputeCounter >= 100) {
+        // incremental computation drifts over time, so periodically force a recompute
+        computeStats(window.length);
+      }
+    } else if (recomputeCounter > 0) {
+      computeStats(position);
+    }
+  }
+
+  double getMean() {
+    computeStats();
+    return average;
+  }
+
+  double getVariance() {
+    computeStats();
+    return variance;
+  }
+
+  double getStandardDeviation() {
+    computeStats();
+    return stddev;
+  }
+
+  boolean isWindowFull() {
+    return windowFull;
+  }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java
new file mode 100644
index 0000000..28e1b20
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RolllingStatsTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.file.rfile;
+
+import java.util.Random;
+
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.math.DoubleMath;
+
+public class RolllingStatsTest {
+
+  private static final double TOLERANCE = 1.0 / 1000;
+
+  private static void assertFuzzyEquals(double expected, double actual) {
+    Assert.assertTrue(String.format("expected: %f, actual: %f diff: %f", expected, actual, Math.abs(expected - actual)),
+        DoubleMath.fuzzyEquals(expected, actual, TOLERANCE));
+  }
+
+  private static void checkAgreement(DescriptiveStatistics ds, RollingStats rs) {
+    // getting stats from ds is expensive, so do it once... otherwise unit test takes 11 sec
+    // instead of 5 secs
+    double expMean = ds.getMean();
+    double expVar = ds.getVariance();
+    double expStdDev = Math.sqrt(expVar);
+
+    assertFuzzyEquals(expMean, rs.getMean());
+    assertFuzzyEquals(expVar, rs.getVariance());
+    assertFuzzyEquals(expStdDev, rs.getStandardDeviation());
+
+    Assert.assertTrue(expMean >= 0);
+    Assert.assertTrue(rs.getMean() >= 0);
+    Assert.assertTrue(expVar >= 0);
+    Assert.assertTrue(rs.getVariance() >= 0);
+    Assert.assertTrue(expStdDev >= 0);
+    Assert.assertTrue(rs.getStandardDeviation() >= 0);
+  }
+
+  private static class StatTester {
+
+    Random rand = new Random(42);
+    private DescriptiveStatistics ds;
+    private RollingStats rs;
+    private RollingStats rsp;
+
+    StatTester(int windowSize) {
+      ds = new DescriptiveStatistics();
+      ds.setWindowSize(windowSize);
+
+      rs = new RollingStats(windowSize);
+      rsp = new RollingStats(windowSize);
+    }
+
+    void addValue(long v) {
+      ds.addValue(v);
+      rs.addValue(v);
+      rsp.addValue(v);
+      checkAgreement(ds, rs);
+
+      if (rand.nextDouble() < 0.001) {
+        checkAgreement(ds, rsp);
+      }
+    }
+
+    void check() {
+      checkAgreement(ds, rsp);
+    }
+  }
+
+  @Test
+  public void testFewSizes() {
+    StatTester st = new StatTester(1019);
+    int[] keySizes = new int[] {103, 113, 123, 2345};
+    Random rand = new Random(42);
+    for (int i = 0; i < 10000; i++) {
+      st.addValue(keySizes[rand.nextInt(keySizes.length)]);
+    }
+    st.check();
+  }
+
+  @Test
+  public void testConstant() {
+
+    StatTester st = new StatTester(1019);
+
+    for (int i = 0; i < 10000; i++) {
+      st.addValue(111);
+    }
+
+    st.check();
+  }
+
+  @Test
+  public void testUniformIncreasing() {
+
+    for (int windowSize : new int[] {10, 13, 20, 100, 500}) {
+
+      StatTester st = new StatTester(windowSize);
+
+      Random rand = new Random();
+
+      for (int i = 0; i < 1000; i++) {
+        int v = 200 + rand.nextInt(50);
+
+        st.addValue(v);
+      }
+
+      st.check();
+    }
+  }
+
+  @Test
+  public void testSlowIncreases() {
+    // number of keys with the same len
+    int len = 100;
+
+    StatTester st = new StatTester(1019);
+
+    for (int i = 0; i < 50; i++) {
+      for (int j = 0; j < 3000; j++) {
+        st.addValue(len);
+      }
+
+      len = (int) (len * 1.1);
+    }
+
+    st.check();
+  }
+
+  @Test
+  public void testSpikes() {
+
+    Random rand = new Random();
+
+    StatTester st = new StatTester(3017);
+
+    for (int i = 0; i < 13; i++) {
+
+      // write small keys
+      int numSmall = 1000 + rand.nextInt(1000);
+      for (int s = 0; s < numSmall; s++) {
+        int sks = 50 + rand.nextInt(100);
+        // simulate row with multiple cols
+        for (int c = 0; c < 3; c++) {
+          st.addValue(sks);
+        }
+      }
+
+      // write a few large keys
+      int numLarge = 1 + rand.nextInt(1);
+      for (int l = 0; l < numLarge; l++) {
+        int lks = 500000 + rand.nextInt(1000000);
+        for (int c = 0; c < 3; c++) {
+          st.addValue(lks);
+        }
+      }
+    }
+
+    st.check();
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@accumulo.apache.org" <commits@accumulo.apache.org>'].

Mime
View raw message