aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zma...@apache.org
Subject [25/37] aurora git commit: Import of Twitter Commons.
Date Tue, 25 Aug 2015 18:19:39 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/ApproximateHistogram.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/ApproximateHistogram.java b/commons/src/main/java/com/twitter/common/stats/ApproximateHistogram.java
new file mode 100644
index 0000000..75a8449
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/ApproximateHistogram.java
@@ -0,0 +1,566 @@
+// =================================================================================================
+// Copyright 2013 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import java.util.Arrays;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+
+/**
+ * <p>
+ * Implements Histogram structure for computing approximate quantiles.
+ * The implementation is based on the following paper:
+ *
+ * <pre>
+ * [MP80]  Munro & Paterson, "Selection and Sorting with Limited Storage",
+ *         Theoretical Computer Science, Vol 12, p 315-323, 1980.
+ * </pre>
+ * </p>
+ * <p>
+ * You could read a detailed description of the same algorithm here:
+ *
+ * <pre>
+ * [MRL98] Manku, Rajagopalan & Lindsay, "Approximate Medians and other
+ *         Quantiles in One Pass and with Limited Memory", Proc. 1998 ACM
+ *         SIGMOD, Vol 27, No 2, p 426-435, June 1998.
+ * </pre>
+ * </p>
+ * <p>
+ * There's a good explanation of the algorithm in the Sawzall source code
+ * See: http://szl.googlecode.com/svn-history/r36/trunk/src/emitters/szlquantile.cc
+ * </p>
+ * Here's a schema of the tree:
+ * <pre>
+ *      [4]     level 3, weight=rootWeight=8
+ *       |
+ *      [3]     level 2, weight=4
+ *       |
+ *      [2]     level 1, weight=2
+ *     /   \
+ *   [0]   [1]  level 0, weight=1
+ * </pre>
+ * <p>
+ * {@code [i]} represents {@code buffer[i]}
+ * The depth of the tree is limited to a maximum value
+ * Every buffer has the same size
+ * </p>
+ * <p>
+ * New elements are added to {@code [0]} or {@code [1]}.
+ * When {@code [0]} and {@code [1]} are both full, they are collapsed into a temporary buffer
+ * of weight 2. If {@code [2]} is empty, the collapsed buffer is stored in {@code [2]}; otherwise
+ * {@code [2]} is collapsed with the temporary buffer and the result is stored in {@code [3]} if
+ * it is empty, and so on.
+ * </p>
+ */
+public final class ApproximateHistogram implements Histogram {
+  @VisibleForTesting
+  public static final Precision DEFAULT_PRECISION = new Precision(0.02, 100 * 1000);
+  @VisibleForTesting
+  public static final Amount<Long, Data> DEFAULT_MAX_MEMORY = Amount.of(12L, Data.KB);
+  @VisibleForTesting static final long ELEM_SIZE = 8; // sizeof long
+
+  // See above
+  @VisibleForTesting long[][] buffer;
+  @VisibleForTesting long count = 0L;
+  @VisibleForTesting int leafCount = 0; // number of elements in the bottom two leaves
+  @VisibleForTesting int currentTop = 1;
+  @VisibleForTesting int[] indices; // member for optimization reason
+  private boolean leavesSorted = true;
+  private int rootWeight = 1;
+  private long[][] bufferPool; // pool of 2 buffers (used for merging)
+  private int bufferSize;
+  private int maxDepth;
+
+  /**
+   * Initializes the internal state; called only by constructors.
+   * <p>
+   * Allocates the two merge buffers of {@code bufferPool}, the {@code indices} array and the two
+   * leaf buffers ({@code buffer[0]} and {@code buffer[1]}). The buffers of the upper levels are
+   * left {@code null} here and are allocated on first use by {@code recCollapse}.
+   * </p>
+   *
+   * @param bufSize size of each buffer
+   * @param depth maximum depth of the tree of buffers
+   */
+  @VisibleForTesting
+  void init(int bufSize, int depth) {
+    bufferSize = bufSize;
+    maxDepth = depth;
+    bufferPool = new long[2][bufferSize];
+    indices = new int[depth + 1];
+    // Only create the outer array: allocating every level up front just to null the upper
+    // levels out again would transiently use the full (depth + 1) * bufferSize footprint.
+    buffer = new long[depth + 1][];
+    // only allocate the first 2 buffers, lazily allocate the others.
+    allocate(0);
+    allocate(1);
+    clear();
+  }
+
+  /**
+   * Package-private constructor (marked {@code @VisibleForTesting}) that takes the buffer size
+   * and the tree depth directly instead of deriving them from a precision or memory constraint.
+   *
+   * @param bufSize size of each buffer
+   * @param depth maximum depth of the tree of buffers
+   */
+  @VisibleForTesting
+  ApproximateHistogram(int bufSize, int depth) {
+    init(bufSize, depth);
+  }
+
+  /**
+   * Constructor with a precision constraint: allocates as much memory as required to match
+   * the requested precision.
+   *
+   * @param precision the requested precision (must not be null)
+   */
+  public ApproximateHistogram(Precision precision) {
+    Preconditions.checkNotNull(precision);
+    // The depth is derived from (epsilon, N); the buffer size then follows from the depth and N.
+    int depth = computeDepth(precision.getEpsilon(), precision.getN());
+    int bufSize = computeBufferSize(depth, precision.getN());
+    init(bufSize, depth);
+  }
+
+  /**
+   * Constructor with a memory constraint: searches for the best possible precision that
+   * satisfies the memory constraint.
+   * <p>
+   * Starting from the epsilon of {@link #DEFAULT_PRECISION}, epsilon is adjusted in 5% steps
+   * (shrunk when memory is available, grown when the estimate exceeds the budget) and the
+   * estimated footprint is re-evaluated with {@code memoryUsage} after every step.
+   * </p>
+   *
+   * @param maxMemory the maximum amount of memory that the instance will take (at least 1KB)
+   * @param expectedSize the expected number of elements, used as the initial value of N
+   *     (must be strictly positive)
+   */
+  public ApproximateHistogram(Amount<Long, Data> maxMemory, int expectedSize) {
+    Preconditions.checkNotNull(maxMemory);
+    Preconditions.checkArgument(1024 <= maxMemory.as(Data.BYTES),
+        "at least 1KB is required for an Histogram");
+    // A non-positive N can never reach the memory budget, so the search below would not end.
+    Preconditions.checkArgument(0 < expectedSize,
+        "expectedSize must be strictly positive");
+
+    double epsilon = DEFAULT_PRECISION.getEpsilon();
+    int n = expectedSize;
+    int depth = computeDepth(epsilon, n);
+    int bufSize = computeBufferSize(depth, n);
+    long maxBytes = maxMemory.as(Data.BYTES);
+
+    // Increase precision if the maxMemory allow it, otherwise reduce precision. (by 5% steps)
+    boolean tooMuchMem = memoryUsage(bufSize, depth) > maxBytes;
+    double multiplier = tooMuchMem ? 1.05 : 0.95;
+    while ((maxBytes < memoryUsage(bufSize, depth)) == tooMuchMem) {
+      epsilon *= multiplier;
+      if (epsilon < 0.00001) {
+        if (n > Integer.MAX_VALUE / 10) {
+          // n cannot grow any further without overflowing an int (it would wrap around and
+          // the search would never converge). Stop here: depth and bufSize still hold the last
+          // configuration that fits in the budget.
+          break;
+        }
+        // for very high memory constraint increase N as well
+        n *= 10;
+        epsilon = DEFAULT_PRECISION.getEpsilon();
+      }
+      depth = computeDepth(epsilon, n);
+      bufSize = computeBufferSize(depth, n);
+    }
+    if (!tooMuchMem) {
+      // It's ok to consume less memory than the constraint
+      // but we never have to consume more!
+      // Step back to the previous epsilon, i.e. the last one that was within the budget.
+      depth = computeDepth(epsilon / multiplier, n);
+      bufSize = computeBufferSize(depth, n);
+    }
+
+    init(bufSize, depth);
+  }
+
+  /**
+   * Constructor with a memory constraint, using the N of {@link #DEFAULT_PRECISION} as the
+   * expected size.
+   *
+   * @param maxMemory the maximum amount of memory that the instance will take
+   * @see #ApproximateHistogram(Amount, int)
+   */
+  public ApproximateHistogram(Amount<Long, Data> maxMemory) {
+    this(maxMemory, DEFAULT_PRECISION.getN());
+  }
+
+  /**
+   * Default constructor, bounded by {@link #DEFAULT_MAX_MEMORY}.
+   *
+   * @see #ApproximateHistogram(Amount)
+   */
+  public ApproximateHistogram() {
+    this(DEFAULT_MAX_MEMORY);
+  }
+
+  /**
+   * Inserts {@code x} into one of the two leaf buffers, collapsing the tree first when both
+   * leaves are full.
+   *
+   * @param x the value to insert
+   */
+  @Override
+  public synchronized void add(long x) {
+    final boolean leavesFull = (leafCount == 2 * bufferSize);
+    if (leavesFull) {
+      // Both leaves are full: sort them and push their content up the tree.
+      Arrays.sort(buffer[0]);
+      Arrays.sort(buffer[1]);
+      recCollapse(buffer[0], 1);
+      leafCount = 0;
+    }
+
+    // From here on there is room in one of the two leaves; the first leaf is filled first.
+    final boolean firstLeafHasRoom = leafCount < bufferSize;
+    final long[] leaf = firstLeafHasRoom ? buffer[0] : buffer[1];
+    final int slot = firstLeafHasRoom ? leafCount : leafCount - bufferSize;
+    leaf[slot] = x;
+
+    count++;
+    leafCount++;
+    // A single element is trivially sorted; anything more must be re-sorted before querying.
+    leavesSorted = (leafCount == 1);
+  }
+
+  /**
+   * Returns an approximation of the {@code q} quantile of the values added so far.
+   * <p>
+   * The buffers are walked from the biggest stored value downwards, accumulating the weight of
+   * each visited element until the accumulated weight reaches {@code ceil(count * (1 - q))}.
+   * The last visited element is returned.
+   * </p>
+   *
+   * @param q the quantile to compute, between 0.0 and 1.0 inclusive
+   * @return the approximated quantile, or 0 if the histogram is empty
+   * @throws IllegalArgumentException if {@code q} is outside of [0.0, 1.0]
+   */
+  @Override
+  public synchronized long getQuantile(double q) {
+    Preconditions.checkArgument(0.0 <= q && q <= 1.0,
+        "quantile must be in the range 0.0 to 1.0 inclusive");
+    if (count == 0) {
+      return 0L;
+    }
+
+    // the two leaves are the only buffer that can be partially filled
+    int buf0Size = Math.min(bufferSize, leafCount);
+    int buf1Size = Math.max(0, leafCount - buf0Size);
+    long sum = 0;
+    // Weight that has to be consumed, counting from the biggest element downwards.
+    long target = (long) Math.ceil(count * (1.0 - q));
+    int i;
+
+    // Leaves are sorted lazily: only the filled prefix of each leaf is sorted.
+    if (! leavesSorted) {
+      Arrays.sort(buffer[0], 0, buf0Size);
+      Arrays.sort(buffer[1], 0, buf1Size);
+      leavesSorted = true;
+    }
+    // Every cursor starts on the last (biggest) element of its buffer; the two leaves may be
+    // partially filled, so their cursors start at the end of the filled prefix instead.
+    Arrays.fill(indices, bufferSize - 1);
+    indices[0] = buf0Size - 1;
+    indices[1] = buf1Size - 1;
+
+    // The body runs at least once, so q == 1.0 (target == 0) returns the biggest element.
+    do {
+      i = biggest(indices);
+      indices[i]--;
+      sum += weight(i);
+    } while (sum < target);
+    // indices[i] has already been decremented: + 1 points back at the element just consumed.
+    return buffer[i][indices[i] + 1];
+  }
+
+  /**
+   * Returns the quantiles of the histogram by delegating to
+   * {@code Histograms.extractQuantiles}.
+   *
+   * @param quantiles array of quantiles to compute
+   * @return the array returned by {@code Histograms.extractQuantiles} for this histogram
+   */
+  @Override
+  public synchronized long[] getQuantiles(double[] quantiles) {
+    return Histograms.extractQuantiles(this, quantiles);
+  }
+
+  /**
+   * Resets the histogram to its empty state.
+   * Only the bookkeeping fields are reset; the content of the buffers is left untouched.
+   */
+  @Override
+  public synchronized void clear() {
+    leavesSorted = true;
+    rootWeight = 1;
+    currentTop = 1;
+    leafCount = 0;
+    count = 0L;
+  }
+
+  /**
+   * MergedHistogram is a wrapper on top of multiple histograms; it gives a view of all the
+   * underlying histograms as if they were just one.
+   * <p>
+   * Note: should only be used for querying the underlying histograms. Querying sorts the leaf
+   * buffers of the wrapped histograms and overwrites their {@code indices} arrays.
+   * </p>
+   */
+  private static class MergedHistogram implements Histogram {
+    // The wrapped histograms; the array passed to the constructor is kept as is (not copied).
+    private final ApproximateHistogram[] histograms;
+
+    private MergedHistogram(ApproximateHistogram[] histograms) {
+      this.histograms = histograms;
+    }
+
+    /**
+     * Not supported by the merged view: the value is ignored (trips an assertion when
+     * assertions are enabled).
+     */
+    @Override
+    public void add(long x) {
+      /* Ignore, Shouldn't be used */
+      assert(false);
+    }
+
+    /**
+     * Not supported by the merged view: nothing is cleared (trips an assertion when
+     * assertions are enabled).
+     */
+    @Override
+    public void clear() {
+      /* Ignore, Shouldn't be used */
+      assert(false);
+    }
+
+    /**
+     * Returns an approximation of the given quantile over all the wrapped histograms.
+     * Same walk as {@code ApproximateHistogram.getQuantile}, except that at every step the
+     * biggest remaining element is searched across all the histograms.
+     *
+     * @param quantile the quantile to compute, between 0.0 and 1.0 inclusive
+     * @return the approximated quantile, or 0 if all the histograms are empty
+     * @throws IllegalArgumentException if {@code quantile} is outside of [0.0, 1.0]
+     */
+    @Override
+    public long getQuantile(double quantile) {
+      Preconditions.checkArgument(0.0 <= quantile && quantile <= 1.0,
+          "quantile must be in the range 0.0 to 1.0 inclusive");
+
+      long count = initIndices();
+      if (count == 0) {
+        return 0L;
+      }
+
+      long sum = 0;
+      // Weight that has to be consumed, counting from the biggest element downwards.
+      long target = (long) Math.ceil(count * (1.0 - quantile));
+      int iHist = -1;
+      int iBiggest = -1;
+      do {
+        long biggest = Long.MIN_VALUE;
+        for (int i = 0; i < histograms.length; i++) {
+          ApproximateHistogram hist = histograms[i];
+          int indexBiggest = hist.biggest(hist.indices);
+          // A negative level means this histogram has no element left to offer.
+          if (indexBiggest >= 0) {
+            long value = hist.buffer[indexBiggest][hist.indices[indexBiggest]];
+            // '<=' so that a candidate equal to Long.MIN_VALUE is still accepted; on ties the
+            // histogram with the highest position in the array wins.
+            if (iBiggest == -1 || biggest <= value) {
+              iBiggest = indexBiggest;
+              biggest = value;
+              iHist = i;
+            }
+          }
+        }
+        // Consume the selected element and account for its weight.
+        histograms[iHist].indices[iBiggest]--;
+        sum += histograms[iHist].weight(iBiggest);
+      } while (sum < target);
+
+      // The cursor has already been decremented: + 1 points back at the element just consumed.
+      ApproximateHistogram hist = histograms[iHist];
+      int i = hist.indices[iBiggest];
+      return hist.buffer[iBiggest][i + 1];
+    }
+
+    /**
+     * Returns the quantiles of the merged view by delegating to
+     * {@code Histograms.extractQuantiles}.
+     *
+     * @param quantiles array of quantiles to compute
+     */
+    @Override
+    public synchronized long[] getQuantiles(double[] quantiles) {
+      return Histograms.extractQuantiles(this, quantiles);
+    }
+
+    /**
+     * Initialize the indices array for each Histogram and return the global count.
+     * The leaves of a histogram are sorted here if they are not sorted yet.
+     *
+     * @return the sum of the counts of all the wrapped histograms
+     */
+    private long initIndices() {
+      long count = 0L;
+      for (int i = 0; i < histograms.length; i++) {
+        ApproximateHistogram h = histograms[i];
+        int[] indices = h.indices;
+        count += h.count;
+        // The two leaves are the only buffers that can be partially filled.
+        int buf0Size = Math.min(h.bufferSize, h.leafCount);
+        int buf1Size = Math.max(0, h.leafCount - buf0Size);
+
+        if (! h.leavesSorted) {
+          Arrays.sort(h.buffer[0], 0, buf0Size);
+          Arrays.sort(h.buffer[1], 0, buf1Size);
+          h.leavesSorted = true;
+        }
+        // Every cursor starts on the last element of its buffer (filled prefix for the leaves).
+        Arrays.fill(indices, h.bufferSize - 1);
+        indices[0] = buf0Size - 1;
+        indices[1] = buf1Size - 1;
+      }
+      return count;
+    }
+  }
+
+  /**
+   * Returns a read-only view that merges the given histograms.
+   * The array is not copied: the returned view keeps a reference to it.
+   *
+   * @param histograms array of histograms to merge together
+   * @return a new Histogram giving a merged view of {@code histograms}
+   */
+  public static Histogram merge(ApproximateHistogram[] histograms) {
+    return new MergedHistogram(histograms);
+  }
+
+  /**
+   * Computes the depth {@code b} of the tree of buffers: the first {@code b >= 2} for which
+   * the inequality below stops holding.
+   * <pre>
+   *    (b - 2) * (2 ^ (b - 2)) + 0.5 <= epsilon * N
+   * </pre>
+   * For an explanation of this inequality, please read the Munro-Paterson or
+   * the Manku-Rajagopalan-Lindsay papers.
+   *
+   * @param epsilon the requested error
+   * @param n the expected number of elements
+   * @return the depth of the tree, at least 2
+   */
+  @VisibleForTesting static int computeDepth(double epsilon, long n) {
+    final double errorBudget = epsilon * n;
+    int depth = 2;
+    for (;;) {
+      long weightedLevels = (depth - 2) * (1L << (depth - 2));
+      // Written as a negated '<=' so that a NaN budget stops the search immediately.
+      if (!(weightedLevels + 0.5 <= errorBudget)) {
+        return depth;
+      }
+      depth++;
+    }
+  }
+
+  /**
+   * Computes the size of each buffer: {@code n / 2^(depth - 1)}, truncated to an int.
+   *
+   * @param depth the depth of the tree of buffers
+   * @param n the expected number of elements
+   * @return the size of a buffer
+   */
+  @VisibleForTesting static int computeBufferSize(int depth, long n) {
+    final long divisor = 1L << (depth - 1);
+    final long size = n / divisor;
+    return (int) size;
+  }
+
+  /**
+   * Return an estimation of the memory used by an instance.
+   * The size is due to:
+   * - a fix cost (76 bytes) for the class + fields
+   * - bufferPool: 16 + 2 * (16 + bufferSize * ELEM_SIZE)
+   * - indices: 16 + sizeof(Integer) * (depth + 1)
+   * - buffer: 16 + (depth + 1) * (16 + bufferSize * ELEM_SIZE)
+   *
+   * Note: This method is tested with unit test, it will break if you had new fields.
+   * @param bufferSize the size of a buffer
+   * @param depth the depth of the tree of buffer (depth + 1 buffers)
+   */
+  @VisibleForTesting
+  static long memoryUsage(int bufferSize, int depth) {
+    return 176 + (24 * depth) + (bufferSize * ELEM_SIZE * (depth + 3));
+  }
+
+  /**
+   * Return the level of the biggest element (using the indices array 'ids'
+   * to track which elements have been already returned). Every buffer has
+   * already been sorted at this point.
+   * <p>
+   * When several levels hold the same biggest value, the lowest level is returned.
+   * </p>
+   *
+   * @param ids for each level, the index of the biggest element not returned yet
+   *     (negative when the level is exhausted)
+   * @return the level of the biggest element or -1 if no element has been found
+   */
+  @VisibleForTesting
+  int biggest(final int[] ids) {
+    long biggest = Long.MIN_VALUE;
+    final int id0 = ids[0], id1 = ids[1];
+    int iBiggest = -1;
+
+    if (0 < leafCount && 0 <= id0) {
+      biggest = buffer[0][id0];
+      iBiggest = 0;
+    }
+    // The second leaf only holds elements once the first one is full.
+    if (bufferSize < leafCount && 0 <= id1) {
+      long x = buffer[1][id1];
+      // 'iBiggest == -1' accepts the first candidate unconditionally: without it a buffer whose
+      // remaining values are all Long.MIN_VALUE would never be selected.
+      if (iBiggest == -1 || x > biggest) {
+        biggest = x;
+        iBiggest = 1;
+      }
+    }
+    for (int i = 2; i < currentTop + 1; i++) {
+      if (!isBufferEmpty(i) && 0 <= ids[i]) {
+        long x = buffer[i][ids[i]];
+        if (iBiggest == -1 || x > biggest) {
+          biggest = x;
+          iBiggest = i;
+        }
+      }
+    }
+    return iBiggest;
+  }
+
+
+  /**
+   * Tells whether the buffer of the given level is empty, based only on the number of
+   * elements inserted so far.
+   *
+   * @param level the level of the buffer
+   * @return false for the current top level, otherwise whether the bit of the level's weight
+   *     is unset in the number of buffer-fulls that have left the leaves
+   */
+  @VisibleForTesting
+  boolean isBufferEmpty(int level) {
+    if (level == currentTop) {
+      // root buffer (if present) is always full
+      return false;
+    }
+    long levelWeight = 1 << (level - 1);
+    // Number of buffer-fulls of elements that are no longer in the leaves.
+    long collapsedBuffers = (count - leafCount) / bufferSize;
+    return (collapsedBuffers & levelWeight) == 0;
+  }
+
+  /**
+   * Returns the weight of a level: 1 for level 0, {@code rootWeight} for the level equal to
+   * {@code maxDepth}, and 2^(level-1) for every other level.
+   *
+   * @param level the level of the buffer
+   * @return the weight of each element stored at that level
+   */
+  private int weight(int level) {
+    if (level == 0) {
+      return 1;
+    }
+    return (level == maxDepth) ? rootWeight : (1 << (level - 1));
+  }
+
+  /**
+   * Allocates the buffer of level {@code i} if it has not been allocated yet.
+   *
+   * @param i the level of the buffer to allocate
+   */
+  private void allocate(int i) {
+    if (buffer[i] == null) {
+      buffer[i] = new long[bufferSize];
+    }
+  }
+
+  /**
+   * Recursively collapse the buffers of the tree.
+   * Upper buffers will be allocated on first access in this method.
+   *
+   * @param buf the sorted buffer to collapse with the buffer stored at {@code level}
+   * @param level the level of the buffer that {@code buf} is collapsed with
+   */
+  private void recCollapse(long[] buf, int level) {
+    // if we reach the root, we can't add more buffer
+    if (level == maxDepth) {
+      // weight() returns the weight of the root; here we need the weight of the
+      // incoming merge result instead
+      int mergeWeight = 1 << (level - 1);
+      int idx = level % 2;
+      long[] merged = bufferPool[idx];
+      long[] tmp = buffer[level];
+      collapse(buf, mergeWeight, buffer[level], rootWeight, merged);
+      // Swap: the merged buffer becomes the root and the old root array goes back to the pool.
+      buffer[level] = merged;
+      bufferPool[idx] = tmp;
+      rootWeight += mergeWeight;
+    } else {
+      allocate(level + 1); // lazy allocation (if needed)
+      if (level == currentTop) {
+        // if we reach the top, add a new buffer
+        collapse1(buf, buffer[level], buffer[level + 1]);
+        currentTop += 1;
+        rootWeight *= 2;
+      } else if (isBufferEmpty(level + 1)) {
+        // if the upper buffer is empty, use it
+        collapse1(buf, buffer[level], buffer[level + 1]);
+      } else {
+        // if the upper buffer isn't empty, collapse with it
+        // (the pooled buffer is picked by level parity)
+        long[] merged = bufferPool[level % 2];
+        collapse1(buf, buffer[level], merged);
+        recCollapse(merged, level + 1);
+      }
+    }
+  }
+
+  /**
+   * Collapses two sorted arrays of different weights into {@code output}.
+   * <pre>
+   * ex: [2,5,7] weight 2 and [3,8,9] weight 3
+   *     weight x array + concat = [2,2,5,5,7,7,3,3,3,8,8,8,9,9,9]
+   *     sort = [2,2,3,3,3,5,5,7,7,8,8,8,9,9,9]
+   *     select one element out of every (sum of weights) = [3,7,9]
+   * </pre>
+   * The weighted sequence is never built: the two arrays are merged while a running weight
+   * counter decides which elements are written to {@code output}.
+   *
+   * @param left the first sorted array
+   * @param leftWeight the weight of each element of {@code left}
+   * @param right the second sorted array
+   * @param rightWeight the weight of each element of {@code right}
+   * @param output the array receiving the selected elements
+   *     (presumably the same length as the inputs -- confirm against callers)
+   */
+  @VisibleForTesting
+  static void collapse(
+    long[] left,
+    int leftWeight,
+    long[] right,
+    int rightWeight,
+    long[] output) {
+
+    int totalWeight = leftWeight + rightWeight;
+    // Offset added to the running weight before dividing by the total weight.
+    int halfTotalWeight = (totalWeight / 2) - 1;
+    int i = 0, j = 0, k = 0, cnt = 0;
+
+    int weight;
+    long smallest;
+
+    while (i < left.length || j < right.length) {
+      // Standard merge step: take from 'left' only when it is strictly smaller.
+      if (i < left.length && (j == right.length || left[i] < right[j])) {
+        smallest = left[i];
+        weight = leftWeight;
+        i++;
+      } else {
+        smallest = right[j];
+        weight = rightWeight;
+        j++;
+      }
+
+      // Number of output slots due before and after accounting for this element's weight.
+      int cur = (cnt + halfTotalWeight) / totalWeight;
+      cnt += weight;
+      int next = (cnt + halfTotalWeight) / totalWeight;
+
+      // Emit the element once per slot boundary crossed by its weight.
+      for(; cur < next; cur++) {
+        output[k] = smallest;
+        k++;
+      }
+    }
+  }
+
+/**
+ * Optimized version of collapse for collapsing two array of the same weight
+ * (which is what we want most of the time)
+ */
+  private static void collapse1(
+    long[] left,
+    long[] right,
+    long[] output) {
+
+    int i = 0, j = 0, k = 0, cnt = 0;
+    long smallest;
+
+    while (i < left.length || j < right.length) {
+      if (i < left.length && (j == right.length || left[i] < right[j])) {
+        smallest = left[i];
+        i++;
+      } else {
+        smallest = right[j];
+        j++;
+      }
+      if (cnt % 2 == 1) {
+        output[k] = smallest;
+        k++;
+      }
+      cnt++;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/CounterMap.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/CounterMap.java b/commons/src/main/java/com/twitter/common/stats/CounterMap.java
new file mode 100644
index 0000000..fb4d7eb
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/CounterMap.java
@@ -0,0 +1,141 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Logger;
+
+/**
+ * A map from a key type to integers.  This simplifies the process of storing counters for multiple
+ * values of the same type.
+ * <p>
+ * This class performs no synchronization of its own.
+ * </p>
+ */
+public class CounterMap <K> implements Iterable<Map.Entry<K, Integer>>, Cloneable {
+  private final Map<K, Integer> map = Maps.newHashMap();
+
+  private static Logger log = Logger.getLogger(CounterMap.class.getName());
+
+  /**
+   * Increments the counter value associated with {@code key}, and returns the new value.
+   *
+   * @param key The key to increment
+   * @return The incremented value.
+   */
+  public int incrementAndGet(K key) {
+    return incrementAndGet(key, 1);
+  }
+
+  /**
+   * Increments the value associated with {@code key} by {@code count}, returning the new value.
+   * A key that has no mapping yet is treated as having the value 0.
+   *
+   * @param key The key to increment
+   * @param count The amount to add to the current value.
+   * @return The incremented value.
+   */
+  public int incrementAndGet(K key, int count) {
+    Integer value = map.get(key);
+    if (value == null) {
+      value = 0;
+    }
+    int newValue = count + value;
+    map.put(key, newValue);
+    return newValue;
+  }
+
+  /**
+   * Gets the value associated with a key.
+   *
+   * @param key The key to look up.
+   * @return The counter value stored for {@code key}, or 0 if no mapping exists.
+   */
+  public int get(K key) {
+    // Single lookup; an absent mapping reads as 0.
+    Integer value = map.get(key);
+    return value == null ? 0 : value;
+  }
+
+  /**
+   * Assigns a value to a key.
+   *
+   * @param key The key to assign a value to (must not be null).
+   * @param newValue The value to assign.
+   */
+  public void set(K key, int newValue) {
+    Preconditions.checkNotNull(key);
+    map.put(key, newValue);
+  }
+
+  /**
+   * Resets the value for {@code key}.  This will remove the key from the counter.
+   *
+   * @param key The key to reset.
+   */
+  public void reset(K key) {
+    map.remove(key);
+  }
+
+  /**
+   * Gets the number of entries stored in the map.
+   *
+   * @return The size of the map.
+   */
+  public int size() {
+    return map.size();
+  }
+
+  /**
+   * Gets an iterator over the entries (key and counter value) of the map.
+   *
+   * @return Iterator over the entries of the backing map.
+   */
+  @Override
+  public Iterator<Map.Entry<K, Integer>> iterator() {
+    return map.entrySet().iterator();
+  }
+
+  /**
+   * Gets the counter values.
+   *
+   * @return The values view of the backing map.
+   */
+  public Collection<Integer> values() {
+    return map.values();
+  }
+
+  /**
+   * Gets the keys that have a counter.
+   *
+   * @return The key-set view of the backing map.
+   */
+  public Set<K> keySet() {
+    return map.keySet();
+  }
+
+  /**
+   * Renders one {@code key: value} line per entry.
+   *
+   * @return The string representation of the counters.
+   */
+  @Override
+  public String toString() {
+    StringBuilder strVal = new StringBuilder();
+    for (Map.Entry<K, Integer> entry : this) {
+      // append(Object) renders a null key as "null" instead of throwing: null keys can be
+      // stored through incrementAndGet, which does not reject them.
+      strVal.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
+    }
+    return strVal.toString();
+  }
+
+  /**
+   * Gets the backing map itself (not a copy): changes made to it are visible in this counter.
+   *
+   * @return The backing map.
+   */
+  public Map<K, Integer> toMap() {
+    return map;
+  }
+
+  /**
+   * Creates a new {@code CounterMap} holding a copy of the current entries.
+   *
+   * @return The copy.
+   */
+  @Override
+  public CounterMap<K> clone() {
+    CounterMap<K> newInstance = new CounterMap<K>();
+    newInstance.map.putAll(map);
+    return newInstance;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/CounterMapWithTopKey.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/CounterMapWithTopKey.java b/commons/src/main/java/com/twitter/common/stats/CounterMapWithTopKey.java
new file mode 100644
index 0000000..547a1ac
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/CounterMapWithTopKey.java
@@ -0,0 +1,92 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import java.util.Map;
+
+/**
+ * Same as CounterMap<K>, but also keeps track of the item with the highest count.
+ */
+public class CounterMapWithTopKey<K> extends CounterMap<K> {
+
+  private K mostCommonKey = null;
+
+  /**
+   * Updates the most common key, if needed.
+   *
+   * @param key The key to check.
+   * @param count The count for the key.
+   * @return The count.
+   */
+  private int updateMostCommon(K key, int count) {
+    if (count > get(mostCommonKey)) {
+      mostCommonKey = key;
+    }
+    return count;
+  }
+
+  /**
+   * Increments the counter value associated with {@code key}, and returns the new value.
+   *
+   * @param key The key to increment
+   * @return The incremented value.
+   */
+  @Override
+  public int incrementAndGet(K key) {
+    return updateMostCommon(key, super.incrementAndGet(key));
+  }
+
+  /**
+   * Assigns a value to a key.
+   *
+   * @param key The key to assign a value to.
+   * @param newValue The value to assign.
+   */
+  @Override
+  public void set(K key, int newValue) {
+    super.set(key, updateMostCommon(key, newValue));
+  }
+
+  /**
+   * Resets the value for {@code key}.  This will simply set the stored value to 0.
+   * The most common key is updated by scanning the entire map.
+   *
+   * @param key The key to reset.
+   */
+  @Override
+  public void reset(K key) {
+    super.reset(key);
+    for (Map.Entry<K, Integer> entry : this) {
+      updateMostCommon(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   *
+   * @return The key with the highest count in the map. If multiple keys have this count, return
+   * an arbitrary one.
+   */
+  public K getMostCommonKey() {
+    return mostCommonKey;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder(super.toString()).append(String.format("Most common key: %s\n",
+        mostCommonKey.toString())).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Elapsed.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/Elapsed.java b/commons/src/main/java/com/twitter/common/stats/Elapsed.java
new file mode 100644
index 0000000..dfe6c18
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/Elapsed.java
@@ -0,0 +1,70 @@
+package com.twitter.common.stats;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+/**
+ * A stat that exports the amount of time since it was last reset.
+ *
+ * @author William Farner
+ */
+public class Elapsed {
+
+  // Source of the tick values read by the exported stat and by reset().
+  private final Ticker ticker;
+  // Tick value recorded by the last call to reset().
+  private final AtomicLong lastEventNs = new AtomicLong();
+
+  /**
+   * Calls {@link #Elapsed(String, Time)} using a default granularity of nanoseconds.
+   *
+   * @param name Name of the stat to export.
+   */
+  public Elapsed(String name) {
+    this(name, Time.NANOSECONDS);
+  }
+
+  /**
+   * Equivalent to calling {@link #Elapsed(String, Time, Ticker)} passing {@code name},
+   * {@code granularity} and {@link com.google.common.base.Ticker#systemTicker()}.
+   *
+   * @param name Name of the stat to export.
+   * @param granularity Time unit granularity to export.
+   */
+  public Elapsed(String name, Time granularity) {
+    this(name, granularity, Ticker.systemTicker());
+  }
+
+  /**
+   * Creates and exports a new stat that reports the difference between the current tick
+   * value and the tick value recorded by the last reset, converted to {@code granularity}.
+   * {@link #reset()} is called before the stat is exported.
+   *
+   * @param name Name of stat to export (must not be blank)
+   * @param granularity Time unit granularity to export (must not be null)
+   * @param ticker Ticker implementation (must not be null)
+   */
+  public Elapsed(String name, final Time granularity, final Ticker ticker) {
+    MorePreconditions.checkNotBlank(name);
+    Preconditions.checkNotNull(granularity);
+    this.ticker = Preconditions.checkNotNull(ticker);
+
+    // Record the starting tick before the stat can be read.
+    reset();
+
+    Stats.export(new StatImpl<Long>(name) {
+      @Override public Long read() {
+        // Elapsed ticks (treated as nanoseconds) since the last reset, in the export unit.
+        return Amount.of(ticker.read() - lastEventNs.get(), Time.NANOSECONDS).as(granularity);
+      }
+    });
+  }
+
+  /**
+   * Records the current tick value as the new starting point of the elapsed time.
+   */
+  public void reset() {
+    lastEventNs.set(ticker.read());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Entropy.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/Entropy.java b/commons/src/main/java/com/twitter/common/stats/Entropy.java
new file mode 100644
index 0000000..1ae9963
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/Entropy.java
@@ -0,0 +1,54 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Calculate the entropy of a discrete distribution of {@code T}.
+ *
+ * @author Gilad Mishne
+ */
+public class Entropy<T> {
+  private final CounterMap<T> counts = new CounterMap<T>();
+  private int total = 0;
+
+  /**
+   * Base-2 logarithm of {@code n}.
+   */
+  private static double log2(double n) {
+    return Math.log(n) / Math.log(2);
+  }
+
+  /**
+   * Counts the occurrences of every element of {@code elements}.
+   *
+   * @param elements the observed elements (must not be null)
+   */
+  public Entropy(Iterable<T> elements) {
+    Preconditions.checkNotNull(elements);
+    for (T observed : elements) {
+      total++;
+      counts.incrementAndGet(observed);
+    }
+  }
+
+  /**
+   * Computes the entropy, in bits, of the observed distribution.
+   *
+   * @return the entropy; 0 when no element was observed
+   */
+  public double entropy() {
+    double result = 0;
+    for (int occurrences : counts.values()) {
+      final double probability = (double) occurrences / total;
+      final double contribution = probability * log2(probability);
+      result -= contribution;
+    }
+    return result;
+  }
+
+  /**
+   * Computes the perplexity of the observed distribution, i.e. 2 to the power of the entropy.
+   *
+   * @return the perplexity
+   */
+  public double perplexity() {
+    final double bits = entropy();
+    return Math.pow(2, bits);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Histogram.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/Histogram.java b/commons/src/main/java/com/twitter/common/stats/Histogram.java
new file mode 100644
index 0000000..30e69d5
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/Histogram.java
@@ -0,0 +1,46 @@
+// =================================================================================================
+// Copyright 2013 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+/**
+ * A collection of {@code long} samples that can be queried for quantiles.
+ *
+ * NOTE(review): the valid range of a quantile argument (presumably {@code [0.0, 1.0]}) and the
+ * thread-safety of implementations are not specified here; confirm against the implementations.
+ */
+public interface Histogram {
+
+  /**
+   * Adds an entry to the histogram.
+   *
+   * @param x the value to insert.
+   */
+  void add(long x);
+
+  /**
+   * Clears the histogram.
+   */
+  void clear();
+
+  /**
+   * Returns the value at the given quantile of the histogram.
+   *
+   * @param quantile the quantile to compute.
+   * @return the value of the histogram at {@code quantile}.
+   */
+  long getQuantile(double quantile);
+
+  /**
+   * Returns the values at several quantiles of the histogram.
+   *
+   * @param quantiles the quantiles to compute.
+   * @return the computed values; presumably one per entry of {@code quantiles}, in the same
+   *         order (confirm against the implementations).
+   */
+  long[] getQuantiles(double[] quantiles);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Histograms.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/Histograms.java b/commons/src/main/java/com/twitter/common/stats/Histograms.java
new file mode 100644
index 0000000..f45a1c7
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/Histograms.java
@@ -0,0 +1,26 @@
+package com.twitter.common.stats;
+
+/**
+ * Static helper methods for working with {@link Histogram}s.
+ */
+public final class Histograms {
+
+  private Histograms() {
+    // Utility class; not instantiable.
+  }
+
+  /**
+   * Queries a histogram once for each requested quantile.
+   *
+   * @param h the histogram to query
+   * @param quantiles the quantiles to look up
+   * @return an array holding, at each index, the histogram's value for the quantile at the same
+   *         index of {@code quantiles}
+   */
+  public static long[] extractQuantiles(Histogram h, double[] quantiles) {
+    final long[] extracted = new long[quantiles.length];
+    int slot = 0;
+    for (double quantile : quantiles) {
+      extracted[slot++] = h.getQuantile(quantile);
+    }
+    return extracted;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/JvmStats.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/JvmStats.java b/commons/src/main/java/com/twitter/common/stats/JvmStats.java
new file mode 100644
index 0000000..610409b
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/JvmStats.java
@@ -0,0 +1,243 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import java.lang.management.ClassLoadingMXBean;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.RuntimeMXBean;
+import java.lang.management.ThreadMXBean;
+import java.util.Map;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+
+import com.google.common.collect.Iterables;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.quantity.Time;
+
+/**
+ * Convenience class to export statistics about the JVM.
+ */
+public class JvmStats {
+
+  // Number of bytes in one megabyte; divides the byte counts reported by the JVM down to MB.
+  private static final long BYTES_PER_MB = Amount.of(1L, Data.MB).as(Data.BYTES);
+  // Seconds per nanosecond; scale factor applied to the rate of process CPU-time nanoseconds
+  // to produce the "process_cpu_cores_utilized" stat.
+  private static final double SECS_PER_NANO =
+      ((double) 1) / Amount.of(1L, Time.SECONDS).as(Time.NANOSECONDS);
+
+  private JvmStats() {
+    // Utility class.
+  }
+
+  /**
+   * Exports stats related to the JVM and runtime environment.
+   *
+   * Registers, through {@link Stats}, numeric stats for memory, class loading, garbage
+   * collection, threads, uptime and system load, plus string stats for the JVM input arguments,
+   * every system property and every environment variable. Stats backed by the
+   * {@code com.sun.management} MXBean extensions are only registered when the platform's
+   * operating system MXBean implements those interfaces.
+   */
+  public static void export() {
+    final OperatingSystemMXBean osMbean = ManagementFactory.getOperatingSystemMXBean();
+    // Physical memory, swap and process CPU time are only available on the Sun/Oracle extension.
+    if (osMbean instanceof com.sun.management.OperatingSystemMXBean) {
+      final com.sun.management.OperatingSystemMXBean sunOsMbean =
+          (com.sun.management.OperatingSystemMXBean) osMbean;
+
+      Stats.exportAll(
+          ImmutableList.<Stat<? extends Number>>builder()
+          .add(new StatImpl<Long>("system_free_physical_memory_mb") {
+            @Override public Long read() {
+              return sunOsMbean.getFreePhysicalMemorySize() / BYTES_PER_MB;
+            }
+          })
+          .add(new StatImpl<Long>("system_free_swap_mb") {
+            @Override public Long read() {
+              return sunOsMbean.getFreeSwapSpaceSize() / BYTES_PER_MB;
+            }
+          })
+          .add(
+          Rate.of(
+              new StatImpl<Long>("process_cpu_time_nanos") {
+                @Override public Long read() {
+                  return sunOsMbean.getProcessCpuTime();
+                }
+              }).withName("process_cpu_cores_utilized").withScaleFactor(SECS_PER_NANO).build())
+          .build());
+    }
+    // File descriptor counts are only available on the Unix extension.
+    if (osMbean instanceof com.sun.management.UnixOperatingSystemMXBean) {
+      final com.sun.management.UnixOperatingSystemMXBean unixOsMbean =
+          (com.sun.management.UnixOperatingSystemMXBean) osMbean;
+
+      Stats.exportAll(ImmutableList.<Stat<? extends Number>>builder()
+          .add(new StatImpl<Long>("process_max_fd_count") {
+            @Override public Long read() { return unixOsMbean.getMaxFileDescriptorCount(); }
+          }).add(new StatImpl<Long>("process_open_fd_count") {
+            @Override public Long read() { return unixOsMbean.getOpenFileDescriptorCount(); }
+          }).build());
+    }
+
+    final Runtime runtime = Runtime.getRuntime();
+    final ClassLoadingMXBean classLoadingBean = ManagementFactory.getClassLoadingMXBean();
+    final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
+    final ThreadMXBean threads = ManagementFactory.getThreadMXBean();
+    final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+
+    // Stats available from the standard java.lang.management beans on every platform.
+    Stats.exportAll(ImmutableList.<Stat<? extends Number>>builder()
+      .add(new StatImpl<Long>("jvm_time_ms") {
+        @Override public Long read() { return System.currentTimeMillis(); }
+      })
+      .add(new StatImpl<Integer>("jvm_available_processors") {
+        @Override public Integer read() { return runtime.availableProcessors(); }
+      })
+      .add(new StatImpl<Long>("jvm_memory_free_mb") {
+        @Override public Long read() { return runtime.freeMemory() / BYTES_PER_MB; }
+      })
+      .add(new StatImpl<Long>("jvm_memory_max_mb") {
+        @Override public Long read() { return runtime.maxMemory() / BYTES_PER_MB; }
+      })
+      .add(new StatImpl<Long>("jvm_memory_mb_total") {
+        @Override public Long read() { return runtime.totalMemory() / BYTES_PER_MB; }
+      })
+      .add(new StatImpl<Integer>("jvm_class_loaded_count") {
+        @Override public Integer read() { return classLoadingBean.getLoadedClassCount(); }
+      })
+      .add(new StatImpl<Long>("jvm_class_total_loaded_count") {
+        @Override public Long read() { return classLoadingBean.getTotalLoadedClassCount(); }
+      })
+      .add(new StatImpl<Long>("jvm_class_unloaded_count") {
+        @Override public Long read() { return classLoadingBean.getUnloadedClassCount(); }
+      })
+      // Aggregate GC time across all collectors; per-collector stats are exported further down.
+      .add(new StatImpl<Long>("jvm_gc_collection_time_ms") {
+        @Override public Long read() {
+          long collectionTimeMs = 0;
+          for (GarbageCollectorMXBean bean : ManagementFactory.getGarbageCollectorMXBeans()) {
+            collectionTimeMs += bean.getCollectionTime();
+          }
+          return collectionTimeMs;
+        }
+      })
+      // Aggregate GC cycle count across all collectors.
+      .add(new StatImpl<Long>("jvm_gc_collection_count") {
+        @Override public Long read() {
+          long collections = 0;
+          for (GarbageCollectorMXBean bean : ManagementFactory.getGarbageCollectorMXBeans()) {
+            collections += bean.getCollectionCount();
+          }
+          return collections;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_memory_heap_mb_used") {
+        @Override public Long read() {
+          return memoryBean.getHeapMemoryUsage().getUsed() / BYTES_PER_MB;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_memory_heap_mb_committed") {
+        @Override public Long read() {
+          return memoryBean.getHeapMemoryUsage().getCommitted() / BYTES_PER_MB;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_memory_heap_mb_max") {
+        @Override public Long read() {
+          return memoryBean.getHeapMemoryUsage().getMax() / BYTES_PER_MB;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_memory_non_heap_mb_used") {
+        @Override public Long read() {
+          return memoryBean.getNonHeapMemoryUsage().getUsed() / BYTES_PER_MB;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_memory_non_heap_mb_committed") {
+        @Override public Long read() {
+          return memoryBean.getNonHeapMemoryUsage().getCommitted() / BYTES_PER_MB;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_memory_non_heap_mb_max") {
+        @Override public Long read() {
+          return memoryBean.getNonHeapMemoryUsage().getMax() / BYTES_PER_MB;
+        }
+      })
+      .add(new StatImpl<Long>("jvm_uptime_secs") {
+        // getUptime() is in milliseconds; convert to whole seconds.
+        @Override public Long read() { return runtimeMXBean.getUptime() / 1000; }
+      })
+      .add(new StatImpl<Double>("system_load_avg") {
+        @Override public Double read() { return osMbean.getSystemLoadAverage(); }
+      })
+      .add(new StatImpl<Integer>("jvm_threads_peak") {
+        @Override public Integer read() { return threads.getPeakThreadCount(); }
+      })
+      .add(new StatImpl<Long>("jvm_threads_started") {
+        @Override public Long read() { return threads.getTotalStartedThreadCount(); }
+      })
+      .add(new StatImpl<Integer>("jvm_threads_daemon") {
+        @Override public Integer read() { return threads.getDaemonThreadCount(); }
+      })
+      .add(new StatImpl<Integer>("jvm_threads_active") {
+        @Override public Integer read() { return threads.getThreadCount(); }
+      })
+      .build());
+
+    // Export per garbage collector gc time and cycle count like Ostrich.
+    // This is based on code in Birdcage: https://cgit.twitter.biz/birdcage/tree/ \
+    // ostrich/src/main/scala/com/twitter/ostrich/stats/StatsCollection.scala
+    Stats.exportAll(Iterables.transform(ManagementFactory.getGarbageCollectorMXBeans(),
+        new Function<GarbageCollectorMXBean, Stat<? extends Number>>(){
+          @Override
+          public Stat<? extends Number> apply(final GarbageCollectorMXBean gcMXBean) {
+            return new StatImpl<Long>(
+                "jvm_gc_" + Stats.normalizeName(gcMXBean.getName()) + "_collection_count") {
+              @Override public Long read() {
+                return gcMXBean.getCollectionCount();
+              }
+            };
+          }
+        }
+    ));
+
+    Stats.exportAll(Iterables.transform(ManagementFactory.getGarbageCollectorMXBeans(),
+        new Function<GarbageCollectorMXBean, Stat<? extends Number>>(){
+          @Override
+          public Stat<? extends Number> apply(final GarbageCollectorMXBean gcMXBean) {
+            return new StatImpl<Long>(
+                "jvm_gc_" + Stats.normalizeName(gcMXBean.getName()) + "_collection_time_ms") {
+              @Override public Long read() {
+                return gcMXBean.getCollectionTime();
+              }
+            };
+          }
+        }
+    ));
+
+    Stats.exportString(
+        new StatImpl<String>("jvm_input_arguments") {
+          @Override public String read() {
+            return runtimeMXBean.getInputArguments().toString();
+          }
+        }
+    );
+
+    // NOTE(review): every system property and environment variable is published as a string
+    // stat below. If secrets are passed through either channel they become visible wherever
+    // stats are exposed; confirm this is acceptable for the deployment.
+    // The set of property names is captured once, here; each value is re-read on every read().
+    for (final String property : System.getProperties().stringPropertyNames()) {
+      Stats.exportString(
+          new StatImpl<String>("jvm_prop_" + Stats.normalizeName(property)) {
+            @Override public String read() { return System.getProperty(property); }
+          });
+    }
+
+    for (final Map.Entry<String, String> environmentVariable : System.getenv().entrySet()) {
+      Stats.exportString(
+          new StatImpl<String>("system_env_" + Stats.normalizeName(environmentVariable.getKey())) {
+            @Override public String read() { return environmentVariable.getValue(); }
+          });
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/MovingAverage.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/MovingAverage.java b/commons/src/main/java/com/twitter/common/stats/MovingAverage.java
new file mode 100644
index 0000000..7ab4302
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/MovingAverage.java
@@ -0,0 +1,71 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Function to compute the moving average of a time series.
+ *
+ * @author William Farner
+ */
+public class MovingAverage<T extends Number> extends SampledStat<Double> {
+
+  /** Window size used by {@link #of(Stat)} when the caller does not specify one. */
+  private static final int DEFAULT_WINDOW = 10;
+
+  /** The stat whose readings are averaged. */
+  private final Stat<T> input;
+
+  /** The readings currently inside the window, newest at the head and oldest at the tail. */
+  private final LinkedBlockingDeque<T> samples;
+
+  /** Running sum of the readings held in {@link #samples}. */
+  private double sampleSum = 0;
+
+  private MovingAverage(String name, Stat<T> input, int windowSize) {
+    super(name, 0d);
+    Preconditions.checkArgument(windowSize > 1);
+    Preconditions.checkNotNull(input);
+
+    this.input = input;
+    this.samples = new LinkedBlockingDeque<T>(windowSize);
+    Stats.export(input);
+  }
+
+  /**
+   * Creates a moving average of {@code input} over the default window, named after the input
+   * with an {@code _avg} suffix.
+   */
+  public static <T extends Number> MovingAverage<T> of(Stat<T> input) {
+    return of(input, DEFAULT_WINDOW);
+  }
+
+  /**
+   * Creates a moving average of {@code input} over {@code windowSize} samples, named after the
+   * input with an {@code _avg} suffix.
+   */
+  public static <T extends Number> MovingAverage<T> of(Stat<T> input, int windowSize) {
+    final String averageName = String.format("%s_avg", input.getName());
+    return of(averageName, input, windowSize);
+  }
+
+  /**
+   * Creates a moving average of {@code input} with an explicit name.
+   *
+   * @param name name of the resulting stat.
+   * @param input stat to read on every sample; must not be null.
+   * @param windowSize number of samples to average over; must be greater than one.
+   */
+  public static <T extends Number> MovingAverage<T> of(String name, Stat<T> input,
+      int windowSize) {
+    return new MovingAverage<T>(name, input, windowSize);
+  }
+
+  /**
+   * Reads the input once, slides the window forward by that reading, and returns the mean of
+   * the readings now in the window.
+   */
+  @Override
+  public Double doSample() {
+    final T latest = input.read();
+
+    // Make room by evicting the oldest reading once the window is full.
+    final boolean windowFull = samples.remainingCapacity() == 0;
+    if (windowFull) {
+      final T oldest = samples.removeLast();
+      sampleSum -= oldest.doubleValue();
+    }
+
+    samples.addFirst(latest);
+    sampleSum += latest.doubleValue();
+
+    final int held = samples.size();
+    return sampleSum / held;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/MovingWindowDelta.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/MovingWindowDelta.java b/commons/src/main/java/com/twitter/common/stats/MovingWindowDelta.java
new file mode 100644
index 0000000..2319128
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/MovingWindowDelta.java
@@ -0,0 +1,100 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.twitter.common.base.MorePreconditions;
+
+
+/**
+ * Tracks how much an input value changed over the most recent k sample periods, where one
+ * sample period is the time between two consecutive {@code doSample()} calls.
+ *
+ * When the input is a counter, the sampled value is the number of counts accumulated inside
+ * the sliding window.
+ *
+ * @author Feng Zhuge
+ */
+public class MovingWindowDelta<T extends Number> extends SampledStat<Long> {
+  /** Number of sample periods covered when the caller does not specify a window size. */
+  private static final int DEFAULT_WINDOW_SIZE = 60;
+
+  /** Per-period deltas currently inside the window, oldest at the head. */
+  private final LinkedBlockingDeque<Long> deltaSeries;
+
+  /** Supplies the current value of the tracked input. */
+  private final Supplier<T> inputAccessor;
+
+  /** Sum of the deltas currently held in {@link #deltaSeries}. */
+  long sumDelta = 0L;
+
+  /** Input value observed by the previous sample (zero before the first sample). */
+  long lastInput = 0L;
+
+  private MovingWindowDelta(String name, Supplier<T> inputAccessor, int windowSize) {
+    super(name, 0L);
+
+    Preconditions.checkArgument(windowSize >= 1);
+    Preconditions.checkNotNull(inputAccessor);
+    MorePreconditions.checkNotBlank(name);
+
+    this.deltaSeries = new LinkedBlockingDeque<Long>(windowSize);
+    this.inputAccessor = inputAccessor;
+
+    Stats.export(this);
+  }
+
+  /**
+   * Creates a new MovingWindowDelta instance.
+   *
+   * @param name The name of the value to be tracked.
+   * @param inputAccessor The accessor of the value.
+   * @param windowSize The number of sample periods the delta is calculated over.
+   * @param <T> The type of the value.
+   * @return The created MovingWindowDelta instance.
+   */
+  public static <T extends Number> MovingWindowDelta<T> of(
+      String name, Supplier<T> inputAccessor, int windowSize) {
+    return new MovingWindowDelta<T>(name, inputAccessor, windowSize);
+  }
+
+  /**
+   * Creates a new MovingWindowDelta instance using the default window size (currently 60).
+   *
+   * @param name The name of the value to be tracked.
+   * @param inputAccessor The accessor of the value.
+   * @param <T> The type of the value.
+   * @return The created MovingWindowDelta instance.
+   */
+  public static <T extends Number> MovingWindowDelta<T> of(String name, Supplier<T> inputAccessor) {
+    return of(name, inputAccessor, DEFAULT_WINDOW_SIZE);
+  }
+
+  /**
+   * Reads the input, records its change since the previous sample, drops the oldest recorded
+   * change once the window is full, and returns the sum of the changes left in the window.
+   */
+  @Override
+  public Long doSample() {
+    // The oldest delta leaves the window only once the window is at capacity.
+    long expired = 0L;
+    if (deltaSeries.remainingCapacity() == 0) {
+      expired = deltaSeries.removeFirst();
+    }
+
+    final long current = inputAccessor.get().longValue();
+    final long fresh = current - lastInput;
+    lastInput = current;
+    deltaSeries.addLast(fresh);
+
+    sumDelta += fresh - expired;
+    return sumDelta;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/NumericStatExporter.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/NumericStatExporter.java b/commons/src/main/java/com/twitter/common/stats/NumericStatExporter.java
new file mode 100644
index 0000000..c34d1ae
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/NumericStatExporter.java
@@ -0,0 +1,128 @@
+// =================================================================================================
+// Copyright 2012 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Stat exporter that extracts numeric {@link Stat}s from the {@link Stats} system, and exports them
+ * via a caller-defined sink.
+ *
+ * @author William Farner
+ */
+public class NumericStatExporter {
+
+  private static final Logger LOG = Logger.getLogger(NumericStatExporter.class.getName());
+
+  private final ScheduledExecutorService executor;
+  private final Amount<Long, Time> exportInterval;
+  private final Closure<Map<String, ? extends Number>> exportSink;
+
+  /** Periodic task that pushes the current value of every numeric stat to the sink. */
+  private final Runnable exporter;
+
+  /**
+   * Creates a new numeric stat exporter that will export to the specified sink.
+   *
+   * @param exportSink Consumes stats.
+   * @param executor Executor to handle export thread.
+   * @param exportInterval Export period.
+   */
+  public NumericStatExporter(final Closure<Map<String, ? extends Number>> exportSink,
+      ScheduledExecutorService executor, Amount<Long, Time> exportInterval) {
+    checkNotNull(exportSink);
+    this.executor = checkNotNull(executor);
+    this.exportInterval = checkNotNull(exportInterval);
+    this.exportSink = exportSink;
+
+    exporter = new Runnable() {
+      @Override public void run() {
+        exportSink.execute(Maps.transformValues(
+            Maps.uniqueIndex(Stats.getNumericVariables(), GET_NAME), READ_STAT));
+      }
+    };
+  }
+
+  /**
+   * Starts the stat exporter: schedules the periodic export and registers a shutdown action
+   * that stops the exporter and then performs one final export using freshly sampled values.
+   *
+   * @param shutdownRegistry Shutdown hook registry to allow the exporter to cleanly halt.
+   */
+  public void start(ShutdownRegistry shutdownRegistry) {
+    // NOTE(review): the interval is converted to whole seconds; a sub-second interval would
+    // presumably become 0, which scheduleAtFixedRate rejects. Confirm callers never pass one.
+    long intervalSecs = exportInterval.as(Time.SECONDS);
+    executor.scheduleAtFixedRate(exporter, intervalSecs, intervalSecs, TimeUnit.SECONDS);
+
+    shutdownRegistry.addAction(new Command() {
+      @Override public void execute() {
+        stop();
+        exportSink.execute(Maps.transformValues(
+            Maps.uniqueIndex(Stats.getNumericVariables(), GET_NAME), SAMPLE_AND_READ_STAT));
+      }
+    });
+  }
+
+  /**
+   * Stops the stat exporter by shutting down the executor supplied at construction.
+   *
+   * An orderly shutdown is requested first so that an export already in progress may finish;
+   * if the executor has not terminated within one second it is shut down forcibly. Because
+   * the executor itself is shut down, it will reject any task scheduled by a later call to
+   * {@link #start(ShutdownRegistry)}.
+   */
+  public void stop() {
+    // Without an explicit shutdown() the executor never begins terminating, so
+    // awaitTermination would always block for the full timeout before falling through.
+    executor.shutdown();
+    try {
+      if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+        executor.shutdownNow();
+        if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+          LOG.severe("Failed to stop stat exporter.");
+        }
+      }
+    } catch (InterruptedException e) {
+      executor.shutdownNow();
+      // Preserve the interrupt for callers further up the stack.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** Maps a stat to its name. */
+  public static final Function<Stat<?>, String> GET_NAME = new Function<Stat<?>, String>() {
+    @Override public String apply(Stat<?> stat) {
+      return stat.getName();
+    }
+  };
+
+  /** Maps a numeric stat to its current value via {@code read()}. */
+  public static final Function<Stat<? extends Number>, Number> READ_STAT =
+      new Function<Stat<? extends Number>, Number>() {
+        @Override public Number apply(Stat<? extends Number> stat) {
+          return stat.read();
+        }
+      };
+
+  /** Maps a recording stat to the value returned by taking a new sample via {@code sample()}. */
+  private static final Function<RecordingStat<? extends Number>, Number> SAMPLE_AND_READ_STAT =
+      new Function<RecordingStat<? extends Number>, Number>() {
+        @Override public Number apply(RecordingStat<? extends Number> stat) {
+          return stat.sample();
+        }
+      };
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Percentile.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/Percentile.java b/commons/src/main/java/com/twitter/common/stats/Percentile.java
new file mode 100644
index 0000000..01b05df
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/Percentile.java
@@ -0,0 +1,201 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.util.Sampler;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import javax.annotation.Nullable;
+
+/**
+ * A stats tracker to export percentiles of inputs based on a sampling rate.
+ *
+ * A percentile tracker will randomly sample recorded events with the given sampling rate, and
+ * will automatically register variables to track the percentiles requested.
+ * Percentiles are calculated based on the K most recent sampling windows, where each sampling
+ * window has the recorded events for a sampling period.
+ *
+ * @author William Farner
+ */
+public class Percentile<T extends Number & Comparable<T>> {
+
+  // Upper bound on the number of values buffered between two samples; once exceeded, the
+  // oldest buffered values are dropped (see record()).
+  @VisibleForTesting
+  static final int MAX_BUFFER_SIZE = 10001;
+
+  // Decides which recorded events are kept; null means every event is kept.
+  private final Sampler sampler;
+
+  private final Map<Double, SampledStat<Double>> statsByPercentile;
+  // Values recorded since the last sample. Also serves as the lock guarding sampleQueue and
+  // allSamples inside PercentileVar.doSample().
+  @VisibleForTesting
+  final LinkedList<T> samples = Lists.newLinkedList();
+
+  // One list of recorded values per completed sampling window, oldest first, bounded by the
+  // number of sample windows given at construction.
+  private final LinkedBlockingDeque<ArrayList<T>> sampleQueue;
+  // Sorted union of every window in sampleQueue, rebuilt by the first percentile's stat.
+  private final ArrayList<T> allSamples = new ArrayList<T>();
+
+  /**
+   * Creates a new percentile tracker.
+   *
+   * @param name The name of the value whose percentile is being tracked.
+   * @param samplePercent The percent of events to sample [0, 100].
+   * @param percentiles The percentiles to track.
+   */
+  public Percentile(String name, float samplePercent, double... percentiles) {
+    this(name, new Sampler(samplePercent), percentiles);
+  }
+
+  /**
+   * Creates a new percentile tracker that uses a single sampling window.
+   *
+   * @param name The name of the value whose percentile is being tracked.
+   * @param sampler The sampler to use for selecting recorded events.
+   * @param percentiles The percentiles to track.
+   */
+  public Percentile(String name, Sampler sampler, double... percentiles) {
+    this(name, 1, sampler, percentiles);
+  }
+
+  /**
+   * Creates a new percentile tracker.
+   *
+   * A percentile tracker will randomly sample recorded events with the given sampling rate, and
+   * will automatically register variables to track the percentiles requested.
+   * Each time the percentiles are sampled, the values recorded since the previous sample become
+   * a new sampling window, and the percentiles are calculated over the {@code numSampleWindows}
+   * most recent windows.
+   *
+   * @param name The name of the value whose percentile is being tracked.
+   * @param numSampleWindows How many sampling windows are used for calculation.
+   * @param sampler The sampler to use for selecting recorded events. You may set sampler to null
+   *        to sample all input.
+   * @param percentiles The percentiles to track.
+   */
+  public Percentile(String name, int numSampleWindows,
+      @Nullable Sampler sampler, double... percentiles) {
+    MorePreconditions.checkNotBlank(name);
+    Preconditions.checkArgument(numSampleWindows >= 1, "Must have one or more sample windows.");
+    Preconditions.checkNotNull(percentiles);
+    Preconditions.checkArgument(percentiles.length > 0, "Must specify at least one percentile.");
+
+    this.sampler = sampler;
+
+    sampleQueue = new LinkedBlockingDeque<ArrayList<T>>(numSampleWindows);
+
+    ImmutableMap.Builder<Double, SampledStat<Double>> builder =
+        new ImmutableMap.Builder<Double, SampledStat<Double>>();
+
+    for (int i = 0; i < percentiles.length; i++) {
+      // Only the stat for the first requested percentile rotates the windows and re-sorts.
+      boolean sortFirst = i == 0;
+      // Dots (e.g. from "99.9") are replaced with underscores in the whole stat name.
+      String statName = String.format("%s_%s_percentile", name, percentiles[i])
+          .replace('.', '_');
+
+      SampledStat<Double> stat = new PercentileVar(statName, percentiles[i], sortFirst);
+      Stats.export(stat);
+      builder.put(percentiles[i], stat);
+    }
+
+    statsByPercentile = builder.build();
+  }
+
+  /**
+   * Get the variables associated with this percentile tracker.
+   *
+   * @return A map from tracked percentile to the Stat corresponding to it
+   */
+  public Map<Double, ? extends Stat<?>> getPercentiles() {
+    return ImmutableMap.copyOf(statsByPercentile);
+  }
+
+  @VisibleForTesting
+  SampledStat<Double> getPercentile(double percentile) {
+    return statsByPercentile.get(percentile);
+  }
+
+  /**
+   * Records an event.
+   *
+   * @param value The value to record if it is randomly selected based on the sampling rate.
+   */
+  public void record(T value) {
+    if (sampler == null || sampler.select()) {
+      synchronized (samples) {
+        samples.addLast(value);
+
+        // Keep the buffer bounded by discarding the oldest values first.
+        while (samples.size() > MAX_BUFFER_SIZE) samples.removeFirst();
+      }
+    }
+  }
+
+  /**
+   * Stat computing one percentile over the shared, sorted {@code allSamples} list.
+   *
+   * NOTE(review): only the instance created with {@code sortFirst} refreshes
+   * {@code allSamples}; the others read whatever it last produced. This appears to rely on the
+   * first percentile's stat being sampled before the others in each sampling pass; confirm
+   * the sampling order.
+   */
+  private class PercentileVar extends SampledStat<Double> {
+    private final double percentile;
+    private final boolean sortFirst;
+
+    PercentileVar(String name, double percentile, boolean sortFirst) {
+      super(name, 0d);
+      this.percentile = percentile;
+      this.sortFirst = sortFirst;
+    }
+
+    @Override
+    public Double doSample() {
+      synchronized (samples) {
+        if (sortFirst) {
+          // Close the current window: evict the oldest window if the queue is full, then move
+          // the buffered values into a new window.
+          if (sampleQueue.remainingCapacity() == 0) {
+            sampleQueue.removeFirst();
+          }
+          sampleQueue.addLast(new ArrayList<T>(samples));
+          samples.clear();
+
+          // Rebuild the sorted union of all retained windows.
+          allSamples.clear();
+          for (ArrayList<T> sample : sampleQueue) {
+            allSamples.addAll(sample);
+          }
+
+          Collections.sort(allSamples, Ordering.<T>natural());
+        }
+
+        if (allSamples.isEmpty()) {
+          return 0d;
+        }
+
+        // Fractional position of the percentile within the sorted list, clamped to its bounds.
+        int maxIndex = allSamples.size() - 1;
+        double selectIndex = maxIndex * percentile / 100;
+        selectIndex = selectIndex < 0d ? 0d : selectIndex;
+        selectIndex = selectIndex > maxIndex ? maxIndex : selectIndex;
+
+        int indexLeft = (int) selectIndex;
+        if (indexLeft == maxIndex) {
+          return allSamples.get(indexLeft).doubleValue();
+        }
+
+        // Linearly interpolate between the two neighboring values.
+        double residue = selectIndex - indexLeft;
+        return allSamples.get(indexLeft).doubleValue() * (1 - residue) +
+            allSamples.get(indexLeft + 1).doubleValue() * residue;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/PipelineStats.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/PipelineStats.java b/commons/src/main/java/com/twitter/common/stats/PipelineStats.java
new file mode 100644
index 0000000..63fc09c
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/PipelineStats.java
@@ -0,0 +1,137 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.collections.Pair;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tracks the latency of different pipeline stages in a process.
+ *
+ * @author William Farner
+ */
+public class PipelineStats {
+  private static final String FULL_PIPELINE_NAME = "full";
+
+  private final Time precision;
+  private final Clock clock;
+
+  private final Map<String, SlidingStats> stages;
+
+  /**
+   * Creates a new pipeline tracker with the given pipeline name and stages. The stage name "full"
+   * is reserved to represent the duration of the entire pipeline.
+   *
+   * @param pipelineName Name of the pipeline.
+   * @param stages Stage names.
+   * @param precision Precision for time interval recording.
+   */
+  public PipelineStats(String pipelineName, Set<String> stages, Time precision) {
+    this(pipelineName, stages, Clock.SYSTEM_CLOCK, precision);
+  }
+
+  @VisibleForTesting
+  PipelineStats(String pipelineName, Set<String> stages, Clock clock, Time precision) {
+    MorePreconditions.checkNotBlank(pipelineName);
+    MorePreconditions.checkNotBlank(stages);
+    Preconditions.checkArgument(!stages.contains(FULL_PIPELINE_NAME));
+
+    this.clock = Preconditions.checkNotNull(clock);
+    this.precision = Preconditions.checkNotNull(precision);
+
+    this.stages = Maps.newHashMap();
+    for (String stage : stages) {
+      this.stages.put(stage, new SlidingStats(
+          String.format("%s_%s", pipelineName, stage), precision.toString()));
+    }
+    this.stages.put(FULL_PIPELINE_NAME, new SlidingStats(
+        String.format("%s_%s", pipelineName, FULL_PIPELINE_NAME), precision.toString()));
+  }
+
+  private void record(Snapshot snapshot) {
+    for (Pair<String, Long> stage : snapshot.stages) {
+      stages.get(stage.getFirst()).accumulate(stage.getSecond());
+    }
+  }
+
+  public Snapshot newSnapshot() {
+    return new Snapshot(this);
+  }
+
+  @VisibleForTesting
+  public SlidingStats getStatsForStage(String stage) {
+    return stages.get(stage);
+  }
+
+  public class Snapshot {
+    private final List<Pair<String, Long>> stages = Lists.newLinkedList();
+    private final PipelineStats parent;
+
+    private String currentStage;
+    private long startTime;
+    private long ticker;
+
+    private Snapshot(PipelineStats parent) {
+      this.parent = parent;
+    }
+
+    /**
+     * Records the duration for the current pipeline stage, and advances to the next stage. The
+     * stage name must be one of the stages specified in the constructor.
+     *
+     * @param stageName Name of the stage to enter.
+     */
+    public void start(String stageName) {
+      record(Preconditions.checkNotNull(stageName));
+    }
+
+    private void record(String stageName) {
+      long now = Amount.of(clock.nowNanos(), Time.NANOSECONDS).as(precision);
+      if (currentStage != null) {
+        stages.add(Pair.of(currentStage, now - ticker));
+      } else {
+        startTime = now;
+      }
+
+      if (stageName == null) stages.add(Pair.of(FULL_PIPELINE_NAME, now - startTime));
+
+      ticker = now;
+      currentStage = stageName;
+    }
+
+    /**
+     * Stops the pipeline, recording the interval for the last registered stage.
+     * This is the same as calling {@link #start(String)} with {@code null};
+     *
+     */
+    public void end() {
+      record(null);
+      parent.record(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Precision.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/Precision.java b/commons/src/main/java/com/twitter/common/stats/Precision.java
new file mode 100644
index 0000000..2b088e7
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/Precision.java
@@ -0,0 +1,37 @@
+package com.twitter.common.stats;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Precision expresses the maximum epsilon tolerated for a typical size of input.
+ * For example, {@code Precision(0.01, 1000)} expresses that we tolerate an error of 1% for
+ * 1000 entries: the maximum difference between the real quantile and the estimated one is
+ * {@code error = 0.01 * 1000 = 10}.
+ * For an input like (1 to 1000), q(0.5) will be in [490 <= x <= 510] (real q(0.5) = 500).
+ */
+public class Precision {
+  private final double epsilon;
+  private final int n;
+
+  /**
+   * Creates a Precision instance representing a precision per number of entries.
+   *
+   * @param epsilon the maximum error tolerated; must be strictly positive
+   * @param n size of the data set; must be greater than 1
+   * @throws IllegalArgumentException if either argument is out of range
+   */
+  public Precision(double epsilon, int n) {
+    Preconditions.checkArgument(epsilon > 0.0, "Epsilon must be positive!");
+    Preconditions.checkArgument(n > 1, "N (expected number of elements) must be greater than 1!");
+
+    this.n = n;
+    this.epsilon = epsilon;
+  }
+
+  /**
+   * @return the maximum error tolerated
+   */
+  public double getEpsilon() {
+    return this.epsilon;
+  }
+
+  /**
+   * @return the expected size of the data set
+   */
+  public int getN() {
+    return this.n;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/PrintableHistogram.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/PrintableHistogram.java b/commons/src/main/java/com/twitter/common/stats/PrintableHistogram.java
new file mode 100644
index 0000000..a3ecebb
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/PrintableHistogram.java
@@ -0,0 +1,93 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A fixed-bucket histogram of non-negative values that can render itself as text.
+ *
+ * <p>Bucket {@code i} covers the half-open range {@code (boundary[i], boundary[i + 1]]}; a value
+ * of exactly 0 is counted in the first bucket. The upper boundary is
+ * {@link Integer#MAX_VALUE}: larger values are tallied in one extra trailing slot that is
+ * included in the total count but is not rendered by {@link #toString()}.
+ */
+public class PrintableHistogram {
+  private final double[] bucketBoundaries;
+  private final int[] bucketCounts;
+  private int totalCount = 0;
+
+  /**
+   * Creates a histogram with the given bucket boundaries.  The boundaries
+   * 0 and {@link Integer#MAX_VALUE} are implicitly added.  With no boundaries at all the
+   * histogram consists of the single bucket between those two implicit boundaries.
+   *
+   * @param buckets Boundaries for histogram buckets, in strictly ascending order.  The first
+   *     boundary must not be 0, since 0 is added implicitly.
+   * @throws IllegalStateException if the first boundary is 0 or the boundaries are not strictly
+   *     ascending.
+   */
+  public PrintableHistogram(double ... buckets) {
+    Preconditions.checkNotNull(buckets);
+    // Only inspect the first boundary when there is one; an empty argument list is valid.
+    Preconditions.checkState(buckets.length == 0 || buckets[0] != 0);
+
+    bucketBoundaries = new double[buckets.length + 2];
+    bucketBoundaries[0] = 0;
+    bucketCounts = new int[buckets.length + 2];
+    for (int i = 0; i < buckets.length; i++) {
+      if (i > 0) {
+        // Guava's Preconditions only substitutes %s placeholders in its message templates.
+        Preconditions.checkState(buckets[i] > buckets[i - 1],
+            "Bucket %s must be greater than %s.", buckets[i], buckets[i - 1]);
+      }
+      bucketBoundaries[i + 1] = buckets[i];
+    }
+
+    bucketBoundaries[bucketBoundaries.length - 1] = Integer.MAX_VALUE;
+  }
+
+  /**
+   * Adds a single occurrence of a value.
+   *
+   * @param value Value to add; must be non-negative.
+   */
+  public void addValue(double value) {
+    addValue(value, 1);
+  }
+
+  /**
+   * Adds {@code count} occurrences of a value to the bucket containing it.
+   *
+   * @param value Value to add; must be non-negative.
+   * @param count Number of occurrences; must be non-negative.
+   * @throws IllegalStateException if either argument is negative.
+   */
+  public void addValue(double value, int count) {
+    Preconditions.checkState(value >= 0);
+    Preconditions.checkState(count >= 0);
+    Preconditions.checkState(bucketBoundaries.length > 1);
+
+    // Count the boundaries lying strictly below the value; starting at -1 makes the implicit
+    // 0 boundary map to bucket 0.
+    int bucketId = -1;
+    for (double boundary : bucketBoundaries) {
+      if (value <= boundary) {
+        break;
+      }
+      bucketId++;
+    }
+
+    // A value of exactly 0 leaves bucketId at -1; fold it into the first bucket.
+    bucketId = Math.max(0, bucketId);
+    bucketId = Math.min(bucketCounts.length - 1, bucketId);
+    bucketCounts[bucketId] += count;
+    totalCount += count;
+  }
+
+  /**
+   * Returns the fraction of all added values that fell into the given bucket.
+   *
+   * @param bucketId Zero-based bucket index.
+   * @return The bucket count divided by the total count, or 0 when nothing has been added yet.
+   * @throws IllegalStateException if the bucket index is out of range.
+   */
+  public double getBucketRatio(int bucketId) {
+    Preconditions.checkState(bucketId >= 0);
+    Preconditions.checkState(bucketId < bucketCounts.length);
+    if (totalCount == 0) {
+      // Avoid 0/0 == NaN for an empty histogram.
+      return 0d;
+    }
+    return (double) bucketCounts[bucketId] / totalCount;
+  }
+
+  /**
+   * Renders one line pair per bucket: its range, then a bar of one {@code #} per started
+   * percent, followed by the percentage and the raw count.
+   */
+  @Override
+  public String toString() {
+    StringBuilder display = new StringBuilder();
+    display.append("Histogram: ");
+    for (int bucketId = 0; bucketId < bucketCounts.length - 1; bucketId++) {
+      display.append(String.format("\n(%g - %g]\n\t",
+          bucketBoundaries[bucketId], bucketBoundaries[bucketId + 1]));
+
+      // Computed once per bucket instead of on every iteration of the bar loop.
+      double percent = getBucketRatio(bucketId) * 100;
+      for (int i = 0; i < percent; i++) {
+        display.append('#');
+      }
+      display.append(String.format(" %.2g%% (%d)", percent, bucketCounts[bucketId]));
+    }
+
+    return display.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Rate.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/Rate.java b/commons/src/main/java/com/twitter/common/stats/Rate.java
new file mode 100644
index 0000000..f5df7e7
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/Rate.java
@@ -0,0 +1,149 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Ticker;
+
+import com.twitter.common.collections.Pair;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Function to compute a windowed per-second rate of a value.
+ *
+ * <p>Each sample is compared against the oldest sample still held in a bounded window of
+ * (timestamp, value) pairs; timestamps come from a {@link Ticker} and are in nanoseconds.
+ *
+ * @author William Farner
+ */
+public class Rate<T extends Number> extends SampledStat<Double> {
+
+  private static final int DEFAULT_WINDOW_SIZE = 1;
+  private static final double DEFAULT_SCALE_FACTOR = 1;
+  private static final long NANOS_PER_SEC = Amount.of(1L, Time.SECONDS).as(Time.NANOSECONDS);
+
+  private final Supplier<T> inputAccessor;
+  private final Ticker ticker;
+  private final double scaleFactor;
+
+  // Newest sample at the head, oldest at the tail.
+  private final LinkedBlockingDeque<Pair<Long, Double>> samples;
+
+  private Rate(String name, Supplier<T> inputAccessor, int windowSize, double scaleFactor,
+      Ticker ticker) {
+    super(name, 0d);
+
+    this.inputAccessor = Preconditions.checkNotNull(inputAccessor);
+    this.ticker = Preconditions.checkNotNull(ticker);
+    this.samples = new LinkedBlockingDeque<Pair<Long, Double>>(windowSize);
+    Preconditions.checkArgument(scaleFactor != 0, "Scale factor must be non-zero!");
+    this.scaleFactor = scaleFactor;
+  }
+
+  /**
+   * Starts building a rate over an existing stat; the rate is named after the stat with a
+   * {@code _per_sec} suffix unless overridden.
+   */
+  public static <T extends Number> Builder<T> of(Stat<T> input) {
+    return new Builder<T>(input);
+  }
+
+  /** Starts building a named rate over values produced by a supplier. */
+  public static Builder<Long> of(String name, Supplier<Long> input) {
+    return new Builder<Long>(name, input);
+  }
+
+  /** Starts building a named rate over an {@link AtomicInteger}. */
+  public static Builder<AtomicInteger> of(String name, AtomicInteger input) {
+    return new Builder<AtomicInteger>(name, input);
+  }
+
+  /** Starts building a named rate over an {@link AtomicLong}. */
+  public static Builder<AtomicLong> of(String name, AtomicLong input) {
+    return new Builder<AtomicLong>(name, input);
+  }
+
+  /**
+   * Reads the input and the ticker, computes the scaled per-second change relative to the
+   * oldest windowed sample, then stores the new sample in the window.
+   *
+   * @return The rate, or 0 when there is no earlier sample or no time has elapsed.
+   */
+  @Override
+  public Double doSample() {
+    T currentValue = inputAccessor.get();
+    long currentTimestamp = ticker.read();
+
+    double rate = 0;
+    // peekLast() yields null exactly when the window is empty (the deque rejects null elements).
+    Pair<Long, Double> oldest = samples.peekLast();
+    if (oldest != null) {
+      double dy = currentValue.doubleValue() - oldest.getSecond();
+      double dt = currentTimestamp - oldest.getFirst();
+      if (dt == 0) {
+        rate = 0;
+      } else {
+        rate = (NANOS_PER_SEC * scaleFactor * dy) / dt;
+      }
+    }
+
+    // Evict the oldest sample once the window is full, then record the newest at the head.
+    if (samples.remainingCapacity() == 0) {
+      samples.removeLast();
+    }
+    samples.addFirst(Pair.of(currentTimestamp, currentValue.doubleValue()));
+
+    return rate;
+  }
+
+  /**
+   * Builder for {@link Rate} instances.
+   */
+  public static class Builder<T extends Number> {
+
+    private String name;
+    private int windowSize = DEFAULT_WINDOW_SIZE;
+    private double scaleFactor = DEFAULT_SCALE_FACTOR;
+    private Supplier<T> inputAccessor;
+    private Ticker ticker = Ticker.systemTicker();
+
+    Builder(String name, final T input) {
+      this.name = name;
+      this.inputAccessor = Suppliers.ofInstance(input);
+    }
+
+    Builder(String name, Supplier<T> input) {
+      this.name = name;
+      this.inputAccessor = input;
+    }
+
+    Builder(final Stat<T> input) {
+      Stats.export(input);
+      this.name = input.getName() + "_per_sec";
+      this.inputAccessor = new Supplier<T>() {
+        @Override
+        public T get() {
+          return input.read();
+        }
+      };
+    }
+
+    /** Overrides the name of the rate stat. */
+    public Builder<T> withName(String name) {
+      this.name = name;
+      return this;
+    }
+
+    /** Sets the capacity of the sample window. */
+    public Builder<T> withWindowSize(int windowSize) {
+      this.windowSize = windowSize;
+      return this;
+    }
+
+    /** Sets the factor the per-second rate is multiplied by; zero is rejected at build time. */
+    public Builder<T> withScaleFactor(double scaleFactor) {
+      this.scaleFactor = scaleFactor;
+      return this;
+    }
+
+    @VisibleForTesting
+    Builder<T> withTicker(Ticker ticker) {
+      this.ticker = ticker;
+      return this;
+    }
+
+    /** Creates the rate from the current builder settings. */
+    public Rate<T> build() {
+      return new Rate<T>(name, inputAccessor, windowSize, scaleFactor, ticker);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/Ratio.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/Ratio.java b/commons/src/main/java/com/twitter/common/stats/Ratio.java
new file mode 100644
index 0000000..7ac7842
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/Ratio.java
@@ -0,0 +1,101 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+/**
+ * Function to compute the ratio of two time series.
+ * The first argument is the numerator, and the second is the denominator.  Assumes that the
+ * timestamps of the two arguments are suitably synchronized (i.e. the ith point in the numerator
+ * time series corresponds with the ith point of the denominator time series).
+ *
+ * @author William Farner
+ */
+public class Ratio extends SampledStat<Double> {
+
+  private final Supplier<Number> numeratorAccessor;
+  private final Supplier<Number> denominatorAccessor;
+
+  private Ratio(String name, Supplier<Number> numeratorAccessor,
+      Supplier<Number> denominatorAccessor) {
+    super(name, 0d);
+    this.numeratorAccessor = Preconditions.checkNotNull(numeratorAccessor);
+    this.denominatorAccessor = Preconditions.checkNotNull(denominatorAccessor);
+  }
+
+  /**
+   * Creates a ratio of two stats, named {@code <numerator name>_per_<denominator name>}.
+   *
+   * @param numerator Stat providing the numerator.
+   * @param denominator Stat providing the denominator.
+   * @return The ratio stat.
+   */
+  public static <T extends Number> Ratio of(Stat<T> numerator, Stat<T> denominator) {
+    Preconditions.checkNotNull(numerator);
+    Preconditions.checkNotNull(denominator);
+
+    String name = String.format("%s_per_%s", numerator.getName(), denominator.getName());
+    return Ratio.of(name, numerator, denominator);
+  }
+
+  /**
+   * Creates a named ratio of two stats.  Both stats are exported via {@code Stats.export}, and
+   * each sample of the ratio reads their current values.
+   *
+   * @param name Name of the ratio stat.
+   * @param numerator Stat providing the numerator.
+   * @param denominator Stat providing the denominator.
+   * @return The ratio stat.
+   */
+  public static <T extends Number> Ratio of(String name, final Stat<T> numerator,
+      final Stat<T> denominator) {
+    Preconditions.checkNotNull(numerator);
+    Preconditions.checkNotNull(denominator);
+
+    Stats.export(numerator);
+    Stats.export(denominator);
+
+    return new Ratio(name,
+        new Supplier<Number>() {
+          @Override public Number get() {
+            return numerator.read();
+          }
+        },
+        new Supplier<Number>() {
+          @Override public Number get() {
+            return denominator.read();
+          }
+        });
+  }
+
+  /**
+   * Creates a named ratio of two {@link Number} instances.  The same instances are consulted on
+   * every sample, so the ratio only changes if the instances themselves are mutable.
+   *
+   * @param name Name of the ratio stat.
+   * @param numerator Numerator instance.
+   * @param denominator Denominator instance.
+   * @return The ratio stat.
+   */
+  public static Ratio of(String name, final Number numerator, final Number denominator) {
+    Preconditions.checkNotNull(numerator);
+    Preconditions.checkNotNull(denominator);
+
+    return new Ratio(name,
+        new Supplier<Number>() {
+          @Override public Number get() {
+            return numerator;
+          }
+        },
+        new Supplier<Number>() {
+          @Override public Number get() {
+            return denominator;
+          }
+        });
+  }
+
+  /**
+   * Samples both inputs and divides them.
+   *
+   * @return The numerator divided by the denominator, or 0 when the denominator is zero or
+   *     either operand is NaN.
+   */
+  @Override
+  public Double doSample() {
+    double numeratorValue = numeratorAccessor.get().doubleValue();
+    double denominatorValue = denominatorAccessor.get().doubleValue();
+
+    // NaN never compares equal to anything (including itself), so it has to be detected with
+    // Double.isNaN rather than ==.
+    if ((denominatorValue == 0)
+        || Double.isNaN(denominatorValue)
+        || Double.isNaN(numeratorValue)) {
+      return 0d;
+    }
+
+    return numeratorValue / denominatorValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/stats/RecordingStat.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/stats/RecordingStat.java b/commons/src/main/java/com/twitter/common/stats/RecordingStat.java
new file mode 100644
index 0000000..b65b396
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/stats/RecordingStat.java
@@ -0,0 +1,36 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.stats;
+
+/**
+ * A stat describing a (possibly changing) value, whose history is captured through sampling.
+ *
+ * @author William Farner
+ */
+interface RecordingStat<T extends Number> extends Stat<T> {
+
+  /**
+   * Invoked by the variable sampler each time a sample is taken.  Variable history should only
+   * be stored in response to calls to this method.
+   *
+   * <p>Note - if the sampled value depends on other variables, it is imperative that the values
+   * of those variables are updated first (and available via {@link Stat#read()}).
+   *
+   * @return A new sample of the variable.
+   */
+  T sample();
+}


Mime
View raw message