metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject incubator-metron git commit: METRON-416: Provide the ability to store mergeable data structures for summarizing data on-line closes apache/incubator-metron#250
Date Wed, 14 Sep 2016 18:31:36 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master 6fb85e124 -> 5e86bc3dd


METRON-416: Provide the ability to store mergeable data structures for summarizing data on-line closes apache/incubator-metron#250


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/5e86bc3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/5e86bc3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/5e86bc3d

Branch: refs/heads/master
Commit: 5e86bc3dd661be137e88bc62cfa5b8952bf3d3ac
Parents: 6fb85e1
Author: cstella <cestella@gmail.com>
Authored: Wed Sep 14 14:31:11 2016 -0400
Committer: cstella <cestella@gmail.com>
Committed: Wed Sep 14 14:31:11 2016 -0400

----------------------------------------------------------------------
 metron-platform/metron-common/pom.xml           |   5 +
 .../dsl/functions/DataStructureFunctions.java   | 113 ++++++
 .../common/dsl/functions/StellarStatistics.java | 199 -----------
 .../functions/StellarStatisticsFunctions.java   |  80 +++--
 .../math/stats/OnlineStatisticsProvider.java    | 351 +++++++++++++++++++
 .../common/math/stats/StatisticsProvider.java   |  63 ++++
 .../math/stats/WindowedStatisticsProvider.java  | 147 ++++++++
 .../metron/common/stellar/StellarCompiler.java  |  13 +-
 .../metron/common/utils/SerializationUtils.java |  63 ++++
 .../stats/OnlineStatisticsProviderTest.java     | 176 ++++++++++
 .../metron/common/stellar/BloomFilterTest.java  |  96 +++++
 .../stellar/StellarStatisticsFunctionsTest.java | 105 ++++--
 12 files changed, 1158 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5e86bc3d/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index 7a1ea25..737de08 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -229,6 +229,11 @@
             <artifactId>reflections</artifactId>
             <version>0.9.10</version>
         </dependency>
+        <dependency>
+            <groupId>com.tdunning</groupId>
+            <artifactId>t-digest</artifactId>
+            <version>3.1</version>
+        </dependency>
     </dependencies>
 
     <reporting>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5e86bc3d/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java
index dcfb6e8..b71391c 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java
@@ -19,11 +19,124 @@ package org.apache.metron.common.dsl.functions;
 
 import org.apache.metron.common.dsl.BaseStellarFunction;
 import org.apache.metron.common.dsl.Stellar;
+import org.apache.metron.common.utils.BloomFilter;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.SerializationUtils;
 
 import java.util.Collection;
 import java.util.List;
 
 public class DataStructureFunctions {
+
+  @Stellar(name="ADD"
+          , namespace="BLOOM"
+          , description="Adds an element to the bloom filter passed in"
+          , params = { "bloom - The bloom filter"
+                     , "value* - The values to add"
+                     }
+          , returns = "Bloom Filter"
+          )
+  public static class BloomAdd extends BaseStellarFunction {
+
+    @Override
+    public Object apply(List<Object> args) {
+      BloomFilter<Object> filter = (BloomFilter)args.get(0);
+      for(int i = 1;i < args.size();++i) {
+        Object arg = args.get(i);
+        if(arg != null) {
+          filter.add(args.get(i));
+        }
+      }
+      return filter;
+    }
+  }
+
+  @Stellar(name="EXISTS"
+          , namespace="BLOOM"
+          , description="If the bloom filter contains the value"
+          , params = { "bloom - The bloom filter"
+                     , "value - The value to check"
+                     }
+          , returns = "True if the filter might contain the value and false otherwise"
+          )
+  public static class BloomExists extends BaseStellarFunction {
+
+    @Override
+    public Object apply(List<Object> args) {
+      if(args.size() == 0) {
+        return false;
+      }
+      BloomFilter<Object> filter = (BloomFilter)args.get(0);
+      if(args.size() > 1) {
+        Object arg = args.get(1);
+        if(arg == null) {
+          return false;
+        }
+        return filter.mightContain(arg);
+      }
+      return false;
+    }
+  }
+
+  @Stellar(name="INIT"
+         , namespace="BLOOM"
+          , description="Returns an empty bloom filter"
+          , params = { "expectedInsertions - The expected insertions"
+                     , "falsePositiveRate - The false positive rate you are willing to tolerate"
+                     }
+          , returns = "Bloom Filter"
+          )
+  public static class BloomInit extends BaseStellarFunction {
+
+    @Override
+    public Object apply(List<Object> args) {
+      int expectedInsertions = 100000;
+      float falsePositiveRate = 0.01f;
+      if(args.size() > 1) {
+        expectedInsertions = ConversionUtils.convert(args.get(0), Integer.class);
+      }
+      if(args.size() > 2) {
+        falsePositiveRate= ConversionUtils.convert(args.get(1), Float.class);
+      }
+      return new BloomFilter<>(SerializationUtils.INSTANCE, expectedInsertions, falsePositiveRate);
+    }
+  }
+
+  @Stellar( name="MERGE"
+          , namespace="BLOOM"
+          , description="Returns a merged bloom filter"
+          , params = { "bloomfilters - A list of bloom filters to merge"
+                     }
+          , returns = "Bloom Filter or null if the list is empty"
+          )
+  public static class BloomMerge extends BaseStellarFunction {
+
+    @Override
+    public Object apply(List<Object> args) {
+      if(args.size() > 0) {
+        Object firstArg = args.get(0);
+        if(firstArg instanceof List) {
+          BloomFilter ret = null;
+          for(Object bf : (List)firstArg) {
+            if(bf instanceof BloomFilter) {
+              if(ret == null) {
+                ret = (BloomFilter)bf;
+              }
+              else {
+                ret.merge((BloomFilter)bf);
+              }
+            }
+          }
+          return ret;
+        }
+        else {
+          return null;
+        }
+      }
+      return null;
+    }
+  }
+
   @Stellar(name="IS_EMPTY"
           , description="Returns true if string or collection is empty and false otherwise"
           , params = { "input - Object of string or collection type (e.g. list)"}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5e86bc3d/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/StellarStatistics.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/StellarStatistics.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/StellarStatistics.java
deleted file mode 100644
index b199c36..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/StellarStatistics.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.common.dsl.functions;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
-import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
-
-import java.io.Serializable;
-
-/**
- * Provides basic summary statistics to Stellar.
- *
- * Used as an adapter to provide a single interface to two underlying Commons
- * Math classes that provide summary statistics.
- */
-public class StellarStatistics implements Serializable {
-
-  /**
-   * DescriptiveStatistics stores a rolling window of input data elements
-   * which is then used to calculate the summary statistic.  There are some
-   * summary statistics like kurtosis and percentiles, that can only
-   * be calculated with this.
-   *
-   * This implementation is used if the windowSize > 0.
-   */
-  private DescriptiveStatistics descStats;
-
-  /**
-   * SummaryStatistics can be used for summary statistics that only
-   * require a single pass over the input data.  This has the advantage
-   * of being less memory intensive, but not all summary statistics are
-   * available.
-   *
-   * This implementation is used if the windowSize == 0.
-   */
-  private SummaryStatistics summStats;
-
-  /**
-   * @param windowSize The number of input data elements to maintain in memory.  If
-   *                   windowSize == 0, then no data elements will be maintained in
-   *                   memory.
-   */
-  public StellarStatistics(int windowSize) {
-
-    // only one of the underlying implementation classes will be used at a time
-    if(windowSize > 0) {
-      descStats = new DescriptiveStatistics(windowSize);
-    } else {
-      summStats = new SummaryStatistics();
-    }
-  }
-
-  public void addValue(double value) {
-    if(descStats != null) {
-      descStats.addValue(value);
-    } else {
-      summStats.addValue(value);
-    }
-  }
-
-  public long getCount() {
-    if(descStats != null) {
-      return descStats.getN();
-    } else {
-      return summStats.getN();
-    }
-  }
-
-  public double getMin() {
-    if(descStats != null) {
-      return descStats.getMin();
-    } else {
-      return summStats.getMin();
-    }
-  }
-
-  public double getMax() {
-    if(descStats != null) {
-      return descStats.getMax();
-    } else {
-      return summStats.getMax();
-    }
-  }
-
-  public double getMean() {
-    if(descStats != null) {
-      return descStats.getMean();
-    } else {
-      return summStats.getMean();
-    }
-  }
-
-  public double getSum() {
-    if(descStats != null) {
-      return descStats.getSum();
-    } else {
-      return summStats.getSum();
-    }
-  }
-
-  public double getVariance() {
-    if(descStats != null) {
-      return descStats.getVariance();
-    } else {
-      return summStats.getVariance();
-    }
-  }
-
-  public double getStandardDeviation() {
-    if(descStats != null) {
-      return descStats.getStandardDeviation();
-    } else {
-      return summStats.getStandardDeviation();
-    }
-  }
-
-  public double getGeometricMean() {
-    if(descStats != null) {
-      return descStats.getGeometricMean();
-    } else {
-      return summStats.getGeometricMean();
-    }
-  }
-
-  public double getPopulationVariance() {
-    if(descStats != null) {
-      return descStats.getPopulationVariance();
-    } else {
-      return summStats.getPopulationVariance();
-    }
-  }
-
-  public double getQuadraticMean() {
-    if(descStats != null) {
-      return descStats.getQuadraticMean();
-    } else {
-      return summStats.getQuadraticMean();
-    }
-  }
-
-  public double getSumLogs() {
-    if(descStats != null) {
-      throw new NotImplementedException("sum logs not available if 'windowSize' > 0");
-    } else {
-      return summStats.getSumOfLogs();
-    }
-  }
-
-  public double getSumSquares() {
-    if(descStats != null) {
-      return descStats.getSumsq();
-    } else {
-      return summStats.getSumsq();
-    }
-  }
-
-  public double getKurtosis() {
-    if(descStats != null) {
-      return descStats.getKurtosis();
-    } else {
-      throw new NotImplementedException("kurtosis not available if 'windowSize' = 0");
-    }
-  }
-
-  public double getSkewness() {
-    if(descStats != null) {
-      return descStats.getSkewness();
-    } else {
-      throw new NotImplementedException("skewness not available if 'windowSize' = 0");
-    }
-  }
-
-  public double getPercentile(double p) {
-    if(descStats != null) {
-      return descStats.getPercentile(p);
-    } else {
-      throw new NotImplementedException("percentile not available if 'windowSize' = 0");
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5e86bc3d/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/StellarStatisticsFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/StellarStatisticsFunctions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/StellarStatisticsFunctions.java
index dd01775..d45ed82 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/StellarStatisticsFunctions.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/StellarStatisticsFunctions.java
@@ -22,6 +22,9 @@ package org.apache.metron.common.dsl.functions;
 
 import org.apache.metron.common.dsl.BaseStellarFunction;
 import org.apache.metron.common.dsl.Stellar;
+import org.apache.metron.common.math.stats.OnlineStatisticsProvider;
+import org.apache.metron.common.math.stats.StatisticsProvider;
+import org.apache.metron.common.math.stats.WindowedStatisticsProvider;
 
 import java.util.Collections;
 import java.util.List;
@@ -39,13 +42,50 @@ public class StellarStatisticsFunctions {
    *
    * Initialization can occur from either STATS_INIT and STATS_ADD.
    */
-  private static StellarStatistics statsInit(List<Object> args) {
+  private static StatisticsProvider statsInit(List<Object> args) {
     int windowSize = 0;
     if(args.size() > 0 && args.get(0) instanceof Number) {
       windowSize = convert(args.get(0), Integer.class);
     }
+    if(windowSize > 0) {
+      return new WindowedStatisticsProvider(windowSize);
+    }
+    return new OnlineStatisticsProvider();
+  }
 
-    return new StellarStatistics(windowSize);
+  @Stellar( namespace="STATS"
+          , name="MERGE"
+          , description = "Merge statistic providers"
+          , params = {
+                      "statisticsProviders - A list of statistics providers"
+                      }
+          , returns = "A StatisticsProvider object"
+          )
+  public static class Merge extends BaseStellarFunction {
+    @Override
+    public Object apply(List<Object> args) {
+      if(args.size() > 0) {
+        Object firstArg = args.get(0);
+        if(firstArg instanceof List) {
+          StatisticsProvider ret = null;
+          for(Object sp : (List)firstArg) {
+            if(sp instanceof StatisticsProvider) {
+              if(ret == null) {
+                ret = (StatisticsProvider)sp;
+              }
+              else {
+                ret = ret.merge((StatisticsProvider)sp);
+              }
+            }
+          }
+          return ret;
+        }
+        else {
+          return null;
+        }
+      }
+      return null;
+    }
   }
 
   /**
@@ -67,7 +107,7 @@ public class StellarStatisticsFunctions {
                       "Using no rolling window is less memory intensive, but cannot " +
                       "calculate certain statistics like percentiles and kurtosis."
                       }
-          , returns = "A StellarStatistics object"
+          , returns = "A StatisticsProvider object"
           )
   public static class Init extends BaseStellarFunction {
     @Override
@@ -88,14 +128,14 @@ public class StellarStatisticsFunctions {
                       "stats - The Stellar statistics object.  If null, then a new one is initialized."
                      , "value+ - one or more numbers to add "
                      }
-          , returns = "A StellarStatistics object"
+          , returns = "A StatisticsProvider object"
           )
   public static class Add extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
 
       // initialize a stats object, if one does not already exist
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       if(stats == null) {
         stats = statsInit(Collections.emptyList());
       }
@@ -126,7 +166,7 @@ public class StellarStatisticsFunctions {
   public static class Mean extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       return (stats != null) ? stats.getMean() : Double.NaN;
     }
   }
@@ -145,7 +185,7 @@ public class StellarStatisticsFunctions {
   public static class GeometricMean extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       return (stats != null) ? stats.getGeometricMean() : Double.NaN;
     }
   }
@@ -164,7 +204,7 @@ public class StellarStatisticsFunctions {
   public static class Sum extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       return (stats != null) ? stats.getSum() : Double.NaN;
     }
   }
@@ -182,7 +222,7 @@ public class StellarStatisticsFunctions {
   public static class Max extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       return (stats != null) ? stats.getMax() : Double.NaN;
     }
   }
@@ -200,7 +240,7 @@ public class StellarStatisticsFunctions {
   public static class Min extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       return (stats != null) ? stats.getMin() : Double.NaN;
     }
   }
@@ -217,7 +257,7 @@ public class StellarStatisticsFunctions {
   public static class Count extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       return (stats != null) ? convert(stats.getCount(), Double.class) : Double.NaN;
     }
   }
@@ -234,7 +274,7 @@ public class StellarStatisticsFunctions {
   public static class PopulationVariance extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       return (stats != null) ? stats.getPopulationVariance() : Double.NaN;
     }
   }
@@ -251,7 +291,7 @@ public class StellarStatisticsFunctions {
   public static class Variance extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       return (stats != null) ? stats.getVariance() : Double.NaN;
     }
   }
@@ -268,7 +308,7 @@ public class StellarStatisticsFunctions {
   public static class QuadraticMean extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       return (stats != null) ? stats.getQuadraticMean() : Double.NaN;
     }
   }
@@ -285,7 +325,7 @@ public class StellarStatisticsFunctions {
   public static class StandardDeviation extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       return (stats != null) ? stats.getStandardDeviation() : Double.NaN;
     }
   }
@@ -302,7 +342,7 @@ public class StellarStatisticsFunctions {
   public static class SumLogs extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       return (stats != null) ? stats.getSumLogs() : Double.NaN;
     }
   }
@@ -319,7 +359,7 @@ public class StellarStatisticsFunctions {
   public static class SumSquares extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       return (stats != null) ? stats.getSumSquares() : Double.NaN;
     }
   }
@@ -336,7 +376,7 @@ public class StellarStatisticsFunctions {
   public static class Kurtosis extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       return (stats != null) ? stats.getKurtosis() : Double.NaN;
     }
   }
@@ -353,7 +393,7 @@ public class StellarStatisticsFunctions {
   public static class Skewness extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       return (stats != null) ? stats.getSkewness() : Double.NaN;
     }
   }
@@ -375,7 +415,7 @@ public class StellarStatisticsFunctions {
   public static class Percentile extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
-      StellarStatistics stats = convert(args.get(0), StellarStatistics.class);
+      StatisticsProvider stats = convert(args.get(0), StatisticsProvider.class);
       Double p = convert(args.get(1), Double.class);
 
       Double result;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5e86bc3d/metron-platform/metron-common/src/main/java/org/apache/metron/common/math/stats/OnlineStatisticsProvider.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/math/stats/OnlineStatisticsProvider.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/math/stats/OnlineStatisticsProvider.java
new file mode 100644
index 0000000..c8a7d5c
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/math/stats/OnlineStatisticsProvider.java
@@ -0,0 +1,351 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.common.math.stats;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.tdunning.math.stats.AVLTreeDigest;
+import com.tdunning.math.stats.TDigest;
+import org.apache.commons.math3.util.FastMath;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A (near) constant memory implementation of a statistics provider.
+ * For first order statistics, simple terms are stored and composed
+ * to return the statistics results.  This is intended to provide a
+ * mergeable implementation for a statistics provider.
+ */
+public class OnlineStatisticsProvider implements StatisticsProvider, KryoSerializable {
+  /**
+   * A sensible default for compression to use in the T-Digest.
+   * As per https://github.com/tdunning/t-digest/blob/master/src/main/java/com/tdunning/math/stats/TDigest.java#L86
+   * 100 is a sensible default and the number of centroids retained (to construct the sketch)
+   * is usually a smallish (usually < 10) multiple of the compression.
+   */
+  public static final int COMPRESSION = 100;
+
+
+  /**
+   * A distributional sketch that uses a variant of 1-D k-means to construct a tree of ranges
+   * that sketches the distribution.  See https://github.com/tdunning/t-digest#t-digest for
+   * more detail.
+   */
+  private TDigest digest;
+
+  private long n = 0;
+  private double sum = 0;
+  private double sumOfSquares = 0;
+  private double sumOfLogs = 0;
+  private Double min = null;
+  private Double max = null;
+
+  //\mu_1, E[X]
+  private double M1 = 0;
+  //\mu_2: E[(X - \mu)^2]
+  private double M2 = 0;
+  //\mu_3: E[(X - \mu)^3]
+  private double M3 = 0;
+  //\mu_4: E[(X - \mu)^4]
+  private double M4 = 0;
+
+  public OnlineStatisticsProvider() {
+    digest = TDigest.createAvlTreeDigest(COMPRESSION);
+  }
+
+  /**
+   * Add a value.
+   * NOTE: This does not store the point, but only updates internal state.
+   * NOTE: This is NOT threadsafe.
+   * @param value
+   */
+  @Override
+  public void addValue(double value) {
+    long n1 = n;
+    min = min == null?value:Math.min(min, value);
+    max = max == null?value:Math.max(max, value);
+    sum += value;
+    sumOfLogs += Math.log(value);
+    sumOfSquares += value*value;
+    digest.add(value);
+    n++;
+    double delta, delta_n, delta_n2, term1;
+    //delta between the value and the mean
+    delta = value - M1;
+    //(x - E[x])/n
+    delta_n = delta / n;
+    delta_n2 = delta_n * delta_n;
+    term1 = delta * delta_n * n1;
+
+    // Adjusting expected value: See Knuth TAOCP vol 2, 3rd edition, page 232
+    M1 += delta_n;
+    // Adjusting the \mu_i, see http://www.johndcook.com/blog/skewness_kurtosis/
+    M4 += term1 * delta_n2 * (n*n - 3*n + 3) + 6 * delta_n2 * M2 - 4 * delta_n * M3;
+    M3 += term1 * delta_n * (n - 2) - 3 * delta_n * M2;
+    M2 += term1;
+    checkFlowError(sumOfSquares, sum, sumOfSquares, M1, M2, M3, M4);
+
+  }
+
+  private void checkFlowError(double sumOfSquares, double sum, double... vals) {
+    //overflow
+    for(double val : vals) {
+      if(Double.isInfinite(val)) {
+        throw new IllegalStateException("Double overflow!");
+      }
+    }
+    //underflow.  It is sufficient to check sumOfSquares because sumOfSquares is going to converge to 0 faster than sum
+    //in the situation where we're looking at an underflow.
+    if(sumOfSquares == 0.0 && sum > 0) {
+      throw new IllegalStateException("Double overflow!");
+    }
+  }
+
+  @Override
+  public long getCount() {
+    return n;
+  }
+
+  @Override
+  public double getMin() {
+    return min == null?Double.NaN:min;
+  }
+
+  @Override
+  public double getMax() {
+    return max == null?Double.NaN:max;
+  }
+
+  @Override
+  public double getMean() {
+    return getSum()/getCount();
+  }
+
+  @Override
+  public double getSum() {
+    return sum;
+  }
+
+  @Override
+  public double getVariance() {
+    return M2/(n - 1.0);
+  }
+
+  @Override
+  public double getStandardDeviation() {
+    return FastMath.sqrt(getVariance());
+  }
+
+  @Override
+  public double getGeometricMean() {
+    throw new UnsupportedOperationException("Unwilling to compute the geometric mean.");
+  }
+
+  @Override
+  public double getPopulationVariance() {
+    throw new UnsupportedOperationException("Unwilling to compute the geometric mean.");
+  }
+
+  @Override
+  public double getQuadraticMean() {
+    return FastMath.sqrt(sumOfSquares/n);
+  }
+
+  @Override
+  public double getSumLogs() {
+    return sumOfLogs;
+  }
+
+  @Override
+  public double getSumSquares() {
+    return sumOfSquares;
+  }
+
+  /**
+   * Unbiased kurtosis.
+   * See http://commons.apache.org/proper/commons-math/apidocs/org/apache/commons/math4/stat/descriptive/moment/Kurtosis.html
+   * @return
+   */
+  @Override
+  public double getKurtosis() {
+    //kurtosis = { [n(n+1) / (n -1)(n - 2)(n-3)] \mu_4 / std^4 } - [3(n-1)^2 / (n-2)(n-3)]
+    if(n < 4) {
+      return Double.NaN;
+    }
+    double std = getStandardDeviation();
+    double t1 = (1.0*n)*(n+1)/((n-1)*(n-2)*(n-3));
+    double t3 = 3.0*((n-1)*(n-1))/((n-2)*(n-3));
+    return t1*(M4/FastMath.pow(std, 4))-t3;
+  }
+
+  /**
+   * Unbiased skewness.
+   * See  http://commons.apache.org/proper/commons-math/apidocs/org/apache/commons/math4/stat/descriptive/moment/Skewness.html
+   * @return
+   */
+  @Override
+  public double getSkewness() {
+    //  skewness = [n / (n -1) (n - 2)] sum[(x_i - mean)^3] / std^3
+    if(n < 3) {
+      return Double.NaN;
+    }
+    double t1 = (1.0*n)/((n - 1)*(n-2));
+    double std = getStandardDeviation();
+    return t1*M3/FastMath.pow(std, 3);
+  }
+
+  /**
+   * This returns an approximate percentile based on a t-digest.
+   * @param p
+   * @return
+   */
+  @Override
+  public double getPercentile(double p) {
+    return digest.quantile(p/100.0);
+  }
+
+  @Override
+  public StatisticsProvider merge(StatisticsProvider provider) {
+    OnlineStatisticsProvider combined = new OnlineStatisticsProvider();
+    OnlineStatisticsProvider a = this;
+    OnlineStatisticsProvider b = (OnlineStatisticsProvider)provider;
+
+    //Combining the simple terms that obviously form a semigroup
+    combined.n = a.n + b.n;
+    combined.sum = a.sum + b.sum;
+    combined.min = Math.min(a.min, b.min);
+    combined.max = Math.max(a.max, b.max);
+    combined.sumOfSquares = a.sumOfSquares + b.sumOfSquares;
+    combined.sumOfLogs = a.sumOfLogs+ b.sumOfLogs;
+
+    // Adjusting the standardized moments, see http://www.johndcook.com/blog/skewness_kurtosis/
+    double delta = b.M1 - a.M1;
+    double delta2 = delta*delta;
+    double delta3 = delta*delta2;
+    double delta4 = delta2*delta2;
+
+    combined.M1 = (a.n*a.M1 + b.n*b.M1) / combined.n;
+
+    combined.M2 = a.M2 + b.M2 +
+            delta2 * a.n * b.n / combined.n;
+
+    combined.M3 = a.M3 + b.M3 +
+            delta3 * a.n * b.n * (a.n - b.n)/(combined.n*combined.n);
+    combined.M3 += 3.0*delta * (a.n*b.M2 - b.n*a.M2) / combined.n;
+
+    combined.M4 = a.M4 + b.M4 + delta4*a.n*b.n * (a.n*a.n - a.n*b.n + b.n*b.n) /
+            (combined.n*combined.n*combined.n);
+    combined.M4 += 6.0*delta2 * (a.n*a.n*b.M2 + b.n*b.n*a.M2)/(combined.n*combined.n) +
+            4.0*delta*(a.n*b.M3 - b.n*a.M3) / combined.n;
+
+    //Merging the distributional sketches
+    combined.digest.add(a.digest);
+    combined.digest.add(b.digest);
+    checkFlowError(combined.sumOfSquares, sum, combined.sumOfSquares, combined.M1, combined.M2, combined.M3, combined.M4);
+    return combined;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    OnlineStatisticsProvider that = (OnlineStatisticsProvider) o;
+
+    if (n != that.n) return false;
+    if (Double.compare(that.sum, sum) != 0) return false;
+    if (Double.compare(that.sumOfSquares, sumOfSquares) != 0) return false;
+    if (Double.compare(that.sumOfLogs, sumOfLogs) != 0) return false;
+    if (Double.compare(that.M1, M1) != 0) return false;
+    if (Double.compare(that.M2, M2) != 0) return false;
+    if (Double.compare(that.M3, M3) != 0) return false;
+    if (Double.compare(that.M4, M4) != 0) return false;
+    if (digest != null ? !digest.equals(that.digest) : that.digest != null) return false;
+    if (min != null ? !min.equals(that.min) : that.min != null) return false;
+    return max != null ? max.equals(that.max) : that.max == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result;
+    long temp;
+    result = digest != null ? digest.hashCode() : 0;
+    result = 31 * result + (int) (n ^ (n >>> 32));
+    temp = Double.doubleToLongBits(sum);
+    result = 31 * result + (int) (temp ^ (temp >>> 32));
+    temp = Double.doubleToLongBits(sumOfSquares);
+    result = 31 * result + (int) (temp ^ (temp >>> 32));
+    temp = Double.doubleToLongBits(sumOfLogs);
+    result = 31 * result + (int) (temp ^ (temp >>> 32));
+    result = 31 * result + (min != null ? min.hashCode() : 0);
+    result = 31 * result + (max != null ? max.hashCode() : 0);
+    temp = Double.doubleToLongBits(M1);
+    result = 31 * result + (int) (temp ^ (temp >>> 32));
+    temp = Double.doubleToLongBits(M2);
+    result = 31 * result + (int) (temp ^ (temp >>> 32));
+    temp = Double.doubleToLongBits(M3);
+    result = 31 * result + (int) (temp ^ (temp >>> 32));
+    temp = Double.doubleToLongBits(M4);
+    result = 31 * result + (int) (temp ^ (temp >>> 32));
+    return result;
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output) {
+    //storing tdigest
+    ByteBuffer outBuffer = ByteBuffer.allocate(digest.byteSize());
+    digest.asBytes(outBuffer);
+    byte[] tdigestSerialized = outBuffer.array();
+    output.writeInt(tdigestSerialized.length);
+    output.writeBytes(tdigestSerialized);
+    output.writeLong(n);
+    output.writeDouble(sum);
+    output.writeDouble(sumOfSquares);
+    output.writeDouble(sumOfLogs);
+    output.writeDouble(getMin());
+    output.writeDouble(getMax());
+    output.writeDouble(M1);
+    output.writeDouble(M2);
+    output.writeDouble(M3);
+    output.writeDouble(M4);
+  }
+
+  @Override
+  public void read(Kryo kryo, Input input) {
+    int digestSize = input.readInt();
+    byte[] digestBytes = input.readBytes(digestSize);
+    ByteBuffer digestBuff = ByteBuffer.wrap(digestBytes);
+    digest = AVLTreeDigest.fromBytes(digestBuff);
+    n = input.readLong();
+    sum = input.readDouble();
+    sumOfSquares = input.readDouble();
+    sumOfLogs = input.readDouble();
+    min = input.readDouble();
+    max = input.readDouble();
+    M1 = input.readDouble();
+    M2 = input.readDouble();
+    M3 = input.readDouble();
+    M4 = input.readDouble();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5e86bc3d/metron-platform/metron-common/src/main/java/org/apache/metron/common/math/stats/StatisticsProvider.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/math/stats/StatisticsProvider.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/math/stats/StatisticsProvider.java
new file mode 100644
index 0000000..0561d60
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/math/stats/StatisticsProvider.java
@@ -0,0 +1,63 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.common.math.stats;
+
+
+/**
+ * Provides statistical functions.
+ */
+public interface StatisticsProvider {
+  void addValue(double value);
+  long getCount();
+  double getMin();
+  double getMax();
+  double getMean();
+  double getSum();
+  double getVariance();
+  double getStandardDeviation();
+  double getGeometricMean();
+  double getPopulationVariance();
+  double getQuadraticMean();
+  double getSumLogs();
+  double getSumSquares();
+
+  /**
+   * Unbiased Kurtosis.
+   * See http://commons.apache.org/proper/commons-math/apidocs/org/apache/commons/math4/stat/descriptive/moment/Kurtosis.html
+   * @return unbiased kurtosis
+   */
+  double getKurtosis();
+
+  /**
+   * Unbiased skewness.
+   * See  http://commons.apache.org/proper/commons-math/apidocs/org/apache/commons/math4/stat/descriptive/moment/Skewness.html
+   * @return
+   */
+  double getSkewness();
+
+  double getPercentile(double p);
+
+  /**
+   * Merge an existing statistics provider.
+   * @param provider The provider to merge with the current object
+   * @return A merged statistics provider.
+   */
+  StatisticsProvider merge(StatisticsProvider provider);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5e86bc3d/metron-platform/metron-common/src/main/java/org/apache/metron/common/math/stats/WindowedStatisticsProvider.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/math/stats/WindowedStatisticsProvider.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/math/stats/WindowedStatisticsProvider.java
new file mode 100644
index 0000000..fc145ac
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/math/stats/WindowedStatisticsProvider.java
@@ -0,0 +1,147 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.common.math.stats;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+/**
+ * Provides basic summary statistics to Stellar.
+ *
+ * Used as an adapter to provide an interface to the underlying Commons
+ * Math class that provide summary statistics for windows of data.
+ * NOTE: Windowed statistics providers cannot be merged.
+ */
+public class WindowedStatisticsProvider implements StatisticsProvider {
+
+  /**
+   * DescriptiveStatistics stores a rolling window of input data elements
+   * which is then used to calculate the summary statistic.  There are some
+   * summary statistics like kurtosis and percentiles, that can only
+   * be calculated with this.
+   *
+   * This implementation is used if the windowSize > 0.
+   */
+  private DescriptiveStatistics descStats;
+
+  public WindowedStatisticsProvider(int windowSize) {
+    descStats = new DescriptiveStatistics(windowSize);
+  }
+
+  @Override
+  public void addValue(double value) {
+    descStats.addValue(value);
+  }
+
+  @Override
+  public long getCount() {
+    return descStats.getN();
+  }
+
+  @Override
+  public double getMin() {
+    return descStats.getMin();
+  }
+
+  @Override
+  public double getMax() {
+    return descStats.getMax();
+  }
+
+  @Override
+  public double getMean() {
+    return descStats.getMean();
+  }
+
+  @Override
+  public double getSum() {
+    return descStats.getSum();
+  }
+
+  @Override
+  public double getVariance() {
+    return descStats.getVariance();
+  }
+
+  @Override
+  public double getStandardDeviation() {
+    return descStats.getStandardDeviation();
+  }
+
+  @Override
+  public double getGeometricMean() {
+    return descStats.getGeometricMean();
+  }
+
+  @Override
+  public double getPopulationVariance() {
+    return descStats.getPopulationVariance();
+  }
+
+  @Override
+  public double getQuadraticMean() {
+    return descStats.getQuadraticMean();
+  }
+
+  @Override
+  public double getSumLogs() {
+    throw new UnsupportedOperationException("sum logs not available if 'windowSize' > 0");
+  }
+
+  @Override
+  public double getSumSquares() {
+    return descStats.getSumsq();
+  }
+
+  @Override
+  public double getKurtosis() {
+    return descStats.getKurtosis();
+  }
+
+  @Override
+  public double getSkewness() {
+    return descStats.getSkewness();
+  }
+
+  @Override
+  public double getPercentile(double p) {
+    return descStats.getPercentile(p);
+  }
+
+  @Override
+  public StatisticsProvider merge(StatisticsProvider provider) {
+    throw new UnsupportedOperationException("Windowed Statistics cannot be merged.");
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    WindowedStatisticsProvider that = (WindowedStatisticsProvider) o;
+
+    return descStats != null ? descStats.equals(that.descStats) : that.descStats == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    return descStats != null ? descStats.hashCode() : 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5e86bc3d/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/StellarCompiler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/StellarCompiler.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/StellarCompiler.java
index d6c60d5..e764352 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/StellarCompiler.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/StellarCompiler.java
@@ -48,6 +48,7 @@ public class StellarCompiler extends StellarBaseListener {
   private Stack<Token> tokenStack = new Stack<>();
   private FunctionResolver functionResolver;
   private VariableResolver variableResolver;
+  private Throwable actualException = null;
 
   public StellarCompiler(VariableResolver variableResolver, FunctionResolver functionResolver, Context context) {
     this.variableResolver = variableResolver;
@@ -241,8 +242,13 @@ public class StellarCompiler extends StellarBaseListener {
 
     // fetch the args, execute, and push result onto the stack
     List<Object> args = getFunctionArguments(popStack());
-    Object result = function.apply(args, context);
-    tokenStack.push(new Token<>(result, Object.class));
+    try {
+      Object result = function.apply(args, context);
+      tokenStack.push(new Token<>(result, Object.class));
+    }
+    catch(Throwable t) {
+      actualException = t;
+    }
   }
 
   /**
@@ -418,6 +424,9 @@ public class StellarCompiler extends StellarBaseListener {
   }
 
   public Object getResult() throws ParseException {
+    if(actualException != null) {
+      throw new ParseException("Unable to execute: " +actualException.getMessage(), actualException);
+    }
     if (tokenStack.empty()) {
       throw new ParseException("Invalid predicate: Empty stack.");
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5e86bc3d/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerializationUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerializationUtils.java
new file mode 100644
index 0000000..96916af
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerializationUtils.java
@@ -0,0 +1,63 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.common.utils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Output;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+import java.util.function.Function;
+
+public enum SerializationUtils implements Function<Object, byte[]> {
+  INSTANCE;
+  protected static final Logger LOG = LoggerFactory.getLogger(SerializationUtils.class);
+  ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() {
+
+    @Override
+    protected Kryo initialValue() {
+      return new Kryo();
+    }
+  };
+
+  /**
+   * Applies this function to the given argument.
+   *
+   * @param t the function argument
+   * @return the function result
+   */
+  @Override
+  public byte[] apply(Object t) {
+    try {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      Output output = new Output(bos);
+      kryo.get().writeObject(output, t);
+      output.flush();
+      byte[] ret = bos.toByteArray();
+      return ret;
+    }
+    catch(Throwable ex) {
+      LOG.error("Unable to serialize " + t + ": " + ex.getMessage(), ex);
+      throw ex;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5e86bc3d/metron-platform/metron-common/src/test/java/org/apache/metron/common/math/stats/OnlineStatisticsProviderTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/math/stats/OnlineStatisticsProviderTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/math/stats/OnlineStatisticsProviderTest.java
new file mode 100644
index 0000000..06339b7
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/math/stats/OnlineStatisticsProviderTest.java
@@ -0,0 +1,176 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.common.math.stats;
+
+import org.apache.commons.math.random.GaussianRandomGenerator;
+import org.apache.commons.math.random.MersenneTwister;
+import org.apache.commons.math.random.RandomGenerator;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class OnlineStatisticsProviderTest {
+
+  public static void validateStatisticsProvider( StatisticsProvider statsProvider
+                                               , SummaryStatistics summaryStats
+                                               , DescriptiveStatistics stats
+                                               ) {
+    //N
+    Assert.assertEquals(statsProvider.getCount(), stats.getN());
+    //sum
+    Assert.assertEquals(statsProvider.getSum(), stats.getSum(), 1e-3);
+    //sum of squares
+    Assert.assertEquals(statsProvider.getSumSquares(), stats.getSumsq(), 1e-3);
+    //sum of squares
+    Assert.assertEquals(statsProvider.getSumLogs(), summaryStats.getSumOfLogs(), 1e-3);
+    //Mean
+    Assert.assertEquals(statsProvider.getMean(), stats.getMean(), 1e-3);
+    //Quadratic Mean
+    Assert.assertEquals(statsProvider.getQuadraticMean(), summaryStats.getQuadraticMean(), 1e-3);
+    //SD
+    Assert.assertEquals(statsProvider.getStandardDeviation(), stats.getStandardDeviation(), 1e-3);
+    //Variance
+    Assert.assertEquals(statsProvider.getVariance(), stats.getVariance(), 1e-3);
+    //Min
+    Assert.assertEquals(statsProvider.getMin(), stats.getMin(), 1e-3);
+    //Max
+    Assert.assertEquals(statsProvider.getMax(), stats.getMax(), 1e-3);
+
+    //Kurtosis
+    Assert.assertEquals(stats.getKurtosis(), statsProvider.getKurtosis(), 1e-3);
+
+    //Skewness
+    Assert.assertEquals(stats.getSkewness(), statsProvider.getSkewness(), 1e-3);
+    for(double d = 10.0;d < 100.0;d+=10) {
+      //This is a sketch, so we're a bit more forgiving here in our choice of \epsilon.
+      Assert.assertEquals("Percentile mismatch for " + d +"th %ile"
+                         , statsProvider.getPercentile(d)
+                         , stats.getPercentile(d)
+                         , 1e-2
+                         );
+    }
+  }
+
+  private void validateEquality(Iterable<Double> values) {
+    DescriptiveStatistics stats = new DescriptiveStatistics();
+    SummaryStatistics summaryStats = new SummaryStatistics();
+    OnlineStatisticsProvider statsProvider = new OnlineStatisticsProvider();
+    //Test that the aggregated provider gives the same results as the provider that is shown all the data.
+    List<OnlineStatisticsProvider> providers = new ArrayList<>();
+    for(int i = 0;i < 10;++i) {
+      providers.add(new OnlineStatisticsProvider());
+    }
+    int i = 0;
+    for(double d : values) {
+      i++;
+      stats.addValue(d);
+      summaryStats.addValue(d);
+      providers.get(i % providers.size()).addValue(d);
+      statsProvider.addValue(d);
+    }
+    StatisticsProvider aggregatedProvider = providers.get(0);
+    for(int j = 1;j < providers.size();++j) {
+      aggregatedProvider = aggregatedProvider.merge(providers.get(j));
+    }
+    validateStatisticsProvider(statsProvider, summaryStats, stats);
+    validateStatisticsProvider(aggregatedProvider, summaryStats, stats);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testOverflow() {
+    OnlineStatisticsProvider statsProvider = new OnlineStatisticsProvider();
+    statsProvider.addValue(Double.MAX_VALUE + 1);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testUnderflow() {
+    OnlineStatisticsProvider statsProvider = new OnlineStatisticsProvider();
+    double d = 3e-305;
+    for(int i = 0;i < 5;++i,d/=100000) {
+      statsProvider.addValue(d);
+    }
+  }
+
+  @Test
+  public void testNormallyDistributedRandomData() {
+    List<Double> values = new ArrayList<>();
+    GaussianRandomGenerator gaussian = new GaussianRandomGenerator(new MersenneTwister(0L));
+    for(int i = 0;i < 1000000;++i) {
+      double d = gaussian.nextNormalizedDouble();
+      values.add(d);
+    }
+    validateEquality(values);
+  }
+  @Test
+  public void testNormallyDistributedRandomDataShifted() {
+    List<Double> values = new ArrayList<>();
+    GaussianRandomGenerator gaussian = new GaussianRandomGenerator(new MersenneTwister(0L));
+    for(int i = 0;i < 1000000;++i) {
+      double d = gaussian.nextNormalizedDouble() + 10;
+      values.add(d);
+    }
+    validateEquality(values);
+  }
+
+  @Test
+  public void testNormallyDistributedRandomDataShiftedBackwards() {
+    List<Double> values = new ArrayList<>();
+    GaussianRandomGenerator gaussian = new GaussianRandomGenerator(new MersenneTwister(0L));
+    for(int i = 0;i < 1000000;++i) {
+      double d = gaussian.nextNormalizedDouble() - 10;
+      values.add(d);
+    }
+    validateEquality(values);
+  }
+  @Test
+  public void testNormallyDistributedRandomDataSkewed() {
+    List<Double> values = new ArrayList<>();
+    GaussianRandomGenerator gaussian = new GaussianRandomGenerator(new MersenneTwister(0L));
+    for(int i = 0;i < 1000000;++i) {
+      double d = (gaussian.nextNormalizedDouble()+ 10000) /1000;
+      values.add(d);
+    }
+    validateEquality(values);
+  }
+
+  @Test
+  public void testNormallyDistributedRandomDataAllNegative() {
+    List<Double> values = new ArrayList<>();
+    GaussianRandomGenerator gaussian = new GaussianRandomGenerator(new MersenneTwister(0L));
+    for(int i = 0;i < 1000000;++i) {
+      double d = -1*gaussian.nextNormalizedDouble();
+      values.add(d);
+    }
+    validateEquality(values);
+  }
+  @Test
+  public void testUniformlyDistributedRandomData() {
+    List<Double> values = new ArrayList<>();
+    for(int i = 0;i < 100000;++i) {
+      double d = Math.random();
+      values.add(d);
+    }
+    validateEquality(values);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5e86bc3d/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/BloomFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/BloomFilterTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/BloomFilterTest.java
new file mode 100644
index 0000000..86da4d8
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/BloomFilterTest.java
@@ -0,0 +1,96 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.common.stellar;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.metron.common.utils.BloomFilter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.metron.common.stellar.StellarTest.run;
+
+public class BloomFilterTest {
+  private Map<String, Object> variables = new HashMap<String, Object>() {{
+    put("string", "casey");
+    put("double", 1.0);
+    put("integer", 1);
+    put("map", ImmutableMap.of("key1", "value1", "key2", "value2"));
+  }};
+
+  @Test
+  public void testMerge() {
+
+    BloomFilter bloomString = (BloomFilter)run("BLOOM_ADD(BLOOM_INIT(), string)", variables);
+    BloomFilter bloomDouble = (BloomFilter)run("BLOOM_ADD(BLOOM_INIT(), double)", variables);
+    BloomFilter bloomInteger= (BloomFilter)run("BLOOM_ADD(BLOOM_INIT(), integer)", variables);
+    BloomFilter bloomMap= (BloomFilter)run("BLOOM_ADD(BLOOM_INIT(), map)", variables);
+    BloomFilter merged = (BloomFilter)run("BLOOM_MERGE([stringFilter, doubleFilter, integerFilter, mapFilter])"
+                                         , ImmutableMap.of("stringFilter", bloomString
+                                                          ,"doubleFilter", bloomDouble
+                                                          ,"integerFilter", bloomInteger
+                                                          ,"mapFilter", bloomMap
+                                                          )
+                                         );
+    Assert.assertNotNull(merged);
+    for(Object val : variables.values()) {
+      Assert.assertTrue(merged.mightContain(val));
+    }
+  }
+
+  @Test
+  public void testAdd() {
+    BloomFilter result = (BloomFilter)run("BLOOM_ADD(BLOOM_INIT(), string, double, integer, map)", variables);
+    for(Object val : variables.values()) {
+      Assert.assertTrue(result.mightContain(val));
+    }
+    Assert.assertTrue(result.mightContain(ImmutableMap.of("key1", "value1", "key2", "value2")));
+  }
+
+  @Test
+  public void testExists() {
+    {
+      Boolean result = (Boolean) run("BLOOM_EXISTS(BLOOM_ADD(BLOOM_INIT(), string, double, integer, map), 'casey')", variables);
+      Assert.assertTrue(result);
+    }
+    {
+      Boolean result = (Boolean) run("BLOOM_EXISTS(BLOOM_ADD(BLOOM_INIT(), string, double, integer, map), double)", variables);
+      Assert.assertTrue(result);
+    }
+    {
+      Boolean result = (Boolean) run("BLOOM_EXISTS(BLOOM_ADD(BLOOM_INIT(), string, double, integer, map), integer)", variables);
+      Assert.assertTrue(result);
+    }
+    {
+      Boolean result = (Boolean) run("BLOOM_EXISTS(BLOOM_ADD(BLOOM_INIT(), string, double, integer, map), map)", variables);
+      Assert.assertTrue(result);
+    }
+    {
+      Boolean result = (Boolean) run("BLOOM_EXISTS(BLOOM_ADD(BLOOM_INIT(), string, double, integer, map), 'samantha')", variables);
+      Assert.assertFalse(result);
+    }
+    {
+      Boolean result = (Boolean) run("BLOOM_EXISTS(BLOOM_ADD(BLOOM_INIT(), string, double, integer, map), sam)", variables);
+      Assert.assertFalse(result);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5e86bc3d/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java
index 8c7ff70..016d2c9 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java
@@ -20,21 +20,23 @@
 
 package org.apache.metron.common.stellar;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.math3.random.GaussianRandomGenerator;
+import org.apache.commons.math3.random.MersenneTwister;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
 import org.apache.metron.common.dsl.Context;
 import org.apache.metron.common.dsl.ParseException;
 import org.apache.metron.common.dsl.StellarFunctions;
+import org.apache.metron.common.math.stats.OnlineStatisticsProviderTest;
+import org.apache.metron.common.math.stats.StatisticsProvider;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -97,10 +99,53 @@ public class StellarStatisticsFunctionsTest {
     variables.put("stats", result);
 
     // add some values
-    values = Arrays.asList(10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0);
     values.stream().forEach(val -> run(format("STATS_ADD (stats, %f)", val), variables));
   }
 
+  @Test(expected=ParseException.class)
+  public void testOverflow() throws Exception {
+   run(format("STATS_ADD(STATS_INIT(), %f)", (Double.MAX_VALUE + 1)), new HashMap<>());
+  }
+
+  @Test
+  public void testMergeProviders() throws Exception {
+    List<StatisticsProvider> providers = new ArrayList<>();
+    /*
+    Create 10 providers, each with a sample drawn from a gaussian distribution.
+    Update the reference stats from commons math to ensure we are
+     */
+    GaussianRandomGenerator gaussian = new GaussianRandomGenerator(new MersenneTwister(0L));
+    SummaryStatistics sStatistics= new SummaryStatistics();
+    DescriptiveStatistics dStatistics = new DescriptiveStatistics();
+    for(int i = 0;i < 10;++i) {
+      List<Double> sample = new ArrayList<>();
+      for(int j = 0;j < 100;++j) {
+        double s = gaussian.nextNormalizedDouble();
+        sample.add(s);
+        sStatistics.addValue(s);
+        dStatistics.addValue(s);
+      }
+      StatisticsProvider provider = (StatisticsProvider)run("STATS_ADD(STATS_INIT(), " + Joiner.on(",").join(sample) + ")"
+                                                           , new HashMap<>()
+                                                           );
+      providers.add(provider);
+    }
+
+    /*
+    Merge the providers and validate
+     */
+    Map<String, Object> providerVariables = new HashMap<>();
+    for(int i = 0;i < providers.size();++i) {
+      providerVariables.put("provider_" + i, providers.get(i));
+    }
+    StatisticsProvider mergedProvider =
+            (StatisticsProvider)run("STATS_MERGE([" + Joiner.on(",").join(providerVariables.keySet()) + "])"
+                                   , providerVariables
+                                   );
+    OnlineStatisticsProviderTest.validateStatisticsProvider(mergedProvider, sStatistics , dStatistics);
+
+  }
+
   @Test
   public void testAddManyIntegers() throws Exception {
     statsInit(windowSize);
@@ -141,9 +186,11 @@ public class StellarStatisticsFunctionsTest {
 
   @Test
   public void testGeometricMean() throws Exception {
-    statsInit(windowSize);
-    Object actual = run("STATS_GEOMETRIC_MEAN(stats)", variables);
-    assertEquals(stats.getGeometricMean(), (Double) actual, 0.1);
+    if(windowSize > 0) {
+      statsInit(windowSize);
+      Object actual = run("STATS_GEOMETRIC_MEAN(stats)", variables);
+      assertEquals(stats.getGeometricMean(), (Double) actual, 0.1);
+    }
   }
 
   @Test
@@ -183,16 +230,20 @@ public class StellarStatisticsFunctionsTest {
 
   @Test
   public void testPopulationVariance() throws Exception {
-    statsInit(windowSize);
-    Object actual = run("STATS_POPULATION_VARIANCE(stats)", variables);
-    assertEquals(stats.getPopulationVariance(), (Double) actual, 0.1);
+    if(windowSize > 0) {
+      statsInit(windowSize);
+      Object actual = run("STATS_POPULATION_VARIANCE(stats)", variables);
+      assertEquals(stats.getPopulationVariance(), (Double) actual, 0.1);
+    }
   }
 
   @Test
   public void testQuadraticMean() throws Exception {
-    statsInit(windowSize);
-    Object actual = run("STATS_QUADRATIC_MEAN(stats)", variables);
-    assertEquals(stats.getQuadraticMean(), (Double) actual, 0.1);
+    if(windowSize > 0) {
+      statsInit(windowSize);
+      Object actual = run("STATS_QUADRATIC_MEAN(stats)", variables);
+      assertEquals(stats.getQuadraticMean(), (Double) actual, 0.1);
+    }
   }
 
   @Test
@@ -215,37 +266,27 @@ public class StellarStatisticsFunctionsTest {
     assertEquals(stats.getSumsq(), (Double) actual, 0.1);
   }
 
-  @Test(expected = ParseException.class)
-  public void testKurtosisNoWindow() throws Exception {
-    statsInit(0);
-    run("STATS_KURTOSIS(stats)", variables);
-  }
-
   @Test
-  public void testKurtosisWithWindow() throws Exception {
-    statsInit(100);
+  public void testKurtosis() throws Exception {
+    statsInit(windowSize);
     Object actual = run("STATS_KURTOSIS(stats)", variables);
     assertEquals(stats.getKurtosis(), (Double) actual, 0.1);
   }
 
-  @Test(expected = ParseException.class)
-  public void testSkewnessNoWindow() throws Exception {
-    statsInit(0);
-    run("STATS_SKEWNESS(stats)", variables);
-  }
-
   @Test
-  public void testSkewnessWithWindow() throws Exception {
-    statsInit(100);
+  public void testSkewness() throws Exception {
+    statsInit(windowSize);
     Object actual = run("STATS_SKEWNESS(stats)", variables);
     assertEquals(stats.getSkewness(), (Double) actual, 0.1);
   }
 
-  @Test(expected = ParseException.class)
+
+  @Test
   public void testPercentileNoWindow() throws Exception {
     statsInit(0);
     final double percentile = 0.9;
     Object actual = run(format("STATS_PERCENTILE(stats, %f)", percentile), variables);
+    assertEquals(stats.getPercentile(percentile), (Double) actual, 1);
   }
 
   @Test


Mime
View raw message